diff --git a/src/scripts/api/README.md b/src/scripts/api/README.md new file mode 100644 index 0000000..ca97e7e --- /dev/null +++ b/src/scripts/api/README.md @@ -0,0 +1,197 @@ +# McRogueFace Game-to-API Bridge + +A general-purpose API layer that exposes any McRogueFace game to external clients (LLMs, accessibility tools, Twitch integrations, testing harnesses). + +## Quick Start + +```python +# In your game script +import sys +sys.path.insert(0, '../src/scripts') +from api import start_server + +# Start the API server +server = start_server(8765) +``` + +The API will be available at `http://localhost:8765`. + +## API Endpoints + +### GET /health +Health check endpoint. + +```bash +curl http://localhost:8765/health +``` + +### GET /scene +Returns the current scene graph with all UI elements. + +```bash +curl http://localhost:8765/scene +``` + +Response includes: +- Scene name +- Viewport dimensions +- All UI elements with type, bounds, visibility, interactivity +- Nested children +- Type-specific properties (text for Caption, grid_size for Grid, etc.) + +### GET /affordances +Returns only interactive elements with semantic labels. + +```bash +curl http://localhost:8765/affordances +``` + +Response includes: +- List of clickable elements with: + - `id`: Unique affordance ID (for click_affordance) + - `label`: Human-readable label (button text or element name) + - `type`: Affordance type (button, text_button, icon_button, interactive_grid) + - `bounds`: Position and size + - `actions`: Available actions (click, hover, grid_cell_click) + - `hint`: Developer label if different from display label +- Default keyboard hints + +### GET /screenshot +Returns a screenshot of the current game state. + +```bash +# Binary PNG +curl http://localhost:8765/screenshot -o screenshot.png + +# Base64 JSON +curl "http://localhost:8765/screenshot?format=base64" +``` + +### GET /metadata +Returns game metadata for LLM context. + +```bash +curl http://localhost:8765/metadata +``` + +### POST /input +Inject keyboard or mouse input. + +```bash +# Click at coordinates +curl -X POST -H "Content-Type: application/json" \ + -d '{"action":"click","x":150,"y":100}' \ + http://localhost:8765/input + +# Click affordance by label (fuzzy match) +curl -X POST -H "Content-Type: application/json" \ + -d '{"action":"click_affordance","label":"Play"}' \ + http://localhost:8765/input + +# Press a key +curl -X POST -H "Content-Type: application/json" \ + -d '{"action":"key","key":"W"}' \ + http://localhost:8765/input + +# Type text +curl -X POST -H "Content-Type: application/json" \ + -d '{"action":"type","text":"Hello"}' \ + http://localhost:8765/input + +# Key combination +curl -X POST -H "Content-Type: application/json" \ + -d '{"action":"hotkey","keys":["ctrl","s"]}' \ + http://localhost:8765/input +``` + +### GET /wait +Long-poll endpoint that returns when scene state changes. + +```bash +curl "http://localhost:8765/wait?timeout=30&scene_hash=abc123" +``` + +## Customizing Game Metadata + +Games can provide rich metadata for external clients: + +```python +from api.metadata import ( + set_game_info, + set_controls, + set_keyboard_hints, + set_custom_hints, +) + +set_game_info( + name="My Game", + version="1.0.0", + description="A puzzle roguelike" +) + +set_controls({ + "movement": "W/A/S/D", + "attack": "Space", +}) + +set_keyboard_hints([ + {"key": "W", "action": "Move up"}, + {"key": "Space", "action": "Attack"}, +]) + +set_custom_hints(""" +Strategy tips: +- Always check corners +- Save potions for boss fights +""") +``` + +## Affordance Detection + +The API automatically detects interactive elements: + +1. **Buttons**: Frame with on_click + Caption child +2. **Icon Buttons**: Frame with on_click + Sprite child +3. **Interactive Grids**: Grid with on_cell_click +4. **Text Buttons**: Caption with on_click + +Labels are extracted from: +1. Caption text inside the element +2. Element's `name` property (developer hint) + +Set meaningful `name` properties on your interactive elements for best results: + +```python +button = mcrfpy.Frame(...) +button.name = "attack_button" # Will show in affordances +``` + +## Use Cases + +- **LLM Co-Play**: AI analyzes game state via /scene and /affordances, sends moves via /input +- **Accessibility**: Screen readers can get semantic labels from /affordances +- **Twitch Plays**: Aggregate chat votes and inject via /input +- **Automated Testing**: Query state, inject inputs, verify outcomes + +## Architecture + +``` +┌─────────────────────────────────┐ +│ McRogueFace Game Loop │ +│ (Main Thread - renders game) │ +└────────────────┬────────────────┘ + │ +┌────────────────┴────────────────┐ +│ API Server (Background Thread)│ +│ - Introspects scene graph │ +│ - Injects input via automation │ +└────────────────┬────────────────┘ + │ HTTP :8765 + ▼ +┌─────────────────────────────────┐ +│ External Clients │ +│ (LLMs, tests, accessibility) │ +└─────────────────────────────────┘ +``` + +The API server runs in a daemon thread and uses `mcrfpy.lock()` for thread-safe access to the scene graph. diff --git a/src/scripts/api/__init__.py b/src/scripts/api/__init__.py new file mode 100644 index 0000000..202c660 --- /dev/null +++ b/src/scripts/api/__init__.py @@ -0,0 +1,55 @@ +"""McRogueFace Game-to-API Bridge + +Exposes any McRogueFace game to external clients (LLMs, accessibility tools, +Twitch integrations, testing harnesses) via a background HTTP server. + +Usage: + # In game script or via --exec + import api + api.start_server(port=8765) + + # Or auto-starts on import if run via --exec +""" + +from .server import GameAPIServer + +__version__ = "0.1.0" +__all__ = ["start_server", "stop_server", "get_server"] + +_server = None + + +def start_server(port: int = 8765) -> GameAPIServer: + """Start the API server on the specified port. + + Args: + port: HTTP port to listen on (default: 8765) + + Returns: + The GameAPIServer instance + """ + global _server + if _server is None: + _server = GameAPIServer(port) + _server.start() + return _server + + +def stop_server() -> None: + """Stop the API server if running.""" + global _server + if _server is not None: + _server.stop() + _server = None + + +def get_server() -> GameAPIServer: + """Get the current server instance, or None if not running.""" + return _server + + +# Auto-start when imported via --exec +# Check if we're being imported as main script +import sys +if __name__ == "__main__" or (hasattr(sys, '_called_from_exec') and sys._called_from_exec): + start_server() diff --git a/src/scripts/api/affordances.py b/src/scripts/api/affordances.py new file mode 100644 index 0000000..146fcec --- /dev/null +++ b/src/scripts/api/affordances.py @@ -0,0 +1,350 @@ +"""Affordance detection for the McRogueFace Game API. + +Analyzes the UI hierarchy to extract semantic meaning and identify +interactive elements with their labels and action types. +""" + +from typing import Dict, Any, List, Optional, Tuple + +import mcrfpy + + +# Global affordance ID counter (reset per extraction) +_affordance_id = 0 + + +def get_element_type(element) -> str: + """Get the type name of a UI element.""" + return type(element).__name__ + + +def get_bounds(element) -> Dict[str, float]: + """Extract bounding box from an element.""" + try: + if hasattr(element, 'get_bounds'): + x, y, w, h = element.get_bounds() + return {"x": x, "y": y, "w": w, "h": h} + elif hasattr(element, 'x') and hasattr(element, 'y'): + x = float(element.x) if element.x is not None else 0 + y = float(element.y) if element.y is not None else 0 + w = float(getattr(element, 'w', 0) or 0) + h = float(getattr(element, 'h', 0) or 0) + return {"x": x, "y": y, "w": w, "h": h} + except Exception: + pass + return {"x": 0, "y": 0, "w": 0, "h": 0} + + +def find_label_in_children(element) -> Optional[str]: + """Search children for Caption text to use as label.""" + if not hasattr(element, 'children'): + return None + + try: + for child in element.children: + child_type = get_element_type(child) + if child_type == "Caption": + text = str(getattr(child, 'text', '')) + if text.strip(): + return text.strip() + # Recurse into child frames + if child_type == "Frame": + label = find_label_in_children(child) + if label: + return label + except Exception: + pass + + return None + + +def find_icon_in_children(element) -> Optional[int]: + """Search children for Sprite to identify icon buttons.""" + if not hasattr(element, 'children'): + return None + + try: + for child in element.children: + child_type = get_element_type(child) + if child_type == "Sprite": + return int(getattr(child, 'sprite_index', 0)) + except Exception: + pass + + return None + + +def find_label(element) -> Optional[str]: + """Find the best label for an interactive element. + + Priority: + 1. Caption child text (most user-visible) + 2. Element name property (developer hint) + 3. Nearby caption text (for icon buttons) + + Returns the most user-friendly label for display/matching. + """ + element_type = get_element_type(element) + + # For Caption elements, use their text directly + if element_type == "Caption": + text = str(getattr(element, 'text', '')) + if text.strip(): + return text.strip() + + # Search children for Caption first (most visible to user) + label = find_label_in_children(element) + if label: + return label + + # Fall back to element name (developer hint) + name = getattr(element, 'name', None) + if name and str(name).strip(): + return str(name).strip() + + # For Sprite buttons, mention it's an icon + if element_type == "Sprite": + sprite_index = int(getattr(element, 'sprite_index', 0)) + return f"Icon button (sprite #{sprite_index})" + + return None + + +def classify_affordance(element) -> str: + """Classify what type of affordance this element represents.""" + element_type = get_element_type(element) + + if element_type == "Grid": + if getattr(element, 'on_cell_click', None): + return "interactive_grid" + return "grid" + + if element_type == "Caption": + if getattr(element, 'on_click', None): + return "text_button" + return "label" + + if element_type == "Sprite": + if getattr(element, 'on_click', None): + return "icon_button" + return "icon" + + if element_type == "Frame": + has_click = getattr(element, 'on_click', None) is not None + + # Frame with click + caption child = button + if has_click: + label = find_label_in_children(element) + icon = find_icon_in_children(element) + if label and icon is not None: + return "button" # Has both text and icon + elif label: + return "text_button" + elif icon is not None: + return "icon_button" + return "clickable_area" + + # Frame without click but with children = container + if hasattr(element, 'children'): + try: + if len(list(element.children)) > 0: + return "container" + except Exception: + pass + + return "panel" + + return "unknown" + + +def get_accepted_actions(element, affordance_type: str) -> List[str]: + """Determine what actions an affordance accepts.""" + actions = [] + + if getattr(element, 'on_click', None): + actions.append("click") + + if getattr(element, 'on_enter', None) or getattr(element, 'on_exit', None): + actions.append("hover") + + if affordance_type == "interactive_grid": + actions.append("grid_cell_click") + if getattr(element, 'on_cell_enter', None): + actions.append("grid_cell_hover") + + return actions if actions else ["none"] + + +def extract_affordance(element, parent_bounds: Optional[Dict] = None) -> Optional[Dict[str, Any]]: + """Extract affordance information from a single element. + + Args: + element: UI element to analyze + parent_bounds: Parent's bounds for relative positioning + + Returns: + Affordance dict or None if not interactive + """ + global _affordance_id + + element_type = get_element_type(element) + has_click = getattr(element, 'on_click', None) is not None + has_cell_click = getattr(element, 'on_cell_click', None) is not None + has_hover = (getattr(element, 'on_enter', None) is not None or + getattr(element, 'on_exit', None) is not None) + + # Skip non-interactive elements (unless they're grids with cell callbacks) + if not (has_click or has_cell_click or has_hover): + return None + + bounds = get_bounds(element) + label = find_label(element) + affordance_type = classify_affordance(element) + actions = get_accepted_actions(element, affordance_type) + + _affordance_id += 1 + + affordance = { + "id": _affordance_id, + "type": affordance_type, + "element_type": element_type, + "label": label, + "bounds": bounds, + "actions": actions, + } + + # Add grid-specific info + if element_type == "Grid": + grid_size = getattr(element, 'grid_size', None) + if grid_size: + try: + grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0]) + grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1]) + affordance["grid_size"] = {"w": grid_w, "h": grid_h} + except Exception: + pass + + # Count entities + try: + entity_count = len(list(element.entities)) + affordance["entity_count"] = entity_count + except Exception: + pass + + # Add hint if name was used + name = getattr(element, 'name', None) + if name and str(name).strip(): + affordance["hint"] = f"Developer label: {name}" + + return affordance + + +def extract_affordances_recursive(element, affordances: List[Dict], depth: int = 0, max_depth: int = 10) -> None: + """Recursively extract affordances from element and children. + + Args: + element: Current element to process + affordances: List to append affordances to + depth: Current recursion depth + max_depth: Maximum depth to recurse + """ + if depth > max_depth: + return + + # Extract affordance for this element + affordance = extract_affordance(element) + if affordance: + affordances.append(affordance) + + # Process children + if hasattr(element, 'children'): + try: + for child in element.children: + extract_affordances_recursive(child, affordances, depth + 1, max_depth) + except Exception: + pass + + +def extract_affordances() -> List[Dict[str, Any]]: + """Extract all interactive affordances from the current scene. + + Returns: + List of affordance dictionaries + """ + global _affordance_id + _affordance_id = 0 # Reset counter + + scene = mcrfpy.current_scene + affordances = [] + + try: + if scene: + for element in scene.children: + extract_affordances_recursive(element, affordances) + except Exception as e: + pass + + return affordances + + +def extract_keyboard_hints() -> List[Dict[str, str]]: + """Extract keyboard control hints. + + This is a placeholder that returns common roguelike controls. + Games can override this via the metadata endpoint. + + Returns: + List of keyboard hint dictionaries + """ + # Default roguelike controls - games should customize via metadata + return [ + {"key": "W/A/S/D", "action": "Move"}, + {"key": "Arrow keys", "action": "Move (alternative)"}, + {"key": "ESCAPE", "action": "Menu/Cancel"}, + {"key": "SPACE", "action": "Confirm/Interact"}, + {"key": "1-9", "action": "Use inventory item"}, + ] + + +def find_affordance_by_id(affordance_id: int) -> Optional[Dict[str, Any]]: + """Find an affordance by its ID. + + Args: + affordance_id: The ID to search for + + Returns: + The affordance dict or None + """ + affordances = extract_affordances() + for aff in affordances: + if aff.get("id") == affordance_id: + return aff + return None + + +def find_affordance_by_label(label: str, fuzzy: bool = True) -> Optional[Dict[str, Any]]: + """Find an affordance by its label. + + Args: + label: The label to search for + fuzzy: If True, do case-insensitive substring matching + + Returns: + The affordance dict or None + """ + affordances = extract_affordances() + label_lower = label.lower() + + for aff in affordances: + aff_label = aff.get("label") + if aff_label is None: + continue + + if fuzzy: + if label_lower in aff_label.lower(): + return aff + else: + if aff_label == label: + return aff + + return None diff --git a/src/scripts/api/example_integration.py b/src/scripts/api/example_integration.py new file mode 100644 index 0000000..1eeda3d --- /dev/null +++ b/src/scripts/api/example_integration.py @@ -0,0 +1,100 @@ +"""Example: Integrating a McRogueFace game with the API bridge. + +This file shows how a game script can integrate with the API to provide +rich metadata and semantic hints for external clients. +""" + +# Import the API module +import sys +sys.path.insert(0, '../src/scripts') + +from api import start_server +from api.metadata import ( + set_game_info, + set_controls, + set_keyboard_hints, + set_custom_hints, + register_scene, +) + + +def setup_api_for_crypt_of_sokoban(): + """Example setup for the Crypt of Sokoban game.""" + + # Set basic game info + set_game_info( + name="Crypt of Sokoban", + version="0.1.0", + description="A puzzle roguelike combining Sokoban mechanics with dungeon exploration. Push boulders, avoid enemies, and descend deeper into the crypt.", + author="7DRL 2025" + ) + + # Set control descriptions + set_controls({ + "movement": "W/A/S/D keys", + "wait": "Period (.) to skip turn", + "zap": "Z to use equipped item's active ability", + "use_item_1": "Numpad 1 to use first inventory slot", + "use_item_2": "Numpad 2 to use second inventory slot", + "use_item_3": "Numpad 3 to use third inventory slot", + "pull_boulder": "X to pull adjacent boulder toward you", + "debug_descend": "P to descend (debug)", + }) + + # Set keyboard hints for LLM context + set_keyboard_hints([ + {"key": "W", "action": "Move up"}, + {"key": "A", "action": "Move left"}, + {"key": "S", "action": "Move down"}, + {"key": "D", "action": "Move right"}, + {"key": ".", "action": "Wait (skip turn, enemies still move)"}, + {"key": "Z", "action": "Zap - use equipped item's ability"}, + {"key": "X", "action": "Pull boulder toward you"}, + {"key": "1", "action": "Use item in slot 1"}, + {"key": "2", "action": "Use item in slot 2"}, + {"key": "3", "action": "Use item in slot 3"}, + ]) + + # Set custom hints for LLM strategizing + set_custom_hints(""" +Crypt of Sokoban Strategy Guide: + +CORE MECHANICS: +- Push boulders by walking into them (if space behind is clear) +- Pull boulders with X key while standing adjacent +- Boulders block enemy movement - use them as barriers! +- Step on buttons to unlock doors/exits +- Each floor has one exit that leads deeper + +ENEMIES: +- Rats: Basic enemy, moves toward you +- Big Rats: Tougher, 2 damage per hit +- Cyclops: Very dangerous, 3 damage, can push boulders! + +ITEMS: +- Potions: Consumable healing/buffs (use with 1/2/3 keys) +- Weapons: Equip for passive bonuses and active abilities +- Each item has a "zap" ability on cooldown + +STRATEGY TIPS: +- Always look for buttons before moving - stepping on them opens paths +- Trap enemies behind boulders when possible +- Don't get cornered - keep escape routes open +- Use Z ability when enemies cluster together +- Higher floors have better loot but harder enemies +""") + + # Register scenes + register_scene("menu", "Main menu with Play, Settings, and audio controls") + register_scene("play", "Main gameplay scene with dungeon grid, HUD, and sidebar") + + # Start the server + server = start_server(8765) + print("[Game] API bridge configured for Crypt of Sokoban") + + return server + + +# Example usage in game.py: +# from api.example_integration import setup_api_for_crypt_of_sokoban +# api_server = setup_api_for_crypt_of_sokoban() diff --git a/src/scripts/api/input_handler.py b/src/scripts/api/input_handler.py new file mode 100644 index 0000000..445ca5e --- /dev/null +++ b/src/scripts/api/input_handler.py @@ -0,0 +1,373 @@ +"""Input action handling for the McRogueFace Game API. + +Dispatches input actions to the mcrfpy.automation module for +keyboard/mouse injection into the game. +""" + +from typing import Dict, Any, Optional + +import mcrfpy + +from .affordances import find_affordance_by_id, find_affordance_by_label + + +def execute_action(action_data: Dict[str, Any]) -> Dict[str, Any]: + """Execute an input action. + + Args: + action_data: Dictionary describing the action to perform + + Returns: + Result dictionary with success status + + Raises: + ValueError: If action is invalid or missing required fields + """ + action = action_data.get("action") + + if action == "click": + return execute_click(action_data) + elif action == "click_affordance": + return execute_click_affordance(action_data) + elif action == "type": + return execute_type(action_data) + elif action == "key": + return execute_key(action_data) + elif action == "hotkey": + return execute_hotkey(action_data) + elif action == "grid_click": + return execute_grid_click(action_data) + elif action == "move": + return execute_move(action_data) + elif action == "drag": + return execute_drag(action_data) + else: + raise ValueError(f"Unknown action: {action}") + + +def execute_click(data: Dict[str, Any]) -> Dict[str, Any]: + """Execute a mouse click at coordinates. + + Required fields: + x: X coordinate + y: Y coordinate + + Optional fields: + button: 'left', 'right', or 'middle' (default: 'left') + clicks: Number of clicks (default: 1) + """ + x = data.get("x") + y = data.get("y") + + if x is None or y is None: + raise ValueError("click requires 'x' and 'y' coordinates") + + button = data.get("button", "left") + clicks = data.get("clicks", 1) + + # Validate button + if button not in ("left", "right", "middle"): + raise ValueError(f"Invalid button: {button}") + + # Use automation module - click expects pos as tuple + mcrfpy.automation.click(pos=(int(x), int(y)), clicks=clicks, button=button) + + return { + "success": True, + "action": "click", + "x": x, + "y": y, + "button": button, + "clicks": clicks + } + + +def execute_click_affordance(data: Dict[str, Any]) -> Dict[str, Any]: + """Click an affordance by ID or label. + + Required fields (one of): + id: Affordance ID + label: Affordance label (supports fuzzy matching) + + Optional fields: + button: 'left', 'right', or 'middle' (default: 'left') + """ + affordance = None + + if "id" in data: + affordance = find_affordance_by_id(int(data["id"])) + if not affordance: + raise ValueError(f"Affordance with ID {data['id']} not found") + elif "label" in data: + affordance = find_affordance_by_label(data["label"], fuzzy=True) + if not affordance: + raise ValueError(f"Affordance with label '{data['label']}' not found") + else: + raise ValueError("click_affordance requires 'id' or 'label'") + + # Get center of affordance bounds + bounds = affordance["bounds"] + center_x = bounds["x"] + bounds["w"] / 2 + center_y = bounds["y"] + bounds["h"] / 2 + + button = data.get("button", "left") + + mcrfpy.automation.click(pos=(int(center_x), int(center_y)), button=button) + + return { + "success": True, + "action": "click_affordance", + "affordance_id": affordance["id"], + "affordance_label": affordance.get("label"), + "x": center_x, + "y": center_y, + "button": button + } + + +def execute_type(data: Dict[str, Any]) -> Dict[str, Any]: + """Type text into the game. + + Required fields: + text: String to type + + Optional fields: + interval: Delay between keystrokes in seconds (default: 0) + """ + text = data.get("text") + if text is None: + raise ValueError("type requires 'text' field") + + interval = float(data.get("interval", 0)) + + mcrfpy.automation.typewrite(str(text), interval=interval) + + return { + "success": True, + "action": "type", + "text": text, + "length": len(text) + } + + +def execute_key(data: Dict[str, Any]) -> Dict[str, Any]: + """Press a single key. + + Required fields: + key: Key name (e.g., 'ESCAPE', 'SPACE', 'W', 'F1') + """ + key = data.get("key") + if key is None: + raise ValueError("key requires 'key' field") + + # Normalize key name + key = str(key).upper() + + # Map common key names + key_map = { + "ESCAPE": "escape", + "ESC": "escape", + "SPACE": "space", + "ENTER": "return", + "RETURN": "return", + "TAB": "tab", + "BACKSPACE": "backspace", + "DELETE": "delete", + "UP": "up", + "DOWN": "down", + "LEFT": "left", + "RIGHT": "right", + "SHIFT": "shift", + "CTRL": "ctrl", + "CONTROL": "ctrl", + "ALT": "alt", + } + + normalized_key = key_map.get(key, key.lower()) + + # Press and release + mcrfpy.automation.keyDown(normalized_key) + mcrfpy.automation.keyUp(normalized_key) + + return { + "success": True, + "action": "key", + "key": key + } + + +def execute_hotkey(data: Dict[str, Any]) -> Dict[str, Any]: + """Press a key combination. + + Required fields: + keys: List of keys to press together (e.g., ['ctrl', 's']) + """ + keys = data.get("keys") + if keys is None or not isinstance(keys, list): + raise ValueError("hotkey requires 'keys' as a list") + + if len(keys) == 0: + raise ValueError("hotkey requires at least one key") + + # Press all modifier keys down + for key in keys[:-1]: + mcrfpy.automation.keyDown(str(key).lower()) + + # Press and release the final key + final_key = str(keys[-1]).lower() + mcrfpy.automation.keyDown(final_key) + mcrfpy.automation.keyUp(final_key) + + # Release modifier keys in reverse order + for key in reversed(keys[:-1]): + mcrfpy.automation.keyUp(str(key).lower()) + + return { + "success": True, + "action": "hotkey", + "keys": keys + } + + +def execute_grid_click(data: Dict[str, Any]) -> Dict[str, Any]: + """Click a specific cell in a grid. + + Required fields: + cell: Dict with 'x' and 'y' grid coordinates + + Optional fields: + grid_id: Name of the grid (uses first grid if not specified) + button: 'left', 'right', or 'middle' (default: 'left') + """ + cell = data.get("cell") + if cell is None or "x" not in cell or "y" not in cell: + raise ValueError("grid_click requires 'cell' with 'x' and 'y'") + + cell_x = int(cell["x"]) + cell_y = int(cell["y"]) + button = data.get("button", "left") + grid_name = data.get("grid_id") + + # Find the grid + scene = mcrfpy.current_scene + if not scene: + raise ValueError("No active scene") + + target_grid = None + for element in scene.children: + if type(element).__name__ == "Grid": + name = getattr(element, 'name', None) + if grid_name is None or name == grid_name: + target_grid = element + break + + if target_grid is None: + raise ValueError(f"Grid not found" + (f": {grid_name}" if grid_name else "")) + + # Calculate pixel position from cell coordinates + # Get grid bounds and cell size + try: + gx, gy, gw, gh = target_grid.get_bounds() + except Exception: + gx = float(target_grid.x) + gy = float(target_grid.y) + gw = float(target_grid.w) + gh = float(target_grid.h) + + grid_size = target_grid.grid_size + grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0]) + grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1]) + + zoom = float(getattr(target_grid, 'zoom', 1.0)) + center = getattr(target_grid, 'center', None) + + # Calculate cell size in pixels + # This is approximate - actual rendering may differ based on zoom/center + if grid_w > 0 and grid_h > 0: + cell_pixel_w = gw / grid_w + cell_pixel_h = gh / grid_h + + # Calculate center of target cell + pixel_x = gx + (cell_x + 0.5) * cell_pixel_w + pixel_y = gy + (cell_y + 0.5) * cell_pixel_h + + mcrfpy.automation.click(pos=(int(pixel_x), int(pixel_y)), button=button) + + return { + "success": True, + "action": "grid_click", + "cell": {"x": cell_x, "y": cell_y}, + "pixel": {"x": pixel_x, "y": pixel_y}, + "button": button + } + else: + raise ValueError("Grid has invalid dimensions") + + +def execute_move(data: Dict[str, Any]) -> Dict[str, Any]: + """Move the mouse to coordinates without clicking. + + Required fields: + x: X coordinate + y: Y coordinate + + Optional fields: + duration: Time to move in seconds (default: 0) + """ + x = data.get("x") + y = data.get("y") + + if x is None or y is None: + raise ValueError("move requires 'x' and 'y' coordinates") + + duration = float(data.get("duration", 0)) + + mcrfpy.automation.moveTo(int(x), int(y), duration=duration) + + return { + "success": True, + "action": "move", + "x": x, + "y": y, + "duration": duration + } + + +def execute_drag(data: Dict[str, Any]) -> Dict[str, Any]: + """Drag from current position or specified start to end. + + Required fields: + end_x: End X coordinate + end_y: End Y coordinate + + Optional fields: + start_x: Start X coordinate + start_y: Start Y coordinate + button: 'left', 'right', or 'middle' (default: 'left') + duration: Time to drag in seconds (default: 0) + """ + end_x = data.get("end_x") + end_y = data.get("end_y") + + if end_x is None or end_y is None: + raise ValueError("drag requires 'end_x' and 'end_y' coordinates") + + start_x = data.get("start_x") + start_y = data.get("start_y") + button = data.get("button", "left") + duration = float(data.get("duration", 0)) + + # Move to start if specified + if start_x is not None and start_y is not None: + mcrfpy.automation.moveTo(int(start_x), int(start_y)) + + mcrfpy.automation.dragTo(int(end_x), int(end_y), duration=duration, button=button) + + return { + "success": True, + "action": "drag", + "end_x": end_x, + "end_y": end_y, + "button": button, + "duration": duration + } diff --git a/src/scripts/api/introspection.py b/src/scripts/api/introspection.py new file mode 100644 index 0000000..35fa86e --- /dev/null +++ b/src/scripts/api/introspection.py @@ -0,0 +1,282 @@ +"""Scene graph introspection for the McRogueFace Game API. + +Provides functions to serialize the current scene graph to JSON, +extracting element types, bounds, properties, and children. +""" + +import hashlib +import json +import time +from typing import Dict, Any, List, Optional, Tuple + +import mcrfpy + + +def get_element_type(element) -> str: + """Get the type name of a UI element.""" + type_name = type(element).__name__ + # Handle potential wrapper types + if hasattr(element, '__class__'): + return element.__class__.__name__ + return type_name + + +def get_bounds(element) -> Dict[str, float]: + """Extract bounding box from an element.""" + try: + if hasattr(element, 'get_bounds'): + x, y, w, h = element.get_bounds() + return {"x": x, "y": y, "w": w, "h": h} + elif hasattr(element, 'x') and hasattr(element, 'y'): + x = float(element.x) if element.x is not None else 0 + y = float(element.y) if element.y is not None else 0 + w = float(getattr(element, 'w', 0) or 0) + h = float(getattr(element, 'h', 0) or 0) + return {"x": x, "y": y, "w": w, "h": h} + except Exception: + pass + return {"x": 0, "y": 0, "w": 0, "h": 0} + + +def is_interactive(element) -> bool: + """Check if an element has interactive callbacks.""" + return ( + getattr(element, 'on_click', None) is not None or + getattr(element, 'on_enter', None) is not None or + getattr(element, 'on_exit', None) is not None or + getattr(element, 'on_cell_click', None) is not None + ) + + +def serialize_color(color) -> Optional[Dict[str, int]]: + """Serialize a color object to dict.""" + if color is None: + return None + try: + return { + "r": int(color.r), + "g": int(color.g), + "b": int(color.b), + "a": int(getattr(color, 'a', 255)) + } + except Exception: + return None + + +def serialize_vector(vec) -> Optional[Dict[str, float]]: + """Serialize a Vector object to dict.""" + if vec is None: + return None + try: + return {"x": float(vec.x), "y": float(vec.y)} + except Exception: + return None + + +def serialize_entity(entity) -> Dict[str, Any]: + """Serialize an Entity to dict.""" + data = { + "type": "Entity", + "name": getattr(entity, 'name', None) or "", + "grid_x": float(getattr(entity, 'grid_x', 0)), + "grid_y": float(getattr(entity, 'grid_y', 0)), + "sprite_index": int(getattr(entity, 'sprite_index', 0)), + "visible": bool(getattr(entity, 'visible', True)), + } + return data + + +def serialize_grid(grid) -> Dict[str, Any]: + """Serialize a Grid element with its entities.""" + bounds = get_bounds(grid) + + # Get grid dimensions + grid_size = getattr(grid, 'grid_size', None) + if grid_size: + try: + grid_w = int(grid_size.x) if hasattr(grid_size, 'x') else int(grid_size[0]) + grid_h = int(grid_size.y) if hasattr(grid_size, 'y') else int(grid_size[1]) + except Exception: + grid_w = grid_h = 0 + else: + grid_w = int(getattr(grid, 'grid_w', 0)) + grid_h = int(getattr(grid, 'grid_h', 0)) + + # Get camera info + center = getattr(grid, 'center', None) + zoom = float(getattr(grid, 'zoom', 1.0)) + + # Serialize entities + entities = [] + try: + for entity in grid.entities: + entities.append(serialize_entity(entity)) + except Exception: + pass + + data = { + "type": "Grid", + "name": getattr(grid, 'name', None) or "", + "bounds": bounds, + "visible": bool(getattr(grid, 'visible', True)), + "interactive": is_interactive(grid), + "grid_size": {"w": grid_w, "h": grid_h}, + "zoom": zoom, + "center": serialize_vector(center), + "entity_count": len(entities), + "entities": entities, + "has_cell_click": getattr(grid, 'on_cell_click', None) is not None, + "has_cell_enter": getattr(grid, 'on_cell_enter', None) is not None, + } + + # Add cell size estimate if texture available + texture = getattr(grid, 'texture', None) + if texture: + # Texture dimensions divided by sprite count would give cell size + # but this is an approximation + data["has_texture"] = True + + return data + + +def serialize_element(element, depth: int = 0, max_depth: int = 10) -> Dict[str, Any]: + """Serialize a UI element to a dictionary. + + Args: + element: The UI element to serialize + depth: Current recursion depth + max_depth: Maximum recursion depth for children + + Returns: + Dictionary representation of the element + """ + element_type = get_element_type(element) + bounds = get_bounds(element) + + # Special handling for Grid + if element_type == "Grid": + return serialize_grid(element) + + data = { + "type": element_type, + "name": getattr(element, 'name', None) or "", + "bounds": bounds, + "visible": bool(getattr(element, 'visible', True)), + "interactive": is_interactive(element), + "z_index": int(getattr(element, 'z_index', 0)), + } + + # Add type-specific properties + if element_type == "Caption": + data["text"] = str(getattr(element, 'text', "")) + data["font_size"] = float(getattr(element, 'font_size', 12)) + data["fill_color"] = serialize_color(getattr(element, 'fill_color', None)) + + elif element_type == "Frame": + data["fill_color"] = serialize_color(getattr(element, 'fill_color', None)) + data["outline"] = float(getattr(element, 'outline', 0)) + data["clip_children"] = bool(getattr(element, 'clip_children', False)) + + elif element_type == "Sprite": + data["sprite_index"] = int(getattr(element, 'sprite_index', 0)) + data["scale"] = float(getattr(element, 'scale', 1.0)) + + elif element_type in ("Line", "Circle", "Arc"): + data["color"] = serialize_color(getattr(element, 'color', None)) + if element_type == "Circle": + data["radius"] = float(getattr(element, 'radius', 0)) + elif element_type == "Arc": + data["radius"] = float(getattr(element, 'radius', 0)) + data["start_angle"] = float(getattr(element, 'start_angle', 0)) + data["end_angle"] = float(getattr(element, 'end_angle', 0)) + + # Handle children recursively + if hasattr(element, 'children') and depth < max_depth: + children = [] + try: + for child in element.children: + children.append(serialize_element(child, depth + 1, max_depth)) + except Exception: + pass + data["children"] = children + data["child_count"] = len(children) + + return data + + +def serialize_scene() -> Dict[str, Any]: + """Serialize the entire current scene graph. + + Returns: + Dictionary with scene name, viewport info, and all elements + """ + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + + # Get viewport size + try: + width, height = mcrfpy.automation.size() + except Exception: + width, height = 1024, 768 + + # Get scene UI elements from the scene object directly + elements = [] + try: + if scene: + for element in scene.children: + elements.append(serialize_element(element)) + except Exception as e: + pass + + return { + "scene_name": scene_name, + "timestamp": time.time(), + "viewport": {"width": width, "height": height}, + "element_count": len(elements), + "elements": elements + } + + +def get_scene_hash() -> str: + """Compute a hash of the current scene state for change detection. + + This is a lightweight hash that captures: + - Scene name + - Number of top-level elements + - Element names and types + + Returns: + Hex string hash of scene state + """ + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + + state_parts = [scene_name] + + try: + if scene: + ui = scene.children + state_parts.append(str(len(ui))) + + for element in ui: + element_type = get_element_type(element) + name = getattr(element, 'name', '') or '' + state_parts.append(f"{element_type}:{name}") + + # For captions, include text (truncated) + if element_type == "Caption": + text = str(getattr(element, 'text', ''))[:50] + state_parts.append(text) + + # For grids, include entity count + if element_type == "Grid": + try: + entity_count = len(list(element.entities)) + state_parts.append(f"entities:{entity_count}") + except Exception: + pass + except Exception: + pass + + state_str = "|".join(state_parts) + return hashlib.md5(state_str.encode()).hexdigest()[:16] diff --git a/src/scripts/api/metadata.py b/src/scripts/api/metadata.py new file mode 100644 index 0000000..f94bd91 --- /dev/null +++ b/src/scripts/api/metadata.py @@ -0,0 +1,169 @@ +"""Game metadata management for the McRogueFace Game API. + +Stores and retrieves game-specific metadata that helps external clients +understand the game's controls, purpose, and current state. +""" + +from typing import Dict, Any, List, Optional +import time + +# Global metadata storage +_metadata: Dict[str, Any] = { + "game_name": "McRogueFace Game", + "version": "0.1.0", + "description": "A game built with McRogueFace engine", + "controls": { + "movement": "W/A/S/D or Arrow keys", + "interact": "Space or Enter", + "cancel": "Escape", + }, + "scenes": [], + "custom_hints": "", + "keyboard_hints": [], +} + + +def get_game_metadata() -> Dict[str, Any]: + """Get the current game metadata. + + Returns: + Dictionary containing game metadata + """ + import mcrfpy + + # Add dynamic data + result = dict(_metadata) + scene = mcrfpy.current_scene + result["current_scene"] = scene.name if scene else "unknown" + result["timestamp"] = time.time() + + # Get viewport size + try: + width, height = mcrfpy.automation.size() + result["viewport"] = {"width": width, "height": height} + except Exception: + result["viewport"] = {"width": 1024, "height": 768} + + return result + + +def set_game_metadata(data: Dict[str, Any]) -> None: + """Update game metadata. + + Games can call this to provide richer information to API clients. + + Args: + data: Dictionary of metadata fields to update + """ + global _metadata + + # Allowed fields to update + allowed_fields = { + "game_name", + "version", + "description", + "controls", + "scenes", + "custom_hints", + "keyboard_hints", + "author", + "url", + "tags", + } + + for key, value in data.items(): + if key in allowed_fields: + _metadata[key] = value + + +def set_game_info( + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + author: Optional[str] = None, +) -> None: + """Convenience function to set basic game info. + + Args: + name: Game name + version: Game version string + description: Brief description + author: Author name + """ + if name is not None: + _metadata["game_name"] = name + if version is not None: + _metadata["version"] = version + if description is not None: + _metadata["description"] = description + if author is not None: + _metadata["author"] = author + + +def set_controls(controls: Dict[str, str]) -> None: + """Set control descriptions. + + Args: + controls: Dict mapping action names to control descriptions + """ + _metadata["controls"] = controls + + +def set_keyboard_hints(hints: List[Dict[str, str]]) -> None: + """Set keyboard hints for the API. + + Args: + hints: List of dicts with 'key' and 'action' fields + """ + _metadata["keyboard_hints"] = hints + + +def add_keyboard_hint(key: str, action: str) -> None: + """Add a single keyboard hint. + + Args: + key: Key or key combination + action: Description of what it does + """ + if "keyboard_hints" not in _metadata: + _metadata["keyboard_hints"] = [] + + _metadata["keyboard_hints"].append({ + "key": key, + "action": action + }) + + +def set_custom_hints(hints: str) -> None: + """Set custom hints text for LLM context. + + This can be any text that helps an LLM understand + the game better - strategy tips, game rules, etc. + + Args: + hints: Free-form hint text + """ + _metadata["custom_hints"] = hints + + +def register_scene(scene_name: str, description: Optional[str] = None) -> None: + """Register a scene for the API. + + Args: + scene_name: Name of the scene + description: Optional description of what this scene is for + """ + if "scenes" not in _metadata: + _metadata["scenes"] = [] + + scene_info = {"name": scene_name} + if description: + scene_info["description"] = description + + # Update if exists, add if not + for i, s in enumerate(_metadata["scenes"]): + if s.get("name") == scene_name: + _metadata["scenes"][i] = scene_info + return + + _metadata["scenes"].append(scene_info) diff --git a/src/scripts/api/server.py b/src/scripts/api/server.py new file mode 100644 index 0000000..4a9bb51 --- /dev/null +++ b/src/scripts/api/server.py @@ -0,0 +1,371 @@ +"""HTTP server for the McRogueFace Game API. + +Runs in a background daemon thread and provides REST endpoints for +scene introspection, affordance analysis, and input injection. +""" + +import json +import threading +import hashlib +import time +import tempfile +import os +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse, parse_qs +from typing import Dict, Any, Optional + +import mcrfpy + +from .introspection import serialize_scene, get_scene_hash +from .affordances import extract_affordances, extract_keyboard_hints +from .input_handler import execute_action +from .metadata import get_game_metadata, set_game_metadata + + +class GameAPIHandler(BaseHTTPRequestHandler): + """HTTP request handler for the game API.""" + + # Class-level reference to server for metadata access + api_server = None + + def log_message(self, format, *args): + """Override to use custom logging instead of stderr.""" + # Enable for debugging + print(f"[API] {format % args}", flush=True) + + def log_error(self, format, *args): + """Log errors.""" + print(f"[API ERROR] {format % args}", flush=True) + + def send_json(self, data: Dict[str, Any], status: int = 200) -> None: + """Send a JSON response.""" + body = json.dumps(data, indent=2).encode('utf-8') + self.send_response(status) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', len(body)) + self.send_header('Access-Control-Allow-Origin', '*') + self.end_headers() + self.wfile.write(body) + + def send_error_json(self, message: str, status: int = 400) -> None: + """Send a JSON error response.""" + self.send_json({"error": message}, status) + + def send_file(self, filepath: str, content_type: str = 'application/octet-stream') -> None: + """Send a file as response.""" + try: + with open(filepath, 'rb') as f: + data = f.read() + self.send_response(200) + self.send_header('Content-Type', content_type) + self.send_header('Content-Length', len(data)) + self.send_header('Access-Control-Allow-Origin', '*') + self.end_headers() + self.wfile.write(data) + except FileNotFoundError: + self.send_error_json("File not found", 404) + + def parse_json_body(self) -> Optional[Dict[str, Any]]: + """Parse JSON body from request.""" + try: + content_length = int(self.headers.get('Content-Length', 0)) + if content_length == 0: + return {} + body = self.rfile.read(content_length) + return json.loads(body.decode('utf-8')) + except (json.JSONDecodeError, ValueError) as e: + return None + + def do_OPTIONS(self): + """Handle CORS preflight requests.""" + self.send_response(200) + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') + self.send_header('Access-Control-Allow-Headers', 'Content-Type') + self.end_headers() + + def do_GET(self): + """Handle GET requests.""" + try: + parsed = urlparse(self.path) + path = parsed.path + query = parse_qs(parsed.query) + + if path == '/scene': + self.handle_scene() + elif path == '/affordances': + self.handle_affordances() + elif path == '/screenshot': + self.handle_screenshot(query) + elif path == '/metadata': + self.handle_metadata() + elif path == '/wait': + self.handle_wait(query) + elif path == '/health': + self.handle_health() + elif path == '/': + self.handle_health() # Root path also returns health + else: + self.send_error_json("Unknown endpoint", 404) + except Exception as e: + print(f"[API] Error in do_GET: {e}", flush=True) + import traceback + traceback.print_exc() + try: + self.send_error_json(f"Internal error: {str(e)}", 500) + except Exception: + pass + + def do_POST(self): + """Handle POST requests.""" + try: + parsed = urlparse(self.path) + path = parsed.path + + if path == '/input': + self.handle_input() + elif path == '/metadata': + self.handle_set_metadata() + else: + self.send_error_json("Unknown endpoint", 404) + except Exception as e: + print(f"[API] Error in do_POST: {e}", flush=True) + import traceback + traceback.print_exc() + try: + self.send_error_json(f"Internal error: {str(e)}", 500) + except Exception: + pass + + def handle_health(self) -> None: + """Health check endpoint.""" + self.send_json({ + "status": "ok", + "version": "0.1.0", + "timestamp": time.time() + }) + + def handle_scene(self) -> None: + """Return the current scene graph with semantic annotations.""" + try: + # Try to use lock for thread safety, but fall back if not available + try: + with mcrfpy.lock(): + scene_data = serialize_scene() + except (RuntimeError, AttributeError): + # Lock not available (e.g., main thread or headless mode issue) + scene_data = serialize_scene() + self.send_json(scene_data) + except Exception as e: + self.send_error_json(f"Scene introspection failed: {str(e)}", 500) + + def handle_affordances(self) -> None: + """Return only interactive elements with their semantic labels.""" + try: + # Try to use lock for thread safety + try: + with mcrfpy.lock(): + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + affordances = extract_affordances() + keyboard_hints = extract_keyboard_hints() + except (RuntimeError, AttributeError): + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + affordances = extract_affordances() + keyboard_hints = extract_keyboard_hints() + + self.send_json({ + "scene_name": scene_name, + "affordances": affordances, + "keyboard_hints": keyboard_hints, + "timestamp": time.time() + }) + except Exception as e: + self.send_error_json(f"Affordance extraction failed: {str(e)}", 500) + + def handle_screenshot(self, query: Dict) -> None: + """Return a PNG screenshot.""" + try: + # Create temp file for screenshot + fd, filepath = tempfile.mkstemp(suffix='.png') + os.close(fd) + + # Take screenshot (may or may not need lock depending on mode) + try: + with mcrfpy.lock(): + mcrfpy.automation.screenshot(filepath) + except (RuntimeError, AttributeError): + mcrfpy.automation.screenshot(filepath) + + # Check format preference + format_type = query.get('format', ['binary'])[0] + + if format_type == 'base64': + import base64 + with open(filepath, 'rb') as f: + data = f.read() + b64 = base64.b64encode(data).decode('ascii') + os.unlink(filepath) + self.send_json({ + "image": f"data:image/png;base64,{b64}", + "timestamp": time.time() + }) + else: + self.send_file(filepath, 'image/png') + os.unlink(filepath) + + except Exception as e: + self.send_error_json(f"Screenshot failed: {str(e)}", 500) + + def handle_metadata(self) -> None: + """Return game metadata.""" + metadata = get_game_metadata() + self.send_json(metadata) + + def handle_set_metadata(self) -> None: + """Update game metadata.""" + data = self.parse_json_body() + if data is None: + self.send_error_json("Invalid JSON body") + return + + set_game_metadata(data) + self.send_json({"success": True}) + + def handle_wait(self, query: Dict) -> None: + """Long-poll endpoint that returns when scene state changes.""" + timeout = float(query.get('timeout', [30])[0]) + previous_hash = query.get('scene_hash', [None])[0] + + start_time = time.time() + poll_interval = 0.1 # 100ms + + current_hash = None + scene_name = "unknown" + + while time.time() - start_time < timeout: + try: + with mcrfpy.lock(): + current_hash = get_scene_hash() + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + except (RuntimeError, AttributeError): + current_hash = get_scene_hash() + scene = mcrfpy.current_scene + scene_name = scene.name if scene else "unknown" + + if previous_hash is None: + # First call - return current state + self.send_json({ + "changed": False, + "hash": current_hash, + "scene_name": scene_name + }) + return + + if current_hash != previous_hash: + self.send_json({ + "changed": True, + "new_hash": current_hash, + "old_hash": previous_hash, + "scene_name": scene_name, + "reason": "state_change" + }) + return + + time.sleep(poll_interval) + + # Timeout - no change + self.send_json({ + "changed": False, + "hash": current_hash, + "scene_name": scene_name + }) + + def handle_input(self) -> None: + """Inject keyboard or mouse input.""" + data = self.parse_json_body() + if data is None: + self.send_error_json("Invalid JSON body") + return + + if 'action' not in data: + self.send_error_json("Missing 'action' field") + return + + try: + result = execute_action(data) + self.send_json(result) + except ValueError as e: + self.send_error_json(str(e), 400) + except Exception as e: + self.send_error_json(f"Input action failed: {str(e)}", 500) + + +class GameAPIServer: + """Background HTTP server for the game API.""" + + def __init__(self, port: int = 8765): + self.port = port + self.server = None + self.thread = None + self._running = False + + def start(self) -> None: + """Start the server in a background thread.""" + if self._running: + return + + GameAPIHandler.api_server = self + + # Allow socket reuse to avoid "Address already in use" errors + import socket + class ReuseHTTPServer(HTTPServer): + allow_reuse_address = True + + self.server = ReuseHTTPServer(('localhost', self.port), GameAPIHandler) + self.server.timeout = 1.0 # Don't block forever on handle_request + self._running = True + + self.thread = threading.Thread( + target=self._serve_forever, + daemon=True, + name="GameAPI" + ) + self.thread.start() + print(f"[API] Game API running on http://localhost:{self.port}", flush=True) + + def _serve_forever(self) -> None: + """Server loop (runs in background thread).""" + try: + while self._running: + try: + self.server.handle_request() + except Exception as e: + if self._running: # Only log if we're still supposed to be running + print(f"[API] Request handling error: {e}", flush=True) + except Exception as e: + print(f"[API] Server thread error: {e}", flush=True) + + def stop(self) -> None: + """Stop the server.""" + if not self._running: + return + + self._running = False + # Send a dummy request to unblock handle_request + try: + import urllib.request + urllib.request.urlopen(f"http://localhost:{self.port}/health", timeout=0.5) + except Exception: + pass + if self.thread: + self.thread.join(timeout=2.0) + if self.server: + self.server.server_close() + print("[API] Game API stopped", flush=True) + + @property + def is_running(self) -> bool: + return self._running diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..0e3b6e9 --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1 @@ +# API test package diff --git a/tests/api/run_api_server.py b/tests/api/run_api_server.py new file mode 100644 index 0000000..89c8818 --- /dev/null +++ b/tests/api/run_api_server.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +"""Simple script to start the McRogueFace Game API server. + +Run with: cd build && ./mcrogueface --exec ../tests/api/run_api_server.py + +Then test with: + curl http://localhost:8765/health + curl http://localhost:8765/scene + curl http://localhost:8765/affordances +""" + +import sys +sys.path.insert(0, '../src/scripts') + +import mcrfpy + +print("Creating test scene...", flush=True) + +# Create a simple test scene +scene = mcrfpy.Scene("test") +ui = scene.children + +font = mcrfpy.Font("assets/JetbrainsMono.ttf") + +# Title +title = mcrfpy.Caption(text="API Test Scene", pos=(50, 20), font=font, fill_color=(255, 255, 0)) +title.font_size = 24 +ui.append(title) + +# A clickable button +button = mcrfpy.Frame(pos=(50, 80), size=(200, 50), fill_color=(64, 64, 128)) +button.name = "test_button" +button.on_click = lambda pos, btn, action: print(f"Button clicked: {action}", flush=True) +button_text = mcrfpy.Caption(text="Click Me", pos=(50, 10), font=font, fill_color=(255, 255, 255)) +button.children.append(button_text) +ui.append(button) + +# A second button +button2 = mcrfpy.Frame(pos=(50, 150), size=(200, 50), fill_color=(64, 128, 64)) +button2.name = "settings_button" +button2.on_click = lambda pos, btn, action: print(f"Settings clicked: {action}", flush=True) +button2_text = mcrfpy.Caption(text="Settings", pos=(50, 10), font=font, fill_color=(255, 255, 255)) +button2.children.append(button2_text) +ui.append(button2) + +# Status text +status = mcrfpy.Caption(text="API Server running on http://localhost:8765", pos=(50, 230), font=font, fill_color=(128, 255, 128)) +ui.append(status) + +status2 = mcrfpy.Caption(text="Press ESC to exit", pos=(50, 260), font=font, fill_color=(200, 200, 200)) +ui.append(status2) + +mcrfpy.current_scene = scene + +print("Starting API server...", flush=True) + +# Start the API server +from api import start_server +server = start_server(8765) + +print("", flush=True) +print("=" * 50, flush=True) +print("API Server is ready!", flush=True) +print("", flush=True) +print("Test endpoints:", flush=True) +print(" curl http://localhost:8765/health", flush=True) +print(" curl http://localhost:8765/scene", flush=True) +print(" curl http://localhost:8765/affordances", flush=True) +print(" curl http://localhost:8765/metadata", flush=True) +print(" curl http://localhost:8765/screenshot?format=base64 | jq -r .image", flush=True) +print("", flush=True) +print("Input examples:", flush=True) +print(' curl -X POST -H "Content-Type: application/json" -d \'{"action":"key","key":"W"}\' http://localhost:8765/input', flush=True) +print(' curl -X POST -H "Content-Type: application/json" -d \'{"action":"click","x":150,"y":100}\' http://localhost:8765/input', flush=True) +print("=" * 50, flush=True) +print("", flush=True) + +# Key handler +def on_key(key, action): + if key == mcrfpy.Key.ESCAPE and action == mcrfpy.InputState.PRESSED: + print("Exiting...", flush=True) + mcrfpy.exit() + +scene.on_key = on_key diff --git a/tests/api/test_api_basic.py b/tests/api/test_api_basic.py new file mode 100644 index 0000000..1c5cf91 --- /dev/null +++ b/tests/api/test_api_basic.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +"""Basic test for the McRogueFace Game API. + +Run with: cd build && ./mcrogueface --headless --exec ../tests/api/test_api_basic.py +""" + +import sys +import time +import threading +import urllib.request +import json + +import mcrfpy + +# Create a test scene with some UI elements +test_scene = mcrfpy.Scene("api_test") +ui = test_scene.children + +# Add various interactive elements +font = mcrfpy.Font("assets/JetbrainsMono.ttf") + +# A button-like frame with click handler +button_frame = mcrfpy.Frame(pos=(50, 50), size=(200, 60), fill_color=(64, 64, 128)) +button_frame.name = "play_button" + +button_label = mcrfpy.Caption(text="Play Game", pos=(20, 15), font=font, fill_color=(255, 255, 255)) +button_frame.children.append(button_label) + +click_count = [0] + +def on_button_click(pos, button, action): + if str(action) == "PRESSED" or action == mcrfpy.InputState.PRESSED: + click_count[0] += 1 + print(f"Button clicked! Count: {click_count[0]}") + +button_frame.on_click = on_button_click +ui.append(button_frame) + +# A second button +settings_frame = mcrfpy.Frame(pos=(50, 130), size=(200, 60), fill_color=(64, 128, 64)) +settings_frame.name = "settings_button" +settings_label = mcrfpy.Caption(text="Settings", pos=(20, 15), font=font, fill_color=(255, 255, 255)) +settings_frame.children.append(settings_label) +settings_frame.on_click = lambda pos, btn, action: print("Settings clicked") +ui.append(settings_frame) + +# A caption without click (for display) +title = mcrfpy.Caption(text="API Test Scene", pos=(50, 10), font=font, fill_color=(255, 255, 0)) +title.font_size = 24 +ui.append(title) + +# Activate scene +mcrfpy.current_scene = test_scene + + +def run_api_tests(timer, runtime): + """Run the API tests after scene is set up.""" + print("\n=== Starting API Tests ===\n") + + base_url = "http://localhost:8765" + + # Test 1: Health check + print("Test 1: Health check...") + try: + req = urllib.request.Request(f"{base_url}/health") + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + assert data["status"] == "ok", f"Expected 'ok', got '{data['status']}'" + print(f" PASS: Server healthy, version {data.get('version')}") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 2: Scene introspection + print("\nTest 2: Scene introspection...") + try: + req = urllib.request.Request(f"{base_url}/scene") + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + assert data["scene_name"] == "api_test", f"Expected 'api_test', got '{data['scene_name']}'" + assert data["element_count"] == 3, f"Expected 3 elements, got {data['element_count']}" + print(f" PASS: Scene '{data['scene_name']}' with {data['element_count']} elements") + print(f" Viewport: {data['viewport']}") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 3: Affordances + print("\nTest 3: Affordance extraction...") + try: + req = urllib.request.Request(f"{base_url}/affordances") + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + affordances = data["affordances"] + assert len(affordances) >= 2, f"Expected at least 2 affordances, got {len(affordances)}" + + # Check for our named buttons + labels = [a.get("label") for a in affordances] + print(f" Found affordances with labels: {labels}") + + # Find play_button by name hint + play_affordance = None + for a in affordances: + if a.get("hint") and "play_button" in a.get("hint", ""): + play_affordance = a + break + if a.get("label") and "Play" in a.get("label", ""): + play_affordance = a + break + + if play_affordance: + print(f" PASS: Found play button affordance, ID={play_affordance['id']}") + else: + print(f" WARN: Could not find play button by name or label") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 4: Metadata + print("\nTest 4: Metadata...") + try: + req = urllib.request.Request(f"{base_url}/metadata") + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + assert "current_scene" in data, "Missing current_scene" + print(f" PASS: Got metadata, current_scene={data['current_scene']}") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 5: Input - click + print("\nTest 5: Input click...") + try: + # Click the play button + req = urllib.request.Request( + f"{base_url}/input", + data=json.dumps({ + "action": "click", + "x": 150, # Center of play button + "y": 80 + }).encode('utf-8'), + headers={"Content-Type": "application/json"} + ) + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + assert data["success"], "Click failed" + print(f" PASS: Click executed at ({data['x']}, {data['y']})") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 6: Input - key + print("\nTest 6: Input key press...") + try: + req = urllib.request.Request( + f"{base_url}/input", + data=json.dumps({ + "action": "key", + "key": "ESCAPE" + }).encode('utf-8'), + headers={"Content-Type": "application/json"} + ) + with urllib.request.urlopen(req, timeout=2) as response: + data = json.loads(response.read()) + assert data["success"], "Key press failed" + print(f" PASS: Key '{data['key']}' pressed") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + # Test 7: Wait endpoint (quick check) + print("\nTest 7: Wait endpoint...") + try: + req = urllib.request.Request(f"{base_url}/wait?timeout=1") + with urllib.request.urlopen(req, timeout=3) as response: + data = json.loads(response.read()) + assert "hash" in data, "Missing hash" + print(f" PASS: Got scene hash: {data['hash']}") + except Exception as e: + print(f" FAIL: {e}") + sys.exit(1) + + print("\n=== All API Tests Passed ===\n") + sys.exit(0) + + +# Start the API server +print("Starting API server...") +import sys +sys.path.insert(0, '../src/scripts') +from api import start_server +server = start_server(8765) + +# Give server time to start +time.sleep(0.5) + +# Run tests after a short delay +test_timer = mcrfpy.Timer("api_test", run_api_tests, 500) diff --git a/tests/api/test_api_windowed.py b/tests/api/test_api_windowed.py new file mode 100644 index 0000000..0686f75 --- /dev/null +++ b/tests/api/test_api_windowed.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +"""Test for the McRogueFace Game API in windowed mode. + +Run with: cd build && ./mcrogueface --exec ../tests/api/test_api_windowed.py + +Tests all API endpoints and verifies proper functionality. +""" + +import sys +import time +import urllib.request +import json + +import mcrfpy + +# Force flush on print +import functools +print = functools.partial(print, flush=True) + +def log(msg): + print(msg) + +# Create a test scene with some UI elements +test_scene = mcrfpy.Scene("api_test") +ui = test_scene.children + +# Add various interactive elements +font = mcrfpy.Font("assets/JetbrainsMono.ttf") + +# A button-like frame with click handler +button_frame = mcrfpy.Frame(pos=(50, 50), size=(200, 60), fill_color=(64, 64, 128)) +button_frame.name = "play_button" + +button_label = mcrfpy.Caption(text="Play Game", pos=(20, 15), font=font, fill_color=(255, 255, 255)) +button_frame.children.append(button_label) + +click_count = [0] + +def on_button_click(pos, button, action): + if action == mcrfpy.InputState.PRESSED: + click_count[0] += 1 + print(f"Button clicked! Count: {click_count[0]}") + +button_frame.on_click = on_button_click +ui.append(button_frame) + +# A second button +settings_frame = mcrfpy.Frame(pos=(50, 130), size=(200, 60), fill_color=(64, 128, 64)) +settings_frame.name = "settings_button" +settings_label = mcrfpy.Caption(text="Settings", pos=(20, 15), font=font, fill_color=(255, 255, 255)) +settings_frame.children.append(settings_label) +settings_frame.on_click = lambda pos, btn, action: print("Settings clicked") +ui.append(settings_frame) + +# A caption without click (for display) +title = mcrfpy.Caption(text="API Test Scene", pos=(50, 10), font=font, fill_color=(255, 255, 0)) +title.font_size = 24 +ui.append(title) + +# Status caption to show test progress +status = mcrfpy.Caption(text="Starting API tests...", pos=(50, 220), font=font, fill_color=(255, 255, 255)) +ui.append(status) + +# Activate scene +mcrfpy.current_scene = test_scene + +# Start the API server +log("Starting API server...") +sys.path.insert(0, '../src/scripts') +from api import start_server +server = start_server(8765) +print("API server started on http://localhost:8765") + + +def run_api_tests(timer, runtime): + """Run the API tests after scene is set up.""" + print("\n=== Starting API Tests ===\n") + status.text = "Running tests..." + + base_url = "http://localhost:8765" + all_passed = True + + # Test 1: Health check + print("Test 1: Health check...") + try: + req = urllib.request.Request(f"{base_url}/health") + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + assert data["status"] == "ok", f"Expected 'ok', got '{data['status']}'" + print(f" PASS: Server healthy, version {data.get('version')}") + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 2: Scene introspection + print("\nTest 2: Scene introspection...") + try: + req = urllib.request.Request(f"{base_url}/scene") + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + assert data["scene_name"] == "api_test", f"Expected 'api_test', got '{data['scene_name']}'" + print(f" PASS: Scene '{data['scene_name']}' with {data['element_count']} elements") + print(f" Viewport: {data['viewport']}") + for elem in data.get('elements', []): + print(f" - {elem['type']}: name='{elem.get('name', '')}' interactive={elem.get('interactive')}") + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 3: Affordances + print("\nTest 3: Affordance extraction...") + try: + req = urllib.request.Request(f"{base_url}/affordances") + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + affordances = data["affordances"] + print(f" Found {len(affordances)} affordances:") + for aff in affordances: + print(f" ID={aff['id']} type={aff['type']} label='{aff.get('label')}' actions={aff.get('actions')}") + if len(affordances) >= 2: + print(f" PASS") + else: + print(f" WARN: Expected at least 2 affordances") + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 4: Metadata + print("\nTest 4: Metadata...") + try: + req = urllib.request.Request(f"{base_url}/metadata") + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + print(f" Game: {data.get('game_name')}") + print(f" Current scene: {data.get('current_scene')}") + print(f" PASS") + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 5: Input - click affordance by label + print("\nTest 5: Click affordance by label...") + try: + req = urllib.request.Request( + f"{base_url}/input", + data=json.dumps({ + "action": "click_affordance", + "label": "Play Game" # Matches the button text + }).encode('utf-8'), + headers={"Content-Type": "application/json"} + ) + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + if data.get("success"): + print(f" PASS: Clicked affordance '{data.get('affordance_label')}' at ({data.get('x'):.0f}, {data.get('y'):.0f})") + else: + print(f" FAIL: {data}") + all_passed = False + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 6: Screenshot (base64) + print("\nTest 6: Screenshot...") + try: + req = urllib.request.Request(f"{base_url}/screenshot?format=base64") + with urllib.request.urlopen(req, timeout=5) as response: + data = json.loads(response.read()) + if "image" in data and data["image"].startswith("data:image/png;base64,"): + img_size = len(data["image"]) + print(f" PASS: Got base64 image ({img_size} chars)") + else: + print(f" FAIL: Invalid image data") + all_passed = False + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + # Test 7: Wait endpoint (quick check) + print("\nTest 7: Wait endpoint...") + try: + req = urllib.request.Request(f"{base_url}/wait?timeout=1") + with urllib.request.urlopen(req, timeout=3) as response: + data = json.loads(response.read()) + print(f" PASS: Scene hash: {data.get('hash')}") + except Exception as e: + print(f" FAIL: {e}") + all_passed = False + + if all_passed: + print("\n=== All API Tests Passed ===\n") + status.text = "All tests PASSED!" + status.fill_color = (0, 255, 0) + else: + print("\n=== Some Tests Failed ===\n") + status.text = "Some tests FAILED" + status.fill_color = (255, 0, 0) + + print("Press ESC to exit, or interact with the scene...") + print(f"API still running at {base_url}") + + +# Add key handler to exit +def on_key(key, action): + if key == mcrfpy.Key.ESCAPE and action == mcrfpy.InputState.PRESSED: + mcrfpy.exit() + +test_scene.on_key = on_key + +# Run tests after a short delay to let rendering settle +test_timer = mcrfpy.Timer("api_test", run_api_tests, 1000)