731 lines
26 KiB
Python
731 lines
26 KiB
Python
|
|
"""
|
||
|
|
Enhanced Action Executor
|
||
|
|
========================
|
||
|
|
|
||
|
|
Extends ActionExecutor with:
|
||
|
|
- LOOK action with detailed descriptions
|
||
|
|
- SPEAK/ANNOUNCE execution with range checking
|
||
|
|
- Multi-tile path planning
|
||
|
|
- Free action vs turn-ending action handling
|
||
|
|
"""
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from typing import Optional, List, Tuple, Dict, Any, Set
|
||
|
|
from action_parser import Action, ActionType
|
||
|
|
from action_executor import ActionResult
|
||
|
|
from action_economy import (
|
||
|
|
TurnState, PathState, TurnCost, get_action_cost,
|
||
|
|
manhattan_distance, get_direction_name
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class TakeResult:
|
||
|
|
"""Result of a TAKE action."""
|
||
|
|
success: bool
|
||
|
|
message: str
|
||
|
|
item_name: str
|
||
|
|
item_position: Optional[Tuple[int, int]] = None
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class LookResult:
|
||
|
|
"""Result of a LOOK action."""
|
||
|
|
success: bool
|
||
|
|
description: str
|
||
|
|
target_name: str
|
||
|
|
target_position: Optional[Tuple[int, int]] = None
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class SpeechResult:
|
||
|
|
"""Result of a SPEAK/ANNOUNCE action."""
|
||
|
|
success: bool
|
||
|
|
message: str
|
||
|
|
recipients: List[str] # Names of agents who received the message
|
||
|
|
speech_type: str # "announce" or "speak"
|
||
|
|
content: str # What was said
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class Message:
|
||
|
|
"""A message received by an agent."""
|
||
|
|
sender: str
|
||
|
|
content: str
|
||
|
|
speech_type: str # "announce" or "speak"
|
||
|
|
turn: int
|
||
|
|
distance: Optional[int] = None # For SPEAK, how far away sender was
|
||
|
|
|
||
|
|
|
||
|
|
class EnhancedExecutor:
|
||
|
|
"""
|
||
|
|
Enhanced action executor with LOOK, SPEAK, and multi-tile support.
|
||
|
|
"""
|
||
|
|
|
||
|
|
# Direction vectors for movement
|
||
|
|
DIRECTION_VECTORS = {
|
||
|
|
'NORTH': (0, -1),
|
||
|
|
'SOUTH': (0, 1),
|
||
|
|
'EAST': (1, 0),
|
||
|
|
'WEST': (-1, 0),
|
||
|
|
}
|
||
|
|
|
||
|
|
# SPEAK range (Manhattan distance)
|
||
|
|
SPEAK_RANGE = 4
|
||
|
|
|
||
|
|
def __init__(self, grid, world_graph=None):
|
||
|
|
"""
|
||
|
|
Initialize executor.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
grid: mcrfpy.Grid instance
|
||
|
|
world_graph: Optional WorldGraph for detailed descriptions
|
||
|
|
"""
|
||
|
|
self.grid = grid
|
||
|
|
self.world = world_graph
|
||
|
|
|
||
|
|
# Agent path states (agent_name -> PathState)
|
||
|
|
self.path_states: Dict[str, PathState] = {}
|
||
|
|
|
||
|
|
# Speech channel for message delivery
|
||
|
|
self.pending_messages: Dict[str, List[Message]] = {} # agent_name -> messages
|
||
|
|
|
||
|
|
def get_path_state(self, agent_name: str) -> PathState:
|
||
|
|
"""Get or create path state for an agent."""
|
||
|
|
if agent_name not in self.path_states:
|
||
|
|
self.path_states[agent_name] = PathState()
|
||
|
|
return self.path_states[agent_name]
|
||
|
|
|
||
|
|
def get_pending_messages(self, agent_name: str) -> List[Message]:
|
||
|
|
"""Get and clear pending messages for an agent."""
|
||
|
|
messages = self.pending_messages.get(agent_name, [])
|
||
|
|
self.pending_messages[agent_name] = []
|
||
|
|
return messages
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# LOOK Action
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
def execute_look(self, agent, action: Action) -> LookResult:
|
||
|
|
"""
|
||
|
|
Execute LOOK action - examine a tile or entity.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
agent: Agent performing the look
|
||
|
|
action: Parsed LOOK action with optional target
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
LookResult with detailed description
|
||
|
|
"""
|
||
|
|
target = action.args[0] if action.args and action.args[0] else None
|
||
|
|
|
||
|
|
if target is None:
|
||
|
|
# General look around
|
||
|
|
return self._look_around(agent)
|
||
|
|
else:
|
||
|
|
# Look at specific target
|
||
|
|
return self._look_at_target(agent, target.upper())
|
||
|
|
|
||
|
|
def _look_around(self, agent) -> LookResult:
|
||
|
|
"""Describe the general surroundings."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
|
||
|
|
descriptions = []
|
||
|
|
|
||
|
|
# Describe current room
|
||
|
|
if self.world:
|
||
|
|
room = self.world.room_at(ax, ay)
|
||
|
|
if room:
|
||
|
|
descriptions.append(f"You are in {room.display_name}.")
|
||
|
|
if room.description_template and room.properties:
|
||
|
|
try:
|
||
|
|
desc = room.description_template.format(**room.properties)
|
||
|
|
descriptions.append(desc)
|
||
|
|
except KeyError:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# Count visible entities
|
||
|
|
visible_count = 0
|
||
|
|
for entity in self.grid.entities:
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
if (ex, ey) != (ax, ay) and self.grid.is_in_fov(ex, ey):
|
||
|
|
visible_count += 1
|
||
|
|
|
||
|
|
if visible_count > 0:
|
||
|
|
descriptions.append(f"You can see {visible_count} other creature(s) nearby.")
|
||
|
|
|
||
|
|
# Describe nearby walls/openings
|
||
|
|
wall_dirs = []
|
||
|
|
open_dirs = []
|
||
|
|
for direction, (dx, dy) in self.DIRECTION_VECTORS.items():
|
||
|
|
nx, ny = ax + dx, ay + dy
|
||
|
|
if 0 <= nx < self.grid.grid_size[0] and 0 <= ny < self.grid.grid_size[1]:
|
||
|
|
cell = self.grid.at(nx, ny)
|
||
|
|
if cell.walkable:
|
||
|
|
open_dirs.append(direction.lower())
|
||
|
|
else:
|
||
|
|
wall_dirs.append(direction.lower())
|
||
|
|
|
||
|
|
if open_dirs:
|
||
|
|
descriptions.append(f"Open passages: {', '.join(open_dirs)}.")
|
||
|
|
if wall_dirs:
|
||
|
|
descriptions.append(f"Walls to the: {', '.join(wall_dirs)}.")
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=True,
|
||
|
|
description=" ".join(descriptions),
|
||
|
|
target_name="surroundings"
|
||
|
|
)
|
||
|
|
|
||
|
|
def _look_at_target(self, agent, target: str) -> LookResult:
|
||
|
|
"""Look at a specific target (direction, entity, or object name)."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
|
||
|
|
# Check if target is a direction
|
||
|
|
if target in self.DIRECTION_VECTORS:
|
||
|
|
return self._look_in_direction(agent, target)
|
||
|
|
|
||
|
|
# Check if target matches an entity
|
||
|
|
for entity in self.grid.entities:
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
if (ex, ey) == (ax, ay):
|
||
|
|
continue
|
||
|
|
|
||
|
|
entity_name = getattr(entity, 'name', '').upper()
|
||
|
|
if target in entity_name or entity_name in target:
|
||
|
|
if self.grid.is_in_fov(ex, ey):
|
||
|
|
return self._describe_entity(agent, entity)
|
||
|
|
else:
|
||
|
|
return LookResult(
|
||
|
|
success=False,
|
||
|
|
description=f"You cannot see {target.lower()} from here.",
|
||
|
|
target_name=target.lower()
|
||
|
|
)
|
||
|
|
|
||
|
|
# Check WorldGraph objects
|
||
|
|
if self.world:
|
||
|
|
room = self.world.room_at(ax, ay)
|
||
|
|
if room:
|
||
|
|
for obj in self.world.get_objects_in_room(room.name):
|
||
|
|
if target in obj.name.upper() or obj.name.upper() in target:
|
||
|
|
ox, oy = obj.position
|
||
|
|
if self.grid.is_in_fov(ox, oy):
|
||
|
|
return self._describe_object(agent, obj)
|
||
|
|
|
||
|
|
# Check doors
|
||
|
|
for door in self.world.get_exits(room.name):
|
||
|
|
if "DOOR" in target:
|
||
|
|
dx, dy = door.position
|
||
|
|
if self.grid.is_in_fov(dx, dy):
|
||
|
|
return self._describe_door(agent, door)
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=False,
|
||
|
|
description=f"You don't see anything called '{target.lower()}' nearby.",
|
||
|
|
target_name=target.lower()
|
||
|
|
)
|
||
|
|
|
||
|
|
def _look_in_direction(self, agent, direction: str) -> LookResult:
|
||
|
|
"""Look in a cardinal direction."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
dx, dy = self.DIRECTION_VECTORS[direction]
|
||
|
|
|
||
|
|
descriptions = []
|
||
|
|
|
||
|
|
# Scan tiles in that direction
|
||
|
|
for distance in range(1, 10):
|
||
|
|
tx, ty = ax + dx * distance, ay + dy * distance
|
||
|
|
|
||
|
|
if not (0 <= tx < self.grid.grid_size[0] and 0 <= ty < self.grid.grid_size[1]):
|
||
|
|
descriptions.append(f"The edge of the known world lies {direction.lower()}.")
|
||
|
|
break
|
||
|
|
|
||
|
|
if not self.grid.is_in_fov(tx, ty):
|
||
|
|
descriptions.append(f"Darkness obscures your vision beyond {distance} tiles.")
|
||
|
|
break
|
||
|
|
|
||
|
|
cell = self.grid.at(tx, ty)
|
||
|
|
|
||
|
|
# Check for entity at this tile
|
||
|
|
for entity in self.grid.entities:
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
if (ex, ey) == (tx, ty):
|
||
|
|
entity_name = getattr(entity, 'name', 'creature')
|
||
|
|
descriptions.append(f"A {entity_name} stands {distance} tile(s) to the {direction.lower()}.")
|
||
|
|
|
||
|
|
# Check for wall
|
||
|
|
if not cell.walkable:
|
||
|
|
# Check if it's a door
|
||
|
|
if self.world:
|
||
|
|
room = self.world.room_at(ax, ay)
|
||
|
|
if room:
|
||
|
|
for door in self.world.get_exits(room.name):
|
||
|
|
if door.position == (tx, ty):
|
||
|
|
dest = self.world.rooms.get(
|
||
|
|
door.room_b if door.room_a == room.name else door.room_a
|
||
|
|
)
|
||
|
|
dest_name = dest.display_name if dest else "another area"
|
||
|
|
lock_str = " It is locked." if door.locked else ""
|
||
|
|
descriptions.append(
|
||
|
|
f"A door to {dest_name} lies {distance} tile(s) {direction.lower()}.{lock_str}"
|
||
|
|
)
|
||
|
|
break
|
||
|
|
else:
|
||
|
|
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||
|
|
else:
|
||
|
|
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||
|
|
else:
|
||
|
|
descriptions.append(f"A wall blocks passage {distance} tile(s) to the {direction.lower()}.")
|
||
|
|
break
|
||
|
|
|
||
|
|
if not descriptions:
|
||
|
|
descriptions.append(f"Open floor extends to the {direction.lower()}.")
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=True,
|
||
|
|
description=" ".join(descriptions),
|
||
|
|
target_name=direction.lower(),
|
||
|
|
target_position=None
|
||
|
|
)
|
||
|
|
|
||
|
|
def _describe_entity(self, agent, entity) -> LookResult:
|
||
|
|
"""Generate detailed description of an entity."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
|
||
|
|
entity_name = getattr(entity, 'name', 'creature')
|
||
|
|
direction = get_direction_name((ax, ay), (ex, ey))
|
||
|
|
distance = manhattan_distance((ax, ay), (ex, ey))
|
||
|
|
|
||
|
|
descriptions = [
|
||
|
|
f"You examine the {entity_name} carefully.",
|
||
|
|
f"It stands {distance} tile(s) to the {direction}."
|
||
|
|
]
|
||
|
|
|
||
|
|
# Add any entity-specific description
|
||
|
|
if hasattr(entity, 'description'):
|
||
|
|
descriptions.append(entity.description)
|
||
|
|
|
||
|
|
# Add behavior hints if available
|
||
|
|
if hasattr(entity, 'behavior'):
|
||
|
|
descriptions.append(f"It appears to be {entity.behavior}.")
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=True,
|
||
|
|
description=" ".join(descriptions),
|
||
|
|
target_name=entity_name,
|
||
|
|
target_position=(ex, ey)
|
||
|
|
)
|
||
|
|
|
||
|
|
def _describe_object(self, agent, obj) -> LookResult:
|
||
|
|
"""Generate detailed description of a WorldGraph object."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
ox, oy = obj.position
|
||
|
|
|
||
|
|
direction = get_direction_name((ax, ay), (ox, oy))
|
||
|
|
distance = manhattan_distance((ax, ay), (ox, oy))
|
||
|
|
|
||
|
|
descriptions = [
|
||
|
|
f"You examine {obj.display_name}.",
|
||
|
|
f"It is {distance} tile(s) to the {direction}."
|
||
|
|
]
|
||
|
|
|
||
|
|
if obj.description:
|
||
|
|
descriptions.append(obj.description)
|
||
|
|
|
||
|
|
# Describe affordances
|
||
|
|
if "takeable" in obj.affordances:
|
||
|
|
descriptions.append("It looks small enough to pick up.")
|
||
|
|
if "pressable" in obj.affordances:
|
||
|
|
descriptions.append("It appears to be some kind of mechanism.")
|
||
|
|
if "openable" in obj.affordances:
|
||
|
|
descriptions.append("It can be opened.")
|
||
|
|
if "readable" in obj.affordances:
|
||
|
|
descriptions.append("There is writing on it.")
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=True,
|
||
|
|
description=" ".join(descriptions),
|
||
|
|
target_name=obj.name,
|
||
|
|
target_position=(ox, oy)
|
||
|
|
)
|
||
|
|
|
||
|
|
def _describe_door(self, agent, door) -> LookResult:
|
||
|
|
"""Generate detailed description of a door."""
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
dx, dy = door.position
|
||
|
|
|
||
|
|
direction = get_direction_name((ax, ay), (dx, dy))
|
||
|
|
distance = manhattan_distance((ax, ay), (dx, dy))
|
||
|
|
|
||
|
|
# Get destination
|
||
|
|
if self.world:
|
||
|
|
current_room = self.world.room_at(ax, ay)
|
||
|
|
if current_room:
|
||
|
|
if door.room_a == current_room.name:
|
||
|
|
dest = self.world.rooms.get(door.room_b)
|
||
|
|
else:
|
||
|
|
dest = self.world.rooms.get(door.room_a)
|
||
|
|
dest_name = dest.display_name if dest else "another area"
|
||
|
|
else:
|
||
|
|
dest_name = "another area"
|
||
|
|
else:
|
||
|
|
dest_name = "another area"
|
||
|
|
|
||
|
|
descriptions = [
|
||
|
|
f"You examine the doorway to the {direction}.",
|
||
|
|
f"It leads to {dest_name}, {distance} tile(s) away."
|
||
|
|
]
|
||
|
|
|
||
|
|
if door.locked:
|
||
|
|
descriptions.append("The door is locked. You'll need a key or mechanism to open it.")
|
||
|
|
else:
|
||
|
|
descriptions.append("The passage is open.")
|
||
|
|
|
||
|
|
return LookResult(
|
||
|
|
success=True,
|
||
|
|
description=" ".join(descriptions),
|
||
|
|
target_name="door",
|
||
|
|
target_position=(dx, dy)
|
||
|
|
)
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# SPEAK/ANNOUNCE Actions
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
def execute_speech(self, agent, action: Action, all_agents: list,
|
||
|
|
turn_number: int) -> SpeechResult:
|
||
|
|
"""
|
||
|
|
Execute SPEAK or ANNOUNCE action.
|
||
|
|
|
||
|
|
ANNOUNCE: All agents in the same room hear the message
|
||
|
|
SPEAK: Only agents within SPEAK_RANGE tiles hear the message
|
||
|
|
"""
|
||
|
|
message_content = action.args[0] if action.args else ""
|
||
|
|
|
||
|
|
if not message_content:
|
||
|
|
return SpeechResult(
|
||
|
|
success=False,
|
||
|
|
message="Nothing to say.",
|
||
|
|
recipients=[],
|
||
|
|
speech_type=action.type.value.lower(),
|
||
|
|
content=""
|
||
|
|
)
|
||
|
|
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
recipients = []
|
||
|
|
|
||
|
|
if action.type == ActionType.ANNOUNCE:
|
||
|
|
# Room-wide broadcast
|
||
|
|
recipients = self._get_agents_in_room(agent, all_agents)
|
||
|
|
speech_type = "announce"
|
||
|
|
else:
|
||
|
|
# Proximity-based speech
|
||
|
|
recipients = self._get_agents_in_range(agent, all_agents, self.SPEAK_RANGE)
|
||
|
|
speech_type = "speak"
|
||
|
|
|
||
|
|
# Deliver messages
|
||
|
|
for recipient in recipients:
|
||
|
|
if recipient.name not in self.pending_messages:
|
||
|
|
self.pending_messages[recipient.name] = []
|
||
|
|
|
||
|
|
distance = manhattan_distance(
|
||
|
|
(ax, ay),
|
||
|
|
(int(recipient.entity.pos[0]), int(recipient.entity.pos[1]))
|
||
|
|
) if speech_type == "speak" else None
|
||
|
|
|
||
|
|
self.pending_messages[recipient.name].append(Message(
|
||
|
|
sender=agent.name,
|
||
|
|
content=message_content,
|
||
|
|
speech_type=speech_type,
|
||
|
|
turn=turn_number,
|
||
|
|
distance=distance
|
||
|
|
))
|
||
|
|
|
||
|
|
recipient_names = [r.name for r in recipients]
|
||
|
|
|
||
|
|
if recipients:
|
||
|
|
return SpeechResult(
|
||
|
|
success=True,
|
||
|
|
message=f"You {speech_type}: \"{message_content}\"",
|
||
|
|
recipients=recipient_names,
|
||
|
|
speech_type=speech_type,
|
||
|
|
content=message_content
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
return SpeechResult(
|
||
|
|
success=True, # Still succeeds, just nobody heard
|
||
|
|
message=f"You {speech_type} into the emptiness: \"{message_content}\"",
|
||
|
|
recipients=[],
|
||
|
|
speech_type=speech_type,
|
||
|
|
content=message_content
|
||
|
|
)
|
||
|
|
|
||
|
|
def _get_agents_in_room(self, speaker, all_agents: list) -> list:
|
||
|
|
"""Get all agents in the same room as speaker (excluding speaker)."""
|
||
|
|
if not self.world:
|
||
|
|
# Fallback: use proximity
|
||
|
|
return self._get_agents_in_range(speaker, all_agents, 20)
|
||
|
|
|
||
|
|
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
|
||
|
|
speaker_room = self.world.room_at(ax, ay)
|
||
|
|
|
||
|
|
if not speaker_room:
|
||
|
|
return []
|
||
|
|
|
||
|
|
recipients = []
|
||
|
|
for agent in all_agents:
|
||
|
|
if agent.name == speaker.name:
|
||
|
|
continue
|
||
|
|
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
agent_room = self.world.room_at(rx, ry)
|
||
|
|
if agent_room and agent_room.name == speaker_room.name:
|
||
|
|
recipients.append(agent)
|
||
|
|
|
||
|
|
return recipients
|
||
|
|
|
||
|
|
def _get_agents_in_range(self, speaker, all_agents: list, range_tiles: int) -> list:
|
||
|
|
"""Get all agents within Manhattan distance of speaker."""
|
||
|
|
ax, ay = int(speaker.entity.pos[0]), int(speaker.entity.pos[1])
|
||
|
|
|
||
|
|
recipients = []
|
||
|
|
for agent in all_agents:
|
||
|
|
if agent.name == speaker.name:
|
||
|
|
continue
|
||
|
|
rx, ry = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
if manhattan_distance((ax, ay), (rx, ry)) <= range_tiles:
|
||
|
|
recipients.append(agent)
|
||
|
|
|
||
|
|
return recipients
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TAKE Action
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
def execute_take(self, agent, action: Action) -> TakeResult:
|
||
|
|
"""
|
||
|
|
Execute TAKE action - pick up an item.
|
||
|
|
|
||
|
|
Items must be:
|
||
|
|
1. In the WorldGraph as a takeable object
|
||
|
|
2. Within reach (adjacent tile or same tile, distance <= 1)
|
||
|
|
3. Visible in FOV
|
||
|
|
"""
|
||
|
|
item_name = action.args[0].lower() if action.args and action.args[0] else None
|
||
|
|
|
||
|
|
if not item_name:
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message="Take what? Specify an item name.",
|
||
|
|
item_name=""
|
||
|
|
)
|
||
|
|
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
|
||
|
|
# Search for the item in WorldGraph
|
||
|
|
if not self.world:
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message="No items exist in this world.",
|
||
|
|
item_name=item_name
|
||
|
|
)
|
||
|
|
|
||
|
|
# Find matching object
|
||
|
|
matching_obj = None
|
||
|
|
for obj_name, obj in self.world.objects.items():
|
||
|
|
if item_name in obj_name.lower() or obj_name.lower() in item_name:
|
||
|
|
matching_obj = obj
|
||
|
|
break
|
||
|
|
|
||
|
|
if not matching_obj:
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message=f"You don't see any '{item_name}' here.",
|
||
|
|
item_name=item_name
|
||
|
|
)
|
||
|
|
|
||
|
|
# Check if takeable
|
||
|
|
if "takeable" not in matching_obj.affordances:
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message=f"The {matching_obj.display_name} cannot be picked up.",
|
||
|
|
item_name=item_name,
|
||
|
|
item_position=matching_obj.position
|
||
|
|
)
|
||
|
|
|
||
|
|
ox, oy = matching_obj.position
|
||
|
|
|
||
|
|
# Check if visible in FOV
|
||
|
|
if not self.grid.is_in_fov(ox, oy):
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message=f"You can't see the {matching_obj.display_name} from here.",
|
||
|
|
item_name=item_name,
|
||
|
|
item_position=(ox, oy)
|
||
|
|
)
|
||
|
|
|
||
|
|
# Check distance (must be adjacent or same tile)
|
||
|
|
distance = manhattan_distance((ax, ay), (ox, oy))
|
||
|
|
if distance > 1:
|
||
|
|
direction = get_direction_name((ax, ay), (ox, oy))
|
||
|
|
# Use name for cleaner message (display_name often has article already)
|
||
|
|
return TakeResult(
|
||
|
|
success=False,
|
||
|
|
message=f"The {matching_obj.name.replace('_', ' ')} is {distance} tiles away to the {direction}. Move closer to pick it up.",
|
||
|
|
item_name=item_name,
|
||
|
|
item_position=(ox, oy)
|
||
|
|
)
|
||
|
|
|
||
|
|
# Success! Remove from world (simplified - no inventory system yet)
|
||
|
|
del self.world.objects[matching_obj.name]
|
||
|
|
|
||
|
|
return TakeResult(
|
||
|
|
success=True,
|
||
|
|
message=f"You pick up {matching_obj.display_name}.",
|
||
|
|
item_name=matching_obj.name,
|
||
|
|
item_position=(ox, oy)
|
||
|
|
)
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# Movement (single tile, delegates to original executor)
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
def execute_move(self, agent, action: Action) -> ActionResult:
|
||
|
|
"""
|
||
|
|
Execute single-tile movement.
|
||
|
|
|
||
|
|
This is the per-turn movement. Multi-tile paths are handled
|
||
|
|
at the orchestrator level.
|
||
|
|
"""
|
||
|
|
if not action.args or not action.args[0]:
|
||
|
|
return ActionResult(False, "No direction specified")
|
||
|
|
|
||
|
|
direction = action.args[0]
|
||
|
|
if direction not in self.DIRECTION_VECTORS:
|
||
|
|
return ActionResult(False, f"Invalid direction: {direction}")
|
||
|
|
|
||
|
|
dx, dy = self.DIRECTION_VECTORS[direction]
|
||
|
|
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
new_x, new_y = current_x + dx, current_y + dy
|
||
|
|
|
||
|
|
# Bounds check
|
||
|
|
grid_w, grid_h = self.grid.grid_size
|
||
|
|
if not (0 <= new_x < grid_w and 0 <= new_y < grid_h):
|
||
|
|
return ActionResult(False, f"Cannot go {direction} - edge of map")
|
||
|
|
|
||
|
|
# Walkability check
|
||
|
|
target_cell = self.grid.at(new_x, new_y)
|
||
|
|
if not target_cell.walkable:
|
||
|
|
return ActionResult(False, f"Cannot go {direction} - path blocked")
|
||
|
|
|
||
|
|
# Entity collision check
|
||
|
|
for entity in self.grid.entities:
|
||
|
|
if entity is agent.entity:
|
||
|
|
continue
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
if ex == new_x and ey == new_y:
|
||
|
|
return ActionResult(False, f"Cannot go {direction} - occupied")
|
||
|
|
|
||
|
|
# Execute movement
|
||
|
|
agent.entity.pos = (new_x, new_y)
|
||
|
|
|
||
|
|
return ActionResult(
|
||
|
|
success=True,
|
||
|
|
message=f"Moved {direction.lower()} to ({new_x}, {new_y})",
|
||
|
|
new_position=(new_x, new_y),
|
||
|
|
path=[(current_x, current_y), (new_x, new_y)]
|
||
|
|
)
|
||
|
|
|
||
|
|
def execute_wait(self, agent, action: Action) -> ActionResult:
|
||
|
|
"""Execute WAIT action."""
|
||
|
|
return ActionResult(True, "Waited and observed surroundings")
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# Multi-tile Pathfinding
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
def plan_path_to(self, agent, target_pos: Tuple[int, int],
|
||
|
|
visible_entities: Set[str]) -> Optional[List[Tuple[int, int]]]:
|
||
|
|
"""
|
||
|
|
Plan a path to a target position.
|
||
|
|
|
||
|
|
Uses A* via libtcod if available, otherwise simple pathfinding.
|
||
|
|
Returns list of tiles from current position to target (excluding current).
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
from mcrfpy import libtcod
|
||
|
|
ax, ay = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
|
||
|
|
path = libtcod.find_path(self.grid, ax, ay, target_pos[0], target_pos[1])
|
||
|
|
|
||
|
|
if path:
|
||
|
|
# Store path state
|
||
|
|
path_state = self.get_path_state(agent.name)
|
||
|
|
path_state.path = path
|
||
|
|
path_state.current_index = 0
|
||
|
|
path_state.visible_entities_at_start = visible_entities.copy()
|
||
|
|
|
||
|
|
return path
|
||
|
|
except ImportError:
|
||
|
|
pass
|
||
|
|
|
||
|
|
return None
|
||
|
|
|
||
|
|
def continue_path(self, agent, current_visible: Set[str]) -> Optional[ActionResult]:
|
||
|
|
"""
|
||
|
|
Continue an existing multi-tile path.
|
||
|
|
|
||
|
|
Returns ActionResult if moved, None if path complete or interrupted.
|
||
|
|
"""
|
||
|
|
path_state = self.get_path_state(agent.name)
|
||
|
|
|
||
|
|
if not path_state.has_path:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Check for FOV interrupt
|
||
|
|
if path_state.should_interrupt(current_visible):
|
||
|
|
path_state.clear()
|
||
|
|
return None # Signal that LLM should be queried
|
||
|
|
|
||
|
|
# Get next tile
|
||
|
|
next_tile = path_state.next_tile
|
||
|
|
if not next_tile:
|
||
|
|
path_state.clear()
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Move to next tile
|
||
|
|
current_x, current_y = int(agent.entity.pos[0]), int(agent.entity.pos[1])
|
||
|
|
new_x, new_y = next_tile
|
||
|
|
|
||
|
|
# Verify still walkable
|
||
|
|
target_cell = self.grid.at(new_x, new_y)
|
||
|
|
if not target_cell.walkable:
|
||
|
|
path_state.clear()
|
||
|
|
return ActionResult(False, "Path blocked - recalculating")
|
||
|
|
|
||
|
|
# Check for entity collision
|
||
|
|
for entity in self.grid.entities:
|
||
|
|
if entity is agent.entity:
|
||
|
|
continue
|
||
|
|
ex, ey = int(entity.pos[0]), int(entity.pos[1])
|
||
|
|
if ex == new_x and ey == new_y:
|
||
|
|
path_state.clear()
|
||
|
|
return ActionResult(False, "Path blocked by creature")
|
||
|
|
|
||
|
|
# Execute movement
|
||
|
|
agent.entity.pos = (new_x, new_y)
|
||
|
|
path_state.advance()
|
||
|
|
|
||
|
|
remaining = path_state.remaining_tiles
|
||
|
|
if remaining > 0:
|
||
|
|
msg = f"Continuing path ({remaining} tiles remaining)"
|
||
|
|
else:
|
||
|
|
msg = "Arrived at destination"
|
||
|
|
path_state.clear()
|
||
|
|
|
||
|
|
return ActionResult(
|
||
|
|
success=True,
|
||
|
|
message=msg,
|
||
|
|
new_position=(new_x, new_y),
|
||
|
|
path=[(current_x, current_y), (new_x, new_y)]
|
||
|
|
)
|