"""Router data structures — RouterIdentity, RouterAddress, RouterInfo. Ported from net.i2p.data.router.RouterIdentity, RouterAddress, RouterInfo. """ from __future__ import annotations import io import struct from i2p_data.keys_and_cert import KeysAndCert def _parse_mapping(data: bytes) -> dict[str, str]: """Parse an I2P Mapping from raw bytes. The I2P Mapping format (used inside RouterAddress and RouterInfo options) consists of key-value pairs where each key and value is a length-prefixed UTF-8 string, separated by '=' and terminated by ';': =; This repeats until all bytes in the mapping are consumed. """ options: dict[str, str] = {} offset = 0 while offset < len(data): # Read key: 1-byte length + key bytes if offset >= len(data): break key_len = data[offset] offset += 1 if offset + key_len > len(data): break key = data[offset:offset + key_len].decode("utf-8", errors="replace") offset += key_len # Expect '=' if offset >= len(data) or data[offset:offset + 1] != b"=": break offset += 1 # Read value: 1-byte length + value bytes if offset >= len(data): break val_len = data[offset] offset += 1 if offset + val_len > len(data): break value = data[offset:offset + val_len].decode("utf-8", errors="replace") offset += val_len # Expect ';' if offset >= len(data) or data[offset:offset + 1] != b";": # Try to accept anyway options[key] = value break offset += 1 options[key] = value return options def _serialize_mapping(options: dict[str, str]) -> bytes: """Serialize a dict to I2P Mapping binary format. Each pair is: len_byte key_bytes = len_byte value_bytes ; Keys are written in sorted order per the I2P spec. """ buf = io.BytesIO() for key in sorted(options.keys()): key_bytes = key.encode("utf-8") val_bytes = options[key].encode("utf-8") buf.write(struct.pack("!B", len(key_bytes))) buf.write(key_bytes) buf.write(b"=") buf.write(struct.pack("!B", len(val_bytes))) buf.write(val_bytes) buf.write(b";") return buf.getvalue() class RouterIdentity(KeysAndCert): """Router identity — a KeysAndCert identifying a router in the I2P network.""" @classmethod def from_bytes(cls, data: bytes) -> "RouterIdentity": """Deserialize from wire format.""" from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import Certificate, KeyCertificate from i2p_crypto.dsa import SigType if len(data) < 387: raise ValueError(f"RouterIdentity requires at least 387 bytes, got {len(data)}") pub_area = data[:cls.PUBKEY_AREA_SIZE] sig_area = data[cls.PUBKEY_AREA_SIZE:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE] cert_data = data[cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE:] cert = Certificate.from_bytes(cert_data) if isinstance(cert, KeyCertificate): enc_type = cert.get_enc_type() or EncType.ELGAMAL sig_type = cert.get_sig_type() or SigType.DSA_SHA1 else: enc_type = EncType.ELGAMAL sig_type = SigType.DSA_SHA1 pub_len = enc_type.pubkey_len pub_key = PublicKey(pub_area[cls.PUBKEY_AREA_SIZE - pub_len:], enc_type) sig_len = sig_type.pubkey_len sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type) raw = data[:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE + len(cert)] return cls(pub_key, sig_key, cert, raw=raw) class RouterAddress: """A transport address for a router. Contains cost (priority), expiration, transport type, and options. """ __slots__ = ("_cost", "_expiration", "_transport", "_options") def __init__(self, cost: int, expiration: int, transport: str, options: dict[str, str] | None = None) -> None: self._cost = cost self._expiration = expiration self._transport = transport self._options = dict(options) if options else {} @property def cost(self) -> int: return self._cost @property def expiration(self) -> int: return self._expiration @property def transport(self) -> str: return self._transport @property def options(self) -> dict[str, str]: return dict(self._options) def get_host(self) -> str | None: return self._options.get("host") def get_port(self) -> int | None: port = self._options.get("port") return int(port) if port is not None else None def to_bytes(self) -> bytes: """Serialize to I2P wire format. Format: 1 byte cost + 8 bytes expiration + 1 byte transport_len + transport string + properties """ buf = io.BytesIO() buf.write(struct.pack("!B", self._cost)) buf.write(struct.pack("!Q", self._expiration)) transport_bytes = self._transport.encode("utf-8") buf.write(struct.pack("!B", len(transport_bytes))) buf.write(transport_bytes) # Write properties (I2P Mapping format) if self._options: props_data = _serialize_mapping(self._options) buf.write(struct.pack("!H", len(props_data))) buf.write(props_data) else: buf.write(struct.pack("!H", 0)) return buf.getvalue() @classmethod def from_bytes(cls, data: bytes) -> tuple["RouterAddress", int]: """Deserialize from bytes. Returns (address, bytes_consumed).""" stream = io.BytesIO(data) return cls.from_stream(stream) @classmethod def from_stream(cls, stream: io.IOBase) -> tuple["RouterAddress", int]: """Read from stream. Returns (address, bytes_consumed).""" start = stream.tell() cost = struct.unpack("!B", stream.read(1))[0] expiration = struct.unpack("!Q", stream.read(8))[0] transport_len = struct.unpack("!B", stream.read(1))[0] transport = stream.read(transport_len).decode("utf-8") # Read properties (I2P Mapping format) props_len = struct.unpack("!H", stream.read(2))[0] options = {} if props_len > 0: props_data = stream.read(props_len) options = _parse_mapping(props_data) consumed = stream.tell() - start return cls(cost, expiration, transport, options), consumed def __eq__(self, other: object) -> bool: if not isinstance(other, RouterAddress): return NotImplemented return (self._cost == other._cost and self._expiration == other._expiration and self._transport == other._transport and self._options == other._options) def __repr__(self) -> str: host = self.get_host() or "?" port = self.get_port() or "?" return f"RouterAddress({self._transport}://{host}:{port}, cost={self._cost})" class RouterInfo: """Router information — identity, addresses, options, and signature. A signed database entry describing a router's capabilities and transport addresses. """ __slots__ = ("_identity", "_published", "_addresses", "_options", "_signature") def __init__(self, identity: RouterIdentity, published: int, addresses: list[RouterAddress] | None = None, options: dict[str, str] | None = None, signature: bytes = b"") -> None: self._identity = identity self._published = published self._addresses = list(addresses) if addresses else [] self._options = dict(options) if options else {} self._signature = signature @property def identity(self) -> RouterIdentity: return self._identity @property def published(self) -> int: return self._published @property def addresses(self) -> list[RouterAddress]: return list(self._addresses) @property def options(self) -> dict[str, str]: return dict(self._options) @property def signature(self) -> bytes: return self._signature def _signable_bytes(self) -> bytes: """Get the bytes that are signed (everything except the signature).""" buf = io.BytesIO() # Identity buf.write(self._identity.to_bytes()) # Published date (8 bytes, milliseconds) buf.write(struct.pack("!Q", self._published)) # Number of addresses (1 byte) buf.write(struct.pack("!B", len(self._addresses))) # Addresses for addr in self._addresses: buf.write(addr.to_bytes()) # Peer size (always 0 in current I2P) buf.write(struct.pack("!B", 0)) # Options (I2P Mapping format) if self._options: props_data = _serialize_mapping(self._options) buf.write(struct.pack("!H", len(props_data))) buf.write(props_data) else: buf.write(struct.pack("!H", 0)) return buf.getvalue() def to_bytes(self) -> bytes: """Serialize to wire format.""" return self._signable_bytes() + self._signature def sign(self, private_key: bytes) -> None: """Sign this RouterInfo with the given private key.""" from i2p_crypto.dsa import DSAEngine sig_type = self._identity.signing_public_key.sig_type self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type) def verify(self) -> bool: """Verify the signature using the identity's signing public key.""" if not self._signature: return False from i2p_crypto.dsa import DSAEngine sig_type = self._identity.signing_public_key.sig_type pub_key = self._identity.signing_public_key.to_bytes() return DSAEngine.verify(self._signable_bytes(), self._signature, pub_key, sig_type) @classmethod def from_bytes(cls, data: bytes) -> "RouterInfo": """Deserialize from wire format.""" from i2p_data.certificate import Certificate, KeyCertificate from i2p_crypto.dsa import SigType stream = io.BytesIO(data) # Read identity identity = RouterIdentity.from_bytes(data) identity_len = KeysAndCert.PUBKEY_AREA_SIZE + KeysAndCert.SIGKEY_AREA_SIZE + len(identity.certificate) stream.seek(identity_len) # Published date published = struct.unpack("!Q", stream.read(8))[0] # Addresses num_addresses = struct.unpack("!B", stream.read(1))[0] addresses = [] for _ in range(num_addresses): remaining = data[stream.tell():] addr, consumed = RouterAddress.from_bytes(remaining) addresses.append(addr) stream.seek(stream.tell() + consumed) # Peer size (skip) struct.unpack("!B", stream.read(1))[0] # Options (I2P Mapping format) props_len = struct.unpack("!H", stream.read(2))[0] options = {} if props_len > 0: props_data = stream.read(props_len) options = _parse_mapping(props_data) # Signature (remaining bytes) sig_type = identity.signing_public_key.sig_type signature = stream.read(sig_type.sig_len) return cls(identity, published, addresses, options, signature) def __repr__(self) -> str: return f"RouterInfo(hash={self._identity.hash()[:4].hex()}..., addrs={len(self._addresses)})"