feat: Add mcrfpy.step() and synchronous screenshot for headless mode (closes #153)
Implements Python-controlled simulation advancement for headless mode: - Add mcrfpy.step(dt) to advance simulation by dt seconds - step(None) advances to next scheduled event (timer/animation) - Timers use simulation_time in headless mode for deterministic behavior - automation.screenshot() now renders synchronously in headless mode (captures current state, not previous frame) This enables LLM agent orchestration (#156) by allowing: - Set perspective, take screenshot, query LLM - all synchronous - Deterministic simulation control without frame timing issues - Event-driven advancement with step(None) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
f33e79a123
commit
60ffa68d04
7 changed files with 409 additions and 10 deletions
124
tests/unit/test_step_function.py
Normal file
124
tests/unit/test_step_function.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test mcrfpy.step() function (#153)
|
||||
===================================
|
||||
|
||||
Tests the Python-controlled simulation advancement for headless mode.
|
||||
|
||||
Key behavior:
|
||||
- step(dt) advances simulation by dt seconds
|
||||
- step(None) or step() advances to next scheduled event
|
||||
- Returns actual time advanced
|
||||
- In windowed mode, returns 0.0 (no-op)
|
||||
"""
|
||||
|
||||
import mcrfpy
|
||||
import sys
|
||||
|
||||
def run_tests():
|
||||
"""Run step() function tests"""
|
||||
print("=== mcrfpy.step() Tests ===\n")
|
||||
|
||||
# Test 1: step() with specific dt value
|
||||
print("Test 1: step() with specific dt value")
|
||||
dt = mcrfpy.step(0.1) # Advance 100ms
|
||||
print(f" step(0.1) returned: {dt}")
|
||||
# In headless mode, should return 0.1
|
||||
# In windowed mode, returns 0.0
|
||||
if dt == 0.0:
|
||||
print(" Note: Running in windowed mode - step() is no-op")
|
||||
else:
|
||||
assert abs(dt - 0.1) < 0.001, f"Expected ~0.1, got {dt}"
|
||||
print(" Correctly advanced by 0.1 seconds")
|
||||
print()
|
||||
|
||||
# Test 2: step() with integer value (converts to float)
|
||||
print("Test 2: step() with integer value")
|
||||
dt = mcrfpy.step(1) # Advance 1 second
|
||||
print(f" step(1) returned: {dt}")
|
||||
if dt != 0.0:
|
||||
assert abs(dt - 1.0) < 0.001, f"Expected ~1.0, got {dt}"
|
||||
print(" Correctly advanced by 1.0 seconds")
|
||||
print()
|
||||
|
||||
# Test 3: step(None) - advance to next event
|
||||
print("Test 3: step(None) - advance to next event")
|
||||
dt = mcrfpy.step(None)
|
||||
print(f" step(None) returned: {dt}")
|
||||
if dt != 0.0:
|
||||
assert dt >= 0, "step(None) should return non-negative dt"
|
||||
print(f" Advanced by {dt} seconds to next event")
|
||||
print()
|
||||
|
||||
# Test 4: step() with no argument (same as step(None))
|
||||
print("Test 4: step() with no argument")
|
||||
dt = mcrfpy.step()
|
||||
print(f" step() returned: {dt}")
|
||||
if dt != 0.0:
|
||||
assert dt >= 0, "step() should return non-negative dt"
|
||||
print(f" Advanced by {dt} seconds")
|
||||
print()
|
||||
|
||||
# Test 5: Timer callback with step()
|
||||
print("Test 5: Timer fires after step() advances past interval")
|
||||
timer_fired = [False] # Use list for mutable closure
|
||||
|
||||
def on_timer(runtime):
|
||||
"""Timer callback - receives runtime in ms"""
|
||||
timer_fired[0] = True
|
||||
print(f" Timer fired at simulation time={runtime}ms")
|
||||
|
||||
# Set a timer for 500ms
|
||||
mcrfpy.setTimer("test_timer", on_timer, 500)
|
||||
|
||||
# Step 600ms - timer should fire (500ms interval + some buffer)
|
||||
dt = mcrfpy.step(0.6)
|
||||
if dt != 0.0: # Headless mode
|
||||
# Timer should have fired
|
||||
if timer_fired[0]:
|
||||
print(" Timer correctly fired after step(0.6)")
|
||||
else:
|
||||
# Try another step to ensure timer fires
|
||||
mcrfpy.step(0.1)
|
||||
if timer_fired[0]:
|
||||
print(" Timer fired after additional step")
|
||||
else:
|
||||
print(" WARNING: Timer didn't fire - check timer synchronization")
|
||||
else:
|
||||
print(" Skipping timer test in windowed mode")
|
||||
|
||||
# Clean up
|
||||
mcrfpy.delTimer("test_timer")
|
||||
print()
|
||||
|
||||
# Test 6: Error handling - invalid argument type
|
||||
print("Test 6: Error handling - invalid argument type")
|
||||
try:
|
||||
mcrfpy.step("invalid")
|
||||
print(" ERROR: Should have raised TypeError")
|
||||
return False
|
||||
except TypeError as e:
|
||||
print(f" Correctly raised TypeError: {e}")
|
||||
print()
|
||||
|
||||
print("=== All step() Tests Passed! ===")
|
||||
return True
|
||||
|
||||
# Main execution
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# Create a scene for the test
|
||||
mcrfpy.createScene("test_step")
|
||||
mcrfpy.setScene("test_step")
|
||||
|
||||
if run_tests():
|
||||
print("\nPASS")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("\nFAIL")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"\nFAIL: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
Loading…
Add table
Add a link
Reference in a new issue