A Python port of the Invisible Internet Project (I2P)
1"""Tests for HTTPS reseed client."""
2
3import io
4import os
5import struct
6import tempfile
7import zipfile
8from unittest.mock import MagicMock, patch, call
9import asyncio
10
11import pytest
12
13
def _build_su3_bytes(router_infos: list[bytes],
                     signer_id: str = "test@example.com",
                     version: str = "1.0",
                     sig_type_code: int = 0x0006,
                     sig_length: int = 256,
                     content_type: int = 3,
                     file_type: int = 0) -> bytes:
    """Assemble a minimal SU3 blob around a ZIP of routerInfo-*.dat entries.

    The result is laid out as: a 40-byte fixed header, the UTF-8 version
    string, the UTF-8 signer id, the ZIP archive, and finally
    ``sig_length`` zero bytes standing in for a signature.

    Args:
        router_infos: Payloads stored as ``routerInfo-0000.dat``,
            ``routerInfo-0001.dat``, ... in list order.
        signer_id: Signer identifier written after the version string.
        version: Version string written directly after the header.
        sig_type_code: 16-bit value stored big-endian at header offset 8.
        sig_length: 16-bit value stored big-endian at header offset 10;
            also the number of zero bytes appended as the fake signature.
        content_type: Byte stored at header offset 27.
        file_type: Byte stored at header offset 25.

    Returns:
        The complete SU3 byte string.
    """
    # ZIP payload: one numbered entry per router info.
    archive_buf = io.BytesIO()
    with zipfile.ZipFile(archive_buf, "w") as archive:
        for index, payload in enumerate(router_infos):
            archive.writestr(f"routerInfo-{index:04d}.dat", payload)
    archive_bytes = archive_buf.getvalue()

    version_raw = version.encode("utf-8")
    signer_raw = signer_id.encode("utf-8")

    # Fixed header, built field by field in offset order (40 bytes total).
    header_fields = [
        b"I2Psu3",                                         # 0-5: magic
        bytes(2),                                          # 6: reserved, 7: file format version
        struct.pack("!HH", sig_type_code, sig_length),     # 8-11: signature type and length
        bytes([0, len(version_raw), 0, len(signer_raw)]),  # 12-15: reserved/length pairs
        struct.pack("!Q", len(archive_bytes)),             # 16-23: content length
        bytes([0, file_type, 0, content_type]),            # 24-27: reserved/type pairs
        bytes(12),                                         # 28-39: reserved zeros
    ]
    header = b"".join(header_fields)

    # Variable-length sections follow the header; the signature is all zeros.
    fake_signature = b"\x00" * sig_length
    return header + version_raw + signer_raw + archive_bytes + fake_signature
55
56
class TestReseedCertificateStore:
    """Checks for ReseedCertificateStore lookups, insertion and directory loading."""

    def test_empty_store(self):
        """A store created without arguments returns None for an unknown signer."""
        from i2p_netdb.reseed import ReseedCertificateStore

        empty = ReseedCertificateStore()
        assert empty.get_certificate("nobody") is None

    def test_add_and_get(self):
        """A certificate added under a signer id is returned unchanged."""
        from i2p_netdb.reseed import ReseedCertificateStore

        pem = b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----"
        store = ReseedCertificateStore()
        store.add_certificate("signer@example.com", pem)
        assert store.get_certificate("signer@example.com") == pem

    def test_load_from_directory(self):
        """Only *.crt files are picked up, keyed by file name without the suffix."""
        from i2p_netdb.reseed import ReseedCertificateStore

        cert1 = b"CERT_ONE"
        cert2 = b"CERT_TWO"
        # Two certificates plus one unrelated file that must not be loaded.
        fixture_files = {
            "signer1@mail.i2p.crt": cert1,
            "signer2@mail.i2p.crt": cert2,
            "readme.txt": b"ignore me",
        }
        with tempfile.TemporaryDirectory() as td:
            for file_name, content in fixture_files.items():
                with open(os.path.join(td, file_name), "wb") as handle:
                    handle.write(content)

            store = ReseedCertificateStore(cert_dir=td)
            assert store.get_certificate("signer1@mail.i2p") == cert1
            assert store.get_certificate("signer2@mail.i2p") == cert2
            assert store.get_certificate("readme") is None

    def test_load_from_nonexistent_directory(self):
        """A missing cert_dir must not raise; the store is simply empty."""
        from i2p_netdb.reseed import ReseedCertificateStore

        store = ReseedCertificateStore(cert_dir="/nonexistent/path/xyz")
        assert store.get_certificate("anything") is None
94
95
class TestReseedClientConstruction:
    """Checks the state a ReseedClient holds right after construction."""

    def test_default_urls(self):
        """Without arguments the client carries as many URLs as the default list."""
        from i2p_netdb.reseed import DEFAULT_RESEED_URLS, ReseedClient

        default_client = ReseedClient()
        assert len(default_client._urls) == len(DEFAULT_RESEED_URLS)

    def test_custom_urls(self):
        """Explicitly supplied URLs are stored as given."""
        from i2p_netdb.reseed import ReseedClient

        urls = ["https://a.example.com", "https://b.example.com"]
        assert ReseedClient(reseed_urls=urls)._urls == urls

    def test_default_reseed_urls_count(self):
        """The default list is pinned to 13 entries."""
        from i2p_netdb.reseed import DEFAULT_RESEED_URLS

        assert len(DEFAULT_RESEED_URLS) == 13

    def test_default_parameters(self):
        """Default tuning values are pinned attribute by attribute."""
        from i2p_netdb.reseed import ReseedClient

        client = ReseedClient()
        expected_defaults = {
            "_target_count": 100,
            "_min_servers": 2,
            "_timeout": 420,
            "_max_ri_size": 4096,
        }
        for attribute, expected in expected_defaults.items():
            assert getattr(client, attribute) == expected
119
120
class TestFetchSU3:
    """Tests for ``ReseedClient._fetch_su3`` with ``urllib.request.urlopen`` patched."""

    def test_fetch_su3_success(self):
        """The response body is returned unchanged and the SU3 URL is requested."""
        from i2p_netdb.reseed import ReseedClient

        fake_su3 = _build_su3_bytes([b"routerinfo_data_1"])
        client = ReseedClient(reseed_urls=["https://reseed.example.com"])

        # The mock acts as its own context manager, so it works whether the
        # client uses the return value directly or inside a ``with`` block.
        mock_response = MagicMock()
        mock_response.read.return_value = fake_su3
        mock_response.status = 200
        mock_response.__enter__ = MagicMock(return_value=mock_response)
        mock_response.__exit__ = MagicMock(return_value=False)

        with patch("urllib.request.urlopen", return_value=mock_response) as mock_open:
            result = client._fetch_su3("https://reseed.example.com")

        assert result == fake_su3
        # The first positional argument of urlopen must expose ``full_url``
        # containing the SU3 path and the network id query parameter.
        req_arg = mock_open.call_args[0][0]
        assert "/i2pseeds.su3" in req_arg.full_url
        assert "netid=2" in req_arg.full_url

    def test_fetch_su3_404(self):
        """An HTTPError raised by urlopen propagates out of ``_fetch_su3``."""
        from i2p_netdb.reseed import ReseedClient
        import urllib.error

        client = ReseedClient(reseed_urls=["https://reseed.example.com"])

        # Give the error a real (empty) body instead of fp=None: with None,
        # older CPython versions leave the file-like base class
        # uninitialised, so any read()/close() on the error would fail with
        # an unrelated exception rather than exercising the code under test.
        not_found = urllib.error.HTTPError(
            "https://reseed.example.com", 404, "Not Found", {}, io.BytesIO())

        with patch("urllib.request.urlopen", side_effect=not_found):
            with pytest.raises(urllib.error.HTTPError):
                client._fetch_su3("https://reseed.example.com")

    def test_fetch_su3_timeout(self):
        """A URLError raised by urlopen propagates out of ``_fetch_su3``."""
        from i2p_netdb.reseed import ReseedClient
        import urllib.error

        client = ReseedClient(reseed_urls=["https://reseed.example.com"], timeout=5)

        with patch("urllib.request.urlopen",
                   side_effect=urllib.error.URLError("timed out")):
            with pytest.raises(urllib.error.URLError):
                client._fetch_su3("https://reseed.example.com")
164
165
class TestExtractRouterInfos:
    """Tests for ``ReseedClient._extract_router_infos`` on hand-built SU3 blobs."""

    def test_extract_valid_ris(self):
        """Every router info packed into the SU3 comes back out."""
        from i2p_netdb.reseed import ReseedClient

        ri1 = os.urandom(500)
        ri2 = os.urandom(300)

        extracted = ReseedClient()._extract_router_infos(_build_su3_bytes([ri1, ri2]))
        assert len(extracted) == 2
        assert ri1 in extracted
        assert ri2 in extracted

    def test_extract_filters_oversized_ris(self):
        """Entries larger than max_ri_size are dropped, smaller ones kept."""
        from i2p_netdb.reseed import ReseedClient

        small_ri = os.urandom(100)
        large_ri = os.urandom(5000)  # exceeds the 4096 limit passed below
        su3_blob = _build_su3_bytes([small_ri, large_ri])

        extracted = ReseedClient(max_ri_size=4096)._extract_router_infos(su3_blob)
        assert len(extracted) == 1
        assert extracted[0] == small_ri

    def test_extract_invalid_su3_data(self):
        """Bytes that are not an SU3 file raise ValueError."""
        from i2p_netdb.reseed import ReseedClient

        client = ReseedClient()
        with pytest.raises(ValueError):
            client._extract_router_infos(b"this is not valid su3 data")

    def test_extract_non_routerinfo_files_ignored(self):
        """ZIP files that don't match routerInfo-*.dat should be ignored."""
        from i2p_netdb.reseed import ReseedClient

        # ZIP holding one router info and two unrelated members.
        ri_data = os.urandom(200)
        members = [
            ("routerInfo-0001.dat", ri_data),
            ("other-file.txt", b"not a router info"),
            ("metadata.json", b"{}"),
        ]
        archive_buf = io.BytesIO()
        with zipfile.ZipFile(archive_buf, "w") as archive:
            for member_name, member_data in members:
                archive.writestr(member_name, member_data)
        zip_data = archive_buf.getvalue()

        # Hand-rolled 40-byte SU3 header; ``x`` pad bytes are the reserved
        # (zero) positions, and offset 7 (format version) is zero as well.
        version = b"1.0"
        signer = b"test@example.com"
        sig_length = 256
        header = struct.pack(
            "!6s2xHHxBxBQxBxB12x",
            b"I2Psu3",
            0x0006,          # offset 8: signature type
            sig_length,      # offset 10: signature length
            len(version),    # offset 13
            len(signer),     # offset 15
            len(zip_data),   # offset 16: content length
            0,               # offset 25: TYPE_ZIP
            3,               # offset 27: CONTENT_RESEED
        )
        su3_data = header + version + signer + zip_data + (b"\x00" * sig_length)

        extracted = ReseedClient()._extract_router_infos(su3_data)
        assert len(extracted) == 1
        assert extracted[0] == ri_data
233
234
class TestReseedFallback:
    """Tests how ``ReseedClient.reseed`` behaves when servers are unreachable."""

    def test_tries_next_server_on_failure(self):
        """A failing first server must not stop the client from using the second."""
        from i2p_netdb.reseed import ReseedClient
        import urllib.error

        ri_data = os.urandom(200)

        good_response = MagicMock()
        good_response.read.return_value = _build_su3_bytes([ri_data])
        good_response.status = 200
        good_response.__enter__ = MagicMock(return_value=good_response)
        good_response.__exit__ = MagicMock(return_value=False)

        def fake_urlopen(req, **kwargs):
            # Accept either a Request-like object or a plain URL.
            if hasattr(req, 'full_url'):
                target = req.full_url
            else:
                target = str(req)
            if "fail.example.com" not in target:
                return good_response
            raise urllib.error.URLError("connection refused")

        client = ReseedClient(
            reseed_urls=["https://fail.example.com", "https://good.example.com"],
            target_count=1,
            min_servers=1,
        )

        # random.shuffle is neutralised so the URL order stays as listed.
        with patch("urllib.request.urlopen", side_effect=fake_urlopen), \
                patch("random.shuffle"):
            result = asyncio.run(client.reseed())

        assert len(result) >= 1
        assert ri_data in result

    def test_all_servers_fail_returns_empty(self):
        """If every server errors out, reseed() yields an empty list."""
        from i2p_netdb.reseed import ReseedClient
        import urllib.error

        client = ReseedClient(
            reseed_urls=["https://a.fail", "https://b.fail"],
            target_count=1,
            min_servers=1,
        )
        refused = urllib.error.URLError("refused")

        with patch("urllib.request.urlopen", side_effect=refused), \
                patch("random.shuffle"):
            result = asyncio.run(client.reseed())

        assert result == []
283
284
class TestMinimumPeersThreshold:
    """Tests the target_count / min_servers stopping rules of ``reseed()``."""

    @staticmethod
    def _serve_in_order(payloads):
        """Create a ``urlopen`` stand-in that returns *payloads* one per call.

        Args:
            payloads: SU3 byte strings; the n-th call gets the n-th item.

        Returns:
            A ``(fake_urlopen, calls)`` pair. ``calls`` is a list that gains
            one entry (the request argument) per invocation, so
            ``len(calls)`` is the number of servers contacted.
        """
        calls = []

        def fake_urlopen(req, **kwargs):
            # Record the call first so it is counted even if the payload
            # list turns out to be exhausted.
            calls.append(req)
            response = MagicMock()
            response.read.return_value = payloads[len(calls) - 1]
            response.status = 200
            response.__enter__ = MagicMock(return_value=response)
            response.__exit__ = MagicMock(return_value=False)
            return response

        return fake_urlopen, calls

    def test_stops_after_target_count(self):
        """Should stop collecting once target_count RIs are gathered."""
        from i2p_netdb.reseed import ReseedClient

        # Each server yields 60 RIs; target is 100, min_servers=1.
        # After two servers we have 120 >= 100, so the third is never asked.
        payloads = [
            _build_su3_bytes([os.urandom(100) for _ in range(60)])
            for _ in range(3)
        ]

        client = ReseedClient(
            reseed_urls=[
                "https://s1.example.com",
                "https://s2.example.com",
                "https://s3.example.com",
            ],
            target_count=100,
            min_servers=1,
        )

        fake_urlopen, calls = self._serve_in_order(payloads)
        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            with patch("random.shuffle"):  # keep the URL order deterministic
                result = asyncio.run(client.reseed())

        # Should have 120 RIs (from 2 servers) and not hit the 3rd
        assert len(result) == 120
        assert len(calls) == 2

    def test_min_servers_enforced(self):
        """Must contact at least min_servers even if target_count reached early."""
        from i2p_netdb.reseed import ReseedClient

        # 150 RIs from server1 (already > target of 100), but min_servers=2
        payloads = [
            _build_su3_bytes([os.urandom(100) for _ in range(150)]),
            _build_su3_bytes([os.urandom(100) for _ in range(10)]),
        ]

        client = ReseedClient(
            reseed_urls=["https://s1.example.com", "https://s2.example.com"],
            target_count=100,
            min_servers=2,
        )

        fake_urlopen, calls = self._serve_in_order(payloads)
        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            with patch("random.shuffle"):  # keep the URL order deterministic
                result = asyncio.run(client.reseed())

        assert len(calls) == 2
        assert len(result) == 160
367
368
class TestDefaultReseedURLs:
    """Sanity checks on the contents of DEFAULT_RESEED_URLS."""

    def test_all_urls_are_https(self):
        """Every default reseed URL must use the https scheme."""
        from i2p_netdb.reseed import DEFAULT_RESEED_URLS

        for url in DEFAULT_RESEED_URLS:
            is_https = url.startswith("https://")
            assert is_https, f"URL not HTTPS: {url}"

    def test_urls_are_unique(self):
        """The default list contains no duplicate entries."""
        from i2p_netdb.reseed import DEFAULT_RESEED_URLS

        distinct = set(DEFAULT_RESEED_URLS)
        assert len(DEFAULT_RESEED_URLS) == len(distinct)

    def test_known_servers_present(self):
        """Three specific hosts appear somewhere in the default list."""
        from i2p_netdb.reseed import DEFAULT_RESEED_URLS

        urls_joined = " ".join(DEFAULT_RESEED_URLS)
        for host in ("stormycloud.org", "diva.exchange", "memcpy.io"):
            assert host in urls_joined
385
386
387# ---------------------------------------------------------------------------
388# ReseedManager tests — bootstrap lifecycle
389# ---------------------------------------------------------------------------
390
391
class TestReseedManagerParseResponse:
    """Tests for ``ReseedManager.parse_reseed_response``."""

    def test_parse_reseed_response_base64_format(self):
        """Parse newline-separated base64-encoded RouterInfo stubs."""
        import base64
        from i2p_netdb.reseed import ReseedManager

        ri1 = os.urandom(200)
        ri2 = os.urandom(300)
        # One base64 line per router info, each terminated by a newline.
        payload = b"".join(base64.b64encode(raw) + b"\n" for raw in (ri1, ri2))

        parsed = ReseedManager().parse_reseed_response(payload)
        assert len(parsed) == 2
        assert parsed[0] == ri1
        assert parsed[1] == ri2

    def test_parse_empty_response(self):
        """An empty body parses to an empty list."""
        from i2p_netdb.reseed import ReseedManager

        assert ReseedManager().parse_reseed_response(b"") == []

    def test_parse_ignores_blank_lines(self):
        """Blank lines around a single entry do not create extra results."""
        import base64
        from i2p_netdb.reseed import ReseedManager

        encoded = base64.b64encode(os.urandom(100))
        payload = b"\n\n" + encoded + b"\n\n"

        parsed = ReseedManager().parse_reseed_response(payload)
        assert len(parsed) == 1
425
426
class TestReseedManagerBootstrap:
    """Tests for the ``needs_reseed`` / ``mark_reseeded`` lifecycle of ReseedManager."""

    def test_needs_reseed_when_empty(self):
        """An empty datastore needs a reseed."""
        from i2p_netdb.reseed import ReseedManager

        assert ReseedManager().needs_reseed(datastore_count=0)

    def test_needs_reseed_below_threshold(self):
        """Counts of 10 and 24 still need a reseed."""
        from i2p_netdb.reseed import ReseedManager

        mgr = ReseedManager()
        for count in (10, 24):
            assert mgr.needs_reseed(datastore_count=count)

    def test_no_reseed_above_threshold(self):
        """Counts of 25 and 100 do not need a reseed."""
        from i2p_netdb.reseed import ReseedManager

        mgr = ReseedManager()
        for count in (25, 100):
            assert not mgr.needs_reseed(datastore_count=count)

    def test_mark_reseeded_prevents_further_reseed(self):
        """After mark_reseeded(), even an empty datastore no longer needs a reseed."""
        from i2p_netdb.reseed import ReseedManager

        mgr = ReseedManager()
        needed_before = mgr.needs_reseed(datastore_count=0)
        assert needed_before
        mgr.mark_reseeded()
        needed_after = mgr.needs_reseed(datastore_count=0)
        assert not needed_after
        assert mgr.is_reseeded

    def test_reseed_with_no_urls_returns_empty(self):
        """A manager built with an empty URL list parses an empty body to []."""
        from i2p_netdb.reseed import ReseedManager

        # NOTE(review): despite its name this test never triggers a reseed;
        # it only repeats the empty-payload parse check on a manager created
        # with reseed_urls=[] — confirm whether a reseed call was intended.
        no_url_mgr = ReseedManager(reseed_urls=[])
        assert no_url_mgr.parse_reseed_response(b"") == []

    def test_is_reseeded_initially_false(self):
        """A new manager reports is_reseeded as false."""
        from i2p_netdb.reseed import ReseedManager

        assert not ReseedManager().is_reseeded
468 assert not mgr.is_reseeded