"""Tests for SAM primary/multi session.""" import asyncio import pytest from i2p_sam.bridge import SAMBridge @pytest.fixture async def bridge(): """Create and start a SAMBridge on a random port.""" b = SAMBridge(host="127.0.0.1", port=0) await b.start() yield b await b.stop() async def _connect_and_hello(bridge: SAMBridge) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Helper: connect and complete HELLO handshake.""" reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port) writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n") await writer.drain() response = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in response return reader, writer class TestPrimarySession: """Tests for PRIMARY session type.""" @pytest.mark.asyncio async def test_primary_session_create(self, bridge): """STYLE=PRIMARY succeeds.""" reader, writer = await _connect_and_hello(bridge) writer.write(b"SESSION CREATE ID=primary1 STYLE=PRIMARY DESTINATION=TRANSIENT\n") await writer.drain() response = await asyncio.wait_for(reader.readline(), timeout=5.0) text = response.decode("utf-8").strip() assert "SESSION STATUS RESULT=OK" in text assert "DESTINATION=" in text writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_primary_add_subsession(self, bridge): """SESSION ADD with STREAM style succeeds.""" reader, writer = await _connect_and_hello(bridge) # Create primary session first writer.write(b"SESSION CREATE ID=primary2 STYLE=PRIMARY DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp # Add a STREAM subsession writer.write(b"SESSION ADD ID=primary2 STYLE=STREAM FROM_PORT=0 TO_PORT=0\n") await writer.drain() resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0) text = resp2.decode("utf-8").strip() assert "SESSION STATUS RESULT=OK" in text writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_primary_remove_subsession(self, bridge): """SESSION REMOVE removes a subsession.""" reader, writer = await _connect_and_hello(bridge) writer.write(b"SESSION CREATE ID=primary3 STYLE=PRIMARY DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp # Add then remove writer.write(b"SESSION ADD ID=primary3 STYLE=STREAM FROM_PORT=8000 TO_PORT=0\n") await writer.drain() resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp2 writer.write(b"SESSION REMOVE ID=primary3 FROM_PORT=8000\n") await writer.drain() resp3 = await asyncio.wait_for(reader.readline(), timeout=5.0) text = resp3.decode("utf-8").strip() assert "SESSION STATUS RESULT=OK" in text writer.close() await writer.wait_closed() @pytest.mark.asyncio async def test_primary_duplicate_subsession(self, bridge): """Duplicate sub-ID is rejected.""" reader, writer = await _connect_and_hello(bridge) writer.write(b"SESSION CREATE ID=primary4 STYLE=PRIMARY DESTINATION=TRANSIENT\n") await writer.drain() resp = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp # Add subsession writer.write(b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n") await writer.drain() resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0) assert b"RESULT=OK" in resp2 # Try same FROM_PORT again writer.write(b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n") await writer.drain() resp3 = await asyncio.wait_for(reader.readline(), timeout=5.0) text = resp3.decode("utf-8").strip() assert "DUPLICATED_ID" in text writer.close() await writer.wait_closed()