feat: database-backed jobs for robust uploads and exports (#337)

* feat: database-backed jobs for robust uploads and exports

* fix: add missing job system files

* refactor: use R2ProgressTracker utility for upload/export progress

* fix: resolve linting errors (undefined Any, nested with)

* fix: address PR comments (SSE newlines, migration default)

* fix: merge job results to prevent data loss

authored by zzstoatzz.io and committed by GitHub (cebfc795, 52306886)
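For orientation, here is a minimal sketch (not part of the commit) of the job lifecycle these commits introduce: a background task records progress in the new jobs table through JobService, and the SSE endpoints poll that row instead of holding in-memory queues. It uses only the API added in this PR; the function below and its literal values are illustrative.

# illustrative only -- exercises the JobService API added in this PR
import asyncio
import json

from backend._internal.jobs import job_service
from backend.models.job import JobStatus, JobType


async def example_job_lifecycle(owner_did: str) -> None:
    # 1. create a job row and get its id back
    job_id = await job_service.create_job(JobType.EXPORT, owner_did, "export queued")

    # 2. the background task reports progress; result dicts are merged, not replaced
    await job_service.update_progress(
        job_id,
        JobStatus.PROCESSING,
        "working...",
        progress_pct=50.0,
        result={"processed_count": 1},
    )
    await job_service.update_progress(job_id, JobStatus.COMPLETED, "done")

    # 3. an SSE endpoint polls the row and streams its state to the client
    while True:
        job = await job_service.get_job(job_id)
        print(f"data: {json.dumps({'status': job.status, 'message': job.message})}\n")
        if job.status in (JobStatus.COMPLETED.value, JobStatus.FAILED.value):
            break
        await asyncio.sleep(1.0)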

Changed files
+550 -457
+50
backend/alembic/versions/2025_11_24_003101_63b5345e1707_add_jobs_table.py
+ """add jobs table
+
+ Revision ID: 63b5345e1707
+ Revises: 7f3d3a0f6c1a
+ Create Date: 2025-11-24 00:31:01.929398
+
+ """
+
+ from collections.abc import Sequence
+
+ import sqlalchemy as sa
+
+ from alembic import op
+
+ # revision identifiers, used by Alembic.
+ revision: str = "63b5345e1707"
+ down_revision: str | Sequence[str] | None = "7f3d3a0f6c1a"
+ branch_labels: str | Sequence[str] | None = None
+ depends_on: str | Sequence[str] | None = None
+
+
+ def upgrade() -> None:
+     """Upgrade schema."""
+     op.create_table(
+         "jobs",
+         sa.Column("id", sa.String(), nullable=False),
+         sa.Column("type", sa.String(), nullable=False),
+         sa.Column("status", sa.String(), nullable=False),
+         sa.Column("owner_did", sa.String(), nullable=False),
+         sa.Column("progress_pct", sa.Float(), nullable=False, server_default="0.0"),
+         sa.Column("message", sa.String(), nullable=True),
+         sa.Column("phase", sa.String(), nullable=True),
+         sa.Column(
+             "result", sa.dialects.postgresql.JSONB(astext_type=sa.Text()), nullable=True
+         ),
+         sa.Column("error", sa.String(), nullable=True),
+         sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+         sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+         sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
+         sa.PrimaryKeyConstraint("id"),
+     )
+     op.create_index("idx_jobs_owner", "jobs", ["owner_did"], unique=False)
+     op.create_index("idx_jobs_updated_at", "jobs", ["updated_at"], unique=False)
+
+
+ def downgrade() -> None:
+     """Downgrade schema."""
+     op.drop_index("idx_jobs_updated_at", table_name="jobs")
+     op.drop_index("idx_jobs_owner", table_name="jobs")
+     op.drop_table("jobs")
-164
backend/src/backend/_internal/exports.py
- """async export tracking and background processing."""
-
- import asyncio
- import contextlib
- import logging
- from dataclasses import dataclass
- from datetime import UTC, datetime
- from enum import Enum
- from typing import Any
- from uuid import uuid4
-
- logger = logging.getLogger(__name__)
-
- # maximum number of buffered progress updates per export listener
- MAX_PROGRESS_QUEUE_SIZE = 10
-
-
- class ExportStatus(str, Enum):
-     """export status enum."""
-
-     PENDING = "pending"
-     PROCESSING = "processing"
-     COMPLETED = "completed"
-     FAILED = "failed"
-
-
- @dataclass
- class ExportProgress:
-     """export progress state."""
-
-     export_id: str
-     status: ExportStatus
-     message: str
-     track_count: int | None = None
-     processed_count: int = 0
-     error: str | None = None
-     created_at: datetime | None = None
-     completed_at: datetime | None = None
-     download_url: str | None = None
-
-     def to_dict(self) -> dict[str, Any]:
-         """serialize to dict."""
-         return {
-             "export_id": self.export_id,
-             "status": self.status.value,
-             "message": self.message,
-             "track_count": self.track_count,
-             "processed_count": self.processed_count,
-             "error": self.error,
-             "created_at": self.created_at.isoformat() if self.created_at else None,
-             "completed_at": self.completed_at.isoformat()
-             if self.completed_at
-             else None,
-             "download_url": self.download_url,
-         }
-
-
- class ExportTracker:
-     """tracks export progress in memory."""
-
-     def __init__(self):
-         self._exports: dict[str, ExportProgress] = {}
-         self._listeners: dict[str, list[asyncio.Queue]] = {}
-         self._export_data: dict[str, bytes] = {}  # store zip data temporarily
-
-     def create_export(self, track_count: int) -> str:
-         """create a new export and return its ID."""
-         export_id = str(uuid4())
-         self._exports[export_id] = ExportProgress(
-             export_id=export_id,
-             status=ExportStatus.PENDING,
-             message="export queued",
-             track_count=track_count,
-             created_at=datetime.now(UTC),
-         )
-         self._listeners[export_id] = []
-         return export_id
-
-     def update_status(
-         self,
-         export_id: str,
-         status: ExportStatus,
-         message: str,
-         processed_count: int | None = None,
-         error: str | None = None,
-     ) -> None:
-         """update export status and notify listeners."""
-         if export_id not in self._exports:
-             logger.warning(f"attempted to update unknown export: {export_id}")
-             return
-
-         export = self._exports[export_id]
-         export.status = status
-         export.message = message
-         if processed_count is not None:
-             export.processed_count = processed_count
-         export.error = error
-
-         if status in (ExportStatus.COMPLETED, ExportStatus.FAILED):
-             export.completed_at = datetime.now(UTC)
-
-         # notify all listeners
-         if export_id in self._listeners:
-             for queue in self._listeners[export_id]:
-                 try:
-                     queue.put_nowait(export.to_dict())
-                 except asyncio.QueueFull:
-                     logger.warning(f"listener queue full for export {export_id}")
-
-     def store_export_data(self, export_id: str, data: bytes) -> None:
-         """store the export zip data temporarily."""
-         self._export_data[export_id] = data
-         if export_id in self._exports:
-             self._exports[export_id].download_url = f"/exports/{export_id}/download"
-
-     def get_export_data(self, export_id: str) -> bytes | None:
-         """get the stored export data."""
-         return self._export_data.get(export_id)
-
-     def get_status(self, export_id: str) -> ExportProgress | None:
-         """get current export status."""
-         return self._exports.get(export_id)
-
-     async def subscribe(self, export_id: str) -> asyncio.Queue:
-         """subscribe to export progress updates."""
-         if export_id not in self._listeners:
-             self._listeners[export_id] = []
-
-         queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_PROGRESS_QUEUE_SIZE)
-         self._listeners[export_id].append(queue)
-
-         # send current status immediately
-         if export_id in self._exports:
-             await queue.put(self._exports[export_id].to_dict())
-
-         return queue
-
-     def unsubscribe(self, export_id: str, queue: asyncio.Queue) -> None:
-         """unsubscribe from export progress."""
-         if export_id in self._listeners:
-             with contextlib.suppress(ValueError):
-                 self._listeners[export_id].remove(queue)
-
-     def cleanup_old_exports(self, max_age_seconds: int = 3600) -> None:
-         """remove exports older than max_age_seconds."""
-         now = datetime.now(UTC)
-         to_remove = []
-
-         for export_id, export in self._exports.items():
-             if export.completed_at:
-                 age = (now - export.completed_at).total_seconds()
-                 if age > max_age_seconds:
-                     to_remove.append(export_id)
-
-         for export_id in to_remove:
-             del self._exports[export_id]
-             if export_id in self._listeners:
-                 del self._listeners[export_id]
-             if export_id in self._export_data:
-                 del self._export_data[export_id]
-
-
- # global tracker instance
- export_tracker = ExportTracker()
+94
backend/src/backend/_internal/jobs.py
+ """Database-backed job tracking service."""
+
+ import logging
+ from datetime import UTC, datetime
+ from typing import Any
+
+ import logfire
+ from sqlalchemy import select
+
+ from backend.models.job import Job, JobStatus, JobType
+ from backend.utilities.database import db_session
+
+ logger = logging.getLogger(__name__)
+
+
+ class JobService:
+     """Service for managing database-backed jobs."""
+
+     async def create_job(
+         self,
+         job_type: JobType,
+         owner_did: str,
+         initial_message: str = "job created",
+     ) -> str:
+         """Create a new job and return its ID."""
+         async with db_session() as db:
+             job = Job(
+                 type=job_type.value,
+                 owner_did=owner_did,
+                 status=JobStatus.PENDING.value,
+                 message=initial_message,
+                 progress_pct=0.0,
+             )
+             db.add(job)
+             await db.commit()
+             await db.refresh(job)
+             return job.id
+
+     async def update_progress(
+         self,
+         job_id: str,
+         status: JobStatus,
+         message: str,
+         progress_pct: float | None = None,
+         phase: str | None = None,
+         result: dict[str, Any] | None = None,
+         error: str | None = None,
+     ) -> None:
+         """Update job progress."""
+         async with db_session() as db:
+             stmt = select(Job).where(Job.id == job_id)
+             result_db = await db.execute(stmt)
+             job = result_db.scalar_one_or_none()
+
+             if not job:
+                 logger.warning(f"attempted to update unknown job: {job_id}")
+                 return
+
+             job.status = status.value
+             job.message = message
+             if progress_pct is not None:
+                 job.progress_pct = progress_pct
+             if phase:
+                 job.phase = phase
+             if result:
+                 job.result = {**(job.result or {}), **result}
+             if error:
+                 job.error = error
+
+             if status in (JobStatus.COMPLETED, JobStatus.FAILED):
+                 job.completed_at = datetime.now(UTC)
+
+             await db.commit()
+
+         # log significant updates
+         if status in (JobStatus.COMPLETED, JobStatus.FAILED) or (
+             progress_pct and int(progress_pct) % 25 == 0
+         ):
+             logfire.info(
+                 "job updated",
+                 job_id=job_id,
+                 status=status.value,
+                 progress=progress_pct,
+             )
+
+     async def get_job(self, job_id: str) -> Job | None:
+         """Get job by ID."""
+         async with db_session() as db:
+             stmt = select(Job).where(Job.id == job_id)
+             result = await db.execute(stmt)
+             return result.scalar_one_or_none()
+
+
+ job_service = JobService()
-154
backend/src/backend/_internal/uploads.py
- """async upload tracking and background processing."""
-
- import asyncio
- import contextlib
- import logging
- from dataclasses import dataclass
- from datetime import UTC, datetime
- from enum import Enum
- from typing import Any
- from uuid import uuid4
-
- logger = logging.getLogger(__name__)
-
- # maximum number of buffered progress updates per upload listener
- # this prevents memory buildup if a client disconnects without properly closing the SSE connection
- MAX_PROGRESS_QUEUE_SIZE = 10
-
-
- class UploadStatus(str, Enum):
-     """upload status enum."""
-
-     PENDING = "pending"
-     PROCESSING = "processing"
-     COMPLETED = "completed"
-     FAILED = "failed"
-
-
- @dataclass
- class UploadProgress:
-     """upload progress state."""
-
-     upload_id: str
-     status: UploadStatus
-     message: str
-     track_id: int | None = None
-     error: str | None = None
-     created_at: datetime | None = None
-     completed_at: datetime | None = None
-     server_progress_pct: float | None = None
-     phase: str | None = None
-
-     def to_dict(self) -> dict[str, Any]:
-         """serialize to dict."""
-         return {
-             "upload_id": self.upload_id,
-             "status": self.status.value,
-             "message": self.message,
-             "track_id": self.track_id,
-             "error": self.error,
-             "created_at": self.created_at.isoformat() if self.created_at else None,
-             "completed_at": self.completed_at.isoformat()
-             if self.completed_at
-             else None,
-             "server_progress_pct": self.server_progress_pct,
-             "phase": self.phase,
-         }
-
-
- class UploadTracker:
-     """tracks upload progress in memory."""
-
-     def __init__(self):
-         self._uploads: dict[str, UploadProgress] = {}
-         self._listeners: dict[str, list[asyncio.Queue]] = {}
-
-     def create_upload(self) -> str:
-         """create a new upload and return its ID."""
-         upload_id = str(uuid4())
-         self._uploads[upload_id] = UploadProgress(
-             upload_id=upload_id,
-             status=UploadStatus.PENDING,
-             message="upload queued",
-             created_at=datetime.now(UTC),
-         )
-         self._listeners[upload_id] = []
-         return upload_id
-
-     def update_status(
-         self,
-         upload_id: str,
-         status: UploadStatus,
-         message: str,
-         track_id: int | None = None,
-         error: str | None = None,
-         server_progress_pct: float | None = None,
-         phase: str | None = None,
-     ) -> None:
-         """update upload status and notify listeners."""
-         if upload_id not in self._uploads:
-             logger.warning(f"attempted to update unknown upload: {upload_id}")
-             return
-
-         upload = self._uploads[upload_id]
-         upload.status = status
-         upload.message = message
-         upload.track_id = track_id
-         upload.error = error
-         upload.server_progress_pct = server_progress_pct
-         upload.phase = phase
-
-         if status in (UploadStatus.COMPLETED, UploadStatus.FAILED):
-             upload.completed_at = datetime.now(UTC)
-
-         # notify all listeners
-         if upload_id in self._listeners:
-             for queue in self._listeners[upload_id]:
-                 try:
-                     queue.put_nowait(upload.to_dict())
-                 except asyncio.QueueFull:
-                     logger.warning(f"listener queue full for upload {upload_id}")
-
-     def get_status(self, upload_id: str) -> UploadProgress | None:
-         """get current upload status."""
-         return self._uploads.get(upload_id)
-
-     async def subscribe(self, upload_id: str) -> asyncio.Queue:
-         """subscribe to upload progress updates."""
-         if upload_id not in self._listeners:
-             self._listeners[upload_id] = []
-
-         queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_PROGRESS_QUEUE_SIZE)
-         self._listeners[upload_id].append(queue)
-
-         # send current status immediately
-         if upload_id in self._uploads:
-             await queue.put(self._uploads[upload_id].to_dict())
-
-         return queue
-
-     def unsubscribe(self, upload_id: str, queue: asyncio.Queue) -> None:
-         """unsubscribe from upload progress."""
-         if upload_id in self._listeners:
-             with contextlib.suppress(ValueError):
-                 self._listeners[upload_id].remove(queue)
-
-     def cleanup_old_uploads(self, max_age_seconds: int = 3600) -> None:
-         """remove uploads older than max_age_seconds."""
-         now = datetime.now(UTC)
-         to_remove = []
-
-         for upload_id, upload in self._uploads.items():
-             if upload.completed_at:
-                 age = (now - upload.completed_at).total_seconds()
-                 if age > max_age_seconds:
-                     to_remove.append(upload_id)
-
-         for upload_id in to_remove:
-             del self._uploads[upload_id]
-             if upload_id in self._listeners:
-                 del self._listeners[upload_id]
-
-
- # global tracker instance
- upload_tracker = UploadTracker()
+121 -41
backend/src/backend/api/exports.py
···
  import asyncio
  import io
  import json
+ import logging
  import zipfile
  from typing import Annotated

  import aioboto3
  import logfire
  from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
- from fastapi.responses import StreamingResponse
+ from fastapi.responses import RedirectResponse, StreamingResponse
  from sqlalchemy import select
  from sqlalchemy.ext.asyncio import AsyncSession

  from backend._internal import Session, require_auth
- from backend._internal.exports import ExportStatus, export_tracker
+ from backend._internal.jobs import job_service
  from backend.config import settings
  from backend.models import Track, get_db
+ from backend.models.job import JobStatus, JobType
  from backend.utilities.database import db_session
+ from backend.utilities.progress import R2ProgressTracker

  router = APIRouter(prefix="/exports", tags=["exports"])
+ logger = logging.getLogger(__name__)


  async def _process_export_background(export_id: str, artist_did: str) -> None:
      """background task to process export."""
      try:
-         export_tracker.update_status(
-             export_id, ExportStatus.PROCESSING, "fetching tracks..."
+         await job_service.update_progress(
+             export_id, JobStatus.PROCESSING, "fetching tracks..."
          )

          # query all tracks for the user
···
          tracks = result.scalars().all()

          if not tracks:
-             export_tracker.update_status(
+             await job_service.update_progress(
                  export_id,
-                 ExportStatus.FAILED,
+                 JobStatus.FAILED,
                  "export failed",
                  error="no tracks found to export",
              )
···
          # track counter for duplicate titles
          title_counts: dict[str, int] = {}
          processed = 0
+         total = len(tracks)

          for track in tracks:
              if not track.file_id or not track.file_type:
···

              try:
                  # update progress
-                 export_tracker.update_status(
+                 pct = (processed / total) * 100
+                 await job_service.update_progress(
                      export_id,
-                     ExportStatus.PROCESSING,
+                     JobStatus.PROCESSING,
                      f"downloading {track.title}...",
-                     processed_count=processed,
+                     progress_pct=pct,
+                     result={"processed_count": processed, "total_count": total},
                  )

                  # download file from R2
···
                  )
                  # continue with other tracks instead of failing entire export

-         # store the zip data
+         # store the zip data to R2
          zip_buffer.seek(0)
-         export_tracker.store_export_data(export_id, zip_buffer.getvalue())
+         zip_filename = f"{export_id}.zip"
+         key = f"exports/{zip_filename}"
+
+         await job_service.update_progress(
+             export_id,
+             JobStatus.PROCESSING,
+             "uploading export to storage...",
+         )
+
+         # Upload using aioboto3 directly
+         try:
+             async_session = aioboto3.Session()
+             zip_size = zip_buffer.getbuffer().nbytes
+
+             async with (
+                 R2ProgressTracker(
+                     job_id=export_id,
+                     total_size=zip_size,
+                     message="uploading export to storage...",
+                     phase="upload",
+                 ) as tracker,
+                 async_session.client(
+                     "s3",
+                     endpoint_url=settings.storage.r2_endpoint_url,
+                     aws_access_key_id=settings.storage.aws_access_key_id,
+                     aws_secret_access_key=settings.storage.aws_secret_access_key,
+                 ) as s3_client,
+             ):
+                 await s3_client.upload_fileobj(
+                     zip_buffer,
+                     settings.storage.r2_bucket,
+                     key,
+                     ExtraArgs={"ContentType": "application/zip"},
+                     Callback=tracker.on_progress,
+                 )
+
+             # Final 100% update
+             await job_service.update_progress(
+                 export_id,
+                 JobStatus.PROCESSING,
+                 "uploading export to storage...",
+                 phase="upload",
+                 progress_pct=100.0,
+             )
+
+         except Exception as e:
+             logfire.error("failed to upload export zip", error=str(e))
+             raise
+
+         # get download URL
+         download_url = f"{settings.storage.r2_public_bucket_url}/{key}"

          # mark as completed
-         export_tracker.update_status(
+         await job_service.update_progress(
              export_id,
-             ExportStatus.COMPLETED,
+             JobStatus.COMPLETED,
              f"export completed - {processed} tracks ready",
-             processed_count=processed,
+             result={
+                 "processed_count": processed,
+                 "total_count": len(tracks),
+                 "download_url": download_url,
+             },
          )

      except Exception as e:
···
              "export failed with unexpected error",
              export_id=export_id,
          )
-         export_tracker.update_status(
+         await job_service.update_progress(
              export_id,
-             ExportStatus.FAILED,
+             JobStatus.FAILED,
              "export failed",
              error=f"unexpected error: {e!s}",
          )
···
          raise HTTPException(status_code=404, detail="no tracks found to export")

      # create export tracking
-     export_id = export_tracker.create_export(track_count=len(tracks))
+     export_id = await job_service.create_job(
+         JobType.EXPORT, session.did, "export queued for processing"
+     )

      # schedule background processing
      background_tasks.add_task(_process_export_background, export_id, session.did)
···

      async def event_stream():
          """generate SSE events for export progress."""
-         queue = await export_tracker.subscribe(export_id)
+         # Polling loop
          try:
              while True:
-                 try:
-                     # wait for next update with timeout
-                     update = await asyncio.wait_for(queue.get(), timeout=30.0)
-                     yield f"data: {json.dumps(update)}\n\n"
+                 job = await job_service.get_job(export_id)
+                 if not job:
+                     yield f"data: {json.dumps({'status': 'failed', 'message': 'export job not found', 'error': 'job lost'})}\n\n"
+                     break
+
+                 # Construct payload
+                 payload = {
+                     "export_id": job.id,
+                     "status": job.status,
+                     "message": job.message,
+                     "error": job.error,
+                     "processed_count": job.result.get("processed_count")
+                     if job.result
+                     else 0,
+                     "total_count": job.result.get("total_count") if job.result else 0,
+                 }
+                 if job.result and "download_url" in job.result:
+                     payload["download_url"] = job.result["download_url"]
+
+                 yield f"data: {json.dumps(payload)}\n\n"

-                     # if export completed or failed, close stream
-                     if update["status"] in ("completed", "failed"):
-                         break
+                 if job.status in (JobStatus.COMPLETED.value, JobStatus.FAILED.value):
+                     break

-                 except TimeoutError:
-                     # send keepalive
-                     yield ": keepalive\n\n"
+                 await asyncio.sleep(1.0)

-         finally:
-             export_tracker.unsubscribe(export_id, queue)
+         except Exception as e:
+             logger.error(f"error in export progress stream: {e}")
+             yield f"data: {json.dumps({'status': 'failed', 'message': 'connection error', 'error': str(e)})}\n\n"

      return StreamingResponse(
          event_stream(),
···
  async def download_export(
      export_id: str,
      session: Annotated[Session, Depends(require_auth)],
- ) -> StreamingResponse:
+ ) -> RedirectResponse:
      """download the completed export zip file."""
-     export_data = export_tracker.get_export_data(export_id)
+     job = await job_service.get_job(export_id)

-     if not export_data:
-         raise HTTPException(status_code=404, detail="export not found or expired")
+     if not job:
+         raise HTTPException(status_code=404, detail="export not found")

-     return StreamingResponse(
-         iter([export_data]),
-         media_type="application/zip",
-         headers={
-             "Content-Disposition": 'attachment; filename="plyr-tracks.zip"',
-         },
-     )
+     if job.owner_did != session.did:
+         raise HTTPException(status_code=403, detail="not authorized")
+
+     if job.status != JobStatus.COMPLETED.value:
+         raise HTTPException(status_code=400, detail="export not ready")
+
+     if not job.result or "download_url" not in job.result:
+         raise HTTPException(status_code=500, detail="export url not found")
+
+     return RedirectResponse(url=job.result["download_url"])
+98 -63
backend/src/backend/api/tracks/uploads.py
···
  from backend._internal.atproto.handles import resolve_handle
  from backend._internal.audio import AudioFormat
  from backend._internal.image import ImageFormat
- from backend._internal.uploads import UploadStatus, upload_tracker
+ from backend._internal.jobs import job_service
  from backend.config import settings
  from backend.models import Artist, Track
+ from backend.models.job import JobStatus, JobType
  from backend.storage import storage
  from backend.utilities.database import db_session
  from backend.utilities.hashing import CHUNK_SIZE
+ from backend.utilities.progress import R2ProgressTracker
  from backend.utilities.rate_limit import limiter

  from .router import router
···
          "process upload background", upload_id=upload_id, filename=filename
      ):
          try:
-             upload_tracker.update_status(
-                 upload_id, UploadStatus.PROCESSING, "processing upload..."
+             await job_service.update_progress(
+                 upload_id, JobStatus.PROCESSING, "processing upload..."
              )

              # validate file type
              ext = Path(filename).suffix.lower()
              audio_format = AudioFormat.from_extension(ext)
              if not audio_format:
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.FAILED,
+                     JobStatus.FAILED,
                      "upload failed",
                      error=f"unsupported file type: {ext}",
                  )
                  return

              # save audio file
-             upload_tracker.update_status(
+             await job_service.update_progress(
                  upload_id,
-                 UploadStatus.PROCESSING,
+                 JobStatus.PROCESSING,
                  "uploading to storage...",
                  phase="upload",
+                 progress_pct=0,
              )
              try:
                  logfire.info("preparing to save audio file", filename=filename)

-                 # define progress callback for storage upload
-                 def on_upload_progress(progress_pct: float) -> None:
-                     """callback invoked during R2 upload with progress percentage."""
-                     upload_tracker.update_status(
-                         upload_id,
-                         UploadStatus.PROCESSING,
-                         f"uploading to storage... {int(progress_pct)}%",
-                         server_progress_pct=progress_pct,
-                         phase="upload",
-                     )
+                 file_size = Path(file_path).stat().st_size
+
+                 async with R2ProgressTracker(
+                     job_id=upload_id,
+                     total_size=file_size,
+                     message="uploading to storage...",
+                     phase="upload",
+                 ) as tracker:
+                     with open(file_path, "rb") as file_obj:
+                         logfire.info("calling storage.save")
+                         file_id = await storage.save(
+                             file_obj, filename, progress_callback=tracker.on_progress
+                         )
+
+                 # Final 100% update
+                 await job_service.update_progress(
+                     upload_id,
+                     JobStatus.PROCESSING,
+                     "uploading to storage...",
+                     phase="upload",
+                     progress_pct=100.0,
+                 )
+
+                 logfire.info("storage.save completed", file_id=file_id)

-                 with open(file_path, "rb") as file_obj:
-                     logfire.info("calling storage.save")
-                     file_id = await storage.save(
-                         file_obj, filename, progress_callback=on_upload_progress
-                     )
-                 logfire.info("storage.save completed", file_id=file_id)
              except ValueError as e:
                  logfire.error("ValueError during storage.save", error=str(e))
-                 upload_tracker.update_status(
-                     upload_id, UploadStatus.FAILED, "upload failed", error=str(e)
+                 await job_service.update_progress(
+                     upload_id, JobStatus.FAILED, "upload failed", error=str(e)
                  )
                  return
              except Exception as e:
···
                      error=str(e),
                      exc_info=True,
                  )
-                 upload_tracker.update_status(
-                     upload_id, UploadStatus.FAILED, "upload failed", error=str(e)
+                 await job_service.update_progress(
+                     upload_id, JobStatus.FAILED, "upload failed", error=str(e)
                  )
                  return

···
                          existing_track_id=existing_track.id,
                          artist_did=artist_did,
                      )
-                     upload_tracker.update_status(
+                     await job_service.update_progress(
                          upload_id,
-                         UploadStatus.FAILED,
+                         JobStatus.FAILED,
                          "upload failed",
                          error=f"duplicate upload: track already exists (id: {existing_track.id})",
                      )
···
              image_id = None
              image_url = None
              if image_path and image_filename:
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.PROCESSING,
+                     JobStatus.PROCESSING,
                      "saving image...",
                      phase="image",
                  )
···
              )
              artist = result.scalar_one_or_none()
              if not artist:
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.FAILED,
+                     JobStatus.FAILED,
                      "upload failed",
                      error="artist profile not found",
                  )
···
              # resolve featured artist handles
              featured_artists = []
              if features:
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.PROCESSING,
+                     JobStatus.PROCESSING,
                      "resolving featured artists...",
                      phase="metadata",
                  )
···
                          "reason": reason,
                      },
                  )
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.FAILED,
+                     JobStatus.FAILED,
                      "upload failed",
                      error=f"failed to sync track to ATProto: {reason}",
                      phase="atproto",
···
              atproto_uri = None
              atproto_cid = None
              if r2_url:
-                 upload_tracker.update_status(
+                 await job_service.update_progress(
                      upload_id,
-                     UploadStatus.PROCESSING,
+                     JobStatus.PROCESSING,
                      "creating atproto record...",
                      phase="atproto",
                  )
···
                  return

              # create track record
-             upload_tracker.update_status(
+             await job_service.update_progress(
                  upload_id,
-                 UploadStatus.PROCESSING,
+                 JobStatus.PROCESSING,
                  "saving track metadata...",
                  phase="database",
              )
···
                      f"failed to send notification for track {track.id}: {e}"
                  )

-             upload_tracker.update_status(
+             await job_service.update_progress(
                  upload_id,
-                 UploadStatus.COMPLETED,
+                 JobStatus.COMPLETED,
                  "upload completed successfully",
-                 track_id=track.id,
+                 result={"track_id": track.id},
              )

          except IntegrityError as e:
              await db.rollback()
              # integrity errors now only occur for foreign key violations or other constraints
              error_msg = f"database constraint violation: {e!s}"
-             upload_tracker.update_status(
-                 upload_id, UploadStatus.FAILED, "upload failed", error=error_msg
+             await job_service.update_progress(
+                 upload_id,
+                 JobStatus.FAILED,
+                 "upload failed",
+                 error=error_msg,
              )
              # cleanup: delete uploaded file
              with contextlib.suppress(Exception):
···

          except Exception as e:
              logger.exception(f"upload {upload_id} failed with unexpected error")
-             upload_tracker.update_status(
+             await job_service.update_progress(
                  upload_id,
-                 UploadStatus.FAILED,
+                 JobStatus.FAILED,
                  "upload failed",
                  error=f"unexpected error: {e!s}",
              )
···
              )
              tmp_image.write(chunk)

-     # create upload tracking
-     upload_id = upload_tracker.create_upload()
+     # create upload tracking via JobService
+     upload_id = await job_service.create_job(
+         JobType.UPLOAD, auth_session.did, "upload queued for processing"
+     )

      # schedule background processing once response is sent
      background_tasks.add_task(
···

      async def event_stream():
          """Generate SSE events for upload progress."""
-         queue = await upload_tracker.subscribe(upload_id)
+         # Polling loop
          try:
              while True:
-                 try:
-                     # wait for next update with timeout
-                     update = await asyncio.wait_for(queue.get(), timeout=30.0)
-                     yield f"data: {json.dumps(update)}\n\n"
+                 job = await job_service.get_job(upload_id)
+                 if not job:
+                     # Job not found or lost
+                     yield f"data: {json.dumps({'status': 'failed', 'message': 'upload job not found', 'error': 'job lost'})}\n\n"
+                     break

-                     # if upload completed or failed, close stream
-                     if update["status"] in ("completed", "failed"):
-                         break
+                 # Construct payload matching old UploadProgress.to_dict()
+                 payload = {
+                     "upload_id": job.id,
+                     "status": job.status,
+                     "message": job.message,
+                     "error": job.error,
+                     "phase": job.phase,
+                     "server_progress_pct": job.progress_pct,
+                     "created_at": job.created_at.isoformat()
+                     if job.created_at
+                     else None,
+                     "completed_at": job.completed_at.isoformat()
+                     if job.completed_at
+                     else None,
+                 }
+                 if job.result and "track_id" in job.result:
+                     payload["track_id"] = job.result["track_id"]

-                 except TimeoutError:
-                     # send keepalive
-                     yield ": keepalive\n\n"
+                 yield f"data: {json.dumps(payload)}\n\n"

-             finally:
-                 upload_tracker.unsubscribe(upload_id, queue)
+                 if job.status in (JobStatus.COMPLETED.value, JobStatus.FAILED.value):
+                     break
+
+                 await asyncio.sleep(1.0)
+
+         except Exception as e:
+             logger.error(f"error in upload progress stream: {e}")
+             yield f"data: {json.dumps({'status': 'failed', 'message': 'connection error', 'error': str(e)})}\n\n"

      return StreamingResponse(
          event_stream(),
+2
backend/src/backend/models/__init__.py
···
  from backend.models.artist import Artist
  from backend.models.database import Base
  from backend.models.exchange_token import ExchangeToken
+ from backend.models.job import Job
  from backend.models.oauth_state import OAuthStateModel
  from backend.models.preferences import UserPreferences
  from backend.models.queue import QueueState
···
      "Artist",
      "Base",
      "ExchangeToken",
+     "Job",
      "OAuthStateModel",
      "QueueState",
      "Track",
+70
backend/src/backend/models/job.py
+ """Job models for tracking long-running tasks."""
+
+ from datetime import UTC, datetime
+ from enum import Enum
+ from typing import Any
+ from uuid import uuid4
+
+ from sqlalchemy import DateTime, Float, Index, String
+ from sqlalchemy.dialects.postgresql import JSONB
+ from sqlalchemy.orm import Mapped, mapped_column
+
+ from backend.models.database import Base
+
+
+ class JobStatus(str, Enum):
+     """Status of a job."""
+
+     PENDING = "pending"
+     PROCESSING = "processing"
+     COMPLETED = "completed"
+     FAILED = "failed"
+
+
+ class JobType(str, Enum):
+     """Type of job."""
+
+     UPLOAD = "upload"
+     EXPORT = "export"
+
+
+ class Job(Base):
+     """Job for tracking long-running tasks."""
+
+     __tablename__ = "jobs"
+
+     id: Mapped[str] = mapped_column(
+         String, primary_key=True, default=lambda: str(uuid4())
+     )
+     type: Mapped[str] = mapped_column(String, nullable=False)  # JobType enum
+     status: Mapped[str] = mapped_column(
+         String, nullable=False, default=JobStatus.PENDING.value
+     )  # JobStatus enum
+     owner_did: Mapped[str] = mapped_column(String, nullable=False)
+
+     # Progress
+     progress_pct: Mapped[float] = mapped_column(Float, default=0.0)
+     message: Mapped[str | None] = mapped_column(String, nullable=True)
+     phase: Mapped[str | None] = mapped_column(String, nullable=True)
+
+     # Result/Error
+     result: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True)
+     error: Mapped[str | None] = mapped_column(String, nullable=True)
+
+     # Metadata
+     created_at: Mapped[datetime] = mapped_column(
+         DateTime(timezone=True), default=lambda: datetime.now(UTC)
+     )
+     updated_at: Mapped[datetime] = mapped_column(
+         DateTime(timezone=True),
+         default=lambda: datetime.now(UTC),
+         onupdate=lambda: datetime.now(UTC),
+     )
+     completed_at: Mapped[datetime | None] = mapped_column(
+         DateTime(timezone=True), nullable=True
+     )
+
+     __table_args__ = (
+         Index("idx_jobs_owner", "owner_did"),
+         Index("idx_jobs_updated_at", "updated_at"),
+     )
+87
backend/src/backend/utilities/progress.py
+ """Progress tracking utilities for long-running operations."""
+
+ import asyncio
+ import contextlib
+ import logging
+ from typing import Any
+
+ from backend._internal.jobs import job_service
+ from backend.models.job import JobStatus
+
+ logger = logging.getLogger(__name__)
+
+
+ class R2ProgressTracker:
+     """Tracks R2 upload progress and updates the job service.
+
+     Bridges the gap between boto3's synchronous callback and our async
+     database job service.
+     """
+
+     def __init__(
+         self,
+         job_id: str,
+         total_size: int,
+         message: str = "uploading to storage...",
+         phase: str = "upload",
+         update_interval: float = 1.0,
+     ):
+         self.job_id = job_id
+         self.total_size = total_size
+         self.message = message
+         self.phase = phase
+         self.update_interval = update_interval
+         self._bytes_transferred = 0
+         self._reporter_task: asyncio.Task | None = None
+
+     def on_progress(self, bytes_amount: int) -> None:
+         """Synchronous callback for boto3."""
+         self._bytes_transferred += bytes_amount
+
+     async def __aenter__(self) -> "R2ProgressTracker":
+         await self.start()
+         return self
+
+     async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+         await self.stop()
+
+     async def start(self) -> None:
+         """Start the background reporting task."""
+         self._reporter_task = asyncio.create_task(self._report_loop())
+
+     async def stop(self) -> None:
+         """Stop the reporting task and ensure final update."""
+         if self._reporter_task:
+             self._reporter_task.cancel()
+             with contextlib.suppress(asyncio.CancelledError):
+                 await self._reporter_task
+             self._reporter_task = None
+
+     async def _report_loop(self) -> None:
+         """Periodic reporting loop."""
+         while True:
+             pct = self.percentage
+
+             # Don't report 100% until explicitly finished by the caller
+             # via final job update, or let it sit at 99.9%
+             report_pct = min(pct, 99.9)
+
+             await job_service.update_progress(
+                 self.job_id,
+                 JobStatus.PROCESSING,
+                 self.message,
+                 phase=self.phase,
+                 progress_pct=report_pct,
+             )
+
+             if pct >= 100.0:
+                 break
+
+             await asyncio.sleep(self.update_interval)
+
+     @property
+     def percentage(self) -> float:
+         """Calculate current percentage."""
+         if self.total_size <= 0:
+             return 0.0 if self._bytes_transferred == 0 else 100.0
+         return (self._bytes_transferred / self.total_size) * 100.0
+9 -4
frontend/src/lib/uploader.svelte.ts
···
      const uploadMessage = fileSizeMB > 10
          ? 'uploading track... (large file, this may take a moment)'
          : 'uploading track...';
-     const toastId = toast.info(uploadMessage, 30000);
+     // 0 means infinite/persist until dismissed
+     const toastId = toast.info(uploadMessage, 0);

      if (!browser) return;
      const formData = new FormData();
···

      // show backend processing messages
      if (update.message && update.status === 'processing') {
-         // for upload phase with progress, show determinate progress
-         // for other phases, just show the message
-         toast.update(task.toastId, update.message);
+         // if we have server-side progress, show it
+         const serverProgress = update.server_progress_pct;
+         if (serverProgress !== undefined && serverProgress !== null) {
+             toast.update(task.toastId, `${update.message} (${Math.round(serverProgress)}%)`);
+         } else {
+             toast.update(task.toastId, update.message);
+         }
      }

      if (update.status === 'completed') {
+19 -31
frontend/src/routes/portal/+page.svelte
···
      if (exportingMedia) return;

      const trackCount = tracks.length;
-     const toastId = toast.info(`preparing export of ${trackCount} ${trackCount === 1 ? 'track' : 'tracks'}...`, 30000);
+     // 0 means infinite/persist until dismissed
+     const toastId = toast.info(`preparing export of ${trackCount} ${trackCount === 1 ? 'track' : 'tracks'}...`, 0);

      exportingMedia = true;
      try {
···

          // show progress messages
          if (update.message && update.status === 'processing') {
-             const progressInfo = update.track_count
-                 ? ` (${update.processed_count}/${update.track_count})`
+             const progressInfo = update.total_count
+                 ? ` (${update.processed_count}/${update.total_count})`
                  : '';
              toast.update(toastId, `${update.message}${progressInfo}`);
          }
···
          exportingMedia = false;

          // update toast to show download is starting
-         toast.update(toastId, 'downloading export...');
-
-         // trigger download
-         try {
-             const downloadResponse = await fetch(`${API_URL}/exports/${exportId}/download`, {
-                 credentials: 'include'
-             });
-
-             if (downloadResponse.ok) {
-                 const blob = await downloadResponse.blob();
-                 const url = window.URL.createObjectURL(blob);
-                 const a = document.createElement('a');
-                 a.href = url;
-                 a.download = `plyr-tracks-${new Date().toISOString().split('T')[0]}.zip`;
-                 document.body.appendChild(a);
-                 a.click();
-                 window.URL.revokeObjectURL(url);
-                 document.body.removeChild(a);
-
-                 toast.dismiss(toastId);
-                 toast.success(`${update.processed_count || trackCount} ${trackCount === 1 ? 'track' : 'tracks'} exported successfully`);
-             } else {
-                 toast.dismiss(toastId);
-                 toast.error('failed to download export');
-             }
-         } catch (e) {
-             console.error('download failed:', e);
+         toast.update(toastId, 'download starting...');
+
+         if (update.download_url) {
+             // Trigger download directly from R2
+             const a = document.createElement('a');
+             a.href = update.download_url;
+             a.download = `plyr-tracks-${new Date().toISOString().split('T')[0]}.zip`;
+             document.body.appendChild(a);
+             a.click();
+             document.body.removeChild(a);
+
+             toast.dismiss(toastId);
+             toast.success(`${update.processed_count || trackCount} ${trackCount === 1 ? 'track' : 'tracks'} exported successfully`);
+         } else {
              toast.dismiss(toastId);
-             toast.error('failed to download export');
+             toast.error('export completed but download url missing');
          }
