A Python port of the Invisible Internet Project (I2P)
1"""SAM v3.3 text protocol parser and formatter.
2
3Ported from net.i2p.sam.SAMUtils and related classes.
4"""
5
6from __future__ import annotations
7
8from i2p_sam.utils import parse_params
9
10
# SAM protocol constants.
# Protocol versions listed oldest first; the bounds below are simply the
# first and last entries of this list.
SUPPORTED_VERSIONS = ["3.0", "3.1", "3.2", "3.3"]
MIN_VERSION = SUPPORTED_VERSIONS[0]
MAX_VERSION = SUPPORTED_VERSIONS[-1]
15
16
class SAMCommand:
    """Parsed SAM command with verb, opcode, and key=value params.

    SAM protocol messages have the form:
        VERB OPCODE KEY=VALUE KEY=VALUE ...
    For example:
        HELLO VERSION MIN=3.0 MAX=3.3
        SESSION CREATE ID=test STYLE=STREAM DESTINATION=TRANSIENT

    Attributes:
        verb: Command verb (uppercased when produced by ``parse``).
        opcode: Second bare token of the line, or ``""`` when absent.
        params: Mapping of parameter keys to their values.
    """

    def __init__(self, verb: str, opcode: str, params: dict[str, str]) -> None:
        """Store the three components of a command unchanged."""
        self.verb = verb
        self.opcode = opcode
        self.params = params

    @classmethod
    def parse(cls, line: str) -> "SAMCommand":
        """Parse a 'VERB OPCODE KEY=VALUE KEY=VALUE ...' text line.

        The verb is uppercased; the opcode is kept exactly as received.
        When the token after the verb contains ``=`` it is treated as the
        first parameter and the opcode is left empty. Parameter text is
        handed to ``parse_params`` verbatim, so quoting and key handling
        are that helper's responsibility.

        Args:
            line: Raw text line from SAM protocol.

        Returns:
            Parsed SAMCommand instance.

        Raises:
            ValueError: If the line is empty or whitespace-only.
                ``parse_params`` may raise for malformed parameters.
        """
        stripped = line.strip()
        if not stripped:
            raise ValueError("Empty SAM command")

        # Split off the verb only. Everything after it stays untouched so
        # that whitespace inside quoted values is never rewritten.
        verb, *tail = stripped.split(None, 1)
        verb = verb.upper()
        if not tail:
            return cls(verb, "", {})

        rest = tail[0]
        first, *after = rest.split(None, 1)

        if "=" in first:
            # No opcode present: the whole remainder is parameters. Pass the
            # original text rather than re-joining split pieces, which would
            # collapse runs of whitespace inside a quoted first value.
            return cls(verb, "", parse_params(rest))

        if not after:
            return cls(verb, first, {})

        return cls(verb, first, parse_params(after[0]))

    def __str__(self) -> str:
        """Format back to SAM text protocol.

        Values containing whitespace or a double quote are wrapped in
        double quotes, with backslashes and double quotes inside them
        backslash-escaped. Other values are emitted bare.
        """
        parts = [self.verb]
        if self.opcode:
            parts.append(self.opcode)
        for key, value in self.params.items():
            needs_quotes = '"' in value or any(ch.isspace() for ch in value)
            if needs_quotes:
                # Escape backslashes first so the escapes added for quotes
                # are not themselves doubled.
                escaped = value.replace("\\", "\\\\").replace('"', '\\"')
                parts.append(f'{key}="{escaped}"')
            else:
                parts.append(f"{key}={value}")
        return " ".join(parts)

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return f"SAMCommand({self.verb!r}, {self.opcode!r}, {self.params!r})"
93
94
class SAMReply:
    """Format SAM reply messages.

    All reply methods return newline-terminated strings ready to send
    over the wire.
    """

    @staticmethod
    def _status(prefix: str, result: str, message: str) -> str:
        """Build '<prefix> RESULT=<result>' with an optional quoted MESSAGE.

        Args:
            prefix: Leading words of the reply (e.g. "SESSION STATUS").
            result: Result code placed after ``RESULT=``.
            message: Human-readable text; omitted from the reply when empty.

        Returns:
            The newline-terminated reply line.
        """
        line = f"{prefix} RESULT={result}"
        if message:
            # A reply must stay on a single line, so line breaks inside the
            # message are flattened to spaces.
            text = " ".join(message.splitlines())
            # Escape backslashes before quotes so the added escapes are not
            # doubled; this keeps the quoted value well-formed.
            text = text.replace("\\", "\\\\").replace('"', '\\"')
            line += f' MESSAGE="{text}"'
        return line + "\n"

    @staticmethod
    def hello_ok(version: str = "3.3") -> str:
        """HELLO REPLY with OK result."""
        return f"HELLO REPLY RESULT=OK VERSION={version}\n"

    @staticmethod
    def hello_noversion() -> str:
        """HELLO REPLY with NOVERSION result."""
        return "HELLO REPLY RESULT=NOVERSION\n"

    @staticmethod
    def session_ok(destination: str) -> str:
        """SESSION STATUS with OK result and destination."""
        return f"SESSION STATUS RESULT=OK DESTINATION={destination}\n"

    @staticmethod
    def session_error(result: str, message: str = "") -> str:
        """SESSION STATUS with error result.

        Args:
            result: Error code (e.g. DUPLICATED_ID, INVALID_KEY).
            message: Optional human-readable message. Embedded double
                quotes and backslashes are escaped and line breaks are
                replaced by spaces.
        """
        return SAMReply._status("SESSION STATUS", result, message)

    @staticmethod
    def stream_ok() -> str:
        """STREAM STATUS with OK result."""
        return "STREAM STATUS RESULT=OK\n"

    @staticmethod
    def stream_error(result: str, message: str = "") -> str:
        """STREAM STATUS with error result.

        Args:
            result: Error code (e.g. CANT_REACH_PEER, TIMEOUT).
            message: Optional human-readable message. Embedded double
                quotes and backslashes are escaped and line breaks are
                replaced by spaces.
        """
        return SAMReply._status("STREAM STATUS", result, message)

    @staticmethod
    def dest_reply(name: str, destination: str) -> str:
        """DEST REPLY with found destination."""
        return f"DEST REPLY RESULT=OK NAME={name} DESTINATION={destination}\n"

    @staticmethod
    def dest_not_found(name: str) -> str:
        """DEST REPLY with KEY_NOT_FOUND result."""
        return f"DEST REPLY RESULT=KEY_NOT_FOUND NAME={name}\n"

    @staticmethod
    def naming_reply(name: str, value: str) -> str:
        """NAMING REPLY with resolved name."""
        return f"NAMING REPLY RESULT=OK NAME={name} VALUE={value}\n"

    @staticmethod
    def naming_not_found(name: str) -> str:
        """NAMING REPLY with KEY_NOT_FOUND result."""
        return f"NAMING REPLY RESULT=KEY_NOT_FOUND NAME={name}\n"

    @staticmethod
    def pong(data: str) -> str:
        """PONG response to PING."""
        return f"PONG {data}\n"