A Python port of the Invisible Internet Project (I2P)
at main 247 lines 8.9 kB view raw
"""Additional handler tests for coverage — DEST, NAMING, PING, DATAGRAM, RAW, edge cases."""

import asyncio

import pytest

from i2p_sam.bridge import SAMBridge

# Loopback address the bridge under test binds to and the clients connect to.
_HOST = "127.0.0.1"
# Upper bound, in seconds, on waiting for any single reply from the bridge.
_TIMEOUT = 5.0


@pytest.fixture
async def bridge():
    """Yield a started ``SAMBridge`` bound to port 0 and tear it down afterwards.

    Teardown cancels every task in ``_handler_tasks``, waits for them to
    finish, and then stops the bridge.
    """
    b = SAMBridge(host=_HOST, port=0)
    await b.start()
    yield b
    # Snapshot first: the collection is read twice and may be mutated
    # elsewhere while tasks finish.
    pending = list(b._handler_tasks)
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True keeps the CancelledError of each task from
        # propagating out of the teardown.
        await asyncio.gather(*pending, return_exceptions=True)
    await b.stop()


async def _close(writer):
    """Close *writer* and wait until the underlying transport is closed."""
    writer.close()
    await writer.wait_closed()


async def _exchange(reader, writer, line):
    """Send *line* (bytes) and return the next line read from the bridge.

    Raises ``asyncio.TimeoutError`` if no line arrives within ``_TIMEOUT``
    seconds. An empty ``bytes`` result means the peer closed the connection.
    """
    writer.write(line)
    await writer.drain()
    return await asyncio.wait_for(reader.readline(), timeout=_TIMEOUT)


async def _hello(bridge):
    """Open a connection to *bridge*, perform the HELLO handshake, return ``(reader, writer)``.

    The connection is closed before re-raising if the handshake fails, so a
    failed handshake does not leak a socket.
    """
    reader, writer = await asyncio.open_connection(_HOST, bridge.port)
    try:
        resp = await _exchange(reader, writer, b"HELLO VERSION MIN=3.0 MAX=3.3\n")
        assert b"RESULT=OK" in resp
    except Exception:
        await _close(writer)
        raise
    return reader, writer


async def _create_session(bridge, session_id="test", style="STREAM"):
    """Handshake with *bridge*, create a transient session, return ``(reader, writer)``.

    *session_id* and *style* are interpolated into the ``SESSION CREATE``
    command. The connection is closed before re-raising if creation fails.
    """
    reader, writer = await _hello(bridge)
    command = f"SESSION CREATE ID={session_id} STYLE={style} DESTINATION=TRANSIENT\n".encode()
    try:
        resp = await _exchange(reader, writer, command)
        assert b"RESULT=OK" in resp
    except Exception:
        await _close(writer)
        raise
    return reader, writer


class TestPing:
    """PING requests sent after a successful handshake."""

    @pytest.mark.asyncio
    async def test_ping_pong(self, bridge):
        """A PING with a payload gets a PONG containing that payload."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(reader, writer, b"PING test123\n")
            assert b"PONG" in resp
            assert b"test123" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_ping_empty(self, bridge):
        """A PING with no payload still gets a PONG."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(reader, writer, b"PING \n")
            assert b"PONG" in resp
        finally:
            await _close(writer)


class TestDestGenerate:
    """DEST GENERATE requests."""

    @pytest.mark.asyncio
    async def test_dest_generate(self, bridge):
        """DEST GENERATE answers with a DEST REPLY mentioning TRANSIENT."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(reader, writer, b"DEST GENERATE\n")
            assert b"DEST REPLY" in resp
            assert b"TRANSIENT" in resp
        finally:
            await _close(writer)


class TestDestLookup:
    """DEST LOOKUP requests issued on a connection that owns a session."""

    @pytest.mark.asyncio
    async def test_lookup_me(self, bridge):
        """Looking up NAME=ME yields a DEST REPLY."""
        reader, writer = await _create_session(bridge, "lookuptest")
        try:
            resp = await _exchange(reader, writer, b"DEST LOOKUP NAME=ME\n")
            assert b"DEST REPLY" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_lookup_unknown(self, bridge):
        """Looking up an unknown host name yields KEY_NOT_FOUND."""
        reader, writer = await _create_session(bridge, "lookuptest2")
        try:
            resp = await _exchange(reader, writer, b"DEST LOOKUP NAME=unknown.i2p\n")
            assert b"KEY_NOT_FOUND" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_lookup_empty_name(self, bridge):
        """Looking up an empty name yields KEY_NOT_FOUND."""
        reader, writer = await _create_session(bridge, "lookuptest3")
        try:
            resp = await _exchange(reader, writer, b"DEST LOOKUP NAME=\n")
            assert b"KEY_NOT_FOUND" in resp
        finally:
            await _close(writer)


class TestNamingLookup:
    """NAMING LOOKUP requests issued on a connection that owns a session."""

    @pytest.mark.asyncio
    async def test_naming_me(self, bridge):
        """Looking up NAME=ME yields a NAMING REPLY."""
        reader, writer = await _create_session(bridge, "namingtest")
        try:
            resp = await _exchange(reader, writer, b"NAMING LOOKUP NAME=ME\n")
            assert b"NAMING REPLY" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_naming_unknown(self, bridge):
        """Looking up an unknown host name yields a reply containing NAMING."""
        reader, writer = await _create_session(bridge, "namingtest2")
        try:
            resp = await _exchange(reader, writer, b"NAMING LOOKUP NAME=unknown.i2p\n")
            # KEY_NOT_FOUND is expected for unknown names, but only the
            # NAMING prefix is asserted here.
            assert b"NAMING" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_naming_empty(self, bridge):
        """Looking up an empty name completes (a line or EOF) within the timeout."""
        reader, writer = await _create_session(bridge, "namingtest3")
        try:
            # NOTE(review): nothing is asserted about the reply content; the
            # test only fails on timeout. Confirm the expected RESULT value.
            await _exchange(reader, writer, b"NAMING LOOKUP NAME=\n")
        finally:
            await _close(writer)


class TestSessionEdgeCases:
    """SESSION commands: malformed requests and the non-STREAM styles."""

    @pytest.mark.asyncio
    async def test_session_no_id(self, bridge):
        """SESSION CREATE without an ID yields an error or some RESULT reply."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(
                reader, writer, b"SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT\n"
            )
            assert b"I2P_ERROR" in resp or b"RESULT=" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_session_unknown_opcode(self, bridge):
        """An unrecognised SESSION opcode is reported as an error."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(reader, writer, b"SESSION BOGUS ID=test STYLE=STREAM\n")
            assert b"I2P_ERROR" in resp or b"Unknown" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_datagram_session_create(self, bridge):
        """A DATAGRAM-style session can be created."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(
                reader,
                writer,
                b"SESSION CREATE ID=dgram STYLE=DATAGRAM DESTINATION=TRANSIENT PORT=0\n",
            )
            assert b"RESULT=OK" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_raw_session_create(self, bridge):
        """A RAW-style session can be created."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(
                reader,
                writer,
                b"SESSION CREATE ID=rawtest STYLE=RAW DESTINATION=TRANSIENT PROTOCOL=18\n",
            )
            assert b"RESULT=OK" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_primary_session_create(self, bridge):
        """A PRIMARY-style session can be created."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(
                reader,
                writer,
                b"SESSION CREATE ID=primary STYLE=PRIMARY DESTINATION=TRANSIENT\n",
            )
            assert b"RESULT=OK" in resp
        finally:
            await _close(writer)


class TestStreamEdgeCases:
    """STREAM commands that are expected to be rejected."""

    @pytest.mark.asyncio
    async def test_stream_without_session(self, bridge):
        """STREAM CONNECT naming a session that was never created is an error."""
        reader, writer = await _hello(bridge)
        try:
            resp = await _exchange(
                reader, writer, b"STREAM CONNECT ID=nosession DESTINATION=AAAA\n"
            )
            assert b"I2P_ERROR" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_stream_unknown_opcode(self, bridge):
        """An unrecognised STREAM opcode is reported as an error."""
        reader, writer = await _create_session(bridge, "streamtest")
        try:
            resp = await _exchange(reader, writer, b"STREAM BOGUS ID=streamtest\n")
            assert b"I2P_ERROR" in resp or b"Unknown" in resp
        finally:
            await _close(writer)


class TestHelloEdgeCases:
    """Malformed first lines sent before any successful handshake."""

    @pytest.mark.asyncio
    async def test_hello_wrong_verb(self, bridge):
        """A first line that is not HELLO yields NOVERSION or some RESULT reply."""
        reader, writer = await asyncio.open_connection(_HOST, bridge.port)
        try:
            resp = await _exchange(reader, writer, b"NOTAHELLO VERSION MIN=3.0 MAX=3.3\n")
            assert b"NOVERSION" in resp or b"RESULT=" in resp
        finally:
            await _close(writer)

    @pytest.mark.asyncio
    async def test_hello_empty_line(self, bridge):
        """An empty first line produces data or EOF within the timeout."""
        reader, writer = await asyncio.open_connection(_HOST, bridge.port)
        try:
            writer.write(b"\n")
            await writer.drain()
            # Connection should close.
            # NOTE(review): the bytes read are not checked, so this passes on
            # either EOF or any reply; it only fails on timeout.
            await asyncio.wait_for(reader.read(1024), timeout=_TIMEOUT)
        finally:
            await _close(writer)
class TestUnknownVerb:
    """Behaviour of the bridge when it receives a verb it does not recognise."""

    @pytest.mark.asyncio
    async def test_unknown_command(self, bridge):
        """Send an unrecognised verb and check the reader has not reached EOF."""
        reader, writer = await _hello(bridge)
        try:
            writer.write(b"FOOBAR SOMETHING\n")
            await writer.drain()
            # Give the bridge a moment to react before inspecting the stream.
            await asyncio.sleep(0.2)
            # Should not crash, connection stays open
            assert not reader.at_eof()
        finally:
            # Close even when the assertion fails so the socket is not leaked.
            writer.close()
            await writer.wait_closed()