OR-1 dataflow CPU sketch
at main 155 lines 4.8 kB view raw
1"""FastAPI server for the dataflow graph renderer. 2 3Serves the frontend static files and pushes graph updates over WebSocket 4when the source dfasm file changes. 5""" 6 7from __future__ import annotations 8 9import asyncio 10import os 11import threading 12from pathlib import Path 13from typing import Optional 14 15from contextlib import asynccontextmanager 16 17from fastapi import FastAPI, WebSocket, WebSocketDisconnect 18from fastapi.staticfiles import StaticFiles 19from watchdog.observers import Observer 20from watchdog.events import FileSystemEventHandler 21 22from dfgraph.pipeline import run_progressive 23from dfgraph.graph_json import graph_to_json 24 25 26class ConnectionManager: 27 def __init__(self) -> None: 28 self.active_connections: list[WebSocket] = [] 29 30 async def connect(self, websocket: WebSocket) -> None: 31 await websocket.accept() 32 self.active_connections.append(websocket) 33 34 def disconnect(self, websocket: WebSocket) -> None: 35 try: 36 self.active_connections.remove(websocket) 37 except ValueError: 38 # WebSocket already removed by broadcast 39 pass 40 41 async def broadcast(self, message: dict) -> None: 42 disconnected: list[WebSocket] = [] 43 for connection in self.active_connections: 44 try: 45 await connection.send_json(message) 46 except Exception: 47 disconnected.append(connection) 48 for conn in disconnected: 49 self.active_connections.remove(conn) 50 51 52class DebouncedFileHandler(FileSystemEventHandler): 53 def __init__(self, target_path: str, callback, debounce_s: float = 0.3) -> None: 54 self.target_path = os.path.realpath(target_path) 55 self.callback = callback 56 self.debounce_s = debounce_s 57 self._timer: Optional[threading.Timer] = None 58 59 def on_modified(self, event) -> None: 60 if event.is_directory: 61 return 62 if os.path.realpath(event.src_path) != self.target_path: 63 return 64 if self._timer is not None: 65 self._timer.cancel() 66 self._timer = threading.Timer(self.debounce_s, self.callback) 67 self._timer.daemon = True 68 self._timer.start() 69 70 71def create_app(source_path: Path) -> FastAPI: 72 manager = ConnectionManager() 73 current_json: dict = {} 74 loop: Optional[asyncio.AbstractEventLoop] = None 75 observer: Optional[Observer] = None 76 77 def _reassemble() -> dict: 78 try: 79 source = source_path.read_text() 80 except (FileNotFoundError, OSError, UnicodeDecodeError) as e: 81 # Return error structure if file is unreadable 82 return { 83 "type": "graph_update", 84 "stage": "error", 85 "parse_error": f"Failed to read source file: {e}", 86 "nodes": [], 87 "edges": [], 88 "regions": [], 89 "errors": [], 90 "metadata": {}, 91 } 92 result = run_progressive(source) 93 return graph_to_json(result) 94 95 def _on_file_change() -> None: 96 nonlocal current_json, loop 97 try: 98 current_json = _reassemble() 99 except Exception: 100 # On reassembly failure, keep current_json unchanged 101 # (don't broadcast incomplete or error state) 102 return 103 if loop is not None and not loop.is_closed(): 104 try: 105 asyncio.run_coroutine_threadsafe( 106 manager.broadcast(current_json), loop 107 ) 108 except RuntimeError: 109 # Loop might be stopped or closed 110 pass 111 112 @asynccontextmanager 113 async def lifespan(app: FastAPI): 114 nonlocal current_json, loop, observer 115 loop = asyncio.get_event_loop() 116 current_json = _reassemble() 117 118 handler = DebouncedFileHandler( 119 str(source_path), _on_file_change, debounce_s=0.3 120 ) 121 observer = Observer() 122 observer.schedule(handler, str(source_path.parent), recursive=False) 123 observer.daemon = True 124 observer.start() 125 126 yield 127 128 observer.stop() 129 observer.join(timeout=2) 130 131 app = FastAPI(lifespan=lifespan) 132 133 @app.websocket("/ws") 134 async def websocket_endpoint(websocket: WebSocket) -> None: 135 await manager.connect(websocket) 136 try: 137 await websocket.send_json(current_json) 138 while True: 139 await websocket.receive_text() 140 except WebSocketDisconnect: 141 manager.disconnect(websocket) 142 143 frontend_dir = Path(__file__).parent / "frontend" 144 app.mount( 145 "/dist", 146 StaticFiles(directory=str(frontend_dir / "dist")), 147 name="dist", 148 ) 149 app.mount( 150 "/", 151 StaticFiles(directory=str(frontend_dir), html=True), 152 name="frontend", 153 ) 154 155 return app