"""Additional handler tests for coverage — DEST, NAMING, PING, DATAGRAM, RAW, edge cases.""" import asyncio import pytest from i2p_sam.bridge import SAMBridge @pytest.fixture async def bridge(): b = SAMBridge(host="127.0.0.1", port=0) await b.start() yield b for task in list(b._handler_tasks): task.cancel() if b._handler_tasks: await asyncio.gather(*b._handler_tasks, return_exceptions=True) await b.stop() async def _hello(bridge): reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port) writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp return reader, writer async def _create_session(bridge, session_id="test", style="STREAM"): reader, writer = await _hello(bridge) writer.write(f"SESSION CREATE ID={session_id} STYLE={style} DESTINATION=TRANSIENT\n".encode()) await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp return reader, writer class TestPing: @pytest.mark.asyncio async def test_ping_pong(self, bridge): reader, writer = await _hello(bridge) writer.write(b"PING test123\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"PONG" in resp assert b"test123" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_ping_empty(self, bridge): reader, writer = await _hello(bridge) writer.write(b"PING \n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"PONG" in resp writer.close() await writer.wait_closed() class TestDestGenerate: @pytest.mark.asyncio async def test_dest_generate(self, bridge): reader, writer = await _hello(bridge) writer.write(b"DEST GENERATE\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"DEST REPLY" in resp assert b"TRANSIENT" in resp writer.close() await writer.wait_closed() class TestDestLookup: @pytest.mark.asyncio async def test_lookup_me(self, bridge): reader, writer = await _create_session(bridge, "lookuptest") writer.write(b"DEST LOOKUP NAME=ME\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"DEST REPLY" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_lookup_unknown(self, bridge): reader, writer = await _create_session(bridge, "lookuptest2") writer.write(b"DEST LOOKUP NAME=unknown.i2p\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"KEY_NOT_FOUND" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_lookup_empty_name(self, bridge): reader, writer = await _create_session(bridge, "lookuptest3") writer.write(b"DEST LOOKUP NAME=\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"KEY_NOT_FOUND" in resp writer.close() await writer.wait_closed() class TestNamingLookup: @pytest.mark.asyncio async def test_naming_me(self, bridge): reader, writer = await _create_session(bridge, "namingtest") writer.write(b"NAMING LOOKUP NAME=ME\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"NAMING REPLY" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_naming_unknown(self, bridge): reader, writer = await _create_session(bridge, "namingtest2") writer.write(b"NAMING LOOKUP NAME=unknown.i2p\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) # KEY_NOT_FOUND for unknown names text = resp.decode() assert "NAMING" in text writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_naming_empty(self, bridge): reader, writer = await _create_session(bridge, "namingtest3") writer.write(b"NAMING LOOKUP NAME=\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) writer.close() await writer.wait_closed() class TestSessionEdgeCases: @pytest.mark.asyncio async def test_session_no_id(self, bridge): reader, writer = await _hello(bridge) writer.write(b"SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"I2P_ERROR" in resp or b"RESULT=" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_session_unknown_opcode(self, bridge): reader, writer = await _hello(bridge) writer.write(b"SESSION BOGUS ID=test STYLE=STREAM\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"I2P_ERROR" in resp or b"Unknown" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_datagram_session_create(self, bridge): reader, writer = await _hello(bridge) writer.write(b"SESSION CREATE ID=dgram STYLE=DATAGRAM DESTINATION=TRANSIENT PORT=0\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_raw_session_create(self, bridge): reader, writer = await _hello(bridge) writer.write(b"SESSION CREATE ID=rawtest STYLE=RAW DESTINATION=TRANSIENT PROTOCOL=18\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_primary_session_create(self, bridge): reader, writer = await _hello(bridge) writer.write(b"SESSION CREATE ID=primary STYLE=PRIMARY DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp writer.close() await writer.wait_closed() class TestStreamEdgeCases: @pytest.mark.asyncio async def test_stream_without_session(self, bridge): reader, writer = await _hello(bridge) writer.write(b"STREAM CONNECT ID=nosession DESTINATION=AAAA\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"I2P_ERROR" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_stream_unknown_opcode(self, bridge): reader, writer = await _create_session(bridge, "streamtest") writer.write(b"STREAM BOGUS ID=streamtest\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"I2P_ERROR" in resp or b"Unknown" in resp writer.close() await writer.wait_closed() class TestHelloEdgeCases: @pytest.mark.asyncio async def test_hello_wrong_verb(self, bridge): reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port) writer.write(b"NOTAHELLO VERSION MIN=3.0 MAX=3.3\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"NOVERSION" in resp or b"RESULT=" in resp writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_hello_empty_line(self, bridge): reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port) writer.write(b"\n") await writer.drain() # Connection should close data = await asyncio.wait_for(reader.read(1024), timeout=5.0) writer.close() await writer.wait_closed() class TestUnknownVerb: @pytest.mark.asyncio async def test_unknown_command(self, bridge): reader, writer = await _hello(bridge) writer.write(b"FOOBAR SOMETHING\n") await writer.drain() await asyncio.sleep(0.2) # Should not crash, connection stays open assert not reader.at_eof() writer.close() await writer.wait_closed()