"""Code generation for OR1 assembly. Converts fully allocated IRGraphs to emulator-ready configuration objects and token streams. Two output modes: 1. Direct mode: Produces PEConfig/SMConfig lists + seed tokens (for direct setup) 2. Token stream mode: Produces bootstrap sequence (SM init → IRAM writes → ALLOC → frame slot writes → seeds) Reference: Phase 6 design doc, Tasks 1-3. """ from dataclasses import dataclass from collections import defaultdict from asm.ir import ( IRGraph, IRNode, IREdge, ResolvedDest, collect_all_nodes_and_edges, collect_all_data_defs, DEFAULT_IRAM_CAPACITY, DEFAULT_FRAME_COUNT ) from asm.opcodes import is_dyadic from cm_inst import ( Instruction, OutputStyle, TokenKind, FrameOp, FrameDest, MemOp, Port, RoutingOp ) from emu.types import PEConfig, SMConfig from tokens import ( DyadToken, MonadToken, SMToken, Token, PELocalWriteToken, FrameControlToken ) from encoding import pack_instruction, pack_flit1, pack_token from sm_mod import Presence @dataclass(frozen=True) class AssemblyResult: """Result of code generation in direct mode. Attributes: pe_configs: List of PEConfig objects, one per PE sm_configs: List of SMConfig objects, one per SM with data_defs seed_tokens: List of CMTokens (DyadToken/MonadToken) for seed injections setup_tokens: List of tokens for frame setup sequence (SM init → IRAM writes → ALLOC → frame slots) """ pe_configs: list[PEConfig] sm_configs: list[SMConfig] seed_tokens: list[Token] setup_tokens: list[Token] def _build_iram_for_pe( nodes_on_pe: list[IRNode], all_nodes: dict[str, IRNode], all_edges: list[IREdge], ) -> dict[int, Instruction]: """Build IRAM entries as Instruction objects for a single PE. Args: nodes_on_pe: List of IRNodes on this PE all_nodes: All nodes in graph (for lookups) all_edges: All edges in graph (unused in frame model) Returns: Dict mapping IRAM offset to Instruction object """ iram: dict[int, Instruction] = {} for node in nodes_on_pe: # Skip seed nodes and unallocated nodes if node.seed or node.iram_offset is None: continue # Skip nodes without mode (output style) allocation if node.mode is None: continue output_style, has_const, dest_count = node.mode # Build Instruction object inst = Instruction( opcode=node.opcode, output=output_style, has_const=has_const, dest_count=dest_count, wide=node.wide, fref=node.fref or 0, ) iram[node.iram_offset] = inst return iram def _find_const_for_slot( act_nodes: list[IRNode], slot_idx: int, frame_layout, ) -> int | None: """Find the constant value for a frame slot, if any. Args: act_nodes: List of nodes in this activation slot_idx: Frame slot index frame_layout: FrameLayout from the activation Returns: Constant value (0-65535) or None if slot is not a constant slot """ # Scan nodes to find one that stores a constant in this slot # The frame layout's const_slots tell us which slots are for constants. # We map from slot_idx to the node's fref to find the source node. for node in act_nodes: if node.fref is None or node.const is None or not isinstance(node.const, int): continue if not node.mode: continue output_style, has_const, dest_count = node.mode # If this node has a constant (has_const=True), it occupies fref slot if has_const and node.fref == slot_idx: return node.const & 0xFFFF return None def _find_dest_for_slot( act_nodes: list[IRNode], slot_idx: int, frame_layout, all_nodes: dict[str, IRNode], all_edges: list[IREdge], ) -> FrameDest | None: """Find the destination routing for a frame slot. Args: act_nodes: List of nodes in this activation slot_idx: Frame slot index frame_layout: FrameLayout from the activation all_nodes: All nodes in graph (for lookups) all_edges: All edges in graph (unused) Returns: FrameDest if this slot is a destination slot, None otherwise """ # For each node in this activation, destinations are allocated after the constant slot(s). # The frame layout's dest_slots tell us which slots are for destinations. # We map from slot_idx to find which node's destination it represents. for node in act_nodes: if node.fref is None or node.mode is None: continue output_style, has_const, dest_count = node.mode # Compute where this node's destination slots start const_slots_count = int(has_const) # 0 or 1 dest_start = node.fref + const_slots_count # Check if slot_idx falls within this node's destination slots if dest_start <= slot_idx < dest_start + dest_count: # Determine which destination (left or right, 0-indexed) dest_idx = slot_idx - dest_start # Get the destination from the node if dest_idx == 0 and node.dest_l is not None: if isinstance(node.dest_l, ResolvedDest) and node.dest_l.frame_dest is not None: return node.dest_l.frame_dest elif dest_idx == 1 and node.dest_r is not None: if isinstance(node.dest_r, ResolvedDest) and node.dest_r.frame_dest is not None: return node.dest_r.frame_dest return None def _generate_setup_tokens( pe_configs: list[PEConfig], sm_configs: list[SMConfig], nodes_by_pe: dict[int, list[IRNode]], all_nodes: dict[str, IRNode], all_edges: list[IREdge], data_defs: list, ) -> list[Token]: """Generate the ordered setup token sequence for frame-based bootstrap. Order: SM init → IRAM writes → ALLOC → frame slot writes → (seed tokens added separately) Args: pe_configs: List of PEConfig objects (with populated iram) sm_configs: List of SMConfig objects nodes_by_pe: Dict mapping PE ID to list of IRNodes all_nodes: All nodes in graph all_edges: All edges in graph data_defs: List of IRDataDef objects Returns: List of setup tokens in bootstrap order """ tokens: list[Token] = [] # 1. SM init (WRITE ops to populate I-structure cells) for data_def in sorted(data_defs, key=lambda d: (d.sm_id or 0, d.cell_addr or 0)): if data_def.sm_id is not None and data_def.cell_addr is not None: tokens.append(SMToken( target=data_def.sm_id, addr=data_def.cell_addr, op=MemOp.WRITE, flags=None, data=data_def.value, ret=None, )) # 2. IRAM writes via PELocalWriteToken(region=0) for pe_cfg in pe_configs: for offset in sorted(pe_cfg.iram.keys()): inst = pe_cfg.iram[offset] tokens.append(PELocalWriteToken( target=pe_cfg.pe_id, act_id=0, # IRAM writes are activation-independent region=0, slot=offset, data=pack_instruction(inst), is_dest=False, )) # 3. ALLOC — one per activation per PE for pe_id in sorted(nodes_by_pe.keys()): nodes = nodes_by_pe[pe_id] # Collect unique act_ids on this PE (excluding seed nodes) act_ids = sorted({n.act_id for n in nodes if n.act_id is not None and not n.seed}) for act_id in act_ids: tokens.append(FrameControlToken( target=pe_id, act_id=act_id, op=FrameOp.ALLOC, payload=0, )) # 4. Frame slot writes via PELocalWriteToken(region=1) for pe_id in sorted(nodes_by_pe.keys()): nodes = nodes_by_pe[pe_id] # Collect unique act_ids on this PE (excluding seed nodes) act_ids = sorted({n.act_id for n in nodes if n.act_id is not None and not n.seed}) for act_id in act_ids: act_nodes = [n for n in nodes if n.act_id == act_id and not n.seed] if not act_nodes: continue # Get frame layout from first node (canonical per activation) layout = act_nodes[0].frame_layout if layout is None: continue # Write const and dest values per node using fref # (slot_map regions are approximate; node fref is authoritative) for node in act_nodes: if node.fref is None or node.mode is None: continue _, has_const, dest_count = node.mode slot = node.fref if has_const and isinstance(node.const, int): tokens.append(PELocalWriteToken( target=pe_id, act_id=act_id, region=1, slot=slot, data=node.const & 0xFFFF, is_dest=False, )) slot += 1 # Destinations from resolved dest_l/dest_r dests = [] if node.dest_l and hasattr(node.dest_l, 'frame_dest') and node.dest_l.frame_dest: dests.append(node.dest_l.frame_dest) if node.dest_r and hasattr(node.dest_r, 'frame_dest') and node.dest_r.frame_dest: dests.append(node.dest_r.frame_dest) for i, fd in enumerate(dests[:dest_count]): tokens.append(PELocalWriteToken( target=pe_id, act_id=act_id, region=1, slot=slot + i, data=pack_flit1(fd), is_dest=True, )) return tokens def _compute_route_restrictions( nodes_by_pe: dict[int, list[IRNode]], all_edges: list[IREdge], all_nodes: dict[str, IRNode], pe_id: int, ) -> tuple[set[int], set[int]]: """Compute allowed PE and SM routes for a given PE. Analyzes all edges involving nodes on this PE to determine which other PEs and SMs it can route to. Includes self-routes. Args: nodes_by_pe: Dict mapping PE ID to list of nodes on that PE all_edges: List of all edges in graph all_nodes: Dict of all nodes pe_id: The PE we're computing routes for Returns: Tuple of (allowed_pe_routes set, allowed_sm_routes set) """ nodes_on_pe_set = {node.name for node in nodes_by_pe.get(pe_id, [])} pe_routes = {pe_id} # Always include self-route sm_routes = set() # Scan all edges for those sourced from this PE for edge in all_edges: if edge.source in nodes_on_pe_set: # This edge originates from our PE dest_node = all_nodes.get(edge.dest) if dest_node is not None: if dest_node.pe is not None: pe_routes.add(dest_node.pe) # Scan all nodes on this PE for SM instructions for node in nodes_by_pe.get(pe_id, []): if isinstance(node.opcode, MemOp) and node.sm_id is not None: sm_routes.add(node.sm_id) return pe_routes, sm_routes def generate_direct(graph: IRGraph) -> AssemblyResult: """Generate PEConfig, SMConfig, seed tokens, and setup tokens from an allocated IRGraph. Args: graph: A fully allocated IRGraph (after allocate pass) Returns: AssemblyResult with pe_configs, sm_configs, seed_tokens, and setup_tokens """ all_nodes, all_edges = collect_all_nodes_and_edges(graph) all_data_defs = collect_all_data_defs(graph) # Group nodes by PE nodes_by_pe: dict[int, list[IRNode]] = defaultdict(list) for node in all_nodes.values(): if node.pe is not None: nodes_by_pe[node.pe].append(node) # Build PEConfigs pe_configs = [] for pe_id in sorted(nodes_by_pe.keys()): nodes_on_pe = nodes_by_pe[pe_id] # Build IRAM for this PE (Task 1) iram = _build_iram_for_pe(nodes_on_pe, all_nodes, all_edges) # Compute route restrictions allowed_pe_routes, allowed_sm_routes = _compute_route_restrictions( nodes_by_pe, all_edges, all_nodes, pe_id ) # Compute frame configuration from system and node layout frame_count = graph.system.frame_count if graph.system else DEFAULT_FRAME_COUNT frame_slots = graph.system.frame_slots if graph.system and hasattr(graph.system, 'frame_slots') else 64 matchable_offsets = graph.system.matchable_offsets if graph.system and hasattr(graph.system, 'matchable_offsets') else 8 # Build initial_frames and initial_tag_store from node data # Map each activation on this PE to its frame ID and initial slot values initial_frames = {} initial_tag_store = {} act_ids = sorted({n.act_id for n in nodes_on_pe if n.act_id is not None and not n.seed}) for frame_id, act_id in enumerate(act_ids): act_nodes = [n for n in nodes_on_pe if n.act_id == act_id and not n.seed] if not act_nodes: continue # Get frame layout from first node layout = act_nodes[0].frame_layout if layout is None: initial_frames[frame_id] = {} initial_tag_store[act_id] = (frame_id, 0) continue # Build frame slot values for this activation as a sparse dict. # Walk nodes directly using their fref to place const and dest values, # since slot_map regions are approximate (interleaved per-node layout). frame_slots_dict: dict[int, int] = {} for node in act_nodes: if node.fref is None or node.mode is None or node.seed: continue _, has_const, dest_count = node.mode slot = node.fref # Constant at fref position if has_const and isinstance(node.const, int): frame_slots_dict[slot] = node.const & 0xFFFF slot += 1 # Destinations follow const (or start at fref if no const) dests = [] if node.dest_l and hasattr(node.dest_l, 'frame_dest') and node.dest_l.frame_dest: dests.append(node.dest_l.frame_dest) if node.dest_r and hasattr(node.dest_r, 'frame_dest') and node.dest_r.frame_dest: dests.append(node.dest_r.frame_dest) for i, fd in enumerate(dests[:dest_count]): frame_slots_dict[slot + i] = pack_flit1(fd) initial_frames[frame_id] = frame_slots_dict initial_tag_store[act_id] = (frame_id, 0) # Create PEConfig config = PEConfig( pe_id=pe_id, iram=iram, frame_count=frame_count, frame_slots=frame_slots, matchable_offsets=matchable_offsets, initial_frames=initial_frames if initial_frames else None, initial_tag_store=initial_tag_store if initial_tag_store else None, allowed_pe_routes=allowed_pe_routes, allowed_sm_routes=allowed_sm_routes, ) pe_configs.append(config) # Build SMConfigs from data_defs sm_configs_by_id: dict[int, dict[int, tuple[Presence, int]]] = defaultdict(dict) for data_def in all_data_defs: if data_def.sm_id is not None and data_def.cell_addr is not None: sm_configs_by_id[data_def.sm_id][data_def.cell_addr] = ( Presence.FULL, data_def.value ) sm_count = max(1, graph.system.sm_count if graph.system else 1) for sm_id in range(sm_count): if sm_id not in sm_configs_by_id: sm_configs_by_id[sm_id] = {} sm_configs = [] for sm_id in sorted(sm_configs_by_id.keys()): initial_cells = sm_configs_by_id[sm_id] config = SMConfig( sm_id=sm_id, initial_cells=initial_cells if initial_cells else None, ) sm_configs.append(config) # Generate setup tokens (Task 2) setup_tokens = _generate_setup_tokens( pe_configs, sm_configs, nodes_by_pe, all_nodes, all_edges, all_data_defs, ) # Detect seed tokens (Task 3) seed_tokens = [] # Build edge indices edges_by_dest = defaultdict(list) edges_by_source = defaultdict(list) for edge in all_edges: edges_by_dest[edge.dest].append(edge) edges_by_source[edge.source].append(edge) for node in all_nodes.values(): if node.seed: # Seed node: generate token(s) targeted at destination(s) out_edges = edges_by_source.get(node.name, []) for edge in out_edges: dest_node = all_nodes.get(edge.dest) if dest_node is None or dest_node.pe is None: continue dest_is_dyadic = is_dyadic(dest_node.opcode, dest_node.const) if dest_is_dyadic: # Task 3: Use act_id, not ctx; no gen field token = DyadToken( target=dest_node.pe, offset=dest_node.iram_offset if dest_node.iram_offset is not None else 0, act_id=dest_node.act_id if dest_node.act_id is not None else 0, data=node.const if node.const is not None else 0, port=edge.port, ) else: # Task 3: Use act_id, not ctx token = MonadToken( target=dest_node.pe, offset=dest_node.iram_offset if dest_node.iram_offset is not None else 0, act_id=dest_node.act_id if dest_node.act_id is not None else 0, data=node.const if node.const is not None else 0, inline=False, ) seed_tokens.append(token) elif node.opcode == RoutingOp.CONST: # Triggerable constant: CONST node in IRAM with no incoming edges if node.name not in edges_by_dest: token = MonadToken( target=node.pe if node.pe is not None else 0, offset=node.iram_offset if node.iram_offset is not None else 0, act_id=node.act_id if node.act_id is not None else 0, data=node.const if node.const is not None else 0, inline=False, ) seed_tokens.append(token) return AssemblyResult( pe_configs=pe_configs, sm_configs=sm_configs, seed_tokens=seed_tokens, setup_tokens=setup_tokens, ) def generate_tokens(graph: IRGraph) -> list[Token]: """Generate bootstrap token sequence from an allocated IRGraph. Produces tokens in order: SM init → IRAM writes → ALLOC → frame slot writes → seeds Args: graph: A fully allocated IRGraph (after allocate pass) Returns: List of tokens in bootstrap order """ # Use direct mode to get configs, setup tokens, and seeds result = generate_direct(graph) tokens = [] # 1. Setup tokens (SM init → IRAM writes → ALLOC → frame slot writes) tokens.extend(result.setup_tokens) # 2. Seed tokens (Task 3) tokens.extend(result.seed_tokens) return tokens