A Python port of the Invisible Internet Project (I2P)
1"""Tests for SAM primary/multi session."""
2
3import asyncio
4
5import pytest
6
7from i2p_sam.bridge import SAMBridge
8
9
10@pytest.fixture
11async def bridge():
12 """Create and start a SAMBridge on a random port."""
13 b = SAMBridge(host="127.0.0.1", port=0)
14 await b.start()
15 yield b
16 await b.stop()
17
18
19async def _connect_and_hello(bridge: SAMBridge) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
20 """Helper: connect and complete HELLO handshake."""
21 reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port)
22 writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n")
23 await writer.drain()
24 response = await asyncio.wait_for(reader.readline(), timeout=5.0)
25 assert b"RESULT=OK" in response
26 return reader, writer
27
28
29class TestPrimarySession:
30 """Tests for PRIMARY session type."""
31
32 @pytest.mark.asyncio
33 async def test_primary_session_create(self, bridge):
34 """STYLE=PRIMARY succeeds."""
35 reader, writer = await _connect_and_hello(bridge)
36
37 writer.write(b"SESSION CREATE ID=primary1 STYLE=PRIMARY DESTINATION=TRANSIENT\n")
38 await writer.drain()
39
40 response = await asyncio.wait_for(reader.readline(), timeout=5.0)
41 text = response.decode("utf-8").strip()
42 assert "SESSION STATUS RESULT=OK" in text
43 assert "DESTINATION=" in text
44
45 writer.close()
46 await writer.wait_closed()
47
48 @pytest.mark.asyncio
49 async def test_primary_add_subsession(self, bridge):
50 """SESSION ADD with STREAM style succeeds."""
51 reader, writer = await _connect_and_hello(bridge)
52
53 # Create primary session first
54 writer.write(b"SESSION CREATE ID=primary2 STYLE=PRIMARY DESTINATION=TRANSIENT\n")
55 await writer.drain()
56 resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
57 assert b"RESULT=OK" in resp
58
59 # Add a STREAM subsession
60 writer.write(b"SESSION ADD ID=primary2 STYLE=STREAM FROM_PORT=0 TO_PORT=0\n")
61 await writer.drain()
62 resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0)
63 text = resp2.decode("utf-8").strip()
64 assert "SESSION STATUS RESULT=OK" in text
65
66 writer.close()
67 await writer.wait_closed()
68
69 @pytest.mark.asyncio
70 async def test_primary_remove_subsession(self, bridge):
71 """SESSION REMOVE removes a subsession."""
72 reader, writer = await _connect_and_hello(bridge)
73
74 writer.write(b"SESSION CREATE ID=primary3 STYLE=PRIMARY DESTINATION=TRANSIENT\n")
75 await writer.drain()
76 resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
77 assert b"RESULT=OK" in resp
78
79 # Add then remove
80 writer.write(b"SESSION ADD ID=primary3 STYLE=STREAM FROM_PORT=8000 TO_PORT=0\n")
81 await writer.drain()
82 resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0)
83 assert b"RESULT=OK" in resp2
84
85 writer.write(b"SESSION REMOVE ID=primary3 FROM_PORT=8000\n")
86 await writer.drain()
87 resp3 = await asyncio.wait_for(reader.readline(), timeout=5.0)
88 text = resp3.decode("utf-8").strip()
89 assert "SESSION STATUS RESULT=OK" in text
90
91 writer.close()
92 await writer.wait_closed()
93
94 @pytest.mark.asyncio
95 async def test_primary_duplicate_subsession(self, bridge):
96 """Duplicate sub-ID is rejected."""
97 reader, writer = await _connect_and_hello(bridge)
98
99 writer.write(b"SESSION CREATE ID=primary4 STYLE=PRIMARY DESTINATION=TRANSIENT\n")
100 await writer.drain()
101 resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
102 assert b"RESULT=OK" in resp
103
104 # Add subsession
105 writer.write(b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n")
106 await writer.drain()
107 resp2 = await asyncio.wait_for(reader.readline(), timeout=5.0)
108 assert b"RESULT=OK" in resp2
109
110 # Try same FROM_PORT again
111 writer.write(b"SESSION ADD ID=primary4 STYLE=STREAM FROM_PORT=9000 TO_PORT=0\n")
112 await writer.drain()
113 resp3 = await asyncio.wait_for(reader.readline(), timeout=5.0)
114 text = resp3.decode("utf-8").strip()
115 assert "DUPLICATED_ID" in text
116
117 writer.close()
118 await writer.wait_closed()