OR-1 dataflow CPU sketch
at main 1530 lines 57 kB view raw
1"""Lower pass: Convert Lark CST to IR graph. 2 3This module implements a Lark Transformer that converts a parse tree from the 4dfasm grammar into an IRGraph. The transformer handles: 5- Instruction definitions and node creation 6- Plain, strong, and weak edge routing 7- Function and location regions 8- Data definitions 9- System configuration pragmas 10- Name qualification (scoping) 11- Error collection for reserved names and duplicates 12""" 13 14from typing import Any, Optional, Union, Tuple, List, Dict 15from dataclasses import replace 16import re 17from lark import Transformer, v_args, Tree 18from lark.lexer import Token as LarkToken 19 20from asm.ir import ( 21 IRGraph, IRNode, IREdge, IRRegion, RegionKind, IRDataDef, SystemConfig, 22 SourceLoc, NameRef, ResolvedDest, MacroParam, ParamRef, MacroDef, IRMacroCall, 23 CallSiteResult, IRRepetitionBlock, 24 PlacementRef, PortRef, ActSlotRef, ActSlotRange, 25) 26from asm.errors import AssemblyError, ErrorCategory 27from asm.opcodes import MNEMONIC_TO_OP 28from cm_inst import ALUOp, MemOp, Port, RoutingOp 29 30# Reserved names that cannot be used as node definitions 31_RESERVED_NAMES = frozenset({"@system", "@io", "@debug"}) 32 33# Pattern for detecting ${param} token pasting in identifiers 34_PASTE_PATTERN = re.compile(r'^(.*?)\$\{([a-zA-Z_][a-zA-Z0-9_]*)\}(.*)$') 35 36 37def _filter_args(args: tuple) -> list: 38 """Filter out LarkTokens from argument list.""" 39 return [arg for arg in args if not isinstance(arg, LarkToken)] 40 41 42def _normalize_port(value: Union[int, Port, PortRef]) -> Union[Port, PortRef]: 43 """Normalize a port value to Port enum, preserving PortRef for macro templates. 44 45 Args: 46 value: An int (0/1), Port enum, or PortRef (macro parameter) 47 48 Returns: 49 Port enum value, or PortRef passed through for later expansion 50 """ 51 if isinstance(value, PortRef): 52 return value 53 if isinstance(value, Port): 54 return value 55 if isinstance(value, int): 56 return Port.L if value == 0 else (Port.R if value == 1 else Port.L) 57 return Port.L 58 59 60# Structured statement result types 61class StatementResult: 62 """Base class for statement processing results.""" 63 pass 64 65 66class NodeResult(StatementResult): 67 """Result from inst_def: one or more IRNodes.""" 68 def __init__(self, nodes: Dict[str, IRNode]): 69 self.nodes = nodes 70 71 72class EdgeResult(StatementResult): 73 """Result from plain_edge or anonymous edges: IREdges.""" 74 def __init__(self, edges: List[IREdge]): 75 self.edges = edges 76 77 78class FunctionResult(StatementResult): 79 """Result from func_def: an IRRegion.""" 80 def __init__(self, region: IRRegion): 81 self.region = region 82 83 84class LocationResult(StatementResult): 85 """Result from location_dir: an IRRegion.""" 86 def __init__(self, region: IRRegion): 87 self.region = region 88 89 90class DataDefResult(StatementResult): 91 """Result from data_def: IRDataDefs.""" 92 def __init__(self, data_defs: List[IRDataDef]): 93 self.data_defs = data_defs 94 95 96class MacroDefResult(StatementResult): 97 """Result from macro_def: a MacroDef.""" 98 def __init__(self, macro_def: MacroDef): 99 self.macro_def = macro_def 100 101 102class MacroCallResult(StatementResult): 103 """Result from macro_call_stmt: an IRMacroCall.""" 104 def __init__(self, macro_call: IRMacroCall): 105 self.macro_call = macro_call 106 107 108class CallSiteResultStatement(StatementResult): 109 """Result from call_stmt: a CallSiteResult.""" 110 def __init__(self, call_site_result: CallSiteResult): 111 self.call_site_result = call_site_result 112 113 114class RepetitionBlockResult(StatementResult): 115 """Result from repetition_block: an IRRepetitionBlock.""" 116 def __init__(self, repetition_block: IRRepetitionBlock): 117 self.repetition_block = repetition_block 118 119 120class CompositeResult(StatementResult): 121 """Result combining nodes and edges (for strong/weak edges).""" 122 def __init__(self, nodes: Dict[str, IRNode], edges: List[IREdge]): 123 self.nodes = nodes 124 self.edges = edges 125 126 127class LowerTransformer(Transformer): 128 """Transformer that converts a CST into an IRGraph. 129 130 The transformer collects statement results and then in the `start` rule 131 organizes them into the final IRGraph structure. 132 """ 133 134 def __init__(self): 135 super().__init__() 136 self._anon_counter: int = 0 137 self._errors: list[AssemblyError] = [] 138 self._defined_names: dict[str, SourceLoc] = {} 139 self._system: Optional[SystemConfig] = None 140 141 def _qualify_name(self, name, func_scope: Optional[str]): 142 """Apply function scope qualification to a name. 143 144 ParamRef values pass through unchanged — they are resolved during 145 macro expansion, not during lowering. 146 """ 147 if isinstance(name, ParamRef): 148 return name 149 if isinstance(name, str) and name.startswith("&") and func_scope: 150 return f"{func_scope}.{name}" 151 return name 152 153 def _extract_loc(self, meta: Any) -> SourceLoc: 154 """Extract SourceLoc from Lark's meta object.""" 155 return SourceLoc( 156 line=meta.line, 157 column=meta.column, 158 end_line=meta.end_line if hasattr(meta, "end_line") else None, 159 end_column=meta.end_column if hasattr(meta, "end_column") else None, 160 ) 161 162 def _gen_anon_name(self, func_scope: Optional[str]) -> str: 163 """Generate an anonymous node name, qualified by current scope.""" 164 name = f"&__anon_{self._anon_counter}" 165 self._anon_counter += 1 166 return self._qualify_name(name, func_scope) 167 168 def _check_reserved_name(self, name: str, loc: SourceLoc) -> bool: 169 """Check if name is reserved. Return True if reserved (and add error).""" 170 if name in _RESERVED_NAMES: 171 self._errors.append(AssemblyError( 172 loc=loc, 173 category=ErrorCategory.NAME, 174 message=f"Reserved name '{name}' cannot be used as a node definition" 175 )) 176 return True 177 return False 178 179 def _check_duplicate_name(self, name: str, loc: SourceLoc) -> bool: 180 """Check for duplicate definition. Return True if duplicate (and add error).""" 181 if name in self._defined_names: 182 prev_loc = self._defined_names[name] 183 self._errors.append(AssemblyError( 184 loc=loc, 185 category=ErrorCategory.SCOPE, 186 message=f"Duplicate label '{name}'", 187 suggestions=[f"First defined at line {prev_loc.line}"] 188 )) 189 return True 190 self._defined_names[name] = loc 191 return False 192 193 def _process_statements( 194 self, 195 statements: list, 196 func_scope: Optional[str] = None 197 ) -> Tuple[Dict[str, IRNode], List[IREdge], List[IRRegion], List[IRDataDef], List]: 198 """Process a list of statement results and collect them into containers.""" 199 nodes = {} 200 edges = [] 201 regions = [] 202 data_defs = [] 203 call_sites = [] 204 205 # Reset defined names for this scope 206 prev_defined_names = self._defined_names 207 self._defined_names = {} 208 209 for stmt in statements: 210 if isinstance(stmt, NodeResult): 211 # Qualify and add nodes 212 for node_name, node in stmt.nodes.items(): 213 qualified_name = self._qualify_name(node_name, func_scope) 214 if not self._check_duplicate_name(qualified_name, node.loc): 215 # Update node with qualified name 216 qualified_node = replace(node, name=qualified_name) 217 nodes[qualified_name] = qualified_node 218 219 elif isinstance(stmt, EdgeResult): 220 # Qualify and add edges 221 for edge in stmt.edges: 222 qualified_edge = replace( 223 edge, 224 source=self._qualify_name(edge.source, func_scope), 225 dest=self._qualify_name(edge.dest, func_scope), 226 ) 227 edges.append(qualified_edge) 228 229 elif isinstance(stmt, CompositeResult): 230 # Composite: both nodes and edges (strong/weak edges) 231 for node_name, node in stmt.nodes.items(): 232 qualified_name = self._qualify_name(node_name, func_scope) 233 if not self._check_duplicate_name(qualified_name, node.loc): 234 qualified_node = replace(node, name=qualified_name) 235 nodes[qualified_name] = qualified_node 236 for edge in stmt.edges: 237 qualified_edge = replace( 238 edge, 239 source=self._qualify_name(edge.source, func_scope), 240 dest=self._qualify_name(edge.dest, func_scope), 241 ) 242 edges.append(qualified_edge) 243 244 elif isinstance(stmt, FunctionResult): 245 regions.append(stmt.region) 246 247 elif isinstance(stmt, LocationResult): 248 regions.append(stmt.region) 249 250 elif isinstance(stmt, DataDefResult): 251 data_defs.extend(stmt.data_defs) 252 253 elif isinstance(stmt, MacroDefResult): 254 # Macro definitions are stored separately, not as regions 255 pass # Collected at the start() level 256 257 elif isinstance(stmt, MacroCallResult): 258 # Macro calls are stored separately 259 pass # Collected at the start() level 260 261 elif isinstance(stmt, CallSiteResultStatement): 262 # Call sites are stored separately 263 call_sites.append(stmt.call_site_result) 264 265 # Restore defined names 266 self._defined_names = prev_defined_names 267 268 return nodes, edges, regions, data_defs, call_sites 269 270 def start(self, items: list) -> IRGraph: 271 """Process the entire program and return an IRGraph. 272 273 Post-processing: Groups statements following location_dir into that region's body. 274 """ 275 # First pass: collect all items 276 nodes, edges, regions, data_defs, call_sites = self._process_statements(items, None) 277 278 # Second pass: post-process location regions to collect subsequent statements 279 # Find LocationResult objects and collect subsequent statements into their body 280 location_results = [r for r in regions if r.kind == RegionKind.LOCATION] 281 282 # Track which nodes, data_defs, and edges are moved into location regions 283 moved_node_names = set() 284 moved_data_names = set() 285 moved_edge_sources = set() # Track edges by (source, dest) tuple 286 287 if location_results: 288 # Build a mapping of location regions to their collected body 289 for loc_region in location_results: 290 # Find the position of this region in the items list 291 # by matching the tag 292 body_nodes = {} 293 body_edges = [] 294 body_data_defs = [] 295 296 # Collect subsequent non-region statements 297 collecting = False 298 for item in items: 299 if isinstance(item, LocationResult) and item.region.tag == loc_region.tag: 300 collecting = True 301 continue 302 303 if collecting: 304 # Stop at next region boundary 305 if isinstance(item, (FunctionResult, LocationResult)): 306 break 307 308 # Collect into location body 309 if isinstance(item, NodeResult): 310 body_nodes.update(item.nodes) 311 moved_node_names.update(item.nodes.keys()) 312 elif isinstance(item, EdgeResult): 313 body_edges.extend(item.edges) 314 moved_edge_sources.update((e.source, e.dest) for e in item.edges) 315 elif isinstance(item, DataDefResult): 316 body_data_defs.extend(item.data_defs) 317 moved_data_names.update(d.name for d in item.data_defs) 318 elif isinstance(item, CompositeResult): 319 body_nodes.update(item.nodes) 320 moved_node_names.update(item.nodes.keys()) 321 body_edges.extend(item.edges) 322 moved_edge_sources.update((e.source, e.dest) for e in item.edges) 323 324 # Update the location region with collected body 325 if body_nodes or body_edges or body_data_defs: 326 new_body = IRGraph( 327 nodes=body_nodes, 328 edges=body_edges, 329 regions=[], 330 data_defs=body_data_defs, 331 ) 332 # Find and replace this region in the regions list 333 regions = [ 334 IRRegion( 335 tag=r.tag, 336 kind=r.kind, 337 body=new_body if r.tag == loc_region.tag else r.body, 338 loc=r.loc, 339 ) 340 for r in regions 341 ] 342 343 # Remove items that were moved into location regions from top-level containers 344 nodes = {k: v for k, v in nodes.items() if k not in moved_node_names} 345 data_defs = [d for d in data_defs if d.name not in moved_data_names] 346 edges = [e for e in edges if (e.source, e.dest) not in moved_edge_sources] 347 348 # Collect macro definitions and calls from items 349 macro_defs = [] 350 macro_calls = [] 351 for item in items: 352 if isinstance(item, MacroDefResult): 353 macro_defs.append(item.macro_def) 354 elif isinstance(item, MacroCallResult): 355 macro_calls.append(item.macro_call) 356 357 return IRGraph( 358 nodes=nodes, 359 edges=edges, 360 regions=regions, 361 data_defs=data_defs, 362 system=self._system, 363 errors=self._errors, 364 macro_defs=macro_defs, 365 macro_calls=macro_calls, 366 raw_call_sites=tuple(call_sites), 367 ) 368 369 @v_args(inline=True) 370 def inline_const(self, value) -> Union[int, ParamRef]: 371 """Parse inline constant (space-separated, e.g., 'add 7' or '${param}').""" 372 if isinstance(value, ParamRef): 373 return value 374 return int(str(value), 0) 375 376 @v_args(inline=True, meta=True) 377 def inst_def(self, meta, *args) -> StatementResult: 378 """Process instruction definition.""" 379 loc = self._extract_loc(meta) 380 381 # Filter out tokens (FLOW_IN, etc.) - keep only transformed results 382 args_list = _filter_args(args) 383 384 # First arg is qualified_ref_dict, second is opcode, rest are arguments 385 qualified_ref_dict = args_list[0] 386 opcode = args_list[1] 387 remaining_args = args_list[2:] if len(args_list) > 2 else [] 388 389 # Extract name (will be qualified later in _process_statements) 390 name = qualified_ref_dict["name"] 391 392 # Check reserved names 393 if self._check_reserved_name(name, loc): 394 return NodeResult({}) 395 396 # If opcode is None (invalid), skip node creation (error already added) 397 if opcode is None: 398 return NodeResult({}) 399 400 # Extract placement (PE qualifier) 401 pe = None 402 if "placement" in qualified_ref_dict and qualified_ref_dict["placement"]: 403 placement_val = qualified_ref_dict["placement"] 404 if isinstance(placement_val, PlacementRef): 405 pe = placement_val 406 elif isinstance(placement_val, str) and placement_val.startswith("pe"): 407 try: 408 pe = int(placement_val[2:]) 409 except ValueError: 410 pass 411 412 # Extract activation slot qualifier 413 act_slot = qualified_ref_dict.get("act_slot") 414 415 # Extract const and named args from arguments 416 # Check if first remaining arg is an inline_const (int directly after opcode) 417 const = None 418 args_dict = {} 419 positional_count = 0 420 421 for arg in remaining_args: 422 if isinstance(arg, tuple): # named_arg 423 arg_name, arg_value = arg 424 args_dict[arg_name] = arg_value 425 else: 426 # positional argument 427 if positional_count == 0: 428 if isinstance(arg, dict) and isinstance(arg.get("name"), ParamRef): 429 const = arg["name"] 430 elif not isinstance(arg, dict): 431 const = arg 432 positional_count += 1 433 434 # Create IRNode 435 node = IRNode( 436 name=name, 437 opcode=opcode, 438 dest_l=None, 439 dest_r=None, 440 const=const, 441 pe=pe, 442 act_slot=act_slot, 443 loc=loc, 444 args=args_dict if args_dict else None, 445 ) 446 return NodeResult({name: node}) 447 448 @v_args(inline=True, meta=True) 449 def plain_edge(self, meta, *args) -> StatementResult: 450 """Process plain edge (wiring between named nodes). 451 452 The source's port (if specified) becomes source_port (output slot). 453 The dest's port (if specified) becomes port (input port), defaulting to L. 454 """ 455 loc = self._extract_loc(meta) 456 457 args_list = _filter_args(args) 458 source_dict = args_list[0] 459 dest_list = args_list[1] 460 461 source_name = source_dict["name"] 462 # Source port is from the source's port specification 463 source_port = source_dict.get("port") if "port" in source_dict else None 464 # Normalize source_port to Port if it's a raw int (convert 0→L, 1→R) 465 if source_port is not None: 466 source_port = _normalize_port(source_port) 467 468 edges = [] 469 for dest_dict in dest_list: 470 dest_name = dest_dict["name"] 471 # Dest port is from the dest's port specification, defaults to L 472 raw_port = dest_dict.get("port") 473 port_explicit = raw_port is not None 474 if raw_port is None: 475 dest_port = Port.L 476 else: 477 dest_port = _normalize_port(raw_port) 478 479 edge = IREdge( 480 source=source_name, 481 dest=dest_name, 482 port=dest_port, 483 source_port=source_port, 484 port_explicit=port_explicit, 485 loc=loc, 486 ) 487 edges.append(edge) 488 489 return EdgeResult(edges) 490 491 def _wire_anonymous_node( 492 self, opcode: Union[ALUOp, MemOp], inputs: list, outputs: list, loc: SourceLoc, 493 const_value: Optional[int] = None, is_seed: bool = False, 494 ) -> StatementResult: 495 """Wire inputs and outputs for an anonymous edge node. 496 497 Generates the IRNode for an anonymous edge and all associated edges 498 (both input and output wiring). This logic is shared between strong_edge 499 and weak_edge, which differ only in how they parse their arguments. 500 501 Args: 502 opcode: The instruction opcode 503 inputs: List of input reference dicts with "name" and optional "port" 504 outputs: List of output reference dicts with "name" and optional "port" 505 loc: Source location for error reporting 506 const_value: Optional constant value for the node 507 is_seed: If True, mark the node as a seed (no IRAM slot, emits seed token) 508 509 Returns: 510 CompositeResult with anonymous node and all input/output edges 511 """ 512 # Generate anonymous node (not qualified yet) 513 anon_name = f"&__anon_{self._anon_counter}" 514 self._anon_counter += 1 515 516 # Create anonymous IRNode 517 anon_node = IRNode( 518 name=anon_name, 519 opcode=opcode, 520 const=const_value, 521 loc=loc, 522 seed=is_seed, 523 ) 524 525 # Wire inputs: first input → Port.L, second → Port.R 526 edges = [] 527 for idx, input_arg in enumerate(inputs): 528 if isinstance(input_arg, dict) and "name" in input_arg: 529 # It's a qualified_ref 530 input_name = input_arg["name"] 531 input_port = Port.L if idx == 0 else Port.R 532 edge = IREdge( 533 source=input_name, 534 dest=anon_name, 535 port=input_port, 536 source_port=None, 537 loc=loc, 538 ) 539 edges.append(edge) 540 541 # Wire outputs 542 for output_dict in outputs: 543 output_name = output_dict["name"] 544 raw_port = output_dict.get("port") 545 out_port_explicit = raw_port is not None 546 if raw_port is None: 547 output_port = Port.L 548 else: 549 output_port = _normalize_port(raw_port) 550 551 edge = IREdge( 552 source=anon_name, 553 dest=output_name, 554 port=output_port, 555 source_port=None, 556 port_explicit=out_port_explicit, 557 loc=loc, 558 ) 559 edges.append(edge) 560 561 # Return both the node and edges 562 return CompositeResult({anon_name: anon_node}, edges) 563 564 @v_args(inline=True, meta=True) 565 def strong_edge(self, meta, *args) -> StatementResult: 566 """Process strong inline edge (anonymous node with inputs and outputs). 567 568 Syntax: opcode input [, input ...] |> output [, output ...] 569 570 Special case: `const N |> &dest` creates a seed node — a CONST node 571 that emits a seed token at startup without occupying an IRAM slot. 572 """ 573 loc = self._extract_loc(meta) 574 575 args_list = _filter_args(args) 576 opcode = args_list[0] 577 remaining_args = args_list[1:] 578 579 # If opcode is None (invalid), skip edge creation (error already added) 580 if opcode is None: 581 return CompositeResult({}, []) 582 583 # Split arguments into inputs and outputs 584 inputs = [] 585 outputs = [] 586 processing_outputs = False 587 const_value = None 588 589 for arg in remaining_args: 590 if isinstance(arg, list): # This is ref_list 591 processing_outputs = True 592 outputs = arg 593 elif not processing_outputs: 594 if isinstance(arg, int): 595 const_value = arg 596 else: 597 inputs.append(arg) 598 599 # Detect seed pattern: `const N |> &dest` 600 is_seed = ( 601 isinstance(opcode, RoutingOp) and opcode == RoutingOp.CONST 602 and const_value is not None 603 and len(inputs) == 0 604 ) 605 606 # Wire the anonymous node and its edges 607 return self._wire_anonymous_node(opcode, inputs, outputs, loc, 608 const_value=const_value, is_seed=is_seed) 609 610 @v_args(inline=True, meta=True) 611 def weak_edge(self, meta, *args) -> StatementResult: 612 """Process weak inline edge (outputs then opcode then inputs). 613 614 Syntax: outputs... opcode inputs... 615 Semantically identical to strong_edge but syntactically reversed. 616 """ 617 loc = self._extract_loc(meta) 618 619 args_list = _filter_args(args) 620 output_list = args_list[0] 621 opcode = args_list[1] 622 remaining_args = args_list[2:] if len(args_list) > 2 else [] 623 624 # If opcode is None (invalid), skip edge creation (error already added) 625 if opcode is None: 626 return CompositeResult({}, []) 627 628 inputs = list(remaining_args) 629 outputs = output_list 630 631 # Wire the anonymous node and its edges 632 return self._wire_anonymous_node(opcode, inputs, outputs, loc) 633 634 def func_def(self, args: list) -> StatementResult: 635 """Process function definition (region with nested scope).""" 636 # Without v_args decorator, args come as a list with LarkToken terminals mixed in 637 # Filter out tokens and extract the actual data 638 args_list = _filter_args(args) 639 640 # args[0] is func_ref dict, rest are statement results 641 func_ref_dict = args_list[0] if args_list else {} 642 func_name = func_ref_dict.get("name", "$unknown") if isinstance(func_ref_dict, dict) else "$unknown" 643 statement_results = args_list[1:] if len(args_list) > 1 else [] 644 645 # Try to extract location from the raw args (may have meta on Tree nodes) 646 loc = SourceLoc(0, 0) 647 for arg in args: 648 if hasattr(arg, 'meta'): 649 try: 650 loc = self._extract_loc(arg.meta) 651 break 652 except (AttributeError, TypeError): 653 pass 654 655 # Process the statements with the function scope 656 func_nodes, func_edges, func_regions, func_data_defs, func_call_sites = self._process_statements( 657 statement_results, 658 func_scope=func_name 659 ) 660 661 # Collect macro_calls from function body statements 662 func_macro_calls = [] 663 for stmt in statement_results: 664 if isinstance(stmt, MacroCallResult): 665 func_macro_calls.append(stmt.macro_call) 666 667 # Create IRRegion for the function 668 body_graph = IRGraph( 669 nodes=func_nodes, 670 edges=func_edges, 671 regions=func_regions, 672 data_defs=func_data_defs, 673 macro_calls=func_macro_calls, 674 raw_call_sites=tuple(func_call_sites), 675 ) 676 677 region = IRRegion( 678 tag=func_name, 679 kind=RegionKind.FUNCTION, 680 body=body_graph, 681 loc=loc, 682 ) 683 684 return FunctionResult(region) 685 686 def _apply_paste_patterns(self, body: IRGraph) -> IRGraph: 687 """Post-process macro body to replace ${param} patterns with ParamRef. 688 689 Scans all node names and edge endpoints in the body for ${param} patterns 690 and constructs ParamRef instances with appropriate prefix/suffix fields. 691 This post-processing approach avoids the bottom-up traversal issue where 692 Lark processes node_ref/label_ref terminals before macro_def is invoked. 693 694 Args: 695 body: The constructed IRGraph from macro body processing 696 697 Returns: 698 New IRGraph with all ${param} patterns replaced by ParamRef instances 699 """ 700 # Process all nodes to replace ${param} patterns in their names 701 new_nodes = {} 702 for node_name, node in body.nodes.items(): 703 match = _PASTE_PATTERN.match(node.name) 704 if match: 705 # Node name contains ${param} pattern 706 new_name = ParamRef( 707 param=match.group(2), 708 prefix=match.group(1), 709 suffix=match.group(3), 710 ) 711 new_nodes[node_name] = replace(node, name=new_name) 712 else: 713 new_nodes[node_name] = node 714 715 # Process all edges to replace ${param} patterns in source/dest 716 new_edges = [] 717 for edge in body.edges: 718 new_source = edge.source 719 new_dest = edge.dest 720 721 # Check source for pattern 722 if isinstance(edge.source, str): 723 match = _PASTE_PATTERN.match(edge.source) 724 if match: 725 new_source = ParamRef( 726 param=match.group(2), 727 prefix=match.group(1), 728 suffix=match.group(3), 729 ) 730 731 # Check dest for pattern 732 if isinstance(edge.dest, str): 733 match = _PASTE_PATTERN.match(edge.dest) 734 if match: 735 new_dest = ParamRef( 736 param=match.group(2), 737 prefix=match.group(1), 738 suffix=match.group(3), 739 ) 740 741 # Add edge with potential replacements 742 if new_source != edge.source or new_dest != edge.dest: 743 new_edges.append(replace(edge, source=new_source, dest=new_dest)) 744 else: 745 new_edges.append(edge) 746 747 # Return new IRGraph with updated nodes and edges 748 return replace(body, nodes=new_nodes, edges=new_edges) 749 750 @v_args(meta=True) 751 def macro_def(self, meta, args: list) -> StatementResult: 752 """Process macro definition (template with parameters). 753 754 Uses @v_args(meta=True) to receive source location metadata. 755 """ 756 # Extract macro name from first IDENT terminal (before filtering) 757 macro_name = "unknown" 758 for arg in args: 759 if isinstance(arg, LarkToken): 760 macro_name = str(arg) 761 break 762 763 # Extract location from meta 764 loc = self._extract_loc(meta) 765 766 # Check for reserved name (starts with "ret") 767 if macro_name.startswith("ret"): 768 self._errors.append(AssemblyError( 769 loc=loc, 770 category=ErrorCategory.NAME, 771 message=f"Macro name '#{macro_name}' uses reserved prefix 'ret'", 772 )) 773 return MacroDefResult(MacroDef(name=macro_name, params=(), body=IRGraph(), loc=loc)) 774 775 # Separate params from body statements 776 params: list[MacroParam] = [] 777 statement_results: list = [] 778 variadic_param_name: Optional[str] = None 779 780 for item in args: 781 if isinstance(item, list) and all(isinstance(p, tuple) and len(p) == 2 for p in item): 782 # This is the macro_params result (list of (name, variadic) tuples) 783 seen_names: set[str] = set() 784 for param_name, is_variadic in item: 785 if param_name in seen_names: 786 self._errors.append(AssemblyError( 787 loc=loc, 788 category=ErrorCategory.NAME, 789 message=f"Duplicate parameter name '{param_name}' in macro '#{macro_name}'", 790 )) 791 else: 792 seen_names.add(param_name) 793 if is_variadic: 794 # Validate: variadic param must be last 795 if variadic_param_name is not None: 796 self._errors.append(AssemblyError( 797 loc=loc, 798 category=ErrorCategory.NAME, 799 message=f"Multiple variadic parameters in macro '#{macro_name}' (only one allowed)", 800 )) 801 variadic_param_name = param_name 802 elif variadic_param_name is not None: 803 # Non-variadic param after variadic param 804 self._errors.append(AssemblyError( 805 loc=loc, 806 category=ErrorCategory.NAME, 807 message=f"Variadic parameter must be last in macro '#{macro_name}'", 808 )) 809 params.append(MacroParam(name=param_name, variadic=is_variadic)) 810 elif isinstance(item, StatementResult): 811 statement_results.append(item) 812 813 # Process body statements (no function scope — macros don't create ctx scopes) 814 body_nodes, body_edges, body_regions, body_data_defs, body_call_sites = self._process_statements( 815 statement_results, 816 func_scope=None 817 ) 818 819 # Collect macro_calls and repetition_blocks from body statements 820 body_macro_calls = [] 821 repetition_blocks = [] 822 for stmt in statement_results: 823 if isinstance(stmt, MacroCallResult): 824 body_macro_calls.append(stmt.macro_call) 825 elif isinstance(stmt, RepetitionBlockResult): 826 # Update variadic_param in the repetition block if we have a variadic param 827 rep_block = stmt.repetition_block 828 if variadic_param_name and rep_block.variadic_param == "": 829 # Replace the placeholder with the actual variadic param name 830 rep_block = replace(rep_block, variadic_param=variadic_param_name) 831 repetition_blocks.append(rep_block) 832 833 body = IRGraph( 834 nodes=body_nodes, 835 edges=body_edges, 836 regions=body_regions, 837 data_defs=body_data_defs, 838 macro_calls=body_macro_calls, 839 raw_call_sites=tuple(body_call_sites), 840 ) 841 842 # Post-process to apply ${param} token pasting patterns 843 body = self._apply_paste_patterns(body) 844 845 macro = MacroDef( 846 name=macro_name, 847 params=tuple(params), 848 body=body, 849 repetition_blocks=repetition_blocks, 850 loc=loc, 851 ) 852 853 return MacroDefResult(macro) 854 855 def macro_params(self, args: list) -> list[tuple]: 856 """Process macro parameter list. 857 858 Returns list of (name, variadic) tuples. 859 860 Note: Comma tokens and other non-tuple/string types from the 861 grammar are silently skipped during iteration. 862 """ 863 result = [] 864 for arg in args: 865 if isinstance(arg, tuple): 866 # From macro_param rule (variadic_param or regular_param) 867 result.append(arg) 868 elif isinstance(arg, str): 869 # Fallback for simple string params 870 result.append((arg, False)) 871 # Other token types (commas) are silently skipped 872 return result 873 874 def variadic_param(self, args: list) -> tuple: 875 """Process a variadic macro parameter (*name). 876 877 Returns (name, True) tuple. 878 """ 879 # args will be [VARIADIC_token, IDENT_token] 880 # IDENT is always the last token per the grammar rule 881 name = str(args[-1]) 882 return (name, True) 883 884 def regular_param(self, args: list) -> tuple: 885 """Process a regular macro parameter (name). 886 887 Returns (name, False) tuple. 888 """ 889 # args will be [IDENT_token] 890 if args: 891 name = str(args[0].value if hasattr(args[0], 'value') else args[0]) 892 else: 893 name = "unknown" 894 return (name, False) 895 896 @v_args(meta=True) 897 def repetition_block(self, meta, args: list) -> StatementResult: 898 """Process repetition block: $( body ),*. 899 900 The repetition block syntax within macro bodies will be expanded 901 in the expand pass. Here we collect the body as an IRGraph. 902 903 Creates an IRRepetitionBlock with an empty string placeholder for 904 variadic_param. The placeholder will be resolved during macro_def 905 processing by matching against the macro's actual variadic parameter. 906 """ 907 loc = self._extract_loc(meta) 908 909 # Filter statement results from args 910 statement_results = [arg for arg in args if isinstance(arg, StatementResult)] 911 912 # Process body statements 913 body_nodes, body_edges, body_regions, body_data_defs, body_call_sites = self._process_statements( 914 statement_results, 915 func_scope=None 916 ) 917 918 body = IRGraph( 919 nodes=body_nodes, 920 edges=body_edges, 921 regions=body_regions, 922 data_defs=body_data_defs, 923 raw_call_sites=tuple(body_call_sites), 924 ) 925 926 # Apply token pasting patterns to the body 927 body = self._apply_paste_patterns(body) 928 929 # Create a placeholder IRRepetitionBlock 930 # The variadic_param will be resolved in the expand pass 931 # For now, use empty string as a placeholder 932 rep_block = IRRepetitionBlock( 933 body=body, 934 variadic_param="", # Placeholder, resolved in expand pass 935 loc=loc, 936 ) 937 938 return RepetitionBlockResult(rep_block) 939 940 @v_args(meta=True) 941 def macro_call_stmt(self, meta, args: list) -> StatementResult: 942 """Process standalone macro invocation.""" 943 loc = self._extract_loc(meta) 944 945 # Extract macro name from first IDENT terminal 946 macro_name = "unknown" 947 for arg in args: 948 if isinstance(arg, LarkToken): 949 macro_name = str(arg) 950 break 951 952 positional_args = [] 953 named_args: dict[str, object] = {} 954 output_dests = () 955 found_name = False 956 for item in args: 957 if isinstance(item, LarkToken): 958 if not found_name: 959 # First LarkToken is the macro name 960 found_name = True 961 continue 962 if item.type in ("OPCODE", "IDENT"): 963 # Bare opcode or identifier as macro argument — wrap as string 964 positional_args.append(str(item)) 965 continue 966 # Skip other tokens (FLOW_OUT, commas, etc.) 967 continue 968 elif isinstance(item, list) and all(isinstance(x, dict) for x in item): 969 # call_output_list result — list of output dest dicts 970 output_dests = tuple(item) 971 elif isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str): 972 # Named argument from named_arg rule (name, value) 973 named_args[item[0]] = item[1] 974 elif isinstance(item, dict) and "name" in item: 975 # Positional argument (qualified_ref or value) 976 positional_args.append(item) 977 elif item is not None: 978 # Other argument types (int literals, etc.) 979 positional_args.append(item) 980 981 macro_call = IRMacroCall( 982 name=macro_name, 983 positional_args=tuple(positional_args), 984 named_args=tuple(named_args.items()), 985 output_dests=output_dests, 986 loc=loc, 987 ) 988 989 return MacroCallResult(macro_call) 990 991 @v_args(meta=True) 992 def call_stmt(self, meta, args: list) -> StatementResult: 993 """Process function call statement. 994 995 The call_stmt grammar rule is: 996 call_stmt: func_ref argument ("," argument)* FLOW_OUT call_output_list 997 998 Args are: [func_ref_dict, arg1, arg2, ..., call_output_list] 999 """ 1000 loc = self._extract_loc(meta) 1001 1002 # Filter out LarkTokens (FLOW_OUT) 1003 args_list = _filter_args(args) 1004 1005 if not args_list: 1006 self._errors.append(AssemblyError( 1007 loc=loc, 1008 category=ErrorCategory.PARSE, 1009 message="call_stmt requires function name and arguments" 1010 )) 1011 return CallSiteResultStatement(CallSiteResult( 1012 func_name="$unknown", 1013 input_args=(), 1014 output_dests=(), 1015 loc=loc, 1016 )) 1017 1018 # First arg is func_ref dict 1019 func_ref_dict = args_list[0] 1020 func_name = func_ref_dict.get("name", "$unknown") 1021 1022 # Process remaining args: arguments come before output_dests 1023 # We need to find where call_output_list starts (it's a list of dicts/named outputs) 1024 input_args = [] 1025 output_dests = [] 1026 1027 for i, item in enumerate(args_list[1:], start=1): 1028 if isinstance(item, list): 1029 # This is call_output_list result — flatten into output_dests 1030 output_dests.extend(item) 1031 elif isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str): 1032 # named_arg: (name, value) 1033 input_args.append(item) 1034 elif isinstance(item, dict) and "name" in item: 1035 # positional_arg (qualified_ref) 1036 input_args.append((None, item)) # Store as (None, ref_dict) for positional 1037 elif isinstance(item, int): 1038 # literal value 1039 input_args.append((None, item)) 1040 else: 1041 # Fallback: treat as positional value 1042 input_args.append((None, item)) 1043 1044 call_site = CallSiteResult( 1045 func_name=func_name, 1046 input_args=tuple(input_args), 1047 output_dests=tuple(output_dests), 1048 loc=loc, 1049 ) 1050 1051 return CallSiteResultStatement(call_site) 1052 1053 def call_output_list(self, args: list) -> list: 1054 """Process call output list — returns list of output dests.""" 1055 return [a for a in args if a is not None] 1056 1057 @v_args(inline=True) 1058 def named_output(self, name_tok, ref) -> dict: 1059 """Process named output: name=@dest. 1060 Returns {"name": str, "ref": ref_dict} so the expand pass can map 1061 @ret_name return markers to the specified call-site destination. 1062 """ 1063 # name_tok could be a LarkToken 1064 if isinstance(name_tok, LarkToken): 1065 name_str = str(name_tok) 1066 else: 1067 name_str = name_tok 1068 return {"name": name_str, "ref": ref} 1069 1070 @v_args(inline=True) 1071 def positional_output(self, ref) -> dict: 1072 """Process positional output: bare @dest or &ref.""" 1073 return ref 1074 1075 def macro_ref(self, args: list) -> dict: 1076 """Process macro reference (#name).""" 1077 token = args[0] 1078 return {"name": f"#{token}"} 1079 1080 def scoped_ref(self, args: list) -> dict: 1081 """Process dot-notation scope reference ($func.&label or #macro.&label).""" 1082 args_list = _filter_args(args) 1083 scope_dict = args_list[0] # func_ref or macro_ref dict 1084 inner_dict = args_list[1] # label_ref or node_ref dict 1085 scope_name = scope_dict["name"] 1086 inner_name = inner_dict["name"] 1087 return {"name": f"{scope_name}.{inner_name}"} 1088 1089 @v_args(inline=True, meta=True) 1090 def data_def(self, meta, *args) -> StatementResult: 1091 """Process data definition.""" 1092 loc = self._extract_loc(meta) 1093 1094 args_list = _filter_args(args) 1095 qualified_ref_dict = args_list[0] 1096 value_data = args_list[1] if len(args_list) > 1 else None 1097 1098 name = qualified_ref_dict["name"] 1099 1100 # Extract SM ID from placement 1101 sm_id = None 1102 if "placement" in qualified_ref_dict and qualified_ref_dict["placement"]: 1103 placement_val = qualified_ref_dict["placement"] 1104 if isinstance(placement_val, str) and placement_val.startswith("sm"): 1105 try: 1106 sm_id = int(placement_val[2:]) 1107 except ValueError: 1108 pass 1109 1110 # Extract cell address from port 1111 # The port value from qualified_ref can be: 1112 # - Port.L/Port.R (for plain edge context) 1113 # - raw int (for data_def context, e.g., :0, :1, :2, etc.) 1114 cell_addr = None 1115 if "port" in qualified_ref_dict and qualified_ref_dict["port"] is not None: 1116 port_val = qualified_ref_dict["port"] 1117 # Extract the numeric value regardless of type 1118 if isinstance(port_val, Port): 1119 cell_addr = int(port_val) 1120 elif isinstance(port_val, int): 1121 cell_addr = port_val 1122 1123 # Handle value_data 1124 value = 0 1125 if isinstance(value_data, list): 1126 # value_list: pack values 1127 if all(isinstance(v, int) for v in value_data): 1128 # Integer values or char values 1129 if len(value_data) == 1: 1130 value = value_data[0] 1131 else: 1132 # Multiple values: only valid if all are bytes (0-255) 1133 if any(v > 255 for v in value_data): 1134 self._errors.append(AssemblyError( 1135 loc=loc, 1136 category=ErrorCategory.VALUE, 1137 message=f"Multi-value data definition cannot contain values > 255. " 1138 f"Data defs support either a single 16-bit value OR multiple byte-values packed into one word.", 1139 )) 1140 value = value_data[0] # Use first value as fallback 1141 else: 1142 # All bytes: take the already-packed value from value_list 1143 value = value_data[0] # value_list already packs consecutive pairs 1144 else: 1145 value = value_data 1146 1147 data_def = IRDataDef( 1148 name=name, 1149 sm_id=sm_id, 1150 cell_addr=cell_addr, 1151 value=value, 1152 loc=loc, 1153 ) 1154 1155 return DataDefResult([data_def]) 1156 1157 @v_args(inline=True, meta=True) 1158 def location_dir(self, meta, *args) -> StatementResult: 1159 """Process location directive.""" 1160 loc = self._extract_loc(meta) 1161 1162 args_list = _filter_args(args) 1163 qualified_ref_dict = args_list[0] 1164 1165 tag = qualified_ref_dict["name"] 1166 1167 # Create region for location 1168 region = IRRegion( 1169 tag=tag, 1170 kind=RegionKind.LOCATION, 1171 body=IRGraph(), 1172 loc=loc, 1173 ) 1174 1175 return LocationResult(region) 1176 1177 @v_args(inline=True, meta=True) 1178 def system_pragma(self, meta, *params) -> Optional[StatementResult]: 1179 """Process @system pragma.""" 1180 loc = self._extract_loc(meta) 1181 1182 # Filter out tokens 1183 params_list = _filter_args(params) 1184 1185 # Check for duplicate @system pragma 1186 if self._system is not None: 1187 self._errors.append(AssemblyError( 1188 loc=loc, 1189 category=ErrorCategory.PARSE, 1190 message="Duplicate @system pragma", 1191 )) 1192 return None 1193 1194 # params are (name, value) tuples from system_param 1195 config_dict = {} 1196 for param_tuple in params_list: 1197 if isinstance(param_tuple, tuple): 1198 param_name, param_value = param_tuple 1199 config_dict[param_name] = param_value 1200 1201 # Map parameter names 1202 pe_count = config_dict.get("pe") 1203 sm_count = config_dict.get("sm") 1204 iram_capacity = config_dict.get("iram", 256) 1205 frame_count = config_dict.get("frames", 8) 1206 1207 if pe_count is None or sm_count is None: 1208 self._errors.append(AssemblyError( 1209 loc=loc, 1210 category=ErrorCategory.PARSE, 1211 message="@system pragma requires at least 'pe' and 'sm' parameters", 1212 )) 1213 return None 1214 1215 self._system = SystemConfig( 1216 pe_count=pe_count, 1217 sm_count=sm_count, 1218 iram_capacity=iram_capacity, 1219 frame_count=frame_count, 1220 loc=loc, 1221 ) 1222 return None # Don't return a StatementResult for pragmas 1223 1224 @v_args(inline=True) 1225 def system_param(self, param_name: LarkToken, value) -> tuple[str, int]: 1226 """Process @system parameter.""" 1227 # value can be a token (DEC_LIT or HEX_LIT) or already an int 1228 if isinstance(value, LarkToken): 1229 value = int(str(value), 0) # 0 base handles both decimal and 0x hex 1230 return (str(param_name), value) 1231 1232 @v_args(inline=True) 1233 def opcode(self, token) -> Optional[Union[ALUOp, MemOp, ParamRef]]: 1234 """Map opcode token to ALUOp/MemOp enum, ParamRef, or None if invalid.""" 1235 if isinstance(token, ParamRef): 1236 return token 1237 mnemonic = str(token) 1238 if mnemonic not in MNEMONIC_TO_OP: 1239 self._errors.append(AssemblyError( 1240 loc=SourceLoc(line=token.line, column=token.column), 1241 category=ErrorCategory.PARSE, 1242 message=f"Unknown opcode '{mnemonic}'", 1243 )) 1244 return None 1245 1246 return MNEMONIC_TO_OP[mnemonic] 1247 1248 @v_args(inline=True) 1249 def qualified_ref(self, *args) -> dict: 1250 """Collect qualified reference components into a dict.""" 1251 ref_type = None 1252 placement = None 1253 act_slot = None 1254 port = None 1255 1256 for arg in args: 1257 if isinstance(arg, PlacementRef): 1258 placement = arg 1259 elif isinstance(arg, PortRef): 1260 port = arg 1261 elif isinstance(arg, (ActSlotRef, ActSlotRange)): 1262 act_slot = arg 1263 elif isinstance(arg, (Port, int)): 1264 port = arg 1265 elif isinstance(arg, ParamRef): 1266 ref_type = {"name": arg} 1267 elif isinstance(arg, dict): 1268 ref_type = arg 1269 elif isinstance(arg, str) and (arg.startswith("pe") or arg.startswith("sm")): 1270 placement = arg 1271 1272 result = ref_type.copy() if ref_type else {} 1273 if placement is not None: 1274 result["placement"] = placement 1275 if act_slot is not None: 1276 result["act_slot"] = act_slot 1277 if port is not None: 1278 result["port"] = port 1279 1280 return result 1281 1282 @v_args(inline=True) 1283 def node_ref(self, token: LarkToken) -> dict: 1284 """Process @name reference.""" 1285 return {"name": f"@{token}"} 1286 1287 @v_args(inline=True) 1288 def label_ref(self, token: LarkToken) -> dict: 1289 """Process &name reference.""" 1290 return {"name": f"&{token}"} 1291 1292 @v_args(inline=True) 1293 def func_ref(self, token: LarkToken) -> dict: 1294 """Process $name reference.""" 1295 return {"name": f"${token}"} 1296 1297 def param_ref(self, args: list) -> Union[ParamRef, dict]: 1298 """Process ${name} macro parameter reference. 1299 1300 Returns ParamRef directly. When used in qualified_ref context, 1301 the qualified_ref handler wraps it in a dict. 1302 """ 1303 name = str(args[-1]) 1304 return ParamRef(param=name) 1305 1306 @v_args(inline=True) 1307 def placement(self, token) -> Union[str, PlacementRef]: 1308 """Extract placement specifier.""" 1309 if isinstance(token, ParamRef): 1310 return PlacementRef(param=token) 1311 return str(token) 1312 1313 def ctx_slot(self, args: list): 1314 """Extract context slot specifier. 1315 1316 Always returns a typed wrapper (ActSlotRef, ActSlotRange) so 1317 qualified_ref can distinguish ctx_slot ints from port ints. 1318 """ 1319 if len(args) == 1: 1320 arg = args[0] 1321 if isinstance(arg, ParamRef): 1322 return ActSlotRef(param=arg) 1323 if isinstance(arg, ActSlotRange): 1324 return arg 1325 n = int(str(arg)) 1326 return ActSlotRange(start=n, end=n) 1327 return args[0] 1328 1329 def ctx_range(self, args: list) -> ActSlotRange: 1330 """Extract context slot range (start..end).""" 1331 return ActSlotRange(start=int(str(args[0])), end=int(str(args[1]))) 1332 1333 @v_args(inline=True) 1334 def port(self, token) -> Union[Port, int, PortRef]: 1335 """Convert port specifier to Port enum, raw int, or PortRef. 1336 1337 Returns: 1338 Port.L for "L" 1339 Port.R for "R" 1340 Raw int for numeric values (e.g., cell address in data_def) 1341 PortRef for param_ref 1342 """ 1343 if isinstance(token, ParamRef): 1344 return PortRef(param=token) 1345 spec = str(token) 1346 if spec == "L": 1347 return Port.L 1348 elif spec == "R": 1349 return Port.R 1350 else: 1351 try: 1352 return int(spec) 1353 except ValueError: 1354 return Port.L 1355 1356 @v_args(inline=True) 1357 def hex_literal(self, token: LarkToken) -> int: 1358 """Parse hexadecimal literal.""" 1359 return int(str(token), 16) 1360 1361 @v_args(inline=True) 1362 def dec_literal(self, token: LarkToken) -> int: 1363 """Parse decimal literal.""" 1364 return int(str(token)) 1365 1366 def _process_escape_sequences(self, s: str) -> list[int]: 1367 """Process escape sequences in a string. 1368 1369 Handles: \\n, \\t, \\r, \\0, \\\\, \\\', \\x## 1370 1371 Args: 1372 s: String with potential escape sequences 1373 1374 Returns: 1375 List of character codes 1376 """ 1377 result = [] 1378 i = 0 1379 while i < len(s): 1380 if i + 1 < len(s) and s[i] == "\\": 1381 next_char = s[i + 1] 1382 if next_char == "n": 1383 result.append(ord("\n")) 1384 i += 2 1385 elif next_char == "t": 1386 result.append(ord("\t")) 1387 i += 2 1388 elif next_char == "r": 1389 result.append(ord("\r")) 1390 i += 2 1391 elif next_char == "0": 1392 result.append(0) 1393 i += 2 1394 elif next_char == "\\": 1395 result.append(ord("\\")) 1396 i += 2 1397 elif next_char == "'": 1398 result.append(ord("'")) 1399 i += 2 1400 elif next_char == '"': 1401 result.append(ord('"')) 1402 i += 2 1403 elif next_char == "x" and i + 3 < len(s): 1404 # Hex escape: \xHH 1405 hex_str = s[i + 2:i + 4] 1406 try: 1407 result.append(int(hex_str, 16)) 1408 i += 4 1409 except ValueError: 1410 # Invalid hex, just include the character 1411 result.append(ord(s[i])) 1412 i += 1 1413 else: 1414 # Unknown escape, just include the character 1415 result.append(ord(s[i])) 1416 i += 1 1417 else: 1418 result.append(ord(s[i])) 1419 i += 1 1420 return result 1421 1422 @v_args(inline=True) 1423 def char_literal(self, token: LarkToken) -> int: 1424 """Parse character literal.""" 1425 s = str(token) 1426 # Remove surrounding quotes 1427 s = s[1:-1] 1428 # Handle escape sequences 1429 if s == "\\n": 1430 return ord("\n") 1431 elif s == "\\t": 1432 return ord("\t") 1433 elif s == "\\r": 1434 return ord("\r") 1435 elif s == "\\0": 1436 return 0 1437 elif s == "\\\\": 1438 return ord("\\") 1439 elif s == "\\'": 1440 return ord("'") 1441 elif s.startswith("\\x"): 1442 return int(s[2:], 16) 1443 else: 1444 return ord(s[0]) 1445 1446 @v_args(inline=True) 1447 def string_literal(self, token: LarkToken) -> list[int]: 1448 """Parse string literal (returns list of character codes).""" 1449 s = str(token)[1:-1] # Remove quotes 1450 return self._process_escape_sequences(s) 1451 1452 @v_args(inline=True) 1453 def raw_string_literal(self, token: LarkToken) -> list[int]: 1454 """Parse raw string literal (no escape processing).""" 1455 s = str(token)[2:-1] # Remove r" and " 1456 return [ord(c) for c in s] 1457 1458 @v_args(inline=True) 1459 def byte_string_literal(self, token: LarkToken) -> list[int]: 1460 """Parse byte string literal.""" 1461 s = str(token)[2:-1] # Remove b" and " 1462 return self._process_escape_sequences(s) 1463 1464 @v_args(inline=True) 1465 def named_arg(self, arg_name: LarkToken, value: Any) -> tuple[str, Any]: 1466 """Process named argument.""" 1467 return (str(arg_name), value) 1468 1469 @v_args(inline=True) 1470 def ref_list(self, *refs) -> list[dict]: 1471 """Collect reference list.""" 1472 return list(refs) 1473 1474 @v_args(inline=True) 1475 def value_list(self, *values) -> list[int]: 1476 """Collect value list and pack multi-char values big-endian. 1477 1478 - Hex/dec literals: returned as single values (not packed) 1479 - Multiple char values: packed big-endian into 16-bit words 1480 - String/list data: chars extracted and packed 1481 """ 1482 # Flatten values (strings return lists of char codes) 1483 result = [] 1484 for value in values: 1485 if isinstance(value, list): 1486 # String data from string_literal, etc. 1487 result.extend(value) 1488 else: 1489 # Single value (char or hex/dec literal) 1490 result.append(value) 1491 1492 # Only pack if we have multiple values (char pairs) AND all are bytes 1493 if len(result) <= 1: 1494 # Single value: return as-is (whether hex literal or single char) 1495 return result 1496 1497 all_bytes = all(0 <= v <= 255 for v in result) 1498 if not all_bytes: 1499 # Mixed or large values, return as-is 1500 return result 1501 1502 # Multiple bytes: pack consecutive pairs big-endian 1503 packed = [] 1504 i = 0 1505 while i < len(result): 1506 if i + 1 < len(result): 1507 # Two bytes: big-endian 1508 val = (result[i] << 8) | result[i + 1] 1509 packed.append(val) 1510 i += 2 1511 else: 1512 # Single byte: pad with 0 in low byte 1513 val = (result[i] << 8) | 0x00 1514 packed.append(val) 1515 i += 1 1516 1517 return packed 1518 1519 1520def lower(tree) -> IRGraph: 1521 """Lower a parse tree into an IRGraph. 1522 1523 Args: 1524 tree: A Lark parse tree from parsing dfasm source 1525 1526 Returns: 1527 An IRGraph with nodes, edges, regions, and any errors encountered 1528 """ 1529 transformer = LowerTransformer() 1530 return transformer.transform(tree)