A Python port of the Invisible Internet Project (I2P)
at main 154 lines 4.8 kB view raw
1"""Tests for ServerTunnelTask — accept loop, connection handling.""" 2 3import asyncio 4from unittest.mock import AsyncMock, MagicMock, patch 5 6import pytest 7 8from i2p_apps.i2ptunnel.tasks import TunnelStats, TunnelTask, ServerTunnelTask 9 10 11def _make_config(): 12 config = MagicMock() 13 config.name = "test-server" 14 config.target_host = "127.0.0.1" 15 config.target_port = 8080 16 config.type = MagicMock() 17 return config 18 19 20class TestTunnelStats: 21 def test_defaults(self): 22 s = TunnelStats() 23 assert s.total_connections == 0 24 assert s.active_connections == 0 25 assert s.total_bytes == 0 26 27 28class TestServerTunnelTaskLifecycle: 29 @pytest.mark.asyncio 30 async def test_open_sets_open(self): 31 config = _make_config() 32 session = AsyncMock() 33 # accept will block; cancel it quickly 34 session.accept = AsyncMock(side_effect=asyncio.CancelledError) 35 task = ServerTunnelTask(config, session) 36 await task.open() 37 assert task.is_open is True 38 await task.close() 39 assert task.is_open is False 40 41 @pytest.mark.asyncio 42 async def test_close_without_open(self): 43 config = _make_config() 44 session = AsyncMock() 45 task = ServerTunnelTask(config, session) 46 await task.close() 47 assert task.is_open is False 48 49 50class TestServerAcceptLoop: 51 @pytest.mark.asyncio 52 async def test_accept_and_handle(self): 53 config = _make_config() 54 session = AsyncMock() 55 56 i2p_reader = AsyncMock() 57 i2p_writer = MagicMock() 58 i2p_writer.close = MagicMock() 59 i2p_writer.wait_closed = AsyncMock() 60 61 call_count = 0 62 63 async def mock_accept(): 64 nonlocal call_count 65 call_count += 1 66 if call_count > 1: 67 raise asyncio.CancelledError 68 return ("remote-dest", i2p_reader, i2p_writer) 69 70 session.accept = mock_accept 71 task = ServerTunnelTask(config, session) 72 73 with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn: 74 local_reader = AsyncMock() 75 local_writer = MagicMock() 76 local_writer.close = MagicMock() 77 local_writer.wait_closed = AsyncMock() 78 mock_conn.return_value = (local_reader, local_writer) 79 80 with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock): 81 await task.open() 82 # Give the accept loop time to run 83 await asyncio.sleep(0.1) 84 await task.close() 85 86 assert task.stats.total_connections >= 1 87 88 @pytest.mark.asyncio 89 async def test_accept_connection_refused(self): 90 config = _make_config() 91 session = AsyncMock() 92 93 i2p_reader = AsyncMock() 94 i2p_writer = MagicMock() 95 i2p_writer.close = MagicMock() 96 97 call_count = 0 98 99 async def mock_accept(): 100 nonlocal call_count 101 call_count += 1 102 if call_count > 1: 103 raise asyncio.CancelledError 104 return ("remote-dest", i2p_reader, i2p_writer) 105 106 session.accept = mock_accept 107 task = ServerTunnelTask(config, session) 108 109 with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn: 110 mock_conn.side_effect = ConnectionRefusedError("refused") 111 await task.open() 112 await asyncio.sleep(0.1) 113 await task.close() 114 115 @pytest.mark.asyncio 116 async def test_accept_loop_error_recovery(self): 117 config = _make_config() 118 session = AsyncMock() 119 120 call_count = 0 121 122 async def mock_accept(): 123 nonlocal call_count 124 call_count += 1 125 if call_count == 1: 126 raise RuntimeError("transient error") 127 raise asyncio.CancelledError 128 129 session.accept = mock_accept 130 task = ServerTunnelTask(config, session) 131 await task.open() 132 await asyncio.sleep(1.5) # Must exceed the 1.0s sleep in error handler 133 await task.close() 134 135 136class TestServerHandleIncoming: 137 @pytest.mark.asyncio 138 async def test_handle_incoming_calls_bridge(self): 139 config = _make_config() 140 session = AsyncMock() 141 task = ServerTunnelTask(config, session) 142 143 i2p_reader = AsyncMock() 144 i2p_writer = MagicMock() 145 i2p_writer.close = MagicMock() 146 local_reader = AsyncMock() 147 local_writer = MagicMock() 148 local_writer.close = MagicMock() 149 150 with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock) as mock_bridge: 151 await task.handle_incoming( 152 "remote-dest", i2p_reader, i2p_writer, local_reader, local_writer 153 ) 154 mock_bridge.assert_called_once()