"""I2NP message wire format serialization/deserialization. Provides functional encode/decode for I2NP message headers (short and standard) and individual message type payloads (DatabaseStore, DatabaseLookup, DatabaseSearchReply, DeliveryStatus). All multi-byte fields are big-endian (network byte order) per the I2P spec. """ import hashlib import hmac as _hmac import struct # I2NP message type constants (from I2P specification) MSG_TYPE_DATABASE_STORE = 1 MSG_TYPE_DATABASE_LOOKUP = 2 MSG_TYPE_DATABASE_SEARCH_REPLY = 3 MSG_TYPE_DELIVERY_STATUS = 10 MSG_TYPE_GARLIC = 11 MSG_TYPE_TUNNEL_DATA = 18 MSG_TYPE_TUNNEL_GATEWAY = 19 MSG_TYPE_DATA = 20 MSG_TYPE_TUNNEL_BUILD = 21 MSG_TYPE_TUNNEL_BUILD_REPLY = 22 MSG_TYPE_VARIABLE_TUNNEL_BUILD = 23 MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY = 24 # --------------------------------------------------------------------------- # Short header (used inside NTCP2 I2NP blocks) # Format: type(1) + msg_id(4 BE) + expiration_seconds(4 BE) + size(2 BE) + payload # Total header: 11 bytes # --------------------------------------------------------------------------- def encode_i2np_short(msg_type: int, msg_id: int, expiration: int, payload: bytes) -> bytes: """Encode I2NP message with short header (for NTCP2 blocks). Format: type(1) + msg_id(4 BE) + expiration_seconds(4 BE signed) + size(2 BE) + payload """ header = struct.pack("!BIiH", msg_type, msg_id, expiration, len(payload)) return header + payload def decode_i2np_short(data: bytes) -> tuple[int, int, int, bytes]: """Decode I2NP message with short header. Returns: (msg_type, msg_id, expiration_seconds, payload) """ msg_type, msg_id, expiration, size = struct.unpack_from("!BIiH", data) payload = data[11:11 + size] return msg_type, msg_id, expiration, payload # --------------------------------------------------------------------------- # Standard header (used in tunnel messages) # Format: type(1) + msg_id(4 BE) + expiration_ms(8 BE) + size(2 BE) + checksum(1) + payload # Checksum = first byte of SHA-256(payload) # Total header: 16 bytes # --------------------------------------------------------------------------- def encode_i2np_standard(msg_type: int, msg_id: int, expiration_ms: int, payload: bytes) -> bytes: """Encode I2NP message with standard header (for tunnel messages). Format: type(1) + msg_id(4 BE) + expiration_ms(8 BE) + size(2 BE) + checksum(1) + payload Checksum = first byte of SHA-256(payload). """ checksum = hashlib.sha256(payload).digest()[0] header = struct.pack("!BIQHB", msg_type, msg_id, expiration_ms, len(payload), checksum) return header + payload def decode_i2np_standard(data: bytes) -> tuple[int, int, int, bytes]: """Decode I2NP message with standard header. Returns: (msg_type, msg_id, expiration_ms, payload) Raises ValueError if checksum doesn't match. """ msg_type, msg_id, expiration_ms, size, checksum = struct.unpack_from("!BIQHB", data) payload = data[16:16 + size] expected_checksum = hashlib.sha256(payload).digest()[0] if not _hmac.compare_digest(bytes([checksum]), bytes([expected_checksum])): raise ValueError(f"Checksum mismatch: got {checksum}, expected {expected_checksum}") return msg_type, msg_id, expiration_ms, payload # --------------------------------------------------------------------------- # DatabaseStore (type 1) # Payload: key(32) + type_byte(1, 0=RI 1=LS) + reply_token(4) + # [if token!=0: reply_gateway(32) + reply_tunnel(4)] + data # --------------------------------------------------------------------------- def encode_database_store(key: bytes, store_type: int, data: bytes, reply_token: int = 0, reply_gateway: bytes = b"", reply_tunnel_id: int = 0) -> bytes: """Encode DatabaseStore payload. key: 32-byte hash of the item being stored store_type: 0 = RouterInfo, 1 = LeaseSet data: the RouterInfo or LeaseSet bytes reply_token: if non-zero, request delivery acknowledgment reply_gateway: 32-byte hash (required if reply_token != 0) reply_tunnel_id: 4-byte tunnel ID (required if reply_token != 0) """ parts = [key, struct.pack("!BI", store_type, reply_token)] if reply_token != 0: parts.append(reply_gateway) parts.append(struct.pack("!I", reply_tunnel_id)) parts.append(data) return b"".join(parts) def decode_database_store(payload: bytes) -> dict: """Decode DatabaseStore payload. Returns dict with: key, store_type, reply_token, reply_gateway (if token!=0), reply_tunnel_id (if token!=0), data. """ key = payload[:32] store_type = payload[32] reply_token = struct.unpack("!I", payload[33:37])[0] offset = 37 result = { "key": key, "store_type": store_type, "reply_token": reply_token, } if reply_token != 0: result["reply_gateway"] = payload[offset:offset + 32] offset += 32 result["reply_tunnel_id"] = struct.unpack("!I", payload[offset:offset + 4])[0] offset += 4 result["data"] = payload[offset:] return result # --------------------------------------------------------------------------- # DatabaseLookup (type 2) # Payload: key(32) + from_hash(32) + flags(1) + [reply_tunnel_id(4) if flag bit 0] # + size(1) + exclude_list(size * 32) # --------------------------------------------------------------------------- def encode_database_lookup(key: bytes, from_hash: bytes, flags: int = 0, reply_tunnel_id: int = 0, exclude_list: list[bytes] | None = None) -> bytes: """Encode DatabaseLookup payload.""" if exclude_list is None: exclude_list = [] parts = [key, from_hash, struct.pack("!B", flags)] if flags & 0x01: parts.append(struct.pack("!I", reply_tunnel_id)) parts.append(struct.pack("!B", len(exclude_list))) for ex in exclude_list: parts.append(ex) return b"".join(parts) def decode_database_lookup(payload: bytes) -> dict: """Decode DatabaseLookup payload. Returns dict with: key, from_hash, flags, reply_tunnel_id, exclude_list. """ key = payload[:32] from_hash = payload[32:64] flags = payload[64] offset = 65 reply_tunnel_id = 0 if flags & 0x01: reply_tunnel_id = struct.unpack("!I", payload[offset:offset + 4])[0] offset += 4 num_excludes = payload[offset] offset += 1 exclude_list = [] for _ in range(num_excludes): exclude_list.append(payload[offset:offset + 32]) offset += 32 return { "key": key, "from_hash": from_hash, "flags": flags, "reply_tunnel_id": reply_tunnel_id, "exclude_list": exclude_list, } # --------------------------------------------------------------------------- # DatabaseSearchReply (type 3) # Payload: key(32) + num_peers(1) + [peer_hashes(32 each)] + from_hash(32) # --------------------------------------------------------------------------- def encode_database_search_reply(key: bytes, peer_hashes: list[bytes], from_hash: bytes) -> bytes: """Encode DatabaseSearchReply payload.""" parts = [key, struct.pack("!B", len(peer_hashes))] for ph in peer_hashes: parts.append(ph) parts.append(from_hash) return b"".join(parts) def decode_database_search_reply(payload: bytes) -> dict: """Decode DatabaseSearchReply payload. Returns dict with: key, peer_hashes, from_hash. """ key = payload[:32] num_peers = payload[32] offset = 33 peer_hashes = [] for _ in range(num_peers): peer_hashes.append(payload[offset:offset + 32]) offset += 32 from_hash = payload[offset:offset + 32] return { "key": key, "peer_hashes": peer_hashes, "from_hash": from_hash, } # --------------------------------------------------------------------------- # DeliveryStatus (type 10) # Payload: msg_id(4 BE) + timestamp(8 BE) # --------------------------------------------------------------------------- def encode_delivery_status(msg_id: int, timestamp: int) -> bytes: """Encode DeliveryStatus payload: msg_id(4 BE) + timestamp(8 BE).""" return struct.pack("!IQ", msg_id, timestamp) def decode_delivery_status(payload: bytes) -> dict: """Decode DeliveryStatus payload. Returns dict with: msg_id, timestamp. """ msg_id, timestamp = struct.unpack("!IQ", payload[:12]) return {"msg_id": msg_id, "timestamp": timestamp}