A Python port of the Invisible Internet Project (I2P)
at main 171 lines 5.2 kB view raw
1"""SAM v3.3 text protocol parser and formatter. 2 3Ported from net.i2p.sam.SAMUtils and related classes. 4""" 5 6from __future__ import annotations 7 8from i2p_sam.utils import parse_params 9 10 11# SAM protocol constants 12SUPPORTED_VERSIONS = ["3.0", "3.1", "3.2", "3.3"] 13MAX_VERSION = "3.3" 14MIN_VERSION = "3.0" 15 16 17class SAMCommand: 18 """Parsed SAM command with verb, opcode, and key=value params. 19 20 SAM protocol messages have the form: 21 VERB OPCODE KEY=VALUE KEY=VALUE ... 22 For example: 23 HELLO VERSION MIN=3.0 MAX=3.3 24 SESSION CREATE ID=test STYLE=STREAM DESTINATION=TRANSIENT 25 """ 26 27 def __init__(self, verb: str, opcode: str, params: dict[str, str]) -> None: 28 self.verb = verb 29 self.opcode = opcode 30 self.params = params 31 32 @classmethod 33 def parse(cls, line: str) -> "SAMCommand": 34 """Parse 'VERB OPCODE KEY=VALUE KEY=VALUE ...' text line. 35 36 Values may be quoted. Keys are case-insensitive (normalized to uppercase). 37 38 Args: 39 line: Raw text line from SAM protocol. 40 41 Returns: 42 Parsed SAMCommand instance. 43 44 Raises: 45 ValueError: If the line is empty or cannot be parsed. 46 """ 47 stripped = line.strip() 48 if not stripped: 49 raise ValueError("Empty SAM command") 50 51 parts = stripped.split(None, 2) # Split into at most 3 parts 52 53 verb = parts[0].upper() 54 55 if len(parts) == 1: 56 return cls(verb, "", {}) 57 58 opcode = parts[1] 59 60 if len(parts) == 2: 61 # Could be VERB OPCODE with no params, or opcode might contain = 62 if "=" in opcode: 63 # It's actually a param, not an opcode 64 params = parse_params(opcode) 65 return cls(verb, "", params) 66 return cls(verb, opcode, {}) 67 68 # parts[2] contains the rest (params) 69 remainder = parts[2] 70 71 # Check if opcode contains = (meaning it's actually a param) 72 if "=" in opcode: 73 params = parse_params(opcode + " " + remainder) 74 return cls(verb, "", params) 75 76 params = parse_params(remainder) 77 return cls(verb, opcode, params) 78 79 def __str__(self) -> str: 80 """Format back to SAM text protocol.""" 81 parts = [self.verb] 82 if self.opcode: 83 parts.append(self.opcode) 84 for key, value in self.params.items(): 85 if " " in value: 86 parts.append(f'{key}="{value}"') 87 else: 88 parts.append(f"{key}={value}") 89 return " ".join(parts) 90 91 def __repr__(self) -> str: 92 return f"SAMCommand({self.verb!r}, {self.opcode!r}, {self.params!r})" 93 94 95class SAMReply: 96 """Format SAM reply messages. 97 98 All reply methods return newline-terminated strings ready to send 99 over the wire. 100 """ 101 102 @staticmethod 103 def hello_ok(version: str = "3.3") -> str: 104 """HELLO REPLY with OK result.""" 105 return f"HELLO REPLY RESULT=OK VERSION={version}\n" 106 107 @staticmethod 108 def hello_noversion() -> str: 109 """HELLO REPLY with NOVERSION result.""" 110 return "HELLO REPLY RESULT=NOVERSION\n" 111 112 @staticmethod 113 def session_ok(destination: str) -> str: 114 """SESSION STATUS with OK result and destination.""" 115 return f"SESSION STATUS RESULT=OK DESTINATION={destination}\n" 116 117 @staticmethod 118 def session_error(result: str, message: str = "") -> str: 119 """SESSION STATUS with error result. 120 121 Args: 122 result: Error code (e.g. DUPLICATED_ID, INVALID_KEY). 123 message: Optional human-readable message. 124 """ 125 msg = f"SESSION STATUS RESULT={result}" 126 if message: 127 msg += f" MESSAGE=\"{message}\"" 128 return msg + "\n" 129 130 @staticmethod 131 def stream_ok() -> str: 132 """STREAM STATUS with OK result.""" 133 return "STREAM STATUS RESULT=OK\n" 134 135 @staticmethod 136 def stream_error(result: str, message: str = "") -> str: 137 """STREAM STATUS with error result. 138 139 Args: 140 result: Error code (e.g. CANT_REACH_PEER, TIMEOUT). 141 message: Optional human-readable message. 142 """ 143 msg = f"STREAM STATUS RESULT={result}" 144 if message: 145 msg += f" MESSAGE=\"{message}\"" 146 return msg + "\n" 147 148 @staticmethod 149 def dest_reply(name: str, destination: str) -> str: 150 """DEST REPLY with found destination.""" 151 return f"DEST REPLY RESULT=OK NAME={name} DESTINATION={destination}\n" 152 153 @staticmethod 154 def dest_not_found(name: str) -> str: 155 """DEST REPLY with KEY_NOT_FOUND result.""" 156 return f"DEST REPLY RESULT=KEY_NOT_FOUND NAME={name}\n" 157 158 @staticmethod 159 def naming_reply(name: str, value: str) -> str: 160 """NAMING REPLY with resolved name.""" 161 return f"NAMING REPLY RESULT=OK NAME={name} VALUE={value}\n" 162 163 @staticmethod 164 def naming_not_found(name: str) -> str: 165 """NAMING REPLY with KEY_NOT_FOUND result.""" 166 return f"NAMING REPLY RESULT=KEY_NOT_FOUND NAME={name}\n" 167 168 @staticmethod 169 def pong(data: str) -> str: 170 """PONG response to PING.""" 171 return f"PONG {data}\n"