""" Frame-based ProcessingElement for OR1 dataflow CPU. Implements: - Frame-based matching with tag_store + presence bits - Mode-driven output routing (INHERIT, CHANGE_TAG, SINK) - PE-level EXTRACT_TAG and ALLOC_REMOTE handling - Side path handling for PELocalWriteToken and FrameControlToken - Cycle-accurate pipeline: 5 cycles dyadic, 4 cycles monadic, 2 cycles side paths """ import logging from typing import Optional import simpy from cm_inst import ( ALUOp, ArithOp, FrameDest, FrameOp, FrameSlotValue, Instruction, LogicOp, MemOp, OutputStyle, Port, RoutingOp, TokenKind, is_monadic_alu, ) from encoding import pack_flit1, unpack_flit1, unpack_instruction from emu.alu import execute from emu.events import ( Emitted, EventCallback, Executed, FrameAllocated, FrameFreed, FrameSlotWritten, IRAMWritten, Matched, TokenReceived, TokenRejected, ) from emu.types import PEConfig from tokens import ( CMToken, DyadToken, FrameControlToken, MonadToken, PELocalWriteToken, PEToken, SMToken, Token, ) logger = logging.getLogger(__name__) class ProcessingElement: """Frame-based Processing Element for OR1 dataflow CPU. Manages: - Frame store: [frame_count][frame_slots] dense per-activation data - Tag store: act_id → (frame_id, lane) mapping - Match data: [frame_id][matchable_offsets][lane_count] for operand values - Presence bits: [frame_id][matchable_offsets][lane_count] for dyadic matching - Port store: [frame_id][matchable_offsets][lane_count] for port metadata - Lane free: per-frame set of available lane IDs - Free frames: pool of available frame IDs Pipeline (per token): - Side paths (FrameControlToken, PELocalWriteToken): 1 cycle - Dyadic CMToken: 5 cycles (dequeue + IFETCH + MATCH + EXECUTE + EMIT) - Monadic CMToken: 4 cycles (dequeue + IFETCH + EXECUTE + EMIT) """ def __init__( self, env: simpy.Environment, pe_id: int, config: PEConfig, ): self.env = env self.pe_id = pe_id self.frame_count = config.frame_count self.frame_slots = config.frame_slots self.matchable_offsets = config.matchable_offsets # Frame storage self.frames: list[list[Optional[FrameSlotValue]]] = [ [None for _ in range(config.frame_slots)] for _ in range(config.frame_count) ] # Tag store: act_id → (frame_id, lane) self.tag_store: dict[int, tuple[int, int]] = dict(config.initial_tag_store or {}) # Match data: [frame_id][match_slot][lane] - operand values waiting for partner self.match_data: list[list[list[Optional[int]]]] = [ [ [None for _ in range(config.lane_count)] for _ in range(config.matchable_offsets) ] for _ in range(config.frame_count) ] # Presence bits: [frame_id][match_slot][lane] - True if operand waiting for partner self.presence: list[list[list[bool]]] = [ [ [False for _ in range(config.lane_count)] for _ in range(config.matchable_offsets) ] for _ in range(config.frame_count) ] # Port store: [frame_id][match_slot][lane] - port of waiting operand self.port_store: list[list[list[Optional[Port]]]] = [ [ [None for _ in range(config.lane_count)] for _ in range(config.matchable_offsets) ] for _ in range(config.frame_count) ] self.lane_count = config.lane_count # Free frames pool self.free_frames = list(range(config.frame_count)) for frame_id, _lane in self.tag_store.values(): if frame_id in self.free_frames: self.free_frames.remove(frame_id) # Lane tracking: which lanes are free per frame self.lane_free: dict[int, set[int]] = {} # Initialize lane_free for pre-loaded tag_store entries for act_id, (frame_id, lane) in self.tag_store.items(): if frame_id not in self.lane_free: # First time seeing this frame — set up lane tracking all_lanes = set(range(self.lane_count)) self.lane_free[frame_id] = all_lanes - {lane} else: self.lane_free[frame_id].discard(lane) # Load initial frame data if config.initial_frames: for frame_id, slots in config.initial_frames.items(): if isinstance(slots, dict): # Dict format: {slot_idx: slot_value} # Slot values can be: # - int (packed flit1 from codegen) → unpack to FrameDest # - FrameDest (from direct test construction) → use as-is for slot_idx, slot_value in slots.items(): if 0 <= slot_idx < config.frame_slots: if isinstance(slot_value, int): # Packed flit1 from codegen: unpack to FrameDest self.frames[frame_id][slot_idx] = unpack_flit1(slot_value) else: # Already a FrameDest or other value: use as-is self.frames[frame_id][slot_idx] = slot_value elif isinstance(slots, list): # List format: [slot0, slot1, ...] raw values for slot_idx, slot_value in enumerate(slots): if 0 <= slot_idx < config.frame_slots: self.frames[frame_id][slot_idx] = slot_value # IRAM self.iram: dict[int, Instruction] = config.iram or {} # Network routing self.input_store: simpy.Store = simpy.Store(env) self.route_table: dict[int, simpy.Store] = {} self.sm_routes: dict[int, simpy.Store] = {} # Observability self._on_event: EventCallback = config.on_event or (lambda _: None) self._component = f"pe:{pe_id}" self.output_log: list = [] # Start main process self.process = env.process(self._run()) def _run(self) -> None: """Main loop: dequeue token, emit TokenReceived, spawn processor.""" while True: token = yield self.input_store.get() yield self.env.timeout(1) # dequeue cycle self._on_event(TokenReceived( time=self.env.now, component=self._component, token=token, )) self.env.process(self._process_token(token)) def _process_token(self, token: PEToken) -> None: """Process a single token through the pipeline. Dispatches to side paths (FrameControlToken, PELocalWriteToken) or CMToken pipeline (IFETCH → act_id resolution → MATCH → EXECUTE → EMIT). """ if isinstance(token, FrameControlToken): yield self.env.timeout(1) self._handle_frame_control(token) return if isinstance(token, PELocalWriteToken): yield self.env.timeout(1) self._handle_local_write(token) return # CMToken pipeline: IFETCH → act_id resolution → MATCH → EXECUTE → EMIT if not isinstance(token, CMToken): logger.warning(f"PE {self.pe_id}: unknown token type {type(token)}") return # IFETCH (1 cycle) inst = self.iram.get(token.offset) yield self.env.timeout(1) if inst is None: logger.warning(f"PE {self.pe_id}: no instruction at offset {token.offset}") return # Act_id resolution (no cycle - just validation) if token.act_id not in self.tag_store: self._on_event(TokenRejected( time=self.env.now, component=self._component, token=token, reason=f"act_id {token.act_id} not in tag store", )) return frame_id, lane = self.tag_store[token.act_id] # Determine if monadic or dyadic instruction is_monadic = ( isinstance(token, MonadToken) or (isinstance(token, DyadToken) and ( isinstance(inst.opcode, MemOp) or (isinstance(inst.opcode, ALUOp) and is_monadic_alu(inst.opcode)) )) ) # MATCH (1 cycle for dyadic, 0 for monadic) if isinstance(token, MonadToken): left, right = token.data, None elif isinstance(token, DyadToken): if is_monadic: left, right = token.data, None else: # Dyadic matching via presence bits operands = self._match_frame(token, inst, frame_id, lane) yield self.env.timeout(1) # match cycle if operands is None: return # waiting for partner left, right = operands else: return # EXECUTE & EMIT depends on opcode type if isinstance(inst.opcode, MemOp): # SM dispatch: EXECUTE cycle computes, then EMIT cycle delivers # Total: 4 cycles for monadic (dequeue + IFETCH + EXECUTE + EMIT) self._on_event(Executed( time=self.env.now, component=self._component, op=inst.opcode, result=0, bool_out=False, )) yield self.env.timeout(1) # EXECUTE cycle yield self.env.timeout(1) # EMIT cycle self._build_and_emit_sm_new(inst, left, right, token.act_id, frame_id) elif inst.opcode == RoutingOp.EXTRACT_TAG: # PE-level: pack current PE/act_id/offset into flit 1 # Total: 4 cycles (dequeue + IFETCH + EXECUTE + EMIT) result = pack_flit1(FrameDest( target_pe=self.pe_id, offset=token.offset, act_id=token.act_id, port=Port.L, token_kind=TokenKind.DYADIC, )) self._on_event(Executed( time=self.env.now, component=self._component, op=inst.opcode, result=result, bool_out=False, )) yield self.env.timeout(1) # EXECUTE cycle yield self.env.timeout(1) # EMIT cycle self._do_emit_new(inst, result, False, token.act_id, frame_id) elif inst.opcode == RoutingOp.ALLOC_REMOTE: # PE-level: read target PE, act_id, and optional parent act_id from frame constants # fref+0: target PE # fref+1: target act_id # fref+2: parent act_id (0 = fresh ALLOC, non-zero = ALLOC_SHARED) # Total: 4 cycles (dequeue + IFETCH + EXECUTE + EMIT) target_pe = self.frames[frame_id][inst.fref] if inst.fref < len(self.frames[frame_id]) else 0 target_act = self.frames[frame_id][inst.fref + 1] if inst.fref + 1 < len(self.frames[frame_id]) else 0 parent_act = self.frames[frame_id][inst.fref + 2] if inst.fref + 2 < len(self.frames[frame_id]) else 0 # Guard against None slot values if target_pe is None or target_act is None: logger.warning(f"PE {self.pe_id}: ALLOC_REMOTE has None at fref slots, skipping") return if parent_act: alloc_op = FrameOp.ALLOC_SHARED payload = parent_act else: alloc_op = FrameOp.ALLOC payload = 0 fct = FrameControlToken( target=target_pe, act_id=target_act, op=alloc_op, payload=payload, ) self._on_event(Executed( time=self.env.now, component=self._component, op=inst.opcode, result=0, bool_out=False, )) yield self.env.timeout(1) # EXECUTE cycle yield self.env.timeout(1) # EMIT cycle self.env.process(self._deliver(self.route_table[target_pe], fct)) elif inst.opcode == RoutingOp.FREE_FRAME: # Deallocate frame: compute and free, then EMIT cycle (no output token) # Total: 4 cycles (dequeue + IFETCH + EXECUTE + EMIT) result, bool_out = execute(inst.opcode, left, right, None) self._on_event(Executed( time=self.env.now, component=self._component, op=inst.opcode, result=result, bool_out=bool_out, )) yield self.env.timeout(1) # EXECUTE cycle yield self.env.timeout(1) # EMIT cycle (no output token) # Frame deallocation happens during EMIT cycle with smart FREE logic if token.act_id in self.tag_store: self._smart_free(token.act_id) else: logger.warning(f"PE {self.pe_id}: FREE_FRAME for unknown act_id {token.act_id}") else: # Normal ALU execute # MINOR FIX: Restructure const_val handling to avoid dead code const_val = None if inst.has_const and inst.fref < len(self.frames[frame_id]): const_val = self.frames[frame_id][inst.fref] if not isinstance(const_val, int): const_val = None result, bool_out = execute(inst.opcode, left, right, const_val) self._on_event(Executed( time=self.env.now, component=self._component, op=inst.opcode, result=result, bool_out=bool_out, )) yield self.env.timeout(1) # EXECUTE cycle yield self.env.timeout(1) # EMIT cycle self._do_emit_new(inst, result, bool_out, token.act_id, frame_id, left=left) def _smart_free(self, act_id: int) -> None: """Smart FREE helper: deallocate lane, possibly returning frame to free list. Does NOT yield. Caller handles timing. Emits FrameFreed event. """ if act_id not in self.tag_store: return # Caller should have checked, but skip silently frame_id, lane = self.tag_store.pop(act_id) # Clear this lane's match state for i in range(self.matchable_offsets): self.match_data[frame_id][i][lane] = None self.presence[frame_id][i][lane] = False self.port_store[frame_id][i][lane] = None # Check if any other activations use this frame frame_in_use = any(fid == frame_id for fid, _ in self.tag_store.values()) if frame_in_use: # Return lane to pool, keep frame self.lane_free[frame_id].add(lane) self._on_event(FrameFreed( time=self.env.now, component=self._component, act_id=act_id, frame_id=frame_id, lane=lane, frame_freed=False, )) else: # Last lane — return frame to free list self.free_frames.append(frame_id) if frame_id in self.lane_free: del self.lane_free[frame_id] # Clear frame slots for i in range(self.frame_slots): self.frames[frame_id][i] = None self._on_event(FrameFreed( time=self.env.now, component=self._component, act_id=act_id, frame_id=frame_id, lane=lane, frame_freed=True, )) def _handle_frame_control(self, token: FrameControlToken) -> None: """Handle ALLOC, FREE, ALLOC_SHARED, and FREE_LANE operations.""" if token.op == FrameOp.ALLOC: if self.free_frames: frame_id = self.free_frames.pop() self.tag_store[token.act_id] = (frame_id, 0) # Set up lane tracking: lane 0 is taken, rest are free self.lane_free[frame_id] = set(range(1, self.lane_count)) # Initialize frame slots to None for i in range(self.frame_slots): self.frames[frame_id][i] = None # Reset all lanes' match state for i in range(self.matchable_offsets): for ln in range(self.lane_count): self.match_data[frame_id][i][ln] = None self.presence[frame_id][i][ln] = False self.port_store[frame_id][i][ln] = None self._on_event(FrameAllocated( time=self.env.now, component=self._component, act_id=token.act_id, frame_id=frame_id, lane=0, )) else: logger.warning(f"PE {self.pe_id}: no free frames available") elif token.op == FrameOp.FREE: if token.act_id in self.tag_store: self._smart_free(token.act_id) else: logger.warning(f"PE {self.pe_id}: FREE for unknown act_id {token.act_id}") elif token.op == FrameOp.ALLOC_SHARED: # Shared allocation: find parent's frame, assign next free lane # Guard against self-referential act_id (would leak old lane) if token.act_id in self.tag_store: self._on_event(TokenRejected( time=self.env.now, component=self._component, token=token, reason=f"act_id {token.act_id} already in tag store", )) return parent_act_id = token.payload if parent_act_id not in self.tag_store: self._on_event(TokenRejected( time=self.env.now, component=self._component, token=token, reason=f"parent act_id {parent_act_id} not in tag store", )) return parent_frame_id, _ = self.tag_store[parent_act_id] free_lanes = self.lane_free.get(parent_frame_id, set()) if not free_lanes: self._on_event(TokenRejected( time=self.env.now, component=self._component, token=token, reason="no free lanes", )) return lane = min(free_lanes) # Deterministic: pick lowest free lane free_lanes.remove(lane) self.tag_store[token.act_id] = (parent_frame_id, lane) # Clear only this lane's match state for i in range(self.matchable_offsets): self.match_data[parent_frame_id][i][lane] = None self.presence[parent_frame_id][i][lane] = False self.port_store[parent_frame_id][i][lane] = None self._on_event(FrameAllocated( time=self.env.now, component=self._component, act_id=token.act_id, frame_id=parent_frame_id, lane=lane, )) elif token.op == FrameOp.FREE_LANE: # Free lane with smart frame deallocation. # If this is the last lane using the frame, the frame is returned to free_frames. # Otherwise, just the lane is returned to the pool. if token.act_id in self.tag_store: self._smart_free(token.act_id) else: logger.warning(f"PE {self.pe_id}: FREE_LANE for unknown act_id {token.act_id}") def _handle_local_write(self, token: PELocalWriteToken) -> None: """Handle IRAM write and frame write.""" if token.region == 0: # IRAM self.iram[token.slot] = unpack_instruction(token.data) self._on_event(IRAMWritten( time=self.env.now, component=self._component, offset=token.slot, count=1, )) elif token.region == 1: # Frame if token.act_id in self.tag_store: frame_id, _lane = self.tag_store[token.act_id] if token.is_dest: # Decode flit 1 to FrameDest dest = unpack_flit1(token.data) self.frames[frame_id][token.slot] = dest else: # Store as int self.frames[frame_id][token.slot] = token.data self._on_event(FrameSlotWritten( time=self.env.now, component=self._component, frame_id=frame_id, slot=token.slot, value=token.data if not token.is_dest else None, )) else: # MINOR FIX: Emit TokenRejected for invalid act_id, consistent with other paths logger.warning(f"PE {self.pe_id}: PELocalWriteToken with invalid act_id {token.act_id}") self._on_event(TokenRejected( time=self.env.now, component=self._component, token=token, reason=f"act_id {token.act_id} not in tag store", )) def _match_frame( self, token: DyadToken, inst: Instruction, frame_id: int, lane: int, ) -> Optional[tuple[int, int]]: """Frame-based dyadic matching with lane support. Derives match slot from low bits of token.offset: match_slot = token.offset % matchable_offsets Match data, presence, and port are per-lane. Frame constants/destinations remain shared. """ match_slot = token.offset % self.matchable_offsets if self.presence[frame_id][match_slot][lane]: # Partner already waiting — pair them partner_data = self.match_data[frame_id][match_slot][lane] partner_port = self.port_store[frame_id][match_slot][lane] self.presence[frame_id][match_slot][lane] = False self.match_data[frame_id][match_slot][lane] = None # Use port metadata to determine left/right ordering if partner_port == Port.L: left, right = partner_data, token.data else: left, right = token.data, partner_data self._on_event(Matched( time=self.env.now, component=self._component, left=left, right=right, act_id=token.act_id, offset=token.offset, frame_id=frame_id, )) return left, right else: # Store and wait for partner self.match_data[frame_id][match_slot][lane] = token.data self.port_store[frame_id][match_slot][lane] = token.port self.presence[frame_id][match_slot][lane] = True return None def _do_emit_new( self, inst: Instruction, result: int, bool_out: bool, act_id: int, frame_id: int, left: int = 0, ) -> None: """Mode-driven output routing. Reads OutputStyle from instruction and delegates to appropriate handler. Suppresses output for GATE when bool_out=False. """ if isinstance(inst.opcode, RoutingOp) and inst.opcode == RoutingOp.GATE and not bool_out: return # GATE suppressed match inst.output: case OutputStyle.INHERIT: self._emit_inherit(inst, result, bool_out, frame_id) case OutputStyle.CHANGE_TAG: self._emit_change_tag(inst, result, left) case OutputStyle.SINK: self._emit_sink(inst, result, frame_id) def _emit_inherit( self, inst: Instruction, result: int, bool_out: bool, frame_id: int, ) -> None: """INHERIT output: read FrameDest from frame and route token. Frame layout per mode table: - Mode 0: [dest] - Mode 1: [const, dest] - Mode 2: [dest1, dest2] - Mode 3: [const, dest1, dest2] CRITICAL FIX: Check for switch ops BEFORE emitting dest_l to avoid spawning delivery processes that cannot be cancelled. """ dest_base = inst.fref + (1 if inst.has_const else 0) # CRITICAL FIX: Check for switch ops first is_switch = (isinstance(inst.opcode, RoutingOp) and inst.opcode in ( RoutingOp.SWEQ, RoutingOp.SWGT, RoutingOp.SWGE, RoutingOp.SWOF, )) # Handle switch ops specially: emit both outputs at once based on bool_out if is_switch and inst.dest_count >= 2: dest_l = self.frames[frame_id][dest_base] dest_r = self.frames[frame_id][dest_base + 1] if isinstance(dest_l, FrameDest) and isinstance(dest_r, FrameDest): if bool_out: taken, not_taken = dest_l, dest_r else: taken, not_taken = dest_r, dest_l data_tok = self._make_token_from_dest(taken, result) trig_tok = self._make_token_from_dest(not_taken, 0) self.output_log.append(data_tok) self.output_log.append(trig_tok) self._on_event(Emitted( time=self.env.now, component=self._component, token=data_tok, )) self._on_event(Emitted( time=self.env.now, component=self._component, token=trig_tok, )) self.env.process(self._deliver(self.route_table[taken.target_pe], data_tok)) self.env.process(self._deliver(self.route_table[not_taken.target_pe], trig_tok)) return # Non-switch path: emit normally if inst.dest_count >= 1: dest_l = self.frames[frame_id][dest_base] if isinstance(dest_l, FrameDest): out_token = self._make_token_from_dest(dest_l, result) self.output_log.append(out_token) self._on_event(Emitted( time=self.env.now, component=self._component, token=out_token, )) self.env.process(self._deliver(self.route_table[dest_l.target_pe], out_token)) else: logger.warning("PE %d: frame[%d][%d] is not FrameDest: %r", self.pe_id, frame_id, dest_base, dest_l) if inst.dest_count >= 2: dest_r = self.frames[frame_id][dest_base + 1] if isinstance(dest_r, FrameDest): out_r = self._make_token_from_dest(dest_r, result) self.output_log.append(out_r) self._on_event(Emitted( time=self.env.now, component=self._component, token=out_r, )) self.env.process(self._deliver( self.route_table[dest_r.target_pe], out_r, )) else: logger.warning("PE %d: frame[%d][%d] is not FrameDest: %r", self.pe_id, frame_id, dest_base + 1, dest_r) def _emit_change_tag( self, inst: Instruction, result: int, left: int, ) -> None: """CHANGE_TAG output: unpack left operand (flit 1) to get destination.""" dest = unpack_flit1(left) out_token = self._make_token_from_dest(dest, result) self.output_log.append(out_token) self._on_event(Emitted( time=self.env.now, component=self._component, token=out_token, )) self.env.process(self._deliver(self.route_table[dest.target_pe], out_token)) def _emit_sink(self, inst: Instruction, result: int, frame_id: int) -> None: """SINK output: write result to frame slot, emit no token.""" self.frames[frame_id][inst.fref] = result self._on_event(FrameSlotWritten( time=self.env.now, component=self._component, frame_id=frame_id, slot=inst.fref, value=result, )) def _build_and_emit_sm_new( self, inst: Instruction, left: int, right: Optional[int], act_id: int, frame_id: int, ) -> None: """Build and emit SM token. Return route is a FrameDest stored at inst.fref + (1 if has_const else 0). SM target comes from frame[fref] (if has_const) or from left operand. """ ret_slot = inst.fref + (1 if inst.has_const else 0) ret_dest = self.frames[frame_id][ret_slot] if inst.dest_count > 0 else None # Build return CMToken from FrameDest if return route exists ret_token = None if isinstance(ret_dest, FrameDest): ret_token = self._make_token_from_dest(ret_dest, 0) # Determine SM target source if inst.has_const: target_packed = self.frames[frame_id][inst.fref] else: target_packed = left sm_token = SMToken( target=(target_packed >> 8) & 0xFF, addr=target_packed & 0xFF, op=inst.opcode, flags=right if right is not None else None, data=right if inst.has_const else left, ret=ret_token, ) self.output_log.append(sm_token) self._on_event(Emitted( time=self.env.now, component=self._component, token=sm_token, )) self.env.process(self._deliver(self.sm_routes[sm_token.target], sm_token)) def _make_token_from_dest(self, dest: FrameDest, data: int) -> CMToken: """Construct CMToken from FrameDest and data.""" match dest.token_kind: case TokenKind.DYADIC: return DyadToken( target=dest.target_pe, offset=dest.offset, act_id=dest.act_id, data=data, port=dest.port, ) case TokenKind.MONADIC: return MonadToken( target=dest.target_pe, offset=dest.offset, act_id=dest.act_id, data=data, inline=False, ) case TokenKind.INLINE: return MonadToken( target=dest.target_pe, offset=dest.offset, act_id=dest.act_id, data=data, inline=True, ) case _: raise ValueError(f"unknown token_kind: {dest.token_kind}") def _deliver(self, store: simpy.Store, token: Token) -> None: """Spawn delivery process: 1 cycle delay, then put token. Accepts any Token type (CMToken, SMToken, FrameControlToken) since all are delivered to stores. """ yield self.env.timeout(1) yield store.put(token)