Compare commits
4 commits
da6f4a3e62
...
335efc5514
| Author | SHA1 | Date | |
|---|---|---|---|
| 335efc5514 | |||
| 85e90088d5 | |||
| b6ec0fe7ab | |||
| 89986323f8 |
9 changed files with 3104 additions and 44 deletions
|
|
@ -75,22 +75,31 @@ class Font:
|
|||
|
||||
class Drawable:
|
||||
"""Base class for all drawable UI elements."""
|
||||
|
||||
|
||||
x: float
|
||||
y: float
|
||||
visible: bool
|
||||
z_index: int
|
||||
name: str
|
||||
pos: Vector
|
||||
|
||||
|
||||
# Mouse event callbacks (#140, #141)
|
||||
on_click: Optional[Callable[[float, float, int, str], None]]
|
||||
on_enter: Optional[Callable[[float, float, int, str], None]]
|
||||
on_exit: Optional[Callable[[float, float, int, str], None]]
|
||||
on_move: Optional[Callable[[float, float, int, str], None]]
|
||||
|
||||
# Read-only hover state (#140)
|
||||
hovered: bool
|
||||
|
||||
def get_bounds(self) -> Tuple[float, float, float, float]:
|
||||
"""Get bounding box as (x, y, width, height)."""
|
||||
...
|
||||
|
||||
|
||||
def move(self, dx: float, dy: float) -> None:
|
||||
"""Move by relative offset (dx, dy)."""
|
||||
...
|
||||
|
||||
|
||||
def resize(self, width: float, height: float) -> None:
|
||||
"""Resize to new dimensions (width, height)."""
|
||||
...
|
||||
|
|
@ -331,45 +340,47 @@ class EntityCollection:
|
|||
|
||||
class Scene:
|
||||
"""Base class for object-oriented scenes."""
|
||||
|
||||
|
||||
name: str
|
||||
|
||||
children: UICollection # #151: UI elements collection (read-only alias for get_ui())
|
||||
on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action)
|
||||
|
||||
def __init__(self, name: str) -> None: ...
|
||||
|
||||
|
||||
def activate(self) -> None:
|
||||
"""Called when scene becomes active."""
|
||||
...
|
||||
|
||||
|
||||
def deactivate(self) -> None:
|
||||
"""Called when scene becomes inactive."""
|
||||
...
|
||||
|
||||
|
||||
def get_ui(self) -> UICollection:
|
||||
"""Get UI elements collection."""
|
||||
...
|
||||
|
||||
|
||||
def on_keypress(self, key: str, pressed: bool) -> None:
|
||||
"""Handle keyboard events."""
|
||||
"""Handle keyboard events (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_click(self, x: float, y: float, button: int) -> None:
|
||||
"""Handle mouse clicks."""
|
||||
"""Handle mouse clicks (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_enter(self) -> None:
|
||||
"""Called when entering the scene."""
|
||||
"""Called when entering the scene (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_exit(self) -> None:
|
||||
"""Called when leaving the scene."""
|
||||
"""Called when leaving the scene (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_resize(self, width: int, height: int) -> None:
|
||||
"""Handle window resize events."""
|
||||
"""Handle window resize events (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def update(self, dt: float) -> None:
|
||||
"""Update scene logic."""
|
||||
"""Update scene logic (override in subclass)."""
|
||||
...
|
||||
|
||||
class Timer:
|
||||
|
|
|
|||
808
tests/demo/screens/focus_system_demo.py
Normal file
808
tests/demo/screens/focus_system_demo.py
Normal file
|
|
@ -0,0 +1,808 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Focus System Demo for McRogueFace
|
||||
|
||||
Demonstrates a Python-level focus management system using engine primitives.
|
||||
This shows how game developers can implement keyboard navigation without
|
||||
requiring C++ engine changes.
|
||||
|
||||
Features demonstrated:
|
||||
- Click-to-focus
|
||||
- Tab/Shift+Tab cycling
|
||||
- Visual focus indicators
|
||||
- Keyboard routing to focused widget
|
||||
- Modal focus stack
|
||||
- Three widget types: Grid (WASD), TextInput, MenuIcon
|
||||
|
||||
Issue: #143
|
||||
"""
|
||||
|
||||
import mcrfpy
|
||||
import sys
|
||||
|
||||
# =============================================================================
|
||||
# Modifier Key Tracker (workaround until #160 is implemented)
|
||||
# =============================================================================
|
||||
|
||||
class ModifierTracker:
|
||||
"""Tracks modifier key state since engine doesn't expose this yet."""
|
||||
|
||||
def __init__(self):
|
||||
self.shift = False
|
||||
self.ctrl = False
|
||||
self.alt = False
|
||||
|
||||
def update(self, key: str, action: str):
|
||||
"""Call this from your key handler to update modifier state."""
|
||||
if key in ("LShift", "RShift"):
|
||||
self.shift = (action == "start")
|
||||
elif key in ("LControl", "RControl"):
|
||||
self.ctrl = (action == "start")
|
||||
elif key in ("LAlt", "RAlt"):
|
||||
self.alt = (action == "start")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Focus Manager
|
||||
# =============================================================================
|
||||
|
||||
class FocusManager:
|
||||
"""Central focus coordinator for a scene.
|
||||
|
||||
Manages which widget receives keyboard input, handles tab cycling,
|
||||
and maintains a modal stack for popup dialogs.
|
||||
"""
|
||||
|
||||
# Focus indicator colors
|
||||
FOCUS_COLOR = mcrfpy.Color(0, 150, 255) # Blue
|
||||
UNFOCUS_COLOR = mcrfpy.Color(80, 80, 80) # Dark gray
|
||||
FOCUS_OUTLINE = 3.0
|
||||
UNFOCUS_OUTLINE = 1.0
|
||||
|
||||
def __init__(self):
|
||||
self.widgets = [] # List of (widget, focusable: bool)
|
||||
self.focus_index = -1 # Currently focused widget index
|
||||
self.modal_stack = [] # Stack of (modal_frame, previous_focus_index)
|
||||
self.modifiers = ModifierTracker()
|
||||
|
||||
def register(self, widget, focusable: bool = True):
|
||||
"""Add a widget to the focus order.
|
||||
|
||||
Args:
|
||||
widget: Object implementing on_focus(), on_blur(), handle_key()
|
||||
focusable: Whether this widget can receive focus via Tab
|
||||
"""
|
||||
self.widgets.append((widget, focusable))
|
||||
# Give widget a reference back to us for click-to-focus
|
||||
widget._focus_manager = self
|
||||
widget._focus_index = len(self.widgets) - 1
|
||||
|
||||
def focus(self, widget_or_index):
|
||||
"""Set focus to a specific widget."""
|
||||
# Resolve to index
|
||||
if isinstance(widget_or_index, int):
|
||||
new_index = widget_or_index
|
||||
else:
|
||||
new_index = next(
|
||||
(i for i, (w, _) in enumerate(self.widgets) if w is widget_or_index),
|
||||
-1
|
||||
)
|
||||
|
||||
if new_index < 0 or new_index >= len(self.widgets):
|
||||
return
|
||||
|
||||
# Blur old widget
|
||||
if 0 <= self.focus_index < len(self.widgets):
|
||||
old_widget, _ = self.widgets[self.focus_index]
|
||||
if hasattr(old_widget, 'on_blur'):
|
||||
old_widget.on_blur()
|
||||
|
||||
# Focus new widget
|
||||
self.focus_index = new_index
|
||||
new_widget, _ = self.widgets[new_index]
|
||||
if hasattr(new_widget, 'on_focus'):
|
||||
new_widget.on_focus()
|
||||
|
||||
def cycle(self, direction: int = 1):
|
||||
"""Cycle focus to next/previous focusable widget.
|
||||
|
||||
Args:
|
||||
direction: 1 for next (Tab), -1 for previous (Shift+Tab)
|
||||
"""
|
||||
if not self.widgets:
|
||||
return
|
||||
|
||||
start = self.focus_index if self.focus_index >= 0 else 0
|
||||
current = start
|
||||
|
||||
for _ in range(len(self.widgets)):
|
||||
current = (current + direction) % len(self.widgets)
|
||||
widget, focusable = self.widgets[current]
|
||||
if focusable:
|
||||
self.focus(current)
|
||||
return
|
||||
|
||||
# No focusable widget found, stay where we are
|
||||
|
||||
def push_modal(self, modal_frame, first_focus_widget=None):
|
||||
"""Push a modal onto the focus stack.
|
||||
|
||||
Args:
|
||||
modal_frame: The Frame to show as modal
|
||||
first_focus_widget: Widget to focus inside modal (optional)
|
||||
"""
|
||||
# Save current focus
|
||||
self.modal_stack.append((modal_frame, self.focus_index))
|
||||
|
||||
# Show modal
|
||||
modal_frame.visible = True
|
||||
|
||||
# Focus first widget in modal if specified
|
||||
if first_focus_widget is not None:
|
||||
self.focus(first_focus_widget)
|
||||
|
||||
def pop_modal(self):
|
||||
"""Pop the top modal and restore previous focus."""
|
||||
if not self.modal_stack:
|
||||
return False
|
||||
|
||||
modal_frame, previous_focus = self.modal_stack.pop()
|
||||
modal_frame.visible = False
|
||||
|
||||
# Restore focus
|
||||
if previous_focus >= 0:
|
||||
self.focus(previous_focus)
|
||||
|
||||
return True
|
||||
|
||||
def handle_key(self, key: str, action: str) -> bool:
|
||||
"""Main key handler - route to focused widget or handle global keys.
|
||||
|
||||
Returns True if key was consumed.
|
||||
"""
|
||||
# Always update modifier state
|
||||
self.modifiers.update(key, action)
|
||||
|
||||
# Only process on key press, not release (key repeat sends multiple "start")
|
||||
if action != "start":
|
||||
return False
|
||||
|
||||
# Global: Escape closes modals
|
||||
if key == "Escape":
|
||||
if self.pop_modal():
|
||||
return True
|
||||
|
||||
# Global: Tab cycles focus
|
||||
if key == "Tab":
|
||||
direction = -1 if self.modifiers.shift else 1
|
||||
self.cycle(direction)
|
||||
return True
|
||||
|
||||
# Route to focused widget
|
||||
if 0 <= self.focus_index < len(self.widgets):
|
||||
widget, _ = self.widgets[self.focus_index]
|
||||
if hasattr(widget, 'handle_key'):
|
||||
if widget.handle_key(key, action):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Focusable Widgets
|
||||
# =============================================================================
|
||||
|
||||
class FocusableGrid:
|
||||
"""A grid where WASD keys move a player entity.
|
||||
|
||||
Demonstrates focus on a game-world element.
|
||||
"""
|
||||
|
||||
def __init__(self, x: float, y: float, grid_w: int, grid_h: int,
|
||||
tile_size: int = 16, zoom: float = 2.0):
|
||||
self.grid_w = grid_w
|
||||
self.grid_h = grid_h
|
||||
self.tile_size = tile_size
|
||||
self.zoom = zoom
|
||||
self.base_x = x
|
||||
self.base_y = y
|
||||
|
||||
# Calculate pixel dimensions
|
||||
self.cell_px = tile_size * zoom # Pixels per cell
|
||||
grid_pixel_w = grid_w * self.cell_px
|
||||
grid_pixel_h = grid_h * self.cell_px
|
||||
|
||||
# Create the grid background
|
||||
self.grid = mcrfpy.Grid(
|
||||
pos=(x, y),
|
||||
grid_size=(grid_w, grid_h),
|
||||
size=(grid_pixel_w, grid_pixel_h)
|
||||
)
|
||||
self.grid.zoom = zoom
|
||||
self.grid.fill_color = mcrfpy.Color(40, 40, 55)
|
||||
|
||||
# Add outline frame for focus indication
|
||||
self.outline_frame = mcrfpy.Frame(
|
||||
pos=(x - 2, y - 2),
|
||||
size=(grid_pixel_w + 4, grid_pixel_h + 4),
|
||||
fill_color=mcrfpy.Color(0, 0, 0, 0),
|
||||
outline_color=FocusManager.UNFOCUS_COLOR,
|
||||
outline=FocusManager.UNFOCUS_OUTLINE
|
||||
)
|
||||
|
||||
# Player marker (a bright square overlay)
|
||||
self.player_x = grid_w // 2
|
||||
self.player_y = grid_h // 2
|
||||
marker_size = self.cell_px - 4 # Slightly smaller than cell
|
||||
self.player_marker = mcrfpy.Frame(
|
||||
pos=(0, 0), # Will be positioned by _update_player_display
|
||||
size=(marker_size, marker_size),
|
||||
fill_color=mcrfpy.Color(255, 200, 50),
|
||||
outline_color=mcrfpy.Color(255, 150, 0),
|
||||
outline=2
|
||||
)
|
||||
self._update_player_display()
|
||||
|
||||
# Click handler
|
||||
self.grid.on_click = self._on_click
|
||||
|
||||
# Focus manager reference (set by FocusManager.register)
|
||||
self._focus_manager = None
|
||||
self._focus_index = -1
|
||||
|
||||
def _on_click(self, x, y, button, action):
|
||||
"""Handle click to focus this grid."""
|
||||
if self._focus_manager and action == "start":
|
||||
self._focus_manager.focus(self._focus_index)
|
||||
|
||||
def _update_player_display(self):
|
||||
"""Update the visual representation of player position."""
|
||||
# Position the player marker
|
||||
px = self.base_x + (self.player_x * self.cell_px) + 2
|
||||
py = self.base_y + (self.player_y * self.cell_px) + 2
|
||||
self.player_marker.x = px
|
||||
self.player_marker.y = py
|
||||
|
||||
def on_focus(self):
|
||||
"""Called when this widget gains focus."""
|
||||
self.outline_frame.outline_color = FocusManager.FOCUS_COLOR
|
||||
self.outline_frame.outline = FocusManager.FOCUS_OUTLINE
|
||||
|
||||
def on_blur(self):
|
||||
"""Called when this widget loses focus."""
|
||||
self.outline_frame.outline_color = FocusManager.UNFOCUS_COLOR
|
||||
self.outline_frame.outline = FocusManager.UNFOCUS_OUTLINE
|
||||
|
||||
def handle_key(self, key: str, action: str) -> bool:
|
||||
"""Handle WASD movement."""
|
||||
moves = {
|
||||
"W": (0, -1), "Up": (0, -1),
|
||||
"A": (-1, 0), "Left": (-1, 0),
|
||||
"S": (0, 1), "Down": (0, 1),
|
||||
"D": (1, 0), "Right": (1, 0),
|
||||
}
|
||||
|
||||
if key in moves:
|
||||
dx, dy = moves[key]
|
||||
new_x = self.player_x + dx
|
||||
new_y = self.player_y + dy
|
||||
|
||||
# Bounds check
|
||||
if 0 <= new_x < self.grid_w and 0 <= new_y < self.grid_h:
|
||||
self.player_x = new_x
|
||||
self.player_y = new_y
|
||||
self._update_player_display()
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def add_to_scene(self, ui):
|
||||
"""Add all components to a scene's UI collection."""
|
||||
ui.append(self.outline_frame)
|
||||
ui.append(self.grid)
|
||||
ui.append(self.player_marker)
|
||||
|
||||
|
||||
class TextInputWidget:
|
||||
"""A text input field with cursor and editing.
|
||||
|
||||
Demonstrates text entry with focus indication.
|
||||
"""
|
||||
|
||||
def __init__(self, x: float, y: float, width: float, label: str = "",
|
||||
placeholder: str = ""):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.width = width
|
||||
self.height = 28
|
||||
self.label_text = label
|
||||
self.placeholder_text = placeholder
|
||||
|
||||
# State
|
||||
self.text = ""
|
||||
self.cursor_pos = 0
|
||||
self.focused = False
|
||||
|
||||
# Create UI elements
|
||||
self._create_ui()
|
||||
|
||||
# Focus manager reference
|
||||
self._focus_manager = None
|
||||
self._focus_index = -1
|
||||
|
||||
def _create_ui(self):
|
||||
"""Create the visual components."""
|
||||
# Label above input
|
||||
if self.label_text:
|
||||
self.label = mcrfpy.Caption(
|
||||
text=self.label_text,
|
||||
pos=(self.x, self.y - 20)
|
||||
)
|
||||
self.label.fill_color = mcrfpy.Color(200, 200, 200)
|
||||
|
||||
# Input background
|
||||
self.frame = mcrfpy.Frame(
|
||||
pos=(self.x, self.y),
|
||||
size=(self.width, self.height),
|
||||
fill_color=mcrfpy.Color(40, 40, 50),
|
||||
outline_color=FocusManager.UNFOCUS_COLOR,
|
||||
outline=FocusManager.UNFOCUS_OUTLINE
|
||||
)
|
||||
self.frame.on_click = self._on_click
|
||||
|
||||
# Placeholder text
|
||||
self.placeholder = mcrfpy.Caption(
|
||||
text=self.placeholder_text,
|
||||
pos=(self.x + 6, self.y + 5)
|
||||
)
|
||||
self.placeholder.fill_color = mcrfpy.Color(100, 100, 100)
|
||||
|
||||
# Actual text display
|
||||
self.display = mcrfpy.Caption(
|
||||
text="",
|
||||
pos=(self.x + 6, self.y + 5)
|
||||
)
|
||||
self.display.fill_color = mcrfpy.Color(255, 255, 255)
|
||||
|
||||
# Cursor (thin frame)
|
||||
self.cursor = mcrfpy.Frame(
|
||||
pos=(self.x + 6, self.y + 4),
|
||||
size=(2, self.height - 8),
|
||||
fill_color=mcrfpy.Color(255, 255, 255)
|
||||
)
|
||||
self.cursor.visible = False
|
||||
|
||||
def _on_click(self, x, y, button, action):
|
||||
"""Handle click to focus."""
|
||||
if self._focus_manager and action == "start":
|
||||
self._focus_manager.focus(self._focus_index)
|
||||
|
||||
def _update_display(self):
|
||||
"""Update visual state."""
|
||||
self.display.text = self.text
|
||||
self.placeholder.visible = (not self.text and not self.focused)
|
||||
self._update_cursor()
|
||||
|
||||
def _update_cursor(self):
|
||||
"""Update cursor position."""
|
||||
# Approximate character width (monospace assumption)
|
||||
char_width = 10
|
||||
self.cursor.x = self.x + 6 + (self.cursor_pos * char_width)
|
||||
|
||||
def on_focus(self):
|
||||
"""Called when gaining focus."""
|
||||
self.focused = True
|
||||
self.frame.outline_color = FocusManager.FOCUS_COLOR
|
||||
self.frame.outline = FocusManager.FOCUS_OUTLINE
|
||||
self.cursor.visible = True
|
||||
self._update_display()
|
||||
|
||||
def on_blur(self):
|
||||
"""Called when losing focus."""
|
||||
self.focused = False
|
||||
self.frame.outline_color = FocusManager.UNFOCUS_COLOR
|
||||
self.frame.outline = FocusManager.UNFOCUS_OUTLINE
|
||||
self.cursor.visible = False
|
||||
self._update_display()
|
||||
|
||||
def handle_key(self, key: str, action: str) -> bool:
|
||||
"""Handle text input and editing keys."""
|
||||
if not self.focused:
|
||||
return False
|
||||
|
||||
old_text = self.text
|
||||
handled = True
|
||||
|
||||
if key == "BackSpace":
|
||||
if self.cursor_pos > 0:
|
||||
self.text = self.text[:self.cursor_pos-1] + self.text[self.cursor_pos:]
|
||||
self.cursor_pos -= 1
|
||||
elif key == "Delete":
|
||||
if self.cursor_pos < len(self.text):
|
||||
self.text = self.text[:self.cursor_pos] + self.text[self.cursor_pos+1:]
|
||||
elif key == "Left":
|
||||
self.cursor_pos = max(0, self.cursor_pos - 1)
|
||||
elif key == "Right":
|
||||
self.cursor_pos = min(len(self.text), self.cursor_pos + 1)
|
||||
elif key == "Home":
|
||||
self.cursor_pos = 0
|
||||
elif key == "End":
|
||||
self.cursor_pos = len(self.text)
|
||||
elif key in ("Return", "Tab"):
|
||||
# Don't consume - let focus manager handle
|
||||
handled = False
|
||||
elif len(key) == 1 and key.isprintable():
|
||||
# Insert character
|
||||
self.text = self.text[:self.cursor_pos] + key + self.text[self.cursor_pos:]
|
||||
self.cursor_pos += 1
|
||||
else:
|
||||
handled = False
|
||||
|
||||
self._update_display()
|
||||
return handled
|
||||
|
||||
def get_text(self) -> str:
|
||||
"""Get the current text value."""
|
||||
return self.text
|
||||
|
||||
def set_text(self, text: str):
|
||||
"""Set the text value."""
|
||||
self.text = text
|
||||
self.cursor_pos = len(text)
|
||||
self._update_display()
|
||||
|
||||
def add_to_scene(self, ui):
|
||||
"""Add all components to the scene."""
|
||||
if hasattr(self, 'label'):
|
||||
ui.append(self.label)
|
||||
ui.append(self.frame)
|
||||
ui.append(self.placeholder)
|
||||
ui.append(self.display)
|
||||
ui.append(self.cursor)
|
||||
|
||||
|
||||
class MenuIcon:
|
||||
"""An icon that opens a modal dialog when activated.
|
||||
|
||||
Demonstrates activation via Space/Enter and modal focus.
|
||||
"""
|
||||
|
||||
def __init__(self, x: float, y: float, size: float, icon_char: str,
|
||||
tooltip: str, modal_content_builder=None):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.size = size
|
||||
self.tooltip = tooltip
|
||||
self.modal_content_builder = modal_content_builder
|
||||
self.modal = None
|
||||
|
||||
# Create icon frame
|
||||
self.frame = mcrfpy.Frame(
|
||||
pos=(x, y),
|
||||
size=(size, size),
|
||||
fill_color=mcrfpy.Color(60, 60, 80),
|
||||
outline_color=FocusManager.UNFOCUS_COLOR,
|
||||
outline=FocusManager.UNFOCUS_OUTLINE
|
||||
)
|
||||
self.frame.on_click = self._on_click
|
||||
|
||||
# Icon character (centered)
|
||||
self.icon = mcrfpy.Caption(
|
||||
text=icon_char,
|
||||
pos=(x + size//3, y + size//6)
|
||||
)
|
||||
self.icon.fill_color = mcrfpy.Color(200, 200, 220)
|
||||
|
||||
# Tooltip (shown on hover/focus)
|
||||
self.tooltip_caption = mcrfpy.Caption(
|
||||
text=tooltip,
|
||||
pos=(x, y + size + 4)
|
||||
)
|
||||
self.tooltip_caption.fill_color = mcrfpy.Color(150, 150, 150)
|
||||
self.tooltip_caption.visible = False
|
||||
|
||||
# Focus manager reference
|
||||
self._focus_manager = None
|
||||
self._focus_index = -1
|
||||
|
||||
def _on_click(self, x, y, button, action):
|
||||
"""Handle click to focus or activate."""
|
||||
if not self._focus_manager:
|
||||
return
|
||||
|
||||
if action == "start":
|
||||
# If already focused, activate; otherwise just focus
|
||||
if self._focus_manager.focus_index == self._focus_index:
|
||||
self._activate()
|
||||
else:
|
||||
self._focus_manager.focus(self._focus_index)
|
||||
|
||||
def _activate(self):
|
||||
"""Open the modal dialog."""
|
||||
if self.modal and self._focus_manager:
|
||||
self._focus_manager.push_modal(self.modal)
|
||||
|
||||
def on_focus(self):
|
||||
"""Called when gaining focus."""
|
||||
self.frame.outline_color = FocusManager.FOCUS_COLOR
|
||||
self.frame.outline = FocusManager.FOCUS_OUTLINE
|
||||
self.frame.fill_color = mcrfpy.Color(80, 80, 110)
|
||||
self.tooltip_caption.visible = True
|
||||
|
||||
def on_blur(self):
|
||||
"""Called when losing focus."""
|
||||
self.frame.outline_color = FocusManager.UNFOCUS_COLOR
|
||||
self.frame.outline = FocusManager.UNFOCUS_OUTLINE
|
||||
self.frame.fill_color = mcrfpy.Color(60, 60, 80)
|
||||
self.tooltip_caption.visible = False
|
||||
|
||||
def handle_key(self, key: str, action: str) -> bool:
|
||||
"""Handle activation keys."""
|
||||
if key in ("Space", "Return"):
|
||||
self._activate()
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_modal(self, modal_frame):
|
||||
"""Set the modal frame this icon opens."""
|
||||
self.modal = modal_frame
|
||||
|
||||
def add_to_scene(self, ui):
|
||||
"""Add all components to the scene."""
|
||||
ui.append(self.frame)
|
||||
ui.append(self.icon)
|
||||
ui.append(self.tooltip_caption)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Modal Dialog Builder
|
||||
# =============================================================================
|
||||
|
||||
def create_modal(x: float, y: float, width: float, height: float,
|
||||
title: str) -> mcrfpy.Frame:
|
||||
"""Create a modal dialog frame."""
|
||||
# Semi-transparent backdrop
|
||||
# Note: This is simplified - real implementation might want fullscreen backdrop
|
||||
|
||||
# Modal frame
|
||||
modal = mcrfpy.Frame(
|
||||
pos=(x, y),
|
||||
size=(width, height),
|
||||
fill_color=mcrfpy.Color(40, 40, 50),
|
||||
outline_color=mcrfpy.Color(100, 100, 120),
|
||||
outline=2
|
||||
)
|
||||
modal.visible = False
|
||||
|
||||
# Title
|
||||
title_caption = mcrfpy.Caption(
|
||||
text=title,
|
||||
pos=(x + 10, y + 8)
|
||||
)
|
||||
title_caption.fill_color = mcrfpy.Color(220, 220, 240)
|
||||
modal.children.append(title_caption)
|
||||
|
||||
# Close hint
|
||||
close_hint = mcrfpy.Caption(
|
||||
text="[Esc to close]",
|
||||
pos=(x + width - 100, y + 8)
|
||||
)
|
||||
close_hint.fill_color = mcrfpy.Color(120, 120, 140)
|
||||
modal.children.append(close_hint)
|
||||
|
||||
return modal
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Demo Scene Setup
|
||||
# =============================================================================
|
||||
|
||||
def create_demo_scene():
|
||||
"""Create and populate the focus system demo scene."""
|
||||
|
||||
# Create scene
|
||||
mcrfpy.createScene("focus_demo")
|
||||
ui = mcrfpy.sceneUI("focus_demo")
|
||||
|
||||
# Background
|
||||
bg = mcrfpy.Frame(
|
||||
pos=(0, 0),
|
||||
size=(1024, 768),
|
||||
fill_color=mcrfpy.Color(25, 25, 35)
|
||||
)
|
||||
ui.append(bg)
|
||||
|
||||
# Title
|
||||
title = mcrfpy.Caption(
|
||||
text="Focus System Demo",
|
||||
pos=(20, 15)
|
||||
)
|
||||
title.fill_color = mcrfpy.Color(255, 255, 255)
|
||||
ui.append(title)
|
||||
|
||||
# Instructions
|
||||
instructions = mcrfpy.Caption(
|
||||
text="Tab: cycle focus | Shift+Tab: reverse | WASD: move in grid | Space/Enter: activate | Esc: close modal",
|
||||
pos=(20, 45)
|
||||
)
|
||||
instructions.fill_color = mcrfpy.Color(150, 150, 170)
|
||||
ui.append(instructions)
|
||||
|
||||
# Create focus manager
|
||||
focus_mgr = FocusManager()
|
||||
|
||||
# --- Grid Section ---
|
||||
grid_label = mcrfpy.Caption(text="Game Grid (WASD to move)", pos=(50, 90))
|
||||
grid_label.fill_color = mcrfpy.Color(180, 180, 200)
|
||||
ui.append(grid_label)
|
||||
|
||||
grid_widget = FocusableGrid(50, 115, 10, 8, tile_size=16, zoom=2.0)
|
||||
grid_widget.add_to_scene(ui)
|
||||
focus_mgr.register(grid_widget)
|
||||
|
||||
# --- Text Inputs Section ---
|
||||
input_label = mcrfpy.Caption(text="Text Inputs", pos=(400, 90))
|
||||
input_label.fill_color = mcrfpy.Color(180, 180, 200)
|
||||
ui.append(input_label)
|
||||
|
||||
name_input = TextInputWidget(400, 130, 250, label="Name:", placeholder="Enter your name")
|
||||
name_input.add_to_scene(ui)
|
||||
focus_mgr.register(name_input)
|
||||
|
||||
class_input = TextInputWidget(400, 200, 250, label="Class:", placeholder="e.g. Warrior, Mage")
|
||||
class_input.add_to_scene(ui)
|
||||
focus_mgr.register(class_input)
|
||||
|
||||
notes_input = TextInputWidget(400, 270, 350, label="Notes:", placeholder="Additional notes...")
|
||||
notes_input.add_to_scene(ui)
|
||||
focus_mgr.register(notes_input)
|
||||
|
||||
# --- Menu Icons Section ---
|
||||
icons_label = mcrfpy.Caption(text="Menu Icons", pos=(50, 390))
|
||||
icons_label.fill_color = mcrfpy.Color(180, 180, 200)
|
||||
ui.append(icons_label)
|
||||
|
||||
# Help icon
|
||||
help_icon = MenuIcon(50, 420, 48, "?", "Help")
|
||||
help_icon.add_to_scene(ui)
|
||||
focus_mgr.register(help_icon)
|
||||
|
||||
help_modal = create_modal(200, 150, 400, 300, "Help")
|
||||
ui.append(help_modal)
|
||||
help_text = mcrfpy.Caption(
|
||||
text="This demo shows focus management.\n\nUse Tab to move between widgets.\nWASD moves the player in the grid.\nType in text fields.\nPress Space on icons to open dialogs.",
|
||||
pos=(210, 190)
|
||||
)
|
||||
help_text.fill_color = mcrfpy.Color(200, 200, 200)
|
||||
help_modal.children.append(help_text)
|
||||
help_icon.set_modal(help_modal)
|
||||
|
||||
# Settings icon
|
||||
settings_icon = MenuIcon(110, 420, 48, "S", "Settings")
|
||||
settings_icon.add_to_scene(ui)
|
||||
focus_mgr.register(settings_icon)
|
||||
|
||||
settings_modal = create_modal(200, 150, 400, 250, "Settings")
|
||||
ui.append(settings_modal)
|
||||
settings_text = mcrfpy.Caption(
|
||||
text="Settings would go here.\n\n(This is a placeholder modal)",
|
||||
pos=(210, 190)
|
||||
)
|
||||
settings_text.fill_color = mcrfpy.Color(200, 200, 200)
|
||||
settings_modal.children.append(settings_text)
|
||||
settings_icon.set_modal(settings_modal)
|
||||
|
||||
# Inventory icon
|
||||
inv_icon = MenuIcon(170, 420, 48, "I", "Inventory")
|
||||
inv_icon.add_to_scene(ui)
|
||||
focus_mgr.register(inv_icon)
|
||||
|
||||
inv_modal = create_modal(200, 150, 400, 300, "Inventory")
|
||||
ui.append(inv_modal)
|
||||
inv_text = mcrfpy.Caption(
|
||||
text="Your inventory:\n\n- Sword\n- Shield\n- 3x Potions",
|
||||
pos=(210, 190)
|
||||
)
|
||||
inv_text.fill_color = mcrfpy.Color(200, 200, 200)
|
||||
inv_modal.children.append(inv_text)
|
||||
inv_icon.set_modal(inv_modal)
|
||||
|
||||
# --- Status Display ---
|
||||
status_frame = mcrfpy.Frame(
|
||||
pos=(50, 520),
|
||||
size=(700, 80),
|
||||
fill_color=mcrfpy.Color(35, 35, 45),
|
||||
outline_color=mcrfpy.Color(60, 60, 70),
|
||||
outline=1
|
||||
)
|
||||
ui.append(status_frame)
|
||||
|
||||
status_label = mcrfpy.Caption(text="Status", pos=(60, 530))
|
||||
status_label.fill_color = mcrfpy.Color(150, 150, 170)
|
||||
ui.append(status_label)
|
||||
|
||||
status_text = mcrfpy.Caption(text="Click or Tab to focus a widget", pos=(60, 555))
|
||||
status_text.fill_color = mcrfpy.Color(200, 200, 200)
|
||||
ui.append(status_text)
|
||||
|
||||
# Store references for status updates
|
||||
demo_state = {
|
||||
'focus_mgr': focus_mgr,
|
||||
'status_text': status_text,
|
||||
'grid': grid_widget,
|
||||
'inputs': [name_input, class_input, notes_input],
|
||||
'icons': [help_icon, settings_icon, inv_icon],
|
||||
}
|
||||
|
||||
# Key handler that routes to focus manager
|
||||
def on_key(key: str, action: str):
|
||||
focus_mgr.handle_key(key, action)
|
||||
|
||||
# Update status display
|
||||
if focus_mgr.focus_index >= 0:
|
||||
widget, _ = focus_mgr.widgets[focus_mgr.focus_index]
|
||||
if widget is grid_widget:
|
||||
status_text.text = f"Grid focused - Player at ({grid_widget.player_x}, {grid_widget.player_y})"
|
||||
elif widget in demo_state['inputs']:
|
||||
idx = demo_state['inputs'].index(widget)
|
||||
labels = ["Name", "Class", "Notes"]
|
||||
status_text.text = f"{labels[idx]} input focused - Text: '{widget.get_text()}'"
|
||||
elif widget in demo_state['icons']:
|
||||
status_text.text = f"Icon focused: {widget.tooltip}"
|
||||
else:
|
||||
status_text.text = "No widget focused"
|
||||
|
||||
# Activate scene first (keypressScene sets handler for CURRENT scene)
|
||||
mcrfpy.setScene("focus_demo")
|
||||
|
||||
# Register key handler for the now-current scene
|
||||
mcrfpy.keypressScene(on_key)
|
||||
|
||||
# Set initial focus
|
||||
focus_mgr.focus(0)
|
||||
|
||||
return demo_state
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Entry Point
|
||||
# =============================================================================
|
||||
|
||||
def run_demo():
|
||||
"""Run the focus system demo."""
|
||||
print("=== Focus System Demo ===")
|
||||
print("Demonstrating Python-level focus management")
|
||||
print()
|
||||
print("Controls:")
|
||||
print(" Tab / Shift+Tab - Cycle between widgets")
|
||||
print(" WASD / Arrows - Move player in grid (when focused)")
|
||||
print(" Type - Enter text in inputs (when focused)")
|
||||
print(" Space / Enter - Activate icons (when focused)")
|
||||
print(" Escape - Close modal dialogs")
|
||||
print(" Click - Focus clicked widget")
|
||||
print()
|
||||
|
||||
demo_state = create_demo_scene()
|
||||
|
||||
# Set up exit timer for headless testing
|
||||
def check_exit(dt):
|
||||
# In headless mode, exit after a short delay
|
||||
# In interactive mode, this won't trigger
|
||||
pass
|
||||
|
||||
# mcrfpy.setTimer("demo_check", check_exit, 100)
|
||||
|
||||
|
||||
# Run if executed directly
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
from mcrfpy import automation
|
||||
|
||||
run_demo()
|
||||
|
||||
# If --screenshot flag, take a screenshot and exit
|
||||
if "--screenshot" in sys.argv or len(sys.argv) > 1:
|
||||
def take_screenshot(dt):
|
||||
automation.screenshot("focus_demo_screenshot.png")
|
||||
print("Screenshot saved: focus_demo_screenshot.png")
|
||||
sys.exit(0)
|
||||
mcrfpy.setTimer("screenshot", take_screenshot, 200)
|
||||
|
|
@ -14,12 +14,15 @@ Three agents:
|
|||
Each agent gets their own screenshot and VLLM query.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
# Add the vllm_demo directory to path for imports
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
import mcrfpy
|
||||
from mcrfpy import automation
|
||||
import sys
|
||||
import requests
|
||||
import base64
|
||||
import os
|
||||
import random
|
||||
|
||||
from action_parser import parse_action
|
||||
|
|
|
|||
436
tests/vllm_demo/4_enhanced_action_demo.py
Normal file
436
tests/vllm_demo/4_enhanced_action_demo.py
Normal file
|
|
@ -0,0 +1,436 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Enhanced Action Demo
|
||||
====================
|
||||
|
||||
Demonstrates the enhanced action economy system:
|
||||
- Free actions (LOOK, SPEAK/ANNOUNCE) vs turn-ending (MOVE, WAIT)
|
||||
- Points of interest targeting for LOOK/MOVE
|
||||
- Speech system with room-wide ANNOUNCE and proximity SPEAK
|
||||
- Multi-tile path continuation with FOV interrupts
|
||||
- Enhanced logging for offline viewer replay
|
||||
|
||||
This implements the turn-based LLM agent orchestration from issue #156.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
# Add the vllm_demo directory to path for imports
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
import mcrfpy
|
||||
from mcrfpy import automation
|
||||
import requests
|
||||
import base64
|
||||
|
||||
from world_graph import (
|
||||
WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
|
||||
create_two_room_scenario, create_button_door_scenario
|
||||
)
|
||||
from action_parser import parse_action
|
||||
from enhanced_executor import EnhancedExecutor
|
||||
from enhanced_orchestrator import EnhancedOrchestrator, EnhancedSimulationLog
|
||||
|
||||
# Configuration
|
||||
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
|
||||
SCREENSHOT_DIR = "/tmp/vllm_enhanced_demo"
|
||||
LOG_PATH = "/tmp/vllm_enhanced_demo/simulation_log.json"
|
||||
MAX_TURNS = 3
|
||||
|
||||
# Sprites
|
||||
FLOOR_TILE = 0
|
||||
WALL_TILE = 40
|
||||
WIZARD_SPRITE = 84
|
||||
KNIGHT_SPRITE = 96
|
||||
RAT_SPRITE = 123
|
||||
|
||||
|
||||
class Agent:
|
||||
"""Agent with WorldGraph integration."""
|
||||
|
||||
def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
|
||||
self.name = name
|
||||
self.display_name = display_name
|
||||
self.entity = entity
|
||||
self.world = world
|
||||
self.message_history = []
|
||||
|
||||
@property
|
||||
def pos(self) -> tuple:
|
||||
return (int(self.entity.pos[0]), int(self.entity.pos[1]))
|
||||
|
||||
@property
|
||||
def current_room(self) -> str:
|
||||
room = self.world.room_at(*self.pos)
|
||||
return room.name if room else None
|
||||
|
||||
def get_context(self, visible_agents: list) -> dict:
|
||||
"""Build context for LLM query."""
|
||||
room_name = self.current_room
|
||||
agent_infos = [
|
||||
AgentInfo(
|
||||
name=a.name,
|
||||
display_name=a.display_name,
|
||||
position=a.pos,
|
||||
is_player=(a.name == self.name)
|
||||
)
|
||||
for a in visible_agents
|
||||
]
|
||||
return {
|
||||
"location": self.world.describe_room(room_name, agent_infos, self.name),
|
||||
"available_actions": self.world.get_available_actions(room_name),
|
||||
"recent_messages": self.message_history[-5:],
|
||||
}
|
||||
|
||||
|
||||
def file_to_base64(path: str) -> str:
|
||||
"""Convert file to base64 string."""
|
||||
with open(path, 'rb') as f:
|
||||
return base64.b64encode(f.read()).decode('utf-8')
|
||||
|
||||
|
||||
def llm_query(agent, screenshot_path: str, context: dict) -> str:
|
||||
"""
|
||||
Query VLLM for agent action with enhanced context.
|
||||
|
||||
Includes points of interest, action economy hints, error feedback,
|
||||
and conversation history.
|
||||
"""
|
||||
system_prompt = f"""You are {agent.display_name} exploring a dungeon.
|
||||
You receive visual and text information about your surroundings.
|
||||
|
||||
ACTION ECONOMY:
|
||||
- LOOK <target>: Free action. Examine something, then choose another action.
|
||||
- SPEAK "<message>" or ANNOUNCE "<message>": Free action (once per turn). Then choose another action.
|
||||
- GO <direction>: Ends your turn. Move one tile in that direction (NORTH/SOUTH/EAST/WEST).
|
||||
- TAKE <item>: Ends your turn. Pick up an item you are standing next to.
|
||||
- WAIT: Ends your turn without moving.
|
||||
|
||||
IMPORTANT: You can only TAKE items that are adjacent to you (1 tile away). If something is far away, GO towards it first.
|
||||
|
||||
You can LOOK or SPEAK, then still MOVE in the same turn.
|
||||
Always end your final response with: Action: <YOUR_ACTION>"""
|
||||
|
||||
# Build enhanced prompt
|
||||
parts = [context["location"]]
|
||||
|
||||
# Add received messages
|
||||
if context.get("messages"):
|
||||
parts.append("\nMessages received this turn:")
|
||||
for msg in context["messages"]:
|
||||
sender = msg.get("sender", "someone")
|
||||
content = msg.get("content", "")
|
||||
parts.append(f' {sender} says: "{content}"')
|
||||
|
||||
# Add points of interest
|
||||
if context.get("poi_prompt"):
|
||||
parts.append(f"\n{context['poi_prompt']}")
|
||||
|
||||
# Add available actions
|
||||
actions_str = ", ".join(context.get("available_actions", []))
|
||||
parts.append(f"\nAvailable actions: {actions_str}")
|
||||
|
||||
# Add action economy hint
|
||||
if context.get("has_spoken"):
|
||||
parts.append("\n[You have already spoken this turn - you can still MOVE or WAIT]")
|
||||
|
||||
# Add error feedback from last failed action
|
||||
if context.get("last_error"):
|
||||
parts.append(f"\n[ERROR: {context['last_error']}]")
|
||||
parts.append("[Your last action failed. Please try a different action.]")
|
||||
|
||||
# Add conversation history from this turn
|
||||
if context.get("conversation_history"):
|
||||
parts.append("\n[Previous attempts this turn:")
|
||||
for exch in context["conversation_history"]:
|
||||
action_str = f"{exch.get('action_type', '?')} {exch.get('action_args', '')}"
|
||||
if exch.get("error"):
|
||||
parts.append(f" - You tried: {action_str} -> FAILED: {exch['error']}")
|
||||
else:
|
||||
parts.append(f" - You did: {action_str}")
|
||||
parts.append("]")
|
||||
|
||||
parts.append("\n[Screenshot attached showing your current view]")
|
||||
parts.append("\nWhat do you do? Brief reasoning (1-2 sentences), then Action: <action>")
|
||||
|
||||
user_prompt = "\n".join(parts)
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": user_prompt},
|
||||
{"type": "image_url", "image_url": {
|
||||
"url": "data:image/png;base64," + file_to_base64(screenshot_path)
|
||||
}}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
try:
|
||||
resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
|
||||
data = resp.json()
|
||||
if "error" in data:
|
||||
return f"[VLLM Error: {data['error']}]"
|
||||
return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
|
||||
except Exception as e:
|
||||
return f"[Connection Error: {e}]"
|
||||
|
||||
|
||||
def setup_scene(world: WorldGraph):
|
||||
"""Create McRogueFace scene from WorldGraph."""
|
||||
mcrfpy.createScene("enhanced_demo")
|
||||
mcrfpy.setScene("enhanced_demo")
|
||||
ui = mcrfpy.sceneUI("enhanced_demo")
|
||||
|
||||
texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
|
||||
|
||||
grid = mcrfpy.Grid(
|
||||
grid_size=(25, 15),
|
||||
texture=texture,
|
||||
pos=(5, 5),
|
||||
size=(1014, 700)
|
||||
)
|
||||
grid.fill_color = mcrfpy.Color(20, 20, 30)
|
||||
grid.zoom = 2.0
|
||||
ui.append(grid)
|
||||
|
||||
# Initialize all as walls
|
||||
for x in range(25):
|
||||
for y in range(15):
|
||||
p = grid.at(x, y)
|
||||
p.tilesprite = WALL_TILE
|
||||
p.walkable = False
|
||||
p.transparent = False
|
||||
|
||||
# Carve rooms from WorldGraph
|
||||
for room in world.rooms.values():
|
||||
for rx in range(room.x, room.x + room.width):
|
||||
for ry in range(room.y, room.y + room.height):
|
||||
if 0 <= rx < 25 and 0 <= ry < 15:
|
||||
p = grid.at(rx, ry)
|
||||
p.tilesprite = FLOOR_TILE
|
||||
p.walkable = True
|
||||
p.transparent = True
|
||||
|
||||
# Place doors
|
||||
for door in world.doors:
|
||||
dx, dy = door.position
|
||||
if 0 <= dx < 25 and 0 <= dy < 15:
|
||||
p = grid.at(dx, dy)
|
||||
p.tilesprite = FLOOR_TILE
|
||||
p.walkable = not door.locked
|
||||
p.transparent = True
|
||||
|
||||
# FOV layer
|
||||
fov_layer = grid.add_layer('color', z_index=10)
|
||||
fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
|
||||
|
||||
return grid, fov_layer, texture
|
||||
|
||||
|
||||
def create_agents(grid, world: WorldGraph, texture) -> list:
|
||||
"""Create agents in their starting rooms."""
|
||||
agents = []
|
||||
|
||||
# Wizard in guard_room (left)
|
||||
room_a = world.rooms["guard_room"]
|
||||
wizard = mcrfpy.Entity(
|
||||
grid_pos=room_a.center,
|
||||
texture=texture,
|
||||
sprite_index=WIZARD_SPRITE
|
||||
)
|
||||
wizard.name = "wizard"
|
||||
grid.entities.append(wizard)
|
||||
agents.append(Agent("Wizard", "a wizard", wizard, world))
|
||||
|
||||
# Knight in armory (right)
|
||||
room_b = world.rooms["armory"]
|
||||
knight = mcrfpy.Entity(
|
||||
grid_pos=room_b.center,
|
||||
texture=texture,
|
||||
sprite_index=KNIGHT_SPRITE
|
||||
)
|
||||
knight.name = "knight"
|
||||
grid.entities.append(knight)
|
||||
agents.append(Agent("Knight", "a knight", knight, world))
|
||||
|
||||
return agents
|
||||
|
||||
|
||||
def add_rat(grid, world: WorldGraph, texture, position: tuple):
|
||||
"""Add a rat entity at the specified position."""
|
||||
rat = mcrfpy.Entity(
|
||||
grid_pos=position,
|
||||
texture=texture,
|
||||
sprite_index=RAT_SPRITE
|
||||
)
|
||||
rat.name = "rat"
|
||||
grid.entities.append(rat)
|
||||
return rat
|
||||
|
||||
|
||||
def run_demo():
|
||||
"""Run enhanced action demo."""
|
||||
print("=" * 70)
|
||||
print("Enhanced Action Demo")
|
||||
print("=" * 70)
|
||||
print("""
|
||||
Features demonstrated:
|
||||
- LOOK as free action (doesn't end turn)
|
||||
- SPEAK/ANNOUNCE as free action (once per turn)
|
||||
- Points of interest targeting
|
||||
- Enhanced logging for offline viewer
|
||||
""")
|
||||
|
||||
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
|
||||
|
||||
# Create world
|
||||
print("Creating world...")
|
||||
world = create_two_room_scenario()
|
||||
print(f" Rooms: {list(world.rooms.keys())}")
|
||||
print(f" Objects: {list(world.objects.keys())}")
|
||||
|
||||
# Setup scene
|
||||
print("\nSetting up scene...")
|
||||
grid, fov_layer, texture = setup_scene(world)
|
||||
|
||||
# Create agents
|
||||
print("\nCreating agents...")
|
||||
agents = create_agents(grid, world, texture)
|
||||
|
||||
# Add a rat near the door for interest
|
||||
rat = add_rat(grid, world, texture, (9, 4))
|
||||
print(f" Added rat at (9, 4)")
|
||||
|
||||
for agent in agents:
|
||||
print(f" {agent.name} at {agent.pos} in {agent.current_room}")
|
||||
|
||||
# Create enhanced orchestrator
|
||||
print("\nInitializing enhanced orchestrator...")
|
||||
orchestrator = EnhancedOrchestrator(
|
||||
grid=grid,
|
||||
fov_layer=fov_layer,
|
||||
world=world,
|
||||
agents=agents,
|
||||
screenshot_dir=SCREENSHOT_DIR,
|
||||
llm_query_fn=llm_query
|
||||
)
|
||||
|
||||
# Run simulation
|
||||
print(f"\nRunning simulation ({MAX_TURNS} turns)...")
|
||||
log = orchestrator.run_simulation(max_turns=MAX_TURNS)
|
||||
|
||||
# Save enhanced log
|
||||
log.save(LOG_PATH)
|
||||
|
||||
# Print summary
|
||||
print("\n" + "=" * 70)
|
||||
print("SIMULATION SUMMARY")
|
||||
print("=" * 70)
|
||||
|
||||
for turn in range(1, orchestrator.turn_number + 1):
|
||||
print(log.get_turn_summary(turn))
|
||||
|
||||
# Print speech log
|
||||
if log.speech_log:
|
||||
print("\n" + "-" * 40)
|
||||
print("SPEECH LOG")
|
||||
print("-" * 40)
|
||||
for entry in log.speech_log:
|
||||
print(f" Turn {entry['turn']}: {entry['speaker']} {entry['type']}s: \"{entry['content'][:50]}...\"")
|
||||
if entry['recipients']:
|
||||
print(f" -> Heard by: {', '.join(entry['recipients'])}")
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("Demo Complete")
|
||||
print("=" * 70)
|
||||
print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/")
|
||||
print(f"Simulation log saved to: {LOG_PATH}")
|
||||
print("\nLog structure (for offline viewer):")
|
||||
print(" - metadata: simulation info")
|
||||
print(" - steps[]: per-agent-turn records with:")
|
||||
print(" - screenshot_path, position, room")
|
||||
print(" - llm_prompt_user, llm_response")
|
||||
print(" - free_actions[] (LOOK, SPEAK)")
|
||||
print(" - final_action (MOVE, WAIT)")
|
||||
print(" - speech_log[]: all speech events")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def replay_log(log_path: str):
|
||||
"""
|
||||
Replay a simulation from log file.
|
||||
|
||||
This is a text-based preview of what the offline viewer would show.
|
||||
"""
|
||||
print(f"Loading simulation from: {log_path}")
|
||||
|
||||
try:
|
||||
log = EnhancedSimulationLog.load(log_path)
|
||||
except FileNotFoundError:
|
||||
print(f"Log file not found: {log_path}")
|
||||
return
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("SIMULATION REPLAY")
|
||||
print("=" * 70)
|
||||
print(f"Turns: {log.metadata.get('total_turns', '?')}")
|
||||
print(f"Agents: {', '.join(log.metadata.get('agent_names', []))}")
|
||||
print(f"Rooms: {', '.join(log.metadata.get('world_rooms', []))}")
|
||||
|
||||
for step in log.steps:
|
||||
print(f"\n{'='*40}")
|
||||
print(f"Turn {step.turn}: {step.agent_id}")
|
||||
print(f"{'='*40}")
|
||||
print(f"Position: {step.position_start} -> {step.position_end}")
|
||||
print(f"Room: {step.room}")
|
||||
|
||||
if step.pending_messages:
|
||||
print(f"\nMessages received:")
|
||||
for msg in step.pending_messages:
|
||||
print(f" {msg.get('sender')}: \"{msg.get('content', '')[:40]}...\"")
|
||||
|
||||
if step.llm_was_queried:
|
||||
print(f"\nLLM Response (truncated):")
|
||||
print(f" {step.llm_response[:200]}...")
|
||||
else:
|
||||
print(f"\n[Path continuation - no LLM query]")
|
||||
|
||||
if step.free_actions:
|
||||
print(f"\nFree actions:")
|
||||
for fa in step.free_actions:
|
||||
print(f" - {fa['action_type']}: {fa.get('args', ())}")
|
||||
|
||||
status = "OK" if step.final_action_success else "FAIL"
|
||||
print(f"\nFinal: {step.final_action_type} {step.final_action_args} [{status}]")
|
||||
print(f" {step.final_action_message}")
|
||||
|
||||
# Speech summary
|
||||
if log.speech_log:
|
||||
print("\n" + "=" * 40)
|
||||
print("ALL SPEECH")
|
||||
print("=" * 40)
|
||||
for entry in log.speech_log:
|
||||
print(f"Turn {entry['turn']}: {entry['speaker']} -> {entry['recipients']}")
|
||||
print(f" \"{entry['content']}\"")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check for replay mode
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "--replay":
|
||||
log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH
|
||||
replay_log(log_file)
|
||||
sys.exit(0)
|
||||
|
||||
# Normal execution
|
||||
try:
|
||||
success = run_demo()
|
||||
print("\nPASS" if success else "\nFAIL")
|
||||
sys.exit(0 if success else 1)
|
||||
except Exception as e:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
152
tests/vllm_demo/OFFLINE_VIEWER_SPEC.md
Normal file
152
tests/vllm_demo/OFFLINE_VIEWER_SPEC.md
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
# Offline Viewer Specification
|
||||
|
||||
**Status**: Planned (issue #154)
|
||||
**Priority**: After core simulation features are stable
|
||||
|
||||
## Overview
|
||||
|
||||
The Offline Viewer allows users to replay stored simulation logs in McRogueFace, stepping through turn-by-turn to review:
|
||||
- Each agent's perspective (FOV, camera position)
|
||||
- LLM chain-of-thought reasoning
|
||||
- Actions taken and their results
|
||||
- Speech between agents
|
||||
|
||||
## Log Format
|
||||
|
||||
Simulation logs are stored as JSON with this structure:
|
||||
|
||||
```json
|
||||
{
|
||||
"metadata": {
|
||||
"total_turns": 5,
|
||||
"num_agents": 2,
|
||||
"agent_names": ["Wizard", "Knight"],
|
||||
"timestamp_start": "2025-01-15T10:30:00",
|
||||
"timestamp_end": "2025-01-15T10:32:45",
|
||||
"world_rooms": ["guard_room", "armory"],
|
||||
"screenshot_dir": "/tmp/vllm_enhanced_demo"
|
||||
},
|
||||
"steps": [
|
||||
{
|
||||
"turn": 1,
|
||||
"agent_id": "Wizard",
|
||||
"timestamp": "2025-01-15T10:30:15",
|
||||
|
||||
"position_start": [5, 4],
|
||||
"position_end": [6, 4],
|
||||
"room": "guard_room",
|
||||
|
||||
"visible_entities": ["rat_123", "knight_456"],
|
||||
"visible_tiles": 42,
|
||||
"points_of_interest": [
|
||||
{"name": "door", "direction": "east", "distance": 4}
|
||||
],
|
||||
|
||||
"location_description": "You are in the guard room...",
|
||||
"available_actions": ["GO EAST", "LOOK", "WAIT"],
|
||||
"pending_messages": [],
|
||||
"poi_prompt": "Points of interest:\n - a door to the armory (east)",
|
||||
|
||||
"screenshot_path": "/tmp/.../turn1_wizard.png",
|
||||
|
||||
"llm_prompt_system": "You are a wizard...",
|
||||
"llm_prompt_user": "You are in the guard room...",
|
||||
"llm_response": "I see a door to the east. I should explore. Action: GO EAST",
|
||||
"llm_was_queried": true,
|
||||
|
||||
"free_actions": [
|
||||
{"action_type": "LOOK", "args": ["DOOR"], "result": {"description": "A wooden door..."}}
|
||||
],
|
||||
|
||||
"final_action_type": "GO",
|
||||
"final_action_args": ["EAST"],
|
||||
"final_action_success": true,
|
||||
"final_action_message": "Moved east to (6, 4)",
|
||||
|
||||
"path_taken": [[5, 4], [6, 4]],
|
||||
"path_remaining": 0
|
||||
}
|
||||
],
|
||||
"speech_log": [
|
||||
{
|
||||
"turn": 2,
|
||||
"speaker": "Wizard",
|
||||
"type": "announce",
|
||||
"content": "Hello, is anyone there?",
|
||||
"recipients": ["Knight"]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Viewer Features (Planned)
|
||||
|
||||
### Core Features
|
||||
|
||||
1. **Turn Navigation**
|
||||
- Step forward/backward through turns
|
||||
- Jump to specific turn number
|
||||
- Auto-play at configurable speed
|
||||
|
||||
2. **Agent Perspective**
|
||||
- Reconstruct agent's FOV from stored data
|
||||
- Center camera on current agent
|
||||
- Show visible entities and tiles
|
||||
|
||||
3. **LLM Review Panel**
|
||||
- Display system prompt
|
||||
- Display user prompt (context)
|
||||
- Display LLM response
|
||||
- Highlight parsed action
|
||||
|
||||
4. **Action Log**
|
||||
- Show free actions (LOOK, SPEAK)
|
||||
- Show final action and result
|
||||
- Color-code success/failure
|
||||
|
||||
5. **Speech History**
|
||||
- Timeline of all speech events
|
||||
- Filter by agent
|
||||
- Show recipients
|
||||
|
||||
### Implementation Notes
|
||||
|
||||
The viewer should:
|
||||
- Load screenshots from `screenshot_path` (if available)
|
||||
- OR reconstruct scene from WorldGraph + step data
|
||||
- Support keyboard navigation (arrow keys)
|
||||
- Display agent state in sidebar
|
||||
|
||||
### UI Layout (Suggested)
|
||||
|
||||
```
|
||||
+----------------------------------+------------------+
|
||||
| | Turn: 3/10 |
|
||||
| Main Viewport | Agent: Wizard |
|
||||
| (Agent's Perspective) | Room: armory |
|
||||
| +------------------+
|
||||
| | LLM Response: |
|
||||
| | "I see a rat |
|
||||
| | to the east. |
|
||||
| | Action: LOOK |
|
||||
| | AT RAT" |
|
||||
+----------------------------------+------------------+
|
||||
| < Prev | Turn 3 | Next > | Actions: |
|
||||
| [Agent: Wizard v] | - LOOK AT RAT |
|
||||
| | - GO EAST [OK] |
|
||||
+----------------------------------+------------------+
|
||||
```
|
||||
|
||||
## Files
|
||||
|
||||
- `enhanced_orchestrator.py` - Generates `EnhancedSimulationLog`
|
||||
- `4_enhanced_action_demo.py` - Demo with `--replay` mode for text preview
|
||||
- Logs stored in `/tmp/vllm_enhanced_demo/simulation_log.json`
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
- Animated path replay (smooth entity movement)
|
||||
- Side-by-side multi-agent view
|
||||
- Diff view comparing agent perceptions
|
||||
- Export to video/GIF
|
||||
- Integration with annotation tools for research
|
||||
302
tests/vllm_demo/action_economy.py
Normal file
302
tests/vllm_demo/action_economy.py
Normal file
|
|
@ -0,0 +1,302 @@
|
|||
"""
|
||||
Action Economy System
|
||||
=====================
|
||||
|
||||
Defines which actions consume turns and which are free.
|
||||
Manages multi-tile pathing with FOV interruption.
|
||||
|
||||
Action Categories:
|
||||
- FREE: LOOK, SPEAK, ANNOUNCE (don't end turn)
|
||||
- FULL: MOVE, WAIT (end turn)
|
||||
|
||||
Constraints:
|
||||
- Only ONE speech action per turn
|
||||
- LOOK provides description and prompts for another action
|
||||
- Multi-tile paths continue without LLM until FOV changes
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Tuple, Optional, Set, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
from action_parser import Action, ActionType
|
||||
|
||||
|
||||
class TurnCost(Enum):
|
||||
"""How much of a turn an action consumes."""
|
||||
FREE = "free" # Doesn't end turn
|
||||
FULL = "full" # Ends turn
|
||||
|
||||
|
||||
# Action cost mapping
|
||||
ACTION_COSTS = {
|
||||
ActionType.LOOK: TurnCost.FREE,
|
||||
ActionType.SPEAK: TurnCost.FREE,
|
||||
ActionType.ANNOUNCE: TurnCost.FREE,
|
||||
ActionType.GO: TurnCost.FULL,
|
||||
ActionType.WAIT: TurnCost.FULL,
|
||||
ActionType.TAKE: TurnCost.FULL,
|
||||
ActionType.DROP: TurnCost.FULL,
|
||||
ActionType.PUSH: TurnCost.FULL,
|
||||
ActionType.USE: TurnCost.FULL,
|
||||
ActionType.OPEN: TurnCost.FULL,
|
||||
ActionType.CLOSE: TurnCost.FULL,
|
||||
ActionType.INVALID: TurnCost.FULL, # Invalid action ends turn
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TurnState:
|
||||
"""
|
||||
Tracks state within a single turn.
|
||||
|
||||
Used to enforce constraints like "only one speech per turn"
|
||||
and track free actions taken before turn-ending action.
|
||||
"""
|
||||
has_spoken: bool = False
|
||||
free_actions: List[Dict[str, Any]] = field(default_factory=list)
|
||||
turn_ended: bool = False
|
||||
|
||||
def can_speak(self) -> bool:
|
||||
"""Check if agent can still speak this turn."""
|
||||
return not self.has_spoken
|
||||
|
||||
def record_speech(self):
|
||||
"""Record that agent has spoken this turn."""
|
||||
self.has_spoken = True
|
||||
|
||||
def record_free_action(self, action_type: str, details: Dict[str, Any]):
|
||||
"""Record a free action for logging."""
|
||||
self.free_actions.append({
|
||||
"type": action_type,
|
||||
**details
|
||||
})
|
||||
|
||||
def end_turn(self):
|
||||
"""Mark turn as ended."""
|
||||
self.turn_ended = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class PathState:
|
||||
"""
|
||||
Tracks multi-tile movement path for an agent.
|
||||
|
||||
When an agent decides to move to a distant location,
|
||||
we store the path and continue moving without LLM calls
|
||||
until the path completes or FOV changes.
|
||||
"""
|
||||
path: List[Tuple[int, int]] = field(default_factory=list)
|
||||
current_index: int = 0
|
||||
destination_description: str = "" # "the armory", "the door"
|
||||
|
||||
# FOV state when path was planned
|
||||
visible_entities_at_start: Set[str] = field(default_factory=set)
|
||||
|
||||
@property
|
||||
def has_path(self) -> bool:
|
||||
"""Check if there's an active path."""
|
||||
return len(self.path) > self.current_index
|
||||
|
||||
@property
|
||||
def next_tile(self) -> Optional[Tuple[int, int]]:
|
||||
"""Get next tile in path, or None if path complete."""
|
||||
if self.has_path:
|
||||
return self.path[self.current_index]
|
||||
return None
|
||||
|
||||
@property
|
||||
def remaining_tiles(self) -> int:
|
||||
"""Number of tiles left in path."""
|
||||
return max(0, len(self.path) - self.current_index)
|
||||
|
||||
def advance(self):
|
||||
"""Move to next tile in path."""
|
||||
if self.has_path:
|
||||
self.current_index += 1
|
||||
|
||||
def clear(self):
|
||||
"""Clear the current path."""
|
||||
self.path = []
|
||||
self.current_index = 0
|
||||
self.destination_description = ""
|
||||
self.visible_entities_at_start = set()
|
||||
|
||||
def should_interrupt(self, current_visible_entities: Set[str]) -> bool:
|
||||
"""
|
||||
Check if path should be interrupted due to FOV change.
|
||||
|
||||
Returns True if a NEW entity has entered the agent's FOV
|
||||
since the path was planned.
|
||||
"""
|
||||
new_entities = current_visible_entities - self.visible_entities_at_start
|
||||
return len(new_entities) > 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class PointOfInterest:
|
||||
"""
|
||||
A targetable object/location for LOOK/MOVE actions.
|
||||
|
||||
Listed in LLM prompts to guide valid targeting.
|
||||
"""
|
||||
name: str # Short name: "door", "rat", "button"
|
||||
display_name: str # Full description: "a wooden door to the east"
|
||||
position: Tuple[int, int] # Tile coordinates
|
||||
direction: str # Cardinal direction from agent: "north", "east"
|
||||
distance: int # Manhattan distance from agent
|
||||
can_look: bool = True # Can be examined with LOOK
|
||||
can_move_to: bool = False # Can be targeted with GO TO
|
||||
entity_id: Optional[str] = None # Entity ID if this is an entity
|
||||
|
||||
|
||||
def get_action_cost(action: Action) -> TurnCost:
|
||||
"""Get the turn cost for an action."""
|
||||
return ACTION_COSTS.get(action.type, TurnCost.FULL)
|
||||
|
||||
|
||||
def get_direction_name(from_pos: Tuple[int, int], to_pos: Tuple[int, int]) -> str:
|
||||
"""Get cardinal direction name from one position to another."""
|
||||
dx = to_pos[0] - from_pos[0]
|
||||
dy = to_pos[1] - from_pos[1]
|
||||
|
||||
if abs(dx) > abs(dy):
|
||||
return "east" if dx > 0 else "west"
|
||||
elif abs(dy) > abs(dx):
|
||||
return "south" if dy > 0 else "north"
|
||||
else:
|
||||
# Diagonal
|
||||
ns = "south" if dy > 0 else "north"
|
||||
ew = "east" if dx > 0 else "west"
|
||||
return f"{ns}-{ew}"
|
||||
|
||||
|
||||
def manhattan_distance(a: Tuple[int, int], b: Tuple[int, int]) -> int:
|
||||
"""Calculate Manhattan distance between two points."""
|
||||
return abs(a[0] - b[0]) + abs(a[1] - b[1])
|
||||
|
||||
|
||||
class PointOfInterestCollector:
|
||||
"""
|
||||
Collects points of interest visible to an agent.
|
||||
|
||||
Used to populate LLM prompts with valid LOOK/MOVE targets.
|
||||
"""
|
||||
|
||||
def __init__(self, grid, agent_pos: Tuple[int, int]):
|
||||
self.grid = grid
|
||||
self.agent_pos = agent_pos
|
||||
self.points: List[PointOfInterest] = []
|
||||
|
||||
def collect_from_fov(self, world_graph=None) -> List[PointOfInterest]:
|
||||
"""
|
||||
Collect all points of interest visible in current FOV.
|
||||
|
||||
Examines:
|
||||
- Entities (other agents, NPCs, items)
|
||||
- Doors/exits
|
||||
- Interactive objects (buttons, chests)
|
||||
- Notable tiles (walls with features)
|
||||
"""
|
||||
self.points = []
|
||||
|
||||
# Collect entities
|
||||
for entity in self.grid.entities:
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if (ex, ey) == self.agent_pos:
|
||||
continue # Skip self
|
||||
|
||||
if self.grid.is_in_fov(ex, ey):
|
||||
direction = get_direction_name(self.agent_pos, (ex, ey))
|
||||
distance = manhattan_distance(self.agent_pos, (ex, ey))
|
||||
|
||||
# Try to get entity name/description
|
||||
entity_name = getattr(entity, 'name', None) or f"creature"
|
||||
entity_id = getattr(entity, 'id', None) or str(id(entity))
|
||||
|
||||
self.points.append(PointOfInterest(
|
||||
name=entity_name,
|
||||
display_name=f"a {entity_name} to the {direction}",
|
||||
position=(ex, ey),
|
||||
direction=direction,
|
||||
distance=distance,
|
||||
can_look=True,
|
||||
can_move_to=False, # Can't move onto entities
|
||||
entity_id=entity_id
|
||||
))
|
||||
|
||||
# Collect from WorldGraph if provided
|
||||
if world_graph:
|
||||
self._collect_from_world_graph(world_graph)
|
||||
|
||||
# Sort by distance
|
||||
self.points.sort(key=lambda p: p.distance)
|
||||
|
||||
return self.points
|
||||
|
||||
def _collect_from_world_graph(self, world):
|
||||
"""Collect doors and objects from WorldGraph."""
|
||||
agent_room = world.room_at(*self.agent_pos)
|
||||
if not agent_room:
|
||||
return
|
||||
|
||||
# Doors
|
||||
for door in world.get_exits(agent_room.name):
|
||||
dx, dy = door.position
|
||||
if self.grid.is_in_fov(dx, dy):
|
||||
direction = get_direction_name(self.agent_pos, (dx, dy))
|
||||
distance = manhattan_distance(self.agent_pos, (dx, dy))
|
||||
|
||||
# Get destination room name
|
||||
if door.room_a == agent_room.name:
|
||||
dest = world.rooms.get(door.room_b)
|
||||
else:
|
||||
dest = world.rooms.get(door.room_a)
|
||||
dest_name = dest.display_name if dest else "unknown"
|
||||
|
||||
lock_str = " (locked)" if door.locked else ""
|
||||
|
||||
self.points.append(PointOfInterest(
|
||||
name="door",
|
||||
display_name=f"a door to {dest_name}{lock_str} ({direction})",
|
||||
position=(dx, dy),
|
||||
direction=direction,
|
||||
distance=distance,
|
||||
can_look=True,
|
||||
can_move_to=not door.locked
|
||||
))
|
||||
|
||||
# Objects in room
|
||||
for obj in world.get_objects_in_room(agent_room.name):
|
||||
ox, oy = obj.position
|
||||
if self.grid.is_in_fov(ox, oy):
|
||||
direction = get_direction_name(self.agent_pos, (ox, oy))
|
||||
distance = manhattan_distance(self.agent_pos, (ox, oy))
|
||||
|
||||
self.points.append(PointOfInterest(
|
||||
name=obj.name,
|
||||
display_name=f"{obj.display_name} ({direction})",
|
||||
position=(ox, oy),
|
||||
direction=direction,
|
||||
distance=distance,
|
||||
can_look=True,
|
||||
can_move_to="pressable" not in obj.affordances # Can walk to items
|
||||
))
|
||||
|
||||
def format_for_prompt(self) -> str:
|
||||
"""Format points of interest for inclusion in LLM prompt."""
|
||||
if not self.points:
|
||||
return "No notable objects in view."
|
||||
|
||||
lines = ["Points of interest:"]
|
||||
for poi in self.points:
|
||||
actions = []
|
||||
if poi.can_look:
|
||||
actions.append(f"LOOK AT {poi.name.upper()}")
|
||||
if poi.can_move_to:
|
||||
actions.append(f"GO TO {poi.name.upper()}")
|
||||
|
||||
action_str = ", ".join(actions) if actions else "observe only"
|
||||
lines.append(f" - {poi.display_name}: {action_str}")
|
||||
|
||||
return "\n".join(lines)
|
||||
731
tests/vllm_demo/enhanced_executor.py
Normal file
731
tests/vllm_demo/enhanced_executor.py
Normal file
|
|
@ -0,0 +1,731 @@
|
|||
"""
|
||||
Enhanced Action Executor
|
||||
========================
|
||||
|
||||
Extends ActionExecutor with:
|
||||
- LOOK action with detailed descriptions
|
||||
- SPEAK/ANNOUNCE execution with range checking
|
||||
- Multi-tile path planning
|
||||
- Free action vs turn-ending action handling
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, List, Tuple, Dict, Any, Set
|
||||
from action_parser import Action, ActionType
|
||||
from action_executor import ActionResult
|
||||
from action_economy import (
|
||||
TurnState, PathState, TurnCost, get_action_cost,
|
||||
manhattan_distance, get_direction_name
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TakeResult:
|
||||
"""Result of a TAKE action."""
|
||||
success: bool
|
||||
message: str
|
||||
item_name: str
|
||||
item_position: Optional[Tuple[int, int]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookResult:
|
||||
"""Result of a LOOK action."""
|
||||
success: bool
|
||||
description: str
|
||||
target_name: str
|
||||
target_position: Optional[Tuple[int, int]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SpeechResult:
|
||||
"""Result of a SPEAK/ANNOUNCE action."""
|
||||
success: bool
|
||||
message: str
|
||||
recipients: List[str] # Names of agents who received the message
|
||||
speech_type: str # "announce" or "speak"
|
||||
content: str # What was said
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
"""A message received by an agent."""
|
||||
sender: str
|
||||
content: str
|
||||
speech_type: str # "announce" or "speak"
|
||||
turn: int
|
||||
distance: Optional[int] = None # For SPEAK, how far away sender was
|
||||
|
||||
|
||||
class EnhancedExecutor:
|
||||
"""
|
||||
Enhanced action executor with LOOK, SPEAK, and multi-tile support.
|
||||
"""
|
||||
|
||||
# Direction vectors for movement
|
||||
DIRECTION_VECTORS = {
|
||||
'NORTH': (0, -1),
|
||||
'SOUTH': (0, 1),
|
||||
'EAST': (1, 0),
|
||||
'WEST': (-1, 0),
|
||||
}
|
||||
|
||||
# SPEAK range (Manhattan distance)
|
||||
SPEAK_RANGE = 4
|
||||
|
||||
def __init__(self, grid, world_graph=None):
|
||||
"""
|
||||
Initialize executor.
|
||||
|
||||
Args:
|
||||
grid: mcrfpy.Grid instance
|
||||
world_graph: Optional WorldGraph for detailed descriptions
|
||||
"""
|
||||
self.grid = grid
|
||||
self.world = world_graph
|
||||
|
||||
# Agent path states (agent_name -> PathState)
|
||||
self.path_states: Dict[str, PathState] = {}
|
||||
|
||||
# Speech channel for message delivery
|
||||
self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages
|
||||
|
||||
def get_path_state(self, agent_name: str) -> PathState:
|
||||
"""Get or create path state for an agent."""
|
||||
if agent_name not in self.path_states:
|
||||
self.path_states[agent_name] = PathState()
|
||||
return self.path_states[agent_name]
|
||||
|
||||
def get_pending_messages(self, agent_name: str) -> List[Message]:
|
||||
"""Get and clear pending messages for an agent."""
|
||||
messages = self.pending_messages.get(agent_name, [])
|
||||
self.pending_messages[agent_name] = []
|
||||
return messages
|
||||
|
||||
# =========================================================================
|
||||
# LOOK Action
|
||||
# =========================================================================
|
||||
|
||||
def execute_look(self, agent, action: Action) -> LookResult:
|
||||
"""
|
||||
Execute LOOK action - examine a tile or entity.
|
||||
|
||||
Args:
|
||||
agent: Agent performing the look
|
||||
action: Parsed LOOK action with optional target
|
||||
|
||||
Returns:
|
||||
LookResult with detailed description
|
||||
"""
|
||||
target = action.args[0] if action.args and action.args[0] else None
|
||||
|
||||
if target is None:
|
||||
# General look around
|
||||
return self._look_around(agent)
|
||||
else:
|
||||
# Look at specific target
|
||||
return self._look_at_target(agent, target.upper())
|
||||
|
||||
def _look_around(self, agent) -> LookResult:
|
||||
"""Describe the general surroundings."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
|
||||
descriptions = []
|
||||
|
||||
# Describe current room
|
||||
if self.world:
|
||||
room = self.world.room_at(ax, ay)
|
||||
if room:
|
||||
descriptions.append(f"You are in {room.display_name}.")
|
||||
if room.description_template and room.properties:
|
||||
try:
|
||||
desc = room.description_template.format(**room.properties)
|
||||
descriptions.append(desc)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Count visible entities
|
||||
visible_count = 0
|
||||
for entity in self.grid.entities:
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey):
|
||||
visible_count += 1
|
||||
|
||||
if visible_count > 0:
|
||||
descriptions.append(f"You can see {visible_count} other creature(s) nearby.")
|
||||
|
||||
# Describe nearby walls/openings
|
||||
wall_dirs = []
|
||||
open_dirs = []
|
||||
for direction, (dx, dy) in self.DIRECTION_VECTORS.items():
|
||||
nx, ny = ax + dx, ay + dy
|
||||
if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]:
|
||||
cell = self.grid.at(nx, ny)
|
||||
if cell.walkable:
|
||||
open_dirs.append(direction.lower())
|
||||
else:
|
||||
wall_dirs.append(direction.lower())
|
||||
|
||||
if open_dirs:
|
||||
descriptions.append(f"Open passages: {', '.join(open_dirs)}.")
|
||||
if wall_dirs:
|
||||
descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.")
|
||||
|
||||
return LookResult(
|
||||
success=True,
|
||||
description=" ".join(descriptions),
|
||||
target_name="surroundings"
|
||||
)
|
||||
|
||||
def _look_at_target(self, agent, target: str) -> LookResult:
|
||||
"""Look at a specific target (direction, entity, or object name)."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
|
||||
# Check if target is a direction
|
||||
if target in self.DIRECTION_VECTORS:
|
||||
return self._look_in_direction(agent, target)
|
||||
|
||||
# Check if target matches an entity
|
||||
for entity in self.grid.entities:
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if (ex, ey) == (ax, ay):
|
||||
continue
|
||||
|
||||
entity_name = getattr(entity, 'name', '').upper()
|
||||
if target in entity_name or entity_name in target:
|
||||
if self.grid.is_in_fov(ex, ey):
|
||||
return self._describe_entity(agent, entity)
|
||||
else:
|
||||
return LookResult(
|
||||
success=False,
|
||||
description=f"You cannot see {target.lower()} from here.",
|
||||
target_name=target.lower()
|
||||
)
|
||||
|
||||
# Check WorldGraph objects
|
||||
if self.world:
|
||||
room = self.world.room_at(ax, ay)
|
||||
if room:
|
||||
for obj in self.world.get_objects_in_room(room.name):
|
||||
if target in obj.name.upper() or obj.name.upper() in target:
|
||||
ox, oy = obj.position
|
||||
if self.grid.is_in_fov(ox, oy):
|
||||
return self._describe_object(agent, obj)
|
||||
|
||||
# Check doors
|
||||
for door in self.world.get_exits(room.name):
|
||||
if "DOOR" in target:
|
||||
dx, dy = door.position
|
||||
if self.grid.is_in_fov(dx, dy):
|
||||
return self._describe_door(agent, door)
|
||||
|
||||
return LookResult(
|
||||
success=False,
|
||||
description=f"You don't see anything called '{target.lower()}' nearby.",
|
||||
target_name=target.lower()
|
||||
)
|
||||
|
||||
def _look_in_direction(self, agent, direction: str) -> LookResult:
|
||||
"""Look in a cardinal direction."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
dx, dy = self.DIRECTION_VECTORS[direction]
|
||||
|
||||
descriptions = []
|
||||
|
||||
# Scan tiles in that direction
|
||||
for distance in range(1, 10):
|
||||
tx, ty = ax + dx * distance, ay + dy * distance
|
||||
|
||||
if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]):
|
||||
descriptions.append(f"The edge of the known world lies {direction.lower()}.")
|
||||
break
|
||||
|
||||
if not self.grid.is_in_fov(tx, ty):
|
||||
descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.")
|
||||
break
|
||||
|
||||
cell = self.grid.at(tx, ty)
|
||||
|
||||
# Check for entity at this tile
|
||||
for entity in self.grid.entities:
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if (ex, ey) == (tx, ty):
|
||||
entity_name = getattr(entity, 'name', 'creature')
|
||||
descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.")
|
||||
|
||||
# Check for wall
|
||||
if not cell.walkable:
|
||||
# Check if it's a door
|
||||
if self.world:
|
||||
room = self.world.room_at(ax, ay)
|
||||
if room:
|
||||
for door in self.world.get_exits(room.name):
|
||||
if door.position == (tx, ty):
|
||||
dest = self.world.rooms.get(
|
||||
door.room_b if door.room_a == room.name else door.room_a
|
||||
)
|
||||
dest_name = dest.display_name if dest else "another area"
|
||||
lock_str = " It is locked." if door.locked else ""
|
||||
descriptions.append(
|
||||
f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}"
|
||||
)
|
||||
break
|
||||
else:
|
||||
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||||
else:
|
||||
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||||
else:
|
||||
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||||
break
|
||||
|
||||
if not descriptions:
|
||||
descriptions.append(f"Open floor extends to the {direction.lower()}.")
|
||||
|
||||
return LookResult(
|
||||
success=True,
|
||||
description=" ".join(descriptions),
|
||||
target_name=direction.lower(),
|
||||
target_position=None
|
||||
)
|
||||
|
||||
def _describe_entity(self, agent, entity) -> LookResult:
|
||||
"""Generate detailed description of an entity."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
|
||||
entity_name = getattr(entity, 'name', 'creature')
|
||||
direction = get_direction_name((ax, ay), (ex, ey))
|
||||
distance = manhattan_distance((ax, ay), (ex, ey))
|
||||
|
||||
descriptions = [
|
||||
f"You examine the {entity_name} carefully.",
|
||||
f"It stands {distance} tile(s) to the {direction}."
|
||||
]
|
||||
|
||||
# Add any entity-specific description
|
||||
if hasattr(entity, 'description'):
|
||||
descriptions.append(entity.description)
|
||||
|
||||
# Add behavior hints if available
|
||||
if hasattr(entity, 'behavior'):
|
||||
descriptions.append(f"It appears to be {entity.behavior}.")
|
||||
|
||||
return LookResult(
|
||||
success=True,
|
||||
description=" ".join(descriptions),
|
||||
target_name=entity_name,
|
||||
target_position=(ex, ey)
|
||||
)
|
||||
|
||||
def _describe_object(self, agent, obj) -> LookResult:
|
||||
"""Generate detailed description of a WorldGraph object."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
ox, oy = obj.position
|
||||
|
||||
direction = get_direction_name((ax, ay), (ox, oy))
|
||||
distance = manhattan_distance((ax, ay), (ox, oy))
|
||||
|
||||
descriptions = [
|
||||
f"You examine {obj.display_name}.",
|
||||
f"It is {distance} tile(s) to the {direction}."
|
||||
]
|
||||
|
||||
if obj.description:
|
||||
descriptions.append(obj.description)
|
||||
|
||||
# Describe affordances
|
||||
if "takeable" in obj.affordances:
|
||||
descriptions.append("It looks small enough to pick up.")
|
||||
if "pressable" in obj.affordances:
|
||||
descriptions.append("It appears to be some kind of mechanism.")
|
||||
if "openable" in obj.affordances:
|
||||
descriptions.append("It can be opened.")
|
||||
if "readable" in obj.affordances:
|
||||
descriptions.append("There is writing on it.")
|
||||
|
||||
return LookResult(
|
||||
success=True,
|
||||
description=" ".join(descriptions),
|
||||
target_name=obj.name,
|
||||
target_position=(ox, oy)
|
||||
)
|
||||
|
||||
def _describe_door(self, agent, door) -> LookResult:
|
||||
"""Generate detailed description of a door."""
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
dx, dy = door.position
|
||||
|
||||
direction = get_direction_name((ax, ay), (dx, dy))
|
||||
distance = manhattan_distance((ax, ay), (dx, dy))
|
||||
|
||||
# Get destination
|
||||
if self.world:
|
||||
current_room = self.world.room_at(ax, ay)
|
||||
if current_room:
|
||||
if door.room_a == current_room.name:
|
||||
dest = self.world.rooms.get(door.room_b)
|
||||
else:
|
||||
dest = self.world.rooms.get(door.room_a)
|
||||
dest_name = dest.display_name if dest else "another area"
|
||||
else:
|
||||
dest_name = "another area"
|
||||
else:
|
||||
dest_name = "another area"
|
||||
|
||||
descriptions = [
|
||||
f"You examine the doorway to the {direction}.",
|
||||
f"It leads to {dest_name}, {distance} tile(s) away."
|
||||
]
|
||||
|
||||
if door.locked:
|
||||
descriptions.append("The door is locked. You'll need a key or mechanism to open it.")
|
||||
else:
|
||||
descriptions.append("The passage is open.")
|
||||
|
||||
return LookResult(
|
||||
success=True,
|
||||
description=" ".join(descriptions),
|
||||
target_name="door",
|
||||
target_position=(dx, dy)
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# SPEAK/ANNOUNCE Actions
|
||||
# =========================================================================
|
||||
|
||||
def execute_speech(self, agent, action: Action, all_agents: list,
|
||||
turn_number: int) -> SpeechResult:
|
||||
"""
|
||||
Execute SPEAK or ANNOUNCE action.
|
||||
|
||||
ANNOUNCE: All agents in the same room hear the message
|
||||
SPEAK: Only agents within SPEAK_RANGE tiles hear the message
|
||||
"""
|
||||
message_content = action.args[0] if action.args else ""
|
||||
|
||||
if not message_content:
|
||||
return SpeechResult(
|
||||
success=False,
|
||||
message="Nothing to say.",
|
||||
recipients=[],
|
||||
speech_type=action.type.value.lower(),
|
||||
content=""
|
||||
)
|
||||
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
recipients = []
|
||||
|
||||
if action.type == ActionType.ANNOUNCE:
|
||||
# Room-wide broadcast
|
||||
recipients = self._get_agents_in_room(agent, all_agents)
|
||||
speech_type = "announce"
|
||||
else:
|
||||
# Proximity-based speech
|
||||
recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE)
|
||||
speech_type = "speak"
|
||||
|
||||
# Deliver messages
|
||||
for recipient in recipients:
|
||||
if recipient.name not in self.pending_messages:
|
||||
self.pending_messages[recipient.name] = []
|
||||
|
||||
distance = manhattan_distance(
|
||||
(ax, ay),
|
||||
(int(recipient.entity.pos[0]), int(recipient.entity.pos[1]))
|
||||
) if speech_type == "speak" else None
|
||||
|
||||
self.pending_messages[recipient.name].append(Message(
|
||||
sender=agent.name,
|
||||
content=message_content,
|
||||
speech_type=speech_type,
|
||||
turn=turn_number,
|
||||
distance=distance
|
||||
))
|
||||
|
||||
recipient_names = [r.name for r in recipients]
|
||||
|
||||
if recipients:
|
||||
return SpeechResult(
|
||||
success=True,
|
||||
message=f"You {speech_type}: \"{message_content}\"",
|
||||
recipients=recipient_names,
|
||||
speech_type=speech_type,
|
||||
content=message_content
|
||||
)
|
||||
else:
|
||||
return SpeechResult(
|
||||
success=True, # Still succeeds, just nobody heard
|
||||
message=f"You {speech_type} into the emptiness: \"{message_content}\"",
|
||||
recipients=[],
|
||||
speech_type=speech_type,
|
||||
content=message_content
|
||||
)
|
||||
|
||||
def _get_agents_in_room(self, speaker, all_agents: list) -> list:
|
||||
"""Get all agents in the same room as speaker (excluding speaker)."""
|
||||
if not self.world:
|
||||
# Fallback: use proximity
|
||||
return self._get_agents_in_range(speaker, all_agents, 20)
|
||||
|
||||
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
|
||||
speaker_room = self.world.room_at(ax, ay)
|
||||
|
||||
if not speaker_room:
|
||||
return []
|
||||
|
||||
recipients = []
|
||||
for agent in all_agents:
|
||||
if agent.name == speaker.name:
|
||||
continue
|
||||
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
agent_room = self.world.room_at(rx, ry)
|
||||
if agent_room and agent_room.name == speaker_room.name:
|
||||
recipients.append(agent)
|
||||
|
||||
return recipients
|
||||
|
||||
def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list:
|
||||
"""Get all agents within Manhattan distance of speaker."""
|
||||
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
|
||||
|
||||
recipients = []
|
||||
for agent in all_agents:
|
||||
if agent.name == speaker.name:
|
||||
continue
|
||||
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles:
|
||||
recipients.append(agent)
|
||||
|
||||
return recipients
|
||||
|
||||
# =========================================================================
|
||||
# TAKE Action
|
||||
# =========================================================================
|
||||
|
||||
def execute_take(self, agent, action: Action) -> TakeResult:
|
||||
"""
|
||||
Execute TAKE action - pick up an item.
|
||||
|
||||
Items must be:
|
||||
1. In the WorldGraph as a takeable object
|
||||
2. Within reach (adjacent tile or same tile, distance <= 1)
|
||||
3. Visible in FOV
|
||||
"""
|
||||
item_name = action.args[0].lower() if action.args and action.args[0] else None
|
||||
|
||||
if not item_name:
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message="Take what? Specify an item name.",
|
||||
item_name=""
|
||||
)
|
||||
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
|
||||
# Search for the item in WorldGraph
|
||||
if not self.world:
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message="No items exist in this world.",
|
||||
item_name=item_name
|
||||
)
|
||||
|
||||
# Find matching object
|
||||
matching_obj = None
|
||||
for obj_name, obj in self.world.objects.items():
|
||||
if item_name in obj_name.lower() or obj_name.lower() in item_name:
|
||||
matching_obj = obj
|
||||
break
|
||||
|
||||
if not matching_obj:
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message=f"You don't see any '{item_name}' here.",
|
||||
item_name=item_name
|
||||
)
|
||||
|
||||
# Check if takeable
|
||||
if "takeable" not in matching_obj.affordances:
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message=f"The {matching_obj.display_name} cannot be picked up.",
|
||||
item_name=item_name,
|
||||
item_position=matching_obj.position
|
||||
)
|
||||
|
||||
ox, oy = matching_obj.position
|
||||
|
||||
# Check if visible in FOV
|
||||
if not self.grid.is_in_fov(ox, oy):
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message=f"You can't see the {matching_obj.display_name} from here.",
|
||||
item_name=item_name,
|
||||
item_position=(ox, oy)
|
||||
)
|
||||
|
||||
# Check distance (must be adjacent or same tile)
|
||||
distance = manhattan_distance((ax, ay), (ox, oy))
|
||||
if distance > 1:
|
||||
direction = get_direction_name((ax, ay), (ox, oy))
|
||||
# Use name for cleaner message (display_name often has article already)
|
||||
return TakeResult(
|
||||
success=False,
|
||||
message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.",
|
||||
item_name=item_name,
|
||||
item_position=(ox, oy)
|
||||
)
|
||||
|
||||
# Success! Remove from world (simplified - no inventory system yet)
|
||||
del self.world.objects[matching_obj.name]
|
||||
|
||||
return TakeResult(
|
||||
success=True,
|
||||
message=f"You pick up {matching_obj.display_name}.",
|
||||
item_name=matching_obj.name,
|
||||
item_position=(ox, oy)
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# Movement (single tile, delegates to original executor)
|
||||
# =========================================================================
|
||||
|
||||
def execute_move(self, agent, action: Action) -> ActionResult:
|
||||
"""
|
||||
Execute single-tile movement.
|
||||
|
||||
This is the per-turn movement. Multi-tile paths are handled
|
||||
at the orchestrator level.
|
||||
"""
|
||||
if not action.args or not action.args[0]:
|
||||
return ActionResult(False, "No direction specified")
|
||||
|
||||
direction = action.args[0]
|
||||
if direction not in self.DIRECTION_VECTORS:
|
||||
return ActionResult(False, f"Invalid direction: {direction}")
|
||||
|
||||
dx, dy = self.DIRECTION_VECTORS[direction]
|
||||
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
new_x, new_y = current_x + dx, current_y + dy
|
||||
|
||||
# Bounds check
|
||||
grid_w, grid_h = self.grid.grid_size
|
||||
if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
|
||||
return ActionResult(False, f"Cannot go {direction} - edge of map")
|
||||
|
||||
# Walkability check
|
||||
target_cell = self.grid.at(new_x, new_y)
|
||||
if not target_cell.walkable:
|
||||
return ActionResult(False, f"Cannot go {direction} - path blocked")
|
||||
|
||||
# Entity collision check
|
||||
for entity in self.grid.entities:
|
||||
if entity is agent.entity:
|
||||
continue
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if ex == new_x and ey == new_y:
|
||||
return ActionResult(False, f"Cannot go {direction} - occupied")
|
||||
|
||||
# Execute movement
|
||||
agent.entity.pos = (new_x, new_y)
|
||||
|
||||
return ActionResult(
|
||||
success=True,
|
||||
message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
|
||||
new_position=(new_x, new_y),
|
||||
path=[(current_x, current_y), (new_x, new_y)]
|
||||
)
|
||||
|
||||
def execute_wait(self, agent, action: Action) -> ActionResult:
|
||||
"""Execute WAIT action."""
|
||||
return ActionResult(True, "Waited and observed surroundings")
|
||||
|
||||
# =========================================================================
|
||||
# Multi-tile Pathfinding
|
||||
# =========================================================================
|
||||
|
||||
def plan_path_to(self, agent, target_pos: Tuple[int, int],
|
||||
visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]:
|
||||
"""
|
||||
Plan a path to a target position.
|
||||
|
||||
Uses A* via libtcod if available, otherwise simple pathfinding.
|
||||
Returns list of tiles from current position to target (excluding current).
|
||||
"""
|
||||
try:
|
||||
from mcrfpy import libtcod
|
||||
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
|
||||
path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1])
|
||||
|
||||
if path:
|
||||
# Store path state
|
||||
path_state = self.get_path_state(agent.name)
|
||||
path_state.path = path
|
||||
path_state.current_index = 0
|
||||
path_state.visible_entities_at_start = visible_entities.copy()
|
||||
|
||||
return path
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]:
|
||||
"""
|
||||
Continue an existing multi-tile path.
|
||||
|
||||
Returns ActionResult if moved, None if path complete or interrupted.
|
||||
"""
|
||||
path_state = self.get_path_state(agent.name)
|
||||
|
||||
if not path_state.has_path:
|
||||
return None
|
||||
|
||||
# Check for FOV interrupt
|
||||
if path_state.should_interrupt(current_visible):
|
||||
path_state.clear()
|
||||
return None # Signal that LLM should be queried
|
||||
|
||||
# Get next tile
|
||||
next_tile = path_state.next_tile
|
||||
if not next_tile:
|
||||
path_state.clear()
|
||||
return None
|
||||
|
||||
# Move to next tile
|
||||
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||||
new_x, new_y = next_tile
|
||||
|
||||
# Verify still walkable
|
||||
target_cell = self.grid.at(new_x, new_y)
|
||||
if not target_cell.walkable:
|
||||
path_state.clear()
|
||||
return ActionResult(False, "Path blocked - recalculating")
|
||||
|
||||
# Check for entity collision
|
||||
for entity in self.grid.entities:
|
||||
if entity is agent.entity:
|
||||
continue
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if ex == new_x and ey == new_y:
|
||||
path_state.clear()
|
||||
return ActionResult(False, "Path blocked by creature")
|
||||
|
||||
# Execute movement
|
||||
agent.entity.pos = (new_x, new_y)
|
||||
path_state.advance()
|
||||
|
||||
remaining = path_state.remaining_tiles
|
||||
if remaining > 0:
|
||||
msg = f"Continuing path ({remaining} tiles remaining)"
|
||||
else:
|
||||
msg = "Arrived at destination"
|
||||
path_state.clear()
|
||||
|
||||
return ActionResult(
|
||||
success=True,
|
||||
message=msg,
|
||||
new_position=(new_x, new_y),
|
||||
path=[(current_x, current_y), (new_x, new_y)]
|
||||
)
|
||||
606
tests/vllm_demo/enhanced_orchestrator.py
Normal file
606
tests/vllm_demo/enhanced_orchestrator.py
Normal file
|
|
@ -0,0 +1,606 @@
|
|||
"""
|
||||
Enhanced Turn Orchestrator
|
||||
==========================
|
||||
|
||||
Extends TurnOrchestrator with:
|
||||
- Action economy (free actions vs turn-ending)
|
||||
- Multi-tile path continuation
|
||||
- FOV interrupt detection
|
||||
- Enhanced logging for offline viewer replay
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from typing import List, Dict, Any, Optional, Callable, Set
|
||||
from datetime import datetime
|
||||
|
||||
from world_graph import WorldGraph, AgentInfo
|
||||
from action_parser import Action, ActionType, parse_action
|
||||
from action_executor import ActionResult
|
||||
from action_economy import (
|
||||
TurnState, PathState, TurnCost, get_action_cost,
|
||||
PointOfInterestCollector, PointOfInterest
|
||||
)
|
||||
from enhanced_executor import EnhancedExecutor, LookResult, SpeechResult, Message, TakeResult
|
||||
|
||||
|
||||
@dataclass
|
||||
class FreeActionRecord:
|
||||
"""Record of a free action taken during a turn."""
|
||||
action_type: str
|
||||
args: tuple
|
||||
result: Dict[str, Any]
|
||||
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnhancedSimulationStep:
|
||||
"""
|
||||
Enhanced simulation step for offline viewer replay.
|
||||
|
||||
Contains all data needed to reconstruct the agent's perspective
|
||||
and decision-making for that turn.
|
||||
"""
|
||||
# Turn identification
|
||||
turn: int
|
||||
agent_id: str
|
||||
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
|
||||
|
||||
# Agent state at start of turn
|
||||
position_start: tuple = (0, 0)
|
||||
room: str = ""
|
||||
path_in_progress: bool = False
|
||||
|
||||
# FOV and perception
|
||||
visible_entities: List[str] = field(default_factory=list)
|
||||
visible_tiles: int = 0 # Count of visible tiles
|
||||
points_of_interest: List[Dict] = field(default_factory=list)
|
||||
|
||||
# Context provided to LLM
|
||||
location_description: str = ""
|
||||
available_actions: List[str] = field(default_factory=list)
|
||||
pending_messages: List[Dict] = field(default_factory=list)
|
||||
poi_prompt: str = ""
|
||||
|
||||
# Screenshot path (for viewer to load)
|
||||
screenshot_path: str = ""
|
||||
|
||||
# LLM interaction
|
||||
llm_prompt_system: str = ""
|
||||
llm_prompt_user: str = ""
|
||||
llm_response: str = ""
|
||||
llm_was_queried: bool = True # False if path continuation
|
||||
|
||||
# Conversation history (LLM queries within this turn)
|
||||
llm_exchanges: List[Dict] = field(default_factory=list) # [{prompt, response, action, error}]
|
||||
action_retries: int = 0 # How many times we re-prompted due to errors
|
||||
|
||||
# Free actions taken (LOOK, SPEAK)
|
||||
free_actions: List[Dict] = field(default_factory=list)
|
||||
|
||||
# Turn-ending action
|
||||
final_action_type: str = ""
|
||||
final_action_args: tuple = ()
|
||||
final_action_success: bool = False
|
||||
final_action_message: str = ""
|
||||
|
||||
# Movement result
|
||||
position_end: tuple = (0, 0)
|
||||
path_taken: List[tuple] = field(default_factory=list)
|
||||
path_remaining: int = 0 # Tiles left if multi-tile path
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnhancedSimulationLog:
|
||||
"""
|
||||
Complete simulation log for offline viewer.
|
||||
|
||||
Designed to support:
|
||||
- Turn-by-turn replay
|
||||
- Per-agent perspective reconstruction
|
||||
- LLM chain-of-thought review
|
||||
- Speech history tracking
|
||||
"""
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
steps: List[EnhancedSimulationStep] = field(default_factory=list)
|
||||
speech_log: List[Dict] = field(default_factory=list)
|
||||
|
||||
def save(self, path: str):
|
||||
"""Save log to JSON file."""
|
||||
data = {
|
||||
"metadata": self.metadata,
|
||||
"steps": [asdict(s) for s in self.steps],
|
||||
"speech_log": self.speech_log
|
||||
}
|
||||
with open(path, 'w') as f:
|
||||
json.dump(data, f, indent=2, default=str)
|
||||
print(f"Enhanced simulation log saved to: {path}")
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: str) -> 'EnhancedSimulationLog':
|
||||
"""Load log from JSON file."""
|
||||
with open(path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
steps = []
|
||||
for s in data.get("steps", []):
|
||||
# Convert lists back to tuples where needed
|
||||
if isinstance(s.get("position_start"), list):
|
||||
s["position_start"] = tuple(s["position_start"])
|
||||
if isinstance(s.get("position_end"), list):
|
||||
s["position_end"] = tuple(s["position_end"])
|
||||
if isinstance(s.get("final_action_args"), list):
|
||||
s["final_action_args"] = tuple(s["final_action_args"])
|
||||
if s.get("path_taken"):
|
||||
s["path_taken"] = [tuple(p) for p in s["path_taken"]]
|
||||
steps.append(EnhancedSimulationStep(**s))
|
||||
|
||||
return cls(
|
||||
metadata=data.get("metadata", {}),
|
||||
steps=steps,
|
||||
speech_log=data.get("speech_log", [])
|
||||
)
|
||||
|
||||
def get_turn_summary(self, turn: int) -> str:
|
||||
"""Get summary of a specific turn for display."""
|
||||
turn_steps = [s for s in self.steps if s.turn == turn]
|
||||
lines = [f"=== Turn {turn} ==="]
|
||||
|
||||
for step in turn_steps:
|
||||
lines.append(f"\n{step.agent_id}:")
|
||||
lines.append(f" Position: {step.position_start} -> {step.position_end}")
|
||||
|
||||
if step.free_actions:
|
||||
lines.append(f" Free actions: {len(step.free_actions)}")
|
||||
for fa in step.free_actions:
|
||||
lines.append(f" - {fa['action_type']}: {fa.get('result', {}).get('message', '')[:50]}")
|
||||
|
||||
status = "OK" if step.final_action_success else "FAIL"
|
||||
lines.append(f" Action: {step.final_action_type} {step.final_action_args} [{status}]")
|
||||
|
||||
if not step.llm_was_queried:
|
||||
lines.append(" (Path continuation - no LLM query)")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class EnhancedOrchestrator:
|
||||
"""
|
||||
Enhanced turn orchestrator with action economy and improved logging.
|
||||
"""
|
||||
|
||||
def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
|
||||
screenshot_dir: str, llm_query_fn: Callable):
|
||||
"""
|
||||
Initialize enhanced orchestrator.
|
||||
|
||||
Args:
|
||||
grid: mcrfpy.Grid instance
|
||||
fov_layer: Color layer for FOV rendering
|
||||
world: WorldGraph instance
|
||||
agents: List of Agent objects
|
||||
screenshot_dir: Directory for screenshots
|
||||
llm_query_fn: Function(agent, screenshot_path, context) -> str
|
||||
"""
|
||||
self.grid = grid
|
||||
self.fov_layer = fov_layer
|
||||
self.world = world
|
||||
self.agents = agents
|
||||
self.screenshot_dir = screenshot_dir
|
||||
self.llm_query_fn = llm_query_fn
|
||||
|
||||
self.executor = EnhancedExecutor(grid, world)
|
||||
self.turn_number = 0
|
||||
self.steps: List[EnhancedSimulationStep] = []
|
||||
self.speech_log: List[Dict] = []
|
||||
|
||||
os.makedirs(screenshot_dir, exist_ok=True)
|
||||
|
||||
def run_simulation(self, max_turns: int = 10,
|
||||
stop_condition: Callable = None) -> EnhancedSimulationLog:
|
||||
"""
|
||||
Run complete simulation with enhanced logging.
|
||||
|
||||
Args:
|
||||
max_turns: Maximum number of turns
|
||||
stop_condition: Optional callable(orchestrator) -> bool
|
||||
|
||||
Returns:
|
||||
EnhancedSimulationLog for offline viewer
|
||||
"""
|
||||
print(f"\nStarting enhanced simulation: max {max_turns} turns")
|
||||
print(f"Agents: {[a.name for a in self.agents]}")
|
||||
print("=" * 60)
|
||||
|
||||
for turn in range(max_turns):
|
||||
self.run_turn()
|
||||
|
||||
if stop_condition and stop_condition(self):
|
||||
print(f"\nStop condition met at turn {self.turn_number}")
|
||||
break
|
||||
|
||||
# Build log
|
||||
log = EnhancedSimulationLog(
|
||||
metadata={
|
||||
"total_turns": self.turn_number,
|
||||
"num_agents": len(self.agents),
|
||||
"agent_names": [a.name for a in self.agents],
|
||||
"timestamp_start": self.steps[0].timestamp if self.steps else "",
|
||||
"timestamp_end": self.steps[-1].timestamp if self.steps else "",
|
||||
"world_rooms": list(self.world.rooms.keys()),
|
||||
"screenshot_dir": self.screenshot_dir,
|
||||
},
|
||||
steps=self.steps,
|
||||
speech_log=self.speech_log
|
||||
)
|
||||
|
||||
return log
|
||||
|
||||
def run_turn(self) -> List[EnhancedSimulationStep]:
|
||||
"""Execute one full turn (all agents act once)."""
|
||||
import mcrfpy
|
||||
|
||||
self.turn_number += 1
|
||||
turn_steps = []
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"TURN {self.turn_number}")
|
||||
print("=" * 60)
|
||||
|
||||
for agent in self.agents:
|
||||
step = self._run_agent_turn(agent)
|
||||
turn_steps.append(step)
|
||||
self.steps.append(step)
|
||||
|
||||
return turn_steps
|
||||
|
||||
def _run_agent_turn(self, agent) -> EnhancedSimulationStep:
|
||||
"""Execute one agent's turn with action economy."""
|
||||
import mcrfpy
|
||||
from mcrfpy import automation
|
||||
|
||||
print(f"\n--- {agent.name}'s Turn ---")
|
||||
|
||||
# Initialize step record
|
||||
step = EnhancedSimulationStep(
|
||||
turn=self.turn_number,
|
||||
agent_id=agent.name,
|
||||
position_start=agent.pos,
|
||||
room=agent.current_room
|
||||
)
|
||||
|
||||
# Check for path continuation
|
||||
path_state = self.executor.get_path_state(agent.name)
|
||||
current_visible = self._get_visible_entity_ids(agent)
|
||||
|
||||
if path_state.has_path:
|
||||
# Check for FOV interrupt
|
||||
if path_state.should_interrupt(current_visible):
|
||||
print(f" Path interrupted: new entity in FOV")
|
||||
path_state.clear()
|
||||
else:
|
||||
# Continue path without LLM query
|
||||
result = self.executor.continue_path(agent, current_visible)
|
||||
if result and result.success:
|
||||
step.llm_was_queried = False
|
||||
step.path_in_progress = True
|
||||
step.final_action_type = "GO"
|
||||
step.final_action_args = ("CONTINUE",)
|
||||
step.final_action_success = True
|
||||
step.final_action_message = result.message
|
||||
step.position_end = result.new_position or agent.pos
|
||||
step.path_taken = result.path or []
|
||||
step.path_remaining = self.executor.get_path_state(agent.name).remaining_tiles
|
||||
|
||||
print(f" Path continuation: {result.message}")
|
||||
return step
|
||||
|
||||
# Need LLM query - set up perspective
|
||||
step.visible_entities = list(current_visible)
|
||||
self._switch_perspective(agent)
|
||||
mcrfpy.step(0.016)
|
||||
|
||||
# Take screenshot
|
||||
screenshot_path = os.path.join(
|
||||
self.screenshot_dir,
|
||||
f"turn{self.turn_number}_{agent.name.lower()}.png"
|
||||
)
|
||||
automation.screenshot(screenshot_path)
|
||||
step.screenshot_path = screenshot_path
|
||||
|
||||
# Collect points of interest
|
||||
poi_collector = PointOfInterestCollector(self.grid, agent.pos)
|
||||
pois = poi_collector.collect_from_fov(self.world)
|
||||
step.points_of_interest = [asdict(p) for p in pois]
|
||||
step.poi_prompt = poi_collector.format_for_prompt()
|
||||
|
||||
# Get pending messages
|
||||
messages = self.executor.get_pending_messages(agent.name)
|
||||
step.pending_messages = [asdict(m) for m in messages]
|
||||
|
||||
# Build context
|
||||
visible_agents = self._get_visible_agents(agent)
|
||||
context = agent.get_context(visible_agents + [agent])
|
||||
step.location_description = context["location"]
|
||||
step.available_actions = context["available_actions"]
|
||||
|
||||
# Turn state for action economy
|
||||
turn_state = TurnState()
|
||||
|
||||
# Error feedback for retry loop
|
||||
last_error = None
|
||||
MAX_RETRIES = 3
|
||||
|
||||
# Action loop - handle free actions until turn-ending action
|
||||
while not turn_state.turn_ended:
|
||||
# Build prompt with current state (includes error feedback if any)
|
||||
prompt = self._build_prompt(agent, context, step.poi_prompt, messages, turn_state, last_error)
|
||||
step.llm_prompt_user = prompt # Store last prompt
|
||||
|
||||
# Query LLM
|
||||
print(f" Querying LLM...")
|
||||
response = self.llm_query_fn(agent, screenshot_path, {
|
||||
**context,
|
||||
"poi_prompt": step.poi_prompt,
|
||||
"messages": [asdict(m) for m in messages],
|
||||
"has_spoken": turn_state.has_spoken,
|
||||
"last_error": last_error,
|
||||
"conversation_history": step.llm_exchanges # Include past exchanges
|
||||
})
|
||||
step.llm_response = response
|
||||
print(f" Response: {response[:200]}...")
|
||||
|
||||
# Parse action
|
||||
action = parse_action(response)
|
||||
cost = get_action_cost(action)
|
||||
|
||||
print(f" Action: {action.type.value} {action.args} (cost: {cost.value})")
|
||||
|
||||
# Track this exchange
|
||||
exchange = {
|
||||
"prompt": prompt[:500], # Truncate for storage
|
||||
"response": response,
|
||||
"action_type": action.type.value,
|
||||
"action_args": action.args,
|
||||
"error": None
|
||||
}
|
||||
|
||||
# Execute action based on type
|
||||
if action.type == ActionType.LOOK:
|
||||
result = self.executor.execute_look(agent, action)
|
||||
turn_state.record_free_action("LOOK", {
|
||||
"target": result.target_name,
|
||||
"description": result.description
|
||||
})
|
||||
step.free_actions.append({
|
||||
"action_type": "LOOK",
|
||||
"args": action.args,
|
||||
"result": {"description": result.description}
|
||||
})
|
||||
# Provide result and continue loop for another action
|
||||
context["look_result"] = result.description
|
||||
last_error = None # Clear error on success
|
||||
print(f" LOOK result: {result.description[:100]}...")
|
||||
|
||||
elif action.type in (ActionType.SPEAK, ActionType.ANNOUNCE):
|
||||
if not turn_state.can_speak():
|
||||
print(f" Already spoke this turn")
|
||||
last_error = "You have already spoken this turn. Choose a different action."
|
||||
exchange["error"] = last_error
|
||||
step.action_retries += 1
|
||||
if step.action_retries >= MAX_RETRIES:
|
||||
# Force end turn
|
||||
step.final_action_type = "WAIT"
|
||||
step.final_action_args = ()
|
||||
step.final_action_success = False
|
||||
step.final_action_message = "Too many invalid actions - turn ended"
|
||||
step.position_end = agent.pos
|
||||
turn_state.end_turn()
|
||||
else:
|
||||
result = self.executor.execute_speech(
|
||||
agent, action, self.agents, self.turn_number
|
||||
)
|
||||
turn_state.record_speech()
|
||||
turn_state.record_free_action(action.type.value, {
|
||||
"content": result.content,
|
||||
"recipients": result.recipients
|
||||
})
|
||||
step.free_actions.append({
|
||||
"action_type": action.type.value,
|
||||
"args": action.args,
|
||||
"result": {
|
||||
"content": result.content,
|
||||
"recipients": result.recipients
|
||||
}
|
||||
})
|
||||
# Record in speech log
|
||||
self.speech_log.append({
|
||||
"turn": self.turn_number,
|
||||
"speaker": agent.name,
|
||||
"type": result.speech_type,
|
||||
"content": result.content,
|
||||
"recipients": result.recipients
|
||||
})
|
||||
last_error = None
|
||||
print(f" {result.speech_type.upper()}: {result.content[:50]}... -> {result.recipients}")
|
||||
# Continue loop for another action (can still move)
|
||||
|
||||
elif action.type == ActionType.TAKE:
|
||||
result = self.executor.execute_take(agent, action)
|
||||
if result.success:
|
||||
step.final_action_type = "TAKE"
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = True
|
||||
step.final_action_message = result.message
|
||||
step.position_end = agent.pos
|
||||
last_error = None
|
||||
turn_state.end_turn()
|
||||
print(f" TAKE: {result.message}")
|
||||
else:
|
||||
# Failed - give error feedback and let LLM try again
|
||||
last_error = result.message
|
||||
exchange["error"] = last_error
|
||||
step.action_retries += 1
|
||||
print(f" TAKE FAILED: {result.message}")
|
||||
if step.action_retries >= MAX_RETRIES:
|
||||
step.final_action_type = "TAKE"
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = False
|
||||
step.final_action_message = result.message
|
||||
step.position_end = agent.pos
|
||||
turn_state.end_turn()
|
||||
|
||||
elif action.type == ActionType.GO:
|
||||
result = self.executor.execute_move(agent, action)
|
||||
if result.success:
|
||||
step.final_action_type = "GO"
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = True
|
||||
step.final_action_message = result.message
|
||||
step.position_end = result.new_position or agent.pos
|
||||
step.path_taken = result.path or []
|
||||
last_error = None
|
||||
turn_state.end_turn()
|
||||
print(f" MOVE: {result.message}")
|
||||
else:
|
||||
# Failed - give error feedback
|
||||
last_error = result.message
|
||||
exchange["error"] = last_error
|
||||
step.action_retries += 1
|
||||
print(f" MOVE FAILED: {result.message}")
|
||||
if step.action_retries >= MAX_RETRIES:
|
||||
step.final_action_type = "GO"
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = False
|
||||
step.final_action_message = result.message
|
||||
step.position_end = agent.pos
|
||||
turn_state.end_turn()
|
||||
|
||||
elif action.type == ActionType.WAIT:
|
||||
result = self.executor.execute_wait(agent, action)
|
||||
step.final_action_type = "WAIT"
|
||||
step.final_action_args = ()
|
||||
step.final_action_success = True
|
||||
step.final_action_message = result.message
|
||||
step.position_end = agent.pos
|
||||
last_error = None
|
||||
turn_state.end_turn()
|
||||
print(f" WAIT")
|
||||
|
||||
elif action.type == ActionType.INVALID:
|
||||
# Could not parse action - give feedback
|
||||
last_error = f"Could not understand your action. Please use a valid action format like 'Action: GO EAST' or 'Action: TAKE key'."
|
||||
exchange["error"] = last_error
|
||||
step.action_retries += 1
|
||||
print(f" INVALID ACTION: {action.args}")
|
||||
if step.action_retries >= MAX_RETRIES:
|
||||
step.final_action_type = "INVALID"
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = False
|
||||
step.final_action_message = "Could not parse action"
|
||||
step.position_end = agent.pos
|
||||
turn_state.end_turn()
|
||||
|
||||
else:
|
||||
# Unimplemented action type - give feedback
|
||||
last_error = f"The action '{action.type.value}' is not yet supported. Try GO, TAKE, LOOK, SPEAK, or WAIT."
|
||||
exchange["error"] = last_error
|
||||
step.action_retries += 1
|
||||
print(f" Unsupported: {action.type.value}")
|
||||
if step.action_retries >= MAX_RETRIES:
|
||||
step.final_action_type = action.type.value
|
||||
step.final_action_args = action.args
|
||||
step.final_action_success = False
|
||||
step.final_action_message = f"Unsupported action: {action.type.value}"
|
||||
step.position_end = agent.pos
|
||||
turn_state.end_turn()
|
||||
|
||||
# Record exchange
|
||||
step.llm_exchanges.append(exchange)
|
||||
|
||||
return step
|
||||
|
||||
def _build_prompt(self, agent, context: dict, poi_prompt: str,
|
||||
messages: List[Message], turn_state: TurnState,
|
||||
last_error: Optional[str] = None) -> str:
|
||||
"""Build LLM prompt with current state and error feedback."""
|
||||
parts = [context["location"]]
|
||||
|
||||
# Add messages received
|
||||
if messages:
|
||||
parts.append("\nMessages received:")
|
||||
for msg in messages:
|
||||
if msg.speech_type == "announce":
|
||||
parts.append(f' {msg.sender} announces: "{msg.content}"')
|
||||
else:
|
||||
parts.append(f' {msg.sender} says: "{msg.content}"')
|
||||
|
||||
# Add points of interest
|
||||
parts.append(f"\n{poi_prompt}")
|
||||
|
||||
# Add available actions
|
||||
actions_str = ", ".join(context["available_actions"])
|
||||
parts.append(f"\nAvailable actions: {actions_str}")
|
||||
|
||||
# Add LOOK result if we just looked
|
||||
if "look_result" in context:
|
||||
parts.append(f"\n[LOOK result: {context['look_result']}]")
|
||||
|
||||
# Add constraints
|
||||
constraints = []
|
||||
if turn_state.has_spoken:
|
||||
constraints.append("You have already spoken this turn.")
|
||||
if constraints:
|
||||
parts.append(f"\nConstraints: {' '.join(constraints)}")
|
||||
|
||||
# Add error feedback from last action attempt
|
||||
if last_error:
|
||||
parts.append(f"\n[ERROR: {last_error}]")
|
||||
parts.append("[Please try a different action.]")
|
||||
|
||||
parts.append("\nWhat do you do? Brief reasoning, then Action: <action>")
|
||||
|
||||
return "\n".join(parts)
|
||||
|
||||
def _switch_perspective(self, agent):
|
||||
"""Switch grid view to agent's perspective."""
|
||||
import mcrfpy
|
||||
|
||||
self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
|
||||
self.fov_layer.apply_perspective(
|
||||
entity=agent.entity,
|
||||
visible=mcrfpy.Color(0, 0, 0, 0),
|
||||
discovered=mcrfpy.Color(40, 40, 60, 180),
|
||||
unknown=mcrfpy.Color(0, 0, 0, 255)
|
||||
)
|
||||
agent.entity.update_visibility()
|
||||
|
||||
px, py = agent.pos
|
||||
self.grid.center = (px * 16 + 8, py * 16 + 8)
|
||||
|
||||
def _get_visible_agents(self, observer) -> list:
|
||||
"""Get agents visible to observer based on FOV."""
|
||||
visible = []
|
||||
for agent in self.agents:
|
||||
if agent.name == observer.name:
|
||||
continue
|
||||
ax, ay = agent.pos
|
||||
if self.grid.is_in_fov(ax, ay):
|
||||
visible.append(agent)
|
||||
return visible
|
||||
|
||||
def _get_visible_entity_ids(self, agent) -> Set[str]:
|
||||
"""Get set of entity IDs currently visible to agent."""
|
||||
visible = set()
|
||||
ax, ay = agent.pos
|
||||
|
||||
for entity in self.grid.entities:
|
||||
if entity is agent.entity:
|
||||
continue
|
||||
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||||
if self.grid.is_in_fov(ex, ey):
|
||||
entity_id = getattr(entity, 'id', None) or str(id(entity))
|
||||
visible.add(entity_id)
|
||||
|
||||
return visible
|
||||
|
|
@ -87,22 +87,31 @@ class Font:
|
|||
|
||||
class Drawable:
|
||||
"""Base class for all drawable UI elements."""
|
||||
|
||||
|
||||
x: float
|
||||
y: float
|
||||
visible: bool
|
||||
z_index: int
|
||||
name: str
|
||||
pos: Vector
|
||||
|
||||
|
||||
# Mouse event callbacks (#140, #141)
|
||||
on_click: Optional[Callable[[float, float, int, str], None]]
|
||||
on_enter: Optional[Callable[[float, float, int, str], None]]
|
||||
on_exit: Optional[Callable[[float, float, int, str], None]]
|
||||
on_move: Optional[Callable[[float, float, int, str], None]]
|
||||
|
||||
# Read-only hover state (#140)
|
||||
hovered: bool
|
||||
|
||||
def get_bounds(self) -> Tuple[float, float, float, float]:
|
||||
"""Get bounding box as (x, y, width, height)."""
|
||||
...
|
||||
|
||||
|
||||
def move(self, dx: float, dy: float) -> None:
|
||||
"""Move by relative offset (dx, dy)."""
|
||||
...
|
||||
|
||||
|
||||
def resize(self, width: float, height: float) -> None:
|
||||
"""Resize to new dimensions (width, height)."""
|
||||
...
|
||||
|
|
@ -343,45 +352,47 @@ class EntityCollection:
|
|||
|
||||
class Scene:
|
||||
"""Base class for object-oriented scenes."""
|
||||
|
||||
|
||||
name: str
|
||||
|
||||
children: UICollection # #151: UI elements collection (read-only alias for get_ui())
|
||||
on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action)
|
||||
|
||||
def __init__(self, name: str) -> None: ...
|
||||
|
||||
|
||||
def activate(self) -> None:
|
||||
"""Called when scene becomes active."""
|
||||
...
|
||||
|
||||
|
||||
def deactivate(self) -> None:
|
||||
"""Called when scene becomes inactive."""
|
||||
...
|
||||
|
||||
|
||||
def get_ui(self) -> UICollection:
|
||||
"""Get UI elements collection."""
|
||||
...
|
||||
|
||||
|
||||
def on_keypress(self, key: str, pressed: bool) -> None:
|
||||
"""Handle keyboard events."""
|
||||
"""Handle keyboard events (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_click(self, x: float, y: float, button: int) -> None:
|
||||
"""Handle mouse clicks."""
|
||||
"""Handle mouse clicks (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_enter(self) -> None:
|
||||
"""Called when entering the scene."""
|
||||
"""Called when entering the scene (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_exit(self) -> None:
|
||||
"""Called when leaving the scene."""
|
||||
"""Called when leaving the scene (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def on_resize(self, width: int, height: int) -> None:
|
||||
"""Handle window resize events."""
|
||||
"""Handle window resize events (override in subclass)."""
|
||||
...
|
||||
|
||||
|
||||
def update(self, dt: float) -> None:
|
||||
"""Update scene logic."""
|
||||
"""Update scene logic (override in subclass)."""
|
||||
...
|
||||
|
||||
class Timer:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue