"""Tests for SAM stream session handling.""" import asyncio import pytest from i2p_sam.bridge import SAMBridge @pytest.fixture async def bridge(): """Create and start a SAMBridge on a random port.""" b = SAMBridge(host="127.0.0.1", port=0) await b.start() yield b # Cancel all handler tasks before stopping the server to avoid # blocking on handlers stuck in STREAM ACCEPT's 300s queue wait. for task in list(b._handler_tasks): task.cancel() if b._handler_tasks: await asyncio.gather(*b._handler_tasks, return_exceptions=True) await b.stop() async def _connect_and_hello(bridge: SAMBridge) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Helper: connect and complete HELLO handshake.""" reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port) writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n") await writer.drain() response = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in response return reader, writer class TestSessionCreate: """Tests for SESSION CREATE.""" @pytest.mark.asyncio async def test_session_create_transient(self, bridge): """Create session with TRANSIENT dest, get base64 back.""" reader, writer = await _connect_and_hello(bridge) writer.write(b"SESSION CREATE ID=teststream STYLE=STREAM DESTINATION=TRANSIENT\n") await writer.drain() response = await asyncio.wait_for(reader.readline(), timeout=5.0) text = response.decode("utf-8").strip() assert "SESSION STATUS RESULT=OK" in text assert "DESTINATION=" in text # The destination should be a non-empty base64 string dest_part = text.split("DESTINATION=")[1] assert len(dest_part) > 10 writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_session_create_duplicate_id(self, bridge): """Second CREATE with same ID gets DUPLICATED_ID.""" r1, w1 = await _connect_and_hello(bridge) r2, w2 = await _connect_and_hello(bridge) # First client creates session w1.write(b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n") await w1.drain() resp1 = await asyncio.wait_for(r1.readline(), timeout=5.0) assert b"RESULT=OK" in resp1 # Second client tries same ID w2.write(b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n") await w2.drain() resp2 = await asyncio.wait_for(r2.readline(), timeout=5.0) assert b"DUPLICATED_ID" in resp2 w1.close() w2.close() await w1.wait_closed() await w2.wait_closed() class TestStreamCommands: """Tests for STREAM CONNECT/ACCEPT format.""" @pytest.mark.asyncio async def test_stream_connect_format(self, bridge): """STREAM CONNECT sends correct status reply.""" reader, writer = await _connect_and_hello(bridge) # Create session first writer.write(b"SESSION CREATE ID=conntest STYLE=STREAM DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp # STREAM CONNECT to a fake destination (will fail with CANT_REACH_PEER # since there's no real I2P router, but format should be correct) fake_dest = "A" * 344 # Fake base64 destination writer.write(f"STREAM CONNECT ID=conntest DESTINATION={fake_dest}\n".encode()) await writer.drain() response = await asyncio.wait_for(reader.readline(), timeout=5.0) text = response.decode("utf-8").strip() # Should get a STREAM STATUS reply (OK or error) assert "STREAM STATUS" in text writer.close() await writer.wait_closed() @pytest.mark.asyncio @pytest.mark.timeout(15) async def test_stream_accept_format(self, bridge): """STREAM ACCEPT is dispatched without error. The accept handler blocks on an internal queue (waiting for an incoming I2P connection that will never arrive in tests), so we cannot wait for a reply. Instead we verify the session is created, send STREAM ACCEPT, confirm the connection stays alive briefly, then tear down. """ reader, writer = await _connect_and_hello(bridge) writer.write(b"SESSION CREATE ID=acctest STYLE=STREAM DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp # STREAM ACCEPT — will block server-side waiting for incoming. # We just verify the command doesn't crash the bridge. writer.write(b"STREAM ACCEPT ID=acctest\n") await writer.drain() # Brief pause to let the bridge dispatch the command await asyncio.sleep(0.3) # Verify the connection is still alive (not errored out) assert not reader.at_eof() # Force-close to avoid blocking on bridge's 300s accept timeout if writer.can_write_eof(): writer.write_eof() writer.close() try: await asyncio.wait_for(writer.wait_closed(), timeout=2.0) except (asyncio.TimeoutError, ConnectionResetError, BrokenPipeError): pass class TestQuit: """Tests for QUIT/STOP/EXIT commands.""" @pytest.mark.asyncio async def test_quit_closes_connection(self, bridge): """QUIT/STOP/EXIT close cleanly.""" reader, writer = await _connect_and_hello(bridge) writer.write(b"QUIT\n") await writer.drain() # Connection should close — readline returns empty data = await asyncio.wait_for(reader.read(1024), timeout=5.0) # After QUIT, server closes connection, so we get EOF assert data == b"" writer.close() await writer.wait_closed()