OR-1 dataflow CPU sketch
1"""Lower pass: Convert Lark CST to IR graph.
2
3This module implements a Lark Transformer that converts a parse tree from the
4dfasm grammar into an IRGraph. The transformer handles:
5- Instruction definitions and node creation
6- Plain, strong, and weak edge routing
7- Function and location regions
8- Data definitions
9- System configuration pragmas
10- Name qualification (scoping)
11- Error collection for reserved names and duplicates
12"""
13
14from typing import Any, Optional, Union, Tuple, List, Dict
15from dataclasses import replace
16import re
17from lark import Transformer, v_args, Tree
18from lark.lexer import Token as LarkToken
19
20from asm.ir import (
21 IRGraph, IRNode, IREdge, IRRegion, RegionKind, IRDataDef, SystemConfig,
22 SourceLoc, NameRef, ResolvedDest, MacroParam, ParamRef, MacroDef, IRMacroCall,
23 CallSiteResult, IRRepetitionBlock,
24 PlacementRef, PortRef, ActSlotRef, ActSlotRange,
25)
26from asm.errors import AssemblyError, ErrorCategory
27from asm.opcodes import MNEMONIC_TO_OP
28from cm_inst import ALUOp, MemOp, Port, RoutingOp
29
30# Reserved names that cannot be used as node definitions
31_RESERVED_NAMES = frozenset({"@system", "@io", "@debug"})
32
33# Pattern for detecting ${param} token pasting in identifiers
34_PASTE_PATTERN = re.compile(r'^(.*?)\$\{([a-zA-Z_][a-zA-Z0-9_]*)\}(.*)$')
35
36
37def _filter_args(args: tuple) -> list:
38 """Filter out LarkTokens from argument list."""
39 return [arg for arg in args if not isinstance(arg, LarkToken)]
40
41
42def _normalize_port(value: Union[int, Port, PortRef]) -> Union[Port, PortRef]:
43 """Normalize a port value to Port enum, preserving PortRef for macro templates.
44
45 Args:
46 value: An int (0/1), Port enum, or PortRef (macro parameter)
47
48 Returns:
49 Port enum value, or PortRef passed through for later expansion
50 """
51 if isinstance(value, PortRef):
52 return value
53 if isinstance(value, Port):
54 return value
55 if isinstance(value, int):
56 return Port.L if value == 0 else (Port.R if value == 1 else Port.L)
57 return Port.L
58
59
60# Structured statement result types
61class StatementResult:
62 """Base class for statement processing results."""
63 pass
64
65
66class NodeResult(StatementResult):
67 """Result from inst_def: one or more IRNodes."""
68 def __init__(self, nodes: Dict[str, IRNode]):
69 self.nodes = nodes
70
71
72class EdgeResult(StatementResult):
73 """Result from plain_edge or anonymous edges: IREdges."""
74 def __init__(self, edges: List[IREdge]):
75 self.edges = edges
76
77
78class FunctionResult(StatementResult):
79 """Result from func_def: an IRRegion."""
80 def __init__(self, region: IRRegion):
81 self.region = region
82
83
84class LocationResult(StatementResult):
85 """Result from location_dir: an IRRegion."""
86 def __init__(self, region: IRRegion):
87 self.region = region
88
89
90class DataDefResult(StatementResult):
91 """Result from data_def: IRDataDefs."""
92 def __init__(self, data_defs: List[IRDataDef]):
93 self.data_defs = data_defs
94
95
96class MacroDefResult(StatementResult):
97 """Result from macro_def: a MacroDef."""
98 def __init__(self, macro_def: MacroDef):
99 self.macro_def = macro_def
100
101
102class MacroCallResult(StatementResult):
103 """Result from macro_call_stmt: an IRMacroCall."""
104 def __init__(self, macro_call: IRMacroCall):
105 self.macro_call = macro_call
106
107
108class CallSiteResultStatement(StatementResult):
109 """Result from call_stmt: a CallSiteResult."""
110 def __init__(self, call_site_result: CallSiteResult):
111 self.call_site_result = call_site_result
112
113
114class RepetitionBlockResult(StatementResult):
115 """Result from repetition_block: an IRRepetitionBlock."""
116 def __init__(self, repetition_block: IRRepetitionBlock):
117 self.repetition_block = repetition_block
118
119
120class CompositeResult(StatementResult):
121 """Result combining nodes and edges (for strong/weak edges)."""
122 def __init__(self, nodes: Dict[str, IRNode], edges: List[IREdge]):
123 self.nodes = nodes
124 self.edges = edges
125
126
127class LowerTransformer(Transformer):
128 """Transformer that converts a CST into an IRGraph.
129
130 The transformer collects statement results and then in the `start` rule
131 organizes them into the final IRGraph structure.
132 """
133
134 def __init__(self):
135 super().__init__()
136 self._anon_counter: int = 0
137 self._errors: list[AssemblyError] = []
138 self._defined_names: dict[str, SourceLoc] = {}
139 self._system: Optional[SystemConfig] = None
140
141 def _qualify_name(self, name, func_scope: Optional[str]):
142 """Apply function scope qualification to a name.
143
144 ParamRef values pass through unchanged — they are resolved during
145 macro expansion, not during lowering.
146 """
147 if isinstance(name, ParamRef):
148 return name
149 if isinstance(name, str) and name.startswith("&") and func_scope:
150 return f"{func_scope}.{name}"
151 return name
152
153 def _extract_loc(self, meta: Any) -> SourceLoc:
154 """Extract SourceLoc from Lark's meta object."""
155 return SourceLoc(
156 line=meta.line,
157 column=meta.column,
158 end_line=meta.end_line if hasattr(meta, "end_line") else None,
159 end_column=meta.end_column if hasattr(meta, "end_column") else None,
160 )
161
162 def _gen_anon_name(self, func_scope: Optional[str]) -> str:
163 """Generate an anonymous node name, qualified by current scope."""
164 name = f"&__anon_{self._anon_counter}"
165 self._anon_counter += 1
166 return self._qualify_name(name, func_scope)
167
168 def _check_reserved_name(self, name: str, loc: SourceLoc) -> bool:
169 """Check if name is reserved. Return True if reserved (and add error)."""
170 if name in _RESERVED_NAMES:
171 self._errors.append(AssemblyError(
172 loc=loc,
173 category=ErrorCategory.NAME,
174 message=f"Reserved name '{name}' cannot be used as a node definition"
175 ))
176 return True
177 return False
178
179 def _check_duplicate_name(self, name: str, loc: SourceLoc) -> bool:
180 """Check for duplicate definition. Return True if duplicate (and add error)."""
181 if name in self._defined_names:
182 prev_loc = self._defined_names[name]
183 self._errors.append(AssemblyError(
184 loc=loc,
185 category=ErrorCategory.SCOPE,
186 message=f"Duplicate label '{name}'",
187 suggestions=[f"First defined at line {prev_loc.line}"]
188 ))
189 return True
190 self._defined_names[name] = loc
191 return False
192
193 def _process_statements(
194 self,
195 statements: list,
196 func_scope: Optional[str] = None
197 ) -> Tuple[Dict[str, IRNode], List[IREdge], List[IRRegion], List[IRDataDef], List]:
198 """Process a list of statement results and collect them into containers."""
199 nodes = {}
200 edges = []
201 regions = []
202 data_defs = []
203 call_sites = []
204
205 # Reset defined names for this scope
206 prev_defined_names = self._defined_names
207 self._defined_names = {}
208
209 for stmt in statements:
210 if isinstance(stmt, NodeResult):
211 # Qualify and add nodes
212 for node_name, node in stmt.nodes.items():
213 qualified_name = self._qualify_name(node_name, func_scope)
214 if not self._check_duplicate_name(qualified_name, node.loc):
215 # Update node with qualified name
216 qualified_node = replace(node, name=qualified_name)
217 nodes[qualified_name] = qualified_node
218
219 elif isinstance(stmt, EdgeResult):
220 # Qualify and add edges
221 for edge in stmt.edges:
222 qualified_edge = replace(
223 edge,
224 source=self._qualify_name(edge.source, func_scope),
225 dest=self._qualify_name(edge.dest, func_scope),
226 )
227 edges.append(qualified_edge)
228
229 elif isinstance(stmt, CompositeResult):
230 # Composite: both nodes and edges (strong/weak edges)
231 for node_name, node in stmt.nodes.items():
232 qualified_name = self._qualify_name(node_name, func_scope)
233 if not self._check_duplicate_name(qualified_name, node.loc):
234 qualified_node = replace(node, name=qualified_name)
235 nodes[qualified_name] = qualified_node
236 for edge in stmt.edges:
237 qualified_edge = replace(
238 edge,
239 source=self._qualify_name(edge.source, func_scope),
240 dest=self._qualify_name(edge.dest, func_scope),
241 )
242 edges.append(qualified_edge)
243
244 elif isinstance(stmt, FunctionResult):
245 regions.append(stmt.region)
246
247 elif isinstance(stmt, LocationResult):
248 regions.append(stmt.region)
249
250 elif isinstance(stmt, DataDefResult):
251 data_defs.extend(stmt.data_defs)
252
253 elif isinstance(stmt, MacroDefResult):
254 # Macro definitions are stored separately, not as regions
255 pass # Collected at the start() level
256
257 elif isinstance(stmt, MacroCallResult):
258 # Macro calls are stored separately
259 pass # Collected at the start() level
260
261 elif isinstance(stmt, CallSiteResultStatement):
262 # Call sites are stored separately
263 call_sites.append(stmt.call_site_result)
264
265 # Restore defined names
266 self._defined_names = prev_defined_names
267
268 return nodes, edges, regions, data_defs, call_sites
269
270 def start(self, items: list) -> IRGraph:
271 """Process the entire program and return an IRGraph.
272
273 Post-processing: Groups statements following location_dir into that region's body.
274 """
275 # First pass: collect all items
276 nodes, edges, regions, data_defs, call_sites = self._process_statements(items, None)
277
278 # Second pass: post-process location regions to collect subsequent statements
279 # Find LocationResult objects and collect subsequent statements into their body
280 location_results = [r for r in regions if r.kind == RegionKind.LOCATION]
281
282 # Track which nodes, data_defs, and edges are moved into location regions
283 moved_node_names = set()
284 moved_data_names = set()
285 moved_edge_sources = set() # Track edges by (source, dest) tuple
286
287 if location_results:
288 # Build a mapping of location regions to their collected body
289 for loc_region in location_results:
290 # Find the position of this region in the items list
291 # by matching the tag
292 body_nodes = {}
293 body_edges = []
294 body_data_defs = []
295
296 # Collect subsequent non-region statements
297 collecting = False
298 for item in items:
299 if isinstance(item, LocationResult) and item.region.tag == loc_region.tag:
300 collecting = True
301 continue
302
303 if collecting:
304 # Stop at next region boundary
305 if isinstance(item, (FunctionResult, LocationResult)):
306 break
307
308 # Collect into location body
309 if isinstance(item, NodeResult):
310 body_nodes.update(item.nodes)
311 moved_node_names.update(item.nodes.keys())
312 elif isinstance(item, EdgeResult):
313 body_edges.extend(item.edges)
314 moved_edge_sources.update((e.source, e.dest) for e in item.edges)
315 elif isinstance(item, DataDefResult):
316 body_data_defs.extend(item.data_defs)
317 moved_data_names.update(d.name for d in item.data_defs)
318 elif isinstance(item, CompositeResult):
319 body_nodes.update(item.nodes)
320 moved_node_names.update(item.nodes.keys())
321 body_edges.extend(item.edges)
322 moved_edge_sources.update((e.source, e.dest) for e in item.edges)
323
324 # Update the location region with collected body
325 if body_nodes or body_edges or body_data_defs:
326 new_body = IRGraph(
327 nodes=body_nodes,
328 edges=body_edges,
329 regions=[],
330 data_defs=body_data_defs,
331 )
332 # Find and replace this region in the regions list
333 regions = [
334 IRRegion(
335 tag=r.tag,
336 kind=r.kind,
337 body=new_body if r.tag == loc_region.tag else r.body,
338 loc=r.loc,
339 )
340 for r in regions
341 ]
342
343 # Remove items that were moved into location regions from top-level containers
344 nodes = {k: v for k, v in nodes.items() if k not in moved_node_names}
345 data_defs = [d for d in data_defs if d.name not in moved_data_names]
346 edges = [e for e in edges if (e.source, e.dest) not in moved_edge_sources]
347
348 # Collect macro definitions and calls from items
349 macro_defs = []
350 macro_calls = []
351 for item in items:
352 if isinstance(item, MacroDefResult):
353 macro_defs.append(item.macro_def)
354 elif isinstance(item, MacroCallResult):
355 macro_calls.append(item.macro_call)
356
357 return IRGraph(
358 nodes=nodes,
359 edges=edges,
360 regions=regions,
361 data_defs=data_defs,
362 system=self._system,
363 errors=self._errors,
364 macro_defs=macro_defs,
365 macro_calls=macro_calls,
366 raw_call_sites=tuple(call_sites),
367 )
368
369 @v_args(inline=True)
370 def inline_const(self, value) -> Union[int, ParamRef]:
371 """Parse inline constant (space-separated, e.g., 'add 7' or '${param}')."""
372 if isinstance(value, ParamRef):
373 return value
374 return int(str(value), 0)
375
376 @v_args(inline=True, meta=True)
377 def inst_def(self, meta, *args) -> StatementResult:
378 """Process instruction definition."""
379 loc = self._extract_loc(meta)
380
381 # Filter out tokens (FLOW_IN, etc.) - keep only transformed results
382 args_list = _filter_args(args)
383
384 # First arg is qualified_ref_dict, second is opcode, rest are arguments
385 qualified_ref_dict = args_list[0]
386 opcode = args_list[1]
387 remaining_args = args_list[2:] if len(args_list) > 2 else []
388
389 # Extract name (will be qualified later in _process_statements)
390 name = qualified_ref_dict["name"]
391
392 # Check reserved names
393 if self._check_reserved_name(name, loc):
394 return NodeResult({})
395
396 # If opcode is None (invalid), skip node creation (error already added)
397 if opcode is None:
398 return NodeResult({})
399
400 # Extract placement (PE qualifier)
401 pe = None
402 if "placement" in qualified_ref_dict and qualified_ref_dict["placement"]:
403 placement_val = qualified_ref_dict["placement"]
404 if isinstance(placement_val, PlacementRef):
405 pe = placement_val
406 elif isinstance(placement_val, str) and placement_val.startswith("pe"):
407 try:
408 pe = int(placement_val[2:])
409 except ValueError:
410 pass
411
412 # Extract activation slot qualifier
413 act_slot = qualified_ref_dict.get("act_slot")
414
415 # Extract const and named args from arguments
416 # Check if first remaining arg is an inline_const (int directly after opcode)
417 const = None
418 args_dict = {}
419 positional_count = 0
420
421 for arg in remaining_args:
422 if isinstance(arg, tuple): # named_arg
423 arg_name, arg_value = arg
424 args_dict[arg_name] = arg_value
425 else:
426 # positional argument
427 if positional_count == 0:
428 if isinstance(arg, dict) and isinstance(arg.get("name"), ParamRef):
429 const = arg["name"]
430 elif not isinstance(arg, dict):
431 const = arg
432 positional_count += 1
433
434 # Create IRNode
435 node = IRNode(
436 name=name,
437 opcode=opcode,
438 dest_l=None,
439 dest_r=None,
440 const=const,
441 pe=pe,
442 act_slot=act_slot,
443 loc=loc,
444 args=args_dict if args_dict else None,
445 )
446 return NodeResult({name: node})
447
448 @v_args(inline=True, meta=True)
449 def plain_edge(self, meta, *args) -> StatementResult:
450 """Process plain edge (wiring between named nodes).
451
452 The source's port (if specified) becomes source_port (output slot).
453 The dest's port (if specified) becomes port (input port), defaulting to L.
454 """
455 loc = self._extract_loc(meta)
456
457 args_list = _filter_args(args)
458 source_dict = args_list[0]
459 dest_list = args_list[1]
460
461 source_name = source_dict["name"]
462 # Source port is from the source's port specification
463 source_port = source_dict.get("port") if "port" in source_dict else None
464 # Normalize source_port to Port if it's a raw int (convert 0→L, 1→R)
465 if source_port is not None:
466 source_port = _normalize_port(source_port)
467
468 edges = []
469 for dest_dict in dest_list:
470 dest_name = dest_dict["name"]
471 # Dest port is from the dest's port specification, defaults to L
472 raw_port = dest_dict.get("port")
473 port_explicit = raw_port is not None
474 if raw_port is None:
475 dest_port = Port.L
476 else:
477 dest_port = _normalize_port(raw_port)
478
479 edge = IREdge(
480 source=source_name,
481 dest=dest_name,
482 port=dest_port,
483 source_port=source_port,
484 port_explicit=port_explicit,
485 loc=loc,
486 )
487 edges.append(edge)
488
489 return EdgeResult(edges)
490
491 def _wire_anonymous_node(
492 self, opcode: Union[ALUOp, MemOp], inputs: list, outputs: list, loc: SourceLoc,
493 const_value: Optional[int] = None, is_seed: bool = False,
494 ) -> StatementResult:
495 """Wire inputs and outputs for an anonymous edge node.
496
497 Generates the IRNode for an anonymous edge and all associated edges
498 (both input and output wiring). This logic is shared between strong_edge
499 and weak_edge, which differ only in how they parse their arguments.
500
501 Args:
502 opcode: The instruction opcode
503 inputs: List of input reference dicts with "name" and optional "port"
504 outputs: List of output reference dicts with "name" and optional "port"
505 loc: Source location for error reporting
506 const_value: Optional constant value for the node
507 is_seed: If True, mark the node as a seed (no IRAM slot, emits seed token)
508
509 Returns:
510 CompositeResult with anonymous node and all input/output edges
511 """
512 # Generate anonymous node (not qualified yet)
513 anon_name = f"&__anon_{self._anon_counter}"
514 self._anon_counter += 1
515
516 # Create anonymous IRNode
517 anon_node = IRNode(
518 name=anon_name,
519 opcode=opcode,
520 const=const_value,
521 loc=loc,
522 seed=is_seed,
523 )
524
525 # Wire inputs: first input → Port.L, second → Port.R
526 edges = []
527 for idx, input_arg in enumerate(inputs):
528 if isinstance(input_arg, dict) and "name" in input_arg:
529 # It's a qualified_ref
530 input_name = input_arg["name"]
531 input_port = Port.L if idx == 0 else Port.R
532 edge = IREdge(
533 source=input_name,
534 dest=anon_name,
535 port=input_port,
536 source_port=None,
537 loc=loc,
538 )
539 edges.append(edge)
540
541 # Wire outputs
542 for output_dict in outputs:
543 output_name = output_dict["name"]
544 raw_port = output_dict.get("port")
545 out_port_explicit = raw_port is not None
546 if raw_port is None:
547 output_port = Port.L
548 else:
549 output_port = _normalize_port(raw_port)
550
551 edge = IREdge(
552 source=anon_name,
553 dest=output_name,
554 port=output_port,
555 source_port=None,
556 port_explicit=out_port_explicit,
557 loc=loc,
558 )
559 edges.append(edge)
560
561 # Return both the node and edges
562 return CompositeResult({anon_name: anon_node}, edges)
563
564 @v_args(inline=True, meta=True)
565 def strong_edge(self, meta, *args) -> StatementResult:
566 """Process strong inline edge (anonymous node with inputs and outputs).
567
568 Syntax: opcode input [, input ...] |> output [, output ...]
569
570 Special case: `const N |> &dest` creates a seed node — a CONST node
571 that emits a seed token at startup without occupying an IRAM slot.
572 """
573 loc = self._extract_loc(meta)
574
575 args_list = _filter_args(args)
576 opcode = args_list[0]
577 remaining_args = args_list[1:]
578
579 # If opcode is None (invalid), skip edge creation (error already added)
580 if opcode is None:
581 return CompositeResult({}, [])
582
583 # Split arguments into inputs and outputs
584 inputs = []
585 outputs = []
586 processing_outputs = False
587 const_value = None
588
589 for arg in remaining_args:
590 if isinstance(arg, list): # This is ref_list
591 processing_outputs = True
592 outputs = arg
593 elif not processing_outputs:
594 if isinstance(arg, int):
595 const_value = arg
596 else:
597 inputs.append(arg)
598
599 # Detect seed pattern: `const N |> &dest`
600 is_seed = (
601 isinstance(opcode, RoutingOp) and opcode == RoutingOp.CONST
602 and const_value is not None
603 and len(inputs) == 0
604 )
605
606 # Wire the anonymous node and its edges
607 return self._wire_anonymous_node(opcode, inputs, outputs, loc,
608 const_value=const_value, is_seed=is_seed)
609
610 @v_args(inline=True, meta=True)
611 def weak_edge(self, meta, *args) -> StatementResult:
612 """Process weak inline edge (outputs then opcode then inputs).
613
614 Syntax: outputs... opcode inputs...
615 Semantically identical to strong_edge but syntactically reversed.
616 """
617 loc = self._extract_loc(meta)
618
619 args_list = _filter_args(args)
620 output_list = args_list[0]
621 opcode = args_list[1]
622 remaining_args = args_list[2:] if len(args_list) > 2 else []
623
624 # If opcode is None (invalid), skip edge creation (error already added)
625 if opcode is None:
626 return CompositeResult({}, [])
627
628 inputs = list(remaining_args)
629 outputs = output_list
630
631 # Wire the anonymous node and its edges
632 return self._wire_anonymous_node(opcode, inputs, outputs, loc)
633
634 def func_def(self, args: list) -> StatementResult:
635 """Process function definition (region with nested scope)."""
636 # Without v_args decorator, args come as a list with LarkToken terminals mixed in
637 # Filter out tokens and extract the actual data
638 args_list = _filter_args(args)
639
640 # args[0] is func_ref dict, rest are statement results
641 func_ref_dict = args_list[0] if args_list else {}
642 func_name = func_ref_dict.get("name", "$unknown") if isinstance(func_ref_dict, dict) else "$unknown"
643 statement_results = args_list[1:] if len(args_list) > 1 else []
644
645 # Try to extract location from the raw args (may have meta on Tree nodes)
646 loc = SourceLoc(0, 0)
647 for arg in args:
648 if hasattr(arg, 'meta'):
649 try:
650 loc = self._extract_loc(arg.meta)
651 break
652 except (AttributeError, TypeError):
653 pass
654
655 # Process the statements with the function scope
656 func_nodes, func_edges, func_regions, func_data_defs, func_call_sites = self._process_statements(
657 statement_results,
658 func_scope=func_name
659 )
660
661 # Collect macro_calls from function body statements
662 func_macro_calls = []
663 for stmt in statement_results:
664 if isinstance(stmt, MacroCallResult):
665 func_macro_calls.append(stmt.macro_call)
666
667 # Create IRRegion for the function
668 body_graph = IRGraph(
669 nodes=func_nodes,
670 edges=func_edges,
671 regions=func_regions,
672 data_defs=func_data_defs,
673 macro_calls=func_macro_calls,
674 raw_call_sites=tuple(func_call_sites),
675 )
676
677 region = IRRegion(
678 tag=func_name,
679 kind=RegionKind.FUNCTION,
680 body=body_graph,
681 loc=loc,
682 )
683
684 return FunctionResult(region)
685
686 def _apply_paste_patterns(self, body: IRGraph) -> IRGraph:
687 """Post-process macro body to replace ${param} patterns with ParamRef.
688
689 Scans all node names and edge endpoints in the body for ${param} patterns
690 and constructs ParamRef instances with appropriate prefix/suffix fields.
691 This post-processing approach avoids the bottom-up traversal issue where
692 Lark processes node_ref/label_ref terminals before macro_def is invoked.
693
694 Args:
695 body: The constructed IRGraph from macro body processing
696
697 Returns:
698 New IRGraph with all ${param} patterns replaced by ParamRef instances
699 """
700 # Process all nodes to replace ${param} patterns in their names
701 new_nodes = {}
702 for node_name, node in body.nodes.items():
703 match = _PASTE_PATTERN.match(node.name)
704 if match:
705 # Node name contains ${param} pattern
706 new_name = ParamRef(
707 param=match.group(2),
708 prefix=match.group(1),
709 suffix=match.group(3),
710 )
711 new_nodes[node_name] = replace(node, name=new_name)
712 else:
713 new_nodes[node_name] = node
714
715 # Process all edges to replace ${param} patterns in source/dest
716 new_edges = []
717 for edge in body.edges:
718 new_source = edge.source
719 new_dest = edge.dest
720
721 # Check source for pattern
722 if isinstance(edge.source, str):
723 match = _PASTE_PATTERN.match(edge.source)
724 if match:
725 new_source = ParamRef(
726 param=match.group(2),
727 prefix=match.group(1),
728 suffix=match.group(3),
729 )
730
731 # Check dest for pattern
732 if isinstance(edge.dest, str):
733 match = _PASTE_PATTERN.match(edge.dest)
734 if match:
735 new_dest = ParamRef(
736 param=match.group(2),
737 prefix=match.group(1),
738 suffix=match.group(3),
739 )
740
741 # Add edge with potential replacements
742 if new_source != edge.source or new_dest != edge.dest:
743 new_edges.append(replace(edge, source=new_source, dest=new_dest))
744 else:
745 new_edges.append(edge)
746
747 # Return new IRGraph with updated nodes and edges
748 return replace(body, nodes=new_nodes, edges=new_edges)
749
750 @v_args(meta=True)
751 def macro_def(self, meta, args: list) -> StatementResult:
752 """Process macro definition (template with parameters).
753
754 Uses @v_args(meta=True) to receive source location metadata.
755 """
756 # Extract macro name from first IDENT terminal (before filtering)
757 macro_name = "unknown"
758 for arg in args:
759 if isinstance(arg, LarkToken):
760 macro_name = str(arg)
761 break
762
763 # Extract location from meta
764 loc = self._extract_loc(meta)
765
766 # Check for reserved name (starts with "ret")
767 if macro_name.startswith("ret"):
768 self._errors.append(AssemblyError(
769 loc=loc,
770 category=ErrorCategory.NAME,
771 message=f"Macro name '#{macro_name}' uses reserved prefix 'ret'",
772 ))
773 return MacroDefResult(MacroDef(name=macro_name, params=(), body=IRGraph(), loc=loc))
774
775 # Separate params from body statements
776 params: list[MacroParam] = []
777 statement_results: list = []
778 variadic_param_name: Optional[str] = None
779
780 for item in args:
781 if isinstance(item, list) and all(isinstance(p, tuple) and len(p) == 2 for p in item):
782 # This is the macro_params result (list of (name, variadic) tuples)
783 seen_names: set[str] = set()
784 for param_name, is_variadic in item:
785 if param_name in seen_names:
786 self._errors.append(AssemblyError(
787 loc=loc,
788 category=ErrorCategory.NAME,
789 message=f"Duplicate parameter name '{param_name}' in macro '#{macro_name}'",
790 ))
791 else:
792 seen_names.add(param_name)
793 if is_variadic:
794 # Validate: variadic param must be last
795 if variadic_param_name is not None:
796 self._errors.append(AssemblyError(
797 loc=loc,
798 category=ErrorCategory.NAME,
799 message=f"Multiple variadic parameters in macro '#{macro_name}' (only one allowed)",
800 ))
801 variadic_param_name = param_name
802 elif variadic_param_name is not None:
803 # Non-variadic param after variadic param
804 self._errors.append(AssemblyError(
805 loc=loc,
806 category=ErrorCategory.NAME,
807 message=f"Variadic parameter must be last in macro '#{macro_name}'",
808 ))
809 params.append(MacroParam(name=param_name, variadic=is_variadic))
810 elif isinstance(item, StatementResult):
811 statement_results.append(item)
812
813 # Process body statements (no function scope — macros don't create ctx scopes)
814 body_nodes, body_edges, body_regions, body_data_defs, body_call_sites = self._process_statements(
815 statement_results,
816 func_scope=None
817 )
818
819 # Collect macro_calls and repetition_blocks from body statements
820 body_macro_calls = []
821 repetition_blocks = []
822 for stmt in statement_results:
823 if isinstance(stmt, MacroCallResult):
824 body_macro_calls.append(stmt.macro_call)
825 elif isinstance(stmt, RepetitionBlockResult):
826 # Update variadic_param in the repetition block if we have a variadic param
827 rep_block = stmt.repetition_block
828 if variadic_param_name and rep_block.variadic_param == "":
829 # Replace the placeholder with the actual variadic param name
830 rep_block = replace(rep_block, variadic_param=variadic_param_name)
831 repetition_blocks.append(rep_block)
832
833 body = IRGraph(
834 nodes=body_nodes,
835 edges=body_edges,
836 regions=body_regions,
837 data_defs=body_data_defs,
838 macro_calls=body_macro_calls,
839 raw_call_sites=tuple(body_call_sites),
840 )
841
842 # Post-process to apply ${param} token pasting patterns
843 body = self._apply_paste_patterns(body)
844
845 macro = MacroDef(
846 name=macro_name,
847 params=tuple(params),
848 body=body,
849 repetition_blocks=repetition_blocks,
850 loc=loc,
851 )
852
853 return MacroDefResult(macro)
854
855 def macro_params(self, args: list) -> list[tuple]:
856 """Process macro parameter list.
857
858 Returns list of (name, variadic) tuples.
859
860 Note: Comma tokens and other non-tuple/string types from the
861 grammar are silently skipped during iteration.
862 """
863 result = []
864 for arg in args:
865 if isinstance(arg, tuple):
866 # From macro_param rule (variadic_param or regular_param)
867 result.append(arg)
868 elif isinstance(arg, str):
869 # Fallback for simple string params
870 result.append((arg, False))
871 # Other token types (commas) are silently skipped
872 return result
873
874 def variadic_param(self, args: list) -> tuple:
875 """Process a variadic macro parameter (*name).
876
877 Returns (name, True) tuple.
878 """
879 # args will be [VARIADIC_token, IDENT_token]
880 # IDENT is always the last token per the grammar rule
881 name = str(args[-1])
882 return (name, True)
883
884 def regular_param(self, args: list) -> tuple:
885 """Process a regular macro parameter (name).
886
887 Returns (name, False) tuple.
888 """
889 # args will be [IDENT_token]
890 if args:
891 name = str(args[0].value if hasattr(args[0], 'value') else args[0])
892 else:
893 name = "unknown"
894 return (name, False)
895
896 @v_args(meta=True)
897 def repetition_block(self, meta, args: list) -> StatementResult:
898 """Process repetition block: $( body ),*.
899
900 The repetition block syntax within macro bodies will be expanded
901 in the expand pass. Here we collect the body as an IRGraph.
902
903 Creates an IRRepetitionBlock with an empty string placeholder for
904 variadic_param. The placeholder will be resolved during macro_def
905 processing by matching against the macro's actual variadic parameter.
906 """
907 loc = self._extract_loc(meta)
908
909 # Filter statement results from args
910 statement_results = [arg for arg in args if isinstance(arg, StatementResult)]
911
912 # Process body statements
913 body_nodes, body_edges, body_regions, body_data_defs, body_call_sites = self._process_statements(
914 statement_results,
915 func_scope=None
916 )
917
918 body = IRGraph(
919 nodes=body_nodes,
920 edges=body_edges,
921 regions=body_regions,
922 data_defs=body_data_defs,
923 raw_call_sites=tuple(body_call_sites),
924 )
925
926 # Apply token pasting patterns to the body
927 body = self._apply_paste_patterns(body)
928
929 # Create a placeholder IRRepetitionBlock
930 # The variadic_param will be resolved in the expand pass
931 # For now, use empty string as a placeholder
932 rep_block = IRRepetitionBlock(
933 body=body,
934 variadic_param="", # Placeholder, resolved in expand pass
935 loc=loc,
936 )
937
938 return RepetitionBlockResult(rep_block)
939
940 @v_args(meta=True)
941 def macro_call_stmt(self, meta, args: list) -> StatementResult:
942 """Process standalone macro invocation."""
943 loc = self._extract_loc(meta)
944
945 # Extract macro name from first IDENT terminal
946 macro_name = "unknown"
947 for arg in args:
948 if isinstance(arg, LarkToken):
949 macro_name = str(arg)
950 break
951
952 positional_args = []
953 named_args: dict[str, object] = {}
954 output_dests = ()
955 found_name = False
956 for item in args:
957 if isinstance(item, LarkToken):
958 if not found_name:
959 # First LarkToken is the macro name
960 found_name = True
961 continue
962 if item.type in ("OPCODE", "IDENT"):
963 # Bare opcode or identifier as macro argument — wrap as string
964 positional_args.append(str(item))
965 continue
966 # Skip other tokens (FLOW_OUT, commas, etc.)
967 continue
968 elif isinstance(item, list) and all(isinstance(x, dict) for x in item):
969 # call_output_list result — list of output dest dicts
970 output_dests = tuple(item)
971 elif isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str):
972 # Named argument from named_arg rule (name, value)
973 named_args[item[0]] = item[1]
974 elif isinstance(item, dict) and "name" in item:
975 # Positional argument (qualified_ref or value)
976 positional_args.append(item)
977 elif item is not None:
978 # Other argument types (int literals, etc.)
979 positional_args.append(item)
980
981 macro_call = IRMacroCall(
982 name=macro_name,
983 positional_args=tuple(positional_args),
984 named_args=tuple(named_args.items()),
985 output_dests=output_dests,
986 loc=loc,
987 )
988
989 return MacroCallResult(macro_call)
990
991 @v_args(meta=True)
992 def call_stmt(self, meta, args: list) -> StatementResult:
993 """Process function call statement.
994
995 The call_stmt grammar rule is:
996 call_stmt: func_ref argument ("," argument)* FLOW_OUT call_output_list
997
998 Args are: [func_ref_dict, arg1, arg2, ..., call_output_list]
999 """
1000 loc = self._extract_loc(meta)
1001
1002 # Filter out LarkTokens (FLOW_OUT)
1003 args_list = _filter_args(args)
1004
1005 if not args_list:
1006 self._errors.append(AssemblyError(
1007 loc=loc,
1008 category=ErrorCategory.PARSE,
1009 message="call_stmt requires function name and arguments"
1010 ))
1011 return CallSiteResultStatement(CallSiteResult(
1012 func_name="$unknown",
1013 input_args=(),
1014 output_dests=(),
1015 loc=loc,
1016 ))
1017
1018 # First arg is func_ref dict
1019 func_ref_dict = args_list[0]
1020 func_name = func_ref_dict.get("name", "$unknown")
1021
1022 # Process remaining args: arguments come before output_dests
1023 # We need to find where call_output_list starts (it's a list of dicts/named outputs)
1024 input_args = []
1025 output_dests = []
1026
1027 for i, item in enumerate(args_list[1:], start=1):
1028 if isinstance(item, list):
1029 # This is call_output_list result — flatten into output_dests
1030 output_dests.extend(item)
1031 elif isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str):
1032 # named_arg: (name, value)
1033 input_args.append(item)
1034 elif isinstance(item, dict) and "name" in item:
1035 # positional_arg (qualified_ref)
1036 input_args.append((None, item)) # Store as (None, ref_dict) for positional
1037 elif isinstance(item, int):
1038 # literal value
1039 input_args.append((None, item))
1040 else:
1041 # Fallback: treat as positional value
1042 input_args.append((None, item))
1043
1044 call_site = CallSiteResult(
1045 func_name=func_name,
1046 input_args=tuple(input_args),
1047 output_dests=tuple(output_dests),
1048 loc=loc,
1049 )
1050
1051 return CallSiteResultStatement(call_site)
1052
1053 def call_output_list(self, args: list) -> list:
1054 """Process call output list — returns list of output dests."""
1055 return [a for a in args if a is not None]
1056
1057 @v_args(inline=True)
1058 def named_output(self, name_tok, ref) -> dict:
1059 """Process named output: name=@dest.
1060 Returns {"name": str, "ref": ref_dict} so the expand pass can map
1061 @ret_name return markers to the specified call-site destination.
1062 """
1063 # name_tok could be a LarkToken
1064 if isinstance(name_tok, LarkToken):
1065 name_str = str(name_tok)
1066 else:
1067 name_str = name_tok
1068 return {"name": name_str, "ref": ref}
1069
1070 @v_args(inline=True)
1071 def positional_output(self, ref) -> dict:
1072 """Process positional output: bare @dest or &ref."""
1073 return ref
1074
1075 def macro_ref(self, args: list) -> dict:
1076 """Process macro reference (#name)."""
1077 token = args[0]
1078 return {"name": f"#{token}"}
1079
1080 def scoped_ref(self, args: list) -> dict:
1081 """Process dot-notation scope reference ($func.&label or #macro.&label)."""
1082 args_list = _filter_args(args)
1083 scope_dict = args_list[0] # func_ref or macro_ref dict
1084 inner_dict = args_list[1] # label_ref or node_ref dict
1085 scope_name = scope_dict["name"]
1086 inner_name = inner_dict["name"]
1087 return {"name": f"{scope_name}.{inner_name}"}
1088
1089 @v_args(inline=True, meta=True)
1090 def data_def(self, meta, *args) -> StatementResult:
1091 """Process data definition."""
1092 loc = self._extract_loc(meta)
1093
1094 args_list = _filter_args(args)
1095 qualified_ref_dict = args_list[0]
1096 value_data = args_list[1] if len(args_list) > 1 else None
1097
1098 name = qualified_ref_dict["name"]
1099
1100 # Extract SM ID from placement
1101 sm_id = None
1102 if "placement" in qualified_ref_dict and qualified_ref_dict["placement"]:
1103 placement_val = qualified_ref_dict["placement"]
1104 if isinstance(placement_val, str) and placement_val.startswith("sm"):
1105 try:
1106 sm_id = int(placement_val[2:])
1107 except ValueError:
1108 pass
1109
1110 # Extract cell address from port
1111 # The port value from qualified_ref can be:
1112 # - Port.L/Port.R (for plain edge context)
1113 # - raw int (for data_def context, e.g., :0, :1, :2, etc.)
1114 cell_addr = None
1115 if "port" in qualified_ref_dict and qualified_ref_dict["port"] is not None:
1116 port_val = qualified_ref_dict["port"]
1117 # Extract the numeric value regardless of type
1118 if isinstance(port_val, Port):
1119 cell_addr = int(port_val)
1120 elif isinstance(port_val, int):
1121 cell_addr = port_val
1122
1123 # Handle value_data
1124 value = 0
1125 if isinstance(value_data, list):
1126 # value_list: pack values
1127 if all(isinstance(v, int) for v in value_data):
1128 # Integer values or char values
1129 if len(value_data) == 1:
1130 value = value_data[0]
1131 else:
1132 # Multiple values: only valid if all are bytes (0-255)
1133 if any(v > 255 for v in value_data):
1134 self._errors.append(AssemblyError(
1135 loc=loc,
1136 category=ErrorCategory.VALUE,
1137 message=f"Multi-value data definition cannot contain values > 255. "
1138 f"Data defs support either a single 16-bit value OR multiple byte-values packed into one word.",
1139 ))
1140 value = value_data[0] # Use first value as fallback
1141 else:
1142 # All bytes: take the already-packed value from value_list
1143 value = value_data[0] # value_list already packs consecutive pairs
1144 else:
1145 value = value_data
1146
1147 data_def = IRDataDef(
1148 name=name,
1149 sm_id=sm_id,
1150 cell_addr=cell_addr,
1151 value=value,
1152 loc=loc,
1153 )
1154
1155 return DataDefResult([data_def])
1156
1157 @v_args(inline=True, meta=True)
1158 def location_dir(self, meta, *args) -> StatementResult:
1159 """Process location directive."""
1160 loc = self._extract_loc(meta)
1161
1162 args_list = _filter_args(args)
1163 qualified_ref_dict = args_list[0]
1164
1165 tag = qualified_ref_dict["name"]
1166
1167 # Create region for location
1168 region = IRRegion(
1169 tag=tag,
1170 kind=RegionKind.LOCATION,
1171 body=IRGraph(),
1172 loc=loc,
1173 )
1174
1175 return LocationResult(region)
1176
1177 @v_args(inline=True, meta=True)
1178 def system_pragma(self, meta, *params) -> Optional[StatementResult]:
1179 """Process @system pragma."""
1180 loc = self._extract_loc(meta)
1181
1182 # Filter out tokens
1183 params_list = _filter_args(params)
1184
1185 # Check for duplicate @system pragma
1186 if self._system is not None:
1187 self._errors.append(AssemblyError(
1188 loc=loc,
1189 category=ErrorCategory.PARSE,
1190 message="Duplicate @system pragma",
1191 ))
1192 return None
1193
1194 # params are (name, value) tuples from system_param
1195 config_dict = {}
1196 for param_tuple in params_list:
1197 if isinstance(param_tuple, tuple):
1198 param_name, param_value = param_tuple
1199 config_dict[param_name] = param_value
1200
1201 # Map parameter names
1202 pe_count = config_dict.get("pe")
1203 sm_count = config_dict.get("sm")
1204 iram_capacity = config_dict.get("iram", 256)
1205 frame_count = config_dict.get("frames", 8)
1206
1207 if pe_count is None or sm_count is None:
1208 self._errors.append(AssemblyError(
1209 loc=loc,
1210 category=ErrorCategory.PARSE,
1211 message="@system pragma requires at least 'pe' and 'sm' parameters",
1212 ))
1213 return None
1214
1215 self._system = SystemConfig(
1216 pe_count=pe_count,
1217 sm_count=sm_count,
1218 iram_capacity=iram_capacity,
1219 frame_count=frame_count,
1220 loc=loc,
1221 )
1222 return None # Don't return a StatementResult for pragmas
1223
1224 @v_args(inline=True)
1225 def system_param(self, param_name: LarkToken, value) -> tuple[str, int]:
1226 """Process @system parameter."""
1227 # value can be a token (DEC_LIT or HEX_LIT) or already an int
1228 if isinstance(value, LarkToken):
1229 value = int(str(value), 0) # 0 base handles both decimal and 0x hex
1230 return (str(param_name), value)
1231
1232 @v_args(inline=True)
1233 def opcode(self, token) -> Optional[Union[ALUOp, MemOp, ParamRef]]:
1234 """Map opcode token to ALUOp/MemOp enum, ParamRef, or None if invalid."""
1235 if isinstance(token, ParamRef):
1236 return token
1237 mnemonic = str(token)
1238 if mnemonic not in MNEMONIC_TO_OP:
1239 self._errors.append(AssemblyError(
1240 loc=SourceLoc(line=token.line, column=token.column),
1241 category=ErrorCategory.PARSE,
1242 message=f"Unknown opcode '{mnemonic}'",
1243 ))
1244 return None
1245
1246 return MNEMONIC_TO_OP[mnemonic]
1247
1248 @v_args(inline=True)
1249 def qualified_ref(self, *args) -> dict:
1250 """Collect qualified reference components into a dict."""
1251 ref_type = None
1252 placement = None
1253 act_slot = None
1254 port = None
1255
1256 for arg in args:
1257 if isinstance(arg, PlacementRef):
1258 placement = arg
1259 elif isinstance(arg, PortRef):
1260 port = arg
1261 elif isinstance(arg, (ActSlotRef, ActSlotRange)):
1262 act_slot = arg
1263 elif isinstance(arg, (Port, int)):
1264 port = arg
1265 elif isinstance(arg, ParamRef):
1266 ref_type = {"name": arg}
1267 elif isinstance(arg, dict):
1268 ref_type = arg
1269 elif isinstance(arg, str) and (arg.startswith("pe") or arg.startswith("sm")):
1270 placement = arg
1271
1272 result = ref_type.copy() if ref_type else {}
1273 if placement is not None:
1274 result["placement"] = placement
1275 if act_slot is not None:
1276 result["act_slot"] = act_slot
1277 if port is not None:
1278 result["port"] = port
1279
1280 return result
1281
1282 @v_args(inline=True)
1283 def node_ref(self, token: LarkToken) -> dict:
1284 """Process @name reference."""
1285 return {"name": f"@{token}"}
1286
1287 @v_args(inline=True)
1288 def label_ref(self, token: LarkToken) -> dict:
1289 """Process &name reference."""
1290 return {"name": f"&{token}"}
1291
1292 @v_args(inline=True)
1293 def func_ref(self, token: LarkToken) -> dict:
1294 """Process $name reference."""
1295 return {"name": f"${token}"}
1296
1297 def param_ref(self, args: list) -> Union[ParamRef, dict]:
1298 """Process ${name} macro parameter reference.
1299
1300 Returns ParamRef directly. When used in qualified_ref context,
1301 the qualified_ref handler wraps it in a dict.
1302 """
1303 name = str(args[-1])
1304 return ParamRef(param=name)
1305
1306 @v_args(inline=True)
1307 def placement(self, token) -> Union[str, PlacementRef]:
1308 """Extract placement specifier."""
1309 if isinstance(token, ParamRef):
1310 return PlacementRef(param=token)
1311 return str(token)
1312
1313 def ctx_slot(self, args: list):
1314 """Extract context slot specifier.
1315
1316 Always returns a typed wrapper (ActSlotRef, ActSlotRange) so
1317 qualified_ref can distinguish ctx_slot ints from port ints.
1318 """
1319 if len(args) == 1:
1320 arg = args[0]
1321 if isinstance(arg, ParamRef):
1322 return ActSlotRef(param=arg)
1323 if isinstance(arg, ActSlotRange):
1324 return arg
1325 n = int(str(arg))
1326 return ActSlotRange(start=n, end=n)
1327 return args[0]
1328
1329 def ctx_range(self, args: list) -> ActSlotRange:
1330 """Extract context slot range (start..end)."""
1331 return ActSlotRange(start=int(str(args[0])), end=int(str(args[1])))
1332
1333 @v_args(inline=True)
1334 def port(self, token) -> Union[Port, int, PortRef]:
1335 """Convert port specifier to Port enum, raw int, or PortRef.
1336
1337 Returns:
1338 Port.L for "L"
1339 Port.R for "R"
1340 Raw int for numeric values (e.g., cell address in data_def)
1341 PortRef for param_ref
1342 """
1343 if isinstance(token, ParamRef):
1344 return PortRef(param=token)
1345 spec = str(token)
1346 if spec == "L":
1347 return Port.L
1348 elif spec == "R":
1349 return Port.R
1350 else:
1351 try:
1352 return int(spec)
1353 except ValueError:
1354 return Port.L
1355
1356 @v_args(inline=True)
1357 def hex_literal(self, token: LarkToken) -> int:
1358 """Parse hexadecimal literal."""
1359 return int(str(token), 16)
1360
1361 @v_args(inline=True)
1362 def dec_literal(self, token: LarkToken) -> int:
1363 """Parse decimal literal."""
1364 return int(str(token))
1365
1366 def _process_escape_sequences(self, s: str) -> list[int]:
1367 """Process escape sequences in a string.
1368
1369 Handles: \\n, \\t, \\r, \\0, \\\\, \\\', \\x##
1370
1371 Args:
1372 s: String with potential escape sequences
1373
1374 Returns:
1375 List of character codes
1376 """
1377 result = []
1378 i = 0
1379 while i < len(s):
1380 if i + 1 < len(s) and s[i] == "\\":
1381 next_char = s[i + 1]
1382 if next_char == "n":
1383 result.append(ord("\n"))
1384 i += 2
1385 elif next_char == "t":
1386 result.append(ord("\t"))
1387 i += 2
1388 elif next_char == "r":
1389 result.append(ord("\r"))
1390 i += 2
1391 elif next_char == "0":
1392 result.append(0)
1393 i += 2
1394 elif next_char == "\\":
1395 result.append(ord("\\"))
1396 i += 2
1397 elif next_char == "'":
1398 result.append(ord("'"))
1399 i += 2
1400 elif next_char == '"':
1401 result.append(ord('"'))
1402 i += 2
1403 elif next_char == "x" and i + 3 < len(s):
1404 # Hex escape: \xHH
1405 hex_str = s[i + 2:i + 4]
1406 try:
1407 result.append(int(hex_str, 16))
1408 i += 4
1409 except ValueError:
1410 # Invalid hex, just include the character
1411 result.append(ord(s[i]))
1412 i += 1
1413 else:
1414 # Unknown escape, just include the character
1415 result.append(ord(s[i]))
1416 i += 1
1417 else:
1418 result.append(ord(s[i]))
1419 i += 1
1420 return result
1421
1422 @v_args(inline=True)
1423 def char_literal(self, token: LarkToken) -> int:
1424 """Parse character literal."""
1425 s = str(token)
1426 # Remove surrounding quotes
1427 s = s[1:-1]
1428 # Handle escape sequences
1429 if s == "\\n":
1430 return ord("\n")
1431 elif s == "\\t":
1432 return ord("\t")
1433 elif s == "\\r":
1434 return ord("\r")
1435 elif s == "\\0":
1436 return 0
1437 elif s == "\\\\":
1438 return ord("\\")
1439 elif s == "\\'":
1440 return ord("'")
1441 elif s.startswith("\\x"):
1442 return int(s[2:], 16)
1443 else:
1444 return ord(s[0])
1445
1446 @v_args(inline=True)
1447 def string_literal(self, token: LarkToken) -> list[int]:
1448 """Parse string literal (returns list of character codes)."""
1449 s = str(token)[1:-1] # Remove quotes
1450 return self._process_escape_sequences(s)
1451
1452 @v_args(inline=True)
1453 def raw_string_literal(self, token: LarkToken) -> list[int]:
1454 """Parse raw string literal (no escape processing)."""
1455 s = str(token)[2:-1] # Remove r" and "
1456 return [ord(c) for c in s]
1457
1458 @v_args(inline=True)
1459 def byte_string_literal(self, token: LarkToken) -> list[int]:
1460 """Parse byte string literal."""
1461 s = str(token)[2:-1] # Remove b" and "
1462 return self._process_escape_sequences(s)
1463
1464 @v_args(inline=True)
1465 def named_arg(self, arg_name: LarkToken, value: Any) -> tuple[str, Any]:
1466 """Process named argument."""
1467 return (str(arg_name), value)
1468
1469 @v_args(inline=True)
1470 def ref_list(self, *refs) -> list[dict]:
1471 """Collect reference list."""
1472 return list(refs)
1473
1474 @v_args(inline=True)
1475 def value_list(self, *values) -> list[int]:
1476 """Collect value list and pack multi-char values big-endian.
1477
1478 - Hex/dec literals: returned as single values (not packed)
1479 - Multiple char values: packed big-endian into 16-bit words
1480 - String/list data: chars extracted and packed
1481 """
1482 # Flatten values (strings return lists of char codes)
1483 result = []
1484 for value in values:
1485 if isinstance(value, list):
1486 # String data from string_literal, etc.
1487 result.extend(value)
1488 else:
1489 # Single value (char or hex/dec literal)
1490 result.append(value)
1491
1492 # Only pack if we have multiple values (char pairs) AND all are bytes
1493 if len(result) <= 1:
1494 # Single value: return as-is (whether hex literal or single char)
1495 return result
1496
1497 all_bytes = all(0 <= v <= 255 for v in result)
1498 if not all_bytes:
1499 # Mixed or large values, return as-is
1500 return result
1501
1502 # Multiple bytes: pack consecutive pairs big-endian
1503 packed = []
1504 i = 0
1505 while i < len(result):
1506 if i + 1 < len(result):
1507 # Two bytes: big-endian
1508 val = (result[i] << 8) | result[i + 1]
1509 packed.append(val)
1510 i += 2
1511 else:
1512 # Single byte: pad with 0 in low byte
1513 val = (result[i] << 8) | 0x00
1514 packed.append(val)
1515 i += 1
1516
1517 return packed
1518
1519
1520def lower(tree) -> IRGraph:
1521 """Lower a parse tree into an IRGraph.
1522
1523 Args:
1524 tree: A Lark parse tree from parsing dfasm source
1525
1526 Returns:
1527 An IRGraph with nodes, edges, regions, and any errors encountered
1528 """
1529 transformer = LowerTransformer()
1530 return transformer.transform(tree)