# A Python port of the Invisible Internet Project (I2P)
# at main 378 lines 13 kB view raw
1"""Router identity generation and RouterInfo building. 2 3Ported from net.i2p.router.RouterIdentityGenerator and related Java classes. 4 5Provides: 6- RouterIdentityGenerator: creates RouterIdentity with crypto keypairs 7- RouterInfoBuilder: fluent builder for signed RouterInfo structures 8""" 9 10from __future__ import annotations 11 12import base64 13import json 14import os 15import struct 16import time 17from dataclasses import dataclass 18 19from i2p_data.data_helper import to_base64 as i2p_b64encode, from_base64 as i2p_b64decode 20from i2p_data.router import RouterIdentity, RouterAddress, RouterInfo 21from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 22from i2p_data.certificate import Certificate, CertificateType, KeyCertificate 23from i2p_crypto.dsa import SigType, KeyGenerator 24from i2p_crypto.elgamal import ElGamalEngine 25from i2p_crypto.x25519 import X25519DH 26 27 28class RouterIdentityGenerator: 29 """Generate a RouterIdentity with corresponding private keys. 30 31 Creates an ElGamal encryption keypair and a signing keypair, 32 wraps the public halves into a RouterIdentity with the appropriate 33 certificate type. 34 """ 35 36 # Default to EdDSA — modern I2P routers use this 37 DEFAULT_SIG_TYPE = SigType.EdDSA_SHA512_Ed25519 38 39 @classmethod 40 def generate( 41 cls, sig_type: SigType | None = None 42 ) -> tuple[RouterIdentity, dict]: 43 """Generate a new router identity. 44 45 Args: 46 sig_type: Signing algorithm. Defaults to EdDSA_SHA512_Ed25519. 47 Falls back to DSA_SHA1 if explicitly requested. 48 49 Returns: 50 (RouterIdentity, {"elgamal_private": bytes, "signing_private": bytes}) 51 """ 52 if sig_type is None: 53 sig_type = cls.DEFAULT_SIG_TYPE 54 55 # 1. Generate ElGamal encryption keypair (256-byte pub, 256-byte priv) 56 elgamal_pub, elgamal_priv = ElGamalEngine.generate_keypair() 57 58 # 2. Generate signing keypair 59 signing_pub_bytes, signing_priv_bytes = KeyGenerator.generate(sig_type) 60 61 # 3. 
Build the certificate 62 if sig_type == SigType.DSA_SHA1: 63 # DSA_SHA1 uses NULL certificate — keys fit exactly in the 64 # standard 256 + 128 byte areas 65 cert = Certificate(CertificateType.NULL) 66 else: 67 # Non-DSA types need a KeyCertificate declaring the sig/enc types 68 # Payload: 2 bytes sig_type code + 2 bytes enc_type code 69 # (+ optional extra key data if keys overflow their areas, 70 # but for ElGamal + EdDSA no extra data is needed) 71 cert_payload = struct.pack("!HH", sig_type.code, EncType.ELGAMAL.code) 72 cert = KeyCertificate(cert_payload) 73 74 # 4. Wrap public keys in typed containers 75 pub_key = PublicKey(elgamal_pub, EncType.ELGAMAL) 76 sig_key = SigningPublicKey(signing_pub_bytes, sig_type) 77 78 # 5. Build the RouterIdentity 79 identity = RouterIdentity(pub_key, sig_key, cert) 80 81 private_keys = { 82 "elgamal_private": elgamal_priv, 83 "signing_private": signing_priv_bytes, 84 } 85 return identity, private_keys 86 87 88class RouterInfoBuilder: 89 """Fluent builder for signed RouterInfo structures. 90 91 Usage:: 92 93 identity, keys = RouterIdentityGenerator.generate() 94 info = ( 95 RouterInfoBuilder(identity, keys["signing_private"]) 96 .add_ntcp2_address("192.168.1.1", 9000, static_key_pub) 97 .set_options({"router.version": "0.9.62"}) 98 .build() 99 ) 100 """ 101 102 def __init__( 103 self, identity: RouterIdentity, signing_private_key: bytes 104 ) -> None: 105 self._identity = identity 106 self._signing_private_key = signing_private_key 107 self._addresses: list[RouterAddress] = [] 108 self._options: dict[str, str] = {} 109 110 def add_ntcp2_address( 111 self, host: str, port: int, static_key_pub: bytes 112 ) -> "RouterInfoBuilder": 113 """Add an NTCP2 transport address. 
114 115 Args: 116 host: IP address or hostname 117 port: TCP port 118 static_key_pub: 32-byte NTCP2 static public key 119 120 Returns: 121 self (for chaining) 122 """ 123 addr = RouterAddress( 124 cost=10, 125 expiration=0, 126 transport="NTCP2", 127 options={ 128 "host": host, 129 "port": str(port), 130 "s": i2p_b64encode(static_key_pub), 131 }, 132 ) 133 self._addresses.append(addr) 134 return self 135 136 def set_options(self, options: dict[str, str]) -> "RouterInfoBuilder": 137 """Set router options (e.g., version, capabilities). 138 139 Args: 140 options: key-value pairs for the RouterInfo options map 141 142 Returns: 143 self (for chaining) 144 """ 145 self._options.update(options) 146 return self 147 148 def build(self) -> RouterInfo: 149 """Build and sign the RouterInfo. 150 151 Creates a RouterInfo with current time as the published timestamp, 152 signs it with the signing private key, and returns it. 153 154 Returns: 155 Signed RouterInfo ready for publication 156 """ 157 published = int(time.time() * 1000) 158 159 info = RouterInfo( 160 identity=self._identity, 161 published=published, 162 addresses=list(self._addresses), 163 options=dict(self._options), 164 ) 165 166 # Sign with the private signing key 167 info.sign(self._signing_private_key) 168 169 return info 170 171 172# --------------------------------------------------------------------------- 173# Standalone helper functions for NTCP2-capable router identity generation 174# --------------------------------------------------------------------------- 175 176 177def generate_router_keys() -> dict: 178 """Generate all keys needed for a modern router identity. 
179 180 Returns dict with: 181 signing_private: Ed25519 private key bytes (32 bytes) 182 signing_public: Ed25519 public key bytes (32 bytes) 183 ntcp2_static: (private, public) X25519 keypair for NTCP2 (32 bytes each) 184 ntcp2_iv: 16 random bytes for NTCP2 "i" parameter 185 """ 186 # Ed25519 signing key 187 signing_pub, signing_priv = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) 188 189 # X25519 for NTCP2 190 ntcp2_static = X25519DH.generate_keypair() 191 192 # Random IV for NTCP2 193 ntcp2_iv = os.urandom(16) 194 195 return { 196 "signing_private": signing_priv, 197 "signing_public": signing_pub, 198 "ntcp2_static": ntcp2_static, 199 "ntcp2_iv": ntcp2_iv, 200 } 201 202 203def build_router_identity(signing_public: bytes, enc_public: bytes) -> RouterIdentity: 204 """Build a RouterIdentity with Ed25519 signing and X25519 encryption. 205 206 The I2P RouterIdentity has fixed-size fields: 207 - 256-byte public key area (X25519 key is 32 bytes, right-aligned with zero padding) 208 - 128-byte signing key area (Ed25519 key is 32 bytes, right-aligned with zero padding) 209 - KeyCertificate specifying the actual key types 210 211 Args: 212 signing_public: 32-byte Ed25519 public key 213 enc_public: 32-byte X25519 public key 214 """ 215 # Build KeyCertificate payload: sig_type code (2 bytes) + enc_type code (2 bytes) 216 cert_payload = struct.pack("!HH", 217 SigType.EdDSA_SHA512_Ed25519.code, 218 EncType.ECIES_X25519.code) 219 cert = KeyCertificate(cert_payload) 220 221 # Wrap in typed containers 222 pub_key = PublicKey(enc_public, EncType.ECIES_X25519) 223 sig_key = SigningPublicKey(signing_public, SigType.EdDSA_SHA512_Ed25519) 224 225 return RouterIdentity(pub_key, sig_key, cert) 226 227 228def build_router_info( 229 identity: RouterIdentity, 230 signing_private: bytes, 231 host: str, 232 port: int, 233 ntcp2_static_pub: bytes, 234 ntcp2_iv: bytes, 235) -> RouterInfo: 236 """Build a signed RouterInfo with an NTCP2 transport address. 
237 238 The RouterAddress options include: 239 - host: IP address string 240 - port: port number as string 241 - s: base64-encoded X25519 static public key (32 bytes) 242 - i: base64-encoded IV (16 bytes) 243 - v: "2" (NTCP2 version) 244 245 Args: 246 identity: RouterIdentity for this router 247 signing_private: Ed25519 private key for signing 248 host: IP address or hostname 249 port: TCP port number 250 ntcp2_static_pub: 32-byte X25519 static public key 251 ntcp2_iv: 16-byte initialization vector 252 """ 253 s_b64 = i2p_b64encode(ntcp2_static_pub) 254 i_b64 = i2p_b64encode(ntcp2_iv) 255 256 ntcp2_addr = RouterAddress( 257 cost=10, 258 expiration=0, 259 transport="NTCP2", 260 options={ 261 "host": host, 262 "port": str(port), 263 "s": s_b64, 264 "i": i_b64, 265 "v": "2", 266 }, 267 ) 268 269 ri = RouterInfo( 270 identity=identity, 271 published=int(time.time() * 1000), 272 addresses=[ntcp2_addr], 273 options={ 274 "router.version": "0.9.62", 275 "netId": "2", 276 "caps": "R", 277 }, 278 ) 279 ri.sign(signing_private) 280 return ri 281 282 283# --------------------------------------------------------------------------- 284# RouterKeyBundle — unified key container with persistence 285# --------------------------------------------------------------------------- 286 287 288@dataclass 289class RouterKeyBundle: 290 """All keys needed for a router identity. 291 292 Holds Ed25519 signing keys, X25519 NTCP2 transport keys, and the 293 NTCP2 initialization vector. Provides save/load for JSON persistence 294 and a convenience ``generate()`` class method. 
295 """ 296 297 signing_private: bytes # Ed25519 private key (32 bytes) 298 signing_public: bytes # Ed25519 public key (32 bytes) 299 ntcp2_private: bytes # X25519 private key (32 bytes) 300 ntcp2_public: bytes # X25519 public key (32 bytes) 301 ntcp2_iv: bytes # 16-byte random IV for NTCP2 302 303 # Alias: enc_public is the same as ntcp2_public for ECIES_X25519 routers 304 @property 305 def enc_public(self) -> bytes: 306 """X25519 public key used as the encryption public key.""" 307 return self.ntcp2_public 308 309 def save(self, path: str) -> None: 310 """Save keys to a JSON file with base64-encoded values.""" 311 data = { 312 "signing_private": base64.b64encode(self.signing_private).decode("ascii"), 313 "signing_public": base64.b64encode(self.signing_public).decode("ascii"), 314 "ntcp2_private": base64.b64encode(self.ntcp2_private).decode("ascii"), 315 "ntcp2_public": base64.b64encode(self.ntcp2_public).decode("ascii"), 316 "ntcp2_iv": base64.b64encode(self.ntcp2_iv).decode("ascii"), 317 } 318 with open(path, "w") as f: 319 json.dump(data, f, indent=2) 320 321 @classmethod 322 def load(cls, path: str) -> "RouterKeyBundle" | None: 323 """Load keys from a JSON file. 
Returns None if the file doesn't exist.""" 324 try: 325 with open(path) as f: 326 data = json.load(f) 327 except FileNotFoundError: 328 return None 329 330 return cls( 331 signing_private=base64.b64decode(data["signing_private"]), 332 signing_public=base64.b64decode(data["signing_public"]), 333 ntcp2_private=base64.b64decode(data["ntcp2_private"]), 334 ntcp2_public=base64.b64decode(data["ntcp2_public"]), 335 ntcp2_iv=base64.b64decode(data["ntcp2_iv"]), 336 ) 337 338 @classmethod 339 def generate(cls) -> "RouterKeyBundle": 340 """Generate a fresh key bundle with Ed25519 + X25519 + random IV.""" 341 signing_pub, signing_priv = KeyGenerator.generate( 342 SigType.EdDSA_SHA512_Ed25519 343 ) 344 ntcp2_priv, ntcp2_pub = X25519DH.generate_keypair() 345 ntcp2_iv = os.urandom(16) 346 347 return cls( 348 signing_private=signing_priv, 349 signing_public=signing_pub, 350 ntcp2_private=ntcp2_priv, 351 ntcp2_public=ntcp2_pub, 352 ntcp2_iv=ntcp2_iv, 353 ) 354 355 356def create_full_router_identity( 357 bundle: RouterKeyBundle, host: str, port: int 358) -> tuple[RouterIdentity, RouterInfo]: 359 """Create RouterIdentity + signed RouterInfo from a key bundle. 360 361 Args: 362 bundle: All keys for the router 363 host: IP address or hostname for the NTCP2 address 364 port: TCP port for the NTCP2 address 365 366 Returns: 367 (RouterIdentity, signed RouterInfo) 368 """ 369 identity = build_router_identity(bundle.signing_public, bundle.enc_public) 370 ri = build_router_info( 371 identity=identity, 372 signing_private=bundle.signing_private, 373 host=host, 374 port=port, 375 ntcp2_static_pub=bundle.ntcp2_public, 376 ntcp2_iv=bundle.ntcp2_iv, 377 ) 378 return identity, ri