# A Python port of the Invisible Internet Project (I2P)
1"""Tests for SAM stream session handling."""
2
3import asyncio
4
5import pytest
6
7from i2p_sam.bridge import SAMBridge
8
9
@pytest.fixture
async def bridge():
    """Yield a started SAMBridge listening on 127.0.0.1 with port=0.

    On teardown, every handler task still registered on the bridge is
    cancelled and awaited before the bridge itself is stopped.
    """
    sam_bridge = SAMBridge(host="127.0.0.1", port=0)
    await sam_bridge.start()
    yield sam_bridge

    # Handlers may be parked in STREAM ACCEPT's 300s queue wait; cancel them
    # up front so that stopping the server does not block on them.
    pending = list(sam_bridge._handler_tasks)
    for handler in pending:
        handler.cancel()
    if pending:
        # return_exceptions=True keeps the expected CancelledError results
        # from propagating out of the teardown.
        await asyncio.gather(*pending, return_exceptions=True)
    await sam_bridge.stop()
23
24
async def _connect_and_hello(bridge: SAMBridge) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Connect to the bridge and complete the HELLO handshake.

    Args:
        bridge: The bridge under test; only its ``port`` attribute is read.

    Returns:
        The ``(reader, writer)`` stream pair of the connection, positioned
        just after the HELLO reply line. The caller owns the writer and is
        responsible for closing it.

    Raises:
        AssertionError: If the HELLO reply does not contain ``RESULT=OK``.
        asyncio.TimeoutError: If no reply line arrives within 5 seconds.
    """
    reader, writer = await asyncio.open_connection("127.0.0.1", bridge.port)
    try:
        writer.write(b"HELLO VERSION MIN=3.0 MAX=3.3\n")
        await writer.drain()
        response = await asyncio.wait_for(reader.readline(), timeout=5.0)
        assert b"RESULT=OK" in response
    except BaseException:
        # On failure the caller never receives the writer, so nobody else
        # can close the socket; release it here before propagating.
        writer.close()
        raise
    return reader, writer
33
34
class TestSessionCreate:
    """Tests for SESSION CREATE."""

    @pytest.mark.asyncio
    async def test_session_create_transient(self, bridge):
        """Create session with TRANSIENT dest, get base64 back."""
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(b"SESSION CREATE ID=teststream STYLE=STREAM DESTINATION=TRANSIENT\n")
            await writer.drain()

            response = await asyncio.wait_for(reader.readline(), timeout=5.0)
            text = response.decode("utf-8").strip()
            assert "SESSION STATUS RESULT=OK" in text
            assert "DESTINATION=" in text
            # The destination should be a non-empty base64 string. Isolate
            # the DESTINATION token so any key=value pairs that follow it on
            # the reply line do not count towards its length.
            dest_part = text.split("DESTINATION=", 1)[1].partition(" ")[0]
            assert len(dest_part) > 10
        finally:
            # Close the socket even when an assertion or timeout fails.
            writer.close()
            await writer.wait_closed()

    @pytest.mark.asyncio
    async def test_session_create_duplicate_id(self, bridge):
        """Second CREATE with same ID gets DUPLICATED_ID."""
        r1, w1 = await _connect_and_hello(bridge)
        # Track every opened writer so all of them are closed on any exit
        # path, including a failure while opening the second connection.
        writers = [w1]
        try:
            r2, w2 = await _connect_and_hello(bridge)
            writers.append(w2)

            # First client creates session
            w1.write(b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n")
            await w1.drain()
            resp1 = await asyncio.wait_for(r1.readline(), timeout=5.0)
            assert b"RESULT=OK" in resp1

            # Second client tries same ID while the first session is open
            w2.write(b"SESSION CREATE ID=duptest STYLE=STREAM DESTINATION=TRANSIENT\n")
            await w2.drain()
            resp2 = await asyncio.wait_for(r2.readline(), timeout=5.0)
            assert b"DUPLICATED_ID" in resp2
        finally:
            # Initiate every close first, then wait for each to complete.
            for w in writers:
                w.close()
            for w in writers:
                await w.wait_closed()
79
80
class TestStreamCommands:
    """Tests for STREAM CONNECT/ACCEPT format."""

    @pytest.mark.asyncio
    async def test_stream_connect_format(self, bridge):
        """STREAM CONNECT sends correct status reply."""
        reader, writer = await _connect_and_hello(bridge)
        try:
            # Create session first
            writer.write(b"SESSION CREATE ID=conntest STYLE=STREAM DESTINATION=TRANSIENT\n")
            await writer.drain()
            resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
            assert b"RESULT=OK" in resp

            # STREAM CONNECT to a fake destination (will fail with
            # CANT_REACH_PEER since there's no real I2P router, but the
            # reply format should be correct)
            fake_dest = "A" * 344  # Fake base64 destination
            writer.write(f"STREAM CONNECT ID=conntest DESTINATION={fake_dest}\n".encode())
            await writer.drain()

            response = await asyncio.wait_for(reader.readline(), timeout=5.0)
            text = response.decode("utf-8").strip()
            # Should get a STREAM STATUS reply (OK or error)
            assert "STREAM STATUS" in text
        finally:
            # Close the socket even when an assertion or timeout fails.
            writer.close()
            await writer.wait_closed()

    @pytest.mark.asyncio
    @pytest.mark.timeout(15)
    async def test_stream_accept_format(self, bridge):
        """STREAM ACCEPT is dispatched without error.

        The accept handler blocks on an internal queue (waiting for an
        incoming I2P connection that will never arrive in tests), so we
        cannot wait for a reply. Instead we verify the session is
        created, send STREAM ACCEPT, confirm the connection stays alive
        briefly, then tear down.
        """
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(b"SESSION CREATE ID=acctest STYLE=STREAM DESTINATION=TRANSIENT\n")
            await writer.drain()
            resp = await asyncio.wait_for(reader.readline(), timeout=5.0)
            assert b"RESULT=OK" in resp

            # STREAM ACCEPT — will block server-side waiting for incoming.
            # We just verify the command doesn't crash the bridge.
            writer.write(b"STREAM ACCEPT ID=acctest\n")
            await writer.drain()

            # Brief pause to let the bridge dispatch the command
            await asyncio.sleep(0.3)

            # Verify the connection is still alive (not errored out)
            assert not reader.at_eof()
        finally:
            # Force-close to avoid blocking on bridge's 300s accept timeout.
            # Runs on failure paths too, so the socket is never left open.
            if writer.can_write_eof():
                writer.write_eof()
            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
            except (asyncio.TimeoutError, ConnectionResetError, BrokenPipeError):
                # Best-effort teardown: the peer may already be gone or slow
                # to acknowledge; neither case is what this test checks.
                pass
146
147
class TestQuit:
    """Tests for QUIT/STOP/EXIT commands."""

    @pytest.mark.asyncio
    async def test_quit_closes_connection(self, bridge):
        """QUIT makes the bridge close the connection without sending data.

        Only QUIT is sent here; STOP and EXIT are not exercised by this test.
        """
        reader, writer = await _connect_and_hello(bridge)
        try:
            writer.write(b"QUIT\n")
            await writer.drain()

            # Connection should close — read() returns empty bytes at EOF
            data = await asyncio.wait_for(reader.read(1024), timeout=5.0)
            # After QUIT, server closes connection, so we get EOF
            assert data == b""
        finally:
            # Close the socket even when the assertion or timeout fails.
            writer.close()
            await writer.wait_closed()