A Python port of the Invisible Internet Project (I2P)
1"""Router data structures — RouterIdentity, RouterAddress, RouterInfo.
2
3Ported from net.i2p.data.router.RouterIdentity, RouterAddress, RouterInfo.
4"""
5
6from __future__ import annotations
7
8import io
9import struct
10
11from i2p_data.keys_and_cert import KeysAndCert
12
13
14def _parse_mapping(data: bytes) -> dict[str, str]:
15 """Parse an I2P Mapping from raw bytes.
16
17 The I2P Mapping format (used inside RouterAddress and RouterInfo options)
18 consists of key-value pairs where each key and value is a length-prefixed
19 UTF-8 string, separated by '=' and terminated by ';':
20
21 <len_byte><key_bytes>=<len_byte><value_bytes>;
22
23 This repeats until all bytes in the mapping are consumed.
24 """
25 options: dict[str, str] = {}
26 offset = 0
27 while offset < len(data):
28 # Read key: 1-byte length + key bytes
29 if offset >= len(data):
30 break
31 key_len = data[offset]
32 offset += 1
33 if offset + key_len > len(data):
34 break
35 key = data[offset:offset + key_len].decode("utf-8", errors="replace")
36 offset += key_len
37
38 # Expect '='
39 if offset >= len(data) or data[offset:offset + 1] != b"=":
40 break
41 offset += 1
42
43 # Read value: 1-byte length + value bytes
44 if offset >= len(data):
45 break
46 val_len = data[offset]
47 offset += 1
48 if offset + val_len > len(data):
49 break
50 value = data[offset:offset + val_len].decode("utf-8", errors="replace")
51 offset += val_len
52
53 # Expect ';'
54 if offset >= len(data) or data[offset:offset + 1] != b";":
55 # Try to accept anyway
56 options[key] = value
57 break
58 offset += 1
59
60 options[key] = value
61
62 return options
63
64
65def _serialize_mapping(options: dict[str, str]) -> bytes:
66 """Serialize a dict to I2P Mapping binary format.
67
68 Each pair is: len_byte key_bytes = len_byte value_bytes ;
69 Keys are written in sorted order per the I2P spec.
70 """
71 buf = io.BytesIO()
72 for key in sorted(options.keys()):
73 key_bytes = key.encode("utf-8")
74 val_bytes = options[key].encode("utf-8")
75 buf.write(struct.pack("!B", len(key_bytes)))
76 buf.write(key_bytes)
77 buf.write(b"=")
78 buf.write(struct.pack("!B", len(val_bytes)))
79 buf.write(val_bytes)
80 buf.write(b";")
81 return buf.getvalue()
82
83
class RouterIdentity(KeysAndCert):
    """Router identity — a KeysAndCert identifying a router in the I2P network."""

    @classmethod
    def from_bytes(cls, data: bytes) -> "RouterIdentity":
        """Deserialize a RouterIdentity from the start of ``data``.

        Layout read here: a ``PUBKEY_AREA_SIZE``-byte encryption-key area,
        a ``SIGKEY_AREA_SIZE``-byte signing-key area, then a certificate.
        Bytes after the certificate are ignored, so ``data`` may be a longer
        structure that merely begins with an identity.

        Key types are taken from the certificate when it is a
        ``KeyCertificate``; otherwise, or when the certificate reports no
        type, ElGamal and DSA-SHA1 are used.

        Args:
            data: Wire bytes beginning with the identity.

        Returns:
            The parsed identity, constructed with ``raw`` set to exactly the
            bytes the identity occupies in ``data``.

        Raises:
            ValueError: If ``data`` is shorter than 387 bytes.
        """
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import Certificate, KeyCertificate
        from i2p_crypto.dsa import SigType

        # 387 is presumably 256 (key area) + 128 (signing-key area) + 3
        # (smallest certificate) — confirm against the KeysAndCert constants.
        if len(data) < 387:
            raise ValueError(f"RouterIdentity requires at least 387 bytes, got {len(data)}")

        key_areas_len = cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE
        pub_area = data[:cls.PUBKEY_AREA_SIZE]
        sig_area = data[cls.PUBKEY_AREA_SIZE:key_areas_len]

        cert = Certificate.from_bytes(data[key_areas_len:])

        if isinstance(cert, KeyCertificate):
            enc_type = cert.get_enc_type() or EncType.ELGAMAL
            sig_type = cert.get_sig_type() or SigType.DSA_SHA1
        else:
            enc_type = EncType.ELGAMAL
            sig_type = SigType.DSA_SHA1

        # Per the I2P common-structures spec (and the Java reference, which
        # this is ported from) the encryption key is LEFT-justified in its
        # area: key first, padding after. Taking the tail of the area, as
        # this code used to, yields padding for any key shorter than the
        # area (e.g. 32-byte X25519 keys).
        # NOTE(review): confirm KeysAndCert's own serialization (when built
        # without ``raw``) uses the same left-justified layout.
        pub_key = PublicKey(pub_area[:enc_type.pubkey_len], enc_type)

        # The signing key is RIGHT-justified: padding first, key last.
        # NOTE(review): for signing keys longer than SIGKEY_AREA_SIZE the
        # start index goes negative and only a tail fragment is taken; the
        # excess key bytes carried in the key certificate are not handled
        # here.
        sig_len = sig_type.pubkey_len
        sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type)

        # Keep the exact on-wire bytes of the identity (key areas + cert).
        raw = data[:key_areas_len + len(cert)]
        return cls(pub_key, sig_key, cert, raw=raw)
118
119
class RouterAddress:
    """A transport address for a router.

    Contains cost (priority), expiration, transport type, and options.
    Instances expose read-only properties; the options dict is copied on the
    way in and on the way out, so callers never share it with the instance.
    Equal addresses hash equally, so instances can be used in sets and as
    dict keys.
    """

    __slots__ = ("_cost", "_expiration", "_transport", "_options")

    def __init__(self, cost: int, expiration: int, transport: str,
                 options: dict[str, str] | None = None) -> None:
        """Create an address.

        Args:
            cost: Serialized as one unsigned byte by ``to_bytes``.
            expiration: Serialized as an unsigned 64-bit integer.
            transport: Transport name, serialized as a length-prefixed
                UTF-8 string.
            options: Optional string-to-string options; copied.
        """
        self._cost = cost
        self._expiration = expiration
        self._transport = transport
        self._options = dict(options) if options else {}

    @property
    def cost(self) -> int:
        return self._cost

    @property
    def expiration(self) -> int:
        return self._expiration

    @property
    def transport(self) -> str:
        return self._transport

    @property
    def options(self) -> dict[str, str]:
        """A copy of the options; mutating it does not affect this address."""
        return dict(self._options)

    def get_host(self) -> str | None:
        """Return the ``"host"`` option, or None when it is absent."""
        return self._options.get("host")

    def get_port(self) -> int | None:
        """Return the ``"port"`` option as an int, or None when it is absent.

        Raises:
            ValueError: If the option is present but is not an integer.
        """
        port = self._options.get("port")
        return int(port) if port is not None else None

    def to_bytes(self) -> bytes:
        """Serialize to I2P wire format.

        Format: 1 byte cost + 8 bytes expiration + 1 byte transport_len +
        transport string + 2 byte properties size + properties.

        Raises:
            struct.error: If a field does not fit its fixed-width encoding.
        """
        buf = io.BytesIO()
        buf.write(struct.pack("!B", self._cost))
        buf.write(struct.pack("!Q", self._expiration))

        transport_bytes = self._transport.encode("utf-8")
        buf.write(struct.pack("!B", len(transport_bytes)))
        buf.write(transport_bytes)

        # Properties (I2P Mapping format), prefixed with their byte size.
        props_data = _serialize_mapping(self._options) if self._options else b""
        buf.write(struct.pack("!H", len(props_data)))
        buf.write(props_data)

        return buf.getvalue()

    @classmethod
    def from_bytes(cls, data: bytes) -> tuple["RouterAddress", int]:
        """Deserialize from bytes. Returns (address, bytes_consumed).

        Bytes after the address are ignored.
        """
        return cls.from_stream(io.BytesIO(data))

    @classmethod
    def from_stream(cls, stream: io.IOBase) -> tuple["RouterAddress", int]:
        """Read one address from ``stream`` at its current position.

        Returns (address, bytes_consumed); the stream is left positioned
        just past the address. The stream must support ``tell()``.

        Raises:
            struct.error: If the stream ends inside a fixed-width field.
            UnicodeDecodeError: If the transport name is not valid UTF-8.
        """
        start = stream.tell()

        cost = struct.unpack("!B", stream.read(1))[0]
        expiration = struct.unpack("!Q", stream.read(8))[0]

        transport_len = struct.unpack("!B", stream.read(1))[0]
        transport = stream.read(transport_len).decode("utf-8")

        # Properties (I2P Mapping format), prefixed with their byte size.
        props_len = struct.unpack("!H", stream.read(2))[0]
        options: dict[str, str] = {}
        if props_len > 0:
            options = _parse_mapping(stream.read(props_len))

        consumed = stream.tell() - start
        return cls(cost, expiration, transport, options), consumed

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, RouterAddress):
            return NotImplemented
        return (self._cost == other._cost and self._expiration == other._expiration
                and self._transport == other._transport and self._options == other._options)

    def __hash__(self) -> int:
        # Defining __eq__ alone would make instances unhashable. Hash the
        # same fields __eq__ compares; options go through a frozenset so the
        # result does not depend on insertion order.
        return hash((self._cost, self._expiration, self._transport,
                     frozenset(self._options.items())))

    def __repr__(self) -> str:
        host = self.get_host() or "?"
        # Options may come straight from the wire, so "port" is not
        # guaranteed to be numeric; repr() must not raise because of that.
        try:
            port = self.get_port() or "?"
        except ValueError:
            port = "?"
        return f"RouterAddress({self._transport}://{host}:{port}, cost={self._cost})"
219
220
class RouterInfo:
    """Router information — identity, addresses, options, and signature.

    A signed database entry describing a router's capabilities and
    transport addresses.
    """

    __slots__ = ("_identity", "_published", "_addresses", "_options", "_signature")

    def __init__(self, identity: RouterIdentity, published: int,
                 addresses: list[RouterAddress] | None = None,
                 options: dict[str, str] | None = None,
                 signature: bytes = b"") -> None:
        """Create a RouterInfo.

        Args:
            identity: The router's identity; its ``to_bytes()`` output
                starts the serialized form.
            published: Publication date, serialized as an unsigned 64-bit
                integer (milliseconds).
            addresses: Optional transport addresses; the list is copied.
            options: Optional string-to-string options; copied.
            signature: Signature bytes; empty for an unsigned entry.
        """
        self._identity = identity
        self._published = published
        self._addresses = list(addresses) if addresses else []
        self._options = dict(options) if options else {}
        self._signature = signature

    @property
    def identity(self) -> RouterIdentity:
        return self._identity

    @property
    def published(self) -> int:
        return self._published

    @property
    def addresses(self) -> list[RouterAddress]:
        """A copy of the address list."""
        return list(self._addresses)

    @property
    def options(self) -> dict[str, str]:
        """A copy of the options."""
        return dict(self._options)

    @property
    def signature(self) -> bytes:
        return self._signature

    def _signable_bytes(self) -> bytes:
        """Get the bytes that are signed (everything except the signature).

        The bytes are rebuilt from the current field values on every call.
        """
        buf = io.BytesIO()

        # Identity
        buf.write(self._identity.to_bytes())

        # Published date (8 bytes, milliseconds)
        buf.write(struct.pack("!Q", self._published))

        # Number of addresses (1 byte), then the addresses themselves
        buf.write(struct.pack("!B", len(self._addresses)))
        for addr in self._addresses:
            buf.write(addr.to_bytes())

        # Peer size (always 0 in current I2P)
        buf.write(struct.pack("!B", 0))

        # Options (I2P Mapping format), prefixed with their byte size
        props_data = _serialize_mapping(self._options) if self._options else b""
        buf.write(struct.pack("!H", len(props_data)))
        buf.write(props_data)

        return buf.getvalue()

    def to_bytes(self) -> bytes:
        """Serialize to wire format: the signable bytes, then the signature."""
        return self._signable_bytes() + self._signature

    def sign(self, private_key: bytes) -> None:
        """Sign this RouterInfo with the given private key.

        The signature type is taken from the identity's signing public key
        and the result replaces any existing signature.
        """
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._identity.signing_public_key.sig_type
        self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type)

    def verify(self) -> bool:
        """Verify the signature using the identity's signing public key.

        Returns False without attempting verification when no signature is
        present.

        NOTE(review): the data checked is the re-serialized form of the
        parsed fields, not the bytes originally received. An entry whose
        on-wire encoding differs from what this class would emit (unsorted
        option keys, bytes replaced during UTF-8 decoding, peer hashes) will
        not verify even if it was validly signed.
        """
        if not self._signature:
            return False
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._identity.signing_public_key.sig_type
        pub_key = self._identity.signing_public_key.to_bytes()
        return DSAEngine.verify(self._signable_bytes(), self._signature, pub_key, sig_type)

    @classmethod
    def from_bytes(cls, data: bytes) -> "RouterInfo":
        """Deserialize from wire format.

        Reads, in order: identity, published date, address count and
        addresses, peer count (plus any peer hashes, which are skipped),
        options, and finally up to ``sig_len`` bytes of signature for the
        identity's signature type. A short or missing signature is accepted
        as-is, so an unsigned entry round-trips.

        Raises:
            ValueError: If ``data`` is too short to hold an identity.
            struct.error: If ``data`` ends inside a fixed-width field.
        """
        stream = io.BytesIO(data)

        # Identity: parsed from the front of `data`; then position the
        # stream just past it.
        identity = RouterIdentity.from_bytes(data)
        identity_len = (KeysAndCert.PUBKEY_AREA_SIZE + KeysAndCert.SIGKEY_AREA_SIZE
                        + len(identity.certificate))
        stream.seek(identity_len)

        # Published date
        published = struct.unpack("!Q", stream.read(8))[0]

        # Addresses: read straight from the shared stream, which advances
        # past each one, instead of copying the remaining buffer per address.
        num_addresses = struct.unpack("!B", stream.read(1))[0]
        addresses = []
        for _ in range(num_addresses):
            addr, _consumed = RouterAddress.from_stream(stream)
            addresses.append(addr)

        # Peers: a count followed by that many 32-byte hashes. The count is
        # expected to be 0; if it is not, step over the hashes so the
        # options that follow are read from the right offset. The hashes are
        # not retained, so they are absent from any re-serialization.
        peer_count = struct.unpack("!B", stream.read(1))[0]
        if peer_count:
            stream.read(peer_count * 32)

        # Options (I2P Mapping format), prefixed with their byte size
        props_len = struct.unpack("!H", stream.read(2))[0]
        options: dict[str, str] = {}
        if props_len > 0:
            options = _parse_mapping(stream.read(props_len))

        # Signature: length is fixed by the identity's signature type.
        sig_type = identity.signing_public_key.sig_type
        signature = stream.read(sig_type.sig_len)

        return cls(identity, published, addresses, options, signature)

    def __repr__(self) -> str:
        return f"RouterInfo(hash={self._identity.hash()[:4].hex()}..., addrs={len(self._addresses)})"