A Python port of the Invisible Internet Project (I2P)
at main 171 lines 5.3 kB view raw
"""Tests for tunnel task lifecycle — open, close, stats, connections."""

import asyncio
import contextlib
from unittest.mock import AsyncMock, MagicMock

import pytest

from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType
from i2p_apps.i2ptunnel.tasks import (
    ClientTunnelTask,
    ServerTunnelTask,
    TunnelStats,
    TunnelTask,
)


def _make_config(
    name="test",
    tunnel_type=TunnelType.HTTPCLIENT,
    listen_port=0,
    target_port=0,
    **kwargs,
):
    """Build a ``TunnelDefinition`` bound to 127.0.0.1 for use in tests.

    Args:
        name: Tunnel name passed through to the definition.
        tunnel_type: Passed as the definition's ``type`` field.
        listen_port: Local listen port; the tests below pass 0 and then read
            the real port back from the listening socket.
        target_port: Target port passed through to the definition.
        **kwargs: Any additional ``TunnelDefinition`` fields
            (e.g. ``target_host``).

    Returns:
        The constructed ``TunnelDefinition``.
    """
    return TunnelDefinition(
        name=name,
        type=tunnel_type,
        listen_port=listen_port,
        target_port=target_port,
        interface="127.0.0.1",
        **kwargs,
    )


async def _close_writer(writer):
    """Close a client-side ``StreamWriter`` and wait for the close to finish.

    The peer may already have dropped or reset the connection (for example
    when the server side closes without reading what the client sent), so
    connection errors raised while waiting are deliberately ignored: this is
    best-effort cleanup, not part of what the tests assert.
    """
    writer.close()
    with contextlib.suppress(OSError):
        await writer.wait_closed()


class ConcreteClient(ClientTunnelTask):
    """Concrete subclass for testing.

    ``handle_client`` delegates to an injectable coroutine function so each
    test can decide what happens when a connection is handled.
    """

    def __init__(self, config, session, handler=None):
        super().__init__(config, session)
        # Default handler just yields to the event loop once and returns.
        self._handler = handler or (lambda r, w: asyncio.sleep(0))

    async def handle_client(self, reader, writer):
        """Forward the accepted connection to the injected handler."""
        await self._handler(reader, writer)


class TestTunnelStats:
    """Checks on the ``TunnelStats`` counters."""

    def test_defaults(self):
        """A fresh ``TunnelStats`` starts with every counter at zero."""
        s = TunnelStats()
        assert s.total_connections == 0
        assert s.active_connections == 0
        assert s.total_bytes == 0

    def test_mutable(self):
        """Counters can be assigned and read back."""
        s = TunnelStats()
        s.total_connections = 5
        s.total_bytes = 1024
        assert s.total_connections == 5
        assert s.total_bytes == 1024


class TestClientTunnelTask:
    """Lifecycle tests for ``ClientTunnelTask`` via ``ConcreteClient``."""

    @pytest.mark.asyncio
    async def test_open_and_close(self):
        """``open`` sets ``is_open``/``_server``; ``close`` resets both."""
        config = _make_config(listen_port=0)
        session = MagicMock()
        task = ConcreteClient(config, session)

        assert not task.is_open
        await task.open()
        assert task.is_open
        assert task._server is not None

        await task.close()
        assert not task.is_open
        assert task._server is None

    @pytest.mark.asyncio
    async def test_close_idempotent(self):
        """Closing a task that was never opened must not raise."""
        config = _make_config(listen_port=0)
        session = MagicMock()
        task = ConcreteClient(config, session)
        await task.close()  # no crash when not opened

    @pytest.mark.asyncio
    async def test_tunnel_type(self):
        """``tunnel_type`` reflects the type given in the config."""
        config = _make_config(tunnel_type=TunnelType.HTTPCLIENT)
        task = ConcreteClient(config, MagicMock())
        assert task.tunnel_type == TunnelType.HTTPCLIENT

    @pytest.mark.asyncio
    async def test_stats_initial(self):
        """A new task reports zero total connections."""
        config = _make_config()
        task = ConcreteClient(config, MagicMock())
        assert task.stats.total_connections == 0

    @pytest.mark.asyncio
    async def test_accepts_connections(self):
        """Open server, connect to it, verify stats."""
        handled = asyncio.Event()

        async def handler(reader, writer):
            handled.set()

        config = _make_config(listen_port=0)
        task = ConcreteClient(config, MagicMock(), handler=handler)
        await task.open()
        try:
            # Read the actual port back from the listening socket.
            port = task._server.sockets[0].getsockname()[1]
            _reader, writer = await asyncio.open_connection("127.0.0.1", port)
            try:
                writer.write(b"hello")
                await writer.drain()

                await asyncio.wait_for(handled.wait(), timeout=5.0)
            finally:
                # Always release the client socket, even if the wait above
                # timed out or the write failed.
                await _close_writer(writer)
            await asyncio.sleep(0.1)  # let stats update
        finally:
            # Always stop the listener so a failure does not leak it.
            await task.close()

        assert task.stats.total_connections >= 1

    @pytest.mark.asyncio
    async def test_handler_error_doesnt_crash(self):
        """Handler exceptions are caught, connection closed."""

        async def bad_handler(reader, writer):
            raise ValueError("boom")

        config = _make_config(listen_port=0)
        task = ConcreteClient(config, MagicMock(), handler=bad_handler)
        await task.open()
        try:
            port = task._server.sockets[0].getsockname()[1]
            _reader, writer = await asyncio.open_connection("127.0.0.1", port)
            try:
                writer.write(b"data")
                await writer.drain()

                # Give the server side time to run (and fail in) the handler.
                await asyncio.sleep(0.2)
            finally:
                # The client end was previously left open; close it so the
                # test does not leak a socket.
                await _close_writer(writer)
        finally:
            await task.close()

        # Should not crash, stats should reflect the connection
        assert task.stats.total_connections >= 1
        assert task.stats.active_connections == 0


class TestServerTunnelTask:
    """Lifecycle tests for ``ServerTunnelTask`` with a mocked session."""

    @pytest.mark.asyncio
    async def test_open_starts_accept_task(self):
        """``open`` sets ``_accept_task``; ``close`` clears it again."""
        config = _make_config(
            tunnel_type=TunnelType.SERVER,
            target_host="127.0.0.1",
            target_port=8080,
        )
        session = MagicMock()
        # accept() raises CancelledError as soon as it is awaited, so the
        # mocked session never produces an incoming connection.
        # NOTE(review): this does not block; how the accept task reacts to
        # the immediate CancelledError depends on ServerTunnelTask — confirm.
        session.accept = AsyncMock(side_effect=asyncio.CancelledError)

        task = ServerTunnelTask(config, session)
        await task.open()
        assert task.is_open
        assert task._accept_task is not None

        await task.close()
        assert not task.is_open
        assert task._accept_task is None

    @pytest.mark.asyncio
    async def test_close_without_open(self):
        """Closing a server task that was never opened must not raise."""
        config = _make_config(
            tunnel_type=TunnelType.SERVER,
            target_host="127.0.0.1",
            target_port=8080,
        )
        task = ServerTunnelTask(config, MagicMock())
        await task.close()  # no crash

    @pytest.mark.asyncio
    async def test_stats_initial(self):
        """A new server task reports zero total and active connections."""
        config = _make_config(
            tunnel_type=TunnelType.SERVER,
            target_host="127.0.0.1",
            target_port=8080,
        )
        task = ServerTunnelTask(config, MagicMock())
        assert task.stats.total_connections == 0
        assert task.stats.active_connections == 0