"""Router identity generation and RouterInfo building. Ported from net.i2p.router.RouterIdentityGenerator and related Java classes. Provides: - RouterIdentityGenerator: creates RouterIdentity with crypto keypairs - RouterInfoBuilder: fluent builder for signed RouterInfo structures """ from __future__ import annotations import base64 import json import os import struct import time from dataclasses import dataclass from i2p_data.data_helper import to_base64 as i2p_b64encode, from_base64 as i2p_b64decode from i2p_data.router import RouterIdentity, RouterAddress, RouterInfo from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import Certificate, CertificateType, KeyCertificate from i2p_crypto.dsa import SigType, KeyGenerator from i2p_crypto.elgamal import ElGamalEngine from i2p_crypto.x25519 import X25519DH class RouterIdentityGenerator: """Generate a RouterIdentity with corresponding private keys. Creates an ElGamal encryption keypair and a signing keypair, wraps the public halves into a RouterIdentity with the appropriate certificate type. """ # Default to EdDSA — modern I2P routers use this DEFAULT_SIG_TYPE = SigType.EdDSA_SHA512_Ed25519 @classmethod def generate( cls, sig_type: SigType | None = None ) -> tuple[RouterIdentity, dict]: """Generate a new router identity. Args: sig_type: Signing algorithm. Defaults to EdDSA_SHA512_Ed25519. Falls back to DSA_SHA1 if explicitly requested. Returns: (RouterIdentity, {"elgamal_private": bytes, "signing_private": bytes}) """ if sig_type is None: sig_type = cls.DEFAULT_SIG_TYPE # 1. Generate ElGamal encryption keypair (256-byte pub, 256-byte priv) elgamal_pub, elgamal_priv = ElGamalEngine.generate_keypair() # 2. Generate signing keypair signing_pub_bytes, signing_priv_bytes = KeyGenerator.generate(sig_type) # 3. Build the certificate if sig_type == SigType.DSA_SHA1: # DSA_SHA1 uses NULL certificate — keys fit exactly in the # standard 256 + 128 byte areas cert = Certificate(CertificateType.NULL) else: # Non-DSA types need a KeyCertificate declaring the sig/enc types # Payload: 2 bytes sig_type code + 2 bytes enc_type code # (+ optional extra key data if keys overflow their areas, # but for ElGamal + EdDSA no extra data is needed) cert_payload = struct.pack("!HH", sig_type.code, EncType.ELGAMAL.code) cert = KeyCertificate(cert_payload) # 4. Wrap public keys in typed containers pub_key = PublicKey(elgamal_pub, EncType.ELGAMAL) sig_key = SigningPublicKey(signing_pub_bytes, sig_type) # 5. Build the RouterIdentity identity = RouterIdentity(pub_key, sig_key, cert) private_keys = { "elgamal_private": elgamal_priv, "signing_private": signing_priv_bytes, } return identity, private_keys class RouterInfoBuilder: """Fluent builder for signed RouterInfo structures. Usage:: identity, keys = RouterIdentityGenerator.generate() info = ( RouterInfoBuilder(identity, keys["signing_private"]) .add_ntcp2_address("192.168.1.1", 9000, static_key_pub) .set_options({"router.version": "0.9.62"}) .build() ) """ def __init__( self, identity: RouterIdentity, signing_private_key: bytes ) -> None: self._identity = identity self._signing_private_key = signing_private_key self._addresses: list[RouterAddress] = [] self._options: dict[str, str] = {} def add_ntcp2_address( self, host: str, port: int, static_key_pub: bytes ) -> "RouterInfoBuilder": """Add an NTCP2 transport address. Args: host: IP address or hostname port: TCP port static_key_pub: 32-byte NTCP2 static public key Returns: self (for chaining) """ addr = RouterAddress( cost=10, expiration=0, transport="NTCP2", options={ "host": host, "port": str(port), "s": i2p_b64encode(static_key_pub), }, ) self._addresses.append(addr) return self def set_options(self, options: dict[str, str]) -> "RouterInfoBuilder": """Set router options (e.g., version, capabilities). Args: options: key-value pairs for the RouterInfo options map Returns: self (for chaining) """ self._options.update(options) return self def build(self) -> RouterInfo: """Build and sign the RouterInfo. Creates a RouterInfo with current time as the published timestamp, signs it with the signing private key, and returns it. Returns: Signed RouterInfo ready for publication """ published = int(time.time() * 1000) info = RouterInfo( identity=self._identity, published=published, addresses=list(self._addresses), options=dict(self._options), ) # Sign with the private signing key info.sign(self._signing_private_key) return info # --------------------------------------------------------------------------- # Standalone helper functions for NTCP2-capable router identity generation # --------------------------------------------------------------------------- def generate_router_keys() -> dict: """Generate all keys needed for a modern router identity. Returns dict with: signing_private: Ed25519 private key bytes (32 bytes) signing_public: Ed25519 public key bytes (32 bytes) ntcp2_static: (private, public) X25519 keypair for NTCP2 (32 bytes each) ntcp2_iv: 16 random bytes for NTCP2 "i" parameter """ # Ed25519 signing key signing_pub, signing_priv = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) # X25519 for NTCP2 ntcp2_static = X25519DH.generate_keypair() # Random IV for NTCP2 ntcp2_iv = os.urandom(16) return { "signing_private": signing_priv, "signing_public": signing_pub, "ntcp2_static": ntcp2_static, "ntcp2_iv": ntcp2_iv, } def build_router_identity(signing_public: bytes, enc_public: bytes) -> RouterIdentity: """Build a RouterIdentity with Ed25519 signing and X25519 encryption. The I2P RouterIdentity has fixed-size fields: - 256-byte public key area (X25519 key is 32 bytes, right-aligned with zero padding) - 128-byte signing key area (Ed25519 key is 32 bytes, right-aligned with zero padding) - KeyCertificate specifying the actual key types Args: signing_public: 32-byte Ed25519 public key enc_public: 32-byte X25519 public key """ # Build KeyCertificate payload: sig_type code (2 bytes) + enc_type code (2 bytes) cert_payload = struct.pack("!HH", SigType.EdDSA_SHA512_Ed25519.code, EncType.ECIES_X25519.code) cert = KeyCertificate(cert_payload) # Wrap in typed containers pub_key = PublicKey(enc_public, EncType.ECIES_X25519) sig_key = SigningPublicKey(signing_public, SigType.EdDSA_SHA512_Ed25519) return RouterIdentity(pub_key, sig_key, cert) def build_router_info( identity: RouterIdentity, signing_private: bytes, host: str, port: int, ntcp2_static_pub: bytes, ntcp2_iv: bytes, ) -> RouterInfo: """Build a signed RouterInfo with an NTCP2 transport address. The RouterAddress options include: - host: IP address string - port: port number as string - s: base64-encoded X25519 static public key (32 bytes) - i: base64-encoded IV (16 bytes) - v: "2" (NTCP2 version) Args: identity: RouterIdentity for this router signing_private: Ed25519 private key for signing host: IP address or hostname port: TCP port number ntcp2_static_pub: 32-byte X25519 static public key ntcp2_iv: 16-byte initialization vector """ s_b64 = i2p_b64encode(ntcp2_static_pub) i_b64 = i2p_b64encode(ntcp2_iv) ntcp2_addr = RouterAddress( cost=10, expiration=0, transport="NTCP2", options={ "host": host, "port": str(port), "s": s_b64, "i": i_b64, "v": "2", }, ) ri = RouterInfo( identity=identity, published=int(time.time() * 1000), addresses=[ntcp2_addr], options={ "router.version": "0.9.62", "netId": "2", "caps": "R", }, ) ri.sign(signing_private) return ri # --------------------------------------------------------------------------- # RouterKeyBundle — unified key container with persistence # --------------------------------------------------------------------------- @dataclass class RouterKeyBundle: """All keys needed for a router identity. Holds Ed25519 signing keys, X25519 NTCP2 transport keys, and the NTCP2 initialization vector. Provides save/load for JSON persistence and a convenience ``generate()`` class method. """ signing_private: bytes # Ed25519 private key (32 bytes) signing_public: bytes # Ed25519 public key (32 bytes) ntcp2_private: bytes # X25519 private key (32 bytes) ntcp2_public: bytes # X25519 public key (32 bytes) ntcp2_iv: bytes # 16-byte random IV for NTCP2 # Alias: enc_public is the same as ntcp2_public for ECIES_X25519 routers @property def enc_public(self) -> bytes: """X25519 public key used as the encryption public key.""" return self.ntcp2_public def save(self, path: str) -> None: """Save keys to a JSON file with base64-encoded values.""" data = { "signing_private": base64.b64encode(self.signing_private).decode("ascii"), "signing_public": base64.b64encode(self.signing_public).decode("ascii"), "ntcp2_private": base64.b64encode(self.ntcp2_private).decode("ascii"), "ntcp2_public": base64.b64encode(self.ntcp2_public).decode("ascii"), "ntcp2_iv": base64.b64encode(self.ntcp2_iv).decode("ascii"), } with open(path, "w") as f: json.dump(data, f, indent=2) @classmethod def load(cls, path: str) -> "RouterKeyBundle" | None: """Load keys from a JSON file. Returns None if the file doesn't exist.""" try: with open(path) as f: data = json.load(f) except FileNotFoundError: return None return cls( signing_private=base64.b64decode(data["signing_private"]), signing_public=base64.b64decode(data["signing_public"]), ntcp2_private=base64.b64decode(data["ntcp2_private"]), ntcp2_public=base64.b64decode(data["ntcp2_public"]), ntcp2_iv=base64.b64decode(data["ntcp2_iv"]), ) @classmethod def generate(cls) -> "RouterKeyBundle": """Generate a fresh key bundle with Ed25519 + X25519 + random IV.""" signing_pub, signing_priv = KeyGenerator.generate( SigType.EdDSA_SHA512_Ed25519 ) ntcp2_priv, ntcp2_pub = X25519DH.generate_keypair() ntcp2_iv = os.urandom(16) return cls( signing_private=signing_priv, signing_public=signing_pub, ntcp2_private=ntcp2_priv, ntcp2_public=ntcp2_pub, ntcp2_iv=ntcp2_iv, ) def create_full_router_identity( bundle: RouterKeyBundle, host: str, port: int ) -> tuple[RouterIdentity, RouterInfo]: """Create RouterIdentity + signed RouterInfo from a key bundle. Args: bundle: All keys for the router host: IP address or hostname for the NTCP2 address port: TCP port for the NTCP2 address Returns: (RouterIdentity, signed RouterInfo) """ identity = build_router_identity(bundle.signing_public, bundle.enc_public) ri = build_router_info( identity=identity, signing_private=bundle.signing_private, host=host, port=port, ntcp2_static_pub=bundle.ntcp2_public, ntcp2_iv=bundle.ntcp2_iv, ) return identity, ri