# A Python port of the Invisible Internet Project (I2P)
1"""Tests for router maintenance loops — keepalive, reconnect, broadcast, eviction."""
2
3import asyncio
4import time
5from unittest.mock import AsyncMock, MagicMock, patch
6
7import pytest
8
9from i2p_router.maintenance import (
10 keepalive_loop,
11 netdb_eviction_loop,
12 routerinfo_broadcast_loop,
13)
14
15
class MockConnection:
    """Fake peer connection that records outgoing traffic instead of sending it.

    The idle time reported by ``seconds_since_last_write`` is the fixed value
    given at construction; sending through the mock does not reset it.
    """

    def __init__(self, idle_seconds=100.0):
        self._idle = idle_seconds
        # Everything "sent" is captured here, in call order, for assertions.
        self.sent_frames, self.sent_i2np = [], []

    def seconds_since_last_write(self):
        """Return the constant idle time supplied to the constructor."""
        return self._idle

    async def send_frame(self, blocks):
        """Append *blocks* to ``sent_frames``."""
        self.sent_frames.append(blocks)

    async def send_i2np(self, msg):
        """Append *msg* to ``sent_i2np``."""
        self.sent_i2np.append(msg)
30
31
class MockPool:
    """Dict-backed fake of the connection pool used by the maintenance loops.

    Connections live in ``_peers``, keyed by peer hash. A mapping passed to
    the constructor is used directly (not copied), so the caller and the pool
    see each other's changes.
    """

    def __init__(self, peers=None):
        # Compare against None explicitly: ``peers or {}`` would throw away an
        # empty dict supplied by the caller, so only non-empty dicts would be
        # shared with the pool.
        self._peers = {} if peers is None else peers

    def get_all_peer_hashes(self):
        """Return a new list of the current peer hashes."""
        return list(self._peers)

    def get(self, peer_hash):
        """Return the connection stored for *peer_hash*, or ``None`` if absent."""
        return self._peers.get(peer_hash)

    def remove(self, peer_hash):
        """Drop *peer_hash* from the pool; unknown hashes are ignored."""
        self._peers.pop(peer_hash, None)

    @property
    def active_count(self):
        """Number of peers currently stored."""
        return len(self._peers)

    def is_connected(self, key):
        """Return ``True`` when *key* is present in the pool."""
        return key in self._peers

    def add(self, key, conn):
        """Store *conn* under *key*, replacing any existing entry."""
        self._peers[key] = conn
54
55
class TestKeepaliveLoop:
    """Exercise ``keepalive_loop`` against mock pools and connections."""

    @staticmethod
    async def _run_then_cancel(coro, run_for):
        """Run *coro* as a task for *run_for* seconds, then cancel it.

        The task must finish with ``CancelledError``; if the loop crashed or
        returned on its own, ``pytest.raises`` fails the calling test.
        """
        task = asyncio.create_task(coro)
        await asyncio.sleep(run_for)
        task.cancel()
        with pytest.raises(asyncio.CancelledError):
            await task

    @pytest.mark.asyncio
    async def test_sends_padding_to_idle_connections(self):
        """A connection reporting 100 s of idle time receives a frame."""
        idle_conn = MockConnection(idle_seconds=100.0)
        pool = MockPool({b"\x01\x02\x03\x04": idle_conn})

        await self._run_then_cancel(keepalive_loop(pool, interval=0.05), 0.15)

        assert idle_conn.sent_frames

    @pytest.mark.asyncio
    async def test_skips_recently_active_connections(self):
        """A connection written to moments ago receives no frame."""
        busy_conn = MockConnection(idle_seconds=0.01)  # very recently active
        pool = MockPool({b"\x01\x02\x03\x04": busy_conn})

        # Use short sleep — only one iteration possible with 60s interval
        await self._run_then_cancel(keepalive_loop(pool, interval=60.0), 0.1)

        # idle (0.01) < interval (60.0), so no padding sent
        assert busy_conn.sent_frames == []

    @pytest.mark.asyncio
    async def test_removes_failed_connections(self):
        """A connection whose ``send_frame`` raises is dropped from the pool."""
        peer_hash = b"\x01\x02\x03\x04"
        broken_conn = MockConnection(idle_seconds=100.0)
        broken_conn.send_frame = AsyncMock(side_effect=ConnectionError("broken"))
        pool = MockPool({peer_hash: broken_conn})

        await self._run_then_cancel(keepalive_loop(pool, interval=0.05), 0.15)

        assert peer_hash not in pool._peers

    @pytest.mark.asyncio
    async def test_handles_missing_connection(self):
        """Pool lists a peer hash but returns None for it; loop must not crash."""
        pool = MockPool({})
        # Advertise a hash that ``get`` cannot resolve.
        pool.get_all_peer_hashes = lambda: [b"\x01\x02\x03\x04"]

        await self._run_then_cancel(keepalive_loop(pool, interval=0.05), 0.15)
110
111
class TestBroadcastLoop:
    """Exercise ``routerinfo_broadcast_loop`` against mock pools."""

    @staticmethod
    async def _run_then_cancel(coro, run_for):
        """Run *coro* as a task for *run_for* seconds, then cancel it.

        The task must finish with ``CancelledError``; if the loop crashed or
        returned on its own, ``pytest.raises`` fails the calling test.
        """
        task = asyncio.create_task(coro)
        await asyncio.sleep(run_for)
        task.cancel()
        with pytest.raises(asyncio.CancelledError):
            await task

    @pytest.mark.asyncio
    async def test_broadcasts_to_all_peers(self):
        """Every pooled connection receives the message via ``send_i2np``."""
        first_conn, second_conn = MockConnection(), MockConnection()
        pool = MockPool({
            b"\x01\x02\x03\x04": first_conn,
            b"\x05\x06\x07\x08": second_conn,
        })
        payload = b"fake-db-store"

        await self._run_then_cancel(
            routerinfo_broadcast_loop(pool, payload, interval=0.05), 0.15
        )

        assert len(first_conn.sent_i2np) > 0
        assert first_conn.sent_i2np[0] == b"fake-db-store"
        assert len(second_conn.sent_i2np) > 0

    @pytest.mark.asyncio
    async def test_removes_failed_broadcast_peer(self):
        """A peer whose ``send_i2np`` raises is dropped from the pool."""
        peer_hash = b"\x01\x02\x03\x04"
        failing_conn = MockConnection()
        failing_conn.send_i2np = AsyncMock(side_effect=ConnectionError)
        pool = MockPool({peer_hash: failing_conn})

        await self._run_then_cancel(
            routerinfo_broadcast_loop(pool, b"msg", interval=0.05), 0.15
        )

        assert peer_hash not in pool._peers
148
149
class MockDataStoreEntry:
    """Fake data-store entry holding just ``received_ms`` and ``key``."""

    def __init__(self, received_ms, key=b"\x00" * 32):
        # key defaults to 32 zero bytes when the caller does not supply one.
        self.received_ms, self.key = received_ms, key
154
155
class MockDataStore:
    """Dict-backed fake of the in-memory netDb data store.

    Entries live in ``_entries``, keyed by whatever keys the caller chose. A
    mapping passed to the constructor is used directly (not copied), so the
    caller and the store see each other's changes.
    """

    def __init__(self, entries=None):
        # Compare against None explicitly: ``entries or {}`` would throw away
        # an empty dict supplied by the caller, so only non-empty dicts would
        # be shared with the store.
        self._entries = {} if entries is None else entries

    def get_all(self):
        """Return a new list of all stored entries."""
        return list(self._entries.values())

    def get_all_keys(self):
        """Return a new list of all stored keys."""
        return list(self._entries)

    def remove(self, key):
        """Delete *key* from the store; unknown keys are ignored."""
        self._entries.pop(key, None)
168
169
class TestEvictionLoop:
    """Exercise ``netdb_eviction_loop`` against mock data stores."""

    @staticmethod
    async def _run_then_cancel(coro, run_for):
        """Run *coro* as a task for *run_for* seconds, then cancel it.

        The task must finish with ``CancelledError``; if the loop crashed or
        returned on its own, ``pytest.raises`` fails the calling test.
        """
        task = asyncio.create_task(coro)
        await asyncio.sleep(run_for)
        task.cancel()
        with pytest.raises(asyncio.CancelledError):
            await task

    @pytest.mark.asyncio
    async def test_evicts_expired_entries(self):
        """Entries older than ``max_age_ms`` are removed; fresh ones stay."""
        now_ms = int(time.time() * 1000)
        # 100_000_000 ms is roughly 27.8 h, past the 24 h max age used below.
        stale_entry = MockDataStoreEntry(received_ms=now_ms - 100_000_000)
        fresh_entry = MockDataStoreEntry(received_ms=now_ms)
        ds = MockDataStore({b"old": stale_entry, b"new": fresh_entry})

        await self._run_then_cancel(
            netdb_eviction_loop(
                ds, sqlite_netdb=None, interval=0.05, max_age_ms=86_400_000
            ),
            0.15,
        )

        assert b"old" not in ds._entries
        assert b"new" in ds._entries

    @pytest.mark.asyncio
    async def test_evicts_from_sqlite_too(self):
        """When a SQLite netDb is supplied, its ``evict_expired`` is called."""
        ds = MockDataStore({})
        sqlite_stub = MagicMock()
        sqlite_stub.evict_expired.return_value = 5

        await self._run_then_cancel(
            netdb_eviction_loop(ds, sqlite_netdb=sqlite_stub, interval=0.05), 0.15
        )

        sqlite_stub.evict_expired.assert_called()

    @pytest.mark.asyncio
    async def test_no_sqlite_no_crash(self):
        """With ``sqlite_netdb=None`` the loop keeps running until cancelled."""
        ds = MockDataStore({})

        await self._run_then_cancel(
            netdb_eviction_loop(ds, sqlite_netdb=None, interval=0.05), 0.15
        )