"""Tests for router maintenance loops — keepalive, reconnect, broadcast, eviction.""" import asyncio import time from unittest.mock import AsyncMock, MagicMock, patch import pytest from i2p_router.maintenance import ( keepalive_loop, netdb_eviction_loop, routerinfo_broadcast_loop, ) class MockConnection: def __init__(self, idle_seconds=100.0): self._idle = idle_seconds self.sent_frames = [] self.sent_i2np = [] def seconds_since_last_write(self): return self._idle async def send_frame(self, blocks): self.sent_frames.append(blocks) async def send_i2np(self, msg): self.sent_i2np.append(msg) class MockPool: def __init__(self, peers=None): self._peers = peers or {} def get_all_peer_hashes(self): return list(self._peers.keys()) def get(self, peer_hash): return self._peers.get(peer_hash) def remove(self, peer_hash): self._peers.pop(peer_hash, None) @property def active_count(self): return len(self._peers) def is_connected(self, key): return key in self._peers def add(self, key, conn): self._peers[key] = conn class TestKeepaliveLoop: @pytest.mark.asyncio async def test_sends_padding_to_idle_connections(self): conn = MockConnection(idle_seconds=100.0) pool = MockPool({b"\x01\x02\x03\x04": conn}) task = asyncio.create_task(keepalive_loop(pool, interval=0.05)) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task assert len(conn.sent_frames) > 0 @pytest.mark.asyncio async def test_skips_recently_active_connections(self): conn = MockConnection(idle_seconds=0.01) # very recently active pool = MockPool({b"\x01\x02\x03\x04": conn}) task = asyncio.create_task(keepalive_loop(pool, interval=60.0)) # Use short sleep — only one iteration possible with 60s interval await asyncio.sleep(0.1) task.cancel() with pytest.raises(asyncio.CancelledError): await task # idle (0.01) < interval (60.0), so no padding sent assert len(conn.sent_frames) == 0 @pytest.mark.asyncio async def test_removes_failed_connections(self): conn = MockConnection(idle_seconds=100.0) conn.send_frame = AsyncMock(side_effect=ConnectionError("broken")) pool = MockPool({b"\x01\x02\x03\x04": conn}) task = asyncio.create_task(keepalive_loop(pool, interval=0.05)) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task assert b"\x01\x02\x03\x04" not in pool._peers @pytest.mark.asyncio async def test_handles_missing_connection(self): """Pool returns None for a peer hash.""" pool = MockPool({}) pool.get_all_peer_hashes = lambda: [b"\x01\x02\x03\x04"] task = asyncio.create_task(keepalive_loop(pool, interval=0.05)) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task class TestBroadcastLoop: @pytest.mark.asyncio async def test_broadcasts_to_all_peers(self): conn1 = MockConnection() conn2 = MockConnection() pool = MockPool({ b"\x01\x02\x03\x04": conn1, b"\x05\x06\x07\x08": conn2, }) msg = b"fake-db-store" task = asyncio.create_task(routerinfo_broadcast_loop(pool, msg, interval=0.05)) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task assert len(conn1.sent_i2np) > 0 assert conn1.sent_i2np[0] == b"fake-db-store" assert len(conn2.sent_i2np) > 0 @pytest.mark.asyncio async def test_removes_failed_broadcast_peer(self): conn = MockConnection() conn.send_i2np = AsyncMock(side_effect=ConnectionError) pool = MockPool({b"\x01\x02\x03\x04": conn}) task = asyncio.create_task( routerinfo_broadcast_loop(pool, b"msg", interval=0.05) ) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task assert b"\x01\x02\x03\x04" not in pool._peers class MockDataStoreEntry: def __init__(self, received_ms, key=b"\x00" * 32): self.received_ms = received_ms self.key = key class MockDataStore: def __init__(self, entries=None): self._entries = entries or {} def get_all(self): return list(self._entries.values()) def get_all_keys(self): return list(self._entries.keys()) def remove(self, key): self._entries.pop(key, None) class TestEvictionLoop: @pytest.mark.asyncio async def test_evicts_expired_entries(self): now_ms = int(time.time() * 1000) old_entry = MockDataStoreEntry(received_ms=now_ms - 100_000_000) # very old new_entry = MockDataStoreEntry(received_ms=now_ms) ds = MockDataStore({ b"old": old_entry, b"new": new_entry, }) task = asyncio.create_task( netdb_eviction_loop(ds, sqlite_netdb=None, interval=0.05, max_age_ms=86_400_000) ) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task assert b"old" not in ds._entries assert b"new" in ds._entries @pytest.mark.asyncio async def test_evicts_from_sqlite_too(self): ds = MockDataStore({}) sqlite = MagicMock() sqlite.evict_expired.return_value = 5 task = asyncio.create_task( netdb_eviction_loop(ds, sqlite_netdb=sqlite, interval=0.05) ) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task sqlite.evict_expired.assert_called() @pytest.mark.asyncio async def test_no_sqlite_no_crash(self): ds = MockDataStore({}) task = asyncio.create_task( netdb_eviction_loop(ds, sqlite_netdb=None, interval=0.05) ) await asyncio.sleep(0.15) task.cancel() with pytest.raises(asyncio.CancelledError): await task