"""Tests for HTTPS reseed client.""" import io import os import struct import tempfile import zipfile from unittest.mock import MagicMock, patch, call import asyncio import pytest def _build_su3_bytes(router_infos: list[bytes], signer_id: str = "test@example.com", version: str = "1.0", sig_type_code: int = 0x0006, sig_length: int = 256, content_type: int = 3, file_type: int = 0) -> bytes: """Build a minimal SU3 file wrapping routerInfo-*.dat entries in a ZIP.""" # Build ZIP content zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w") as zf: for i, ri_data in enumerate(router_infos): zf.writestr(f"routerInfo-{i:04d}.dat", ri_data) zip_data = zip_buf.getvalue() version_bytes = version.encode("utf-8") signer_bytes = signer_id.encode("utf-8") content_length = len(zip_data) # Fixed header (40 bytes) header = bytearray(40) header[0:6] = b"I2Psu3" header[6] = 0 # reserved header[7] = 0 # file format version struct.pack_into("!H", header, 8, sig_type_code) struct.pack_into("!H", header, 10, sig_length) header[12] = 0 # reserved header[13] = len(version_bytes) header[14] = 0 # reserved header[15] = len(signer_bytes) struct.pack_into("!Q", header, 16, content_length) header[24] = 0 # reserved header[25] = file_type header[26] = 0 # reserved header[27] = content_type # 28-39: reserved zeros (already zero) data = bytes(header) + version_bytes + signer_bytes + zip_data # Append fake signature data += b"\x00" * sig_length return data class TestReseedCertificateStore: def test_empty_store(self): from i2p_netdb.reseed import ReseedCertificateStore store = ReseedCertificateStore() assert store.get_certificate("nobody") is None def test_add_and_get(self): from i2p_netdb.reseed import ReseedCertificateStore store = ReseedCertificateStore() cert_data = b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----" store.add_certificate("signer@example.com", cert_data) assert store.get_certificate("signer@example.com") == cert_data def test_load_from_directory(self): from i2p_netdb.reseed import ReseedCertificateStore with tempfile.TemporaryDirectory() as td: # Write two .crt files cert1 = b"CERT_ONE" cert2 = b"CERT_TWO" with open(os.path.join(td, "signer1@mail.i2p.crt"), "wb") as f: f.write(cert1) with open(os.path.join(td, "signer2@mail.i2p.crt"), "wb") as f: f.write(cert2) # Write a non-.crt file that should be ignored with open(os.path.join(td, "readme.txt"), "wb") as f: f.write(b"ignore me") store = ReseedCertificateStore(cert_dir=td) assert store.get_certificate("signer1@mail.i2p") == cert1 assert store.get_certificate("signer2@mail.i2p") == cert2 assert store.get_certificate("readme") is None def test_load_from_nonexistent_directory(self): from i2p_netdb.reseed import ReseedCertificateStore # Should not raise, just have an empty store store = ReseedCertificateStore(cert_dir="/nonexistent/path/xyz") assert store.get_certificate("anything") is None class TestReseedClientConstruction: def test_default_urls(self): from i2p_netdb.reseed import ReseedClient, DEFAULT_RESEED_URLS client = ReseedClient() assert len(client._urls) == len(DEFAULT_RESEED_URLS) def test_custom_urls(self): from i2p_netdb.reseed import ReseedClient urls = ["https://a.example.com", "https://b.example.com"] client = ReseedClient(reseed_urls=urls) assert client._urls == urls def test_default_reseed_urls_count(self): from i2p_netdb.reseed import DEFAULT_RESEED_URLS assert len(DEFAULT_RESEED_URLS) == 13 def test_default_parameters(self): from i2p_netdb.reseed import ReseedClient client = ReseedClient() assert client._target_count == 100 assert client._min_servers == 2 assert client._timeout == 420 assert client._max_ri_size == 4096 class TestFetchSU3: def test_fetch_su3_success(self): from i2p_netdb.reseed import ReseedClient fake_su3 = _build_su3_bytes([b"routerinfo_data_1"]) client = ReseedClient(reseed_urls=["https://reseed.example.com"]) mock_response = MagicMock() mock_response.read.return_value = fake_su3 mock_response.status = 200 mock_response.__enter__ = MagicMock(return_value=mock_response) mock_response.__exit__ = MagicMock(return_value=False) with patch("urllib.request.urlopen", return_value=mock_response) as mock_open: result = client._fetch_su3("https://reseed.example.com") assert result == fake_su3 # Check URL has the correct path req_arg = mock_open.call_args[0][0] assert "/i2pseeds.su3" in req_arg.full_url assert "netid=2" in req_arg.full_url def test_fetch_su3_404(self): from i2p_netdb.reseed import ReseedClient import urllib.error client = ReseedClient(reseed_urls=["https://reseed.example.com"]) with patch("urllib.request.urlopen", side_effect=urllib.error.HTTPError( "https://reseed.example.com", 404, "Not Found", {}, None)): with pytest.raises(urllib.error.HTTPError): client._fetch_su3("https://reseed.example.com") def test_fetch_su3_timeout(self): from i2p_netdb.reseed import ReseedClient import urllib.error client = ReseedClient(reseed_urls=["https://reseed.example.com"], timeout=5) with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timed out")): with pytest.raises(urllib.error.URLError): client._fetch_su3("https://reseed.example.com") class TestExtractRouterInfos: def test_extract_valid_ris(self): from i2p_netdb.reseed import ReseedClient ri1 = os.urandom(500) ri2 = os.urandom(300) su3_data = _build_su3_bytes([ri1, ri2]) client = ReseedClient() result = client._extract_router_infos(su3_data) assert len(result) == 2 assert ri1 in result assert ri2 in result def test_extract_filters_oversized_ris(self): from i2p_netdb.reseed import ReseedClient small_ri = os.urandom(100) large_ri = os.urandom(5000) # > default max_ri_size (4096) su3_data = _build_su3_bytes([small_ri, large_ri]) client = ReseedClient(max_ri_size=4096) result = client._extract_router_infos(su3_data) assert len(result) == 1 assert result[0] == small_ri def test_extract_invalid_su3_data(self): from i2p_netdb.reseed import ReseedClient client = ReseedClient() with pytest.raises(ValueError): client._extract_router_infos(b"this is not valid su3 data") def test_extract_non_routerinfo_files_ignored(self): """ZIP files that don't match routerInfo-*.dat should be ignored.""" from i2p_netdb.reseed import ReseedClient # Build SU3 with a mix of routerInfo and non-routerInfo files ri_data = os.urandom(200) zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w") as zf: zf.writestr("routerInfo-0001.dat", ri_data) zf.writestr("other-file.txt", b"not a router info") zf.writestr("metadata.json", b"{}") zip_data = zip_buf.getvalue() # Build SU3 manually with this ZIP version = b"1.0" signer = b"test@example.com" sig_length = 256 header = bytearray(40) header[0:6] = b"I2Psu3" header[7] = 0 struct.pack_into("!H", header, 8, 0x0006) struct.pack_into("!H", header, 10, sig_length) header[13] = len(version) header[15] = len(signer) struct.pack_into("!Q", header, 16, len(zip_data)) header[25] = 0 # TYPE_ZIP header[27] = 3 # CONTENT_RESEED su3_data = bytes(header) + version + signer + zip_data + (b"\x00" * sig_length) client = ReseedClient() result = client._extract_router_infos(su3_data) assert len(result) == 1 assert result[0] == ri_data class TestReseedFallback: def test_tries_next_server_on_failure(self): """When first server fails, client should try the next one.""" from i2p_netdb.reseed import ReseedClient import urllib.error ri_data = os.urandom(200) su3_data = _build_su3_bytes([ri_data]) client = ReseedClient( reseed_urls=["https://fail.example.com", "https://good.example.com"], target_count=1, min_servers=1, ) mock_response = MagicMock() mock_response.read.return_value = su3_data mock_response.status = 200 mock_response.__enter__ = MagicMock(return_value=mock_response) mock_response.__exit__ = MagicMock(return_value=False) def side_effect(req, **kwargs): url = req.full_url if hasattr(req, 'full_url') else str(req) if "fail.example.com" in url: raise urllib.error.URLError("connection refused") return mock_response with patch("urllib.request.urlopen", side_effect=side_effect): with patch("random.shuffle"): # Don't shuffle, keep order deterministic result = asyncio.run(client.reseed()) assert len(result) >= 1 assert ri_data in result def test_all_servers_fail_returns_empty(self): from i2p_netdb.reseed import ReseedClient import urllib.error client = ReseedClient( reseed_urls=["https://a.fail", "https://b.fail"], target_count=1, min_servers=1, ) with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("refused")): with patch("random.shuffle"): result = asyncio.run(client.reseed()) assert result == [] class TestMinimumPeersThreshold: def test_stops_after_target_count(self): """Should stop collecting once target_count RIs are gathered.""" from i2p_netdb.reseed import ReseedClient # Each server yields 60 RIs; target is 100, min_servers=1 # After two servers we have 120 >= 100, should stop ris_server1 = [os.urandom(100) for _ in range(60)] ris_server2 = [os.urandom(100) for _ in range(60)] ris_server3 = [os.urandom(100) for _ in range(60)] su3_1 = _build_su3_bytes(ris_server1) su3_2 = _build_su3_bytes(ris_server2) su3_3 = _build_su3_bytes(ris_server3) client = ReseedClient( reseed_urls=[ "https://s1.example.com", "https://s2.example.com", "https://s3.example.com", ], target_count=100, min_servers=1, ) call_count = 0 su3_list = [su3_1, su3_2, su3_3] def side_effect(req, **kwargs): nonlocal call_count idx = call_count call_count += 1 mock_resp = MagicMock() mock_resp.read.return_value = su3_list[idx] mock_resp.status = 200 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) return mock_resp with patch("urllib.request.urlopen", side_effect=side_effect): with patch("random.shuffle"): result = asyncio.run(client.reseed()) # Should have 120 RIs (from 2 servers) and not hit the 3rd assert len(result) == 120 assert call_count == 2 def test_min_servers_enforced(self): """Must contact at least min_servers even if target_count reached early.""" from i2p_netdb.reseed import ReseedClient # 150 RIs from server1 (already > target of 100), but min_servers=2 ris_server1 = [os.urandom(100) for _ in range(150)] ris_server2 = [os.urandom(100) for _ in range(10)] su3_1 = _build_su3_bytes(ris_server1) su3_2 = _build_su3_bytes(ris_server2) client = ReseedClient( reseed_urls=["https://s1.example.com", "https://s2.example.com"], target_count=100, min_servers=2, ) call_count = 0 su3_list = [su3_1, su3_2] def side_effect(req, **kwargs): nonlocal call_count idx = call_count call_count += 1 mock_resp = MagicMock() mock_resp.read.return_value = su3_list[idx] mock_resp.status = 200 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) return mock_resp with patch("urllib.request.urlopen", side_effect=side_effect): with patch("random.shuffle"): result = asyncio.run(client.reseed()) assert call_count == 2 assert len(result) == 160 class TestDefaultReseedURLs: def test_all_urls_are_https(self): from i2p_netdb.reseed import DEFAULT_RESEED_URLS for url in DEFAULT_RESEED_URLS: assert url.startswith("https://"), f"URL not HTTPS: {url}" def test_urls_are_unique(self): from i2p_netdb.reseed import DEFAULT_RESEED_URLS assert len(DEFAULT_RESEED_URLS) == len(set(DEFAULT_RESEED_URLS)) def test_known_servers_present(self): from i2p_netdb.reseed import DEFAULT_RESEED_URLS urls_joined = " ".join(DEFAULT_RESEED_URLS) assert "stormycloud.org" in urls_joined assert "diva.exchange" in urls_joined assert "memcpy.io" in urls_joined # --------------------------------------------------------------------------- # ReseedManager tests — bootstrap lifecycle # --------------------------------------------------------------------------- class TestReseedManagerParseResponse: def test_parse_reseed_response_base64_format(self): """Parse newline-separated base64-encoded RouterInfo stubs.""" import base64 from i2p_netdb.reseed import ReseedManager ri1 = os.urandom(200) ri2 = os.urandom(300) payload = base64.b64encode(ri1) + b"\n" + base64.b64encode(ri2) + b"\n" mgr = ReseedManager() result = mgr.parse_reseed_response(payload) assert len(result) == 2 assert result[0] == ri1 assert result[1] == ri2 def test_parse_empty_response(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() result = mgr.parse_reseed_response(b"") assert result == [] def test_parse_ignores_blank_lines(self): import base64 from i2p_netdb.reseed import ReseedManager ri1 = os.urandom(100) payload = b"\n\n" + base64.b64encode(ri1) + b"\n\n" mgr = ReseedManager() result = mgr.parse_reseed_response(payload) assert len(result) == 1 class TestReseedManagerBootstrap: def test_needs_reseed_when_empty(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() assert mgr.needs_reseed(datastore_count=0) def test_needs_reseed_below_threshold(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() assert mgr.needs_reseed(datastore_count=10) assert mgr.needs_reseed(datastore_count=24) def test_no_reseed_above_threshold(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() assert not mgr.needs_reseed(datastore_count=25) assert not mgr.needs_reseed(datastore_count=100) def test_mark_reseeded_prevents_further_reseed(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() assert mgr.needs_reseed(datastore_count=0) mgr.mark_reseeded() assert not mgr.needs_reseed(datastore_count=0) assert mgr.is_reseeded def test_reseed_with_no_urls_returns_empty(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager(reseed_urls=[]) result = mgr.parse_reseed_response(b"") assert result == [] def test_is_reseeded_initially_false(self): from i2p_netdb.reseed import ReseedManager mgr = ReseedManager() assert not mgr.is_reseeded