music on atproto
plyr.fm
1#!/usr/bin/env -S uv run --script --quiet
2"""backfill duration for tracks missing it.
3
4## Context
5
6Tracks uploaded before duration extraction was implemented have NULL duration.
7This affects teal.fm scrobbles which should include duration metadata.
8
9## What This Script Does
10
111. Finds all tracks with NULL duration in extra
122. Downloads audio files from R2 concurrently (semaphore-limited)
133. Extracts duration using mutagen
144. Updates database with extracted durations
15
16## Usage
17
18```bash
19# dry run (show what would be updated)
20uv run scripts/backfill_duration.py --dry-run
21
22# actually update the database
23uv run scripts/backfill_duration.py
24
25# limit concurrency (default: 10)
26uv run scripts/backfill_duration.py --concurrency 5
27
28# target specific environment
29DATABASE_URL=postgresql://... uv run scripts/backfill_duration.py
30```
31
32Run in order: dev → staging → prod
33"""
34
35import asyncio
36import io
37import logging
38import sys
39from pathlib import Path
40
41import httpx
42from mutagen import File as MutagenFile
43
44# add src to path so we can import backend modules
45sys.path.insert(0, str(Path(__file__).parent.parent / "backend" / "src"))
46
47from sqlalchemy import select, update
48
49from backend.models import Track
50from backend.utilities.database import db_session
51
# configure root logging once at import time so every logger in this
# script inherits the same timestamped format
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
57
58
def extract_duration_from_bytes(audio_data: bytes) -> int | None:
    """extract duration in whole seconds from raw audio bytes.

    mutagen sniffs the container/codec from an in-memory buffer, so no
    temp file is needed.

    returns the duration rounded to the nearest second, or None when the
    bytes are not parseable audio or carry no length metadata.
    """
    try:
        audio = MutagenFile(io.BytesIO(audio_data))
        if audio is None or audio.info is None:
            return None
        length = getattr(audio.info, "length", None)
        # round() instead of int(): truncation biased every stored duration
        # short by up to a full second (e.g. 179.9s recorded as 179s)
        return round(length) if length else None
    except Exception as e:
        # mutagen raises format-specific errors on corrupt/partial data;
        # treat them all as "no duration" and let the caller record the miss
        logger.warning(f"mutagen error: {e}")
        return None
70
71
async def fetch_and_extract(
    client: httpx.AsyncClient,
    track: Track,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """download a track's full audio file from R2 and read its duration.

    the shared semaphore bounds how many downloads run at once.

    returns: (track_id, duration, error)
    """
    async with semaphore:
        # guard clause: nothing to fetch without a stored object URL
        if not track.r2_url:
            return (track.id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track.id}: {track.title[:40]}...")
            resp = await client.get(track.r2_url, follow_redirects=True)
            resp.raise_for_status()

            seconds = extract_duration_from_bytes(resp.content)
            if not seconds:
                return (track.id, None, "could not extract duration")

            logger.info(f" → {seconds}s")
            return (track.id, seconds, None)

        except httpx.HTTPStatusError as e:
            return (track.id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track.id, None, str(e))
101
102
async def fetch_and_extract_simple(
    client: httpx.AsyncClient,
    track_id: int,
    title: str,
    r2_url: str | None,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """fetch audio header from R2 and extract duration.

    uses a Range request to fetch only the first 256KB (enough metadata for
    most formats). falls back to a full download when the partial content
    does not yield a duration — presumably formats that store metadata at
    the end of the file.

    args:
        client: shared httpx client used for all downloads.
        track_id: database id of the track being processed.
        title: track title, used only for log output.
        r2_url: public URL of the audio object; falsy value short-circuits.
        semaphore: bounds concurrent downloads.

    returns: (track_id, duration, error) — duration in whole seconds on
    success, otherwise a short error description.
    """
    async with semaphore:
        if not r2_url:
            return (track_id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track_id}: {title[:40]}...")

            # try range request first (256KB should be enough for most formats)
            headers = {"Range": "bytes=0-262143"}
            response = await client.get(r2_url, headers=headers, follow_redirects=True)

            # 206 = partial content (range honored), 200 = full file (range ignored)
            if response.status_code not in (200, 206):
                response.raise_for_status()

            duration = extract_duration_from_bytes(response.content)
            if duration:
                logger.info(f" → {duration}s")
                return (track_id, duration, None)

            # the range WAS honored (206) but the first 256KB lacked usable
            # duration metadata — retry with the whole file. a 200 response
            # already was the full file, so retrying would gain nothing.
            # (old log text claimed the "range request didn't work", which
            # was misleading: this branch only runs when it worked.)
            if response.status_code == 206:
                logger.info(" partial content lacked duration metadata, fetching full file...")
                response = await client.get(r2_url, follow_redirects=True)
                response.raise_for_status()
                duration = extract_duration_from_bytes(response.content)
                if duration:
                    logger.info(f" → {duration}s")
                    return (track_id, duration, None)

            return (track_id, None, "could not extract duration")

        except httpx.HTTPStatusError as e:
            return (track_id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track_id, None, str(e))
153
154
async def backfill_duration(dry_run: bool = False, concurrency: int = 10) -> None:
    """backfill duration for tracks missing it.

    three phases: (1) read candidate rows and detach from the DB,
    (2) download + parse audio with bounded concurrency and no session
    held open, (3) reconnect and commit the collected updates.
    """
    # phase 1: query tracks needing backfill, then close connection
    async with db_session() as db:
        stmt = select(Track).where(
            Track.extra["duration"].astext.is_(None) | ~Track.extra.has_key("duration")
        )
        rows = list((await db.execute(stmt)).scalars().all())

        if not rows:
            logger.info("no tracks need duration backfill")
            return

        logger.info(f"found {len(rows)} tracks needing duration backfill")

        if dry_run:
            logger.info("dry run mode - tracks that would be updated:")
            for row in rows:
                logger.info(f" {row.id}: {row.title} ({row.r2_url})")
            return

        # copy plain values out of the ORM objects before the session closes
        track_data: list[tuple[int, str, str | None, dict | None]] = [
            (r.id, r.title, r.r2_url, r.extra) for r in rows
        ]

    # phase 2: download files and extract durations (no DB connection)
    gate = asyncio.Semaphore(concurrency)
    logger.info(
        f"processing {len(track_data)} tracks with concurrency={concurrency}..."
    )

    async with httpx.AsyncClient(timeout=120.0) as client:
        results = await asyncio.gather(
            *(
                fetch_and_extract_simple(client, tid, title, url, gate)
                for tid, title, url, _ in track_data
            )
        )

    # index the detached row data for the bookkeeping below
    extras_by_id = {tid: extra or {} for tid, _, _, extra in track_data}
    titles_by_id = {tid: title for tid, title, _, _ in track_data}

    pending: list[tuple[int, dict]] = []
    failed = 0
    for tid, duration, error in results:
        if not duration:
            failed += 1
            logger.warning(
                f"failed track {tid} ({titles_by_id[tid]}): {error}"
            )
            continue
        pending.append((tid, {**extras_by_id[tid], "duration": duration}))

    if not pending:
        logger.info("no updates to commit")
        return

    # phase 3: fresh connection to commit updates
    logger.info(f"committing {len(pending)} updates...")
    async with db_session() as db:
        for tid, new_extra in pending:
            await db.execute(
                update(Track).where(Track.id == tid).values(extra=new_extra)
            )
        await db.commit()

    logger.info(f"backfill complete: {len(pending)} updated, {failed} failed")
225
226
async def main() -> None:
    """parse CLI flags and run the backfill."""
    argv = sys.argv
    dry_run = "--dry-run" in argv

    # last `--concurrency N` pair wins; a trailing bare flag is ignored
    concurrency = 10
    for idx, token in enumerate(argv[:-1]):
        if token == "--concurrency":
            concurrency = int(argv[idx + 1])

    if dry_run:
        logger.info("DRY RUN mode - no changes will be made")

    await backfill_duration(dry_run=dry_run, concurrency=concurrency)
240
241
# script entry point: run the async workflow on a fresh event loop
if __name__ == "__main__":
    asyncio.run(main())