diff --git a/stubs/mcrfpy.pyi b/stubs/mcrfpy.pyi index bc0c00f..c89e2e8 100644 --- a/stubs/mcrfpy.pyi +++ b/stubs/mcrfpy.pyi @@ -75,31 +75,22 @@ class Font: class Drawable: """Base class for all drawable UI elements.""" - + x: float y: float visible: bool z_index: int name: str pos: Vector - - # Mouse event callbacks (#140, #141) - on_click: Optional[Callable[[float, float, int, str], None]] - on_enter: Optional[Callable[[float, float, int, str], None]] - on_exit: Optional[Callable[[float, float, int, str], None]] - on_move: Optional[Callable[[float, float, int, str], None]] - - # Read-only hover state (#140) - hovered: bool - + def get_bounds(self) -> Tuple[float, float, float, float]: """Get bounding box as (x, y, width, height).""" ... - + def move(self, dx: float, dy: float) -> None: """Move by relative offset (dx, dy).""" ... - + def resize(self, width: float, height: float) -> None: """Resize to new dimensions (width, height).""" ... @@ -340,47 +331,45 @@ class EntityCollection: class Scene: """Base class for object-oriented scenes.""" - + name: str - children: UICollection # #151: UI elements collection (read-only alias for get_ui()) - on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action) - + def __init__(self, name: str) -> None: ... - + def activate(self) -> None: """Called when scene becomes active.""" ... - + def deactivate(self) -> None: """Called when scene becomes inactive.""" ... - + def get_ui(self) -> UICollection: """Get UI elements collection.""" ... - + def on_keypress(self, key: str, pressed: bool) -> None: - """Handle keyboard events (override in subclass).""" + """Handle keyboard events.""" ... - + def on_click(self, x: float, y: float, button: int) -> None: - """Handle mouse clicks (override in subclass).""" + """Handle mouse clicks.""" ... - + def on_enter(self) -> None: - """Called when entering the scene (override in subclass).""" + """Called when entering the scene.""" ... - + def on_exit(self) -> None: - """Called when leaving the scene (override in subclass).""" + """Called when leaving the scene.""" ... - + def on_resize(self, width: int, height: int) -> None: - """Handle window resize events (override in subclass).""" + """Handle window resize events.""" ... - + def update(self, dt: float) -> None: - """Update scene logic (override in subclass).""" + """Update scene logic.""" ... class Timer: diff --git a/tests/demo/screens/focus_system_demo.py b/tests/demo/screens/focus_system_demo.py deleted file mode 100644 index fc3ba88..0000000 --- a/tests/demo/screens/focus_system_demo.py +++ /dev/null @@ -1,808 +0,0 @@ -#!/usr/bin/env python3 -"""Focus System Demo for McRogueFace - -Demonstrates a Python-level focus management system using engine primitives. -This shows how game developers can implement keyboard navigation without -requiring C++ engine changes. - -Features demonstrated: -- Click-to-focus -- Tab/Shift+Tab cycling -- Visual focus indicators -- Keyboard routing to focused widget -- Modal focus stack -- Three widget types: Grid (WASD), TextInput, MenuIcon - -Issue: #143 -""" - -import mcrfpy -import sys - -# ============================================================================= -# Modifier Key Tracker (workaround until #160 is implemented) -# ============================================================================= - -class ModifierTracker: - """Tracks modifier key state since engine doesn't expose this yet.""" - - def __init__(self): - self.shift = False - self.ctrl = False - self.alt = False - - def update(self, key: str, action: str): - """Call this from your key handler to update modifier state.""" - if key in ("LShift", "RShift"): - self.shift = (action == "start") - elif key in ("LControl", "RControl"): - self.ctrl = (action == "start") - elif key in ("LAlt", "RAlt"): - self.alt = (action == "start") - - -# ============================================================================= -# Focus Manager -# ============================================================================= - -class FocusManager: - """Central focus coordinator for a scene. - - Manages which widget receives keyboard input, handles tab cycling, - and maintains a modal stack for popup dialogs. - """ - - # Focus indicator colors - FOCUS_COLOR = mcrfpy.Color(0, 150, 255) # Blue - UNFOCUS_COLOR = mcrfpy.Color(80, 80, 80) # Dark gray - FOCUS_OUTLINE = 3.0 - UNFOCUS_OUTLINE = 1.0 - - def __init__(self): - self.widgets = [] # List of (widget, focusable: bool) - self.focus_index = -1 # Currently focused widget index - self.modal_stack = [] # Stack of (modal_frame, previous_focus_index) - self.modifiers = ModifierTracker() - - def register(self, widget, focusable: bool = True): - """Add a widget to the focus order. - - Args: - widget: Object implementing on_focus(), on_blur(), handle_key() - focusable: Whether this widget can receive focus via Tab - """ - self.widgets.append((widget, focusable)) - # Give widget a reference back to us for click-to-focus - widget._focus_manager = self - widget._focus_index = len(self.widgets) - 1 - - def focus(self, widget_or_index): - """Set focus to a specific widget.""" - # Resolve to index - if isinstance(widget_or_index, int): - new_index = widget_or_index - else: - new_index = next( - (i for i, (w, _) in enumerate(self.widgets) if w is widget_or_index), - -1 - ) - - if new_index < 0 or new_index >= len(self.widgets): - return - - # Blur old widget - if 0 <= self.focus_index < len(self.widgets): - old_widget, _ = self.widgets[self.focus_index] - if hasattr(old_widget, 'on_blur'): - old_widget.on_blur() - - # Focus new widget - self.focus_index = new_index - new_widget, _ = self.widgets[new_index] - if hasattr(new_widget, 'on_focus'): - new_widget.on_focus() - - def cycle(self, direction: int = 1): - """Cycle focus to next/previous focusable widget. - - Args: - direction: 1 for next (Tab), -1 for previous (Shift+Tab) - """ - if not self.widgets: - return - - start = self.focus_index if self.focus_index >= 0 else 0 - current = start - - for _ in range(len(self.widgets)): - current = (current + direction) % len(self.widgets) - widget, focusable = self.widgets[current] - if focusable: - self.focus(current) - return - - # No focusable widget found, stay where we are - - def push_modal(self, modal_frame, first_focus_widget=None): - """Push a modal onto the focus stack. - - Args: - modal_frame: The Frame to show as modal - first_focus_widget: Widget to focus inside modal (optional) - """ - # Save current focus - self.modal_stack.append((modal_frame, self.focus_index)) - - # Show modal - modal_frame.visible = True - - # Focus first widget in modal if specified - if first_focus_widget is not None: - self.focus(first_focus_widget) - - def pop_modal(self): - """Pop the top modal and restore previous focus.""" - if not self.modal_stack: - return False - - modal_frame, previous_focus = self.modal_stack.pop() - modal_frame.visible = False - - # Restore focus - if previous_focus >= 0: - self.focus(previous_focus) - - return True - - def handle_key(self, key: str, action: str) -> bool: - """Main key handler - route to focused widget or handle global keys. - - Returns True if key was consumed. - """ - # Always update modifier state - self.modifiers.update(key, action) - - # Only process on key press, not release (key repeat sends multiple "start") - if action != "start": - return False - - # Global: Escape closes modals - if key == "Escape": - if self.pop_modal(): - return True - - # Global: Tab cycles focus - if key == "Tab": - direction = -1 if self.modifiers.shift else 1 - self.cycle(direction) - return True - - # Route to focused widget - if 0 <= self.focus_index < len(self.widgets): - widget, _ = self.widgets[self.focus_index] - if hasattr(widget, 'handle_key'): - if widget.handle_key(key, action): - return True - - return False - - -# ============================================================================= -# Focusable Widgets -# ============================================================================= - -class FocusableGrid: - """A grid where WASD keys move a player entity. - - Demonstrates focus on a game-world element. - """ - - def __init__(self, x: float, y: float, grid_w: int, grid_h: int, - tile_size: int = 16, zoom: float = 2.0): - self.grid_w = grid_w - self.grid_h = grid_h - self.tile_size = tile_size - self.zoom = zoom - self.base_x = x - self.base_y = y - - # Calculate pixel dimensions - self.cell_px = tile_size * zoom # Pixels per cell - grid_pixel_w = grid_w * self.cell_px - grid_pixel_h = grid_h * self.cell_px - - # Create the grid background - self.grid = mcrfpy.Grid( - pos=(x, y), - grid_size=(grid_w, grid_h), - size=(grid_pixel_w, grid_pixel_h) - ) - self.grid.zoom = zoom - self.grid.fill_color = mcrfpy.Color(40, 40, 55) - - # Add outline frame for focus indication - self.outline_frame = mcrfpy.Frame( - pos=(x - 2, y - 2), - size=(grid_pixel_w + 4, grid_pixel_h + 4), - fill_color=mcrfpy.Color(0, 0, 0, 0), - outline_color=FocusManager.UNFOCUS_COLOR, - outline=FocusManager.UNFOCUS_OUTLINE - ) - - # Player marker (a bright square overlay) - self.player_x = grid_w // 2 - self.player_y = grid_h // 2 - marker_size = self.cell_px - 4 # Slightly smaller than cell - self.player_marker = mcrfpy.Frame( - pos=(0, 0), # Will be positioned by _update_player_display - size=(marker_size, marker_size), - fill_color=mcrfpy.Color(255, 200, 50), - outline_color=mcrfpy.Color(255, 150, 0), - outline=2 - ) - self._update_player_display() - - # Click handler - self.grid.on_click = self._on_click - - # Focus manager reference (set by FocusManager.register) - self._focus_manager = None - self._focus_index = -1 - - def _on_click(self, x, y, button, action): - """Handle click to focus this grid.""" - if self._focus_manager and action == "start": - self._focus_manager.focus(self._focus_index) - - def _update_player_display(self): - """Update the visual representation of player position.""" - # Position the player marker - px = self.base_x + (self.player_x * self.cell_px) + 2 - py = self.base_y + (self.player_y * self.cell_px) + 2 - self.player_marker.x = px - self.player_marker.y = py - - def on_focus(self): - """Called when this widget gains focus.""" - self.outline_frame.outline_color = FocusManager.FOCUS_COLOR - self.outline_frame.outline = FocusManager.FOCUS_OUTLINE - - def on_blur(self): - """Called when this widget loses focus.""" - self.outline_frame.outline_color = FocusManager.UNFOCUS_COLOR - self.outline_frame.outline = FocusManager.UNFOCUS_OUTLINE - - def handle_key(self, key: str, action: str) -> bool: - """Handle WASD movement.""" - moves = { - "W": (0, -1), "Up": (0, -1), - "A": (-1, 0), "Left": (-1, 0), - "S": (0, 1), "Down": (0, 1), - "D": (1, 0), "Right": (1, 0), - } - - if key in moves: - dx, dy = moves[key] - new_x = self.player_x + dx - new_y = self.player_y + dy - - # Bounds check - if 0 <= new_x < self.grid_w and 0 <= new_y < self.grid_h: - self.player_x = new_x - self.player_y = new_y - self._update_player_display() - return True - - return False - - def add_to_scene(self, ui): - """Add all components to a scene's UI collection.""" - ui.append(self.outline_frame) - ui.append(self.grid) - ui.append(self.player_marker) - - -class TextInputWidget: - """A text input field with cursor and editing. - - Demonstrates text entry with focus indication. - """ - - def __init__(self, x: float, y: float, width: float, label: str = "", - placeholder: str = ""): - self.x = x - self.y = y - self.width = width - self.height = 28 - self.label_text = label - self.placeholder_text = placeholder - - # State - self.text = "" - self.cursor_pos = 0 - self.focused = False - - # Create UI elements - self._create_ui() - - # Focus manager reference - self._focus_manager = None - self._focus_index = -1 - - def _create_ui(self): - """Create the visual components.""" - # Label above input - if self.label_text: - self.label = mcrfpy.Caption( - text=self.label_text, - pos=(self.x, self.y - 20) - ) - self.label.fill_color = mcrfpy.Color(200, 200, 200) - - # Input background - self.frame = mcrfpy.Frame( - pos=(self.x, self.y), - size=(self.width, self.height), - fill_color=mcrfpy.Color(40, 40, 50), - outline_color=FocusManager.UNFOCUS_COLOR, - outline=FocusManager.UNFOCUS_OUTLINE - ) - self.frame.on_click = self._on_click - - # Placeholder text - self.placeholder = mcrfpy.Caption( - text=self.placeholder_text, - pos=(self.x + 6, self.y + 5) - ) - self.placeholder.fill_color = mcrfpy.Color(100, 100, 100) - - # Actual text display - self.display = mcrfpy.Caption( - text="", - pos=(self.x + 6, self.y + 5) - ) - self.display.fill_color = mcrfpy.Color(255, 255, 255) - - # Cursor (thin frame) - self.cursor = mcrfpy.Frame( - pos=(self.x + 6, self.y + 4), - size=(2, self.height - 8), - fill_color=mcrfpy.Color(255, 255, 255) - ) - self.cursor.visible = False - - def _on_click(self, x, y, button, action): - """Handle click to focus.""" - if self._focus_manager and action == "start": - self._focus_manager.focus(self._focus_index) - - def _update_display(self): - """Update visual state.""" - self.display.text = self.text - self.placeholder.visible = (not self.text and not self.focused) - self._update_cursor() - - def _update_cursor(self): - """Update cursor position.""" - # Approximate character width (monospace assumption) - char_width = 10 - self.cursor.x = self.x + 6 + (self.cursor_pos * char_width) - - def on_focus(self): - """Called when gaining focus.""" - self.focused = True - self.frame.outline_color = FocusManager.FOCUS_COLOR - self.frame.outline = FocusManager.FOCUS_OUTLINE - self.cursor.visible = True - self._update_display() - - def on_blur(self): - """Called when losing focus.""" - self.focused = False - self.frame.outline_color = FocusManager.UNFOCUS_COLOR - self.frame.outline = FocusManager.UNFOCUS_OUTLINE - self.cursor.visible = False - self._update_display() - - def handle_key(self, key: str, action: str) -> bool: - """Handle text input and editing keys.""" - if not self.focused: - return False - - old_text = self.text - handled = True - - if key == "BackSpace": - if self.cursor_pos > 0: - self.text = self.text[:self.cursor_pos-1] + self.text[self.cursor_pos:] - self.cursor_pos -= 1 - elif key == "Delete": - if self.cursor_pos < len(self.text): - self.text = self.text[:self.cursor_pos] + self.text[self.cursor_pos+1:] - elif key == "Left": - self.cursor_pos = max(0, self.cursor_pos - 1) - elif key == "Right": - self.cursor_pos = min(len(self.text), self.cursor_pos + 1) - elif key == "Home": - self.cursor_pos = 0 - elif key == "End": - self.cursor_pos = len(self.text) - elif key in ("Return", "Tab"): - # Don't consume - let focus manager handle - handled = False - elif len(key) == 1 and key.isprintable(): - # Insert character - self.text = self.text[:self.cursor_pos] + key + self.text[self.cursor_pos:] - self.cursor_pos += 1 - else: - handled = False - - self._update_display() - return handled - - def get_text(self) -> str: - """Get the current text value.""" - return self.text - - def set_text(self, text: str): - """Set the text value.""" - self.text = text - self.cursor_pos = len(text) - self._update_display() - - def add_to_scene(self, ui): - """Add all components to the scene.""" - if hasattr(self, 'label'): - ui.append(self.label) - ui.append(self.frame) - ui.append(self.placeholder) - ui.append(self.display) - ui.append(self.cursor) - - -class MenuIcon: - """An icon that opens a modal dialog when activated. - - Demonstrates activation via Space/Enter and modal focus. - """ - - def __init__(self, x: float, y: float, size: float, icon_char: str, - tooltip: str, modal_content_builder=None): - self.x = x - self.y = y - self.size = size - self.tooltip = tooltip - self.modal_content_builder = modal_content_builder - self.modal = None - - # Create icon frame - self.frame = mcrfpy.Frame( - pos=(x, y), - size=(size, size), - fill_color=mcrfpy.Color(60, 60, 80), - outline_color=FocusManager.UNFOCUS_COLOR, - outline=FocusManager.UNFOCUS_OUTLINE - ) - self.frame.on_click = self._on_click - - # Icon character (centered) - self.icon = mcrfpy.Caption( - text=icon_char, - pos=(x + size//3, y + size//6) - ) - self.icon.fill_color = mcrfpy.Color(200, 200, 220) - - # Tooltip (shown on hover/focus) - self.tooltip_caption = mcrfpy.Caption( - text=tooltip, - pos=(x, y + size + 4) - ) - self.tooltip_caption.fill_color = mcrfpy.Color(150, 150, 150) - self.tooltip_caption.visible = False - - # Focus manager reference - self._focus_manager = None - self._focus_index = -1 - - def _on_click(self, x, y, button, action): - """Handle click to focus or activate.""" - if not self._focus_manager: - return - - if action == "start": - # If already focused, activate; otherwise just focus - if self._focus_manager.focus_index == self._focus_index: - self._activate() - else: - self._focus_manager.focus(self._focus_index) - - def _activate(self): - """Open the modal dialog.""" - if self.modal and self._focus_manager: - self._focus_manager.push_modal(self.modal) - - def on_focus(self): - """Called when gaining focus.""" - self.frame.outline_color = FocusManager.FOCUS_COLOR - self.frame.outline = FocusManager.FOCUS_OUTLINE - self.frame.fill_color = mcrfpy.Color(80, 80, 110) - self.tooltip_caption.visible = True - - def on_blur(self): - """Called when losing focus.""" - self.frame.outline_color = FocusManager.UNFOCUS_COLOR - self.frame.outline = FocusManager.UNFOCUS_OUTLINE - self.frame.fill_color = mcrfpy.Color(60, 60, 80) - self.tooltip_caption.visible = False - - def handle_key(self, key: str, action: str) -> bool: - """Handle activation keys.""" - if key in ("Space", "Return"): - self._activate() - return True - return False - - def set_modal(self, modal_frame): - """Set the modal frame this icon opens.""" - self.modal = modal_frame - - def add_to_scene(self, ui): - """Add all components to the scene.""" - ui.append(self.frame) - ui.append(self.icon) - ui.append(self.tooltip_caption) - - -# ============================================================================= -# Modal Dialog Builder -# ============================================================================= - -def create_modal(x: float, y: float, width: float, height: float, - title: str) -> mcrfpy.Frame: - """Create a modal dialog frame.""" - # Semi-transparent backdrop - # Note: This is simplified - real implementation might want fullscreen backdrop - - # Modal frame - modal = mcrfpy.Frame( - pos=(x, y), - size=(width, height), - fill_color=mcrfpy.Color(40, 40, 50), - outline_color=mcrfpy.Color(100, 100, 120), - outline=2 - ) - modal.visible = False - - # Title - title_caption = mcrfpy.Caption( - text=title, - pos=(x + 10, y + 8) - ) - title_caption.fill_color = mcrfpy.Color(220, 220, 240) - modal.children.append(title_caption) - - # Close hint - close_hint = mcrfpy.Caption( - text="[Esc to close]", - pos=(x + width - 100, y + 8) - ) - close_hint.fill_color = mcrfpy.Color(120, 120, 140) - modal.children.append(close_hint) - - return modal - - -# ============================================================================= -# Demo Scene Setup -# ============================================================================= - -def create_demo_scene(): - """Create and populate the focus system demo scene.""" - - # Create scene - mcrfpy.createScene("focus_demo") - ui = mcrfpy.sceneUI("focus_demo") - - # Background - bg = mcrfpy.Frame( - pos=(0, 0), - size=(1024, 768), - fill_color=mcrfpy.Color(25, 25, 35) - ) - ui.append(bg) - - # Title - title = mcrfpy.Caption( - text="Focus System Demo", - pos=(20, 15) - ) - title.fill_color = mcrfpy.Color(255, 255, 255) - ui.append(title) - - # Instructions - instructions = mcrfpy.Caption( - text="Tab: cycle focus | Shift+Tab: reverse | WASD: move in grid | Space/Enter: activate | Esc: close modal", - pos=(20, 45) - ) - instructions.fill_color = mcrfpy.Color(150, 150, 170) - ui.append(instructions) - - # Create focus manager - focus_mgr = FocusManager() - - # --- Grid Section --- - grid_label = mcrfpy.Caption(text="Game Grid (WASD to move)", pos=(50, 90)) - grid_label.fill_color = mcrfpy.Color(180, 180, 200) - ui.append(grid_label) - - grid_widget = FocusableGrid(50, 115, 10, 8, tile_size=16, zoom=2.0) - grid_widget.add_to_scene(ui) - focus_mgr.register(grid_widget) - - # --- Text Inputs Section --- - input_label = mcrfpy.Caption(text="Text Inputs", pos=(400, 90)) - input_label.fill_color = mcrfpy.Color(180, 180, 200) - ui.append(input_label) - - name_input = TextInputWidget(400, 130, 250, label="Name:", placeholder="Enter your name") - name_input.add_to_scene(ui) - focus_mgr.register(name_input) - - class_input = TextInputWidget(400, 200, 250, label="Class:", placeholder="e.g. Warrior, Mage") - class_input.add_to_scene(ui) - focus_mgr.register(class_input) - - notes_input = TextInputWidget(400, 270, 350, label="Notes:", placeholder="Additional notes...") - notes_input.add_to_scene(ui) - focus_mgr.register(notes_input) - - # --- Menu Icons Section --- - icons_label = mcrfpy.Caption(text="Menu Icons", pos=(50, 390)) - icons_label.fill_color = mcrfpy.Color(180, 180, 200) - ui.append(icons_label) - - # Help icon - help_icon = MenuIcon(50, 420, 48, "?", "Help") - help_icon.add_to_scene(ui) - focus_mgr.register(help_icon) - - help_modal = create_modal(200, 150, 400, 300, "Help") - ui.append(help_modal) - help_text = mcrfpy.Caption( - text="This demo shows focus management.\n\nUse Tab to move between widgets.\nWASD moves the player in the grid.\nType in text fields.\nPress Space on icons to open dialogs.", - pos=(210, 190) - ) - help_text.fill_color = mcrfpy.Color(200, 200, 200) - help_modal.children.append(help_text) - help_icon.set_modal(help_modal) - - # Settings icon - settings_icon = MenuIcon(110, 420, 48, "S", "Settings") - settings_icon.add_to_scene(ui) - focus_mgr.register(settings_icon) - - settings_modal = create_modal(200, 150, 400, 250, "Settings") - ui.append(settings_modal) - settings_text = mcrfpy.Caption( - text="Settings would go here.\n\n(This is a placeholder modal)", - pos=(210, 190) - ) - settings_text.fill_color = mcrfpy.Color(200, 200, 200) - settings_modal.children.append(settings_text) - settings_icon.set_modal(settings_modal) - - # Inventory icon - inv_icon = MenuIcon(170, 420, 48, "I", "Inventory") - inv_icon.add_to_scene(ui) - focus_mgr.register(inv_icon) - - inv_modal = create_modal(200, 150, 400, 300, "Inventory") - ui.append(inv_modal) - inv_text = mcrfpy.Caption( - text="Your inventory:\n\n- Sword\n- Shield\n- 3x Potions", - pos=(210, 190) - ) - inv_text.fill_color = mcrfpy.Color(200, 200, 200) - inv_modal.children.append(inv_text) - inv_icon.set_modal(inv_modal) - - # --- Status Display --- - status_frame = mcrfpy.Frame( - pos=(50, 520), - size=(700, 80), - fill_color=mcrfpy.Color(35, 35, 45), - outline_color=mcrfpy.Color(60, 60, 70), - outline=1 - ) - ui.append(status_frame) - - status_label = mcrfpy.Caption(text="Status", pos=(60, 530)) - status_label.fill_color = mcrfpy.Color(150, 150, 170) - ui.append(status_label) - - status_text = mcrfpy.Caption(text="Click or Tab to focus a widget", pos=(60, 555)) - status_text.fill_color = mcrfpy.Color(200, 200, 200) - ui.append(status_text) - - # Store references for status updates - demo_state = { - 'focus_mgr': focus_mgr, - 'status_text': status_text, - 'grid': grid_widget, - 'inputs': [name_input, class_input, notes_input], - 'icons': [help_icon, settings_icon, inv_icon], - } - - # Key handler that routes to focus manager - def on_key(key: str, action: str): - focus_mgr.handle_key(key, action) - - # Update status display - if focus_mgr.focus_index >= 0: - widget, _ = focus_mgr.widgets[focus_mgr.focus_index] - if widget is grid_widget: - status_text.text = f"Grid focused - Player at ({grid_widget.player_x}, {grid_widget.player_y})" - elif widget in demo_state['inputs']: - idx = demo_state['inputs'].index(widget) - labels = ["Name", "Class", "Notes"] - status_text.text = f"{labels[idx]} input focused - Text: '{widget.get_text()}'" - elif widget in demo_state['icons']: - status_text.text = f"Icon focused: {widget.tooltip}" - else: - status_text.text = "No widget focused" - - # Activate scene first (keypressScene sets handler for CURRENT scene) - mcrfpy.setScene("focus_demo") - - # Register key handler for the now-current scene - mcrfpy.keypressScene(on_key) - - # Set initial focus - focus_mgr.focus(0) - - return demo_state - - -# ============================================================================= -# Entry Point -# ============================================================================= - -def run_demo(): - """Run the focus system demo.""" - print("=== Focus System Demo ===") - print("Demonstrating Python-level focus management") - print() - print("Controls:") - print(" Tab / Shift+Tab - Cycle between widgets") - print(" WASD / Arrows - Move player in grid (when focused)") - print(" Type - Enter text in inputs (when focused)") - print(" Space / Enter - Activate icons (when focused)") - print(" Escape - Close modal dialogs") - print(" Click - Focus clicked widget") - print() - - demo_state = create_demo_scene() - - # Set up exit timer for headless testing - def check_exit(dt): - # In headless mode, exit after a short delay - # In interactive mode, this won't trigger - pass - - # mcrfpy.setTimer("demo_check", check_exit, 100) - - -# Run if executed directly -if __name__ == "__main__": - import sys - from mcrfpy import automation - - run_demo() - - # If --screenshot flag, take a screenshot and exit - if "--screenshot" in sys.argv or len(sys.argv) > 1: - def take_screenshot(dt): - automation.screenshot("focus_demo_screenshot.png") - print("Screenshot saved: focus_demo_screenshot.png") - sys.exit(0) - mcrfpy.setTimer("screenshot", take_screenshot, 200) diff --git a/tests/vllm_demo/1_multi_agent_demo.py b/tests/vllm_demo/1_multi_agent_demo.py index b69bccb..debc98e 100644 --- a/tests/vllm_demo/1_multi_agent_demo.py +++ b/tests/vllm_demo/1_multi_agent_demo.py @@ -14,15 +14,12 @@ Three agents: Each agent gets their own screenshot and VLLM query. """ -import sys -import os -# Add the vllm_demo directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - import mcrfpy from mcrfpy import automation +import sys import requests import base64 +import os import random from action_parser import parse_action diff --git a/tests/vllm_demo/4_enhanced_action_demo.py b/tests/vllm_demo/4_enhanced_action_demo.py deleted file mode 100644 index 2986733..0000000 --- a/tests/vllm_demo/4_enhanced_action_demo.py +++ /dev/null @@ -1,436 +0,0 @@ -#!/usr/bin/env python3 -""" -Enhanced Action Demo -==================== - -Demonstrates the enhanced action economy system: -- Free actions (LOOK, SPEAK/ANNOUNCE) vs turn-ending (MOVE, WAIT) -- Points of interest targeting for LOOK/MOVE -- Speech system with room-wide ANNOUNCE and proximity SPEAK -- Multi-tile path continuation with FOV interrupts -- Enhanced logging for offline viewer replay - -This implements the turn-based LLM agent orchestration from issue #156. -""" - -import sys -import os -# Add the vllm_demo directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -import mcrfpy -from mcrfpy import automation -import requests -import base64 - -from world_graph import ( - WorldGraph, Room, Door, WorldObject, Direction, AgentInfo, - create_two_room_scenario, create_button_door_scenario -) -from action_parser import parse_action -from enhanced_executor import EnhancedExecutor -from enhanced_orchestrator import EnhancedOrchestrator, EnhancedSimulationLog - -# Configuration -VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions" -SCREENSHOT_DIR = "/tmp/vllm_enhanced_demo" -LOG_PATH = "/tmp/vllm_enhanced_demo/simulation_log.json" -MAX_TURNS = 3 - -# Sprites -FLOOR_TILE = 0 -WALL_TILE = 40 -WIZARD_SPRITE = 84 -KNIGHT_SPRITE = 96 -RAT_SPRITE = 123 - - -class Agent: - """Agent with WorldGraph integration.""" - - def __init__(self, name: str, display_name: str, entity, world: WorldGraph): - self.name = name - self.display_name = display_name - self.entity = entity - self.world = world - self.message_history = [] - - @property - def pos(self) -> tuple: - return (int(self.entity.pos[0]), int(self.entity.pos[1])) - - @property - def current_room(self) -> str: - room = self.world.room_at(*self.pos) - return room.name if room else None - - def get_context(self, visible_agents: list) -> dict: - """Build context for LLM query.""" - room_name = self.current_room - agent_infos = [ - AgentInfo( - name=a.name, - display_name=a.display_name, - position=a.pos, - is_player=(a.name == self.name) - ) - for a in visible_agents - ] - return { - "location": self.world.describe_room(room_name, agent_infos, self.name), - "available_actions": self.world.get_available_actions(room_name), - "recent_messages": self.message_history[-5:], - } - - -def file_to_base64(path: str) -> str: - """Convert file to base64 string.""" - with open(path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - -def llm_query(agent, screenshot_path: str, context: dict) -> str: - """ - Query VLLM for agent action with enhanced context. - - Includes points of interest, action economy hints, error feedback, - and conversation history. - """ - system_prompt = f"""You are {agent.display_name} exploring a dungeon. -You receive visual and text information about your surroundings. - -ACTION ECONOMY: -- LOOK : Free action. Examine something, then choose another action. -- SPEAK "" or ANNOUNCE "": Free action (once per turn). Then choose another action. -- GO : Ends your turn. Move one tile in that direction (NORTH/SOUTH/EAST/WEST). -- TAKE : Ends your turn. Pick up an item you are standing next to. -- WAIT: Ends your turn without moving. - -IMPORTANT: You can only TAKE items that are adjacent to you (1 tile away). If something is far away, GO towards it first. - -You can LOOK or SPEAK, then still MOVE in the same turn. -Always end your final response with: Action: """ - - # Build enhanced prompt - parts = [context["location"]] - - # Add received messages - if context.get("messages"): - parts.append("\nMessages received this turn:") - for msg in context["messages"]: - sender = msg.get("sender", "someone") - content = msg.get("content", "") - parts.append(f' {sender} says: "{content}"') - - # Add points of interest - if context.get("poi_prompt"): - parts.append(f"\n{context['poi_prompt']}") - - # Add available actions - actions_str = ", ".join(context.get("available_actions", [])) - parts.append(f"\nAvailable actions: {actions_str}") - - # Add action economy hint - if context.get("has_spoken"): - parts.append("\n[You have already spoken this turn - you can still MOVE or WAIT]") - - # Add error feedback from last failed action - if context.get("last_error"): - parts.append(f"\n[ERROR: {context['last_error']}]") - parts.append("[Your last action failed. Please try a different action.]") - - # Add conversation history from this turn - if context.get("conversation_history"): - parts.append("\n[Previous attempts this turn:") - for exch in context["conversation_history"]: - action_str = f"{exch.get('action_type', '?')} {exch.get('action_args', '')}" - if exch.get("error"): - parts.append(f" - You tried: {action_str} -> FAILED: {exch['error']}") - else: - parts.append(f" - You did: {action_str}") - parts.append("]") - - parts.append("\n[Screenshot attached showing your current view]") - parts.append("\nWhat do you do? Brief reasoning (1-2 sentences), then Action: ") - - user_prompt = "\n".join(parts) - - messages = [ - {"role": "system", "content": system_prompt}, - { - "role": "user", - "content": [ - {"type": "text", "text": user_prompt}, - {"type": "image_url", "image_url": { - "url": "data:image/png;base64," + file_to_base64(screenshot_path) - }} - ] - } - ] - - try: - resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60) - data = resp.json() - if "error" in data: - return f"[VLLM Error: {data['error']}]" - return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response') - except Exception as e: - return f"[Connection Error: {e}]" - - -def setup_scene(world: WorldGraph): - """Create McRogueFace scene from WorldGraph.""" - mcrfpy.createScene("enhanced_demo") - mcrfpy.setScene("enhanced_demo") - ui = mcrfpy.sceneUI("enhanced_demo") - - texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16) - - grid = mcrfpy.Grid( - grid_size=(25, 15), - texture=texture, - pos=(5, 5), - size=(1014, 700) - ) - grid.fill_color = mcrfpy.Color(20, 20, 30) - grid.zoom = 2.0 - ui.append(grid) - - # Initialize all as walls - for x in range(25): - for y in range(15): - p = grid.at(x, y) - p.tilesprite = WALL_TILE - p.walkable = False - p.transparent = False - - # Carve rooms from WorldGraph - for room in world.rooms.values(): - for rx in range(room.x, room.x + room.width): - for ry in range(room.y, room.y + room.height): - if 0 <= rx < 25 and 0 <= ry < 15: - p = grid.at(rx, ry) - p.tilesprite = FLOOR_TILE - p.walkable = True - p.transparent = True - - # Place doors - for door in world.doors: - dx, dy = door.position - if 0 <= dx < 25 and 0 <= dy < 15: - p = grid.at(dx, dy) - p.tilesprite = FLOOR_TILE - p.walkable = not door.locked - p.transparent = True - - # FOV layer - fov_layer = grid.add_layer('color', z_index=10) - fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - - return grid, fov_layer, texture - - -def create_agents(grid, world: WorldGraph, texture) -> list: - """Create agents in their starting rooms.""" - agents = [] - - # Wizard in guard_room (left) - room_a = world.rooms["guard_room"] - wizard = mcrfpy.Entity( - grid_pos=room_a.center, - texture=texture, - sprite_index=WIZARD_SPRITE - ) - wizard.name = "wizard" - grid.entities.append(wizard) - agents.append(Agent("Wizard", "a wizard", wizard, world)) - - # Knight in armory (right) - room_b = world.rooms["armory"] - knight = mcrfpy.Entity( - grid_pos=room_b.center, - texture=texture, - sprite_index=KNIGHT_SPRITE - ) - knight.name = "knight" - grid.entities.append(knight) - agents.append(Agent("Knight", "a knight", knight, world)) - - return agents - - -def add_rat(grid, world: WorldGraph, texture, position: tuple): - """Add a rat entity at the specified position.""" - rat = mcrfpy.Entity( - grid_pos=position, - texture=texture, - sprite_index=RAT_SPRITE - ) - rat.name = "rat" - grid.entities.append(rat) - return rat - - -def run_demo(): - """Run enhanced action demo.""" - print("=" * 70) - print("Enhanced Action Demo") - print("=" * 70) - print(""" -Features demonstrated: -- LOOK as free action (doesn't end turn) -- SPEAK/ANNOUNCE as free action (once per turn) -- Points of interest targeting -- Enhanced logging for offline viewer -""") - - os.makedirs(SCREENSHOT_DIR, exist_ok=True) - - # Create world - print("Creating world...") - world = create_two_room_scenario() - print(f" Rooms: {list(world.rooms.keys())}") - print(f" Objects: {list(world.objects.keys())}") - - # Setup scene - print("\nSetting up scene...") - grid, fov_layer, texture = setup_scene(world) - - # Create agents - print("\nCreating agents...") - agents = create_agents(grid, world, texture) - - # Add a rat near the door for interest - rat = add_rat(grid, world, texture, (9, 4)) - print(f" Added rat at (9, 4)") - - for agent in agents: - print(f" {agent.name} at {agent.pos} in {agent.current_room}") - - # Create enhanced orchestrator - print("\nInitializing enhanced orchestrator...") - orchestrator = EnhancedOrchestrator( - grid=grid, - fov_layer=fov_layer, - world=world, - agents=agents, - screenshot_dir=SCREENSHOT_DIR, - llm_query_fn=llm_query - ) - - # Run simulation - print(f"\nRunning simulation ({MAX_TURNS} turns)...") - log = orchestrator.run_simulation(max_turns=MAX_TURNS) - - # Save enhanced log - log.save(LOG_PATH) - - # Print summary - print("\n" + "=" * 70) - print("SIMULATION SUMMARY") - print("=" * 70) - - for turn in range(1, orchestrator.turn_number + 1): - print(log.get_turn_summary(turn)) - - # Print speech log - if log.speech_log: - print("\n" + "-" * 40) - print("SPEECH LOG") - print("-" * 40) - for entry in log.speech_log: - print(f" Turn {entry['turn']}: {entry['speaker']} {entry['type']}s: \"{entry['content'][:50]}...\"") - if entry['recipients']: - print(f" -> Heard by: {', '.join(entry['recipients'])}") - - print("\n" + "=" * 70) - print("Demo Complete") - print("=" * 70) - print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/") - print(f"Simulation log saved to: {LOG_PATH}") - print("\nLog structure (for offline viewer):") - print(" - metadata: simulation info") - print(" - steps[]: per-agent-turn records with:") - print(" - screenshot_path, position, room") - print(" - llm_prompt_user, llm_response") - print(" - free_actions[] (LOOK, SPEAK)") - print(" - final_action (MOVE, WAIT)") - print(" - speech_log[]: all speech events") - - return True - - -def replay_log(log_path: str): - """ - Replay a simulation from log file. - - This is a text-based preview of what the offline viewer would show. - """ - print(f"Loading simulation from: {log_path}") - - try: - log = EnhancedSimulationLog.load(log_path) - except FileNotFoundError: - print(f"Log file not found: {log_path}") - return - - print("\n" + "=" * 70) - print("SIMULATION REPLAY") - print("=" * 70) - print(f"Turns: {log.metadata.get('total_turns', '?')}") - print(f"Agents: {', '.join(log.metadata.get('agent_names', []))}") - print(f"Rooms: {', '.join(log.metadata.get('world_rooms', []))}") - - for step in log.steps: - print(f"\n{'='*40}") - print(f"Turn {step.turn}: {step.agent_id}") - print(f"{'='*40}") - print(f"Position: {step.position_start} -> {step.position_end}") - print(f"Room: {step.room}") - - if step.pending_messages: - print(f"\nMessages received:") - for msg in step.pending_messages: - print(f" {msg.get('sender')}: \"{msg.get('content', '')[:40]}...\"") - - if step.llm_was_queried: - print(f"\nLLM Response (truncated):") - print(f" {step.llm_response[:200]}...") - else: - print(f"\n[Path continuation - no LLM query]") - - if step.free_actions: - print(f"\nFree actions:") - for fa in step.free_actions: - print(f" - {fa['action_type']}: {fa.get('args', ())}") - - status = "OK" if step.final_action_success else "FAIL" - print(f"\nFinal: {step.final_action_type} {step.final_action_args} [{status}]") - print(f" {step.final_action_message}") - - # Speech summary - if log.speech_log: - print("\n" + "=" * 40) - print("ALL SPEECH") - print("=" * 40) - for entry in log.speech_log: - print(f"Turn {entry['turn']}: {entry['speaker']} -> {entry['recipients']}") - print(f" \"{entry['content']}\"") - - -if __name__ == "__main__": - # Check for replay mode - if len(sys.argv) > 1 and sys.argv[1] == "--replay": - log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH - replay_log(log_file) - sys.exit(0) - - # Normal execution - try: - success = run_demo() - print("\nPASS" if success else "\nFAIL") - sys.exit(0 if success else 1) - except Exception as e: - import traceback - traceback.print_exc() - sys.exit(1) diff --git a/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md b/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md deleted file mode 100644 index c4f762b..0000000 --- a/tests/vllm_demo/OFFLINE_VIEWER_SPEC.md +++ /dev/null @@ -1,152 +0,0 @@ -# Offline Viewer Specification - -**Status**: Planned (issue #154) -**Priority**: After core simulation features are stable - -## Overview - -The Offline Viewer allows users to replay stored simulation logs in McRogueFace, stepping through turn-by-turn to review: -- Each agent's perspective (FOV, camera position) -- LLM chain-of-thought reasoning -- Actions taken and their results -- Speech between agents - -## Log Format - -Simulation logs are stored as JSON with this structure: - -```json -{ - "metadata": { - "total_turns": 5, - "num_agents": 2, - "agent_names": ["Wizard", "Knight"], - "timestamp_start": "2025-01-15T10:30:00", - "timestamp_end": "2025-01-15T10:32:45", - "world_rooms": ["guard_room", "armory"], - "screenshot_dir": "/tmp/vllm_enhanced_demo" - }, - "steps": [ - { - "turn": 1, - "agent_id": "Wizard", - "timestamp": "2025-01-15T10:30:15", - - "position_start": [5, 4], - "position_end": [6, 4], - "room": "guard_room", - - "visible_entities": ["rat_123", "knight_456"], - "visible_tiles": 42, - "points_of_interest": [ - {"name": "door", "direction": "east", "distance": 4} - ], - - "location_description": "You are in the guard room...", - "available_actions": ["GO EAST", "LOOK", "WAIT"], - "pending_messages": [], - "poi_prompt": "Points of interest:\n - a door to the armory (east)", - - "screenshot_path": "/tmp/.../turn1_wizard.png", - - "llm_prompt_system": "You are a wizard...", - "llm_prompt_user": "You are in the guard room...", - "llm_response": "I see a door to the east. I should explore. Action: GO EAST", - "llm_was_queried": true, - - "free_actions": [ - {"action_type": "LOOK", "args": ["DOOR"], "result": {"description": "A wooden door..."}} - ], - - "final_action_type": "GO", - "final_action_args": ["EAST"], - "final_action_success": true, - "final_action_message": "Moved east to (6, 4)", - - "path_taken": [[5, 4], [6, 4]], - "path_remaining": 0 - } - ], - "speech_log": [ - { - "turn": 2, - "speaker": "Wizard", - "type": "announce", - "content": "Hello, is anyone there?", - "recipients": ["Knight"] - } - ] -} -``` - -## Viewer Features (Planned) - -### Core Features - -1. **Turn Navigation** - - Step forward/backward through turns - - Jump to specific turn number - - Auto-play at configurable speed - -2. **Agent Perspective** - - Reconstruct agent's FOV from stored data - - Center camera on current agent - - Show visible entities and tiles - -3. **LLM Review Panel** - - Display system prompt - - Display user prompt (context) - - Display LLM response - - Highlight parsed action - -4. **Action Log** - - Show free actions (LOOK, SPEAK) - - Show final action and result - - Color-code success/failure - -5. **Speech History** - - Timeline of all speech events - - Filter by agent - - Show recipients - -### Implementation Notes - -The viewer should: -- Load screenshots from `screenshot_path` (if available) -- OR reconstruct scene from WorldGraph + step data -- Support keyboard navigation (arrow keys) -- Display agent state in sidebar - -### UI Layout (Suggested) - -``` -+----------------------------------+------------------+ -| | Turn: 3/10 | -| Main Viewport | Agent: Wizard | -| (Agent's Perspective) | Room: armory | -| +------------------+ -| | LLM Response: | -| | "I see a rat | -| | to the east. | -| | Action: LOOK | -| | AT RAT" | -+----------------------------------+------------------+ -| < Prev | Turn 3 | Next > | Actions: | -| [Agent: Wizard v] | - LOOK AT RAT | -| | - GO EAST [OK] | -+----------------------------------+------------------+ -``` - -## Files - -- `enhanced_orchestrator.py` - Generates `EnhancedSimulationLog` -- `4_enhanced_action_demo.py` - Demo with `--replay` mode for text preview -- Logs stored in `/tmp/vllm_enhanced_demo/simulation_log.json` - -## Future Enhancements - -- Animated path replay (smooth entity movement) -- Side-by-side multi-agent view -- Diff view comparing agent perceptions -- Export to video/GIF -- Integration with annotation tools for research diff --git a/tests/vllm_demo/action_economy.py b/tests/vllm_demo/action_economy.py deleted file mode 100644 index 0449cfb..0000000 --- a/tests/vllm_demo/action_economy.py +++ /dev/null @@ -1,302 +0,0 @@ -""" -Action Economy System -===================== - -Defines which actions consume turns and which are free. -Manages multi-tile pathing with FOV interruption. - -Action Categories: -- FREE: LOOK, SPEAK, ANNOUNCE (don't end turn) -- FULL: MOVE, WAIT (end turn) - -Constraints: -- Only ONE speech action per turn -- LOOK provides description and prompts for another action -- Multi-tile paths continue without LLM until FOV changes -""" - -from dataclasses import dataclass, field -from typing import List, Tuple, Optional, Set, Dict, Any -from enum import Enum - -from action_parser import Action, ActionType - - -class TurnCost(Enum): - """How much of a turn an action consumes.""" - FREE = "free" # Doesn't end turn - FULL = "full" # Ends turn - - -# Action cost mapping -ACTION_COSTS = { - ActionType.LOOK: TurnCost.FREE, - ActionType.SPEAK: TurnCost.FREE, - ActionType.ANNOUNCE: TurnCost.FREE, - ActionType.GO: TurnCost.FULL, - ActionType.WAIT: TurnCost.FULL, - ActionType.TAKE: TurnCost.FULL, - ActionType.DROP: TurnCost.FULL, - ActionType.PUSH: TurnCost.FULL, - ActionType.USE: TurnCost.FULL, - ActionType.OPEN: TurnCost.FULL, - ActionType.CLOSE: TurnCost.FULL, - ActionType.INVALID: TurnCost.FULL, # Invalid action ends turn -} - - -@dataclass -class TurnState: - """ - Tracks state within a single turn. - - Used to enforce constraints like "only one speech per turn" - and track free actions taken before turn-ending action. - """ - has_spoken: bool = False - free_actions: List[Dict[str, Any]] = field(default_factory=list) - turn_ended: bool = False - - def can_speak(self) -> bool: - """Check if agent can still speak this turn.""" - return not self.has_spoken - - def record_speech(self): - """Record that agent has spoken this turn.""" - self.has_spoken = True - - def record_free_action(self, action_type: str, details: Dict[str, Any]): - """Record a free action for logging.""" - self.free_actions.append({ - "type": action_type, - **details - }) - - def end_turn(self): - """Mark turn as ended.""" - self.turn_ended = True - - -@dataclass -class PathState: - """ - Tracks multi-tile movement path for an agent. - - When an agent decides to move to a distant location, - we store the path and continue moving without LLM calls - until the path completes or FOV changes. - """ - path: List[Tuple[int, int]] = field(default_factory=list) - current_index: int = 0 - destination_description: str = "" # "the armory", "the door" - - # FOV state when path was planned - visible_entities_at_start: Set[str] = field(default_factory=set) - - @property - def has_path(self) -> bool: - """Check if there's an active path.""" - return len(self.path) > self.current_index - - @property - def next_tile(self) -> Optional[Tuple[int, int]]: - """Get next tile in path, or None if path complete.""" - if self.has_path: - return self.path[self.current_index] - return None - - @property - def remaining_tiles(self) -> int: - """Number of tiles left in path.""" - return max(0, len(self.path) - self.current_index) - - def advance(self): - """Move to next tile in path.""" - if self.has_path: - self.current_index += 1 - - def clear(self): - """Clear the current path.""" - self.path = [] - self.current_index = 0 - self.destination_description = "" - self.visible_entities_at_start = set() - - def should_interrupt(self, current_visible_entities: Set[str]) -> bool: - """ - Check if path should be interrupted due to FOV change. - - Returns True if a NEW entity has entered the agent's FOV - since the path was planned. - """ - new_entities = current_visible_entities - self.visible_entities_at_start - return len(new_entities) > 0 - - -@dataclass -class PointOfInterest: - """ - A targetable object/location for LOOK/MOVE actions. - - Listed in LLM prompts to guide valid targeting. - """ - name: str # Short name: "door", "rat", "button" - display_name: str # Full description: "a wooden door to the east" - position: Tuple[int, int] # Tile coordinates - direction: str # Cardinal direction from agent: "north", "east" - distance: int # Manhattan distance from agent - can_look: bool = True # Can be examined with LOOK - can_move_to: bool = False # Can be targeted with GO TO - entity_id: Optional[str] = None # Entity ID if this is an entity - - -def get_action_cost(action: Action) -> TurnCost: - """Get the turn cost for an action.""" - return ACTION_COSTS.get(action.type, TurnCost.FULL) - - -def get_direction_name(from_pos: Tuple[int, int], to_pos: Tuple[int, int]) -> str: - """Get cardinal direction name from one position to another.""" - dx = to_pos[0] - from_pos[0] - dy = to_pos[1] - from_pos[1] - - if abs(dx) > abs(dy): - return "east" if dx > 0 else "west" - elif abs(dy) > abs(dx): - return "south" if dy > 0 else "north" - else: - # Diagonal - ns = "south" if dy > 0 else "north" - ew = "east" if dx > 0 else "west" - return f"{ns}-{ew}" - - -def manhattan_distance(a: Tuple[int, int], b: Tuple[int, int]) -> int: - """Calculate Manhattan distance between two points.""" - return abs(a[0] - b[0]) + abs(a[1] - b[1]) - - -class PointOfInterestCollector: - """ - Collects points of interest visible to an agent. - - Used to populate LLM prompts with valid LOOK/MOVE targets. - """ - - def __init__(self, grid, agent_pos: Tuple[int, int]): - self.grid = grid - self.agent_pos = agent_pos - self.points: List[PointOfInterest] = [] - - def collect_from_fov(self, world_graph=None) -> List[PointOfInterest]: - """ - Collect all points of interest visible in current FOV. - - Examines: - - Entities (other agents, NPCs, items) - - Doors/exits - - Interactive objects (buttons, chests) - - Notable tiles (walls with features) - """ - self.points = [] - - # Collect entities - for entity in self.grid.entities: - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if (ex, ey) == self.agent_pos: - continue # Skip self - - if self.grid.is_in_fov(ex, ey): - direction = get_direction_name(self.agent_pos, (ex, ey)) - distance = manhattan_distance(self.agent_pos, (ex, ey)) - - # Try to get entity name/description - entity_name = getattr(entity, 'name', None) or f"creature" - entity_id = getattr(entity, 'id', None) or str(id(entity)) - - self.points.append(PointOfInterest( - name=entity_name, - display_name=f"a {entity_name} to the {direction}", - position=(ex, ey), - direction=direction, - distance=distance, - can_look=True, - can_move_to=False, # Can't move onto entities - entity_id=entity_id - )) - - # Collect from WorldGraph if provided - if world_graph: - self._collect_from_world_graph(world_graph) - - # Sort by distance - self.points.sort(key=lambda p: p.distance) - - return self.points - - def _collect_from_world_graph(self, world): - """Collect doors and objects from WorldGraph.""" - agent_room = world.room_at(*self.agent_pos) - if not agent_room: - return - - # Doors - for door in world.get_exits(agent_room.name): - dx, dy = door.position - if self.grid.is_in_fov(dx, dy): - direction = get_direction_name(self.agent_pos, (dx, dy)) - distance = manhattan_distance(self.agent_pos, (dx, dy)) - - # Get destination room name - if door.room_a == agent_room.name: - dest = world.rooms.get(door.room_b) - else: - dest = world.rooms.get(door.room_a) - dest_name = dest.display_name if dest else "unknown" - - lock_str = " (locked)" if door.locked else "" - - self.points.append(PointOfInterest( - name="door", - display_name=f"a door to {dest_name}{lock_str} ({direction})", - position=(dx, dy), - direction=direction, - distance=distance, - can_look=True, - can_move_to=not door.locked - )) - - # Objects in room - for obj in world.get_objects_in_room(agent_room.name): - ox, oy = obj.position - if self.grid.is_in_fov(ox, oy): - direction = get_direction_name(self.agent_pos, (ox, oy)) - distance = manhattan_distance(self.agent_pos, (ox, oy)) - - self.points.append(PointOfInterest( - name=obj.name, - display_name=f"{obj.display_name} ({direction})", - position=(ox, oy), - direction=direction, - distance=distance, - can_look=True, - can_move_to="pressable" not in obj.affordances # Can walk to items - )) - - def format_for_prompt(self) -> str: - """Format points of interest for inclusion in LLM prompt.""" - if not self.points: - return "No notable objects in view." - - lines = ["Points of interest:"] - for poi in self.points: - actions = [] - if poi.can_look: - actions.append(f"LOOK AT {poi.name.upper()}") - if poi.can_move_to: - actions.append(f"GO TO {poi.name.upper()}") - - action_str = ", ".join(actions) if actions else "observe only" - lines.append(f" - {poi.display_name}: {action_str}") - - return "\n".join(lines) diff --git a/tests/vllm_demo/enhanced_executor.py b/tests/vllm_demo/enhanced_executor.py deleted file mode 100644 index 56b3075..0000000 --- a/tests/vllm_demo/enhanced_executor.py +++ /dev/null @@ -1,731 +0,0 @@ -""" -Enhanced Action Executor -======================== - -Extends ActionExecutor with: -- LOOK action with detailed descriptions -- SPEAK/ANNOUNCE execution with range checking -- Multi-tile path planning -- Free action vs turn-ending action handling -""" - -from dataclasses import dataclass -from typing import Optional, List, Tuple, Dict, Any, Set -from action_parser import Action, ActionType -from action_executor import ActionResult -from action_economy import ( - TurnState, PathState, TurnCost, get_action_cost, - manhattan_distance, get_direction_name -) - - -@dataclass -class TakeResult: - """Result of a TAKE action.""" - success: bool - message: str - item_name: str - item_position: Optional[Tuple[int, int]] = None - - -@dataclass -class LookResult: - """Result of a LOOK action.""" - success: bool - description: str - target_name: str - target_position: Optional[Tuple[int, int]] = None - - -@dataclass -class SpeechResult: - """Result of a SPEAK/ANNOUNCE action.""" - success: bool - message: str - recipients: List[str] # Names of agents who received the message - speech_type: str # "announce" or "speak" - content: str # What was said - - -@dataclass -class Message: - """A message received by an agent.""" - sender: str - content: str - speech_type: str # "announce" or "speak" - turn: int - distance: Optional[int] = None # For SPEAK, how far away sender was - - -class EnhancedExecutor: - """ - Enhanced action executor with LOOK, SPEAK, and multi-tile support. - """ - - # Direction vectors for movement - DIRECTION_VECTORS = { - 'NORTH': (0, -1), - 'SOUTH': (0, 1), - 'EAST': (1, 0), - 'WEST': (-1, 0), - } - - # SPEAK range (Manhattan distance) - SPEAK_RANGE = 4 - - def __init__(self, grid, world_graph=None): - """ - Initialize executor. - - Args: - grid: mcrfpy.Grid instance - world_graph: Optional WorldGraph for detailed descriptions - """ - self.grid = grid - self.world = world_graph - - # Agent path states (agent_name -> PathState) - self.path_states: Dict[str, PathState] = {} - - # Speech channel for message delivery - self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages - - def get_path_state(self, agent_name: str) -> PathState: - """Get or create path state for an agent.""" - if agent_name not in self.path_states: - self.path_states[agent_name] = PathState() - return self.path_states[agent_name] - - def get_pending_messages(self, agent_name: str) -> List[Message]: - """Get and clear pending messages for an agent.""" - messages = self.pending_messages.get(agent_name, []) - self.pending_messages[agent_name] = [] - return messages - - # ========================================================================= - # LOOK Action - # ========================================================================= - - def execute_look(self, agent, action: Action) -> LookResult: - """ - Execute LOOK action - examine a tile or entity. - - Args: - agent: Agent performing the look - action: Parsed LOOK action with optional target - - Returns: - LookResult with detailed description - """ - target = action.args[0] if action.args and action.args[0] else None - - if target is None: - # General look around - return self._look_around(agent) - else: - # Look at specific target - return self._look_at_target(agent, target.upper()) - - def _look_around(self, agent) -> LookResult: - """Describe the general surroundings.""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - - descriptions = [] - - # Describe current room - if self.world: - room = self.world.room_at(ax, ay) - if room: - descriptions.append(f"You are in {room.display_name}.") - if room.description_template and room.properties: - try: - desc = room.description_template.format(**room.properties) - descriptions.append(desc) - except KeyError: - pass - - # Count visible entities - visible_count = 0 - for entity in self.grid.entities: - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey): - visible_count += 1 - - if visible_count > 0: - descriptions.append(f"You can see {visible_count} other creature(s) nearby.") - - # Describe nearby walls/openings - wall_dirs = [] - open_dirs = [] - for direction, (dx, dy) in self.DIRECTION_VECTORS.items(): - nx, ny = ax + dx, ay + dy - if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]: - cell = self.grid.at(nx, ny) - if cell.walkable: - open_dirs.append(direction.lower()) - else: - wall_dirs.append(direction.lower()) - - if open_dirs: - descriptions.append(f"Open passages: {', '.join(open_dirs)}.") - if wall_dirs: - descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.") - - return LookResult( - success=True, - description=" ".join(descriptions), - target_name="surroundings" - ) - - def _look_at_target(self, agent, target: str) -> LookResult: - """Look at a specific target (direction, entity, or object name).""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - - # Check if target is a direction - if target in self.DIRECTION_VECTORS: - return self._look_in_direction(agent, target) - - # Check if target matches an entity - for entity in self.grid.entities: - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if (ex, ey) == (ax, ay): - continue - - entity_name = getattr(entity, 'name', '').upper() - if target in entity_name or entity_name in target: - if self.grid.is_in_fov(ex, ey): - return self._describe_entity(agent, entity) - else: - return LookResult( - success=False, - description=f"You cannot see {target.lower()} from here.", - target_name=target.lower() - ) - - # Check WorldGraph objects - if self.world: - room = self.world.room_at(ax, ay) - if room: - for obj in self.world.get_objects_in_room(room.name): - if target in obj.name.upper() or obj.name.upper() in target: - ox, oy = obj.position - if self.grid.is_in_fov(ox, oy): - return self._describe_object(agent, obj) - - # Check doors - for door in self.world.get_exits(room.name): - if "DOOR" in target: - dx, dy = door.position - if self.grid.is_in_fov(dx, dy): - return self._describe_door(agent, door) - - return LookResult( - success=False, - description=f"You don't see anything called '{target.lower()}' nearby.", - target_name=target.lower() - ) - - def _look_in_direction(self, agent, direction: str) -> LookResult: - """Look in a cardinal direction.""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - dx, dy = self.DIRECTION_VECTORS[direction] - - descriptions = [] - - # Scan tiles in that direction - for distance in range(1, 10): - tx, ty = ax + dx * distance, ay + dy * distance - - if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]): - descriptions.append(f"The edge of the known world lies {direction.lower()}.") - break - - if not self.grid.is_in_fov(tx, ty): - descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.") - break - - cell = self.grid.at(tx, ty) - - # Check for entity at this tile - for entity in self.grid.entities: - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if (ex, ey) == (tx, ty): - entity_name = getattr(entity, 'name', 'creature') - descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.") - - # Check for wall - if not cell.walkable: - # Check if it's a door - if self.world: - room = self.world.room_at(ax, ay) - if room: - for door in self.world.get_exits(room.name): - if door.position == (tx, ty): - dest = self.world.rooms.get( - door.room_b if door.room_a == room.name else door.room_a - ) - dest_name = dest.display_name if dest else "another area" - lock_str = " It is locked." if door.locked else "" - descriptions.append( - f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}" - ) - break - else: - descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") - else: - descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") - else: - descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.") - break - - if not descriptions: - descriptions.append(f"Open floor extends to the {direction.lower()}.") - - return LookResult( - success=True, - description=" ".join(descriptions), - target_name=direction.lower(), - target_position=None - ) - - def _describe_entity(self, agent, entity) -> LookResult: - """Generate detailed description of an entity.""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - - entity_name = getattr(entity, 'name', 'creature') - direction = get_direction_name((ax, ay), (ex, ey)) - distance = manhattan_distance((ax, ay), (ex, ey)) - - descriptions = [ - f"You examine the {entity_name} carefully.", - f"It stands {distance} tile(s) to the {direction}." - ] - - # Add any entity-specific description - if hasattr(entity, 'description'): - descriptions.append(entity.description) - - # Add behavior hints if available - if hasattr(entity, 'behavior'): - descriptions.append(f"It appears to be {entity.behavior}.") - - return LookResult( - success=True, - description=" ".join(descriptions), - target_name=entity_name, - target_position=(ex, ey) - ) - - def _describe_object(self, agent, obj) -> LookResult: - """Generate detailed description of a WorldGraph object.""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - ox, oy = obj.position - - direction = get_direction_name((ax, ay), (ox, oy)) - distance = manhattan_distance((ax, ay), (ox, oy)) - - descriptions = [ - f"You examine {obj.display_name}.", - f"It is {distance} tile(s) to the {direction}." - ] - - if obj.description: - descriptions.append(obj.description) - - # Describe affordances - if "takeable" in obj.affordances: - descriptions.append("It looks small enough to pick up.") - if "pressable" in obj.affordances: - descriptions.append("It appears to be some kind of mechanism.") - if "openable" in obj.affordances: - descriptions.append("It can be opened.") - if "readable" in obj.affordances: - descriptions.append("There is writing on it.") - - return LookResult( - success=True, - description=" ".join(descriptions), - target_name=obj.name, - target_position=(ox, oy) - ) - - def _describe_door(self, agent, door) -> LookResult: - """Generate detailed description of a door.""" - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - dx, dy = door.position - - direction = get_direction_name((ax, ay), (dx, dy)) - distance = manhattan_distance((ax, ay), (dx, dy)) - - # Get destination - if self.world: - current_room = self.world.room_at(ax, ay) - if current_room: - if door.room_a == current_room.name: - dest = self.world.rooms.get(door.room_b) - else: - dest = self.world.rooms.get(door.room_a) - dest_name = dest.display_name if dest else "another area" - else: - dest_name = "another area" - else: - dest_name = "another area" - - descriptions = [ - f"You examine the doorway to the {direction}.", - f"It leads to {dest_name}, {distance} tile(s) away." - ] - - if door.locked: - descriptions.append("The door is locked. You'll need a key or mechanism to open it.") - else: - descriptions.append("The passage is open.") - - return LookResult( - success=True, - description=" ".join(descriptions), - target_name="door", - target_position=(dx, dy) - ) - - # ========================================================================= - # SPEAK/ANNOUNCE Actions - # ========================================================================= - - def execute_speech(self, agent, action: Action, all_agents: list, - turn_number: int) -> SpeechResult: - """ - Execute SPEAK or ANNOUNCE action. - - ANNOUNCE: All agents in the same room hear the message - SPEAK: Only agents within SPEAK_RANGE tiles hear the message - """ - message_content = action.args[0] if action.args else "" - - if not message_content: - return SpeechResult( - success=False, - message="Nothing to say.", - recipients=[], - speech_type=action.type.value.lower(), - content="" - ) - - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - recipients = [] - - if action.type == ActionType.ANNOUNCE: - # Room-wide broadcast - recipients = self._get_agents_in_room(agent, all_agents) - speech_type = "announce" - else: - # Proximity-based speech - recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE) - speech_type = "speak" - - # Deliver messages - for recipient in recipients: - if recipient.name not in self.pending_messages: - self.pending_messages[recipient.name] = [] - - distance = manhattan_distance( - (ax, ay), - (int(recipient.entity.pos[0]), int(recipient.entity.pos[1])) - ) if speech_type == "speak" else None - - self.pending_messages[recipient.name].append(Message( - sender=agent.name, - content=message_content, - speech_type=speech_type, - turn=turn_number, - distance=distance - )) - - recipient_names = [r.name for r in recipients] - - if recipients: - return SpeechResult( - success=True, - message=f"You {speech_type}: \"{message_content}\"", - recipients=recipient_names, - speech_type=speech_type, - content=message_content - ) - else: - return SpeechResult( - success=True, # Still succeeds, just nobody heard - message=f"You {speech_type} into the emptiness: \"{message_content}\"", - recipients=[], - speech_type=speech_type, - content=message_content - ) - - def _get_agents_in_room(self, speaker, all_agents: list) -> list: - """Get all agents in the same room as speaker (excluding speaker).""" - if not self.world: - # Fallback: use proximity - return self._get_agents_in_range(speaker, all_agents, 20) - - ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1]) - speaker_room = self.world.room_at(ax, ay) - - if not speaker_room: - return [] - - recipients = [] - for agent in all_agents: - if agent.name == speaker.name: - continue - rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - agent_room = self.world.room_at(rx, ry) - if agent_room and agent_room.name == speaker_room.name: - recipients.append(agent) - - return recipients - - def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list: - """Get all agents within Manhattan distance of speaker.""" - ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1]) - - recipients = [] - for agent in all_agents: - if agent.name == speaker.name: - continue - rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles: - recipients.append(agent) - - return recipients - - # ========================================================================= - # TAKE Action - # ========================================================================= - - def execute_take(self, agent, action: Action) -> TakeResult: - """ - Execute TAKE action - pick up an item. - - Items must be: - 1. In the WorldGraph as a takeable object - 2. Within reach (adjacent tile or same tile, distance <= 1) - 3. Visible in FOV - """ - item_name = action.args[0].lower() if action.args and action.args[0] else None - - if not item_name: - return TakeResult( - success=False, - message="Take what? Specify an item name.", - item_name="" - ) - - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - - # Search for the item in WorldGraph - if not self.world: - return TakeResult( - success=False, - message="No items exist in this world.", - item_name=item_name - ) - - # Find matching object - matching_obj = None - for obj_name, obj in self.world.objects.items(): - if item_name in obj_name.lower() or obj_name.lower() in item_name: - matching_obj = obj - break - - if not matching_obj: - return TakeResult( - success=False, - message=f"You don't see any '{item_name}' here.", - item_name=item_name - ) - - # Check if takeable - if "takeable" not in matching_obj.affordances: - return TakeResult( - success=False, - message=f"The {matching_obj.display_name} cannot be picked up.", - item_name=item_name, - item_position=matching_obj.position - ) - - ox, oy = matching_obj.position - - # Check if visible in FOV - if not self.grid.is_in_fov(ox, oy): - return TakeResult( - success=False, - message=f"You can't see the {matching_obj.display_name} from here.", - item_name=item_name, - item_position=(ox, oy) - ) - - # Check distance (must be adjacent or same tile) - distance = manhattan_distance((ax, ay), (ox, oy)) - if distance > 1: - direction = get_direction_name((ax, ay), (ox, oy)) - # Use name for cleaner message (display_name often has article already) - return TakeResult( - success=False, - message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.", - item_name=item_name, - item_position=(ox, oy) - ) - - # Success! Remove from world (simplified - no inventory system yet) - del self.world.objects[matching_obj.name] - - return TakeResult( - success=True, - message=f"You pick up {matching_obj.display_name}.", - item_name=matching_obj.name, - item_position=(ox, oy) - ) - - # ========================================================================= - # Movement (single tile, delegates to original executor) - # ========================================================================= - - def execute_move(self, agent, action: Action) -> ActionResult: - """ - Execute single-tile movement. - - This is the per-turn movement. Multi-tile paths are handled - at the orchestrator level. - """ - if not action.args or not action.args[0]: - return ActionResult(False, "No direction specified") - - direction = action.args[0] - if direction not in self.DIRECTION_VECTORS: - return ActionResult(False, f"Invalid direction: {direction}") - - dx, dy = self.DIRECTION_VECTORS[direction] - current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - new_x, new_y = current_x + dx, current_y + dy - - # Bounds check - grid_w, grid_h = self.grid.grid_size - if not (0 <= new_x < grid_w and 0 <= new_y < grid_h): - return ActionResult(False, f"Cannot go {direction} - edge of map") - - # Walkability check - target_cell = self.grid.at(new_x, new_y) - if not target_cell.walkable: - return ActionResult(False, f"Cannot go {direction} - path blocked") - - # Entity collision check - for entity in self.grid.entities: - if entity is agent.entity: - continue - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if ex == new_x and ey == new_y: - return ActionResult(False, f"Cannot go {direction} - occupied") - - # Execute movement - agent.entity.pos = (new_x, new_y) - - return ActionResult( - success=True, - message=f"Moved {direction.lower()} to ({new_x}, {new_y})", - new_position=(new_x, new_y), - path=[(current_x, current_y), (new_x, new_y)] - ) - - def execute_wait(self, agent, action: Action) -> ActionResult: - """Execute WAIT action.""" - return ActionResult(True, "Waited and observed surroundings") - - # ========================================================================= - # Multi-tile Pathfinding - # ========================================================================= - - def plan_path_to(self, agent, target_pos: Tuple[int, int], - visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]: - """ - Plan a path to a target position. - - Uses A* via libtcod if available, otherwise simple pathfinding. - Returns list of tiles from current position to target (excluding current). - """ - try: - from mcrfpy import libtcod - ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - - path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1]) - - if path: - # Store path state - path_state = self.get_path_state(agent.name) - path_state.path = path - path_state.current_index = 0 - path_state.visible_entities_at_start = visible_entities.copy() - - return path - except ImportError: - pass - - return None - - def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]: - """ - Continue an existing multi-tile path. - - Returns ActionResult if moved, None if path complete or interrupted. - """ - path_state = self.get_path_state(agent.name) - - if not path_state.has_path: - return None - - # Check for FOV interrupt - if path_state.should_interrupt(current_visible): - path_state.clear() - return None # Signal that LLM should be queried - - # Get next tile - next_tile = path_state.next_tile - if not next_tile: - path_state.clear() - return None - - # Move to next tile - current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1]) - new_x, new_y = next_tile - - # Verify still walkable - target_cell = self.grid.at(new_x, new_y) - if not target_cell.walkable: - path_state.clear() - return ActionResult(False, "Path blocked - recalculating") - - # Check for entity collision - for entity in self.grid.entities: - if entity is agent.entity: - continue - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if ex == new_x and ey == new_y: - path_state.clear() - return ActionResult(False, "Path blocked by creature") - - # Execute movement - agent.entity.pos = (new_x, new_y) - path_state.advance() - - remaining = path_state.remaining_tiles - if remaining > 0: - msg = f"Continuing path ({remaining} tiles remaining)" - else: - msg = "Arrived at destination" - path_state.clear() - - return ActionResult( - success=True, - message=msg, - new_position=(new_x, new_y), - path=[(current_x, current_y), (new_x, new_y)] - ) diff --git a/tests/vllm_demo/enhanced_orchestrator.py b/tests/vllm_demo/enhanced_orchestrator.py deleted file mode 100644 index 2febc18..0000000 --- a/tests/vllm_demo/enhanced_orchestrator.py +++ /dev/null @@ -1,606 +0,0 @@ -""" -Enhanced Turn Orchestrator -========================== - -Extends TurnOrchestrator with: -- Action economy (free actions vs turn-ending) -- Multi-tile path continuation -- FOV interrupt detection -- Enhanced logging for offline viewer replay -""" - -import json -import os -from dataclasses import dataclass, asdict, field -from typing import List, Dict, Any, Optional, Callable, Set -from datetime import datetime - -from world_graph import WorldGraph, AgentInfo -from action_parser import Action, ActionType, parse_action -from action_executor import ActionResult -from action_economy import ( - TurnState, PathState, TurnCost, get_action_cost, - PointOfInterestCollector, PointOfInterest -) -from enhanced_executor import EnhancedExecutor, LookResult, SpeechResult, Message, TakeResult - - -@dataclass -class FreeActionRecord: - """Record of a free action taken during a turn.""" - action_type: str - args: tuple - result: Dict[str, Any] - timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) - - -@dataclass -class EnhancedSimulationStep: - """ - Enhanced simulation step for offline viewer replay. - - Contains all data needed to reconstruct the agent's perspective - and decision-making for that turn. - """ - # Turn identification - turn: int - agent_id: str - timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) - - # Agent state at start of turn - position_start: tuple = (0, 0) - room: str = "" - path_in_progress: bool = False - - # FOV and perception - visible_entities: List[str] = field(default_factory=list) - visible_tiles: int = 0 # Count of visible tiles - points_of_interest: List[Dict] = field(default_factory=list) - - # Context provided to LLM - location_description: str = "" - available_actions: List[str] = field(default_factory=list) - pending_messages: List[Dict] = field(default_factory=list) - poi_prompt: str = "" - - # Screenshot path (for viewer to load) - screenshot_path: str = "" - - # LLM interaction - llm_prompt_system: str = "" - llm_prompt_user: str = "" - llm_response: str = "" - llm_was_queried: bool = True # False if path continuation - - # Conversation history (LLM queries within this turn) - llm_exchanges: List[Dict] = field(default_factory=list) # [{prompt, response, action, error}] - action_retries: int = 0 # How many times we re-prompted due to errors - - # Free actions taken (LOOK, SPEAK) - free_actions: List[Dict] = field(default_factory=list) - - # Turn-ending action - final_action_type: str = "" - final_action_args: tuple = () - final_action_success: bool = False - final_action_message: str = "" - - # Movement result - position_end: tuple = (0, 0) - path_taken: List[tuple] = field(default_factory=list) - path_remaining: int = 0 # Tiles left if multi-tile path - - -@dataclass -class EnhancedSimulationLog: - """ - Complete simulation log for offline viewer. - - Designed to support: - - Turn-by-turn replay - - Per-agent perspective reconstruction - - LLM chain-of-thought review - - Speech history tracking - """ - metadata: Dict[str, Any] = field(default_factory=dict) - steps: List[EnhancedSimulationStep] = field(default_factory=list) - speech_log: List[Dict] = field(default_factory=list) - - def save(self, path: str): - """Save log to JSON file.""" - data = { - "metadata": self.metadata, - "steps": [asdict(s) for s in self.steps], - "speech_log": self.speech_log - } - with open(path, 'w') as f: - json.dump(data, f, indent=2, default=str) - print(f"Enhanced simulation log saved to: {path}") - - @classmethod - def load(cls, path: str) -> 'EnhancedSimulationLog': - """Load log from JSON file.""" - with open(path) as f: - data = json.load(f) - - steps = [] - for s in data.get("steps", []): - # Convert lists back to tuples where needed - if isinstance(s.get("position_start"), list): - s["position_start"] = tuple(s["position_start"]) - if isinstance(s.get("position_end"), list): - s["position_end"] = tuple(s["position_end"]) - if isinstance(s.get("final_action_args"), list): - s["final_action_args"] = tuple(s["final_action_args"]) - if s.get("path_taken"): - s["path_taken"] = [tuple(p) for p in s["path_taken"]] - steps.append(EnhancedSimulationStep(**s)) - - return cls( - metadata=data.get("metadata", {}), - steps=steps, - speech_log=data.get("speech_log", []) - ) - - def get_turn_summary(self, turn: int) -> str: - """Get summary of a specific turn for display.""" - turn_steps = [s for s in self.steps if s.turn == turn] - lines = [f"=== Turn {turn} ==="] - - for step in turn_steps: - lines.append(f"\n{step.agent_id}:") - lines.append(f" Position: {step.position_start} -> {step.position_end}") - - if step.free_actions: - lines.append(f" Free actions: {len(step.free_actions)}") - for fa in step.free_actions: - lines.append(f" - {fa['action_type']}: {fa.get('result', {}).get('message', '')[:50]}") - - status = "OK" if step.final_action_success else "FAIL" - lines.append(f" Action: {step.final_action_type} {step.final_action_args} [{status}]") - - if not step.llm_was_queried: - lines.append(" (Path continuation - no LLM query)") - - return "\n".join(lines) - - -class EnhancedOrchestrator: - """ - Enhanced turn orchestrator with action economy and improved logging. - """ - - def __init__(self, grid, fov_layer, world: WorldGraph, agents: list, - screenshot_dir: str, llm_query_fn: Callable): - """ - Initialize enhanced orchestrator. - - Args: - grid: mcrfpy.Grid instance - fov_layer: Color layer for FOV rendering - world: WorldGraph instance - agents: List of Agent objects - screenshot_dir: Directory for screenshots - llm_query_fn: Function(agent, screenshot_path, context) -> str - """ - self.grid = grid - self.fov_layer = fov_layer - self.world = world - self.agents = agents - self.screenshot_dir = screenshot_dir - self.llm_query_fn = llm_query_fn - - self.executor = EnhancedExecutor(grid, world) - self.turn_number = 0 - self.steps: List[EnhancedSimulationStep] = [] - self.speech_log: List[Dict] = [] - - os.makedirs(screenshot_dir, exist_ok=True) - - def run_simulation(self, max_turns: int = 10, - stop_condition: Callable = None) -> EnhancedSimulationLog: - """ - Run complete simulation with enhanced logging. - - Args: - max_turns: Maximum number of turns - stop_condition: Optional callable(orchestrator) -> bool - - Returns: - EnhancedSimulationLog for offline viewer - """ - print(f"\nStarting enhanced simulation: max {max_turns} turns") - print(f"Agents: {[a.name for a in self.agents]}") - print("=" * 60) - - for turn in range(max_turns): - self.run_turn() - - if stop_condition and stop_condition(self): - print(f"\nStop condition met at turn {self.turn_number}") - break - - # Build log - log = EnhancedSimulationLog( - metadata={ - "total_turns": self.turn_number, - "num_agents": len(self.agents), - "agent_names": [a.name for a in self.agents], - "timestamp_start": self.steps[0].timestamp if self.steps else "", - "timestamp_end": self.steps[-1].timestamp if self.steps else "", - "world_rooms": list(self.world.rooms.keys()), - "screenshot_dir": self.screenshot_dir, - }, - steps=self.steps, - speech_log=self.speech_log - ) - - return log - - def run_turn(self) -> List[EnhancedSimulationStep]: - """Execute one full turn (all agents act once).""" - import mcrfpy - - self.turn_number += 1 - turn_steps = [] - - print(f"\n{'='*60}") - print(f"TURN {self.turn_number}") - print("=" * 60) - - for agent in self.agents: - step = self._run_agent_turn(agent) - turn_steps.append(step) - self.steps.append(step) - - return turn_steps - - def _run_agent_turn(self, agent) -> EnhancedSimulationStep: - """Execute one agent's turn with action economy.""" - import mcrfpy - from mcrfpy import automation - - print(f"\n--- {agent.name}'s Turn ---") - - # Initialize step record - step = EnhancedSimulationStep( - turn=self.turn_number, - agent_id=agent.name, - position_start=agent.pos, - room=agent.current_room - ) - - # Check for path continuation - path_state = self.executor.get_path_state(agent.name) - current_visible = self._get_visible_entity_ids(agent) - - if path_state.has_path: - # Check for FOV interrupt - if path_state.should_interrupt(current_visible): - print(f" Path interrupted: new entity in FOV") - path_state.clear() - else: - # Continue path without LLM query - result = self.executor.continue_path(agent, current_visible) - if result and result.success: - step.llm_was_queried = False - step.path_in_progress = True - step.final_action_type = "GO" - step.final_action_args = ("CONTINUE",) - step.final_action_success = True - step.final_action_message = result.message - step.position_end = result.new_position or agent.pos - step.path_taken = result.path or [] - step.path_remaining = self.executor.get_path_state(agent.name).remaining_tiles - - print(f" Path continuation: {result.message}") - return step - - # Need LLM query - set up perspective - step.visible_entities = list(current_visible) - self._switch_perspective(agent) - mcrfpy.step(0.016) - - # Take screenshot - screenshot_path = os.path.join( - self.screenshot_dir, - f"turn{self.turn_number}_{agent.name.lower()}.png" - ) - automation.screenshot(screenshot_path) - step.screenshot_path = screenshot_path - - # Collect points of interest - poi_collector = PointOfInterestCollector(self.grid, agent.pos) - pois = poi_collector.collect_from_fov(self.world) - step.points_of_interest = [asdict(p) for p in pois] - step.poi_prompt = poi_collector.format_for_prompt() - - # Get pending messages - messages = self.executor.get_pending_messages(agent.name) - step.pending_messages = [asdict(m) for m in messages] - - # Build context - visible_agents = self._get_visible_agents(agent) - context = agent.get_context(visible_agents + [agent]) - step.location_description = context["location"] - step.available_actions = context["available_actions"] - - # Turn state for action economy - turn_state = TurnState() - - # Error feedback for retry loop - last_error = None - MAX_RETRIES = 3 - - # Action loop - handle free actions until turn-ending action - while not turn_state.turn_ended: - # Build prompt with current state (includes error feedback if any) - prompt = self._build_prompt(agent, context, step.poi_prompt, messages, turn_state, last_error) - step.llm_prompt_user = prompt # Store last prompt - - # Query LLM - print(f" Querying LLM...") - response = self.llm_query_fn(agent, screenshot_path, { - **context, - "poi_prompt": step.poi_prompt, - "messages": [asdict(m) for m in messages], - "has_spoken": turn_state.has_spoken, - "last_error": last_error, - "conversation_history": step.llm_exchanges # Include past exchanges - }) - step.llm_response = response - print(f" Response: {response[:200]}...") - - # Parse action - action = parse_action(response) - cost = get_action_cost(action) - - print(f" Action: {action.type.value} {action.args} (cost: {cost.value})") - - # Track this exchange - exchange = { - "prompt": prompt[:500], # Truncate for storage - "response": response, - "action_type": action.type.value, - "action_args": action.args, - "error": None - } - - # Execute action based on type - if action.type == ActionType.LOOK: - result = self.executor.execute_look(agent, action) - turn_state.record_free_action("LOOK", { - "target": result.target_name, - "description": result.description - }) - step.free_actions.append({ - "action_type": "LOOK", - "args": action.args, - "result": {"description": result.description} - }) - # Provide result and continue loop for another action - context["look_result"] = result.description - last_error = None # Clear error on success - print(f" LOOK result: {result.description[:100]}...") - - elif action.type in (ActionType.SPEAK, ActionType.ANNOUNCE): - if not turn_state.can_speak(): - print(f" Already spoke this turn") - last_error = "You have already spoken this turn. Choose a different action." - exchange["error"] = last_error - step.action_retries += 1 - if step.action_retries >= MAX_RETRIES: - # Force end turn - step.final_action_type = "WAIT" - step.final_action_args = () - step.final_action_success = False - step.final_action_message = "Too many invalid actions - turn ended" - step.position_end = agent.pos - turn_state.end_turn() - else: - result = self.executor.execute_speech( - agent, action, self.agents, self.turn_number - ) - turn_state.record_speech() - turn_state.record_free_action(action.type.value, { - "content": result.content, - "recipients": result.recipients - }) - step.free_actions.append({ - "action_type": action.type.value, - "args": action.args, - "result": { - "content": result.content, - "recipients": result.recipients - } - }) - # Record in speech log - self.speech_log.append({ - "turn": self.turn_number, - "speaker": agent.name, - "type": result.speech_type, - "content": result.content, - "recipients": result.recipients - }) - last_error = None - print(f" {result.speech_type.upper()}: {result.content[:50]}... -> {result.recipients}") - # Continue loop for another action (can still move) - - elif action.type == ActionType.TAKE: - result = self.executor.execute_take(agent, action) - if result.success: - step.final_action_type = "TAKE" - step.final_action_args = action.args - step.final_action_success = True - step.final_action_message = result.message - step.position_end = agent.pos - last_error = None - turn_state.end_turn() - print(f" TAKE: {result.message}") - else: - # Failed - give error feedback and let LLM try again - last_error = result.message - exchange["error"] = last_error - step.action_retries += 1 - print(f" TAKE FAILED: {result.message}") - if step.action_retries >= MAX_RETRIES: - step.final_action_type = "TAKE" - step.final_action_args = action.args - step.final_action_success = False - step.final_action_message = result.message - step.position_end = agent.pos - turn_state.end_turn() - - elif action.type == ActionType.GO: - result = self.executor.execute_move(agent, action) - if result.success: - step.final_action_type = "GO" - step.final_action_args = action.args - step.final_action_success = True - step.final_action_message = result.message - step.position_end = result.new_position or agent.pos - step.path_taken = result.path or [] - last_error = None - turn_state.end_turn() - print(f" MOVE: {result.message}") - else: - # Failed - give error feedback - last_error = result.message - exchange["error"] = last_error - step.action_retries += 1 - print(f" MOVE FAILED: {result.message}") - if step.action_retries >= MAX_RETRIES: - step.final_action_type = "GO" - step.final_action_args = action.args - step.final_action_success = False - step.final_action_message = result.message - step.position_end = agent.pos - turn_state.end_turn() - - elif action.type == ActionType.WAIT: - result = self.executor.execute_wait(agent, action) - step.final_action_type = "WAIT" - step.final_action_args = () - step.final_action_success = True - step.final_action_message = result.message - step.position_end = agent.pos - last_error = None - turn_state.end_turn() - print(f" WAIT") - - elif action.type == ActionType.INVALID: - # Could not parse action - give feedback - last_error = f"Could not understand your action. Please use a valid action format like 'Action: GO EAST' or 'Action: TAKE key'." - exchange["error"] = last_error - step.action_retries += 1 - print(f" INVALID ACTION: {action.args}") - if step.action_retries >= MAX_RETRIES: - step.final_action_type = "INVALID" - step.final_action_args = action.args - step.final_action_success = False - step.final_action_message = "Could not parse action" - step.position_end = agent.pos - turn_state.end_turn() - - else: - # Unimplemented action type - give feedback - last_error = f"The action '{action.type.value}' is not yet supported. Try GO, TAKE, LOOK, SPEAK, or WAIT." - exchange["error"] = last_error - step.action_retries += 1 - print(f" Unsupported: {action.type.value}") - if step.action_retries >= MAX_RETRIES: - step.final_action_type = action.type.value - step.final_action_args = action.args - step.final_action_success = False - step.final_action_message = f"Unsupported action: {action.type.value}" - step.position_end = agent.pos - turn_state.end_turn() - - # Record exchange - step.llm_exchanges.append(exchange) - - return step - - def _build_prompt(self, agent, context: dict, poi_prompt: str, - messages: List[Message], turn_state: TurnState, - last_error: Optional[str] = None) -> str: - """Build LLM prompt with current state and error feedback.""" - parts = [context["location"]] - - # Add messages received - if messages: - parts.append("\nMessages received:") - for msg in messages: - if msg.speech_type == "announce": - parts.append(f' {msg.sender} announces: "{msg.content}"') - else: - parts.append(f' {msg.sender} says: "{msg.content}"') - - # Add points of interest - parts.append(f"\n{poi_prompt}") - - # Add available actions - actions_str = ", ".join(context["available_actions"]) - parts.append(f"\nAvailable actions: {actions_str}") - - # Add LOOK result if we just looked - if "look_result" in context: - parts.append(f"\n[LOOK result: {context['look_result']}]") - - # Add constraints - constraints = [] - if turn_state.has_spoken: - constraints.append("You have already spoken this turn.") - if constraints: - parts.append(f"\nConstraints: {' '.join(constraints)}") - - # Add error feedback from last action attempt - if last_error: - parts.append(f"\n[ERROR: {last_error}]") - parts.append("[Please try a different action.]") - - parts.append("\nWhat do you do? Brief reasoning, then Action: ") - - return "\n".join(parts) - - def _switch_perspective(self, agent): - """Switch grid view to agent's perspective.""" - import mcrfpy - - self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255)) - self.fov_layer.apply_perspective( - entity=agent.entity, - visible=mcrfpy.Color(0, 0, 0, 0), - discovered=mcrfpy.Color(40, 40, 60, 180), - unknown=mcrfpy.Color(0, 0, 0, 255) - ) - agent.entity.update_visibility() - - px, py = agent.pos - self.grid.center = (px * 16 + 8, py * 16 + 8) - - def _get_visible_agents(self, observer) -> list: - """Get agents visible to observer based on FOV.""" - visible = [] - for agent in self.agents: - if agent.name == observer.name: - continue - ax, ay = agent.pos - if self.grid.is_in_fov(ax, ay): - visible.append(agent) - return visible - - def _get_visible_entity_ids(self, agent) -> Set[str]: - """Get set of entity IDs currently visible to agent.""" - visible = set() - ax, ay = agent.pos - - for entity in self.grid.entities: - if entity is agent.entity: - continue - ex, ey = int(entity.pos[0]), int(entity.pos[1]) - if self.grid.is_in_fov(ex, ey): - entity_id = getattr(entity, 'id', None) or str(id(entity)) - visible.add(entity_id) - - return visible diff --git a/tools/generate_stubs_v2.py b/tools/generate_stubs_v2.py index 3de0ad4..77e8ddb 100644 --- a/tools/generate_stubs_v2.py +++ b/tools/generate_stubs_v2.py @@ -87,31 +87,22 @@ class Font: class Drawable: """Base class for all drawable UI elements.""" - + x: float y: float visible: bool z_index: int name: str pos: Vector - - # Mouse event callbacks (#140, #141) - on_click: Optional[Callable[[float, float, int, str], None]] - on_enter: Optional[Callable[[float, float, int, str], None]] - on_exit: Optional[Callable[[float, float, int, str], None]] - on_move: Optional[Callable[[float, float, int, str], None]] - - # Read-only hover state (#140) - hovered: bool - + def get_bounds(self) -> Tuple[float, float, float, float]: """Get bounding box as (x, y, width, height).""" ... - + def move(self, dx: float, dy: float) -> None: """Move by relative offset (dx, dy).""" ... - + def resize(self, width: float, height: float) -> None: """Resize to new dimensions (width, height).""" ... @@ -352,47 +343,45 @@ class EntityCollection: class Scene: """Base class for object-oriented scenes.""" - + name: str - children: UICollection # #151: UI elements collection (read-only alias for get_ui()) - on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action) - + def __init__(self, name: str) -> None: ... - + def activate(self) -> None: """Called when scene becomes active.""" ... - + def deactivate(self) -> None: """Called when scene becomes inactive.""" ... - + def get_ui(self) -> UICollection: """Get UI elements collection.""" ... - + def on_keypress(self, key: str, pressed: bool) -> None: - """Handle keyboard events (override in subclass).""" + """Handle keyboard events.""" ... - + def on_click(self, x: float, y: float, button: int) -> None: - """Handle mouse clicks (override in subclass).""" + """Handle mouse clicks.""" ... - + def on_enter(self) -> None: - """Called when entering the scene (override in subclass).""" + """Called when entering the scene.""" ... - + def on_exit(self) -> None: - """Called when leaving the scene (override in subclass).""" + """Called when leaving the scene.""" ... - + def on_resize(self, width: int, height: int) -> None: - """Handle window resize events (override in subclass).""" + """Handle window resize events.""" ... - + def update(self, dt: float) -> None: - """Update scene logic (override in subclass).""" + """Update scene logic.""" ... class Timer: