OR-1 dataflow CPU sketch
1"""FastAPI server for the dataflow graph renderer.
2
3Serves the frontend static files and pushes graph updates over WebSocket
4when the source dfasm file changes.
5"""
6
7from __future__ import annotations
8
9import asyncio
10import os
11import threading
12from pathlib import Path
13from typing import Optional
14
15from contextlib import asynccontextmanager
16
17from fastapi import FastAPI, WebSocket, WebSocketDisconnect
18from fastapi.staticfiles import StaticFiles
19from watchdog.observers import Observer
20from watchdog.events import FileSystemEventHandler
21
22from dfgraph.pipeline import run_progressive
23from dfgraph.graph_json import graph_to_json
24
25
class ConnectionManager:
    """Track accepted WebSocket connections and broadcast JSON to them."""

    def __init__(self) -> None:
        # Connections that have been accepted and not yet removed.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and register it for future broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Unregister *websocket*; a no-op if it is not registered."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed, e.g. by broadcast() after a failed send.
            pass

    async def broadcast(self, message: dict) -> None:
        """Send *message* as JSON to every registered connection.

        Any connection whose ``send_json`` raises is dropped from the
        registry; the exception itself is not propagated.
        """
        failed: list[WebSocket] = []
        # Iterate over a snapshot: every ``await`` yields to the event loop,
        # during which connect()/disconnect() may mutate the live list.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                failed.append(connection)
        for connection in failed:
            # disconnect() tolerates a connection that was already removed
            # while this coroutine was suspended.
            self.disconnect(connection)
50
51
class DebouncedFileHandler(FileSystemEventHandler):
    """Invoke a callback once a single watched file has stopped changing.

    Events for any path other than ``target_path`` (compared after
    ``os.path.realpath``) and all directory events are ignored. Each matching
    event cancels the pending timer and starts a new one, so ``callback``
    runs on a ``threading.Timer`` thread ``debounce_s`` seconds after the
    most recent matching event.
    """

    def __init__(self, target_path: str, callback, debounce_s: float = 0.3) -> None:
        """Store the resolved target path, the callback and the delay.

        Args:
            target_path: Path of the file to react to.
            callback: Zero-argument callable run after the debounce delay.
            debounce_s: Quiet period, in seconds, before ``callback`` fires.
        """
        super().__init__()
        self.target_path = os.path.realpath(target_path)
        self.callback = callback
        self.debounce_s = debounce_s
        # Pending debounce timer, or None before the first matching event.
        self._timer: Optional[threading.Timer] = None

    def on_modified(self, event) -> None:
        """Debounce an in-place modification of the target file."""
        self._handle(event, event.src_path)

    def on_created(self, event) -> None:
        """Debounce (re)creation of the target file.

        Covers saves that delete and rewrite the file rather than modifying
        it in place.
        """
        self._handle(event, event.src_path)

    def on_moved(self, event) -> None:
        """Debounce a rename whose destination is the target file.

        Covers saves that write a temporary file and rename it over the
        target, which are reported as a move rather than a modification.
        """
        self._handle(event, event.dest_path)

    def _handle(self, event, path) -> None:
        """Restart the debounce timer if *path* resolves to the target file."""
        if event.is_directory:
            return
        if os.path.realpath(path) != self.target_path:
            return
        if self._timer is not None:
            # Supersede the pending call so a burst yields one callback.
            self._timer.cancel()
        self._timer = threading.Timer(self.debounce_s, self.callback)
        # Daemon thread: a pending timer must not keep the process alive.
        self._timer.daemon = True
        self._timer.start()
69
70
def create_app(source_path: Path) -> FastAPI:
    """Build the FastAPI app serving the frontend and live graph updates.

    Args:
        source_path: File whose text is fed to ``run_progressive``. Its
            parent directory is watched (non-recursively) for changes.

    Returns:
        A FastAPI application with a ``/ws`` WebSocket endpoint and static
        file mounts at ``/dist`` and ``/``.
    """
    manager = ConnectionManager()
    # Most recent graph payload; sent to each client as it connects.
    current_json: dict = {}
    # Event loop captured at startup so the watcher thread can schedule
    # broadcasts onto it.
    loop: Optional[asyncio.AbstractEventLoop] = None
    observer: Optional[Observer] = None

    def _reassemble() -> dict:
        """Read the source file and return its graph as a JSON-ready dict.

        If the file cannot be read or decoded, an ``"error"``-stage payload
        with empty graph fields is returned instead of raising. Exceptions
        from ``run_progressive`` / ``graph_to_json`` propagate.
        """
        try:
            source = source_path.read_text()
        except (OSError, UnicodeDecodeError) as e:
            # Return error structure if file is unreadable
            return {
                "type": "graph_update",
                "stage": "error",
                "parse_error": f"Failed to read source file: {e}",
                "nodes": [],
                "edges": [],
                "regions": [],
                "errors": [],
                "metadata": {},
            }
        result = run_progressive(source)
        return graph_to_json(result)

    def _on_file_change() -> None:
        """Rebuild the payload and broadcast it; called off the loop thread."""
        nonlocal current_json
        try:
            current_json = _reassemble()
        except Exception:
            # On reassembly failure, keep current_json unchanged
            # (don't broadcast incomplete or error state)
            return
        if loop is not None and not loop.is_closed():
            try:
                # This runs on a timer thread, so the coroutine must be
                # handed to the loop thread-safely rather than awaited.
                asyncio.run_coroutine_threadsafe(
                    manager.broadcast(current_json), loop
                )
            except RuntimeError:
                # Loop might be stopped or closed
                pass

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Start the file watcher on startup and stop it on shutdown."""
        nonlocal current_json, loop, observer
        # We are inside a coroutine, so the running loop is the one that
        # will serve the WebSocket connections.
        loop = asyncio.get_running_loop()
        current_json = _reassemble()

        handler = DebouncedFileHandler(
            str(source_path), _on_file_change, debounce_s=0.3
        )
        observer = Observer()
        # The directory is watched; the handler filters for the one file.
        observer.schedule(handler, str(source_path.parent), recursive=False)
        observer.daemon = True
        observer.start()

        try:
            yield
        finally:
            # Stop the watcher even if shutdown is triggered by an error.
            observer.stop()
            observer.join(timeout=2)

    app = FastAPI(lifespan=lifespan)

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket) -> None:
        """Send the current graph, then hold the socket open for broadcasts."""
        await manager.connect(websocket)
        try:
            await websocket.send_json(current_json)
            while True:
                # Incoming text is discarded; the loop keeps the handler
                # alive until the client goes away.
                await websocket.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            # Always unregister, including when a non-disconnect error ends
            # the handler; disconnect() is a no-op if already removed.
            manager.disconnect(websocket)

    frontend_dir = Path(__file__).parent / "frontend"
    app.mount(
        "/dist",
        StaticFiles(directory=str(frontend_dir / "dist")),
        name="dist",
    )
    app.mount(
        "/",
        StaticFiles(directory=str(frontend_dir), html=True),
        name="frontend",
    )

    return app