HTTP Communication Architecture
Elevator Saga uses a client-server architecture with HTTP-based communication. The server manages the simulation state and physics, while clients send commands and query state over HTTP.
Architecture Overview
┌─────────────────┐                    ┌─────────────────┐
│     Client      │                    │     Server      │
│  (Controller)   │                    │   (Simulator)   │
├─────────────────┤                    ├─────────────────┤
│ base_controller │◄──── HTTP ────────►│  simulator.py   │
│ proxy_models    │                    │  Flask Routes   │
│ api_client      │                    │                 │
└─────────────────┘                    └─────────────────┘
        │                                       │
        │  GET /api/state                       │
        │  POST /api/step                       │
        │  POST /api/elevators/:id/go_to_floor  │
        │                                       │
        └───────────────────────────────────────┘
The separation provides:
Modularity: Server and client can be developed independently
Multiple Clients: Multiple controllers can compete/cooperate
Language Flexibility: Clients can be written in any language
Network Deployment: Server and client can run on different machines
Server Side: Simulator
The server is implemented in elevator_saga/server/simulator.py using Flask as the HTTP framework.
API Endpoints
GET /api/state
Returns complete simulation state:
@app.route("/api/state", methods=["GET"])
def get_state() -> Response:
    try:
        state = simulation.get_state()
        return json_response(state)
    except Exception as e:
        return json_response({"error": str(e)}, 500)
Response format:
{
  "tick": 42,
  "elevators": [
    {
      "id": 0,
      "position": {"current_floor": 2, "target_floor": 5, "floor_up_position": 3},
      "passengers": [101, 102],
      "max_capacity": 10,
      "run_status": "constant_speed",
      "energy_consumed": 38.5,
      "energy_rate": 1.0,
      "..."
    }
  ],
  "floors": [
    {"floor": 0, "up_queue": [103], "down_queue": []},
    "..."
  ],
  "passengers": {
    "101": {"id": 101, "origin": 0, "destination": 5, "..."}
  },
  "metrics": {
    "done": 50,
    "total": 100,
    "avg_wait": 15.2,
    "p95_wait": 30.0,
    "avg_system": 25.5,
    "p95_system": 45.0,
    "total_energy_consumption": 156.0
  }
}
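As a rough illustration of consuming this response, the sketch below works on a plain dictionary shaped like the sample above. The helper name `summarize_state` is hypothetical and not part of Elevator Saga; it relies only on fields that appear in the sample, and the elided fields are simply left out.

```python
from typing import Any, Dict


def summarize_state(state: Dict[str, Any]) -> Dict[str, Any]:
    """Derive a few headline numbers from a /api/state-style dictionary."""
    # Passengers waiting on every floor, in both directions.
    waiting = sum(
        len(floor["up_queue"]) + len(floor["down_queue"])
        for floor in state["floors"]
    )
    # Passengers currently riding in any elevator.
    riding = sum(len(elevator["passengers"]) for elevator in state["elevators"])
    metrics = state["metrics"]
    # Guard against division by zero when no passengers are scheduled.
    completion = metrics["done"] / metrics["total"] if metrics["total"] else 0.0
    return {
        "tick": state["tick"],
        "waiting": waiting,
        "riding": riding,
        "completion": completion,
    }


# Trimmed copy of the sample response (elided fields omitted).
sample_state = {
    "tick": 42,
    "elevators": [{"id": 0, "passengers": [101, 102], "max_capacity": 10}],
    "floors": [{"floor": 0, "up_queue": [103], "down_queue": []}],
    "metrics": {"done": 50, "total": 100},
}

summary = summarize_state(sample_state)
```

Working on the raw dictionary keeps the example independent of the client's data models; a real controller would more likely go through the API client described later on this page.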
POST /api/step
Advances simulation by specified number of ticks:
@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response:
    try:
        data = request.get_json() or {}
        ticks = data.get("ticks", 1)
        events = simulation.step(ticks)
        return json_response({
            "tick": simulation.tick,
            "events": events,
        })
    except Exception as e:
        return json_response({"error": str(e)}, 500)
Request body:
{"ticks": 1}
Response:
{
  "tick": 43,
  "events": [
    {
      "tick": 43,
      "type": "stopped_at_floor",
      "data": {"elevator": 0, "floor": 5, "reason": "move_reached"}
    }
  ]
}
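One way a caller might consume the events list is to bucket events by their "type" field before reacting to them. The following is a minimal sketch over a plain dictionary shaped like the response above; `group_events_by_type` is a hypothetical helper, and "stopped_at_floor" is the only event type taken from this page.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_events_by_type(step_response: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Bucket the raw event dictionaries of a /api/step response by "type"."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for event in step_response.get("events", []):
        grouped[event["type"]].append(event)
    return dict(grouped)


# Copy of the sample response shown above.
sample_step = {
    "tick": 43,
    "events": [
        {
            "tick": 43,
            "type": "stopped_at_floor",
            "data": {"elevator": 0, "floor": 5, "reason": "move_reached"},
        }
    ],
}

stops = group_events_by_type(sample_step).get("stopped_at_floor", [])
stopped_floors = [event["data"]["floor"] for event in stops]
```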
POST /api/elevators/:id/go_to_floor
Commands an elevator to go to a floor:
@app.route("/api/elevators/<int:elevator_id>/go_to_floor", methods=["POST"])
def elevator_go_to_floor(elevator_id: int) -> Response:
    try:
        data = request.get_json() or {}
        floor = data["floor"]
        immediate = data.get("immediate", False)
        simulation.elevator_go_to_floor(elevator_id, floor, immediate)
        return json_response({"success": True})
    except Exception as e:
        return json_response({"error": str(e)}, 500)
Request body:
{"floor": 5, "immediate": false}
immediate=false: Set as the next target after the current destination
immediate=true: Change the target immediately (cancels the current target)
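The difference between the two modes can be pictured with a toy model. This is only a sketch of the semantics described above, not the simulator's implementation: `ToyElevator`, its `next_target` field, and the `arrive` method are all hypothetical, and the sketch leaves an already queued target untouched on an immediate command because this page does not say what the real simulator does in that case.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyElevator:
    """Hypothetical stand-in for the simulator's elevator; not the real class."""

    target: Optional[int] = None       # floor currently being travelled to
    next_target: Optional[int] = None  # floor to visit after the current one

    def go_to_floor(self, floor: int, immediate: bool = False) -> None:
        if immediate or self.target is None:
            # immediate=True: the current destination is replaced outright.
            self.target = floor
        else:
            # immediate=False: queue the floor behind the current destination.
            self.next_target = floor

    def arrive(self) -> None:
        """Reaching the current target promotes the queued one, if any."""
        self.target, self.next_target = self.next_target, None


elevator = ToyElevator(target=5)
elevator.go_to_floor(2)                  # queued behind floor 5
queued = (elevator.target, elevator.next_target)
elevator.go_to_floor(8, immediate=True)  # floor 5 is abandoned
redirected = (elevator.target, elevator.next_target)
```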
POST /api/reset
Resets simulation to initial state:
@app.route("/api/reset", methods=["POST"])
def reset_simulation() -> Response:
    try:
        simulation.reset()
        return json_response({"success": True})
    except Exception as e:
        return json_response({"error": str(e)}, 500)
POST /api/traffic/next
Loads next traffic scenario:
@app.route("/api/traffic/next", methods=["POST"])
def next_traffic_round() -> Response:
    try:
        # Fall back to an empty dict when no JSON body is sent,
        # matching the other POST handlers
        data = request.get_json() or {}
        full_reset = data.get("full_reset", False)
        success = simulation.next_traffic_round(full_reset)
        if success:
            return json_response({"success": True})
        else:
            return json_response({"success": False, "error": "No more scenarios"}, 400)
    except Exception as e:
        return json_response({"error": str(e)}, 500)
GET /api/traffic/info
Gets current traffic scenario information:
@app.route("/api/traffic/info", methods=["GET"])
def get_traffic_info() -> Response:
    try:
        info = simulation.get_traffic_info()
        return json_response(info)
    except Exception as e:
        return json_response({"error": str(e)}, 500)
Response:
{
  "current_index": 0,
  "total_files": 5,
  "max_tick": 1000
}
Client Side: API Client
The client is implemented in elevator_saga/client/api_client.py using Python’s built-in urllib library.
ElevatorAPIClient Class
class ElevatorAPIClient:
    """Unified elevator API client"""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")
        # Caching fields
        self._cached_state: Optional[SimulationState] = None
        self._cached_tick: int = -1
        self._tick_processed: bool = False
State Caching Strategy
The API client implements smart caching to reduce HTTP requests:
def get_state(self, force_reload: bool = False) -> SimulationState:
    """Get simulation state with caching"""
    # Return cached state if valid
    if not force_reload and self._cached_state is not None and not self._tick_processed:
        return self._cached_state
    # Fetch fresh state
    response_data = self._send_get_request("/api/state")
    # ... parse and create SimulationState ...
    # Update cache
    self._cached_state = simulation_state
    self._cached_tick = simulation_state.tick
    self._tick_processed = False  # Mark as fresh
    return simulation_state

def mark_tick_processed(self) -> None:
    """Mark current tick as processed, invalidating cache"""
    self._tick_processed = True
Cache Behavior:
The first get_state() call in a tick fetches from the server
Subsequent calls within the same tick return cached data
After step() is called, the cache is invalidated (the tick is marked as processed via mark_tick_processed())
The next get_state() call fetches fresh data
This provides:
Performance: Minimize HTTP requests
Consistency: All operations in a tick see same state
Freshness: New tick always gets new state
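The behavior listed above can be reproduced in isolation. The sketch below mirrors the caching condition from get_state() but swaps the HTTP call for an injected function so that fetches can be counted; `TickCache` and `fake_fetch` are hypothetical names used only for this illustration.

```python
from typing import Any, Callable, Dict, List, Optional


class TickCache:
    """Toy version of the per-tick cache; `fetch` stands in for the HTTP GET."""

    def __init__(self, fetch: Callable[[], Dict[str, Any]]) -> None:
        self._fetch = fetch
        self._cached: Optional[Dict[str, Any]] = None
        self._tick_processed = False

    def get_state(self, force_reload: bool = False) -> Dict[str, Any]:
        # Same condition as the client: reuse the cache unless forced or stale.
        if not force_reload and self._cached is not None and not self._tick_processed:
            return self._cached
        self._cached = self._fetch()
        self._tick_processed = False
        return self._cached

    def mark_tick_processed(self) -> None:
        self._tick_processed = True


calls: List[int] = []


def fake_fetch() -> Dict[str, Any]:
    """Record each simulated request and return a state whose tick is the call count."""
    calls.append(1)
    return {"tick": len(calls)}


cache = TickCache(fake_fetch)
first = cache.get_state()    # fetches
second = cache.get_state()   # served from cache
cache.mark_tick_processed()  # tick finished, cache is now stale
third = cache.get_state()    # fetches again
```

In this run only two simulated requests are made for three get_state() calls, which is the saving the cache is meant to provide.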
Core API Methods
get_state(force_reload=False)
Fetches current simulation state:
def get_state(self, force_reload: bool = False) -> SimulationState:
    if not force_reload and self._cached_state is not None and not self._tick_processed:
        return self._cached_state
    response_data = self._send_get_request("/api/state")
    # Parse response into data models
    elevators = [ElevatorState.from_dict(e) for e in response_data["elevators"]]
    floors = [FloorState.from_dict(f) for f in response_data["floors"]]
    # ... handle passengers and metrics ...
    simulation_state = SimulationState(
        tick=response_data["tick"],
        elevators=elevators,
        floors=floors,
        passengers=passengers,
        metrics=metrics,
        events=[]
    )
    # Update cache
    self._cached_state = simulation_state
    self._cached_tick = simulation_state.tick
    self._tick_processed = False
    return simulation_state
step(ticks=1)
Advances simulation:
def step(self, ticks: int = 1) -> StepResponse:
    response_data = self._send_post_request("/api/step", {"ticks": ticks})
    # Parse events
    events = []
    for event_data in response_data["events"]:
        event_dict = event_data.copy()
        if "type" in event_dict:
            event_dict["type"] = EventType(event_dict["type"])
        events.append(SimulationEvent.from_dict(event_dict))
    return StepResponse(
        success=True,
        tick=response_data["tick"],
        events=events
    )
go_to_floor(elevator_id, floor, immediate=False)
Sends elevator to floor:
def go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> bool:
    command = GoToFloorCommand(
        elevator_id=elevator_id,
        floor=floor,
        immediate=immediate
    )
    try:
        response = self.send_elevator_command(command)
        return response
    except Exception as e:
        debug_log(f"Go to floor failed: {e}")
        return False
HTTP Request Implementation
The client uses urllib.request for HTTP communication:
GET Request:
def _send_get_request(self, endpoint: str) -> Dict[str, Any]:
    url = f"{self.base_url}{endpoint}"
    try:
        with urllib.request.urlopen(url, timeout=60) as response:
            data = json.loads(response.read().decode("utf-8"))
            return data
    except urllib.error.URLError as e:
        raise RuntimeError(f"GET {url} failed: {e}")
POST Request:
def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    url = f"{self.base_url}{endpoint}"
    request_body = json.dumps(data).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=request_body,
        headers={"Content-Type": "application/json"}
    )
    try:
        with urllib.request.urlopen(req, timeout=600) as response:
            response_data = json.loads(response.read().decode("utf-8"))
            return response_data
    except urllib.error.URLError as e:
        raise RuntimeError(f"POST {url} failed: {e}")
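The POST helper never sets the HTTP method explicitly: urllib.request.Request reports POST whenever a data payload is supplied. The sketch below builds the same kind of request without sending it, so the result can be inspected offline; `build_post_request` is a hypothetical helper, and the address is the one used in the curl examples on this page.

```python
import json
import urllib.request
from typing import Any, Dict


def build_post_request(base_url: str, endpoint: str, data: Dict[str, Any]) -> urllib.request.Request:
    """Construct (but do not send) a JSON POST request like the client's."""
    url = f"{base_url.rstrip('/')}{endpoint}"
    body = json.dumps(data).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,  # supplying data makes get_method() return "POST"
        headers={"Content-Type": "application/json"},
    )


req = build_post_request("http://127.0.0.1:8000/", "/api/step", {"ticks": 1})
method = req.get_method()
payload = json.loads(req.data)
```

Passing the resulting object to urllib.request.urlopen would perform the actual call, as in _send_post_request above.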
Communication Flow
Typical communication sequence during one tick:
Client                            Server
│                                      │
│  1. GET /api/state                   │
├─────────────────────────────────────►│
│  ◄── SimulationState (cached)        │
│                                      │
│  2. Analyze state, make decisions    │
│                                      │
│  3. POST /api/elevators/0/go_to_floor│
├─────────────────────────────────────►│
│  ◄── {"success": true}               │
│                                      │
│  4. GET /api/state (from cache)      │
│     No HTTP request!                 │
│                                      │
│  5. POST /api/step                   │
├─────────────────────────────────────►│
│     Server processes tick            │
│     - Moves elevators                │
│     - Boards/alights passengers      │
│     - Generates events               │
│  ◄── {tick: 43, events: [...]}       │
│                                      │
│  6. Process events                   │
│     Cache invalidated                │
│                                      │
│  7. GET /api/state (fetches fresh)   │
├─────────────────────────────────────►│
│  ◄── SimulationState                 │
│                                      │
└──────────────────────────────────────┘
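The sequence above boils down to a read, decide, command, step loop. The sketch below replays it against an in-memory stand-in so that the order of calls can be checked without a running server; `FakeClient`, `run_ticks`, and the `decide` callback are hypothetical and only imitate the shape of the real client's methods.

```python
from typing import Any, Callable, Dict, List, Tuple

Command = Tuple[int, int]  # (elevator_id, floor)


class FakeClient:
    """In-memory stand-in for the API client; records calls instead of using HTTP."""

    def __init__(self) -> None:
        self.tick = 0
        self.log: List[str] = []

    def get_state(self) -> Dict[str, Any]:
        self.log.append("get_state")
        return {"tick": self.tick}

    def go_to_floor(self, elevator_id: int, floor: int) -> bool:
        self.log.append(f"go_to_floor({elevator_id}, {floor})")
        return True

    def step(self, ticks: int = 1) -> Dict[str, Any]:
        self.tick += ticks
        self.log.append("step")
        return {"tick": self.tick, "events": []}


def run_ticks(
    client: FakeClient,
    decide: Callable[[Dict[str, Any]], List[Command]],
    ticks: int,
) -> int:
    """Drive the client through `ticks` iterations of the flow shown above."""
    for _ in range(ticks):
        state = client.get_state()                # step 1: read state
        for elevator_id, floor in decide(state):  # steps 2-3: decide and command
            client.go_to_floor(elevator_id, floor)
        client.step()                             # step 5: advance one tick
    return client.tick


client = FakeClient()
final_tick = run_ticks(client, lambda state: [(0, 5)], ticks=2)
```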
Error Handling
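Both sides catch exceptions at the API boundary. Each server route wraps its handler in try/except and returns a JSON error body with status 500; client command methods such as go_to_floor() log the failure and return False.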
Server Side:
@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response:
    try:
        # ... process request ...
        return json_response(result)
    except Exception as e:
        return json_response({"error": str(e)}, 500)
Client Side:
def go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> bool:
    # ... build the GoToFloorCommand ...
    try:
        response = self.send_elevator_command(command)
        return response
    except Exception as e:
        debug_log(f"Go to floor failed: {e}")
        return False
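One detail worth knowing when combining the two sides: urllib.request.urlopen raises urllib.error.HTTPError (a subclass of URLError) for 4xx and 5xx responses, so the JSON error body sent with status 500 arrives attached to the exception rather than as a normal return value. The sketch below shows one way the "error" field could be recovered; `error_message_from` is a hypothetical helper, not part of the client, and the HTTPError is constructed by hand to simulate a failed call.

```python
import io
import json
import urllib.error
from email.message import Message


def error_message_from(err: urllib.error.HTTPError) -> str:
    """Return the server's "error" field if the body is JSON, else the HTTP reason."""
    try:
        payload = json.loads(err.read().decode("utf-8"))
    except ValueError:  # covers invalid JSON and undecodable bytes
        return str(err.reason)
    if isinstance(payload, dict) and "error" in payload:
        return str(payload["error"])
    return str(err.reason)


# Simulate what urlopen would raise for a 500 response with a JSON body.
simulated = urllib.error.HTTPError(
    url="http://127.0.0.1:8000/api/step",
    code=500,
    msg="INTERNAL SERVER ERROR",
    hdrs=Message(),
    fp=io.BytesIO(b'{"error": "boom"}'),
)
message = error_message_from(simulated)
```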
Thread Safety
The simulator uses a lock to ensure thread-safe access:
class ElevatorSimulation:
    def __init__(self, ...):
        self.lock = threading.Lock()

    def step(self, num_ticks: int = 1) -> List[SimulationEvent]:
        with self.lock:
            # ... process ticks ...

    def get_state(self) -> SimulationStateResponse:
        with self.lock:
            # ... return state ...
This allows Flask to handle concurrent requests safely.
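The effect of the lock can be demonstrated with a toy class that follows the same pattern. `ToySimulation` below is a hypothetical stand-in whose step() only increments a counter; it is meant to show the locking pattern, not the simulator's actual tick logic.

```python
import threading
from typing import Dict, List


class ToySimulation:
    """Hypothetical stand-in that guards its state with a single lock."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.tick = 0

    def step(self, num_ticks: int = 1) -> int:
        # Only one thread at a time may advance the tick counter.
        with self.lock:
            for _ in range(num_ticks):
                self.tick += 1
            return self.tick

    def get_state(self) -> Dict[str, int]:
        # Readers take the same lock, so they never see a half-applied step.
        with self.lock:
            return {"tick": self.tick}


simulation = ToySimulation()
workers: List[threading.Thread] = [
    threading.Thread(target=simulation.step, args=(100,)) for _ in range(8)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
final_state = simulation.get_state()
```

Eight threads each advancing 100 ticks always end at tick 800, because every step() call runs to completion before another thread can enter.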
Batch Commands:
# ❌ Bad - artificial delay between commands
elevator1.go_to_floor(5)
time.sleep(0.1)  # Unnecessary pause between commands
elevator2.go_to_floor(3)
# ✅ Good - issue commands quickly
elevator1.go_to_floor(5)
elevator2.go_to_floor(3)
# All commands received before next tick
Cache Awareness:
Use mark_tick_processed() to explicitly invalidate the cache if needed, but normally the framework handles this automatically.
Testing the API
You can test the API directly using curl:
# Get state
curl http://127.0.0.1:8000/api/state
# Step simulation
curl -X POST http://127.0.0.1:8000/api/step \
-H "Content-Type: application/json" \
-d '{"ticks": 1}'
# Send elevator to floor
curl -X POST http://127.0.0.1:8000/api/elevators/0/go_to_floor \
-H "Content-Type: application/json" \
-d '{"floor": 5, "immediate": false}'
# Reset simulation
curl -X POST http://127.0.0.1:8000/api/reset \
-H "Content-Type: application/json" \
-d '{}'
Next Steps
See Event-Driven Simulation and Tick-Based Execution for understanding how events drive the simulation
See Client Architecture and Proxy Models for using the API through proxy models
Check the source code for complete implementation details