feat: Implement enhanced action economy for LLM agent orchestration (#156)

- Add action economy system with free (LOOK, SPEAK) vs turn-ending (GO, WAIT, TAKE) actions
- Implement LOOK action with detailed descriptions for doors, objects, entities, directions
- Add SPEAK/ANNOUNCE speech system with room-wide and proximity-based message delivery
- Create multi-tile pathing with FOV interrupt detection (path cancels when new entity visible)
- Implement TAKE action with adjacency requirement and clear error messages
- Add conversation history and error feedback loop so agents learn from failed actions
- Create structured simulation logging for offline viewer replay
- Document offline viewer requirements in OFFLINE_VIEWER_SPEC.md
- Fix import path in 1_multi_agent_demo.py for standalone execution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
John McCardle 2025-12-28 20:50:00 -05:00
commit 335efc5514
6 changed files with 2232 additions and 2 deletions

1_multi_agent_demo.py

@@ -14,12 +14,15 @@ Three agents:
Each agent gets their own screenshot and VLLM query.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import sys
import requests
import base64
import os
import random
from action_parser import parse_action

4_enhanced_action_demo.py

@@ -0,0 +1,436 @@
#!/usr/bin/env python3
"""
Enhanced Action Demo
====================
Demonstrates the enhanced action economy system:
- Free actions (LOOK, SPEAK/ANNOUNCE) vs turn-ending (GO, TAKE, WAIT)
- Points of interest targeting for LOOK/MOVE
- Speech system with room-wide ANNOUNCE and proximity SPEAK
- Multi-tile path continuation with FOV interrupts
- Enhanced logging for offline viewer replay
This implements the turn-based LLM agent orchestration from issue #156.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import requests
import base64
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
create_two_room_scenario, create_button_door_scenario
)
from action_parser import parse_action
from enhanced_executor import EnhancedExecutor
from enhanced_orchestrator import EnhancedOrchestrator, EnhancedSimulationLog
# Configuration
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
SCREENSHOT_DIR = "/tmp/vllm_enhanced_demo"
LOG_PATH = "/tmp/vllm_enhanced_demo/simulation_log.json"
MAX_TURNS = 3
# Sprites
FLOOR_TILE = 0
WALL_TILE = 40
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
RAT_SPRITE = 123
class Agent:
"""Agent with WorldGraph integration."""
def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
self.name = name
self.display_name = display_name
self.entity = entity
self.world = world
self.message_history = []
@property
def pos(self) -> tuple:
return (int(self.entity.pos[0]), int(self.entity.pos[1]))
@property
def current_room(self) -> str:
room = self.world.room_at(*self.pos)
return room.name if room else None
def get_context(self, visible_agents: list) -> dict:
"""Build context for LLM query."""
room_name = self.current_room
agent_infos = [
AgentInfo(
name=a.name,
display_name=a.display_name,
position=a.pos,
is_player=(a.name == self.name)
)
for a in visible_agents
]
return {
"location": self.world.describe_room(room_name, agent_infos, self.name),
"available_actions": self.world.get_available_actions(room_name),
"recent_messages": self.message_history[-5:],
}
def file_to_base64(path: str) -> str:
"""Convert file to base64 string."""
with open(path, 'rb') as f:
return base64.b64encode(f.read()).decode('utf-8')
def llm_query(agent, screenshot_path: str, context: dict) -> str:
"""
Query VLLM for agent action with enhanced context.
Includes points of interest, action economy hints, error feedback,
and conversation history.
"""
system_prompt = f"""You are {agent.display_name} exploring a dungeon.
You receive visual and text information about your surroundings.
ACTION ECONOMY:
- LOOK <target>: Free action. Examine something, then choose another action.
- SPEAK "<message>" or ANNOUNCE "<message>": Free action (once per turn). Then choose another action.
- GO <direction>: Ends your turn. Move one tile in that direction (NORTH/SOUTH/EAST/WEST).
- TAKE <item>: Ends your turn. Pick up an item you are standing next to.
- WAIT: Ends your turn without moving.
IMPORTANT: You can only TAKE items that are adjacent to you (1 tile away). If something is far away, GO towards it first.
You can LOOK or SPEAK, then still MOVE in the same turn.
Always end your final response with: Action: <YOUR_ACTION>"""
# Build enhanced prompt
parts = [context["location"]]
# Add received messages
if context.get("messages"):
parts.append("\nMessages received this turn:")
for msg in context["messages"]:
sender = msg.get("sender", "someone")
content = msg.get("content", "")
parts.append(f' {sender} says: "{content}"')
# Add points of interest
if context.get("poi_prompt"):
parts.append(f"\n{context['poi_prompt']}")
# Add available actions
    actions_str = ", ".join(context.get("available_actions", []))
    parts.append(f"\nAvailable actions: {actions_str}")
    # Include the result of a LOOK taken earlier this turn (the orchestrator
    # sets context["look_result"]; without this the LLM never sees it)
    if context.get("look_result"):
        parts.append(f"\n[LOOK result: {context['look_result']}]")
# Add action economy hint
if context.get("has_spoken"):
parts.append("\n[You have already spoken this turn - you can still MOVE or WAIT]")
# Add error feedback from last failed action
if context.get("last_error"):
parts.append(f"\n[ERROR: {context['last_error']}]")
parts.append("[Your last action failed. Please try a different action.]")
# Add conversation history from this turn
if context.get("conversation_history"):
parts.append("\n[Previous attempts this turn:")
for exch in context["conversation_history"]:
action_str = f"{exch.get('action_type', '?')} {exch.get('action_args', '')}"
if exch.get("error"):
parts.append(f" - You tried: {action_str} -> FAILED: {exch['error']}")
else:
parts.append(f" - You did: {action_str}")
parts.append("]")
parts.append("\n[Screenshot attached showing your current view]")
parts.append("\nWhat do you do? Brief reasoning (1-2 sentences), then Action: <action>")
user_prompt = "\n".join(parts)
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": [
{"type": "text", "text": user_prompt},
{"type": "image_url", "image_url": {
"url": "data:image/png;base64," + file_to_base64(screenshot_path)
}}
]
}
]
try:
resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
data = resp.json()
if "error" in data:
return f"[VLLM Error: {data['error']}]"
return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
except Exception as e:
return f"[Connection Error: {e}]"
def setup_scene(world: WorldGraph):
"""Create McRogueFace scene from WorldGraph."""
mcrfpy.createScene("enhanced_demo")
mcrfpy.setScene("enhanced_demo")
ui = mcrfpy.sceneUI("enhanced_demo")
texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
grid = mcrfpy.Grid(
grid_size=(25, 15),
texture=texture,
pos=(5, 5),
size=(1014, 700)
)
grid.fill_color = mcrfpy.Color(20, 20, 30)
grid.zoom = 2.0
ui.append(grid)
# Initialize all as walls
for x in range(25):
for y in range(15):
p = grid.at(x, y)
p.tilesprite = WALL_TILE
p.walkable = False
p.transparent = False
# Carve rooms from WorldGraph
for room in world.rooms.values():
for rx in range(room.x, room.x + room.width):
for ry in range(room.y, room.y + room.height):
if 0 <= rx < 25 and 0 <= ry < 15:
p = grid.at(rx, ry)
p.tilesprite = FLOOR_TILE
p.walkable = True
p.transparent = True
# Place doors
for door in world.doors:
dx, dy = door.position
if 0 <= dx < 25 and 0 <= dy < 15:
p = grid.at(dx, dy)
p.tilesprite = FLOOR_TILE
p.walkable = not door.locked
p.transparent = True
# FOV layer
fov_layer = grid.add_layer('color', z_index=10)
fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
return grid, fov_layer, texture
def create_agents(grid, world: WorldGraph, texture) -> list:
"""Create agents in their starting rooms."""
agents = []
# Wizard in guard_room (left)
room_a = world.rooms["guard_room"]
wizard = mcrfpy.Entity(
grid_pos=room_a.center,
texture=texture,
sprite_index=WIZARD_SPRITE
)
wizard.name = "wizard"
grid.entities.append(wizard)
agents.append(Agent("Wizard", "a wizard", wizard, world))
# Knight in armory (right)
room_b = world.rooms["armory"]
knight = mcrfpy.Entity(
grid_pos=room_b.center,
texture=texture,
sprite_index=KNIGHT_SPRITE
)
knight.name = "knight"
grid.entities.append(knight)
agents.append(Agent("Knight", "a knight", knight, world))
return agents
def add_rat(grid, world: WorldGraph, texture, position: tuple):
"""Add a rat entity at the specified position."""
rat = mcrfpy.Entity(
grid_pos=position,
texture=texture,
sprite_index=RAT_SPRITE
)
rat.name = "rat"
grid.entities.append(rat)
return rat
def run_demo():
"""Run enhanced action demo."""
print("=" * 70)
print("Enhanced Action Demo")
print("=" * 70)
print("""
Features demonstrated:
- LOOK as free action (doesn't end turn)
- SPEAK/ANNOUNCE as free action (once per turn)
- Points of interest targeting
- Enhanced logging for offline viewer
""")
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# Create world
print("Creating world...")
world = create_two_room_scenario()
print(f" Rooms: {list(world.rooms.keys())}")
print(f" Objects: {list(world.objects.keys())}")
# Setup scene
print("\nSetting up scene...")
grid, fov_layer, texture = setup_scene(world)
# Create agents
print("\nCreating agents...")
agents = create_agents(grid, world, texture)
# Add a rat near the door for interest
rat = add_rat(grid, world, texture, (9, 4))
print(f" Added rat at (9, 4)")
for agent in agents:
print(f" {agent.name} at {agent.pos} in {agent.current_room}")
# Create enhanced orchestrator
print("\nInitializing enhanced orchestrator...")
orchestrator = EnhancedOrchestrator(
grid=grid,
fov_layer=fov_layer,
world=world,
agents=agents,
screenshot_dir=SCREENSHOT_DIR,
llm_query_fn=llm_query
)
# Run simulation
print(f"\nRunning simulation ({MAX_TURNS} turns)...")
log = orchestrator.run_simulation(max_turns=MAX_TURNS)
# Save enhanced log
log.save(LOG_PATH)
# Print summary
print("\n" + "=" * 70)
print("SIMULATION SUMMARY")
print("=" * 70)
for turn in range(1, orchestrator.turn_number + 1):
print(log.get_turn_summary(turn))
# Print speech log
if log.speech_log:
print("\n" + "-" * 40)
print("SPEECH LOG")
print("-" * 40)
for entry in log.speech_log:
print(f" Turn {entry['turn']}: {entry['speaker']} {entry['type']}s: \"{entry['content'][:50]}...\"")
if entry['recipients']:
print(f" -> Heard by: {', '.join(entry['recipients'])}")
print("\n" + "=" * 70)
print("Demo Complete")
print("=" * 70)
print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/")
print(f"Simulation log saved to: {LOG_PATH}")
print("\nLog structure (for offline viewer):")
print(" - metadata: simulation info")
print(" - steps[]: per-agent-turn records with:")
print(" - screenshot_path, position, room")
print(" - llm_prompt_user, llm_response")
print(" - free_actions[] (LOOK, SPEAK)")
print(" - final_action (MOVE, WAIT)")
print(" - speech_log[]: all speech events")
return True
def replay_log(log_path: str):
"""
Replay a simulation from log file.
This is a text-based preview of what the offline viewer would show.
"""
print(f"Loading simulation from: {log_path}")
try:
log = EnhancedSimulationLog.load(log_path)
except FileNotFoundError:
print(f"Log file not found: {log_path}")
return
print("\n" + "=" * 70)
print("SIMULATION REPLAY")
print("=" * 70)
print(f"Turns: {log.metadata.get('total_turns', '?')}")
print(f"Agents: {', '.join(log.metadata.get('agent_names', []))}")
print(f"Rooms: {', '.join(log.metadata.get('world_rooms', []))}")
for step in log.steps:
print(f"\n{'='*40}")
print(f"Turn {step.turn}: {step.agent_id}")
print(f"{'='*40}")
print(f"Position: {step.position_start} -> {step.position_end}")
print(f"Room: {step.room}")
if step.pending_messages:
print(f"\nMessages received:")
for msg in step.pending_messages:
print(f" {msg.get('sender')}: \"{msg.get('content', '')[:40]}...\"")
if step.llm_was_queried:
print(f"\nLLM Response (truncated):")
print(f" {step.llm_response[:200]}...")
else:
print(f"\n[Path continuation - no LLM query]")
if step.free_actions:
print(f"\nFree actions:")
for fa in step.free_actions:
print(f" - {fa['action_type']}: {fa.get('args', ())}")
status = "OK" if step.final_action_success else "FAIL"
print(f"\nFinal: {step.final_action_type} {step.final_action_args} [{status}]")
print(f" {step.final_action_message}")
# Speech summary
if log.speech_log:
print("\n" + "=" * 40)
print("ALL SPEECH")
print("=" * 40)
for entry in log.speech_log:
print(f"Turn {entry['turn']}: {entry['speaker']} -> {entry['recipients']}")
print(f" \"{entry['content']}\"")
if __name__ == "__main__":
# Check for replay mode
if len(sys.argv) > 1 and sys.argv[1] == "--replay":
log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH
replay_log(log_file)
sys.exit(0)
# Normal execution
try:
success = run_demo()
print("\nPASS" if success else "\nFAIL")
sys.exit(0 if success else 1)
except Exception as e:
import traceback
traceback.print_exc()
sys.exit(1)

OFFLINE_VIEWER_SPEC.md

@@ -0,0 +1,152 @@
# Offline Viewer Specification
**Status**: Planned (issue #154)
**Priority**: After core simulation features are stable
## Overview
The Offline Viewer allows users to replay stored simulation logs in McRogueFace, stepping through them turn by turn to review:
- Each agent's perspective (FOV, camera position)
- LLM chain-of-thought reasoning
- Actions taken and their results
- Speech between agents
## Log Format
Simulation logs are stored as JSON with this structure:
```json
{
"metadata": {
"total_turns": 5,
"num_agents": 2,
"agent_names": ["Wizard", "Knight"],
"timestamp_start": "2025-01-15T10:30:00",
"timestamp_end": "2025-01-15T10:32:45",
"world_rooms": ["guard_room", "armory"],
"screenshot_dir": "/tmp/vllm_enhanced_demo"
},
"steps": [
{
"turn": 1,
"agent_id": "Wizard",
"timestamp": "2025-01-15T10:30:15",
"position_start": [5, 4],
"position_end": [6, 4],
"room": "guard_room",
"visible_entities": ["rat_123", "knight_456"],
"visible_tiles": 42,
"points_of_interest": [
{"name": "door", "direction": "east", "distance": 4}
],
"location_description": "You are in the guard room...",
"available_actions": ["GO EAST", "LOOK", "WAIT"],
"pending_messages": [],
"poi_prompt": "Points of interest:\n - a door to the armory (east)",
"screenshot_path": "/tmp/.../turn1_wizard.png",
"llm_prompt_system": "You are a wizard...",
"llm_prompt_user": "You are in the guard room...",
"llm_response": "I see a door to the east. I should explore. Action: GO EAST",
"llm_was_queried": true,
"free_actions": [
{"action_type": "LOOK", "args": ["DOOR"], "result": {"description": "A wooden door..."}}
],
"final_action_type": "GO",
"final_action_args": ["EAST"],
"final_action_success": true,
"final_action_message": "Moved east to (6, 4)",
"path_taken": [[5, 4], [6, 4]],
"path_remaining": 0
}
],
"speech_log": [
{
"turn": 2,
"speaker": "Wizard",
"type": "announce",
"content": "Hello, is anyone there?",
"recipients": ["Knight"]
}
]
}
```
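Since the log is plain JSON, a viewer prototype can load it with the standard library alone. A minimal sketch (the `load_simulation` helper is illustrative, not part of the codebase):
```python
import json

def load_simulation(path: str) -> dict:
    """Load a simulation log and normalize the top-level sections."""
    with open(path) as f:
        data = json.load(f)
    # Tolerate logs written before a given section existed.
    data.setdefault("metadata", {})
    data.setdefault("steps", [])
    data.setdefault("speech_log", [])
    return data

log = load_simulation("/tmp/vllm_enhanced_demo/simulation_log.json")
print(f"{log['metadata'].get('total_turns', '?')} turns, "
      f"{len(log['steps'])} steps, {len(log['speech_log'])} speech events")
```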
## Viewer Features (Planned)
### Core Features
1. **Turn Navigation**
- Step forward/backward through turns
- Jump to specific turn number
- Auto-play at configurable speed
2. **Agent Perspective**
- Reconstruct agent's FOV from stored data
- Center camera on current agent
- Show visible entities and tiles
3. **LLM Review Panel**
- Display system prompt
- Display user prompt (context)
- Display LLM response
- Highlight parsed action
4. **Action Log**
- Show free actions (LOOK, SPEAK)
- Show final action and result
- Color-code success/failure
5. **Speech History**
- Timeline of all speech events
- Filter by agent
- Show recipients
### Implementation Notes
The viewer should:
- Load screenshots from `screenshot_path` (if available)
- OR reconstruct scene from WorldGraph + step data
- Support keyboard navigation (arrow keys)
- Display agent state in sidebar
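A flat cursor over `steps` is enough to drive the navigation features above. A minimal sketch, assuming a non-empty `steps` list loaded from the JSON (the `TurnCursor` name is illustrative, not part of the codebase):
```python
class TurnCursor:
    """Step through the flat steps list; one step = one agent's turn."""

    def __init__(self, steps: list):
        self.steps = steps
        self.index = 0

    @property
    def current(self) -> dict:
        return self.steps[self.index]

    def next_step(self) -> dict:
        self.index = min(self.index + 1, len(self.steps) - 1)
        return self.current

    def prev_step(self) -> dict:
        self.index = max(self.index - 1, 0)
        return self.current

    def jump_to_turn(self, turn: int) -> dict:
        # Land on the first step of the requested turn, if it exists.
        for i, step in enumerate(self.steps):
            if step["turn"] == turn:
                self.index = i
                break
        return self.current
```
Arrow-key handlers would then map onto `next_step`/`prev_step`, re-rendering the viewport and sidebar from `current`.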
### UI Layout (Suggested)
```
+----------------------------------+------------------+
| | Turn: 3/10 |
| Main Viewport | Agent: Wizard |
| (Agent's Perspective) | Room: armory |
| +------------------+
| | LLM Response: |
| | "I see a rat |
| | to the east. |
| | Action: LOOK |
| | AT RAT" |
+----------------------------------+------------------+
| < Prev | Turn 3 | Next > | Actions: |
| [Agent: Wizard v] | - LOOK AT RAT |
| | - GO EAST [OK] |
+----------------------------------+------------------+
```
## Files
- `enhanced_orchestrator.py` - Generates `EnhancedSimulationLog`
- `4_enhanced_action_demo.py` - Demo with `--replay` mode for text preview
- Logs stored in `/tmp/vllm_enhanced_demo/simulation_log.json`
## Future Enhancements
- Animated path replay (smooth entity movement)
- Side-by-side multi-agent view
- Diff view comparing agent perceptions
- Export to video/GIF
- Integration with annotation tools for research

action_economy.py

@@ -0,0 +1,302 @@
"""
Action Economy System
=====================
Defines which actions consume turns and which are free.
Manages multi-tile pathing with FOV interruption.
Action Categories:
- FREE: LOOK, SPEAK, ANNOUNCE (don't end turn)
- FULL: MOVE, WAIT (end turn)
Constraints:
- Only ONE speech action per turn
- LOOK provides description and prompts for another action
- Multi-tile paths continue without LLM until FOV changes
"""
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Set, Dict, Any
from enum import Enum
from action_parser import Action, ActionType
class TurnCost(Enum):
"""How much of a turn an action consumes."""
FREE = "free" # Doesn't end turn
FULL = "full" # Ends turn
# Action cost mapping
ACTION_COSTS = {
ActionType.LOOK: TurnCost.FREE,
ActionType.SPEAK: TurnCost.FREE,
ActionType.ANNOUNCE: TurnCost.FREE,
ActionType.GO: TurnCost.FULL,
ActionType.WAIT: TurnCost.FULL,
ActionType.TAKE: TurnCost.FULL,
ActionType.DROP: TurnCost.FULL,
ActionType.PUSH: TurnCost.FULL,
ActionType.USE: TurnCost.FULL,
ActionType.OPEN: TurnCost.FULL,
ActionType.CLOSE: TurnCost.FULL,
ActionType.INVALID: TurnCost.FULL, # Invalid action ends turn
}
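# Illustrative lookups (free actions leave the turn open; full actions end it):
#   ACTION_COSTS[ActionType.LOOK] -> TurnCost.FREE   # agent may act again
#   ACTION_COSTS[ActionType.GO]   -> TurnCost.FULL   # turn ends after the move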
@dataclass
class TurnState:
"""
Tracks state within a single turn.
Used to enforce constraints like "only one speech per turn"
and track free actions taken before turn-ending action.
"""
has_spoken: bool = False
free_actions: List[Dict[str, Any]] = field(default_factory=list)
turn_ended: bool = False
def can_speak(self) -> bool:
"""Check if agent can still speak this turn."""
return not self.has_spoken
def record_speech(self):
"""Record that agent has spoken this turn."""
self.has_spoken = True
def record_free_action(self, action_type: str, details: Dict[str, Any]):
"""Record a free action for logging."""
self.free_actions.append({
"type": action_type,
**details
})
def end_turn(self):
"""Mark turn as ended."""
self.turn_ended = True
@dataclass
class PathState:
"""
Tracks multi-tile movement path for an agent.
When an agent decides to move to a distant location,
we store the path and continue moving without LLM calls
until the path completes or FOV changes.
"""
path: List[Tuple[int, int]] = field(default_factory=list)
current_index: int = 0
destination_description: str = "" # "the armory", "the door"
# FOV state when path was planned
visible_entities_at_start: Set[str] = field(default_factory=set)
@property
def has_path(self) -> bool:
"""Check if there's an active path."""
return len(self.path) > self.current_index
@property
def next_tile(self) -> Optional[Tuple[int, int]]:
"""Get next tile in path, or None if path complete."""
if self.has_path:
return self.path[self.current_index]
return None
@property
def remaining_tiles(self) -> int:
"""Number of tiles left in path."""
return max(0, len(self.path) - self.current_index)
def advance(self):
"""Move to next tile in path."""
if self.has_path:
self.current_index += 1
def clear(self):
"""Clear the current path."""
self.path = []
self.current_index = 0
self.destination_description = ""
self.visible_entities_at_start = set()
def should_interrupt(self, current_visible_entities: Set[str]) -> bool:
"""
Check if path should be interrupted due to FOV change.
Returns True if a NEW entity has entered the agent's FOV
since the path was planned.
"""
new_entities = current_visible_entities - self.visible_entities_at_start
return len(new_entities) > 0
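    # Example: a path planned while only {"knight_456"} was visible is
    # interrupted the moment "rat_123" enters FOV (new_entities == {"rat_123"}).
    # Entities leaving FOV never interrupt, since only the set difference
    # current - start is checked.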
@dataclass
class PointOfInterest:
"""
A targetable object/location for LOOK/MOVE actions.
Listed in LLM prompts to guide valid targeting.
"""
name: str # Short name: "door", "rat", "button"
display_name: str # Full description: "a wooden door to the east"
position: Tuple[int, int] # Tile coordinates
direction: str # Cardinal direction from agent: "north", "east"
distance: int # Manhattan distance from agent
can_look: bool = True # Can be examined with LOOK
can_move_to: bool = False # Can be targeted with GO TO
entity_id: Optional[str] = None # Entity ID if this is an entity
def get_action_cost(action: Action) -> TurnCost:
"""Get the turn cost for an action."""
return ACTION_COSTS.get(action.type, TurnCost.FULL)
def get_direction_name(from_pos: Tuple[int, int], to_pos: Tuple[int, int]) -> str:
"""Get cardinal direction name from one position to another."""
dx = to_pos[0] - from_pos[0]
dy = to_pos[1] - from_pos[1]
if abs(dx) > abs(dy):
return "east" if dx > 0 else "west"
elif abs(dy) > abs(dx):
return "south" if dy > 0 else "north"
else:
# Diagonal
ns = "south" if dy > 0 else "north"
ew = "east" if dx > 0 else "west"
return f"{ns}-{ew}"
def manhattan_distance(a: Tuple[int, int], b: Tuple[int, int]) -> int:
"""Calculate Manhattan distance between two points."""
return abs(a[0] - b[0]) + abs(a[1] - b[1])
class PointOfInterestCollector:
"""
Collects points of interest visible to an agent.
Used to populate LLM prompts with valid LOOK/MOVE targets.
"""
def __init__(self, grid, agent_pos: Tuple[int, int]):
self.grid = grid
self.agent_pos = agent_pos
self.points: List[PointOfInterest] = []
def collect_from_fov(self, world_graph=None) -> List[PointOfInterest]:
"""
Collect all points of interest visible in current FOV.
Examines:
- Entities (other agents, NPCs, items)
- Doors/exits
- Interactive objects (buttons, chests)
- Notable tiles (walls with features)
"""
self.points = []
# Collect entities
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == self.agent_pos:
continue # Skip self
if self.grid.is_in_fov(ex, ey):
direction = get_direction_name(self.agent_pos, (ex, ey))
distance = manhattan_distance(self.agent_pos, (ex, ey))
# Try to get entity name/description
                entity_name = getattr(entity, 'name', None) or "creature"
entity_id = getattr(entity, 'id', None) or str(id(entity))
self.points.append(PointOfInterest(
name=entity_name,
display_name=f"a {entity_name} to the {direction}",
position=(ex, ey),
direction=direction,
distance=distance,
can_look=True,
can_move_to=False, # Can't move onto entities
entity_id=entity_id
))
# Collect from WorldGraph if provided
if world_graph:
self._collect_from_world_graph(world_graph)
# Sort by distance
self.points.sort(key=lambda p: p.distance)
return self.points
def _collect_from_world_graph(self, world):
"""Collect doors and objects from WorldGraph."""
agent_room = world.room_at(*self.agent_pos)
if not agent_room:
return
# Doors
for door in world.get_exits(agent_room.name):
dx, dy = door.position
if self.grid.is_in_fov(dx, dy):
direction = get_direction_name(self.agent_pos, (dx, dy))
distance = manhattan_distance(self.agent_pos, (dx, dy))
# Get destination room name
if door.room_a == agent_room.name:
dest = world.rooms.get(door.room_b)
else:
dest = world.rooms.get(door.room_a)
dest_name = dest.display_name if dest else "unknown"
lock_str = " (locked)" if door.locked else ""
self.points.append(PointOfInterest(
name="door",
display_name=f"a door to {dest_name}{lock_str} ({direction})",
position=(dx, dy),
direction=direction,
distance=distance,
can_look=True,
can_move_to=not door.locked
))
# Objects in room
for obj in world.get_objects_in_room(agent_room.name):
ox, oy = obj.position
if self.grid.is_in_fov(ox, oy):
direction = get_direction_name(self.agent_pos, (ox, oy))
distance = manhattan_distance(self.agent_pos, (ox, oy))
self.points.append(PointOfInterest(
name=obj.name,
display_name=f"{obj.display_name} ({direction})",
position=(ox, oy),
direction=direction,
distance=distance,
can_look=True,
can_move_to="pressable" not in obj.affordances # Can walk to items
))
def format_for_prompt(self) -> str:
"""Format points of interest for inclusion in LLM prompt."""
if not self.points:
return "No notable objects in view."
lines = ["Points of interest:"]
for poi in self.points:
actions = []
if poi.can_look:
actions.append(f"LOOK AT {poi.name.upper()}")
if poi.can_move_to:
actions.append(f"GO TO {poi.name.upper()}")
action_str = ", ".join(actions) if actions else "observe only"
lines.append(f" - {poi.display_name}: {action_str}")
return "\n".join(lines)

enhanced_executor.py

@@ -0,0 +1,731 @@
"""
Enhanced Action Executor
========================
Extends ActionExecutor with:
- LOOK action with detailed descriptions
- SPEAK/ANNOUNCE execution with range checking
- Multi-tile path planning
- Free action vs turn-ending action handling
"""
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict, Any, Set
from action_parser import Action, ActionType
from action_executor import ActionResult
from action_economy import (
TurnState, PathState, TurnCost, get_action_cost,
manhattan_distance, get_direction_name
)
@dataclass
class TakeResult:
"""Result of a TAKE action."""
success: bool
message: str
item_name: str
item_position: Optional[Tuple[int, int]] = None
@dataclass
class LookResult:
"""Result of a LOOK action."""
success: bool
description: str
target_name: str
target_position: Optional[Tuple[int, int]] = None
@dataclass
class SpeechResult:
"""Result of a SPEAK/ANNOUNCE action."""
success: bool
message: str
recipients: List[str] # Names of agents who received the message
speech_type: str # "announce" or "speak"
content: str # What was said
@dataclass
class Message:
"""A message received by an agent."""
sender: str
content: str
speech_type: str # "announce" or "speak"
turn: int
distance: Optional[int] = None # For SPEAK, how far away sender was
class EnhancedExecutor:
"""
Enhanced action executor with LOOK, SPEAK, and multi-tile support.
"""
# Direction vectors for movement
DIRECTION_VECTORS = {
'NORTH': (0, -1),
'SOUTH': (0, 1),
'EAST': (1, 0),
'WEST': (-1, 0),
}
# SPEAK range (Manhattan distance)
SPEAK_RANGE = 4
def __init__(self, grid, world_graph=None):
"""
Initialize executor.
Args:
grid: mcrfpy.Grid instance
world_graph: Optional WorldGraph for detailed descriptions
"""
self.grid = grid
self.world = world_graph
# Agent path states (agent_name -> PathState)
self.path_states: Dict[str, PathState] = {}
# Speech channel for message delivery
self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages
def get_path_state(self, agent_name: str) -> PathState:
"""Get or create path state for an agent."""
if agent_name not in self.path_states:
self.path_states[agent_name] = PathState()
return self.path_states[agent_name]
def get_pending_messages(self, agent_name: str) -> List[Message]:
"""Get and clear pending messages for an agent."""
messages = self.pending_messages.get(agent_name, [])
self.pending_messages[agent_name] = []
return messages
# =========================================================================
# LOOK Action
# =========================================================================
def execute_look(self, agent, action: Action) -> LookResult:
"""
Execute LOOK action - examine a tile or entity.
Args:
agent: Agent performing the look
action: Parsed LOOK action with optional target
Returns:
LookResult with detailed description
"""
target = action.args[0] if action.args and action.args[0] else None
if target is None:
# General look around
return self._look_around(agent)
else:
# Look at specific target
return self._look_at_target(agent, target.upper())
def _look_around(self, agent) -> LookResult:
"""Describe the general surroundings."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
descriptions = []
# Describe current room
if self.world:
room = self.world.room_at(ax, ay)
if room:
descriptions.append(f"You are in {room.display_name}.")
if room.description_template and room.properties:
try:
desc = room.description_template.format(**room.properties)
descriptions.append(desc)
except KeyError:
pass
# Count visible entities
visible_count = 0
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey):
visible_count += 1
if visible_count > 0:
descriptions.append(f"You can see {visible_count} other creature(s) nearby.")
# Describe nearby walls/openings
wall_dirs = []
open_dirs = []
for direction, (dx, dy) in self.DIRECTION_VECTORS.items():
nx, ny = ax + dx, ay + dy
if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]:
cell = self.grid.at(nx, ny)
if cell.walkable:
open_dirs.append(direction.lower())
else:
wall_dirs.append(direction.lower())
if open_dirs:
descriptions.append(f"Open passages: {', '.join(open_dirs)}.")
if wall_dirs:
descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name="surroundings"
)
def _look_at_target(self, agent, target: str) -> LookResult:
"""Look at a specific target (direction, entity, or object name)."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
# Check if target is a direction
if target in self.DIRECTION_VECTORS:
return self._look_in_direction(agent, target)
# Check if target matches an entity
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == (ax, ay):
continue
            entity_name = getattr(entity, 'name', '').upper()
            # Guard against unnamed entities: an empty name is a substring of
            # every target, which would make anything "match".
            if entity_name and (target in entity_name or entity_name in target):
if self.grid.is_in_fov(ex, ey):
return self._describe_entity(agent, entity)
else:
return LookResult(
success=False,
description=f"You cannot see {target.lower()} from here.",
target_name=target.lower()
)
# Check WorldGraph objects
if self.world:
room = self.world.room_at(ax, ay)
if room:
for obj in self.world.get_objects_in_room(room.name):
if target in obj.name.upper() or obj.name.upper() in target:
ox, oy = obj.position
if self.grid.is_in_fov(ox, oy):
return self._describe_object(agent, obj)
# Check doors
for door in self.world.get_exits(room.name):
if "DOOR" in target:
dx, dy = door.position
if self.grid.is_in_fov(dx, dy):
return self._describe_door(agent, door)
return LookResult(
success=False,
description=f"You don't see anything called '{target.lower()}' nearby.",
target_name=target.lower()
)
def _look_in_direction(self, agent, direction: str) -> LookResult:
"""Look in a cardinal direction."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
dx, dy = self.DIRECTION_VECTORS[direction]
descriptions = []
# Scan tiles in that direction
for distance in range(1, 10):
tx, ty = ax + dx * distance, ay + dy * distance
if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]):
descriptions.append(f"The edge of the known world lies {direction.lower()}.")
break
if not self.grid.is_in_fov(tx, ty):
descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.")
break
cell = self.grid.at(tx, ty)
# Check for entity at this tile
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == (tx, ty):
entity_name = getattr(entity, 'name', 'creature')
descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.")
# Check for wall
if not cell.walkable:
# Check if it's a door
if self.world:
room = self.world.room_at(ax, ay)
if room:
for door in self.world.get_exits(room.name):
if door.position == (tx, ty):
dest = self.world.rooms.get(
door.room_b if door.room_a == room.name else door.room_a
)
dest_name = dest.display_name if dest else "another area"
lock_str = " It is locked." if door.locked else ""
descriptions.append(
f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}"
)
break
else:
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
else:
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
else:
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
break
if not descriptions:
descriptions.append(f"Open floor extends to the {direction.lower()}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=direction.lower(),
target_position=None
)
def _describe_entity(self, agent, entity) -> LookResult:
"""Generate detailed description of an entity."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
ex, ey = int(entity.pos[0]), int(entity.pos[1])
entity_name = getattr(entity, 'name', 'creature')
direction = get_direction_name((ax, ay), (ex, ey))
distance = manhattan_distance((ax, ay), (ex, ey))
descriptions = [
f"You examine the {entity_name} carefully.",
f"It stands {distance} tile(s) to the {direction}."
]
# Add any entity-specific description
if hasattr(entity, 'description'):
descriptions.append(entity.description)
# Add behavior hints if available
if hasattr(entity, 'behavior'):
descriptions.append(f"It appears to be {entity.behavior}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=entity_name,
target_position=(ex, ey)
)
def _describe_object(self, agent, obj) -> LookResult:
"""Generate detailed description of a WorldGraph object."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
ox, oy = obj.position
direction = get_direction_name((ax, ay), (ox, oy))
distance = manhattan_distance((ax, ay), (ox, oy))
descriptions = [
f"You examine {obj.display_name}.",
f"It is {distance} tile(s) to the {direction}."
]
if obj.description:
descriptions.append(obj.description)
# Describe affordances
if "takeable" in obj.affordances:
descriptions.append("It looks small enough to pick up.")
if "pressable" in obj.affordances:
descriptions.append("It appears to be some kind of mechanism.")
if "openable" in obj.affordances:
descriptions.append("It can be opened.")
if "readable" in obj.affordances:
descriptions.append("There is writing on it.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=obj.name,
target_position=(ox, oy)
)
def _describe_door(self, agent, door) -> LookResult:
"""Generate detailed description of a door."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
dx, dy = door.position
direction = get_direction_name((ax, ay), (dx, dy))
distance = manhattan_distance((ax, ay), (dx, dy))
# Get destination
if self.world:
current_room = self.world.room_at(ax, ay)
if current_room:
if door.room_a == current_room.name:
dest = self.world.rooms.get(door.room_b)
else:
dest = self.world.rooms.get(door.room_a)
dest_name = dest.display_name if dest else "another area"
else:
dest_name = "another area"
else:
dest_name = "another area"
descriptions = [
f"You examine the doorway to the {direction}.",
f"It leads to {dest_name}, {distance} tile(s) away."
]
if door.locked:
descriptions.append("The door is locked. You'll need a key or mechanism to open it.")
else:
descriptions.append("The passage is open.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name="door",
target_position=(dx, dy)
)
# =========================================================================
# SPEAK/ANNOUNCE Actions
# =========================================================================
def execute_speech(self, agent, action: Action, all_agents: list,
turn_number: int) -> SpeechResult:
"""
Execute SPEAK or ANNOUNCE action.
ANNOUNCE: All agents in the same room hear the message
SPEAK: Only agents within SPEAK_RANGE tiles hear the message
"""
message_content = action.args[0] if action.args else ""
if not message_content:
return SpeechResult(
success=False,
message="Nothing to say.",
recipients=[],
speech_type=action.type.value.lower(),
content=""
)
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
recipients = []
if action.type == ActionType.ANNOUNCE:
# Room-wide broadcast
recipients = self._get_agents_in_room(agent, all_agents)
speech_type = "announce"
else:
# Proximity-based speech
recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE)
speech_type = "speak"
# Deliver messages
for recipient in recipients:
if recipient.name not in self.pending_messages:
self.pending_messages[recipient.name] = []
distance = manhattan_distance(
(ax, ay),
(int(recipient.entity.pos[0]), int(recipient.entity.pos[1]))
) if speech_type == "speak" else None
self.pending_messages[recipient.name].append(Message(
sender=agent.name,
content=message_content,
speech_type=speech_type,
turn=turn_number,
distance=distance
))
recipient_names = [r.name for r in recipients]
if recipients:
return SpeechResult(
success=True,
message=f"You {speech_type}: \"{message_content}\"",
recipients=recipient_names,
speech_type=speech_type,
content=message_content
)
else:
return SpeechResult(
success=True, # Still succeeds, just nobody heard
message=f"You {speech_type} into the emptiness: \"{message_content}\"",
recipients=[],
speech_type=speech_type,
content=message_content
)
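    # Worked example: with SPEAK_RANGE = 4, a listener at (8, 5) hears a
    # speaker at (5, 4) (Manhattan distance 3 + 1 = 4 <= 4), while one at
    # (10, 4) (distance 5) does not. ANNOUNCE instead reaches everyone in
    # the speaker's room, regardless of distance.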
def _get_agents_in_room(self, speaker, all_agents: list) -> list:
"""Get all agents in the same room as speaker (excluding speaker)."""
if not self.world:
# Fallback: use proximity
return self._get_agents_in_range(speaker, all_agents, 20)
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
speaker_room = self.world.room_at(ax, ay)
if not speaker_room:
return []
recipients = []
for agent in all_agents:
if agent.name == speaker.name:
continue
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
agent_room = self.world.room_at(rx, ry)
if agent_room and agent_room.name == speaker_room.name:
recipients.append(agent)
return recipients
def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list:
"""Get all agents within Manhattan distance of speaker."""
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
recipients = []
for agent in all_agents:
if agent.name == speaker.name:
continue
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles:
recipients.append(agent)
return recipients
# =========================================================================
# TAKE Action
# =========================================================================
def execute_take(self, agent, action: Action) -> TakeResult:
"""
Execute TAKE action - pick up an item.
Items must be:
1. In the WorldGraph as a takeable object
2. Within reach (adjacent tile or same tile, distance <= 1)
3. Visible in FOV
"""
item_name = action.args[0].lower() if action.args and action.args[0] else None
if not item_name:
return TakeResult(
success=False,
message="Take what? Specify an item name.",
item_name=""
)
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
# Search for the item in WorldGraph
if not self.world:
return TakeResult(
success=False,
message="No items exist in this world.",
item_name=item_name
)
# Find matching object
matching_obj = None
for obj_name, obj in self.world.objects.items():
if item_name in obj_name.lower() or obj_name.lower() in item_name:
matching_obj = obj
break
if not matching_obj:
return TakeResult(
success=False,
message=f"You don't see any '{item_name}' here.",
item_name=item_name
)
# Check if takeable
if "takeable" not in matching_obj.affordances:
return TakeResult(
success=False,
message=f"The {matching_obj.display_name} cannot be picked up.",
item_name=item_name,
item_position=matching_obj.position
)
ox, oy = matching_obj.position
# Check if visible in FOV
if not self.grid.is_in_fov(ox, oy):
return TakeResult(
success=False,
message=f"You can't see the {matching_obj.display_name} from here.",
item_name=item_name,
item_position=(ox, oy)
)
# Check distance (must be adjacent or same tile)
distance = manhattan_distance((ax, ay), (ox, oy))
if distance > 1:
direction = get_direction_name((ax, ay), (ox, oy))
# Use name for cleaner message (display_name often has article already)
return TakeResult(
success=False,
message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.",
item_name=item_name,
item_position=(ox, oy)
)
# Success! Remove from world (simplified - no inventory system yet)
del self.world.objects[matching_obj.name]
return TakeResult(
success=True,
message=f"You pick up {matching_obj.display_name}.",
item_name=matching_obj.name,
item_position=(ox, oy)
)
# =========================================================================
# Movement (single tile, delegates to original executor)
# =========================================================================
def execute_move(self, agent, action: Action) -> ActionResult:
"""
Execute single-tile movement.
This is the per-turn movement. Multi-tile paths are handled
at the orchestrator level.
"""
if not action.args or not action.args[0]:
return ActionResult(False, "No direction specified")
direction = action.args[0]
if direction not in self.DIRECTION_VECTORS:
return ActionResult(False, f"Invalid direction: {direction}")
dx, dy = self.DIRECTION_VECTORS[direction]
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
new_x, new_y = current_x + dx, current_y + dy
# Bounds check
grid_w, grid_h = self.grid.grid_size
if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
return ActionResult(False, f"Cannot go {direction} - edge of map")
# Walkability check
target_cell = self.grid.at(new_x, new_y)
if not target_cell.walkable:
return ActionResult(False, f"Cannot go {direction} - path blocked")
# Entity collision check
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if ex == new_x and ey == new_y:
return ActionResult(False, f"Cannot go {direction} - occupied")
# Execute movement
agent.entity.pos = (new_x, new_y)
return ActionResult(
success=True,
message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
new_position=(new_x, new_y),
path=[(current_x, current_y), (new_x, new_y)]
)
def execute_wait(self, agent, action: Action) -> ActionResult:
"""Execute WAIT action."""
return ActionResult(True, "Waited and observed surroundings")
# =========================================================================
# Multi-tile Pathfinding
# =========================================================================
def plan_path_to(self, agent, target_pos: Tuple[int, int],
visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]:
"""
Plan a path to a target position.
Uses A* via libtcod if available, otherwise simple pathfinding.
Returns list of tiles from current position to target (excluding current).
"""
try:
from mcrfpy import libtcod
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1])
if path:
# Store path state
path_state = self.get_path_state(agent.name)
path_state.path = path
path_state.current_index = 0
path_state.visible_entities_at_start = visible_entities.copy()
return path
except ImportError:
pass
return None
def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]:
"""
Continue an existing multi-tile path.
        Returns an ActionResult if the agent moved or was blocked; returns None
        if there is no active path or a FOV change interrupted it (meaning the
        LLM should be queried).
"""
path_state = self.get_path_state(agent.name)
if not path_state.has_path:
return None
# Check for FOV interrupt
if path_state.should_interrupt(current_visible):
path_state.clear()
return None # Signal that LLM should be queried
# Get next tile
next_tile = path_state.next_tile
if not next_tile:
path_state.clear()
return None
# Move to next tile
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
new_x, new_y = next_tile
# Verify still walkable
target_cell = self.grid.at(new_x, new_y)
if not target_cell.walkable:
path_state.clear()
return ActionResult(False, "Path blocked - recalculating")
# Check for entity collision
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if ex == new_x and ey == new_y:
path_state.clear()
return ActionResult(False, "Path blocked by creature")
# Execute movement
agent.entity.pos = (new_x, new_y)
path_state.advance()
remaining = path_state.remaining_tiles
if remaining > 0:
msg = f"Continuing path ({remaining} tiles remaining)"
else:
msg = "Arrived at destination"
path_state.clear()
return ActionResult(
success=True,
message=msg,
new_position=(new_x, new_y),
path=[(current_x, current_y), (new_x, new_y)]
)

enhanced_orchestrator.py

@@ -0,0 +1,606 @@
"""
Enhanced Turn Orchestrator
==========================
Extends TurnOrchestrator with:
- Action economy (free actions vs turn-ending)
- Multi-tile path continuation
- FOV interrupt detection
- Enhanced logging for offline viewer replay
"""
import json
import os
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Callable, Set
from datetime import datetime
from world_graph import WorldGraph, AgentInfo
from action_parser import Action, ActionType, parse_action
from action_executor import ActionResult
from action_economy import (
TurnState, PathState, TurnCost, get_action_cost,
PointOfInterestCollector, PointOfInterest
)
from enhanced_executor import EnhancedExecutor, LookResult, SpeechResult, Message, TakeResult
@dataclass
class FreeActionRecord:
"""Record of a free action taken during a turn."""
action_type: str
args: tuple
result: Dict[str, Any]
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@dataclass
class EnhancedSimulationStep:
"""
Enhanced simulation step for offline viewer replay.
Contains all data needed to reconstruct the agent's perspective
and decision-making for that turn.
"""
# Turn identification
turn: int
agent_id: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
# Agent state at start of turn
position_start: tuple = (0, 0)
room: str = ""
path_in_progress: bool = False
# FOV and perception
visible_entities: List[str] = field(default_factory=list)
visible_tiles: int = 0 # Count of visible tiles
points_of_interest: List[Dict] = field(default_factory=list)
# Context provided to LLM
location_description: str = ""
available_actions: List[str] = field(default_factory=list)
pending_messages: List[Dict] = field(default_factory=list)
poi_prompt: str = ""
# Screenshot path (for viewer to load)
screenshot_path: str = ""
# LLM interaction
llm_prompt_system: str = ""
llm_prompt_user: str = ""
llm_response: str = ""
llm_was_queried: bool = True # False if path continuation
# Conversation history (LLM queries within this turn)
llm_exchanges: List[Dict] = field(default_factory=list) # [{prompt, response, action, error}]
action_retries: int = 0 # How many times we re-prompted due to errors
# Free actions taken (LOOK, SPEAK)
free_actions: List[Dict] = field(default_factory=list)
# Turn-ending action
final_action_type: str = ""
final_action_args: tuple = ()
final_action_success: bool = False
final_action_message: str = ""
# Movement result
position_end: tuple = (0, 0)
path_taken: List[tuple] = field(default_factory=list)
path_remaining: int = 0 # Tiles left if multi-tile path
@dataclass
class EnhancedSimulationLog:
"""
Complete simulation log for offline viewer.
Designed to support:
- Turn-by-turn replay
- Per-agent perspective reconstruction
- LLM chain-of-thought review
- Speech history tracking
"""
metadata: Dict[str, Any] = field(default_factory=dict)
steps: List[EnhancedSimulationStep] = field(default_factory=list)
speech_log: List[Dict] = field(default_factory=list)
def save(self, path: str):
"""Save log to JSON file."""
data = {
"metadata": self.metadata,
"steps": [asdict(s) for s in self.steps],
"speech_log": self.speech_log
}
with open(path, 'w') as f:
json.dump(data, f, indent=2, default=str)
print(f"Enhanced simulation log saved to: {path}")
@classmethod
def load(cls, path: str) -> 'EnhancedSimulationLog':
"""Load log from JSON file."""
with open(path) as f:
data = json.load(f)
steps = []
for s in data.get("steps", []):
# Convert lists back to tuples where needed
if isinstance(s.get("position_start"), list):
s["position_start"] = tuple(s["position_start"])
if isinstance(s.get("position_end"), list):
s["position_end"] = tuple(s["position_end"])
if isinstance(s.get("final_action_args"), list):
s["final_action_args"] = tuple(s["final_action_args"])
if s.get("path_taken"):
s["path_taken"] = [tuple(p) for p in s["path_taken"]]
steps.append(EnhancedSimulationStep(**s))
return cls(
metadata=data.get("metadata", {}),
steps=steps,
speech_log=data.get("speech_log", [])
)
def get_turn_summary(self, turn: int) -> str:
"""Get summary of a specific turn for display."""
turn_steps = [s for s in self.steps if s.turn == turn]
lines = [f"=== Turn {turn} ==="]
for step in turn_steps:
lines.append(f"\n{step.agent_id}:")
lines.append(f" Position: {step.position_start} -> {step.position_end}")
if step.free_actions:
lines.append(f" Free actions: {len(step.free_actions)}")
for fa in step.free_actions:
lines.append(f" - {fa['action_type']}: {fa.get('result', {}).get('message', '')[:50]}")
status = "OK" if step.final_action_success else "FAIL"
lines.append(f" Action: {step.final_action_type} {step.final_action_args} [{status}]")
if not step.llm_was_queried:
lines.append(" (Path continuation - no LLM query)")
return "\n".join(lines)
class EnhancedOrchestrator:
"""
Enhanced turn orchestrator with action economy and improved logging.
"""
def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
screenshot_dir: str, llm_query_fn: Callable):
"""
Initialize enhanced orchestrator.
Args:
grid: mcrfpy.Grid instance
fov_layer: Color layer for FOV rendering
world: WorldGraph instance
agents: List of Agent objects
screenshot_dir: Directory for screenshots
llm_query_fn: Function(agent, screenshot_path, context) -> str
"""
self.grid = grid
self.fov_layer = fov_layer
self.world = world
self.agents = agents
self.screenshot_dir = screenshot_dir
self.llm_query_fn = llm_query_fn
self.executor = EnhancedExecutor(grid, world)
self.turn_number = 0
self.steps: List[EnhancedSimulationStep] = []
self.speech_log: List[Dict] = []
os.makedirs(screenshot_dir, exist_ok=True)
def run_simulation(self, max_turns: int = 10,
stop_condition: Callable = None) -> EnhancedSimulationLog:
"""
Run complete simulation with enhanced logging.
Args:
max_turns: Maximum number of turns
stop_condition: Optional callable(orchestrator) -> bool
Returns:
EnhancedSimulationLog for offline viewer
"""
print(f"\nStarting enhanced simulation: max {max_turns} turns")
print(f"Agents: {[a.name for a in self.agents]}")
print("=" * 60)
for turn in range(max_turns):
self.run_turn()
if stop_condition and stop_condition(self):
print(f"\nStop condition met at turn {self.turn_number}")
break
# Build log
log = EnhancedSimulationLog(
metadata={
"total_turns": self.turn_number,
"num_agents": len(self.agents),
"agent_names": [a.name for a in self.agents],
"timestamp_start": self.steps[0].timestamp if self.steps else "",
"timestamp_end": self.steps[-1].timestamp if self.steps else "",
"world_rooms": list(self.world.rooms.keys()),
"screenshot_dir": self.screenshot_dir,
},
steps=self.steps,
speech_log=self.speech_log
)
return log
def run_turn(self) -> List[EnhancedSimulationStep]:
"""Execute one full turn (all agents act once)."""
import mcrfpy
self.turn_number += 1
turn_steps = []
print(f"\n{'='*60}")
print(f"TURN {self.turn_number}")
print("=" * 60)
for agent in self.agents:
step = self._run_agent_turn(agent)
turn_steps.append(step)
self.steps.append(step)
return turn_steps
def _run_agent_turn(self, agent) -> EnhancedSimulationStep:
"""Execute one agent's turn with action economy."""
import mcrfpy
from mcrfpy import automation
print(f"\n--- {agent.name}'s Turn ---")
# Initialize step record
step = EnhancedSimulationStep(
turn=self.turn_number,
agent_id=agent.name,
position_start=agent.pos,
room=agent.current_room
)
# Check for path continuation
path_state = self.executor.get_path_state(agent.name)
current_visible = self._get_visible_entity_ids(agent)
if path_state.has_path:
# Check for FOV interrupt
if path_state.should_interrupt(current_visible):
print(f" Path interrupted: new entity in FOV")
path_state.clear()
else:
# Continue path without LLM query
result = self.executor.continue_path(agent, current_visible)
if result and result.success:
step.llm_was_queried = False
step.path_in_progress = True
step.final_action_type = "GO"
step.final_action_args = ("CONTINUE",)
step.final_action_success = True
step.final_action_message = result.message
step.position_end = result.new_position or agent.pos
step.path_taken = result.path or []
step.path_remaining = self.executor.get_path_state(agent.name).remaining_tiles
print(f" Path continuation: {result.message}")
return step
# Need LLM query - set up perspective
step.visible_entities = list(current_visible)
self._switch_perspective(agent)
mcrfpy.step(0.016)
# Take screenshot
screenshot_path = os.path.join(
self.screenshot_dir,
f"turn{self.turn_number}_{agent.name.lower()}.png"
)
automation.screenshot(screenshot_path)
step.screenshot_path = screenshot_path
# Collect points of interest
poi_collector = PointOfInterestCollector(self.grid, agent.pos)
pois = poi_collector.collect_from_fov(self.world)
step.points_of_interest = [asdict(p) for p in pois]
step.poi_prompt = poi_collector.format_for_prompt()
# Get pending messages
messages = self.executor.get_pending_messages(agent.name)
step.pending_messages = [asdict(m) for m in messages]
# Build context
visible_agents = self._get_visible_agents(agent)
context = agent.get_context(visible_agents + [agent])
step.location_description = context["location"]
step.available_actions = context["available_actions"]
# Turn state for action economy
turn_state = TurnState()
# Error feedback for retry loop
last_error = None
MAX_RETRIES = 3
# Action loop - handle free actions until turn-ending action
while not turn_state.turn_ended:
# Build prompt with current state (includes error feedback if any)
prompt = self._build_prompt(agent, context, step.poi_prompt, messages, turn_state, last_error)
step.llm_prompt_user = prompt # Store last prompt
# Query LLM
print(f" Querying LLM...")
response = self.llm_query_fn(agent, screenshot_path, {
**context,
"poi_prompt": step.poi_prompt,
"messages": [asdict(m) for m in messages],
"has_spoken": turn_state.has_spoken,
"last_error": last_error,
"conversation_history": step.llm_exchanges # Include past exchanges
})
step.llm_response = response
print(f" Response: {response[:200]}...")
# Parse action
action = parse_action(response)
cost = get_action_cost(action)
print(f" Action: {action.type.value} {action.args} (cost: {cost.value})")
# Track this exchange
exchange = {
"prompt": prompt[:500], # Truncate for storage
"response": response,
"action_type": action.type.value,
"action_args": action.args,
"error": None
}
# Execute action based on type
if action.type == ActionType.LOOK:
result = self.executor.execute_look(agent, action)
turn_state.record_free_action("LOOK", {
"target": result.target_name,
"description": result.description
})
step.free_actions.append({
"action_type": "LOOK",
"args": action.args,
"result": {"description": result.description}
})
# Provide result and continue loop for another action
context["look_result"] = result.description
last_error = None # Clear error on success
print(f" LOOK result: {result.description[:100]}...")
elif action.type in (ActionType.SPEAK, ActionType.ANNOUNCE):
if not turn_state.can_speak():
print(f" Already spoke this turn")
last_error = "You have already spoken this turn. Choose a different action."
exchange["error"] = last_error
step.action_retries += 1
if step.action_retries >= MAX_RETRIES:
# Force end turn
step.final_action_type = "WAIT"
step.final_action_args = ()
step.final_action_success = False
step.final_action_message = "Too many invalid actions - turn ended"
step.position_end = agent.pos
turn_state.end_turn()
else:
result = self.executor.execute_speech(
agent, action, self.agents, self.turn_number
)
turn_state.record_speech()
turn_state.record_free_action(action.type.value, {
"content": result.content,
"recipients": result.recipients
})
step.free_actions.append({
"action_type": action.type.value,
"args": action.args,
"result": {
"content": result.content,
"recipients": result.recipients
}
})
# Record in speech log
self.speech_log.append({
"turn": self.turn_number,
"speaker": agent.name,
"type": result.speech_type,
"content": result.content,
"recipients": result.recipients
})
last_error = None
print(f" {result.speech_type.upper()}: {result.content[:50]}... -> {result.recipients}")
# Continue loop for another action (can still move)
elif action.type == ActionType.TAKE:
result = self.executor.execute_take(agent, action)
if result.success:
step.final_action_type = "TAKE"
step.final_action_args = action.args
step.final_action_success = True
step.final_action_message = result.message
step.position_end = agent.pos
last_error = None
turn_state.end_turn()
print(f" TAKE: {result.message}")
else:
# Failed - give error feedback and let LLM try again
last_error = result.message
exchange["error"] = last_error
step.action_retries += 1
print(f" TAKE FAILED: {result.message}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "TAKE"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = result.message
step.position_end = agent.pos
turn_state.end_turn()
elif action.type == ActionType.GO:
result = self.executor.execute_move(agent, action)
if result.success:
step.final_action_type = "GO"
step.final_action_args = action.args
step.final_action_success = True
step.final_action_message = result.message
step.position_end = result.new_position or agent.pos
step.path_taken = result.path or []
last_error = None
turn_state.end_turn()
print(f" MOVE: {result.message}")
else:
# Failed - give error feedback
last_error = result.message
exchange["error"] = last_error
step.action_retries += 1
print(f" MOVE FAILED: {result.message}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "GO"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = result.message
step.position_end = agent.pos
turn_state.end_turn()
elif action.type == ActionType.WAIT:
result = self.executor.execute_wait(agent, action)
step.final_action_type = "WAIT"
step.final_action_args = ()
step.final_action_success = True
step.final_action_message = result.message
step.position_end = agent.pos
last_error = None
turn_state.end_turn()
print(f" WAIT")
elif action.type == ActionType.INVALID:
# Could not parse action - give feedback
last_error = f"Could not understand your action. Please use a valid action format like 'Action: GO EAST' or 'Action: TAKE key'."
exchange["error"] = last_error
step.action_retries += 1
print(f" INVALID ACTION: {action.args}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "INVALID"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = "Could not parse action"
step.position_end = agent.pos
turn_state.end_turn()
else:
# Unimplemented action type - give feedback
last_error = f"The action '{action.type.value}' is not yet supported. Try GO, TAKE, LOOK, SPEAK, or WAIT."
exchange["error"] = last_error
step.action_retries += 1
print(f" Unsupported: {action.type.value}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = action.type.value
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = f"Unsupported action: {action.type.value}"
step.position_end = agent.pos
turn_state.end_turn()
# Record exchange
step.llm_exchanges.append(exchange)
return step
def _build_prompt(self, agent, context: dict, poi_prompt: str,
messages: List[Message], turn_state: TurnState,
last_error: Optional[str] = None) -> str:
"""Build LLM prompt with current state and error feedback."""
parts = [context["location"]]
# Add messages received
if messages:
parts.append("\nMessages received:")
for msg in messages:
if msg.speech_type == "announce":
parts.append(f' {msg.sender} announces: "{msg.content}"')
else:
parts.append(f' {msg.sender} says: "{msg.content}"')
# Add points of interest
parts.append(f"\n{poi_prompt}")
# Add available actions
actions_str = ", ".join(context["available_actions"])
parts.append(f"\nAvailable actions: {actions_str}")
# Add LOOK result if we just looked
if "look_result" in context:
parts.append(f"\n[LOOK result: {context['look_result']}]")
# Add constraints
constraints = []
if turn_state.has_spoken:
constraints.append("You have already spoken this turn.")
if constraints:
parts.append(f"\nConstraints: {' '.join(constraints)}")
# Add error feedback from last action attempt
if last_error:
parts.append(f"\n[ERROR: {last_error}]")
parts.append("[Please try a different action.]")
parts.append("\nWhat do you do? Brief reasoning, then Action: <action>")
return "\n".join(parts)
def _switch_perspective(self, agent):
"""Switch grid view to agent's perspective."""
import mcrfpy
self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
self.fov_layer.apply_perspective(
entity=agent.entity,
visible=mcrfpy.Color(0, 0, 0, 0),
discovered=mcrfpy.Color(40, 40, 60, 180),
unknown=mcrfpy.Color(0, 0, 0, 255)
)
agent.entity.update_visibility()
px, py = agent.pos
self.grid.center = (px * 16 + 8, py * 16 + 8)
def _get_visible_agents(self, observer) -> list:
"""Get agents visible to observer based on FOV."""
visible = []
for agent in self.agents:
if agent.name == observer.name:
continue
ax, ay = agent.pos
if self.grid.is_in_fov(ax, ay):
visible.append(agent)
return visible
def _get_visible_entity_ids(self, agent) -> Set[str]:
"""Get set of entity IDs currently visible to agent."""
visible = set()
ax, ay = agent.pos
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if self.grid.is_in_fov(ex, ey):
entity_id = getattr(entity, 'id', None) or str(id(entity))
visible.add(entity_id)
return visible