"""Tests for RouterWatchdog lifecycle and health checks.""" import asyncio from unittest.mock import MagicMock import pytest from i2p_router.watchdog import RouterWatchdog def _make_router(state="running", uptime=100.0, jq=None, tm=None): router = MagicMock() router.state = state router.uptime_seconds = uptime ctx = MagicMock() ctx.job_queue = jq ctx.transport_manager = tm router._context = ctx return router class TestWatchdogProperties: def test_initial_state(self): wd = RouterWatchdog(_make_router()) assert wd.consecutive_errors == 0 assert not wd.is_running def test_constants(self): assert RouterWatchdog.CHECK_INTERVAL_SECONDS == 60 assert RouterWatchdog.MAX_CONSECUTIVE_ERRORS == 20 class TestJobQueueLiveness: def test_no_context(self): router = MagicMock(spec=[]) wd = RouterWatchdog(router) assert wd._verify_job_queue_liveness() is False def test_no_job_queue(self): router = _make_router(jq=None) router._context.job_queue = None wd = RouterWatchdog(router) assert wd._verify_job_queue_liveness() is True def test_never_run(self): jq = MagicMock() jq.last_job_run_time = 0 wd = RouterWatchdog(_make_router(jq=jq)) assert wd._verify_job_queue_liveness() is True def test_recent_run(self): import time jq = MagicMock() jq.last_job_run_time = time.monotonic() - 10 # 10s ago wd = RouterWatchdog(_make_router(jq=jq)) assert wd._verify_job_queue_liveness() is True def test_stale_run(self): import time jq = MagicMock() jq.last_job_run_time = time.monotonic() - 1000 # way old wd = RouterWatchdog(_make_router(jq=jq)) assert wd._verify_job_queue_liveness() is False class TestTransportLiveness: def test_no_context(self): router = MagicMock(spec=[]) wd = RouterWatchdog(router) assert wd._verify_transport_liveness() is False def test_no_transport_manager(self): router = _make_router(tm=None) router._context.transport_manager = None wd = RouterWatchdog(router) assert wd._verify_transport_liveness() is True def test_transport_running(self): tm = MagicMock() tm.is_running = True wd = RouterWatchdog(_make_router(tm=tm)) assert wd._verify_transport_liveness() is True def test_transport_stopped(self): tm = MagicMock() tm.is_running = False wd = RouterWatchdog(_make_router(tm=tm)) assert wd._verify_transport_liveness() is False class TestDumpStatus: def test_includes_basic_info(self): wd = RouterWatchdog(_make_router(state="running", uptime=500.0)) status = wd.dump_status() assert "RouterWatchdog Status" in status assert "running" in status assert "500" in status def test_includes_job_queue_info(self): jq = MagicMock() jq.pending_count = 42 wd = RouterWatchdog(_make_router(jq=jq)) status = wd.dump_status() assert "42" in status def test_includes_transport_info(self): tm = MagicMock() tm.is_running = True wd = RouterWatchdog(_make_router(tm=tm)) status = wd.dump_status() assert "True" in status class TestWatchdogLoop: @pytest.mark.asyncio async def test_run_and_stop(self): wd = RouterWatchdog(_make_router()) wd.CHECK_INTERVAL_SECONDS = 0.05 task = asyncio.create_task(wd.run()) await asyncio.sleep(0.15) wd.stop() await asyncio.wait_for(task, timeout=2.0) assert not wd.is_running @pytest.mark.asyncio async def test_consecutive_errors_reset_on_success(self): import time # Use a property that always returns current time jq = MagicMock() type(jq).last_job_run_time = property(lambda self: time.monotonic()) tm = MagicMock() tm.is_running = True wd = RouterWatchdog(_make_router(jq=jq, tm=tm)) wd.CHECK_INTERVAL_SECONDS = 0.05 task = asyncio.create_task(wd.run()) await asyncio.sleep(0.15) wd.stop() await asyncio.wait_for(task, timeout=2.0) assert wd.consecutive_errors == 0 @pytest.mark.asyncio async def test_errors_accumulate(self): router = MagicMock(spec=[]) # no _context → both checks fail wd = RouterWatchdog(router) wd.CHECK_INTERVAL_SECONDS = 0.05 task = asyncio.create_task(wd.run()) await asyncio.sleep(0.2) wd.stop() await asyncio.wait_for(task, timeout=2.0) assert wd.consecutive_errors > 0