"""Tests for router identity generation and RouterInfo building. TDD tests — written before implementation. """ import base64 import hashlib import time import pytest from i2p_data.router import RouterIdentity, RouterAddress, RouterInfo from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import Certificate, KeyCertificate, CertificateType from i2p_crypto.dsa import SigType, KeyGenerator, DSAEngine from i2p_crypto.x25519 import X25519DH from i2p_router.identity import ( RouterIdentityGenerator, RouterInfoBuilder, RouterKeyBundle, generate_router_keys, build_router_identity, build_router_info, create_full_router_identity, ) class TestRouterIdentityGenerator: """Tests for RouterIdentityGenerator.generate().""" def test_generate_returns_router_identity_and_private_keys(self): """generate() returns a (RouterIdentity, dict) tuple.""" identity, private_keys = RouterIdentityGenerator.generate() assert isinstance(identity, RouterIdentity) assert isinstance(private_keys, dict) assert "elgamal_private" in private_keys assert "signing_private" in private_keys def test_generate_default_uses_eddsa(self): """Default sig_type is EdDSA_SHA512_Ed25519.""" identity, private_keys = RouterIdentityGenerator.generate() assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519 def test_generate_eddsa_has_key_certificate(self): """EdDSA identity must have a KeyCertificate (type=KEY).""" identity, _ = RouterIdentityGenerator.generate() assert isinstance(identity.certificate, KeyCertificate) assert identity.certificate.cert_type == CertificateType.KEY def test_generate_elgamal_public_key_length(self): """ElGamal public key is 256 bytes.""" identity, _ = RouterIdentityGenerator.generate() assert len(identity.public_key.to_bytes()) == 256 assert identity.public_key.enc_type == EncType.ELGAMAL def test_generate_eddsa_signing_key_length(self): """EdDSA signing public key is 32 bytes.""" identity, _ = RouterIdentityGenerator.generate() assert len(identity.signing_public_key.to_bytes()) == 32 def test_generate_eddsa_private_keys_lengths(self): """Private keys have correct lengths.""" _, private_keys = RouterIdentityGenerator.generate() assert len(private_keys["elgamal_private"]) == 256 assert len(private_keys["signing_private"]) == 32 def test_generate_dsa_sha1_fallback(self): """Can generate with DSA_SHA1 sig_type.""" identity, private_keys = RouterIdentityGenerator.generate(sig_type=SigType.DSA_SHA1) assert identity.signing_public_key.sig_type == SigType.DSA_SHA1 assert len(identity.signing_public_key.to_bytes()) == 128 assert isinstance(identity.certificate, Certificate) # DSA_SHA1 uses NULL certificate (not KeyCertificate) assert identity.certificate.cert_type == CertificateType.NULL def test_generate_dsa_sha1_private_key_length(self): """DSA_SHA1 private signing key is 20 bytes.""" _, private_keys = RouterIdentityGenerator.generate(sig_type=SigType.DSA_SHA1) assert len(private_keys["signing_private"]) == 20 def test_generate_identity_serializes_correctly(self): """Generated identity serializes to at least 387 bytes.""" identity, _ = RouterIdentityGenerator.generate() data = identity.to_bytes() # 256 (pub area) + 128 (sig area) + cert (at least 3 bytes) assert len(data) >= 387 def test_generate_identity_hash_is_32_bytes(self): """Identity hash (SHA-256) is 32 bytes.""" identity, _ = RouterIdentityGenerator.generate() h = identity.hash() assert len(h) == 32 def test_generate_two_identities_differ(self): """Two generated identities should have different hashes.""" id1, _ = RouterIdentityGenerator.generate() id2, _ = RouterIdentityGenerator.generate() assert id1.hash() != id2.hash() class TestRouterInfoBuilder: """Tests for RouterInfoBuilder.""" @pytest.fixture def identity_and_keys(self): """Generate a fresh identity for builder tests.""" return RouterIdentityGenerator.generate() def test_build_creates_router_info(self, identity_and_keys): """build() returns a RouterInfo instance.""" identity, private_keys = identity_and_keys builder = RouterInfoBuilder(identity, private_keys["signing_private"]) info = builder.build() assert isinstance(info, RouterInfo) def test_build_has_correct_identity(self, identity_and_keys): """Built RouterInfo contains the same identity.""" identity, private_keys = identity_and_keys info = RouterInfoBuilder(identity, private_keys["signing_private"]).build() assert info.identity == identity def test_build_has_published_timestamp(self, identity_and_keys): """Built RouterInfo has a reasonable published timestamp.""" identity, private_keys = identity_and_keys before_ms = int(time.time() * 1000) info = RouterInfoBuilder(identity, private_keys["signing_private"]).build() after_ms = int(time.time() * 1000) assert before_ms <= info.published <= after_ms def test_add_ntcp2_address(self, identity_and_keys): """NTCP2 address is added with correct properties.""" identity, private_keys = identity_and_keys static_key = b"\xaa" * 32 info = ( RouterInfoBuilder(identity, private_keys["signing_private"]) .add_ntcp2_address("192.168.1.1", 9000, static_key) .build() ) assert len(info.addresses) == 1 addr = info.addresses[0] assert addr.transport == "NTCP2" assert addr.cost == 10 assert addr.expiration == 0 opts = addr.options assert opts["host"] == "192.168.1.1" assert opts["port"] == "9000" import base64 assert opts["s"] == base64.b64encode(static_key).decode("ascii") def test_chaining_multiple_addresses(self, identity_and_keys): """add_ntcp2_address returns self for chaining.""" identity, private_keys = identity_and_keys builder = RouterInfoBuilder(identity, private_keys["signing_private"]) result = builder.add_ntcp2_address("10.0.0.1", 8000, b"\xbb" * 32) assert result is builder result2 = builder.add_ntcp2_address("10.0.0.2", 8001, b"\xcc" * 32) assert result2 is builder info = builder.build() assert len(info.addresses) == 2 def test_set_options(self, identity_and_keys): """set_options stores options in RouterInfo.""" identity, private_keys = identity_and_keys info = ( RouterInfoBuilder(identity, private_keys["signing_private"]) .set_options({"router.version": "0.9.62", "caps": "XfR"}) .build() ) assert info.options["router.version"] == "0.9.62" assert info.options["caps"] == "XfR" def test_set_options_returns_self(self, identity_and_keys): """set_options returns self for chaining.""" identity, private_keys = identity_and_keys builder = RouterInfoBuilder(identity, private_keys["signing_private"]) result = builder.set_options({"key": "val"}) assert result is builder def test_signature_verifies(self, identity_and_keys): """Built RouterInfo has a valid signature.""" identity, private_keys = identity_and_keys info = ( RouterInfoBuilder(identity, private_keys["signing_private"]) .add_ntcp2_address("127.0.0.1", 9001, b"\xdd" * 32) .set_options({"router.version": "0.9.62"}) .build() ) assert info.verify() is True def test_serialization_roundtrip(self, identity_and_keys): """RouterInfo survives to_bytes -> from_bytes roundtrip.""" identity, private_keys = identity_and_keys info = ( RouterInfoBuilder(identity, private_keys["signing_private"]) .add_ntcp2_address("10.0.0.1", 7654, b"\xee" * 32) .set_options({"caps": "Lf"}) .build() ) data = info.to_bytes() restored = RouterInfo.from_bytes(data) assert restored.identity == info.identity assert restored.published == info.published assert len(restored.addresses) == len(info.addresses) assert restored.addresses[0] == info.addresses[0] assert restored.options == info.options assert restored.verify() is True # --------------------------------------------------------------------------- # New tests for generate_router_keys / build_router_identity / build_router_info # --------------------------------------------------------------------------- class TestGenerateRouterKeys: """Tests for generate_router_keys().""" def test_returns_dict_with_expected_keys(self): keys = generate_router_keys() assert "signing_private" in keys assert "signing_public" in keys assert "ntcp2_static" in keys assert "ntcp2_iv" in keys def test_signing_keys_are_ed25519_size(self): keys = generate_router_keys() assert len(keys["signing_public"]) == 32 assert len(keys["signing_private"]) == 32 def test_ntcp2_static_is_x25519_keypair(self): keys = generate_router_keys() priv, pub = keys["ntcp2_static"] assert len(priv) == 32 assert len(pub) == 32 def test_ntcp2_iv_is_16_bytes(self): keys = generate_router_keys() assert len(keys["ntcp2_iv"]) == 16 def test_signing_keypair_is_valid(self): """The signing keypair should produce verifiable signatures.""" keys = generate_router_keys() message = b"test message for signing" sig = DSAEngine.sign(message, keys["signing_private"], SigType.EdDSA_SHA512_Ed25519) assert DSAEngine.verify(message, sig, keys["signing_public"], SigType.EdDSA_SHA512_Ed25519) def test_ntcp2_keypair_works_for_dh(self): """The X25519 keypair should work for Diffie-Hellman exchange.""" keys = generate_router_keys() priv, pub = keys["ntcp2_static"] peer_priv, peer_pub = X25519DH.generate_keypair() shared1 = X25519DH.dh(priv, peer_pub) shared2 = X25519DH.dh(peer_priv, pub) assert shared1 == shared2 def test_each_call_produces_different_keys(self): keys1 = generate_router_keys() keys2 = generate_router_keys() assert keys1["signing_public"] != keys2["signing_public"] assert keys1["ntcp2_iv"] != keys2["ntcp2_iv"] class TestBuildRouterIdentity: """Tests for build_router_identity().""" def test_returns_router_identity(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert isinstance(identity, RouterIdentity) def test_has_eddsa_sig_type(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519 def test_has_ecies_x25519_enc_type(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert identity.public_key.enc_type == EncType.ECIES_X25519 def test_certificate_is_key_certificate(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) cert = identity.certificate assert isinstance(cert, KeyCertificate) assert cert.cert_type == CertificateType.KEY def test_certificate_sig_type_code(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) cert = identity.certificate assert cert.get_sig_type_code() == SigType.EdDSA_SHA512_Ed25519.code # 7 def test_certificate_enc_type_code(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) cert = identity.certificate assert cert.get_enc_type_code() == EncType.ECIES_X25519.code # 4 def test_serialization_roundtrip(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) raw = identity.to_bytes() assert len(raw) >= 387 restored = RouterIdentity.from_bytes(raw) assert restored.to_bytes() == raw def test_signing_key_matches(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert identity.signing_public_key.to_bytes() == keys["signing_public"] def test_public_key_matches(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert identity.public_key.to_bytes() == ntcp2_pub class TestBuildRouterInfoFunc: """Tests for build_router_info() standalone function.""" @pytest.fixture def router_data(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) return { "identity": identity, "signing_private": keys["signing_private"], "ntcp2_static_pub": ntcp2_pub, "ntcp2_iv": keys["ntcp2_iv"], } def test_returns_router_info(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) assert isinstance(ri, RouterInfo) def test_has_one_ntcp2_address(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) assert len(ri.addresses) == 1 assert ri.addresses[0].transport == "NTCP2" def test_address_has_host_and_port(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="10.0.0.1", port=12345, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) opts = ri.addresses[0].options assert opts["host"] == "10.0.0.1" assert opts["port"] == "12345" def test_address_has_base64_static_key(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) s_value = ri.addresses[0].options["s"] decoded = base64.b64decode(s_value, altchars=b"-~") assert decoded == router_data["ntcp2_static_pub"] assert len(decoded) == 32 def test_address_has_base64_iv(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) i_value = ri.addresses[0].options["i"] decoded = base64.b64decode(i_value, altchars=b"-~") assert decoded == router_data["ntcp2_iv"] assert len(decoded) == 16 def test_address_has_version_2(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) assert ri.addresses[0].options["v"] == "2" def test_signature_is_valid(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) assert ri.verify() def test_roundtrip_serialization(self, router_data): ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) raw = ri.to_bytes() restored = RouterInfo.from_bytes(raw) assert restored.identity == ri.identity assert restored.published == ri.published assert len(restored.addresses) == 1 assert restored.addresses[0] == ri.addresses[0] assert restored.verify() def test_published_is_millisecond_timestamp(self, router_data): before = int(time.time() * 1000) ri = build_router_info( identity=router_data["identity"], signing_private=router_data["signing_private"], host="192.168.1.1", port=9000, ntcp2_static_pub=router_data["ntcp2_static_pub"], ntcp2_iv=router_data["ntcp2_iv"], ) after = int(time.time() * 1000) assert before <= ri.published <= after class TestRouterInfoHash: """Tests for the RouterInfo identity hash (SHA-256 of identity bytes).""" def test_identity_hash_is_sha256(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) identity_bytes = identity.to_bytes() expected_hash = hashlib.sha256(identity_bytes).digest() assert identity.hash() == expected_hash def test_hash_is_32_bytes(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) assert len(identity.hash()) == 32 def test_different_identities_have_different_hashes(self): keys1 = generate_router_keys() keys2 = generate_router_keys() _, pub1 = keys1["ntcp2_static"] _, pub2 = keys2["ntcp2_static"] id1 = build_router_identity(keys1["signing_public"], pub1) id2 = build_router_identity(keys2["signing_public"], pub2) assert id1.hash() != id2.hash() class TestBase64Encoding: """Tests that NTCP2 parameters use standard base64 (not I2P modified).""" def test_s_uses_i2p_base64(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) ri = build_router_info( identity=identity, signing_private=keys["signing_private"], host="127.0.0.1", port=1234, ntcp2_static_pub=ntcp2_pub, ntcp2_iv=keys["ntcp2_iv"], ) s_val = ri.addresses[0].options["s"] # I2P base64 encoding of 32 bytes produces 44 chars assert len(s_val) == 44 decoded = base64.b64decode(s_val, altchars=b"-~") assert decoded == ntcp2_pub def test_i_uses_i2p_base64(self): keys = generate_router_keys() _, ntcp2_pub = keys["ntcp2_static"] identity = build_router_identity(keys["signing_public"], ntcp2_pub) ri = build_router_info( identity=identity, signing_private=keys["signing_private"], host="127.0.0.1", port=1234, ntcp2_static_pub=ntcp2_pub, ntcp2_iv=keys["ntcp2_iv"], ) i_val = ri.addresses[0].options["i"] # I2P base64 encoding of 16 bytes produces 24 chars assert len(i_val) == 24 decoded = base64.b64decode(i_val, altchars=b"-~") assert decoded == keys["ntcp2_iv"] # --------------------------------------------------------------------------- # RouterKeyBundle tests # --------------------------------------------------------------------------- class TestRouterKeyBundle: """Tests for RouterKeyBundle dataclass and persistence.""" def test_generate_produces_all_fields(self): """generate() fills all key fields with correct sizes.""" bundle = RouterKeyBundle.generate() assert len(bundle.signing_private) == 32 assert len(bundle.signing_public) == 32 assert len(bundle.ntcp2_private) == 32 assert len(bundle.ntcp2_public) == 32 assert len(bundle.ntcp2_iv) == 16 def test_generate_produces_valid_signing_keypair(self): """Signing keypair can produce verifiable signatures.""" from i2p_crypto.dsa import DSAEngine bundle = RouterKeyBundle.generate() msg = b"test message" sig = DSAEngine.sign(msg, bundle.signing_private, SigType.EdDSA_SHA512_Ed25519) assert DSAEngine.verify(msg, sig, bundle.signing_public, SigType.EdDSA_SHA512_Ed25519) def test_generate_produces_valid_x25519_keypair(self): """X25519 keypair works for DH exchange.""" bundle = RouterKeyBundle.generate() peer_priv, peer_pub = X25519DH.generate_keypair() shared1 = X25519DH.dh(bundle.ntcp2_private, peer_pub) shared2 = X25519DH.dh(peer_priv, bundle.ntcp2_public) assert shared1 == shared2 def test_generate_different_each_time(self): """Two generated bundles differ.""" b1 = RouterKeyBundle.generate() b2 = RouterKeyBundle.generate() assert b1.signing_public != b2.signing_public assert b1.ntcp2_public != b2.ntcp2_public def test_save_creates_json_file(self, tmp_path): """save() writes a JSON file to the given path.""" bundle = RouterKeyBundle.generate() path = str(tmp_path / "keys.json") bundle.save(path) import json with open(path) as f: data = json.load(f) assert "signing_private" in data assert "signing_public" in data assert "ntcp2_private" in data assert "ntcp2_public" in data assert "ntcp2_iv" in data def test_save_values_are_base64(self, tmp_path): """Saved values are valid base64 strings.""" bundle = RouterKeyBundle.generate() path = str(tmp_path / "keys.json") bundle.save(path) import json with open(path) as f: data = json.load(f) for key in ("signing_private", "signing_public", "ntcp2_private", "ntcp2_public", "ntcp2_iv"): decoded = base64.b64decode(data[key]) assert isinstance(decoded, bytes) def test_load_restores_exact_keys(self, tmp_path): """load() restores the exact same key bytes.""" bundle = RouterKeyBundle.generate() path = str(tmp_path / "keys.json") bundle.save(path) restored = RouterKeyBundle.load(path) assert restored is not None assert restored.signing_private == bundle.signing_private assert restored.signing_public == bundle.signing_public assert restored.ntcp2_private == bundle.ntcp2_private assert restored.ntcp2_public == bundle.ntcp2_public assert restored.ntcp2_iv == bundle.ntcp2_iv def test_load_missing_file_returns_none(self, tmp_path): """load() on a missing file returns None, not an exception.""" path = str(tmp_path / "nonexistent.json") result = RouterKeyBundle.load(path) assert result is None def test_roundtrip_generate_save_load_build(self, tmp_path): """Full roundtrip: generate -> save -> load -> build RouterInfo.""" bundle = RouterKeyBundle.generate() path = str(tmp_path / "keys.json") bundle.save(path) restored = RouterKeyBundle.load(path) assert restored is not None # Build identity and info from restored bundle identity, ri = create_full_router_identity(restored, "10.0.0.1", 9000) assert isinstance(identity, RouterIdentity) assert isinstance(ri, RouterInfo) assert ri.verify() # Also build from original and compare identity hash identity2, ri2 = create_full_router_identity(bundle, "10.0.0.1", 9000) assert identity.hash() == identity2.hash() class TestCreateFullRouterIdentity: """Tests for create_full_router_identity().""" def test_returns_identity_and_info(self): bundle = RouterKeyBundle.generate() identity, ri = create_full_router_identity(bundle, "192.168.1.1", 9000) assert isinstance(identity, RouterIdentity) assert isinstance(ri, RouterInfo) def test_router_info_is_signed(self): bundle = RouterKeyBundle.generate() _, ri = create_full_router_identity(bundle, "192.168.1.1", 9000) assert ri.verify() def test_ntcp2_address_options(self): """NTCP2 address has correct host, port, s, i, v options.""" bundle = RouterKeyBundle.generate() _, ri = create_full_router_identity(bundle, "10.0.0.5", 7777) assert len(ri.addresses) == 1 opts = ri.addresses[0].options assert opts["host"] == "10.0.0.5" assert opts["port"] == "7777" assert opts["v"] == "2" # s is I2P base64 of 32-byte X25519 public key s_decoded = base64.b64decode(opts["s"], altchars=b"-~") assert len(s_decoded) == 32 assert s_decoded == bundle.ntcp2_public # i is I2P base64 of 16-byte IV i_decoded = base64.b64decode(opts["i"], altchars=b"-~") assert len(i_decoded) == 16 assert i_decoded == bundle.ntcp2_iv def test_identity_uses_ecies_x25519(self): bundle = RouterKeyBundle.generate() identity, _ = create_full_router_identity(bundle, "127.0.0.1", 1234) assert identity.public_key.enc_type == EncType.ECIES_X25519 assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519