# at main 7.2 kB view raw
#!/usr/bin/env -S uv run --script --quiet --with-editable=backend
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "httpx",
#     "pydantic-settings",
# ]
# ///
"""backfill copyright labels for flagged tracks.

usage:
    uv run scripts/backfill_copyright_labels.py --env prod --dry-run
    uv run scripts/backfill_copyright_labels.py --env prod

this will:
- fetch all tracks flagged in copyright_scans that have atproto_record_uri
- emit labels to the moderation service for each flagged track

the process exits with status 1 when configuration is missing or when at
least one label could not be emitted.

environment variables (set in .env or export):
    DEV_DATABASE_URL - dev database connection string
    STAGING_DATABASE_URL - staging database connection string
    PROD_DATABASE_URL - production database connection string
    MODERATION_SERVICE_URL - URL of moderation service (default: https://moderation.plyr.fm)
    MODERATION_AUTH_TOKEN - auth token for moderation service
"""

import argparse
import asyncio
import os
import re
import sys
from typing import Literal

import httpx
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


Environment = Literal["dev", "staging", "prod"]

# matches the name of the libpq-style `sslmode` query parameter (not its value)
_SSLMODE_PARAM = re.compile(r"(?<=[?&])sslmode=")


class BackfillSettings(BaseSettings):
    """settings for backfill script.

    values are read from the process environment and from `.env`; unknown
    keys are ignored and names are matched case-insensitively.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # one connection string per environment; empty means "not configured"
    dev_database_url: str = Field(default="", validation_alias="DEV_DATABASE_URL")
    staging_database_url: str = Field(
        default="", validation_alias="STAGING_DATABASE_URL"
    )
    prod_database_url: str = Field(default="", validation_alias="PROD_DATABASE_URL")

    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm",
        validation_alias="MODERATION_SERVICE_URL",
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )

    def get_database_url(self, env: Environment) -> str:
        """get database URL for environment.

        raises:
            ValueError: when no URL is configured for `env`.
        """
        urls = {
            "dev": self.dev_database_url,
            "staging": self.staging_database_url,
            "prod": self.prod_database_url,
        }
        url = urls.get(env, "")
        if not url:
            raise ValueError(f"no database URL configured for {env}")
        return url


def _database_host(db_url: str) -> str:
    """return the `host[:port]` part of a database URL for display.

    the authority is split on the *last* `@` so that credentials containing
    an `@` never end up in the output. returns "configured" when the URL
    carries no credentials separator at all.
    """
    authority = db_url.split("://", 1)[-1].split("/", 1)[0]
    _, separator, host = authority.rpartition("@")
    return host if separator else "configured"


def setup_env(settings: BackfillSettings, env: Environment) -> None:
    """setup environment variables for backend imports.

    writes the selected database URL to `DATABASE_URL`, rewritten so that it
    selects the asyncpg driver.

    raises:
        ValueError: when no URL is configured for `env`.
    """
    db_url = settings.get_database_url(env)
    # ensure asyncpg driver is used
    if db_url.startswith("postgresql://"):
        db_url = db_url.replace("postgresql://", "postgresql+asyncpg://", 1)
    # asyncpg uses 'ssl' not 'sslmode' - rename the parameter and keep its
    # value, so every mode (require, verify-full, disable, ...) is converted
    db_url = _SSLMODE_PARAM.sub("ssl=", db_url)
    os.environ["DATABASE_URL"] = db_url


async def emit_label(
    client: httpx.AsyncClient,
    settings: BackfillSettings,
    uri: str,
    cid: str | None,
) -> bool:
    """emit a copyright-violation label for a track.

    posts to `<moderation_service_url>/emit-label` with the auth token in the
    `X-Moderation-Key` header.

    returns:
        True when the service answered with a success status, False otherwise.
        errors are printed rather than raised so one bad track does not stop
        the whole backfill.
    """
    # tolerate a trailing slash in the configured base URL
    endpoint = f"{settings.moderation_service_url.rstrip('/')}/emit-label"
    try:
        response = await client.post(
            endpoint,
            json={
                "uri": uri,
                "val": "copyright-violation",
                "cid": cid,
            },
            headers={"X-Moderation-Key": settings.moderation_auth_token},
            timeout=30.0,
        )
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        print(f" ❌ HTTP error: {e.response.status_code}")
        try:
            print(f" {e.response.json()}")
        except ValueError:
            # body is not JSON - show the beginning of the raw text instead
            print(f" {e.response.text[:200]}")
        return False
    except Exception as e:
        # deliberately broad: transport errors, invalid URLs, etc. are
        # reported and counted as a failure by the caller
        print(f" ❌ error: {e}")
        return False
    return True


async def run_backfill(env: Environment, dry_run: bool = False) -> None:
    """backfill copyright labels for flagged tracks.

    validates configuration, loads every flagged track that has an ATProto
    record URI, then either lists them (`dry_run`) or emits one label per
    track. exits the process with status 1 on missing configuration or when
    at least one label could not be emitted.
    """
    settings = BackfillSettings()

    # validate settings
    try:
        db_url = settings.get_database_url(env)
    except ValueError as e:
        print(e)
        print(f"\nset {env.upper()}_DATABASE_URL in .env")
        sys.exit(1)
    print(f"✓ database: {_database_host(db_url)}")

    if not settings.moderation_auth_token:
        print("❌ MODERATION_AUTH_TOKEN not set")
        sys.exit(1)

    print(f"✓ moderation service: {settings.moderation_service_url}")

    # setup env before backend imports
    setup_env(settings, env)

    # import backend after env setup
    from sqlalchemy import select
    from sqlalchemy.orm import joinedload

    from backend.models import CopyrightScan, Track
    from backend.utilities.database import db_session

    emitted = 0
    failed = 0

    async with db_session() as db:
        # find flagged tracks with atproto URIs
        stmt = (
            select(Track)
            .options(joinedload(Track.artist))
            .join(CopyrightScan, CopyrightScan.track_id == Track.id)
            .where(CopyrightScan.is_flagged.is_(True))
            .where(Track.atproto_record_uri.isnot(None))
            .order_by(Track.created_at.desc())
        )

        result = await db.execute(stmt)
        tracks = result.scalars().unique().all()

        if not tracks:
            print("\n✅ no flagged tracks need label backfill")
            return

        print(f"\n📋 found {len(tracks)} flagged tracks with ATProto URIs")

        if dry_run:
            print("\n[DRY RUN] would emit labels for:")
            for track in tracks:
                print(f" - {track.id}: {track.title} by @{track.artist.handle}")
                print(f" uri: {track.atproto_record_uri}")
            return

        # emit labels
        total = len(tracks)
        async with httpx.AsyncClient() as client:
            for i, track in enumerate(tracks, 1):
                print(f"\n[{i}/{total}] emitting label for: {track.title}")
                print(f" artist: @{track.artist.handle}")
                print(f" uri: {track.atproto_record_uri}")

                success = await emit_label(
                    client,
                    settings,
                    track.atproto_record_uri,
                    track.atproto_record_cid,
                )

                if success:
                    emitted += 1
                    print(" ✓ label emitted")
                else:
                    failed += 1

        print(f"\n{'=' * 50}")
        if failed:
            print("⚠️ backfill finished with failures")
        else:
            print("✅ backfill complete")
        print(f" emitted: {emitted}")
        print(f" failed: {failed}")

    # report partial failure to the caller only after every track was tried
    if failed:
        sys.exit(1)


def main() -> None:
    """main entry point: parse CLI arguments and run the backfill."""
    parser = argparse.ArgumentParser(
        description="backfill copyright labels for flagged tracks"
    )
    parser.add_argument(
        "--env",
        type=str,
        required=True,
        choices=["dev", "staging", "prod"],
        help="environment to backfill",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be emitted without making changes",
    )

    args = parser.parse_args()

    print(f"🏷️ copyright label backfill - {args.env}")
    print("=" * 50)

    asyncio.run(run_backfill(args.env, args.dry_run))


if __name__ == "__main__":
    main()