Compare commits

...

4 commits

Author SHA1 Message Date
eb4a398e09 docs: Add development plans for VLLM agent infrastructure
Implementation plans for LLM agent orchestration work:
- Hour 1: Action parser and executor design
- Hour 2: WorldGraph foundation design
- Hours 3-4: Integration and multi-turn demo design

These plans were used to parallelize development of #155 and #156.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-14 12:54:03 -05:00
de739037f0 feat: Add TurnOrchestrator for multi-turn LLM simulation (addresses #156)
TurnOrchestrator: Coordinates multi-agent turn-based simulation
- Perspective switching with FOV layer updates
- Screenshot capture per agent per turn
- Pluggable LLM query callback
- SimulationStep/SimulationLog for full context capture
- JSON save/load with replay support

New demos:
- 2_integrated_demo.py: WorldGraph + action execution integration
- 3_multi_turn_demo.py: Complete multi-turn simulation with logging

Updated 1_multi_agent_demo.py with action parser/executor integration.

Tested with Qwen2.5-VL-32B: agents successfully navigate based on
WorldGraph descriptions and VLM visual input.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-14 12:53:48 -05:00
2890528e21 feat: Add action parser and executor for LLM agent actions
ActionParser: Extracts structured actions from LLM text responses
- Regex patterns for GO, WAIT, LOOK, TAKE, DROP, PUSH, USE, etc.
- Direction normalization (N→NORTH, UP→NORTH)
- Handles "Action: GO EAST" and fallback patterns
- 12 unit tests covering edge cases

ActionExecutor: Executes parsed actions in the game world
- Movement with collision detection (walls, entities)
- Boundary checking
- ActionResult with path data for animation replay

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-14 12:53:39 -05:00
e45760c2ac feat: Add WorldGraph for deterministic room descriptions (closes #155)
Implements Python-side room graph data structures for LLM agent environments:
- Room, Door, WorldObject dataclasses with full metadata
- WorldGraph class with spatial queries (room_at, get_exits)
- Deterministic text generation (describe_room, describe_exits)
- Available action enumeration based on room state
- Factory functions for test scenarios (two_room, button_door)

Example output:
"You are in the guard room. The air is musty. On the ground you see
a brass key. Exits: east (the armory)."

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-14 12:53:30 -05:00
12 changed files with 4101 additions and 0 deletions

View file

@ -22,6 +22,9 @@ import base64
import os import os
import random import random
from action_parser import parse_action
from action_executor import ActionExecutor
# VLLM configuration # VLLM configuration
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
SCREENSHOT_DIR = "/tmp/vllm_multi_agent" SCREENSHOT_DIR = "/tmp/vllm_multi_agent"
@ -284,6 +287,9 @@ def run_demo():
# Setup scene # Setup scene
grid, fov_layer, agents, rat = setup_scene() grid, fov_layer, agents, rat = setup_scene()
# Create action executor
executor = ActionExecutor(grid)
# Cycle through each agent's perspective # Cycle through each agent's perspective
for i, agent in enumerate(agents): for i, agent in enumerate(agents):
print(f"\n{'='*70}") print(f"\n{'='*70}")
@ -319,6 +325,21 @@ def run_demo():
print(f"\n{agent.name}'s Response:\n{response}") print(f"\n{agent.name}'s Response:\n{response}")
print() print()
# Parse and execute action
print(f"--- Action Execution ---")
action = parse_action(response)
print(f"Parsed action: {action.type.value} {action.args}")
result = executor.execute(agent, action)
if result.success:
print(f"SUCCESS: {result.message}")
if result.new_position:
# Update perspective after movement
switch_perspective(grid, fov_layer, agent)
mcrfpy.step(0.016)
else:
print(f"FAILED: {result.message}")
print("\n" + "=" * 70) print("\n" + "=" * 70)
print("Multi-Agent Demo Complete") print("Multi-Agent Demo Complete")
print("=" * 70) print("=" * 70)

View file

@ -0,0 +1,391 @@
# Hour 1: Action Parser & Executor
**Issue**: #156 Turn-based LLM Agent Orchestration
**Goal**: Agents can actually move when they say "GO EAST"
**Parallelizable with**: Hour 2 (no dependencies)
---
## Deliverables
1. `action_parser.py` - Parse LLM text responses into structured actions
2. `action_executor.py` - Execute parsed actions in the game world
3. Modified `1_multi_agent_demo.py` - Integrate parser/executor to show movement
---
## File 1: `action_parser.py`
```python
"""
Action Parser for LLM Agent Responses
=====================================
Extracts structured actions from free-form LLM text responses.
Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc.
"""
import re
from dataclasses import dataclass
from typing import Optional, Tuple, Any
from enum import Enum
class ActionType(Enum):
    """Verbs an agent may request; INVALID marks an unparseable response."""

    GO = "GO"
    WAIT = "WAIT"
    LOOK = "LOOK"
    TAKE = "TAKE"
    DROP = "DROP"
    PUSH = "PUSH"
    USE = "USE"
    OPEN = "OPEN"
    CLOSE = "CLOSE"
    ANNOUNCE = "ANNOUNCE"
    SPEAK = "SPEAK"
    INVALID = "INVALID"


@dataclass
class Action:
    """A parsed action.

    Attributes:
        type: The verb that was recognized.
        args: Arguments captured by the matching pattern. Directions are
            normalized to full compass names; an optional argument that was
            absent is ``None``. For INVALID this holds the first 100
            characters of the response.
        raw_match: The substring of the response that matched, as written.
    """

    type: ActionType
    args: Tuple[Any, ...] = ()
    raw_match: str = ""


class ActionParser:
    """Parse LLM responses into structured actions."""

    # Direction normalization
    DIRECTIONS = {
        'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST',
        'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST',
        'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST',
    }

    # Compass spellings only. The loose fallback patterns stick to these
    # because relative words misfire on ordinary prose ("go right ahead").
    _COMPASS = r'NORTH|SOUTH|EAST|WEST|N|S|E|W'

    # Every spelling DIRECTIONS can normalize, longest first so that a full
    # word is always tried before its one-letter abbreviation.
    _ANY_DIRECTION = '|'.join(
        sorted(DIRECTIONS, key=lambda name: (-len(name), name))
    )

    # Patterns ordered by specificity (most specific first).
    # Each entry is (action type, regex, number of capture groups to extract).
    PATTERNS = [
        # Explicit "Action: X" format (preferred)
        (ActionType.GO, rf'Action:\s*GO\s+({_ANY_DIRECTION})\b', 1),
        (ActionType.WAIT, r'Action:\s*WAIT\b', 0),
        (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1),
        (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1),
        (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1),
        # Trailing \b keeps "PUSH box NOWHERE" from being read as "N".
        (ActionType.PUSH, rf'Action:\s*PUSH\s+(\w+)\s+({_ANY_DIRECTION})\b', 2),
        (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2),
        (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1),
        (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1),
        (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1),
        (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1),
        # Fallback patterns (less strict)
        (ActionType.GO, rf'\bGO\s+({_COMPASS})\b', 1),
        (ActionType.GO, rf'\bmove\s+({_COMPASS})\b', 1),
        (ActionType.GO, rf'\bhead\s+({_COMPASS})\b', 1),
        (ActionType.WAIT, r'\bWAIT\b', 0),
        (ActionType.LOOK, r'\bLOOK\b', 0),
    ]

    def parse(self, llm_response: str) -> Action:
        """Extract the first action found in an LLM response.

        Patterns are tried in PATTERNS order, so an explicit "Action: ..."
        line wins over a verb mentioned in passing. Matching is
        case-insensitive, but the response itself is left untouched so that
        object ids and quoted speech keep their original case.

        Args:
            llm_response: Free-form text returned by the model.

        Returns:
            The parsed Action, or an Action of type INVALID (carrying the
            first 100 characters of the response) if nothing matched.
        """
        for action_type, pattern, num_groups in self.PATTERNS:
            match = re.search(pattern, llm_response, re.IGNORECASE)
            if match:
                return Action(
                    type=action_type,
                    args=self._extract_args(match, num_groups, action_type),
                    raw_match=match.group(0),
                )

        # No valid action found
        return Action(
            type=ActionType.INVALID,
            args=(llm_response[:100],),  # First 100 chars for debugging
            raw_match="",
        )

    def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple:
        """Collect capture groups 1..num_groups, normalizing directions.

        Groups that did not participate in the match become ``None``. The
        direction argument (GO's only group, PUSH's second group) is mapped
        through DIRECTIONS to a full compass name.
        """
        args = []
        for index in range(1, num_groups + 1):
            value = match.group(index)
            if not value:
                args.append(None)
                continue
            is_direction = action_type == ActionType.GO or (
                action_type == ActionType.PUSH and index == 2
            )
            if is_direction:
                key = value.upper()
                value = self.DIRECTIONS.get(key, key)
            args.append(value)
        return tuple(args)


# Convenience function
def parse_action(llm_response: str) -> Action:
    """Parse an LLM response into an Action using a fresh ActionParser."""
    return ActionParser().parse(llm_response)
```
---
## File 2: `action_executor.py`
```python
"""
Action Executor for McRogueFace
===============================
Executes parsed actions in the game world.
Handles movement, collision detection, and action results.
"""
from dataclasses import dataclass
from typing import Optional, List, Tuple
from action_parser import Action, ActionType
@dataclass
class ActionResult:
    """Outcome of executing one action.

    Attributes:
        success: Whether the action took effect.
        message: Human-readable description of what happened.
        new_position: Tile the agent moved to, set only by a successful GO.
        path: Tiles traversed (start and end), set only by a successful GO.
    """

    success: bool
    message: str
    new_position: Optional[Tuple[int, int]] = None
    path: Optional[List[Tuple[int, int]]] = None  # For animation replay


class ActionExecutor:
    """Execute actions in the McRogueFace game world."""

    # Direction vectors as (dx, dy); NORTH decreases y.
    DIRECTION_VECTORS = {
        'NORTH': (0, -1),
        'SOUTH': (0, 1),
        'EAST': (1, 0),
        'WEST': (-1, 0),
    }

    def __init__(self, grid):
        """Store the grid that actions operate on.

        Args:
            grid: mcrfpy.Grid instance
        """
        self.grid = grid

    def execute(self, agent, action: Action) -> ActionResult:
        """Dispatch an action to its handler.

        Args:
            agent: Agent wrapper with .entity attribute
            action: Parsed Action to execute

        Returns:
            ActionResult with success status and message. Action types
            without a dedicated handler report "not yet implemented".
        """
        handlers = {
            ActionType.GO: self._execute_go,
            ActionType.WAIT: self._execute_wait,
            ActionType.LOOK: self._execute_look,
            ActionType.TAKE: self._execute_take,
            ActionType.DROP: self._execute_drop,
            ActionType.INVALID: self._execute_invalid,
        }
        handler = handlers.get(action.type, self._execute_unimplemented)
        return handler(agent, action)

    def _execute_go(self, agent, action: Action) -> ActionResult:
        """Move the agent one tile in the requested direction.

        Fails without moving if the direction is missing or unknown, the
        target tile is off the grid, not walkable, or occupied by another
        entity.
        """
        if not action.args or not action.args[0]:
            return ActionResult(False, "No direction specified")

        direction = action.args[0]
        if direction not in self.DIRECTION_VECTORS:
            return ActionResult(False, f"Invalid direction: {direction}")
        dx, dy = self.DIRECTION_VECTORS[direction]

        # Get current position
        # NOTE(review): position is read from entity.pos but written to
        # entity.grid_pos below - confirm both are tile coordinates in mcrfpy.
        current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
        new_x, new_y = current_x + dx, current_y + dy

        # Check bounds
        grid_w, grid_h = self.grid.grid_size
        if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
            return ActionResult(False, f"Cannot go {direction} - edge of map")

        # Check walkability
        target_cell = self.grid.at(new_x, new_y)
        if not target_cell.walkable:
            return ActionResult(False, f"Cannot go {direction} - path blocked")

        # Check for entity collision (optional - depends on game rules)
        for entity in self.grid.entities:
            if entity is agent.entity:
                continue
            ex, ey = int(entity.pos[0]), int(entity.pos[1])
            if ex == new_x and ey == new_y:
                return ActionResult(False, f"Cannot go {direction} - someone is there")

        # Execute movement
        agent.entity.grid_pos = (new_x, new_y)
        return ActionResult(
            success=True,
            message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
            new_position=(new_x, new_y),
            path=[(current_x, current_y), (new_x, new_y)],
        )

    def _execute_wait(self, agent, action: Action) -> ActionResult:
        """Execute wait action (no-op)."""
        return ActionResult(True, "Waited and observed surroundings")

    def _execute_look(self, agent, action: Action) -> ActionResult:
        """Execute look action; names the target when one was given."""
        target = action.args[0] if action.args else None
        if target:
            return ActionResult(True, f"Examined {target} closely")
        return ActionResult(True, "Looked around carefully")

    def _execute_take(self, agent, action: Action) -> ActionResult:
        """Execute take action (placeholder - always fails)."""
        item = action.args[0] if action.args else "unknown"
        # TODO: Implement inventory system
        return ActionResult(False, f"Cannot take {item} - not implemented yet")

    def _execute_drop(self, agent, action: Action) -> ActionResult:
        """Execute drop action (placeholder - always fails)."""
        item = action.args[0] if action.args else "unknown"
        return ActionResult(False, f"Cannot drop {item} - not implemented yet")

    def _execute_invalid(self, agent, action: Action) -> ActionResult:
        """Handle invalid/unparseable action.

        The parser stores a snippet of the response in args[0], but an
        INVALID Action built with default (empty) args must not crash here.
        """
        snippet = action.args[0] if action.args else "<empty>"
        return ActionResult(False, f"Could not understand action: {snippet}")

    def _execute_unimplemented(self, agent, action: Action) -> ActionResult:
        """Handle action types that have no handler yet."""
        return ActionResult(False, f"Action {action.type.value} not yet implemented")
```
---
## Modifications to `1_multi_agent_demo.py`
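Apply these changes to `1_multi_agent_demo.py`; the comments in the snippet say where each piece goes (imports at the top of the file, the rest inside `run_demo()`):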
```python
# Add imports at top
from action_parser import parse_action
from action_executor import ActionExecutor, ActionResult
# In run_demo(), after setup_scene():
executor = ActionExecutor(grid)
# Replace the agent loop with:
# One pass over the agents: render each agent's view, ask the VLLM what it
# wants to do, then parse and execute that action.
for i, agent in enumerate(agents):
    print(f"\n{'='*70}")
    # Derive the total from the list instead of hard-coding the demo's count.
    print(f"Agent {i+1}/{len(agents)}: {agent.name} ({agent.description})")
    print(f"Position: {agent.pos}")
    print("=" * 70)

    # Switch to this agent's perspective
    switch_perspective(grid, fov_layer, agent)
    mcrfpy.step(0.016)

    # Take screenshot
    screenshot_path = os.path.join(SCREENSHOT_DIR, f"{i}_{agent.name.lower()}_view.png")
    screenshot_ok = automation.screenshot(screenshot_path)
    if not screenshot_ok:
        print(f"ERROR: Failed to take screenshot for {agent.name}")
        continue

    # Get visible entities and query VLLM
    visible = get_visible_entities(grid, agent, agents, rat)
    grounded_text = build_grounded_prompt(visible)
    print(f"Grounded observations: {grounded_text}")
    print(f"\nQuerying VLLM for {agent.name}...")
    response = query_agent(agent, screenshot_path, grounded_text)
    print(f"\n{agent.name}'s Response:\n{response}")

    # NEW: Parse and execute action
    print("\n--- Action Execution ---")
    action = parse_action(response)
    print(f"Parsed action: {action.type.value} {action.args}")
    # Kept separate from screenshot_ok so the two results are not confused.
    outcome = executor.execute(agent, action)
    if outcome.success:
        print(f"SUCCESS: {outcome.message}")
        if outcome.new_position:
            # Update perspective after movement
            switch_perspective(grid, fov_layer, agent)
            mcrfpy.step(0.016)
    else:
        print(f"FAILED: {outcome.message}")
```
---
## Testing
### Unit test for parser (`test_action_parser.py`):
```python
from action_parser import parse_action, ActionType
def test_parser():
    """Smoke-test parse_action on explicit, abbreviated, fallback and invalid input."""
    # Explicit format
    explicit = parse_action("Action: GO NORTH")
    assert explicit.type == ActionType.GO
    assert parse_action("Action: GO NORTH").args == ("NORTH",)

    # Short directions
    abbreviated = parse_action("Action: GO E")
    assert abbreviated.args == ("EAST",)

    # Responses checked only for the action type they produce, in order:
    # case-insensitive, fallback pattern, WAIT, LOOK, unparseable.
    expected_types = (
        ("action: go south", ActionType.GO),
        ("I think I'll GO WEST", ActionType.GO),
        ("Action: WAIT", ActionType.WAIT),
        ("Action: LOOK", ActionType.LOOK),
        ("I'm not sure what to do", ActionType.INVALID),
    )
    for response, expected in expected_types:
        assert parse_action(response).type == expected

    print("All parser tests passed!")
if __name__ == "__main__":
test_parser()
```
---
## Success Criteria
- [ ] `action_parser.py` correctly parses all GO directions (N/S/E/W and full names)
- [ ] `action_parser.py` handles WAIT, LOOK, and INVALID cases
- [ ] `action_executor.py` moves entities when GO succeeds
- [ ] `action_executor.py` returns failure message when path is blocked
- [ ] Modified demo shows "Moved east to (5, 7)" style output
- [ ] Entities visibly change position between turns
---
## Notes for Integration (Hour 3)
The `ActionExecutor` will be enhanced in Hour 3 to:
- Use `WorldGraph` for room-based movement (GO NORTH = walk through door to next room)
- Support multi-tile pathfinding for room transitions
- Return path data for animation replay
Keep the current single-tile movement as the foundation.

View file

@ -0,0 +1,684 @@
# Hour 2: WorldGraph Foundation
**Issue**: #155 Deterministic Text Descriptions From Room Graph
**Goal**: Structured room data that generates both tilemaps AND text descriptions
**Parallelizable with**: Hour 1 (no dependencies)
---
## Deliverables
1. `world_graph.py` - Core data structures and description generation
2. `test_world_graph.py` - Unit tests for WorldGraph functionality
3. Example scenario: two connected rooms with a door
---
## File 1: `world_graph.py`
```python
"""
WorldGraph: Room-based World Representation
============================================
Provides dual-purpose data structures for:
1. Generating 2D tilemaps (visual representation)
2. Generating text descriptions (LLM context)
Ensures deterministic text output: same state = same description.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
class Direction(Enum):
    """The four compass directions; values are the lowercase words used in prose."""

    NORTH = "north"
    SOUTH = "south"
    EAST = "east"
    WEST = "west"

    @property
    def opposite(self) -> 'Direction':
        """The direction pointing the other way (NORTH <-> SOUTH, EAST <-> WEST)."""
        axes = (
            (Direction.NORTH, Direction.SOUTH),
            (Direction.EAST, Direction.WEST),
        )
        for first, second in axes:
            if self is first:
                return second
            if self is second:
                return first
        raise KeyError(self)

    @property
    def vector(self) -> Tuple[int, int]:
        """Unit step as (dx, dy) in tile coordinates; NORTH is negative y."""
        if self is Direction.NORTH:
            return (0, -1)
        if self is Direction.SOUTH:
            return (0, 1)
        if self is Direction.EAST:
            return (1, 0)
        if self is Direction.WEST:
            return (-1, 0)
        raise KeyError(self)
@dataclass
class Room:
    """A rectangular room in the world graph.

    Attributes:
        name: Internal ID, e.g. "kitchen", "guard_room".
        display_name: Phrase used in text output, e.g. "the kitchen".
        bounds: (x, y, width, height) in tile coordinates.
        properties: Free-form values, e.g. {"lit": True, "temperature": "warm"}.
        description_template: Optional format string filled from
            ``properties``, e.g. "A {temperature} room with {features}."
    """

    name: str
    display_name: str
    bounds: Tuple[int, int, int, int]
    properties: Dict[str, Any] = field(default_factory=dict)
    description_template: Optional[str] = None

    @property
    def x(self) -> int:
        """Left edge (first element of ``bounds``)."""
        return self.bounds[0]

    @property
    def y(self) -> int:
        """Top edge (second element of ``bounds``)."""
        return self.bounds[1]

    @property
    def width(self) -> int:
        """Width in tiles (third element of ``bounds``)."""
        return self.bounds[2]

    @property
    def height(self) -> int:
        """Height in tiles (fourth element of ``bounds``)."""
        return self.bounds[3]

    @property
    def center(self) -> Tuple[int, int]:
        """Tile at the middle of the room, using floor division for each half-extent."""
        center_x = self.x + self.width // 2
        center_y = self.y + self.height // 2
        return (center_x, center_y)

    def contains(self, x: int, y: int) -> bool:
        """Return True if tile (x, y) lies inside the room.

        The left/top edges are inclusive; x + width and y + height are not.
        """
        if not (self.x <= x < self.x + self.width):
            return False
        return self.y <= y < self.y + self.height
@dataclass
class Door:
    """A connection between two rooms.

    Attributes:
        room_a: Name of the first room.
        room_b: Name of the second room.
        position: Tile position of the door.
        direction_from_a: Direction to travel from room_a to reach room_b.
        locked: Whether the door is currently locked.
        key_id: Identifier of whatever unlocks this door, if anything.
    """

    room_a: str
    room_b: str
    position: Tuple[int, int]
    direction_from_a: Direction
    locked: bool = False
    key_id: Optional[str] = None

    @property
    def direction_from_b(self) -> Direction:
        """Direction to travel from room_b to reach room_a (the reverse of direction_from_a)."""
        outbound = self.direction_from_a
        return outbound.opposite
@dataclass
class WorldObject:
    """An interactable object in the world.

    Attributes:
        name: Internal ID, e.g. "brass_key".
        display_name: Phrase used in text output, e.g. "a brass key".
        room: Name of the room that contains the object.
        position: Tile position (the original note allows None when the
            object is carried, although the annotation is a plain tuple).
        affordances: Capability tags, e.g. ["takeable", "unlocks:pantry_door"].
        description: Longer prose, e.g. "A tarnished brass key with ornate handle."
    """

    name: str
    display_name: str
    room: str
    position: Tuple[int, int]
    affordances: List[str] = field(default_factory=list)
    description: str = ""
@dataclass
class AgentInfo:
    """Snapshot of an agent, used only when generating descriptions.

    Attributes:
        name: Agent identifier, e.g. "Wizard", "Knight".
        display_name: Phrase used in text output, e.g. "a wizard".
        position: Current tile position.
        is_player: True when this entry is the observing agent itself.
    """

    name: str
    display_name: str
    position: Tuple[int, int]
    is_player: bool = False
class WorldGraph:
    """
    Graph-based world representation.

    Provides:
    - Room/door/object storage
    - Deterministic text description generation
    - Spatial queries (what room is at x,y?)
    - Available action enumeration
    """

    def __init__(self):
        self.rooms: Dict[str, Room] = {}        # keyed by Room.name
        self.doors: List[Door] = []
        self.objects: Dict[str, WorldObject] = {}  # keyed by WorldObject.name

    # =========================================================================
    # Building the World
    # =========================================================================

    def add_room(self, room: Room) -> None:
        """Add a room to the world, replacing any room with the same name."""
        self.rooms[room.name] = room

    def add_door(self, door: Door) -> None:
        """Add a door connecting two rooms."""
        self.doors.append(door)

    def add_object(self, obj: WorldObject) -> None:
        """Add an object to the world, replacing any object with the same name."""
        self.objects[obj.name] = obj

    # =========================================================================
    # Spatial Queries
    # =========================================================================

    def room_at(self, x: int, y: int) -> Optional[Room]:
        """Return the first room containing tile (x, y), or None if there is none."""
        for room in self.rooms.values():
            if room.contains(x, y):
                return room
        return None

    def get_exits(self, room_name: str) -> List[Door]:
        """Return every door that has ``room_name`` on either side."""
        return [
            door for door in self.doors
            if door.room_a == room_name or door.room_b == room_name
        ]

    def get_door_in_direction(self, room_name: str, direction: Direction) -> Optional[Door]:
        """Return the first door leaving ``room_name`` in ``direction``, or None."""
        for door in self.doors:
            if door.room_a == room_name and door.direction_from_a == direction:
                return door
            if door.room_b == room_name and door.direction_from_b == direction:
                return door
        return None

    def get_objects_in_room(self, room_name: str) -> List[WorldObject]:
        """Return all objects whose ``room`` is ``room_name``, in insertion order."""
        return [obj for obj in self.objects.values() if obj.room == room_name]

    # =========================================================================
    # Text Description Generation (Deterministic!)
    # =========================================================================

    def describe_room(self, room_name: str,
                      visible_agents: Optional[List[AgentInfo]] = None,
                      observer_name: Optional[str] = None) -> str:
        """
        Generate a complete room description.

        Sentences appear in a fixed order: location, template text, visible
        agents, objects, exits.

        Args:
            room_name: The room to describe
            visible_agents: List of agents visible in the room
            observer_name: Name of the observing agent (excluded from description)

        Returns:
            Deterministic prose description of the room, or a fixed
            "unknown location" sentence if ``room_name`` is not registered.
        """
        room = self.rooms.get(room_name)
        if not room:
            return "You are in an unknown location."

        parts = [f"You are in {room.display_name}."]

        # Room template description (if any). A template with no placeholders
        # is valid even when the room has no properties, so only the template
        # itself gates this step.
        if room.description_template:
            try:
                parts.append(room.description_template.format(**(room.properties or {})))
            except (KeyError, IndexError):
                # Best effort: a placeholder the room cannot fill means the
                # sentence is omitted rather than aborting the description.
                pass

        # Visible agents
        if visible_agents:
            agent_desc = self._describe_agents(visible_agents, observer_name)
            if agent_desc:
                parts.append(agent_desc)

        # Objects on the ground
        objects = self.get_objects_in_room(room_name)
        if objects:
            parts.append(self._describe_objects(objects))

        # Exits
        parts.append(self._describe_exits(room_name, self.get_exits(room_name)))
        return " ".join(parts)

    @staticmethod
    def _join_names(names: List[str]) -> str:
        """Join display names as "a", or "a, b and c" for two or more."""
        if len(names) == 1:
            return names[0]
        return ", ".join(names[:-1]) + f" and {names[-1]}"

    def _describe_agents(self, agents: List[AgentInfo],
                         observer_name: Optional[str] = None) -> str:
        """Describe visible agents, skipping the observer and any flagged is_player."""
        others = [a for a in agents if a.name != observer_name and not a.is_player]
        if not others:
            return ""
        listing = self._join_names([a.display_name for a in others])
        return f"You see {listing} here."

    def _describe_objects(self, objects: List[WorldObject]) -> str:
        """Describe objects: takeable ones in one sentence, then one sentence per other object."""
        if not objects:
            return ""

        # Group by affordance for natural description
        takeable = [o for o in objects if "takeable" in o.affordances]
        furniture = [o for o in objects if "takeable" not in o.affordances]

        parts = []
        if takeable:
            listing = self._join_names([o.display_name for o in takeable])
            parts.append(f"On the ground you see {listing}.")
        parts.extend(f"There is {obj.display_name} here." for obj in furniture)
        return " ".join(parts)

    def _describe_exits(self, room_name: str, exits: List[Door]) -> str:
        """Describe available exits as "Exits: <direction> (<destination>), ...".

        Entries are sorted so the sentence does not depend on door insertion order.
        """
        if not exits:
            return "There are no visible exits."

        exit_parts = []
        for door in exits:
            # Determine direction and destination from this room's perspective
            if door.room_a == room_name:
                direction = door.direction_from_a.value
                dest_room = self.rooms.get(door.room_b)
            else:
                direction = door.direction_from_b.value
                dest_room = self.rooms.get(door.room_a)
            dest_name = dest_room.display_name if dest_room else "unknown"
            if door.locked:
                exit_parts.append(f"{direction} ({dest_name}, locked)")
            else:
                exit_parts.append(f"{direction} ({dest_name})")

        # Sort for deterministic output
        exit_parts.sort()
        return "Exits: " + ", ".join(exit_parts) + "."

    # =========================================================================
    # Action Enumeration
    # =========================================================================

    def get_available_actions(self, room_name: str,
                              can_speak: bool = True) -> List[str]:
        """
        Get list of available actions for an agent in a room.

        Locked doors contribute no GO action. Object actions are derived from
        the "takeable", "pushable", "openable" and "readable" affordances.

        Returns sorted list of action strings like:
            ["GO NORTH", "GO EAST", "TAKE brass_key", "WAIT", "LOOK"]
        """
        actions = ["LOOK", "WAIT"]

        # Movement actions
        for door in self.get_exits(room_name):
            if door.locked:
                # Could add UNLOCK action here if agent has key
                continue
            if door.room_a == room_name:
                direction = door.direction_from_a.value.upper()
            else:
                direction = door.direction_from_b.value.upper()
            actions.append(f"GO {direction}")

        # Object interactions
        for obj in self.get_objects_in_room(room_name):
            if "takeable" in obj.affordances:
                actions.append(f"TAKE {obj.name}")
            if "pushable" in obj.affordances:
                actions.append(f"PUSH {obj.name} <direction>")
            if "openable" in obj.affordances:
                actions.append(f"OPEN {obj.name}")
            if "readable" in obj.affordances:
                actions.append(f"READ {obj.name}")

        # Speech actions
        if can_speak:
            actions.append("ANNOUNCE '<message>'")
            actions.append("SPEAK '<message>'")
        return sorted(actions)
# =============================================================================
# Factory Functions for Common Scenarios
# =============================================================================
def create_two_room_scenario() -> WorldGraph:
    """
    Build the simple two-room test world.

    Layout:
        +--------+   +--------+
        | Room A |===| Room B |
        | (west) |   | (east) |
        +--------+   +--------+

    Contents:
        - "guard_room" ("the guard room") on the west side, holding a brass key
        - "armory" ("the armory") on the east side
        - one unlocked door at (9, 4), east of the guard room
    """
    graph = WorldGraph()

    # Rooms are registered west to east; bounds are (x, y, width, height).
    graph.add_room(Room(
        name="guard_room",
        display_name="the guard room",
        bounds=(1, 1, 8, 8),
        properties={"lit": True, "atmosphere": "musty"},
        description_template="The air is {atmosphere}.",
    ))
    graph.add_room(Room(
        name="armory",
        display_name="the armory",
        bounds=(11, 1, 8, 8),
        properties={"lit": True, "atmosphere": "cold"},
        description_template="Weapon racks line the walls.",
    ))

    # The door sits in the gap between the two rooms.
    graph.add_door(Door(
        room_a="guard_room",
        room_b="armory",
        position=(9, 4),
        direction_from_a=Direction.EAST,
        locked=False,
    ))

    # The only object: a key lying in the guard room.
    graph.add_object(WorldObject(
        name="brass_key",
        display_name="a brass key",
        room="guard_room",
        position=(3, 3),
        affordances=["takeable", "unlocks:dungeon_door"],
        description="A tarnished brass key with an ornate handle.",
    ))
    return graph
def create_button_door_scenario() -> WorldGraph:
    """
    Build the Phase 1 scenario from issue #154.

    Layout:
        +----------+   +----------+
        | Room A   |   | Room B   |
        | [Button] |===| [Goal]   |
        | Agent A  |   | Agent B  |
        +----------+   +----------+

    - The door between the rooms starts locked
    - The button in Room A is meant to unlock it
    - Agent A can reach the button; Agent B's goal is blocked by the door
    - Success: the agents coordinate to solve the puzzle
    """
    graph = WorldGraph()

    # Two equally sized rooms side by side: button room, then goal room.
    room_specs = (
        ("button_room", "the button room", (1, 1, 8, 8)),
        ("goal_room", "the goal room", (11, 1, 8, 8)),
    )
    for room_id, label, bounds in room_specs:
        graph.add_room(Room(
            name=room_id,
            display_name=label,
            bounds=bounds,
            properties={"lit": True},
        ))

    # Locked door in the gap between the rooms.
    graph.add_door(Door(
        room_a="button_room",
        room_b="goal_room",
        position=(9, 4),
        direction_from_a=Direction.EAST,
        locked=True,
        key_id="button_mechanism",
    ))

    # Button in Room A.
    graph.add_object(WorldObject(
        name="wall_button",
        display_name="a large button on the wall",
        room="button_room",
        position=(2, 4),
        affordances=["pressable", "activates:main_door"],
        description="A heavy stone button protrudes from the wall.",
    ))

    # Goal marker in Room B.
    graph.add_object(WorldObject(
        name="goal_marker",
        display_name="a glowing rune on the floor",
        room="goal_room",
        position=(15, 4),
        affordances=["examinable"],
        description="An arcane symbol pulses with soft light.",
    ))
    return graph
```
---
## File 2: `test_world_graph.py`
```python
"""
Unit tests for WorldGraph
"""
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction,
AgentInfo, create_two_room_scenario, create_button_door_scenario
)
def test_room_contains():
    """Check Room.contains at the corners and just outside a 10x10 room at (5, 5)."""
    room = Room("test", "test room", bounds=(5, 5, 10, 10))

    inside = [(5, 5), (14, 14)]    # top-left tile, last tile before the far edges
    outside = [(15, 15), (4, 5)]   # past the far corner, one tile left of the room
    for x, y in inside:
        assert room.contains(x, y)
    for x, y in outside:
        assert not room.contains(x, y)
    print("PASS: room_contains")
def test_room_at():
    """Check WorldGraph.room_at for tiles in each room and in the gap between them."""
    world = create_two_room_scenario()

    # Guard room is at (1,1) with size (8,8)
    in_guard_room = world.room_at(3, 3)
    assert in_guard_room is not None
    assert in_guard_room.name == "guard_room"

    # Armory is at (11,1) with size (8,8)
    in_armory = world.room_at(13, 3)
    assert in_armory is not None
    assert in_armory.name == "armory"

    # The door tile lies between the rooms, so no room claims it
    assert world.room_at(9, 4) is None
    print("PASS: room_at")
def test_describe_room_basic():
    """Check that the guard room description mentions location, object and exit."""
    world = create_two_room_scenario()
    desc = world.describe_room("guard_room")

    required_fragments = (
        "You are in the guard room",
        "brass key",
        "Exits:",
        "east",
        "armory",
    )
    for fragment in required_fragments:
        assert fragment in desc
    print("PASS: describe_room_basic")
    print(f" Output: {desc}")
def test_describe_room_with_agents():
    """Check that other agents are described and the observer is left out."""
    world = create_two_room_scenario()
    wizard = AgentInfo("Wizard", "a wizard", (3, 3))
    knight = AgentInfo("Knight", "a knight", (4, 4))

    desc = world.describe_room(
        "guard_room",
        visible_agents=[wizard, knight],
        observer_name="Wizard",
    )
    lowered = desc.lower()
    assert "knight" in lowered
    assert "wizard" not in lowered  # Observer excluded
    print("PASS: describe_room_with_agents")
    print(f" Output: {desc}")
def test_describe_locked_door():
    """Check that a locked exit is labelled as locked in the room description."""
    scenario = create_button_door_scenario()
    desc = scenario.describe_room("button_room")
    mentions_lock = "locked" in desc.lower()
    assert mentions_lock
    print("PASS: describe_locked_door")
    print(f" Output: {desc}")
def test_available_actions():
    """Check that the guard room offers movement, pickup and the always-present verbs."""
    world = create_two_room_scenario()
    actions = world.get_available_actions("guard_room")

    for expected in ("GO EAST", "TAKE brass_key", "LOOK", "WAIT"):
        assert expected in actions
    print("PASS: available_actions")
    print(f" Actions: {actions}")
def test_determinism():
    """Check that describing the same room three times yields identical text."""
    world = create_two_room_scenario()
    desc1, desc2, desc3 = (world.describe_room("guard_room") for _ in range(3))
    assert desc1 == desc2 == desc3, "Descriptions must be deterministic!"
    print("PASS: determinism")
def test_direction_opposites():
    """Check Direction.opposite for all four directions."""
    expected_pairs = (
        (Direction.NORTH, Direction.SOUTH),
        (Direction.SOUTH, Direction.NORTH),
        (Direction.EAST, Direction.WEST),
        (Direction.WEST, Direction.EAST),
    )
    for direction, reverse in expected_pairs:
        assert direction.opposite == reverse
    print("PASS: direction_opposites")
def run_all_tests():
    """Run every WorldGraph test in order, framed by a banner; stops at the first failure."""
    rule = "=" * 50
    print(rule)
    print("WorldGraph Unit Tests")
    print(rule)

    suite = (
        test_room_contains,
        test_room_at,
        test_describe_room_basic,
        test_describe_room_with_agents,
        test_describe_locked_door,
        test_available_actions,
        test_determinism,
        test_direction_opposites,
    )
    for test in suite:
        test()

    print(rule)
    print("All tests passed!")
    print(rule)
if __name__ == "__main__":
run_all_tests()
```
---
## Example Output
When `describe_room("guard_room")` is called:
```
You are in the guard room. The air is musty. On the ground you see a brass key.
Exits: east (the armory).
```
When `describe_room("button_room")` with locked door:
```
You are in the button room. There is a large button on the wall here.
Exits: east (the goal room, locked).
```
---
## Success Criteria
- [ ] `Room`, `Door`, `WorldObject` dataclasses defined with all fields
- [ ] `WorldGraph.room_at(x, y)` returns correct room
- [ ] `WorldGraph.describe_room()` produces IF-style prose
- [ ] Descriptions include visible agents, objects, and exits
- [ ] Locked doors are marked as "(locked)" in exit descriptions
- [ ] `get_available_actions()` returns appropriate action list
- [ ] All tests pass
- [ ] Output is deterministic (same input = same output)
---
## Notes for Integration (Hour 3)
The `WorldGraph` will be integrated with the demo by:
1. Creating a scenario using factory functions
2. Calling `world.room_at(*agent.pos)` to get the agent's current room (`None` when the agent stands outside every room, e.g. on a door tile)
3. Calling `world.describe_room()` instead of ad-hoc `build_grounded_prompt()`
4. Including `world.get_available_actions()` in the LLM prompt
The tilemap generation (`generate_tilemap()`) is a stretch goal - the manual tile setup from the current demos works fine for now.

View file

@ -0,0 +1,906 @@
# Hours 3-4: Integration and Multi-Turn Demo
**Issues**: #154, #155, #156 (integration)
**Goal**: Complete turn-based simulation with proper context and logging
**Dependencies**: Hour 1 (Action Parser/Executor), Hour 2 (WorldGraph)
---
## Hour 3: Integration
### Goal
Wire WorldGraph into the demo so agents receive proper IF-style descriptions.
### Deliverables
1. `2_integrated_demo.py` - New demo combining WorldGraph + Action execution
2. Enhanced `ActionExecutor` with room-aware movement
---
### File: `2_integrated_demo.py`
```python
#!/usr/bin/env python3
"""
Integrated VLLM Demo
====================
Combines:
- WorldGraph for structured room descriptions (#155)
- Action parsing and execution (#156)
- Per-agent perspective rendering
This is the foundation for multi-turn simulation.
"""
import mcrfpy
from mcrfpy import automation
import sys
import os
import requests
import base64
from world_graph import WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, create_two_room_scenario
from action_parser import parse_action, ActionType
from action_executor import ActionExecutor
# Configuration
# Chat-completions endpoint that llm_chat_completion() POSTs to.
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
# Directory where the per-turn, per-agent screenshots are written.
SCREENSHOT_DIR = "/tmp/vllm_integrated"
# Sprite constants
# Tile sprite indices assigned to grid cells in setup_scene().
FLOOR_TILE = 0
WALL_TILE = 40
# Sprite indices used for the two agent entities in create_agents().
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
class Agent:
    """Pairs a grid entity with the WorldGraph so it can describe its surroundings."""

    def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
        self.name = name
        self.display_name = display_name
        self.entity = entity
        self.world = world
        self.message_history = []  # For speech system

    @property
    def pos(self) -> tuple:
        """Entity position converted to integer grid coordinates."""
        raw = self.entity.pos
        return (int(raw[0]), int(raw[1]))

    @property
    def current_room(self) -> str:
        """Name of the room at the agent's position, or None if there is none."""
        gx, gy = self.pos
        found = self.world.room_at(gx, gy)
        if not found:
            return None
        return found.name

    def get_context(self, visible_agents: list) -> dict:
        """Build the context dict handed to the LLM query.

        Args:
            visible_agents: Agent objects to convert into AgentInfo records;
                the entry whose name matches this agent is flagged as player.

        Returns:
            Dict with "location", "available_actions" and the last five
            entries of the message history under "recent_messages".
        """
        here = self.current_room
        # Convert to AgentInfo for WorldGraph
        infos = []
        for other in visible_agents:
            infos.append(
                AgentInfo(
                    other.name,
                    other.display_name,
                    other.pos,
                    is_player=(other.name == self.name),
                )
            )
        description = self.world.describe_room(
            here,
            visible_agents=infos,
            observer_name=self.name,
        )
        actions = self.world.get_available_actions(here)
        return {
            "location": description,
            "available_actions": actions,
            "recent_messages": self.message_history[-5:],
        }
def file_to_base64(file_path):
    """Return the raw bytes of ``file_path`` as a base64-encoded text string."""
    with open(file_path, 'rb') as handle:
        raw = handle.read()
    encoded = base64.b64encode(raw)
    return encoded.decode('utf-8')
def llm_chat_completion(messages: list):
    """POST ``messages`` to VLLM_URL and return the decoded JSON reply.

    Returns ``{"error": <text>}`` instead of raising when ``requests``
    reports a RequestException.
    """
    payload = {'messages': messages}
    try:
        return requests.post(VLLM_URL, json=payload, timeout=60).json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def message_with_image(text, image_path):
    """Build a user message holding ``text`` plus the image as a base64 PNG data URL."""
    data_url = "data:image/png;base64," + file_to_base64(image_path)
    text_part = {"type": "text", "text": text}
    image_part = {"type": "image_url", "image_url": {"url": data_url}}
    return {"role": "user", "content": [text_part, image_part]}
def setup_scene(world: WorldGraph):
    """Create the "integrated_demo" scene and tile its grid from ``world``.

    Every cell of the 25x15 grid starts as a wall; room rectangles are then
    carved to floor, and each door cell becomes floor that is walkable only
    when the door is not locked.

    Returns:
        (grid, fov_layer) where fov_layer is a colour layer filled with
        opaque black.
    """
    scene_name = "integrated_demo"
    cols, rows = 25, 15

    mcrfpy.createScene(scene_name)
    mcrfpy.setScene(scene_name)
    ui = mcrfpy.sceneUI(scene_name)
    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)

    # Create grid sized for the world
    grid = mcrfpy.Grid(
        grid_size=(cols, rows),
        texture=texture,
        pos=(5, 5),
        size=(1014, 700),
    )
    grid.fill_color = mcrfpy.Color(20, 20, 30)
    grid.zoom = 2.0
    ui.append(grid)

    def paint(cx, cy, sprite, walkable, transparent):
        # Set all three per-cell properties in one place.
        cell = grid.at(cx, cy)
        cell.tilesprite = sprite
        cell.walkable = walkable
        cell.transparent = transparent

    def in_bounds(cx, cy):
        return 0 <= cx < cols and 0 <= cy < rows

    # Initialize all as walls
    for x in range(cols):
        for y in range(rows):
            paint(x, y, WALL_TILE, False, False)

    # Carve out rooms from WorldGraph
    for room in world.rooms.values():
        for rx in range(room.x, room.x + room.width):
            for ry in range(room.y, room.y + room.height):
                if in_bounds(rx, ry):
                    paint(rx, ry, FLOOR_TILE, True, True)

    # Place doors
    for door in world.doors:
        dx, dy = door.position
        if in_bounds(dx, dy):
            paint(dx, dy, FLOOR_TILE, not door.locked, True)

    # Create FOV layer
    fov_layer = grid.add_layer('color', z_index=10)
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
    return grid, fov_layer
def create_agents(grid, world: WorldGraph, texture) -> list:
    """Create the Wizard and Knight entities at the centres of their starting rooms.

    Each entity is appended to ``grid.entities`` and wrapped in an Agent.
    """
    # (starting room key, sprite index, agent name, display name)
    roster = (
        ("guard_room", WIZARD_SPRITE, "Wizard", "a wizard"),
        ("armory", KNIGHT_SPRITE, "Knight", "a knight"),
    )
    agents = []
    for room_key, sprite, name, display_name in roster:
        start_room = world.rooms[room_key]
        entity = mcrfpy.Entity(
            grid_pos=start_room.center,
            texture=texture,
            sprite_index=sprite,
        )
        grid.entities.append(entity)
        agents.append(Agent(name, display_name, entity, world))
    return agents
def switch_perspective(grid, fov_layer, agent):
    """Re-fog the layer for ``agent`` and centre the grid view on it."""
    # Start from a fully opaque black layer.
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
    palette = {
        "visible": mcrfpy.Color(0, 0, 0, 0),
        "discovered": mcrfpy.Color(40, 40, 60, 180),
        "unknown": mcrfpy.Color(0, 0, 0, 255),
    }
    fov_layer.apply_perspective(entity=agent.entity, **palette)
    agent.entity.update_visibility()
    # Cell coordinate -> pixel centre of that 16px cell.
    col, row = agent.pos
    grid.center = (col * 16 + 8, row * 16 + 8)
def get_visible_agents(grid, observer, all_agents) -> list:
    """Return the agents other than ``observer`` whose position the grid reports as in FOV."""
    return [
        other
        for other in all_agents
        if other.name != observer.name and grid.is_in_fov(*other.pos)
    ]
def query_agent_llm(agent, screenshot_path, context) -> str:
    """Query VLLM for agent's action.

    Args:
        agent: Agent whose ``display_name`` is placed in the system prompt.
        screenshot_path: Image file attached to the user message.
        context: Dict providing "location" and "available_actions".

    Returns:
        The reply text, "[VLLM Error: ...]" when the request helper reports
        an error, or "No response" when the reply carries no usable content.
    """
    system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game.
You see the world through screenshots and receive text descriptions.
Your goal is to explore and interact with your environment.
Always end your response with a clear action declaration: "Action: <ACTION>"
"""
    # Build the user prompt with WorldGraph context
    actions_str = ", ".join(context["available_actions"])
    user_prompt = f"""{context["location"]}
Available actions: {actions_str}
Look at the screenshot showing your current view. The dark areas are outside your field of vision.
What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action.
Example: "I see a key on the ground that might be useful. Action: TAKE brass_key"
"""
    messages = [
        {"role": "system", "content": system_prompt},
        message_with_image(user_prompt, screenshot_path)
    ]
    resp = llm_chat_completion(messages)
    if "error" in resp:
        return f"[VLLM Error: {resp['error']}]"
    # An empty "choices" list would make [0] raise IndexError, and a null
    # "content" would hand None to callers that slice and parse the reply,
    # so both fall back to the same placeholder as a missing key.
    choices = resp.get('choices') or [{}]
    message = choices[0].get('message') or {}
    content = message.get('content')
    return 'No response' if content is None else content
def run_single_turn(grid, fov_layer, agents, executor, turn_num):
    """Run one turn: each agent is rendered, queried, parsed and executed in order.

    Returns:
        One dict per agent with the keys "agent", "context", "response",
        "action" and "result".
    """
    print(f"\n{'='*70}")
    print(f"TURN {turn_num}")
    print("=" * 70)

    turn_log = []
    for actor in agents:
        print(f"\n--- {actor.name}'s Turn ---")
        print(f"Position: {actor.pos} | Room: {actor.current_room}")

        # Switch perspective
        switch_perspective(grid, fov_layer, actor)
        mcrfpy.step(0.016)

        # Screenshot
        shot_name = f"turn{turn_num}_{actor.name.lower()}.png"
        shot_path = os.path.join(SCREENSHOT_DIR, shot_name)
        automation.screenshot(shot_path)

        # Get context using WorldGraph
        others_in_view = get_visible_agents(grid, actor, agents)
        context = actor.get_context(others_in_view + [actor])  # Include self for filtering
        print(f"Context: {context['location']}")
        print(f"Actions: {context['available_actions']}")

        # Query LLM
        print(f"\nQuerying VLLM...")
        response = query_agent_llm(actor, shot_path, context)
        print(f"Response: {response[:200]}...")

        # Parse and execute
        action = parse_action(response)
        print(f"Parsed: {action.type.value} {action.args}")
        result = executor.execute(actor, action)
        outcome_label = 'SUCCESS' if result.success else 'FAILED'
        print(f"Result: {outcome_label} - {result.message}")

        record = {
            "agent": actor.name,
            "context": context,
            "response": response,
            "action": action,
            "result": result,
        }
        turn_log.append(record)
    return turn_log
def run_demo():
    """Build the two-room world, run a single integrated turn, and return True."""
    rule = "=" * 70
    print(rule)
    print("Integrated WorldGraph + Action Demo")
    print(rule)

    os.makedirs(SCREENSHOT_DIR, exist_ok=True)

    # Create world from WorldGraph, then the scene that renders it
    world = create_two_room_scenario()
    grid, fov_layer = setup_scene(world)

    # Create agents
    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
    agents = create_agents(grid, world, texture)

    # Create executor and run one turn
    executor = ActionExecutor(grid)
    run_single_turn(grid, fov_layer, agents, executor, turn_num=1)

    print("\n" + rule)
    print("Demo Complete")
    print(rule)
    return True
if __name__ == "__main__":
    import traceback

    # Exit 0 when the demo reports success; 1 on failure or any exception.
    try:
        exit_code = 0 if run_demo() else 1
    except Exception:
        traceback.print_exc()
        exit_code = 1
    sys.exit(exit_code)
```
---
## Hour 4: Multi-Turn Demo
### Goal
Run multiple turns with simulation logging for replay.
### Deliverables
1. `turn_orchestrator.py` - Turn management and logging
2. `3_multi_turn_demo.py` - Complete multi-turn simulation
3. `simulation_log.json` - Saved output for replay
---
### File: `turn_orchestrator.py`
```python
"""
Turn Orchestrator
=================
Manages multi-turn simulation with logging for replay.
"""
import json
import os
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional
from datetime import datetime
from world_graph import WorldGraph
from action_parser import Action, ActionType, parse_action
from action_executor import ActionExecutor, ActionResult
@dataclass
class SimulationStep:
    """Record of one agent's turn.

    Fields annotated as tuples are normalised back to tuples on
    construction, so a step rebuilt from JSON (which only has arrays and
    therefore decodes to lists) compares equal to the step that was saved.
    """
    turn: int
    agent_id: str
    agent_position: tuple
    room: str
    perception: Dict[str, Any]  # Context shown to LLM
    llm_response: str  # Raw LLM output
    parsed_action_type: str  # Action type as string
    parsed_action_args: tuple  # Action arguments
    result_success: bool
    result_message: str
    new_position: Optional[tuple] = None
    path: Optional[List[tuple]] = None  # For animation
    timestamp: str = ""

    def __post_init__(self):
        # Stamp the step with the creation time unless one was supplied
        # (e.g. when the step is being reloaded from a saved log).
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()
        # Undo the tuple -> list conversion introduced by a JSON round trip.
        self.agent_position = self._as_tuple(self.agent_position)
        self.parsed_action_args = self._as_tuple(self.parsed_action_args)
        self.new_position = self._as_tuple(self.new_position)
        if isinstance(self.path, list):
            self.path = [self._as_tuple(point) for point in self.path]

    @staticmethod
    def _as_tuple(value):
        """Convert a list to a tuple; leave None, tuples and anything else untouched."""
        return tuple(value) if isinstance(value, list) else value
@dataclass
class SimulationLog:
    """Complete simulation record: run metadata plus the ordered list of steps."""
    metadata: Dict[str, Any]
    steps: List[SimulationStep]

    def save(self, path: str):
        """Write the log to ``path`` as indented JSON.

        Values the JSON encoder cannot handle natively are written via
        ``str()``.
        """
        serialised_steps = []
        for step in self.steps:
            serialised_steps.append(asdict(step))
        payload = {
            "metadata": self.metadata,
            "steps": serialised_steps,
        }
        with open(path, 'w') as out:
            json.dump(payload, out, indent=2, default=str)

    @classmethod
    def load(cls, path: str) -> 'SimulationLog':
        """Read a JSON file produced by ``save`` and rebuild the log."""
        with open(path) as src:
            raw = json.load(src)
        rebuilt = []
        for entry in raw["steps"]:
            rebuilt.append(SimulationStep(**entry))
        return cls(metadata=raw["metadata"], steps=rebuilt)
class TurnOrchestrator:
    """
    Orchestrates multi-turn simulation.

    Handles:
    - Turn sequencing
    - Perspective switching
    - LLM queries
    - Action execution
    - Simulation logging
    """

    def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
                 screenshot_dir: str, llm_query_fn):
        """Store collaborators and make sure the screenshot directory exists.

        Args:
            grid: Grid the agents live on; also handed to ActionExecutor.
            fov_layer: Colour layer re-fogged for each agent's perspective.
            world: WorldGraph whose room names go into the log metadata.
            agents: Agents that act, in this order, every turn.
            screenshot_dir: Directory for per-turn screenshots (created here).
            llm_query_fn: Callable(agent, screenshot_path, context) -> str.
        """
        self.grid = grid
        self.fov_layer = fov_layer
        self.world = world
        self.agents = agents
        self.screenshot_dir = screenshot_dir
        self.llm_query_fn = llm_query_fn  # Function to query LLM
        self.executor = ActionExecutor(grid)
        self.turn_number = 0
        self.steps: List[SimulationStep] = []
        os.makedirs(screenshot_dir, exist_ok=True)

    def run_turn(self) -> List[SimulationStep]:
        """Execute one full turn (all agents act once).

        Returns:
            The steps recorded this turn; they are also appended to
            ``self.steps``.
        """
        self.turn_number += 1
        turn_steps = []
        for agent in self.agents:
            step = self._run_agent_turn(agent)
            turn_steps.append(step)
            self.steps.append(step)
        return turn_steps

    def run_simulation(self, max_turns: int = 10,
                       stop_condition=None) -> SimulationLog:
        """
        Run complete simulation.

        Args:
            max_turns: Maximum number of turns to run
            stop_condition: Optional callable(orchestrator) -> bool

        Returns:
            SimulationLog with all steps
        """
        print(f"\nStarting simulation: max {max_turns} turns")
        print("=" * 50)
        for turn in range(max_turns):
            print(f"\n--- Turn {turn + 1}/{max_turns} ---")
            self.run_turn()
            # Check stop condition (evaluated after the turn has run)
            if stop_condition and stop_condition(self):
                print(f"Stop condition met at turn {turn + 1}")
                break
        # Create log
        log = SimulationLog(
            metadata={
                "total_turns": self.turn_number,
                "num_agents": len(self.agents),
                "agent_names": [a.name for a in self.agents],
                "timestamp": datetime.now().isoformat(),
                "world_rooms": list(self.world.rooms.keys()),
            },
            steps=self.steps
        )
        return log

    def _run_agent_turn(self, agent) -> SimulationStep:
        """Execute one agent's turn and return its log record.

        The recorded ``agent_position`` and ``room`` describe where the
        agent was when it perceived the world and chose its action; the
        outcome of the action is carried by ``new_position`` and ``path``.
        """
        # Imported here rather than at module level, as in the original.
        from mcrfpy import automation
        import mcrfpy
        # Switch perspective
        self._switch_perspective(agent)
        mcrfpy.step(0.016)
        # Screenshot
        screenshot_path = os.path.join(
            self.screenshot_dir,
            f"turn{self.turn_number}_{agent.name.lower()}.png"
        )
        automation.screenshot(screenshot_path)
        # Build context
        visible_agents = self._get_visible_agents(agent)
        context = agent.get_context(visible_agents + [agent])
        # Snapshot position and room BEFORE the action runs. Reading them
        # after execute() would log the post-action state (presumably the
        # executor moves the entity on a successful GO), which would no
        # longer match the perception recorded for this step.
        start_position = agent.pos
        start_room = agent.current_room
        # Query LLM
        llm_response = self.llm_query_fn(agent, screenshot_path, context)
        # Parse and execute
        action = parse_action(llm_response)
        result = self.executor.execute(agent, action)
        # Log
        print(f" {agent.name}: {action.type.value} -> {result.message}")
        return SimulationStep(
            turn=self.turn_number,
            agent_id=agent.name,
            agent_position=start_position,
            room=start_room,
            perception=context,
            llm_response=llm_response,
            parsed_action_type=action.type.value,
            parsed_action_args=action.args,
            result_success=result.success,
            result_message=result.message,
            new_position=result.new_position,
            path=result.path
        )

    def _switch_perspective(self, agent):
        """Switch grid view to agent's perspective."""
        import mcrfpy
        # Reset the layer to opaque black before applying this agent's view.
        self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
        self.fov_layer.apply_perspective(
            entity=agent.entity,
            visible=mcrfpy.Color(0, 0, 0, 0),
            discovered=mcrfpy.Color(40, 40, 60, 180),
            unknown=mcrfpy.Color(0, 0, 0, 255)
        )
        agent.entity.update_visibility()
        # Cell coordinate -> pixel centre of that 16px cell.
        px, py = agent.pos
        self.grid.center = (px * 16 + 8, py * 16 + 8)

    def _get_visible_agents(self, observer) -> list:
        """Get agents visible to observer (the observer itself is excluded)."""
        visible = []
        for agent in self.agents:
            if agent.name == observer.name:
                continue
            ax, ay = agent.pos
            if self.grid.is_in_fov(ax, ay):
                visible.append(agent)
        return visible
```
---
### File: `3_multi_turn_demo.py`
```python
#!/usr/bin/env python3
"""
Multi-Turn Simulation Demo
==========================
Runs multiple turns of agent interaction with full logging.
This is the Phase 1 implementation from issue #154.
"""
import mcrfpy
from mcrfpy import automation
import sys
import os
import requests
import base64
from world_graph import create_two_room_scenario, AgentInfo
from action_parser import parse_action
from action_executor import ActionExecutor
from turn_orchestrator import TurnOrchestrator, SimulationLog
# Configuration
# Chat-completions endpoint that llm_query() POSTs to.
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
# Directory handed to TurnOrchestrator for per-turn screenshots.
SCREENSHOT_DIR = "/tmp/vllm_multi_turn"
# Where run_demo() saves the simulation log.
LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json"
# Upper bound on turns passed to run_simulation().
MAX_TURNS = 5
# Sprites
# Tile sprite indices assigned to grid cells in setup_scene().
FLOOR_TILE = 0
WALL_TILE = 40
# Sprite indices used for the two agent entities in create_agents().
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
class Agent:
    """Grid entity plus the WorldGraph used to describe what it can see and do."""

    def __init__(self, name, display_name, entity, world):
        self.name = name
        self.display_name = display_name
        self.entity = entity
        self.world = world
        self.message_history = []

    @property
    def pos(self):
        """Entity position converted to integer grid coordinates."""
        raw = self.entity.pos
        return (int(raw[0]), int(raw[1]))

    @property
    def current_room(self):
        """Name of the room at the agent's position, or None if there is none."""
        gx, gy = self.pos
        found = self.world.room_at(gx, gy)
        if not found:
            return None
        return found.name

    def get_context(self, visible_agents):
        """Return the location text, available actions and last five messages."""
        here = self.current_room
        infos = []
        for other in visible_agents:
            infos.append(
                AgentInfo(
                    other.name,
                    other.display_name,
                    other.pos,
                    is_player=(other.name == self.name),
                )
            )
        description = self.world.describe_room(here, infos, self.name)
        actions = self.world.get_available_actions(here)
        return {
            "location": description,
            "available_actions": actions,
            "recent_messages": self.message_history[-5:],
        }
def file_to_base64(path):
    """Read ``path`` as bytes and return them base64-encoded as text."""
    with open(path, 'rb') as src:
        payload = src.read()
    return base64.b64encode(payload).decode('utf-8')
def llm_query(agent, screenshot_path, context) -> str:
    """Ask the VLLM endpoint what ``agent`` does next.

    Sends a system prompt plus one user message containing the room text,
    the available actions and the screenshot as a base64 PNG data URL.
    Any exception raised while posting or decoding the reply is returned
    as an "[Error: ...]" string instead of propagating.
    """
    system = f"""You are {agent.display_name} exploring a dungeon.
You receive visual and text information about your surroundings.
Always end with: Action: <YOUR_ACTION>"""
    actions_str = ", ".join(context["available_actions"])
    user = f"""{context["location"]}
Available: {actions_str}
[Screenshot attached showing your view]
What do you do? Brief reasoning, then Action: <action>"""

    # The screenshot is read outside the try block, so a missing file
    # still raises to the caller.
    text_part = {"type": "text", "text": user}
    image_part = {
        "type": "image_url",
        "image_url": {
            "url": "data:image/png;base64," + file_to_base64(screenshot_path)
        },
    }
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": [text_part, image_part]},
    ]

    try:
        data = requests.post(VLLM_URL, json={'messages': messages}, timeout=60).json()
        if "error" in data:
            return f"[Error: {data['error']}]"
        first_choice = data.get('choices', [{}])[0]
        return first_choice.get('message', {}).get('content', 'No response')
    except Exception as e:
        return f"[Error: {e}]"
def setup_scene(world):
    """Create the "multi_turn" scene and tile its 25x15 grid from ``world``.

    Cells start as walls, room rectangles become floor, and door cells
    become floor that is walkable only when the door is not locked.

    Returns:
        (grid, fov_layer, texture); fov_layer is a colour layer filled
        with opaque black.
    """
    scene_name = "multi_turn"
    width, height = 25, 15

    mcrfpy.createScene(scene_name)
    mcrfpy.setScene(scene_name)
    ui = mcrfpy.sceneUI(scene_name)
    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
    grid = mcrfpy.Grid(
        grid_size=(width, height),
        texture=texture,
        pos=(5, 5),
        size=(1014, 700),
    )
    grid.fill_color = mcrfpy.Color(20, 20, 30)
    grid.zoom = 2.0
    ui.append(grid)

    def set_cell(cx, cy, sprite, walkable, transparent):
        # Apply the three per-cell properties together.
        cell = grid.at(cx, cy)
        cell.tilesprite = sprite
        cell.walkable = walkable
        cell.transparent = transparent

    # Walls everywhere first
    for x, y in ((cx, cy) for cx in range(width) for cy in range(height)):
        set_cell(x, y, WALL_TILE, False, False)

    # Carve rooms
    for room in world.rooms.values():
        for rx in range(room.x, room.x + room.width):
            for ry in range(room.y, room.y + room.height):
                if 0 <= rx < width and 0 <= ry < height:
                    set_cell(rx, ry, FLOOR_TILE, True, True)

    # Place doors
    for door in world.doors:
        dx, dy = door.position
        if 0 <= dx < width and 0 <= dy < height:
            set_cell(dx, dy, FLOOR_TILE, not door.locked, True)

    # FOV layer
    fov_layer = grid.add_layer('color', z_index=10)
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
    return grid, fov_layer, texture
def create_agents(grid, world, texture):
    """Spawn the Wizard (guard_room) and Knight (armory) at their room centres."""
    # (starting room key, sprite index, agent name, display name)
    spawn_table = (
        ("guard_room", WIZARD_SPRITE, "Wizard", "a wizard"),
        ("armory", KNIGHT_SPRITE, "Knight", "a knight"),
    )
    agents = []
    for room_key, sprite, name, display_name in spawn_table:
        home = world.rooms[room_key]
        entity = mcrfpy.Entity(grid_pos=home.center, texture=texture, sprite_index=sprite)
        grid.entities.append(entity)
        agents.append(Agent(name, display_name, entity, world))
    return agents
def run_demo():
    """Run multi-turn simulation.

    Builds the two-room world, runs up to MAX_TURNS turns through a
    TurnOrchestrator, saves the log to LOG_PATH and prints a per-agent
    summary.

    Returns:
        True once the simulation has run and the log has been saved.
    """
    print("=" * 70)
    print("Multi-Turn Simulation Demo")
    print(f"Running {MAX_TURNS} turns with 2 agents")
    print("=" * 70)
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    # Setup
    world = create_two_room_scenario()
    grid, fov_layer, texture = setup_scene(world)
    agents = create_agents(grid, world, texture)
    # Create orchestrator
    orchestrator = TurnOrchestrator(
        grid=grid,
        fov_layer=fov_layer,
        world=world,
        agents=agents,
        screenshot_dir=SCREENSHOT_DIR,
        llm_query_fn=llm_query
    )
    # Run simulation
    log = orchestrator.run_simulation(max_turns=MAX_TURNS)
    # Save log
    log.save(LOG_PATH)
    print(f"\nSimulation log saved to: {LOG_PATH}")
    # Summary
    print("\n" + "=" * 70)
    print("SIMULATION SUMMARY")
    print("=" * 70)
    print(f"Total turns: {log.metadata['total_turns']}")
    print(f"Total steps: {len(log.steps)}")
    # Per-agent summary
    for agent_name in log.metadata['agent_names']:
        agent_steps = [s for s in log.steps if s.agent_id == agent_name]
        successes = sum(1 for s in agent_steps if s.result_success)
        print(f"\n{agent_name}:")
        print(f" Actions: {len(agent_steps)}")
        print(f" Successful: {successes}")
        # An agent has no steps when zero turns were run; indexing [-1]
        # would raise IndexError, so the final-position line is skipped.
        if agent_steps:
            last_step = agent_steps[-1]
            print(f" Final position: {last_step.new_position or last_step.agent_position}")
    return True
if __name__ == "__main__":
    import traceback

    # Exit 0 when the demo reports success; 1 on failure or any exception.
    try:
        passed = run_demo()
        print("\nPASS" if passed else "\nFAIL")
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0 if passed else 1)
```
---
## Success Criteria
### Hour 3 Integration
- [ ] WorldGraph generates scene tiles correctly
- [ ] Agents receive IF-style room descriptions from WorldGraph
- [ ] Available actions list appears in LLM prompt
- [ ] Actions are parsed and executed
- [ ] Single turn completes successfully
### Hour 4 Multi-Turn
- [ ] TurnOrchestrator cycles through all agents
- [ ] Multiple turns run sequentially
- [ ] SimulationLog captures all steps
- [ ] Log saves to JSON correctly
- [ ] Log can be loaded back
- [ ] Summary shows agent actions and positions
---
## Example Output
```
======================================================================
Multi-Turn Simulation Demo
Running 5 turns with 2 agents
======================================================================
Starting simulation: max 5 turns
==================================================
--- Turn 1/5 ---
Wizard: GO EAST -> Moved east to (6, 4)
Knight: WAIT -> Waited and observed surroundings
--- Turn 2/5 ---
Wizard: GO EAST -> Moved east to (7, 4)
Knight: GO WEST -> Moved west to (14, 4)
[... more turns ...]
======================================================================
SIMULATION SUMMARY
======================================================================
Total turns: 5
Total steps: 10
Wizard:
Actions: 5
Successful: 4
Final position: (9, 4)
Knight:
Actions: 5
Successful: 3
Final position: (11, 4)
Simulation log saved to: /tmp/vllm_multi_turn/simulation_log.json
PASS
```
---
## Next Steps (Future Sessions)
After Hours 3-4 are complete:
1. **Speech System** - Add ANNOUNCE/SPEAK actions with message passing
2. **Button-Door Puzzle** - Use `create_button_door_scenario()` for coordination test
3. **Animated Replay** - Play back simulation with movement animations
4. **NPC Behaviors** - Add scripted entities (patrol, flee, etc.)
5. **Affordance Learning** - Track what agents discover about objects

View file

@ -0,0 +1,399 @@
#!/usr/bin/env python3
"""
Integrated VLLM Demo
====================
Combines:
- WorldGraph for structured room descriptions (#155)
- Action parsing and execution (#156)
- Per-agent perspective rendering
This is the foundation for multi-turn simulation.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import requests
import base64
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
create_two_room_scenario
)
from action_parser import parse_action, ActionType
from action_executor import ActionExecutor
# Configuration
# Chat-completions endpoint that llm_chat_completion() POSTs to.
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
# Directory where the per-turn, per-agent screenshots are written.
SCREENSHOT_DIR = "/tmp/vllm_integrated"
# Sprite constants
# Tile sprite indices assigned to grid cells in setup_scene_from_world().
FLOOR_TILE = 0
WALL_TILE = 40
# Sprite indices used for the two agent entities in create_agents().
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
class Agent:
    """Grid entity bundled with the WorldGraph that describes its surroundings."""

    def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
        self.name = name
        self.display_name = display_name
        self.entity = entity
        self.world = world
        self.message_history = []  # For speech system (future)

    @property
    def pos(self) -> tuple:
        """Entity position converted to integer grid coordinates."""
        raw = self.entity.pos
        return (int(raw[0]), int(raw[1]))

    @property
    def current_room(self) -> str:
        """Name of the room at the agent's position, or None if there is none."""
        gx, gy = self.pos
        found = self.world.room_at(gx, gy)
        if not found:
            return None
        return found.name

    def get_context(self, visible_agents: list) -> dict:
        """
        Assemble the context dict for an LLM query.

        Args:
            visible_agents: Agent objects to describe; the entry whose name
                matches this agent is flagged ``is_player``.

        Returns:
            Dict with "location", "available_actions" and the last five
            messages under "recent_messages".
        """
        here = self.current_room

        def to_info(other):
            # Convert an Agent object to the AgentInfo record WorldGraph expects.
            return AgentInfo(
                name=other.name,
                display_name=other.display_name,
                position=other.pos,
                is_player=(other.name == self.name),
            )

        infos = [to_info(other) for other in visible_agents]
        description = self.world.describe_room(
            here,
            visible_agents=infos,
            observer_name=self.name,
        )
        actions = self.world.get_available_actions(here)
        return {
            "location": description,
            "available_actions": actions,
            "recent_messages": self.message_history[-5:],
        }
def file_to_base64(file_path):
    """Read an image (or any) file and return its bytes as base64 text."""
    with open(file_path, 'rb') as handle:
        blob = handle.read()
    return base64.b64encode(blob).decode('utf-8')
def llm_chat_completion(messages: list):
    """POST ``messages`` to VLLM_URL and return the decoded JSON reply.

    When ``requests`` raises a RequestException the error text is returned
    as ``{"error": <text>}`` instead of propagating.
    """
    body = {'messages': messages}
    try:
        return requests.post(VLLM_URL, json=body, timeout=60).json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def message_with_image(text, image_path):
    """Return a user message combining ``text`` with the image as a base64 PNG data URL."""
    encoded_image = file_to_base64(image_path)
    parts = [
        {"type": "text", "text": text},
        {"type": "image_url", "image_url": {"url": "data:image/png;base64," + encoded_image}},
    ]
    return {"role": "user", "content": parts}
def setup_scene_from_world(world: WorldGraph):
    """
    Build the "integrated_demo" McRogueFace scene from a WorldGraph.

    The 25x15 grid starts as solid wall; every room rectangle is carved to
    floor and every door cell becomes floor that is walkable only when the
    door is not locked.

    Returns:
        (grid, fov_layer, texture); fov_layer is a colour layer filled
        with opaque black.
    """
    scene_name = "integrated_demo"
    cols, rows = 25, 15

    mcrfpy.createScene(scene_name)
    mcrfpy.setScene(scene_name)
    ui = mcrfpy.sceneUI(scene_name)
    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)

    # Create grid sized for the world (with margin)
    grid = mcrfpy.Grid(
        grid_size=(cols, rows),
        texture=texture,
        pos=(5, 5),
        size=(1014, 700),
    )
    grid.fill_color = mcrfpy.Color(20, 20, 30)
    grid.zoom = 2.0
    ui.append(grid)

    def paint(cx, cy, sprite, walkable, transparent):
        # Set all three per-cell properties in one place.
        cell = grid.at(cx, cy)
        cell.tilesprite = sprite
        cell.walkable = walkable
        cell.transparent = transparent

    def inside(cx, cy):
        return 0 <= cx < cols and 0 <= cy < rows

    # Initialize all tiles as walls
    for x in range(cols):
        for y in range(rows):
            paint(x, y, WALL_TILE, False, False)

    # Carve out rooms from WorldGraph
    for room in world.rooms.values():
        for rx in range(room.x, room.x + room.width):
            for ry in range(room.y, room.y + room.height):
                if inside(rx, ry):
                    paint(rx, ry, FLOOR_TILE, True, True)

    # Place doors (carve corridor between rooms)
    for door in world.doors:
        dx, dy = door.position
        if inside(dx, dy):
            paint(dx, dy, FLOOR_TILE, not door.locked, True)

    # Create FOV layer for fog of war
    fov_layer = grid.add_layer('color', z_index=10)
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
    return grid, fov_layer, texture
def create_agents(grid, world: WorldGraph, texture) -> list:
    """Create the Wizard and Knight entities at the centres of their starting rooms.

    Each entity is appended to ``grid.entities`` and wrapped in an Agent.
    """
    # (starting room key, sprite index, agent name, display name)
    roster = (
        ("guard_room", WIZARD_SPRITE, "Wizard", "a wizard"),   # Agent A
        ("armory", KNIGHT_SPRITE, "Knight", "a knight"),       # Agent B
    )
    agents = []
    for room_key, sprite, name, display_name in roster:
        start_room = world.rooms[room_key]
        entity = mcrfpy.Entity(
            grid_pos=start_room.center,
            texture=texture,
            sprite_index=sprite,
        )
        grid.entities.append(entity)
        agents.append(Agent(name, display_name, entity, world))
    return agents
def switch_perspective(grid, fov_layer, agent):
    """Re-fog the layer for ``agent`` and centre the grid view on it."""
    # Reset fog layer to all unknown (black)
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))

    # Apply this agent's perspective
    fog_colors = {
        "visible": mcrfpy.Color(0, 0, 0, 0),
        "discovered": mcrfpy.Color(40, 40, 60, 180),
        "unknown": mcrfpy.Color(0, 0, 0, 255),
    }
    fov_layer.apply_perspective(entity=agent.entity, **fog_colors)

    # Update visibility from agent's position
    agent.entity.update_visibility()

    # Center camera on this agent (cell coordinate -> pixel centre of a 16px cell)
    col, row = agent.pos
    grid.center = (col * 16 + 8, row * 16 + 8)
def get_visible_agents(grid, observer, all_agents) -> list:
    """Return every agent except ``observer`` whose position ``grid.is_in_fov`` accepts."""

    def seen(candidate):
        # The observer is never listed among the agents it sees.
        if candidate.name == observer.name:
            return False
        cx, cy = candidate.pos
        return bool(grid.is_in_fov(cx, cy))

    return [candidate for candidate in all_agents if seen(candidate)]
def query_agent_llm(agent, screenshot_path, context) -> str:
    """
    Query VLLM for agent's action using WorldGraph context.

    This uses the structured context from WorldGraph instead of
    ad-hoc grounded prompts.

    Args:
        agent: Agent whose ``display_name`` is placed in the system prompt.
        screenshot_path: Image file attached to the user message.
        context: Dict providing "location" and "available_actions".

    Returns:
        The reply text, "[VLLM Error: ...]" when the request helper reports
        an error, or "No response" when the reply carries no usable content.
    """
    system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game.
You see the world through screenshots and receive text descriptions.
Your goal is to explore and interact with your environment.
Always end your response with a clear action declaration: "Action: <ACTION>"
"""
    # Build the user prompt with WorldGraph context
    actions_str = ", ".join(context["available_actions"])
    user_prompt = f"""{context["location"]}
Available actions: {actions_str}
Look at the screenshot showing your current view. The dark areas are outside your field of vision.
What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action.
Example: "I see a key on the ground that might be useful. Action: TAKE brass_key"
"""
    messages = [
        {"role": "system", "content": system_prompt},
        message_with_image(user_prompt, screenshot_path)
    ]
    resp = llm_chat_completion(messages)
    if "error" in resp:
        return f"[VLLM Error: {resp['error']}]"
    # An empty "choices" list would make [0] raise IndexError, and a null
    # "content" would hand None to callers that slice and parse the reply,
    # so both fall back to the same placeholder as a missing key.
    choices = resp.get('choices') or [{}]
    message = choices[0].get('message') or {}
    content = message.get('content')
    return 'No response' if content is None else content
def run_single_turn(grid, fov_layer, agents, executor, turn_num):
    """
    Run one turn, letting each agent act once in list order.

    For every agent the view is switched to its perspective, a screenshot
    is taken, the WorldGraph context is built, the LLM is queried, and the
    parsed action is executed.

    Returns:
        One dict per agent with the keys "agent", "room", "context",
        "response", "action" and "result".
    """
    print(f"\n{'='*70}")
    print(f"TURN {turn_num}")
    print("=" * 70)

    turn_log = []
    for actor in agents:
        print(f"\n--- {actor.name}'s Turn ---")
        print(f"Position: {actor.pos} | Room: {actor.current_room}")

        # Switch perspective to this agent
        switch_perspective(grid, fov_layer, actor)
        mcrfpy.step(0.016)

        # Take screenshot
        shot_name = f"turn{turn_num}_{actor.name.lower()}.png"
        shot_path = os.path.join(SCREENSHOT_DIR, shot_name)
        automation.screenshot(shot_path)
        print(f"Screenshot: {shot_path}")

        # Get context using WorldGraph
        others_in_view = get_visible_agents(grid, actor, agents)
        context = actor.get_context(others_in_view + [actor])  # Include self for filtering
        print(f"\nContext from WorldGraph:")
        print(f" Location: {context['location']}")
        print(f" Actions: {context['available_actions']}")

        # Query LLM
        print(f"\nQuerying VLLM...")
        response = query_agent_llm(actor, shot_path, context)
        preview = response[:300]
        ellipsis = '...' if len(response) > 300 else ''
        print(f"Response: {preview}{ellipsis}")

        # Parse and execute action
        action = parse_action(response)
        print(f"\nParsed: {action.type.value} {action.args}")
        result = executor.execute(actor, action)
        if result.success:
            status = "SUCCESS"
        else:
            status = "FAILED"
        print(f"Result: {status} - {result.message}")

        record = {
            "agent": actor.name,
            "room": actor.current_room,  # read after the action has executed
            "context": context,
            "response": response,
            "action": action,
            "result": result,
        }
        turn_log.append(record)
    return turn_log
def run_demo():
    """Build the two-room world, run one integrated turn, print a summary, return True."""
    rule = "=" * 70
    print(rule)
    print("Integrated WorldGraph + Action Demo")
    print(rule)

    os.makedirs(SCREENSHOT_DIR, exist_ok=True)

    # Create world from WorldGraph factory
    print("\nCreating world from WorldGraph...")
    world = create_two_room_scenario()
    print(f" Rooms: {list(world.rooms.keys())}")
    print(f" Doors: {len(world.doors)}")
    print(f" Objects: {list(world.objects.keys())}")

    # Setup scene from WorldGraph
    print("\nSetting up scene...")
    grid, fov_layer, texture = setup_scene_from_world(world)

    # Create agents
    print("\nCreating agents...")
    agents = create_agents(grid, world, texture)
    for member in agents:
        print(f" {member.name} at {member.pos} in {member.current_room}")

    # Create executor and run one turn
    executor = ActionExecutor(grid)
    results = run_single_turn(grid, fov_layer, agents, executor, turn_num=1)

    # Summary
    print("\n" + rule)
    print("TURN SUMMARY")
    print(rule)
    for entry in results:
        outcome = entry["result"]
        status = "OK" if outcome.success else "FAIL"
        print(f" {entry['agent']}: {entry['action'].type.value} -> {status}")
        if outcome.new_position:
            print(f" New position: {outcome.new_position}")

    print("\n" + rule)
    print("Demo Complete")
    print(rule)
    return True
if __name__ == "__main__":
    import traceback

    # Exit 0 when the demo reports success; 1 on failure or any exception.
    try:
        passed = run_demo()
        print("\nPASS" if passed else "\nFAIL")
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0 if passed else 1)

View file

@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""
Multi-Turn Simulation Demo
==========================
Runs multiple turns of agent interaction with full logging.
This is the Phase 1 implementation from issue #154.
Two agents start in separate rooms and can move, observe,
and (in future versions) communicate to solve puzzles.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import requests
import base64
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
create_two_room_scenario, create_button_door_scenario
)
from action_parser import parse_action
from action_executor import ActionExecutor
from turn_orchestrator import TurnOrchestrator, SimulationLog
# Configuration
# Chat-completions endpoint that llm_query() POSTs to (hard-coded host/port).
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
# Directory handed to TurnOrchestrator for per-turn screenshots.
SCREENSHOT_DIR = "/tmp/vllm_multi_turn"
# Where run_demo() saves the simulation log; also the default for --replay.
LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json"
# Upper bound on turns passed to TurnOrchestrator.run_simulation().
MAX_TURNS = 5
# Sprites
# Sprite indices used with the texture loaded in setup_scene().
FLOOR_TILE = 0
WALL_TILE = 40
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
class Agent:
    """A named entity wrapper that can describe its surroundings via a WorldGraph."""

    def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
        self.name = name
        self.display_name = display_name
        self.entity = entity
        self.world = world
        self.message_history = []

    @property
    def pos(self) -> tuple:
        """Entity position as an (x, y) tuple of ints."""
        x_coord = int(self.entity.pos[0])
        y_coord = int(self.entity.pos[1])
        return (x_coord, y_coord)

    @property
    def current_room(self) -> str:
        """Name of the room at the agent's position, or None if there is none."""
        found = self.world.room_at(*self.pos)
        if not found:
            return None
        return found.name

    def get_context(self, visible_agents: list) -> dict:
        """Assemble the text context handed to the LLM for this agent.

        Args:
            visible_agents: Agents to describe; each is converted to an
                AgentInfo, flagged as the player when its name matches ours.

        Returns:
            Dict with "location", "available_actions" and the last five
            entries of the message history under "recent_messages".
        """
        room_name = self.current_room

        infos = []
        for other in visible_agents:
            infos.append(
                AgentInfo(
                    name=other.name,
                    display_name=other.display_name,
                    position=other.pos,
                    is_player=(other.name == self.name),
                )
            )

        location_text = self.world.describe_room(room_name, infos, self.name)
        action_list = self.world.get_available_actions(room_name)
        return {
            "location": location_text,
            "available_actions": action_list,
            "recent_messages": self.message_history[-5:],
        }
def file_to_base64(path: str) -> str:
    """Read the file at *path* as bytes and return its base64 text form."""
    with open(path, 'rb') as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def llm_query(agent, screenshot_path: str, context: dict) -> str:
    """Ask the VLLM endpoint which action *agent* should take.

    This function is passed to TurnOrchestrator as the LLM query callback.

    Args:
        agent: Object exposing ``display_name`` (used in the system prompt).
        screenshot_path: PNG file to attach to the request as a data URL.
        context: Dict with "location" (text) and "available_actions"
            (iterable of strings).

    Returns:
        The model's reply text. On failure a bracketed diagnostic string is
        returned instead of raising, so the simulation can keep going:
        "[VLLM Error: ...]" when the server reports an error,
        "[Connection Error: ...]" when the request or decoding fails, and
        "No response" when the reply carries no usable content.
    """
    system_prompt = f"""You are {agent.display_name} exploring a dungeon.
You receive visual and text information about your surroundings.
Your goal is to explore, find items, and interact with the environment.
Always end your response with: Action: <YOUR_ACTION>"""
    actions_str = ", ".join(context["available_actions"])
    user_prompt = f"""{context["location"]}
Available actions: {actions_str}
[Screenshot attached showing your current view - dark areas are outside your vision]
What do you do? Brief reasoning (1-2 sentences), then Action: <action>"""
    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt},
                {"type": "image_url", "image_url": {
                    "url": "data:image/png;base64," + file_to_base64(screenshot_path)
                }}
            ]
        }
    ]
    try:
        resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
        data = resp.json()
        if "error" in data:
            return f"[VLLM Error: {data['error']}]"
        # An empty or missing "choices" list, a missing "message", or a null
        # "content" all mean "nothing usable came back"; report that instead
        # of raising IndexError (previously mislabelled as a connection
        # error) or returning None to a caller that expects a string.
        choices = data.get('choices') or [{}]
        message = choices[0].get('message') or {}
        content = message.get('content')
        return content if content is not None else 'No response'
    except Exception as e:
        # Deliberately broad: the demo treats any request/decoding failure as
        # a failed turn rather than aborting the whole simulation.
        return f"[Connection Error: {e}]"
def setup_scene(world: WorldGraph):
    """Build the "multi_turn" scene and a tile grid matching *world*.

    The grid starts as solid wall, then room rectangles and door tiles are
    opened up. A fully opaque black color layer is added on top for FOV.

    Returns:
        Tuple of (grid, fov_layer, texture).
    """
    scene_name = "multi_turn"
    mcrfpy.createScene(scene_name)
    mcrfpy.setScene(scene_name)
    ui = mcrfpy.sceneUI(scene_name)

    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
    cols, rows = 25, 15
    grid = mcrfpy.Grid(
        grid_size=(cols, rows),
        texture=texture,
        pos=(5, 5),
        size=(1014, 700)
    )
    grid.fill_color = mcrfpy.Color(20, 20, 30)
    grid.zoom = 2.0
    ui.append(grid)

    def stamp(tx, ty, sprite, walkable, transparent):
        """Set sprite, walkability and transparency of one grid cell."""
        cell = grid.at(tx, ty)
        cell.tilesprite = sprite
        cell.walkable = walkable
        cell.transparent = transparent

    # Everything is wall until carved out below.
    for tx in range(cols):
        for ty in range(rows):
            stamp(tx, ty, WALL_TILE, False, False)

    # Open up each room rectangle, ignoring tiles that fall off the grid.
    for room in world.rooms.values():
        for tx in range(room.x, room.x + room.width):
            for ty in range(room.y, room.y + room.height):
                if not (0 <= tx < cols and 0 <= ty < rows):
                    continue
                stamp(tx, ty, FLOOR_TILE, True, True)

    # Doors are see-through floor tiles; locked ones cannot be walked on.
    for door in world.doors:
        door_x, door_y = door.position
        if 0 <= door_x < cols and 0 <= door_y < rows:
            stamp(door_x, door_y, FLOOR_TILE, not door.locked, True)

    fov_layer = grid.add_layer('color', z_index=10)
    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
    return grid, fov_layer, texture
def create_agents(grid, world: WorldGraph, texture) -> list:
    """Spawn the wizard and the knight at the centers of their start rooms.

    Each entity is appended to ``grid.entities`` and wrapped in an Agent.

    Returns:
        List of Agent objects in the order [Wizard, Knight].
    """
    # (agent name, display name, starting room key, sprite index)
    roster = (
        ("Wizard", "a wizard", "guard_room", WIZARD_SPRITE),
        ("Knight", "a knight", "armory", KNIGHT_SPRITE),
    )

    created = []
    for agent_name, shown_as, room_key, sprite in roster:
        start_room = world.rooms[room_key]
        entity = mcrfpy.Entity(
            grid_pos=start_room.center,
            texture=texture,
            sprite_index=sprite
        )
        grid.entities.append(entity)
        created.append(Agent(agent_name, shown_as, entity, world))
    return created
def run_demo():
    """Run the multi-turn simulation end to end and save its log.

    Builds the world, scene, agents and a TurnOrchestrator, runs up to
    MAX_TURNS turns, writes the log to LOG_PATH and prints a summary.

    Returns:
        True once the simulation has run and the log has been saved.
    """
    banner = "=" * 70

    print(banner)
    print("Multi-Turn Simulation Demo")
    print(f"Running up to {MAX_TURNS} turns with 2 agents")
    print(banner)

    os.makedirs(SCREENSHOT_DIR, exist_ok=True)

    print("\nCreating world...")
    world = create_two_room_scenario()
    print(f" Rooms: {list(world.rooms.keys())}")
    print(f" Objects: {list(world.objects.keys())}")

    print("\nSetting up scene...")
    grid, fov_layer, texture = setup_scene(world)

    print("\nCreating agents...")
    agents = create_agents(grid, world, texture)
    for member in agents:
        print(f" {member.name} at {member.pos} in {member.current_room}")

    orchestrator = TurnOrchestrator(
        grid=grid,
        fov_layer=fov_layer,
        world=world,
        agents=agents,
        screenshot_dir=SCREENSHOT_DIR,
        llm_query_fn=llm_query
    )

    # Optional stop condition; currently not passed to run_simulation below.
    def agents_met(orch):
        """Stop when agents are in the same room."""
        return orch.agents_in_same_room()

    log = orchestrator.run_simulation(
        max_turns=MAX_TURNS,
        stop_condition=None  # Or use agents_met for early stopping
    )
    log.save(LOG_PATH)

    print("\n" + banner)
    print(log.summary())
    print(banner)

    print("\nFinal Agent Positions:")
    for member in agents:
        print(f" {member.name}: {member.pos} in {member.current_room}")

    print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/")
    print(f"Simulation log saved to: {LOG_PATH}")
    return True
def replay_log(log_path: str):
    """Print a saved simulation: its summary, then every step grouped by turn.

    Args:
        log_path: Path of a JSON log readable by SimulationLog.load().
    """
    print(f"Loading simulation from: {log_path}")
    log = SimulationLog.load(log_path)

    print("\n" + log.summary())
    print("\nTurn-by-Turn Replay:")
    print("-" * 50)

    last_turn = 0
    for entry in log.steps:
        # Emit a heading whenever the turn number changes.
        if entry.turn != last_turn:
            last_turn = entry.turn
            print(f"\n=== Turn {last_turn} ===")

        if entry.result_success:
            status = "OK"
        else:
            status = "FAIL"

        print(f" {entry.agent_id}: {entry.parsed_action_type} {entry.parsed_action_args}")
        print(f" {status}: {entry.result_message}")
        if entry.new_position:
            print(f" Moved to: {entry.new_position}")
if __name__ == "__main__":
    cli_args = sys.argv[1:]

    # Replay mode: "--replay [LOG_FILE]" prints a saved log and exits.
    if cli_args and cli_args[0] == "--replay":
        if len(cli_args) > 1:
            log_file = cli_args[1]
        else:
            log_file = LOG_PATH
        replay_log(log_file)
        sys.exit(0)

    # Normal execution: exit 0 on success, 1 on failure or any exception.
    try:
        demo_ok = run_demo()
        if demo_ok:
            print("\nPASS")
            exit_code = 0
        else:
            print("\nFAIL")
            exit_code = 1
        sys.exit(exit_code)
    except Exception:
        import traceback
        traceback.print_exc()
        sys.exit(1)

View file

@ -0,0 +1,136 @@
"""
Action Executor for McRogueFace
===============================
Executes parsed actions in the game world.
Handles movement, collision detection, and action results.
"""
from dataclasses import dataclass
from typing import Optional, List, Tuple
from action_parser import Action, ActionType
@dataclass
class ActionResult:
    """Outcome of executing a single action."""

    # Whether the action took effect.
    success: bool
    # Human-readable explanation of what happened (or why it failed).
    message: str
    # Tile the agent ended up on; set by successful movement, otherwise None.
    new_position: Optional[Tuple[int, int]] = None
    # Tiles traversed, start first; kept for animation replay.
    path: Optional[List[Tuple[int, int]]] = None
class ActionExecutor:
    """Execute parsed actions against a McRogueFace grid.

    Movement is the only action with a real effect on the world; WAIT and
    LOOK succeed without changing anything, TAKE and DROP are placeholders
    that always fail, and every other action type is reported as not yet
    implemented.
    """

    # (dx, dy) grid offset per canonical direction name; NORTH decreases y.
    DIRECTION_VECTORS = {
        'NORTH': (0, -1),
        'SOUTH': (0, 1),
        'EAST': (1, 0),
        'WEST': (-1, 0),
    }

    def __init__(self, grid):
        """
        Initialize executor with a grid reference.

        Args:
            grid: mcrfpy.Grid instance
        """
        self.grid = grid

    def execute(self, agent, action: Action) -> ActionResult:
        """
        Execute an action for an agent.

        Args:
            agent: Agent wrapper with .entity attribute
            action: Parsed Action to execute

        Returns:
            ActionResult with success status and message
        """
        handlers = {
            ActionType.GO: self._execute_go,
            ActionType.WAIT: self._execute_wait,
            ActionType.LOOK: self._execute_look,
            ActionType.TAKE: self._execute_take,
            ActionType.DROP: self._execute_drop,
            ActionType.INVALID: self._execute_invalid,
        }
        # Action types without a dedicated handler fall through to a
        # uniform "not yet implemented" failure.
        handler = handlers.get(action.type, self._execute_unimplemented)
        return handler(agent, action)

    def _execute_go(self, agent, action: Action) -> ActionResult:
        """Move the agent one tile in ``action.args[0]`` if nothing blocks it.

        Fails (without moving) when no direction is given, the direction is
        unknown, the target is off the grid, the target cell is not
        walkable, or another entity already occupies the target tile.
        """
        if not action.args or not action.args[0]:
            return ActionResult(False, "No direction specified")
        direction = action.args[0]
        if direction not in self.DIRECTION_VECTORS:
            return ActionResult(False, f"Invalid direction: {direction}")
        dx, dy = self.DIRECTION_VECTORS[direction]

        # Get current position
        current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
        new_x, new_y = current_x + dx, current_y + dy

        # Check bounds
        grid_w, grid_h = self.grid.grid_size
        if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
            return ActionResult(False, f"Cannot go {direction} - edge of map")

        # Check walkability
        target_cell = self.grid.at(new_x, new_y)
        if not target_cell.walkable:
            return ActionResult(False, f"Cannot go {direction} - path blocked")

        # Check for entity collision (optional - depends on game rules)
        for entity in self.grid.entities:
            if entity is agent.entity:
                continue
            ex, ey = int(entity.pos[0]), int(entity.pos[1])
            if ex == new_x and ey == new_y:
                return ActionResult(False, f"Cannot go {direction} - someone is there")

        # Execute movement
        agent.entity.pos = (new_x, new_y)
        return ActionResult(
            success=True,
            message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
            new_position=(new_x, new_y),
            path=[(current_x, current_y), (new_x, new_y)]
        )

    def _execute_wait(self, agent, action: Action) -> ActionResult:
        """Execute wait action (no-op)."""
        return ActionResult(True, "Waited and observed surroundings")

    def _execute_look(self, agent, action: Action) -> ActionResult:
        """Execute look action; the message names the target when one is given."""
        target = action.args[0] if action.args else None
        if target:
            return ActionResult(True, f"Examined {target} closely")
        return ActionResult(True, "Looked around carefully")

    def _execute_take(self, agent, action: Action) -> ActionResult:
        """Execute take action (placeholder that always fails)."""
        item = action.args[0] if action.args else "unknown"
        # TODO: Implement inventory system
        return ActionResult(False, f"Cannot take {item} - not implemented yet")

    def _execute_drop(self, agent, action: Action) -> ActionResult:
        """Execute drop action (placeholder that always fails)."""
        item = action.args[0] if action.args else "unknown"
        return ActionResult(False, f"Cannot drop {item} - not implemented yet")

    def _execute_invalid(self, agent, action: Action) -> ActionResult:
        """Handle invalid/unparseable action."""
        # Action.args defaults to an empty tuple, so an INVALID action built
        # without a debug snippet must not raise IndexError here.
        snippet = action.args[0] if action.args else ""
        return ActionResult(False, f"Could not understand action: {snippet}")

    def _execute_unimplemented(self, agent, action: Action) -> ActionResult:
        """Handle unimplemented action types."""
        return ActionResult(False, f"Action {action.type.value} not yet implemented")

View file

@ -0,0 +1,118 @@
"""
Action Parser for LLM Agent Responses
=====================================
Extracts structured actions from free-form LLM text responses.
Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc.
"""
import re
from dataclasses import dataclass
from typing import Optional, Tuple, Any
from enum import Enum
class ActionType(Enum):
    """Closed set of action verbs the parser can emit."""

    GO = "GO"
    WAIT = "WAIT"
    LOOK = "LOOK"
    TAKE = "TAKE"
    DROP = "DROP"
    PUSH = "PUSH"
    USE = "USE"
    OPEN = "OPEN"
    CLOSE = "CLOSE"
    ANNOUNCE = "ANNOUNCE"
    SPEAK = "SPEAK"
    # Emitted when no pattern matches the response.
    INVALID = "INVALID"


@dataclass
class Action:
    """A parsed action: its type, positional arguments, and the matched text."""

    type: ActionType
    # One entry per capture group of the matching pattern; an optional group
    # that did not participate is stored as None.
    args: Tuple[Any, ...] = ()
    # The (upper-cased) substring that produced this action; "" for INVALID.
    raw_match: str = ""


class ActionParser:
    """Parse LLM responses into structured actions."""

    # Direction normalization
    DIRECTIONS = {
        'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST',
        'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST',
        'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST',
    }

    # Compass words only. Used by the loose fallback patterns, where
    # accepting LEFT/RIGHT/UP/DOWN would misfire on ordinary prose such as
    # "go right ahead".
    _CARDINAL = r'(NORTH|SOUTH|EAST|WEST|N|S|E|W)'
    # Compass words plus the UP/DOWN/LEFT/RIGHT aliases that DIRECTIONS
    # normalizes. Only used behind the explicit "Action:" prefix.
    _ANY_DIRECTION = r'(NORTH|SOUTH|EAST|WEST|UP|DOWN|LEFT|RIGHT|N|S|E|W)'

    # Patterns ordered by specificity (most specific first).
    # Each entry is (action type, regex, number of capture groups to extract).
    PATTERNS = [
        # Explicit "Action: X" format (preferred)
        (ActionType.GO, r'Action:\s*GO\s+' + _ANY_DIRECTION + r'\b', 1),
        (ActionType.WAIT, r'Action:\s*WAIT\b', 0),
        (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1),
        (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1),
        (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1),
        # The trailing \b keeps a single-letter direction from matching the
        # first letter of an unrelated word ("PUSH box now" is not NORTH).
        (ActionType.PUSH, r'Action:\s*PUSH\s+(\w+)\s+' + _ANY_DIRECTION + r'\b', 2),
        (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2),
        (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1),
        (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1),
        (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1),
        (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1),
        # Fallback patterns (less strict)
        (ActionType.GO, r'\bGO\s+' + _CARDINAL + r'\b', 1),
        (ActionType.GO, r'\bmove\s+' + _CARDINAL + r'\b', 1),
        (ActionType.GO, r'\bhead\s+' + _CARDINAL + r'\b', 1),
        (ActionType.WAIT, r'\bWAIT\b', 0),
        (ActionType.LOOK, r'\bLOOK\b', 0),
    ]

    def parse(self, llm_response: str) -> Action:
        """
        Parse an LLM response and extract the action.

        Patterns are tried in PATTERNS order and the first one that matches
        anywhere in the text wins. Matching is done on an upper-cased copy
        of the response, so every extracted argument is upper case.

        Returns Action with type=INVALID if no valid action found; its
        single argument is the first 100 characters of the response.
        """
        if not isinstance(llm_response, str):
            # A query callback may hand back None (or another non-string)
            # when the model produced nothing; treat that as unparseable
            # instead of crashing on .upper().
            shown = "" if llm_response is None else str(llm_response)
            return Action(type=ActionType.INVALID, args=(shown[:100],), raw_match="")

        # Normalize to uppercase for matching
        text = llm_response.upper()
        for action_type, pattern, num_groups in self.PATTERNS:
            # IGNORECASE is still needed: some patterns contain lowercase
            # literals ("move", "head") that must match the uppercase text.
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                args = self._extract_args(match, num_groups, action_type)
                return Action(
                    type=action_type,
                    args=args,
                    raw_match=match.group(0)
                )

        # No valid action found
        return Action(
            type=ActionType.INVALID,
            args=(llm_response[:100],),  # First 100 chars for debugging
            raw_match=""
        )

    def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple:
        """Extract and normalize arguments from regex match.

        Returns one entry per capture group (None for a group that did not
        participate). Direction groups, meaning every GO group and the
        second PUSH group, are mapped through DIRECTIONS to their canonical
        names.
        """
        if num_groups == 0:
            return ()
        args = []
        for i in range(1, num_groups + 1):
            group = match.group(i)
            if group:
                # Normalize directions
                if action_type == ActionType.GO or (action_type == ActionType.PUSH and i == 2):
                    group = self.DIRECTIONS.get(group.upper(), group.upper())
                args.append(group)
            else:
                args.append(None)
        return tuple(args)


# Convenience function
def parse_action(llm_response: str) -> Action:
    """Parse an LLM response into an Action."""
    return ActionParser().parse(llm_response)

View file

@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
Unit tests for action_parser.py
===============================
Tests the ActionParser's ability to extract structured actions
from various LLM response formats.
"""
import sys
from action_parser import parse_action, ActionType
def test_explicit_go_directions():
    """Each cardinal 'Action: GO <direction>' yields GO with that direction."""
    for compass in ("NORTH", "SOUTH", "EAST", "WEST"):
        parsed = parse_action("Action: GO " + compass)
        assert parsed.type == ActionType.GO
        assert parsed.args == (compass,)
    print(" [PASS] Explicit GO directions")
def test_short_directions():
    """Single-letter directions (N, S, E, W) expand to their full names."""
    expansions = {"N": "NORTH", "S": "SOUTH", "E": "EAST", "W": "WEST"}
    for letter, full_name in expansions.items():
        assert parse_action("Action: GO " + letter).args == (full_name,)
    print(" [PASS] Short direction abbreviations")
def test_case_insensitivity():
    """Letter case of the response does not affect the parsed action type."""
    spellings = (
        "action: go south",
        "ACTION: GO SOUTH",
        "Action: Go South",
        "action: GO south",
    )
    for text in spellings:
        assert parse_action(text).type == ActionType.GO
    print(" [PASS] Case insensitivity")
def test_fallback_patterns():
    """Movement phrased without the 'Action:' prefix is still recognized."""
    phrases = (
        # "go" inside natural language
        "I think I'll GO WEST to explore",
        "I'll GO NORTH",
        "Let me GO EAST",
        # "move" / "head" synonyms
        "I should move NORTH",
        "Let me head SOUTH",
    )
    for text in phrases:
        assert parse_action(text).type == ActionType.GO
    print(" [PASS] Fallback patterns")
def test_wait_action():
    """WAIT is recognized both explicitly and inside a sentence."""
    for text in ("Action: WAIT", "I'll WAIT here", "Let me WAIT and see"):
        assert parse_action(text).type == ActionType.WAIT
    print(" [PASS] WAIT action")
def test_look_action():
    """LOOK parses with or without an 'AT <target>' clause."""
    bare = parse_action("Action: LOOK")
    assert bare.type == ActionType.LOOK

    targeted = parse_action("Action: LOOK AT door")
    assert targeted.type == ActionType.LOOK
    assert targeted.args == ("DOOR",)
    print(" [PASS] LOOK action")
def test_invalid_actions():
    """Text containing no recognizable action is flagged INVALID."""
    nonsense = (
        "I'm not sure what to do",
        "Let me think about this...",
        "The weather is nice today",
    )
    for text in nonsense:
        parsed = parse_action(text)
        assert parsed.type == ActionType.INVALID
    print(" [PASS] Invalid action detection")
def test_raw_match_capture():
    """raw_match holds the portion of the response that matched."""
    parsed = parse_action("After thinking, Action: GO NORTH is best")
    matched_text = parsed.raw_match
    assert "GO NORTH" in matched_text
    print(" [PASS] Raw match capture")
def test_embedded_actions():
    """An 'Action:' line at the end of a multi-line response is extracted."""
    long_response = """
Looking at the screenshot, I can see I'm in a dungeon corridor.
There's a rat to the east and a wall to the north.
The path south appears clear.
I think the best course of action is to investigate the rat.
Action: GO EAST
"""
    parsed = parse_action(long_response)
    assert parsed.type == ActionType.GO
    assert parsed.args == ("EAST",)
    print(" [PASS] Embedded action extraction")
def test_complex_actions():
    """Object-directed verbs (TAKE, DROP, USE, OPEN, CLOSE) map to their types."""
    # TAKE also checks that the item name comes back upper-cased.
    take = parse_action("Action: TAKE sword")
    assert take.type == ActionType.TAKE
    assert take.args == ("SWORD",)

    expectations = (
        ("Action: DROP shield", ActionType.DROP),
        ("Action: USE key", ActionType.USE),
        ("Action: USE key ON door", ActionType.USE),
        ("Action: OPEN chest", ActionType.OPEN),
        ("Action: CLOSE door", ActionType.CLOSE),
    )
    for text, wanted in expectations:
        assert parse_action(text).type == wanted
    print(" [PASS] Complex action types")
def test_push_action():
    """PUSH returns (object, direction) with the direction normalized."""
    cases = (
        ("Action: PUSH boulder NORTH", ("BOULDER", "NORTH")),
        ("Action: PUSH box E", ("BOX", "EAST")),
    )
    for text, wanted_args in cases:
        parsed = parse_action(text)
        assert parsed.type == ActionType.PUSH
        assert parsed.args == wanted_args
    print(" [PASS] PUSH action")
def test_speak_announce_actions():
    """SPEAK and ANNOUNCE capture the quoted message."""
    spoken = parse_action('Action: SPEAK "Hello there!"')
    assert spoken.type == ActionType.SPEAK
    # The parser upper-cases the whole response before matching.
    assert spoken.args[0] == "HELLO THERE!"

    announced = parse_action("Action: ANNOUNCE 'Watch out!'")
    assert announced.type == ActionType.ANNOUNCE
    print(" [PASS] SPEAK/ANNOUNCE actions")
def run_all_tests():
    """Run every parser test, print a tally, and report overall success.

    Returns:
        True when no test failed or errored, False otherwise.
    """
    rule = "=" * 60
    print(rule)
    print("Action Parser Tests")
    print(rule)

    suite = [
        test_explicit_go_directions,
        test_short_directions,
        test_case_insensitivity,
        test_fallback_patterns,
        test_wait_action,
        test_look_action,
        test_invalid_actions,
        test_raw_match_capture,
        test_embedded_actions,
        test_complex_actions,
        test_push_action,
        test_speak_announce_actions,
    ]

    passed = 0
    failed = 0
    for case in suite:
        try:
            case()
        except AssertionError as err:
            print(f" [FAIL] {case.__name__}: {err}")
            failed += 1
        except Exception as err:
            # Anything other than a failed assertion is reported as an error.
            print(f" [ERROR] {case.__name__}: {err}")
            failed += 1
        else:
            passed += 1

    print(rule)
    print(f"Results: {passed} passed, {failed} failed")
    print(rule)
    return failed == 0
if __name__ == "__main__":
    # Exit status mirrors the test outcome: 0 when all passed, else 1.
    all_passed = run_all_tests()
    if all_passed:
        sys.exit(0)
    else:
        sys.exit(1)

View file

@ -0,0 +1,139 @@
"""
Unit tests for WorldGraph
"""
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction,
AgentInfo, create_two_room_scenario, create_button_door_scenario
)
def test_room_contains():
    """Room.contains covers its rectangle and nothing outside it."""
    room = Room("test", "test room", bounds=(5, 5, 10, 10))
    probes = (
        (5, 5, True),     # Top-left corner
        (14, 14, True),   # Last tile inside; x=15 / y=15 are already outside
        (15, 15, False),  # Outside
        (4, 5, False),    # Just outside left
    )
    for px, py, expected in probes:
        assert room.contains(px, py) == expected
    print("PASS: room_contains")
def test_room_at():
    """WorldGraph.room_at maps tiles to rooms and returns None between them."""
    world = create_two_room_scenario()

    # Tile expected to lie inside the guard room.
    left_room = world.room_at(3, 3)
    assert left_room is not None
    assert left_room.name == "guard_room"

    # Tile expected to lie inside the armory.
    right_room = world.room_at(13, 3)
    assert right_room is not None
    assert right_room.name == "armory"

    # Between rooms (the door area) - should return None
    gap = world.room_at(9, 4)
    assert gap is None
    print("PASS: room_at")
def test_describe_room_basic():
    """The guard room description mentions the room, its key and its exit."""
    world = create_two_room_scenario()
    desc = world.describe_room("guard_room")
    required_fragments = (
        "You are in the guard room",
        "brass key",
        "Exits:",
        "east",
        "armory",
    )
    for fragment in required_fragments:
        assert fragment in desc
    print("PASS: describe_room_basic")
    print(f" Output: {desc}")
def test_describe_room_with_agents():
    """Other agents are described; the observer itself is left out."""
    world = create_two_room_scenario()
    wizard = AgentInfo("Wizard", "a wizard", (3, 3))
    knight = AgentInfo("Knight", "a knight", (4, 4))
    desc = world.describe_room(
        "guard_room",
        visible_agents=[wizard, knight],
        observer_name="Wizard",
    )
    lowered = desc.lower()
    assert "knight" in lowered
    assert "wizard" not in lowered  # Observer excluded
    print("PASS: describe_room_with_agents")
    print(f" Output: {desc}")
def test_describe_locked_door():
    """The button room description mentions that its door is locked."""
    scenario = create_button_door_scenario()
    desc = scenario.describe_room("button_room")
    lowered = desc.lower()
    assert "locked" in lowered
    print("PASS: describe_locked_door")
    print(f" Output: {desc}")
def test_available_actions():
    """The guard room offers movement, the key pickup, LOOK and WAIT."""
    world = create_two_room_scenario()
    actions = world.get_available_actions("guard_room")
    for wanted in ("GO EAST", "TAKE brass_key", "LOOK", "WAIT"):
        assert wanted in actions
    print("PASS: available_actions")
    print(f" Actions: {actions}")
def test_determinism():
    """Describing the same room three times yields identical text."""
    world = create_two_room_scenario()
    first = world.describe_room("guard_room")
    second = world.describe_room("guard_room")
    third = world.describe_room("guard_room")
    assert first == second == third, "Descriptions must be deterministic!"
    print("PASS: determinism")
def test_direction_opposites():
    """Direction.opposite flips north/south and east/west."""
    pairs = (
        (Direction.NORTH, Direction.SOUTH),
        (Direction.SOUTH, Direction.NORTH),
        (Direction.EAST, Direction.WEST),
        (Direction.WEST, Direction.EAST),
    )
    for heading, expected in pairs:
        assert heading.opposite == expected
    print("PASS: direction_opposites")
def run_all_tests():
    """Run all WorldGraph tests in order; the first failure propagates."""
    rule = "=" * 50
    print(rule)
    print("WorldGraph Unit Tests")
    print(rule)

    ordered_tests = (
        test_room_contains,
        test_room_at,
        test_describe_room_basic,
        test_describe_room_with_agents,
        test_describe_locked_door,
        test_available_actions,
        test_determinism,
        test_direction_opposites,
    )
    for check in ordered_tests:
        check()

    print(rule)
    print("All tests passed!")
    print(rule)
if __name__ == "__main__":
    # Running the module directly executes the whole suite; a failing
    # assertion surfaces as an uncaught AssertionError.
    run_all_tests()

View file

@ -0,0 +1,301 @@
"""
Turn Orchestrator
=================
Manages multi-turn simulation with logging for replay.
Coordinates perspective switching, LLM queries, and action execution.
"""
import json
import os
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from world_graph import WorldGraph, AgentInfo
from action_parser import Action, ActionType, parse_action
from action_executor import ActionExecutor, ActionResult
@dataclass
class SimulationStep:
    """Record of one agent's turn."""

    # Turn number and the acting agent's name.
    turn: int
    agent_id: str
    # Agent position and room name as captured when the record is built.
    agent_position: tuple
    room: str
    # Context shown to the LLM.
    perception: Dict[str, Any]
    # Raw LLM output.
    llm_response: str
    # Parsed action: type as a string, plus its arguments.
    parsed_action_type: str
    parsed_action_args: tuple
    # Outcome reported by the executor.
    result_success: bool
    result_message: str
    new_position: Optional[tuple] = None
    # Tiles traversed, for animation replay.
    path: Optional[List[tuple]] = None
    # ISO-8601 creation time of this record (local clock).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@dataclass
class SimulationLog:
    """Complete simulation record for replay and analysis."""

    metadata: Dict[str, Any]
    steps: List[SimulationStep] = field(default_factory=list)

    def save(self, path: str):
        """Write metadata and all steps to *path* as indented JSON."""
        serialised_steps = []
        for entry in self.steps:
            serialised_steps.append(asdict(entry))
        payload = {
            "metadata": self.metadata,
            "steps": serialised_steps,
        }
        with open(path, 'w') as out_file:
            # default=str stringifies any value json cannot encode natively.
            json.dump(payload, out_file, indent=2, default=str)
        print(f"Simulation log saved to: {path}")

    @classmethod
    def load(cls, path: str) -> 'SimulationLog':
        """Read a log written by save() and rebuild its steps.

        JSON has no tuple type, so list-valued positions, action arguments
        and path points are converted back to tuples.
        """
        with open(path) as in_file:
            raw = json.load(in_file)

        tuple_fields = ("agent_position", "new_position", "parsed_action_args")
        rebuilt = []
        for record in raw["steps"]:
            for key in tuple_fields:
                if isinstance(record.get(key), list):
                    record[key] = tuple(record[key])
            if record.get("path"):
                record["path"] = [tuple(point) for point in record["path"]]
            rebuilt.append(SimulationStep(**record))
        return cls(metadata=raw["metadata"], steps=rebuilt)

    def get_agent_steps(self, agent_name: str) -> List[SimulationStep]:
        """Return the steps recorded for the agent named *agent_name*."""
        matching = []
        for entry in self.steps:
            if entry.agent_id == agent_name:
                matching.append(entry)
        return matching

    def get_turn_steps(self, turn: int) -> List[SimulationStep]:
        """Return the steps recorded during turn number *turn*."""
        matching = []
        for entry in self.steps:
            if entry.turn == turn:
                matching.append(entry)
        return matching

    def summary(self) -> str:
        """Build a multi-line text summary with per-agent statistics."""
        agent_names = self.metadata.get('agent_names', [])
        total_turns = self.metadata.get('total_turns', 'unknown')

        lines = [
            "Simulation Summary",
            "==================",
            f"Total turns: {total_turns}",
            f"Total steps: {len(self.steps)}",
            f"Agents: {', '.join(agent_names)}",
            "",
        ]

        # Per-agent stats
        for agent_name in self.metadata.get('agent_names', []):
            own_steps = self.get_agent_steps(agent_name)
            successes = 0
            for entry in own_steps:
                if entry.result_success:
                    successes += 1
            lines.append(f"{agent_name}:")
            lines.append(f" Actions: {len(own_steps)}")
            lines.append(f" Successful: {successes}")
            if own_steps:
                final = own_steps[-1]
                # Prefer the move target; fall back to the recorded position.
                final_pos = final.new_position or final.agent_position
                lines.append(f" Final position: {final_pos}")
                lines.append(f" Final room: {final.room}")
            lines.append("")
        return "\n".join(lines)
class TurnOrchestrator:
    """
    Orchestrates multi-turn simulation.

    Handles:
    - Turn sequencing
    - Perspective switching
    - LLM queries
    - Action execution
    - Simulation logging
    """

    def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
                 screenshot_dir: str, llm_query_fn: Callable):
        """
        Initialize orchestrator.

        Creates ``screenshot_dir`` if it does not exist yet.

        Args:
            grid: mcrfpy.Grid instance
            fov_layer: Color layer for FOV rendering
            world: WorldGraph instance
            agents: List of Agent objects
            screenshot_dir: Directory for screenshots
            llm_query_fn: Function(agent, screenshot_path, context) -> str
        """
        self.grid = grid
        self.fov_layer = fov_layer
        self.world = world
        self.agents = agents
        self.screenshot_dir = screenshot_dir
        self.llm_query_fn = llm_query_fn
        self.executor = ActionExecutor(grid)
        self.turn_number = 0
        self.steps: List[SimulationStep] = []
        os.makedirs(screenshot_dir, exist_ok=True)

    def run_turn(self) -> List[SimulationStep]:
        """
        Execute one full turn (all agents act once, in list order).

        Returns list of SimulationSteps for this turn; the same steps are
        also appended to ``self.steps``.
        """
        self.turn_number += 1
        turn_steps = []
        print(f"\n{'='*60}")
        print(f"TURN {self.turn_number}")
        print("=" * 60)
        for agent in self.agents:
            step = self._run_agent_turn(agent)
            turn_steps.append(step)
            self.steps.append(step)
        return turn_steps

    def run_simulation(self, max_turns: int = 10,
                       stop_condition: Callable = None) -> SimulationLog:
        """
        Run complete simulation.

        Args:
            max_turns: Maximum number of turns to run
            stop_condition: Optional callable(orchestrator) -> bool,
                checked after each turn. Returns True to stop simulation
                early.

        Returns:
            SimulationLog with all steps
        """
        print(f"\nStarting simulation: max {max_turns} turns")
        print(f"Agents: {[a.name for a in self.agents]}")
        print("=" * 60)
        for _ in range(max_turns):
            self.run_turn()
            # Check stop condition
            if stop_condition and stop_condition(self):
                print(f"\nStop condition met at turn {self.turn_number}")
                break

        # Create log
        log = SimulationLog(
            metadata={
                "total_turns": self.turn_number,
                "num_agents": len(self.agents),
                "agent_names": [a.name for a in self.agents],
                "timestamp": datetime.now().isoformat(),
                "world_rooms": list(self.world.rooms.keys()),
                "screenshot_dir": self.screenshot_dir,
            },
            steps=self.steps
        )
        return log

    def _run_agent_turn(self, agent) -> SimulationStep:
        """Execute one agent's turn: perceive, query the LLM, act, record."""
        # Imported here rather than at module level, as in the original.
        import mcrfpy
        from mcrfpy import automation

        print(f"\n--- {agent.name}'s Turn ---")
        print(f"Position: {agent.pos} | Room: {agent.current_room}")

        # Switch perspective
        self._switch_perspective(agent)
        mcrfpy.step(0.016)

        # Screenshot
        screenshot_path = os.path.join(
            self.screenshot_dir,
            f"turn{self.turn_number}_{agent.name.lower()}.png"
        )
        automation.screenshot(screenshot_path)

        # Build context; the acting agent is included alongside the agents
        # it can see so get_context() can flag it as the player.
        visible_agents = self._get_visible_agents(agent)
        context = agent.get_context(visible_agents + [agent])

        # Query LLM
        llm_response = self.llm_query_fn(agent, screenshot_path, context)

        # Parse and execute
        action = parse_action(llm_response)
        result = self.executor.execute(agent, action)

        # Log output
        status = "SUCCESS" if result.success else "FAILED"
        print(f" Action: {action.type.value} {action.args}")
        print(f" Result: {status} - {result.message}")

        # Build step record.
        # NOTE(review): agent.pos and agent.current_room are read after the
        # action has executed, so agent_position/room describe the state
        # after the move while perception was captured before it.
        # SimulationLog.summary() reports room as the "Final room", so this
        # is left unchanged; confirm it is the intended meaning.
        step = SimulationStep(
            turn=self.turn_number,
            agent_id=agent.name,
            agent_position=agent.pos,
            room=agent.current_room,
            perception={
                "location": context["location"],
                "available_actions": context["available_actions"],
            },
            llm_response=llm_response,
            parsed_action_type=action.type.value,
            parsed_action_args=action.args,
            result_success=result.success,
            result_message=result.message,
            new_position=result.new_position,
            path=result.path
        )
        return step

    def _switch_perspective(self, agent):
        """Switch grid view to agent's perspective.

        Resets the FOV layer to opaque black, applies the agent entity's
        perspective colors, refreshes its visibility and centers the grid
        on the agent.
        """
        import mcrfpy

        self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
        self.fov_layer.apply_perspective(
            entity=agent.entity,
            visible=mcrfpy.Color(0, 0, 0, 0),
            discovered=mcrfpy.Color(40, 40, 60, 180),
            unknown=mcrfpy.Color(0, 0, 0, 255)
        )
        agent.entity.update_visibility()
        px, py = agent.pos
        # Presumably 16 is the tile size in pixels and +8 targets the tile
        # center - TODO confirm against the texture used by the caller.
        self.grid.center = (px * 16 + 8, py * 16 + 8)

    def _get_visible_agents(self, observer) -> list:
        """Get agents other than *observer* whose tile is reported in FOV."""
        visible = []
        for agent in self.agents:
            if agent.name == observer.name:
                continue
            ax, ay = agent.pos
            if self.grid.is_in_fov(ax, ay):
                visible.append(agent)
        return visible

    def get_agent_positions(self) -> Dict[str, tuple]:
        """Get current positions of all agents."""
        return {a.name: a.pos for a in self.agents}

    def agents_in_same_room(self) -> bool:
        """Check if all agents are in the same room.

        Agents standing outside every room report ``current_room`` as None.
        A set consisting only of None therefore does not count as a shared
        room; previously that case returned True.
        """
        rooms = {a.current_room for a in self.agents}
        return len(rooms) == 1 and None not in rooms

View file

@ -0,0 +1,474 @@
"""
WorldGraph: Room-based World Representation
============================================
Provides dual-purpose data structures for:
1. Generating 2D tilemaps (visual representation)
2. Generating text descriptions (LLM context)
Ensures deterministic text output: same state = same description.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
class Direction(Enum):
    """The four cardinal directions, valued by their lowercase names."""

    NORTH = "north"
    SOUTH = "south"
    EAST = "east"
    WEST = "west"

    @property
    def opposite(self) -> 'Direction':
        """The direction pointing the other way (NORTH <-> SOUTH, EAST <-> WEST)."""
        # Each axis is listed once; a member maps to its partner on that axis.
        axes = (
            (Direction.NORTH, Direction.SOUTH),
            (Direction.EAST, Direction.WEST),
        )
        for first, second in axes:
            if self is first:
                return second
            if self is second:
                return first
        raise KeyError(self)

    @property
    def vector(self) -> Tuple[int, int]:
        """Unit step ``(dx, dy)`` in tile coordinates; NORTH is ``(0, -1)``."""
        dx = {Direction.EAST: 1, Direction.WEST: -1}.get(self, 0)
        dy = {Direction.SOUTH: 1, Direction.NORTH: -1}.get(self, 0)
        return (dx, dy)
@dataclass
class Room:
    """A rectangular room in the world graph.

    ``bounds`` is ``(x, y, width, height)`` in tile coordinates; the
    ``x``/``y``/``width``/``height`` properties expose its components.
    """

    # Internal ID, e.g. "kitchen" or "guard_room".
    name: str
    # Phrase used in text output, e.g. "the kitchen".
    display_name: str
    # (x, y, width, height) in tile coordinates.
    bounds: Tuple[int, int, int, int]
    # Free-form attributes, e.g. {"lit": True, "temperature": "warm"}.
    properties: Dict[str, Any] = field(default_factory=dict)
    # Optional format string, e.g. "A {temperature} room with {features}."
    description_template: Optional[str] = None

    @property
    def x(self) -> int:
        """First component of ``bounds``."""
        return self.bounds[0]

    @property
    def y(self) -> int:
        """Second component of ``bounds``."""
        return self.bounds[1]

    @property
    def width(self) -> int:
        """Third component of ``bounds``."""
        return self.bounds[2]

    @property
    def height(self) -> int:
        """Fourth component of ``bounds``."""
        return self.bounds[3]

    @property
    def center(self) -> Tuple[int, int]:
        """Tile at the origin plus half of each extent (floor division)."""
        mid_x = self.x + self.width // 2
        mid_y = self.y + self.height // 2
        return (mid_x, mid_y)

    def contains(self, x: int, y: int) -> bool:
        """Return True if tile ``(x, y)`` is inside the room.

        The left/top edges are inclusive, the right/bottom edges exclusive.
        """
        left, top = self.x, self.y
        if not (left <= x < left + self.width):
            return False
        return top <= y < top + self.height
@dataclass
class Door:
    """A link between two rooms, identified by room name.

    The travel direction is stored from ``room_a``'s side only; the
    direction from ``room_b`` is derived as its opposite.
    """

    # Name of the room on the first side.
    room_a: str
    # Name of the room on the second side.
    room_b: str
    # Tile position of the door.
    position: Tuple[int, int]
    # Direction to travel from room_a to reach room_b.
    direction_from_a: Direction
    locked: bool = False
    # Which key unlocks this door (None when no key is associated).
    key_id: Optional[str] = None

    @property
    def direction_from_b(self) -> Direction:
        """Direction to travel from ``room_b`` to reach ``room_a``."""
        outbound = self.direction_from_a
        return outbound.opposite
@dataclass
class WorldObject:
    """An interactable object in the world.

    ``affordances`` is a list of capability tags such as ``"takeable"``
    or ``"unlocks:pantry_door"``; other code checks these tags by
    membership to decide how the object is described and which actions
    it offers.
    """

    # Internal ID, e.g. "brass_key".
    name: str
    # Phrase used in text output, e.g. "a brass key".
    display_name: str
    # Name of the room that contains the object.
    room: str
    # Tile position, or None if the object is being carried. The
    # annotation is Optional so it agrees with that documented None case.
    position: Optional[Tuple[int, int]]
    # Capability tags; each instance gets its own list.
    affordances: List[str] = field(default_factory=list)
    # Longer prose, e.g. "A tarnished brass key with ornate handle."
    description: str = ""
@dataclass
class AgentInfo:
    """Snapshot of an agent, used when generating room descriptions."""

    # Agent identifier, e.g. "Wizard" or "Knight".
    name: str
    # Phrase used in text output, e.g. "a wizard" or "the knight".
    display_name: str
    # Current tile position.
    position: Tuple[int, int]
    # True when this agent is the one observing.
    is_player: bool = field(default=False)
class WorldGraph:
    """
    Graph-based world representation.

    Provides:
    - Room/door/object storage
    - Deterministic text description generation
    - Spatial queries (what room is at x,y?)
    - Available action enumeration
    """

    def __init__(self):
        # Rooms and objects are keyed by their internal ``name``; doors are
        # kept in insertion order.
        self.rooms: Dict[str, Room] = {}
        self.doors: List[Door] = []
        self.objects: Dict[str, WorldObject] = {}

    # =========================================================================
    # Building the World
    # =========================================================================

    def add_room(self, room: Room) -> None:
        """Add a room, replacing any existing room with the same name."""
        self.rooms[room.name] = room

    def add_door(self, door: Door) -> None:
        """Add a door connecting two rooms."""
        self.doors.append(door)

    def add_object(self, obj: WorldObject) -> None:
        """Add an object, replacing any existing object with the same name."""
        self.objects[obj.name] = obj

    # =========================================================================
    # Spatial Queries
    # =========================================================================

    def room_at(self, x: int, y: int) -> Optional[Room]:
        """Return the first room (in insertion order) containing the tile, or None."""
        return next(
            (room for room in self.rooms.values() if room.contains(x, y)),
            None,
        )

    def get_exits(self, room_name: str) -> List[Door]:
        """Return every door that has ``room_name`` on either side."""
        return [
            door for door in self.doors
            if door.room_a == room_name or door.room_b == room_name
        ]

    def get_door_in_direction(self, room_name: str, direction: Direction) -> Optional[Door]:
        """Return the first door leading ``direction`` out of the room, or None."""
        for door in self.doors:
            if door.room_a == room_name and door.direction_from_a == direction:
                return door
            if door.room_b == room_name and door.direction_from_b == direction:
                return door
        return None

    def get_objects_in_room(self, room_name: str) -> List[WorldObject]:
        """Return all objects whose ``room`` equals ``room_name``."""
        return [obj for obj in self.objects.values() if obj.room == room_name]

    # =========================================================================
    # Internal helpers
    # =========================================================================

    @staticmethod
    def _exit_view(room_name: str, door: Door) -> Tuple[Direction, str]:
        """Return ``(direction, other_room_name)`` for a door seen from ``room_name``."""
        if door.room_a == room_name:
            return door.direction_from_a, door.room_b
        return door.direction_from_b, door.room_a

    @staticmethod
    def _join_names(names: List[str]) -> str:
        """Join display names as "a", "a and b" or "a, b and c" (non-empty input)."""
        if len(names) == 1:
            return names[0]
        return ", ".join(names[:-1]) + f" and {names[-1]}"

    # =========================================================================
    # Text Description Generation (Deterministic!)
    # =========================================================================

    def describe_room(self, room_name: str,
                      visible_agents: Optional[List[AgentInfo]] = None,
                      observer_name: Optional[str] = None) -> str:
        """
        Generate a complete room description.

        Args:
            room_name: The room to describe
            visible_agents: List of agents visible in the room
            observer_name: Name of the observing agent (excluded from description)

        Returns:
            Prose description of the room: location, optional template
            sentence, visible agents, objects, then exits, joined by spaces.
            An unknown ``room_name`` yields a fixed fallback sentence.
        """
        room = self.rooms.get(room_name)
        if room is None:
            return "You are in an unknown location."

        parts = [f"You are in {room.display_name}."]

        # Room template description (if any). The template is rendered
        # whenever it exists, so a placeholder-free template still shows up
        # for a room with no properties.
        if room.description_template:
            try:
                parts.append(
                    room.description_template.format(**(room.properties or {}))
                )
            except (KeyError, IndexError):
                # Best-effort: a placeholder with no matching property
                # (named -> KeyError, positional -> IndexError) drops the
                # sentence rather than failing the whole description.
                pass

        # Visible agents
        if visible_agents:
            agent_desc = self._describe_agents(visible_agents, observer_name)
            if agent_desc:
                parts.append(agent_desc)

        # Objects on the ground
        objects = self.get_objects_in_room(room_name)
        if objects:
            parts.append(self._describe_objects(objects))

        # Exits (always present, even if only to say there are none)
        parts.append(self._describe_exits(room_name, self.get_exits(room_name)))
        return " ".join(parts)

    def _describe_agents(self, agents: List[AgentInfo],
                         observer_name: Optional[str] = None) -> str:
        """Describe visible agents, skipping the observer and any ``is_player`` agent."""
        others = [a for a in agents if a.name != observer_name and not a.is_player]
        if not others:
            return ""
        listing = self._join_names([a.display_name for a in others])
        return f"You see {listing} here."

    def _describe_objects(self, objects: List[WorldObject]) -> str:
        """Describe objects in the room; returns "" for an empty list."""
        if not objects:
            return ""

        # Takeable items are listed together in one sentence; everything
        # else gets its own "There is ... here." sentence.
        takeable = [o for o in objects if "takeable" in o.affordances]
        fixtures = [o for o in objects if "takeable" not in o.affordances]

        parts = []
        if takeable:
            listing = self._join_names([o.display_name for o in takeable])
            parts.append(f"On the ground you see {listing}.")
        parts.extend(f"There is {o.display_name} here." for o in fixtures)
        return " ".join(parts)

    def _describe_exits(self, room_name: str, exits: List[Door]) -> str:
        """Describe available exits as a sorted "Exits: ..." sentence."""
        if not exits:
            return "There are no visible exits."

        exit_parts = []
        for door in exits:
            direction, other_name = self._exit_view(room_name, door)
            dest_room = self.rooms.get(other_name)
            dest_name = dest_room.display_name if dest_room else "unknown"
            if door.locked:
                exit_parts.append(f"{direction.value} ({dest_name}, locked)")
            else:
                exit_parts.append(f"{direction.value} ({dest_name})")

        # Sort for deterministic output
        exit_parts.sort()
        return "Exits: " + ", ".join(exit_parts) + "."

    # =========================================================================
    # Action Enumeration
    # =========================================================================

    def get_available_actions(self, room_name: str,
                              can_speak: bool = True) -> List[str]:
        """
        Get list of available actions for an agent in a room.

        Returns a sorted list of action strings like:
            ["GO NORTH", "GO EAST", "TAKE brass_key", "WAIT", "LOOK"]
        """
        actions = ["LOOK", "WAIT"]

        # Movement actions
        for door in self.get_exits(room_name):
            if door.locked:
                # Locked exits are not offered. An UNLOCK action could be
                # added here once agents can hold keys.
                continue
            direction, _ = self._exit_view(room_name, door)
            actions.append(f"GO {direction.value.upper()}")

        # Object interactions, driven by each object's affordance tags
        for obj in self.get_objects_in_room(room_name):
            if "takeable" in obj.affordances:
                actions.append(f"TAKE {obj.name}")
            if "pushable" in obj.affordances:
                actions.append(f"PUSH {obj.name} <direction>")
            if "openable" in obj.affordances:
                actions.append(f"OPEN {obj.name}")
            if "readable" in obj.affordances:
                actions.append(f"READ {obj.name}")

        # Speech actions
        if can_speak:
            actions.append("ANNOUNCE '<message>'")
            actions.append("SPEAK '<message>'")

        return sorted(actions)
# =============================================================================
# Factory Functions for Common Scenarios
# =============================================================================
def create_two_room_scenario() -> WorldGraph:
    """
    Build a minimal two-room world for testing.

    Layout:
        +--------+   +--------+
        | Room A |===| Room B |
        | (west) |   | (east) |
        +--------+   +--------+

    Room A: "the guard room" - contains a brass key
    Room B: "the armory" - destination room
    Door: unlocked, between rooms
    """
    scenario = WorldGraph()

    rooms = [
        # West room
        Room(
            name="guard_room",
            display_name="the guard room",
            bounds=(1, 1, 8, 8),  # x, y, width, height
            properties={"lit": True, "atmosphere": "musty"},
            description_template="The air is {atmosphere}.",
        ),
        # East room
        Room(
            name="armory",
            display_name="the armory",
            bounds=(11, 1, 8, 8),
            properties={"lit": True, "atmosphere": "cold"},
            description_template="Weapon racks line the walls.",
        ),
    ]
    for room in rooms:
        scenario.add_room(room)

    # Unlocked door in the gap between the two rooms
    scenario.add_door(Door(
        room_a="guard_room",
        room_b="armory",
        position=(9, 4),
        direction_from_a=Direction.EAST,
        locked=False,
    ))

    # Key lying in the west room
    scenario.add_object(WorldObject(
        name="brass_key",
        display_name="a brass key",
        room="guard_room",
        position=(3, 3),
        affordances=["takeable", "unlocks:dungeon_door"],
        description="A tarnished brass key with an ornate handle.",
    ))

    return scenario
def create_button_door_scenario() -> WorldGraph:
    """
    Build the Phase 1 scenario from issue #154.

    Layout:
        +----------+   +----------+
        | Room A   |   | Room B   |
        | [Button] |===| [Goal]   |
        | Agent A  |   | Agent B  |
        +----------+   +----------+

    - Door starts locked
    - Button in Room A unlocks the door
    - Agent A can reach button; Agent B's goal is blocked by door
    - Success: Agents coordinate to solve puzzle
    """
    scenario = WorldGraph()

    rooms = [
        # Button room (west)
        Room(
            name="button_room",
            display_name="the button room",
            bounds=(1, 1, 8, 8),
            properties={"lit": True},
        ),
        # Goal room (east)
        Room(
            name="goal_room",
            display_name="the goal room",
            bounds=(11, 1, 8, 8),
            properties={"lit": True},
        ),
    ]
    for room in rooms:
        scenario.add_room(room)

    # The connecting door starts locked
    scenario.add_door(Door(
        room_a="button_room",
        room_b="goal_room",
        position=(9, 4),
        direction_from_a=Direction.EAST,
        locked=True,
        key_id="button_mechanism",
    ))

    props = [
        # Button in the west room.
        # NOTE(review): the "activates:main_door" tag does not match any
        # identifier defined in this function (the door carries
        # key_id="button_mechanism") -- confirm how the two are linked.
        WorldObject(
            name="wall_button",
            display_name="a large button on the wall",
            room="button_room",
            position=(2, 4),
            affordances=["pressable", "activates:main_door"],
            description="A heavy stone button protrudes from the wall.",
        ),
        # Goal marker in the east room
        WorldObject(
            name="goal_marker",
            display_name="a glowing rune on the floor",
            room="goal_room",
            position=(15, 4),
            affordances=["examinable"],
            description="An arcane symbol pulses with soft light.",
        ),
    ]
    for prop in props:
        scenario.add_object(prop)

    return scenario