Add Game-to-API Bridge for external client integration
Implements a general-purpose HTTP API that exposes McRogueFace games to external clients (LLMs, accessibility tools, Twitch integrations, testing harnesses). API endpoints: - GET /scene - Full scene graph with all UI elements - GET /affordances - Interactive elements with semantic labels - GET /screenshot - PNG screenshot (binary or base64) - GET /metadata - Game metadata for LLM context - GET /wait - Long-poll for state changes - POST /input - Inject clicks, keys, or affordance clicks Key features: - Automatic affordance detection from Frame+Caption+on_click patterns - Label extraction from caption text with fallback to element.name - Thread-safe scene access via mcrfpy.lock() - Fuzzy label matching for click_affordance - Full input injection via mcrfpy.automation Usage: from api import start_server; start_server(8765) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
b47132b052
commit
ff46043023
12 changed files with 2391 additions and 0 deletions
197
src/scripts/api/README.md
Normal file
197
src/scripts/api/README.md
Normal file
|
|
@ -0,0 +1,197 @@
|
|||
# McRogueFace Game-to-API Bridge
|
||||
|
||||
A general-purpose API layer that exposes any McRogueFace game to external clients (LLMs, accessibility tools, Twitch integrations, testing harnesses).
|
||||
|
||||
## Quick Start
|
||||
|
||||
```python
|
||||
# In your game script
|
||||
import sys
|
||||
sys.path.insert(0, '../src/scripts')
|
||||
from api import start_server
|
||||
|
||||
# Start the API server
|
||||
server = start_server(8765)
|
||||
```
|
||||
|
||||
The API will be available at `http://localhost:8765`.
|
||||
|
||||
## API Endpoints
|
||||
|
||||
### GET /health
|
||||
Health check endpoint.
|
||||
|
||||
```bash
|
||||
curl http://localhost:8765/health
|
||||
```
|
||||
|
||||
### GET /scene
|
||||
Returns the current scene graph with all UI elements.
|
||||
|
||||
```bash
|
||||
curl http://localhost:8765/scene
|
||||
```
|
||||
|
||||
Response includes:
|
||||
- Scene name
|
||||
- Viewport dimensions
|
||||
- All UI elements with type, bounds, visibility, interactivity
|
||||
- Nested children
|
||||
- Type-specific properties (text for Caption, grid_size for Grid, etc.)
|
||||
|
||||
### GET /affordances
|
||||
Returns only interactive elements with semantic labels.
|
||||
|
||||
```bash
|
||||
curl http://localhost:8765/affordances
|
||||
```
|
||||
|
||||
Response includes:
|
||||
- List of clickable elements with:
|
||||
- `id`: Unique affordance ID (for click_affordance)
|
||||
- `label`: Human-readable label (button text or element name)
|
||||
- `type`: Affordance type (button, text_button, icon_button, interactive_grid)
|
||||
- `bounds`: Position and size
|
||||
- `actions`: Available actions (click, hover, grid_cell_click)
|
||||
- `hint`: Developer label if different from display label
|
||||
- Default keyboard hints
|
||||
|
||||
### GET /screenshot
|
||||
Returns a screenshot of the current game state.
|
||||
|
||||
```bash
|
||||
# Binary PNG
|
||||
curl http://localhost:8765/screenshot -o screenshot.png
|
||||
|
||||
# Base64 JSON
|
||||
curl "http://localhost:8765/screenshot?format=base64"
|
||||
```
|
||||
|
||||
### GET /metadata
|
||||
Returns game metadata for LLM context.
|
||||
|
||||
```bash
|
||||
curl http://localhost:8765/metadata
|
||||
```
|
||||
|
||||
### POST /input
|
||||
Inject keyboard or mouse input.
|
||||
|
||||
```bash
|
||||
# Click at coordinates
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d '{"action":"click","x":150,"y":100}' \
|
||||
http://localhost:8765/input
|
||||
|
||||
# Click affordance by label (fuzzy match)
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d '{"action":"click_affordance","label":"Play"}' \
|
||||
http://localhost:8765/input
|
||||
|
||||
# Press a key
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d '{"action":"key","key":"W"}' \
|
||||
http://localhost:8765/input
|
||||
|
||||
# Type text
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d '{"action":"type","text":"Hello"}' \
|
||||
http://localhost:8765/input
|
||||
|
||||
# Key combination
|
||||
curl -X POST -H "Content-Type: application/json" \
|
||||
-d '{"action":"hotkey","keys":["ctrl","s"]}' \
|
||||
http://localhost:8765/input
|
||||
```
|
||||
|
||||
### GET /wait
|
||||
Long-poll endpoint that returns when scene state changes.
|
||||
|
||||
```bash
|
||||
curl "http://localhost:8765/wait?timeout=30&scene_hash=abc123"
|
||||
```
|
||||
|
||||
## Customizing Game Metadata
|
||||
|
||||
Games can provide rich metadata for external clients:
|
||||
|
||||
```python
|
||||
from api.metadata import (
|
||||
set_game_info,
|
||||
set_controls,
|
||||
set_keyboard_hints,
|
||||
set_custom_hints,
|
||||
)
|
||||
|
||||
set_game_info(
|
||||
name="My Game",
|
||||
version="1.0.0",
|
||||
description="A puzzle roguelike"
|
||||
)
|
||||
|
||||
set_controls({
|
||||
"movement": "W/A/S/D",
|
||||
"attack": "Space",
|
||||
})
|
||||
|
||||
set_keyboard_hints([
|
||||
{"key": "W", "action": "Move up"},
|
||||
{"key": "Space", "action": "Attack"},
|
||||
])
|
||||
|
||||
set_custom_hints("""
|
||||
Strategy tips:
|
||||
- Always check corners
|
||||
- Save potions for boss fights
|
||||
""")
|
||||
```
|
||||
|
||||
## Affordance Detection
|
||||
|
||||
The API automatically detects interactive elements:
|
||||
|
||||
1. **Buttons**: Frame with on_click + Caption child
|
||||
2. **Icon Buttons**: Frame with on_click + Sprite child
|
||||
3. **Interactive Grids**: Grid with on_cell_click
|
||||
4. **Text Buttons**: Caption with on_click
|
||||
|
||||
Labels are extracted from:
|
||||
1. Caption text inside the element
|
||||
2. Element's `name` property (developer hint)
|
||||
|
||||
Set meaningful `name` properties on your interactive elements for best results:
|
||||
|
||||
```python
|
||||
button = mcrfpy.Frame(...)
|
||||
button.name = "attack_button" # Will show in affordances
|
||||
```
|
||||
|
||||
## Use Cases
|
||||
|
||||
- **LLM Co-Play**: AI analyzes game state via /scene and /affordances, sends moves via /input
|
||||
- **Accessibility**: Screen readers can get semantic labels from /affordances
|
||||
- **Twitch Plays**: Aggregate chat votes and inject via /input
|
||||
- **Automated Testing**: Query state, inject inputs, verify outcomes
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────┐
|
||||
│ McRogueFace Game Loop │
|
||||
│ (Main Thread - renders game) │
|
||||
└────────────────┬────────────────┘
|
||||
│
|
||||
┌────────────────┴────────────────┐
|
||||
│ API Server (Background Thread)│
|
||||
│ - Introspects scene graph │
|
||||
│ - Injects input via automation │
|
||||
└────────────────┬────────────────┘
|
||||
│ HTTP :8765
|
||||
▼
|
||||
┌─────────────────────────────────┐
|
||||
│ External Clients │
|
||||
│ (LLMs, tests, accessibility) │
|
||||
└─────────────────────────────────┘
|
||||
```
|
||||
|
||||
The API server runs in a daemon thread and uses `mcrfpy.lock()` for thread-safe access to the scene graph.
|
||||
55
src/scripts/api/__init__.py
Normal file
55
src/scripts/api/__init__.py
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
"""McRogueFace Game-to-API Bridge
|
||||
|
||||
Exposes any McRogueFace game to external clients (LLMs, accessibility tools,
|
||||
Twitch integrations, testing harnesses) via a background HTTP server.
|
||||
|
||||
Usage:
|
||||
# In game script or via --exec
|
||||
import api
|
||||
api.start_server(port=8765)
|
||||
|
||||
# Or auto-starts on import if run via --exec
|
||||
"""
|
||||
|
||||
from .server import GameAPIServer
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__all__ = ["start_server", "stop_server", "get_server"]
|
||||
|
||||
_server = None
|
||||
|
||||
|
||||
def start_server(port: int = 8765) -> GameAPIServer:
|
||||
"""Start the API server on the specified port.
|
||||
|
||||
Args:
|
||||
port: HTTP port to listen on (default: 8765)
|
||||
|
||||
Returns:
|
||||
The GameAPIServer instance
|
||||
"""
|
||||
global _server
|
||||
if _server is None:
|
||||
_server = GameAPIServer(port)
|
||||
_server.start()
|
||||
return _server
|
||||
|
||||
|
||||
def stop_server() -> None:
|
||||
"""Stop the API server if running."""
|
||||
global _server
|
||||
if _server is not None:
|
||||
_server.stop()
|
||||
_server = None
|
||||
|
||||
|
||||
def get_server() -> GameAPIServer:
|
||||
"""Get the current server instance, or None if not running."""
|
||||
return _server
|
||||
|
||||
|
||||
# Auto-start when imported via --exec
|
||||
# Check if we're being imported as main script
|
||||
import sys
|
||||
if __name__ == "__main__" or (hasattr(sys, '_called_from_exec') and sys._called_from_exec):
|
||||
start_server()
|
||||
350
src/scripts/api/affordances.py
Normal file
350
src/scripts/api/affordances.py
Normal file
|
|
@ -0,0 +1,350 @@
|
|||
"""Affordance detection for the McRogueFace Game API.
|
||||
|
||||
Analyzes the UI hierarchy to extract semantic meaning and identify
|
||||
interactive elements with their labels and action types.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
import mcrfpy
|
||||
|
||||
|
||||
# Global affordance ID counter (reset per extraction)
|
||||
_affordance_id = 0
|
||||
|
||||
|
||||
def get_element_type(element) -> str:
|
||||
"""Get the type name of a UI element."""
|
||||
return type(element).__name__
|
||||
|
||||
|
||||
def get_bounds(element) -> Dict[str, float]:
|
||||
"""Extract bounding box from an element."""
|
||||
try:
|
||||
if hasattr(element, 'get_bounds'):
|
||||
x, y, w, h = element.get_bounds()
|
||||
return {"x": x, "y": y, "w": w, "h": h}
|
||||
elif hasattr(element, 'x') and hasattr(element, 'y'):
|
||||
x = float(element.x) if element.x is not None else 0
|
||||
y = float(element.y) if element.y is not None else 0
|
||||
w = float(getattr(element, 'w', 0) or 0)
|
||||
h = float(getattr(element, 'h', 0) or 0)
|
||||
return {"x": x, "y": y, "w": w, "h": h}
|
||||
except Exception:
|
||||
pass
|
||||
return {"x": 0, "y": 0, "w": 0, "h": 0}
|
||||
|
||||
|
||||
def find_label_in_children(element) -> Optional[str]:
|
||||
"""Search children for Caption text to use as label."""
|
||||
if not hasattr(element, 'children'):
|
||||
return None
|
||||
|
||||
try:
|
||||
for child in element.children:
|
||||
child_type = get_element_type(child)
|
||||
if child_type == "Caption":
|
||||
text = str(getattr(child, 'text', ''))
|
||||
if text.strip():
|
||||
return text.strip()
|
||||
# Recurse into child frames
|
||||
if child_type == "Frame":
|
||||
label = find_label_in_children(child)
|
||||
if label:
|
||||
return label
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def find_icon_in_children(element) -> Optional[int]:
|
||||
"""Search children for Sprite to identify icon buttons."""
|
||||
if not hasattr(element, 'children'):
|
||||
return None
|
||||
|
||||
try:
|
||||
for child in element.children:
|
||||
child_type = get_element_type(child)
|
||||
if child_type == "Sprite":
|
||||
return int(getattr(child, 'sprite_index', 0))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def find_label(element) -> Optional[str]:
|
||||
"""Find the best label for an interactive element.
|
||||
|
||||
Priority:
|
||||
1. Caption child text (most user-visible)
|
||||
2. Element name property (developer hint)
|
||||
3. Nearby caption text (for icon buttons)
|
||||
|
||||
Returns the most user-friendly label for display/matching.
|
||||
"""
|
||||
element_type = get_element_type(element)
|
||||
|
||||
# For Caption elements, use their text directly
|
||||
if element_type == "Caption":
|
||||
text = str(getattr(element, 'text', ''))
|
||||
if text.strip():
|
||||
return text.strip()
|
||||
|
||||
# Search children for Caption first (most visible to user)
|
||||
label = find_label_in_children(element)
|
||||
if label:
|
||||
return label
|
||||
|
||||
# Fall back to element name (developer hint)
|
||||
name = getattr(element, 'name', None)
|
||||
if name and str(name).strip():
|
||||
return str(name).strip()
|
||||
|
||||
# For Sprite buttons, mention it's an icon
|
||||
if element_type == "Sprite":
|
||||
sprite_index = int(getattr(element, 'sprite_index', 0))
|
||||
return f"Icon button (sprite #{sprite_index})"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def classify_affordance(element) -> str:
|
||||
"""Classify what type of affordance this element represents."""
|
||||
element_type = get_element_type(element)
|
||||
|
||||
if element_type == "Grid":
|
||||
if getattr(element, 'on_cell_click', None):
|
||||
return "interactive_grid"
|
||||
return "grid"
|
||||
|
||||
if element_type == "Caption":
|
||||
if getattr(element, 'on_click', None):
|
||||
return "text_button"
|
||||
return "label"
|
||||
|
||||
if element_type == "Sprite":
|
||||
if getattr(element, 'on_click', None):
|
||||
return "icon_button"
|
||||
return "icon"
|
||||
|
||||
if element_type == "Frame":
|
||||
has_click = getattr(element, 'on_click', None) is not None
|
||||
|
||||
# Frame with click + caption child = button
|
||||
if has_click:
|
||||
label = find_label_in_children(element)
|
||||
icon = find_icon_in_children(element)
|
||||
if label and icon is not None:
|
||||
return "button" # Has both text and icon
|
||||
elif label:
|
||||
return "text_button"
|
||||
elif icon is not None:
|
||||
return "icon_button"
|
||||
return "clickable_area"
|
||||
|
||||
# Frame without click but with children = container
|
||||
if hasattr(element, 'children'):
|
||||
try:
|
||||
if len(list(element.children)) > 0:
|
||||
return "container"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return "panel"
|
||||
|
||||
return "unknown"
|
||||
|
||||
|
||||
def get_accepted_actions(element, affordance_type: str) -> List[str]:
|
||||
"""Determine what actions an affordance accepts."""
|
||||
actions = []
|
||||
|
||||
if getattr(element, 'on_click', None):
|
||||
actions.append("click")
|
||||
|
||||
if getattr(element, 'on_enter', None) or getattr(element, 'on_exit', None):
|
||||
actions.append("hover")
|
||||
|
||||
if affordance_type == "interactive_grid":
|
||||
actions.append("grid_cell_click")
|
||||
if getattr(element, 'on_cell_enter', None):
|
||||
actions.append("grid_cell_hover")
|
||||
|
||||
return actions if actions else ["none"]
|
||||
|
||||
|
||||
def extract_affordance(element, parent_bounds: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
|
||||
"""Extract affordance information from a single element.
|
||||
|
||||
Args:
|
||||
element: UI element to analyze
|
||||
parent_bounds: Parent's bounds for relative positioning
|
||||
|
||||
Returns:
|
||||
Affordance dict or None if not interactive
|
||||
"""
|
||||
global _affordance_id
|
||||
|
||||
element_type = get_element_type(element)
|
||||
has_click = getattr(element, 'on_click', None) is not None
|
||||
has_cell_click = getattr(element, 'on_cell_click', None) is not None
|
||||
has_hover = (getattr(element, 'on_enter', None) is not None or
|
||||
getattr(element, 'on_exit', None) is not None)
|
||||
|
||||
# Skip non-interactive elements (unless they're grids with cell callbacks)
|
||||
if not (has_click or has_cell_click or has_hover):
|
||||
return None
|
||||
|
||||
bounds = get_bounds(element)
|
||||
label = find_label(element)
|
||||
affordance_type = classify_affordance(element)
|
||||
actions = get_accepted_actions(element, affordance_type)
|
||||
|
||||
_affordance_id += 1
|
||||
|
||||
affordance = {
|
||||
"id": _affordance_id,
|
||||
"type": affordance_type,
|
||||
"element_type": element_type,
|
||||
"label": label,
|
||||
"bounds": bounds,
|
||||
"actions": actions,
|
||||
}
|
||||
|
||||
# Add grid-specific info
|
||||
if element_type == "Grid":
|
||||
grid_size = getattr(element, 'grid_size', None)
|
||||
if grid_size:
|
||||
try:
|
||||
grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0])
|
||||
grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1])
|
||||
affordance["grid_size"] = {"w": grid_w, "h": grid_h}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Count entities
|
||||
try:
|
||||
entity_count = len(list(element.entities))
|
||||
affordance["entity_count"] = entity_count
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Add hint if name was used
|
||||
name = getattr(element, 'name', None)
|
||||
if name and str(name).strip():
|
||||
affordance["hint"] = f"Developer label: {name}"
|
||||
|
||||
return affordance
|
||||
|
||||
|
||||
def extract_affordances_recursive(element, affordances: List[Dict], depth: int = 0, max_depth: int = 10) -> None:
|
||||
"""Recursively extract affordances from element and children.
|
||||
|
||||
Args:
|
||||
element: Current element to process
|
||||
affordances: List to append affordances to
|
||||
depth: Current recursion depth
|
||||
max_depth: Maximum depth to recurse
|
||||
"""
|
||||
if depth > max_depth:
|
||||
return
|
||||
|
||||
# Extract affordance for this element
|
||||
affordance = extract_affordance(element)
|
||||
if affordance:
|
||||
affordances.append(affordance)
|
||||
|
||||
# Process children
|
||||
if hasattr(element, 'children'):
|
||||
try:
|
||||
for child in element.children:
|
||||
extract_affordances_recursive(child, affordances, depth + 1, max_depth)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def extract_affordances() -> List[Dict[str, Any]]:
|
||||
"""Extract all interactive affordances from the current scene.
|
||||
|
||||
Returns:
|
||||
List of affordance dictionaries
|
||||
"""
|
||||
global _affordance_id
|
||||
_affordance_id = 0 # Reset counter
|
||||
|
||||
scene = mcrfpy.current_scene
|
||||
affordances = []
|
||||
|
||||
try:
|
||||
if scene:
|
||||
for element in scene.children:
|
||||
extract_affordances_recursive(element, affordances)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
return affordances
|
||||
|
||||
|
||||
def extract_keyboard_hints() -> List[Dict[str, str]]:
|
||||
"""Extract keyboard control hints.
|
||||
|
||||
This is a placeholder that returns common roguelike controls.
|
||||
Games can override this via the metadata endpoint.
|
||||
|
||||
Returns:
|
||||
List of keyboard hint dictionaries
|
||||
"""
|
||||
# Default roguelike controls - games should customize via metadata
|
||||
return [
|
||||
{"key": "W/A/S/D", "action": "Move"},
|
||||
{"key": "Arrow keys", "action": "Move (alternative)"},
|
||||
{"key": "ESCAPE", "action": "Menu/Cancel"},
|
||||
{"key": "SPACE", "action": "Confirm/Interact"},
|
||||
{"key": "1-9", "action": "Use inventory item"},
|
||||
]
|
||||
|
||||
|
||||
def find_affordance_by_id(affordance_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""Find an affordance by its ID.
|
||||
|
||||
Args:
|
||||
affordance_id: The ID to search for
|
||||
|
||||
Returns:
|
||||
The affordance dict or None
|
||||
"""
|
||||
affordances = extract_affordances()
|
||||
for aff in affordances:
|
||||
if aff.get("id") == affordance_id:
|
||||
return aff
|
||||
return None
|
||||
|
||||
|
||||
def find_affordance_by_label(label: str, fuzzy: bool = True) -> Optional[Dict[str, Any]]:
|
||||
"""Find an affordance by its label.
|
||||
|
||||
Args:
|
||||
label: The label to search for
|
||||
fuzzy: If True, do case-insensitive substring matching
|
||||
|
||||
Returns:
|
||||
The affordance dict or None
|
||||
"""
|
||||
affordances = extract_affordances()
|
||||
label_lower = label.lower()
|
||||
|
||||
for aff in affordances:
|
||||
aff_label = aff.get("label")
|
||||
if aff_label is None:
|
||||
continue
|
||||
|
||||
if fuzzy:
|
||||
if label_lower in aff_label.lower():
|
||||
return aff
|
||||
else:
|
||||
if aff_label == label:
|
||||
return aff
|
||||
|
||||
return None
|
||||
100
src/scripts/api/example_integration.py
Normal file
100
src/scripts/api/example_integration.py
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
"""Example: Integrating a McRogueFace game with the API bridge.
|
||||
|
||||
This file shows how a game script can integrate with the API to provide
|
||||
rich metadata and semantic hints for external clients.
|
||||
"""
|
||||
|
||||
# Import the API module
|
||||
import sys
|
||||
sys.path.insert(0, '../src/scripts')
|
||||
|
||||
from api import start_server
|
||||
from api.metadata import (
|
||||
set_game_info,
|
||||
set_controls,
|
||||
set_keyboard_hints,
|
||||
set_custom_hints,
|
||||
register_scene,
|
||||
)
|
||||
|
||||
|
||||
def setup_api_for_crypt_of_sokoban():
|
||||
"""Example setup for the Crypt of Sokoban game."""
|
||||
|
||||
# Set basic game info
|
||||
set_game_info(
|
||||
name="Crypt of Sokoban",
|
||||
version="0.1.0",
|
||||
description="A puzzle roguelike combining Sokoban mechanics with dungeon exploration. Push boulders, avoid enemies, and descend deeper into the crypt.",
|
||||
author="7DRL 2025"
|
||||
)
|
||||
|
||||
# Set control descriptions
|
||||
set_controls({
|
||||
"movement": "W/A/S/D keys",
|
||||
"wait": "Period (.) to skip turn",
|
||||
"zap": "Z to use equipped item's active ability",
|
||||
"use_item_1": "Numpad 1 to use first inventory slot",
|
||||
"use_item_2": "Numpad 2 to use second inventory slot",
|
||||
"use_item_3": "Numpad 3 to use third inventory slot",
|
||||
"pull_boulder": "X to pull adjacent boulder toward you",
|
||||
"debug_descend": "P to descend (debug)",
|
||||
})
|
||||
|
||||
# Set keyboard hints for LLM context
|
||||
set_keyboard_hints([
|
||||
{"key": "W", "action": "Move up"},
|
||||
{"key": "A", "action": "Move left"},
|
||||
{"key": "S", "action": "Move down"},
|
||||
{"key": "D", "action": "Move right"},
|
||||
{"key": ".", "action": "Wait (skip turn, enemies still move)"},
|
||||
{"key": "Z", "action": "Zap - use equipped item's ability"},
|
||||
{"key": "X", "action": "Pull boulder toward you"},
|
||||
{"key": "1", "action": "Use item in slot 1"},
|
||||
{"key": "2", "action": "Use item in slot 2"},
|
||||
{"key": "3", "action": "Use item in slot 3"},
|
||||
])
|
||||
|
||||
# Set custom hints for LLM strategizing
|
||||
set_custom_hints("""
|
||||
Crypt of Sokoban Strategy Guide:
|
||||
|
||||
CORE MECHANICS:
|
||||
- Push boulders by walking into them (if space behind is clear)
|
||||
- Pull boulders with X key while standing adjacent
|
||||
- Boulders block enemy movement - use them as barriers!
|
||||
- Step on buttons to unlock doors/exits
|
||||
- Each floor has one exit that leads deeper
|
||||
|
||||
ENEMIES:
|
||||
- Rats: Basic enemy, moves toward you
|
||||
- Big Rats: Tougher, 2 damage per hit
|
||||
- Cyclops: Very dangerous, 3 damage, can push boulders!
|
||||
|
||||
ITEMS:
|
||||
- Potions: Consumable healing/buffs (use with 1/2/3 keys)
|
||||
- Weapons: Equip for passive bonuses and active abilities
|
||||
- Each item has a "zap" ability on cooldown
|
||||
|
||||
STRATEGY TIPS:
|
||||
- Always look for buttons before moving - stepping on them opens paths
|
||||
- Trap enemies behind boulders when possible
|
||||
- Don't get cornered - keep escape routes open
|
||||
- Use Z ability when enemies cluster together
|
||||
- Higher floors have better loot but harder enemies
|
||||
""")
|
||||
|
||||
# Register scenes
|
||||
register_scene("menu", "Main menu with Play, Settings, and audio controls")
|
||||
register_scene("play", "Main gameplay scene with dungeon grid, HUD, and sidebar")
|
||||
|
||||
# Start the server
|
||||
server = start_server(8765)
|
||||
print("[Game] API bridge configured for Crypt of Sokoban")
|
||||
|
||||
return server
|
||||
|
||||
|
||||
# Example usage in game.py:
|
||||
# from api.example_integration import setup_api_for_crypt_of_sokoban
|
||||
# api_server = setup_api_for_crypt_of_sokoban()
|
||||
373
src/scripts/api/input_handler.py
Normal file
373
src/scripts/api/input_handler.py
Normal file
|
|
@ -0,0 +1,373 @@
|
|||
"""Input action handling for the McRogueFace Game API.
|
||||
|
||||
Dispatches input actions to the mcrfpy.automation module for
|
||||
keyboard/mouse injection into the game.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
import mcrfpy
|
||||
|
||||
from .affordances import find_affordance_by_id, find_affordance_by_label
|
||||
|
||||
|
||||
def execute_action(action_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Execute an input action.
|
||||
|
||||
Args:
|
||||
action_data: Dictionary describing the action to perform
|
||||
|
||||
Returns:
|
||||
Result dictionary with success status
|
||||
|
||||
Raises:
|
||||
ValueError: If action is invalid or missing required fields
|
||||
"""
|
||||
action = action_data.get("action")
|
||||
|
||||
if action == "click":
|
||||
return execute_click(action_data)
|
||||
elif action == "click_affordance":
|
||||
return execute_click_affordance(action_data)
|
||||
elif action == "type":
|
||||
return execute_type(action_data)
|
||||
elif action == "key":
|
||||
return execute_key(action_data)
|
||||
elif action == "hotkey":
|
||||
return execute_hotkey(action_data)
|
||||
elif action == "grid_click":
|
||||
return execute_grid_click(action_data)
|
||||
elif action == "move":
|
||||
return execute_move(action_data)
|
||||
elif action == "drag":
|
||||
return execute_drag(action_data)
|
||||
else:
|
||||
raise ValueError(f"Unknown action: {action}")
|
||||
|
||||
|
||||
def execute_click(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Execute a mouse click at coordinates.
|
||||
|
||||
Required fields:
|
||||
x: X coordinate
|
||||
y: Y coordinate
|
||||
|
||||
Optional fields:
|
||||
button: 'left', 'right', or 'middle' (default: 'left')
|
||||
clicks: Number of clicks (default: 1)
|
||||
"""
|
||||
x = data.get("x")
|
||||
y = data.get("y")
|
||||
|
||||
if x is None or y is None:
|
||||
raise ValueError("click requires 'x' and 'y' coordinates")
|
||||
|
||||
button = data.get("button", "left")
|
||||
clicks = data.get("clicks", 1)
|
||||
|
||||
# Validate button
|
||||
if button not in ("left", "right", "middle"):
|
||||
raise ValueError(f"Invalid button: {button}")
|
||||
|
||||
# Use automation module - click expects pos as tuple
|
||||
mcrfpy.automation.click(pos=(int(x), int(y)), clicks=clicks, button=button)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "click",
|
||||
"x": x,
|
||||
"y": y,
|
||||
"button": button,
|
||||
"clicks": clicks
|
||||
}
|
||||
|
||||
|
||||
def execute_click_affordance(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Click an affordance by ID or label.
|
||||
|
||||
Required fields (one of):
|
||||
id: Affordance ID
|
||||
label: Affordance label (supports fuzzy matching)
|
||||
|
||||
Optional fields:
|
||||
button: 'left', 'right', or 'middle' (default: 'left')
|
||||
"""
|
||||
affordance = None
|
||||
|
||||
if "id" in data:
|
||||
affordance = find_affordance_by_id(int(data["id"]))
|
||||
if not affordance:
|
||||
raise ValueError(f"Affordance with ID {data['id']} not found")
|
||||
elif "label" in data:
|
||||
affordance = find_affordance_by_label(data["label"], fuzzy=True)
|
||||
if not affordance:
|
||||
raise ValueError(f"Affordance with label '{data['label']}' not found")
|
||||
else:
|
||||
raise ValueError("click_affordance requires 'id' or 'label'")
|
||||
|
||||
# Get center of affordance bounds
|
||||
bounds = affordance["bounds"]
|
||||
center_x = bounds["x"] + bounds["w"] / 2
|
||||
center_y = bounds["y"] + bounds["h"] / 2
|
||||
|
||||
button = data.get("button", "left")
|
||||
|
||||
mcrfpy.automation.click(pos=(int(center_x), int(center_y)), button=button)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "click_affordance",
|
||||
"affordance_id": affordance["id"],
|
||||
"affordance_label": affordance.get("label"),
|
||||
"x": center_x,
|
||||
"y": center_y,
|
||||
"button": button
|
||||
}
|
||||
|
||||
|
||||
def execute_type(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Type text into the game.
|
||||
|
||||
Required fields:
|
||||
text: String to type
|
||||
|
||||
Optional fields:
|
||||
interval: Delay between keystrokes in seconds (default: 0)
|
||||
"""
|
||||
text = data.get("text")
|
||||
if text is None:
|
||||
raise ValueError("type requires 'text' field")
|
||||
|
||||
interval = float(data.get("interval", 0))
|
||||
|
||||
mcrfpy.automation.typewrite(str(text), interval=interval)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "type",
|
||||
"text": text,
|
||||
"length": len(text)
|
||||
}
|
||||
|
||||
|
||||
def execute_key(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Press a single key.
|
||||
|
||||
Required fields:
|
||||
key: Key name (e.g., 'ESCAPE', 'SPACE', 'W', 'F1')
|
||||
"""
|
||||
key = data.get("key")
|
||||
if key is None:
|
||||
raise ValueError("key requires 'key' field")
|
||||
|
||||
# Normalize key name
|
||||
key = str(key).upper()
|
||||
|
||||
# Map common key names
|
||||
key_map = {
|
||||
"ESCAPE": "escape",
|
||||
"ESC": "escape",
|
||||
"SPACE": "space",
|
||||
"ENTER": "return",
|
||||
"RETURN": "return",
|
||||
"TAB": "tab",
|
||||
"BACKSPACE": "backspace",
|
||||
"DELETE": "delete",
|
||||
"UP": "up",
|
||||
"DOWN": "down",
|
||||
"LEFT": "left",
|
||||
"RIGHT": "right",
|
||||
"SHIFT": "shift",
|
||||
"CTRL": "ctrl",
|
||||
"CONTROL": "ctrl",
|
||||
"ALT": "alt",
|
||||
}
|
||||
|
||||
normalized_key = key_map.get(key, key.lower())
|
||||
|
||||
# Press and release
|
||||
mcrfpy.automation.keyDown(normalized_key)
|
||||
mcrfpy.automation.keyUp(normalized_key)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "key",
|
||||
"key": key
|
||||
}
|
||||
|
||||
|
||||
def execute_hotkey(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Press a key combination.
|
||||
|
||||
Required fields:
|
||||
keys: List of keys to press together (e.g., ['ctrl', 's'])
|
||||
"""
|
||||
keys = data.get("keys")
|
||||
if keys is None or not isinstance(keys, list):
|
||||
raise ValueError("hotkey requires 'keys' as a list")
|
||||
|
||||
if len(keys) == 0:
|
||||
raise ValueError("hotkey requires at least one key")
|
||||
|
||||
# Press all modifier keys down
|
||||
for key in keys[:-1]:
|
||||
mcrfpy.automation.keyDown(str(key).lower())
|
||||
|
||||
# Press and release the final key
|
||||
final_key = str(keys[-1]).lower()
|
||||
mcrfpy.automation.keyDown(final_key)
|
||||
mcrfpy.automation.keyUp(final_key)
|
||||
|
||||
# Release modifier keys in reverse order
|
||||
for key in reversed(keys[:-1]):
|
||||
mcrfpy.automation.keyUp(str(key).lower())
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "hotkey",
|
||||
"keys": keys
|
||||
}
|
||||
|
||||
|
||||
def execute_grid_click(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Click a specific cell in a grid.
|
||||
|
||||
Required fields:
|
||||
cell: Dict with 'x' and 'y' grid coordinates
|
||||
|
||||
Optional fields:
|
||||
grid_id: Name of the grid (uses first grid if not specified)
|
||||
button: 'left', 'right', or 'middle' (default: 'left')
|
||||
"""
|
||||
cell = data.get("cell")
|
||||
if cell is None or "x" not in cell or "y" not in cell:
|
||||
raise ValueError("grid_click requires 'cell' with 'x' and 'y'")
|
||||
|
||||
cell_x = int(cell["x"])
|
||||
cell_y = int(cell["y"])
|
||||
button = data.get("button", "left")
|
||||
grid_name = data.get("grid_id")
|
||||
|
||||
# Find the grid
|
||||
scene = mcrfpy.current_scene
|
||||
if not scene:
|
||||
raise ValueError("No active scene")
|
||||
|
||||
target_grid = None
|
||||
for element in scene.children:
|
||||
if type(element).__name__ == "Grid":
|
||||
name = getattr(element, 'name', None)
|
||||
if grid_name is None or name == grid_name:
|
||||
target_grid = element
|
||||
break
|
||||
|
||||
if target_grid is None:
|
||||
raise ValueError(f"Grid not found" + (f": {grid_name}" if grid_name else ""))
|
||||
|
||||
# Calculate pixel position from cell coordinates
|
||||
# Get grid bounds and cell size
|
||||
try:
|
||||
gx, gy, gw, gh = target_grid.get_bounds()
|
||||
except Exception:
|
||||
gx = float(target_grid.x)
|
||||
gy = float(target_grid.y)
|
||||
gw = float(target_grid.w)
|
||||
gh = float(target_grid.h)
|
||||
|
||||
grid_size = target_grid.grid_size
|
||||
grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0])
|
||||
grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1])
|
||||
|
||||
zoom = float(getattr(target_grid, 'zoom', 1.0))
|
||||
center = getattr(target_grid, 'center', None)
|
||||
|
||||
# Calculate cell size in pixels
|
||||
# This is approximate - actual rendering may differ based on zoom/center
|
||||
if grid_w > 0 and grid_h > 0:
|
||||
cell_pixel_w = gw / grid_w
|
||||
cell_pixel_h = gh / grid_h
|
||||
|
||||
# Calculate center of target cell
|
||||
pixel_x = gx + (cell_x + 0.5) * cell_pixel_w
|
||||
pixel_y = gy + (cell_y + 0.5) * cell_pixel_h
|
||||
|
||||
mcrfpy.automation.click(pos=(int(pixel_x), int(pixel_y)), button=button)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "grid_click",
|
||||
"cell": {"x": cell_x, "y": cell_y},
|
||||
"pixel": {"x": pixel_x, "y": pixel_y},
|
||||
"button": button
|
||||
}
|
||||
else:
|
||||
raise ValueError("Grid has invalid dimensions")
|
||||
|
||||
|
||||
def execute_move(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Move the mouse to coordinates without clicking.
|
||||
|
||||
Required fields:
|
||||
x: X coordinate
|
||||
y: Y coordinate
|
||||
|
||||
Optional fields:
|
||||
duration: Time to move in seconds (default: 0)
|
||||
"""
|
||||
x = data.get("x")
|
||||
y = data.get("y")
|
||||
|
||||
if x is None or y is None:
|
||||
raise ValueError("move requires 'x' and 'y' coordinates")
|
||||
|
||||
duration = float(data.get("duration", 0))
|
||||
|
||||
mcrfpy.automation.moveTo(int(x), int(y), duration=duration)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "move",
|
||||
"x": x,
|
||||
"y": y,
|
||||
"duration": duration
|
||||
}
|
||||
|
||||
|
||||
def execute_drag(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Drag from current position or specified start to end.
|
||||
|
||||
Required fields:
|
||||
end_x: End X coordinate
|
||||
end_y: End Y coordinate
|
||||
|
||||
Optional fields:
|
||||
start_x: Start X coordinate
|
||||
start_y: Start Y coordinate
|
||||
button: 'left', 'right', or 'middle' (default: 'left')
|
||||
duration: Time to drag in seconds (default: 0)
|
||||
"""
|
||||
end_x = data.get("end_x")
|
||||
end_y = data.get("end_y")
|
||||
|
||||
if end_x is None or end_y is None:
|
||||
raise ValueError("drag requires 'end_x' and 'end_y' coordinates")
|
||||
|
||||
start_x = data.get("start_x")
|
||||
start_y = data.get("start_y")
|
||||
button = data.get("button", "left")
|
||||
duration = float(data.get("duration", 0))
|
||||
|
||||
# Move to start if specified
|
||||
if start_x is not None and start_y is not None:
|
||||
mcrfpy.automation.moveTo(int(start_x), int(start_y))
|
||||
|
||||
mcrfpy.automation.dragTo(int(end_x), int(end_y), duration=duration, button=button)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "drag",
|
||||
"end_x": end_x,
|
||||
"end_y": end_y,
|
||||
"button": button,
|
||||
"duration": duration
|
||||
}
|
||||
282
src/scripts/api/introspection.py
Normal file
282
src/scripts/api/introspection.py
Normal file
|
|
@ -0,0 +1,282 @@
|
|||
"""Scene graph introspection for the McRogueFace Game API.
|
||||
|
||||
Provides functions to serialize the current scene graph to JSON,
|
||||
extracting element types, bounds, properties, and children.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
import mcrfpy
|
||||
|
||||
|
||||
def get_element_type(element) -> str:
|
||||
"""Get the type name of a UI element."""
|
||||
type_name = type(element).__name__
|
||||
# Handle potential wrapper types
|
||||
if hasattr(element, '__class__'):
|
||||
return element.__class__.__name__
|
||||
return type_name
|
||||
|
||||
|
||||
def get_bounds(element) -> Dict[str, float]:
|
||||
"""Extract bounding box from an element."""
|
||||
try:
|
||||
if hasattr(element, 'get_bounds'):
|
||||
x, y, w, h = element.get_bounds()
|
||||
return {"x": x, "y": y, "w": w, "h": h}
|
||||
elif hasattr(element, 'x') and hasattr(element, 'y'):
|
||||
x = float(element.x) if element.x is not None else 0
|
||||
y = float(element.y) if element.y is not None else 0
|
||||
w = float(getattr(element, 'w', 0) or 0)
|
||||
h = float(getattr(element, 'h', 0) or 0)
|
||||
return {"x": x, "y": y, "w": w, "h": h}
|
||||
except Exception:
|
||||
pass
|
||||
return {"x": 0, "y": 0, "w": 0, "h": 0}
|
||||
|
||||
|
||||
def is_interactive(element) -> bool:
|
||||
"""Check if an element has interactive callbacks."""
|
||||
return (
|
||||
getattr(element, 'on_click', None) is not None or
|
||||
getattr(element, 'on_enter', None) is not None or
|
||||
getattr(element, 'on_exit', None) is not None or
|
||||
getattr(element, 'on_cell_click', None) is not None
|
||||
)
|
||||
|
||||
|
||||
def serialize_color(color) -> Optional[Dict[str, int]]:
|
||||
"""Serialize a color object to dict."""
|
||||
if color is None:
|
||||
return None
|
||||
try:
|
||||
return {
|
||||
"r": int(color.r),
|
||||
"g": int(color.g),
|
||||
"b": int(color.b),
|
||||
"a": int(getattr(color, 'a', 255))
|
||||
}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def serialize_vector(vec) -> Optional[Dict[str, float]]:
|
||||
"""Serialize a Vector object to dict."""
|
||||
if vec is None:
|
||||
return None
|
||||
try:
|
||||
return {"x": float(vec.x), "y": float(vec.y)}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def serialize_entity(entity) -> Dict[str, Any]:
|
||||
"""Serialize an Entity to dict."""
|
||||
data = {
|
||||
"type": "Entity",
|
||||
"name": getattr(entity, 'name', None) or "",
|
||||
"grid_x": float(getattr(entity, 'grid_x', 0)),
|
||||
"grid_y": float(getattr(entity, 'grid_y', 0)),
|
||||
"sprite_index": int(getattr(entity, 'sprite_index', 0)),
|
||||
"visible": bool(getattr(entity, 'visible', True)),
|
||||
}
|
||||
return data
|
||||
|
||||
|
||||
def serialize_grid(grid) -> Dict[str, Any]:
|
||||
"""Serialize a Grid element with its entities."""
|
||||
bounds = get_bounds(grid)
|
||||
|
||||
# Get grid dimensions
|
||||
grid_size = getattr(grid, 'grid_size', None)
|
||||
if grid_size:
|
||||
try:
|
||||
grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0])
|
||||
grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1])
|
||||
except Exception:
|
||||
grid_w = grid_h = 0
|
||||
else:
|
||||
grid_w = int(getattr(grid, 'grid_w', 0))
|
||||
grid_h = int(getattr(grid, 'grid_h', 0))
|
||||
|
||||
# Get camera info
|
||||
center = getattr(grid, 'center', None)
|
||||
zoom = float(getattr(grid, 'zoom', 1.0))
|
||||
|
||||
# Serialize entities
|
||||
entities = []
|
||||
try:
|
||||
for entity in grid.entities:
|
||||
entities.append(serialize_entity(entity))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
data = {
|
||||
"type": "Grid",
|
||||
"name": getattr(grid, 'name', None) or "",
|
||||
"bounds": bounds,
|
||||
"visible": bool(getattr(grid, 'visible', True)),
|
||||
"interactive": is_interactive(grid),
|
||||
"grid_size": {"w": grid_w, "h": grid_h},
|
||||
"zoom": zoom,
|
||||
"center": serialize_vector(center),
|
||||
"entity_count": len(entities),
|
||||
"entities": entities,
|
||||
"has_cell_click": getattr(grid, 'on_cell_click', None) is not None,
|
||||
"has_cell_enter": getattr(grid, 'on_cell_enter', None) is not None,
|
||||
}
|
||||
|
||||
# Add cell size estimate if texture available
|
||||
texture = getattr(grid, 'texture', None)
|
||||
if texture:
|
||||
# Texture dimensions divided by sprite count would give cell size
|
||||
# but this is an approximation
|
||||
data["has_texture"] = True
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def serialize_element(element, depth: int = 0, max_depth: int = 10) -> Dict[str, Any]:
|
||||
"""Serialize a UI element to a dictionary.
|
||||
|
||||
Args:
|
||||
element: The UI element to serialize
|
||||
depth: Current recursion depth
|
||||
max_depth: Maximum recursion depth for children
|
||||
|
||||
Returns:
|
||||
Dictionary representation of the element
|
||||
"""
|
||||
element_type = get_element_type(element)
|
||||
bounds = get_bounds(element)
|
||||
|
||||
# Special handling for Grid
|
||||
if element_type == "Grid":
|
||||
return serialize_grid(element)
|
||||
|
||||
data = {
|
||||
"type": element_type,
|
||||
"name": getattr(element, 'name', None) or "",
|
||||
"bounds": bounds,
|
||||
"visible": bool(getattr(element, 'visible', True)),
|
||||
"interactive": is_interactive(element),
|
||||
"z_index": int(getattr(element, 'z_index', 0)),
|
||||
}
|
||||
|
||||
# Add type-specific properties
|
||||
if element_type == "Caption":
|
||||
data["text"] = str(getattr(element, 'text', ""))
|
||||
data["font_size"] = float(getattr(element, 'font_size', 12))
|
||||
data["fill_color"] = serialize_color(getattr(element, 'fill_color', None))
|
||||
|
||||
elif element_type == "Frame":
|
||||
data["fill_color"] = serialize_color(getattr(element, 'fill_color', None))
|
||||
data["outline"] = float(getattr(element, 'outline', 0))
|
||||
data["clip_children"] = bool(getattr(element, 'clip_children', False))
|
||||
|
||||
elif element_type == "Sprite":
|
||||
data["sprite_index"] = int(getattr(element, 'sprite_index', 0))
|
||||
data["scale"] = float(getattr(element, 'scale', 1.0))
|
||||
|
||||
elif element_type in ("Line", "Circle", "Arc"):
|
||||
data["color"] = serialize_color(getattr(element, 'color', None))
|
||||
if element_type == "Circle":
|
||||
data["radius"] = float(getattr(element, 'radius', 0))
|
||||
elif element_type == "Arc":
|
||||
data["radius"] = float(getattr(element, 'radius', 0))
|
||||
data["start_angle"] = float(getattr(element, 'start_angle', 0))
|
||||
data["end_angle"] = float(getattr(element, 'end_angle', 0))
|
||||
|
||||
# Handle children recursively
|
||||
if hasattr(element, 'children') and depth < max_depth:
|
||||
children = []
|
||||
try:
|
||||
for child in element.children:
|
||||
children.append(serialize_element(child, depth + 1, max_depth))
|
||||
except Exception:
|
||||
pass
|
||||
data["children"] = children
|
||||
data["child_count"] = len(children)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def serialize_scene() -> Dict[str, Any]:
|
||||
"""Serialize the entire current scene graph.
|
||||
|
||||
Returns:
|
||||
Dictionary with scene name, viewport info, and all elements
|
||||
"""
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
|
||||
# Get viewport size
|
||||
try:
|
||||
width, height = mcrfpy.automation.size()
|
||||
except Exception:
|
||||
width, height = 1024, 768
|
||||
|
||||
# Get scene UI elements from the scene object directly
|
||||
elements = []
|
||||
try:
|
||||
if scene:
|
||||
for element in scene.children:
|
||||
elements.append(serialize_element(element))
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
return {
|
||||
"scene_name": scene_name,
|
||||
"timestamp": time.time(),
|
||||
"viewport": {"width": width, "height": height},
|
||||
"element_count": len(elements),
|
||||
"elements": elements
|
||||
}
|
||||
|
||||
|
||||
def get_scene_hash() -> str:
|
||||
"""Compute a hash of the current scene state for change detection.
|
||||
|
||||
This is a lightweight hash that captures:
|
||||
- Scene name
|
||||
- Number of top-level elements
|
||||
- Element names and types
|
||||
|
||||
Returns:
|
||||
Hex string hash of scene state
|
||||
"""
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
|
||||
state_parts = [scene_name]
|
||||
|
||||
try:
|
||||
if scene:
|
||||
ui = scene.children
|
||||
state_parts.append(str(len(ui)))
|
||||
|
||||
for element in ui:
|
||||
element_type = get_element_type(element)
|
||||
name = getattr(element, 'name', '') or ''
|
||||
state_parts.append(f"{element_type}:{name}")
|
||||
|
||||
# For captions, include text (truncated)
|
||||
if element_type == "Caption":
|
||||
text = str(getattr(element, 'text', ''))[:50]
|
||||
state_parts.append(text)
|
||||
|
||||
# For grids, include entity count
|
||||
if element_type == "Grid":
|
||||
try:
|
||||
entity_count = len(list(element.entities))
|
||||
state_parts.append(f"entities:{entity_count}")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
state_str = "|".join(state_parts)
|
||||
return hashlib.md5(state_str.encode()).hexdigest()[:16]
|
||||
169
src/scripts/api/metadata.py
Normal file
169
src/scripts/api/metadata.py
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
"""Game metadata management for the McRogueFace Game API.
|
||||
|
||||
Stores and retrieves game-specific metadata that helps external clients
|
||||
understand the game's controls, purpose, and current state.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, List, Optional
|
||||
import time
|
||||
|
||||
# Global metadata storage
|
||||
_metadata: Dict[str, Any] = {
|
||||
"game_name": "McRogueFace Game",
|
||||
"version": "0.1.0",
|
||||
"description": "A game built with McRogueFace engine",
|
||||
"controls": {
|
||||
"movement": "W/A/S/D or Arrow keys",
|
||||
"interact": "Space or Enter",
|
||||
"cancel": "Escape",
|
||||
},
|
||||
"scenes": [],
|
||||
"custom_hints": "",
|
||||
"keyboard_hints": [],
|
||||
}
|
||||
|
||||
|
||||
def get_game_metadata() -> Dict[str, Any]:
|
||||
"""Get the current game metadata.
|
||||
|
||||
Returns:
|
||||
Dictionary containing game metadata
|
||||
"""
|
||||
import mcrfpy
|
||||
|
||||
# Add dynamic data
|
||||
result = dict(_metadata)
|
||||
scene = mcrfpy.current_scene
|
||||
result["current_scene"] = scene.name if scene else "unknown"
|
||||
result["timestamp"] = time.time()
|
||||
|
||||
# Get viewport size
|
||||
try:
|
||||
width, height = mcrfpy.automation.size()
|
||||
result["viewport"] = {"width": width, "height": height}
|
||||
except Exception:
|
||||
result["viewport"] = {"width": 1024, "height": 768}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def set_game_metadata(data: Dict[str, Any]) -> None:
|
||||
"""Update game metadata.
|
||||
|
||||
Games can call this to provide richer information to API clients.
|
||||
|
||||
Args:
|
||||
data: Dictionary of metadata fields to update
|
||||
"""
|
||||
global _metadata
|
||||
|
||||
# Allowed fields to update
|
||||
allowed_fields = {
|
||||
"game_name",
|
||||
"version",
|
||||
"description",
|
||||
"controls",
|
||||
"scenes",
|
||||
"custom_hints",
|
||||
"keyboard_hints",
|
||||
"author",
|
||||
"url",
|
||||
"tags",
|
||||
}
|
||||
|
||||
for key, value in data.items():
|
||||
if key in allowed_fields:
|
||||
_metadata[key] = value
|
||||
|
||||
|
||||
def set_game_info(
|
||||
name: Optional[str] = None,
|
||||
version: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
author: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Convenience function to set basic game info.
|
||||
|
||||
Args:
|
||||
name: Game name
|
||||
version: Game version string
|
||||
description: Brief description
|
||||
author: Author name
|
||||
"""
|
||||
if name is not None:
|
||||
_metadata["game_name"] = name
|
||||
if version is not None:
|
||||
_metadata["version"] = version
|
||||
if description is not None:
|
||||
_metadata["description"] = description
|
||||
if author is not None:
|
||||
_metadata["author"] = author
|
||||
|
||||
|
||||
def set_controls(controls: Dict[str, str]) -> None:
|
||||
"""Set control descriptions.
|
||||
|
||||
Args:
|
||||
controls: Dict mapping action names to control descriptions
|
||||
"""
|
||||
_metadata["controls"] = controls
|
||||
|
||||
|
||||
def set_keyboard_hints(hints: List[Dict[str, str]]) -> None:
|
||||
"""Set keyboard hints for the API.
|
||||
|
||||
Args:
|
||||
hints: List of dicts with 'key' and 'action' fields
|
||||
"""
|
||||
_metadata["keyboard_hints"] = hints
|
||||
|
||||
|
||||
def add_keyboard_hint(key: str, action: str) -> None:
|
||||
"""Add a single keyboard hint.
|
||||
|
||||
Args:
|
||||
key: Key or key combination
|
||||
action: Description of what it does
|
||||
"""
|
||||
if "keyboard_hints" not in _metadata:
|
||||
_metadata["keyboard_hints"] = []
|
||||
|
||||
_metadata["keyboard_hints"].append({
|
||||
"key": key,
|
||||
"action": action
|
||||
})
|
||||
|
||||
|
||||
def set_custom_hints(hints: str) -> None:
|
||||
"""Set custom hints text for LLM context.
|
||||
|
||||
This can be any text that helps an LLM understand
|
||||
the game better - strategy tips, game rules, etc.
|
||||
|
||||
Args:
|
||||
hints: Free-form hint text
|
||||
"""
|
||||
_metadata["custom_hints"] = hints
|
||||
|
||||
|
||||
def register_scene(scene_name: str, description: Optional[str] = None) -> None:
|
||||
"""Register a scene for the API.
|
||||
|
||||
Args:
|
||||
scene_name: Name of the scene
|
||||
description: Optional description of what this scene is for
|
||||
"""
|
||||
if "scenes" not in _metadata:
|
||||
_metadata["scenes"] = []
|
||||
|
||||
scene_info = {"name": scene_name}
|
||||
if description:
|
||||
scene_info["description"] = description
|
||||
|
||||
# Update if exists, add if not
|
||||
for i, s in enumerate(_metadata["scenes"]):
|
||||
if s.get("name") == scene_name:
|
||||
_metadata["scenes"][i] = scene_info
|
||||
return
|
||||
|
||||
_metadata["scenes"].append(scene_info)
|
||||
371
src/scripts/api/server.py
Normal file
371
src/scripts/api/server.py
Normal file
|
|
@ -0,0 +1,371 @@
|
|||
"""HTTP server for the McRogueFace Game API.
|
||||
|
||||
Runs in a background daemon thread and provides REST endpoints for
|
||||
scene introspection, affordance analysis, and input injection.
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import hashlib
|
||||
import time
|
||||
import tempfile
|
||||
import os
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
import mcrfpy
|
||||
|
||||
from .introspection import serialize_scene, get_scene_hash
|
||||
from .affordances import extract_affordances, extract_keyboard_hints
|
||||
from .input_handler import execute_action
|
||||
from .metadata import get_game_metadata, set_game_metadata
|
||||
|
||||
|
||||
class GameAPIHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP request handler for the game API."""
|
||||
|
||||
# Class-level reference to server for metadata access
|
||||
api_server = None
|
||||
|
||||
def log_message(self, format, *args):
|
||||
"""Override to use custom logging instead of stderr."""
|
||||
# Enable for debugging
|
||||
print(f"[API] {format % args}", flush=True)
|
||||
|
||||
def log_error(self, format, *args):
|
||||
"""Log errors."""
|
||||
print(f"[API ERROR] {format % args}", flush=True)
|
||||
|
||||
def send_json(self, data: Dict[str, Any], status: int = 200) -> None:
|
||||
"""Send a JSON response."""
|
||||
body = json.dumps(data, indent=2).encode('utf-8')
|
||||
self.send_response(status)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.send_header('Content-Length', len(body))
|
||||
self.send_header('Access-Control-Allow-Origin', '*')
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def send_error_json(self, message: str, status: int = 400) -> None:
|
||||
"""Send a JSON error response."""
|
||||
self.send_json({"error": message}, status)
|
||||
|
||||
def send_file(self, filepath: str, content_type: str = 'application/octet-stream') -> None:
|
||||
"""Send a file as response."""
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', content_type)
|
||||
self.send_header('Content-Length', len(data))
|
||||
self.send_header('Access-Control-Allow-Origin', '*')
|
||||
self.end_headers()
|
||||
self.wfile.write(data)
|
||||
except FileNotFoundError:
|
||||
self.send_error_json("File not found", 404)
|
||||
|
||||
def parse_json_body(self) -> Optional[Dict[str, Any]]:
|
||||
"""Parse JSON body from request."""
|
||||
try:
|
||||
content_length = int(self.headers.get('Content-Length', 0))
|
||||
if content_length == 0:
|
||||
return {}
|
||||
body = self.rfile.read(content_length)
|
||||
return json.loads(body.decode('utf-8'))
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
return None
|
||||
|
||||
def do_OPTIONS(self):
|
||||
"""Handle CORS preflight requests."""
|
||||
self.send_response(200)
|
||||
self.send_header('Access-Control-Allow-Origin', '*')
|
||||
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
|
||||
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
"""Handle GET requests."""
|
||||
try:
|
||||
parsed = urlparse(self.path)
|
||||
path = parsed.path
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
if path == '/scene':
|
||||
self.handle_scene()
|
||||
elif path == '/affordances':
|
||||
self.handle_affordances()
|
||||
elif path == '/screenshot':
|
||||
self.handle_screenshot(query)
|
||||
elif path == '/metadata':
|
||||
self.handle_metadata()
|
||||
elif path == '/wait':
|
||||
self.handle_wait(query)
|
||||
elif path == '/health':
|
||||
self.handle_health()
|
||||
elif path == '/':
|
||||
self.handle_health() # Root path also returns health
|
||||
else:
|
||||
self.send_error_json("Unknown endpoint", 404)
|
||||
except Exception as e:
|
||||
print(f"[API] Error in do_GET: {e}", flush=True)
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
try:
|
||||
self.send_error_json(f"Internal error: {str(e)}", 500)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def do_POST(self):
|
||||
"""Handle POST requests."""
|
||||
try:
|
||||
parsed = urlparse(self.path)
|
||||
path = parsed.path
|
||||
|
||||
if path == '/input':
|
||||
self.handle_input()
|
||||
elif path == '/metadata':
|
||||
self.handle_set_metadata()
|
||||
else:
|
||||
self.send_error_json("Unknown endpoint", 404)
|
||||
except Exception as e:
|
||||
print(f"[API] Error in do_POST: {e}", flush=True)
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
try:
|
||||
self.send_error_json(f"Internal error: {str(e)}", 500)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def handle_health(self) -> None:
|
||||
"""Health check endpoint."""
|
||||
self.send_json({
|
||||
"status": "ok",
|
||||
"version": "0.1.0",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
def handle_scene(self) -> None:
|
||||
"""Return the current scene graph with semantic annotations."""
|
||||
try:
|
||||
# Try to use lock for thread safety, but fall back if not available
|
||||
try:
|
||||
with mcrfpy.lock():
|
||||
scene_data = serialize_scene()
|
||||
except (RuntimeError, AttributeError):
|
||||
# Lock not available (e.g., main thread or headless mode issue)
|
||||
scene_data = serialize_scene()
|
||||
self.send_json(scene_data)
|
||||
except Exception as e:
|
||||
self.send_error_json(f"Scene introspection failed: {str(e)}", 500)
|
||||
|
||||
def handle_affordances(self) -> None:
|
||||
"""Return only interactive elements with their semantic labels."""
|
||||
try:
|
||||
# Try to use lock for thread safety
|
||||
try:
|
||||
with mcrfpy.lock():
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
affordances = extract_affordances()
|
||||
keyboard_hints = extract_keyboard_hints()
|
||||
except (RuntimeError, AttributeError):
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
affordances = extract_affordances()
|
||||
keyboard_hints = extract_keyboard_hints()
|
||||
|
||||
self.send_json({
|
||||
"scene_name": scene_name,
|
||||
"affordances": affordances,
|
||||
"keyboard_hints": keyboard_hints,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
except Exception as e:
|
||||
self.send_error_json(f"Affordance extraction failed: {str(e)}", 500)
|
||||
|
||||
def handle_screenshot(self, query: Dict) -> None:
|
||||
"""Return a PNG screenshot."""
|
||||
try:
|
||||
# Create temp file for screenshot
|
||||
fd, filepath = tempfile.mkstemp(suffix='.png')
|
||||
os.close(fd)
|
||||
|
||||
# Take screenshot (may or may not need lock depending on mode)
|
||||
try:
|
||||
with mcrfpy.lock():
|
||||
mcrfpy.automation.screenshot(filepath)
|
||||
except (RuntimeError, AttributeError):
|
||||
mcrfpy.automation.screenshot(filepath)
|
||||
|
||||
# Check format preference
|
||||
format_type = query.get('format', ['binary'])[0]
|
||||
|
||||
if format_type == 'base64':
|
||||
import base64
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
b64 = base64.b64encode(data).decode('ascii')
|
||||
os.unlink(filepath)
|
||||
self.send_json({
|
||||
"image": f"data:image/png;base64,{b64}",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
else:
|
||||
self.send_file(filepath, 'image/png')
|
||||
os.unlink(filepath)
|
||||
|
||||
except Exception as e:
|
||||
self.send_error_json(f"Screenshot failed: {str(e)}", 500)
|
||||
|
||||
def handle_metadata(self) -> None:
|
||||
"""Return game metadata."""
|
||||
metadata = get_game_metadata()
|
||||
self.send_json(metadata)
|
||||
|
||||
def handle_set_metadata(self) -> None:
|
||||
"""Update game metadata."""
|
||||
data = self.parse_json_body()
|
||||
if data is None:
|
||||
self.send_error_json("Invalid JSON body")
|
||||
return
|
||||
|
||||
set_game_metadata(data)
|
||||
self.send_json({"success": True})
|
||||
|
||||
def handle_wait(self, query: Dict) -> None:
|
||||
"""Long-poll endpoint that returns when scene state changes."""
|
||||
timeout = float(query.get('timeout', [30])[0])
|
||||
previous_hash = query.get('scene_hash', [None])[0]
|
||||
|
||||
start_time = time.time()
|
||||
poll_interval = 0.1 # 100ms
|
||||
|
||||
current_hash = None
|
||||
scene_name = "unknown"
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
try:
|
||||
with mcrfpy.lock():
|
||||
current_hash = get_scene_hash()
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
except (RuntimeError, AttributeError):
|
||||
current_hash = get_scene_hash()
|
||||
scene = mcrfpy.current_scene
|
||||
scene_name = scene.name if scene else "unknown"
|
||||
|
||||
if previous_hash is None:
|
||||
# First call - return current state
|
||||
self.send_json({
|
||||
"changed": False,
|
||||
"hash": current_hash,
|
||||
"scene_name": scene_name
|
||||
})
|
||||
return
|
||||
|
||||
if current_hash != previous_hash:
|
||||
self.send_json({
|
||||
"changed": True,
|
||||
"new_hash": current_hash,
|
||||
"old_hash": previous_hash,
|
||||
"scene_name": scene_name,
|
||||
"reason": "state_change"
|
||||
})
|
||||
return
|
||||
|
||||
time.sleep(poll_interval)
|
||||
|
||||
# Timeout - no change
|
||||
self.send_json({
|
||||
"changed": False,
|
||||
"hash": current_hash,
|
||||
"scene_name": scene_name
|
||||
})
|
||||
|
||||
def handle_input(self) -> None:
|
||||
"""Inject keyboard or mouse input."""
|
||||
data = self.parse_json_body()
|
||||
if data is None:
|
||||
self.send_error_json("Invalid JSON body")
|
||||
return
|
||||
|
||||
if 'action' not in data:
|
||||
self.send_error_json("Missing 'action' field")
|
||||
return
|
||||
|
||||
try:
|
||||
result = execute_action(data)
|
||||
self.send_json(result)
|
||||
except ValueError as e:
|
||||
self.send_error_json(str(e), 400)
|
||||
except Exception as e:
|
||||
self.send_error_json(f"Input action failed: {str(e)}", 500)
|
||||
|
||||
|
||||
class GameAPIServer:
|
||||
"""Background HTTP server for the game API."""
|
||||
|
||||
def __init__(self, port: int = 8765):
|
||||
self.port = port
|
||||
self.server = None
|
||||
self.thread = None
|
||||
self._running = False
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start the server in a background thread."""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
GameAPIHandler.api_server = self
|
||||
|
||||
# Allow socket reuse to avoid "Address already in use" errors
|
||||
import socket
|
||||
class ReuseHTTPServer(HTTPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
self.server = ReuseHTTPServer(('localhost', self.port), GameAPIHandler)
|
||||
self.server.timeout = 1.0 # Don't block forever on handle_request
|
||||
self._running = True
|
||||
|
||||
self.thread = threading.Thread(
|
||||
target=self._serve_forever,
|
||||
daemon=True,
|
||||
name="GameAPI"
|
||||
)
|
||||
self.thread.start()
|
||||
print(f"[API] Game API running on http://localhost:{self.port}", flush=True)
|
||||
|
||||
def _serve_forever(self) -> None:
|
||||
"""Server loop (runs in background thread)."""
|
||||
try:
|
||||
while self._running:
|
||||
try:
|
||||
self.server.handle_request()
|
||||
except Exception as e:
|
||||
if self._running: # Only log if we're still supposed to be running
|
||||
print(f"[API] Request handling error: {e}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[API] Server thread error: {e}", flush=True)
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the server."""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
self._running = False
|
||||
# Send a dummy request to unblock handle_request
|
||||
try:
|
||||
import urllib.request
|
||||
urllib.request.urlopen(f"http://localhost:{self.port}/health", timeout=0.5)
|
||||
except Exception:
|
||||
pass
|
||||
if self.thread:
|
||||
self.thread.join(timeout=2.0)
|
||||
if self.server:
|
||||
self.server.server_close()
|
||||
print("[API] Game API stopped", flush=True)
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
1
tests/api/__init__.py
Normal file
1
tests/api/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
# API test package
|
||||
84
tests/api/run_api_server.py
Normal file
84
tests/api/run_api_server.py
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Simple script to start the McRogueFace Game API server.
|
||||
|
||||
Run with: cd build && ./mcrogueface --exec ../tests/api/run_api_server.py
|
||||
|
||||
Then test with:
|
||||
curl http://localhost:8765/health
|
||||
curl http://localhost:8765/scene
|
||||
curl http://localhost:8765/affordances
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, '../src/scripts')
|
||||
|
||||
import mcrfpy
|
||||
|
||||
print("Creating test scene...", flush=True)
|
||||
|
||||
# Create a simple test scene
|
||||
scene = mcrfpy.Scene("test")
|
||||
ui = scene.children
|
||||
|
||||
font = mcrfpy.Font("assets/JetbrainsMono.ttf")
|
||||
|
||||
# Title
|
||||
title = mcrfpy.Caption(text="API Test Scene", pos=(50, 20), font=font, fill_color=(255, 255, 0))
|
||||
title.font_size = 24
|
||||
ui.append(title)
|
||||
|
||||
# A clickable button
|
||||
button = mcrfpy.Frame(pos=(50, 80), size=(200, 50), fill_color=(64, 64, 128))
|
||||
button.name = "test_button"
|
||||
button.on_click = lambda pos, btn, action: print(f"Button clicked: {action}", flush=True)
|
||||
button_text = mcrfpy.Caption(text="Click Me", pos=(50, 10), font=font, fill_color=(255, 255, 255))
|
||||
button.children.append(button_text)
|
||||
ui.append(button)
|
||||
|
||||
# A second button
|
||||
button2 = mcrfpy.Frame(pos=(50, 150), size=(200, 50), fill_color=(64, 128, 64))
|
||||
button2.name = "settings_button"
|
||||
button2.on_click = lambda pos, btn, action: print(f"Settings clicked: {action}", flush=True)
|
||||
button2_text = mcrfpy.Caption(text="Settings", pos=(50, 10), font=font, fill_color=(255, 255, 255))
|
||||
button2.children.append(button2_text)
|
||||
ui.append(button2)
|
||||
|
||||
# Status text
|
||||
status = mcrfpy.Caption(text="API Server running on http://localhost:8765", pos=(50, 230), font=font, fill_color=(128, 255, 128))
|
||||
ui.append(status)
|
||||
|
||||
status2 = mcrfpy.Caption(text="Press ESC to exit", pos=(50, 260), font=font, fill_color=(200, 200, 200))
|
||||
ui.append(status2)
|
||||
|
||||
mcrfpy.current_scene = scene
|
||||
|
||||
print("Starting API server...", flush=True)
|
||||
|
||||
# Start the API server
|
||||
from api import start_server
|
||||
server = start_server(8765)
|
||||
|
||||
print("", flush=True)
|
||||
print("=" * 50, flush=True)
|
||||
print("API Server is ready!", flush=True)
|
||||
print("", flush=True)
|
||||
print("Test endpoints:", flush=True)
|
||||
print(" curl http://localhost:8765/health", flush=True)
|
||||
print(" curl http://localhost:8765/scene", flush=True)
|
||||
print(" curl http://localhost:8765/affordances", flush=True)
|
||||
print(" curl http://localhost:8765/metadata", flush=True)
|
||||
print(" curl http://localhost:8765/screenshot?format=base64 | jq -r .image", flush=True)
|
||||
print("", flush=True)
|
||||
print("Input examples:", flush=True)
|
||||
print(' curl -X POST -H "Content-Type: application/json" -d \'{"action":"key","key":"W"}\' http://localhost:8765/input', flush=True)
|
||||
print(' curl -X POST -H "Content-Type: application/json" -d \'{"action":"click","x":150,"y":100}\' http://localhost:8765/input', flush=True)
|
||||
print("=" * 50, flush=True)
|
||||
print("", flush=True)
|
||||
|
||||
# Key handler
|
||||
def on_key(key, action):
|
||||
if key == mcrfpy.Key.ESCAPE and action == mcrfpy.InputState.PRESSED:
|
||||
print("Exiting...", flush=True)
|
||||
mcrfpy.exit()
|
||||
|
||||
scene.on_key = on_key
|
||||
198
tests/api/test_api_basic.py
Normal file
198
tests/api/test_api_basic.py
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Basic test for the McRogueFace Game API.
|
||||
|
||||
Run with: cd build && ./mcrogueface --headless --exec ../tests/api/test_api_basic.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
import mcrfpy
|
||||
|
||||
# Create a test scene with some UI elements
|
||||
test_scene = mcrfpy.Scene("api_test")
|
||||
ui = test_scene.children
|
||||
|
||||
# Add various interactive elements
|
||||
font = mcrfpy.Font("assets/JetbrainsMono.ttf")
|
||||
|
||||
# A button-like frame with click handler
|
||||
button_frame = mcrfpy.Frame(pos=(50, 50), size=(200, 60), fill_color=(64, 64, 128))
|
||||
button_frame.name = "play_button"
|
||||
|
||||
button_label = mcrfpy.Caption(text="Play Game", pos=(20, 15), font=font, fill_color=(255, 255, 255))
|
||||
button_frame.children.append(button_label)
|
||||
|
||||
click_count = [0]
|
||||
|
||||
def on_button_click(pos, button, action):
|
||||
if str(action) == "PRESSED" or action == mcrfpy.InputState.PRESSED:
|
||||
click_count[0] += 1
|
||||
print(f"Button clicked! Count: {click_count[0]}")
|
||||
|
||||
button_frame.on_click = on_button_click
|
||||
ui.append(button_frame)
|
||||
|
||||
# A second button
|
||||
settings_frame = mcrfpy.Frame(pos=(50, 130), size=(200, 60), fill_color=(64, 128, 64))
|
||||
settings_frame.name = "settings_button"
|
||||
settings_label = mcrfpy.Caption(text="Settings", pos=(20, 15), font=font, fill_color=(255, 255, 255))
|
||||
settings_frame.children.append(settings_label)
|
||||
settings_frame.on_click = lambda pos, btn, action: print("Settings clicked")
|
||||
ui.append(settings_frame)
|
||||
|
||||
# A caption without click (for display)
|
||||
title = mcrfpy.Caption(text="API Test Scene", pos=(50, 10), font=font, fill_color=(255, 255, 0))
|
||||
title.font_size = 24
|
||||
ui.append(title)
|
||||
|
||||
# Activate scene
|
||||
mcrfpy.current_scene = test_scene
|
||||
|
||||
|
||||
def run_api_tests(timer, runtime):
|
||||
"""Run the API tests after scene is set up."""
|
||||
print("\n=== Starting API Tests ===\n")
|
||||
|
||||
base_url = "http://localhost:8765"
|
||||
|
||||
# Test 1: Health check
|
||||
print("Test 1: Health check...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/health")
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["status"] == "ok", f"Expected 'ok', got '{data['status']}'"
|
||||
print(f" PASS: Server healthy, version {data.get('version')}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 2: Scene introspection
|
||||
print("\nTest 2: Scene introspection...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/scene")
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["scene_name"] == "api_test", f"Expected 'api_test', got '{data['scene_name']}'"
|
||||
assert data["element_count"] == 3, f"Expected 3 elements, got {data['element_count']}"
|
||||
print(f" PASS: Scene '{data['scene_name']}' with {data['element_count']} elements")
|
||||
print(f" Viewport: {data['viewport']}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 3: Affordances
|
||||
print("\nTest 3: Affordance extraction...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/affordances")
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
affordances = data["affordances"]
|
||||
assert len(affordances) >= 2, f"Expected at least 2 affordances, got {len(affordances)}"
|
||||
|
||||
# Check for our named buttons
|
||||
labels = [a.get("label") for a in affordances]
|
||||
print(f" Found affordances with labels: {labels}")
|
||||
|
||||
# Find play_button by name hint
|
||||
play_affordance = None
|
||||
for a in affordances:
|
||||
if a.get("hint") and "play_button" in a.get("hint", ""):
|
||||
play_affordance = a
|
||||
break
|
||||
if a.get("label") and "Play" in a.get("label", ""):
|
||||
play_affordance = a
|
||||
break
|
||||
|
||||
if play_affordance:
|
||||
print(f" PASS: Found play button affordance, ID={play_affordance['id']}")
|
||||
else:
|
||||
print(f" WARN: Could not find play button by name or label")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 4: Metadata
|
||||
print("\nTest 4: Metadata...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/metadata")
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
assert "current_scene" in data, "Missing current_scene"
|
||||
print(f" PASS: Got metadata, current_scene={data['current_scene']}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 5: Input - click
|
||||
print("\nTest 5: Input click...")
|
||||
try:
|
||||
# Click the play button
|
||||
req = urllib.request.Request(
|
||||
f"{base_url}/input",
|
||||
data=json.dumps({
|
||||
"action": "click",
|
||||
"x": 150, # Center of play button
|
||||
"y": 80
|
||||
}).encode('utf-8'),
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["success"], "Click failed"
|
||||
print(f" PASS: Click executed at ({data['x']}, {data['y']})")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 6: Input - key
|
||||
print("\nTest 6: Input key press...")
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{base_url}/input",
|
||||
data=json.dumps({
|
||||
"action": "key",
|
||||
"key": "ESCAPE"
|
||||
}).encode('utf-8'),
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=2) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["success"], "Key press failed"
|
||||
print(f" PASS: Key '{data['key']}' pressed")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Test 7: Wait endpoint (quick check)
|
||||
print("\nTest 7: Wait endpoint...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/wait?timeout=1")
|
||||
with urllib.request.urlopen(req, timeout=3) as response:
|
||||
data = json.loads(response.read())
|
||||
assert "hash" in data, "Missing hash"
|
||||
print(f" PASS: Got scene hash: {data['hash']}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
print("\n=== All API Tests Passed ===\n")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
# Start the API server
|
||||
print("Starting API server...")
|
||||
import sys
|
||||
sys.path.insert(0, '../src/scripts')
|
||||
from api import start_server
|
||||
server = start_server(8765)
|
||||
|
||||
# Give server time to start
|
||||
time.sleep(0.5)
|
||||
|
||||
# Run tests after a short delay
|
||||
test_timer = mcrfpy.Timer("api_test", run_api_tests, 500)
|
||||
211
tests/api/test_api_windowed.py
Normal file
211
tests/api/test_api_windowed.py
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Test for the McRogueFace Game API in windowed mode.
|
||||
|
||||
Run with: cd build && ./mcrogueface --exec ../tests/api/test_api_windowed.py
|
||||
|
||||
Tests all API endpoints and verifies proper functionality.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
import mcrfpy
|
||||
|
||||
# Force flush on print
|
||||
import functools
|
||||
print = functools.partial(print, flush=True)
|
||||
|
||||
def log(msg):
|
||||
print(msg)
|
||||
|
||||
# Create a test scene with some UI elements
|
||||
test_scene = mcrfpy.Scene("api_test")
|
||||
ui = test_scene.children
|
||||
|
||||
# Add various interactive elements
|
||||
font = mcrfpy.Font("assets/JetbrainsMono.ttf")
|
||||
|
||||
# A button-like frame with click handler
|
||||
button_frame = mcrfpy.Frame(pos=(50, 50), size=(200, 60), fill_color=(64, 64, 128))
|
||||
button_frame.name = "play_button"
|
||||
|
||||
button_label = mcrfpy.Caption(text="Play Game", pos=(20, 15), font=font, fill_color=(255, 255, 255))
|
||||
button_frame.children.append(button_label)
|
||||
|
||||
click_count = [0]
|
||||
|
||||
def on_button_click(pos, button, action):
|
||||
if action == mcrfpy.InputState.PRESSED:
|
||||
click_count[0] += 1
|
||||
print(f"Button clicked! Count: {click_count[0]}")
|
||||
|
||||
button_frame.on_click = on_button_click
|
||||
ui.append(button_frame)
|
||||
|
||||
# A second button
|
||||
settings_frame = mcrfpy.Frame(pos=(50, 130), size=(200, 60), fill_color=(64, 128, 64))
|
||||
settings_frame.name = "settings_button"
|
||||
settings_label = mcrfpy.Caption(text="Settings", pos=(20, 15), font=font, fill_color=(255, 255, 255))
|
||||
settings_frame.children.append(settings_label)
|
||||
settings_frame.on_click = lambda pos, btn, action: print("Settings clicked")
|
||||
ui.append(settings_frame)
|
||||
|
||||
# A caption without click (for display)
|
||||
title = mcrfpy.Caption(text="API Test Scene", pos=(50, 10), font=font, fill_color=(255, 255, 0))
|
||||
title.font_size = 24
|
||||
ui.append(title)
|
||||
|
||||
# Status caption to show test progress
|
||||
status = mcrfpy.Caption(text="Starting API tests...", pos=(50, 220), font=font, fill_color=(255, 255, 255))
|
||||
ui.append(status)
|
||||
|
||||
# Activate scene
|
||||
mcrfpy.current_scene = test_scene
|
||||
|
||||
# Start the API server
|
||||
log("Starting API server...")
|
||||
sys.path.insert(0, '../src/scripts')
|
||||
from api import start_server
|
||||
server = start_server(8765)
|
||||
print("API server started on http://localhost:8765")
|
||||
|
||||
|
||||
def run_api_tests(timer, runtime):
|
||||
"""Run the API tests after scene is set up."""
|
||||
print("\n=== Starting API Tests ===\n")
|
||||
status.text = "Running tests..."
|
||||
|
||||
base_url = "http://localhost:8765"
|
||||
all_passed = True
|
||||
|
||||
# Test 1: Health check
|
||||
print("Test 1: Health check...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/health")
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["status"] == "ok", f"Expected 'ok', got '{data['status']}'"
|
||||
print(f" PASS: Server healthy, version {data.get('version')}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 2: Scene introspection
|
||||
print("\nTest 2: Scene introspection...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/scene")
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
assert data["scene_name"] == "api_test", f"Expected 'api_test', got '{data['scene_name']}'"
|
||||
print(f" PASS: Scene '{data['scene_name']}' with {data['element_count']} elements")
|
||||
print(f" Viewport: {data['viewport']}")
|
||||
for elem in data.get('elements', []):
|
||||
print(f" - {elem['type']}: name='{elem.get('name', '')}' interactive={elem.get('interactive')}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 3: Affordances
|
||||
print("\nTest 3: Affordance extraction...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/affordances")
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
affordances = data["affordances"]
|
||||
print(f" Found {len(affordances)} affordances:")
|
||||
for aff in affordances:
|
||||
print(f" ID={aff['id']} type={aff['type']} label='{aff.get('label')}' actions={aff.get('actions')}")
|
||||
if len(affordances) >= 2:
|
||||
print(f" PASS")
|
||||
else:
|
||||
print(f" WARN: Expected at least 2 affordances")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 4: Metadata
|
||||
print("\nTest 4: Metadata...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/metadata")
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
print(f" Game: {data.get('game_name')}")
|
||||
print(f" Current scene: {data.get('current_scene')}")
|
||||
print(f" PASS")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 5: Input - click affordance by label
|
||||
print("\nTest 5: Click affordance by label...")
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{base_url}/input",
|
||||
data=json.dumps({
|
||||
"action": "click_affordance",
|
||||
"label": "Play Game" # Matches the button text
|
||||
}).encode('utf-8'),
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
if data.get("success"):
|
||||
print(f" PASS: Clicked affordance '{data.get('affordance_label')}' at ({data.get('x'):.0f}, {data.get('y'):.0f})")
|
||||
else:
|
||||
print(f" FAIL: {data}")
|
||||
all_passed = False
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 6: Screenshot (base64)
|
||||
print("\nTest 6: Screenshot...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/screenshot?format=base64")
|
||||
with urllib.request.urlopen(req, timeout=5) as response:
|
||||
data = json.loads(response.read())
|
||||
if "image" in data and data["image"].startswith("data:image/png;base64,"):
|
||||
img_size = len(data["image"])
|
||||
print(f" PASS: Got base64 image ({img_size} chars)")
|
||||
else:
|
||||
print(f" FAIL: Invalid image data")
|
||||
all_passed = False
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
# Test 7: Wait endpoint (quick check)
|
||||
print("\nTest 7: Wait endpoint...")
|
||||
try:
|
||||
req = urllib.request.Request(f"{base_url}/wait?timeout=1")
|
||||
with urllib.request.urlopen(req, timeout=3) as response:
|
||||
data = json.loads(response.read())
|
||||
print(f" PASS: Scene hash: {data.get('hash')}")
|
||||
except Exception as e:
|
||||
print(f" FAIL: {e}")
|
||||
all_passed = False
|
||||
|
||||
if all_passed:
|
||||
print("\n=== All API Tests Passed ===\n")
|
||||
status.text = "All tests PASSED!"
|
||||
status.fill_color = (0, 255, 0)
|
||||
else:
|
||||
print("\n=== Some Tests Failed ===\n")
|
||||
status.text = "Some tests FAILED"
|
||||
status.fill_color = (255, 0, 0)
|
||||
|
||||
print("Press ESC to exit, or interact with the scene...")
|
||||
print(f"API still running at {base_url}")
|
||||
|
||||
|
||||
# Add key handler to exit
|
||||
def on_key(key, action):
|
||||
if key == mcrfpy.Key.ESCAPE and action == mcrfpy.InputState.PRESSED:
|
||||
mcrfpy.exit()
|
||||
|
||||
test_scene.on_key = on_key
|
||||
|
||||
# Run tests after a short delay to let rendering settle
|
||||
test_timer = mcrfpy.Timer("api_test", run_api_tests, 1000)
|
||||
Loading…
Add table
Add a link
Reference in a new issue