"""HTTPS reseed client — bootstrap peer discovery. New I2P routers fetch RouterInfo bundles from HTTPS reseed servers. The response is an SU3 signed file containing a ZIP of RouterInfo files. Ported from net.i2p.router.networkdb.reseed.Reseeder. """ from __future__ import annotations import asyncio import base64 import logging import os import random import urllib.error import urllib.request from i2p_data.su3 import SU3File log = logging.getLogger(__name__) # Default I2P reseed servers DEFAULT_RESEED_URLS = [ "https://reseed.stormycloud.org", "https://reseed2.i2p.net", "https://reseed.diva.exchange", "https://i2p.novg.net", "https://reseed.onion.im", "https://reseed.memcpy.io", "https://reseed.atomike.ninja", "https://reseed2.polarpoint.io", "https://banana.incognet.io", "https://reseed-fr.i2pd.xyz", "https://reseed.i2pgit.org", "https://i2pseed.creativecowpat.net:8443", "https://reseed.i2p.vzaws.com:8443", ] # Maximum SU3 download size (1 MB) _MAX_SU3_SIZE = 1_048_576 class ReseedCertificateStore: """Store for reseed signer certificates. Loads X.509 certificates from a directory. Each .crt file is keyed by filename without extension (which matches the SU3 signer ID). """ def __init__(self, cert_dir: str | None = None) -> None: self._certs: dict[str, bytes] = {} if cert_dir: self._load_from_dir(cert_dir) def _load_from_dir(self, cert_dir: str) -> None: """Load .crt files from *cert_dir*.""" if not os.path.isdir(cert_dir): log.warning("Certificate directory does not exist: %s", cert_dir) return for fname in os.listdir(cert_dir): if not fname.endswith(".crt"): continue signer_id = fname[: -len(".crt")] path = os.path.join(cert_dir, fname) try: with open(path, "rb") as f: self._certs[signer_id] = f.read() except OSError as exc: log.warning("Failed to read certificate %s: %s", path, exc) def add_certificate(self, signer_id: str, cert_data: bytes) -> None: """Add a certificate for the given signer ID.""" self._certs[signer_id] = cert_data def get_certificate(self, signer_id: str) -> bytes | None: """Return certificate bytes or None if not found.""" return self._certs.get(signer_id) class ReseedClient: """HTTPS reseed client for bootstrapping the NetDB. Fetches SU3 files from well-known reseed servers, parses them, and extracts RouterInfo entries for initial peer discovery. """ def __init__( self, reseed_urls: list[str] | None = None, cert_store: ReseedCertificateStore | None = None, max_ri_size: int = 4096, target_count: int = 100, min_servers: int = 2, timeout: int = 420, ) -> None: self._urls = reseed_urls or list(DEFAULT_RESEED_URLS) self._cert_store = cert_store or ReseedCertificateStore() self._max_ri_size = max_ri_size self._target_count = target_count self._min_servers = min_servers self._timeout = timeout async def reseed(self) -> list[bytes]: """Fetch RouterInfos from reseed servers. Tries servers in random order. Downloads SU3 files, parses them, extracts and validates RouterInfos. Returns a list of raw RouterInfo bytes. """ urls = list(self._urls) random.shuffle(urls) collected: list[bytes] = [] servers_used = 0 loop = asyncio.get_running_loop() for url in urls: try: su3_data = await loop.run_in_executor(None, self._fetch_su3, url) ris = self._extract_router_infos(su3_data) collected.extend(ris) servers_used += 1 log.info( "Reseed from %s: got %d RIs (total %d)", url, len(ris), len(collected), ) except Exception: log.warning("Reseed from %s failed", url, exc_info=True) continue # Stop when we have enough from enough servers if len(collected) >= self._target_count and servers_used >= self._min_servers: break log.info( "Reseed complete: %d RouterInfos from %d servers", len(collected), servers_used, ) return collected def _fetch_su3(self, base_url: str) -> bytes: """Fetch an SU3 file from a single reseed server. Uses urllib.request (not httpx, which has timeout bugs with long-running requests). Raises urllib.error.HTTPError or urllib.error.URLError on failure. """ url = base_url.rstrip("/") + "/i2pseeds.su3?netid=2" req = urllib.request.Request( url, headers={"User-Agent": "Wget/1.11.4"}, ) with urllib.request.urlopen(req, timeout=self._timeout) as resp: data = resp.read(_MAX_SU3_SIZE) return data def _extract_router_infos(self, su3_data: bytes) -> list[bytes]: """Parse SU3 and extract RouterInfo bytes from the embedded ZIP. Filters out entries larger than *max_ri_size*. Raises ValueError if *su3_data* is not a valid SU3 reseed file. """ su3 = SU3File.from_bytes(su3_data) raw_ris = su3.extract_routerinfos() result: list[bytes] = [] for ri_bytes in raw_ris: if len(ri_bytes) > self._max_ri_size: log.debug( "Skipping oversized RouterInfo (%d bytes > %d)", len(ri_bytes), self._max_ri_size, ) continue result.append(ri_bytes) return result class ReseedManager: """Bootstraps the router with initial RouterInfos from reseed servers. Provides a simple interface for determining whether a reseed is needed, parsing a simple base64-encoded reseed response format, and tracking reseed state. """ _MIN_DATASTORE_COUNT = 25 def __init__(self, reseed_urls: list[str] | None = None) -> None: self._urls = reseed_urls or [] self._reseeded = False def needs_reseed(self, datastore_count: int) -> bool: """Returns True if the datastore has too few entries and we have not already reseeded during this session.""" return datastore_count < self._MIN_DATASTORE_COUNT and not self._reseeded def parse_reseed_response(self, data: bytes) -> list[bytes]: """Parse a reseed response in simple format. Accepts newline-separated base64-encoded RouterInfo blobs. Returns a list of dicts with a ``raw`` key containing the decoded bytes for each entry. Real SU3 parsing is handled by :class:`ReseedClient`; this method supports a simplified format for testing and lightweight bootstrap scenarios. """ if not data: return [] results: list[bytes] = [] for line in data.split(b"\n"): line = line.strip() if not line: continue try: decoded = base64.b64decode(line) results.append(decoded) except Exception: log.debug("Skipping invalid base64 line in reseed response") continue return results def mark_reseeded(self) -> None: """Mark that a reseed has been performed this session.""" self._reseeded = True @property def is_reseeded(self) -> bool: """Whether the router has already been reseeded.""" return self._reseeded