# music on atproto
# plyr.fm
1"""tests for queue service LISTEN/NOTIFY functionality."""
2
3import asyncio
4import contextlib
5from unittest import mock
6
7import asyncpg
8import pytest
9
10from backend._internal.queue import QueueService
11
12
@pytest.fixture
def queue_service():
    """provide a default-constructed QueueService for a test to use."""
    service = QueueService()
    return service
17
18
async def test_notify_with_timeout_prevents_hang(queue_service: QueueService):
    """test that NOTIFY operations timeout instead of hanging forever.

    a connection whose ``execute`` never returns is installed on the service;
    ``_notify_change`` is expected to give up after roughly one second and
    drop the connection rather than block for the full 999 seconds.
    """
    # create a mock connection that reports itself open but hangs on execute
    mock_conn = mock.AsyncMock(spec=asyncpg.Connection)
    mock_conn.is_closed.return_value = False

    async def slow_execute(*args, **kwargs):
        # simulate zombie connection that never responds
        await asyncio.sleep(999)

    mock_conn.execute = slow_execute

    queue_service.conn = mock_conn

    # we are inside a coroutine, so ask for the running loop explicitly
    # instead of going through the legacy get_event_loop() lookup
    loop = asyncio.get_running_loop()

    # NOTIFY should timeout in 1 second, not hang for 999 seconds.
    # the outer wait_for is only a safety net: if the internal timeout ever
    # regresses, the test fails after a few seconds instead of stalling the
    # whole run for the duration of the fake execute.
    start = loop.time()
    await asyncio.wait_for(
        queue_service._notify_change("did:plc:test"),
        timeout=5.0,
    )
    elapsed = loop.time() - start

    # should complete quickly due to timeout
    assert elapsed < 2.0, f"notify took {elapsed}s, should timeout in ~1s"

    # connection should be marked as dead
    assert queue_service.conn is None
43
44
async def test_heartbeat_detects_zombie_connection():
    """test that the heartbeat loop drops a connection that stops responding."""
    # short interval and timeout so the loop acts within the wait below
    service = QueueService(heartbeat_interval=0.1, heartbeat_timeout=0.1)

    async def stalled_execute(*args, **kwargs):
        # sleeps far longer than the 0.1s heartbeat timeout
        await asyncio.sleep(10)

    # connection that claims to be open but whose execute never finishes in time
    zombie_conn = mock.AsyncMock(spec=asyncpg.Connection)
    zombie_conn.is_closed.return_value = False
    zombie_conn.execute = stalled_execute
    service.conn = zombie_conn

    # run the heartbeat loop in the background and give it time to time out
    loop_task = asyncio.create_task(service._heartbeat_loop())
    await asyncio.sleep(0.3)

    # stop the loop; the resulting CancelledError is expected here
    loop_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await loop_task

    # the heartbeat timeout should have cleared the dead connection
    assert service.conn is None
73
74
async def test_notify_handles_closed_connection_gracefully(queue_service: QueueService):
    """test that NOTIFY is skipped when the connection reports itself closed."""
    closed_conn = mock.AsyncMock(spec=asyncpg.Connection)
    closed_conn.is_closed.return_value = True
    queue_service.conn = closed_conn

    # expected to return early without attempting NOTIFY
    await queue_service._notify_change("did:plc:test")

    # nothing may have been executed on the closed connection
    closed_conn.execute.assert_not_called()
87
88
async def test_notify_handles_none_connection_gracefully(queue_service: QueueService):
    """test that NOTIFY does not raise when the service has no connection."""
    # simulate a service whose connection is absent
    queue_service.conn = None

    # passing means the call completes without an exception
    await queue_service._notify_change("did:plc:test")