A Python port of the Invisible Internet Project (I2P)
1"""Tests for ServerTunnelTask — accept loop, connection handling."""
2
3import asyncio
4from unittest.mock import AsyncMock, MagicMock, patch
5
6import pytest
7
8from i2p_apps.i2ptunnel.tasks import TunnelStats, TunnelTask, ServerTunnelTask
9
10
11def _make_config():
12 config = MagicMock()
13 config.name = "test-server"
14 config.target_host = "127.0.0.1"
15 config.target_port = 8080
16 config.type = MagicMock()
17 return config
18
19
20class TestTunnelStats:
21 def test_defaults(self):
22 s = TunnelStats()
23 assert s.total_connections == 0
24 assert s.active_connections == 0
25 assert s.total_bytes == 0
26
27
28class TestServerTunnelTaskLifecycle:
29 @pytest.mark.asyncio
30 async def test_open_sets_open(self):
31 config = _make_config()
32 session = AsyncMock()
33 # accept will block; cancel it quickly
34 session.accept = AsyncMock(side_effect=asyncio.CancelledError)
35 task = ServerTunnelTask(config, session)
36 await task.open()
37 assert task.is_open is True
38 await task.close()
39 assert task.is_open is False
40
41 @pytest.mark.asyncio
42 async def test_close_without_open(self):
43 config = _make_config()
44 session = AsyncMock()
45 task = ServerTunnelTask(config, session)
46 await task.close()
47 assert task.is_open is False
48
49
50class TestServerAcceptLoop:
51 @pytest.mark.asyncio
52 async def test_accept_and_handle(self):
53 config = _make_config()
54 session = AsyncMock()
55
56 i2p_reader = AsyncMock()
57 i2p_writer = MagicMock()
58 i2p_writer.close = MagicMock()
59 i2p_writer.wait_closed = AsyncMock()
60
61 call_count = 0
62
63 async def mock_accept():
64 nonlocal call_count
65 call_count += 1
66 if call_count > 1:
67 raise asyncio.CancelledError
68 return ("remote-dest", i2p_reader, i2p_writer)
69
70 session.accept = mock_accept
71 task = ServerTunnelTask(config, session)
72
73 with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
74 local_reader = AsyncMock()
75 local_writer = MagicMock()
76 local_writer.close = MagicMock()
77 local_writer.wait_closed = AsyncMock()
78 mock_conn.return_value = (local_reader, local_writer)
79
80 with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock):
81 await task.open()
82 # Give the accept loop time to run
83 await asyncio.sleep(0.1)
84 await task.close()
85
86 assert task.stats.total_connections >= 1
87
88 @pytest.mark.asyncio
89 async def test_accept_connection_refused(self):
90 config = _make_config()
91 session = AsyncMock()
92
93 i2p_reader = AsyncMock()
94 i2p_writer = MagicMock()
95 i2p_writer.close = MagicMock()
96
97 call_count = 0
98
99 async def mock_accept():
100 nonlocal call_count
101 call_count += 1
102 if call_count > 1:
103 raise asyncio.CancelledError
104 return ("remote-dest", i2p_reader, i2p_writer)
105
106 session.accept = mock_accept
107 task = ServerTunnelTask(config, session)
108
109 with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
110 mock_conn.side_effect = ConnectionRefusedError("refused")
111 await task.open()
112 await asyncio.sleep(0.1)
113 await task.close()
114
115 @pytest.mark.asyncio
116 async def test_accept_loop_error_recovery(self):
117 config = _make_config()
118 session = AsyncMock()
119
120 call_count = 0
121
122 async def mock_accept():
123 nonlocal call_count
124 call_count += 1
125 if call_count == 1:
126 raise RuntimeError("transient error")
127 raise asyncio.CancelledError
128
129 session.accept = mock_accept
130 task = ServerTunnelTask(config, session)
131 await task.open()
132 await asyncio.sleep(1.5) # Must exceed the 1.0s sleep in error handler
133 await task.close()
134
135
136class TestServerHandleIncoming:
137 @pytest.mark.asyncio
138 async def test_handle_incoming_calls_bridge(self):
139 config = _make_config()
140 session = AsyncMock()
141 task = ServerTunnelTask(config, session)
142
143 i2p_reader = AsyncMock()
144 i2p_writer = MagicMock()
145 i2p_writer.close = MagicMock()
146 local_reader = AsyncMock()
147 local_writer = MagicMock()
148 local_writer.close = MagicMock()
149
150 with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock) as mock_bridge:
151 await task.handle_incoming(
152 "remote-dest", i2p_reader, i2p_writer, local_reader, local_writer
153 )
154 mock_bridge.assert_called_once()