"""FastAPI server for the dataflow graph renderer. Serves the frontend static files and pushes graph updates over WebSocket when the source dfasm file changes. """ from __future__ import annotations import asyncio import os import threading from pathlib import Path from typing import Optional from contextlib import asynccontextmanager from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from dfgraph.pipeline import run_progressive from dfgraph.graph_json import graph_to_json class ConnectionManager: def __init__(self) -> None: self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket) -> None: await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket) -> None: try: self.active_connections.remove(websocket) except ValueError: # WebSocket already removed by broadcast pass async def broadcast(self, message: dict) -> None: disconnected: list[WebSocket] = [] for connection in self.active_connections: try: await connection.send_json(message) except Exception: disconnected.append(connection) for conn in disconnected: self.active_connections.remove(conn) class DebouncedFileHandler(FileSystemEventHandler): def __init__(self, target_path: str, callback, debounce_s: float = 0.3) -> None: self.target_path = os.path.realpath(target_path) self.callback = callback self.debounce_s = debounce_s self._timer: Optional[threading.Timer] = None def on_modified(self, event) -> None: if event.is_directory: return if os.path.realpath(event.src_path) != self.target_path: return if self._timer is not None: self._timer.cancel() self._timer = threading.Timer(self.debounce_s, self.callback) self._timer.daemon = True self._timer.start() def create_app(source_path: Path) -> FastAPI: manager = ConnectionManager() current_json: dict = {} loop: Optional[asyncio.AbstractEventLoop] = None observer: Optional[Observer] = None def _reassemble() -> dict: try: source = source_path.read_text() except (FileNotFoundError, OSError, UnicodeDecodeError) as e: # Return error structure if file is unreadable return { "type": "graph_update", "stage": "error", "parse_error": f"Failed to read source file: {e}", "nodes": [], "edges": [], "regions": [], "errors": [], "metadata": {}, } result = run_progressive(source) return graph_to_json(result) def _on_file_change() -> None: nonlocal current_json, loop try: current_json = _reassemble() except Exception: # On reassembly failure, keep current_json unchanged # (don't broadcast incomplete or error state) return if loop is not None and not loop.is_closed(): try: asyncio.run_coroutine_threadsafe( manager.broadcast(current_json), loop ) except RuntimeError: # Loop might be stopped or closed pass @asynccontextmanager async def lifespan(app: FastAPI): nonlocal current_json, loop, observer loop = asyncio.get_event_loop() current_json = _reassemble() handler = DebouncedFileHandler( str(source_path), _on_file_change, debounce_s=0.3 ) observer = Observer() observer.schedule(handler, str(source_path.parent), recursive=False) observer.daemon = True observer.start() yield observer.stop() observer.join(timeout=2) app = FastAPI(lifespan=lifespan) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket) -> None: await manager.connect(websocket) try: await websocket.send_json(current_json) while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) frontend_dir = Path(__file__).parent / "frontend" app.mount( "/dist", StaticFiles(directory=str(frontend_dir / "dist")), name="dist", ) app.mount( "/", StaticFiles(directory=str(frontend_dir), html=True), name="frontend", ) return app