audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 6c01fe4611adff6885e55e468f55b083132f52fb 200 lines 6.9 kB view raw
"""tests for background task scheduling."""

import asyncio
import contextlib
from unittest.mock import AsyncMock, MagicMock, patch

import backend._internal.background_tasks as bg_tasks


async def test_schedule_export_uses_docket() -> None:
    """schedule_export should add task to docket."""
    calls: list[tuple[str, str]] = []

    async def mock_schedule(export_id: str, artist_did: str) -> None:
        calls.append((export_id, artist_did))

    # the test expects schedule_export to await whatever docket.add(...)
    # returns, so add() hands back the recording coroutine function above
    mock_docket = MagicMock()
    mock_docket.add = MagicMock(return_value=mock_schedule)

    with (
        patch.object(bg_tasks, "get_docket", return_value=mock_docket),
        patch.object(bg_tasks, "process_export", MagicMock()),
    ):
        await bg_tasks.schedule_export("export-123", "did:plc:testuser")

    mock_docket.add.assert_called_once()
    assert calls == [("export-123", "did:plc:testuser")]


async def test_schedule_copyright_scan_uses_docket() -> None:
    """schedule_copyright_scan should add task to docket."""
    calls: list[tuple[int, str]] = []

    async def mock_schedule(track_id: int, audio_url: str) -> None:
        calls.append((track_id, audio_url))

    mock_docket = MagicMock()
    mock_docket.add = MagicMock(return_value=mock_schedule)

    with (
        patch.object(bg_tasks, "get_docket", return_value=mock_docket),
        patch.object(bg_tasks, "scan_copyright", MagicMock()),
    ):
        await bg_tasks.schedule_copyright_scan(123, "https://example.com/audio.mp3")

    mock_docket.add.assert_called_once()
    assert calls == [(123, "https://example.com/audio.mp3")]


async def test_schedule_atproto_sync_uses_docket() -> None:
    """schedule_atproto_sync should add task to docket."""
    calls: list[tuple[str, str]] = []

    async def mock_schedule(session_id: str, user_did: str) -> None:
        calls.append((session_id, user_did))

    mock_docket = MagicMock()
    mock_docket.add = MagicMock(return_value=mock_schedule)

    with (
        patch.object(bg_tasks, "get_docket", return_value=mock_docket),
        patch.object(bg_tasks, "sync_atproto", MagicMock()),
    ):
        await bg_tasks.schedule_atproto_sync("session-abc", "did:plc:testuser")

    mock_docket.add.assert_called_once()
    assert calls == [("session-abc", "did:plc:testuser")]


async def test_schedule_teal_scrobble_uses_docket() -> None:
    """schedule_teal_scrobble should add task to docket."""
    calls: list[tuple] = []

    async def mock_schedule(
        session_id: str,
        track_id: int,
        track_title: str,
        artist_name: str,
        duration: int | None,
        album_name: str | None,
    ) -> None:
        calls.append(
            (session_id, track_id, track_title, artist_name, duration, album_name)
        )

    mock_docket = MagicMock()
    mock_docket.add = MagicMock(return_value=mock_schedule)

    with (
        patch.object(bg_tasks, "get_docket", return_value=mock_docket),
        patch.object(bg_tasks, "scrobble_to_teal", MagicMock()),
    ):
        await bg_tasks.schedule_teal_scrobble(
            session_id="session-xyz",
            track_id=42,
            track_title="Test Track",
            artist_name="Test Artist",
            duration=180,
            album_name="Test Album",
        )

    mock_docket.add.assert_called_once()
    assert calls == [
        ("session-xyz", 42, "Test Track", "Test Artist", 180, "Test Album")
    ]


async def test_process_export_downloads_concurrently() -> None:
    """process_export should download tracks concurrently, not sequentially.

    regression test: previously tracks were downloaded one at a time,
    making exports slow for users with many tracks or large files.
    """
    # event-loop timestamps captured at the moment each download begins
    download_times: list[float] = []

    async def async_chunk_gen():
        yield b"mock audio data"

    async def mock_get_object(Bucket: str, Key: str) -> dict:
        """track when downloads start and simulate network delay."""
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine
        download_times.append(asyncio.get_running_loop().time())
        # simulate network delay
        await asyncio.sleep(0.1)
        # return mock response with async body
        body = AsyncMock()
        body.iter_chunks = async_chunk_gen
        return {"Body": body}

    # create mock tracks
    mock_tracks = [
        MagicMock(id=i, title=f"Track {i}", file_id=f"file_{i}", file_type="mp3")
        for i in range(4)
    ]

    # mock database query
    mock_result = MagicMock()
    mock_result.scalars.return_value.all.return_value = mock_tracks

    mock_db = AsyncMock()
    mock_db.execute.return_value = mock_result

    # mock S3 client
    mock_s3 = AsyncMock()
    mock_s3.get_object = mock_get_object

    # mock session that returns mock s3 client
    mock_session = MagicMock()
    mock_session.client.return_value.__aenter__.return_value = mock_s3

    # mock job service
    mock_job_service = AsyncMock()

    # mock aiofiles.open to be a no-op
    mock_file = AsyncMock()
    mock_file.__aenter__.return_value = mock_file
    mock_file.__aexit__.return_value = None
    mock_file.write = AsyncMock()

    with (
        patch(
            "backend._internal.background_tasks.aioboto3.Session",
            return_value=mock_session,
        ),
        patch(
            "backend._internal.background_tasks.aiofiles.open", return_value=mock_file
        ),
        patch("backend._internal.background_tasks.zipfile.ZipFile"),
        patch("backend._internal.background_tasks.os.unlink"),
        patch("backend.utilities.database.db_session") as mock_db_session,
        patch("backend._internal.jobs.job_service", mock_job_service),
    ):
        mock_db_session.return_value.__aenter__.return_value = mock_db

        # run process_export but cancel before upload phase
        # (we only care about testing download concurrency)
        # NOTE(review): the builtin TimeoutError only matches wait_for's
        # timeout on python 3.11+ - confirm the project's minimum version
        with contextlib.suppress(TimeoutError):
            await asyncio.wait_for(
                bg_tasks.process_export("export-123", "did:plc:testuser"),
                timeout=2.0,
            )

    # verify downloads started concurrently:
    # if sequential, each download would start ~0.1s after the previous
    # if concurrent, all 4 downloads should start within ~0.05s of each other
    assert len(download_times) == 4, f"expected 4 downloads, got {len(download_times)}"

    # check that all downloads started within a small time window (concurrent)
    # not spread out over 0.4s (sequential)
    time_spread = max(download_times) - min(download_times)
    assert time_spread < 0.1, (
        "downloads should start concurrently (within 0.1s), "
        f"but time spread was {time_spread:.3f}s - likely still sequential"
    )