Compare commits

...

4 commits

Author SHA1 Message Date
335efc5514 feat: Implement enhanced action economy for LLM agent orchestration (#156)
- Add action economy system with free (LOOK, SPEAK) vs turn-ending (GO, WAIT, TAKE) actions
- Implement LOOK action with detailed descriptions for doors, objects, entities, directions
- Add SPEAK/ANNOUNCE speech system with room-wide and proximity-based message delivery
- Create multi-tile pathing with FOV interrupt detection (path cancels when new entity visible)
- Implement TAKE action with adjacency requirement and clear error messages
- Add conversation history and error feedback loop so agents learn from failed actions
- Create structured simulation logging for offline viewer replay
- Document offline viewer requirements in OFFLINE_VIEWER_SPEC.md
- Fix import path in 1_multi_agent_demo.py for standalone execution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 20:50:00 -05:00
85e90088d5 fix: Register keypressScene after setScene (closes #143)
keypressScene() sets the handler for the CURRENT scene, so we must
call setScene() first to make focus_demo the active scene before
registering the key handler.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 15:35:48 -05:00
b6ec0fe7ab feat: Add focus system demo for #143
Implements a comprehensive Python-level focus management system showing:
- FocusManager: central coordinator for keyboard routing, tab cycling, modal stack
- ModifierTracker: workaround for tracking Shift/Ctrl/Alt state (#160)
- FocusableGrid: WASD movement in a grid with player marker
- TextInputWidget: text entry with cursor, backspace, home/end
- MenuIcon: icons that open modal dialogs on Space/Enter

Features demonstrated:
- Click-to-focus on any widget
- Tab/Shift+Tab cycling through focusable widgets
- Visual focus indicators (blue outline)
- Keyboard routing to focused widget
- Modal dialog push/pop stack
- Escape to close modals

Addresses #143

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 15:30:17 -05:00
89986323f8 docs: Add missing Drawable callbacks and Scene.on_key to stubs
Add to Drawable base class:
- on_click, on_enter, on_exit, on_move callbacks (#140, #141)
- hovered read-only property (#140)

Add to Scene class:
- children property (#151)
- on_key handler property

Discovered while defining implementation details for #143.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 14:49:17 -05:00
9 changed files with 3104 additions and 44 deletions

View file

@@ -75,22 +75,31 @@ class Font:
class Drawable:
"""Base class for all drawable UI elements."""
x: float
y: float
visible: bool
z_index: int
name: str
pos: Vector
# Mouse event callbacks (#140, #141)
on_click: Optional[Callable[[float, float, int, str], None]]
on_enter: Optional[Callable[[float, float, int, str], None]]
on_exit: Optional[Callable[[float, float, int, str], None]]
on_move: Optional[Callable[[float, float, int, str], None]]
# Read-only hover state (#140)
hovered: bool
def get_bounds(self) -> Tuple[float, float, float, float]:
"""Get bounding box as (x, y, width, height)."""
...
def move(self, dx: float, dy: float) -> None:
"""Move by relative offset (dx, dy)."""
...
def resize(self, width: float, height: float) -> None:
"""Resize to new dimensions (width, height)."""
...
@@ -331,45 +340,47 @@ class EntityCollection:
class Scene:
"""Base class for object-oriented scenes."""
name: str
children: UICollection # #151: UI elements collection (read-only alias for get_ui())
on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action)
def __init__(self, name: str) -> None: ...
def activate(self) -> None:
"""Called when scene becomes active."""
...
def deactivate(self) -> None:
"""Called when scene becomes inactive."""
...
def get_ui(self) -> UICollection:
"""Get UI elements collection."""
...
def on_keypress(self, key: str, pressed: bool) -> None:
"""Handle keyboard events."""
"""Handle keyboard events (override in subclass)."""
...
def on_click(self, x: float, y: float, button: int) -> None:
"""Handle mouse clicks."""
"""Handle mouse clicks (override in subclass)."""
...
def on_enter(self) -> None:
"""Called when entering the scene."""
"""Called when entering the scene (override in subclass)."""
...
def on_exit(self) -> None:
"""Called when leaving the scene."""
"""Called when leaving the scene (override in subclass)."""
...
def on_resize(self, width: int, height: int) -> None:
"""Handle window resize events."""
"""Handle window resize events (override in subclass)."""
...
def update(self, dt: float) -> None:
"""Update scene logic."""
"""Update scene logic (override in subclass)."""
...
class Timer:
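A minimal usage sketch of the newly stubbed members, assuming the declared signatures behave as written: the `on_click`/`on_enter`/`on_exit` callbacks take `(x, y, button, action)` and `hovered` can be read at any time. The scene and widget names here are illustrative; `Scene.children` and `Scene.on_key` are the other additions, while the focus demo below routes keys through `mcrfpy.keypressScene` instead.

```python
import mcrfpy

mcrfpy.createScene("stub_demo")
ui = mcrfpy.sceneUI("stub_demo")

panel = mcrfpy.Frame(pos=(100, 100), size=(200, 80),
                     fill_color=mcrfpy.Color(40, 40, 50))
label = mcrfpy.Caption(text="hover or click the panel", pos=(110, 110))

def on_panel_click(x, y, button, action):
    # Signature per the stub: (x, y, button, action); the focus demo below
    # treats action == "start" as the press event.
    if action == "start":
        label.text = f"clicked at ({x:.0f}, {y:.0f})"

def on_panel_enter(x, y, button, action):
    # `hovered` is the new read-only property from #140 (illustrative read)
    label.text = "hovered" if panel.hovered else "entered"

panel.on_click = on_panel_click
panel.on_enter = on_panel_enter
panel.on_exit = lambda x, y, button, action: setattr(label, "text", "left panel")

ui.append(panel)
ui.append(label)
mcrfpy.setScene("stub_demo")
```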

View file

@@ -0,0 +1,808 @@
#!/usr/bin/env python3
"""Focus System Demo for McRogueFace
Demonstrates a Python-level focus management system using engine primitives.
This shows how game developers can implement keyboard navigation without
requiring C++ engine changes.
Features demonstrated:
- Click-to-focus
- Tab/Shift+Tab cycling
- Visual focus indicators
- Keyboard routing to focused widget
- Modal focus stack
- Three widget types: Grid (WASD), TextInput, MenuIcon
Issue: #143
"""
import mcrfpy
import sys
# =============================================================================
# Modifier Key Tracker (workaround until #160 is implemented)
# =============================================================================
class ModifierTracker:
"""Tracks modifier key state since engine doesn't expose this yet."""
def __init__(self):
self.shift = False
self.ctrl = False
self.alt = False
def update(self, key: str, action: str):
"""Call this from your key handler to update modifier state."""
if key in ("LShift", "RShift"):
self.shift = (action == "start")
elif key in ("LControl", "RControl"):
self.ctrl = (action == "start")
elif key in ("LAlt", "RAlt"):
self.alt = (action == "start")
# =============================================================================
# Focus Manager
# =============================================================================
class FocusManager:
"""Central focus coordinator for a scene.
Manages which widget receives keyboard input, handles tab cycling,
and maintains a modal stack for popup dialogs.
"""
# Focus indicator colors
FOCUS_COLOR = mcrfpy.Color(0, 150, 255) # Blue
UNFOCUS_COLOR = mcrfpy.Color(80, 80, 80) # Dark gray
FOCUS_OUTLINE = 3.0
UNFOCUS_OUTLINE = 1.0
def __init__(self):
self.widgets = [] # List of (widget, focusable: bool)
self.focus_index = -1 # Currently focused widget index
self.modal_stack = [] # Stack of (modal_frame, previous_focus_index)
self.modifiers = ModifierTracker()
def register(self, widget, focusable: bool = True):
"""Add a widget to the focus order.
Args:
widget: Object implementing on_focus(), on_blur(), handle_key()
focusable: Whether this widget can receive focus via Tab
"""
self.widgets.append((widget, focusable))
# Give widget a reference back to us for click-to-focus
widget._focus_manager = self
widget._focus_index = len(self.widgets) - 1
def focus(self, widget_or_index):
"""Set focus to a specific widget."""
# Resolve to index
if isinstance(widget_or_index, int):
new_index = widget_or_index
else:
new_index = next(
(i for i, (w, _) in enumerate(self.widgets) if w is widget_or_index),
-1
)
if new_index < 0 or new_index >= len(self.widgets):
return
# Blur old widget
if 0 <= self.focus_index < len(self.widgets):
old_widget, _ = self.widgets[self.focus_index]
if hasattr(old_widget, 'on_blur'):
old_widget.on_blur()
# Focus new widget
self.focus_index = new_index
new_widget, _ = self.widgets[new_index]
if hasattr(new_widget, 'on_focus'):
new_widget.on_focus()
def cycle(self, direction: int = 1):
"""Cycle focus to next/previous focusable widget.
Args:
direction: 1 for next (Tab), -1 for previous (Shift+Tab)
"""
if not self.widgets:
return
start = self.focus_index if self.focus_index >= 0 else 0
current = start
for _ in range(len(self.widgets)):
current = (current + direction) % len(self.widgets)
widget, focusable = self.widgets[current]
if focusable:
self.focus(current)
return
# No focusable widget found, stay where we are
def push_modal(self, modal_frame, first_focus_widget=None):
"""Push a modal onto the focus stack.
Args:
modal_frame: The Frame to show as modal
first_focus_widget: Widget to focus inside modal (optional)
"""
# Save current focus
self.modal_stack.append((modal_frame, self.focus_index))
# Show modal
modal_frame.visible = True
# Focus first widget in modal if specified
if first_focus_widget is not None:
self.focus(first_focus_widget)
def pop_modal(self):
"""Pop the top modal and restore previous focus."""
if not self.modal_stack:
return False
modal_frame, previous_focus = self.modal_stack.pop()
modal_frame.visible = False
# Restore focus
if previous_focus >= 0:
self.focus(previous_focus)
return True
def handle_key(self, key: str, action: str) -> bool:
"""Main key handler - route to focused widget or handle global keys.
Returns True if key was consumed.
"""
# Always update modifier state
self.modifiers.update(key, action)
# Only process on key press, not release (key repeat sends multiple "start")
if action != "start":
return False
# Global: Escape closes modals
if key == "Escape":
if self.pop_modal():
return True
# Global: Tab cycles focus
if key == "Tab":
direction = -1 if self.modifiers.shift else 1
self.cycle(direction)
return True
# Route to focused widget
if 0 <= self.focus_index < len(self.widgets):
widget, _ = self.widgets[self.focus_index]
if hasattr(widget, 'handle_key'):
if widget.handle_key(key, action):
return True
return False
# =============================================================================
# Focusable Widgets
# =============================================================================
class FocusableGrid:
"""A grid where WASD keys move a player entity.
Demonstrates focus on a game-world element.
"""
def __init__(self, x: float, y: float, grid_w: int, grid_h: int,
tile_size: int = 16, zoom: float = 2.0):
self.grid_w = grid_w
self.grid_h = grid_h
self.tile_size = tile_size
self.zoom = zoom
self.base_x = x
self.base_y = y
# Calculate pixel dimensions
self.cell_px = tile_size * zoom # Pixels per cell
grid_pixel_w = grid_w * self.cell_px
grid_pixel_h = grid_h * self.cell_px
# Create the grid background
self.grid = mcrfpy.Grid(
pos=(x, y),
grid_size=(grid_w, grid_h),
size=(grid_pixel_w, grid_pixel_h)
)
self.grid.zoom = zoom
self.grid.fill_color = mcrfpy.Color(40, 40, 55)
# Add outline frame for focus indication
self.outline_frame = mcrfpy.Frame(
pos=(x - 2, y - 2),
size=(grid_pixel_w + 4, grid_pixel_h + 4),
fill_color=mcrfpy.Color(0, 0, 0, 0),
outline_color=FocusManager.UNFOCUS_COLOR,
outline=FocusManager.UNFOCUS_OUTLINE
)
# Player marker (a bright square overlay)
self.player_x = grid_w // 2
self.player_y = grid_h // 2
marker_size = self.cell_px - 4 # Slightly smaller than cell
self.player_marker = mcrfpy.Frame(
pos=(0, 0), # Will be positioned by _update_player_display
size=(marker_size, marker_size),
fill_color=mcrfpy.Color(255, 200, 50),
outline_color=mcrfpy.Color(255, 150, 0),
outline=2
)
self._update_player_display()
# Click handler
self.grid.on_click = self._on_click
# Focus manager reference (set by FocusManager.register)
self._focus_manager = None
self._focus_index = -1
def _on_click(self, x, y, button, action):
"""Handle click to focus this grid."""
if self._focus_manager and action == "start":
self._focus_manager.focus(self._focus_index)
def _update_player_display(self):
"""Update the visual representation of player position."""
# Position the player marker
px = self.base_x + (self.player_x * self.cell_px) + 2
py = self.base_y + (self.player_y * self.cell_px) + 2
self.player_marker.x = px
self.player_marker.y = py
def on_focus(self):
"""Called when this widget gains focus."""
self.outline_frame.outline_color = FocusManager.FOCUS_COLOR
self.outline_frame.outline = FocusManager.FOCUS_OUTLINE
def on_blur(self):
"""Called when this widget loses focus."""
self.outline_frame.outline_color = FocusManager.UNFOCUS_COLOR
self.outline_frame.outline = FocusManager.UNFOCUS_OUTLINE
def handle_key(self, key: str, action: str) -> bool:
"""Handle WASD movement."""
moves = {
"W": (0, -1), "Up": (0, -1),
"A": (-1, 0), "Left": (-1, 0),
"S": (0, 1), "Down": (0, 1),
"D": (1, 0), "Right": (1, 0),
}
if key in moves:
dx, dy = moves[key]
new_x = self.player_x + dx
new_y = self.player_y + dy
# Bounds check
if 0 <= new_x < self.grid_w and 0 <= new_y < self.grid_h:
self.player_x = new_x
self.player_y = new_y
self._update_player_display()
return True
return False
def add_to_scene(self, ui):
"""Add all components to a scene's UI collection."""
ui.append(self.outline_frame)
ui.append(self.grid)
ui.append(self.player_marker)
class TextInputWidget:
"""A text input field with cursor and editing.
Demonstrates text entry with focus indication.
"""
def __init__(self, x: float, y: float, width: float, label: str = "",
placeholder: str = ""):
self.x = x
self.y = y
self.width = width
self.height = 28
self.label_text = label
self.placeholder_text = placeholder
# State
self.text = ""
self.cursor_pos = 0
self.focused = False
# Create UI elements
self._create_ui()
# Focus manager reference
self._focus_manager = None
self._focus_index = -1
def _create_ui(self):
"""Create the visual components."""
# Label above input
if self.label_text:
self.label = mcrfpy.Caption(
text=self.label_text,
pos=(self.x, self.y - 20)
)
self.label.fill_color = mcrfpy.Color(200, 200, 200)
# Input background
self.frame = mcrfpy.Frame(
pos=(self.x, self.y),
size=(self.width, self.height),
fill_color=mcrfpy.Color(40, 40, 50),
outline_color=FocusManager.UNFOCUS_COLOR,
outline=FocusManager.UNFOCUS_OUTLINE
)
self.frame.on_click = self._on_click
# Placeholder text
self.placeholder = mcrfpy.Caption(
text=self.placeholder_text,
pos=(self.x + 6, self.y + 5)
)
self.placeholder.fill_color = mcrfpy.Color(100, 100, 100)
# Actual text display
self.display = mcrfpy.Caption(
text="",
pos=(self.x + 6, self.y + 5)
)
self.display.fill_color = mcrfpy.Color(255, 255, 255)
# Cursor (thin frame)
self.cursor = mcrfpy.Frame(
pos=(self.x + 6, self.y + 4),
size=(2, self.height - 8),
fill_color=mcrfpy.Color(255, 255, 255)
)
self.cursor.visible = False
def _on_click(self, x, y, button, action):
"""Handle click to focus."""
if self._focus_manager and action == "start":
self._focus_manager.focus(self._focus_index)
def _update_display(self):
"""Update visual state."""
self.display.text = self.text
self.placeholder.visible = (not self.text and not self.focused)
self._update_cursor()
def _update_cursor(self):
"""Update cursor position."""
# Approximate character width (monospace assumption)
char_width = 10
self.cursor.x = self.x + 6 + (self.cursor_pos * char_width)
def on_focus(self):
"""Called when gaining focus."""
self.focused = True
self.frame.outline_color = FocusManager.FOCUS_COLOR
self.frame.outline = FocusManager.FOCUS_OUTLINE
self.cursor.visible = True
self._update_display()
def on_blur(self):
"""Called when losing focus."""
self.focused = False
self.frame.outline_color = FocusManager.UNFOCUS_COLOR
self.frame.outline = FocusManager.UNFOCUS_OUTLINE
self.cursor.visible = False
self._update_display()
def handle_key(self, key: str, action: str) -> bool:
"""Handle text input and editing keys."""
if not self.focused:
return False
old_text = self.text
handled = True
if key == "BackSpace":
if self.cursor_pos > 0:
self.text = self.text[:self.cursor_pos-1] + self.text[self.cursor_pos:]
self.cursor_pos -= 1
elif key == "Delete":
if self.cursor_pos < len(self.text):
self.text = self.text[:self.cursor_pos] + self.text[self.cursor_pos+1:]
elif key == "Left":
self.cursor_pos = max(0, self.cursor_pos - 1)
elif key == "Right":
self.cursor_pos = min(len(self.text), self.cursor_pos + 1)
elif key == "Home":
self.cursor_pos = 0
elif key == "End":
self.cursor_pos = len(self.text)
elif key in ("Return", "Tab"):
# Don't consume - let focus manager handle
handled = False
elif len(key) == 1 and key.isprintable():
# Insert character
self.text = self.text[:self.cursor_pos] + key + self.text[self.cursor_pos:]
self.cursor_pos += 1
else:
handled = False
self._update_display()
return handled
def get_text(self) -> str:
"""Get the current text value."""
return self.text
def set_text(self, text: str):
"""Set the text value."""
self.text = text
self.cursor_pos = len(text)
self._update_display()
def add_to_scene(self, ui):
"""Add all components to the scene."""
if hasattr(self, 'label'):
ui.append(self.label)
ui.append(self.frame)
ui.append(self.placeholder)
ui.append(self.display)
ui.append(self.cursor)
class MenuIcon:
"""An icon that opens a modal dialog when activated.
Demonstrates activation via Space/Enter and modal focus.
"""
def __init__(self, x: float, y: float, size: float, icon_char: str,
tooltip: str, modal_content_builder=None):
self.x = x
self.y = y
self.size = size
self.tooltip = tooltip
self.modal_content_builder = modal_content_builder
self.modal = None
# Create icon frame
self.frame = mcrfpy.Frame(
pos=(x, y),
size=(size, size),
fill_color=mcrfpy.Color(60, 60, 80),
outline_color=FocusManager.UNFOCUS_COLOR,
outline=FocusManager.UNFOCUS_OUTLINE
)
self.frame.on_click = self._on_click
# Icon character (centered)
self.icon = mcrfpy.Caption(
text=icon_char,
pos=(x + size//3, y + size//6)
)
self.icon.fill_color = mcrfpy.Color(200, 200, 220)
# Tooltip (shown on hover/focus)
self.tooltip_caption = mcrfpy.Caption(
text=tooltip,
pos=(x, y + size + 4)
)
self.tooltip_caption.fill_color = mcrfpy.Color(150, 150, 150)
self.tooltip_caption.visible = False
# Focus manager reference
self._focus_manager = None
self._focus_index = -1
def _on_click(self, x, y, button, action):
"""Handle click to focus or activate."""
if not self._focus_manager:
return
if action == "start":
# If already focused, activate; otherwise just focus
if self._focus_manager.focus_index == self._focus_index:
self._activate()
else:
self._focus_manager.focus(self._focus_index)
def _activate(self):
"""Open the modal dialog."""
if self.modal and self._focus_manager:
self._focus_manager.push_modal(self.modal)
def on_focus(self):
"""Called when gaining focus."""
self.frame.outline_color = FocusManager.FOCUS_COLOR
self.frame.outline = FocusManager.FOCUS_OUTLINE
self.frame.fill_color = mcrfpy.Color(80, 80, 110)
self.tooltip_caption.visible = True
def on_blur(self):
"""Called when losing focus."""
self.frame.outline_color = FocusManager.UNFOCUS_COLOR
self.frame.outline = FocusManager.UNFOCUS_OUTLINE
self.frame.fill_color = mcrfpy.Color(60, 60, 80)
self.tooltip_caption.visible = False
def handle_key(self, key: str, action: str) -> bool:
"""Handle activation keys."""
if key in ("Space", "Return"):
self._activate()
return True
return False
def set_modal(self, modal_frame):
"""Set the modal frame this icon opens."""
self.modal = modal_frame
def add_to_scene(self, ui):
"""Add all components to the scene."""
ui.append(self.frame)
ui.append(self.icon)
ui.append(self.tooltip_caption)
# =============================================================================
# Modal Dialog Builder
# =============================================================================
def create_modal(x: float, y: float, width: float, height: float,
title: str) -> mcrfpy.Frame:
"""Create a modal dialog frame."""
# Note: simplified - a real implementation might draw a semi-transparent
# fullscreen backdrop behind the modal frame
# Modal frame
modal = mcrfpy.Frame(
pos=(x, y),
size=(width, height),
fill_color=mcrfpy.Color(40, 40, 50),
outline_color=mcrfpy.Color(100, 100, 120),
outline=2
)
modal.visible = False
# Title
title_caption = mcrfpy.Caption(
text=title,
pos=(x + 10, y + 8)
)
title_caption.fill_color = mcrfpy.Color(220, 220, 240)
modal.children.append(title_caption)
# Close hint
close_hint = mcrfpy.Caption(
text="[Esc to close]",
pos=(x + width - 100, y + 8)
)
close_hint.fill_color = mcrfpy.Color(120, 120, 140)
modal.children.append(close_hint)
return modal
# =============================================================================
# Demo Scene Setup
# =============================================================================
def create_demo_scene():
"""Create and populate the focus system demo scene."""
# Create scene
mcrfpy.createScene("focus_demo")
ui = mcrfpy.sceneUI("focus_demo")
# Background
bg = mcrfpy.Frame(
pos=(0, 0),
size=(1024, 768),
fill_color=mcrfpy.Color(25, 25, 35)
)
ui.append(bg)
# Title
title = mcrfpy.Caption(
text="Focus System Demo",
pos=(20, 15)
)
title.fill_color = mcrfpy.Color(255, 255, 255)
ui.append(title)
# Instructions
instructions = mcrfpy.Caption(
text="Tab: cycle focus | Shift+Tab: reverse | WASD: move in grid | Space/Enter: activate | Esc: close modal",
pos=(20, 45)
)
instructions.fill_color = mcrfpy.Color(150, 150, 170)
ui.append(instructions)
# Create focus manager
focus_mgr = FocusManager()
# --- Grid Section ---
grid_label = mcrfpy.Caption(text="Game Grid (WASD to move)", pos=(50, 90))
grid_label.fill_color = mcrfpy.Color(180, 180, 200)
ui.append(grid_label)
grid_widget = FocusableGrid(50, 115, 10, 8, tile_size=16, zoom=2.0)
grid_widget.add_to_scene(ui)
focus_mgr.register(grid_widget)
# --- Text Inputs Section ---
input_label = mcrfpy.Caption(text="Text Inputs", pos=(400, 90))
input_label.fill_color = mcrfpy.Color(180, 180, 200)
ui.append(input_label)
name_input = TextInputWidget(400, 130, 250, label="Name:", placeholder="Enter your name")
name_input.add_to_scene(ui)
focus_mgr.register(name_input)
class_input = TextInputWidget(400, 200, 250, label="Class:", placeholder="e.g. Warrior, Mage")
class_input.add_to_scene(ui)
focus_mgr.register(class_input)
notes_input = TextInputWidget(400, 270, 350, label="Notes:", placeholder="Additional notes...")
notes_input.add_to_scene(ui)
focus_mgr.register(notes_input)
# --- Menu Icons Section ---
icons_label = mcrfpy.Caption(text="Menu Icons", pos=(50, 390))
icons_label.fill_color = mcrfpy.Color(180, 180, 200)
ui.append(icons_label)
# Help icon
help_icon = MenuIcon(50, 420, 48, "?", "Help")
help_icon.add_to_scene(ui)
focus_mgr.register(help_icon)
help_modal = create_modal(200, 150, 400, 300, "Help")
ui.append(help_modal)
help_text = mcrfpy.Caption(
text="This demo shows focus management.\n\nUse Tab to move between widgets.\nWASD moves the player in the grid.\nType in text fields.\nPress Space on icons to open dialogs.",
pos=(210, 190)
)
help_text.fill_color = mcrfpy.Color(200, 200, 200)
help_modal.children.append(help_text)
help_icon.set_modal(help_modal)
# Settings icon
settings_icon = MenuIcon(110, 420, 48, "S", "Settings")
settings_icon.add_to_scene(ui)
focus_mgr.register(settings_icon)
settings_modal = create_modal(200, 150, 400, 250, "Settings")
ui.append(settings_modal)
settings_text = mcrfpy.Caption(
text="Settings would go here.\n\n(This is a placeholder modal)",
pos=(210, 190)
)
settings_text.fill_color = mcrfpy.Color(200, 200, 200)
settings_modal.children.append(settings_text)
settings_icon.set_modal(settings_modal)
# Inventory icon
inv_icon = MenuIcon(170, 420, 48, "I", "Inventory")
inv_icon.add_to_scene(ui)
focus_mgr.register(inv_icon)
inv_modal = create_modal(200, 150, 400, 300, "Inventory")
ui.append(inv_modal)
inv_text = mcrfpy.Caption(
text="Your inventory:\n\n- Sword\n- Shield\n- 3x Potions",
pos=(210, 190)
)
inv_text.fill_color = mcrfpy.Color(200, 200, 200)
inv_modal.children.append(inv_text)
inv_icon.set_modal(inv_modal)
# --- Status Display ---
status_frame = mcrfpy.Frame(
pos=(50, 520),
size=(700, 80),
fill_color=mcrfpy.Color(35, 35, 45),
outline_color=mcrfpy.Color(60, 60, 70),
outline=1
)
ui.append(status_frame)
status_label = mcrfpy.Caption(text="Status", pos=(60, 530))
status_label.fill_color = mcrfpy.Color(150, 150, 170)
ui.append(status_label)
status_text = mcrfpy.Caption(text="Click or Tab to focus a widget", pos=(60, 555))
status_text.fill_color = mcrfpy.Color(200, 200, 200)
ui.append(status_text)
# Store references for status updates
demo_state = {
'focus_mgr': focus_mgr,
'status_text': status_text,
'grid': grid_widget,
'inputs': [name_input, class_input, notes_input],
'icons': [help_icon, settings_icon, inv_icon],
}
# Key handler that routes to focus manager
def on_key(key: str, action: str):
focus_mgr.handle_key(key, action)
# Update status display
if focus_mgr.focus_index >= 0:
widget, _ = focus_mgr.widgets[focus_mgr.focus_index]
if widget is grid_widget:
status_text.text = f"Grid focused - Player at ({grid_widget.player_x}, {grid_widget.player_y})"
elif widget in demo_state['inputs']:
idx = demo_state['inputs'].index(widget)
labels = ["Name", "Class", "Notes"]
status_text.text = f"{labels[idx]} input focused - Text: '{widget.get_text()}'"
elif widget in demo_state['icons']:
status_text.text = f"Icon focused: {widget.tooltip}"
else:
status_text.text = "No widget focused"
# Activate scene first (keypressScene sets handler for CURRENT scene)
mcrfpy.setScene("focus_demo")
# Register key handler for the now-current scene
mcrfpy.keypressScene(on_key)
# Set initial focus
focus_mgr.focus(0)
return demo_state
# =============================================================================
# Entry Point
# =============================================================================
def run_demo():
"""Run the focus system demo."""
print("=== Focus System Demo ===")
print("Demonstrating Python-level focus management")
print()
print("Controls:")
print(" Tab / Shift+Tab - Cycle between widgets")
print(" WASD / Arrows - Move player in grid (when focused)")
print(" Type - Enter text in inputs (when focused)")
print(" Space / Enter - Activate icons (when focused)")
print(" Escape - Close modal dialogs")
print(" Click - Focus clicked widget")
print()
demo_state = create_demo_scene()
# Placeholder exit timer for headless testing (check_exit is a no-op and the
# timer registration below is commented out)
def check_exit(dt):
# In headless mode, exit after a short delay
# In interactive mode, this won't trigger
pass
# mcrfpy.setTimer("demo_check", check_exit, 100)
# Run if executed directly
if __name__ == "__main__":
import sys
from mcrfpy import automation
run_demo()
# If --screenshot (or any other extra argument) is passed, take a screenshot and exit
if "--screenshot" in sys.argv or len(sys.argv) > 1:
def take_screenshot(dt):
automation.screenshot("focus_demo_screenshot.png")
print("Screenshot saved: focus_demo_screenshot.png")
sys.exit(0)
mcrfpy.setTimer("screenshot", take_screenshot, 200)

View file

@@ -14,12 +14,15 @@ Three agents:
Each agent gets their own screenshot and VLLM query.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import sys
import requests
import base64
import os
import random
from action_parser import parse_action

View file

@@ -0,0 +1,436 @@
#!/usr/bin/env python3
"""
Enhanced Action Demo
====================
Demonstrates the enhanced action economy system:
- Free actions (LOOK, SPEAK/ANNOUNCE) vs turn-ending (MOVE, WAIT)
- Points of interest targeting for LOOK/MOVE
- Speech system with room-wide ANNOUNCE and proximity SPEAK
- Multi-tile path continuation with FOV interrupts
- Enhanced logging for offline viewer replay
This implements the turn-based LLM agent orchestration from issue #156.
"""
import sys
import os
# Add the vllm_demo directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mcrfpy
from mcrfpy import automation
import requests
import base64
from world_graph import (
WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
create_two_room_scenario, create_button_door_scenario
)
from action_parser import parse_action
from enhanced_executor import EnhancedExecutor
from enhanced_orchestrator import EnhancedOrchestrator, EnhancedSimulationLog
# Configuration
VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
SCREENSHOT_DIR = "/tmp/vllm_enhanced_demo"
LOG_PATH = "/tmp/vllm_enhanced_demo/simulation_log.json"
MAX_TURNS = 3
# Sprites
FLOOR_TILE = 0
WALL_TILE = 40
WIZARD_SPRITE = 84
KNIGHT_SPRITE = 96
RAT_SPRITE = 123
class Agent:
"""Agent with WorldGraph integration."""
def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
self.name = name
self.display_name = display_name
self.entity = entity
self.world = world
self.message_history = []
@property
def pos(self) -> tuple:
return (int(self.entity.pos[0]), int(self.entity.pos[1]))
@property
def current_room(self) -> str:
room = self.world.room_at(*self.pos)
return room.name if room else None
def get_context(self, visible_agents: list) -> dict:
"""Build context for LLM query."""
room_name = self.current_room
agent_infos = [
AgentInfo(
name=a.name,
display_name=a.display_name,
position=a.pos,
is_player=(a.name == self.name)
)
for a in visible_agents
]
return {
"location": self.world.describe_room(room_name, agent_infos, self.name),
"available_actions": self.world.get_available_actions(room_name),
"recent_messages": self.message_history[-5:],
}
def file_to_base64(path: str) -> str:
"""Convert file to base64 string."""
with open(path, 'rb') as f:
return base64.b64encode(f.read()).decode('utf-8')
def llm_query(agent, screenshot_path: str, context: dict) -> str:
"""
Query VLLM for agent action with enhanced context.
Includes points of interest, action economy hints, error feedback,
and conversation history.
"""
system_prompt = f"""You are {agent.display_name} exploring a dungeon.
You receive visual and text information about your surroundings.
ACTION ECONOMY:
- LOOK <target>: Free action. Examine something, then choose another action.
- SPEAK "<message>" or ANNOUNCE "<message>": Free action (once per turn). Then choose another action.
- GO <direction>: Ends your turn. Move one tile in that direction (NORTH/SOUTH/EAST/WEST).
- TAKE <item>: Ends your turn. Pick up an item you are standing next to.
- WAIT: Ends your turn without moving.
IMPORTANT: You can only TAKE items that are adjacent to you (1 tile away). If something is far away, GO towards it first.
You can LOOK or SPEAK, then still MOVE in the same turn.
Always end your final response with: Action: <YOUR_ACTION>"""
# Build enhanced prompt
parts = [context["location"]]
# Add received messages
if context.get("messages"):
parts.append("\nMessages received this turn:")
for msg in context["messages"]:
sender = msg.get("sender", "someone")
content = msg.get("content", "")
parts.append(f' {sender} says: "{content}"')
# Add points of interest
if context.get("poi_prompt"):
parts.append(f"\n{context['poi_prompt']}")
# Add available actions
actions_str = ", ".join(context.get("available_actions", []))
parts.append(f"\nAvailable actions: {actions_str}")
# Add action economy hint
if context.get("has_spoken"):
parts.append("\n[You have already spoken this turn - you can still MOVE or WAIT]")
# Add error feedback from last failed action
if context.get("last_error"):
parts.append(f"\n[ERROR: {context['last_error']}]")
parts.append("[Your last action failed. Please try a different action.]")
# Add conversation history from this turn
if context.get("conversation_history"):
parts.append("\n[Previous attempts this turn:")
for exch in context["conversation_history"]:
action_str = f"{exch.get('action_type', '?')} {exch.get('action_args', '')}"
if exch.get("error"):
parts.append(f" - You tried: {action_str} -> FAILED: {exch['error']}")
else:
parts.append(f" - You did: {action_str}")
parts.append("]")
parts.append("\n[Screenshot attached showing your current view]")
parts.append("\nWhat do you do? Brief reasoning (1-2 sentences), then Action: <action>")
user_prompt = "\n".join(parts)
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": [
{"type": "text", "text": user_prompt},
{"type": "image_url", "image_url": {
"url": "data:image/png;base64," + file_to_base64(screenshot_path)
}}
]
}
]
try:
resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
data = resp.json()
if "error" in data:
return f"[VLLM Error: {data['error']}]"
return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
except Exception as e:
return f"[Connection Error: {e}]"
def setup_scene(world: WorldGraph):
"""Create McRogueFace scene from WorldGraph."""
mcrfpy.createScene("enhanced_demo")
mcrfpy.setScene("enhanced_demo")
ui = mcrfpy.sceneUI("enhanced_demo")
texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
grid = mcrfpy.Grid(
grid_size=(25, 15),
texture=texture,
pos=(5, 5),
size=(1014, 700)
)
grid.fill_color = mcrfpy.Color(20, 20, 30)
grid.zoom = 2.0
ui.append(grid)
# Initialize all as walls
for x in range(25):
for y in range(15):
p = grid.at(x, y)
p.tilesprite = WALL_TILE
p.walkable = False
p.transparent = False
# Carve rooms from WorldGraph
for room in world.rooms.values():
for rx in range(room.x, room.x + room.width):
for ry in range(room.y, room.y + room.height):
if 0 <= rx < 25 and 0 <= ry < 15:
p = grid.at(rx, ry)
p.tilesprite = FLOOR_TILE
p.walkable = True
p.transparent = True
# Place doors
for door in world.doors:
dx, dy = door.position
if 0 <= dx < 25 and 0 <= dy < 15:
p = grid.at(dx, dy)
p.tilesprite = FLOOR_TILE
p.walkable = not door.locked
p.transparent = True
# FOV layer
fov_layer = grid.add_layer('color', z_index=10)
fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
return grid, fov_layer, texture
def create_agents(grid, world: WorldGraph, texture) -> list:
"""Create agents in their starting rooms."""
agents = []
# Wizard in guard_room (left)
room_a = world.rooms["guard_room"]
wizard = mcrfpy.Entity(
grid_pos=room_a.center,
texture=texture,
sprite_index=WIZARD_SPRITE
)
wizard.name = "wizard"
grid.entities.append(wizard)
agents.append(Agent("Wizard", "a wizard", wizard, world))
# Knight in armory (right)
room_b = world.rooms["armory"]
knight = mcrfpy.Entity(
grid_pos=room_b.center,
texture=texture,
sprite_index=KNIGHT_SPRITE
)
knight.name = "knight"
grid.entities.append(knight)
agents.append(Agent("Knight", "a knight", knight, world))
return agents
def add_rat(grid, world: WorldGraph, texture, position: tuple):
"""Add a rat entity at the specified position."""
rat = mcrfpy.Entity(
grid_pos=position,
texture=texture,
sprite_index=RAT_SPRITE
)
rat.name = "rat"
grid.entities.append(rat)
return rat
def run_demo():
"""Run enhanced action demo."""
print("=" * 70)
print("Enhanced Action Demo")
print("=" * 70)
print("""
Features demonstrated:
- LOOK as free action (doesn't end turn)
- SPEAK/ANNOUNCE as free action (once per turn)
- Points of interest targeting
- Enhanced logging for offline viewer
""")
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# Create world
print("Creating world...")
world = create_two_room_scenario()
print(f" Rooms: {list(world.rooms.keys())}")
print(f" Objects: {list(world.objects.keys())}")
# Setup scene
print("\nSetting up scene...")
grid, fov_layer, texture = setup_scene(world)
# Create agents
print("\nCreating agents...")
agents = create_agents(grid, world, texture)
# Add a rat near the door for interest
rat = add_rat(grid, world, texture, (9, 4))
print(f" Added rat at (9, 4)")
for agent in agents:
print(f" {agent.name} at {agent.pos} in {agent.current_room}")
# Create enhanced orchestrator
print("\nInitializing enhanced orchestrator...")
orchestrator = EnhancedOrchestrator(
grid=grid,
fov_layer=fov_layer,
world=world,
agents=agents,
screenshot_dir=SCREENSHOT_DIR,
llm_query_fn=llm_query
)
# Run simulation
print(f"\nRunning simulation ({MAX_TURNS} turns)...")
log = orchestrator.run_simulation(max_turns=MAX_TURNS)
# Save enhanced log
log.save(LOG_PATH)
# Print summary
print("\n" + "=" * 70)
print("SIMULATION SUMMARY")
print("=" * 70)
for turn in range(1, orchestrator.turn_number + 1):
print(log.get_turn_summary(turn))
# Print speech log
if log.speech_log:
print("\n" + "-" * 40)
print("SPEECH LOG")
print("-" * 40)
for entry in log.speech_log:
print(f" Turn {entry['turn']}: {entry['speaker']} {entry['type']}s: \"{entry['content'][:50]}...\"")
if entry['recipients']:
print(f" -> Heard by: {', '.join(entry['recipients'])}")
print("\n" + "=" * 70)
print("Demo Complete")
print("=" * 70)
print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/")
print(f"Simulation log saved to: {LOG_PATH}")
print("\nLog structure (for offline viewer):")
print(" - metadata: simulation info")
print(" - steps[]: per-agent-turn records with:")
print(" - screenshot_path, position, room")
print(" - llm_prompt_user, llm_response")
print(" - free_actions[] (LOOK, SPEAK)")
print(" - final_action (MOVE, WAIT)")
print(" - speech_log[]: all speech events")
return True
def replay_log(log_path: str):
"""
Replay a simulation from a log file.
This is a text-based preview of what the offline viewer would show.
"""
print(f"Loading simulation from: {log_path}")
try:
log = EnhancedSimulationLog.load(log_path)
except FileNotFoundError:
print(f"Log file not found: {log_path}")
return
print("\n" + "=" * 70)
print("SIMULATION REPLAY")
print("=" * 70)
print(f"Turns: {log.metadata.get('total_turns', '?')}")
print(f"Agents: {', '.join(log.metadata.get('agent_names', []))}")
print(f"Rooms: {', '.join(log.metadata.get('world_rooms', []))}")
for step in log.steps:
print(f"\n{'='*40}")
print(f"Turn {step.turn}: {step.agent_id}")
print(f"{'='*40}")
print(f"Position: {step.position_start} -> {step.position_end}")
print(f"Room: {step.room}")
if step.pending_messages:
print(f"\nMessages received:")
for msg in step.pending_messages:
print(f" {msg.get('sender')}: \"{msg.get('content', '')[:40]}...\"")
if step.llm_was_queried:
print(f"\nLLM Response (truncated):")
print(f" {step.llm_response[:200]}...")
else:
print(f"\n[Path continuation - no LLM query]")
if step.free_actions:
print(f"\nFree actions:")
for fa in step.free_actions:
print(f" - {fa['action_type']}: {fa.get('args', ())}")
status = "OK" if step.final_action_success else "FAIL"
print(f"\nFinal: {step.final_action_type} {step.final_action_args} [{status}]")
print(f" {step.final_action_message}")
# Speech summary
if log.speech_log:
print("\n" + "=" * 40)
print("ALL SPEECH")
print("=" * 40)
for entry in log.speech_log:
print(f"Turn {entry['turn']}: {entry['speaker']} -> {entry['recipients']}")
print(f" \"{entry['content']}\"")
if __name__ == "__main__":
# Check for replay mode
if len(sys.argv) > 1 and sys.argv[1] == "--replay":
log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH
replay_log(log_file)
sys.exit(0)
# Normal execution
try:
success = run_demo()
print("\nPASS" if success else "\nFAIL")
sys.exit(0 if success else 1)
except Exception as e:
import traceback
traceback.print_exc()
sys.exit(1)

View file

@@ -0,0 +1,152 @@
# Offline Viewer Specification
**Status**: Planned (issue #154)
**Priority**: After core simulation features are stable
## Overview
The Offline Viewer allows users to replay stored simulation logs in McRogueFace, stepping through them turn by turn to review:
- Each agent's perspective (FOV, camera position)
- LLM chain-of-thought reasoning
- Actions taken and their results
- Speech between agents
## Log Format
Simulation logs are stored as JSON with this structure:
```json
{
"metadata": {
"total_turns": 5,
"num_agents": 2,
"agent_names": ["Wizard", "Knight"],
"timestamp_start": "2025-01-15T10:30:00",
"timestamp_end": "2025-01-15T10:32:45",
"world_rooms": ["guard_room", "armory"],
"screenshot_dir": "/tmp/vllm_enhanced_demo"
},
"steps": [
{
"turn": 1,
"agent_id": "Wizard",
"timestamp": "2025-01-15T10:30:15",
"position_start": [5, 4],
"position_end": [6, 4],
"room": "guard_room",
"visible_entities": ["rat_123", "knight_456"],
"visible_tiles": 42,
"points_of_interest": [
{"name": "door", "direction": "east", "distance": 4}
],
"location_description": "You are in the guard room...",
"available_actions": ["GO EAST", "LOOK", "WAIT"],
"pending_messages": [],
"poi_prompt": "Points of interest:\n - a door to the armory (east)",
"screenshot_path": "/tmp/.../turn1_wizard.png",
"llm_prompt_system": "You are a wizard...",
"llm_prompt_user": "You are in the guard room...",
"llm_response": "I see a door to the east. I should explore. Action: GO EAST",
"llm_was_queried": true,
"free_actions": [
{"action_type": "LOOK", "args": ["DOOR"], "result": {"description": "A wooden door..."}}
],
"final_action_type": "GO",
"final_action_args": ["EAST"],
"final_action_success": true,
"final_action_message": "Moved east to (6, 4)",
"path_taken": [[5, 4], [6, 4]],
"path_remaining": 0
}
],
"speech_log": [
{
"turn": 2,
"speaker": "Wizard",
"type": "announce",
"content": "Hello, is anyone there?",
"recipients": ["Knight"]
}
]
}
```
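A viewer (or any analysis script) can consume this format with the standard library alone. A minimal loading sketch, assuming the default log path from `4_enhanced_action_demo.py` and using only the fields documented above:

```python
import json
from collections import defaultdict

def load_log(path="/tmp/vllm_enhanced_demo/simulation_log.json"):
    """Load a simulation log and index its steps by turn number."""
    with open(path) as f:
        log = json.load(f)
    steps_by_turn = defaultdict(list)
    for step in log["steps"]:
        steps_by_turn[step["turn"]].append(step)
    return log, steps_by_turn

log, steps_by_turn = load_log()
meta = log["metadata"]
print(f"{meta['total_turns']} turns, agents: {', '.join(meta['agent_names'])}")
for turn in sorted(steps_by_turn):
    for step in steps_by_turn[turn]:
        status = "OK" if step["final_action_success"] else "FAIL"
        print(f"  turn {turn}: {step['agent_id']} "
              f"{step['final_action_type']} {step['final_action_args']} [{status}]")
```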
## Viewer Features (Planned)
### Core Features
1. **Turn Navigation**
- Step forward/backward through turns
- Jump to specific turn number
- Auto-play at configurable speed
2. **Agent Perspective**
- Reconstruct agent's FOV from stored data
- Center camera on current agent
- Show visible entities and tiles
3. **LLM Review Panel**
- Display system prompt
- Display user prompt (context)
- Display LLM response
- Highlight parsed action
4. **Action Log**
- Show free actions (LOOK, SPEAK)
- Show final action and result
- Color-code success/failure
5. **Speech History**
- Timeline of all speech events
- Filter by agent
- Show recipients
### Implementation Notes
The viewer should:
- Load screenshots from `screenshot_path` (if available)
- OR reconstruct scene from WorldGraph + step data
- Support keyboard navigation (arrow keys; see the sketch after this list)
- Display agent state in sidebar
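A rough sketch of that keyboard navigation, assuming the `load_log()` helper sketched under Log Format and reusing engine calls the demos already exercise (`createScene`, `sceneUI`, `Caption`, `keypressScene`, `setScene`); widget layout and names are placeholders, and screenshot/FOV reconstruction is left out:

```python
import mcrfpy

log, steps_by_turn = load_log()   # from the Log Format sketch above
turns = sorted(steps_by_turn)
state = {"index": 0}

mcrfpy.createScene("viewer")
ui = mcrfpy.sceneUI("viewer")
header = mcrfpy.Caption(text="", pos=(20, 15))
detail = mcrfpy.Caption(text="", pos=(20, 45))
ui.append(header)
ui.append(detail)

def show_turn():
    turn = turns[state["index"]]
    header.text = f"Turn {turn}/{turns[-1]}"
    lines = []
    for step in steps_by_turn[turn]:
        lines.append(f"{step['agent_id']}: {step['final_action_type']} "
                     f"{step['final_action_args']} -> {step['final_action_message']}")
    detail.text = "\n".join(lines)

def on_key(key, action):
    if action != "start":
        return
    if key == "Right" and state["index"] < len(turns) - 1:
        state["index"] += 1
        show_turn()
    elif key == "Left" and state["index"] > 0:
        state["index"] -= 1
        show_turn()

mcrfpy.setScene("viewer")   # set the scene first, then register the handler (#143 fix)
mcrfpy.keypressScene(on_key)
show_turn()
```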
### UI Layout (Suggested)
```
+----------------------------------+------------------+
| | Turn: 3/10 |
| Main Viewport | Agent: Wizard |
| (Agent's Perspective) | Room: armory |
| +------------------+
| | LLM Response: |
| | "I see a rat |
| | to the east. |
| | Action: LOOK |
| | AT RAT" |
+----------------------------------+------------------+
| < Prev | Turn 3 | Next > | Actions: |
| [Agent: Wizard v] | - LOOK AT RAT |
| | - GO EAST [OK] |
+----------------------------------+------------------+
```
## Files
- `enhanced_orchestrator.py` - Generates `EnhancedSimulationLog`
- `4_enhanced_action_demo.py` - Demo with `--replay` mode for text preview
- Logs stored in `/tmp/vllm_enhanced_demo/simulation_log.json`
## Future Enhancements
- Animated path replay (smooth entity movement)
- Side-by-side multi-agent view
- Diff view comparing agent perceptions
- Export to video/GIF
- Integration with annotation tools for research

View file

@@ -0,0 +1,302 @@
"""
Action Economy System
=====================
Defines which actions consume turns and which are free.
Manages multi-tile pathing with FOV interruption.
Action Categories:
- FREE: LOOK, SPEAK, ANNOUNCE (don't end turn)
- FULL: MOVE, WAIT (end turn)
Constraints:
- Only ONE speech action per turn
- LOOK provides description and prompts for another action
- Multi-tile paths continue without LLM until FOV changes
"""
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Set, Dict, Any
from enum import Enum
from action_parser import Action, ActionType
class TurnCost(Enum):
"""How much of a turn an action consumes."""
FREE = "free" # Doesn't end turn
FULL = "full" # Ends turn
# Action cost mapping
ACTION_COSTS = {
ActionType.LOOK: TurnCost.FREE,
ActionType.SPEAK: TurnCost.FREE,
ActionType.ANNOUNCE: TurnCost.FREE,
ActionType.GO: TurnCost.FULL,
ActionType.WAIT: TurnCost.FULL,
ActionType.TAKE: TurnCost.FULL,
ActionType.DROP: TurnCost.FULL,
ActionType.PUSH: TurnCost.FULL,
ActionType.USE: TurnCost.FULL,
ActionType.OPEN: TurnCost.FULL,
ActionType.CLOSE: TurnCost.FULL,
ActionType.INVALID: TurnCost.FULL, # Invalid action ends turn
}
@dataclass
class TurnState:
"""
Tracks state within a single turn.
Used to enforce constraints like "only one speech per turn"
and to track free actions taken before the turn-ending action.
"""
has_spoken: bool = False
free_actions: List[Dict[str, Any]] = field(default_factory=list)
turn_ended: bool = False
def can_speak(self) -> bool:
"""Check if agent can still speak this turn."""
return not self.has_spoken
def record_speech(self):
"""Record that agent has spoken this turn."""
self.has_spoken = True
def record_free_action(self, action_type: str, details: Dict[str, Any]):
"""Record a free action for logging."""
self.free_actions.append({
"type": action_type,
**details
})
def end_turn(self):
"""Mark turn as ended."""
self.turn_ended = True
@dataclass
class PathState:
"""
Tracks multi-tile movement path for an agent.
When an agent decides to move to a distant location,
we store the path and continue moving without LLM calls
until the path completes or FOV changes.
"""
path: List[Tuple[int, int]] = field(default_factory=list)
current_index: int = 0
destination_description: str = "" # "the armory", "the door"
# FOV state when path was planned
visible_entities_at_start: Set[str] = field(default_factory=set)
@property
def has_path(self) -> bool:
"""Check if there's an active path."""
return len(self.path) > self.current_index
@property
def next_tile(self) -> Optional[Tuple[int, int]]:
"""Get next tile in path, or None if path complete."""
if self.has_path:
return self.path[self.current_index]
return None
@property
def remaining_tiles(self) -> int:
"""Number of tiles left in path."""
return max(0, len(self.path) - self.current_index)
def advance(self):
"""Move to next tile in path."""
if self.has_path:
self.current_index += 1
def clear(self):
"""Clear the current path."""
self.path = []
self.current_index = 0
self.destination_description = ""
self.visible_entities_at_start = set()
def should_interrupt(self, current_visible_entities: Set[str]) -> bool:
"""
Check if path should be interrupted due to FOV change.
Returns True if a NEW entity has entered the agent's FOV
since the path was planned.
"""
new_entities = current_visible_entities - self.visible_entities_at_start
return len(new_entities) > 0
@dataclass
class PointOfInterest:
"""
A targetable object/location for LOOK/MOVE actions.
Listed in LLM prompts to guide valid targeting.
"""
name: str # Short name: "door", "rat", "button"
display_name: str # Full description: "a wooden door to the east"
position: Tuple[int, int] # Tile coordinates
direction: str # Cardinal direction from agent: "north", "east"
distance: int # Manhattan distance from agent
can_look: bool = True # Can be examined with LOOK
can_move_to: bool = False # Can be targeted with GO TO
entity_id: Optional[str] = None # Entity ID if this is an entity
def get_action_cost(action: Action) -> TurnCost:
"""Get the turn cost for an action."""
return ACTION_COSTS.get(action.type, TurnCost.FULL)
def get_direction_name(from_pos: Tuple[int, int], to_pos: Tuple[int, int]) -> str:
"""Get cardinal direction name from one position to another."""
dx = to_pos[0] - from_pos[0]
dy = to_pos[1] - from_pos[1]
if abs(dx) > abs(dy):
return "east" if dx > 0 else "west"
elif abs(dy) > abs(dx):
return "south" if dy > 0 else "north"
else:
# Diagonal
ns = "south" if dy > 0 else "north"
ew = "east" if dx > 0 else "west"
return f"{ns}-{ew}"
def manhattan_distance(a: Tuple[int, int], b: Tuple[int, int]) -> int:
"""Calculate Manhattan distance between two points."""
return abs(a[0] - b[0]) + abs(a[1] - b[1])
class PointOfInterestCollector:
"""
Collects points of interest visible to an agent.
Used to populate LLM prompts with valid LOOK/MOVE targets.
"""
def __init__(self, grid, agent_pos: Tuple[int, int]):
self.grid = grid
self.agent_pos = agent_pos
self.points: List[PointOfInterest] = []
def collect_from_fov(self, world_graph=None) -> List[PointOfInterest]:
"""
Collect all points of interest visible in current FOV.
Examines:
- Entities (other agents, NPCs, items)
- Doors/exits
- Interactive objects (buttons, chests)
- Notable tiles (walls with features)
"""
self.points = []
# Collect entities
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == self.agent_pos:
continue # Skip self
if self.grid.is_in_fov(ex, ey):
direction = get_direction_name(self.agent_pos, (ex, ey))
distance = manhattan_distance(self.agent_pos, (ex, ey))
# Try to get entity name/description
entity_name = getattr(entity, 'name', None) or "creature"
entity_id = getattr(entity, 'id', None) or str(id(entity))
self.points.append(PointOfInterest(
name=entity_name,
display_name=f"a {entity_name} to the {direction}",
position=(ex, ey),
direction=direction,
distance=distance,
can_look=True,
can_move_to=False, # Can't move onto entities
entity_id=entity_id
))
# Collect from WorldGraph if provided
if world_graph:
self._collect_from_world_graph(world_graph)
# Sort by distance
self.points.sort(key=lambda p: p.distance)
return self.points
def _collect_from_world_graph(self, world):
"""Collect doors and objects from WorldGraph."""
agent_room = world.room_at(*self.agent_pos)
if not agent_room:
return
# Doors
for door in world.get_exits(agent_room.name):
dx, dy = door.position
if self.grid.is_in_fov(dx, dy):
direction = get_direction_name(self.agent_pos, (dx, dy))
distance = manhattan_distance(self.agent_pos, (dx, dy))
# Get destination room name
if door.room_a == agent_room.name:
dest = world.rooms.get(door.room_b)
else:
dest = world.rooms.get(door.room_a)
dest_name = dest.display_name if dest else "unknown"
lock_str = " (locked)" if door.locked else ""
self.points.append(PointOfInterest(
name="door",
display_name=f"a door to {dest_name}{lock_str} ({direction})",
position=(dx, dy),
direction=direction,
distance=distance,
can_look=True,
can_move_to=not door.locked
))
# Objects in room
for obj in world.get_objects_in_room(agent_room.name):
ox, oy = obj.position
if self.grid.is_in_fov(ox, oy):
direction = get_direction_name(self.agent_pos, (ox, oy))
distance = manhattan_distance(self.agent_pos, (ox, oy))
self.points.append(PointOfInterest(
name=obj.name,
display_name=f"{obj.display_name} ({direction})",
position=(ox, oy),
direction=direction,
distance=distance,
can_look=True,
can_move_to="pressable" not in obj.affordances # Can walk to items
))
def format_for_prompt(self) -> str:
"""Format points of interest for inclusion in LLM prompt."""
if not self.points:
return "No notable objects in view."
lines = ["Points of interest:"]
for poi in self.points:
actions = []
if poi.can_look:
actions.append(f"LOOK AT {poi.name.upper()}")
if poi.can_move_to:
actions.append(f"GO TO {poi.name.upper()}")
action_str = ", ".join(actions) if actions else "observe only"
lines.append(f" - {poi.display_name}: {action_str}")
return "\n".join(lines)

View file

@@ -0,0 +1,731 @@
"""
Enhanced Action Executor
========================
Extends ActionExecutor with:
- LOOK action with detailed descriptions
- SPEAK/ANNOUNCE execution with range checking
- Multi-tile path planning
- Free action vs turn-ending action handling
"""
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict, Any, Set
from action_parser import Action, ActionType
from action_executor import ActionResult
from action_economy import (
TurnState, PathState, TurnCost, get_action_cost,
manhattan_distance, get_direction_name
)
@dataclass
class TakeResult:
"""Result of a TAKE action."""
success: bool
message: str
item_name: str
item_position: Optional[Tuple[int, int]] = None
@dataclass
class LookResult:
"""Result of a LOOK action."""
success: bool
description: str
target_name: str
target_position: Optional[Tuple[int, int]] = None
@dataclass
class SpeechResult:
"""Result of a SPEAK/ANNOUNCE action."""
success: bool
message: str
recipients: List[str] # Names of agents who received the message
speech_type: str # "announce" or "speak"
content: str # What was said
@dataclass
class Message:
"""A message received by an agent."""
sender: str
content: str
speech_type: str # "announce" or "speak"
turn: int
distance: Optional[int] = None # For SPEAK, how far away sender was
class EnhancedExecutor:
"""
Enhanced action executor with LOOK, SPEAK, and multi-tile support.
"""
# Direction vectors for movement
DIRECTION_VECTORS = {
'NORTH': (0, -1),
'SOUTH': (0, 1),
'EAST': (1, 0),
'WEST': (-1, 0),
}
# SPEAK range (Manhattan distance)
SPEAK_RANGE = 4
def __init__(self, grid, world_graph=None):
"""
Initialize executor.
Args:
grid: mcrfpy.Grid instance
world_graph: Optional WorldGraph for detailed descriptions
"""
self.grid = grid
self.world = world_graph
# Agent path states (agent_name -> PathState)
self.path_states: Dict[str, PathState] = {}
# Speech channel for message delivery
self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages
def get_path_state(self, agent_name: str) -> PathState:
"""Get or create path state for an agent."""
if agent_name not in self.path_states:
self.path_states[agent_name] = PathState()
return self.path_states[agent_name]
def get_pending_messages(self, agent_name: str) -> List[Message]:
"""Get and clear pending messages for an agent."""
messages = self.pending_messages.get(agent_name, [])
self.pending_messages[agent_name] = []
return messages
# =========================================================================
# LOOK Action
# =========================================================================
def execute_look(self, agent, action: Action) -> LookResult:
"""
Execute LOOK action - examine a tile or entity.
Args:
agent: Agent performing the look
action: Parsed LOOK action with optional target
Returns:
LookResult with detailed description
"""
target = action.args[0] if action.args and action.args[0] else None
if target is None:
# General look around
return self._look_around(agent)
else:
# Look at specific target
return self._look_at_target(agent, target.upper())
def _look_around(self, agent) -> LookResult:
"""Describe the general surroundings."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
descriptions = []
# Describe current room
if self.world:
room = self.world.room_at(ax, ay)
if room:
descriptions.append(f"You are in {room.display_name}.")
if room.description_template and room.properties:
try:
desc = room.description_template.format(**room.properties)
descriptions.append(desc)
except KeyError:
pass
# Count visible entities
visible_count = 0
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey):
visible_count += 1
if visible_count > 0:
descriptions.append(f"You can see {visible_count} other creature(s) nearby.")
# Describe nearby walls/openings
wall_dirs = []
open_dirs = []
for direction, (dx, dy) in self.DIRECTION_VECTORS.items():
nx, ny = ax + dx, ay + dy
if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]:
cell = self.grid.at(nx, ny)
if cell.walkable:
open_dirs.append(direction.lower())
else:
wall_dirs.append(direction.lower())
if open_dirs:
descriptions.append(f"Open passages: {', '.join(open_dirs)}.")
if wall_dirs:
descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name="surroundings"
)
def _look_at_target(self, agent, target: str) -> LookResult:
"""Look at a specific target (direction, entity, or object name)."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
# Check if target is a direction
if target in self.DIRECTION_VECTORS:
return self._look_in_direction(agent, target)
# Check if target matches an entity
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == (ax, ay):
continue
entity_name = getattr(entity, 'name', '').upper()
# Guard against unnamed entities: an empty string would match any target via `in`
if entity_name and (target in entity_name or entity_name in target):
if self.grid.is_in_fov(ex, ey):
return self._describe_entity(agent, entity)
else:
return LookResult(
success=False,
description=f"You cannot see {target.lower()} from here.",
target_name=target.lower()
)
# Check WorldGraph objects
if self.world:
room = self.world.room_at(ax, ay)
if room:
for obj in self.world.get_objects_in_room(room.name):
if target in obj.name.upper() or obj.name.upper() in target:
ox, oy = obj.position
if self.grid.is_in_fov(ox, oy):
return self._describe_object(agent, obj)
# Check doors
for door in self.world.get_exits(room.name):
if "DOOR" in target:
dx, dy = door.position
if self.grid.is_in_fov(dx, dy):
return self._describe_door(agent, door)
return LookResult(
success=False,
description=f"You don't see anything called '{target.lower()}' nearby.",
target_name=target.lower()
)
def _look_in_direction(self, agent, direction: str) -> LookResult:
"""Look in a cardinal direction."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
dx, dy = self.DIRECTION_VECTORS[direction]
descriptions = []
# Scan tiles in that direction
for distance in range(1, 10):
tx, ty = ax + dx * distance, ay + dy * distance
if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]):
descriptions.append(f"The edge of the known world lies {direction.lower()}.")
break
if not self.grid.is_in_fov(tx, ty):
descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.")
break
cell = self.grid.at(tx, ty)
# Check for entity at this tile
for entity in self.grid.entities:
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if (ex, ey) == (tx, ty):
entity_name = getattr(entity, 'name', 'creature')
descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.")
# Check for wall
if not cell.walkable:
    # Check if it's a door; otherwise report a plain wall (single message, no duplication)
    door_desc = None
    if self.world:
        room = self.world.room_at(ax, ay)
        if room:
            for door in self.world.get_exits(room.name):
                if door.position == (tx, ty):
                    dest = self.world.rooms.get(
                        door.room_b if door.room_a == room.name else door.room_a
                    )
                    dest_name = dest.display_name if dest else "another area"
                    lock_str = " It is locked." if door.locked else ""
                    door_desc = f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}"
                    break
    descriptions.append(
        door_desc or f"A wall blocks passage {distance} tile(s) to the {direction.lower()}."
    )
    break
if not descriptions:
descriptions.append(f"Open floor extends to the {direction.lower()}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=direction.lower(),
target_position=None
)
def _describe_entity(self, agent, entity) -> LookResult:
"""Generate detailed description of an entity."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
ex, ey = int(entity.pos[0]), int(entity.pos[1])
entity_name = getattr(entity, 'name', 'creature')
direction = get_direction_name((ax, ay), (ex, ey))
distance = manhattan_distance((ax, ay), (ex, ey))
descriptions = [
f"You examine the {entity_name} carefully.",
f"It stands {distance} tile(s) to the {direction}."
]
# Add any entity-specific description
if hasattr(entity, 'description'):
descriptions.append(entity.description)
# Add behavior hints if available
if hasattr(entity, 'behavior'):
descriptions.append(f"It appears to be {entity.behavior}.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=entity_name,
target_position=(ex, ey)
)
def _describe_object(self, agent, obj) -> LookResult:
"""Generate detailed description of a WorldGraph object."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
ox, oy = obj.position
direction = get_direction_name((ax, ay), (ox, oy))
distance = manhattan_distance((ax, ay), (ox, oy))
descriptions = [
f"You examine {obj.display_name}.",
f"It is {distance} tile(s) to the {direction}."
]
if obj.description:
descriptions.append(obj.description)
# Describe affordances
if "takeable" in obj.affordances:
descriptions.append("It looks small enough to pick up.")
if "pressable" in obj.affordances:
descriptions.append("It appears to be some kind of mechanism.")
if "openable" in obj.affordances:
descriptions.append("It can be opened.")
if "readable" in obj.affordances:
descriptions.append("There is writing on it.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name=obj.name,
target_position=(ox, oy)
)
def _describe_door(self, agent, door) -> LookResult:
"""Generate detailed description of a door."""
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
dx, dy = door.position
direction = get_direction_name((ax, ay), (dx, dy))
distance = manhattan_distance((ax, ay), (dx, dy))
# Get destination
if self.world:
current_room = self.world.room_at(ax, ay)
if current_room:
if door.room_a == current_room.name:
dest = self.world.rooms.get(door.room_b)
else:
dest = self.world.rooms.get(door.room_a)
dest_name = dest.display_name if dest else "another area"
else:
dest_name = "another area"
else:
dest_name = "another area"
descriptions = [
f"You examine the doorway to the {direction}.",
f"It leads to {dest_name}, {distance} tile(s) away."
]
if door.locked:
descriptions.append("The door is locked. You'll need a key or mechanism to open it.")
else:
descriptions.append("The passage is open.")
return LookResult(
success=True,
description=" ".join(descriptions),
target_name="door",
target_position=(dx, dy)
)
# =========================================================================
# SPEAK/ANNOUNCE Actions
# =========================================================================
def execute_speech(self, agent, action: Action, all_agents: list,
turn_number: int) -> SpeechResult:
"""
Execute SPEAK or ANNOUNCE action.
ANNOUNCE: All agents in the same room hear the message
SPEAK: Only agents within SPEAK_RANGE tiles hear the message
"""
message_content = action.args[0] if action.args else ""
if not message_content:
return SpeechResult(
success=False,
message="Nothing to say.",
recipients=[],
speech_type=action.type.value.lower(),
content=""
)
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
recipients = []
if action.type == ActionType.ANNOUNCE:
# Room-wide broadcast
recipients = self._get_agents_in_room(agent, all_agents)
speech_type = "announce"
else:
# Proximity-based speech
recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE)
speech_type = "speak"
# Deliver messages
for recipient in recipients:
if recipient.name not in self.pending_messages:
self.pending_messages[recipient.name] = []
distance = manhattan_distance(
(ax, ay),
(int(recipient.entity.pos[0]), int(recipient.entity.pos[1]))
) if speech_type == "speak" else None
self.pending_messages[recipient.name].append(Message(
sender=agent.name,
content=message_content,
speech_type=speech_type,
turn=turn_number,
distance=distance
))
recipient_names = [r.name for r in recipients]
if recipients:
return SpeechResult(
success=True,
message=f"You {speech_type}: \"{message_content}\"",
recipients=recipient_names,
speech_type=speech_type,
content=message_content
)
else:
return SpeechResult(
success=True, # Still succeeds, just nobody heard
message=f"You {speech_type} into the emptiness: \"{message_content}\"",
recipients=[],
speech_type=speech_type,
content=message_content
)
def _get_agents_in_room(self, speaker, all_agents: list) -> list:
"""Get all agents in the same room as speaker (excluding speaker)."""
if not self.world:
# Fallback: use proximity
return self._get_agents_in_range(speaker, all_agents, 20)
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
speaker_room = self.world.room_at(ax, ay)
if not speaker_room:
return []
recipients = []
for agent in all_agents:
if agent.name == speaker.name:
continue
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
agent_room = self.world.room_at(rx, ry)
if agent_room and agent_room.name == speaker_room.name:
recipients.append(agent)
return recipients
def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list:
"""Get all agents within Manhattan distance of speaker."""
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
recipients = []
for agent in all_agents:
if agent.name == speaker.name:
continue
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles:
recipients.append(agent)
return recipients
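# Editor's illustrative sketch (not part of this commit): the two speech modes.
# SPEAK reaches agents within SPEAK_RANGE tiles (Manhattan distance), ANNOUNCE
# reaches everyone in the speaker's WorldGraph room. `executor`, `alice` and
# `agents` are assumed to exist; the quoted argument format is assumed.
def sketch_speech(executor, alice, agents, turn_number):
    from action_parser import parse_action
    whisper = parse_action('Action: SPEAK "Meet me by the door"')
    shout = parse_action('Action: ANNOUNCE "I found the lever"')
    near = executor.execute_speech(alice, whisper, agents, turn_number)
    room_wide = executor.execute_speech(alice, shout, agents, turn_number)
    print(near.recipients, room_wide.recipients)   # names of agents who heard each
    # Listeners receive the text on their next turn via the pending-message queue:
    for listener in near.recipients:
        for msg in executor.get_pending_messages(listener):
            print(f"{msg.sender} ({msg.speech_type}, turn {msg.turn}): {msg.content}")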
# =========================================================================
# TAKE Action
# =========================================================================
def execute_take(self, agent, action: Action) -> TakeResult:
"""
Execute TAKE action - pick up an item.
Items must be:
1. In the WorldGraph as a takeable object
2. Within reach (adjacent tile or same tile, distance <= 1)
3. Visible in FOV
"""
item_name = action.args[0].lower() if action.args and action.args[0] else None
if not item_name:
return TakeResult(
success=False,
message="Take what? Specify an item name.",
item_name=""
)
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
# Search for the item in WorldGraph
if not self.world:
return TakeResult(
success=False,
message="No items exist in this world.",
item_name=item_name
)
# Find matching object
matching_obj = None
matching_key = None
for obj_name, obj in self.world.objects.items():
    if item_name in obj_name.lower() or obj_name.lower() in item_name:
        matching_obj = obj
        matching_key = obj_name  # remember the dict key so removal is exact
        break
if not matching_obj:
return TakeResult(
success=False,
message=f"You don't see any '{item_name}' here.",
item_name=item_name
)
# Check if takeable
if "takeable" not in matching_obj.affordances:
return TakeResult(
success=False,
message=f"The {matching_obj.display_name} cannot be picked up.",
item_name=item_name,
item_position=matching_obj.position
)
ox, oy = matching_obj.position
# Check if visible in FOV
if not self.grid.is_in_fov(ox, oy):
return TakeResult(
success=False,
message=f"You can't see the {matching_obj.display_name} from here.",
item_name=item_name,
item_position=(ox, oy)
)
# Check distance (must be adjacent or same tile)
distance = manhattan_distance((ax, ay), (ox, oy))
if distance > 1:
direction = get_direction_name((ax, ay), (ox, oy))
# Use name for cleaner message (display_name often has article already)
return TakeResult(
success=False,
message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.",
item_name=item_name,
item_position=(ox, oy)
)
# Success! Remove from world by its dictionary key (simplified - no inventory system yet)
del self.world.objects[matching_key]
return TakeResult(
success=True,
message=f"You pick up {matching_obj.display_name}.",
item_name=matching_obj.name,
item_position=(ox, oy)
)
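# Editor's illustrative sketch (not part of this commit): the TAKE contract.
# A distant or out-of-sight item fails with a directional hint; that message is
# what the orchestrator feeds back to the LLM as `last_error`. `executor` and
# `agent` are assumed; "Action: TAKE key" follows the format quoted in the
# error-feedback text.
def sketch_take(executor, agent):
    from action_parser import parse_action
    result = executor.execute_take(agent, parse_action("Action: TAKE key"))
    if not result.success:
        # e.g. "The brass key is 3 tiles away to the east. Move closer to pick it up."
        print(result.message)
    return result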
# =========================================================================
# Movement (single tile, delegates to original executor)
# =========================================================================
def execute_move(self, agent, action: Action) -> ActionResult:
"""
Execute single-tile movement.
This is the per-turn movement. Multi-tile paths are handled
at the orchestrator level.
"""
if not action.args or not action.args[0]:
return ActionResult(False, "No direction specified")
direction = action.args[0]
if direction not in self.DIRECTION_VECTORS:
return ActionResult(False, f"Invalid direction: {direction}")
dx, dy = self.DIRECTION_VECTORS[direction]
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
new_x, new_y = current_x + dx, current_y + dy
# Bounds check
grid_w, grid_h = self.grid.grid_size
if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
return ActionResult(False, f"Cannot go {direction} - edge of map")
# Walkability check
target_cell = self.grid.at(new_x, new_y)
if not target_cell.walkable:
return ActionResult(False, f"Cannot go {direction} - path blocked")
# Entity collision check
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if ex == new_x and ey == new_y:
return ActionResult(False, f"Cannot go {direction} - occupied")
# Execute movement
agent.entity.pos = (new_x, new_y)
return ActionResult(
success=True,
message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
new_position=(new_x, new_y),
path=[(current_x, current_y), (new_x, new_y)]
)
def execute_wait(self, agent, action: Action) -> ActionResult:
"""Execute WAIT action."""
return ActionResult(True, "Waited and observed surroundings")
# =========================================================================
# Multi-tile Pathfinding
# =========================================================================
def plan_path_to(self, agent, target_pos: Tuple[int, int],
visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]:
"""
Plan a path to a target position.
Uses A* via libtcod if available; returns None if no route exists or libtcod is unavailable.
Returns list of tiles from current position to target (excluding current).
"""
try:
from mcrfpy import libtcod
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1])
if path:
# Store path state
path_state = self.get_path_state(agent.name)
path_state.path = path
path_state.current_index = 0
path_state.visible_entities_at_start = visible_entities.copy()
return path
except ImportError:
pass
return None
def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]:
"""
Continue an existing multi-tile path.
Returns ActionResult if moved, None if path complete or interrupted.
"""
path_state = self.get_path_state(agent.name)
if not path_state.has_path:
return None
# Check for FOV interrupt
if path_state.should_interrupt(current_visible):
path_state.clear()
return None # Signal that LLM should be queried
# Get next tile
next_tile = path_state.next_tile
if not next_tile:
path_state.clear()
return None
# Move to next tile
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
new_x, new_y = next_tile
# Verify still walkable
target_cell = self.grid.at(new_x, new_y)
if not target_cell.walkable:
path_state.clear()
return ActionResult(False, "Path blocked - recalculating")
# Check for entity collision
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if ex == new_x and ey == new_y:
path_state.clear()
return ActionResult(False, "Path blocked by creature")
# Execute movement
agent.entity.pos = (new_x, new_y)
path_state.advance()
remaining = path_state.remaining_tiles
if remaining > 0:
msg = f"Continuing path ({remaining} tiles remaining)"
else:
msg = "Arrived at destination"
path_state.clear()
return ActionResult(
success=True,
message=msg,
new_position=(new_x, new_y),
path=[(current_x, current_y), (new_x, new_y)]
)
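# Editor's illustrative sketch (not part of this commit): driving a multi-tile
# path over several calls. plan_path_to() records the entities visible when the
# path was planned; continue_path() advances one tile per call and returns None
# (after clearing the path) when a new entity enters FOV, signalling that the
# LLM should be queried again. `executor`, `agent` and `get_visible_ids` are
# assumed to exist.
def sketch_walk_to(executor, agent, target, get_visible_ids):
    visible = get_visible_ids(agent)                        # set of entity ids in FOV
    if executor.plan_path_to(agent, target, visible) is None:
        return                                              # no route, or libtcod missing
    while executor.get_path_state(agent.name).has_path:
        step = executor.continue_path(agent, get_visible_ids(agent))
        if step is None:                                    # FOV interrupt or path exhausted
            break
        print(step.message)                                 # "Continuing path (N tiles remaining)"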

View file

@ -0,0 +1,606 @@
"""
Enhanced Turn Orchestrator
==========================
Extends TurnOrchestrator with:
- Action economy (free actions vs turn-ending)
- Multi-tile path continuation
- FOV interrupt detection
- Enhanced logging for offline viewer replay
"""
import json
import os
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Callable, Set
from datetime import datetime
from world_graph import WorldGraph, AgentInfo
from action_parser import Action, ActionType, parse_action
from action_executor import ActionResult
from action_economy import (
TurnState, PathState, TurnCost, get_action_cost,
PointOfInterestCollector, PointOfInterest
)
from enhanced_executor import EnhancedExecutor, LookResult, SpeechResult, Message, TakeResult
@dataclass
class FreeActionRecord:
"""Record of a free action taken during a turn."""
action_type: str
args: tuple
result: Dict[str, Any]
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@dataclass
class EnhancedSimulationStep:
"""
Enhanced simulation step for offline viewer replay.
Contains all data needed to reconstruct the agent's perspective
and decision-making for that turn.
"""
# Turn identification
turn: int
agent_id: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
# Agent state at start of turn
position_start: tuple = (0, 0)
room: str = ""
path_in_progress: bool = False
# FOV and perception
visible_entities: List[str] = field(default_factory=list)
visible_tiles: int = 0 # Count of visible tiles
points_of_interest: List[Dict] = field(default_factory=list)
# Context provided to LLM
location_description: str = ""
available_actions: List[str] = field(default_factory=list)
pending_messages: List[Dict] = field(default_factory=list)
poi_prompt: str = ""
# Screenshot path (for viewer to load)
screenshot_path: str = ""
# LLM interaction
llm_prompt_system: str = ""
llm_prompt_user: str = ""
llm_response: str = ""
llm_was_queried: bool = True # False if path continuation
# Conversation history (LLM queries within this turn)
llm_exchanges: List[Dict] = field(default_factory=list) # [{prompt, response, action, error}]
action_retries: int = 0 # How many times we re-prompted due to errors
# Free actions taken (LOOK, SPEAK)
free_actions: List[Dict] = field(default_factory=list)
# Turn-ending action
final_action_type: str = ""
final_action_args: tuple = ()
final_action_success: bool = False
final_action_message: str = ""
# Movement result
position_end: tuple = (0, 0)
path_taken: List[tuple] = field(default_factory=list)
path_remaining: int = 0 # Tiles left if multi-tile path
@dataclass
class EnhancedSimulationLog:
"""
Complete simulation log for offline viewer.
Designed to support:
- Turn-by-turn replay
- Per-agent perspective reconstruction
- LLM chain-of-thought review
- Speech history tracking
"""
metadata: Dict[str, Any] = field(default_factory=dict)
steps: List[EnhancedSimulationStep] = field(default_factory=list)
speech_log: List[Dict] = field(default_factory=list)
def save(self, path: str):
"""Save log to JSON file."""
data = {
"metadata": self.metadata,
"steps": [asdict(s) for s in self.steps],
"speech_log": self.speech_log
}
with open(path, 'w') as f:
json.dump(data, f, indent=2, default=str)
print(f"Enhanced simulation log saved to: {path}")
@classmethod
def load(cls, path: str) -> 'EnhancedSimulationLog':
"""Load log from JSON file."""
with open(path) as f:
data = json.load(f)
steps = []
for s in data.get("steps", []):
# Convert lists back to tuples where needed
if isinstance(s.get("position_start"), list):
s["position_start"] = tuple(s["position_start"])
if isinstance(s.get("position_end"), list):
s["position_end"] = tuple(s["position_end"])
if isinstance(s.get("final_action_args"), list):
s["final_action_args"] = tuple(s["final_action_args"])
if s.get("path_taken"):
s["path_taken"] = [tuple(p) for p in s["path_taken"]]
steps.append(EnhancedSimulationStep(**s))
return cls(
metadata=data.get("metadata", {}),
steps=steps,
speech_log=data.get("speech_log", [])
)
def get_turn_summary(self, turn: int) -> str:
"""Get summary of a specific turn for display."""
turn_steps = [s for s in self.steps if s.turn == turn]
lines = [f"=== Turn {turn} ==="]
for step in turn_steps:
lines.append(f"\n{step.agent_id}:")
lines.append(f" Position: {step.position_start} -> {step.position_end}")
if step.free_actions:
lines.append(f" Free actions: {len(step.free_actions)}")
for fa in step.free_actions:
lines.append(f" - {fa['action_type']}: {fa.get('result', {}).get('message', '')[:50]}")
status = "OK" if step.final_action_success else "FAIL"
lines.append(f" Action: {step.final_action_type} {step.final_action_args} [{status}]")
if not step.llm_was_queried:
lines.append(" (Path continuation - no LLM query)")
return "\n".join(lines)
class EnhancedOrchestrator:
"""
Enhanced turn orchestrator with action economy and improved logging.
"""
def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
screenshot_dir: str, llm_query_fn: Callable):
"""
Initialize enhanced orchestrator.
Args:
grid: mcrfpy.Grid instance
fov_layer: Color layer for FOV rendering
world: WorldGraph instance
agents: List of Agent objects
screenshot_dir: Directory for screenshots
llm_query_fn: Function(agent, screenshot_path, context) -> str
"""
self.grid = grid
self.fov_layer = fov_layer
self.world = world
self.agents = agents
self.screenshot_dir = screenshot_dir
self.llm_query_fn = llm_query_fn
self.executor = EnhancedExecutor(grid, world)
self.turn_number = 0
self.steps: List[EnhancedSimulationStep] = []
self.speech_log: List[Dict] = []
os.makedirs(screenshot_dir, exist_ok=True)
def run_simulation(self, max_turns: int = 10,
stop_condition: Optional[Callable] = None) -> EnhancedSimulationLog:
"""
Run complete simulation with enhanced logging.
Args:
max_turns: Maximum number of turns
stop_condition: Optional callable(orchestrator) -> bool
Returns:
EnhancedSimulationLog for offline viewer
"""
print(f"\nStarting enhanced simulation: max {max_turns} turns")
print(f"Agents: {[a.name for a in self.agents]}")
print("=" * 60)
for turn in range(max_turns):
self.run_turn()
if stop_condition and stop_condition(self):
print(f"\nStop condition met at turn {self.turn_number}")
break
# Build log
log = EnhancedSimulationLog(
metadata={
"total_turns": self.turn_number,
"num_agents": len(self.agents),
"agent_names": [a.name for a in self.agents],
"timestamp_start": self.steps[0].timestamp if self.steps else "",
"timestamp_end": self.steps[-1].timestamp if self.steps else "",
"world_rooms": list(self.world.rooms.keys()),
"screenshot_dir": self.screenshot_dir,
},
steps=self.steps,
speech_log=self.speech_log
)
return log
def run_turn(self) -> List[EnhancedSimulationStep]:
"""Execute one full turn (all agents act once)."""
import mcrfpy
self.turn_number += 1
turn_steps = []
print(f"\n{'='*60}")
print(f"TURN {self.turn_number}")
print("=" * 60)
for agent in self.agents:
step = self._run_agent_turn(agent)
turn_steps.append(step)
self.steps.append(step)
return turn_steps
def _run_agent_turn(self, agent) -> EnhancedSimulationStep:
"""Execute one agent's turn with action economy."""
import mcrfpy
from mcrfpy import automation
print(f"\n--- {agent.name}'s Turn ---")
# Initialize step record
step = EnhancedSimulationStep(
turn=self.turn_number,
agent_id=agent.name,
position_start=agent.pos,
room=agent.current_room
)
# Check for path continuation
path_state = self.executor.get_path_state(agent.name)
current_visible = self._get_visible_entity_ids(agent)
if path_state.has_path:
# Check for FOV interrupt
if path_state.should_interrupt(current_visible):
print(f" Path interrupted: new entity in FOV")
path_state.clear()
else:
# Continue path without LLM query
result = self.executor.continue_path(agent, current_visible)
if result and result.success:
step.llm_was_queried = False
step.path_in_progress = True
step.final_action_type = "GO"
step.final_action_args = ("CONTINUE",)
step.final_action_success = True
step.final_action_message = result.message
step.position_end = result.new_position or agent.pos
step.path_taken = result.path or []
step.path_remaining = self.executor.get_path_state(agent.name).remaining_tiles
print(f" Path continuation: {result.message}")
return step
# Need LLM query - set up perspective
step.visible_entities = list(current_visible)
self._switch_perspective(agent)
mcrfpy.step(0.016)
# Take screenshot
screenshot_path = os.path.join(
self.screenshot_dir,
f"turn{self.turn_number}_{agent.name.lower()}.png"
)
automation.screenshot(screenshot_path)
step.screenshot_path = screenshot_path
# Collect points of interest
poi_collector = PointOfInterestCollector(self.grid, agent.pos)
pois = poi_collector.collect_from_fov(self.world)
step.points_of_interest = [asdict(p) for p in pois]
step.poi_prompt = poi_collector.format_for_prompt()
# Get pending messages
messages = self.executor.get_pending_messages(agent.name)
step.pending_messages = [asdict(m) for m in messages]
# Build context
visible_agents = self._get_visible_agents(agent)
context = agent.get_context(visible_agents + [agent])
step.location_description = context["location"]
step.available_actions = context["available_actions"]
# Turn state for action economy
turn_state = TurnState()
# Error feedback for retry loop
last_error = None
MAX_RETRIES = 3
# Action loop - handle free actions until turn-ending action
while not turn_state.turn_ended:
# Build prompt with current state (includes error feedback if any)
prompt = self._build_prompt(agent, context, step.poi_prompt, messages, turn_state, last_error)
step.llm_prompt_user = prompt # Store last prompt
# Query LLM
print(f" Querying LLM...")
response = self.llm_query_fn(agent, screenshot_path, {
**context,
"poi_prompt": step.poi_prompt,
"messages": [asdict(m) for m in messages],
"has_spoken": turn_state.has_spoken,
"last_error": last_error,
"conversation_history": step.llm_exchanges # Include past exchanges
})
step.llm_response = response
print(f" Response: {response[:200]}...")
# Parse action
action = parse_action(response)
cost = get_action_cost(action)
print(f" Action: {action.type.value} {action.args} (cost: {cost.value})")
# Track this exchange
exchange = {
"prompt": prompt[:500], # Truncate for storage
"response": response,
"action_type": action.type.value,
"action_args": action.args,
"error": None
}
# Execute action based on type
if action.type == ActionType.LOOK:
result = self.executor.execute_look(agent, action)
turn_state.record_free_action("LOOK", {
"target": result.target_name,
"description": result.description
})
step.free_actions.append({
"action_type": "LOOK",
"args": action.args,
"result": {"description": result.description}
})
# Provide result and continue loop for another action
context["look_result"] = result.description
last_error = None # Clear error on success
print(f" LOOK result: {result.description[:100]}...")
elif action.type in (ActionType.SPEAK, ActionType.ANNOUNCE):
if not turn_state.can_speak():
print(f" Already spoke this turn")
last_error = "You have already spoken this turn. Choose a different action."
exchange["error"] = last_error
step.action_retries += 1
if step.action_retries >= MAX_RETRIES:
# Force end turn
step.final_action_type = "WAIT"
step.final_action_args = ()
step.final_action_success = False
step.final_action_message = "Too many invalid actions - turn ended"
step.position_end = agent.pos
turn_state.end_turn()
else:
result = self.executor.execute_speech(
agent, action, self.agents, self.turn_number
)
turn_state.record_speech()
turn_state.record_free_action(action.type.value, {
"content": result.content,
"recipients": result.recipients
})
step.free_actions.append({
"action_type": action.type.value,
"args": action.args,
"result": {
"content": result.content,
"recipients": result.recipients
}
})
# Record in speech log
self.speech_log.append({
"turn": self.turn_number,
"speaker": agent.name,
"type": result.speech_type,
"content": result.content,
"recipients": result.recipients
})
last_error = None
print(f" {result.speech_type.upper()}: {result.content[:50]}... -> {result.recipients}")
# Continue loop for another action (can still move)
elif action.type == ActionType.TAKE:
result = self.executor.execute_take(agent, action)
if result.success:
step.final_action_type = "TAKE"
step.final_action_args = action.args
step.final_action_success = True
step.final_action_message = result.message
step.position_end = agent.pos
last_error = None
turn_state.end_turn()
print(f" TAKE: {result.message}")
else:
# Failed - give error feedback and let LLM try again
last_error = result.message
exchange["error"] = last_error
step.action_retries += 1
print(f" TAKE FAILED: {result.message}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "TAKE"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = result.message
step.position_end = agent.pos
turn_state.end_turn()
elif action.type == ActionType.GO:
result = self.executor.execute_move(agent, action)
if result.success:
step.final_action_type = "GO"
step.final_action_args = action.args
step.final_action_success = True
step.final_action_message = result.message
step.position_end = result.new_position or agent.pos
step.path_taken = result.path or []
last_error = None
turn_state.end_turn()
print(f" MOVE: {result.message}")
else:
# Failed - give error feedback
last_error = result.message
exchange["error"] = last_error
step.action_retries += 1
print(f" MOVE FAILED: {result.message}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "GO"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = result.message
step.position_end = agent.pos
turn_state.end_turn()
elif action.type == ActionType.WAIT:
result = self.executor.execute_wait(agent, action)
step.final_action_type = "WAIT"
step.final_action_args = ()
step.final_action_success = True
step.final_action_message = result.message
step.position_end = agent.pos
last_error = None
turn_state.end_turn()
print(f" WAIT")
elif action.type == ActionType.INVALID:
# Could not parse action - give feedback
last_error = f"Could not understand your action. Please use a valid action format like 'Action: GO EAST' or 'Action: TAKE key'."
exchange["error"] = last_error
step.action_retries += 1
print(f" INVALID ACTION: {action.args}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = "INVALID"
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = "Could not parse action"
step.position_end = agent.pos
turn_state.end_turn()
else:
# Unimplemented action type - give feedback
last_error = f"The action '{action.type.value}' is not yet supported. Try GO, TAKE, LOOK, SPEAK, or WAIT."
exchange["error"] = last_error
step.action_retries += 1
print(f" Unsupported: {action.type.value}")
if step.action_retries >= MAX_RETRIES:
step.final_action_type = action.type.value
step.final_action_args = action.args
step.final_action_success = False
step.final_action_message = f"Unsupported action: {action.type.value}"
step.position_end = agent.pos
turn_state.end_turn()
# Record exchange
step.llm_exchanges.append(exchange)
return step
def _build_prompt(self, agent, context: dict, poi_prompt: str,
messages: List[Message], turn_state: TurnState,
last_error: Optional[str] = None) -> str:
"""Build LLM prompt with current state and error feedback."""
parts = [context["location"]]
# Add messages received
if messages:
parts.append("\nMessages received:")
for msg in messages:
if msg.speech_type == "announce":
parts.append(f' {msg.sender} announces: "{msg.content}"')
else:
parts.append(f' {msg.sender} says: "{msg.content}"')
# Add points of interest
parts.append(f"\n{poi_prompt}")
# Add available actions
actions_str = ", ".join(context["available_actions"])
parts.append(f"\nAvailable actions: {actions_str}")
# Add LOOK result if we just looked
if "look_result" in context:
parts.append(f"\n[LOOK result: {context['look_result']}]")
# Add constraints
constraints = []
if turn_state.has_spoken:
constraints.append("You have already spoken this turn.")
if constraints:
parts.append(f"\nConstraints: {' '.join(constraints)}")
# Add error feedback from last action attempt
if last_error:
parts.append(f"\n[ERROR: {last_error}]")
parts.append("[Please try a different action.]")
parts.append("\nWhat do you do? Brief reasoning, then Action: <action>")
return "\n".join(parts)
def _switch_perspective(self, agent):
"""Switch grid view to agent's perspective."""
import mcrfpy
self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
self.fov_layer.apply_perspective(
entity=agent.entity,
visible=mcrfpy.Color(0, 0, 0, 0),
discovered=mcrfpy.Color(40, 40, 60, 180),
unknown=mcrfpy.Color(0, 0, 0, 255)
)
agent.entity.update_visibility()
px, py = agent.pos
self.grid.center = (px * 16 + 8, py * 16 + 8)
def _get_visible_agents(self, observer) -> list:
"""Get agents visible to observer based on FOV."""
visible = []
for agent in self.agents:
if agent.name == observer.name:
continue
ax, ay = agent.pos
if self.grid.is_in_fov(ax, ay):
visible.append(agent)
return visible
def _get_visible_entity_ids(self, agent) -> Set[str]:
"""Get set of entity IDs currently visible to agent."""
visible = set()
ax, ay = agent.pos
for entity in self.grid.entities:
if entity is agent.entity:
continue
ex, ey = int(entity.pos[0]), int(entity.pos[1])
if self.grid.is_in_fov(ex, ey):
entity_id = getattr(entity, 'id', None) or str(id(entity))
visible.add(entity_id)
return visible
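# Editor's illustrative sketch (not part of this commit): wiring the
# orchestrator with a stub llm_query_fn so the loop can be exercised without a
# real model. `grid`, `fov_layer`, `world` and `agents` are assumed to be built
# elsewhere (as in the demo script); the stub always returns a parseable action
# and respects the free-action and error-feedback contracts.
def sketch_run(grid, fov_layer, world, agents):
    def stub_llm(agent, screenshot_path, context):
        if context.get("last_error"):
            return "That failed, so I will hold position. Action: WAIT"
        if context.get("look_result"):
            return "Nothing urgent nearby. Action: WAIT"     # end the turn after one LOOK
        return "I will survey the area first. Action: LOOK"
    orchestrator = EnhancedOrchestrator(
        grid, fov_layer, world, agents,
        screenshot_dir="screenshots_sketch",                  # placeholder directory
        llm_query_fn=stub_llm,
    )
    log = orchestrator.run_simulation(max_turns=3)
    log.save("sketch_run.json")                               # placeholder path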

View file

@ -87,22 +87,31 @@ class Font:
class Drawable:
"""Base class for all drawable UI elements."""
x: float
y: float
visible: bool
z_index: int
name: str
pos: Vector
# Mouse event callbacks (#140, #141)
on_click: Optional[Callable[[float, float, int, str], None]]
on_enter: Optional[Callable[[float, float, int, str], None]]
on_exit: Optional[Callable[[float, float, int, str], None]]
on_move: Optional[Callable[[float, float, int, str], None]]
# Read-only hover state (#140)
hovered: bool
def get_bounds(self) -> Tuple[float, float, float, float]:
"""Get bounding box as (x, y, width, height)."""
...
def move(self, dx: float, dy: float) -> None:
"""Move by relative offset (dx, dy)."""
...
def resize(self, width: float, height: float) -> None:
"""Resize to new dimensions (width, height)."""
...
@ -343,45 +352,47 @@ class EntityCollection:
class Scene:
"""Base class for object-oriented scenes."""
name: str
children: UICollection # #151: UI elements collection (read-only alias for get_ui())
on_key: Optional[Callable[[str, str], None]] # Keyboard handler (key, action)
def __init__(self, name: str) -> None: ...
def activate(self) -> None:
"""Called when scene becomes active."""
...
def deactivate(self) -> None:
"""Called when scene becomes inactive."""
...
def get_ui(self) -> UICollection:
"""Get UI elements collection."""
...
def on_keypress(self, key: str, pressed: bool) -> None:
"""Handle keyboard events."""
"""Handle keyboard events (override in subclass)."""
...
def on_click(self, x: float, y: float, button: int) -> None:
"""Handle mouse clicks."""
"""Handle mouse clicks (override in subclass)."""
...
def on_enter(self) -> None:
"""Called when entering the scene."""
"""Called when entering the scene (override in subclass)."""
...
def on_exit(self) -> None:
"""Called when leaving the scene."""
"""Called when leaving the scene (override in subclass)."""
...
def on_resize(self, width: int, height: int) -> None:
"""Handle window resize events."""
"""Handle window resize events (override in subclass)."""
...
def update(self, dt: float) -> None:
"""Update scene logic."""
"""Update scene logic (override in subclass)."""
...
class Timer: