A Python port of the Invisible Internet Project (I2P)
1"""Additional handler tests for coverage — DEST, NAMING, PING, DATAGRAM, RAW, edge cases."""
2
3import asyncio
4
5import pytest
6
7from i2p_sam.bridge import SAMBridge
8
9
@pytest.fixture
async def bridge():
    """Yield a started ``SAMBridge`` and tear it down after the test.

    The bridge is constructed with ``host="127.0.0.1"`` and ``port=0``;
    tests read the listening port back from ``bridge.port``.
    """
    sam_bridge = SAMBridge(host="127.0.0.1", port=0)
    await sam_bridge.start()
    yield sam_bridge

    # Teardown: cancel every handler task the bridge is still tracking and
    # wait for them to finish before stopping the bridge itself.
    outstanding = list(sam_bridge._handler_tasks)
    for handler_task in outstanding:
        handler_task.cancel()
    if outstanding:
        # return_exceptions=True keeps the CancelledError of each task from
        # propagating out of the fixture.
        await asyncio.gather(*outstanding, return_exceptions=True)
    await sam_bridge.stop()
20
21
async def _hello(bridge):
    """Connect to *bridge* and complete the HELLO handshake.

    Sends ``HELLO VERSION MIN=3.0 MAX=3.3`` and asserts that the reply line
    contains ``RESULT=OK``.

    Args:
        bridge: Object whose ``port`` attribute is the TCP port to connect
            to on 127.0.0.1.

    Returns:
        The ``(reader, writer)`` stream pair of the open connection. The
        caller is responsible for closing the writer.

    Raises:
        AssertionError: If the reply does not contain ``RESULT=OK``.
        asyncio.TimeoutError: If no reply line arrives within 5 seconds.
    """
    reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port)
    try:
        writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n")
        await writer.drain()
        resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
        assert b"RESULT=OK" in resp, f"HELLO handshake failed: {resp!r}"
    except BaseException:
        # On failure the caller never receives the writer, so nobody else
        # can close this connection; release it here before re-raising.
        writer.close()
        raise
    return reader, writer
29
30
async def _create_session(bridge, session_id="test", style="STREAM"):
    """Handshake with *bridge* and create a transient-destination session.

    Calls ``_hello`` and then sends
    ``SESSION CREATE ID=<session_id> STYLE=<style> DESTINATION=TRANSIENT``,
    asserting that the reply line contains ``RESULT=OK``.

    Args:
        bridge: Bridge object passed through to ``_hello``.
        session_id: Value used for the ``ID=`` option.
        style: Value used for the ``STYLE=`` option.

    Returns:
        The ``(reader, writer)`` stream pair of the connection that owns the
        session. The caller is responsible for closing the writer.

    Raises:
        AssertionError: If the handshake or the session reply lacks
            ``RESULT=OK``.
        asyncio.TimeoutError: If no reply line arrives within 5 seconds.
    """
    reader, writer = await _hello(bridge)
    try:
        writer.write(
            f"SESSION CREATE ID={session_id} STYLE={style} DESTINATION=TRANSIENT\n".encode()
        )
        await writer.drain()
        resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
        assert b"RESULT=OK" in resp, f"SESSION CREATE failed: {resp!r}"
    except BaseException:
        # The writer is only handed to the caller on success; close it here
        # so a failed session setup does not leak the connection.
        writer.close()
        raise
    return reader, writer
38
39
class TestPing:
    """PING requests sent on a connection that has completed HELLO."""

    @pytest.mark.asyncio
    async def test_ping_pong(self, bridge):
        """A PING with a payload is answered with PONG and the same payload."""
        rx, tx = await _hello(bridge)
        tx.write(b"PING test123\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        for expected in (b"PONG", b"test123"):
            assert expected in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_ping_empty(self, bridge):
        """A PING followed only by a space is still answered with PONG."""
        rx, tx = await _hello(bridge)
        tx.write(b"PING \n")
        await tx.drain()
        pending_line = rx.readline()
        reply = await asyncio.wait_for(pending_line, timeout=5.0)
        assert b"PONG" in reply
        tx.close()
        await tx.wait_closed()
61
62
class TestDestGenerate:
    """DEST GENERATE sent on a connection that has completed HELLO."""

    @pytest.mark.asyncio
    async def test_dest_generate(self, bridge):
        """The reply line contains both ``DEST REPLY`` and ``TRANSIENT``."""
        rx, tx = await _hello(bridge)
        tx.write(b"DEST GENERATE\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        for expected in (b"DEST REPLY", b"TRANSIENT"):
            assert expected in reply
        tx.close()
        await tx.wait_closed()
74
75
class TestDestLookup:
    """DEST LOOKUP requests sent on a connection that owns a session."""

    @pytest.mark.asyncio
    async def test_lookup_me(self, bridge):
        """Looking up ``ME`` yields a ``DEST REPLY`` line."""
        rx, tx = await _create_session(bridge, "lookuptest")
        tx.write(b"DEST LOOKUP NAME=ME\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"DEST REPLY" in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_lookup_unknown(self, bridge):
        """Looking up ``unknown.i2p`` yields ``KEY_NOT_FOUND``."""
        rx, tx = await _create_session(bridge, "lookuptest2")
        tx.write(b"DEST LOOKUP NAME=unknown.i2p\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"KEY_NOT_FOUND" in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_lookup_empty_name(self, bridge):
        """Looking up an empty name yields ``KEY_NOT_FOUND``."""
        rx, tx = await _create_session(bridge, "lookuptest3")
        tx.write(b"DEST LOOKUP NAME=\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"KEY_NOT_FOUND" in reply
        tx.close()
        await tx.wait_closed()
106
107
class TestNamingLookup:
    """NAMING LOOKUP requests sent on a connection that owns a session."""

    @pytest.mark.asyncio
    async def test_naming_me(self, bridge):
        """Looking up ``ME`` yields a ``NAMING REPLY`` line."""
        reader, writer = await _create_session(bridge, "namingtest")
        writer.write(b"NAMING LOOKUP NAME=ME\n")
        await writer.drain()
        resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
        assert b"NAMING REPLY" in resp
        writer.close()
        await writer.wait_closed()

    @pytest.mark.asyncio
    async def test_naming_unknown(self, bridge):
        """Looking up ``unknown.i2p`` yields a NAMING reply line."""
        reader, writer = await _create_session(bridge, "namingtest2")
        writer.write(b"NAMING LOOKUP NAME=unknown.i2p\n")
        await writer.drain()
        resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
        # Only the NAMING prefix is checked here; the specific RESULT value
        # (presumably KEY_NOT_FOUND for unknown names) is not asserted.
        text = resp.decode()
        assert "NAMING" in text
        writer.close()
        await writer.wait_closed()

    @pytest.mark.asyncio
    async def test_naming_empty(self, bridge):
        """Looking up an empty name still yields a NAMING reply line."""
        reader, writer = await _create_session(bridge, "namingtest3")
        writer.write(b"NAMING LOOKUP NAME=\n")
        await writer.drain()
        resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
        # Previously the reply was read but never checked, so the test could
        # not fail on a wrong or missing reply. Assert the same NAMING marker
        # as test_naming_unknown; the RESULT value is deliberately left open.
        assert b"NAMING" in resp, f"unexpected reply to empty NAMING LOOKUP: {resp!r}"
        writer.close()
        await writer.wait_closed()
139
140
class TestSessionEdgeCases:
    """SESSION commands with missing options, bad opcodes and other styles."""

    @pytest.mark.asyncio
    async def test_session_no_id(self, bridge):
        """SESSION CREATE without ``ID=`` gets a reply with I2P_ERROR or a RESULT."""
        rx, tx = await _hello(bridge)
        tx.write(b"SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        # Either marker is accepted; the exact RESULT value is not pinned.
        assert any(marker in reply for marker in (b"I2P_ERROR", b"RESULT="))
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_session_unknown_opcode(self, bridge):
        """An unrecognised SESSION opcode is reported as an error."""
        rx, tx = await _hello(bridge)
        tx.write(b"SESSION BOGUS ID=test STYLE=STREAM\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert any(marker in reply for marker in (b"I2P_ERROR", b"Unknown"))
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_datagram_session_create(self, bridge):
        """A ``STYLE=DATAGRAM`` session with ``PORT=0`` is created successfully."""
        rx, tx = await _hello(bridge)
        tx.write(b"SESSION CREATE ID=dgram STYLE=DATAGRAM DESTINATION=TRANSIENT PORT=0\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"RESULT=OK" in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_raw_session_create(self, bridge):
        """A ``STYLE=RAW`` session with ``PROTOCOL=18`` is created successfully."""
        rx, tx = await _hello(bridge)
        tx.write(b"SESSION CREATE ID=rawtest STYLE=RAW DESTINATION=TRANSIENT PROTOCOL=18\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"RESULT=OK" in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_primary_session_create(self, bridge):
        """A ``STYLE=PRIMARY`` session is created successfully."""
        rx, tx = await _hello(bridge)
        tx.write(b"SESSION CREATE ID=primary STYLE=PRIMARY DESTINATION=TRANSIENT\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"RESULT=OK" in reply
        tx.close()
        await tx.wait_closed()
191
192
class TestStreamEdgeCases:
    """STREAM commands sent without a session or with a bad opcode."""

    @pytest.mark.asyncio
    async def test_stream_without_session(self, bridge):
        """STREAM CONNECT naming a session that was never created is an error."""
        rx, tx = await _hello(bridge)
        tx.write(b"STREAM CONNECT ID=nosession DESTINATION=AAAA\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert b"I2P_ERROR" in reply
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_stream_unknown_opcode(self, bridge):
        """An unrecognised STREAM opcode on a valid session is reported as an error."""
        rx, tx = await _create_session(bridge, "streamtest")
        tx.write(b"STREAM BOGUS ID=streamtest\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        assert any(marker in reply for marker in (b"I2P_ERROR", b"Unknown"))
        tx.close()
        await tx.wait_closed()
213
214
class TestHelloEdgeCases:
    """Malformed first lines sent on a connection before any handshake."""

    @pytest.mark.asyncio
    async def test_hello_wrong_verb(self, bridge):
        """A first line with a verb other than HELLO still gets a reply line."""
        rx, tx = await asyncio.open_connection("127.0.0.1", bridge.port)
        tx.write(b"NOTAHELLO VERSION MIN=3.0 MAX=3.3\n")
        await tx.drain()
        reply = await asyncio.wait_for(rx.readline(), timeout=5.0)
        # Either marker is accepted; the exact RESULT value is not pinned.
        assert any(marker in reply for marker in (b"NOVERSION", b"RESULT="))
        tx.close()
        await tx.wait_closed()

    @pytest.mark.asyncio
    async def test_hello_empty_line(self, bridge):
        """An empty first line must not hang the client's next read."""
        rx, tx = await asyncio.open_connection("127.0.0.1", bridge.port)
        tx.write(b"\n")
        await tx.drain()
        # The bridge is expected to close the connection. The bytes read are
        # not inspected; the test only requires the read to return within
        # the timeout.
        await asyncio.wait_for(rx.read(1024), timeout=5.0)
        tx.close()
        await tx.wait_closed()
235
236
class TestUnknownVerb:
    """Unrecognised top-level commands sent after a successful HELLO."""

    @pytest.mark.asyncio
    async def test_unknown_command(self, bridge):
        """An unknown verb leaves the connection open."""
        rx, tx = await _hello(bridge)
        tx.write(b"FOOBAR SOMETHING\n")
        await tx.drain()
        # Give the bridge a moment to process the line before checking state.
        await asyncio.sleep(0.2)
        # The bridge should neither crash nor drop the connection.
        still_open = not rx.at_eof()
        assert still_open
        tx.close()
        await tx.wait_closed()
247 await writer.wait_closed()