A Python port of the Invisible Internet Project (I2P)
at main 118 lines 4.3 kB view raw
"""Tests for SAM primary/multi session."""

import asyncio
import contextlib
from collections.abc import AsyncIterator

import pytest

from i2p_sam.bridge import SAMBridge

# Upper bound, in seconds, on waiting for a single reply line from the bridge.
_REPLY_TIMEOUT = 5.0


# NOTE(review): a plain ``@pytest.fixture`` on an async generator is only
# handled by pytest-asyncio in "auto" mode; in "strict" mode it would need
# ``pytest_asyncio.fixture``. Confirm against the project's pytest config.
@pytest.fixture
async def bridge() -> AsyncIterator[SAMBridge]:
    """Create and start a SAMBridge on a random port, stopping it afterwards."""
    b = SAMBridge(host="127.0.0.1", port=0)
    await b.start()
    yield b
    await b.stop()


async def _connect_and_hello(
    bridge: SAMBridge,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Connect to *bridge* and complete the HELLO handshake.

    Returns the ``(reader, writer)`` pair of the open connection. The caller
    owns the writer and is responsible for closing it. If the handshake
    fails, the connection is closed here before the error propagates.
    """
    reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port)
    try:
        writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n")
        await writer.drain()
        response = await asyncio.wait_for(
            reader.readline(), timeout=_REPLY_TIMEOUT
        )
        assert b"RESULT=OK" in response
    except BaseException:
        # No caller will ever receive this writer, so release the socket now.
        writer.close()
        raise
    return reader, writer


@contextlib.asynccontextmanager
async def _sam_connection(
    bridge: SAMBridge,
) -> AsyncIterator[tuple[asyncio.StreamReader, asyncio.StreamWriter]]:
    """Yield a handshaken ``(reader, writer)`` pair and always close it.

    The ``finally`` clause guarantees the connection is closed even when an
    assertion in the test body fails or a reply times out.
    """
    reader, writer = await _connect_and_hello(bridge)
    try:
        yield reader, writer
    finally:
        writer.close()
        await writer.wait_closed()


async def _send_command(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    command: bytes,
) -> str:
    """Send one newline-terminated *command* and return the reply line.

    The reply is decoded as UTF-8 and stripped of surrounding whitespace.
    Raises ``asyncio.TimeoutError`` if no full line arrives within
    ``_REPLY_TIMEOUT`` seconds.
    """
    writer.write(command)
    await writer.drain()
    reply = await asyncio.wait_for(reader.readline(), timeout=_REPLY_TIMEOUT)
    return reply.decode("utf-8").strip()


class TestPrimarySession:
    """Tests for PRIMARY session type."""

    @pytest.mark.asyncio
    async def test_primary_session_create(self, bridge):
        """STYLE=PRIMARY succeeds and the reply carries a destination."""
        async with _sam_connection(bridge) as (reader, writer):
            text = await _send_command(
                reader,
                writer,
                b"SESSION CREATE ID=primary1 STYLE=PRIMARY DESTINATION=TRANSIENT\n",
            )
            assert "SESSION STATUS RESULT=OK" in text
            assert "DESTINATION=" in text

    @pytest.mark.asyncio
    async def test_primary_add_subsession(self, bridge):
        """SESSION ADD with STREAM style succeeds."""
        async with _sam_connection(bridge) as (reader, writer):
            # Create primary session first
            created = await _send_command(
                reader,
                writer,
                b"SESSION CREATE ID=primary2 STYLE=PRIMARY DESTINATION=TRANSIENT\n",
            )
            assert "RESULT=OK" in created

            # Add a STREAM subsession
            text = await _send_command(
                reader,
                writer,
                b"SESSION ADD ID=primary2 STYLE=STREAM FROM_PORT=0 TO_PORT=0\n",
            )
            assert "SESSION STATUS RESULT=OK" in text

    @pytest.mark.asyncio
    async def test_primary_remove_subsession(self, bridge):
        """SESSION REMOVE removes a subsession."""
        async with _sam_connection(bridge) as (reader, writer):
            created = await _send_command(
                reader,
                writer,
                b"SESSION CREATE ID=primary3 STYLE=PRIMARY DESTINATION=TRANSIENT\n",
            )
            assert "RESULT=OK" in created

            # Add then remove
            added = await _send_command(
                reader,
                writer,
                b"SESSION ADD ID=primary3 STYLE=STREAM FROM_PORT=8000 TO_PORT=0\n",
            )
            assert "RESULT=OK" in added

            text = await _send_command(
                reader,
                writer,
                b"SESSION REMOVE ID=primary3 FROM_PORT=8000\n",
            )
            assert "SESSION STATUS RESULT=OK" in text

    @pytest.mark.asyncio
    async def test_primary_duplicate_subsession(self, bridge):
        """Adding the same subsession twice is rejected with DUPLICATED_ID."""
        async with _sam_connection(bridge) as (reader, writer):
            created = await _send_command(
                reader,
                writer,
                b"SESSION CREATE ID=primary4 STYLE=PRIMARY DESTINATION=TRANSIENT\n",
            )
            assert "RESULT=OK" in created

            # Add subsession
            added = await _send_command(
                reader,
                writer,
                b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n",
            )
            assert "RESULT=OK" in added

            # Try same FROM_PORT again
            text = await _send_command(
                reader,
                writer,
                b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n",
            )
            assert "DUPLICATED_ID" in text