music on atproto
plyr.fm
1#!/usr/bin/env -S uv run --script --quiet --with-editable=backend
2# /// script
3# requires-python = ">=3.12"
4# dependencies = [
5# "httpx",
6# "pydantic-settings",
7# ]
8# ///
9"""backfill copyright labels for flagged tracks.
10
11usage:
12 uv run scripts/backfill_copyright_labels.py --env prod --dry-run
13 uv run scripts/backfill_copyright_labels.py --env prod
14
15this will:
16- fetch all tracks flagged in copyright_scans that have atproto_record_uri
17- emit labels to the moderation service for each flagged track
18
environment variables (set in .env or export):
    DEV_DATABASE_URL - dev database connection string
    PROD_DATABASE_URL - production database connection string
    STAGING_DATABASE_URL - staging database connection string
    MODERATION_SERVICE_URL - URL of moderation service (default: https://moderation.plyr.fm)
    MODERATION_AUTH_TOKEN - auth token for moderation service
24"""
25
26import asyncio
27import os
28import sys
29from typing import Literal
30
31import httpx
32from pydantic import Field
33from pydantic_settings import BaseSettings, SettingsConfigDict
34
35
# names of the environments this script accepts via --env
Environment = Literal["dev", "staging", "prod"]
37
38
39class BackfillSettings(BaseSettings):
40 """settings for backfill script."""
41
42 model_config = SettingsConfigDict(
43 env_file=".env",
44 case_sensitive=False,
45 extra="ignore",
46 )
47
48 dev_database_url: str = Field(default="", validation_alias="DEV_DATABASE_URL")
49 staging_database_url: str = Field(
50 default="", validation_alias="STAGING_DATABASE_URL"
51 )
52 prod_database_url: str = Field(default="", validation_alias="PROD_DATABASE_URL")
53
54 moderation_service_url: str = Field(
55 default="https://moderation.plyr.fm",
56 validation_alias="MODERATION_SERVICE_URL",
57 )
58 moderation_auth_token: str = Field(
59 default="", validation_alias="MODERATION_AUTH_TOKEN"
60 )
61
62 def get_database_url(self, env: Environment) -> str:
63 """get database URL for environment."""
64 urls = {
65 "dev": self.dev_database_url,
66 "staging": self.staging_database_url,
67 "prod": self.prod_database_url,
68 }
69 url = urls.get(env, "")
70 if not url:
71 raise ValueError(f"no database URL configured for {env}")
72 return url
73
74
def setup_env(settings: BackfillSettings, env: Environment) -> None:
    """export DATABASE_URL for `env` ahead of the backend imports.

    the configured URL is adjusted for asyncpg first: a leading
    `postgresql://` becomes `postgresql+asyncpg://`, and `sslmode=require`
    becomes `ssl=require`.
    """
    plain_scheme = "postgresql://"
    url = settings.get_database_url(env)

    # name the asyncpg driver explicitly when the URL has none
    if url.startswith(plain_scheme):
        url = "postgresql+asyncpg://" + url[len(plain_scheme) :]

    # asyncpg spells this parameter 'ssl' rather than 'sslmode'
    os.environ["DATABASE_URL"] = url.replace("sslmode=require", "ssl=require")
84
85
async def emit_label(
    client: httpx.AsyncClient,
    settings: BackfillSettings,
    uri: str,
    cid: str | None,
) -> bool:
    """POST a copyright-violation label for `uri` to the moderation service.

    returns True when the request completes with a success status. an HTTP
    error status or any other exception is printed and reported as False.
    """
    try:
        endpoint = f"{settings.moderation_service_url}/emit-label"
        label = {"uri": uri, "val": "copyright-violation", "cid": cid}
        auth = {"X-Moderation-Key": settings.moderation_auth_token}

        reply = await client.post(
            endpoint, json=label, headers=auth, timeout=30.0
        )
        reply.raise_for_status()
    except httpx.HTTPStatusError as status_err:
        rejected = status_err.response
        print(f" ❌ HTTP error: {rejected.status_code}")
        # prefer the JSON error body; fall back to a short slice of raw text
        try:
            detail = f" {rejected.json()}"
        except Exception:
            detail = f" {rejected.text[:200]}"
        print(detail)
        return False
    except Exception as err:
        print(f" ❌ error: {err}")
        return False
    else:
        return True
116
117
def _database_host_label(db_url: str) -> str:
    """return the host[:port] part of `db_url` for display, never credentials."""
    # drop the scheme, then keep only the authority (text before path/query/fragment)
    authority = db_url.split("://", 1)[-1]
    for stop in "/?#":
        authority = authority.split(stop, 1)[0]

    # the host follows the LAST '@'. usernames such as 'user@tenant' contain
    # '@' themselves, so splitting on the first one would print the password.
    _userinfo, at, host = authority.rpartition("@")
    return host if at and host else "configured"


async def run_backfill(env: Environment, dry_run: bool = False) -> None:
    """backfill copyright labels for flagged tracks.

    args:
        env: environment whose database is queried for flagged tracks.
        dry_run: when true, list the tracks that would be labelled and
            return without calling the moderation service.

    exits with status 1 when the database URL for `env` or
    MODERATION_AUTH_TOKEN is not configured.
    """
    settings = BackfillSettings()

    # validate settings
    try:
        db_url = settings.get_database_url(env)
    except ValueError as e:
        print(f"❌ {e}")
        print(f"\nset {env.upper()}_DATABASE_URL in .env")
        sys.exit(1)
    print(f"✓ database: {_database_host_label(db_url)}")

    if not settings.moderation_auth_token:
        print("❌ MODERATION_AUTH_TOKEN not set")
        sys.exit(1)

    print(f"✓ moderation service: {settings.moderation_service_url}")

    # setup env before backend imports
    setup_env(settings, env)

    # import backend after env setup
    from sqlalchemy import select
    from sqlalchemy.orm import joinedload

    from backend.models import CopyrightScan, Track
    from backend.utilities.database import db_session

    async with db_session() as db:
        # find flagged tracks with atproto URIs, newest first; the artist is
        # loaded in the same query because its handle is printed below
        stmt = (
            select(Track)
            .options(joinedload(Track.artist))
            .join(CopyrightScan, CopyrightScan.track_id == Track.id)
            .where(CopyrightScan.is_flagged.is_(True))
            .where(Track.atproto_record_uri.isnot(None))
            .order_by(Track.created_at.desc())
        )

        result = await db.execute(stmt)
        tracks = result.scalars().unique().all()

        if not tracks:
            print("\n✅ no flagged tracks need label backfill")
            return

        print(f"\n📋 found {len(tracks)} flagged tracks with ATProto URIs")

        if dry_run:
            print("\n[DRY RUN] would emit labels for:")
            for track in tracks:
                print(f" - {track.id}: {track.title} by @{track.artist.handle}")
                print(f" uri: {track.atproto_record_uri}")
            return

        # emit labels one at a time, counting successes and failures
        emitted = 0
        failed = 0
        async with httpx.AsyncClient() as client:
            for i, track in enumerate(tracks, 1):
                print(f"\n[{i}/{len(tracks)}] emitting label for: {track.title}")
                print(f" artist: @{track.artist.handle}")
                print(f" uri: {track.atproto_record_uri}")

                success = await emit_label(
                    client,
                    settings,
                    track.atproto_record_uri,
                    track.atproto_record_cid,
                )

                if success:
                    emitted += 1
                    print(" ✓ label emitted")
                else:
                    # emit_label has already printed the reason
                    failed += 1

        print(f"\n{'=' * 50}")
        print("✅ backfill complete")
        print(f" emitted: {emitted}")
        print(f" failed: {failed}")
203
204
def main() -> None:
    """parse the command line, print a banner, and run the backfill."""
    import argparse

    cli = argparse.ArgumentParser(
        description="backfill copyright labels for flagged tracks",
    )
    # which environment's database to read from (mandatory)
    cli.add_argument(
        "--env",
        type=str,
        required=True,
        choices=["dev", "staging", "prod"],
        help="environment to backfill",
    )
    # preview mode
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be emitted without making changes",
    )
    opts = cli.parse_args()

    banner = f"🏷️ copyright label backfill - {opts.env}"
    print(banner)
    print("=" * 50)

    asyncio.run(run_backfill(opts.env, opts.dry_run))
231
232
# run the CLI only when this file is executed directly
if __name__ == "__main__":
    main()