A Python port of the Invisible Internet Project (I2P)
at main 165 lines 6.0 kB view raw
"""Tests for SAM stream session handling."""

import asyncio

import pytest

from i2p_sam.bridge import SAMBridge


@pytest.fixture
async def bridge():
    """Create and start a SAMBridge on a random port, then tear it down.

    Yields the started bridge. After the test finishes, all handler tasks
    are cancelled and awaited before the bridge itself is stopped.
    """
    # NOTE(review): an async generator under plain ``@pytest.fixture`` relies
    # on pytest-asyncio driving it (e.g. auto mode) - confirm against the
    # project's pytest configuration.
    b = SAMBridge(host="127.0.0.1", port=0)
    await b.start()
    yield b
    # Cancel all handler tasks before stopping the server to avoid
    # blocking on handlers stuck in STREAM ACCEPT's 300s queue wait.
    # Snapshot once so the same tasks are cancelled and then awaited.
    pending = list(b._handler_tasks)
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True keeps the CancelledError of each task from
        # propagating out of the teardown.
        await asyncio.gather(*pending, return_exceptions=True)
    await b.stop()


async def _connect_and_hello(
    bridge: SAMBridge,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Helper: connect to *bridge* and complete the HELLO handshake.

    Returns the ``(reader, writer)`` pair of the open connection. The caller
    owns the writer and is responsible for closing it.

    Raises:
        AssertionError: if the HELLO reply does not contain ``RESULT=OK``.
        asyncio.TimeoutError: if no reply line arrives within 5 seconds.
    """
    reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port)
    try:
        writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n")
        await writer.drain()
        response = await asyncio.wait_for(reader.readline(), timeout=5.0)
        assert b"RESULT=OK" in response
    except BaseException:
        # The caller never receives the writer on failure, so release the
        # socket here before re-raising the original error.
        writer.close()
        raise
    return reader, writer


async def _close_writer(writer: asyncio.StreamWriter) -> None:
    """Helper: close *writer* and wait for the close to complete."""
    writer.close()
    await writer.wait_closed()


class TestSessionCreate:
    """Tests for SESSION CREATE."""

    @pytest.mark.asyncio
    async def test_session_create_transient(self, bridge):
        """Create session with TRANSIENT dest, get base64 back."""
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(
                b"SESSION CREATE ID=teststream STYLE=STREAM DESTINATION=TRANSIENT\n"
            )
            await writer.drain()

            response = await asyncio.wait_for(reader.readline(), timeout=5.0)
            text = response.decode("utf-8").strip()
            assert "SESSION STATUS RESULT=OK" in text
            assert "DESTINATION=" in text
            # The destination should be a non-empty base64 string
            dest_part = text.split("DESTINATION=")[1]
            assert len(dest_part) > 10
        finally:
            # Close even when an assertion above fails, so the client
            # socket is not leaked.
            await _close_writer(writer)

    @pytest.mark.asyncio
    async def test_session_create_duplicate_id(self, bridge):
        """Second CREATE with same ID gets DUPLICATED_ID."""
        r1, w1 = await _connect_and_hello(bridge)
        try:
            r2, w2 = await _connect_and_hello(bridge)
            try:
                # First client creates session
                w1.write(
                    b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n"
                )
                await w1.drain()
                resp1 = await asyncio.wait_for(r1.readline(), timeout=5.0)
                assert b"RESULT=OK" in resp1

                # Second client tries same ID
                w2.write(
                    b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n"
                )
                await w2.drain()
                resp2 = await asyncio.wait_for(r2.readline(), timeout=5.0)
                assert b"DUPLICATED_ID" in resp2
            finally:
                await _close_writer(w2)
        finally:
            # Outer finally also covers a failure while connecting the
            # second client.
            await _close_writer(w1)


class TestStreamCommands:
    """Tests for STREAM CONNECT/ACCEPT format."""

    @pytest.mark.asyncio
    async def test_stream_connect_format(self, bridge):
        """STREAM CONNECT sends correct status reply."""
        reader, writer = await _connect_and_hello(bridge)
        try:
            # Create session first
            writer.write(
                b"SESSION CREATE ID=conntest STYLE=STREAM DESTINATION=TRANSIENT\n"
            )
            await writer.drain()
            resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
            assert b"RESULT=OK" in resp

            # STREAM CONNECT to a fake destination (will fail with
            # CANT_REACH_PEER since there's no real I2P router, but format
            # should be correct)
            fake_dest = "A" * 344  # Fake base64 destination
            writer.write(
                f"STREAM CONNECT ID=conntest DESTINATION={fake_dest}\n".encode()
            )
            await writer.drain()

            response = await asyncio.wait_for(reader.readline(), timeout=5.0)
            text = response.decode("utf-8").strip()
            # Should get a STREAM STATUS reply (OK or error)
            assert "STREAM STATUS" in text
        finally:
            await _close_writer(writer)

    @pytest.mark.asyncio
    @pytest.mark.timeout(15)
    async def test_stream_accept_format(self, bridge):
        """STREAM ACCEPT is dispatched without error.

        The accept handler blocks on an internal queue (waiting for an
        incoming I2P connection that will never arrive in tests), so we
        cannot wait for a reply. Instead we verify the session is
        created, send STREAM ACCEPT, confirm the connection stays alive
        briefly, then tear down.
        """
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(
                b"SESSION CREATE ID=acctest STYLE=STREAM DESTINATION=TRANSIENT\n"
            )
            await writer.drain()
            resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
            assert b"RESULT=OK" in resp

            # STREAM ACCEPT - will block server-side waiting for incoming.
            # We just verify the command doesn't crash the bridge.
            writer.write(b"STREAM ACCEPT ID=acctest\n")
            await writer.drain()

            # Brief pause to let the bridge dispatch the command
            await asyncio.sleep(0.3)

            # Verify the connection is still alive (not errored out)
            assert not reader.at_eof()
        finally:
            # Force-close to avoid blocking on bridge's 300s accept timeout
            if writer.can_write_eof():
                writer.write_eof()
            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
            except (asyncio.TimeoutError, ConnectionResetError, BrokenPipeError):
                # Best-effort teardown: the peer may already be gone or slow
                # to close; neither outcome is what this test checks.
                pass


class TestQuit:
    """Tests for QUIT/STOP/EXIT commands."""

    @pytest.mark.asyncio
    async def test_quit_closes_connection(self, bridge):
        """QUIT closes the connection cleanly.

        Only the QUIT command is exercised here; STOP and EXIT are not sent.
        """
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(b"QUIT\n")
            await writer.drain()

            # Connection should close - read() returns empty bytes
            data = await asyncio.wait_for(reader.read(1024), timeout=5.0)
            # After QUIT, server closes connection, so we get EOF
            assert data == b""
        finally:
            await _close_writer(writer)