feat: parallelize export downloads with asyncio.gather (#545)

previously, tracks were downloaded sequentially, making exports slow
for users with many tracks or large files. now downloads happen
concurrently (up to 4 at a time), while zip creation remains sequential
(zipfile isn't thread-safe).

- refactor process_export to use asyncio.gather + semaphore
- add regression test verifying concurrent download behavior
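
for readers who haven't used the pattern before, here's a minimal, self-contained sketch of bounded concurrency with a semaphore plus asyncio.gather; the helper names and the 0.1s sleep are illustrative, not code from this repo:

```python
import asyncio

async def fetch_one(item: str, semaphore: asyncio.Semaphore) -> str:
    """simulate one download; the semaphore caps how many run at once."""
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for the real network call
        return f"downloaded {item}"

async def fetch_all(items: list[str], limit: int = 4) -> list[str]:
    # gather schedules every coroutine, but only `limit` of them
    # can hold the semaphore (i.e. actually download) at a time
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(fetch_one(i, semaphore) for i in items))

if __name__ == "__main__":
    print(asyncio.run(fetch_all([f"track-{i}" for i in range(10)])))
```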

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io and Claude, committed by GitHub · 5d53ac11 · 1cc36aa7

Changed files
+234 -76
backend/src/backend/_internal/background_tasks.py (+136 -75)
···
  requires DOCKET_URL to be set (Redis is always available).
  """

  import logging
  import os
  import tempfile
···
  async def process_export(export_id: str, artist_did: str) -> None:
      """process a media export in the background.

-     downloads all tracks for the given artist, zips them, and uploads
-     to R2. progress is tracked via job_service.

      args:
          export_id: job ID for tracking progress
···
          zip_path = temp_path / f"{export_id}.zip"
          async_session = aioboto3.Session()

-         async with async_session.client(
-             "s3",
-             endpoint_url=settings.storage.r2_endpoint_url,
-             aws_access_key_id=settings.storage.aws_access_key_id,
-             aws_secret_access_key=settings.storage.aws_secret_access_key,
-         ) as s3_client:
-             with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
-                 # track counter for duplicate titles
-                 title_counts: dict[str, int] = {}
-                 processed = 0
-                 total = len(tracks)

-                 for track in tracks:
-                     if not track.file_id or not track.file_type:
-                         logfire.warn(
-                             "skipping track: missing file_id or file_type",
-                             track_id=track.id,
-                         )
-                         continue

-                     # construct R2 key
-                     key = f"audio/{track.file_id}.{track.file_type}"

-                     try:
-                         # update progress (current_track is 1-indexed for display)
-                         current_track = processed + 1
-                         pct = (processed / total) * 100
                          await job_service.update_progress(
                              export_id,
                              JobStatus.PROCESSING,
-                             f"downloading track {current_track} of {total}...",
                              progress_pct=pct,
                              result={
-                                 "processed_count": processed,
                                  "total_count": total,
-                                 "current_track": current_track,
-                                 "current_title": track.title,
                              },
                          )

-                         # create safe filename
-                         # handle duplicate titles by appending counter
-                         base_filename = f"{track.title}.{track.file_type}"
-                         if base_filename in title_counts:
-                             title_counts[base_filename] += 1
-                             filename = f"{track.title} ({title_counts[base_filename]}).{track.file_type}"
-                         else:
-                             title_counts[base_filename] = 0
-                             filename = base_filename

-                         # sanitize filename (remove invalid chars)
-                         filename = "".join(
-                             c
-                             for c in filename
-                             if c.isalnum() or c in (" ", ".", "-", "_", "(", ")")
-                         )

-                         # download file to temp location, streaming to disk
-                         temp_file_path = temp_path / filename
-                         response = await s3_client.get_object(
-                             Bucket=settings.storage.r2_bucket,
-                             Key=key,
-                         )

-                         # stream to disk in chunks to avoid loading into memory
-                         async with aiofiles.open(temp_file_path, "wb") as f:
-                             async for chunk in response["Body"].iter_chunks():
-                                 await f.write(chunk)

-                         # add to zip from disk (streams from file, doesn't load into memory)
-                         zip_file.write(temp_file_path, arcname=filename)

-                         # remove temp file immediately to free disk space
-                         os.unlink(temp_file_path)

-                         processed += 1
-                         logfire.info(
-                             "added track to export: {track_title}",
-                             track_id=track.id,
-                             track_title=track.title,
-                             filename=filename,
-                         )

-                     except Exception as e:
-                         logfire.error(
-                             "failed to add track to export: {track_title}",
-                             track_id=track.id,
-                             track_title=track.title,
-                             error=str(e),
-                             _exc_info=True,
-                         )
-                         # continue with other tracks instead of failing entire export

          # upload the zip file to R2 (still inside temp directory context)
          r2_key = f"exports/{export_id}.zip"
···
  requires DOCKET_URL to be set (Redis is always available).
  """

+ import asyncio
  import logging
  import os
  import tempfile
···
  async def process_export(export_id: str, artist_did: str) -> None:
      """process a media export in the background.

+     downloads all tracks for the given artist concurrently, zips them,
+     and uploads to R2. progress is tracked via job_service.

      args:
          export_id: job ID for tracking progress
···
          zip_path = temp_path / f"{export_id}.zip"
          async_session = aioboto3.Session()

+         # prepare track metadata before downloading
+         title_counts: dict[str, int] = {}
+         track_info: list[dict] = []

+         for track in tracks:
+             if not track.file_id or not track.file_type:
+                 logfire.warn(
+                     "skipping track: missing file_id or file_type",
+                     track_id=track.id,
+                 )
+                 continue

+             # create safe filename with duplicate handling
+             base_filename = f"{track.title}.{track.file_type}"
+             if base_filename in title_counts:
+                 title_counts[base_filename] += 1
+                 filename = f"{track.title} ({title_counts[base_filename]}).{track.file_type}"
+             else:
+                 title_counts[base_filename] = 0
+                 filename = base_filename

+             # sanitize filename (remove invalid chars)
+             filename = "".join(
+                 c
+                 for c in filename
+                 if c.isalnum() or c in (" ", ".", "-", "_", "(", ")")
+             )
+
+             track_info.append(
+                 {
+                     "track": track,
+                     "key": f"audio/{track.file_id}.{track.file_type}",
+                     "filename": filename,
+                     "temp_path": temp_path / filename,
+                 }
+             )
+
+         total = len(track_info)
+         if total == 0:
+             await job_service.update_progress(
+                 export_id,
+                 JobStatus.FAILED,
+                 "export failed",
+                 error="no valid tracks found to export",
+             )
+             return
+
+         # download counter for progress updates
+         downloaded = 0
+         download_lock = asyncio.Lock()
+
+         async def download_track(
+             info: dict,
+             s3_client,
+             semaphore: asyncio.Semaphore,
+         ) -> dict | None:
+             """download a single track, returning info on success or None on failure."""
+             nonlocal downloaded
+
+             async with semaphore:
+                 track = info["track"]
+                 try:
+                     response = await s3_client.get_object(
+                         Bucket=settings.storage.r2_bucket,
+                         Key=info["key"],
+                     )
+
+                     # stream to disk in chunks
+                     async with aiofiles.open(info["temp_path"], "wb") as f:
+                         async for chunk in response["Body"].iter_chunks():
+                             await f.write(chunk)
+
+                     # update progress
+                     async with download_lock:
+                         downloaded += 1
+                         pct = (downloaded / total) * 100
                          await job_service.update_progress(
                              export_id,
                              JobStatus.PROCESSING,
+                             f"downloading tracks ({downloaded}/{total})...",
                              progress_pct=pct,
                              result={
+                                 "processed_count": downloaded,
                                  "total_count": total,
                              },
                          )

+                     logfire.info(
+                         "downloaded track: {track_title}",
+                         track_id=track.id,
+                         track_title=track.title,
+                     )
+                     return info

+                 except Exception as e:
+                     logfire.error(
+                         "failed to download track: {track_title}",
+                         track_id=track.id,
+                         track_title=track.title,
+                         error=str(e),
+                         _exc_info=True,
+                     )
+                     return None

+         # download all tracks concurrently (limit to 4 concurrent downloads)
+         await job_service.update_progress(
+             export_id,
+             JobStatus.PROCESSING,
+             f"downloading {total} tracks...",
+             progress_pct=0,
+             result={"processed_count": 0, "total_count": total},
+         )

+         semaphore = asyncio.Semaphore(4)
+         async with async_session.client(
+             "s3",
+             endpoint_url=settings.storage.r2_endpoint_url,
+             aws_access_key_id=settings.storage.aws_access_key_id,
+             aws_secret_access_key=settings.storage.aws_secret_access_key,
+         ) as s3_client:
+             results = await asyncio.gather(
+                 *[download_track(info, s3_client, semaphore) for info in track_info]
+             )

+         # filter out failed downloads
+         successful_downloads = [r for r in results if r is not None]

+         # create zip file from downloaded tracks (sequential - zipfile not thread-safe)
+         await job_service.update_progress(
+             export_id,
+             JobStatus.PROCESSING,
+             "creating zip archive...",
+             progress_pct=100,
+             result={
+                 "processed_count": len(successful_downloads),
+                 "total_count": total,
+             },
+         )

+         with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
+             for info in successful_downloads:
+                 zip_file.write(info["temp_path"], arcname=info["filename"])
+                 logfire.info(
+                     "added track to export: {track_title}",
+                     track_id=info["track"].id,
+                     track_title=info["track"].title,
+                     filename=info["filename"],
+                 )
+                 # remove temp file to free disk space
+                 os.unlink(info["temp_path"])

+         processed = len(successful_downloads)

          # upload the zip file to R2 (still inside temp directory context)
          r2_key = f"exports/{export_id}.zip"
backend/tests/test_background_tasks.py (+98 -1)
···
  """tests for background task scheduling."""

- from unittest.mock import MagicMock, patch

  import backend._internal.background_tasks as bg_tasks

···
      assert calls == [
          ("session-xyz", 42, "Test Track", "Test Artist", 180, "Test Album")
      ]
···
  """tests for background task scheduling."""

+ import asyncio
+ import contextlib
+ from unittest.mock import AsyncMock, MagicMock, patch

  import backend._internal.background_tasks as bg_tasks

···
      assert calls == [
          ("session-xyz", 42, "Test Track", "Test Artist", 180, "Test Album")
      ]
+
+
+ async def test_process_export_downloads_concurrently() -> None:
+     """process_export should download tracks concurrently, not sequentially.
+
+     regression test: previously tracks were downloaded one at a time,
+     making exports slow for users with many tracks or large files.
+     """
+     download_times: list[float] = []
+     download_start_event = asyncio.Event()
+
+     async def mock_get_object(Bucket: str, Key: str) -> dict:
+         """track when downloads start and simulate network delay."""
+         download_times.append(asyncio.get_event_loop().time())
+         # signal that at least one download has started
+         download_start_event.set()
+         # simulate network delay
+         await asyncio.sleep(0.1)
+         # return mock response with async body
+         body = AsyncMock()
+         body.iter_chunks = lambda: async_chunk_gen()
+         return {"Body": body}
+
+     async def async_chunk_gen():
+         yield b"mock audio data"
+
+     # create mock tracks
+     mock_tracks = []
+     for i in range(4):
+         track = MagicMock()
+         track.id = i
+         track.title = f"Track {i}"
+         track.file_id = f"file_{i}"
+         track.file_type = "mp3"
+         mock_tracks.append(track)
+
+     # mock database query
+     mock_result = MagicMock()
+     mock_result.scalars.return_value.all.return_value = mock_tracks
+
+     mock_db = AsyncMock()
+     mock_db.execute.return_value = mock_result
+
+     # mock S3 client
+     mock_s3 = AsyncMock()
+     mock_s3.get_object = mock_get_object
+
+     # mock session that returns mock s3 client
+     mock_session = MagicMock()
+     mock_session.client.return_value.__aenter__.return_value = mock_s3
+
+     # mock job service
+     mock_job_service = AsyncMock()
+
+     # mock aiofiles.open to be a no-op
+     mock_file = AsyncMock()
+     mock_file.__aenter__.return_value = mock_file
+     mock_file.__aexit__.return_value = None
+     mock_file.write = AsyncMock()
+
+     with (
+         patch(
+             "backend._internal.background_tasks.aioboto3.Session",
+             return_value=mock_session,
+         ),
+         patch(
+             "backend._internal.background_tasks.aiofiles.open", return_value=mock_file
+         ),
+         patch("backend._internal.background_tasks.zipfile.ZipFile"),
+         patch("backend._internal.background_tasks.os.unlink"),
+         patch("backend.utilities.database.db_session") as mock_db_session,
+         patch("backend._internal.jobs.job_service", mock_job_service),
+     ):
+         mock_db_session.return_value.__aenter__.return_value = mock_db
+
+         # run process_export but cancel before upload phase
+         # (we only care about testing download concurrency)
+         with contextlib.suppress(TimeoutError):
+             await asyncio.wait_for(
+                 bg_tasks.process_export("export-123", "did:plc:testuser"),
+                 timeout=2.0,
+             )
+
+     # verify downloads started concurrently:
+     # if sequential, each download would start ~0.1s after the previous
+     # if concurrent, all 4 downloads should start within ~0.05s of each other
+     assert len(download_times) == 4, f"expected 4 downloads, got {len(download_times)}"
+
+     # check that all downloads started within a small time window (concurrent)
+     # not spread out over 0.4s (sequential)
+     time_spread = max(download_times) - min(download_times)
+     assert time_spread < 0.1, (
+         f"downloads should start concurrently (within 0.1s), "
+         f"but time spread was {time_spread:.3f}s - likely still sequential"
+     )
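
the new test is a bare `async def` with no asyncio marker, so it assumes the suite already collects async tests automatically (e.g. pytest-asyncio in auto mode or anyio). with that in place it can be run on its own via pytest's node-id selection: `pytest backend/tests/test_background_tasks.py::test_process_export_downloads_concurrently`.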