OR-1 dataflow CPU sketch
at main 533 lines 17 kB view raw
1"""Convert IRGraph with StateSnapshot to JSON for the monitor frontend. 2 3Extends dfgraph/graph_json.py with execution state overlay: active, matched, 4executed flags on nodes, and token_flow on edges. Synthesizes SM nodes with 5live state from snapshots and event-driven overlay. 6""" 7 8from __future__ import annotations 9 10import logging 11import math 12from typing import Any 13 14from cm_inst import MemOp 15from asm.ir import ( 16 IRGraph, IRNode, IREdge, ResolvedDest, 17 collect_all_nodes_and_edges, collect_all_data_defs, 18) 19from asm.opcodes import OP_TO_MNEMONIC 20from dfgraph.categories import OpcodeCategory, categorise, CATEGORY_COLOURS 21from emu.events import ( 22 SimEvent, TokenReceived, Matched, Executed, Emitted, 23 CellWritten, DeferredRead as DeferredReadEvent, 24 DeferredSatisfied, ResultSent, 25 FrameAllocated, FrameFreed, FrameSlotWritten, TokenRejected, 26) 27from monitor.snapshot import StateSnapshot 28 29logger = logging.getLogger(__name__) 30 31SM_NODE_PREFIX = "__sm_" 32 33 34def _safe_time(value: float) -> float | None: 35 """Convert non-finite floats to None for JSON serialization.""" 36 if math.isinf(value) or math.isnan(value): 37 return None 38 return value 39 40 41def _parse_component(component: str) -> tuple[str, int]: 42 """Parse component string format 'pe:0' or 'sm:1' into (kind, id).""" 43 kind, _, id_str = component.partition(":") 44 if not id_str: 45 raise ValueError(f"Invalid component format: {component}") 46 return kind, int(id_str) 47 48 49def _serialise_node(node: IRNode) -> dict[str, Any]: 50 """Serialize an IR node to JSON.""" 51 category = categorise(node.opcode) 52 mnemonic = OP_TO_MNEMONIC[node.opcode] 53 54 return { 55 "id": node.name, 56 "opcode": mnemonic, 57 "category": category.value, 58 "colour": CATEGORY_COLOURS[category], 59 "const": node.const, 60 "pe": node.pe, 61 "iram_offset": node.iram_offset, 62 "act_id": node.act_id, 63 "has_error": False, 64 "loc": { 65 "line": 0, 66 "column": 0, 67 "end_line": 
None, 68 "end_column": None, 69 }, 70 "active": False, 71 "matched": False, 72 "executed": False, 73 } 74 75 76def _serialise_slot(slot: Any) -> Any: 77 """Serialize a frame slot value to JSON-compatible format.""" 78 if slot is None: 79 return None 80 if isinstance(slot, int): 81 return slot 82 # slot is a FrameDest 83 return { 84 "target_pe": slot.target_pe, 85 "offset": slot.offset, 86 "act_id": slot.act_id, 87 "port": slot.port.name, 88 "token_kind": slot.token_kind.name, 89 } 90 91 92def _serialise_edge(edge: IREdge) -> dict[str, Any]: 93 """Serialize an IR edge to JSON.""" 94 return { 95 "source": edge.source, 96 "target": edge.dest, 97 "port": edge.port.name, 98 "source_port": edge.source_port.name if edge.source_port else None, 99 "has_error": False, 100 "token_flow": False, 101 } 102 103 104def _serialise_pe_state(pe_id: int, snapshot: StateSnapshot) -> dict[str, Any]: 105 """Serialize PE state from snapshot.""" 106 pe_snap = snapshot.pes.get(pe_id) 107 if not pe_snap: 108 return {} 109 110 iram_json = {} 111 for offset, inst in pe_snap.iram.items(): 112 opcode_str = str(inst.opcode.name) if hasattr(inst.opcode, 'name') else str(inst.opcode) 113 iram_json[str(offset)] = { 114 "opcode": opcode_str, 115 "offset": offset, 116 } 117 118 frames_json = [[_serialise_slot(s) for s in frame] for frame in pe_snap.frames] 119 120 tag_store_json = { 121 str(act_id): {"frame_id": fid, "lane": lane} 122 for act_id, (fid, lane) in pe_snap.tag_store.items() 123 } 124 125 return { 126 "pe_id": pe_snap.pe_id, 127 "iram": iram_json, 128 "frames": frames_json, 129 "tag_store": tag_store_json, 130 "lane_count": pe_snap.lane_count, 131 "free_frames": list(pe_snap.free_frames), 132 "input_queue_size": len(pe_snap.input_queue), 133 } 134 135 136def _serialise_sm_state(sm_id: int, snapshot: StateSnapshot) -> dict[str, Any]: 137 """Serialize SM state from snapshot.""" 138 sm_snap = snapshot.sms.get(sm_id) 139 if not sm_snap: 140 return {} 141 142 cells = {} 143 for addr, cell_snap 
in sm_snap.cells.items(): 144 cells[str(addr)] = { 145 "presence": cell_snap.pres.name, 146 "data_l": cell_snap.data_l, 147 "data_r": cell_snap.data_r, 148 } 149 150 deferred_read = None 151 if sm_snap.deferred_read: 152 deferred_read = { 153 "cell_addr": sm_snap.deferred_read["cell_addr"], 154 "return_route": str(sm_snap.deferred_read["return_route"]), 155 } 156 157 return { 158 "cells": cells, 159 "deferred_read": deferred_read, 160 "t0_store_size": len(sm_snap.t0_store), 161 "input_queue_depth": len(sm_snap.input_queue), 162 } 163 164 165def _serialise_event(event: SimEvent) -> dict[str, Any]: 166 """Serialize a SimEvent to JSON.""" 167 base = { 168 "type": type(event).__name__, 169 "time": event.time, 170 "component": event.component, 171 } 172 173 if isinstance(event, TokenReceived): 174 base["details"] = {"token": str(event.token)} 175 elif isinstance(event, Matched): 176 base["details"] = { 177 "left": event.left, 178 "right": event.right, 179 "act_id": event.act_id, 180 "frame_id": event.frame_id, 181 "offset": event.offset, 182 } 183 elif isinstance(event, Executed): 184 base["details"] = { 185 "op": str(event.op), 186 "result": event.result, 187 "bool_out": event.bool_out, 188 } 189 elif isinstance(event, Emitted): 190 base["details"] = {"token": str(event.token)} 191 elif isinstance(event, FrameAllocated): 192 base["details"] = { 193 "act_id": event.act_id, 194 "frame_id": event.frame_id, 195 "lane": event.lane, 196 } 197 elif isinstance(event, FrameFreed): 198 base["details"] = { 199 "act_id": event.act_id, 200 "frame_id": event.frame_id, 201 "lane": event.lane, 202 "frame_freed": event.frame_freed, 203 } 204 elif isinstance(event, FrameSlotWritten): 205 base["details"] = { 206 "frame_id": event.frame_id, 207 "slot": event.slot, 208 "value": event.value, 209 } 210 elif isinstance(event, TokenRejected): 211 base["details"] = { 212 "reason": event.reason, 213 } 214 else: 215 base["details"] = {} 216 217 return base 218 219 220# 
# ---------------------------------------------------------------------------
# SM node/edge synthesis
# ---------------------------------------------------------------------------


def _collect_referenced_sm_ids(
    all_nodes: dict[str, IRNode],
    ir_graph: IRGraph,
) -> set[int]:
    """Collect SM IDs referenced by MemOp nodes or data definitions.

    Nodes and data definitions whose ``sm_id`` is ``None`` are ignored.
    """
    sm_ids: set[int] = {
        node.sm_id
        for node in all_nodes.values()
        if isinstance(node.opcode, MemOp) and node.sm_id is not None
    }
    sm_ids.update(
        data_def.sm_id
        for data_def in collect_all_data_defs(ir_graph)
        if data_def.sm_id is not None
    )
    return sm_ids


def _build_sm_node_label(sm_id: int, snapshot: StateSnapshot) -> str:
    """Build a compact multi-line text label for an SM node.

    The first line is ``SM <id>``. When the snapshot has state for the SM,
    one line per cell follows in ascending address order (with ``data_l``
    appended when it is not ``None``), then a ``DR:`` line if a deferred
    read is pending.
    """
    sm_snap = snapshot.sms.get(sm_id)
    if not sm_snap:
        return f"SM {sm_id}"

    lines = [f"SM {sm_id}"]

    for addr in sorted(sm_snap.cells):
        cell = sm_snap.cells[addr]
        pres = cell.pres.name
        if cell.data_l is not None:
            lines.append(f"[{addr}] {pres} {cell.data_l}")
        else:
            lines.append(f"[{addr}] {pres}")

    deferred = sm_snap.deferred_read
    if deferred:
        lines.append(f"DR: [{deferred['cell_addr']}]")

    return "\n".join(lines)


def _sm_node_state(sm_snap: Any) -> dict[str, Any] | None:
    """Return the ``sm_state`` payload for an SM node.

    Returns ``None`` when there is no (truthy) snapshot for the SM.
    """
    if not sm_snap:
        return None

    cells = {
        str(addr): {
            "presence": cell.pres.name,
            "data_l": cell.data_l,
            "data_r": cell.data_r,
        }
        for addr, cell in sm_snap.cells.items()
    }

    deferred_read = None
    if sm_snap.deferred_read:
        deferred_read = {
            "cell_addr": sm_snap.deferred_read["cell_addr"],
            "return_route": str(sm_snap.deferred_read["return_route"]),
        }

    # Unlike the per-SM entry under "state", this payload deliberately keeps
    # its original key set (no input queue depth) so the node shape is stable.
    return {
        "cells": cells,
        "deferred_read": deferred_read,
        "t0_store_size": len(sm_snap.t0_store),
    }


def _synthesize_sm_nodes(
    sm_ids: set[int],
    snapshot: StateSnapshot,
) -> list[dict[str, Any]]:
    """Create synthetic monitor graph nodes for each referenced SM instance.

    Nodes are emitted in ascending ``sm_id`` order. Each uses the same keys
    as a serialized IR node plus ``label``, ``sm_id``, ``synthetic``,
    ``sm_state`` and ``cell_written``.
    """
    category = OpcodeCategory.STRUCTURE_MEMORY
    nodes = []
    for sm_id in sorted(sm_ids):
        label = _build_sm_node_label(sm_id, snapshot)
        sm_state = _sm_node_state(snapshot.sms.get(sm_id))

        nodes.append({
            "id": f"{SM_NODE_PREFIX}{sm_id}",
            "opcode": "sm",
            "label": label,
            "category": category.value,
            "colour": CATEGORY_COLOURS[category],
            "const": None,
            "pe": None,
            "iram_offset": None,
            "act_id": None,
            "has_error": False,
            "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None},
            "sm_id": sm_id,
            "synthetic": True,
            "sm_state": sm_state,
            # Execution overlay
            "active": False,
            "matched": False,
            "executed": False,
            "cell_written": False,
        })
    return nodes


def _synthetic_edge(source: str, target: str, port: str) -> dict[str, Any]:
    """Build one synthetic edge dict with overlay flags cleared."""
    return {
        "source": source,
        "target": target,
        "port": port,
        "source_port": None,
        "has_error": False,
        "token_flow": False,
        "synthetic": True,
    }


def _synthesize_sm_edges(
    all_nodes: dict[str, IRNode],
) -> list[dict[str, Any]]:
    """Create synthetic edges between MemOp nodes and their target SM nodes.

    Every MemOp node with a non-``None`` ``sm_id`` gets a ``REQ`` edge to its
    SM node. A ``RET`` edge back from the SM is added only when the node's
    ``dest_l`` is a ``ResolvedDest``.
    """
    edges: list[dict[str, Any]] = []
    for node in all_nodes.values():
        if not isinstance(node.opcode, MemOp) or node.sm_id is None:
            continue

        sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}"

        # Request edge: instruction -> SM
        edges.append(_synthetic_edge(node.name, sm_node_id, "REQ"))

        # Return edge: SM -> requesting node (data flows back to the reader)
        if isinstance(node.dest_l, ResolvedDest):
            edges.append(_synthetic_edge(sm_node_id, node.name, "RET"))

    return edges


def _apply_sm_event_overlay(
    events: list[SimEvent],
    node_by_id: dict[str, dict[str, Any]],
    edge_by_key: dict[tuple[str, str], dict[str, Any]],
) -> None:
    """Apply execution overlay from SM events onto synthesized SM nodes/edges.

    Mutates the dicts in ``node_by_id`` and ``edge_by_key`` in place. Events
    whose component cannot be parsed, is not an ``sm`` component, or has no
    synthesized node are skipped.
    """
    for event in events:
        try:
            kind, comp_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "sm":
            continue

        sm_node_id = f"{SM_NODE_PREFIX}{comp_id}"
        sm_node = node_by_id.get(sm_node_id)
        if not sm_node:
            continue

        if isinstance(event, TokenReceived):
            sm_node["active"] = True

        elif isinstance(event, CellWritten):
            sm_node["cell_written"] = True

        elif isinstance(event, (DeferredReadEvent, DeferredSatisfied)):
            sm_node["active"] = True

        elif isinstance(event, ResultSent):
            # Every edge whose source is this SM node is marked, i.e. the
            # synthesized return edges.
            for (source_id, _target_id), edge in edge_by_key.items():
                if source_id == sm_node_id:
                    edge["token_flow"] = True


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def _build_graph_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
) -> tuple[dict[str, IRNode], list[dict[str, Any]], list[dict[str, Any]]]:
    """Serialize IR nodes/edges and append the synthetic SM nodes/edges.

    Returns:
        ``(all_nodes, nodes_json, edges_json)``; IR entries come first,
        synthetic SM entries after them.
    """
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    nodes_json = [_serialise_node(node) for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge) for edge in all_edges]

    sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph)
    nodes_json.extend(_synthesize_sm_nodes(sm_ids, snapshot))
    edges_json.extend(_synthesize_sm_edges(all_nodes))

    return all_nodes, nodes_json, edges_json


def _serialise_state(snapshot: StateSnapshot) -> dict[str, Any]:
    """Serialize every PE and SM in the snapshot, keyed by stringified id."""
    return {
        "pes": {
            str(pe_id): _serialise_pe_state(pe_id, snapshot)
            for pe_id in snapshot.pes
        },
        "sms": {
            str(sm_id): _serialise_sm_state(sm_id, snapshot)
            for sm_id in snapshot.sms
        },
    }


def _apply_pe_event_overlay(
    events: list[SimEvent],
    all_nodes: dict[str, IRNode],
    node_by_id: dict[str, dict[str, Any]],
) -> None:
    """Set ``active``/``matched``/``executed`` on node dicts from PE events.

    ``TokenReceived`` and ``Executed`` flag every node placed on the event's
    PE; ``Matched`` flags only nodes on that PE whose ``iram_offset`` equals
    the event's ``offset``. Events with an unparseable or non-``pe``
    component are ignored.
    """
    # Index nodes by PE once so each event touches only that PE's nodes
    # instead of rescanning the whole graph.
    nodes_by_pe: dict[Any, list[IRNode]] = {}
    for node in all_nodes.values():
        nodes_by_pe.setdefault(node.pe, []).append(node)

    for event in events:
        if isinstance(event, TokenReceived):
            flag = "active"
        elif isinstance(event, Matched):
            flag = "matched"
        elif isinstance(event, Executed):
            flag = "executed"
        else:
            continue

        # Only the parse can raise ValueError; keep the try body that small.
        try:
            kind, pe_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "pe":
            continue

        for node in nodes_by_pe.get(pe_id, ()):
            if flag == "matched" and node.iram_offset != event.offset:
                continue
            entry = node_by_id.get(node.name)
            if entry is not None:
                entry[flag] = True


def _apply_token_flow_overlay(
    events: list[SimEvent],
    edges_json: list[dict[str, Any]],
    node_by_id: dict[str, dict[str, Any]],
) -> None:
    """Mark ``token_flow`` on edges whose destination matches an emitted token.

    An edge is marked when its target node's ``pe`` equals the ``target`` of
    any ``Emitted`` token; tokens without a ``target`` attribute are ignored.
    """
    # NOTE(review): matching is by PE only, so every edge into a node on the
    # targeted PE lights up - confirm this granularity is what is wanted.
    emitted_targets = {
        event.token.target
        for event in events
        if isinstance(event, Emitted) and hasattr(event.token, "target")
    }
    if not emitted_targets:
        return

    for edge in edges_json:
        dest_node = node_by_id.get(edge["target"])
        if dest_node and dest_node.get("pe") in emitted_targets:
            edge["token_flow"] = True


def graph_to_monitor_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
    events: list[SimEvent],
) -> dict[str, Any]:
    """Convert IRGraph + StateSnapshot + events to monitor JSON.

    Returns:
        A ``monitor_update`` message: the graph (IR plus synthetic SM nodes
        and edges, with execution overlay flags applied from ``events``),
        the serialized PE/SM state, the serialized events, and timing fields.
        ``next_time`` is ``None`` when non-finite; ``finished`` is true when
        the snapshot's ``next_time`` is positive infinity.
    """
    all_nodes, nodes_json, edges_json = _build_graph_json(ir_graph, snapshot)

    # Lookup dicts for quick updates. Edges sharing (source, target) collapse
    # to the last one in the key map; the edge list itself keeps them all.
    node_by_id = {n["id"]: n for n in nodes_json}
    edge_by_key = {(e["source"], e["target"]): e for e in edges_json}

    _apply_pe_event_overlay(events, all_nodes, node_by_id)
    _apply_token_flow_overlay(events, edges_json, node_by_id)
    _apply_sm_event_overlay(events, node_by_id, edge_by_key)

    return {
        "type": "monitor_update",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": _serialise_state(snapshot),
        "events": [_serialise_event(e) for e in events],
        "sim_time": snapshot.sim_time,
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float('inf'),
    }


def graph_loaded_json(ir_graph: IRGraph, snapshot: StateSnapshot) -> dict[str, Any]:
    """Initial graph load (no events, no execution overlay flags set).

    Returns:
        A ``graph_loaded`` message with the same graph, state and timing
        fields as ``graph_to_monitor_json`` but no ``events`` key.
    """
    _all_nodes, nodes_json, edges_json = _build_graph_json(ir_graph, snapshot)

    return {
        "type": "graph_loaded",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": _serialise_state(snapshot),
        "sim_time": snapshot.sim_time,
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float('inf'),
    }