# Hour 1: Action Parser & Executor **Issue**: #156 Turn-based LLM Agent Orchestration **Goal**: Agents can actually move when they say "GO EAST" **Parallelizable with**: Hour 2 (no dependencies) --- ## Deliverables 1. `action_parser.py` - Parse LLM text responses into structured actions 2. `action_executor.py` - Execute parsed actions in the game world 3. Modified `1_multi_agent_demo.py` - Integrate parser/executor to show movement --- ## File 1: `action_parser.py` ```python """ Action Parser for LLM Agent Responses ===================================== Extracts structured actions from free-form LLM text responses. Handles variations like "Action: GO EAST", "I'll go east", "GO E", etc. """ import re from dataclasses import dataclass from typing import Optional, Tuple, Any from enum import Enum class ActionType(Enum): GO = "GO" WAIT = "WAIT" LOOK = "LOOK" TAKE = "TAKE" DROP = "DROP" PUSH = "PUSH" USE = "USE" OPEN = "OPEN" CLOSE = "CLOSE" ANNOUNCE = "ANNOUNCE" SPEAK = "SPEAK" INVALID = "INVALID" @dataclass class Action: type: ActionType args: Tuple[Any, ...] = () raw_match: str = "" class ActionParser: """Parse LLM responses into structured actions.""" # Direction normalization DIRECTIONS = { 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', 'NORTH': 'NORTH', 'SOUTH': 'SOUTH', 'EAST': 'EAST', 'WEST': 'WEST', 'UP': 'NORTH', 'DOWN': 'SOUTH', 'LEFT': 'WEST', 'RIGHT': 'EAST', } # Patterns ordered by specificity (most specific first) PATTERNS = [ # Explicit "Action: X" format (preferred) (ActionType.GO, r'Action:\s*GO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), (ActionType.WAIT, r'Action:\s*WAIT\b', 0), (ActionType.LOOK, r'Action:\s*LOOK(?:\s+AT\s+(\w+))?\b', 1), (ActionType.TAKE, r'Action:\s*TAKE\s+(\w+)', 1), (ActionType.DROP, r'Action:\s*DROP\s+(\w+)', 1), (ActionType.PUSH, r'Action:\s*PUSH\s+(\w+)\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)', 2), (ActionType.USE, r'Action:\s*USE\s+(\w+)(?:\s+ON\s+(\w+))?', 2), (ActionType.OPEN, r'Action:\s*OPEN\s+(\w+)', 1), (ActionType.CLOSE, r'Action:\s*CLOSE\s+(\w+)', 1), (ActionType.ANNOUNCE, r'Action:\s*ANNOUNCE\s+["\'](.+?)["\']', 1), (ActionType.SPEAK, r'Action:\s*SPEAK\s+["\'](.+?)["\']', 1), # Fallback patterns (less strict) (ActionType.GO, r'\bGO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), (ActionType.GO, r'\bmove\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), (ActionType.GO, r'\bhead\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)\b', 1), (ActionType.WAIT, r'\bWAIT\b', 0), (ActionType.LOOK, r'\bLOOK\b', 0), ] def parse(self, llm_response: str) -> Action: """ Parse an LLM response and extract the action. Returns Action with type=INVALID if no valid action found. """ # Normalize to uppercase for matching text = llm_response.upper() for action_type, pattern, num_groups in self.PATTERNS: match = re.search(pattern, text, re.IGNORECASE) if match: args = self._extract_args(match, num_groups, action_type) return Action( type=action_type, args=args, raw_match=match.group(0) ) # No valid action found return Action( type=ActionType.INVALID, args=(llm_response[:100],), # First 100 chars for debugging raw_match="" ) def _extract_args(self, match, num_groups: int, action_type: ActionType) -> tuple: """Extract and normalize arguments from regex match.""" if num_groups == 0: return () args = [] for i in range(1, num_groups + 1): group = match.group(i) if group: # Normalize directions if action_type == ActionType.GO or (action_type == ActionType.PUSH and i == 2): group = self.DIRECTIONS.get(group.upper(), group.upper()) args.append(group) else: args.append(None) return tuple(args) # Convenience function def parse_action(llm_response: str) -> Action: """Parse an LLM response into an Action.""" return ActionParser().parse(llm_response) ``` --- ## File 2: `action_executor.py` ```python """ Action Executor for McRogueFace =============================== Executes parsed actions in the game world. Handles movement, collision detection, and action results. """ from dataclasses import dataclass from typing import Optional, List, Tuple from action_parser import Action, ActionType @dataclass class ActionResult: success: bool message: str new_position: Optional[Tuple[int, int]] = None path: Optional[List[Tuple[int, int]]] = None # For animation replay class ActionExecutor: """Execute actions in the McRogueFace game world.""" # Direction vectors DIRECTION_VECTORS = { 'NORTH': (0, -1), 'SOUTH': (0, 1), 'EAST': (1, 0), 'WEST': (-1, 0), } def __init__(self, grid): """ Initialize executor with a grid reference. Args: grid: mcrfpy.Grid instance """ self.grid = grid def execute(self, agent, action: Action) -> ActionResult: """ Execute an action for an agent. Args: agent: Agent wrapper with .entity attribute action: Parsed Action to execute Returns: ActionResult with success status and message """ handlers = { ActionType.GO: self._execute_go, ActionType.WAIT: self._execute_wait, ActionType.LOOK: self._execute_look, ActionType.TAKE: self._execute_take, ActionType.DROP: self._execute_drop, ActionType.INVALID: self._execute_invalid, } handler = handlers.get(action.type, self._execute_unimplemented) return handler(agent, action) def _execute_go(self, agent, action: Action) -> ActionResult: """Execute movement in a direction.""" if not action.args or not action.args[0]: return ActionResult(False, "No direction specified") direction = action.args[0] if direction not in self.DIRECTION_VECTORS: return ActionResult(False, f"Invalid direction: {direction}") dx, dy = self.DIRECTION_VECTORS[direction] # Get current position current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) new_x, new_y = current_x + dx, current_y + dy # Check bounds grid_w, grid_h = self.grid.grid_size if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): return ActionResult(False, f"Cannot go {direction} - edge of map") # Check walkability target_cell = self.grid.at(new_x, new_y) if not target_cell.walkable: return ActionResult(False, f"Cannot go {direction} - path blocked") # Check for entity collision (optional - depends on game rules) for entity in self.grid.entities: if entity is agent.entity: continue ex, ey = int(entity.pos[0]), int(entity.pos[1]) if ex == new_x and ey == new_y: return ActionResult(False, f"Cannot go {direction} - someone is there") # Execute movement agent.entity.grid_pos = (new_x, new_y) return ActionResult( success=True, message=f"Moved {direction.lower()} to ({new_x}, {new_y})", new_position=(new_x, new_y), path=[(current_x, current_y), (new_x, new_y)] ) def _execute_wait(self, agent, action: Action) -> ActionResult: """Execute wait action (no-op).""" return ActionResult(True, "Waited and observed surroundings") def _execute_look(self, agent, action: Action) -> ActionResult: """Execute look action - returns enhanced observation.""" target = action.args[0] if action.args else None if target: return ActionResult(True, f"Examined {target} closely") return ActionResult(True, "Looked around carefully") def _execute_take(self, agent, action: Action) -> ActionResult: """Execute take action (placeholder).""" item = action.args[0] if action.args else "unknown" # TODO: Implement inventory system return ActionResult(False, f"Cannot take {item} - not implemented yet") def _execute_drop(self, agent, action: Action) -> ActionResult: """Execute drop action (placeholder).""" item = action.args[0] if action.args else "unknown" return ActionResult(False, f"Cannot drop {item} - not implemented yet") def _execute_invalid(self, agent, action: Action) -> ActionResult: """Handle invalid/unparseable action.""" return ActionResult(False, f"Could not understand action: {action.args[0]}") def _execute_unimplemented(self, agent, action: Action) -> ActionResult: """Handle unimplemented action types.""" return ActionResult(False, f"Action {action.type.value} not yet implemented") ``` --- ## Modifications to `1_multi_agent_demo.py` Add these changes after the existing `query_agent` function: ```python # Add imports at top from action_parser import parse_action from action_executor import ActionExecutor, ActionResult # In run_demo(), after setup_scene(): executor = ActionExecutor(grid) # Replace the agent loop with: for i, agent in enumerate(agents): print(f"\n{'='*70}") print(f"Agent {i+1}/3: {agent.name} ({agent.description})") print(f"Position: {agent.pos}") print("=" * 70) # Switch to this agent's perspective switch_perspective(grid, fov_layer, agent) mcrfpy.step(0.016) # Take screenshot screenshot_path = os.path.join(SCREENSHOT_DIR, f"{i}_{agent.name.lower()}_view.png") result = automation.screenshot(screenshot_path) if not result: print(f"ERROR: Failed to take screenshot for {agent.name}") continue # Get visible entities and query VLLM visible = get_visible_entities(grid, agent, agents, rat) grounded_text = build_grounded_prompt(visible) print(f"Grounded observations: {grounded_text}") print(f"\nQuerying VLLM for {agent.name}...") response = query_agent(agent, screenshot_path, grounded_text) print(f"\n{agent.name}'s Response:\n{response}") # NEW: Parse and execute action print(f"\n--- Action Execution ---") action = parse_action(response) print(f"Parsed action: {action.type.value} {action.args}") result = executor.execute(agent, action) if result.success: print(f"SUCCESS: {result.message}") if result.new_position: # Update perspective after movement switch_perspective(grid, fov_layer, agent) mcrfpy.step(0.016) else: print(f"FAILED: {result.message}") ``` --- ## Testing ### Unit test for parser (`test_action_parser.py`): ```python from action_parser import parse_action, ActionType def test_parser(): # Explicit format assert parse_action("Action: GO NORTH").type == ActionType.GO assert parse_action("Action: GO NORTH").args == ("NORTH",) # Short directions assert parse_action("Action: GO E").args == ("EAST",) # Case insensitive assert parse_action("action: go south").type == ActionType.GO # Fallback patterns assert parse_action("I think I'll GO WEST").type == ActionType.GO # Wait and Look assert parse_action("Action: WAIT").type == ActionType.WAIT assert parse_action("Action: LOOK").type == ActionType.LOOK # Invalid assert parse_action("I'm not sure what to do").type == ActionType.INVALID print("All parser tests passed!") if __name__ == "__main__": test_parser() ``` --- ## Success Criteria - [ ] `action_parser.py` correctly parses all GO directions (N/S/E/W and full names) - [ ] `action_parser.py` handles WAIT, LOOK, and INVALID cases - [ ] `action_executor.py` moves entities when GO succeeds - [ ] `action_executor.py` returns failure message when path is blocked - [ ] Modified demo shows "Moved east to (5, 7)" style output - [ ] Entities visibly change position between turns --- ## Notes for Integration (Hour 3) The `ActionExecutor` will be enhanced in Hour 3 to: - Use `WorldGraph` for room-based movement (GO NORTH = walk through door to next room) - Support multi-tile pathfinding for room transitions - Return path data for animation replay Keep the current single-tile movement as the foundation.