1"""tests for queue service LISTEN/NOTIFY functionality.""" 2 3import asyncio 4import contextlib 5from unittest import mock 6 7import asyncpg 8import pytest 9 10from backend._internal.queue import QueueService 11 12 13@pytest.fixture 14def queue_service(): 15 """create a queue service instance for testing.""" 16 return QueueService() 17 18 19async def test_notify_with_timeout_prevents_hang(queue_service: QueueService): 20 """test that NOTIFY operations timeout instead of hanging forever.""" 21 # create a mock connection that hangs on execute 22 mock_conn = mock.AsyncMock(spec=asyncpg.Connection) 23 mock_conn.is_closed.return_value = False 24 25 async def slow_execute(*args, **kwargs): 26 # simulate zombie connection that never responds 27 await asyncio.sleep(999) 28 29 mock_conn.execute = slow_execute 30 31 queue_service.conn = mock_conn 32 33 # NOTIFY should timeout in 1 second, not hang for 999 seconds 34 start = asyncio.get_event_loop().time() 35 await queue_service._notify_change("did:plc:test") 36 elapsed = asyncio.get_event_loop().time() - start 37 38 # should complete quickly due to timeout 39 assert elapsed < 2.0, f"notify took {elapsed}s, should timeout in ~1s" 40 41 # connection should be marked as dead 42 assert queue_service.conn is None 43 44 45async def test_heartbeat_detects_zombie_connection(): 46 """test that heartbeat proactively detects dead connections.""" 47 # create service with short timeout for testing 48 queue_service = QueueService(heartbeat_interval=0.1, heartbeat_timeout=0.1) 49 50 # create a mock connection that times out on execute 51 mock_conn = mock.AsyncMock(spec=asyncpg.Connection) 52 mock_conn.is_closed.return_value = False 53 54 async def timeout_execute(*args, **kwargs): 55 await asyncio.sleep(10) # longer than heartbeat timeout 56 57 mock_conn.execute = timeout_execute 58 queue_service.conn = mock_conn 59 60 # start heartbeat loop 61 heartbeat_task = asyncio.create_task(queue_service._heartbeat_loop()) 62 63 # wait for timeout to trigger 64 await asyncio.sleep(0.3) 65 66 # cancel the heartbeat task 67 heartbeat_task.cancel() 68 with contextlib.suppress(asyncio.CancelledError): 69 await heartbeat_task 70 71 # connection should be marked as dead after heartbeat timeout 72 assert queue_service.conn is None 73 74 75async def test_notify_handles_closed_connection_gracefully(queue_service: QueueService): 76 """test that NOTIFY handles already-closed connections gracefully.""" 77 mock_conn = mock.AsyncMock(spec=asyncpg.Connection) 78 mock_conn.is_closed.return_value = True 79 80 queue_service.conn = mock_conn 81 82 # should return early without attempting NOTIFY 83 await queue_service._notify_change("did:plc:test") 84 85 # execute should not have been called 86 mock_conn.execute.assert_not_called() 87 88 89async def test_notify_handles_none_connection_gracefully(queue_service: QueueService): 90 """test that NOTIFY handles None connection gracefully.""" 91 queue_service.conn = None 92 93 # should not raise 94 await queue_service._notify_change("did:plc:test")