HTTP Communication Architecture

Elevator Saga uses a client-server architecture with HTTP-based communication. The server manages the simulation state and physics, while clients send commands and query state over HTTP.

Architecture Overview

┌─────────────────┐                    ┌─────────────────┐
│  Client         │                    │  Server         │
│  (Controller)   │                    │  (Simulator)    │
├─────────────────┤                    ├─────────────────┤
│ base_controller │◄──── HTTP ────────►│ simulator.py    │
│ proxy_models    │                    │ Flask Routes    │
│ api_client      │                    │                 │
└─────────────────┘                    └─────────────────┘
        │                                       │
        │  GET /api/state                       │
        │  POST /api/step                       │
        │  POST /api/elevators/:id/go_to_floor  │
        │                                       │
        └───────────────────────────────────────┘

The separation provides:

  • Modularity: Server and client can be developed independently

  • Multiple Clients: Multiple controllers can compete/cooperate

  • Language Flexibility: Clients can be written in any language

  • Network Deployment: Server and client can run on different machines

Server Side: Simulator

The server is implemented in elevator_saga/server/simulator.py using Flask as the HTTP framework.

API Endpoints

GET /api/state

Returns complete simulation state:

@app.route("/api/state", methods=["GET"])
def get_state() -> Response:
    try:
        state = simulation.get_state()
        return json_response(state)
    except Exception as e:
        return json_response({"error": str(e)}, 500)
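The json_response helper itself is not shown in this excerpt. A minimal sketch of what it might look like, assuming it relies on Flask's convention that a view may return a (body, status, headers) tuple (the name and signature are taken from the routes above; the body is an illustration, not the real implementation):

```python
import json


def json_response(data, status=200):
    """Serialize data as JSON with the given HTTP status (illustrative sketch).

    Flask converts a (body, status, headers) tuple returned from a view
    into a full Response object.
    """
    body = json.dumps(data)
    return body, status, {"Content-Type": "application/json"}
```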

Response format:

{
  "tick": 42,
  "elevators": [
    {
      "id": 0,
      "position": {"current_floor": 2, "target_floor": 5, "floor_up_position": 3},
      "passengers": [101, 102],
      "max_capacity": 10,
      "run_status": "constant_speed",
      "energy_consumed": 38.5,
      "energy_rate": 1.0,
      "..."
    }
  ],
  "floors": [
    {"floor": 0, "up_queue": [103], "down_queue": []},
    "..."
  ],
  "passengers": {
    "101": {"id": 101, "origin": 0, "destination": 5, "..."}
  },
  "metrics": {
    "done": 50,
    "total": 100,
    "avg_wait": 15.2,
    "p95_wait": 30.0,
    "avg_system": 25.5,
    "p95_system": 45.0,
    "total_energy_consumption": 156.0
  }
}

POST /api/step

Advances the simulation by the specified number of ticks:

@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response:
    try:
        data = request.get_json() or {}
        ticks = data.get("ticks", 1)
        events = simulation.step(ticks)
        return json_response({
            "tick": simulation.tick,
            "events": events,
        })
    except Exception as e:
        return json_response({"error": str(e)}, 500)

Request body:

{"ticks": 1}

Response:

{
  "tick": 43,
  "events": [
    {
      "tick": 43,
      "type": "stopped_at_floor",
      "data": {"elevator": 0, "floor": 5, "reason": "move_reached"}
    }
  ]
}
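A controller typically dispatches on the event type. A minimal sketch using the raw event dicts from the response above (stopped_at_floor is the only event type shown in this document; others exist in the simulator):

```python
def collect_stops(events):
    """Extract (elevator, floor) pairs from stopped_at_floor events (illustrative)."""
    stops = []
    for event in events:
        if event["type"] == "stopped_at_floor":
            stops.append((event["data"]["elevator"], event["data"]["floor"]))
    return stops


# Events as returned by POST /api/step:
events = [
    {
        "tick": 43,
        "type": "stopped_at_floor",
        "data": {"elevator": 0, "floor": 5, "reason": "move_reached"},
    }
]
```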

POST /api/elevators/:id/go_to_floor

Commands an elevator to go to a floor:

@app.route("/api/elevators/<int:elevator_id>/go_to_floor", methods=["POST"])
def elevator_go_to_floor(elevator_id: int) -> Response:
    try:
        data = request.get_json() or {}
        floor = data["floor"]
        immediate = data.get("immediate", False)
        simulation.elevator_go_to_floor(elevator_id, floor, immediate)
        return json_response({"success": True})
    except Exception as e:
        return json_response({"error": str(e)}, 500)

Request body:

{"floor": 5, "immediate": false}

Parameters:

  • immediate=false: Queue the floor as the next target after the current destination

  • immediate=true: Switch to the floor immediately, cancelling the current target
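The difference between the two modes can be illustrated with a small model of the elevator's target queue. This is a hypothetical simplification written for this document; the real scheduling lives in simulator.py and may differ in detail:

```python
from typing import List


def apply_go_to_floor(queue: List[int], floor: int, immediate: bool) -> List[int]:
    """Illustrative model of go_to_floor semantics.

    queue[0] is the elevator's current destination (hypothetical representation).
    """
    if immediate:
        # Cancel the current target and head to the new floor instead.
        return [floor] + queue[1:]
    # Queue the floor as the next target after the current destination.
    return queue[:1] + [floor] + queue[1:]
```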

POST /api/reset

Resets simulation to initial state:

@app.route("/api/reset", methods=["POST"])
def reset_simulation() -> Response:
    try:
        simulation.reset()
        return json_response({"success": True})
    except Exception as e:
        return json_response({"error": str(e)}, 500)

POST /api/traffic/next

Loads next traffic scenario:

@app.route("/api/traffic/next", methods=["POST"])
def next_traffic_round() -> Response:
    try:
        data = request.get_json() or {}
        full_reset = data.get("full_reset", False)
        success = simulation.next_traffic_round(full_reset)
        if success:
            return json_response({"success": True})
        else:
            return json_response({"success": False, "error": "No more scenarios"}, 400)
    except Exception as e:
        return json_response({"error": str(e)}, 500)

GET /api/traffic/info

Gets current traffic scenario information:

@app.route("/api/traffic/info", methods=["GET"])
def get_traffic_info() -> Response:
    try:
        info = simulation.get_traffic_info()
        return json_response(info)
    except Exception as e:
        return json_response({"error": str(e)}, 500)

Response:

{
  "current_index": 0,
  "total_files": 5,
  "max_tick": 1000
}

Client Side: API Client

The client is implemented in elevator_saga/client/api_client.py using Python’s built-in urllib library.

ElevatorAPIClient Class

class ElevatorAPIClient:
    """Unified elevator API client"""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")
        # Caching fields
        self._cached_state: Optional[SimulationState] = None
        self._cached_tick: int = -1
        self._tick_processed: bool = False

State Caching Strategy

The API client caches the simulation state for the duration of a tick, avoiding redundant HTTP requests:

def get_state(self, force_reload: bool = False) -> SimulationState:
    """Get simulation state with caching"""
    # Return cached state if valid
    if not force_reload and self._cached_state is not None and not self._tick_processed:
        return self._cached_state

    # Fetch fresh state
    response_data = self._send_get_request("/api/state")
    # ... parse and create SimulationState ...

    # Update cache
    self._cached_state = simulation_state
    self._cached_tick = simulation_state.tick
    self._tick_processed = False  # Mark as fresh

    return simulation_state

def mark_tick_processed(self) -> None:
    """Mark current tick as processed, invalidating cache"""
    self._tick_processed = True

Cache Behavior:

  1. First get_state() call in a tick fetches from server

  2. Subsequent calls within same tick return cached data

  3. After step() is called, cache is invalidated

  4. Next get_state() fetches fresh data

This provides:

  • Performance: Minimize HTTP requests

  • Consistency: All operations in a tick see same state

  • Freshness: New tick always gets new state
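The cache behavior can be demonstrated with a minimal stand-in for the client, replacing the HTTP fetch with a counter (illustrative; the real client fetches from GET /api/state):

```python
class CachingClient:
    """Minimal model of the tick-scoped caching described above."""

    def __init__(self):
        self._cached_state = None
        self._tick_processed = False
        self.http_requests = 0  # counts simulated fetches

    def _fetch_state(self):
        # Stand-in for the real GET /api/state round trip.
        self.http_requests += 1
        return {"tick": self.http_requests}

    def get_state(self, force_reload=False):
        # Same cache-validity check as the real client.
        if not force_reload and self._cached_state is not None and not self._tick_processed:
            return self._cached_state
        self._cached_state = self._fetch_state()
        self._tick_processed = False
        return self._cached_state

    def mark_tick_processed(self):
        self._tick_processed = True
```

Within one tick, repeated get_state() calls cost a single fetch; marking the tick processed forces a fresh fetch on the next call.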

Core API Methods

get_state(force_reload=False)

Fetches current simulation state:

def get_state(self, force_reload: bool = False) -> SimulationState:
    if not force_reload and self._cached_state is not None and not self._tick_processed:
        return self._cached_state

    response_data = self._send_get_request("/api/state")

    # Parse response into data models
    elevators = [ElevatorState.from_dict(e) for e in response_data["elevators"]]
    floors = [FloorState.from_dict(f) for f in response_data["floors"]]
    # ... handle passengers and metrics ...

    simulation_state = SimulationState(
        tick=response_data["tick"],
        elevators=elevators,
        floors=floors,
        passengers=passengers,
        metrics=metrics,
        events=[]
    )

    # Update cache
    self._cached_state = simulation_state
    self._cached_tick = simulation_state.tick
    self._tick_processed = False

    return simulation_state

step(ticks=1)

Advances simulation:

def step(self, ticks: int = 1) -> StepResponse:
    response_data = self._send_post_request("/api/step", {"ticks": ticks})

    # Parse events
    events = []
    for event_data in response_data["events"]:
        event_dict = event_data.copy()
        if "type" in event_dict:
            event_dict["type"] = EventType(event_dict["type"])
        events.append(SimulationEvent.from_dict(event_dict))

    return StepResponse(
        success=True,
        tick=response_data["tick"],
        events=events
    )

go_to_floor(elevator_id, floor, immediate=False)

Sends elevator to floor:

def go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> bool:
    command = GoToFloorCommand(
        elevator_id=elevator_id,
        floor=floor,
        immediate=immediate
    )

    try:
        response = self.send_elevator_command(command)
        return response
    except Exception as e:
        debug_log(f"Go to floor failed: {e}")
        return False

HTTP Request Implementation

The client uses urllib.request for HTTP communication:

GET Request:

def _send_get_request(self, endpoint: str) -> Dict[str, Any]:
    url = f"{self.base_url}{endpoint}"

    try:
        with urllib.request.urlopen(url, timeout=60) as response:
            data = json.loads(response.read().decode("utf-8"))
            return data
    except urllib.error.URLError as e:
        raise RuntimeError(f"GET {url} failed: {e}")

POST Request:

def _send_post_request(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    url = f"{self.base_url}{endpoint}"
    request_body = json.dumps(data).encode("utf-8")

    req = urllib.request.Request(
        url,
        data=request_body,
        headers={"Content-Type": "application/json"}
    )

    try:
        with urllib.request.urlopen(req, timeout=600) as response:
            response_data = json.loads(response.read().decode("utf-8"))
            return response_data
    except urllib.error.URLError as e:
        raise RuntimeError(f"POST {url} failed: {e}")
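Since both helpers raise RuntimeError on failure, transient network errors can be smoothed over with a small retry wrapper. This helper is not part of the client; it is an illustrative addition a caller might layer on top:

```python
import time
from typing import Any, Callable


def with_retries(fn: Callable[[], Any], attempts: int = 3, delay: float = 0.5) -> Any:
    """Call fn, retrying on RuntimeError up to `attempts` times (illustrative)."""
    for attempt in range(attempts):
        try:
            return fn()
        except RuntimeError:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(delay)
```

For example, `with_retries(lambda: client.get_state(force_reload=True))` would retry a failed state fetch before giving up.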

Communication Flow

Typical communication sequence during one tick:

Client                                 Server
  │                                      │
  │  1. GET /api/state                   │
  ├─────────────────────────────────────►│
  │  ◄── SimulationState (cached)        │
  │                                      │
  │  2. Analyze state, make decisions    │
  │                                      │
  │  3. POST /api/elevators/0/go_to_floor│
  ├─────────────────────────────────────►│
  │  ◄── {"success": true}               │
  │                                      │
  │  4. GET /api/state (from cache)      │
  │     No HTTP request!                 │
  │                                      │
  │  5. POST /api/step                   │
  ├─────────────────────────────────────►│
  │         Server processes tick        │
  │         - Moves elevators            │
  │         - Boards/alights passengers  │
  │         - Generates events           │
  │  ◄── {tick: 43, events: [...]}       │
  │                                      │
  │  6. Process events                   │
  │     Cache invalidated                │
  │                                      │
  │  7. GET /api/state (fetches fresh)   │
  ├─────────────────────────────────────►│
  │  ◄── SimulationState                 │
  │                                      │
  └──────────────────────────────────────┘
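The sequence above maps onto a simple control loop built from the client methods named in this document (get_state, go_to_floor, step, mark_tick_processed). The framework normally drives this loop for you; this sketch shows the shape:

```python
def run_controller(client, decide, ticks=100):
    """Run the get_state -> decide -> command -> step cycle for `ticks` ticks.

    `decide` is a caller-supplied function mapping a state to a list of
    (elevator_id, floor) commands (hypothetical signature for illustration).
    """
    for _ in range(ticks):
        state = client.get_state()            # fetched once, then cached
        for elevator_id, floor in decide(state):
            client.go_to_floor(elevator_id, floor)
        result = client.step(1)               # advance one tick
        client.mark_tick_processed()          # invalidate the cached state
        # result.events can be inspected here (e.g. stopped_at_floor)
```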

Error Handling

Both sides handle errors defensively: the server maps exceptions to HTTP 500 responses carrying an error message, while the client logs failures and returns a failure value rather than raising:

Server Side:

@app.route("/api/step", methods=["POST"])
def step_simulation() -> Response:
    try:
        # ... process request ...
        return json_response(result)
    except Exception as e:
        return json_response({"error": str(e)}, 500)

Client Side:

def go_to_floor(self, elevator_id: int, floor: int, immediate: bool = False) -> bool:
    try:
        response = self.send_elevator_command(command)
        return response
    except Exception as e:
        debug_log(f"Go to floor failed: {e}")
        return False

Thread Safety

The simulator uses a lock to ensure thread-safe access:

class ElevatorSimulation:
    def __init__(self, ...):
        self.lock = threading.Lock()

    def step(self, num_ticks: int = 1) -> List[SimulationEvent]:
        with self.lock:
            # ... process ticks ...

    def get_state(self) -> SimulationStateResponse:
        with self.lock:
            # ... return state ...

This allows Flask to handle concurrent requests safely.

Performance Tips

Batch Commands:

# ❌ Bad - sequential commands
elevator1.go_to_floor(5)
time.sleep(0.1)  # Wait for response
elevator2.go_to_floor(3)

# ✅ Good - issue commands quickly
elevator1.go_to_floor(5)
elevator2.go_to_floor(3)
# All commands received before next tick

Cache Awareness:

Call mark_tick_processed() to invalidate the cache explicitly when needed; normally the framework does this automatically after each step.

Testing the API

You can test the API directly using curl:

# Get state
curl http://127.0.0.1:8000/api/state

# Step simulation
curl -X POST http://127.0.0.1:8000/api/step \
  -H "Content-Type: application/json" \
  -d '{"ticks": 1}'

# Send elevator to floor
curl -X POST http://127.0.0.1:8000/api/elevators/0/go_to_floor \
  -H "Content-Type: application/json" \
  -d '{"floor": 5, "immediate": false}'

# Reset simulation
curl -X POST http://127.0.0.1:8000/api/reset \
  -H "Content-Type: application/json" \
  -d '{}'

Next Steps