"""Tests for ServerTunnelTask — accept loop, connection handling.""" import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from i2p_apps.i2ptunnel.tasks import TunnelStats, TunnelTask, ServerTunnelTask def _make_config(): config = MagicMock() config.name = "test-server" config.target_host = "127.0.0.1" config.target_port = 8080 config.type = MagicMock() return config class TestTunnelStats: def test_defaults(self): s = TunnelStats() assert s.total_connections == 0 assert s.active_connections == 0 assert s.total_bytes == 0 class TestServerTunnelTaskLifecycle: @pytest.mark.asyncio async def test_open_sets_open(self): config = _make_config() session = AsyncMock() # accept will block; cancel it quickly session.accept = AsyncMock(side_effect=asyncio.CancelledError) task = ServerTunnelTask(config, session) await task.open() assert task.is_open is True await task.close() assert task.is_open is False @pytest.mark.asyncio async def test_close_without_open(self): config = _make_config() session = AsyncMock() task = ServerTunnelTask(config, session) await task.close() assert task.is_open is False class TestServerAcceptLoop: @pytest.mark.asyncio async def test_accept_and_handle(self): config = _make_config() session = AsyncMock() i2p_reader = AsyncMock() i2p_writer = MagicMock() i2p_writer.close = MagicMock() i2p_writer.wait_closed = AsyncMock() call_count = 0 async def mock_accept(): nonlocal call_count call_count += 1 if call_count > 1: raise asyncio.CancelledError return ("remote-dest", i2p_reader, i2p_writer) session.accept = mock_accept task = ServerTunnelTask(config, session) with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn: local_reader = AsyncMock() local_writer = MagicMock() local_writer.close = MagicMock() local_writer.wait_closed = AsyncMock() mock_conn.return_value = (local_reader, local_writer) with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock): await task.open() # Give the accept loop time to run await asyncio.sleep(0.1) await task.close() assert task.stats.total_connections >= 1 @pytest.mark.asyncio async def test_accept_connection_refused(self): config = _make_config() session = AsyncMock() i2p_reader = AsyncMock() i2p_writer = MagicMock() i2p_writer.close = MagicMock() call_count = 0 async def mock_accept(): nonlocal call_count call_count += 1 if call_count > 1: raise asyncio.CancelledError return ("remote-dest", i2p_reader, i2p_writer) session.accept = mock_accept task = ServerTunnelTask(config, session) with patch("i2p_apps.i2ptunnel.tasks.asyncio.open_connection", new_callable=AsyncMock) as mock_conn: mock_conn.side_effect = ConnectionRefusedError("refused") await task.open() await asyncio.sleep(0.1) await task.close() @pytest.mark.asyncio async def test_accept_loop_error_recovery(self): config = _make_config() session = AsyncMock() call_count = 0 async def mock_accept(): nonlocal call_count call_count += 1 if call_count == 1: raise RuntimeError("transient error") raise asyncio.CancelledError session.accept = mock_accept task = ServerTunnelTask(config, session) await task.open() await asyncio.sleep(1.5) # Must exceed the 1.0s sleep in error handler await task.close() class TestServerHandleIncoming: @pytest.mark.asyncio async def test_handle_incoming_calls_bridge(self): config = _make_config() session = AsyncMock() task = ServerTunnelTask(config, session) i2p_reader = AsyncMock() i2p_writer = MagicMock() i2p_writer.close = MagicMock() local_reader = AsyncMock() local_writer = MagicMock() local_writer.close = MagicMock() with patch("i2p_apps.i2ptunnel.tasks.bridge", new_callable=AsyncMock) as mock_bridge: await task.handle_incoming( "remote-dest", i2p_reader, i2p_writer, local_reader, local_writer ) mock_bridge.assert_called_once()