# A Python port of the Invisible Internet Project (I2P).
1"""Tests for tunnel task lifecycle — open, close, stats, connections."""
2
3import asyncio
4from unittest.mock import AsyncMock, MagicMock
5
6import pytest
7
8from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType
9from i2p_apps.i2ptunnel.tasks import (
10 ClientTunnelTask,
11 ServerTunnelTask,
12 TunnelStats,
13 TunnelTask,
14)
15
16
def _make_config(
    name="test",
    tunnel_type=TunnelType.HTTPCLIENT,
    listen_port=0,
    target_port=0,
    **kwargs,
):
    """Build a ``TunnelDefinition`` for tests, bound to ``127.0.0.1``.

    Args:
        name: Passed through as the definition's ``name``.
        tunnel_type: Passed as the definition's ``type``.
        listen_port: Passed through as ``listen_port``.
        target_port: Passed through as ``target_port``.
        **kwargs: Any further ``TunnelDefinition`` keyword arguments.

    Returns:
        The constructed ``TunnelDefinition``.
    """
    fixed_fields = {
        "name": name,
        "type": tunnel_type,
        "listen_port": listen_port,
        "target_port": target_port,
        "interface": "127.0.0.1",
    }
    # Two separate unpackings: a caller repeating one of the fixed fields in
    # kwargs gets a TypeError instead of silently overriding it.
    return TunnelDefinition(**fixed_fields, **kwargs)
32
33
class ConcreteClient(ClientTunnelTask):
    """``ClientTunnelTask`` subclass whose client handling is injectable.

    ``handle_client`` simply awaits the callable supplied at construction,
    which lets each test decide what happens on an accepted connection.
    """

    def __init__(self, config, session, handler=None):
        """Store ``handler``; when none is given, use a no-op that yields once."""
        super().__init__(config, session)

        async def _yield_once(reader, writer):
            await asyncio.sleep(0)

        if not handler:
            handler = _yield_once
        self._handler = handler

    async def handle_client(self, reader, writer):
        """Delegate the connection's streams to the injected handler."""
        await self._handler(reader, writer)
42
43
class TestTunnelStats:
    """Checks on the ``TunnelStats`` counters."""

    def test_defaults(self):
        """A fresh ``TunnelStats`` reports zero for every counter."""
        stats = TunnelStats()
        for counter in ("total_connections", "active_connections", "total_bytes"):
            assert getattr(stats, counter) == 0

    def test_mutable(self):
        """Counters can be reassigned and read back."""
        stats = TunnelStats()
        stats.total_connections, stats.total_bytes = 5, 1024
        assert (stats.total_connections, stats.total_bytes) == (5, 1024)
57
58
class TestClientTunnelTask:
    """Lifecycle tests for ``ClientTunnelTask``, driven through ``ConcreteClient``."""

    @staticmethod
    async def _close_client(writer):
        """Close the test's client-side stream, tolerating a peer reset.

        The server side may drop the connection without reading what the
        client sent; whatever ``wait_closed`` reports in that case is not
        what these tests are checking, so it is deliberately ignored.
        """
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass

    @pytest.mark.asyncio
    async def test_open_and_close(self):
        """``open`` creates the listening server; ``close`` tears it down."""
        config = _make_config(listen_port=0)
        session = MagicMock()
        task = ConcreteClient(config, session)

        assert not task.is_open
        await task.open()
        assert task.is_open
        assert task._server is not None

        await task.close()
        assert not task.is_open
        assert task._server is None

    @pytest.mark.asyncio
    async def test_close_idempotent(self):
        """``close`` is harmless on a never-opened task, even when repeated."""
        config = _make_config(listen_port=0)
        session = MagicMock()
        task = ConcreteClient(config, session)
        await task.close()  # no crash when not opened
        await task.close()  # a second close must be equally harmless
        assert not task.is_open

    @pytest.mark.asyncio
    async def test_tunnel_type(self):
        """The task reports the tunnel type taken from its config."""
        config = _make_config(tunnel_type=TunnelType.HTTPCLIENT)
        task = ConcreteClient(config, MagicMock())
        assert task.tunnel_type == TunnelType.HTTPCLIENT

    @pytest.mark.asyncio
    async def test_stats_initial(self):
        """A new task has recorded no connections."""
        config = _make_config()
        task = ConcreteClient(config, MagicMock())
        assert task.stats.total_connections == 0

    @pytest.mark.asyncio
    async def test_accepts_connections(self):
        """Open server, connect to it, verify stats."""
        handled = asyncio.Event()

        async def handler(reader, writer):
            handled.set()

        config = _make_config(listen_port=0)
        task = ConcreteClient(config, MagicMock(), handler=handler)
        await task.open()
        # try/finally so a failure mid-test (e.g. the wait_for timeout) does
        # not leave the listening server or the client socket open.
        try:
            port = task._server.sockets[0].getsockname()[1]
            _reader, writer = await asyncio.open_connection("127.0.0.1", port)
            try:
                writer.write(b"hello")
                await writer.drain()
                await asyncio.wait_for(handled.wait(), timeout=5.0)
            finally:
                await self._close_client(writer)
            await asyncio.sleep(0.1)  # let stats update
        finally:
            await task.close()

        assert task.stats.total_connections >= 1

    @pytest.mark.asyncio
    async def test_handler_error_doesnt_crash(self):
        """Handler exceptions are caught, connection closed."""

        async def bad_handler(reader, writer):
            raise ValueError("boom")

        config = _make_config(listen_port=0)
        task = ConcreteClient(config, MagicMock(), handler=bad_handler)
        await task.open()
        try:
            port = task._server.sockets[0].getsockname()[1]
            _reader, writer = await asyncio.open_connection("127.0.0.1", port)
            try:
                writer.write(b"data")
                await writer.drain()
                await asyncio.sleep(0.2)  # give the server time to run the handler
            finally:
                # The client end was previously never closed, leaking a socket.
                await self._close_client(writer)
        finally:
            await task.close()

        # Should not crash, stats should reflect the connection
        assert task.stats.total_connections >= 1
        assert task.stats.active_connections == 0
141
142
class TestServerTunnelTask:
    """Lifecycle tests for ``ServerTunnelTask``."""

    @staticmethod
    def _server_config():
        """Return the SERVER-type tunnel config shared by these tests."""
        return _make_config(
            tunnel_type=TunnelType.SERVER,
            target_host="127.0.0.1",
            target_port=8080,
        )

    @pytest.mark.asyncio
    async def test_open_starts_accept_task(self):
        """``open`` creates the accept task; ``close`` cancels and clears it."""

        async def never_returns(*args, **kwargs):
            # Wait on an event nobody sets: only cancellation ends this call.
            await asyncio.Event().wait()

        session = MagicMock()
        # Make accept block forever. (A bare CancelledError side effect would
        # raise immediately instead, leaving close() nothing live to cancel.)
        session.accept = AsyncMock(side_effect=never_returns)

        task = ServerTunnelTask(self._server_config(), session)
        await task.open()
        assert task.is_open
        assert task._accept_task is not None

        await task.close()
        assert not task.is_open
        assert task._accept_task is None

    @pytest.mark.asyncio
    async def test_close_without_open(self):
        """``close`` on a never-opened task must not raise."""
        task = ServerTunnelTask(self._server_config(), MagicMock())
        await task.close()  # no crash

    @pytest.mark.asyncio
    async def test_stats_initial(self):
        """A new task has recorded no total or active connections."""
        task = ServerTunnelTask(self._server_config(), MagicMock())
        assert task.stats.total_connections == 0
        assert task.stats.active_connections == 0