feat: migrate media export to docket background tasks (#536)

- move export processing from FastAPI BackgroundTasks to docket
- add process_export task function with full R2 upload logic
- add schedule_export helper with asyncio fallback
- register process_export in docket worker
- add tests for scheduling with docket and asyncio fallback
- update background-tasks.md documentation

exports now benefit from docket's durability: if a worker crashes
mid-export, the task will be retried on restart.
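
the durability claim rests on the scheduling path this commit adds. condensed from the `schedule_export` helper in the diff below (the `is_docket_enabled()`, `get_docket()`, and `docket.add(...)` calls are exactly the ones used there; the comments are editorial):

```python
import asyncio

from backend._internal.background import get_docket, is_docket_enabled
from backend._internal.background_tasks import process_export


async def schedule(export_id: str, artist_did: str) -> None:
    if is_docket_enabled():
        # docket persists the task outside the API process (redis, per the
        # tests' error message), so a worker that dies mid-export can pick
        # the task back up after restart
        docket = get_docket()
        await docket.add(process_export)(export_id, artist_did)
        return
    # fallback: the task only lives in this process, so a crash loses it
    asyncio.create_task(process_export(export_id, artist_did))  # noqa: RUF006
```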

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io and Claude, committed by GitHub · 9fc6015d b7287f2e

Changed files: +385 -223

backend/src/backend/_internal/background.py (+3 -2)
···
     add new task imports here as they're created.
     """
     # import task functions here to avoid circular imports
-    from backend._internal.background_tasks import scan_copyright
+    from backend._internal.background_tasks import process_export, scan_copyright

     docket.register(scan_copyright)
+    docket.register(process_export)

     logger.info(
         "registered background tasks",
-        extra={"tasks": ["scan_copyright"]},
+        extra={"tasks": ["scan_copyright", "process_export"]},
     )

backend/src/backend/_internal/background_tasks.py (+250)
···
 
 import asyncio
 import logging
+import os
+import tempfile
+import zipfile
+from datetime import datetime
+from pathlib import Path
 
+import aioboto3
+import aiofiles
 import logfire
 
 from backend._internal.background import get_docket, is_docket_enabled
···
 
     # fallback: fire-and-forget
     asyncio.create_task(scan_track_for_copyright(track_id, audio_url))  # noqa: RUF006
+
+
+async def process_export(export_id: str, artist_did: str) -> None:
+    """process a media export in the background.
+
+    downloads all tracks for the given artist, zips them, and uploads
+    to R2. progress is tracked via job_service.
+
+    args:
+        export_id: job ID for tracking progress
+        artist_did: DID of the artist whose tracks to export
+    """
+    from sqlalchemy import select
+
+    from backend._internal.jobs import job_service
+    from backend.config import settings
+    from backend.models import Track
+    from backend.models.job import JobStatus
+    from backend.storage.r2 import UploadProgressTracker
+    from backend.utilities.database import db_session
+    from backend.utilities.progress import R2ProgressTracker
+
+    try:
+        await job_service.update_progress(
+            export_id, JobStatus.PROCESSING, "fetching tracks..."
+        )
+
+        # query all tracks for the user
+        async with db_session() as db:
+            stmt = (
+                select(Track)
+                .where(Track.artist_did == artist_did)
+                .order_by(Track.created_at)
+            )
+            result = await db.execute(stmt)
+            tracks = result.scalars().all()
+
+        if not tracks:
+            await job_service.update_progress(
+                export_id,
+                JobStatus.FAILED,
+                "export failed",
+                error="no tracks found to export",
+            )
+            return
+
+        # use temp directory to avoid loading large files into memory
+        with tempfile.TemporaryDirectory() as temp_dir:
+            temp_path = Path(temp_dir)
+            zip_path = temp_path / f"{export_id}.zip"
+            async_session = aioboto3.Session()
+
+            async with async_session.client(
+                "s3",
+                endpoint_url=settings.storage.r2_endpoint_url,
+                aws_access_key_id=settings.storage.aws_access_key_id,
+                aws_secret_access_key=settings.storage.aws_secret_access_key,
+            ) as s3_client:
+                with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
+                    # track counter for duplicate titles
+                    title_counts: dict[str, int] = {}
+                    processed = 0
+                    total = len(tracks)
+
+                    for track in tracks:
+                        if not track.file_id or not track.file_type:
+                            logfire.warn(
+                                "skipping track: missing file_id or file_type",
+                                track_id=track.id,
+                            )
+                            continue
+
+                        # construct R2 key
+                        key = f"audio/{track.file_id}.{track.file_type}"
+
+                        try:
+                            # update progress (current_track is 1-indexed for display)
+                            current_track = processed + 1
+                            pct = (processed / total) * 100
+                            await job_service.update_progress(
+                                export_id,
+                                JobStatus.PROCESSING,
+                                f"downloading track {current_track} of {total}...",
+                                progress_pct=pct,
+                                result={
+                                    "processed_count": processed,
+                                    "total_count": total,
+                                    "current_track": current_track,
+                                    "current_title": track.title,
+                                },
+                            )
+
+                            # create safe filename
+                            # handle duplicate titles by appending counter
+                            base_filename = f"{track.title}.{track.file_type}"
+                            if base_filename in title_counts:
+                                title_counts[base_filename] += 1
+                                filename = f"{track.title} ({title_counts[base_filename]}).{track.file_type}"
+                            else:
+                                title_counts[base_filename] = 0
+                                filename = base_filename
+
+                            # sanitize filename (remove invalid chars)
+                            filename = "".join(
+                                c
+                                for c in filename
+                                if c.isalnum() or c in (" ", ".", "-", "_", "(", ")")
+                            )
+
+                            # download file to temp location, streaming to disk
+                            temp_file_path = temp_path / filename
+                            response = await s3_client.get_object(
+                                Bucket=settings.storage.r2_bucket,
+                                Key=key,
+                            )
+
+                            # stream to disk in chunks to avoid loading into memory
+                            async with aiofiles.open(temp_file_path, "wb") as f:
+                                async for chunk in response["Body"].iter_chunks():
+                                    await f.write(chunk)
+
+                            # add to zip from disk (streams from file, doesn't load into memory)
+                            zip_file.write(temp_file_path, arcname=filename)
+
+                            # remove temp file immediately to free disk space
+                            os.unlink(temp_file_path)
+
+                            processed += 1
+                            logfire.info(
+                                "added track to export: {track_title}",
+                                track_id=track.id,
+                                track_title=track.title,
+                                filename=filename,
+                            )
+
+                        except Exception as e:
+                            logfire.error(
+                                "failed to add track to export: {track_title}",
+                                track_id=track.id,
+                                track_title=track.title,
+                                error=str(e),
+                                _exc_info=True,
+                            )
+                            # continue with other tracks instead of failing entire export
+
+            # upload the zip file to R2 (still inside temp directory context)
+            r2_key = f"exports/{export_id}.zip"
+
+            try:
+                zip_size = zip_path.stat().st_size
+
+                # Generate user-friendly filename for download
+                download_filename = f"plyr-tracks-{datetime.now().date()}.zip"
+
+                async with (
+                    R2ProgressTracker(
+                        job_id=export_id,
+                        message="finalizing export...",
+                        phase="upload",
+                    ) as tracker,
+                    async_session.client(
+                        "s3",
+                        endpoint_url=settings.storage.r2_endpoint_url,
+                        aws_access_key_id=settings.storage.aws_access_key_id,
+                        aws_secret_access_key=settings.storage.aws_secret_access_key,
+                    ) as upload_client,
+                ):
+                    # Wrap callback with UploadProgressTracker to convert bytes to percentage
+                    bytes_to_pct = UploadProgressTracker(zip_size, tracker.on_progress)
+                    with open(zip_path, "rb") as zip_file_obj:
+                        await upload_client.upload_fileobj(
+                            zip_file_obj,
+                            settings.storage.r2_bucket,
+                            r2_key,
+                            ExtraArgs={
+                                "ContentType": "application/zip",
+                                "ContentDisposition": f'attachment; filename="{download_filename}"',
+                            },
+                            Callback=bytes_to_pct,
+                        )
+
+                # Final 100% update
+                await job_service.update_progress(
+                    export_id,
+                    JobStatus.PROCESSING,
+                    "finalizing export...",
+                    phase="upload",
+                    progress_pct=100.0,
+                )
+
+            except Exception as e:
+                logfire.error("failed to upload export zip", error=str(e))
+                raise
+
+        # get download URL
+        download_url = f"{settings.storage.r2_public_bucket_url}/{r2_key}"
+
+        # mark as completed
+        await job_service.update_progress(
+            export_id,
+            JobStatus.COMPLETED,
+            f"export completed - {processed} tracks ready",
+            result={
+                "processed_count": processed,
+                "total_count": len(tracks),
+                "download_url": download_url,
+            },
+        )
+
+    except Exception as e:
+        logfire.exception(
+            "export failed with unexpected error",
+            export_id=export_id,
+        )
+        await job_service.update_progress(
+            export_id,
+            JobStatus.FAILED,
+            "export failed",
+            error=f"unexpected error: {e!s}",
+        )
+
+
+async def schedule_export(export_id: str, artist_did: str) -> None:
+    """schedule an export, using docket if enabled, else asyncio.
+
+    this is the entry point for scheduling exports. it handles
+    the docket vs asyncio fallback logic in one place.
+    """
+    if is_docket_enabled():
+        try:
+            docket = get_docket()
+            await docket.add(process_export)(export_id, artist_did)
+            logfire.info("scheduled export via docket", export_id=export_id)
+            return
+        except Exception as e:
+            logfire.warning(
+                "docket scheduling failed, falling back to asyncio",
+                export_id=export_id,
+                error=str(e),
+            )
+
+    # fallback: fire-and-forget
+    asyncio.create_task(process_export(export_id, artist_did))  # noqa: RUF006

backend/src/backend/api/exports.py (+4 -219)
···
 import asyncio
 import json
 import logging
-import os
-import tempfile
-import zipfile
-from pathlib import Path
 from typing import Annotated
 
-import aioboto3
-import aiofiles
-import logfire
-from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
+from fastapi import APIRouter, Depends, HTTPException
 from fastapi.responses import RedirectResponse, StreamingResponse
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from backend._internal import Session, require_auth
+from backend._internal.background_tasks import schedule_export
 from backend._internal.jobs import job_service
-from backend.config import settings
 from backend.models import Track, get_db
 from backend.models.job import JobStatus, JobType
-from backend.storage.r2 import UploadProgressTracker
-from backend.utilities.database import db_session
-from backend.utilities.progress import R2ProgressTracker
 
 router = APIRouter(prefix="/exports", tags=["exports"])
 logger = logging.getLogger(__name__)
 
 
-async def _process_export_background(export_id: str, artist_did: str) -> None:
-    """background task to process export."""
-    try:
-        await job_service.update_progress(
-            export_id, JobStatus.PROCESSING, "fetching tracks..."
-        )
-
-        # query all tracks for the user
-        async with db_session() as db:
-            stmt = (
-                select(Track)
-                .where(Track.artist_did == artist_did)
-                .order_by(Track.created_at)
-            )
-            result = await db.execute(stmt)
-            tracks = result.scalars().all()
-
-        if not tracks:
-            await job_service.update_progress(
-                export_id,
-                JobStatus.FAILED,
-                "export failed",
-                error="no tracks found to export",
-            )
-            return
-
-        # use temp directory to avoid loading large files into memory
-        with tempfile.TemporaryDirectory() as temp_dir:
-            temp_path = Path(temp_dir)
-            zip_path = temp_path / f"{export_id}.zip"
-            async_session = aioboto3.Session()
-
-            async with async_session.client(
-                "s3",
-                endpoint_url=settings.storage.r2_endpoint_url,
-                aws_access_key_id=settings.storage.aws_access_key_id,
-                aws_secret_access_key=settings.storage.aws_secret_access_key,
-            ) as s3_client:
-                with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
-                    # track counter for duplicate titles
-                    title_counts: dict[str, int] = {}
-                    processed = 0
-                    total = len(tracks)
-
-                    for track in tracks:
-                        if not track.file_id or not track.file_type:
-                            logfire.warn(
-                                "skipping track: missing file_id or file_type",
-                                track_id=track.id,
-                            )
-                            continue
-
-                        # construct R2 key
-                        key = f"audio/{track.file_id}.{track.file_type}"
-
-                        try:
-                            # update progress (current_track is 1-indexed for display)
-                            current_track = processed + 1
-                            pct = (processed / total) * 100
-                            await job_service.update_progress(
-                                export_id,
-                                JobStatus.PROCESSING,
-                                f"downloading track {current_track} of {total}...",
-                                progress_pct=pct,
-                                result={
-                                    "processed_count": processed,
-                                    "total_count": total,
-                                    "current_track": current_track,
-                                    "current_title": track.title,
-                                },
-                            )
-
-                            # create safe filename
-                            # handle duplicate titles by appending counter
-                            base_filename = f"{track.title}.{track.file_type}"
-                            if base_filename in title_counts:
-                                title_counts[base_filename] += 1
-                                filename = f"{track.title} ({title_counts[base_filename]}).{track.file_type}"
-                            else:
-                                title_counts[base_filename] = 0
-                                filename = base_filename
-
-                            # sanitize filename (remove invalid chars)
-                            filename = "".join(
-                                c
-                                for c in filename
-                                if c.isalnum() or c in (" ", ".", "-", "_", "(", ")")
-                            )
-
-                            # download file to temp location, streaming to disk
-                            temp_file_path = temp_path / filename
-                            response = await s3_client.get_object(
-                                Bucket=settings.storage.r2_bucket,
-                                Key=key,
-                            )
-
-                            # stream to disk in chunks to avoid loading into memory
-                            async with aiofiles.open(temp_file_path, "wb") as f:
-                                async for chunk in response["Body"].iter_chunks():
-                                    await f.write(chunk)
-
-                            # add to zip from disk (streams from file, doesn't load into memory)
-                            zip_file.write(temp_file_path, arcname=filename)
-
-                            # remove temp file immediately to free disk space
-                            os.unlink(temp_file_path)
-
-                            processed += 1
-                            logfire.info(
-                                "added track to export: {track_title}",
-                                track_id=track.id,
-                                track_title=track.title,
-                                filename=filename,
-                            )
-
-                        except Exception as e:
-                            logfire.error(
-                                "failed to add track to export: {track_title}",
-                                track_id=track.id,
-                                track_title=track.title,
-                                error=str(e),
-                                _exc_info=True,
-                            )
-                            # continue with other tracks instead of failing entire export
-
-            # upload the zip file to R2 (still inside temp directory context)
-            r2_key = f"exports/{export_id}.zip"
-
-            try:
-                zip_size = zip_path.stat().st_size
-
-                # Generate user-friendly filename for download
-                from datetime import datetime
-
-                download_filename = f"plyr-tracks-{datetime.now().date()}.zip"
-
-                async with (
-                    R2ProgressTracker(
-                        job_id=export_id,
-                        message="finalizing export...",
-                        phase="upload",
-                    ) as tracker,
-                    async_session.client(
-                        "s3",
-                        endpoint_url=settings.storage.r2_endpoint_url,
-                        aws_access_key_id=settings.storage.aws_access_key_id,
-                        aws_secret_access_key=settings.storage.aws_secret_access_key,
-                    ) as upload_client,
-                ):
-                    # Wrap callback with UploadProgressTracker to convert bytes to percentage
-                    bytes_to_pct = UploadProgressTracker(zip_size, tracker.on_progress)
-                    with open(zip_path, "rb") as zip_file_obj:
-                        await upload_client.upload_fileobj(
-                            zip_file_obj,
-                            settings.storage.r2_bucket,
-                            r2_key,
-                            ExtraArgs={
-                                "ContentType": "application/zip",
-                                "ContentDisposition": f'attachment; filename="{download_filename}"',
-                            },
-                            Callback=bytes_to_pct,
-                        )
-
-                # Final 100% update
-                await job_service.update_progress(
-                    export_id,
-                    JobStatus.PROCESSING,
-                    "finalizing export...",
-                    phase="upload",
-                    progress_pct=100.0,
-                )
-
-            except Exception as e:
-                logfire.error("failed to upload export zip", error=str(e))
-                raise
-
-        # get download URL
-        download_url = f"{settings.storage.r2_public_bucket_url}/{r2_key}"
-
-        # mark as completed
-        await job_service.update_progress(
-            export_id,
-            JobStatus.COMPLETED,
-            f"export completed - {processed} tracks ready",
-            result={
-                "processed_count": processed,
-                "total_count": len(tracks),
-                "download_url": download_url,
-            },
-        )
-
-    except Exception as e:
-        logfire.exception(
-            "export failed with unexpected error",
-            export_id=export_id,
-        )
-        await job_service.update_progress(
-            export_id,
-            JobStatus.FAILED,
-            "export failed",
-            error=f"unexpected error: {e!s}",
-        )
-
-
 @router.post("/media")
 async def export_media(
     session: Annotated[Session, Depends(require_auth)],
     db: Annotated[AsyncSession, Depends(get_db)],
-    background_tasks: BackgroundTasks,
 ) -> dict:
     """start export of all tracks for authenticated user.
···
         JobType.EXPORT, session.did, "export queued for processing"
     )
 
-    # schedule background processing
-    background_tasks.add_task(_process_export_background, export_id, session.did)
+    # schedule background processing via docket (or asyncio fallback)
+    await schedule_export(export_id, session.did)
 
     return {
         "export_id": export_id,
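
for reference, the endpoint contract is unchanged by the move: `POST /exports/media` still returns an `export_id` immediately and the work continues in the background. a minimal client sketch, assuming an httpx caller and a bearer-token auth header (neither is taken from the diff):

```python
import httpx


async def start_export(base_url: str, token: str) -> str:
    # kick off the export; the route returns as soon as the job is scheduled
    async with httpx.AsyncClient(base_url=base_url) as client:
        resp = await client.post(
            "/exports/media",
            headers={"Authorization": f"Bearer {token}"},  # assumed auth scheme
        )
        resp.raise_for_status()
        # the response body includes the export_id used to track job progress
        return resp.json()["export_id"]


# usage: asyncio.run(start_export("http://localhost:8000", "<token>"))
```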

backend/tests/test_background_tasks.py (+125)
··· 1 + """tests for background task scheduling.""" 2 + 3 + from unittest.mock import MagicMock, patch 4 + 5 + import backend._internal.background_tasks as bg_tasks 6 + 7 + 8 + async def test_schedule_export_uses_docket_when_enabled() -> None: 9 + """when docket is enabled, schedule_export should add task to docket.""" 10 + calls: list[tuple[str, str]] = [] 11 + 12 + async def mock_schedule(export_id: str, artist_did: str) -> None: 13 + calls.append((export_id, artist_did)) 14 + 15 + mock_docket = MagicMock() 16 + mock_docket.add = MagicMock(return_value=mock_schedule) 17 + 18 + with ( 19 + patch.object(bg_tasks, "is_docket_enabled", return_value=True), 20 + patch.object(bg_tasks, "get_docket", return_value=mock_docket), 21 + patch.object(bg_tasks, "process_export", MagicMock()), 22 + ): 23 + await bg_tasks.schedule_export("export-123", "did:plc:testuser") 24 + 25 + mock_docket.add.assert_called_once() 26 + assert calls == [("export-123", "did:plc:testuser")] 27 + 28 + 29 + async def test_schedule_export_falls_back_to_asyncio_when_disabled() -> None: 30 + """when docket is disabled, schedule_export should use asyncio.create_task.""" 31 + create_task_calls: list[object] = [] 32 + 33 + def capture_create_task(coro: object) -> MagicMock: 34 + create_task_calls.append(coro) 35 + return MagicMock() 36 + 37 + process_export_calls: list[tuple[str, str]] = [] 38 + 39 + def mock_process_export(export_id: str, artist_did: str) -> object: 40 + process_export_calls.append((export_id, artist_did)) 41 + return MagicMock() # return non-coroutine to avoid unawaited warning 42 + 43 + with ( 44 + patch.object(bg_tasks, "is_docket_enabled", return_value=False), 45 + patch.object(bg_tasks, "process_export", mock_process_export), 46 + patch.object(bg_tasks.asyncio, "create_task", capture_create_task), 47 + ): 48 + await bg_tasks.schedule_export("export-456", "did:plc:testuser") 49 + 50 + assert len(create_task_calls) == 1 51 + assert process_export_calls == [("export-456", "did:plc:testuser")] 52 + 53 + 54 + async def test_schedule_export_falls_back_on_docket_error() -> None: 55 + """if docket scheduling fails, should fall back to asyncio.""" 56 + mock_docket = MagicMock() 57 + mock_docket.add.side_effect = Exception("redis connection failed") 58 + 59 + create_task_calls: list[object] = [] 60 + 61 + def capture_create_task(coro: object) -> MagicMock: 62 + create_task_calls.append(coro) 63 + return MagicMock() 64 + 65 + process_export_calls: list[tuple[str, str]] = [] 66 + 67 + def mock_process_export(export_id: str, artist_did: str) -> object: 68 + process_export_calls.append((export_id, artist_did)) 69 + return MagicMock() 70 + 71 + with ( 72 + patch.object(bg_tasks, "is_docket_enabled", return_value=True), 73 + patch.object(bg_tasks, "get_docket", return_value=mock_docket), 74 + patch.object(bg_tasks, "process_export", mock_process_export), 75 + patch.object(bg_tasks.asyncio, "create_task", capture_create_task), 76 + ): 77 + await bg_tasks.schedule_export("export-789", "did:plc:testuser") 78 + 79 + assert len(create_task_calls) == 1 80 + 81 + 82 + async def test_schedule_copyright_scan_uses_docket_when_enabled() -> None: 83 + """when docket is enabled, schedule_copyright_scan should add task to docket.""" 84 + calls: list[tuple[int, str]] = [] 85 + 86 + async def mock_schedule(track_id: int, audio_url: str) -> None: 87 + calls.append((track_id, audio_url)) 88 + 89 + mock_docket = MagicMock() 90 + mock_docket.add = MagicMock(return_value=mock_schedule) 91 + 92 + with ( 93 + patch.object(bg_tasks, 
"is_docket_enabled", return_value=True), 94 + patch.object(bg_tasks, "get_docket", return_value=mock_docket), 95 + patch.object(bg_tasks, "scan_copyright", MagicMock()), 96 + ): 97 + await bg_tasks.schedule_copyright_scan(123, "https://example.com/audio.mp3") 98 + 99 + mock_docket.add.assert_called_once() 100 + assert calls == [(123, "https://example.com/audio.mp3")] 101 + 102 + 103 + async def test_schedule_copyright_scan_falls_back_to_asyncio_when_disabled() -> None: 104 + """when docket is disabled, schedule_copyright_scan should use asyncio.""" 105 + create_task_calls: list[object] = [] 106 + 107 + def capture_create_task(coro: object) -> MagicMock: 108 + create_task_calls.append(coro) 109 + return MagicMock() 110 + 111 + scan_calls: list[tuple[int, str]] = [] 112 + 113 + def mock_scan(track_id: int, audio_url: str) -> object: 114 + scan_calls.append((track_id, audio_url)) 115 + return MagicMock() 116 + 117 + with ( 118 + patch.object(bg_tasks, "is_docket_enabled", return_value=False), 119 + patch("backend._internal.moderation.scan_track_for_copyright", mock_scan), 120 + patch.object(bg_tasks.asyncio, "create_task", capture_create_task), 121 + ): 122 + await bg_tasks.schedule_copyright_scan(456, "https://example.com/audio.mp3") 123 + 124 + assert len(create_task_calls) == 1 125 + assert scan_calls == [(456, "https://example.com/audio.mp3")]

docs/backend/background-tasks.md (+3 -2)
···
 
 background tasks handle operations that shouldn't block the request/response cycle:
 - **copyright scanning** - analyzes uploaded tracks for potential copyright matches
-- (future) upload processing, notifications, etc.
+- **media export** - downloads all tracks, zips them, and uploads to R2
 
 ## architecture
 
···
 ### scheduling a task
 
 ```python
-from backend._internal.background_tasks import schedule_copyright_scan
+from backend._internal.background_tasks import schedule_copyright_scan, schedule_export
 
 # automatically uses docket if enabled, else asyncio.create_task
 await schedule_copyright_scan(track_id, audio_url)
+await schedule_export(export_id, artist_did)
 ```
 
 ### adding new tasks
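
the docs' "adding new tasks" section (the last context line above) follows the same touch points this commit uses. a sketch with a hypothetical `resize_artwork` task — the name and signature are made up; the registration and scheduling shapes mirror the diffs above:

```python
# 1. task body in backend/src/backend/_internal/background_tasks.py (hypothetical)
async def resize_artwork(track_id: int, size: int = 512) -> None:
    """do the slow work here; keep heavy imports inside the function body."""
    ...


# 2. registration in background.py's registration hook (see the first diff):
#        from backend._internal.background_tasks import resize_artwork
#        docket.register(resize_artwork)

# 3. a schedule_resize_artwork() helper shaped like schedule_export above,
#    so callers get the docket-or-asyncio fallback for free.
```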