"""Tests for RouterBootstrap — bootstrap orchestrator and main event loop. TDD: tests written before implementation. """ import asyncio import hashlib import os import tempfile import unittest from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock from i2p_router.config import RouterConfig class TestRouterBootstrapInit(unittest.TestCase): """Test RouterBootstrap initialization and configuration.""" def test_create_with_defaults(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) assert bootstrap.config is config assert bootstrap.state == "stopped" assert bootstrap.peer_count == 0 def test_create_with_custom_config(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="0.0.0.0", listen_port=12345, data_dir=tempfile.mkdtemp(), bandwidth_limit_kbps=50, ) bootstrap = RouterBootstrap(config) assert bootstrap.config.listen_port == 12345 assert bootstrap.config.bandwidth_limit_kbps == 50 class TestIdentityManagement(unittest.TestCase): """Test identity load/generate during bootstrap.""" def test_generates_identity_if_none_exists(self): from i2p_router.bootstrap import RouterBootstrap data_dir = tempfile.mkdtemp() config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=data_dir, ) bootstrap = RouterBootstrap(config) def run(): return asyncio.run(bootstrap._load_or_generate_identity()) run() assert bootstrap._key_bundle is not None assert len(bootstrap._key_bundle.signing_public) == 32 assert len(bootstrap._key_bundle.ntcp2_public) == 32 # Keys should be persisted key_path = os.path.join(data_dir, "router.keys.json") assert os.path.exists(key_path) def test_loads_existing_identity(self): from i2p_router.bootstrap import RouterBootstrap from i2p_router.identity import RouterKeyBundle data_dir = tempfile.mkdtemp() # Pre-generate and save keys bundle = RouterKeyBundle.generate() key_path = os.path.join(data_dir, "router.keys.json") bundle.save(key_path) config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=data_dir, ) bootstrap = RouterBootstrap(config) asyncio.run(bootstrap._load_or_generate_identity()) # Should load the same keys assert bootstrap._key_bundle.signing_public == bundle.signing_public assert bootstrap._key_bundle.ntcp2_public == bundle.ntcp2_public class TestReseedIntegration(unittest.TestCase): """Test reseed triggering during bootstrap.""" def test_reseed_triggered_when_netdb_empty(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) mock_reseed = AsyncMock(return_value=[os.urandom(256) for _ in range(5)]) async def run(): await bootstrap._load_or_generate_identity() with patch.object(bootstrap._reseed_client, "reseed", mock_reseed): await bootstrap._reseed_if_needed() asyncio.run(run()) # The reseed client was called (even though random bytes aren't valid RI) mock_reseed.assert_called_once() def test_reseed_skipped_when_enough_peers(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) async def run(): await bootstrap._load_or_generate_identity() # Simulate having enough entries in the datastore for i in range(60): key = hashlib.sha256(f"peer{i}".encode()).digest() bootstrap._context.datastore.put( MagicMock(key=key, data=os.urandom(100)) ) return await bootstrap._reseed_if_needed() result = asyncio.run(run()) assert result == 0 # No reseed needed class TestPeerConnectionFlow(unittest.TestCase): """Test peer connection logic.""" def test_connection_pool_tracks_peers(self): from i2p_router.peer_connector import ConnectionPool pool = ConnectionPool(max_connections=3) h1 = os.urandom(32) h2 = os.urandom(32) assert pool.add(h1, MagicMock()) is True assert pool.add(h2, MagicMock()) is True assert pool.active_count == 2 assert pool.is_connected(h1) assert pool.is_connected(h2) def test_connection_pool_rejects_over_max(self): from i2p_router.peer_connector import ConnectionPool pool = ConnectionPool(max_connections=1) h1 = os.urandom(32) h2 = os.urandom(32) assert pool.add(h1, MagicMock()) is True assert pool.add(h2, MagicMock()) is False assert pool.active_count == 1 class TestBootstrapGetStatus(unittest.TestCase): """Test status reporting during bootstrap.""" def test_status_before_start(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) status = bootstrap.get_status() assert status["state"] == "stopped" assert status["peer_count"] == 0 assert status["netdb_routerinfos"] == 0 def test_status_after_identity_load(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) asyncio.run(bootstrap._load_or_generate_identity()) status = bootstrap.get_status() assert "router_hash" in status assert len(status["router_hash"]) > 0 class TestBootstrapShutdown(unittest.TestCase): """Test graceful shutdown.""" def test_shutdown_from_stopped(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) # Should not raise asyncio.run(bootstrap.shutdown()) assert bootstrap.state == "stopped" def test_shutdown_closes_connections(self): from i2p_router.bootstrap import RouterBootstrap config = RouterConfig( listen_host="127.0.0.1", listen_port=0, data_dir=tempfile.mkdtemp(), ) bootstrap = RouterBootstrap(config) # Simulate that bootstrap is running so shutdown() doesn't bail bootstrap._state = "running" # Add mock connections to pool mock_conn = AsyncMock() mock_conn.close = AsyncMock() mock_conn.is_alive = MagicMock(return_value=True) h1 = os.urandom(32) bootstrap._conn_pool.add(h1, mock_conn) asyncio.run(bootstrap.shutdown()) assert bootstrap._conn_pool.active_count == 0 assert bootstrap.state == "stopped" mock_conn.close.assert_called_once() class TestMessageReader(unittest.TestCase): """Test reading I2NP messages from NTCP2 connections.""" def test_message_reader_dispatches_to_context(self): from i2p_router.bootstrap import MessageReader ctx = MagicMock() reader = MessageReader(ctx) # Simulate an I2NP block with type=1 (DatabaseStore) # Short header: type(1) + msg_id(4) + expiration(4) + size(2) + payload import struct payload = os.urandom(32) i2np_data = struct.pack("!BIiH", 1, 12345, 0, len(payload)) + payload reader.handle_i2np_block(i2np_data) ctx.process_inbound.assert_called_once_with(1, payload) class TestDatabaseStoreExchange(unittest.TestCase): """Test RouterInfo exchange via DatabaseStore messages.""" def test_build_database_store_message(self): from i2p_router.bootstrap import build_database_store_i2np router_hash = os.urandom(32) ri_bytes = os.urandom(256) msg = build_database_store_i2np(router_hash, ri_bytes) # Should be I2NP short header + DatabaseStore payload import struct msg_type = struct.unpack("!B", msg[:1])[0] assert msg_type == 1 # DATABASE_STORE if __name__ == "__main__": unittest.main()