#!/usr/bin/env -S uv run --script --quiet
"""backfill duration for tracks missing it.

## Context

Tracks uploaded before duration extraction was implemented have NULL duration.
This affects teal.fm scrobbles which should include duration metadata.

## What This Script Does

1. Finds all tracks with NULL duration in extra
2. Downloads audio files from R2 concurrently (semaphore-limited)
3. Extracts duration using mutagen
4. Updates database with extracted durations

## Usage

```bash
# dry run (show what would be updated)
uv run scripts/backfill_duration.py --dry-run

# actually update the database
uv run scripts/backfill_duration.py

# limit concurrency (default: 10)
uv run scripts/backfill_duration.py --concurrency 5

# target specific environment
DATABASE_URL=postgresql://... uv run scripts/backfill_duration.py
```

Run in order: dev → staging → prod
"""

import argparse
import asyncio
import io
import logging
import sys
from pathlib import Path

import httpx
from mutagen import File as MutagenFile

# add src to path so we can import backend modules
sys.path.insert(0, str(Path(__file__).parent.parent / "backend" / "src"))

from sqlalchemy import select, update

from backend.models import Track
from backend.utilities.database import db_session

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# size of the initial Range request: 256KB is enough to reach the
# metadata headers in most audio formats
RANGE_BYTES = 262_144


def extract_duration_from_bytes(audio_data: bytes) -> int | None:
    """extract duration in whole seconds from raw audio bytes.

    returns None when mutagen cannot parse the data, the format exposes
    no length, or the duration rounds to zero seconds (sub-second
    durations are treated as absent rather than stored as 0, which
    downstream code would read as "missing").
    """
    try:
        audio = MutagenFile(io.BytesIO(audio_data))
        if audio is None or audio.info is None:
            return None
        length = getattr(audio.info, "length", None)
        if not length:
            return None
        # round instead of truncating so e.g. 0.6s doesn't collapse to 0
        seconds = round(length)
        return seconds if seconds > 0 else None
    except Exception as e:
        # mutagen raises format-specific errors on corrupt/partial files;
        # a failed extraction is expected and reported per-track upstream
        logger.warning(f"mutagen error: {e}")
        return None


async def fetch_and_extract(
    client: httpx.AsyncClient,
    track: Track,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """fetch the full audio file from R2 and extract duration.

    NOTE: legacy variant — superseded by fetch_and_extract_simple, which
    tries a cheaper Range request first. Kept for manual/one-off use.

    returns: (track_id, duration, error) — exactly one of duration/error
    is non-None.
    """
    async with semaphore:
        if not track.r2_url:
            return (track.id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track.id}: {track.title[:40]}...")
            response = await client.get(track.r2_url, follow_redirects=True)
            response.raise_for_status()

            duration = extract_duration_from_bytes(response.content)
            if duration is not None:
                logger.info(f"{duration}s")
                return (track.id, duration, None)
            return (track.id, None, "could not extract duration")

        except httpx.HTTPStatusError as e:
            return (track.id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track.id, None, str(e))


async def fetch_and_extract_simple(
    client: httpx.AsyncClient,
    track_id: int,
    title: str,
    r2_url: str | None,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """fetch audio header from R2 and extract duration.

    uses a Range request to fetch only the first 256KB (enough for
    metadata in most formats). falls back to a full download when the
    partial content doesn't yield a duration.

    takes plain values instead of a Track so it can run after the DB
    session that loaded the rows has been closed.

    returns: (track_id, duration, error) — exactly one of duration/error
    is non-None.
    """
    async with semaphore:
        if not r2_url:
            return (track_id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track_id}: {title[:40]}...")

            # try range request first (256KB should be enough for most formats)
            headers = {"Range": f"bytes=0-{RANGE_BYTES - 1}"}
            response = await client.get(r2_url, headers=headers, follow_redirects=True)

            # 206 = partial content (range worked), 200 = full file (range ignored)
            if response.status_code not in (200, 206):
                response.raise_for_status()

            duration = extract_duration_from_bytes(response.content)
            if duration is not None:
                logger.info(f"{duration}s")
                return (track_id, duration, None)

            # if the partial body wasn't enough, retry with the full file —
            # but only when we actually got a partial body (206); a 200
            # response already was the full file, so retrying is pointless
            if response.status_code == 206:
                logger.info("  range request didn't work, fetching full file...")
                response = await client.get(r2_url, follow_redirects=True)
                response.raise_for_status()
                duration = extract_duration_from_bytes(response.content)
                if duration is not None:
                    logger.info(f"{duration}s")
                    return (track_id, duration, None)

            return (track_id, None, "could not extract duration")

        except httpx.HTTPStatusError as e:
            return (track_id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track_id, None, str(e))


async def backfill_duration(dry_run: bool = False, concurrency: int = 10) -> None:
    """backfill duration for tracks missing it.

    runs in three phases so no DB connection is held open during the
    (potentially long) download phase:
      1. query tracks needing backfill, copy out plain values, close session
      2. download + extract concurrently (semaphore-limited)
      3. open a fresh session and commit all updates
    """

    # phase 1: query tracks needing backfill, then close connection
    track_data: list[tuple[int, str, str | None, dict | None]] = []
    async with db_session() as db:
        # match both explicit JSON null and a missing "duration" key
        stmt = select(Track).where(
            Track.extra["duration"].astext.is_(None) | ~Track.extra.has_key("duration")
        )
        result = await db.execute(stmt)
        tracks = list(result.scalars().all())

        if not tracks:
            logger.info("no tracks need duration backfill")
            return

        logger.info(f"found {len(tracks)} tracks needing duration backfill")

        if dry_run:
            logger.info("dry run mode - tracks that would be updated:")
            for track in tracks:
                logger.info(f"  {track.id}: {track.title} ({track.r2_url})")
            return

        # extract plain data before closing session (ORM objects detach)
        track_data = [(t.id, t.title, t.r2_url, t.extra) for t in tracks]

    # phase 2: download files and extract durations (no DB connection)
    semaphore = asyncio.Semaphore(concurrency)
    logger.info(
        f"processing {len(track_data)} tracks with concurrency={concurrency}..."
    )

    async with httpx.AsyncClient(timeout=120.0) as client:
        results = await asyncio.gather(
            *[
                fetch_and_extract_simple(client, tid, title, r2_url, semaphore)
                for tid, title, r2_url, _ in track_data
            ]
        )

    # build update map
    updates: list[tuple[int, dict]] = []
    failed = 0
    track_extras = {tid: extra or {} for tid, _, _, extra in track_data}
    track_titles = {tid: title for tid, title, _, _ in track_data}

    for track_id, duration, error in results:
        if duration is not None:
            new_extra = {**track_extras[track_id], "duration": duration}
            updates.append((track_id, new_extra))
        else:
            failed += 1
            logger.warning(
                f"failed track {track_id} ({track_titles[track_id]}): {error}"
            )

    if not updates:
        logger.info("no updates to commit")
        return

    # phase 3: fresh connection to commit updates
    logger.info(f"committing {len(updates)} updates...")
    async with db_session() as db:
        for track_id, new_extra in updates:
            stmt = update(Track).where(Track.id == track_id).values(extra=new_extra)
            await db.execute(stmt)
        await db.commit()

    logger.info(f"backfill complete: {len(updates)} updated, {failed} failed")


async def main() -> None:
    """main entry point: parse CLI flags and run the backfill."""
    parser = argparse.ArgumentParser(
        description="backfill duration for tracks missing it",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be updated without writing to the database",
    )
    parser.add_argument(
        "--concurrency",
        type=int,
        default=10,
        help="max concurrent downloads (default: 10)",
    )
    args = parser.parse_args()

    if args.dry_run:
        logger.info("DRY RUN mode - no changes will be made")

    await backfill_duration(dry_run=args.dry_run, concurrency=args.concurrency)


if __name__ == "__main__":
    asyncio.run(main())