"""Tests for tunnel task lifecycle — open, close, stats, connections.""" import asyncio from unittest.mock import AsyncMock, MagicMock import pytest from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType from i2p_apps.i2ptunnel.tasks import ( ClientTunnelTask, ServerTunnelTask, TunnelStats, TunnelTask, ) def _make_config( name="test", tunnel_type=TunnelType.HTTPCLIENT, listen_port=0, target_port=0, **kwargs, ): return TunnelDefinition( name=name, type=tunnel_type, listen_port=listen_port, target_port=target_port, interface="127.0.0.1", **kwargs, ) class ConcreteClient(ClientTunnelTask): """Concrete subclass for testing.""" def __init__(self, config, session, handler=None): super().__init__(config, session) self._handler = handler or (lambda r, w: asyncio.sleep(0)) async def handle_client(self, reader, writer): await self._handler(reader, writer) class TestTunnelStats: def test_defaults(self): s = TunnelStats() assert s.total_connections == 0 assert s.active_connections == 0 assert s.total_bytes == 0 def test_mutable(self): s = TunnelStats() s.total_connections = 5 s.total_bytes = 1024 assert s.total_connections == 5 assert s.total_bytes == 1024 class TestClientTunnelTask: @pytest.mark.asyncio async def test_open_and_close(self): config = _make_config(listen_port=0) session = MagicMock() task = ConcreteClient(config, session) assert not task.is_open await task.open() assert task.is_open assert task._server is not None await task.close() assert not task.is_open assert task._server is None @pytest.mark.asyncio async def test_close_idempotent(self): config = _make_config(listen_port=0) session = MagicMock() task = ConcreteClient(config, session) await task.close() # no crash when not opened @pytest.mark.asyncio async def test_tunnel_type(self): config = _make_config(tunnel_type=TunnelType.HTTPCLIENT) task = ConcreteClient(config, MagicMock()) assert task.tunnel_type == TunnelType.HTTPCLIENT @pytest.mark.asyncio async def test_stats_initial(self): config = _make_config() task = ConcreteClient(config, MagicMock()) assert task.stats.total_connections == 0 @pytest.mark.asyncio async def test_accepts_connections(self): """Open server, connect to it, verify stats.""" handled = asyncio.Event() async def handler(reader, writer): handled.set() config = _make_config(listen_port=0) task = ConcreteClient(config, MagicMock(), handler=handler) await task.open() port = task._server.sockets[0].getsockname()[1] reader, writer = await asyncio.open_connection("127.0.0.1", port) writer.write(b"hello") await writer.drain() await asyncio.wait_for(handled.wait(), timeout=5.0) writer.close() await writer.wait_closed() await asyncio.sleep(0.1) # let stats update await task.close() assert task.stats.total_connections >= 1 @pytest.mark.asyncio async def test_handler_error_doesnt_crash(self): """Handler exceptions are caught, connection closed.""" async def bad_handler(reader, writer): raise ValueError("boom") config = _make_config(listen_port=0) task = ConcreteClient(config, MagicMock(), handler=bad_handler) await task.open() port = task._server.sockets[0].getsockname()[1] reader, writer = await asyncio.open_connection("127.0.0.1", port) writer.write(b"data") await writer.drain() await asyncio.sleep(0.2) await task.close() # Should not crash, stats should reflect the connection assert task.stats.total_connections >= 1 assert task.stats.active_connections == 0 class TestServerTunnelTask: @pytest.mark.asyncio async def test_open_starts_accept_task(self): config = _make_config(tunnel_type=TunnelType.SERVER, target_host="127.0.0.1", target_port=8080) session = MagicMock() # Make accept block forever session.accept = AsyncMock(side_effect=asyncio.CancelledError) task = ServerTunnelTask(config, session) await task.open() assert task.is_open assert task._accept_task is not None await task.close() assert not task.is_open assert task._accept_task is None @pytest.mark.asyncio async def test_close_without_open(self): config = _make_config(tunnel_type=TunnelType.SERVER, target_host="127.0.0.1", target_port=8080) task = ServerTunnelTask(config, MagicMock()) await task.close() # no crash @pytest.mark.asyncio async def test_stats_initial(self): config = _make_config(tunnel_type=TunnelType.SERVER, target_host="127.0.0.1", target_port=8080) task = ServerTunnelTask(config, MagicMock()) assert task.stats.total_connections == 0 assert task.stats.active_connections == 0