diff --git a/tests/vllm_demo/1_multi_agent_demo.py b/tests/vllm_demo/1_multi_agent_demo.py index debc98e..50e06fb 100644 --- a/tests/vllm_demo/1_multi_agent_demo.py +++ b/tests/vllm_demo/1_multi_agent_demo.py @@ -22,9 +22,6 @@ import base64 import os import random -from action_parser import parse_action -from action_executor import ActionExecutor - # VLLM configuration VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" SCREENSHOT_DIR = "/tmp/vllm_multi_agent" @@ -287,9 +284,6 @@ def run_demo(): # Setup scene grid, fov_layer, agents, rat = setup_scene() - # Create action executor - executor = ActionExecutor(grid) - # Cycle through each agent's perspective for i, agent in enumerate(agents): print(f"\n{'='*70}") @@ -325,21 +319,6 @@ def run_demo(): print(f"\n{agent.name}'s Response:\n{response}") print() - # Parse and execute action - print(f"--- Action Execution ---") - action = parse_action(response) - print(f"Parsed action: {action.type.value} {action.args}") - - result = executor.execute(agent, action) - if result.success: - print(f"SUCCESS: {result.message}") - if result.new_position: - # Update perspective after movement - switch_perspective(grid, fov_layer, agent) - mcrfpy.step(0.016) - else: - print(f"FAILED: {result.message}") - print("\n" + "=" * 70) print("Multi-Agent Demo Complete") print("=" * 70) diff --git a/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md deleted file mode 100644 index 912083b..0000000 --- a/tests/vllm_demo/2025-12-14_HOUR-1-PLAN.md +++ /dev/null @@ -1,391 +0,0 @@ -# Hour 1: Action Parser & Executor - -**Issue**: #156 Turn-based LLM Agent Orchestration -**Goal**: Agents can actually move when they say "GO EAST" -**Parallelizable with**: Hour 2 (no dependencies) - ---- - -## Deliverables - -1. `action_parser.py` - Parse LLM text responses into structured actions -2. `action_executor.py` - Execute parsed actions in the game world -3. Modified `1_multi_agent_demo.py` - Integrate parser/executor to show movement - ---- - -## File 1: `action_parser.py` - -```python -""" -Action Parser for LLM Agent Responses -===================================== - -Extracts structured actions from free-form LLM text responses. -Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc. -""" - -import re -from dataclasses import dataclass -from typing import Optional, Tuple, Any -from enum import Enum - -class ActionType(Enum): - GO = "GO" - WAIT = "WAIT" - LOOK = "LOOK" - TAKE = "TAKE" - DROP = "DROP" - PUSH = "PUSH" - USE = "USE" - OPEN = "OPEN" - CLOSE = "CLOSE" - ANNOUNCE = "ANNOUNCE" - SPEAK = "SPEAK" - INVALID = "INVALID" - -@dataclass -class Action: - type: ActionType - args: Tuple[Any, ...] = () - raw_match: str = "" - -class ActionParser: - """Parse LLM responses into structured actions.""" - - # Direction normalization - DIRECTIONS = { - 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', - 'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST', - 'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST', - } - - # Patterns ordered by specificity (most specific first) - PATTERNS = [ - # Explicit "Action: X" format (preferred) - (ActionType.GO, r'Action:\s*GO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.WAIT, r'Action:\s*WAIT\b', 0), - (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1), - (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1), - (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1), - (ActionType.PUSH, r'Action:\s*PUSH\s+(\w+)\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)', 2), - (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2), - (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1), - (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1), - (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1), - (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1), - - # Fallback patterns (less strict) - (ActionType.GO, r'\bGO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.GO, r'\bmove\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.GO, r'\bhead\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.WAIT, r'\bWAIT\b', 0), - (ActionType.LOOK, r'\bLOOK\b', 0), - ] - - def parse(self, llm_response: str) -> Action: - """ - Parse an LLM response and extract the action. - - Returns Action with type=INVALID if no valid action found. - """ - # Normalize to uppercase for matching - text = llm_response.upper() - - for action_type, pattern, num_groups in self.PATTERNS: - match = re.search(pattern, text, re.IGNORECASE) - if match: - args = self._extract_args(match, num_groups, action_type) - return Action( - type=action_type, - args=args, - raw_match=match.group(0) - ) - - # No valid action found - return Action( - type=ActionType.INVALID, - args=(llm_response[:100],), # First 100 chars for debugging - raw_match="" - ) - - def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple: - """Extract and normalize arguments from regex match.""" - if num_groups == 0: - return () - - args = [] - for i in range(1, num_groups + 1): - group = match.group(i) - if group: - # Normalize directions - if action_type == ActionType.GO or (action_type == ActionType.PUSH and i == 2): - group = self.DIRECTIONS.get(group.upper(), group.upper()) - args.append(group) - else: - args.append(None) - - return tuple(args) - - -# Convenience function -def parse_action(llm_response: str) -> Action: - """Parse an LLM response into an Action.""" - return ActionParser().parse(llm_response) -``` - ---- - -## File 2: `action_executor.py` - -```python -""" -Action Executor for McRogueFace -=============================== - -Executes parsed actions in the game world. -Handles movement, collision detection, and action results. -""" - -from dataclasses import dataclass -from typing import Optional, List, Tuple -from action_parser import Action, ActionType - -@dataclass -class ActionResult: - success: bool - message: str - new_position: Optional[Tuple[int, int]] = None - path: Optional[List[Tuple[int, int]]] = None # For animation replay - -class ActionExecutor: - """Execute actions in the McRogueFace game world.""" - - # Direction vectors - DIRECTION_VECTORS = { - 'NORTH': (0, -1), - 'SOUTH': (0, 1), - 'EAST': (1, 0), - 'WEST': (-1, 0), - } - - def __init__(self, grid): - """ - Initialize executor with a grid reference. - - Args: - grid: mcrfpy.Grid instance - """ - self.grid = grid - - def execute(self, agent, action: Action) -> ActionResult: - """ - Execute an action for an agent. - - Args: - agent: Agent wrapper with .entity attribute - action: Parsed Action to execute - - Returns: - ActionResult with success status and message - """ - handlers = { - ActionType.GO: self._execute_go, - ActionType.WAIT: self._execute_wait, - ActionType.LOOK: self._execute_look, - ActionType.TAKE: self._execute_take, - ActionType.DROP: self._execute_drop, - ActionType.INVALID: self._execute_invalid, - } - - handler = handlers.get(action.type, self._execute_unimplemented) - return handler(agent, action) - - def _execute_go(self, agent, action: Action) -> ActionResult: - """Execute movement in a direction.""" - if not action.args or not action.args[0]: - return ActionResult(False, "No direction specified") - - direction = action.args[0] - if direction not in self.DIRECTION_VECTORS: - return ActionResult(False, f"Invalid direction: {direction}") - - dx, dy = self.DIRECTION_VECTORS[direction] - - # Get current position - current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - new_x, new_y = current_x + dx, current_y + dy - - # Check bounds - grid_w, grid_h = self.grid.grid_size - if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): - return ActionResult(False, f"Cannot go {direction} - edge of map") - - # Check walkability - target_cell = self.grid.at(new_x, new_y) - if not target_cell.walkable: - return ActionResult(False, f"Cannot go {direction} - path blocked") - - # Check for entity collision (optional - depends on game rules) - for entity in self.grid.entities: - if entity is agent.entity: - continue - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if ex == new_x and ey == new_y: - return ActionResult(False, f"Cannot go {direction} - someone is there") - - # Execute movement - agent.entity.grid_pos = (new_x, new_y) - - return ActionResult( - success=True, - message=f"Moved {direction.lower()} to ({new_x}, {new_y})", - new_position=(new_x, new_y), - path=[(current_x, current_y), (new_x, new_y)] - ) - - def _execute_wait(self, agent, action: Action) -> ActionResult: - """Execute wait action (no-op).""" - return ActionResult(True, "Waited and observed surroundings") - - def _execute_look(self, agent, action: Action) -> ActionResult: - """Execute look action - returns enhanced observation.""" - target = action.args[0] if action.args else None - if target: - return ActionResult(True, f"Examined {target} closely") - return ActionResult(True, "Looked around carefully") - - def _execute_take(self, agent, action: Action) -> ActionResult: - """Execute take action (placeholder).""" - item = action.args[0] if action.args else "unknown" - # TODO: Implement inventory system - return ActionResult(False, f"Cannot take {item} - not implemented yet") - - def _execute_drop(self, agent, action: Action) -> ActionResult: - """Execute drop action (placeholder).""" - item = action.args[0] if action.args else "unknown" - return ActionResult(False, f"Cannot drop {item} - not implemented yet") - - def _execute_invalid(self, agent, action: Action) -> ActionResult: - """Handle invalid/unparseable action.""" - return ActionResult(False, f"Could not understand action: {action.args[0]}") - - def _execute_unimplemented(self, agent, action: Action) -> ActionResult: - """Handle unimplemented action types.""" - return ActionResult(False, f"Action {action.type.value} not yet implemented") -``` - ---- - -## Modifications to `1_multi_agent_demo.py` - -Add these changes after the existing `query_agent` function: - -```python -# Add imports at top -from action_parser import parse_action -from action_executor import ActionExecutor, ActionResult - -# In run_demo(), after setup_scene(): -executor = ActionExecutor(grid) - -# Replace the agent loop with: -for i, agent in enumerate(agents): - print(f"\n{'='*70}") - print(f"Agent {i+1}/3: {agent.name} ({agent.description})") - print(f"Position: {agent.pos}") - print("=" * 70) - - # Switch to this agent's perspective - switch_perspective(grid, fov_layer, agent) - mcrfpy.step(0.016) - - # Take screenshot - screenshot_path = os.path.join(SCREENSHOT_DIR, f"{i}_{agent.name.lower()}_view.png") - result = automation.screenshot(screenshot_path) - if not result: - print(f"ERROR: Failed to take screenshot for {agent.name}") - continue - - # Get visible entities and query VLLM - visible = get_visible_entities(grid, agent, agents, rat) - grounded_text = build_grounded_prompt(visible) - print(f"Grounded observations: {grounded_text}") - - print(f"\nQuerying VLLM for {agent.name}...") - response = query_agent(agent, screenshot_path, grounded_text) - print(f"\n{agent.name}'s Response:\n{response}") - - # NEW: Parse and execute action - print(f"\n--- Action Execution ---") - action = parse_action(response) - print(f"Parsed action: {action.type.value} {action.args}") - - result = executor.execute(agent, action) - if result.success: - print(f"SUCCESS: {result.message}") - if result.new_position: - # Update perspective after movement - switch_perspective(grid, fov_layer, agent) - mcrfpy.step(0.016) - else: - print(f"FAILED: {result.message}") -``` - ---- - -## Testing - -### Unit test for parser (`test_action_parser.py`): - -```python -from action_parser import parse_action, ActionType - -def test_parser(): - # Explicit format - assert parse_action("Action: GO NORTH").type == ActionType.GO - assert parse_action("Action: GO NORTH").args == ("NORTH",) - - # Short directions - assert parse_action("Action: GO E").args == ("EAST",) - - # Case insensitive - assert parse_action("action: go south").type == ActionType.GO - - # Fallback patterns - assert parse_action("I think I'll GO WEST").type == ActionType.GO - - # Wait and Look - assert parse_action("Action: WAIT").type == ActionType.WAIT - assert parse_action("Action: LOOK").type == ActionType.LOOK - - # Invalid - assert parse_action("I'm not sure what to do").type == ActionType.INVALID - - print("All parser tests passed!") - -if __name__ == "__main__": - test_parser() -``` - ---- - -## Success Criteria - -- [ ] `action_parser.py` correctly parses all GO directions (N/S/E/W and full names) -- [ ] `action_parser.py` handles WAIT, LOOK, and INVALID cases -- [ ] `action_executor.py` moves entities when GO succeeds -- [ ] `action_executor.py` returns failure message when path is blocked -- [ ] Modified demo shows "Moved east to (5, 7)" style output -- [ ] Entities visibly change position between turns - ---- - -## Notes for Integration (Hour 3) - -The `ActionExecutor` will be enhanced in Hour 3 to: -- Use `WorldGraph` for room-based movement (GO NORTH = walk through door to next room) -- Support multi-tile pathfinding for room transitions -- Return path data for animation replay - -Keep the current single-tile movement as the foundation. diff --git a/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md deleted file mode 100644 index 7ddb723..0000000 --- a/tests/vllm_demo/2025-12-14_HOUR-2-PLAN.md +++ /dev/null @@ -1,684 +0,0 @@ -# Hour 2: WorldGraph Foundation - -**Issue**: #155 Deterministic Text Descriptions From Room Graph -**Goal**: Structured room data that generates both tilemaps AND text descriptions -**Parallelizable with**: Hour 1 (no dependencies) - ---- - -## Deliverables - -1. `world_graph.py` - Core data structures and description generation -2. `test_world_graph.py` - Unit tests for WorldGraph functionality -3. Example scenario: two connected rooms with a door - ---- - -## File 1: `world_graph.py` - -```python -""" -WorldGraph: Room-based World Representation -============================================ - -Provides dual-purpose data structures for: -1. Generating 2D tilemaps (visual representation) -2. Generating text descriptions (LLM context) - -Ensures deterministic text output: same state = same description. -""" - -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Tuple, Any -from enum import Enum - - -class Direction(Enum): - NORTH = "north" - SOUTH = "south" - EAST = "east" - WEST = "west" - - @property - def opposite(self) -> 'Direction': - opposites = { - Direction.NORTH: Direction.SOUTH, - Direction.SOUTH: Direction.NORTH, - Direction.EAST: Direction.WEST, - Direction.WEST: Direction.EAST, - } - return opposites[self] - - @property - def vector(self) -> Tuple[int, int]: - vectors = { - Direction.NORTH: (0, -1), - Direction.SOUTH: (0, 1), - Direction.EAST: (1, 0), - Direction.WEST: (-1, 0), - } - return vectors[self] - - -@dataclass -class Room: - """A room in the world graph.""" - name: str # Internal ID: "kitchen", "guard_room" - display_name: str # Text output: "the kitchen", "a dimly lit guard room" - bounds: Tuple[int, int, int, int] # (x, y, width, height) in tile coords - properties: Dict[str, Any] = field(default_factory=dict) # {"lit": True, "temperature": "warm"} - description_template: Optional[str] = None # "A {temperature} room with {features}." - - @property - def x(self) -> int: - return self.bounds[0] - - @property - def y(self) -> int: - return self.bounds[1] - - @property - def width(self) -> int: - return self.bounds[2] - - @property - def height(self) -> int: - return self.bounds[3] - - @property - def center(self) -> Tuple[int, int]: - return (self.x + self.width // 2, self.y + self.height // 2) - - def contains(self, x: int, y: int) -> bool: - """Check if a tile coordinate is within this room.""" - return (self.x <= x < self.x + self.width and - self.y <= y < self.y + self.height) - - -@dataclass -class Door: - """A connection between two rooms.""" - room_a: str # Room name - room_b: str # Room name - position: Tuple[int, int] # Tile position of the door - direction_from_a: Direction # Direction from room_a to reach room_b - locked: bool = False - key_id: Optional[str] = None # Which key unlocks this door - - @property - def direction_from_b(self) -> Direction: - return self.direction_from_a.opposite - - -@dataclass -class WorldObject: - """An interactable object in the world.""" - name: str # Internal ID: "brass_key" - display_name: str # Text output: "a brass key" - room: str # Which room contains it - position: Tuple[int, int] # Tile position (or None if carried) - affordances: List[str] = field(default_factory=list) # ["takeable", "unlocks:pantry_door"] - description: str = "" # "A tarnished brass key with ornate handle." - - -@dataclass -class AgentInfo: - """Information about an agent for description purposes.""" - name: str # "Wizard", "Knight" - display_name: str # "a wizard", "the knight" - position: Tuple[int, int] # Current tile position - is_player: bool = False # Is this the observing agent? - - -class WorldGraph: - """ - Graph-based world representation. - - Provides: - - Room/door/object storage - - Deterministic text description generation - - Spatial queries (what room is at x,y?) - - Available action enumeration - """ - - def __init__(self): - self.rooms: Dict[str, Room] = {} - self.doors: List[Door] = [] - self.objects: Dict[str, WorldObject] = {} - - # ========================================================================= - # Building the World - # ========================================================================= - - def add_room(self, room: Room) -> None: - """Add a room to the world.""" - self.rooms[room.name] = room - - def add_door(self, door: Door) -> None: - """Add a door connecting two rooms.""" - self.doors.append(door) - - def add_object(self, obj: WorldObject) -> None: - """Add an object to the world.""" - self.objects[obj.name] = obj - - # ========================================================================= - # Spatial Queries - # ========================================================================= - - def room_at(self, x: int, y: int) -> Optional[Room]: - """Get the room containing a tile coordinate.""" - for room in self.rooms.values(): - if room.contains(x, y): - return room - return None - - def get_exits(self, room_name: str) -> List[Door]: - """Get all doors leading out of a room.""" - exits = [] - for door in self.doors: - if door.room_a == room_name or door.room_b == room_name: - exits.append(door) - return exits - - def get_door_in_direction(self, room_name: str, direction: Direction) -> Optional[Door]: - """Get the door in a specific direction from a room.""" - for door in self.doors: - if door.room_a == room_name and door.direction_from_a == direction: - return door - if door.room_b == room_name and door.direction_from_b == direction: - return door - return None - - def get_objects_in_room(self, room_name: str) -> List[WorldObject]: - """Get all objects in a room.""" - return [obj for obj in self.objects.values() if obj.room == room_name] - - # ========================================================================= - # Text Description Generation (Deterministic!) - # ========================================================================= - - def describe_room(self, room_name: str, - visible_agents: List[AgentInfo] = None, - observer_name: str = None) -> str: - """ - Generate a complete room description. - - Args: - room_name: The room to describe - visible_agents: List of agents visible in the room - observer_name: Name of the observing agent (excluded from description) - - Returns: - Deterministic prose description of the room - """ - room = self.rooms.get(room_name) - if not room: - return "You are in an unknown location." - - parts = [] - - # Base location - parts.append(f"You are in {room.display_name}.") - - # Room template description (if any) - if room.description_template and room.properties: - try: - desc = room.description_template.format(**room.properties) - parts.append(desc) - except KeyError: - pass - - # Visible agents - if visible_agents: - agent_desc = self._describe_agents(visible_agents, observer_name) - if agent_desc: - parts.append(agent_desc) - - # Objects on the ground - objects = self.get_objects_in_room(room_name) - if objects: - obj_desc = self._describe_objects(objects) - parts.append(obj_desc) - - # Exits - exits = self.get_exits(room_name) - parts.append(self._describe_exits(room_name, exits)) - - return " ".join(parts) - - def _describe_agents(self, agents: List[AgentInfo], observer_name: str = None) -> str: - """Describe visible agents (excluding observer).""" - others = [a for a in agents if a.name != observer_name and not a.is_player] - if not others: - return "" - - if len(others) == 1: - return f"You see {others[0].display_name} here." - else: - names = [a.display_name for a in others] - formatted = ", ".join(names[:-1]) + f" and {names[-1]}" - return f"You see {formatted} here." - - def _describe_objects(self, objects: List[WorldObject]) -> str: - """Describe objects in the room.""" - if not objects: - return "" - - # Group by affordance for natural description - takeable = [o for o in objects if "takeable" in o.affordances] - furniture = [o for o in objects if "takeable" not in o.affordances] - - parts = [] - if takeable: - if len(takeable) == 1: - parts.append(f"On the ground you see {takeable[0].display_name}.") - else: - names = [o.display_name for o in takeable] - formatted = ", ".join(names[:-1]) + f" and {names[-1]}" - parts.append(f"On the ground you see {formatted}.") - - if furniture: - for obj in furniture: - parts.append(f"There is {obj.display_name} here.") - - return " ".join(parts) - - def _describe_exits(self, room_name: str, exits: List[Door]) -> str: - """Describe available exits.""" - if not exits: - return "There are no visible exits." - - exit_parts = [] - for door in exits: - # Determine direction and destination from this room's perspective - if door.room_a == room_name: - direction = door.direction_from_a.value - dest_room = self.rooms.get(door.room_b) - else: - direction = door.direction_from_b.value - dest_room = self.rooms.get(door.room_a) - - dest_name = dest_room.display_name if dest_room else "unknown" - - if door.locked: - exit_parts.append(f"{direction} ({dest_name}, locked)") - else: - exit_parts.append(f"{direction} ({dest_name})") - - # Sort for deterministic output - exit_parts.sort() - - return "Exits: " + ", ".join(exit_parts) + "." - - # ========================================================================= - # Action Enumeration - # ========================================================================= - - def get_available_actions(self, room_name: str, - can_speak: bool = True) -> List[str]: - """ - Get list of available actions for an agent in a room. - - Returns list of action strings like: - ["GO NORTH", "GO EAST", "TAKE brass_key", "WAIT", "LOOK"] - """ - actions = ["LOOK", "WAIT"] - - # Movement actions - for door in self.get_exits(room_name): - if door.room_a == room_name: - direction = door.direction_from_a.value.upper() - else: - direction = door.direction_from_b.value.upper() - - if not door.locked: - actions.append(f"GO {direction}") - else: - # Could add UNLOCK action here if agent has key - pass - - # Object interactions - for obj in self.get_objects_in_room(room_name): - if "takeable" in obj.affordances: - actions.append(f"TAKE {obj.name}") - if "pushable" in obj.affordances: - actions.append(f"PUSH {obj.name} ") - if "openable" in obj.affordances: - actions.append(f"OPEN {obj.name}") - if "readable" in obj.affordances: - actions.append(f"READ {obj.name}") - - # Speech actions - if can_speak: - actions.append("ANNOUNCE ''") - actions.append("SPEAK ''") - - return sorted(actions) - - -# ============================================================================= -# Factory Functions for Common Scenarios -# ============================================================================= - -def create_two_room_scenario() -> WorldGraph: - """ - Create a simple two-room test scenario. - - Layout: - +--------+ +--------+ - | Room A |===| Room B | - | (west) | | (east) | - +--------+ +--------+ - - Room A: "the guard room" - contains a brass key - Room B: "the armory" - destination room - Door: unlocked, between rooms - """ - world = WorldGraph() - - # Room A (left side) - room_a = Room( - name="guard_room", - display_name="the guard room", - bounds=(1, 1, 8, 8), # x, y, width, height - properties={"lit": True, "atmosphere": "musty"}, - description_template="The air is {atmosphere}." - ) - world.add_room(room_a) - - # Room B (right side) - room_b = Room( - name="armory", - display_name="the armory", - bounds=(11, 1, 8, 8), - properties={"lit": True, "atmosphere": "cold"}, - description_template="Weapon racks line the walls." - ) - world.add_room(room_b) - - # Door connecting them - door = Door( - room_a="guard_room", - room_b="armory", - position=(9, 4), # Between the rooms - direction_from_a=Direction.EAST, - locked=False - ) - world.add_door(door) - - # Object in Room A - key = WorldObject( - name="brass_key", - display_name="a brass key", - room="guard_room", - position=(3, 3), - affordances=["takeable", "unlocks:dungeon_door"], - description="A tarnished brass key with an ornate handle." - ) - world.add_object(key) - - return world - - -def create_button_door_scenario() -> WorldGraph: - """ - Create the Phase 1 scenario from issue #154. - - Layout: - +----------+ +----------+ - | Room A | | Room B | - | [Button] |===| [Goal] | - | Agent A | | Agent B | - +----------+ +----------+ - - - Door starts locked - - Button in Room A unlocks the door - - Agent A can reach button; Agent B's goal is blocked by door - - Success: Agents coordinate to solve puzzle - """ - world = WorldGraph() - - # Room A (button room) - room_a = Room( - name="button_room", - display_name="the button room", - bounds=(1, 1, 8, 8), - properties={"lit": True} - ) - world.add_room(room_a) - - # Room B (goal room) - room_b = Room( - name="goal_room", - display_name="the goal room", - bounds=(11, 1, 8, 8), - properties={"lit": True} - ) - world.add_room(room_b) - - # Locked door - door = Door( - room_a="button_room", - room_b="goal_room", - position=(9, 4), - direction_from_a=Direction.EAST, - locked=True, - key_id="button_mechanism" - ) - world.add_door(door) - - # Button in Room A - button = WorldObject( - name="wall_button", - display_name="a large button on the wall", - room="button_room", - position=(2, 4), - affordances=["pressable", "activates:main_door"], - description="A heavy stone button protrudes from the wall." - ) - world.add_object(button) - - # Goal marker in Room B - goal = WorldObject( - name="goal_marker", - display_name="a glowing rune on the floor", - room="goal_room", - position=(15, 4), - affordances=["examinable"], - description="An arcane symbol pulses with soft light." - ) - world.add_object(goal) - - return world -``` - ---- - -## File 2: `test_world_graph.py` - -```python -""" -Unit tests for WorldGraph -""" - -from world_graph import ( - WorldGraph, Room, Door, WorldObject, Direction, - AgentInfo, create_two_room_scenario, create_button_door_scenario -) - -def test_room_contains(): - """Test room boundary checking.""" - room = Room("test", "test room", bounds=(5, 5, 10, 10)) - - assert room.contains(5, 5) == True # Top-left corner - assert room.contains(14, 14) == True # Bottom-right (exclusive) - assert room.contains(15, 15) == False # Outside - assert room.contains(4, 5) == False # Just outside left - - print("PASS: room_contains") - -def test_room_at(): - """Test spatial room lookup.""" - world = create_two_room_scenario() - - # Guard room is at (1,1) with size (8,8) - room = world.room_at(3, 3) - assert room is not None - assert room.name == "guard_room" - - # Armory is at (11,1) with size (8,8) - room = world.room_at(13, 3) - assert room is not None - assert room.name == "armory" - - # Between rooms (the door area) - should return None - room = world.room_at(9, 4) - assert room is None - - print("PASS: room_at") - -def test_describe_room_basic(): - """Test basic room description.""" - world = create_two_room_scenario() - - desc = world.describe_room("guard_room") - - assert "You are in the guard room" in desc - assert "brass key" in desc - assert "Exits:" in desc - assert "east" in desc - assert "armory" in desc - - print("PASS: describe_room_basic") - print(f" Output: {desc}") - -def test_describe_room_with_agents(): - """Test room description with visible agents.""" - world = create_two_room_scenario() - - agents = [ - AgentInfo("Wizard", "a wizard", (3, 3)), - AgentInfo("Knight", "a knight", (4, 4)), - ] - - desc = world.describe_room("guard_room", visible_agents=agents, observer_name="Wizard") - - assert "knight" in desc.lower() - assert "wizard" not in desc.lower() # Observer excluded - - print("PASS: describe_room_with_agents") - print(f" Output: {desc}") - -def test_describe_locked_door(): - """Test that locked doors are described correctly.""" - world = create_button_door_scenario() - - desc = world.describe_room("button_room") - - assert "locked" in desc.lower() - - print("PASS: describe_locked_door") - print(f" Output: {desc}") - -def test_available_actions(): - """Test action enumeration.""" - world = create_two_room_scenario() - - actions = world.get_available_actions("guard_room") - - assert "GO EAST" in actions - assert "TAKE brass_key" in actions - assert "LOOK" in actions - assert "WAIT" in actions - - print("PASS: available_actions") - print(f" Actions: {actions}") - -def test_determinism(): - """Test that descriptions are deterministic.""" - world = create_two_room_scenario() - - desc1 = world.describe_room("guard_room") - desc2 = world.describe_room("guard_room") - desc3 = world.describe_room("guard_room") - - assert desc1 == desc2 == desc3, "Descriptions must be deterministic!" - - print("PASS: determinism") - -def test_direction_opposites(): - """Test direction opposite calculation.""" - assert Direction.NORTH.opposite == Direction.SOUTH - assert Direction.SOUTH.opposite == Direction.NORTH - assert Direction.EAST.opposite == Direction.WEST - assert Direction.WEST.opposite == Direction.EAST - - print("PASS: direction_opposites") - -def run_all_tests(): - """Run all WorldGraph tests.""" - print("=" * 50) - print("WorldGraph Unit Tests") - print("=" * 50) - - test_room_contains() - test_room_at() - test_describe_room_basic() - test_describe_room_with_agents() - test_describe_locked_door() - test_available_actions() - test_determinism() - test_direction_opposites() - - print("=" * 50) - print("All tests passed!") - print("=" * 50) - -if __name__ == "__main__": - run_all_tests() -``` - ---- - -## Example Output - -When `describe_room("guard_room")` is called: - -``` -You are in the guard room. The air is musty. On the ground you see a brass key. -Exits: east (the armory). -``` - -When `describe_room("button_room")` with locked door: - -``` -You are in the button room. There is a large button on the wall here. -Exits: east (the goal room, locked). -``` - ---- - -## Success Criteria - -- [ ] `Room`, `Door`, `WorldObject` dataclasses defined with all fields -- [ ] `WorldGraph.room_at(x, y)` returns correct room -- [ ] `WorldGraph.describe_room()` produces IF-style prose -- [ ] Descriptions include visible agents, objects, and exits -- [ ] Locked doors are marked as "(locked)" in exit descriptions -- [ ] `get_available_actions()` returns appropriate action list -- [ ] All tests pass -- [ ] Output is deterministic (same input = same output) - ---- - -## Notes for Integration (Hour 3) - -The `WorldGraph` will be integrated with the demo by: - -1. Creating a scenario using factory functions -2. Calling `world.room_at(agent.x, agent.y)` to get current room -3. Calling `world.describe_room()` instead of ad-hoc `build_grounded_prompt()` -4. Including `world.get_available_actions()` in the LLM prompt - -The tilemap generation (`generate_tilemap()`) is a stretch goal - the manual tile setup from the current demos works fine for now. diff --git a/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md b/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md deleted file mode 100644 index 0b5e988..0000000 --- a/tests/vllm_demo/2025-12-14_HOUR-3-4-PLAN.md +++ /dev/null @@ -1,906 +0,0 @@ -# Hours 3-4: Integration and Multi-Turn Demo - -**Issues**: #154, #155, #156 (integration) -**Goal**: Complete turn-based simulation with proper context and logging -**Dependencies**: Hour 1 (Action Parser/Executor), Hour 2 (WorldGraph) - ---- - -## Hour 3: Integration - -### Goal -Wire WorldGraph into the demo so agents receive proper IF-style descriptions. - -### Deliverables - -1. `2_integrated_demo.py` - New demo combining WorldGraph + Action execution -2. Enhanced `ActionExecutor` with room-aware movement - ---- - -### File: `2_integrated_demo.py` - -```python -#!/usr/bin/env python3 -""" -Integrated VLLM Demo -==================== - -Combines: -- WorldGraph for structured room descriptions (#155) -- Action parsing and execution (#156) -- Per-agent perspective rendering - -This is the foundation for multi-turn simulation. -""" - -import mcrfpy -from mcrfpy import automation -import sys -import os -import requests -import base64 - -from world_graph import WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, create_two_room_scenario -from action_parser import parse_action, ActionType -from action_executor import ActionExecutor - -# Configuration -VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" -SCREENSHOT_DIR = "/tmp/vllm_integrated" - -# Sprite constants -FLOOR_TILE = 0 -WALL_TILE = 40 -WIZARD_SPRITE = 84 -KNIGHT_SPRITE = 96 - - -class Agent: - """Agent wrapper with WorldGraph integration.""" - - def __init__(self, name: str, display_name: str, entity, world: WorldGraph): - self.name = name - self.display_name = display_name - self.entity = entity - self.world = world - self.message_history = [] # For speech system - - @property - def pos(self) -> tuple: - return (int(self.entity.pos[0]), int(self.entity.pos[1])) - - @property - def current_room(self) -> str: - room = self.world.room_at(*self.pos) - return room.name if room else None - - def get_context(self, visible_agents: list) -> dict: - """Build complete context for LLM query.""" - room_name = self.current_room - - # Convert to AgentInfo for WorldGraph - agent_infos = [ - AgentInfo(a.name, a.display_name, a.pos, is_player=(a.name == self.name)) - for a in visible_agents - ] - - return { - "location": self.world.describe_room( - room_name, - visible_agents=agent_infos, - observer_name=self.name - ), - "available_actions": self.world.get_available_actions(room_name), - "recent_messages": self.message_history[-5:], - } - - -def file_to_base64(file_path): - with open(file_path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - -def llm_chat_completion(messages: list): - try: - response = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) - return response.json() - except requests.exceptions.RequestException as e: - return {"error": str(e)} - - -def message_with_image(text, image_path): - image_data = file_to_base64(image_path) - return { - "role": "user", - "content": [ - {"type": "text", "text": text}, - {"type": "image_url", "image_url": {"url": "data:image/png;base64," + image_data}} - ] - } - - -def setup_scene(world: WorldGraph): - """Create scene from WorldGraph.""" - mcrfpy.createScene("integrated_demo") - mcrfpy.setScene("integrated_demo") - ui = mcrfpy.sceneUI("integrated_demo") - - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - - # Create grid sized for the world - grid = mcrfpy.Grid( - grid_size=(25, 15), - texture=texture, - pos=(5, 5), - size=(1014, 700) - ) - grid.fill_color = mcrfpy.Color(20, 20, 30) - grid.zoom = 2.0 - ui.append(grid) - - # Initialize all as walls - for x in range(25): - for y in range(15): - point = grid.at(x, y) - point.tilesprite = WALL_TILE - point.walkable = False - point.transparent = False - - # Carve out rooms from WorldGraph - for room in world.rooms.values(): - for rx in range(room.x, room.x + room.width): - for ry in range(room.y, room.y + room.height): - if 0 <= rx < 25 and 0 <= ry < 15: - point = grid.at(rx, ry) - point.tilesprite = FLOOR_TILE - point.walkable = True - point.transparent = True - - # Place doors - for door in world.doors: - dx, dy = door.position - if 0 <= dx < 25 and 0 <= dy < 15: - point = grid.at(dx, dy) - point.tilesprite = FLOOR_TILE - point.walkable = not door.locked - point.transparent = True - - # Create FOV layer - fov_layer = grid.add_layer('color', z_index=10) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - return grid, fov_layer - - -def create_agents(grid, world: WorldGraph, texture) -> list: - """Create agent entities in their starting rooms.""" - agents = [] - - # Agent A: Wizard in guard_room - guard_room = world.rooms["guard_room"] - wizard_entity = mcrfpy.Entity( - grid_pos=guard_room.center, - texture=texture, - sprite_index=WIZARD_SPRITE - ) - grid.entities.append(wizard_entity) - agents.append(Agent("Wizard", "a wizard", wizard_entity, world)) - - # Agent B: Knight in armory - armory = world.rooms["armory"] - knight_entity = mcrfpy.Entity( - grid_pos=armory.center, - texture=texture, - sprite_index=KNIGHT_SPRITE - ) - grid.entities.append(knight_entity) - agents.append(Agent("Knight", "a knight", knight_entity, world)) - - return agents - - -def switch_perspective(grid, fov_layer, agent): - """Switch view to agent's perspective.""" - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - fov_layer.apply_perspective( - entity=agent.entity, - visible=mcrfpy.Color(0, 0, 0, 0), - discovered=mcrfpy.Color(40, 40, 60, 180), - unknown=mcrfpy.Color(0, 0, 0, 255) - ) - agent.entity.update_visibility() - - px, py = agent.pos - grid.center = (px * 16 + 8, py * 16 + 8) - - -def get_visible_agents(grid, observer, all_agents) -> list: - """Get agents visible to the observer.""" - visible = [] - for agent in all_agents: - if agent.name == observer.name: - continue - ax, ay = agent.pos - if grid.is_in_fov(ax, ay): - visible.append(agent) - return visible - - -def query_agent_llm(agent, screenshot_path, context) -> str: - """Query VLLM for agent's action.""" - - system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game. -You see the world through screenshots and receive text descriptions. -Your goal is to explore and interact with your environment. -Always end your response with a clear action declaration: "Action: " -""" - - # Build the user prompt with WorldGraph context - actions_str = ", ".join(context["available_actions"]) - - user_prompt = f"""{context["location"]} - -Available actions: {actions_str} - -Look at the screenshot showing your current view. The dark areas are outside your field of vision. - -What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action. -Example: "I see a key on the ground that might be useful. Action: TAKE brass_key" -""" - - messages = [ - {"role": "system", "content": system_prompt}, - message_with_image(user_prompt, screenshot_path) - ] - - resp = llm_chat_completion(messages) - - if "error" in resp: - return f"[VLLM Error: {resp['error']}]" - return resp.get('choices', [{}])[0].get('message', {}).get('content', 'No response') - - -def run_single_turn(grid, fov_layer, agents, executor, turn_num): - """Execute one turn for all agents.""" - print(f"\n{'='*70}") - print(f"TURN {turn_num}") - print("=" * 70) - - results = [] - - for agent in agents: - print(f"\n--- {agent.name}'s Turn ---") - print(f"Position: {agent.pos} | Room: {agent.current_room}") - - # Switch perspective - switch_perspective(grid, fov_layer, agent) - mcrfpy.step(0.016) - - # Screenshot - screenshot_path = os.path.join(SCREENSHOT_DIR, f"turn{turn_num}_{agent.name.lower()}.png") - automation.screenshot(screenshot_path) - - # Get context using WorldGraph - visible = get_visible_agents(grid, agent, agents) - context = agent.get_context(visible + [agent]) # Include self for filtering - - print(f"Context: {context['location']}") - print(f"Actions: {context['available_actions']}") - - # Query LLM - print(f"\nQuerying VLLM...") - response = query_agent_llm(agent, screenshot_path, context) - print(f"Response: {response[:200]}...") - - # Parse and execute - action = parse_action(response) - print(f"Parsed: {action.type.value} {action.args}") - - result = executor.execute(agent, action) - print(f"Result: {'SUCCESS' if result.success else 'FAILED'} - {result.message}") - - results.append({ - "agent": agent.name, - "context": context, - "response": response, - "action": action, - "result": result - }) - - return results - - -def run_demo(): - """Main demo: single integrated turn.""" - print("=" * 70) - print("Integrated WorldGraph + Action Demo") - print("=" * 70) - - os.makedirs(SCREENSHOT_DIR, exist_ok=True) - - # Create world from WorldGraph - world = create_two_room_scenario() - - # Setup scene - grid, fov_layer = setup_scene(world) - - # Create agents - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - agents = create_agents(grid, world, texture) - - # Create executor - executor = ActionExecutor(grid) - - # Run one turn - results = run_single_turn(grid, fov_layer, agents, executor, turn_num=1) - - print("\n" + "=" * 70) - print("Demo Complete") - print("=" * 70) - - return True - - -if __name__ == "__main__": - try: - success = run_demo() - sys.exit(0 if success else 1) - except Exception as e: - import traceback - traceback.print_exc() - sys.exit(1) -``` - ---- - -## Hour 4: Multi-Turn Demo - -### Goal -Run multiple turns with simulation logging for replay. - -### Deliverables - -1. `turn_orchestrator.py` - Turn management and logging -2. `3_multi_turn_demo.py` - Complete multi-turn simulation -3. `simulation_log.json` - Saved output for replay - ---- - -### File: `turn_orchestrator.py` - -```python -""" -Turn Orchestrator -================= - -Manages multi-turn simulation with logging for replay. -""" - -import json -import os -from dataclasses import dataclass, asdict -from typing import List, Dict, Any, Optional -from datetime import datetime - -from world_graph import WorldGraph -from action_parser import Action, ActionType, parse_action -from action_executor import ActionExecutor, ActionResult - - -@dataclass -class SimulationStep: - """Record of one agent's turn.""" - turn: int - agent_id: str - agent_position: tuple - room: str - perception: Dict[str, Any] # Context shown to LLM - llm_response: str # Raw LLM output - parsed_action_type: str # Action type as string - parsed_action_args: tuple # Action arguments - result_success: bool - result_message: str - new_position: Optional[tuple] = None - path: Optional[List[tuple]] = None # For animation - timestamp: str = "" - - def __post_init__(self): - if not self.timestamp: - self.timestamp = datetime.now().isoformat() - - -@dataclass -class SimulationLog: - """Complete simulation record.""" - metadata: Dict[str, Any] - steps: List[SimulationStep] - - def save(self, path: str): - """Save log to JSON file.""" - data = { - "metadata": self.metadata, - "steps": [asdict(s) for s in self.steps] - } - with open(path, 'w') as f: - json.dump(data, f, indent=2, default=str) - - @classmethod - def load(cls, path: str) -> 'SimulationLog': - """Load log from JSON file.""" - with open(path) as f: - data = json.load(f) - steps = [SimulationStep(**s) for s in data["steps"]] - return cls(metadata=data["metadata"], steps=steps) - - -class TurnOrchestrator: - """ - Orchestrates multi-turn simulation. - - Handles: - - Turn sequencing - - Perspective switching - - LLM queries - - Action execution - - Simulation logging - """ - - def __init__(self, grid, fov_layer, world: WorldGraph, agents: list, - screenshot_dir: str, llm_query_fn): - self.grid = grid - self.fov_layer = fov_layer - self.world = world - self.agents = agents - self.screenshot_dir = screenshot_dir - self.llm_query_fn = llm_query_fn # Function to query LLM - - self.executor = ActionExecutor(grid) - self.turn_number = 0 - self.steps: List[SimulationStep] = [] - - os.makedirs(screenshot_dir, exist_ok=True) - - def run_turn(self) -> List[SimulationStep]: - """Execute one full turn (all agents act once).""" - self.turn_number += 1 - turn_steps = [] - - for agent in self.agents: - step = self._run_agent_turn(agent) - turn_steps.append(step) - self.steps.append(step) - - return turn_steps - - def run_simulation(self, max_turns: int = 10, - stop_condition=None) -> SimulationLog: - """ - Run complete simulation. - - Args: - max_turns: Maximum number of turns to run - stop_condition: Optional callable(orchestrator) -> bool - - Returns: - SimulationLog with all steps - """ - print(f"\nStarting simulation: max {max_turns} turns") - print("=" * 50) - - for turn in range(max_turns): - print(f"\n--- Turn {turn + 1}/{max_turns} ---") - - self.run_turn() - - # Check stop condition - if stop_condition and stop_condition(self): - print(f"Stop condition met at turn {turn + 1}") - break - - # Create log - log = SimulationLog( - metadata={ - "total_turns": self.turn_number, - "num_agents": len(self.agents), - "agent_names": [a.name for a in self.agents], - "timestamp": datetime.now().isoformat(), - "world_rooms": list(self.world.rooms.keys()), - }, - steps=self.steps - ) - - return log - - def _run_agent_turn(self, agent) -> SimulationStep: - """Execute one agent's turn.""" - from mcrfpy import automation - import mcrfpy - - # Switch perspective - self._switch_perspective(agent) - mcrfpy.step(0.016) - - # Screenshot - screenshot_path = os.path.join( - self.screenshot_dir, - f"turn{self.turn_number}_{agent.name.lower()}.png" - ) - automation.screenshot(screenshot_path) - - # Build context - visible_agents = self._get_visible_agents(agent) - context = agent.get_context(visible_agents + [agent]) - - # Query LLM - llm_response = self.llm_query_fn(agent, screenshot_path, context) - - # Parse and execute - action = parse_action(llm_response) - result = self.executor.execute(agent, action) - - # Log - print(f" {agent.name}: {action.type.value} -> {result.message}") - - return SimulationStep( - turn=self.turn_number, - agent_id=agent.name, - agent_position=agent.pos, - room=agent.current_room, - perception=context, - llm_response=llm_response, - parsed_action_type=action.type.value, - parsed_action_args=action.args, - result_success=result.success, - result_message=result.message, - new_position=result.new_position, - path=result.path - ) - - def _switch_perspective(self, agent): - """Switch grid view to agent's perspective.""" - import mcrfpy - - self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - self.fov_layer.apply_perspective( - entity=agent.entity, - visible=mcrfpy.Color(0, 0, 0, 0), - discovered=mcrfpy.Color(40, 40, 60, 180), - unknown=mcrfpy.Color(0, 0, 0, 255) - ) - agent.entity.update_visibility() - - px, py = agent.pos - self.grid.center = (px * 16 + 8, py * 16 + 8) - - def _get_visible_agents(self, observer) -> list: - """Get agents visible to observer.""" - visible = [] - for agent in self.agents: - if agent.name == observer.name: - continue - ax, ay = agent.pos - if self.grid.is_in_fov(ax, ay): - visible.append(agent) - return visible -``` - ---- - -### File: `3_multi_turn_demo.py` - -```python -#!/usr/bin/env python3 -""" -Multi-Turn Simulation Demo -========================== - -Runs multiple turns of agent interaction with full logging. -This is the Phase 1 implementation from issue #154. -""" - -import mcrfpy -from mcrfpy import automation -import sys -import os -import requests -import base64 - -from world_graph import create_two_room_scenario, AgentInfo -from action_parser import parse_action -from action_executor import ActionExecutor -from turn_orchestrator import TurnOrchestrator, SimulationLog - -# Configuration -VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" -SCREENSHOT_DIR = "/tmp/vllm_multi_turn" -LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json" -MAX_TURNS = 5 - -# Sprites -FLOOR_TILE = 0 -WALL_TILE = 40 -WIZARD_SPRITE = 84 -KNIGHT_SPRITE = 96 - - -class Agent: - """Agent with WorldGraph integration.""" - def __init__(self, name, display_name, entity, world): - self.name = name - self.display_name = display_name - self.entity = entity - self.world = world - self.message_history = [] - - @property - def pos(self): - return (int(self.entity.pos[0]), int(self.entity.pos[1])) - - @property - def current_room(self): - room = self.world.room_at(*self.pos) - return room.name if room else None - - def get_context(self, visible_agents): - room_name = self.current_room - agent_infos = [ - AgentInfo(a.name, a.display_name, a.pos, is_player=(a.name == self.name)) - for a in visible_agents - ] - return { - "location": self.world.describe_room(room_name, agent_infos, self.name), - "available_actions": self.world.get_available_actions(room_name), - "recent_messages": self.message_history[-5:], - } - - -def file_to_base64(path): - with open(path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - -def llm_query(agent, screenshot_path, context) -> str: - """Query VLLM for agent action.""" - system = f"""You are {agent.display_name} exploring a dungeon. -You receive visual and text information about your surroundings. -Always end with: Action: """ - - actions_str = ", ".join(context["available_actions"]) - user = f"""{context["location"]} - -Available: {actions_str} - -[Screenshot attached showing your view] - -What do you do? Brief reasoning, then Action: """ - - messages = [ - {"role": "system", "content": system}, - { - "role": "user", - "content": [ - {"type": "text", "text": user}, - {"type": "image_url", "image_url": { - "url": "data:image/png;base64," + file_to_base64(screenshot_path) - }} - ] - } - ] - - try: - resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) - data = resp.json() - if "error" in data: - return f"[Error: {data['error']}]" - return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response') - except Exception as e: - return f"[Error: {e}]" - - -def setup_scene(world): - """Create scene from WorldGraph.""" - mcrfpy.createScene("multi_turn") - mcrfpy.setScene("multi_turn") - ui = mcrfpy.sceneUI("multi_turn") - - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - - grid = mcrfpy.Grid( - grid_size=(25, 15), - texture=texture, - pos=(5, 5), - size=(1014, 700) - ) - grid.fill_color = mcrfpy.Color(20, 20, 30) - grid.zoom = 2.0 - ui.append(grid) - - # Walls everywhere first - for x in range(25): - for y in range(15): - p = grid.at(x, y) - p.tilesprite = WALL_TILE - p.walkable = False - p.transparent = False - - # Carve rooms - for room in world.rooms.values(): - for rx in range(room.x, room.x + room.width): - for ry in range(room.y, room.y + room.height): - if 0 <= rx < 25 and 0 <= ry < 15: - p = grid.at(rx, ry) - p.tilesprite = FLOOR_TILE - p.walkable = True - p.transparent = True - - # Place doors - for door in world.doors: - dx, dy = door.position - if 0 <= dx < 25 and 0 <= dy < 15: - p = grid.at(dx, dy) - p.tilesprite = FLOOR_TILE - p.walkable = not door.locked - p.transparent = True - - # FOV layer - fov_layer = grid.add_layer('color', z_index=10) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - return grid, fov_layer, texture - - -def create_agents(grid, world, texture): - """Create agents in starting positions.""" - agents = [] - - # Wizard in guard_room - room_a = world.rooms["guard_room"] - wizard = mcrfpy.Entity(grid_pos=room_a.center, texture=texture, sprite_index=WIZARD_SPRITE) - grid.entities.append(wizard) - agents.append(Agent("Wizard", "a wizard", wizard, world)) - - # Knight in armory - room_b = world.rooms["armory"] - knight = mcrfpy.Entity(grid_pos=room_b.center, texture=texture, sprite_index=KNIGHT_SPRITE) - grid.entities.append(knight) - agents.append(Agent("Knight", "a knight", knight, world)) - - return agents - - -def run_demo(): - """Run multi-turn simulation.""" - print("=" * 70) - print("Multi-Turn Simulation Demo") - print(f"Running {MAX_TURNS} turns with 2 agents") - print("=" * 70) - - os.makedirs(SCREENSHOT_DIR, exist_ok=True) - - # Setup - world = create_two_room_scenario() - grid, fov_layer, texture = setup_scene(world) - agents = create_agents(grid, world, texture) - - # Create orchestrator - orchestrator = TurnOrchestrator( - grid=grid, - fov_layer=fov_layer, - world=world, - agents=agents, - screenshot_dir=SCREENSHOT_DIR, - llm_query_fn=llm_query - ) - - # Run simulation - log = orchestrator.run_simulation(max_turns=MAX_TURNS) - - # Save log - log.save(LOG_PATH) - print(f"\nSimulation log saved to: {LOG_PATH}") - - # Summary - print("\n" + "=" * 70) - print("SIMULATION SUMMARY") - print("=" * 70) - print(f"Total turns: {log.metadata['total_turns']}") - print(f"Total steps: {len(log.steps)}") - - # Per-agent summary - for agent_name in log.metadata['agent_names']: - agent_steps = [s for s in log.steps if s.agent_id == agent_name] - successes = sum(1 for s in agent_steps if s.result_success) - print(f"\n{agent_name}:") - print(f" Actions: {len(agent_steps)}") - print(f" Successful: {successes}") - print(f" Final position: {agent_steps[-1].new_position or agent_steps[-1].agent_position}") - - return True - - -if __name__ == "__main__": - try: - success = run_demo() - print("\nPASS" if success else "\nFAIL") - sys.exit(0 if success else 1) - except Exception as e: - import traceback - traceback.print_exc() - sys.exit(1) -``` - ---- - -## Success Criteria - -### Hour 3 Integration -- [ ] WorldGraph generates scene tiles correctly -- [ ] Agents receive IF-style room descriptions from WorldGraph -- [ ] Available actions list appears in LLM prompt -- [ ] Actions are parsed and executed -- [ ] Single turn completes successfully - -### Hour 4 Multi-Turn -- [ ] TurnOrchestrator cycles through all agents -- [ ] Multiple turns run sequentially -- [ ] SimulationLog captures all steps -- [ ] Log saves to JSON correctly -- [ ] Log can be loaded back -- [ ] Summary shows agent actions and positions - ---- - -## Example Output - -``` -====================================================================== -Multi-Turn Simulation Demo -Running 5 turns with 2 agents -====================================================================== - -Starting simulation: max 5 turns -================================================== - ---- Turn 1/5 --- - Wizard: GO EAST -> Moved east to (6, 4) - Knight: WAIT -> Waited and observed surroundings - ---- Turn 2/5 --- - Wizard: GO EAST -> Moved east to (7, 4) - Knight: GO WEST -> Moved west to (14, 4) - -[... more turns ...] - -====================================================================== -SIMULATION SUMMARY -====================================================================== -Total turns: 5 -Total steps: 10 - -Wizard: - Actions: 5 - Successful: 4 - Final position: (9, 4) - -Knight: - Actions: 5 - Successful: 3 - Final position: (11, 4) - -Simulation log saved to: /tmp/vllm_multi_turn/simulation_log.json - -PASS -``` - ---- - -## Next Steps (Future Sessions) - -After Hours 3-4 are complete: - -1. **Speech System** - Add ANNOUNCE/SPEAK actions with message passing -2. **Button-Door Puzzle** - Use `create_button_door_scenario()` for coordination test -3. **Animated Replay** - Play back simulation with movement animations -4. **NPC Behaviors** - Add scripted entities (patrol, flee, etc.) -5. **Affordance Learning** - Track what agents discover about objects diff --git a/tests/vllm_demo/2_integrated_demo.py b/tests/vllm_demo/2_integrated_demo.py deleted file mode 100644 index f499079..0000000 --- a/tests/vllm_demo/2_integrated_demo.py +++ /dev/null @@ -1,399 +0,0 @@ -#!/usr/bin/env python3 -""" -Integrated VLLM Demo -==================== - -Combines: -- WorldGraph for structured room descriptions (#155) -- Action parsing and execution (#156) -- Per-agent perspective rendering - -This is the foundation for multi-turn simulation. -""" - -import sys -import os -# Add the vllm_demo directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -import mcrfpy -from mcrfpy import automation -import requests -import base64 - -from world_graph import ( - WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, - create_two_room_scenario -) -from action_parser import parse_action, ActionType -from action_executor import ActionExecutor - -# Configuration -VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" -SCREENSHOT_DIR = "/tmp/vllm_integrated" - -# Sprite constants -FLOOR_TILE = 0 -WALL_TILE = 40 -WIZARD_SPRITE = 84 -KNIGHT_SPRITE = 96 - - -class Agent: - """Agent wrapper with WorldGraph integration.""" - - def __init__(self, name: str, display_name: str, entity, world: WorldGraph): - self.name = name - self.display_name = display_name - self.entity = entity - self.world = world - self.message_history = [] # For speech system (future) - - @property - def pos(self) -> tuple: - return (int(self.entity.pos[0]), int(self.entity.pos[1])) - - @property - def current_room(self) -> str: - """Get the name of the room this agent is in.""" - room = self.world.room_at(*self.pos) - return room.name if room else None - - def get_context(self, visible_agents: list) -> dict: - """ - Build complete context for LLM query. - - Args: - visible_agents: List of Agent objects visible to this agent - - Returns: - Dict with location description, available actions, messages - """ - room_name = self.current_room - - # Convert Agent objects to AgentInfo for WorldGraph - agent_infos = [ - AgentInfo( - name=a.name, - display_name=a.display_name, - position=a.pos, - is_player=(a.name == self.name) - ) - for a in visible_agents - ] - - return { - "location": self.world.describe_room( - room_name, - visible_agents=agent_infos, - observer_name=self.name - ), - "available_actions": self.world.get_available_actions(room_name), - "recent_messages": self.message_history[-5:], - } - - -def file_to_base64(file_path): - """Convert image file to base64 string.""" - with open(file_path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - -def llm_chat_completion(messages: list): - """Send chat completion request to local LLM.""" - try: - response = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) - return response.json() - except requests.exceptions.RequestException as e: - return {"error": str(e)} - - -def message_with_image(text, image_path): - """Create a message with embedded image for vision models.""" - image_data = file_to_base64(image_path) - return { - "role": "user", - "content": [ - {"type": "text", "text": text}, - {"type": "image_url", "image_url": {"url": "data:image/png;base64," + image_data}} - ] - } - - -def setup_scene_from_world(world: WorldGraph): - """ - Create McRogueFace scene from WorldGraph. - - Carves out rooms and places doors based on WorldGraph data. - """ - mcrfpy.createScene("integrated_demo") - mcrfpy.setScene("integrated_demo") - ui = mcrfpy.sceneUI("integrated_demo") - - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - - # Create grid sized for the world (with margin) - grid = mcrfpy.Grid( - grid_size=(25, 15), - texture=texture, - pos=(5, 5), - size=(1014, 700) - ) - grid.fill_color = mcrfpy.Color(20, 20, 30) - grid.zoom = 2.0 - ui.append(grid) - - # Initialize all tiles as walls - for x in range(25): - for y in range(15): - point = grid.at(x, y) - point.tilesprite = WALL_TILE - point.walkable = False - point.transparent = False - - # Carve out rooms from WorldGraph - for room in world.rooms.values(): - for rx in range(room.x, room.x + room.width): - for ry in range(room.y, room.y + room.height): - if 0 <= rx < 25 and 0 <= ry < 15: - point = grid.at(rx, ry) - point.tilesprite = FLOOR_TILE - point.walkable = True - point.transparent = True - - # Place doors (carve corridor between rooms) - for door in world.doors: - dx, dy = door.position - if 0 <= dx < 25 and 0 <= dy < 15: - point = grid.at(dx, dy) - point.tilesprite = FLOOR_TILE - point.walkable = not door.locked - point.transparent = True - - # Create FOV layer for fog of war - fov_layer = grid.add_layer('color', z_index=10) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - return grid, fov_layer, texture - - -def create_agents(grid, world: WorldGraph, texture) -> list: - """Create agent entities in their starting rooms.""" - agents = [] - - # Agent A: Wizard in guard_room - guard_room = world.rooms["guard_room"] - wizard_entity = mcrfpy.Entity( - grid_pos=guard_room.center, - texture=texture, - sprite_index=WIZARD_SPRITE - ) - grid.entities.append(wizard_entity) - agents.append(Agent("Wizard", "a wizard", wizard_entity, world)) - - # Agent B: Knight in armory - armory = world.rooms["armory"] - knight_entity = mcrfpy.Entity( - grid_pos=armory.center, - texture=texture, - sprite_index=KNIGHT_SPRITE - ) - grid.entities.append(knight_entity) - agents.append(Agent("Knight", "a knight", knight_entity, world)) - - return agents - - -def switch_perspective(grid, fov_layer, agent): - """Switch grid view to an agent's perspective.""" - # Reset fog layer to all unknown (black) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - # Apply this agent's perspective - fov_layer.apply_perspective( - entity=agent.entity, - visible=mcrfpy.Color(0, 0, 0, 0), - discovered=mcrfpy.Color(40, 40, 60, 180), - unknown=mcrfpy.Color(0, 0, 0, 255) - ) - - # Update visibility from agent's position - agent.entity.update_visibility() - - # Center camera on this agent - px, py = agent.pos - grid.center = (px * 16 + 8, py * 16 + 8) - - -def get_visible_agents(grid, observer, all_agents) -> list: - """Get agents visible to the observer based on FOV.""" - visible = [] - for agent in all_agents: - if agent.name == observer.name: - continue - ax, ay = agent.pos - if grid.is_in_fov(ax, ay): - visible.append(agent) - return visible - - -def query_agent_llm(agent, screenshot_path, context) -> str: - """ - Query VLLM for agent's action using WorldGraph context. - - This uses the structured context from WorldGraph instead of - ad-hoc grounded prompts. - """ - system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game. -You see the world through screenshots and receive text descriptions. -Your goal is to explore and interact with your environment. -Always end your response with a clear action declaration: "Action: " -""" - - # Build the user prompt with WorldGraph context - actions_str = ", ".join(context["available_actions"]) - - user_prompt = f"""{context["location"]} - -Available actions: {actions_str} - -Look at the screenshot showing your current view. The dark areas are outside your field of vision. - -What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action. -Example: "I see a key on the ground that might be useful. Action: TAKE brass_key" -""" - - messages = [ - {"role": "system", "content": system_prompt}, - message_with_image(user_prompt, screenshot_path) - ] - - resp = llm_chat_completion(messages) - - if "error" in resp: - return f"[VLLM Error: {resp['error']}]" - return resp.get('choices', [{}])[0].get('message', {}).get('content', 'No response') - - -def run_single_turn(grid, fov_layer, agents, executor, turn_num): - """ - Execute one turn for all agents. - - Each agent: - 1. Gets their perspective rendered - 2. Receives WorldGraph context - 3. Queries LLM for action - 4. Executes the action - """ - print(f"\n{'='*70}") - print(f"TURN {turn_num}") - print("=" * 70) - - results = [] - - for agent in agents: - print(f"\n--- {agent.name}'s Turn ---") - print(f"Position: {agent.pos} | Room: {agent.current_room}") - - # Switch perspective to this agent - switch_perspective(grid, fov_layer, agent) - mcrfpy.step(0.016) - - # Take screenshot - screenshot_path = os.path.join( - SCREENSHOT_DIR, - f"turn{turn_num}_{agent.name.lower()}.png" - ) - automation.screenshot(screenshot_path) - print(f"Screenshot: {screenshot_path}") - - # Get context using WorldGraph - visible = get_visible_agents(grid, agent, agents) - context = agent.get_context(visible + [agent]) # Include self for filtering - - print(f"\nContext from WorldGraph:") - print(f" Location: {context['location']}") - print(f" Actions: {context['available_actions']}") - - # Query LLM - print(f"\nQuerying VLLM...") - response = query_agent_llm(agent, screenshot_path, context) - print(f"Response: {response[:300]}{'...' if len(response) > 300 else ''}") - - # Parse and execute action - action = parse_action(response) - print(f"\nParsed: {action.type.value} {action.args}") - - result = executor.execute(agent, action) - status = "SUCCESS" if result.success else "FAILED" - print(f"Result: {status} - {result.message}") - - results.append({ - "agent": agent.name, - "room": agent.current_room, - "context": context, - "response": response, - "action": action, - "result": result - }) - - return results - - -def run_demo(): - """Main demo: single integrated turn with WorldGraph context.""" - print("=" * 70) - print("Integrated WorldGraph + Action Demo") - print("=" * 70) - - os.makedirs(SCREENSHOT_DIR, exist_ok=True) - - # Create world from WorldGraph factory - print("\nCreating world from WorldGraph...") - world = create_two_room_scenario() - print(f" Rooms: {list(world.rooms.keys())}") - print(f" Doors: {len(world.doors)}") - print(f" Objects: {list(world.objects.keys())}") - - # Setup scene from WorldGraph - print("\nSetting up scene...") - grid, fov_layer, texture = setup_scene_from_world(world) - - # Create agents - print("\nCreating agents...") - agents = create_agents(grid, world, texture) - for agent in agents: - print(f" {agent.name} at {agent.pos} in {agent.current_room}") - - # Create executor - executor = ActionExecutor(grid) - - # Run one turn - results = run_single_turn(grid, fov_layer, agents, executor, turn_num=1) - - # Summary - print("\n" + "=" * 70) - print("TURN SUMMARY") - print("=" * 70) - for r in results: - status = "OK" if r["result"].success else "FAIL" - print(f" {r['agent']}: {r['action'].type.value} -> {status}") - if r["result"].new_position: - print(f" New position: {r['result'].new_position}") - - print("\n" + "=" * 70) - print("Demo Complete") - print("=" * 70) - - return True - - -if __name__ == "__main__": - try: - success = run_demo() - print("\nPASS" if success else "\nFAIL") - sys.exit(0 if success else 1) - except Exception as e: - import traceback - traceback.print_exc() - sys.exit(1) diff --git a/tests/vllm_demo/3_multi_turn_demo.py b/tests/vllm_demo/3_multi_turn_demo.py deleted file mode 100644 index 3e830c3..0000000 --- a/tests/vllm_demo/3_multi_turn_demo.py +++ /dev/null @@ -1,318 +0,0 @@ -#!/usr/bin/env python3 -""" -Multi-Turn Simulation Demo -========================== - -Runs multiple turns of agent interaction with full logging. -This is the Phase 1 implementation from issue #154. - -Two agents start in separate rooms and can move, observe, -and (in future versions) communicate to solve puzzles. -""" - -import sys -import os -# Add the vllm_demo directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -import mcrfpy -from mcrfpy import automation -import requests -import base64 - -from world_graph import ( - WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, - create_two_room_scenario, create_button_door_scenario -) -from action_parser import parse_action -from action_executor import ActionExecutor -from turn_orchestrator import TurnOrchestrator, SimulationLog - -# Configuration -VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" -SCREENSHOT_DIR = "/tmp/vllm_multi_turn" -LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json" -MAX_TURNS = 5 - -# Sprites -FLOOR_TILE = 0 -WALL_TILE = 40 -WIZARD_SPRITE = 84 -KNIGHT_SPRITE = 96 - - -class Agent: - """Agent with WorldGraph integration.""" - - def __init__(self, name: str, display_name: str, entity, world: WorldGraph): - self.name = name - self.display_name = display_name - self.entity = entity - self.world = world - self.message_history = [] - - @property - def pos(self) -> tuple: - return (int(self.entity.pos[0]), int(self.entity.pos[1])) - - @property - def current_room(self) -> str: - room = self.world.room_at(*self.pos) - return room.name if room else None - - def get_context(self, visible_agents: list) -> dict: - """Build context for LLM query.""" - room_name = self.current_room - agent_infos = [ - AgentInfo( - name=a.name, - display_name=a.display_name, - position=a.pos, - is_player=(a.name == self.name) - ) - for a in visible_agents - ] - return { - "location": self.world.describe_room(room_name, agent_infos, self.name), - "available_actions": self.world.get_available_actions(room_name), - "recent_messages": self.message_history[-5:], - } - - -def file_to_base64(path: str) -> str: - """Convert file to base64 string.""" - with open(path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - -def llm_query(agent, screenshot_path: str, context: dict) -> str: - """ - Query VLLM for agent action. - - This function is passed to TurnOrchestrator as the LLM query callback. - """ - system_prompt = f"""You are {agent.display_name} exploring a dungeon. -You receive visual and text information about your surroundings. -Your goal is to explore, find items, and interact with the environment. -Always end your response with: Action: """ - - actions_str = ", ".join(context["available_actions"]) - - user_prompt = f"""{context["location"]} - -Available actions: {actions_str} - -[Screenshot attached showing your current view - dark areas are outside your vision] - -What do you do? Brief reasoning (1-2 sentences), then Action: """ - - messages = [ - {"role": "system", "content": system_prompt}, - { - "role": "user", - "content": [ - {"type": "text", "text": user_prompt}, - {"type": "image_url", "image_url": { - "url": "data:image/png;base64," + file_to_base64(screenshot_path) - }} - ] - } - ] - - try: - resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) - data = resp.json() - if "error" in data: - return f"[VLLM Error: {data['error']}]" - return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response') - except Exception as e: - return f"[Connection Error: {e}]" - - -def setup_scene(world: WorldGraph): - """Create McRogueFace scene from WorldGraph.""" - mcrfpy.createScene("multi_turn") - mcrfpy.setScene("multi_turn") - ui = mcrfpy.sceneUI("multi_turn") - - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - - grid = mcrfpy.Grid( - grid_size=(25, 15), - texture=texture, - pos=(5, 5), - size=(1014, 700) - ) - grid.fill_color = mcrfpy.Color(20, 20, 30) - grid.zoom = 2.0 - ui.append(grid) - - # Initialize all as walls - for x in range(25): - for y in range(15): - p = grid.at(x, y) - p.tilesprite = WALL_TILE - p.walkable = False - p.transparent = False - - # Carve rooms from WorldGraph - for room in world.rooms.values(): - for rx in range(room.x, room.x + room.width): - for ry in range(room.y, room.y + room.height): - if 0 <= rx < 25 and 0 <= ry < 15: - p = grid.at(rx, ry) - p.tilesprite = FLOOR_TILE - p.walkable = True - p.transparent = True - - # Place doors - for door in world.doors: - dx, dy = door.position - if 0 <= dx < 25 and 0 <= dy < 15: - p = grid.at(dx, dy) - p.tilesprite = FLOOR_TILE - p.walkable = not door.locked - p.transparent = True - - # FOV layer - fov_layer = grid.add_layer('color', z_index=10) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - return grid, fov_layer, texture - - -def create_agents(grid, world: WorldGraph, texture) -> list: - """Create agents in their starting rooms.""" - agents = [] - - # Wizard in guard_room (left) - room_a = world.rooms["guard_room"] - wizard = mcrfpy.Entity( - grid_pos=room_a.center, - texture=texture, - sprite_index=WIZARD_SPRITE - ) - grid.entities.append(wizard) - agents.append(Agent("Wizard", "a wizard", wizard, world)) - - # Knight in armory (right) - room_b = world.rooms["armory"] - knight = mcrfpy.Entity( - grid_pos=room_b.center, - texture=texture, - sprite_index=KNIGHT_SPRITE - ) - grid.entities.append(knight) - agents.append(Agent("Knight", "a knight", knight, world)) - - return agents - - -def run_demo(): - """Run multi-turn simulation.""" - print("=" * 70) - print("Multi-Turn Simulation Demo") - print(f"Running up to {MAX_TURNS} turns with 2 agents") - print("=" * 70) - - os.makedirs(SCREENSHOT_DIR, exist_ok=True) - - # Create world - print("\nCreating world...") - world = create_two_room_scenario() - print(f" Rooms: {list(world.rooms.keys())}") - print(f" Objects: {list(world.objects.keys())}") - - # Setup scene - print("\nSetting up scene...") - grid, fov_layer, texture = setup_scene(world) - - # Create agents - print("\nCreating agents...") - agents = create_agents(grid, world, texture) - for agent in agents: - print(f" {agent.name} at {agent.pos} in {agent.current_room}") - - # Create orchestrator - orchestrator = TurnOrchestrator( - grid=grid, - fov_layer=fov_layer, - world=world, - agents=agents, - screenshot_dir=SCREENSHOT_DIR, - llm_query_fn=llm_query - ) - - # Optional: Define a stop condition - def agents_met(orch): - """Stop when agents are in the same room.""" - return orch.agents_in_same_room() - - # Run simulation - log = orchestrator.run_simulation( - max_turns=MAX_TURNS, - stop_condition=None # Or use agents_met for early stopping - ) - - # Save log - log.save(LOG_PATH) - - # Print summary - print("\n" + "=" * 70) - print(log.summary()) - print("=" * 70) - - # Show final positions - print("\nFinal Agent Positions:") - for agent in agents: - print(f" {agent.name}: {agent.pos} in {agent.current_room}") - - print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/") - print(f"Simulation log saved to: {LOG_PATH}") - - return True - - -def replay_log(log_path: str): - """ - Replay a simulation from a log file. - - This is a utility function for reviewing past simulations. - """ - print(f"Loading simulation from: {log_path}") - log = SimulationLog.load(log_path) - - print("\n" + log.summary()) - - print("\nTurn-by-Turn Replay:") - print("-" * 50) - - current_turn = 0 - for step in log.steps: - if step.turn != current_turn: - current_turn = step.turn - print(f"\n=== Turn {current_turn} ===") - - status = "OK" if step.result_success else "FAIL" - print(f" {step.agent_id}: {step.parsed_action_type} {step.parsed_action_args}") - print(f" {status}: {step.result_message}") - if step.new_position: - print(f" Moved to: {step.new_position}") - - -if __name__ == "__main__": - # Check for replay mode - if len(sys.argv) > 1 and sys.argv[1] == "--replay": - log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH - replay_log(log_file) - sys.exit(0) - - # Normal execution - try: - success = run_demo() - print("\nPASS" if success else "\nFAIL") - sys.exit(0 if success else 1) - except Exception as e: - import traceback - traceback.print_exc() - sys.exit(1) diff --git a/tests/vllm_demo/action_executor.py b/tests/vllm_demo/action_executor.py deleted file mode 100644 index d95caf2..0000000 --- a/tests/vllm_demo/action_executor.py +++ /dev/null @@ -1,136 +0,0 @@ -""" -Action Executor for McRogueFace -=============================== - -Executes parsed actions in the game world. -Handles movement, collision detection, and action results. -""" - -from dataclasses import dataclass -from typing import Optional, List, Tuple -from action_parser import Action, ActionType - - -@dataclass -class ActionResult: - success: bool - message: str - new_position: Optional[Tuple[int, int]] = None - path: Optional[List[Tuple[int, int]]] = None # For animation replay - - -class ActionExecutor: - """Execute actions in the McRogueFace game world.""" - - # Direction vectors - DIRECTION_VECTORS = { - 'NORTH': (0, -1), - 'SOUTH': (0, 1), - 'EAST': (1, 0), - 'WEST': (-1, 0), - } - - def __init__(self, grid): - """ - Initialize executor with a grid reference. - - Args: - grid: mcrfpy.Grid instance - """ - self.grid = grid - - def execute(self, agent, action: Action) -> ActionResult: - """ - Execute an action for an agent. - - Args: - agent: Agent wrapper with .entity attribute - action: Parsed Action to execute - - Returns: - ActionResult with success status and message - """ - handlers = { - ActionType.GO: self._execute_go, - ActionType.WAIT: self._execute_wait, - ActionType.LOOK: self._execute_look, - ActionType.TAKE: self._execute_take, - ActionType.DROP: self._execute_drop, - ActionType.INVALID: self._execute_invalid, - } - - handler = handlers.get(action.type, self._execute_unimplemented) - return handler(agent, action) - - def _execute_go(self, agent, action: Action) -> ActionResult: - """Execute movement in a direction.""" - if not action.args or not action.args[0]: - return ActionResult(False, "No direction specified") - - direction = action.args[0] - if direction not in self.DIRECTION_VECTORS: - return ActionResult(False, f"Invalid direction: {direction}") - - dx, dy = self.DIRECTION_VECTORS[direction] - - # Get current position - current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - new_x, new_y = current_x + dx, current_y + dy - - # Check bounds - grid_w, grid_h = self.grid.grid_size - if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): - return ActionResult(False, f"Cannot go {direction} - edge of map") - - # Check walkability - target_cell = self.grid.at(new_x, new_y) - if not target_cell.walkable: - return ActionResult(False, f"Cannot go {direction} - path blocked") - - # Check for entity collision (optional - depends on game rules) - for entity in self.grid.entities: - if entity is agent.entity: - continue - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if ex == new_x and ey == new_y: - return ActionResult(False, f"Cannot go {direction} - someone is there") - - # Execute movement - agent.entity.pos = (new_x, new_y) - - return ActionResult( - success=True, - message=f"Moved {direction.lower()} to ({new_x}, {new_y})", - new_position=(new_x, new_y), - path=[(current_x, current_y), (new_x, new_y)] - ) - - def _execute_wait(self, agent, action: Action) -> ActionResult: - """Execute wait action (no-op).""" - return ActionResult(True, "Waited and observed surroundings") - - def _execute_look(self, agent, action: Action) -> ActionResult: - """Execute look action - returns enhanced observation.""" - target = action.args[0] if action.args else None - if target: - return ActionResult(True, f"Examined {target} closely") - return ActionResult(True, "Looked around carefully") - - def _execute_take(self, agent, action: Action) -> ActionResult: - """Execute take action (placeholder).""" - item = action.args[0] if action.args else "unknown" - # TODO: Implement inventory system - return ActionResult(False, f"Cannot take {item} - not implemented yet") - - def _execute_drop(self, agent, action: Action) -> ActionResult: - """Execute drop action (placeholder).""" - item = action.args[0] if action.args else "unknown" - return ActionResult(False, f"Cannot drop {item} - not implemented yet") - - def _execute_invalid(self, agent, action: Action) -> ActionResult: - """Handle invalid/unparseable action.""" - return ActionResult(False, f"Could not understand action: {action.args[0]}") - - def _execute_unimplemented(self, agent, action: Action) -> ActionResult: - """Handle unimplemented action types.""" - return ActionResult(False, f"Action {action.type.value} not yet implemented") diff --git a/tests/vllm_demo/action_parser.py b/tests/vllm_demo/action_parser.py deleted file mode 100644 index 18ec209..0000000 --- a/tests/vllm_demo/action_parser.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Action Parser for LLM Agent Responses -===================================== - -Extracts structured actions from free-form LLM text responses. -Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc. -""" - -import re -from dataclasses import dataclass -from typing import Optional, Tuple, Any -from enum import Enum - - -class ActionType(Enum): - GO = "GO" - WAIT = "WAIT" - LOOK = "LOOK" - TAKE = "TAKE" - DROP = "DROP" - PUSH = "PUSH" - USE = "USE" - OPEN = "OPEN" - CLOSE = "CLOSE" - ANNOUNCE = "ANNOUNCE" - SPEAK = "SPEAK" - INVALID = "INVALID" - - -@dataclass -class Action: - type: ActionType - args: Tuple[Any, ...] = () - raw_match: str = "" - - -class ActionParser: - """Parse LLM responses into structured actions.""" - - # Direction normalization - DIRECTIONS = { - 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', - 'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST', - 'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST', - } - - # Patterns ordered by specificity (most specific first) - PATTERNS = [ - # Explicit "Action: X" format (preferred) - (ActionType.GO, r'Action:\s*GO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.WAIT, r'Action:\s*WAIT\b', 0), - (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1), - (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1), - (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1), - (ActionType.PUSH, r'Action:\s*PUSH\s+(\w+)\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)', 2), - (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2), - (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1), - (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1), - (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1), - (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1), - - # Fallback patterns (less strict) - (ActionType.GO, r'\bGO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.GO, r'\bmove\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.GO, r'\bhead\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), - (ActionType.WAIT, r'\bWAIT\b', 0), - (ActionType.LOOK, r'\bLOOK\b', 0), - ] - - def parse(self, llm_response: str) -> Action: - """ - Parse an LLM response and extract the action. - - Returns Action with type=INVALID if no valid action found. - """ - # Normalize to uppercase for matching - text = llm_response.upper() - - for action_type, pattern, num_groups in self.PATTERNS: - match = re.search(pattern, text, re.IGNORECASE) - if match: - args = self._extract_args(match, num_groups, action_type) - return Action( - type=action_type, - args=args, - raw_match=match.group(0) - ) - - # No valid action found - return Action( - type=ActionType.INVALID, - args=(llm_response[:100],), # First 100 chars for debugging - raw_match="" - ) - - def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple: - """Extract and normalize arguments from regex match.""" - if num_groups == 0: - return () - - args = [] - for i in range(1, num_groups + 1): - group = match.group(i) - if group: - # Normalize directions - if action_type == ActionType.GO or (action_type == ActionType.PUSH and i == 2): - group = self.DIRECTIONS.get(group.upper(), group.upper()) - args.append(group) - else: - args.append(None) - - return tuple(args) - - -# Convenience function -def parse_action(llm_response: str) -> Action: - """Parse an LLM response into an Action.""" - return ActionParser().parse(llm_response) diff --git a/tests/vllm_demo/test_action_parser.py b/tests/vllm_demo/test_action_parser.py deleted file mode 100644 index 4c2173d..0000000 --- a/tests/vllm_demo/test_action_parser.py +++ /dev/null @@ -1,214 +0,0 @@ -#!/usr/bin/env python3 -""" -Unit tests for action_parser.py -=============================== - -Tests the ActionParser's ability to extract structured actions -from various LLM response formats. -""" - -import sys -from action_parser import parse_action, ActionType - - -def test_explicit_go_directions(): - """Test explicit 'Action: GO ' format.""" - # Cardinal directions - assert parse_action("Action: GO NORTH").type == ActionType.GO - assert parse_action("Action: GO NORTH").args == ("NORTH",) - - assert parse_action("Action: GO SOUTH").type == ActionType.GO - assert parse_action("Action: GO SOUTH").args == ("SOUTH",) - - assert parse_action("Action: GO EAST").type == ActionType.GO - assert parse_action("Action: GO EAST").args == ("EAST",) - - assert parse_action("Action: GO WEST").type == ActionType.GO - assert parse_action("Action: GO WEST").args == ("WEST",) - - print(" [PASS] Explicit GO directions") - - -def test_short_directions(): - """Test short direction abbreviations (N, S, E, W).""" - assert parse_action("Action: GO N").args == ("NORTH",) - assert parse_action("Action: GO S").args == ("SOUTH",) - assert parse_action("Action: GO E").args == ("EAST",) - assert parse_action("Action: GO W").args == ("WEST",) - - print(" [PASS] Short direction abbreviations") - - -def test_case_insensitivity(): - """Test that parsing is case-insensitive.""" - assert parse_action("action: go south").type == ActionType.GO - assert parse_action("ACTION: GO SOUTH").type == ActionType.GO - assert parse_action("Action: Go South").type == ActionType.GO - assert parse_action("action: GO south").type == ActionType.GO - - print(" [PASS] Case insensitivity") - - -def test_fallback_patterns(): - """Test fallback patterns without 'Action:' prefix.""" - # Natural language variations - assert parse_action("I think I'll GO WEST to explore").type == ActionType.GO - assert parse_action("I'll GO NORTH").type == ActionType.GO - assert parse_action("Let me GO EAST").type == ActionType.GO - - # Move variations - assert parse_action("I should move NORTH").type == ActionType.GO - assert parse_action("Let me head SOUTH").type == ActionType.GO - - print(" [PASS] Fallback patterns") - - -def test_wait_action(): - """Test WAIT action parsing.""" - assert parse_action("Action: WAIT").type == ActionType.WAIT - assert parse_action("I'll WAIT here").type == ActionType.WAIT - assert parse_action("Let me WAIT and see").type == ActionType.WAIT - - print(" [PASS] WAIT action") - - -def test_look_action(): - """Test LOOK action parsing.""" - assert parse_action("Action: LOOK").type == ActionType.LOOK - assert parse_action("Action: LOOK AT door").type == ActionType.LOOK - assert parse_action("Action: LOOK AT door").args == ("DOOR",) - - print(" [PASS] LOOK action") - - -def test_invalid_actions(): - """Test that invalid actions are properly flagged.""" - result = parse_action("I'm not sure what to do") - assert result.type == ActionType.INVALID - - result = parse_action("Let me think about this...") - assert result.type == ActionType.INVALID - - result = parse_action("The weather is nice today") - assert result.type == ActionType.INVALID - - print(" [PASS] Invalid action detection") - - -def test_raw_match_capture(): - """Test that raw_match captures the matched text.""" - result = parse_action("After thinking, Action: GO NORTH is best") - assert "GO NORTH" in result.raw_match - - print(" [PASS] Raw match capture") - - -def test_embedded_actions(): - """Test extraction of actions embedded in longer text.""" - long_response = """ - Looking at the screenshot, I can see I'm in a dungeon corridor. - There's a rat to the east and a wall to the north. - The path south appears clear. - - I think the best course of action is to investigate the rat. - - Action: GO EAST - """ - - result = parse_action(long_response) - assert result.type == ActionType.GO - assert result.args == ("EAST",) - - print(" [PASS] Embedded action extraction") - - -def test_complex_actions(): - """Test more complex action types.""" - # TAKE action - assert parse_action("Action: TAKE sword").type == ActionType.TAKE - assert parse_action("Action: TAKE sword").args == ("SWORD",) - - # DROP action - assert parse_action("Action: DROP shield").type == ActionType.DROP - - # USE action - assert parse_action("Action: USE key").type == ActionType.USE - assert parse_action("Action: USE key ON door").type == ActionType.USE - - # OPEN/CLOSE - assert parse_action("Action: OPEN chest").type == ActionType.OPEN - assert parse_action("Action: CLOSE door").type == ActionType.CLOSE - - print(" [PASS] Complex action types") - - -def test_push_action(): - """Test PUSH action with direction.""" - result = parse_action("Action: PUSH boulder NORTH") - assert result.type == ActionType.PUSH - assert result.args == ("BOULDER", "NORTH") - - result = parse_action("Action: PUSH box E") - assert result.type == ActionType.PUSH - assert result.args == ("BOX", "EAST") - - print(" [PASS] PUSH action") - - -def test_speak_announce_actions(): - """Test SPEAK and ANNOUNCE with quoted strings.""" - result = parse_action('Action: SPEAK "Hello there!"') - assert result.type == ActionType.SPEAK - assert result.args[0] == "HELLO THERE!" # Uppercase due to text normalization - - result = parse_action("Action: ANNOUNCE 'Watch out!'") - assert result.type == ActionType.ANNOUNCE - - print(" [PASS] SPEAK/ANNOUNCE actions") - - -def run_all_tests(): - """Run all parser tests.""" - print("=" * 60) - print("Action Parser Tests") - print("=" * 60) - - tests = [ - test_explicit_go_directions, - test_short_directions, - test_case_insensitivity, - test_fallback_patterns, - test_wait_action, - test_look_action, - test_invalid_actions, - test_raw_match_capture, - test_embedded_actions, - test_complex_actions, - test_push_action, - test_speak_announce_actions, - ] - - passed = 0 - failed = 0 - - for test in tests: - try: - test() - passed += 1 - except AssertionError as e: - print(f" [FAIL] {test.__name__}: {e}") - failed += 1 - except Exception as e: - print(f" [ERROR] {test.__name__}: {e}") - failed += 1 - - print("=" * 60) - print(f"Results: {passed} passed, {failed} failed") - print("=" * 60) - - return failed == 0 - - -if __name__ == "__main__": - success = run_all_tests() - sys.exit(0 if success else 1) diff --git a/tests/vllm_demo/test_world_graph.py b/tests/vllm_demo/test_world_graph.py deleted file mode 100644 index 12a99b4..0000000 --- a/tests/vllm_demo/test_world_graph.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Unit tests for WorldGraph -""" - -from world_graph import ( - WorldGraph, Room, Door, WorldObject, Direction, - AgentInfo, create_two_room_scenario, create_button_door_scenario -) - -def test_room_contains(): - """Test room boundary checking.""" - room = Room("test", "test room", bounds=(5, 5, 10, 10)) - - assert room.contains(5, 5) == True # Top-left corner - assert room.contains(14, 14) == True # Bottom-right (exclusive) - assert room.contains(15, 15) == False # Outside - assert room.contains(4, 5) == False # Just outside left - - print("PASS: room_contains") - -def test_room_at(): - """Test spatial room lookup.""" - world = create_two_room_scenario() - - # Guard room is at (1,1) with size (8,8) - room = world.room_at(3, 3) - assert room is not None - assert room.name == "guard_room" - - # Armory is at (11,1) with size (8,8) - room = world.room_at(13, 3) - assert room is not None - assert room.name == "armory" - - # Between rooms (the door area) - should return None - room = world.room_at(9, 4) - assert room is None - - print("PASS: room_at") - -def test_describe_room_basic(): - """Test basic room description.""" - world = create_two_room_scenario() - - desc = world.describe_room("guard_room") - - assert "You are in the guard room" in desc - assert "brass key" in desc - assert "Exits:" in desc - assert "east" in desc - assert "armory" in desc - - print("PASS: describe_room_basic") - print(f" Output: {desc}") - -def test_describe_room_with_agents(): - """Test room description with visible agents.""" - world = create_two_room_scenario() - - agents = [ - AgentInfo("Wizard", "a wizard", (3, 3)), - AgentInfo("Knight", "a knight", (4, 4)), - ] - - desc = world.describe_room("guard_room", visible_agents=agents, observer_name="Wizard") - - assert "knight" in desc.lower() - assert "wizard" not in desc.lower() # Observer excluded - - print("PASS: describe_room_with_agents") - print(f" Output: {desc}") - -def test_describe_locked_door(): - """Test that locked doors are described correctly.""" - world = create_button_door_scenario() - - desc = world.describe_room("button_room") - - assert "locked" in desc.lower() - - print("PASS: describe_locked_door") - print(f" Output: {desc}") - -def test_available_actions(): - """Test action enumeration.""" - world = create_two_room_scenario() - - actions = world.get_available_actions("guard_room") - - assert "GO EAST" in actions - assert "TAKE brass_key" in actions - assert "LOOK" in actions - assert "WAIT" in actions - - print("PASS: available_actions") - print(f" Actions: {actions}") - -def test_determinism(): - """Test that descriptions are deterministic.""" - world = create_two_room_scenario() - - desc1 = world.describe_room("guard_room") - desc2 = world.describe_room("guard_room") - desc3 = world.describe_room("guard_room") - - assert desc1 == desc2 == desc3, "Descriptions must be deterministic!" - - print("PASS: determinism") - -def test_direction_opposites(): - """Test direction opposite calculation.""" - assert Direction.NORTH.opposite == Direction.SOUTH - assert Direction.SOUTH.opposite == Direction.NORTH - assert Direction.EAST.opposite == Direction.WEST - assert Direction.WEST.opposite == Direction.EAST - - print("PASS: direction_opposites") - -def run_all_tests(): - """Run all WorldGraph tests.""" - print("=" * 50) - print("WorldGraph Unit Tests") - print("=" * 50) - - test_room_contains() - test_room_at() - test_describe_room_basic() - test_describe_room_with_agents() - test_describe_locked_door() - test_available_actions() - test_determinism() - test_direction_opposites() - - print("=" * 50) - print("All tests passed!") - print("=" * 50) - -if __name__ == "__main__": - run_all_tests() diff --git a/tests/vllm_demo/turn_orchestrator.py b/tests/vllm_demo/turn_orchestrator.py deleted file mode 100644 index b664e7d..0000000 --- a/tests/vllm_demo/turn_orchestrator.py +++ /dev/null @@ -1,301 +0,0 @@ -""" -Turn Orchestrator -================= - -Manages multi-turn simulation with logging for replay. -Coordinates perspective switching, LLM queries, and action execution. -""" - -import json -import os -from dataclasses import dataclass, asdict, field -from typing import List, Dict, Any, Optional, Callable -from datetime import datetime - -from world_graph import WorldGraph, AgentInfo -from action_parser import Action, ActionType, parse_action -from action_executor import ActionExecutor, ActionResult - - -@dataclass -class SimulationStep: - """Record of one agent's turn.""" - turn: int - agent_id: str - agent_position: tuple - room: str - perception: Dict[str, Any] # Context shown to LLM - llm_response: str # Raw LLM output - parsed_action_type: str # Action type as string - parsed_action_args: tuple # Action arguments - result_success: bool - result_message: str - new_position: Optional[tuple] = None - path: Optional[List[tuple]] = None # For animation replay - timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) - - -@dataclass -class SimulationLog: - """Complete simulation record for replay and analysis.""" - metadata: Dict[str, Any] - steps: List[SimulationStep] = field(default_factory=list) - - def save(self, path: str): - """Save log to JSON file.""" - data = { - "metadata": self.metadata, - "steps": [asdict(s) for s in self.steps] - } - with open(path, 'w') as f: - json.dump(data, f, indent=2, default=str) - print(f"Simulation log saved to: {path}") - - @classmethod - def load(cls, path: str) -> 'SimulationLog': - """Load log from JSON file.""" - with open(path) as f: - data = json.load(f) - - steps = [] - for s in data["steps"]: - # Convert tuple strings back to tuples - if isinstance(s.get("agent_position"), list): - s["agent_position"] = tuple(s["agent_position"]) - if isinstance(s.get("new_position"), list): - s["new_position"] = tuple(s["new_position"]) - if isinstance(s.get("parsed_action_args"), list): - s["parsed_action_args"] = tuple(s["parsed_action_args"]) - if s.get("path"): - s["path"] = [tuple(p) for p in s["path"]] - steps.append(SimulationStep(**s)) - - return cls(metadata=data["metadata"], steps=steps) - - def get_agent_steps(self, agent_name: str) -> List[SimulationStep]: - """Get all steps for a specific agent.""" - return [s for s in self.steps if s.agent_id == agent_name] - - def get_turn_steps(self, turn: int) -> List[SimulationStep]: - """Get all steps from a specific turn.""" - return [s for s in self.steps if s.turn == turn] - - def summary(self) -> str: - """Generate a summary of the simulation.""" - lines = [ - f"Simulation Summary", - f"==================", - f"Total turns: {self.metadata.get('total_turns', 'unknown')}", - f"Total steps: {len(self.steps)}", - f"Agents: {', '.join(self.metadata.get('agent_names', []))}", - f"", - ] - - # Per-agent stats - for agent_name in self.metadata.get('agent_names', []): - agent_steps = self.get_agent_steps(agent_name) - successes = sum(1 for s in agent_steps if s.result_success) - lines.append(f"{agent_name}:") - lines.append(f" Actions: {len(agent_steps)}") - lines.append(f" Successful: {successes}") - if agent_steps: - final = agent_steps[-1] - final_pos = final.new_position or final.agent_position - lines.append(f" Final position: {final_pos}") - lines.append(f" Final room: {final.room}") - lines.append("") - - return "\n".join(lines) - - -class TurnOrchestrator: - """ - Orchestrates multi-turn simulation. - - Handles: - - Turn sequencing - - Perspective switching - - LLM queries - - Action execution - - Simulation logging - """ - - def __init__(self, grid, fov_layer, world: WorldGraph, agents: list, - screenshot_dir: str, llm_query_fn: Callable): - """ - Initialize orchestrator. - - Args: - grid: mcrfpy.Grid instance - fov_layer: Color layer for FOV rendering - world: WorldGraph instance - agents: List of Agent objects - screenshot_dir: Directory for screenshots - llm_query_fn: Function(agent, screenshot_path, context) -> str - """ - self.grid = grid - self.fov_layer = fov_layer - self.world = world - self.agents = agents - self.screenshot_dir = screenshot_dir - self.llm_query_fn = llm_query_fn - - self.executor = ActionExecutor(grid) - self.turn_number = 0 - self.steps: List[SimulationStep] = [] - - os.makedirs(screenshot_dir, exist_ok=True) - - def run_turn(self) -> List[SimulationStep]: - """ - Execute one full turn (all agents act once). - - Returns list of SimulationSteps for this turn. - """ - import mcrfpy - - self.turn_number += 1 - turn_steps = [] - - print(f"\n{'='*60}") - print(f"TURN {self.turn_number}") - print("=" * 60) - - for agent in self.agents: - step = self._run_agent_turn(agent) - turn_steps.append(step) - self.steps.append(step) - - return turn_steps - - def run_simulation(self, max_turns: int = 10, - stop_condition: Callable = None) -> SimulationLog: - """ - Run complete simulation. - - Args: - max_turns: Maximum number of turns to run - stop_condition: Optional callable(orchestrator) -> bool - Returns True to stop simulation early - - Returns: - SimulationLog with all steps - """ - print(f"\nStarting simulation: max {max_turns} turns") - print(f"Agents: {[a.name for a in self.agents]}") - print("=" * 60) - - for turn in range(max_turns): - self.run_turn() - - # Check stop condition - if stop_condition and stop_condition(self): - print(f"\nStop condition met at turn {self.turn_number}") - break - - # Create log - log = SimulationLog( - metadata={ - "total_turns": self.turn_number, - "num_agents": len(self.agents), - "agent_names": [a.name for a in self.agents], - "timestamp": datetime.now().isoformat(), - "world_rooms": list(self.world.rooms.keys()), - "screenshot_dir": self.screenshot_dir, - }, - steps=self.steps - ) - - return log - - def _run_agent_turn(self, agent) -> SimulationStep: - """Execute one agent's turn.""" - import mcrfpy - from mcrfpy import automation - - print(f"\n--- {agent.name}'s Turn ---") - print(f"Position: {agent.pos} | Room: {agent.current_room}") - - # Switch perspective - self._switch_perspective(agent) - mcrfpy.step(0.016) - - # Screenshot - screenshot_path = os.path.join( - self.screenshot_dir, - f"turn{self.turn_number}_{agent.name.lower()}.png" - ) - automation.screenshot(screenshot_path) - - # Build context - visible_agents = self._get_visible_agents(agent) - context = agent.get_context(visible_agents + [agent]) - - # Query LLM - llm_response = self.llm_query_fn(agent, screenshot_path, context) - - # Parse and execute - action = parse_action(llm_response) - result = self.executor.execute(agent, action) - - # Log output - status = "SUCCESS" if result.success else "FAILED" - print(f" Action: {action.type.value} {action.args}") - print(f" Result: {status} - {result.message}") - - # Build step record - step = SimulationStep( - turn=self.turn_number, - agent_id=agent.name, - agent_position=agent.pos, - room=agent.current_room, - perception={ - "location": context["location"], - "available_actions": context["available_actions"], - }, - llm_response=llm_response, - parsed_action_type=action.type.value, - parsed_action_args=action.args, - result_success=result.success, - result_message=result.message, - new_position=result.new_position, - path=result.path - ) - - return step - - def _switch_perspective(self, agent): - """Switch grid view to agent's perspective.""" - import mcrfpy - - self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - self.fov_layer.apply_perspective( - entity=agent.entity, - visible=mcrfpy.Color(0, 0, 0, 0), - discovered=mcrfpy.Color(40, 40, 60, 180), - unknown=mcrfpy.Color(0, 0, 0, 255) - ) - agent.entity.update_visibility() - - px, py = agent.pos - self.grid.center = (px * 16 + 8, py * 16 + 8) - - def _get_visible_agents(self, observer) -> list: - """Get agents visible to observer based on FOV.""" - visible = [] - for agent in self.agents: - if agent.name == observer.name: - continue - ax, ay = agent.pos - if self.grid.is_in_fov(ax, ay): - visible.append(agent) - return visible - - def get_agent_positions(self) -> Dict[str, tuple]: - """Get current positions of all agents.""" - return {a.name: a.pos for a in self.agents} - - def agents_in_same_room(self) -> bool: - """Check if all agents are in the same room.""" - rooms = [a.current_room for a in self.agents] - return len(set(rooms)) == 1 diff --git a/tests/vllm_demo/world_graph.py b/tests/vllm_demo/world_graph.py deleted file mode 100644 index c923d1e..0000000 --- a/tests/vllm_demo/world_graph.py +++ /dev/null @@ -1,474 +0,0 @@ -""" -WorldGraph: Room-based World Representation -============================================ - -Provides dual-purpose data structures for: -1. Generating 2D tilemaps (visual representation) -2. Generating text descriptions (LLM context) - -Ensures deterministic text output: same state = same description. -""" - -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Tuple, Any -from enum import Enum - - -class Direction(Enum): - NORTH = "north" - SOUTH = "south" - EAST = "east" - WEST = "west" - - @property - def opposite(self) -> 'Direction': - opposites = { - Direction.NORTH: Direction.SOUTH, - Direction.SOUTH: Direction.NORTH, - Direction.EAST: Direction.WEST, - Direction.WEST: Direction.EAST, - } - return opposites[self] - - @property - def vector(self) -> Tuple[int, int]: - vectors = { - Direction.NORTH: (0, -1), - Direction.SOUTH: (0, 1), - Direction.EAST: (1, 0), - Direction.WEST: (-1, 0), - } - return vectors[self] - - -@dataclass -class Room: - """A room in the world graph.""" - name: str # Internal ID: "kitchen", "guard_room" - display_name: str # Text output: "the kitchen", "a dimly lit guard room" - bounds: Tuple[int, int, int, int] # (x, y, width, height) in tile coords - properties: Dict[str, Any] = field(default_factory=dict) # {"lit": True, "temperature": "warm"} - description_template: Optional[str] = None # "A {temperature} room with {features}." - - @property - def x(self) -> int: - return self.bounds[0] - - @property - def y(self) -> int: - return self.bounds[1] - - @property - def width(self) -> int: - return self.bounds[2] - - @property - def height(self) -> int: - return self.bounds[3] - - @property - def center(self) -> Tuple[int, int]: - return (self.x + self.width // 2, self.y + self.height // 2) - - def contains(self, x: int, y: int) -> bool: - """Check if a tile coordinate is within this room.""" - return (self.x <= x < self.x + self.width and - self.y <= y < self.y + self.height) - - -@dataclass -class Door: - """A connection between two rooms.""" - room_a: str # Room name - room_b: str # Room name - position: Tuple[int, int] # Tile position of the door - direction_from_a: Direction # Direction from room_a to reach room_b - locked: bool = False - key_id: Optional[str] = None # Which key unlocks this door - - @property - def direction_from_b(self) -> Direction: - return self.direction_from_a.opposite - - -@dataclass -class WorldObject: - """An interactable object in the world.""" - name: str # Internal ID: "brass_key" - display_name: str # Text output: "a brass key" - room: str # Which room contains it - position: Tuple[int, int] # Tile position (or None if carried) - affordances: List[str] = field(default_factory=list) # ["takeable", "unlocks:pantry_door"] - description: str = "" # "A tarnished brass key with ornate handle." - - -@dataclass -class AgentInfo: - """Information about an agent for description purposes.""" - name: str # "Wizard", "Knight" - display_name: str # "a wizard", "the knight" - position: Tuple[int, int] # Current tile position - is_player: bool = False # Is this the observing agent? - - -class WorldGraph: - """ - Graph-based world representation. - - Provides: - - Room/door/object storage - - Deterministic text description generation - - Spatial queries (what room is at x,y?) - - Available action enumeration - """ - - def __init__(self): - self.rooms: Dict[str, Room] = {} - self.doors: List[Door] = [] - self.objects: Dict[str, WorldObject] = {} - - # ========================================================================= - # Building the World - # ========================================================================= - - def add_room(self, room: Room) -> None: - """Add a room to the world.""" - self.rooms[room.name] = room - - def add_door(self, door: Door) -> None: - """Add a door connecting two rooms.""" - self.doors.append(door) - - def add_object(self, obj: WorldObject) -> None: - """Add an object to the world.""" - self.objects[obj.name] = obj - - # ========================================================================= - # Spatial Queries - # ========================================================================= - - def room_at(self, x: int, y: int) -> Optional[Room]: - """Get the room containing a tile coordinate.""" - for room in self.rooms.values(): - if room.contains(x, y): - return room - return None - - def get_exits(self, room_name: str) -> List[Door]: - """Get all doors leading out of a room.""" - exits = [] - for door in self.doors: - if door.room_a == room_name or door.room_b == room_name: - exits.append(door) - return exits - - def get_door_in_direction(self, room_name: str, direction: Direction) -> Optional[Door]: - """Get the door in a specific direction from a room.""" - for door in self.doors: - if door.room_a == room_name and door.direction_from_a == direction: - return door - if door.room_b == room_name and door.direction_from_b == direction: - return door - return None - - def get_objects_in_room(self, room_name: str) -> List[WorldObject]: - """Get all objects in a room.""" - return [obj for obj in self.objects.values() if obj.room == room_name] - - # ========================================================================= - # Text Description Generation (Deterministic!) - # ========================================================================= - - def describe_room(self, room_name: str, - visible_agents: List[AgentInfo] = None, - observer_name: str = None) -> str: - """ - Generate a complete room description. - - Args: - room_name: The room to describe - visible_agents: List of agents visible in the room - observer_name: Name of the observing agent (excluded from description) - - Returns: - Deterministic prose description of the room - """ - room = self.rooms.get(room_name) - if not room: - return "You are in an unknown location." - - parts = [] - - # Base location - parts.append(f"You are in {room.display_name}.") - - # Room template description (if any) - if room.description_template and room.properties: - try: - desc = room.description_template.format(**room.properties) - parts.append(desc) - except KeyError: - pass - - # Visible agents - if visible_agents: - agent_desc = self._describe_agents(visible_agents, observer_name) - if agent_desc: - parts.append(agent_desc) - - # Objects on the ground - objects = self.get_objects_in_room(room_name) - if objects: - obj_desc = self._describe_objects(objects) - parts.append(obj_desc) - - # Exits - exits = self.get_exits(room_name) - parts.append(self._describe_exits(room_name, exits)) - - return " ".join(parts) - - def _describe_agents(self, agents: List[AgentInfo], observer_name: str = None) -> str: - """Describe visible agents (excluding observer).""" - others = [a for a in agents if a.name != observer_name and not a.is_player] - if not others: - return "" - - if len(others) == 1: - return f"You see {others[0].display_name} here." - else: - names = [a.display_name for a in others] - formatted = ", ".join(names[:-1]) + f" and {names[-1]}" - return f"You see {formatted} here." - - def _describe_objects(self, objects: List[WorldObject]) -> str: - """Describe objects in the room.""" - if not objects: - return "" - - # Group by affordance for natural description - takeable = [o for o in objects if "takeable" in o.affordances] - furniture = [o for o in objects if "takeable" not in o.affordances] - - parts = [] - if takeable: - if len(takeable) == 1: - parts.append(f"On the ground you see {takeable[0].display_name}.") - else: - names = [o.display_name for o in takeable] - formatted = ", ".join(names[:-1]) + f" and {names[-1]}" - parts.append(f"On the ground you see {formatted}.") - - if furniture: - for obj in furniture: - parts.append(f"There is {obj.display_name} here.") - - return " ".join(parts) - - def _describe_exits(self, room_name: str, exits: List[Door]) -> str: - """Describe available exits.""" - if not exits: - return "There are no visible exits." - - exit_parts = [] - for door in exits: - # Determine direction and destination from this room's perspective - if door.room_a == room_name: - direction = door.direction_from_a.value - dest_room = self.rooms.get(door.room_b) - else: - direction = door.direction_from_b.value - dest_room = self.rooms.get(door.room_a) - - dest_name = dest_room.display_name if dest_room else "unknown" - - if door.locked: - exit_parts.append(f"{direction} ({dest_name}, locked)") - else: - exit_parts.append(f"{direction} ({dest_name})") - - # Sort for deterministic output - exit_parts.sort() - - return "Exits: " + ", ".join(exit_parts) + "." - - # ========================================================================= - # Action Enumeration - # ========================================================================= - - def get_available_actions(self, room_name: str, - can_speak: bool = True) -> List[str]: - """ - Get list of available actions for an agent in a room. - - Returns list of action strings like: - ["GO NORTH", "GO EAST", "TAKE brass_key", "WAIT", "LOOK"] - """ - actions = ["LOOK", "WAIT"] - - # Movement actions - for door in self.get_exits(room_name): - if door.room_a == room_name: - direction = door.direction_from_a.value.upper() - else: - direction = door.direction_from_b.value.upper() - - if not door.locked: - actions.append(f"GO {direction}") - else: - # Could add UNLOCK action here if agent has key - pass - - # Object interactions - for obj in self.get_objects_in_room(room_name): - if "takeable" in obj.affordances: - actions.append(f"TAKE {obj.name}") - if "pushable" in obj.affordances: - actions.append(f"PUSH {obj.name} ") - if "openable" in obj.affordances: - actions.append(f"OPEN {obj.name}") - if "readable" in obj.affordances: - actions.append(f"READ {obj.name}") - - # Speech actions - if can_speak: - actions.append("ANNOUNCE ''") - actions.append("SPEAK ''") - - return sorted(actions) - - -# ============================================================================= -# Factory Functions for Common Scenarios -# ============================================================================= - -def create_two_room_scenario() -> WorldGraph: - """ - Create a simple two-room test scenario. - - Layout: - +--------+ +--------+ - | Room A |===| Room B | - | (west) | | (east) | - +--------+ +--------+ - - Room A: "the guard room" - contains a brass key - Room B: "the armory" - destination room - Door: unlocked, between rooms - """ - world = WorldGraph() - - # Room A (left side) - room_a = Room( - name="guard_room", - display_name="the guard room", - bounds=(1, 1, 8, 8), # x, y, width, height - properties={"lit": True, "atmosphere": "musty"}, - description_template="The air is {atmosphere}." - ) - world.add_room(room_a) - - # Room B (right side) - room_b = Room( - name="armory", - display_name="the armory", - bounds=(11, 1, 8, 8), - properties={"lit": True, "atmosphere": "cold"}, - description_template="Weapon racks line the walls." - ) - world.add_room(room_b) - - # Door connecting them - door = Door( - room_a="guard_room", - room_b="armory", - position=(9, 4), # Between the rooms - direction_from_a=Direction.EAST, - locked=False - ) - world.add_door(door) - - # Object in Room A - key = WorldObject( - name="brass_key", - display_name="a brass key", - room="guard_room", - position=(3, 3), - affordances=["takeable", "unlocks:dungeon_door"], - description="A tarnished brass key with an ornate handle." - ) - world.add_object(key) - - return world - - -def create_button_door_scenario() -> WorldGraph: - """ - Create the Phase 1 scenario from issue #154. - - Layout: - +----------+ +----------+ - | Room A | | Room B | - | [Button] |===| [Goal] | - | Agent A | | Agent B | - +----------+ +----------+ - - - Door starts locked - - Button in Room A unlocks the door - - Agent A can reach button; Agent B's goal is blocked by door - - Success: Agents coordinate to solve puzzle - """ - world = WorldGraph() - - # Room A (button room) - room_a = Room( - name="button_room", - display_name="the button room", - bounds=(1, 1, 8, 8), - properties={"lit": True} - ) - world.add_room(room_a) - - # Room B (goal room) - room_b = Room( - name="goal_room", - display_name="the goal room", - bounds=(11, 1, 8, 8), - properties={"lit": True} - ) - world.add_room(room_b) - - # Locked door - door = Door( - room_a="button_room", - room_b="goal_room", - position=(9, 4), - direction_from_a=Direction.EAST, - locked=True, - key_id="button_mechanism" - ) - world.add_door(door) - - # Button in Room A - button = WorldObject( - name="wall_button", - display_name="a large button on the wall", - room="button_room", - position=(2, 4), - affordances=["pressable", "activates:main_door"], - description="A heavy stone button protrudes from the wall." - ) - world.add_object(button) - - # Goal marker in Room B - goal = WorldObject( - name="goal_marker", - display_name="a glowing rune on the floor", - room="goal_room", - position=(15, 4), - affordances=["examinable"], - description="An arcane symbol pulses with soft light." - ) - world.add_object(goal) - - return world