# A Python port of the Invisible Internet Project (I2P)
# (repository page header: at main, 351 lines, 12 kB, view raw)
"""Router data structures — RouterIdentity, RouterAddress, RouterInfo.

Ported from net.i2p.data.router.RouterIdentity, RouterAddress, RouterInfo.
"""

from __future__ import annotations

import io
import struct

from i2p_data.keys_and_cert import KeysAndCert

# Size in bytes of one peer hash in the (legacy) RouterInfo peer list.
_HASH_SIZE = 32


def _parse_mapping(data: bytes) -> dict[str, str]:
    """Parse the body of an I2P Mapping from raw bytes.

    The I2P Mapping format (used inside RouterAddress and RouterInfo options)
    consists of key-value pairs where each key and value is a length-prefixed
    UTF-8 string, separated by '=' and terminated by ';':

        <len_byte><key_bytes>=<len_byte><value_bytes>;

    This repeats until all bytes in the mapping are consumed.

    Parsing is lenient: it stops at the first malformed or truncated pair and
    returns the pairs collected so far. A final pair whose trailing ';' is
    missing is still accepted. Undecodable bytes are replaced rather than
    raising.

    Args:
        data: The mapping body, without its 2-byte size prefix.

    Returns:
        The decoded key/value pairs; a repeated key keeps its last value.
    """
    options: dict[str, str] = {}
    total = len(data)
    offset = 0
    while offset < total:
        # Key: 1-byte length followed by that many bytes.
        key_len = data[offset]
        offset += 1
        if offset + key_len > total:
            break
        key = data[offset:offset + key_len].decode("utf-8", errors="replace")
        offset += key_len

        # Slicing (not indexing) yields b"" past the end, so this single
        # comparison also covers running out of data.
        if data[offset:offset + 1] != b"=":
            break
        offset += 1

        # Value: 1-byte length followed by that many bytes.
        if offset >= total:
            break
        val_len = data[offset]
        offset += 1
        if offset + val_len > total:
            break
        value = data[offset:offset + val_len].decode("utf-8", errors="replace")
        offset += val_len

        # The pair is complete; keep it even if the ';' terminator is absent,
        # but stop parsing in that case.
        options[key] = value
        if data[offset:offset + 1] != b";":
            break
        offset += 1

    return options


def _serialize_mapping(options: dict[str, str]) -> bytes:
    """Serialize a dict to the body of an I2P Mapping.

    Each pair is: len_byte key_bytes = len_byte value_bytes ;
    Keys are written in sorted order per the I2P spec.

    Args:
        options: Key/value pairs to encode as UTF-8.

    Returns:
        The mapping body, without the 2-byte size prefix.

    Raises:
        struct.error: If an encoded key or value is longer than 255 bytes.
    """
    parts: list[bytes] = []
    for key in sorted(options):
        key_bytes = key.encode("utf-8")
        val_bytes = options[key].encode("utf-8")
        parts.append(struct.pack("!B", len(key_bytes)))
        parts.append(key_bytes)
        parts.append(b"=")
        parts.append(struct.pack("!B", len(val_bytes)))
        parts.append(val_bytes)
        parts.append(b";")
    return b"".join(parts)


def _pack_mapping(options: dict[str, str]) -> bytes:
    """Return *options* as a mapping body preceded by its 2-byte big-endian size."""
    body = _serialize_mapping(options)
    return struct.pack("!H", len(body)) + body


def _read_mapping(stream: io.IOBase) -> dict[str, str]:
    """Read a 2-byte big-endian size and that many bytes of mapping from *stream*."""
    (size,) = struct.unpack("!H", stream.read(2))
    if size == 0:
        return {}
    return _parse_mapping(stream.read(size))


class RouterIdentity(KeysAndCert):
    """Router identity — a KeysAndCert identifying a router in the I2P network."""

    @classmethod
    def from_bytes(cls, data: bytes) -> "RouterIdentity":
        """Deserialize from wire format.

        Layout: public-key area, signing-key area, then the certificate. The
        key types come from the certificate when it is a KeyCertificate and
        default to ElGamal / DSA-SHA1 otherwise. Trailing bytes after the
        certificate are ignored.

        Raises:
            ValueError: If *data* is shorter than 387 bytes.
        """
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import Certificate, KeyCertificate
        from i2p_crypto.dsa import SigType

        if len(data) < 387:
            raise ValueError(f"RouterIdentity requires at least 387 bytes, got {len(data)}")

        keys_len = cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE
        pub_area = data[:cls.PUBKEY_AREA_SIZE]
        sig_area = data[cls.PUBKEY_AREA_SIZE:keys_len]

        cert = Certificate.from_bytes(data[keys_len:])

        if isinstance(cert, KeyCertificate):
            enc_type = cert.get_enc_type() or EncType.ELGAMAL
            sig_type = cert.get_sig_type() or SigType.DSA_SHA1
        else:
            enc_type = EncType.ELGAMAL
            sig_type = SigType.DSA_SHA1

        # The crypto public key is aligned at the START of its area (padding
        # follows it); only the signing key is aligned at the end. Taking the
        # tail here would return padding for key types shorter than the area.
        pub_key = PublicKey(pub_area[:enc_type.pubkey_len], enc_type)

        # The signing public key is right-aligned: padding precedes it.
        # NOTE(review): for signing key types longer than SIGKEY_AREA_SIZE the
        # slice start goes negative and only the tail of the area is taken;
        # the excess key bytes presumably live in the key certificate —
        # confirm how KeyCertificate exposes them before supporting those types.
        sig_len = sig_type.pubkey_len
        sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type)

        raw = data[:keys_len + len(cert)]
        return cls(pub_key, sig_key, cert, raw=raw)


class RouterAddress:
    """A transport address for a router.

    Contains cost (priority), expiration, transport type, and options.
    """

    __slots__ = ("_cost", "_expiration", "_transport", "_options")

    def __init__(self, cost: int, expiration: int, transport: str,
                 options: dict[str, str] | None = None) -> None:
        self._cost = cost
        self._expiration = expiration
        self._transport = transport
        # Copy so later changes to the caller's dict do not leak in.
        self._options = dict(options) if options else {}

    @property
    def cost(self) -> int:
        return self._cost

    @property
    def expiration(self) -> int:
        return self._expiration

    @property
    def transport(self) -> str:
        return self._transport

    @property
    def options(self) -> dict[str, str]:
        """A copy of the options; mutating it does not affect this address."""
        return dict(self._options)

    def get_host(self) -> str | None:
        """Return the ``host`` option, or None when it is absent."""
        return self._options.get("host")

    def get_port(self) -> int | None:
        """Return the ``port`` option as an int.

        Returns None when the option is absent or is not a valid integer, so
        that a malformed address cannot make callers (or ``repr``) raise.
        """
        port = self._options.get("port")
        if port is None:
            return None
        try:
            return int(port)
        except ValueError:
            return None

    def to_bytes(self) -> bytes:
        """Serialize to I2P wire format.

        Format: 1 byte cost + 8 bytes expiration + 1 byte transport_len +
        transport string + 2 bytes properties size + properties

        Raises:
            struct.error: If a field does not fit its fixed-width encoding.
        """
        transport_bytes = self._transport.encode("utf-8")
        return b"".join((
            struct.pack("!B", self._cost),
            struct.pack("!Q", self._expiration),
            struct.pack("!B", len(transport_bytes)),
            transport_bytes,
            # Properties (I2P Mapping format); an empty dict yields size 0.
            _pack_mapping(self._options),
        ))

    @classmethod
    def from_bytes(cls, data: bytes) -> tuple["RouterAddress", int]:
        """Deserialize from bytes. Returns (address, bytes_consumed)."""
        return cls.from_stream(io.BytesIO(data))

    @classmethod
    def from_stream(cls, stream: io.IOBase) -> tuple["RouterAddress", int]:
        """Read from stream. Returns (address, bytes_consumed).

        Reading starts at the stream's current position; bytes_consumed is
        the distance the position advanced.

        Raises:
            struct.error: If the stream ends inside a fixed-width field.
        """
        start = stream.tell()

        (cost,) = struct.unpack("!B", stream.read(1))
        (expiration,) = struct.unpack("!Q", stream.read(8))

        (transport_len,) = struct.unpack("!B", stream.read(1))
        transport = stream.read(transport_len).decode("utf-8")

        # Properties (I2P Mapping format)
        options = _read_mapping(stream)

        consumed = stream.tell() - start
        return cls(cost, expiration, transport, options), consumed

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, RouterAddress):
            return NotImplemented
        return (self._cost == other._cost and self._expiration == other._expiration
                and self._transport == other._transport and self._options == other._options)

    def __repr__(self) -> str:
        host = self.get_host() or "?"
        port = self.get_port() or "?"
        return f"RouterAddress({self._transport}://{host}:{port}, cost={self._cost})"


class RouterInfo:
    """Router information — identity, addresses, options, and signature.

    A signed database entry describing a router's capabilities and
    transport addresses.
    """

    __slots__ = ("_identity", "_published", "_addresses", "_options", "_signature")

    def __init__(self, identity: RouterIdentity, published: int,
                 addresses: list[RouterAddress] | None = None,
                 options: dict[str, str] | None = None,
                 signature: bytes = b"") -> None:
        self._identity = identity
        self._published = published
        # Copy the containers so later caller-side mutation does not leak in.
        self._addresses = list(addresses) if addresses else []
        self._options = dict(options) if options else {}
        self._signature = signature

    @property
    def identity(self) -> RouterIdentity:
        return self._identity

    @property
    def published(self) -> int:
        return self._published

    @property
    def addresses(self) -> list[RouterAddress]:
        """A copy of the address list."""
        return list(self._addresses)

    @property
    def options(self) -> dict[str, str]:
        """A copy of the options."""
        return dict(self._options)

    @property
    def signature(self) -> bytes:
        return self._signature

    def _signable_bytes(self) -> bytes:
        """Get the bytes that are signed (everything except the signature)."""
        parts = [
            # Identity
            self._identity.to_bytes(),
            # Published date (8 bytes, milliseconds)
            struct.pack("!Q", self._published),
            # Number of addresses (1 byte)
            struct.pack("!B", len(self._addresses)),
        ]

        # Addresses
        parts.extend(addr.to_bytes() for addr in self._addresses)

        # Peer size (always 0 in current I2P)
        parts.append(struct.pack("!B", 0))

        # Options (I2P Mapping format); an empty dict yields size 0.
        parts.append(_pack_mapping(self._options))

        return b"".join(parts)

    def to_bytes(self) -> bytes:
        """Serialize to wire format: the signed bytes followed by the signature."""
        return self._signable_bytes() + self._signature

    def sign(self, private_key: bytes) -> None:
        """Sign this RouterInfo with the given private key.

        The signature type is taken from the identity's signing public key;
        the result replaces any existing signature.
        """
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._identity.signing_public_key.sig_type
        self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type)

    def verify(self) -> bool:
        """Verify the signature using the identity's signing public key.

        Returns False without attempting verification when no signature is set.
        """
        if not self._signature:
            return False
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._identity.signing_public_key.sig_type
        pub_key = self._identity.signing_public_key.to_bytes()
        return DSAEngine.verify(self._signable_bytes(), self._signature, pub_key, sig_type)

    @classmethod
    def from_bytes(cls, data: bytes) -> "RouterInfo":
        """Deserialize from wire format.

        Raises:
            ValueError: If *data* is too short to hold a RouterIdentity.
            struct.error: If *data* ends inside a fixed-width field.
        """
        # Identity
        identity = RouterIdentity.from_bytes(data)
        identity_len = (KeysAndCert.PUBKEY_AREA_SIZE + KeysAndCert.SIGKEY_AREA_SIZE
                        + len(identity.certificate))

        stream = io.BytesIO(data)
        stream.seek(identity_len)

        # Published date
        (published,) = struct.unpack("!Q", stream.read(8))

        # Addresses: parse straight from the shared stream so each address
        # continues where the previous one ended, without re-slicing `data`.
        (num_addresses,) = struct.unpack("!B", stream.read(1))
        addresses = [RouterAddress.from_stream(stream)[0] for _ in range(num_addresses)]

        # Peer list: normally empty, but when a count is present the hashes
        # must be skipped or everything after them is read from the wrong
        # offset. The peers themselves are not retained.
        (peer_count,) = struct.unpack("!B", stream.read(1))
        if peer_count:
            stream.seek(peer_count * _HASH_SIZE, io.SEEK_CUR)

        # Options (I2P Mapping format)
        options = _read_mapping(stream)

        # Signature: length is fixed by the identity's signature type. A short
        # read is kept as-is so an unsigned RouterInfo still round-trips.
        sig_type = identity.signing_public_key.sig_type
        signature = stream.read(sig_type.sig_len)

        return cls(identity, published, addresses, options, signature)

    def __repr__(self) -> str:
        return f"RouterInfo(hash={self._identity.hash()[:4].hex()}..., addrs={len(self._addresses)})"