A Python port of the Invisible Internet Project (I2P)
at main 468 lines 17 kB view raw
1"""Tests for HTTPS reseed client.""" 2 3import io 4import os 5import struct 6import tempfile 7import zipfile 8from unittest.mock import MagicMock, patch, call 9import asyncio 10 11import pytest 12 13 14def _build_su3_bytes(router_infos: list[bytes], 15 signer_id: str = "test@example.com", 16 version: str = "1.0", 17 sig_type_code: int = 0x0006, 18 sig_length: int = 256, 19 content_type: int = 3, 20 file_type: int = 0) -> bytes: 21 """Build a minimal SU3 file wrapping routerInfo-*.dat entries in a ZIP.""" 22 # Build ZIP content 23 zip_buf = io.BytesIO() 24 with zipfile.ZipFile(zip_buf, "w") as zf: 25 for i, ri_data in enumerate(router_infos): 26 zf.writestr(f"routerInfo-{i:04d}.dat", ri_data) 27 zip_data = zip_buf.getvalue() 28 29 version_bytes = version.encode("utf-8") 30 signer_bytes = signer_id.encode("utf-8") 31 content_length = len(zip_data) 32 33 # Fixed header (40 bytes) 34 header = bytearray(40) 35 header[0:6] = b"I2Psu3" 36 header[6] = 0 # reserved 37 header[7] = 0 # file format version 38 struct.pack_into("!H", header, 8, sig_type_code) 39 struct.pack_into("!H", header, 10, sig_length) 40 header[12] = 0 # reserved 41 header[13] = len(version_bytes) 42 header[14] = 0 # reserved 43 header[15] = len(signer_bytes) 44 struct.pack_into("!Q", header, 16, content_length) 45 header[24] = 0 # reserved 46 header[25] = file_type 47 header[26] = 0 # reserved 48 header[27] = content_type 49 # 28-39: reserved zeros (already zero) 50 51 data = bytes(header) + version_bytes + signer_bytes + zip_data 52 # Append fake signature 53 data += b"\x00" * sig_length 54 return data 55 56 57class TestReseedCertificateStore: 58 def test_empty_store(self): 59 from i2p_netdb.reseed import ReseedCertificateStore 60 store = ReseedCertificateStore() 61 assert store.get_certificate("nobody") is None 62 63 def test_add_and_get(self): 64 from i2p_netdb.reseed import ReseedCertificateStore 65 store = ReseedCertificateStore() 66 cert_data = b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----" 67 store.add_certificate("signer@example.com", cert_data) 68 assert store.get_certificate("signer@example.com") == cert_data 69 70 def test_load_from_directory(self): 71 from i2p_netdb.reseed import ReseedCertificateStore 72 with tempfile.TemporaryDirectory() as td: 73 # Write two .crt files 74 cert1 = b"CERT_ONE" 75 cert2 = b"CERT_TWO" 76 with open(os.path.join(td, "signer1@mail.i2p.crt"), "wb") as f: 77 f.write(cert1) 78 with open(os.path.join(td, "signer2@mail.i2p.crt"), "wb") as f: 79 f.write(cert2) 80 # Write a non-.crt file that should be ignored 81 with open(os.path.join(td, "readme.txt"), "wb") as f: 82 f.write(b"ignore me") 83 84 store = ReseedCertificateStore(cert_dir=td) 85 assert store.get_certificate("signer1@mail.i2p") == cert1 86 assert store.get_certificate("signer2@mail.i2p") == cert2 87 assert store.get_certificate("readme") is None 88 89 def test_load_from_nonexistent_directory(self): 90 from i2p_netdb.reseed import ReseedCertificateStore 91 # Should not raise, just have an empty store 92 store = ReseedCertificateStore(cert_dir="/nonexistent/path/xyz") 93 assert store.get_certificate("anything") is None 94 95 96class TestReseedClientConstruction: 97 def test_default_urls(self): 98 from i2p_netdb.reseed import ReseedClient, DEFAULT_RESEED_URLS 99 client = ReseedClient() 100 assert len(client._urls) == len(DEFAULT_RESEED_URLS) 101 102 def test_custom_urls(self): 103 from i2p_netdb.reseed import ReseedClient 104 urls = ["https://a.example.com", "https://b.example.com"] 105 client = ReseedClient(reseed_urls=urls) 106 assert client._urls == urls 107 108 def test_default_reseed_urls_count(self): 109 from i2p_netdb.reseed import DEFAULT_RESEED_URLS 110 assert len(DEFAULT_RESEED_URLS) == 13 111 112 def test_default_parameters(self): 113 from i2p_netdb.reseed import ReseedClient 114 client = ReseedClient() 115 assert client._target_count == 100 116 assert client._min_servers == 2 117 assert client._timeout == 420 118 assert client._max_ri_size == 4096 119 120 121class TestFetchSU3: 122 def test_fetch_su3_success(self): 123 from i2p_netdb.reseed import ReseedClient 124 125 fake_su3 = _build_su3_bytes([b"routerinfo_data_1"]) 126 client = ReseedClient(reseed_urls=["https://reseed.example.com"]) 127 128 mock_response = MagicMock() 129 mock_response.read.return_value = fake_su3 130 mock_response.status = 200 131 mock_response.__enter__ = MagicMock(return_value=mock_response) 132 mock_response.__exit__ = MagicMock(return_value=False) 133 134 with patch("urllib.request.urlopen", return_value=mock_response) as mock_open: 135 result = client._fetch_su3("https://reseed.example.com") 136 assert result == fake_su3 137 # Check URL has the correct path 138 req_arg = mock_open.call_args[0][0] 139 assert "/i2pseeds.su3" in req_arg.full_url 140 assert "netid=2" in req_arg.full_url 141 142 def test_fetch_su3_404(self): 143 from i2p_netdb.reseed import ReseedClient 144 import urllib.error 145 146 client = ReseedClient(reseed_urls=["https://reseed.example.com"]) 147 148 with patch("urllib.request.urlopen", 149 side_effect=urllib.error.HTTPError( 150 "https://reseed.example.com", 404, "Not Found", {}, None)): 151 with pytest.raises(urllib.error.HTTPError): 152 client._fetch_su3("https://reseed.example.com") 153 154 def test_fetch_su3_timeout(self): 155 from i2p_netdb.reseed import ReseedClient 156 import urllib.error 157 158 client = ReseedClient(reseed_urls=["https://reseed.example.com"], timeout=5) 159 160 with patch("urllib.request.urlopen", 161 side_effect=urllib.error.URLError("timed out")): 162 with pytest.raises(urllib.error.URLError): 163 client._fetch_su3("https://reseed.example.com") 164 165 166class TestExtractRouterInfos: 167 def test_extract_valid_ris(self): 168 from i2p_netdb.reseed import ReseedClient 169 170 ri1 = os.urandom(500) 171 ri2 = os.urandom(300) 172 su3_data = _build_su3_bytes([ri1, ri2]) 173 174 client = ReseedClient() 175 result = client._extract_router_infos(su3_data) 176 assert len(result) == 2 177 assert ri1 in result 178 assert ri2 in result 179 180 def test_extract_filters_oversized_ris(self): 181 from i2p_netdb.reseed import ReseedClient 182 183 small_ri = os.urandom(100) 184 large_ri = os.urandom(5000) # > default max_ri_size (4096) 185 su3_data = _build_su3_bytes([small_ri, large_ri]) 186 187 client = ReseedClient(max_ri_size=4096) 188 result = client._extract_router_infos(su3_data) 189 assert len(result) == 1 190 assert result[0] == small_ri 191 192 def test_extract_invalid_su3_data(self): 193 from i2p_netdb.reseed import ReseedClient 194 195 client = ReseedClient() 196 with pytest.raises(ValueError): 197 client._extract_router_infos(b"this is not valid su3 data") 198 199 def test_extract_non_routerinfo_files_ignored(self): 200 """ZIP files that don't match routerInfo-*.dat should be ignored.""" 201 from i2p_netdb.reseed import ReseedClient 202 203 # Build SU3 with a mix of routerInfo and non-routerInfo files 204 ri_data = os.urandom(200) 205 zip_buf = io.BytesIO() 206 with zipfile.ZipFile(zip_buf, "w") as zf: 207 zf.writestr("routerInfo-0001.dat", ri_data) 208 zf.writestr("other-file.txt", b"not a router info") 209 zf.writestr("metadata.json", b"{}") 210 zip_data = zip_buf.getvalue() 211 212 # Build SU3 manually with this ZIP 213 version = b"1.0" 214 signer = b"test@example.com" 215 sig_length = 256 216 header = bytearray(40) 217 header[0:6] = b"I2Psu3" 218 header[7] = 0 219 struct.pack_into("!H", header, 8, 0x0006) 220 struct.pack_into("!H", header, 10, sig_length) 221 header[13] = len(version) 222 header[15] = len(signer) 223 struct.pack_into("!Q", header, 16, len(zip_data)) 224 header[25] = 0 # TYPE_ZIP 225 header[27] = 3 # CONTENT_RESEED 226 227 su3_data = bytes(header) + version + signer + zip_data + (b"\x00" * sig_length) 228 229 client = ReseedClient() 230 result = client._extract_router_infos(su3_data) 231 assert len(result) == 1 232 assert result[0] == ri_data 233 234 235class TestReseedFallback: 236 def test_tries_next_server_on_failure(self): 237 """When first server fails, client should try the next one.""" 238 from i2p_netdb.reseed import ReseedClient 239 import urllib.error 240 241 ri_data = os.urandom(200) 242 su3_data = _build_su3_bytes([ri_data]) 243 244 client = ReseedClient( 245 reseed_urls=["https://fail.example.com", "https://good.example.com"], 246 target_count=1, 247 min_servers=1, 248 ) 249 250 mock_response = MagicMock() 251 mock_response.read.return_value = su3_data 252 mock_response.status = 200 253 mock_response.__enter__ = MagicMock(return_value=mock_response) 254 mock_response.__exit__ = MagicMock(return_value=False) 255 256 def side_effect(req, **kwargs): 257 url = req.full_url if hasattr(req, 'full_url') else str(req) 258 if "fail.example.com" in url: 259 raise urllib.error.URLError("connection refused") 260 return mock_response 261 262 with patch("urllib.request.urlopen", side_effect=side_effect): 263 with patch("random.shuffle"): # Don't shuffle, keep order deterministic 264 result = asyncio.run(client.reseed()) 265 assert len(result) >= 1 266 assert ri_data in result 267 268 def test_all_servers_fail_returns_empty(self): 269 from i2p_netdb.reseed import ReseedClient 270 import urllib.error 271 272 client = ReseedClient( 273 reseed_urls=["https://a.fail", "https://b.fail"], 274 target_count=1, 275 min_servers=1, 276 ) 277 278 with patch("urllib.request.urlopen", 279 side_effect=urllib.error.URLError("refused")): 280 with patch("random.shuffle"): 281 result = asyncio.run(client.reseed()) 282 assert result == [] 283 284 285class TestMinimumPeersThreshold: 286 def test_stops_after_target_count(self): 287 """Should stop collecting once target_count RIs are gathered.""" 288 from i2p_netdb.reseed import ReseedClient 289 290 # Each server yields 60 RIs; target is 100, min_servers=1 291 # After two servers we have 120 >= 100, should stop 292 ris_server1 = [os.urandom(100) for _ in range(60)] 293 ris_server2 = [os.urandom(100) for _ in range(60)] 294 ris_server3 = [os.urandom(100) for _ in range(60)] 295 296 su3_1 = _build_su3_bytes(ris_server1) 297 su3_2 = _build_su3_bytes(ris_server2) 298 su3_3 = _build_su3_bytes(ris_server3) 299 300 client = ReseedClient( 301 reseed_urls=[ 302 "https://s1.example.com", 303 "https://s2.example.com", 304 "https://s3.example.com", 305 ], 306 target_count=100, 307 min_servers=1, 308 ) 309 310 call_count = 0 311 su3_list = [su3_1, su3_2, su3_3] 312 313 def side_effect(req, **kwargs): 314 nonlocal call_count 315 idx = call_count 316 call_count += 1 317 mock_resp = MagicMock() 318 mock_resp.read.return_value = su3_list[idx] 319 mock_resp.status = 200 320 mock_resp.__enter__ = MagicMock(return_value=mock_resp) 321 mock_resp.__exit__ = MagicMock(return_value=False) 322 return mock_resp 323 324 with patch("urllib.request.urlopen", side_effect=side_effect): 325 with patch("random.shuffle"): 326 result = asyncio.run(client.reseed()) 327 # Should have 120 RIs (from 2 servers) and not hit the 3rd 328 assert len(result) == 120 329 assert call_count == 2 330 331 def test_min_servers_enforced(self): 332 """Must contact at least min_servers even if target_count reached early.""" 333 from i2p_netdb.reseed import ReseedClient 334 335 # 150 RIs from server1 (already > target of 100), but min_servers=2 336 ris_server1 = [os.urandom(100) for _ in range(150)] 337 ris_server2 = [os.urandom(100) for _ in range(10)] 338 339 su3_1 = _build_su3_bytes(ris_server1) 340 su3_2 = _build_su3_bytes(ris_server2) 341 342 client = ReseedClient( 343 reseed_urls=["https://s1.example.com", "https://s2.example.com"], 344 target_count=100, 345 min_servers=2, 346 ) 347 348 call_count = 0 349 su3_list = [su3_1, su3_2] 350 351 def side_effect(req, **kwargs): 352 nonlocal call_count 353 idx = call_count 354 call_count += 1 355 mock_resp = MagicMock() 356 mock_resp.read.return_value = su3_list[idx] 357 mock_resp.status = 200 358 mock_resp.__enter__ = MagicMock(return_value=mock_resp) 359 mock_resp.__exit__ = MagicMock(return_value=False) 360 return mock_resp 361 362 with patch("urllib.request.urlopen", side_effect=side_effect): 363 with patch("random.shuffle"): 364 result = asyncio.run(client.reseed()) 365 assert call_count == 2 366 assert len(result) == 160 367 368 369class TestDefaultReseedURLs: 370 def test_all_urls_are_https(self): 371 from i2p_netdb.reseed import DEFAULT_RESEED_URLS 372 for url in DEFAULT_RESEED_URLS: 373 assert url.startswith("https://"), f"URL not HTTPS: {url}" 374 375 def test_urls_are_unique(self): 376 from i2p_netdb.reseed import DEFAULT_RESEED_URLS 377 assert len(DEFAULT_RESEED_URLS) == len(set(DEFAULT_RESEED_URLS)) 378 379 def test_known_servers_present(self): 380 from i2p_netdb.reseed import DEFAULT_RESEED_URLS 381 urls_joined = " ".join(DEFAULT_RESEED_URLS) 382 assert "stormycloud.org" in urls_joined 383 assert "diva.exchange" in urls_joined 384 assert "memcpy.io" in urls_joined 385 386 387# --------------------------------------------------------------------------- 388# ReseedManager tests — bootstrap lifecycle 389# --------------------------------------------------------------------------- 390 391 392class TestReseedManagerParseResponse: 393 def test_parse_reseed_response_base64_format(self): 394 """Parse newline-separated base64-encoded RouterInfo stubs.""" 395 import base64 396 from i2p_netdb.reseed import ReseedManager 397 398 ri1 = os.urandom(200) 399 ri2 = os.urandom(300) 400 payload = base64.b64encode(ri1) + b"\n" + base64.b64encode(ri2) + b"\n" 401 402 mgr = ReseedManager() 403 result = mgr.parse_reseed_response(payload) 404 assert len(result) == 2 405 assert result[0] == ri1 406 assert result[1] == ri2 407 408 def test_parse_empty_response(self): 409 from i2p_netdb.reseed import ReseedManager 410 411 mgr = ReseedManager() 412 result = mgr.parse_reseed_response(b"") 413 assert result == [] 414 415 def test_parse_ignores_blank_lines(self): 416 import base64 417 from i2p_netdb.reseed import ReseedManager 418 419 ri1 = os.urandom(100) 420 payload = b"\n\n" + base64.b64encode(ri1) + b"\n\n" 421 422 mgr = ReseedManager() 423 result = mgr.parse_reseed_response(payload) 424 assert len(result) == 1 425 426 427class TestReseedManagerBootstrap: 428 def test_needs_reseed_when_empty(self): 429 from i2p_netdb.reseed import ReseedManager 430 431 mgr = ReseedManager() 432 assert mgr.needs_reseed(datastore_count=0) 433 434 def test_needs_reseed_below_threshold(self): 435 from i2p_netdb.reseed import ReseedManager 436 437 mgr = ReseedManager() 438 assert mgr.needs_reseed(datastore_count=10) 439 assert mgr.needs_reseed(datastore_count=24) 440 441 def test_no_reseed_above_threshold(self): 442 from i2p_netdb.reseed import ReseedManager 443 444 mgr = ReseedManager() 445 assert not mgr.needs_reseed(datastore_count=25) 446 assert not mgr.needs_reseed(datastore_count=100) 447 448 def test_mark_reseeded_prevents_further_reseed(self): 449 from i2p_netdb.reseed import ReseedManager 450 451 mgr = ReseedManager() 452 assert mgr.needs_reseed(datastore_count=0) 453 mgr.mark_reseeded() 454 assert not mgr.needs_reseed(datastore_count=0) 455 assert mgr.is_reseeded 456 457 def test_reseed_with_no_urls_returns_empty(self): 458 from i2p_netdb.reseed import ReseedManager 459 460 mgr = ReseedManager(reseed_urls=[]) 461 result = mgr.parse_reseed_response(b"") 462 assert result == [] 463 464 def test_is_reseeded_initially_false(self): 465 from i2p_netdb.reseed import ReseedManager 466 467 mgr = ReseedManager() 468 assert not mgr.is_reseeded