diff --git a/stubs/mcrfpy.pyi b/stubs/mcrfpy.pyi index c89e2e8..bc0c00f 100644 --- a/stubs/mcrfpy.pyi +++ b/stubs/mcrfpy.pyi @@ -75,22 +75,31 @@ class Font: class Drawable: """Base class for all drawable UI elements.""" - + x: float y: float visible: bool z_index: int name: str pos: Vector - + + # Mouse event callbacks (#140, #141) + on_click: Optional[Callable[[float, float, int, str], None]] + on_enter: Optional[Callable[[float, float, int, str], None]] + on_exit: Optional[Callable[[float, float, int, str], None]] + on_move: Optional[Callable[[float, float, int, str], None]] + + # Read-only hover state (#140) + hovered: bool + def get_bounds(self) -> Tuple[float, float, float, float]: """Get bounding box as (x, y, width, height).""" ... - + def move(self, dx: float, dy: float) -> None: """Move by relative offset (dx, dy).""" ... - + def resize(self, width: float, height: float) -> None: """Resize to new dimensions (width, height).""" ... @@ -331,45 +340,47 @@ class EntityCollection: class Scene: """Base class for object-oriented scenes.""" - + name: str - + children: UICollection # #151: UI elements collection (read-only alias for get_ui()) + on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action) + def __init__(self, name: str) -> None: ... - + def activate(self) -> None: """Called when scene becomes active.""" ... - + def deactivate(self) -> None: """Called when scene becomes inactive.""" ... - + def get_ui(self) -> UICollection: """Get UI elements collection.""" ... - + def on_keypress(self, key: str, pressed: bool) -> None: - """Handle keyboard events.""" + """Handle keyboard events (override in subclass).""" ... - + def on_click(self, x: float, y: float, button: int) -> None: - """Handle mouse clicks.""" + """Handle mouse clicks (override in subclass).""" ... - + def on_enter(self) -> None: - """Called when entering the scene.""" + """Called when entering the scene (override in subclass).""" ... 
- + def on_exit(self) -> None: - """Called when leaving the scene.""" + """Called when leaving the scene (override in subclass).""" ... - + def on_resize(self, width: int, height: int) -> None: - """Handle window resize events.""" + """Handle window resize events (override in subclass).""" ... - + def update(self, dt: float) -> None: - """Update scene logic.""" + """Update scene logic (override in subclass).""" ... class Timer: diff --git a/tests/demo/screens/focus_system_demo.py b/tests/demo/screens/focus_system_demo.py new file mode 100644 index 0000000..fc3ba88 --- /dev/null +++ b/tests/demo/screens/focus_system_demo.py @@ -0,0 +1,808 @@ +#!/usr/bin/env python3 +"""Focus System Demo for McRogueFace + +Demonstrates a Python-level focus management system using engine primitives. +This shows how game developers can implement keyboard navigation without +requiring C++ engine changes. + +Features demonstrated: +- Click-to-focus +- Tab/Shift+Tab cycling +- Visual focus indicators +- Keyboard routing to focused widget +- Modal focus stack +- Three widget types: Grid (WASD), TextInput, MenuIcon + +Issue: #143 +""" + +import mcrfpy +import sys + +# ============================================================================= +# Modifier Key Tracker (workaround until #160 is implemented) +# ============================================================================= + +class ModifierTracker: + """Tracks modifier key state since engine doesn't expose this yet.""" + + def __init__(self): + self.shift = False + self.ctrl = False + self.alt = False + + def update(self, key: str, action: str): + """Call this from your key handler to update modifier state.""" + if key in ("LShift", "RShift"): + self.shift = (action == "start") + elif key in ("LControl", "RControl"): + self.ctrl = (action == "start") + elif key in ("LAlt", "RAlt"): + self.alt = (action == "start") + + +# ============================================================================= +# Focus Manager +# 
============================================================================= + +class FocusManager: + """Central focus coordinator for a scene. + + Manages which widget receives keyboard input, handles tab cycling, + and maintains a modal stack for popup dialogs. + """ + + # Focus indicator colors + FOCUS_COLOR = mcrfpy.Color(0, 150, 255) # Blue + UNFOCUS_COLOR = mcrfpy.Color(80, 80, 80) # Dark gray + FOCUS_OUTLINE = 3.0 + UNFOCUS_OUTLINE = 1.0 + + def __init__(self): + self.widgets = [] # List of (widget, focusable: bool) + self.focus_index = -1 # Currently focused widget index + self.modal_stack = [] # Stack of (modal_frame, previous_focus_index) + self.modifiers = ModifierTracker() + + def register(self, widget, focusable: bool = True): + """Add a widget to the focus order. + + Args: + widget: Object implementing on_focus(), on_blur(), handle_key() + focusable: Whether this widget can receive focus via Tab + """ + self.widgets.append((widget, focusable)) + # Give widget a reference back to us for click-to-focus + widget._focus_manager = self + widget._focus_index = len(self.widgets) - 1 + + def focus(self, widget_or_index): + """Set focus to a specific widget.""" + # Resolve to index + if isinstance(widget_or_index, int): + new_index = widget_or_index + else: + new_index = next( + (i for i, (w, _) in enumerate(self.widgets) if w is widget_or_index), + -1 + ) + + if new_index < 0 or new_index >= len(self.widgets): + return + + # Blur old widget + if 0 <= self.focus_index < len(self.widgets): + old_widget, _ = self.widgets[self.focus_index] + if hasattr(old_widget, 'on_blur'): + old_widget.on_blur() + + # Focus new widget + self.focus_index = new_index + new_widget, _ = self.widgets[new_index] + if hasattr(new_widget, 'on_focus'): + new_widget.on_focus() + + def cycle(self, direction: int = 1): + """Cycle focus to next/previous focusable widget. 
+ + Args: + direction: 1 for next (Tab), -1 for previous (Shift+Tab) + """ + if not self.widgets: + return + + start = self.focus_index if self.focus_index >= 0 else 0 + current = start + + for _ in range(len(self.widgets)): + current = (current + direction) % len(self.widgets) + widget, focusable = self.widgets[current] + if focusable: + self.focus(current) + return + + # No focusable widget found, stay where we are + + def push_modal(self, modal_frame, first_focus_widget=None): + """Push a modal onto the focus stack. + + Args: + modal_frame: The Frame to show as modal + first_focus_widget: Widget to focus inside modal (optional) + """ + # Save current focus + self.modal_stack.append((modal_frame, self.focus_index)) + + # Show modal + modal_frame.visible = True + + # Focus first widget in modal if specified + if first_focus_widget is not None: + self.focus(first_focus_widget) + + def pop_modal(self): + """Pop the top modal and restore previous focus.""" + if not self.modal_stack: + return False + + modal_frame, previous_focus = self.modal_stack.pop() + modal_frame.visible = False + + # Restore focus + if previous_focus >= 0: + self.focus(previous_focus) + + return True + + def handle_key(self, key: str, action: str) -> bool: + """Main key handler - route to focused widget or handle global keys. + + Returns True if key was consumed. 
+ """ + # Always update modifier state + self.modifiers.update(key, action) + + # Only process on key press, not release (key repeat sends multiple "start") + if action != "start": + return False + + # Global: Escape closes modals + if key == "Escape": + if self.pop_modal(): + return True + + # Global: Tab cycles focus + if key == "Tab": + direction = -1 if self.modifiers.shift else 1 + self.cycle(direction) + return True + + # Route to focused widget + if 0 <= self.focus_index < len(self.widgets): + widget, _ = self.widgets[self.focus_index] + if hasattr(widget, 'handle_key'): + if widget.handle_key(key, action): + return True + + return False + + +# ============================================================================= +# Focusable Widgets +# ============================================================================= + +class FocusableGrid: + """A grid where WASD keys move a player entity. + + Demonstrates focus on a game-world element. + """ + + def __init__(self, x: float, y: float, grid_w: int, grid_h: int, + tile_size: int = 16, zoom: float = 2.0): + self.grid_w = grid_w + self.grid_h = grid_h + self.tile_size = tile_size + self.zoom = zoom + self.base_x = x + self.base_y = y + + # Calculate pixel dimensions + self.cell_px = tile_size * zoom # Pixels per cell + grid_pixel_w = grid_w * self.cell_px + grid_pixel_h = grid_h * self.cell_px + + # Create the grid background + self.grid = mcrfpy.Grid( + pos=(x, y), + grid_size=(grid_w, grid_h), + size=(grid_pixel_w, grid_pixel_h) + ) + self.grid.zoom = zoom + self.grid.fill_color = mcrfpy.Color(40, 40, 55) + + # Add outline frame for focus indication + self.outline_frame = mcrfpy.Frame( + pos=(x - 2, y - 2), + size=(grid_pixel_w + 4, grid_pixel_h + 4), + fill_color=mcrfpy.Color(0, 0, 0, 0), + outline_color=FocusManager.UNFOCUS_COLOR, + outline=FocusManager.UNFOCUS_OUTLINE + ) + + # Player marker (a bright square overlay) + self.player_x = grid_w // 2 + self.player_y = grid_h // 2 + marker_size = 
self.cell_px - 4 # Slightly smaller than cell + self.player_marker = mcrfpy.Frame( + pos=(0, 0), # Will be positioned by _update_player_display + size=(marker_size, marker_size), + fill_color=mcrfpy.Color(255, 200, 50), + outline_color=mcrfpy.Color(255, 150, 0), + outline=2 + ) + self._update_player_display() + + # Click handler + self.grid.on_click = self._on_click + + # Focus manager reference (set by FocusManager.register) + self._focus_manager = None + self._focus_index = -1 + + def _on_click(self, x, y, button, action): + """Handle click to focus this grid.""" + if self._focus_manager and action == "start": + self._focus_manager.focus(self._focus_index) + + def _update_player_display(self): + """Update the visual representation of player position.""" + # Position the player marker + px = self.base_x + (self.player_x * self.cell_px) + 2 + py = self.base_y + (self.player_y * self.cell_px) + 2 + self.player_marker.x = px + self.player_marker.y = py + + def on_focus(self): + """Called when this widget gains focus.""" + self.outline_frame.outline_color = FocusManager.FOCUS_COLOR + self.outline_frame.outline = FocusManager.FOCUS_OUTLINE + + def on_blur(self): + """Called when this widget loses focus.""" + self.outline_frame.outline_color = FocusManager.UNFOCUS_COLOR + self.outline_frame.outline = FocusManager.UNFOCUS_OUTLINE + + def handle_key(self, key: str, action: str) -> bool: + """Handle WASD movement.""" + moves = { + "W": (0, -1), "Up": (0, -1), + "A": (-1, 0), "Left": (-1, 0), + "S": (0, 1), "Down": (0, 1), + "D": (1, 0), "Right": (1, 0), + } + + if key in moves: + dx, dy = moves[key] + new_x = self.player_x + dx + new_y = self.player_y + dy + + # Bounds check + if 0 <= new_x < self.grid_w and 0 <= new_y < self.grid_h: + self.player_x = new_x + self.player_y = new_y + self._update_player_display() + return True + + return False + + def add_to_scene(self, ui): + """Add all components to a scene's UI collection.""" + ui.append(self.outline_frame) + 
ui.append(self.grid) + ui.append(self.player_marker) + + +class TextInputWidget: + """A text input field with cursor and editing. + + Demonstrates text entry with focus indication. + """ + + def __init__(self, x: float, y: float, width: float, label: str = "", + placeholder: str = ""): + self.x = x + self.y = y + self.width = width + self.height = 28 + self.label_text = label + self.placeholder_text = placeholder + + # State + self.text = "" + self.cursor_pos = 0 + self.focused = False + + # Create UI elements + self._create_ui() + + # Focus manager reference + self._focus_manager = None + self._focus_index = -1 + + def _create_ui(self): + """Create the visual components.""" + # Label above input + if self.label_text: + self.label = mcrfpy.Caption( + text=self.label_text, + pos=(self.x, self.y - 20) + ) + self.label.fill_color = mcrfpy.Color(200, 200, 200) + + # Input background + self.frame = mcrfpy.Frame( + pos=(self.x, self.y), + size=(self.width, self.height), + fill_color=mcrfpy.Color(40, 40, 50), + outline_color=FocusManager.UNFOCUS_COLOR, + outline=FocusManager.UNFOCUS_OUTLINE + ) + self.frame.on_click = self._on_click + + # Placeholder text + self.placeholder = mcrfpy.Caption( + text=self.placeholder_text, + pos=(self.x + 6, self.y + 5) + ) + self.placeholder.fill_color = mcrfpy.Color(100, 100, 100) + + # Actual text display + self.display = mcrfpy.Caption( + text="", + pos=(self.x + 6, self.y + 5) + ) + self.display.fill_color = mcrfpy.Color(255, 255, 255) + + # Cursor (thin frame) + self.cursor = mcrfpy.Frame( + pos=(self.x + 6, self.y + 4), + size=(2, self.height - 8), + fill_color=mcrfpy.Color(255, 255, 255) + ) + self.cursor.visible = False + + def _on_click(self, x, y, button, action): + """Handle click to focus.""" + if self._focus_manager and action == "start": + self._focus_manager.focus(self._focus_index) + + def _update_display(self): + """Update visual state.""" + self.display.text = self.text + self.placeholder.visible = (not self.text and not 
self.focused) + self._update_cursor() + + def _update_cursor(self): + """Update cursor position.""" + # Approximate character width (monospace assumption) + char_width = 10 + self.cursor.x = self.x + 6 + (self.cursor_pos * char_width) + + def on_focus(self): + """Called when gaining focus.""" + self.focused = True + self.frame.outline_color = FocusManager.FOCUS_COLOR + self.frame.outline = FocusManager.FOCUS_OUTLINE + self.cursor.visible = True + self._update_display() + + def on_blur(self): + """Called when losing focus.""" + self.focused = False + self.frame.outline_color = FocusManager.UNFOCUS_COLOR + self.frame.outline = FocusManager.UNFOCUS_OUTLINE + self.cursor.visible = False + self._update_display() + + def handle_key(self, key: str, action: str) -> bool: + """Handle text input and editing keys.""" + if not self.focused: + return False + + old_text = self.text + handled = True + + if key == "BackSpace": + if self.cursor_pos > 0: + self.text = self.text[:self.cursor_pos-1] + self.text[self.cursor_pos:] + self.cursor_pos -= 1 + elif key == "Delete": + if self.cursor_pos < len(self.text): + self.text = self.text[:self.cursor_pos] + self.text[self.cursor_pos+1:] + elif key == "Left": + self.cursor_pos = max(0, self.cursor_pos - 1) + elif key == "Right": + self.cursor_pos = min(len(self.text), self.cursor_pos + 1) + elif key == "Home": + self.cursor_pos = 0 + elif key == "End": + self.cursor_pos = len(self.text) + elif key in ("Return", "Tab"): + # Don't consume - let focus manager handle + handled = False + elif len(key) == 1 and key.isprintable(): + # Insert character + self.text = self.text[:self.cursor_pos] + key + self.text[self.cursor_pos:] + self.cursor_pos += 1 + else: + handled = False + + self._update_display() + return handled + + def get_text(self) -> str: + """Get the current text value.""" + return self.text + + def set_text(self, text: str): + """Set the text value.""" + self.text = text + self.cursor_pos = len(text) + self._update_display() + + 
def add_to_scene(self, ui): + """Add all components to the scene.""" + if hasattr(self, 'label'): + ui.append(self.label) + ui.append(self.frame) + ui.append(self.placeholder) + ui.append(self.display) + ui.append(self.cursor) + + +class MenuIcon: + """An icon that opens a modal dialog when activated. + + Demonstrates activation via Space/Enter and modal focus. + """ + + def __init__(self, x: float, y: float, size: float, icon_char: str, + tooltip: str, modal_content_builder=None): + self.x = x + self.y = y + self.size = size + self.tooltip = tooltip + self.modal_content_builder = modal_content_builder + self.modal = None + + # Create icon frame + self.frame = mcrfpy.Frame( + pos=(x, y), + size=(size, size), + fill_color=mcrfpy.Color(60, 60, 80), + outline_color=FocusManager.UNFOCUS_COLOR, + outline=FocusManager.UNFOCUS_OUTLINE + ) + self.frame.on_click = self._on_click + + # Icon character (centered) + self.icon = mcrfpy.Caption( + text=icon_char, + pos=(x + size//3, y + size//6) + ) + self.icon.fill_color = mcrfpy.Color(200, 200, 220) + + # Tooltip (shown on hover/focus) + self.tooltip_caption = mcrfpy.Caption( + text=tooltip, + pos=(x, y + size + 4) + ) + self.tooltip_caption.fill_color = mcrfpy.Color(150, 150, 150) + self.tooltip_caption.visible = False + + # Focus manager reference + self._focus_manager = None + self._focus_index = -1 + + def _on_click(self, x, y, button, action): + """Handle click to focus or activate.""" + if not self._focus_manager: + return + + if action == "start": + # If already focused, activate; otherwise just focus + if self._focus_manager.focus_index == self._focus_index: + self._activate() + else: + self._focus_manager.focus(self._focus_index) + + def _activate(self): + """Open the modal dialog.""" + if self.modal and self._focus_manager: + self._focus_manager.push_modal(self.modal) + + def on_focus(self): + """Called when gaining focus.""" + self.frame.outline_color = FocusManager.FOCUS_COLOR + self.frame.outline = 
FocusManager.FOCUS_OUTLINE + self.frame.fill_color = mcrfpy.Color(80, 80, 110) + self.tooltip_caption.visible = True + + def on_blur(self): + """Called when losing focus.""" + self.frame.outline_color = FocusManager.UNFOCUS_COLOR + self.frame.outline = FocusManager.UNFOCUS_OUTLINE + self.frame.fill_color = mcrfpy.Color(60, 60, 80) + self.tooltip_caption.visible = False + + def handle_key(self, key: str, action: str) -> bool: + """Handle activation keys.""" + if key in ("Space", "Return"): + self._activate() + return True + return False + + def set_modal(self, modal_frame): + """Set the modal frame this icon opens.""" + self.modal = modal_frame + + def add_to_scene(self, ui): + """Add all components to the scene.""" + ui.append(self.frame) + ui.append(self.icon) + ui.append(self.tooltip_caption) + + +# ============================================================================= +# Modal Dialog Builder +# ============================================================================= + +def create_modal(x: float, y: float, width: float, height: float, + title: str) -> mcrfpy.Frame: + """Create a modal dialog frame.""" + # Semi-transparent backdrop + # Note: This is simplified - real implementation might want fullscreen backdrop + + # Modal frame + modal = mcrfpy.Frame( + pos=(x, y), + size=(width, height), + fill_color=mcrfpy.Color(40, 40, 50), + outline_color=mcrfpy.Color(100, 100, 120), + outline=2 + ) + modal.visible = False + + # Title + title_caption = mcrfpy.Caption( + text=title, + pos=(x + 10, y + 8) + ) + title_caption.fill_color = mcrfpy.Color(220, 220, 240) + modal.children.append(title_caption) + + # Close hint + close_hint = mcrfpy.Caption( + text="[Esc to close]", + pos=(x + width - 100, y + 8) + ) + close_hint.fill_color = mcrfpy.Color(120, 120, 140) + modal.children.append(close_hint) + + return modal + + +# ============================================================================= +# Demo Scene Setup +# 
============================================================================= + +def create_demo_scene(): + """Create and populate the focus system demo scene.""" + + # Create scene + mcrfpy.createScene("focus_demo") + ui = mcrfpy.sceneUI("focus_demo") + + # Background + bg = mcrfpy.Frame( + pos=(0, 0), + size=(1024, 768), + fill_color=mcrfpy.Color(25, 25, 35) + ) + ui.append(bg) + + # Title + title = mcrfpy.Caption( + text="Focus System Demo", + pos=(20, 15) + ) + title.fill_color = mcrfpy.Color(255, 255, 255) + ui.append(title) + + # Instructions + instructions = mcrfpy.Caption( + text="Tab: cycle focus | Shift+Tab: reverse | WASD: move in grid | Space/Enter: activate | Esc: close modal", + pos=(20, 45) + ) + instructions.fill_color = mcrfpy.Color(150, 150, 170) + ui.append(instructions) + + # Create focus manager + focus_mgr = FocusManager() + + # --- Grid Section --- + grid_label = mcrfpy.Caption(text="Game Grid (WASD to move)", pos=(50, 90)) + grid_label.fill_color = mcrfpy.Color(180, 180, 200) + ui.append(grid_label) + + grid_widget = FocusableGrid(50, 115, 10, 8, tile_size=16, zoom=2.0) + grid_widget.add_to_scene(ui) + focus_mgr.register(grid_widget) + + # --- Text Inputs Section --- + input_label = mcrfpy.Caption(text="Text Inputs", pos=(400, 90)) + input_label.fill_color = mcrfpy.Color(180, 180, 200) + ui.append(input_label) + + name_input = TextInputWidget(400, 130, 250, label="Name:", placeholder="Enter your name") + name_input.add_to_scene(ui) + focus_mgr.register(name_input) + + class_input = TextInputWidget(400, 200, 250, label="Class:", placeholder="e.g. 
Warrior, Mage") + class_input.add_to_scene(ui) + focus_mgr.register(class_input) + + notes_input = TextInputWidget(400, 270, 350, label="Notes:", placeholder="Additional notes...") + notes_input.add_to_scene(ui) + focus_mgr.register(notes_input) + + # --- Menu Icons Section --- + icons_label = mcrfpy.Caption(text="Menu Icons", pos=(50, 390)) + icons_label.fill_color = mcrfpy.Color(180, 180, 200) + ui.append(icons_label) + + # Help icon + help_icon = MenuIcon(50, 420, 48, "?", "Help") + help_icon.add_to_scene(ui) + focus_mgr.register(help_icon) + + help_modal = create_modal(200, 150, 400, 300, "Help") + ui.append(help_modal) + help_text = mcrfpy.Caption( + text="This demo shows focus management.\n\nUse Tab to move between widgets.\nWASD moves the player in the grid.\nType in text fields.\nPress Space on icons to open dialogs.", + pos=(210, 190) + ) + help_text.fill_color = mcrfpy.Color(200, 200, 200) + help_modal.children.append(help_text) + help_icon.set_modal(help_modal) + + # Settings icon + settings_icon = MenuIcon(110, 420, 48, "S", "Settings") + settings_icon.add_to_scene(ui) + focus_mgr.register(settings_icon) + + settings_modal = create_modal(200, 150, 400, 250, "Settings") + ui.append(settings_modal) + settings_text = mcrfpy.Caption( + text="Settings would go here.\n\n(This is a placeholder modal)", + pos=(210, 190) + ) + settings_text.fill_color = mcrfpy.Color(200, 200, 200) + settings_modal.children.append(settings_text) + settings_icon.set_modal(settings_modal) + + # Inventory icon + inv_icon = MenuIcon(170, 420, 48, "I", "Inventory") + inv_icon.add_to_scene(ui) + focus_mgr.register(inv_icon) + + inv_modal = create_modal(200, 150, 400, 300, "Inventory") + ui.append(inv_modal) + inv_text = mcrfpy.Caption( + text="Your inventory:\n\n- Sword\n- Shield\n- 3x Potions", + pos=(210, 190) + ) + inv_text.fill_color = mcrfpy.Color(200, 200, 200) + inv_modal.children.append(inv_text) + inv_icon.set_modal(inv_modal) + + # --- Status Display --- + status_frame = 
mcrfpy.Frame( + pos=(50, 520), + size=(700, 80), + fill_color=mcrfpy.Color(35, 35, 45), + outline_color=mcrfpy.Color(60, 60, 70), + outline=1 + ) + ui.append(status_frame) + + status_label = mcrfpy.Caption(text="Status", pos=(60, 530)) + status_label.fill_color = mcrfpy.Color(150, 150, 170) + ui.append(status_label) + + status_text = mcrfpy.Caption(text="Click or Tab to focus a widget", pos=(60, 555)) + status_text.fill_color = mcrfpy.Color(200, 200, 200) + ui.append(status_text) + + # Store references for status updates + demo_state = { + 'focus_mgr': focus_mgr, + 'status_text': status_text, + 'grid': grid_widget, + 'inputs': [name_input, class_input, notes_input], + 'icons': [help_icon, settings_icon, inv_icon], + } + + # Key handler that routes to focus manager + def on_key(key: str, action: str): + focus_mgr.handle_key(key, action) + + # Update status display + if focus_mgr.focus_index >= 0: + widget, _ = focus_mgr.widgets[focus_mgr.focus_index] + if widget is grid_widget: + status_text.text = f"Grid focused - Player at ({grid_widget.player_x}, {grid_widget.player_y})" + elif widget in demo_state['inputs']: + idx = demo_state['inputs'].index(widget) + labels = ["Name", "Class", "Notes"] + status_text.text = f"{labels[idx]} input focused - Text: '{widget.get_text()}'" + elif widget in demo_state['icons']: + status_text.text = f"Icon focused: {widget.tooltip}" + else: + status_text.text = "No widget focused" + + # Activate scene first (keypressScene sets handler for CURRENT scene) + mcrfpy.setScene("focus_demo") + + # Register key handler for the now-current scene + mcrfpy.keypressScene(on_key) + + # Set initial focus + focus_mgr.focus(0) + + return demo_state + + +# ============================================================================= +# Entry Point +# ============================================================================= + +def run_demo(): + """Run the focus system demo.""" + print("=== Focus System Demo ===") + print("Demonstrating Python-level 
focus management") + print() + print("Controls:") + print(" Tab / Shift+Tab - Cycle between widgets") + print(" WASD / Arrows - Move player in grid (when focused)") + print(" Type - Enter text in inputs (when focused)") + print(" Space / Enter - Activate icons (when focused)") + print(" Escape - Close modal dialogs") + print(" Click - Focus clicked widget") + print() + + demo_state = create_demo_scene() + + # Set up exit timer for headless testing + def check_exit(dt): + # In headless mode, exit after a short delay + # In interactive mode, this won't trigger + pass + + # mcrfpy.setTimer("demo_check", check_exit, 100) + + +# Run if executed directly +if __name__ == "__main__": + import sys + from mcrfpy import automation + + run_demo() + + # If --screenshot flag, take a screenshot and exit + if "--screenshot" in sys.argv or len(sys.argv) > 1: + def take_screenshot(dt): + automation.screenshot("focus_demo_screenshot.png") + print("Screenshot saved: focus_demo_screenshot.png") + sys.exit(0) + mcrfpy.setTimer("screenshot", take_screenshot, 200) diff --git a/tests/vllm_demo/1_multi_agent_demo.py b/tests/vllm_demo/1_multi_agent_demo.py index debc98e..b69bccb 100644 --- a/tests/vllm_demo/1_multi_agent_demo.py +++ b/tests/vllm_demo/1_multi_agent_demo.py @@ -14,12 +14,15 @@ Three agents: Each agent gets their own screenshot and VLLM query. 
""" +import sys +import os +# Add the vllm_demo directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import mcrfpy from mcrfpy import automation -import sys import requests import base64 -import os import random from action_parser import parse_action diff --git a/tests/vllm_demo/4_enhanced_action_demo.py b/tests/vllm_demo/4_enhanced_action_demo.py new file mode 100644 index 0000000..2986733 --- /dev/null +++ b/tests/vllm_demo/4_enhanced_action_demo.py @@ -0,0 +1,436 @@ +#!/usr/bin/env python3 +""" +Enhanced Action Demo +==================== + +Demonstrates the enhanced action economy system: +- Free actions (LOOK, SPEAK/ANNOUNCE) vs turn-ending (MOVE, WAIT) +- Points of interest targeting for LOOK/MOVE +- Speech system with room-wide ANNOUNCE and proximity SPEAK +- Multi-tile path continuation with FOV interrupts +- Enhanced logging for offline viewer replay + +This implements the turn-based LLM agent orchestration from issue #156. +""" + +import sys +import os +# Add the vllm_demo directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import mcrfpy +from mcrfpy import automation +import requests +import base64 + +from world_graph import ( + WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, + create_two_room_scenario, create_button_door_scenario +) +from action_parser import parse_action +from enhanced_executor import EnhancedExecutor +from enhanced_orchestrator import EnhancedOrchestrator, EnhancedSimulationLog + +# Configuration +VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" +SCREENSHOT_DIR = "/tmp/vllm_enhanced_demo" +LOG_PATH = "/tmp/vllm_enhanced_demo/simulation_log.json" +MAX_TURNS = 3 + +# Sprites +FLOOR_TILE = 0 +WALL_TILE = 40 +WIZARD_SPRITE = 84 +KNIGHT_SPRITE = 96 +RAT_SPRITE = 123 + + +class Agent: + """Agent with WorldGraph integration.""" + + def __init__(self, name: str, display_name: str, entity, world: WorldGraph): + self.name = name + 
self.display_name = display_name + self.entity = entity + self.world = world + self.message_history = [] + + @property + def pos(self) -> tuple: + return (int(self.entity.pos[0]), int(self.entity.pos[1])) + + @property + def current_room(self) -> str: + room = self.world.room_at(*self.pos) + return room.name if room else None + + def get_context(self, visible_agents: list) -> dict: + """Build context for LLM query.""" + room_name = self.current_room + agent_infos = [ + AgentInfo( + name=a.name, + display_name=a.display_name, + position=a.pos, + is_player=(a.name == self.name) + ) + for a in visible_agents + ] + return { + "location": self.world.describe_room(room_name, agent_infos, self.name), + "available_actions": self.world.get_available_actions(room_name), + "recent_messages": self.message_history[-5:], + } + + +def file_to_base64(path: str) -> str: + """Convert file to base64 string.""" + with open(path, 'rb') as f: + return base64.b64encode(f.read()).decode('utf-8') + + +def llm_query(agent, screenshot_path: str, context: dict) -> str: + """ + Query VLLM for agent action with enhanced context. + + Includes points of interest, action economy hints, error feedback, + and conversation history. + """ + system_prompt = f"""You are {agent.display_name} exploring a dungeon. +You receive visual and text information about your surroundings. + +ACTION ECONOMY: +- LOOK : Free action. Examine something, then choose another action. +- SPEAK "" or ANNOUNCE "": Free action (once per turn). Then choose another action. +- GO : Ends your turn. Move one tile in that direction (NORTH/SOUTH/EAST/WEST). +- TAKE : Ends your turn. Pick up an item you are standing next to. +- WAIT: Ends your turn without moving. + +IMPORTANT: You can only TAKE items that are adjacent to you (1 tile away). If something is far away, GO towards it first. + +You can LOOK or SPEAK, then still MOVE in the same turn. 
+Always end your final response with: Action: """ + + # Build enhanced prompt + parts = [context["location"]] + + # Add received messages + if context.get("messages"): + parts.append("\nMessages received this turn:") + for msg in context["messages"]: + sender = msg.get("sender", "someone") + content = msg.get("content", "") + parts.append(f' {sender} says: "{content}"') + + # Add points of interest + if context.get("poi_prompt"): + parts.append(f"\n{context['poi_prompt']}") + + # Add available actions + actions_str = ", ".join(context.get("available_actions", [])) + parts.append(f"\nAvailable actions: {actions_str}") + + # Add action economy hint + if context.get("has_spoken"): + parts.append("\n[You have already spoken this turn - you can still MOVE or WAIT]") + + # Add error feedback from last failed action + if context.get("last_error"): + parts.append(f"\n[ERROR: {context['last_error']}]") + parts.append("[Your last action failed. Please try a different action.]") + + # Add conversation history from this turn + if context.get("conversation_history"): + parts.append("\n[Previous attempts this turn:") + for exch in context["conversation_history"]: + action_str = f"{exch.get('action_type', '?')} {exch.get('action_args', '')}" + if exch.get("error"): + parts.append(f" - You tried: {action_str} -> FAILED: {exch['error']}") + else: + parts.append(f" - You did: {action_str}") + parts.append("]") + + parts.append("\n[Screenshot attached showing your current view]") + parts.append("\nWhat do you do? 
Brief reasoning (1-2 sentences), then Action: ") + + user_prompt = "\n".join(parts) + + messages = [ + {"role": "system", "content": system_prompt}, + { + "role": "user", + "content": [ + {"type": "text", "text": user_prompt}, + {"type": "image_url", "image_url": { + "url": "data:image/png;base64," + file_to_base64(screenshot_path) + }} + ] + } + ] + + try: + resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) + data = resp.json() + if "error" in data: + return f"[VLLM Error: {data['error']}]" + return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response') + except Exception as e: + return f"[Connection Error: {e}]" + + +def setup_scene(world: WorldGraph): + """Create McRogueFace scene from WorldGraph.""" + mcrfpy.createScene("enhanced_demo") + mcrfpy.setScene("enhanced_demo") + ui = mcrfpy.sceneUI("enhanced_demo") + + texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) + + grid = mcrfpy.Grid( + grid_size=(25, 15), + texture=texture, + pos=(5, 5), + size=(1014, 700) + ) + grid.fill_color = mcrfpy.Color(20, 20, 30) + grid.zoom = 2.0 + ui.append(grid) + + # Initialize all as walls + for x in range(25): + for y in range(15): + p = grid.at(x, y) + p.tilesprite = WALL_TILE + p.walkable = False + p.transparent = False + + # Carve rooms from WorldGraph + for room in world.rooms.values(): + for rx in range(room.x, room.x + room.width): + for ry in range(room.y, room.y + room.height): + if 0 <= rx < 25 and 0 <= ry < 15: + p = grid.at(rx, ry) + p.tilesprite = FLOOR_TILE + p.walkable = True + p.transparent = True + + # Place doors + for door in world.doors: + dx, dy = door.position + if 0 <= dx < 25 and 0 <= dy < 15: + p = grid.at(dx, dy) + p.tilesprite = FLOOR_TILE + p.walkable = not door.locked + p.transparent = True + + # FOV layer + fov_layer = grid.add_layer('color', z_index=10) + fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + + return grid, fov_layer, texture + + +def create_agents(grid, world: WorldGraph, texture) -> 
list: + """Create agents in their starting rooms.""" + agents = [] + + # Wizard in guard_room (left) + room_a = world.rooms["guard_room"] + wizard = mcrfpy.Entity( + grid_pos=room_a.center, + texture=texture, + sprite_index=WIZARD_SPRITE + ) + wizard.name = "wizard" + grid.entities.append(wizard) + agents.append(Agent("Wizard", "a wizard", wizard, world)) + + # Knight in armory (right) + room_b = world.rooms["armory"] + knight = mcrfpy.Entity( + grid_pos=room_b.center, + texture=texture, + sprite_index=KNIGHT_SPRITE + ) + knight.name = "knight" + grid.entities.append(knight) + agents.append(Agent("Knight", "a knight", knight, world)) + + return agents + + +def add_rat(grid, world: WorldGraph, texture, position: tuple): + """Add a rat entity at the specified position.""" + rat = mcrfpy.Entity( + grid_pos=position, + texture=texture, + sprite_index=RAT_SPRITE + ) + rat.name = "rat" + grid.entities.append(rat) + return rat + + +def run_demo(): + """Run enhanced action demo.""" + print("=" * 70) + print("Enhanced Action Demo") + print("=" * 70) + print(""" +Features demonstrated: +- LOOK as free action (doesn't end turn) +- SPEAK/ANNOUNCE as free action (once per turn) +- Points of interest targeting +- Enhanced logging for offline viewer +""") + + os.makedirs(SCREENSHOT_DIR, exist_ok=True) + + # Create world + print("Creating world...") + world = create_two_room_scenario() + print(f" Rooms: {list(world.rooms.keys())}") + print(f" Objects: {list(world.objects.keys())}") + + # Setup scene + print("\nSetting up scene...") + grid, fov_layer, texture = setup_scene(world) + + # Create agents + print("\nCreating agents...") + agents = create_agents(grid, world, texture) + + # Add a rat near the door for interest + rat = add_rat(grid, world, texture, (9, 4)) + print(f" Added rat at (9, 4)") + + for agent in agents: + print(f" {agent.name} at {agent.pos} in {agent.current_room}") + + # Create enhanced orchestrator + print("\nInitializing enhanced orchestrator...") + 
orchestrator = EnhancedOrchestrator( + grid=grid, + fov_layer=fov_layer, + world=world, + agents=agents, + screenshot_dir=SCREENSHOT_DIR, + llm_query_fn=llm_query + ) + + # Run simulation + print(f"\nRunning simulation ({MAX_TURNS} turns)...") + log = orchestrator.run_simulation(max_turns=MAX_TURNS) + + # Save enhanced log + log.save(LOG_PATH) + + # Print summary + print("\n" + "=" * 70) + print("SIMULATION SUMMARY") + print("=" * 70) + + for turn in range(1, orchestrator.turn_number + 1): + print(log.get_turn_summary(turn)) + + # Print speech log + if log.speech_log: + print("\n" + "-" * 40) + print("SPEECH LOG") + print("-" * 40) + for entry in log.speech_log: + print(f" Turn {entry['turn']}: {entry['speaker']} {entry['type']}s: \"{entry['content'][:50]}...\"") + if entry['recipients']: + print(f" -> Heard by: {', '.join(entry['recipients'])}") + + print("\n" + "=" * 70) + print("Demo Complete") + print("=" * 70) + print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/") + print(f"Simulation log saved to: {LOG_PATH}") + print("\nLog structure (for offline viewer):") + print(" - metadata: simulation info") + print(" - steps[]: per-agent-turn records with:") + print(" - screenshot_path, position, room") + print(" - llm_prompt_user, llm_response") + print(" - free_actions[] (LOOK, SPEAK)") + print(" - final_action (MOVE, WAIT)") + print(" - speech_log[]: all speech events") + + return True + + +def replay_log(log_path: str): + """ + Replay a simulation from log file. + + This is a text-based preview of what the offline viewer would show. 
+ """ + print(f"Loading simulation from: {log_path}") + + try: + log = EnhancedSimulationLog.load(log_path) + except FileNotFoundError: + print(f"Log file not found: {log_path}") + return + + print("\n" + "=" * 70) + print("SIMULATION REPLAY") + print("=" * 70) + print(f"Turns: {log.metadata.get('total_turns', '?')}") + print(f"Agents: {', '.join(log.metadata.get('agent_names', []))}") + print(f"Rooms: {', '.join(log.metadata.get('world_rooms', []))}") + + for step in log.steps: + print(f"\n{'='*40}") + print(f"Turn {step.turn}: {step.agent_id}") + print(f"{'='*40}") + print(f"Position: {step.position_start} -> {step.position_end}") + print(f"Room: {step.room}") + + if step.pending_messages: + print(f"\nMessages received:") + for msg in step.pending_messages: + print(f" {msg.get('sender')}: \"{msg.get('content', '')[:40]}...\"") + + if step.llm_was_queried: + print(f"\nLLM Response (truncated):") + print(f" {step.llm_response[:200]}...") + else: + print(f"\n[Path continuation - no LLM query]") + + if step.free_actions: + print(f"\nFree actions:") + for fa in step.free_actions: + print(f" - {fa['action_type']}: {fa.get('args', ())}") + + status = "OK" if step.final_action_success else "FAIL" + print(f"\nFinal: {step.final_action_type} {step.final_action_args} [{status}]") + print(f" {step.final_action_message}") + + # Speech summary + if log.speech_log: + print("\n" + "=" * 40) + print("ALL SPEECH") + print("=" * 40) + for entry in log.speech_log: + print(f"Turn {entry['turn']}: {entry['speaker']} -> {entry['recipients']}") + print(f" \"{entry['content']}\"") + + +if __name__ == "__main__": + # Check for replay mode + if len(sys.argv) > 1 and sys.argv[1] == "--replay": + log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH + replay_log(log_file) + sys.exit(0) + + # Normal execution + try: + success = run_demo() + print("\nPASS" if success else "\nFAIL") + sys.exit(0 if success else 1) + except Exception as e: + import traceback + traceback.print_exc() + 
sys.exit(1) diff --git a/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md b/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md new file mode 100644 index 0000000..c4f762b --- /dev/null +++ b/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md @@ -0,0 +1,152 @@ +# Offline Viewer Specification + +**Status**: Planned (issue #154) +**Priority**: After core simulation features are stable + +## Overview + +The Offline Viewer allows users to replay stored simulation logs in McRogueFace, stepping through turn-by-turn to review: +- Each agent's perspective (FOV, camera position) +- LLM chain-of-thought reasoning +- Actions taken and their results +- Speech between agents + +## Log Format + +Simulation logs are stored as JSON with this structure: + +```json +{ + "metadata": { + "total_turns": 5, + "num_agents": 2, + "agent_names": ["Wizard", "Knight"], + "timestamp_start": "2025-01-15T10:30:00", + "timestamp_end": "2025-01-15T10:32:45", + "world_rooms": ["guard_room", "armory"], + "screenshot_dir": "/tmp/vllm_enhanced_demo" + }, + "steps": [ + { + "turn": 1, + "agent_id": "Wizard", + "timestamp": "2025-01-15T10:30:15", + + "position_start": [5, 4], + "position_end": [6, 4], + "room": "guard_room", + + "visible_entities": ["rat_123", "knight_456"], + "visible_tiles": 42, + "points_of_interest": [ + {"name": "door", "direction": "east", "distance": 4} + ], + + "location_description": "You are in the guard room...", + "available_actions": ["GO EAST", "LOOK", "WAIT"], + "pending_messages": [], + "poi_prompt": "Points of interest:\n - a door to the armory (east)", + + "screenshot_path": "/tmp/.../turn1_wizard.png", + + "llm_prompt_system": "You are a wizard...", + "llm_prompt_user": "You are in the guard room...", + "llm_response": "I see a door to the east. I should explore. 
Action: GO EAST", + "llm_was_queried": true, + + "free_actions": [ + {"action_type": "LOOK", "args": ["DOOR"], "result": {"description": "A wooden door..."}} + ], + + "final_action_type": "GO", + "final_action_args": ["EAST"], + "final_action_success": true, + "final_action_message": "Moved east to (6, 4)", + + "path_taken": [[5, 4], [6, 4]], + "path_remaining": 0 + } + ], + "speech_log": [ + { + "turn": 2, + "speaker": "Wizard", + "type": "announce", + "content": "Hello, is anyone there?", + "recipients": ["Knight"] + } + ] +} +``` + +## Viewer Features (Planned) + +### Core Features + +1. **Turn Navigation** + - Step forward/backward through turns + - Jump to specific turn number + - Auto-play at configurable speed + +2. **Agent Perspective** + - Reconstruct agent's FOV from stored data + - Center camera on current agent + - Show visible entities and tiles + +3. **LLM Review Panel** + - Display system prompt + - Display user prompt (context) + - Display LLM response + - Highlight parsed action + +4. **Action Log** + - Show free actions (LOOK, SPEAK) + - Show final action and result + - Color-code success/failure + +5. **Speech History** + - Timeline of all speech events + - Filter by agent + - Show recipients + +### Implementation Notes + +The viewer should: +- Load screenshots from `screenshot_path` (if available) +- OR reconstruct scene from WorldGraph + step data +- Support keyboard navigation (arrow keys) +- Display agent state in sidebar + +### UI Layout (Suggested) + +``` ++----------------------------------+------------------+ +| | Turn: 3/10 | +| Main Viewport | Agent: Wizard | +| (Agent's Perspective) | Room: armory | +| +------------------+ +| | LLM Response: | +| | "I see a rat | +| | to the east. 
| +| | Action: LOOK | +| | AT RAT" | ++----------------------------------+------------------+ +| < Prev | Turn 3 | Next > | Actions: | +| [Agent: Wizard v] | - LOOK AT RAT | +| | - GO EAST [OK] | ++----------------------------------+------------------+ +``` + +## Files + +- `enhanced_orchestrator.py` - Generates `EnhancedSimulationLog` +- `4_enhanced_action_demo.py` - Demo with `--replay` mode for text preview +- Logs stored in `/tmp/vllm_enhanced_demo/simulation_log.json` + +## Future Enhancements + +- Animated path replay (smooth entity movement) +- Side-by-side multi-agent view +- Diff view comparing agent perceptions +- Export to video/GIF +- Integration with annotation tools for research diff --git a/tests/vllm_demo/action_economy.py b/tests/vllm_demo/action_economy.py new file mode 100644 index 0000000..0449cfb --- /dev/null +++ b/tests/vllm_demo/action_economy.py @@ -0,0 +1,302 @@ +""" +Action Economy System +===================== + +Defines which actions consume turns and which are free. +Manages multi-tile pathing with FOV interruption. 
+ +Action Categories: +- FREE: LOOK, SPEAK, ANNOUNCE (don't end turn) +- FULL: MOVE, WAIT (end turn) + +Constraints: +- Only ONE speech action per turn +- LOOK provides description and prompts for another action +- Multi-tile paths continue without LLM until FOV changes +""" + +from dataclasses import dataclass, field +from typing import List, Tuple, Optional, Set, Dict, Any +from enum import Enum + +from action_parser import Action, ActionType + + +class TurnCost(Enum): + """How much of a turn an action consumes.""" + FREE = "free" # Doesn't end turn + FULL = "full" # Ends turn + + +# Action cost mapping +ACTION_COSTS = { + ActionType.LOOK: TurnCost.FREE, + ActionType.SPEAK: TurnCost.FREE, + ActionType.ANNOUNCE: TurnCost.FREE, + ActionType.GO: TurnCost.FULL, + ActionType.WAIT: TurnCost.FULL, + ActionType.TAKE: TurnCost.FULL, + ActionType.DROP: TurnCost.FULL, + ActionType.PUSH: TurnCost.FULL, + ActionType.USE: TurnCost.FULL, + ActionType.OPEN: TurnCost.FULL, + ActionType.CLOSE: TurnCost.FULL, + ActionType.INVALID: TurnCost.FULL, # Invalid action ends turn +} + + +@dataclass +class TurnState: + """ + Tracks state within a single turn. + + Used to enforce constraints like "only one speech per turn" + and track free actions taken before turn-ending action. + """ + has_spoken: bool = False + free_actions: List[Dict[str, Any]] = field(default_factory=list) + turn_ended: bool = False + + def can_speak(self) -> bool: + """Check if agent can still speak this turn.""" + return not self.has_spoken + + def record_speech(self): + """Record that agent has spoken this turn.""" + self.has_spoken = True + + def record_free_action(self, action_type: str, details: Dict[str, Any]): + """Record a free action for logging.""" + self.free_actions.append({ + "type": action_type, + **details + }) + + def end_turn(self): + """Mark turn as ended.""" + self.turn_ended = True + + +@dataclass +class PathState: + """ + Tracks multi-tile movement path for an agent. 
+ + When an agent decides to move to a distant location, + we store the path and continue moving without LLM calls + until the path completes or FOV changes. + """ + path: List[Tuple[int, int]] = field(default_factory=list) + current_index: int = 0 + destination_description: str = "" # "the armory", "the door" + + # FOV state when path was planned + visible_entities_at_start: Set[str] = field(default_factory=set) + + @property + def has_path(self) -> bool: + """Check if there's an active path.""" + return len(self.path) > self.current_index + + @property + def next_tile(self) -> Optional[Tuple[int, int]]: + """Get next tile in path, or None if path complete.""" + if self.has_path: + return self.path[self.current_index] + return None + + @property + def remaining_tiles(self) -> int: + """Number of tiles left in path.""" + return max(0, len(self.path) - self.current_index) + + def advance(self): + """Move to next tile in path.""" + if self.has_path: + self.current_index += 1 + + def clear(self): + """Clear the current path.""" + self.path = [] + self.current_index = 0 + self.destination_description = "" + self.visible_entities_at_start = set() + + def should_interrupt(self, current_visible_entities: Set[str]) -> bool: + """ + Check if path should be interrupted due to FOV change. + + Returns True if a NEW entity has entered the agent's FOV + since the path was planned. + """ + new_entities = current_visible_entities - self.visible_entities_at_start + return len(new_entities) > 0 + + +@dataclass +class PointOfInterest: + """ + A targetable object/location for LOOK/MOVE actions. + + Listed in LLM prompts to guide valid targeting. 
+ """ + name: str # Short name: "door", "rat", "button" + display_name: str # Full description: "a wooden door to the east" + position: Tuple[int, int] # Tile coordinates + direction: str # Cardinal direction from agent: "north", "east" + distance: int # Manhattan distance from agent + can_look: bool = True # Can be examined with LOOK + can_move_to: bool = False # Can be targeted with GO TO + entity_id: Optional[str] = None # Entity ID if this is an entity + + +def get_action_cost(action: Action) -> TurnCost: + """Get the turn cost for an action.""" + return ACTION_COSTS.get(action.type, TurnCost.FULL) + + +def get_direction_name(from_pos: Tuple[int, int], to_pos: Tuple[int, int]) -> str: + """Get cardinal direction name from one position to another.""" + dx = to_pos[0] - from_pos[0] + dy = to_pos[1] - from_pos[1] + + if abs(dx) > abs(dy): + return "east" if dx > 0 else "west" + elif abs(dy) > abs(dx): + return "south" if dy > 0 else "north" + else: + # Diagonal + ns = "south" if dy > 0 else "north" + ew = "east" if dx > 0 else "west" + return f"{ns}-{ew}" + + +def manhattan_distance(a: Tuple[int, int], b: Tuple[int, int]) -> int: + """Calculate Manhattan distance between two points.""" + return abs(a[0] - b[0]) + abs(a[1] - b[1]) + + +class PointOfInterestCollector: + """ + Collects points of interest visible to an agent. + + Used to populate LLM prompts with valid LOOK/MOVE targets. + """ + + def __init__(self, grid, agent_pos: Tuple[int, int]): + self.grid = grid + self.agent_pos = agent_pos + self.points: List[PointOfInterest] = [] + + def collect_from_fov(self, world_graph=None) -> List[PointOfInterest]: + """ + Collect all points of interest visible in current FOV. 
+ + Examines: + - Entities (other agents, NPCs, items) + - Doors/exits + - Interactive objects (buttons, chests) + - Notable tiles (walls with features) + """ + self.points = [] + + # Collect entities + for entity in self.grid.entities: + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if (ex, ey) == self.agent_pos: + continue # Skip self + + if self.grid.is_in_fov(ex, ey): + direction = get_direction_name(self.agent_pos, (ex, ey)) + distance = manhattan_distance(self.agent_pos, (ex, ey)) + + # Try to get entity name/description + entity_name = getattr(entity, 'name', None) or f"creature" + entity_id = getattr(entity, 'id', None) or str(id(entity)) + + self.points.append(PointOfInterest( + name=entity_name, + display_name=f"a {entity_name} to the {direction}", + position=(ex, ey), + direction=direction, + distance=distance, + can_look=True, + can_move_to=False, # Can't move onto entities + entity_id=entity_id + )) + + # Collect from WorldGraph if provided + if world_graph: + self._collect_from_world_graph(world_graph) + + # Sort by distance + self.points.sort(key=lambda p: p.distance) + + return self.points + + def _collect_from_world_graph(self, world): + """Collect doors and objects from WorldGraph.""" + agent_room = world.room_at(*self.agent_pos) + if not agent_room: + return + + # Doors + for door in world.get_exits(agent_room.name): + dx, dy = door.position + if self.grid.is_in_fov(dx, dy): + direction = get_direction_name(self.agent_pos, (dx, dy)) + distance = manhattan_distance(self.agent_pos, (dx, dy)) + + # Get destination room name + if door.room_a == agent_room.name: + dest = world.rooms.get(door.room_b) + else: + dest = world.rooms.get(door.room_a) + dest_name = dest.display_name if dest else "unknown" + + lock_str = " (locked)" if door.locked else "" + + self.points.append(PointOfInterest( + name="door", + display_name=f"a door to {dest_name}{lock_str} ({direction})", + position=(dx, dy), + direction=direction, + distance=distance, + can_look=True, + 
can_move_to=not door.locked + )) + + # Objects in room + for obj in world.get_objects_in_room(agent_room.name): + ox, oy = obj.position + if self.grid.is_in_fov(ox, oy): + direction = get_direction_name(self.agent_pos, (ox, oy)) + distance = manhattan_distance(self.agent_pos, (ox, oy)) + + self.points.append(PointOfInterest( + name=obj.name, + display_name=f"{obj.display_name} ({direction})", + position=(ox, oy), + direction=direction, + distance=distance, + can_look=True, + can_move_to="pressable" not in obj.affordances # Can walk to items + )) + + def format_for_prompt(self) -> str: + """Format points of interest for inclusion in LLM prompt.""" + if not self.points: + return "No notable objects in view." + + lines = ["Points of interest:"] + for poi in self.points: + actions = [] + if poi.can_look: + actions.append(f"LOOK AT {poi.name.upper()}") + if poi.can_move_to: + actions.append(f"GO TO {poi.name.upper()}") + + action_str = ", ".join(actions) if actions else "observe only" + lines.append(f" - {poi.display_name}: {action_str}") + + return "\n".join(lines) diff --git a/tests/vllm_demo/enhanced_executor.py b/tests/vllm_demo/enhanced_executor.py new file mode 100644 index 0000000..56b3075 --- /dev/null +++ b/tests/vllm_demo/enhanced_executor.py @@ -0,0 +1,731 @@ +""" +Enhanced Action Executor +======================== + +Extends ActionExecutor with: +- LOOK action with detailed descriptions +- SPEAK/ANNOUNCE execution with range checking +- Multi-tile path planning +- Free action vs turn-ending action handling +""" + +from dataclasses import dataclass +from typing import Optional, List, Tuple, Dict, Any, Set +from action_parser import Action, ActionType +from action_executor import ActionResult +from action_economy import ( + TurnState, PathState, TurnCost, get_action_cost, + manhattan_distance, get_direction_name +) + + +@dataclass +class TakeResult: + """Result of a TAKE action.""" + success: bool + message: str + item_name: str + item_position: 
Optional[Tuple[int, int]] = None + + +@dataclass +class LookResult: + """Result of a LOOK action.""" + success: bool + description: str + target_name: str + target_position: Optional[Tuple[int, int]] = None + + +@dataclass +class SpeechResult: + """Result of a SPEAK/ANNOUNCE action.""" + success: bool + message: str + recipients: List[str] # Names of agents who received the message + speech_type: str # "announce" or "speak" + content: str # What was said + + +@dataclass +class Message: + """A message received by an agent.""" + sender: str + content: str + speech_type: str # "announce" or "speak" + turn: int + distance: Optional[int] = None # For SPEAK, how far away sender was + + +class EnhancedExecutor: + """ + Enhanced action executor with LOOK, SPEAK, and multi-tile support. + """ + + # Direction vectors for movement + DIRECTION_VECTORS = { + 'NORTH': (0, -1), + 'SOUTH': (0, 1), + 'EAST': (1, 0), + 'WEST': (-1, 0), + } + + # SPEAK range (Manhattan distance) + SPEAK_RANGE = 4 + + def __init__(self, grid, world_graph=None): + """ + Initialize executor. 
+ + Args: + grid: mcrfpy.Grid instance + world_graph: Optional WorldGraph for detailed descriptions + """ + self.grid = grid + self.world = world_graph + + # Agent path states (agent_name -> PathState) + self.path_states: Dict[str, PathState] = {} + + # Speech channel for message delivery + self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages + + def get_path_state(self, agent_name: str) -> PathState: + """Get or create path state for an agent.""" + if agent_name not in self.path_states: + self.path_states[agent_name] = PathState() + return self.path_states[agent_name] + + def get_pending_messages(self, agent_name: str) -> List[Message]: + """Get and clear pending messages for an agent.""" + messages = self.pending_messages.get(agent_name, []) + self.pending_messages[agent_name] = [] + return messages + + # ========================================================================= + # LOOK Action + # ========================================================================= + + def execute_look(self, agent, action: Action) -> LookResult: + """ + Execute LOOK action - examine a tile or entity. 
+ + Args: + agent: Agent performing the look + action: Parsed LOOK action with optional target + + Returns: + LookResult with detailed description + """ + target = action.args[0] if action.args and action.args[0] else None + + if target is None: + # General look around + return self._look_around(agent) + else: + # Look at specific target + return self._look_at_target(agent, target.upper()) + + def _look_around(self, agent) -> LookResult: + """Describe the general surroundings.""" + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + + descriptions = [] + + # Describe current room + if self.world: + room = self.world.room_at(ax, ay) + if room: + descriptions.append(f"You are in {room.display_name}.") + if room.description_template and room.properties: + try: + desc = room.description_template.format(**room.properties) + descriptions.append(desc) + except KeyError: + pass + + # Count visible entities + visible_count = 0 + for entity in self.grid.entities: + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey): + visible_count += 1 + + if visible_count > 0: + descriptions.append(f"You can see {visible_count} other creature(s) nearby.") + + # Describe nearby walls/openings + wall_dirs = [] + open_dirs = [] + for direction, (dx, dy) in self.DIRECTION_VECTORS.items(): + nx, ny = ax + dx, ay + dy + if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]: + cell = self.grid.at(nx, ny) + if cell.walkable: + open_dirs.append(direction.lower()) + else: + wall_dirs.append(direction.lower()) + + if open_dirs: + descriptions.append(f"Open passages: {', '.join(open_dirs)}.") + if wall_dirs: + descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.") + + return LookResult( + success=True, + description=" ".join(descriptions), + target_name="surroundings" + ) + + def _look_at_target(self, agent, target: str) -> LookResult: + """Look at a specific target (direction, entity, or object name).""" + ax, 
ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + + # Check if target is a direction + if target in self.DIRECTION_VECTORS: + return self._look_in_direction(agent, target) + + # Check if target matches an entity + for entity in self.grid.entities: + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if (ex, ey) == (ax, ay): + continue + + entity_name = getattr(entity, 'name', '').upper() + if target in entity_name or entity_name in target: + if self.grid.is_in_fov(ex, ey): + return self._describe_entity(agent, entity) + else: + return LookResult( + success=False, + description=f"You cannot see {target.lower()} from here.", + target_name=target.lower() + ) + + # Check WorldGraph objects + if self.world: + room = self.world.room_at(ax, ay) + if room: + for obj in self.world.get_objects_in_room(room.name): + if target in obj.name.upper() or obj.name.upper() in target: + ox, oy = obj.position + if self.grid.is_in_fov(ox, oy): + return self._describe_object(agent, obj) + + # Check doors + for door in self.world.get_exits(room.name): + if "DOOR" in target: + dx, dy = door.position + if self.grid.is_in_fov(dx, dy): + return self._describe_door(agent, door) + + return LookResult( + success=False, + description=f"You don't see anything called '{target.lower()}' nearby.", + target_name=target.lower() + ) + + def _look_in_direction(self, agent, direction: str) -> LookResult: + """Look in a cardinal direction.""" + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + dx, dy = self.DIRECTION_VECTORS[direction] + + descriptions = [] + + # Scan tiles in that direction + for distance in range(1, 10): + tx, ty = ax + dx * distance, ay + dy * distance + + if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]): + descriptions.append(f"The edge of the known world lies {direction.lower()}.") + break + + if not self.grid.is_in_fov(tx, ty): + descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.") + break + + cell = 
self.grid.at(tx, ty) + + # Check for entity at this tile + for entity in self.grid.entities: + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if (ex, ey) == (tx, ty): + entity_name = getattr(entity, 'name', 'creature') + descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.") + + # Check for wall + if not cell.walkable: + # Check if it's a door + if self.world: + room = self.world.room_at(ax, ay) + if room: + for door in self.world.get_exits(room.name): + if door.position == (tx, ty): + dest = self.world.rooms.get( + door.room_b if door.room_a == room.name else door.room_a + ) + dest_name = dest.display_name if dest else "another area" + lock_str = " It is locked." if door.locked else "" + descriptions.append( + f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}" + ) + break + else: + descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") + else: + descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") + else: + descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") + break + + if not descriptions: + descriptions.append(f"Open floor extends to the {direction.lower()}.") + + return LookResult( + success=True, + description=" ".join(descriptions), + target_name=direction.lower(), + target_position=None + ) + + def _describe_entity(self, agent, entity) -> LookResult: + """Generate detailed description of an entity.""" + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + + entity_name = getattr(entity, 'name', 'creature') + direction = get_direction_name((ax, ay), (ex, ey)) + distance = manhattan_distance((ax, ay), (ex, ey)) + + descriptions = [ + f"You examine the {entity_name} carefully.", + f"It stands {distance} tile(s) to the {direction}." 
+ ] + + # Add any entity-specific description + if hasattr(entity, 'description'): + descriptions.append(entity.description) + + # Add behavior hints if available + if hasattr(entity, 'behavior'): + descriptions.append(f"It appears to be {entity.behavior}.") + + return LookResult( + success=True, + description=" ".join(descriptions), + target_name=entity_name, + target_position=(ex, ey) + ) + + def _describe_object(self, agent, obj) -> LookResult: + """Generate detailed description of a WorldGraph object.""" + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + ox, oy = obj.position + + direction = get_direction_name((ax, ay), (ox, oy)) + distance = manhattan_distance((ax, ay), (ox, oy)) + + descriptions = [ + f"You examine {obj.display_name}.", + f"It is {distance} tile(s) to the {direction}." + ] + + if obj.description: + descriptions.append(obj.description) + + # Describe affordances + if "takeable" in obj.affordances: + descriptions.append("It looks small enough to pick up.") + if "pressable" in obj.affordances: + descriptions.append("It appears to be some kind of mechanism.") + if "openable" in obj.affordances: + descriptions.append("It can be opened.") + if "readable" in obj.affordances: + descriptions.append("There is writing on it.") + + return LookResult( + success=True, + description=" ".join(descriptions), + target_name=obj.name, + target_position=(ox, oy) + ) + + def _describe_door(self, agent, door) -> LookResult: + """Generate detailed description of a door.""" + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + dx, dy = door.position + + direction = get_direction_name((ax, ay), (dx, dy)) + distance = manhattan_distance((ax, ay), (dx, dy)) + + # Get destination + if self.world: + current_room = self.world.room_at(ax, ay) + if current_room: + if door.room_a == current_room.name: + dest = self.world.rooms.get(door.room_b) + else: + dest = self.world.rooms.get(door.room_a) + dest_name = dest.display_name if dest else "another area" 
+ else: + dest_name = "another area" + else: + dest_name = "another area" + + descriptions = [ + f"You examine the doorway to the {direction}.", + f"It leads to {dest_name}, {distance} tile(s) away." + ] + + if door.locked: + descriptions.append("The door is locked. You'll need a key or mechanism to open it.") + else: + descriptions.append("The passage is open.") + + return LookResult( + success=True, + description=" ".join(descriptions), + target_name="door", + target_position=(dx, dy) + ) + + # ========================================================================= + # SPEAK/ANNOUNCE Actions + # ========================================================================= + + def execute_speech(self, agent, action: Action, all_agents: list, + turn_number: int) -> SpeechResult: + """ + Execute SPEAK or ANNOUNCE action. + + ANNOUNCE: All agents in the same room hear the message + SPEAK: Only agents within SPEAK_RANGE tiles hear the message + """ + message_content = action.args[0] if action.args else "" + + if not message_content: + return SpeechResult( + success=False, + message="Nothing to say.", + recipients=[], + speech_type=action.type.value.lower(), + content="" + ) + + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + recipients = [] + + if action.type == ActionType.ANNOUNCE: + # Room-wide broadcast + recipients = self._get_agents_in_room(agent, all_agents) + speech_type = "announce" + else: + # Proximity-based speech + recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE) + speech_type = "speak" + + # Deliver messages + for recipient in recipients: + if recipient.name not in self.pending_messages: + self.pending_messages[recipient.name] = [] + + distance = manhattan_distance( + (ax, ay), + (int(recipient.entity.pos[0]), int(recipient.entity.pos[1])) + ) if speech_type == "speak" else None + + self.pending_messages[recipient.name].append(Message( + sender=agent.name, + content=message_content, + speech_type=speech_type, + 
turn=turn_number, + distance=distance + )) + + recipient_names = [r.name for r in recipients] + + if recipients: + return SpeechResult( + success=True, + message=f"You {speech_type}: \"{message_content}\"", + recipients=recipient_names, + speech_type=speech_type, + content=message_content + ) + else: + return SpeechResult( + success=True, # Still succeeds, just nobody heard + message=f"You {speech_type} into the emptiness: \"{message_content}\"", + recipients=[], + speech_type=speech_type, + content=message_content + ) + + def _get_agents_in_room(self, speaker, all_agents: list) -> list: + """Get all agents in the same room as speaker (excluding speaker).""" + if not self.world: + # Fallback: use proximity + return self._get_agents_in_range(speaker, all_agents, 20) + + ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1]) + speaker_room = self.world.room_at(ax, ay) + + if not speaker_room: + return [] + + recipients = [] + for agent in all_agents: + if agent.name == speaker.name: + continue + rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + agent_room = self.world.room_at(rx, ry) + if agent_room and agent_room.name == speaker_room.name: + recipients.append(agent) + + return recipients + + def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list: + """Get all agents within Manhattan distance of speaker.""" + ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1]) + + recipients = [] + for agent in all_agents: + if agent.name == speaker.name: + continue + rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles: + recipients.append(agent) + + return recipients + + # ========================================================================= + # TAKE Action + # ========================================================================= + + def execute_take(self, agent, action: Action) -> TakeResult: + """ + Execute TAKE action - pick up an item. 
+ + Items must be: + 1. In the WorldGraph as a takeable object + 2. Within reach (adjacent tile or same tile, distance <= 1) + 3. Visible in FOV + """ + item_name = action.args[0].lower() if action.args and action.args[0] else None + + if not item_name: + return TakeResult( + success=False, + message="Take what? Specify an item name.", + item_name="" + ) + + ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + + # Search for the item in WorldGraph + if not self.world: + return TakeResult( + success=False, + message="No items exist in this world.", + item_name=item_name + ) + + # Find matching object + matching_obj = None + for obj_name, obj in self.world.objects.items(): + if item_name in obj_name.lower() or obj_name.lower() in item_name: + matching_obj = obj + break + + if not matching_obj: + return TakeResult( + success=False, + message=f"You don't see any '{item_name}' here.", + item_name=item_name + ) + + # Check if takeable + if "takeable" not in matching_obj.affordances: + return TakeResult( + success=False, + message=f"The {matching_obj.display_name} cannot be picked up.", + item_name=item_name, + item_position=matching_obj.position + ) + + ox, oy = matching_obj.position + + # Check if visible in FOV + if not self.grid.is_in_fov(ox, oy): + return TakeResult( + success=False, + message=f"You can't see the {matching_obj.display_name} from here.", + item_name=item_name, + item_position=(ox, oy) + ) + + # Check distance (must be adjacent or same tile) + distance = manhattan_distance((ax, ay), (ox, oy)) + if distance > 1: + direction = get_direction_name((ax, ay), (ox, oy)) + # Use name for cleaner message (display_name often has article already) + return TakeResult( + success=False, + message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.", + item_name=item_name, + item_position=(ox, oy) + ) + + # Success! 
Remove from world (simplified - no inventory system yet) + del self.world.objects[matching_obj.name] + + return TakeResult( + success=True, + message=f"You pick up {matching_obj.display_name}.", + item_name=matching_obj.name, + item_position=(ox, oy) + ) + + # ========================================================================= + # Movement (single tile, delegates to original executor) + # ========================================================================= + + def execute_move(self, agent, action: Action) -> ActionResult: + """ + Execute single-tile movement. + + This is the per-turn movement. Multi-tile paths are handled + at the orchestrator level. + """ + if not action.args or not action.args[0]: + return ActionResult(False, "No direction specified") + + direction = action.args[0] + if direction not in self.DIRECTION_VECTORS: + return ActionResult(False, f"Invalid direction: {direction}") + + dx, dy = self.DIRECTION_VECTORS[direction] + current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + new_x, new_y = current_x + dx, current_y + dy + + # Bounds check + grid_w, grid_h = self.grid.grid_size + if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): + return ActionResult(False, f"Cannot go {direction} - edge of map") + + # Walkability check + target_cell = self.grid.at(new_x, new_y) + if not target_cell.walkable: + return ActionResult(False, f"Cannot go {direction} - path blocked") + + # Entity collision check + for entity in self.grid.entities: + if entity is agent.entity: + continue + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if ex == new_x and ey == new_y: + return ActionResult(False, f"Cannot go {direction} - occupied") + + # Execute movement + agent.entity.pos = (new_x, new_y) + + return ActionResult( + success=True, + message=f"Moved {direction.lower()} to ({new_x}, {new_y})", + new_position=(new_x, new_y), + path=[(current_x, current_y), (new_x, new_y)] + ) + + def execute_wait(self, agent, action: Action) -> 
ActionResult:
+        """Execute WAIT action."""
+        return ActionResult(True, "Waited and observed surroundings")
+
+    # =========================================================================
+    # Multi-tile Pathfinding
+    # =========================================================================
+
+    def plan_path_to(self, agent, target_pos: Tuple[int, int],
+                     visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]:
+        """
+        Plan a path to a target position.
+
+        Uses A* via libtcod if available; returns None when libtcod is absent.
+        Returns list of tiles from current position to target (excluding current).
+        """
+        try:
+            from mcrfpy import libtcod
+            ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
+
+            path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1])
+
+            if path:
+                # Store path state
+                path_state = self.get_path_state(agent.name)
+                path_state.path = path
+                path_state.current_index = 0
+                path_state.visible_entities_at_start = visible_entities.copy()
+
+                return path
+        except ImportError:
+            pass
+
+        return None
+
+    def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]:
+        """
+        Continue an existing multi-tile path.
+
+        Returns an ActionResult if a move was attempted (including blocked moves), None if the path is complete or interrupted.
+ """ + path_state = self.get_path_state(agent.name) + + if not path_state.has_path: + return None + + # Check for FOV interrupt + if path_state.should_interrupt(current_visible): + path_state.clear() + return None # Signal that LLM should be queried + + # Get next tile + next_tile = path_state.next_tile + if not next_tile: + path_state.clear() + return None + + # Move to next tile + current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) + new_x, new_y = next_tile + + # Verify still walkable + target_cell = self.grid.at(new_x, new_y) + if not target_cell.walkable: + path_state.clear() + return ActionResult(False, "Path blocked - recalculating") + + # Check for entity collision + for entity in self.grid.entities: + if entity is agent.entity: + continue + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if ex == new_x and ey == new_y: + path_state.clear() + return ActionResult(False, "Path blocked by creature") + + # Execute movement + agent.entity.pos = (new_x, new_y) + path_state.advance() + + remaining = path_state.remaining_tiles + if remaining > 0: + msg = f"Continuing path ({remaining} tiles remaining)" + else: + msg = "Arrived at destination" + path_state.clear() + + return ActionResult( + success=True, + message=msg, + new_position=(new_x, new_y), + path=[(current_x, current_y), (new_x, new_y)] + ) diff --git a/tests/vllm_demo/enhanced_orchestrator.py b/tests/vllm_demo/enhanced_orchestrator.py new file mode 100644 index 0000000..2febc18 --- /dev/null +++ b/tests/vllm_demo/enhanced_orchestrator.py @@ -0,0 +1,606 @@ +""" +Enhanced Turn Orchestrator +========================== + +Extends TurnOrchestrator with: +- Action economy (free actions vs turn-ending) +- Multi-tile path continuation +- FOV interrupt detection +- Enhanced logging for offline viewer replay +""" + +import json +import os +from dataclasses import dataclass, asdict, field +from typing import List, Dict, Any, Optional, Callable, Set +from datetime import datetime + +from 
world_graph import WorldGraph, AgentInfo
+from action_parser import Action, ActionType, parse_action
+from action_executor import ActionResult
+from action_economy import (
+    TurnState, PathState, TurnCost, get_action_cost,
+    PointOfInterestCollector, PointOfInterest
+)
+from enhanced_executor import EnhancedExecutor, LookResult, SpeechResult, Message, TakeResult
+
+
+@dataclass
+class FreeActionRecord:
+    """Record of a free action taken during a turn."""
+    action_type: str
+    args: tuple
+    result: Dict[str, Any]
+    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+
+
+@dataclass
+class EnhancedSimulationStep:
+    """
+    Enhanced simulation step for offline viewer replay.
+
+    Contains all data needed to reconstruct the agent's perspective
+    and decision-making for that turn.
+    """
+    # Turn identification
+    turn: int
+    agent_id: str
+    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+
+    # Agent state at start of turn
+    position_start: tuple = (0, 0)
+    room: str = ""
+    path_in_progress: bool = False
+
+    # FOV and perception
+    visible_entities: List[str] = field(default_factory=list)
+    visible_tiles: int = 0  # Count of visible tiles
+    points_of_interest: List[Dict] = field(default_factory=list)
+
+    # Context provided to LLM
+    location_description: str = ""
+    available_actions: List[str] = field(default_factory=list)
+    pending_messages: List[Dict] = field(default_factory=list)
+    poi_prompt: str = ""
+
+    # Screenshot path (for viewer to load)
+    screenshot_path: str = ""
+
+    # LLM interaction
+    llm_prompt_system: str = ""
+    llm_prompt_user: str = ""
+    llm_response: str = ""
+    llm_was_queried: bool = True  # False if path continuation
+
+    # Conversation history (LLM queries within this turn)
+    llm_exchanges: List[Dict] = field(default_factory=list)  # [{prompt, response, action_type, action_args, error}]
+    action_retries: int = 0  # How many times we re-prompted due to errors
+
+    # Free actions taken (LOOK, SPEAK)
+    free_actions: List[Dict] =
field(default_factory=list) + + # Turn-ending action + final_action_type: str = "" + final_action_args: tuple = () + final_action_success: bool = False + final_action_message: str = "" + + # Movement result + position_end: tuple = (0, 0) + path_taken: List[tuple] = field(default_factory=list) + path_remaining: int = 0 # Tiles left if multi-tile path + + +@dataclass +class EnhancedSimulationLog: + """ + Complete simulation log for offline viewer. + + Designed to support: + - Turn-by-turn replay + - Per-agent perspective reconstruction + - LLM chain-of-thought review + - Speech history tracking + """ + metadata: Dict[str, Any] = field(default_factory=dict) + steps: List[EnhancedSimulationStep] = field(default_factory=list) + speech_log: List[Dict] = field(default_factory=list) + + def save(self, path: str): + """Save log to JSON file.""" + data = { + "metadata": self.metadata, + "steps": [asdict(s) for s in self.steps], + "speech_log": self.speech_log + } + with open(path, 'w') as f: + json.dump(data, f, indent=2, default=str) + print(f"Enhanced simulation log saved to: {path}") + + @classmethod + def load(cls, path: str) -> 'EnhancedSimulationLog': + """Load log from JSON file.""" + with open(path) as f: + data = json.load(f) + + steps = [] + for s in data.get("steps", []): + # Convert lists back to tuples where needed + if isinstance(s.get("position_start"), list): + s["position_start"] = tuple(s["position_start"]) + if isinstance(s.get("position_end"), list): + s["position_end"] = tuple(s["position_end"]) + if isinstance(s.get("final_action_args"), list): + s["final_action_args"] = tuple(s["final_action_args"]) + if s.get("path_taken"): + s["path_taken"] = [tuple(p) for p in s["path_taken"]] + steps.append(EnhancedSimulationStep(**s)) + + return cls( + metadata=data.get("metadata", {}), + steps=steps, + speech_log=data.get("speech_log", []) + ) + + def get_turn_summary(self, turn: int) -> str: + """Get summary of a specific turn for display.""" + turn_steps = [s 
for s in self.steps if s.turn == turn] + lines = [f"=== Turn {turn} ==="] + + for step in turn_steps: + lines.append(f"\n{step.agent_id}:") + lines.append(f" Position: {step.position_start} -> {step.position_end}") + + if step.free_actions: + lines.append(f" Free actions: {len(step.free_actions)}") + for fa in step.free_actions: + lines.append(f" - {fa['action_type']}: {fa.get('result', {}).get('message', '')[:50]}") + + status = "OK" if step.final_action_success else "FAIL" + lines.append(f" Action: {step.final_action_type} {step.final_action_args} [{status}]") + + if not step.llm_was_queried: + lines.append(" (Path continuation - no LLM query)") + + return "\n".join(lines) + + +class EnhancedOrchestrator: + """ + Enhanced turn orchestrator with action economy and improved logging. + """ + + def __init__(self, grid, fov_layer, world: WorldGraph, agents: list, + screenshot_dir: str, llm_query_fn: Callable): + """ + Initialize enhanced orchestrator. + + Args: + grid: mcrfpy.Grid instance + fov_layer: Color layer for FOV rendering + world: WorldGraph instance + agents: List of Agent objects + screenshot_dir: Directory for screenshots + llm_query_fn: Function(agent, screenshot_path, context) -> str + """ + self.grid = grid + self.fov_layer = fov_layer + self.world = world + self.agents = agents + self.screenshot_dir = screenshot_dir + self.llm_query_fn = llm_query_fn + + self.executor = EnhancedExecutor(grid, world) + self.turn_number = 0 + self.steps: List[EnhancedSimulationStep] = [] + self.speech_log: List[Dict] = [] + + os.makedirs(screenshot_dir, exist_ok=True) + + def run_simulation(self, max_turns: int = 10, + stop_condition: Callable = None) -> EnhancedSimulationLog: + """ + Run complete simulation with enhanced logging. 
+ + Args: + max_turns: Maximum number of turns + stop_condition: Optional callable(orchestrator) -> bool + + Returns: + EnhancedSimulationLog for offline viewer + """ + print(f"\nStarting enhanced simulation: max {max_turns} turns") + print(f"Agents: {[a.name for a in self.agents]}") + print("=" * 60) + + for turn in range(max_turns): + self.run_turn() + + if stop_condition and stop_condition(self): + print(f"\nStop condition met at turn {self.turn_number}") + break + + # Build log + log = EnhancedSimulationLog( + metadata={ + "total_turns": self.turn_number, + "num_agents": len(self.agents), + "agent_names": [a.name for a in self.agents], + "timestamp_start": self.steps[0].timestamp if self.steps else "", + "timestamp_end": self.steps[-1].timestamp if self.steps else "", + "world_rooms": list(self.world.rooms.keys()), + "screenshot_dir": self.screenshot_dir, + }, + steps=self.steps, + speech_log=self.speech_log + ) + + return log + + def run_turn(self) -> List[EnhancedSimulationStep]: + """Execute one full turn (all agents act once).""" + import mcrfpy + + self.turn_number += 1 + turn_steps = [] + + print(f"\n{'='*60}") + print(f"TURN {self.turn_number}") + print("=" * 60) + + for agent in self.agents: + step = self._run_agent_turn(agent) + turn_steps.append(step) + self.steps.append(step) + + return turn_steps + + def _run_agent_turn(self, agent) -> EnhancedSimulationStep: + """Execute one agent's turn with action economy.""" + import mcrfpy + from mcrfpy import automation + + print(f"\n--- {agent.name}'s Turn ---") + + # Initialize step record + step = EnhancedSimulationStep( + turn=self.turn_number, + agent_id=agent.name, + position_start=agent.pos, + room=agent.current_room + ) + + # Check for path continuation + path_state = self.executor.get_path_state(agent.name) + current_visible = self._get_visible_entity_ids(agent) + + if path_state.has_path: + # Check for FOV interrupt + if path_state.should_interrupt(current_visible): + print(f" Path interrupted: new 
entity in FOV") + path_state.clear() + else: + # Continue path without LLM query + result = self.executor.continue_path(agent, current_visible) + if result and result.success: + step.llm_was_queried = False + step.path_in_progress = True + step.final_action_type = "GO" + step.final_action_args = ("CONTINUE",) + step.final_action_success = True + step.final_action_message = result.message + step.position_end = result.new_position or agent.pos + step.path_taken = result.path or [] + step.path_remaining = self.executor.get_path_state(agent.name).remaining_tiles + + print(f" Path continuation: {result.message}") + return step + + # Need LLM query - set up perspective + step.visible_entities = list(current_visible) + self._switch_perspective(agent) + mcrfpy.step(0.016) + + # Take screenshot + screenshot_path = os.path.join( + self.screenshot_dir, + f"turn{self.turn_number}_{agent.name.lower()}.png" + ) + automation.screenshot(screenshot_path) + step.screenshot_path = screenshot_path + + # Collect points of interest + poi_collector = PointOfInterestCollector(self.grid, agent.pos) + pois = poi_collector.collect_from_fov(self.world) + step.points_of_interest = [asdict(p) for p in pois] + step.poi_prompt = poi_collector.format_for_prompt() + + # Get pending messages + messages = self.executor.get_pending_messages(agent.name) + step.pending_messages = [asdict(m) for m in messages] + + # Build context + visible_agents = self._get_visible_agents(agent) + context = agent.get_context(visible_agents + [agent]) + step.location_description = context["location"] + step.available_actions = context["available_actions"] + + # Turn state for action economy + turn_state = TurnState() + + # Error feedback for retry loop + last_error = None + MAX_RETRIES = 3 + + # Action loop - handle free actions until turn-ending action + while not turn_state.turn_ended: + # Build prompt with current state (includes error feedback if any) + prompt = self._build_prompt(agent, context, step.poi_prompt, 
messages, turn_state, last_error) + step.llm_prompt_user = prompt # Store last prompt + + # Query LLM + print(f" Querying LLM...") + response = self.llm_query_fn(agent, screenshot_path, { + **context, + "poi_prompt": step.poi_prompt, + "messages": [asdict(m) for m in messages], + "has_spoken": turn_state.has_spoken, + "last_error": last_error, + "conversation_history": step.llm_exchanges # Include past exchanges + }) + step.llm_response = response + print(f" Response: {response[:200]}...") + + # Parse action + action = parse_action(response) + cost = get_action_cost(action) + + print(f" Action: {action.type.value} {action.args} (cost: {cost.value})") + + # Track this exchange + exchange = { + "prompt": prompt[:500], # Truncate for storage + "response": response, + "action_type": action.type.value, + "action_args": action.args, + "error": None + } + + # Execute action based on type + if action.type == ActionType.LOOK: + result = self.executor.execute_look(agent, action) + turn_state.record_free_action("LOOK", { + "target": result.target_name, + "description": result.description + }) + step.free_actions.append({ + "action_type": "LOOK", + "args": action.args, + "result": {"description": result.description} + }) + # Provide result and continue loop for another action + context["look_result"] = result.description + last_error = None # Clear error on success + print(f" LOOK result: {result.description[:100]}...") + + elif action.type in (ActionType.SPEAK, ActionType.ANNOUNCE): + if not turn_state.can_speak(): + print(f" Already spoke this turn") + last_error = "You have already spoken this turn. Choose a different action." 
+ exchange["error"] = last_error + step.action_retries += 1 + if step.action_retries >= MAX_RETRIES: + # Force end turn + step.final_action_type = "WAIT" + step.final_action_args = () + step.final_action_success = False + step.final_action_message = "Too many invalid actions - turn ended" + step.position_end = agent.pos + turn_state.end_turn() + else: + result = self.executor.execute_speech( + agent, action, self.agents, self.turn_number + ) + turn_state.record_speech() + turn_state.record_free_action(action.type.value, { + "content": result.content, + "recipients": result.recipients + }) + step.free_actions.append({ + "action_type": action.type.value, + "args": action.args, + "result": { + "content": result.content, + "recipients": result.recipients + } + }) + # Record in speech log + self.speech_log.append({ + "turn": self.turn_number, + "speaker": agent.name, + "type": result.speech_type, + "content": result.content, + "recipients": result.recipients + }) + last_error = None + print(f" {result.speech_type.upper()}: {result.content[:50]}... 
-> {result.recipients}") + # Continue loop for another action (can still move) + + elif action.type == ActionType.TAKE: + result = self.executor.execute_take(agent, action) + if result.success: + step.final_action_type = "TAKE" + step.final_action_args = action.args + step.final_action_success = True + step.final_action_message = result.message + step.position_end = agent.pos + last_error = None + turn_state.end_turn() + print(f" TAKE: {result.message}") + else: + # Failed - give error feedback and let LLM try again + last_error = result.message + exchange["error"] = last_error + step.action_retries += 1 + print(f" TAKE FAILED: {result.message}") + if step.action_retries >= MAX_RETRIES: + step.final_action_type = "TAKE" + step.final_action_args = action.args + step.final_action_success = False + step.final_action_message = result.message + step.position_end = agent.pos + turn_state.end_turn() + + elif action.type == ActionType.GO: + result = self.executor.execute_move(agent, action) + if result.success: + step.final_action_type = "GO" + step.final_action_args = action.args + step.final_action_success = True + step.final_action_message = result.message + step.position_end = result.new_position or agent.pos + step.path_taken = result.path or [] + last_error = None + turn_state.end_turn() + print(f" MOVE: {result.message}") + else: + # Failed - give error feedback + last_error = result.message + exchange["error"] = last_error + step.action_retries += 1 + print(f" MOVE FAILED: {result.message}") + if step.action_retries >= MAX_RETRIES: + step.final_action_type = "GO" + step.final_action_args = action.args + step.final_action_success = False + step.final_action_message = result.message + step.position_end = agent.pos + turn_state.end_turn() + + elif action.type == ActionType.WAIT: + result = self.executor.execute_wait(agent, action) + step.final_action_type = "WAIT" + step.final_action_args = () + step.final_action_success = True + step.final_action_message = 
result.message + step.position_end = agent.pos + last_error = None + turn_state.end_turn() + print(f" WAIT") + + elif action.type == ActionType.INVALID: + # Could not parse action - give feedback + last_error = f"Could not understand your action. Please use a valid action format like 'Action: GO EAST' or 'Action: TAKE key'." + exchange["error"] = last_error + step.action_retries += 1 + print(f" INVALID ACTION: {action.args}") + if step.action_retries >= MAX_RETRIES: + step.final_action_type = "INVALID" + step.final_action_args = action.args + step.final_action_success = False + step.final_action_message = "Could not parse action" + step.position_end = agent.pos + turn_state.end_turn() + + else: + # Unimplemented action type - give feedback + last_error = f"The action '{action.type.value}' is not yet supported. Try GO, TAKE, LOOK, SPEAK, or WAIT." + exchange["error"] = last_error + step.action_retries += 1 + print(f" Unsupported: {action.type.value}") + if step.action_retries >= MAX_RETRIES: + step.final_action_type = action.type.value + step.final_action_args = action.args + step.final_action_success = False + step.final_action_message = f"Unsupported action: {action.type.value}" + step.position_end = agent.pos + turn_state.end_turn() + + # Record exchange + step.llm_exchanges.append(exchange) + + return step + + def _build_prompt(self, agent, context: dict, poi_prompt: str, + messages: List[Message], turn_state: TurnState, + last_error: Optional[str] = None) -> str: + """Build LLM prompt with current state and error feedback.""" + parts = [context["location"]] + + # Add messages received + if messages: + parts.append("\nMessages received:") + for msg in messages: + if msg.speech_type == "announce": + parts.append(f' {msg.sender} announces: "{msg.content}"') + else: + parts.append(f' {msg.sender} says: "{msg.content}"') + + # Add points of interest + parts.append(f"\n{poi_prompt}") + + # Add available actions + actions_str = ", ".join(context["available_actions"]) 
+ parts.append(f"\nAvailable actions: {actions_str}") + + # Add LOOK result if we just looked + if "look_result" in context: + parts.append(f"\n[LOOK result: {context['look_result']}]") + + # Add constraints + constraints = [] + if turn_state.has_spoken: + constraints.append("You have already spoken this turn.") + if constraints: + parts.append(f"\nConstraints: {' '.join(constraints)}") + + # Add error feedback from last action attempt + if last_error: + parts.append(f"\n[ERROR: {last_error}]") + parts.append("[Please try a different action.]") + + parts.append("\nWhat do you do? Brief reasoning, then Action: ") + + return "\n".join(parts) + + def _switch_perspective(self, agent): + """Switch grid view to agent's perspective.""" + import mcrfpy + + self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) + self.fov_layer.apply_perspective( + entity=agent.entity, + visible=mcrfpy.Color(0, 0, 0, 0), + discovered=mcrfpy.Color(40, 40, 60, 180), + unknown=mcrfpy.Color(0, 0, 0, 255) + ) + agent.entity.update_visibility() + + px, py = agent.pos + self.grid.center = (px * 16 + 8, py * 16 + 8) + + def _get_visible_agents(self, observer) -> list: + """Get agents visible to observer based on FOV.""" + visible = [] + for agent in self.agents: + if agent.name == observer.name: + continue + ax, ay = agent.pos + if self.grid.is_in_fov(ax, ay): + visible.append(agent) + return visible + + def _get_visible_entity_ids(self, agent) -> Set[str]: + """Get set of entity IDs currently visible to agent.""" + visible = set() + ax, ay = agent.pos + + for entity in self.grid.entities: + if entity is agent.entity: + continue + ex, ey = int(entity.pos[0]), int(entity.pos[1]) + if self.grid.is_in_fov(ex, ey): + entity_id = getattr(entity, 'id', None) or str(id(entity)) + visible.add(entity_id) + + return visible diff --git a/tools/generate_stubs_v2.py b/tools/generate_stubs_v2.py index 77e8ddb..3de0ad4 100644 --- a/tools/generate_stubs_v2.py +++ b/tools/generate_stubs_v2.py @@ -87,22 +87,31 @@ class 
Font: class Drawable: """Base class for all drawable UI elements.""" - + x: float y: float visible: bool z_index: int name: str pos: Vector - + + # Mouse event callbacks (#140, #141) + on_click: Optional[Callable[[float, float, int, str], None]] + on_enter: Optional[Callable[[float, float, int, str], None]] + on_exit: Optional[Callable[[float, float, int, str], None]] + on_move: Optional[Callable[[float, float, int, str], None]] + + # Read-only hover state (#140) + hovered: bool + def get_bounds(self) -> Tuple[float, float, float, float]: """Get bounding box as (x, y, width, height).""" ... - + def move(self, dx: float, dy: float) -> None: """Move by relative offset (dx, dy).""" ... - + def resize(self, width: float, height: float) -> None: """Resize to new dimensions (width, height).""" ... @@ -343,45 +352,47 @@ class EntityCollection: class Scene: """Base class for object-oriented scenes.""" - + name: str - + children: UICollection # #151: UI elements collection (read-only alias for get_ui()) + on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action) + def __init__(self, name: str) -> None: ... - + def activate(self) -> None: """Called when scene becomes active.""" ... - + def deactivate(self) -> None: """Called when scene becomes inactive.""" ... - + def get_ui(self) -> UICollection: """Get UI elements collection.""" ... - + def on_keypress(self, key: str, pressed: bool) -> None: - """Handle keyboard events.""" + """Handle keyboard events (override in subclass).""" ... - + def on_click(self, x: float, y: float, button: int) -> None: - """Handle mouse clicks.""" + """Handle mouse clicks (override in subclass).""" ... - + def on_enter(self) -> None: - """Called when entering the scene.""" + """Called when entering the scene (override in subclass).""" ... - + def on_exit(self) -> None: - """Called when leaving the scene.""" + """Called when leaving the scene (override in subclass).""" ... 
- + def on_resize(self, width: int, height: int) -> None: - """Handle window resize events.""" + """Handle window resize events (override in subclass).""" ... - + def update(self, dt: float) -> None: - """Update scene logic.""" + """Update scene logic (override in subclass).""" ... class Timer: