1"""tests for background task scheduling.""" 2 3import asyncio 4import contextlib 5from unittest.mock import AsyncMock, MagicMock, patch 6 7import backend._internal.background_tasks as bg_tasks 8 9 10async def test_schedule_export_uses_docket() -> None: 11 """schedule_export should add task to docket.""" 12 calls: list[tuple[str, str]] = [] 13 14 async def mock_schedule(export_id: str, artist_did: str) -> None: 15 calls.append((export_id, artist_did)) 16 17 mock_docket = MagicMock() 18 mock_docket.add = MagicMock(return_value=mock_schedule) 19 20 with ( 21 patch.object(bg_tasks, "get_docket", return_value=mock_docket), 22 patch.object(bg_tasks, "process_export", MagicMock()), 23 ): 24 await bg_tasks.schedule_export("export-123", "did:plc:testuser") 25 26 mock_docket.add.assert_called_once() 27 assert calls == [("export-123", "did:plc:testuser")] 28 29 30async def test_schedule_copyright_scan_uses_docket() -> None: 31 """schedule_copyright_scan should add task to docket.""" 32 calls: list[tuple[int, str]] = [] 33 34 async def mock_schedule(track_id: int, audio_url: str) -> None: 35 calls.append((track_id, audio_url)) 36 37 mock_docket = MagicMock() 38 mock_docket.add = MagicMock(return_value=mock_schedule) 39 40 with ( 41 patch.object(bg_tasks, "get_docket", return_value=mock_docket), 42 patch.object(bg_tasks, "scan_copyright", MagicMock()), 43 ): 44 await bg_tasks.schedule_copyright_scan(123, "https://example.com/audio.mp3") 45 46 mock_docket.add.assert_called_once() 47 assert calls == [(123, "https://example.com/audio.mp3")] 48 49 50async def test_schedule_atproto_sync_uses_docket() -> None: 51 """schedule_atproto_sync should add task to docket.""" 52 calls: list[tuple[str, str]] = [] 53 54 async def mock_schedule(session_id: str, user_did: str) -> None: 55 calls.append((session_id, user_did)) 56 57 mock_docket = MagicMock() 58 mock_docket.add = MagicMock(return_value=mock_schedule) 59 60 with ( 61 patch.object(bg_tasks, "get_docket", return_value=mock_docket), 62 patch.object(bg_tasks, "sync_atproto", MagicMock()), 63 ): 64 await bg_tasks.schedule_atproto_sync("session-abc", "did:plc:testuser") 65 66 mock_docket.add.assert_called_once() 67 assert calls == [("session-abc", "did:plc:testuser")] 68 69 70async def test_schedule_teal_scrobble_uses_docket() -> None: 71 """schedule_teal_scrobble should add task to docket.""" 72 calls: list[tuple] = [] 73 74 async def mock_schedule( 75 session_id: str, 76 track_id: int, 77 track_title: str, 78 artist_name: str, 79 duration: int | None, 80 album_name: str | None, 81 ) -> None: 82 calls.append( 83 (session_id, track_id, track_title, artist_name, duration, album_name) 84 ) 85 86 mock_docket = MagicMock() 87 mock_docket.add = MagicMock(return_value=mock_schedule) 88 89 with ( 90 patch.object(bg_tasks, "get_docket", return_value=mock_docket), 91 patch.object(bg_tasks, "scrobble_to_teal", MagicMock()), 92 ): 93 await bg_tasks.schedule_teal_scrobble( 94 session_id="session-xyz", 95 track_id=42, 96 track_title="Test Track", 97 artist_name="Test Artist", 98 duration=180, 99 album_name="Test Album", 100 ) 101 102 mock_docket.add.assert_called_once() 103 assert calls == [ 104 ("session-xyz", 42, "Test Track", "Test Artist", 180, "Test Album") 105 ] 106 107 108async def test_process_export_downloads_concurrently() -> None: 109 """process_export should download tracks concurrently, not sequentially. 110 111 regression test: previously tracks were downloaded one at a time, 112 making exports slow for users with many tracks or large files. 113 """ 114 download_times: list[float] = [] 115 download_start_event = asyncio.Event() 116 117 async def mock_get_object(Bucket: str, Key: str) -> dict: 118 """track when downloads start and simulate network delay.""" 119 download_times.append(asyncio.get_event_loop().time()) 120 # signal that at least one download has started 121 download_start_event.set() 122 # simulate network delay 123 await asyncio.sleep(0.1) 124 # return mock response with async body 125 body = AsyncMock() 126 body.iter_chunks = lambda: async_chunk_gen() 127 return {"Body": body} 128 129 async def async_chunk_gen(): 130 yield b"mock audio data" 131 132 # create mock tracks 133 mock_tracks = [] 134 for i in range(4): 135 track = MagicMock() 136 track.id = i 137 track.title = f"Track {i}" 138 track.file_id = f"file_{i}" 139 track.file_type = "mp3" 140 mock_tracks.append(track) 141 142 # mock database query 143 mock_result = MagicMock() 144 mock_result.scalars.return_value.all.return_value = mock_tracks 145 146 mock_db = AsyncMock() 147 mock_db.execute.return_value = mock_result 148 149 # mock S3 client 150 mock_s3 = AsyncMock() 151 mock_s3.get_object = mock_get_object 152 153 # mock session that returns mock s3 client 154 mock_session = MagicMock() 155 mock_session.client.return_value.__aenter__.return_value = mock_s3 156 157 # mock job service 158 mock_job_service = AsyncMock() 159 160 # mock aiofiles.open to be a no-op 161 mock_file = AsyncMock() 162 mock_file.__aenter__.return_value = mock_file 163 mock_file.__aexit__.return_value = None 164 mock_file.write = AsyncMock() 165 166 with ( 167 patch( 168 "backend._internal.background_tasks.aioboto3.Session", 169 return_value=mock_session, 170 ), 171 patch( 172 "backend._internal.background_tasks.aiofiles.open", return_value=mock_file 173 ), 174 patch("backend._internal.background_tasks.zipfile.ZipFile"), 175 patch("backend._internal.background_tasks.os.unlink"), 176 patch("backend.utilities.database.db_session") as mock_db_session, 177 patch("backend._internal.jobs.job_service", mock_job_service), 178 ): 179 mock_db_session.return_value.__aenter__.return_value = mock_db 180 181 # run process_export but cancel before upload phase 182 # (we only care about testing download concurrency) 183 with contextlib.suppress(TimeoutError): 184 await asyncio.wait_for( 185 bg_tasks.process_export("export-123", "did:plc:testuser"), 186 timeout=2.0, 187 ) 188 189 # verify downloads started concurrently: 190 # if sequential, each download would start ~0.1s after the previous 191 # if concurrent, all 4 downloads should start within ~0.05s of each other 192 assert len(download_times) == 4, f"expected 4 downloads, got {len(download_times)}" 193 194 # check that all downloads started within a small time window (concurrent) 195 # not spread out over 0.4s (sequential) 196 time_spread = max(download_times) - min(download_times) 197 assert time_spread < 0.1, ( 198 f"downloads should start concurrently (within 0.1s), " 199 f"but time spread was {time_spread:.3f}s - likely still sequential" 200 )