diff --git a/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md new file mode 100644 index 0000000..912083b --- /dev/null +++ b/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md @@ -0,0 +1,391 @@ +# Hour 1: Action Parser & Executor + +**Issue**: #156 Turn-based LLM Agent Orchestration +**Goal**: Agents can actually move when they say "GO EAST" +**Parallelizable with**: Hour 2 (no dependencies) + +--- + +## Deliverables + +1. `action_parser.py` - Parse LLM text responses into structured actions +2. `action_executor.py` - Execute parsed actions in the game world +3. Modified `1_multi_agent_demo.py` - Integrate parser/executor to show movement + +--- + +## File 1: `action_parser.py` + +```python +""" +Action Parser for LLM Agent Responses +===================================== + +Extracts structured actions from free-form LLM text responses. +Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc. +""" + +import re +from dataclasses import dataclass +from typing import Optional, Tuple, Any +from enum import Enum + +class ActionType(Enum): + GO = "GO" + WAIT = "WAIT" + LOOK = "LOOK" + TAKE = "TAKE" + DROP = "DROP" + PUSH = "PUSH" + USE = "USE" + OPEN = "OPEN" + CLOSE = "CLOSE" + ANNOUNCE = "ANNOUNCE" + SPEAK = "SPEAK" + INVALID = "INVALID" + +@dataclass +class Action: + type: ActionType + args: Tuple[Any, ...] = () + raw_match: str = "" + +class ActionParser: + """Parse LLM responses into structured actions.""" + + # Direction normalization + DIRECTIONS = { + 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', + 'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST', + 'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST', + } + + # Patterns ordered by specificity (most specific first) + PATTERNS = [ + # Explicit "Action: X" format (preferred) + (ActionType.GO, r'Action:\s*GO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), + (ActionType.WAIT, r'Action:\s*WAIT\b', 0), + (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1), + (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1), + (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1), + (ActionType.PUSH, r'Action:\s*PUSH\s+(\w+)\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)', 2), + (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2), + (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1), + (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1), + (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1), + (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1), + + # Fallback patterns (less strict) + (ActionType.GO, r'\bGO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), + (ActionType.GO, r'\bmove\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), + (ActionType.GO, r'\bhead\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), + (ActionType.WAIT, r'\bWAIT\b', 0), + (ActionType.LOOK, r'\bLOOK\b', 0), + ] + + def parse(self, llm_response: str) -> Action: + """ + Parse an LLM response and extract the action. + + Returns Action with type=INVALID if no valid action found. + """ + # Normalize to uppercase for matching + text = llm_response.upper() + + for action_type, pattern, num_groups in self.PATTERNS: + match = re.search(pattern, text, re.IGNORECASE) + if match: + args = self._extract_args(match, num_groups, action_type) + return Action( + type=action_type, + args=args, + raw_match=match.group(0) + ) + + # No valid action found + return Action( + type=ActionType.INVALID, + args=(llm_response[:100],), # First 100 chars for debugging + raw_match="" + ) + + def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple: + """Extract and normalize arguments from regex match.""" + if num_groups == 0: + return () + + args = [] + for i in range(1, num_groups + 1): + group = match.group(i) + if group: + # Normalize directions + if action_type == ActionType.GO or (action_type == ActionType.PUSH and i == 2): + group = self.DIRECTIONS.get(group.upper(), group.upper()) + args.append(group) + else: + args.append(None) + + return tuple(args) + + +# Convenience function +def parse_action(llm_response: str) -> Action: + """Parse an LLM response into an Action.""" + return ActionParser().parse(llm_response) +``` + +--- + +## File 2: `action_executor.py` + +```python +""" +Action Executor for McRogueFace +=============================== + +Executes parsed actions in the game world. +Handles movement, collision detection, and action results. +""" + +from dataclasses import dataclass +from typing import Optional, List, Tuple +from action_parser import Action, ActionType + +@dataclass +class ActionResult: + success: bool + message: str + new_position: Optional[Tuple[int, int]] = None + path: Optional[List[Tuple[int, int]]] = None # For animation replay + +class ActionExecutor: + """Execute actions in the McRogueFace game world.""" + + # Direction vectors + DIRECTION_VECTORS = { + 'NORTH': (0, -1), + 'SOUTH': (0, 1), + 'EAST': (1, 0), + 'WEST': (-1, 0), + } + + def __init__(self, grid): + """ + Initialize executor with a grid reference. + + Args: + grid: mcrfpy.Grid instance + """ + self.grid = grid + + def execute(self, agent, action: Action) -> ActionResult: + """ + Execute an action for an agent. + + Args: + agent: Agent wrapper with .entity attribute + action: Parsed Action to execute + + Returns: + ActionResult with success status and message + """ + handlers = { + ActionType.GO: self._execute_go, + ActionType.WAIT: self._execute_wait, + ActionType.LOOK: self._execute_look, + ActionType.TAKE: self._execute_take, + ActionType.DROP: self._execute_drop, + ActionType.INVALID: self._execute_invalid, + } + + handler = handlers.get(action.type, self._execute_unimplemented) + return handler(agent, action) + + def _execute_go(self, agent, action: Action) -> ActionResult: + """Execute movement in a direction.""" + if not action.args or not action.args[0]: + return ActionResult(False, "No direction specified") + + direction = action.args[0] + if direction not in self.DIRECTION_VECTORS: + return ActionResult(False, f"Invalid direction: {direction}") + + dx, dy = self.DIRECTION_VECTORS[direction] + + # Get current position + current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + new_x, new_y = current_x + dx, current_y + dy + + # Check bounds + grid_w, grid_h = self.grid.grid_size + if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): + return ActionResult(False, f"Cannot go {direction} - edge of map") + + # Check walkability + target_cell = self.grid.at(new_x, new_y) + if not target_cell.walkable: + return ActionResult(False, f"Cannot go {direction} - path blocked") + + # Check for entity collision (optional - depends on game rules) + for entity in self.grid.entities: + if entity is agent.entity: + continue + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if ex == new_x and ey == new_y: + return ActionResult(False, f"Cannot go {direction} - someone is there") + + # Execute movement + agent.entity.grid_pos = (new_x, new_y) + + return ActionResult( + success=True, + message=f"Moved {direction.lower()} to ({new_x}, {new_y})", + new_position=(new_x, new_y), + path=[(current_x, current_y), (new_x, new_y)] + ) + + def _execute_wait(self, agent, action: Action) -> ActionResult: + """Execute wait action (no-op).""" + return ActionResult(True, "Waited and observed surroundings") + + def _execute_look(self, agent, action: Action) -> ActionResult: + """Execute look action - returns enhanced observation.""" + target = action.args[0] if action.args else None + if target: + return ActionResult(True, f"Examined {target} closely") + return ActionResult(True, "Looked around carefully") + + def _execute_take(self, agent, action: Action) -> ActionResult: + """Execute take action (placeholder).""" + item = action.args[0] if action.args else "unknown" + # TODO: Implement inventory system + return ActionResult(False, f"Cannot take {item} - not implemented yet") + + def _execute_drop(self, agent, action: Action) -> ActionResult: + """Execute drop action (placeholder).""" + item = action.args[0] if action.args else "unknown" + return ActionResult(False, f"Cannot drop {item} - not implemented yet") + + def _execute_invalid(self, agent, action: Action) -> ActionResult: + """Handle invalid/unparseable action.""" + return ActionResult(False, f"Could not understand action: {action.args[0]}") + + def _execute_unimplemented(self, agent, action: Action) -> ActionResult: + """Handle unimplemented action types.""" + return ActionResult(False, f"Action {action.type.value} not yet implemented") +``` + +--- + +## Modifications to `1_multi_agent_demo.py` + +Add these changes after the existing `query_agent` function: + +```python +# Add imports at top +from action_parser import parse_action +from action_executor import ActionExecutor, ActionResult + +# In run_demo(), after setup_scene(): +executor = ActionExecutor(grid) + +# Replace the agent loop with: +for i, agent in enumerate(agents): + print(f"\n{'='*70}") + print(f"Agent {i+1}/3: {agent.name} ({agent.description})") + print(f"Position: {agent.pos}") + print("=" * 70) + + # Switch to this agent's perspective + switch_perspective(grid, fov_layer, agent) + mcrfpy.step(0.016) + + # Take screenshot + screenshot_path = os.path.join(SCREENSHOT_DIR, f"{i}_{agent.name.lower()}_view.png") + result = automation.screenshot(screenshot_path) + if not result: + print(f"ERROR: Failed to take screenshot for {agent.name}") + continue + + # Get visible entities and query VLLM + visible = get_visible_entities(grid, agent, agents, rat) + grounded_text = build_grounded_prompt(visible) + print(f"Grounded observations: {grounded_text}") + + print(f"\nQuerying VLLM for {agent.name}...") + response = query_agent(agent, screenshot_path, grounded_text) + print(f"\n{agent.name}'s Response:\n{response}") + + # NEW: Parse and execute action + print(f"\n--- Action Execution ---") + action = parse_action(response) + print(f"Parsed action: {action.type.value} {action.args}") + + result = executor.execute(agent, action) + if result.success: + print(f"SUCCESS: {result.message}") + if result.new_position: + # Update perspective after movement + switch_perspective(grid, fov_layer, agent) + mcrfpy.step(0.016) + else: + print(f"FAILED: {result.message}") +``` + +--- + +## Testing + +### Unit test for parser (`test_action_parser.py`): + +```python +from action_parser import parse_action, ActionType + +def test_parser(): + # Explicit format + assert parse_action("Action: GO NORTH").type == ActionType.GO + assert parse_action("Action: GO NORTH").args == ("NORTH",) + + # Short directions + assert parse_action("Action: GO E").args == ("EAST",) + + # Case insensitive + assert parse_action("action: go south").type == ActionType.GO + + # Fallback patterns + assert parse_action("I think I'll GO WEST").type == ActionType.GO + + # Wait and Look + assert parse_action("Action: WAIT").type == ActionType.WAIT + assert parse_action("Action: LOOK").type == ActionType.LOOK + + # Invalid + assert parse_action("I'm not sure what to do").type == ActionType.INVALID + + print("All parser tests passed!") + +if __name__ == "__main__": + test_parser() +``` + +--- + +## Success Criteria + +- [ ] `action_parser.py` correctly parses all GO directions (N/S/E/W and full names) +- [ ] `action_parser.py` handles WAIT, LOOK, and INVALID cases +- [ ] `action_executor.py` moves entities when GO succeeds +- [ ] `action_executor.py` returns failure message when path is blocked +- [ ] Modified demo shows "Moved east to (5, 7)" style output +- [ ] Entities visibly change position between turns + +--- + +## Notes for Integration (Hour 3) + +The `ActionExecutor` will be enhanced in Hour 3 to: +- Use `WorldGraph` for room-based movement (GO NORTH = walk through door to next room) +- Support multi-tile pathfinding for room transitions +- Return path data for animation replay + +Keep the current single-tile movement as the foundation. diff --git a/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md new file mode 100644 index 0000000..7ddb723 --- /dev/null +++ b/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md @@ -0,0 +1,684 @@ +# Hour 2: WorldGraph Foundation + +**Issue**: #155 Deterministic Text Descriptions From Room Graph +**Goal**: Structured room data that generates both tilemaps AND text descriptions +**Parallelizable with**: Hour 1 (no dependencies) + +--- + +## Deliverables + +1. `world_graph.py` - Core data structures and description generation +2. `test_world_graph.py` - Unit tests for WorldGraph functionality +3. Example scenario: two connected rooms with a door + +--- + +## File 1: `world_graph.py` + +```python +""" +WorldGraph: Room-based World Representation +============================================ + +Provides dual-purpose data structures for: +1. Generating 2D tilemaps (visual representation) +2. Generating text descriptions (LLM context) + +Ensures deterministic text output: same state = same description. +""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple, Any +from enum import Enum + + +class Direction(Enum): + NORTH = "north" + SOUTH = "south" + EAST = "east" + WEST = "west" + + @property + def opposite(self) -> 'Direction': + opposites = { + Direction.NORTH: Direction.SOUTH, + Direction.SOUTH: Direction.NORTH, + Direction.EAST: Direction.WEST, + Direction.WEST: Direction.EAST, + } + return opposites[self] + + @property + def vector(self) -> Tuple[int, int]: + vectors = { + Direction.NORTH: (0, -1), + Direction.SOUTH: (0, 1), + Direction.EAST: (1, 0), + Direction.WEST: (-1, 0), + } + return vectors[self] + + +@dataclass +class Room: + """A room in the world graph.""" + name: str # Internal ID: "kitchen", "guard_room" + display_name: str # Text output: "the kitchen", "a dimly lit guard room" + bounds: Tuple[int, int, int, int] # (x, y, width, height) in tile coords + properties: Dict[str, Any] = field(default_factory=dict) # {"lit": True, "temperature": "warm"} + description_template: Optional[str] = None # "A {temperature} room with {features}." + + @property + def x(self) -> int: + return self.bounds[0] + + @property + def y(self) -> int: + return self.bounds[1] + + @property + def width(self) -> int: + return self.bounds[2] + + @property + def height(self) -> int: + return self.bounds[3] + + @property + def center(self) -> Tuple[int, int]: + return (self.x + self.width // 2, self.y + self.height // 2) + + def contains(self, x: int, y: int) -> bool: + """Check if a tile coordinate is within this room.""" + return (self.x <= x < self.x + self.width and + self.y <= y < self.y + self.height) + + +@dataclass +class Door: + """A connection between two rooms.""" + room_a: str # Room name + room_b: str # Room name + position: Tuple[int, int] # Tile position of the door + direction_from_a: Direction # Direction from room_a to reach room_b + locked: bool = False + key_id: Optional[str] = None # Which key unlocks this door + + @property + def direction_from_b(self) -> Direction: + return self.direction_from_a.opposite + + +@dataclass +class WorldObject: + """An interactable object in the world.""" + name: str # Internal ID: "brass_key" + display_name: str # Text output: "a brass key" + room: str # Which room contains it + position: Tuple[int, int] # Tile position (or None if carried) + affordances: List[str] = field(default_factory=list) # ["takeable", "unlocks:pantry_door"] + description: str = "" # "A tarnished brass key with ornate handle." + + +@dataclass +class AgentInfo: + """Information about an agent for description purposes.""" + name: str # "Wizard", "Knight" + display_name: str # "a wizard", "the knight" + position: Tuple[int, int] # Current tile position + is_player: bool = False # Is this the observing agent? + + +class WorldGraph: + """ + Graph-based world representation. + + Provides: + - Room/door/object storage + - Deterministic text description generation + - Spatial queries (what room is at x,y?) + - Available action enumeration + """ + + def __init__(self): + self.rooms: Dict[str, Room] = {} + self.doors: List[Door] = [] + self.objects: Dict[str, WorldObject] = {} + + # ========================================================================= + # Building the World + # ========================================================================= + + def add_room(self, room: Room) -> None: + """Add a room to the world.""" + self.rooms[room.name] = room + + def add_door(self, door: Door) -> None: + """Add a door connecting two rooms.""" + self.doors.append(door) + + def add_object(self, obj: WorldObject) -> None: + """Add an object to the world.""" + self.objects[obj.name] = obj + + # ========================================================================= + # Spatial Queries + # ========================================================================= + + def room_at(self, x: int, y: int) -> Optional[Room]: + """Get the room containing a tile coordinate.""" + for room in self.rooms.values(): + if room.contains(x, y): + return room + return None + + def get_exits(self, room_name: str) -> List[Door]: + """Get all doors leading out of a room.""" + exits = [] + for door in self.doors: + if door.room_a == room_name or door.room_b == room_name: + exits.append(door) + return exits + + def get_door_in_direction(self, room_name: str, direction: Direction) -> Optional[Door]: + """Get the door in a specific direction from a room.""" + for door in self.doors: + if door.room_a == room_name and door.direction_from_a == direction: + return door + if door.room_b == room_name and door.direction_from_b == direction: + return door + return None + + def get_objects_in_room(self, room_name: str) -> List[WorldObject]: + """Get all objects in a room.""" + return [obj for obj in self.objects.values() if obj.room == room_name] + + # ========================================================================= + # Text Description Generation (Deterministic!) + # ========================================================================= + + def describe_room(self, room_name: str, + visible_agents: List[AgentInfo] = None, + observer_name: str = None) -> str: + """ + Generate a complete room description. + + Args: + room_name: The room to describe + visible_agents: List of agents visible in the room + observer_name: Name of the observing agent (excluded from description) + + Returns: + Deterministic prose description of the room + """ + room = self.rooms.get(room_name) + if not room: + return "You are in an unknown location." + + parts = [] + + # Base location + parts.append(f"You are in {room.display_name}.") + + # Room template description (if any) + if room.description_template and room.properties: + try: + desc = room.description_template.format(**room.properties) + parts.append(desc) + except KeyError: + pass + + # Visible agents + if visible_agents: + agent_desc = self._describe_agents(visible_agents, observer_name) + if agent_desc: + parts.append(agent_desc) + + # Objects on the ground + objects = self.get_objects_in_room(room_name) + if objects: + obj_desc = self._describe_objects(objects) + parts.append(obj_desc) + + # Exits + exits = self.get_exits(room_name) + parts.append(self._describe_exits(room_name, exits)) + + return " ".join(parts) + + def _describe_agents(self, agents: List[AgentInfo], observer_name: str = None) -> str: + """Describe visible agents (excluding observer).""" + others = [a for a in agents if a.name != observer_name and not a.is_player] + if not others: + return "" + + if len(others) == 1: + return f"You see {others[0].display_name} here." + else: + names = [a.display_name for a in others] + formatted = ", ".join(names[:-1]) + f" and {names[-1]}" + return f"You see {formatted} here." + + def _describe_objects(self, objects: List[WorldObject]) -> str: + """Describe objects in the room.""" + if not objects: + return "" + + # Group by affordance for natural description + takeable = [o for o in objects if "takeable" in o.affordances] + furniture = [o for o in objects if "takeable" not in o.affordances] + + parts = [] + if takeable: + if len(takeable) == 1: + parts.append(f"On the ground you see {takeable[0].display_name}.") + else: + names = [o.display_name for o in takeable] + formatted = ", ".join(names[:-1]) + f" and {names[-1]}" + parts.append(f"On the ground you see {formatted}.") + + if furniture: + for obj in furniture: + parts.append(f"There is {obj.display_name} here.") + + return " ".join(parts) + + def _describe_exits(self, room_name: str, exits: List[Door]) -> str: + """Describe available exits.""" + if not exits: + return "There are no visible exits." + + exit_parts = [] + for door in exits: + # Determine direction and destination from this room's perspective + if door.room_a == room_name: + direction = door.direction_from_a.value + dest_room = self.rooms.get(door.room_b) + else: + direction = door.direction_from_b.value + dest_room = self.rooms.get(door.room_a) + + dest_name = dest_room.display_name if dest_room else "unknown" + + if door.locked: + exit_parts.append(f"{direction} ({dest_name}, locked)") + else: + exit_parts.append(f"{direction} ({dest_name})") + + # Sort for deterministic output + exit_parts.sort() + + return "Exits: " + ", ".join(exit_parts) + "." + + # ========================================================================= + # Action Enumeration + # ========================================================================= + + def get_available_actions(self, room_name: str, + can_speak: bool = True) -> List[str]: + """ + Get list of available actions for an agent in a room. + + Returns list of action strings like: + ["GO NORTH", "GO EAST", "TAKE brass_key", "WAIT", "LOOK"] + """ + actions = ["LOOK", "WAIT"] + + # Movement actions + for door in self.get_exits(room_name): + if door.room_a == room_name: + direction = door.direction_from_a.value.upper() + else: + direction = door.direction_from_b.value.upper() + + if not door.locked: + actions.append(f"GO {direction}") + else: + # Could add UNLOCK action here if agent has key + pass + + # Object interactions + for obj in self.get_objects_in_room(room_name): + if "takeable" in obj.affordances: + actions.append(f"TAKE {obj.name}") + if "pushable" in obj.affordances: + actions.append(f"PUSH {obj.name} ") + if "openable" in obj.affordances: + actions.append(f"OPEN {obj.name}") + if "readable" in obj.affordances: + actions.append(f"READ {obj.name}") + + # Speech actions + if can_speak: + actions.append("ANNOUNCE ''") + actions.append("SPEAK ''") + + return sorted(actions) + + +# ============================================================================= +# Factory Functions for Common Scenarios +# ============================================================================= + +def create_two_room_scenario() -> WorldGraph: + """ + Create a simple two-room test scenario. + + Layout: + +--------+ +--------+ + | Room A |===| Room B | + | (west) | | (east) | + +--------+ +--------+ + + Room A: "the guard room" - contains a brass key + Room B: "the armory" - destination room + Door: unlocked, between rooms + """ + world = WorldGraph() + + # Room A (left side) + room_a = Room( + name="guard_room", + display_name="the guard room", + bounds=(1, 1, 8, 8), # x, y, width, height + properties={"lit": True, "atmosphere": "musty"}, + description_template="The air is {atmosphere}." + ) + world.add_room(room_a) + + # Room B (right side) + room_b = Room( + name="armory", + display_name="the armory", + bounds=(11, 1, 8, 8), + properties={"lit": True, "atmosphere": "cold"}, + description_template="Weapon racks line the walls." + ) + world.add_room(room_b) + + # Door connecting them + door = Door( + room_a="guard_room", + room_b="armory", + position=(9, 4), # Between the rooms + direction_from_a=Direction.EAST, + locked=False + ) + world.add_door(door) + + # Object in Room A + key = WorldObject( + name="brass_key", + display_name="a brass key", + room="guard_room", + position=(3, 3), + affordances=["takeable", "unlocks:dungeon_door"], + description="A tarnished brass key with an ornate handle." + ) + world.add_object(key) + + return world + + +def create_button_door_scenario() -> WorldGraph: + """ + Create the Phase 1 scenario from issue #154. + + Layout: + +----------+ +----------+ + | Room A | | Room B | + | [Button] |===| [Goal] | + | Agent A | | Agent B | + +----------+ +----------+ + + - Door starts locked + - Button in Room A unlocks the door + - Agent A can reach button; Agent B's goal is blocked by door + - Success: Agents coordinate to solve puzzle + """ + world = WorldGraph() + + # Room A (button room) + room_a = Room( + name="button_room", + display_name="the button room", + bounds=(1, 1, 8, 8), + properties={"lit": True} + ) + world.add_room(room_a) + + # Room B (goal room) + room_b = Room( + name="goal_room", + display_name="the goal room", + bounds=(11, 1, 8, 8), + properties={"lit": True} + ) + world.add_room(room_b) + + # Locked door + door = Door( + room_a="button_room", + room_b="goal_room", + position=(9, 4), + direction_from_a=Direction.EAST, + locked=True, + key_id="button_mechanism" + ) + world.add_door(door) + + # Button in Room A + button = WorldObject( + name="wall_button", + display_name="a large button on the wall", + room="button_room", + position=(2, 4), + affordances=["pressable", "activates:main_door"], + description="A heavy stone button protrudes from the wall." + ) + world.add_object(button) + + # Goal marker in Room B + goal = WorldObject( + name="goal_marker", + display_name="a glowing rune on the floor", + room="goal_room", + position=(15, 4), + affordances=["examinable"], + description="An arcane symbol pulses with soft light." + ) + world.add_object(goal) + + return world +``` + +--- + +## File 2: `test_world_graph.py` + +```python +""" +Unit tests for WorldGraph +""" + +from world_graph import ( + WorldGraph, Room, Door, WorldObject, Direction, + AgentInfo, create_two_room_scenario, create_button_door_scenario +) + +def test_room_contains(): + """Test room boundary checking.""" + room = Room("test", "test room", bounds=(5, 5, 10, 10)) + + assert room.contains(5, 5) == True # Top-left corner + assert room.contains(14, 14) == True # Bottom-right (exclusive) + assert room.contains(15, 15) == False # Outside + assert room.contains(4, 5) == False # Just outside left + + print("PASS: room_contains") + +def test_room_at(): + """Test spatial room lookup.""" + world = create_two_room_scenario() + + # Guard room is at (1,1) with size (8,8) + room = world.room_at(3, 3) + assert room is not None + assert room.name == "guard_room" + + # Armory is at (11,1) with size (8,8) + room = world.room_at(13, 3) + assert room is not None + assert room.name == "armory" + + # Between rooms (the door area) - should return None + room = world.room_at(9, 4) + assert room is None + + print("PASS: room_at") + +def test_describe_room_basic(): + """Test basic room description.""" + world = create_two_room_scenario() + + desc = world.describe_room("guard_room") + + assert "You are in the guard room" in desc + assert "brass key" in desc + assert "Exits:" in desc + assert "east" in desc + assert "armory" in desc + + print("PASS: describe_room_basic") + print(f" Output: {desc}") + +def test_describe_room_with_agents(): + """Test room description with visible agents.""" + world = create_two_room_scenario() + + agents = [ + AgentInfo("Wizard", "a wizard", (3, 3)), + AgentInfo("Knight", "a knight", (4, 4)), + ] + + desc = world.describe_room("guard_room", visible_agents=agents, observer_name="Wizard") + + assert "knight" in desc.lower() + assert "wizard" not in desc.lower() # Observer excluded + + print("PASS: describe_room_with_agents") + print(f" Output: {desc}") + +def test_describe_locked_door(): + """Test that locked doors are described correctly.""" + world = create_button_door_scenario() + + desc = world.describe_room("button_room") + + assert "locked" in desc.lower() + + print("PASS: describe_locked_door") + print(f" Output: {desc}") + +def test_available_actions(): + """Test action enumeration.""" + world = create_two_room_scenario() + + actions = world.get_available_actions("guard_room") + + assert "GO EAST" in actions + assert "TAKE brass_key" in actions + assert "LOOK" in actions + assert "WAIT" in actions + + print("PASS: available_actions") + print(f" Actions: {actions}") + +def test_determinism(): + """Test that descriptions are deterministic.""" + world = create_two_room_scenario() + + desc1 = world.describe_room("guard_room") + desc2 = world.describe_room("guard_room") + desc3 = world.describe_room("guard_room") + + assert desc1 == desc2 == desc3, "Descriptions must be deterministic!" + + print("PASS: determinism") + +def test_direction_opposites(): + """Test direction opposite calculation.""" + assert Direction.NORTH.opposite == Direction.SOUTH + assert Direction.SOUTH.opposite == Direction.NORTH + assert Direction.EAST.opposite == Direction.WEST + assert Direction.WEST.opposite == Direction.EAST + + print("PASS: direction_opposites") + +def run_all_tests(): + """Run all WorldGraph tests.""" + print("=" * 50) + print("WorldGraph Unit Tests") + print("=" * 50) + + test_room_contains() + test_room_at() + test_describe_room_basic() + test_describe_room_with_agents() + test_describe_locked_door() + test_available_actions() + test_determinism() + test_direction_opposites() + + print("=" * 50) + print("All tests passed!") + print("=" * 50) + +if __name__ == "__main__": + run_all_tests() +``` + +--- + +## Example Output + +When `describe_room("guard_room")` is called: + +``` +You are in the guard room. The air is musty. On the ground you see a brass key. +Exits: east (the armory). +``` + +When `describe_room("button_room")` with locked door: + +``` +You are in the button room. There is a large button on the wall here. +Exits: east (the goal room, locked). +``` + +--- + +## Success Criteria + +- [ ] `Room`, `Door`, `WorldObject` dataclasses defined with all fields +- [ ] `WorldGraph.room_at(x, y)` returns correct room +- [ ] `WorldGraph.describe_room()` produces IF-style prose +- [ ] Descriptions include visible agents, objects, and exits +- [ ] Locked doors are marked as "(locked)" in exit descriptions +- [ ] `get_available_actions()` returns appropriate action list +- [ ] All tests pass +- [ ] Output is deterministic (same input = same output) + +--- + +## Notes for Integration (Hour 3) + +The `WorldGraph` will be integrated with the demo by: + +1. Creating a scenario using factory functions +2. Calling `world.room_at(agent.x, agent.y)` to get current room +3. Calling `world.describe_room()` instead of ad-hoc `build_grounded_prompt()` +4. Including `world.get_available_actions()` in the LLM prompt + +The tilemap generation (`generate_tilemap()`) is a stretch goal - the manual tile setup from the current demos works fine for now. diff --git a/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md new file mode 100644 index 0000000..0b5e988 --- /dev/null +++ b/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md @@ -0,0 +1,906 @@ +# Hours 3-4: Integration and Multi-Turn Demo + +**Issues**: #154, #155, #156 (integration) +**Goal**: Complete turn-based simulation with proper context and logging +**Dependencies**: Hour 1 (Action Parser/Executor), Hour 2 (WorldGraph) + +--- + +## Hour 3: Integration + +### Goal +Wire WorldGraph into the demo so agents receive proper IF-style descriptions. + +### Deliverables + +1. `2_integrated_demo.py` - New demo combining WorldGraph + Action execution +2. Enhanced `ActionExecutor` with room-aware movement + +--- + +### File: `2_integrated_demo.py` + +```python +#!/usr/bin/env python3 +""" +Integrated VLLM Demo +==================== + +Combines: +- WorldGraph for structured room descriptions (#155) +- Action parsing and execution (#156) +- Per-agent perspective rendering + +This is the foundation for multi-turn simulation. +""" + +import mcrfpy +from mcrfpy import automation +import sys +import os +import requests +import base64 + +from world_graph import WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, create_two_room_scenario +from action_parser import parse_action, ActionType +from action_executor import ActionExecutor + +# Configuration +VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" +SCREENSHOT_DIR = "/tmp/vllm_integrated" + +# Sprite constants +FLOOR_TILE = 0 +WALL_TILE = 40 +WIZARD_SPRITE = 84 +KNIGHT_SPRITE = 96 + + +class Agent: + """Agent wrapper with WorldGraph integration.""" + + def __init__(self, name: str, display_name: str, entity, world: WorldGraph): + self.name = name + self.display_name = display_name + self.entity = entity + self.world = world + self.message_history = [] # For speech system + + @property + def pos(self) -> tuple: + return (int(self.entity.pos[0]), int(self.entity.pos[1])) + + @property + def current_room(self) -> str: + room = self.world.room_at(*self.pos) + return room.name if room else None + + def get_context(self, visible_agents: list) -> dict: + """Build complete context for LLM query.""" + room_name = self.current_room + + # Convert to AgentInfo for WorldGraph + agent_infos = [ + AgentInfo(a.name, a.display_name, a.pos, is_player=(a.name == self.name)) + for a in visible_agents + ] + + return { + "location": self.world.describe_room( + room_name, + visible_agents=agent_infos, + observer_name=self.name + ), + "available_actions": self.world.get_available_actions(room_name), + "recent_messages": self.message_history[-5:], + } + + +def file_to_base64(file_path): + with open(file_path, 'rb') as f: + return base64.b64encode(f.read()).decode('utf-8') + + +def llm_chat_completion(messages: list): + try: + response = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) + return response.json() + except requests.exceptions.RequestException as e: + return {"error": str(e)} + + +def message_with_image(text, image_path): + image_data = file_to_base64(image_path) + return { + "role": "user", + "content": [ + {"type": "text", "text": text}, + {"type": "image_url", "image_url": {"url": "data:image/png;base64," + image_data}} + ] + } + + +def setup_scene(world: WorldGraph): + """Create scene from WorldGraph.""" + mcrfpy.createScene("integrated_demo") + mcrfpy.setScene("integrated_demo") + ui = mcrfpy.sceneUI("integrated_demo") + + texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) + + # Create grid sized for the world + grid = mcrfpy.Grid( + grid_size=(25, 15), + texture=texture, + pos=(5, 5), + size=(1014, 700) + ) + grid.fill_color = mcrfpy.Color(20, 20, 30) + grid.zoom = 2.0 + ui.append(grid) + + # Initialize all as walls + for x in range(25): + for y in range(15): + point = grid.at(x, y) + point.tilesprite = WALL_TILE + point.walkable = False + point.transparent = False + + # Carve out rooms from WorldGraph + for room in world.rooms.values(): + for rx in range(room.x, room.x + room.width): + for ry in range(room.y, room.y + room.height): + if 0 <= rx < 25 and 0 <= ry < 15: + point = grid.at(rx, ry) + point.tilesprite = FLOOR_TILE + point.walkable = True + point.transparent = True + + # Place doors + for door in world.doors: + dx, dy = door.position + if 0 <= dx < 25 and 0 <= dy < 15: + point = grid.at(dx, dy) + point.tilesprite = FLOOR_TILE + point.walkable = not door.locked + point.transparent = True + + # Create FOV layer + fov_layer = grid.add_layer('color', z_index=10) + fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + + return grid, fov_layer + + +def create_agents(grid, world: WorldGraph, texture) -> list: + """Create agent entities in their starting rooms.""" + agents = [] + + # Agent A: Wizard in guard_room + guard_room = world.rooms["guard_room"] + wizard_entity = mcrfpy.Entity( + grid_pos=guard_room.center, + texture=texture, + sprite_index=WIZARD_SPRITE + ) + grid.entities.append(wizard_entity) + agents.append(Agent("Wizard", "a wizard", wizard_entity, world)) + + # Agent B: Knight in armory + armory = world.rooms["armory"] + knight_entity = mcrfpy.Entity( + grid_pos=armory.center, + texture=texture, + sprite_index=KNIGHT_SPRITE + ) + grid.entities.append(knight_entity) + agents.append(Agent("Knight", "a knight", knight_entity, world)) + + return agents + + +def switch_perspective(grid, fov_layer, agent): + """Switch view to agent's perspective.""" + fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + fov_layer.apply_perspective( + entity=agent.entity, + visible=mcrfpy.Color(0, 0, 0, 0), + discovered=mcrfpy.Color(40, 40, 60, 180), + unknown=mcrfpy.Color(0, 0, 0, 255) + ) + agent.entity.update_visibility() + + px, py = agent.pos + grid.center = (px * 16 + 8, py * 16 + 8) + + +def get_visible_agents(grid, observer, all_agents) -> list: + """Get agents visible to the observer.""" + visible = [] + for agent in all_agents: + if agent.name == observer.name: + continue + ax, ay = agent.pos + if grid.is_in_fov(ax, ay): + visible.append(agent) + return visible + + +def query_agent_llm(agent, screenshot_path, context) -> str: + """Query VLLM for agent's action.""" + + system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game. +You see the world through screenshots and receive text descriptions. +Your goal is to explore and interact with your environment. +Always end your response with a clear action declaration: "Action: " +""" + + # Build the user prompt with WorldGraph context + actions_str = ", ".join(context["available_actions"]) + + user_prompt = f"""{context["location"]} + +Available actions: {actions_str} + +Look at the screenshot showing your current view. The dark areas are outside your field of vision. + +What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action. +Example: "I see a key on the ground that might be useful. Action: TAKE brass_key" +""" + + messages = [ + {"role": "system", "content": system_prompt}, + message_with_image(user_prompt, screenshot_path) + ] + + resp = llm_chat_completion(messages) + + if "error" in resp: + return f"[VLLM Error: {resp['error']}]" + return resp.get('choices', [{}])[0].get('message', {}).get('content', 'No response') + + +def run_single_turn(grid, fov_layer, agents, executor, turn_num): + """Execute one turn for all agents.""" + print(f"\n{'='*70}") + print(f"TURN {turn_num}") + print("=" * 70) + + results = [] + + for agent in agents: + print(f"\n--- {agent.name}'s Turn ---") + print(f"Position: {agent.pos} | Room: {agent.current_room}") + + # Switch perspective + switch_perspective(grid, fov_layer, agent) + mcrfpy.step(0.016) + + # Screenshot + screenshot_path = os.path.join(SCREENSHOT_DIR, f"turn{turn_num}_{agent.name.lower()}.png") + automation.screenshot(screenshot_path) + + # Get context using WorldGraph + visible = get_visible_agents(grid, agent, agents) + context = agent.get_context(visible + [agent]) # Include self for filtering + + print(f"Context: {context['location']}") + print(f"Actions: {context['available_actions']}") + + # Query LLM + print(f"\nQuerying VLLM...") + response = query_agent_llm(agent, screenshot_path, context) + print(f"Response: {response[:200]}...") + + # Parse and execute + action = parse_action(response) + print(f"Parsed: {action.type.value} {action.args}") + + result = executor.execute(agent, action) + print(f"Result: {'SUCCESS' if result.success else 'FAILED'} - {result.message}") + + results.append({ + "agent": agent.name, + "context": context, + "response": response, + "action": action, + "result": result + }) + + return results + + +def run_demo(): + """Main demo: single integrated turn.""" + print("=" * 70) + print("Integrated WorldGraph + Action Demo") + print("=" * 70) + + os.makedirs(SCREENSHOT_DIR, exist_ok=True) + + # Create world from WorldGraph + world = create_two_room_scenario() + + # Setup scene + grid, fov_layer = setup_scene(world) + + # Create agents + texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) + agents = create_agents(grid, world, texture) + + # Create executor + executor = ActionExecutor(grid) + + # Run one turn + results = run_single_turn(grid, fov_layer, agents, executor, turn_num=1) + + print("\n" + "=" * 70) + print("Demo Complete") + print("=" * 70) + + return True + + +if __name__ == "__main__": + try: + success = run_demo() + sys.exit(0 if success else 1) + except Exception as e: + import traceback + traceback.print_exc() + sys.exit(1) +``` + +--- + +## Hour 4: Multi-Turn Demo + +### Goal +Run multiple turns with simulation logging for replay. + +### Deliverables + +1. `turn_orchestrator.py` - Turn management and logging +2. `3_multi_turn_demo.py` - Complete multi-turn simulation +3. `simulation_log.json` - Saved output for replay + +--- + +### File: `turn_orchestrator.py` + +```python +""" +Turn Orchestrator +================= + +Manages multi-turn simulation with logging for replay. +""" + +import json +import os +from dataclasses import dataclass, asdict +from typing import List, Dict, Any, Optional +from datetime import datetime + +from world_graph import WorldGraph +from action_parser import Action, ActionType, parse_action +from action_executor import ActionExecutor, ActionResult + + +@dataclass +class SimulationStep: + """Record of one agent's turn.""" + turn: int + agent_id: str + agent_position: tuple + room: str + perception: Dict[str, Any] # Context shown to LLM + llm_response: str # Raw LLM output + parsed_action_type: str # Action type as string + parsed_action_args: tuple # Action arguments + result_success: bool + result_message: str + new_position: Optional[tuple] = None + path: Optional[List[tuple]] = None # For animation + timestamp: str = "" + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now().isoformat() + + +@dataclass +class SimulationLog: + """Complete simulation record.""" + metadata: Dict[str, Any] + steps: List[SimulationStep] + + def save(self, path: str): + """Save log to JSON file.""" + data = { + "metadata": self.metadata, + "steps": [asdict(s) for s in self.steps] + } + with open(path, 'w') as f: + json.dump(data, f, indent=2, default=str) + + @classmethod + def load(cls, path: str) -> 'SimulationLog': + """Load log from JSON file.""" + with open(path) as f: + data = json.load(f) + steps = [SimulationStep(**s) for s in data["steps"]] + return cls(metadata=data["metadata"], steps=steps) + + +class TurnOrchestrator: + """ + Orchestrates multi-turn simulation. + + Handles: + - Turn sequencing + - Perspective switching + - LLM queries + - Action execution + - Simulation logging + """ + + def __init__(self, grid, fov_layer, world: WorldGraph, agents: list, + screenshot_dir: str, llm_query_fn): + self.grid = grid + self.fov_layer = fov_layer + self.world = world + self.agents = agents + self.screenshot_dir = screenshot_dir + self.llm_query_fn = llm_query_fn # Function to query LLM + + self.executor = ActionExecutor(grid) + self.turn_number = 0 + self.steps: List[SimulationStep] = [] + + os.makedirs(screenshot_dir, exist_ok=True) + + def run_turn(self) -> List[SimulationStep]: + """Execute one full turn (all agents act once).""" + self.turn_number += 1 + turn_steps = [] + + for agent in self.agents: + step = self._run_agent_turn(agent) + turn_steps.append(step) + self.steps.append(step) + + return turn_steps + + def run_simulation(self, max_turns: int = 10, + stop_condition=None) -> SimulationLog: + """ + Run complete simulation. + + Args: + max_turns: Maximum number of turns to run + stop_condition: Optional callable(orchestrator) -> bool + + Returns: + SimulationLog with all steps + """ + print(f"\nStarting simulation: max {max_turns} turns") + print("=" * 50) + + for turn in range(max_turns): + print(f"\n--- Turn {turn + 1}/{max_turns} ---") + + self.run_turn() + + # Check stop condition + if stop_condition and stop_condition(self): + print(f"Stop condition met at turn {turn + 1}") + break + + # Create log + log = SimulationLog( + metadata={ + "total_turns": self.turn_number, + "num_agents": len(self.agents), + "agent_names": [a.name for a in self.agents], + "timestamp": datetime.now().isoformat(), + "world_rooms": list(self.world.rooms.keys()), + }, + steps=self.steps + ) + + return log + + def _run_agent_turn(self, agent) -> SimulationStep: + """Execute one agent's turn.""" + from mcrfpy import automation + import mcrfpy + + # Switch perspective + self._switch_perspective(agent) + mcrfpy.step(0.016) + + # Screenshot + screenshot_path = os.path.join( + self.screenshot_dir, + f"turn{self.turn_number}_{agent.name.lower()}.png" + ) + automation.screenshot(screenshot_path) + + # Build context + visible_agents = self._get_visible_agents(agent) + context = agent.get_context(visible_agents + [agent]) + + # Query LLM + llm_response = self.llm_query_fn(agent, screenshot_path, context) + + # Parse and execute + action = parse_action(llm_response) + result = self.executor.execute(agent, action) + + # Log + print(f" {agent.name}: {action.type.value} -> {result.message}") + + return SimulationStep( + turn=self.turn_number, + agent_id=agent.name, + agent_position=agent.pos, + room=agent.current_room, + perception=context, + llm_response=llm_response, + parsed_action_type=action.type.value, + parsed_action_args=action.args, + result_success=result.success, + result_message=result.message, + new_position=result.new_position, + path=result.path + ) + + def _switch_perspective(self, agent): + """Switch grid view to agent's perspective.""" + import mcrfpy + + self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + self.fov_layer.apply_perspective( + entity=agent.entity, + visible=mcrfpy.Color(0, 0, 0, 0), + discovered=mcrfpy.Color(40, 40, 60, 180), + unknown=mcrfpy.Color(0, 0, 0, 255) + ) + agent.entity.update_visibility() + + px, py = agent.pos + self.grid.center = (px * 16 + 8, py * 16 + 8) + + def _get_visible_agents(self, observer) -> list: + """Get agents visible to observer.""" + visible = [] + for agent in self.agents: + if agent.name == observer.name: + continue + ax, ay = agent.pos + if self.grid.is_in_fov(ax, ay): + visible.append(agent) + return visible +``` + +--- + +### File: `3_multi_turn_demo.py` + +```python +#!/usr/bin/env python3 +""" +Multi-Turn Simulation Demo +========================== + +Runs multiple turns of agent interaction with full logging. +This is the Phase 1 implementation from issue #154. +""" + +import mcrfpy +from mcrfpy import automation +import sys +import os +import requests +import base64 + +from world_graph import create_two_room_scenario, AgentInfo +from action_parser import parse_action +from action_executor import ActionExecutor +from turn_orchestrator import TurnOrchestrator, SimulationLog + +# Configuration +VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" +SCREENSHOT_DIR = "/tmp/vllm_multi_turn" +LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json" +MAX_TURNS = 5 + +# Sprites +FLOOR_TILE = 0 +WALL_TILE = 40 +WIZARD_SPRITE = 84 +KNIGHT_SPRITE = 96 + + +class Agent: + """Agent with WorldGraph integration.""" + def __init__(self, name, display_name, entity, world): + self.name = name + self.display_name = display_name + self.entity = entity + self.world = world + self.message_history = [] + + @property + def pos(self): + return (int(self.entity.pos[0]), int(self.entity.pos[1])) + + @property + def current_room(self): + room = self.world.room_at(*self.pos) + return room.name if room else None + + def get_context(self, visible_agents): + room_name = self.current_room + agent_infos = [ + AgentInfo(a.name, a.display_name, a.pos, is_player=(a.name == self.name)) + for a in visible_agents + ] + return { + "location": self.world.describe_room(room_name, agent_infos, self.name), + "available_actions": self.world.get_available_actions(room_name), + "recent_messages": self.message_history[-5:], + } + + +def file_to_base64(path): + with open(path, 'rb') as f: + return base64.b64encode(f.read()).decode('utf-8') + + +def llm_query(agent, screenshot_path, context) -> str: + """Query VLLM for agent action.""" + system = f"""You are {agent.display_name} exploring a dungeon. +You receive visual and text information about your surroundings. +Always end with: Action: """ + + actions_str = ", ".join(context["available_actions"]) + user = f"""{context["location"]} + +Available: {actions_str} + +[Screenshot attached showing your view] + +What do you do? Brief reasoning, then Action: """ + + messages = [ + {"role": "system", "content": system}, + { + "role": "user", + "content": [ + {"type": "text", "text": user}, + {"type": "image_url", "image_url": { + "url": "data:image/png;base64," + file_to_base64(screenshot_path) + }} + ] + } + ] + + try: + resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) + data = resp.json() + if "error" in data: + return f"[Error: {data['error']}]" + return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response') + except Exception as e: + return f"[Error: {e}]" + + +def setup_scene(world): + """Create scene from WorldGraph.""" + mcrfpy.createScene("multi_turn") + mcrfpy.setScene("multi_turn") + ui = mcrfpy.sceneUI("multi_turn") + + texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) + + grid = mcrfpy.Grid( + grid_size=(25, 15), + texture=texture, + pos=(5, 5), + size=(1014, 700) + ) + grid.fill_color = mcrfpy.Color(20, 20, 30) + grid.zoom = 2.0 + ui.append(grid) + + # Walls everywhere first + for x in range(25): + for y in range(15): + p = grid.at(x, y) + p.tilesprite = WALL_TILE + p.walkable = False + p.transparent = False + + # Carve rooms + for room in world.rooms.values(): + for rx in range(room.x, room.x + room.width): + for ry in range(room.y, room.y + room.height): + if 0 <= rx < 25 and 0 <= ry < 15: + p = grid.at(rx, ry) + p.tilesprite = FLOOR_TILE + p.walkable = True + p.transparent = True + + # Place doors + for door in world.doors: + dx, dy = door.position + if 0 <= dx < 25 and 0 <= dy < 15: + p = grid.at(dx, dy) + p.tilesprite = FLOOR_TILE + p.walkable = not door.locked + p.transparent = True + + # FOV layer + fov_layer = grid.add_layer('color', z_index=10) + fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + + return grid, fov_layer, texture + + +def create_agents(grid, world, texture): + """Create agents in starting positions.""" + agents = [] + + # Wizard in guard_room + room_a = world.rooms["guard_room"] + wizard = mcrfpy.Entity(grid_pos=room_a.center, texture=texture, sprite_index=WIZARD_SPRITE) + grid.entities.append(wizard) + agents.append(Agent("Wizard", "a wizard", wizard, world)) + + # Knight in armory + room_b = world.rooms["armory"] + knight = mcrfpy.Entity(grid_pos=room_b.center, texture=texture, sprite_index=KNIGHT_SPRITE) + grid.entities.append(knight) + agents.append(Agent("Knight", "a knight", knight, world)) + + return agents + + +def run_demo(): + """Run multi-turn simulation.""" + print("=" * 70) + print("Multi-Turn Simulation Demo") + print(f"Running {MAX_TURNS} turns with 2 agents") + print("=" * 70) + + os.makedirs(SCREENSHOT_DIR, exist_ok=True) + + # Setup + world = create_two_room_scenario() + grid, fov_layer, texture = setup_scene(world) + agents = create_agents(grid, world, texture) + + # Create orchestrator + orchestrator = TurnOrchestrator( + grid=grid, + fov_layer=fov_layer, + world=world, + agents=agents, + screenshot_dir=SCREENSHOT_DIR, + llm_query_fn=llm_query + ) + + # Run simulation + log = orchestrator.run_simulation(max_turns=MAX_TURNS) + + # Save log + log.save(LOG_PATH) + print(f"\nSimulation log saved to: {LOG_PATH}") + + # Summary + print("\n" + "=" * 70) + print("SIMULATION SUMMARY") + print("=" * 70) + print(f"Total turns: {log.metadata['total_turns']}") + print(f"Total steps: {len(log.steps)}") + + # Per-agent summary + for agent_name in log.metadata['agent_names']: + agent_steps = [s for s in log.steps if s.agent_id == agent_name] + successes = sum(1 for s in agent_steps if s.result_success) + print(f"\n{agent_name}:") + print(f" Actions: {len(agent_steps)}") + print(f" Successful: {successes}") + print(f" Final position: {agent_steps[-1].new_position or agent_steps[-1].agent_position}") + + return True + + +if __name__ == "__main__": + try: + success = run_demo() + print("\nPASS" if success else "\nFAIL") + sys.exit(0 if success else 1) + except Exception as e: + import traceback + traceback.print_exc() + sys.exit(1) +``` + +--- + +## Success Criteria + +### Hour 3 Integration +- [ ] WorldGraph generates scene tiles correctly +- [ ] Agents receive IF-style room descriptions from WorldGraph +- [ ] Available actions list appears in LLM prompt +- [ ] Actions are parsed and executed +- [ ] Single turn completes successfully + +### Hour 4 Multi-Turn +- [ ] TurnOrchestrator cycles through all agents +- [ ] Multiple turns run sequentially +- [ ] SimulationLog captures all steps +- [ ] Log saves to JSON correctly +- [ ] Log can be loaded back +- [ ] Summary shows agent actions and positions + +--- + +## Example Output + +``` +====================================================================== +Multi-Turn Simulation Demo +Running 5 turns with 2 agents +====================================================================== + +Starting simulation: max 5 turns +================================================== + +--- Turn 1/5 --- + Wizard: GO EAST -> Moved east to (6, 4) + Knight: WAIT -> Waited and observed surroundings + +--- Turn 2/5 --- + Wizard: GO EAST -> Moved east to (7, 4) + Knight: GO WEST -> Moved west to (14, 4) + +[... more turns ...] + +====================================================================== +SIMULATION SUMMARY +====================================================================== +Total turns: 5 +Total steps: 10 + +Wizard: + Actions: 5 + Successful: 4 + Final position: (9, 4) + +Knight: + Actions: 5 + Successful: 3 + Final position: (11, 4) + +Simulation log saved to: /tmp/vllm_multi_turn/simulation_log.json + +PASS +``` + +--- + +## Next Steps (Future Sessions) + +After Hours 3-4 are complete: + +1. **Speech System** - Add ANNOUNCE/SPEAK actions with message passing +2. **Button-Door Puzzle** - Use `create_button_door_scenario()` for coordination test +3. **Animated Replay** - Play back simulation with movement animations +4. **NPC Behaviors** - Add scripted entities (patrol, flee, etc.) +5. **Affordance Learning** - Track what agents discover about objects