A Python port of the Invisible Internet Project (I2P)
at main 673 lines 27 kB view raw
"""Tests for router identity generation and RouterInfo building.

TDD tests — written before implementation.
"""

import base64
import hashlib
import json
import time

import pytest

from i2p_data.router import RouterIdentity, RouterAddress, RouterInfo
from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
from i2p_data.certificate import Certificate, KeyCertificate, CertificateType
from i2p_crypto.dsa import SigType, KeyGenerator, DSAEngine
from i2p_crypto.x25519 import X25519DH
from i2p_router.identity import (
    RouterIdentityGenerator,
    RouterInfoBuilder,
    RouterKeyBundle,
    generate_router_keys,
    build_router_identity,
    build_router_info,
    create_full_router_identity,
)


# ---------------------------------------------------------------------------
# Shared helpers (leading underscore keeps pytest from collecting them)
# ---------------------------------------------------------------------------

# The tests below decode the "s" and "i" address options with the I2P base64
# alphabet, which uses "-" and "~" in place of "+" and "/".
_I2P_ALTCHARS = b"-~"


def _i2p_b64decode(value):
    """Decode *value* with the I2P base64 alphabet ("-" and "~")."""
    return base64.b64decode(value, altchars=_I2P_ALTCHARS)


def _fresh_identity():
    """Generate new router keys and build the matching RouterIdentity.

    Returns:
        A ``(keys, ntcp2_pub, identity)`` tuple where ``keys`` is the dict
        returned by ``generate_router_keys()``, ``ntcp2_pub`` is the public
        half of ``keys["ntcp2_static"]`` and ``identity`` is the result of
        ``build_router_identity(keys["signing_public"], ntcp2_pub)``.
    """
    keys = generate_router_keys()
    _, ntcp2_pub = keys["ntcp2_static"]
    identity = build_router_identity(keys["signing_public"], ntcp2_pub)
    return keys, ntcp2_pub, identity


class TestRouterIdentityGenerator:
    """Tests for RouterIdentityGenerator.generate()."""

    def test_generate_returns_router_identity_and_private_keys(self):
        """generate() returns a (RouterIdentity, dict) tuple."""
        identity, private_keys = RouterIdentityGenerator.generate()
        assert isinstance(identity, RouterIdentity)
        assert isinstance(private_keys, dict)
        assert "elgamal_private" in private_keys
        assert "signing_private" in private_keys

    def test_generate_default_uses_eddsa(self):
        """Default sig_type is EdDSA_SHA512_Ed25519."""
        identity, _ = RouterIdentityGenerator.generate()
        assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519

    def test_generate_eddsa_has_key_certificate(self):
        """EdDSA identity must have a KeyCertificate (type=KEY)."""
        identity, _ = RouterIdentityGenerator.generate()
        assert isinstance(identity.certificate, KeyCertificate)
        assert identity.certificate.cert_type == CertificateType.KEY

    def test_generate_elgamal_public_key_length(self):
        """ElGamal public key is 256 bytes."""
        identity, _ = RouterIdentityGenerator.generate()
        assert len(identity.public_key.to_bytes()) == 256
        assert identity.public_key.enc_type == EncType.ELGAMAL

    def test_generate_eddsa_signing_key_length(self):
        """EdDSA signing public key is 32 bytes."""
        identity, _ = RouterIdentityGenerator.generate()
        assert len(identity.signing_public_key.to_bytes()) == 32

    def test_generate_eddsa_private_keys_lengths(self):
        """Private keys have correct lengths."""
        _, private_keys = RouterIdentityGenerator.generate()
        assert len(private_keys["elgamal_private"]) == 256
        assert len(private_keys["signing_private"]) == 32

    def test_generate_dsa_sha1_fallback(self):
        """Can generate with DSA_SHA1 sig_type."""
        identity, _ = RouterIdentityGenerator.generate(sig_type=SigType.DSA_SHA1)
        assert identity.signing_public_key.sig_type == SigType.DSA_SHA1
        assert len(identity.signing_public_key.to_bytes()) == 128
        assert isinstance(identity.certificate, Certificate)
        # DSA_SHA1 uses NULL certificate (not KeyCertificate)
        assert identity.certificate.cert_type == CertificateType.NULL

    def test_generate_dsa_sha1_private_key_length(self):
        """DSA_SHA1 private signing key is 20 bytes."""
        _, private_keys = RouterIdentityGenerator.generate(sig_type=SigType.DSA_SHA1)
        assert len(private_keys["signing_private"]) == 20

    def test_generate_identity_serializes_correctly(self):
        """Generated identity serializes to at least 387 bytes."""
        identity, _ = RouterIdentityGenerator.generate()
        data = identity.to_bytes()
        # 256 (pub area) + 128 (sig area) + cert (at least 3 bytes)
        assert len(data) >= 387

    def test_generate_identity_hash_is_32_bytes(self):
        """Identity hash (SHA-256) is 32 bytes."""
        identity, _ = RouterIdentityGenerator.generate()
        h = identity.hash()
        assert len(h) == 32

    def test_generate_two_identities_differ(self):
        """Two generated identities should have different hashes."""
        id1, _ = RouterIdentityGenerator.generate()
        id2, _ = RouterIdentityGenerator.generate()
        assert id1.hash() != id2.hash()


class TestRouterInfoBuilder:
    """Tests for RouterInfoBuilder."""

    @pytest.fixture
    def identity_and_keys(self):
        """Generate a fresh identity for builder tests."""
        return RouterIdentityGenerator.generate()

    @staticmethod
    def _builder(identity_and_keys):
        """Create a RouterInfoBuilder from the fixture's identity and signing key."""
        identity, private_keys = identity_and_keys
        return RouterInfoBuilder(identity, private_keys["signing_private"])

    def test_build_creates_router_info(self, identity_and_keys):
        """build() returns a RouterInfo instance."""
        info = self._builder(identity_and_keys).build()
        assert isinstance(info, RouterInfo)

    def test_build_has_correct_identity(self, identity_and_keys):
        """Built RouterInfo contains the same identity."""
        identity, _ = identity_and_keys
        info = self._builder(identity_and_keys).build()
        assert info.identity == identity

    def test_build_has_published_timestamp(self, identity_and_keys):
        """Built RouterInfo has a reasonable published timestamp."""
        # The builder is constructed *inside* the measured window so the
        # check holds wherever the implementation samples the clock.
        before_ms = int(time.time() * 1000)
        info = self._builder(identity_and_keys).build()
        after_ms = int(time.time() * 1000)
        assert before_ms <= info.published <= after_ms

    def test_add_ntcp2_address(self, identity_and_keys):
        """NTCP2 address is added with correct properties."""
        static_key = b"\xaa" * 32
        info = (
            self._builder(identity_and_keys)
            .add_ntcp2_address("192.168.1.1", 9000, static_key)
            .build()
        )
        assert len(info.addresses) == 1
        addr = info.addresses[0]
        assert addr.transport == "NTCP2"
        assert addr.cost == 10
        assert addr.expiration == 0
        opts = addr.options
        assert opts["host"] == "192.168.1.1"
        assert opts["port"] == "9000"
        # 32 bytes of 0xAA encode to text without "+" or "/", so the
        # standard and I2P base64 alphabets give the same string here.
        assert opts["s"] == base64.b64encode(static_key).decode("ascii")

    def test_chaining_multiple_addresses(self, identity_and_keys):
        """add_ntcp2_address returns self for chaining."""
        builder = self._builder(identity_and_keys)
        result = builder.add_ntcp2_address("10.0.0.1", 8000, b"\xbb" * 32)
        assert result is builder
        result2 = builder.add_ntcp2_address("10.0.0.2", 8001, b"\xcc" * 32)
        assert result2 is builder
        info = builder.build()
        assert len(info.addresses) == 2

    def test_set_options(self, identity_and_keys):
        """set_options stores options in RouterInfo."""
        info = (
            self._builder(identity_and_keys)
            .set_options({"router.version": "0.9.62", "caps": "XfR"})
            .build()
        )
        assert info.options["router.version"] == "0.9.62"
        assert info.options["caps"] == "XfR"

    def test_set_options_returns_self(self, identity_and_keys):
        """set_options returns self for chaining."""
        builder = self._builder(identity_and_keys)
        result = builder.set_options({"key": "val"})
        assert result is builder

    def test_signature_verifies(self, identity_and_keys):
        """Built RouterInfo has a valid signature."""
        info = (
            self._builder(identity_and_keys)
            .add_ntcp2_address("127.0.0.1", 9001, b"\xdd" * 32)
            .set_options({"router.version": "0.9.62"})
            .build()
        )
        assert info.verify() is True

    def test_serialization_roundtrip(self, identity_and_keys):
        """RouterInfo survives to_bytes -> from_bytes roundtrip."""
        info = (
            self._builder(identity_and_keys)
            .add_ntcp2_address("10.0.0.1", 7654, b"\xee" * 32)
            .set_options({"caps": "Lf"})
            .build()
        )
        data = info.to_bytes()
        restored = RouterInfo.from_bytes(data)
        assert restored.identity == info.identity
        assert restored.published == info.published
        assert len(restored.addresses) == len(info.addresses)
        assert restored.addresses[0] == info.addresses[0]
        assert restored.options == info.options
        assert restored.verify() is True


# ---------------------------------------------------------------------------
# New tests for generate_router_keys / build_router_identity / build_router_info
# ---------------------------------------------------------------------------


class TestGenerateRouterKeys:
    """Tests for generate_router_keys()."""

    def test_returns_dict_with_expected_keys(self):
        """The result exposes signing, NTCP2 static and NTCP2 IV entries."""
        keys = generate_router_keys()
        assert "signing_private" in keys
        assert "signing_public" in keys
        assert "ntcp2_static" in keys
        assert "ntcp2_iv" in keys

    def test_signing_keys_are_ed25519_size(self):
        """Both halves of the signing keypair are 32 bytes."""
        keys = generate_router_keys()
        assert len(keys["signing_public"]) == 32
        assert len(keys["signing_private"]) == 32

    def test_ntcp2_static_is_x25519_keypair(self):
        """ntcp2_static unpacks to a (private, public) pair of 32-byte keys."""
        keys = generate_router_keys()
        priv, pub = keys["ntcp2_static"]
        assert len(priv) == 32
        assert len(pub) == 32

    def test_ntcp2_iv_is_16_bytes(self):
        """The NTCP2 IV is 16 bytes long."""
        keys = generate_router_keys()
        assert len(keys["ntcp2_iv"]) == 16

    def test_signing_keypair_is_valid(self):
        """The signing keypair should produce verifiable signatures."""
        keys = generate_router_keys()
        message = b"test message for signing"
        sig = DSAEngine.sign(message, keys["signing_private"],
                             SigType.EdDSA_SHA512_Ed25519)
        assert DSAEngine.verify(message, sig, keys["signing_public"],
                                SigType.EdDSA_SHA512_Ed25519)

    def test_ntcp2_keypair_works_for_dh(self):
        """The X25519 keypair should work for Diffie-Hellman exchange."""
        keys = generate_router_keys()
        priv, pub = keys["ntcp2_static"]
        peer_priv, peer_pub = X25519DH.generate_keypair()
        shared1 = X25519DH.dh(priv, peer_pub)
        shared2 = X25519DH.dh(peer_priv, pub)
        assert shared1 == shared2

    def test_each_call_produces_different_keys(self):
        """Two calls yield different signing public keys and IVs."""
        keys1 = generate_router_keys()
        keys2 = generate_router_keys()
        assert keys1["signing_public"] != keys2["signing_public"]
        assert keys1["ntcp2_iv"] != keys2["ntcp2_iv"]


class TestBuildRouterIdentity:
    """Tests for build_router_identity()."""

    def test_returns_router_identity(self):
        """The result is a RouterIdentity instance."""
        _, _, identity = _fresh_identity()
        assert isinstance(identity, RouterIdentity)

    def test_has_eddsa_sig_type(self):
        """The signing public key is tagged EdDSA_SHA512_Ed25519."""
        _, _, identity = _fresh_identity()
        assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519

    def test_has_ecies_x25519_enc_type(self):
        """The encryption public key is tagged ECIES_X25519."""
        _, _, identity = _fresh_identity()
        assert identity.public_key.enc_type == EncType.ECIES_X25519

    def test_certificate_is_key_certificate(self):
        """The certificate is a KeyCertificate with cert_type KEY."""
        _, _, identity = _fresh_identity()
        cert = identity.certificate
        assert isinstance(cert, KeyCertificate)
        assert cert.cert_type == CertificateType.KEY

    def test_certificate_sig_type_code(self):
        """The certificate carries the EdDSA_SHA512_Ed25519 sig type code."""
        _, _, identity = _fresh_identity()
        cert = identity.certificate
        assert cert.get_sig_type_code() == SigType.EdDSA_SHA512_Ed25519.code  # 7

    def test_certificate_enc_type_code(self):
        """The certificate carries the ECIES_X25519 enc type code."""
        _, _, identity = _fresh_identity()
        cert = identity.certificate
        assert cert.get_enc_type_code() == EncType.ECIES_X25519.code  # 4

    def test_serialization_roundtrip(self):
        """to_bytes -> from_bytes -> to_bytes reproduces the same bytes."""
        _, _, identity = _fresh_identity()
        raw = identity.to_bytes()
        assert len(raw) >= 387
        restored = RouterIdentity.from_bytes(raw)
        assert restored.to_bytes() == raw

    def test_signing_key_matches(self):
        """The identity's signing public key equals the generated one."""
        keys, _, identity = _fresh_identity()
        assert identity.signing_public_key.to_bytes() == keys["signing_public"]

    def test_public_key_matches(self):
        """The identity's public key equals the NTCP2 static public key."""
        _, ntcp2_pub, identity = _fresh_identity()
        assert identity.public_key.to_bytes() == ntcp2_pub


class TestBuildRouterInfoFunc:
    """Tests for build_router_info() standalone function."""

    @pytest.fixture
    def router_data(self):
        """Fresh identity plus the key material build_router_info() needs."""
        keys, ntcp2_pub, identity = _fresh_identity()
        return {
            "identity": identity,
            "signing_private": keys["signing_private"],
            "ntcp2_static_pub": ntcp2_pub,
            "ntcp2_iv": keys["ntcp2_iv"],
        }

    @staticmethod
    def _build(router_data, host="192.168.1.1", port=9000):
        """Call build_router_info() with the fixture data and given endpoint."""
        return build_router_info(
            identity=router_data["identity"],
            signing_private=router_data["signing_private"],
            host=host,
            port=port,
            ntcp2_static_pub=router_data["ntcp2_static_pub"],
            ntcp2_iv=router_data["ntcp2_iv"],
        )

    def test_returns_router_info(self, router_data):
        """The result is a RouterInfo instance."""
        ri = self._build(router_data)
        assert isinstance(ri, RouterInfo)

    def test_has_one_ntcp2_address(self, router_data):
        """Exactly one address is published and its transport is NTCP2."""
        ri = self._build(router_data)
        assert len(ri.addresses) == 1
        assert ri.addresses[0].transport == "NTCP2"

    def test_address_has_host_and_port(self, router_data):
        """Host and port are stored as string options on the address."""
        ri = self._build(router_data, host="10.0.0.1", port=12345)
        opts = ri.addresses[0].options
        assert opts["host"] == "10.0.0.1"
        assert opts["port"] == "12345"

    def test_address_has_base64_static_key(self, router_data):
        """Option "s" decodes (I2P base64) to the 32-byte static public key."""
        ri = self._build(router_data)
        decoded = _i2p_b64decode(ri.addresses[0].options["s"])
        assert decoded == router_data["ntcp2_static_pub"]
        assert len(decoded) == 32

    def test_address_has_base64_iv(self, router_data):
        """Option "i" decodes (I2P base64) to the 16-byte IV."""
        ri = self._build(router_data)
        decoded = _i2p_b64decode(ri.addresses[0].options["i"])
        assert decoded == router_data["ntcp2_iv"]
        assert len(decoded) == 16

    def test_address_has_version_2(self, router_data):
        """Option "v" is the string "2"."""
        ri = self._build(router_data)
        assert ri.addresses[0].options["v"] == "2"

    def test_signature_is_valid(self, router_data):
        """The built RouterInfo passes verify()."""
        ri = self._build(router_data)
        assert ri.verify()

    def test_roundtrip_serialization(self, router_data):
        """RouterInfo survives to_bytes -> from_bytes and still verifies."""
        ri = self._build(router_data)
        raw = ri.to_bytes()
        restored = RouterInfo.from_bytes(raw)
        assert restored.identity == ri.identity
        assert restored.published == ri.published
        assert len(restored.addresses) == 1
        assert restored.addresses[0] == ri.addresses[0]
        assert restored.verify()

    def test_published_is_millisecond_timestamp(self, router_data):
        """published falls between millisecond clock samples taken around the build."""
        before = int(time.time() * 1000)
        ri = self._build(router_data)
        after = int(time.time() * 1000)
        assert before <= ri.published <= after


class TestRouterInfoHash:
    """Tests for the RouterInfo identity hash (SHA-256 of identity bytes)."""

    def test_identity_hash_is_sha256(self):
        """hash() equals the SHA-256 digest of to_bytes()."""
        _, _, identity = _fresh_identity()
        identity_bytes = identity.to_bytes()
        expected_hash = hashlib.sha256(identity_bytes).digest()
        assert identity.hash() == expected_hash

    def test_hash_is_32_bytes(self):
        """hash() returns 32 bytes."""
        _, _, identity = _fresh_identity()
        assert len(identity.hash()) == 32

    def test_different_identities_have_different_hashes(self):
        """Identities built from independent keys hash differently."""
        _, _, id1 = _fresh_identity()
        _, _, id2 = _fresh_identity()
        assert id1.hash() != id2.hash()


class TestBase64Encoding:
    """Tests that NTCP2 parameters use I2P-modified base64 ("-" and "~")."""

    @staticmethod
    def _ntcp2_options():
        """Build a RouterInfo for 127.0.0.1:1234 and return its address options.

        Returns:
            A ``(keys, ntcp2_pub, options)`` tuple so callers can compare the
            decoded option values against the original key material.
        """
        keys, ntcp2_pub, identity = _fresh_identity()
        ri = build_router_info(
            identity=identity,
            signing_private=keys["signing_private"],
            host="127.0.0.1",
            port=1234,
            ntcp2_static_pub=ntcp2_pub,
            ntcp2_iv=keys["ntcp2_iv"],
        )
        return keys, ntcp2_pub, ri.addresses[0].options

    def test_s_uses_i2p_base64(self):
        """Option "s" is 44 chars and decodes to the static public key."""
        _, ntcp2_pub, opts = self._ntcp2_options()
        s_val = opts["s"]
        # I2P base64 encoding of 32 bytes produces 44 chars
        assert len(s_val) == 44
        assert _i2p_b64decode(s_val) == ntcp2_pub

    def test_i_uses_i2p_base64(self):
        """Option "i" is 24 chars and decodes to the IV."""
        keys, _, opts = self._ntcp2_options()
        i_val = opts["i"]
        # I2P base64 encoding of 16 bytes produces 24 chars
        assert len(i_val) == 24
        assert _i2p_b64decode(i_val) == keys["ntcp2_iv"]


# ---------------------------------------------------------------------------
# RouterKeyBundle tests
# ---------------------------------------------------------------------------


class TestRouterKeyBundle:
    """Tests for RouterKeyBundle dataclass and persistence."""

    # Field names expected both on the bundle and in the saved JSON file.
    _FIELDS = ("signing_private", "signing_public", "ntcp2_private",
               "ntcp2_public", "ntcp2_iv")

    @staticmethod
    def _saved_bundle(tmp_path):
        """Generate a bundle, save it under *tmp_path*, return (bundle, path)."""
        bundle = RouterKeyBundle.generate()
        path = str(tmp_path / "keys.json")
        bundle.save(path)
        return bundle, path

    @staticmethod
    def _read_json(path):
        """Load and return the JSON document stored at *path*."""
        with open(path) as f:
            return json.load(f)

    def test_generate_produces_all_fields(self):
        """generate() fills all key fields with correct sizes."""
        bundle = RouterKeyBundle.generate()
        assert len(bundle.signing_private) == 32
        assert len(bundle.signing_public) == 32
        assert len(bundle.ntcp2_private) == 32
        assert len(bundle.ntcp2_public) == 32
        assert len(bundle.ntcp2_iv) == 16

    def test_generate_produces_valid_signing_keypair(self):
        """Signing keypair can produce verifiable signatures."""
        bundle = RouterKeyBundle.generate()
        msg = b"test message"
        sig = DSAEngine.sign(msg, bundle.signing_private,
                             SigType.EdDSA_SHA512_Ed25519)
        assert DSAEngine.verify(msg, sig, bundle.signing_public,
                                SigType.EdDSA_SHA512_Ed25519)

    def test_generate_produces_valid_x25519_keypair(self):
        """X25519 keypair works for DH exchange."""
        bundle = RouterKeyBundle.generate()
        peer_priv, peer_pub = X25519DH.generate_keypair()
        shared1 = X25519DH.dh(bundle.ntcp2_private, peer_pub)
        shared2 = X25519DH.dh(peer_priv, bundle.ntcp2_public)
        assert shared1 == shared2

    def test_generate_different_each_time(self):
        """Two generated bundles differ."""
        b1 = RouterKeyBundle.generate()
        b2 = RouterKeyBundle.generate()
        assert b1.signing_public != b2.signing_public
        assert b1.ntcp2_public != b2.ntcp2_public

    def test_save_creates_json_file(self, tmp_path):
        """save() writes a JSON file to the given path."""
        _, path = self._saved_bundle(tmp_path)
        data = self._read_json(path)
        assert "signing_private" in data
        assert "signing_public" in data
        assert "ntcp2_private" in data
        assert "ntcp2_public" in data
        assert "ntcp2_iv" in data

    def test_save_values_are_base64(self, tmp_path):
        """Saved values are valid base64 strings."""
        _, path = self._saved_bundle(tmp_path)
        data = self._read_json(path)
        for key in self._FIELDS:
            decoded = base64.b64decode(data[key])
            assert isinstance(decoded, bytes)

    def test_load_restores_exact_keys(self, tmp_path):
        """load() restores the exact same key bytes."""
        bundle, path = self._saved_bundle(tmp_path)
        restored = RouterKeyBundle.load(path)
        assert restored is not None
        assert restored.signing_private == bundle.signing_private
        assert restored.signing_public == bundle.signing_public
        assert restored.ntcp2_private == bundle.ntcp2_private
        assert restored.ntcp2_public == bundle.ntcp2_public
        assert restored.ntcp2_iv == bundle.ntcp2_iv

    def test_load_missing_file_returns_none(self, tmp_path):
        """load() on a missing file returns None, not an exception."""
        path = str(tmp_path / "nonexistent.json")
        result = RouterKeyBundle.load(path)
        assert result is None

    def test_roundtrip_generate_save_load_build(self, tmp_path):
        """Full roundtrip: generate -> save -> load -> build RouterInfo."""
        bundle, path = self._saved_bundle(tmp_path)
        restored = RouterKeyBundle.load(path)
        assert restored is not None

        # Build identity and info from restored bundle
        identity, ri = create_full_router_identity(restored, "10.0.0.1", 9000)
        assert isinstance(identity, RouterIdentity)
        assert isinstance(ri, RouterInfo)
        assert ri.verify()

        # Also build from original and compare identity hash
        identity2, _ = create_full_router_identity(bundle, "10.0.0.1", 9000)
        assert identity.hash() == identity2.hash()


class TestCreateFullRouterIdentity:
    """Tests for create_full_router_identity()."""

    def test_returns_identity_and_info(self):
        """The function returns a (RouterIdentity, RouterInfo) pair."""
        bundle = RouterKeyBundle.generate()
        identity, ri = create_full_router_identity(bundle, "192.168.1.1", 9000)
        assert isinstance(identity, RouterIdentity)
        assert isinstance(ri, RouterInfo)

    def test_router_info_is_signed(self):
        """The returned RouterInfo passes verify()."""
        bundle = RouterKeyBundle.generate()
        _, ri = create_full_router_identity(bundle, "192.168.1.1", 9000)
        assert ri.verify()

    def test_ntcp2_address_options(self):
        """NTCP2 address has correct host, port, s, i, v options."""
        bundle = RouterKeyBundle.generate()
        _, ri = create_full_router_identity(bundle, "10.0.0.5", 7777)
        assert len(ri.addresses) == 1
        opts = ri.addresses[0].options
        assert opts["host"] == "10.0.0.5"
        assert opts["port"] == "7777"
        assert opts["v"] == "2"

        # s is I2P base64 of 32-byte X25519 public key
        s_decoded = _i2p_b64decode(opts["s"])
        assert len(s_decoded) == 32
        assert s_decoded == bundle.ntcp2_public

        # i is I2P base64 of 16-byte IV
        i_decoded = _i2p_b64decode(opts["i"])
        assert len(i_decoded) == 16
        assert i_decoded == bundle.ntcp2_iv

    def test_identity_uses_ecies_x25519(self):
        """The identity uses ECIES_X25519 encryption and EdDSA signing."""
        bundle = RouterKeyBundle.generate()
        identity, _ = create_full_router_identity(bundle, "127.0.0.1", 1234)
        assert identity.public_key.enc_type == EncType.ECIES_X25519
        assert identity.signing_public_key.sig_type == SigType.EdDSA_SHA512_Ed25519