at main 12 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pydantic-ai>=0.1.0",
#     "anthropic",
#     "httpx",
#     "pydantic>=2.0",
#     "pydantic-settings",
#     "atproto>=0.0.55",
#     "rich",
# ]
# ///
"""autonomous moderation loop for plyr.fm.

workflow:
1. fetch pending flags from moderation service
2. auto-resolve flags whose track no longer exists
3. analyze each remaining flag with LLM (FALSE_POSITIVE, VIOLATION, NEEDS_HUMAN)
4. auto-resolve false positives
5. create review batch for everything else
6. send DM with link to review UI

the review UI handles human decisions - DM is just a notification channel.
"""

import argparse
import asyncio
from dataclasses import dataclass, field
from pathlib import Path

import httpx
from atproto import AsyncClient, models
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console

console = Console()


class LoopSettings(BaseSettings):
    """settings for the loop, read from the environment and ../.env."""

    model_config = SettingsConfigDict(
        env_file=Path(__file__).parent.parent / ".env",
        case_sensitive=False,
        extra="ignore",
    )
    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm", validation_alias="MODERATION_SERVICE_URL"
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )
    anthropic_api_key: str = Field(default="", validation_alias="ANTHROPIC_API_KEY")
    anthropic_model: str = Field(
        default="claude-sonnet-4-20250514", validation_alias="ANTHROPIC_MODEL"
    )
    bot_handle: str = Field(default="", validation_alias="NOTIFY_BOT_HANDLE")
    bot_password: str = Field(default="", validation_alias="NOTIFY_BOT_PASSWORD")
    recipient_handle: str = Field(
        default="", validation_alias="NOTIFY_RECIPIENT_HANDLE"
    )


class FlagAnalysis(BaseModel):
    """result of analyzing a single flag."""

    uri: str
    category: str = Field(description="FALSE_POSITIVE, VIOLATION, or NEEDS_HUMAN")
    reason: str


@dataclass
class DMClient:
    """direct-message client for the notification bot.

    call `setup()` before `get_messages()` or `send()`; it logs in and looks up
    the conversation with the recipient.
    """

    handle: str
    # kept out of the generated repr so the credential never lands in logs
    password: str = field(repr=False)
    recipient_handle: str
    _client: AsyncClient = field(init=False, repr=False)
    _dm_client: AsyncClient = field(init=False, repr=False)
    _recipient_did: str = field(init=False, repr=False)
    _convo_id: str = field(init=False, repr=False)

    async def setup(self) -> None:
        """log in, resolve the recipient's DID and find the conversation id."""
        self._client = AsyncClient()
        await self._client.login(self.handle, self.password)
        self._dm_client = self._client.with_bsky_chat_proxy()
        profile = await self._client.app.bsky.actor.get_profile(
            {"actor": self.recipient_handle}
        )
        self._recipient_did = profile.did
        convo = await self._dm_client.chat.bsky.convo.get_convo_for_members(
            models.ChatBskyConvoGetConvoForMembers.Params(members=[self._recipient_did])
        )
        self._convo_id = convo.convo.id

    async def get_messages(self, limit: int = 30) -> list[dict]:
        """return up to `limit` messages as dicts with text, is_bot and sent_at.

        entries lacking a `text` or `sender` attribute are skipped. `is_bot` is
        true for every message not sent by the recipient.
        """
        response = await self._dm_client.chat.bsky.convo.get_messages(
            models.ChatBskyConvoGetMessages.Params(convo_id=self._convo_id, limit=limit)
        )
        return [
            {
                "text": m.text,
                "is_bot": m.sender.did != self._recipient_did,
                "sent_at": getattr(m, "sent_at", None),
            }
            for m in response.messages
            if hasattr(m, "text") and hasattr(m, "sender")
        ]

    async def send(self, text: str) -> None:
        """send `text` to the recipient's conversation."""
        await self._dm_client.chat.bsky.convo.send_message(
            models.ChatBskyConvoSendMessage.Data(
                convo_id=self._convo_id,
                message=models.ChatBskyConvoDefs.MessageInput(text=text),
            )
        )


@dataclass
class PlyrClient:
    """client for checking track existence in plyr.fm."""

    env: str = "prod"
    _client: httpx.AsyncClient = field(init=False, repr=False)

    def __post_init__(self) -> None:
        # unknown env names fall back to the prod api
        base_url = {
            "prod": "https://api.plyr.fm",
            "staging": "https://api-stg.plyr.fm",
            "dev": "http://localhost:8001",
        }.get(self.env, "https://api.plyr.fm")
        self._client = httpx.AsyncClient(base_url=base_url, timeout=10.0)

    async def close(self) -> None:
        """close the underlying http client."""
        await self._client.aclose()

    async def track_exists(self, track_id: int) -> bool:
        """check if a track exists (returns False only on 404).

        any other outcome - 5xx, rate limiting, auth failures, network errors -
        is treated as "exists", so an api outage can never cause flags to be
        resolved as deleted.
        """
        try:
            r = await self._client.get(f"/tracks/{track_id}")
        except Exception:
            # deliberate best-effort: assume exists on error
            # (don't accidentally delete labels)
            return True
        return r.status_code != 404


@dataclass
class ModClient:
    """client for the moderation service admin api."""

    base_url: str
    # kept out of the generated repr so the credential never lands in logs
    auth_token: str = field(repr=False)
    _client: httpx.AsyncClient = field(init=False, repr=False)

    def __post_init__(self) -> None:
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={"X-Moderation-Key": self.auth_token},
            timeout=30.0,
        )

    async def close(self) -> None:
        """close the underlying http client."""
        await self._client.aclose()

    async def list_pending(self) -> list[dict]:
        """return the pending flags (the "tracks" list of the response).

        raises httpx.HTTPStatusError on a non-success response.
        """
        r = await self._client.get("/admin/flags", params={"filter": "pending"})
        r.raise_for_status()
        return r.json().get("tracks", [])

    async def resolve(self, uri: str, reason: str, notes: str = "") -> None:
        """resolve the copyright-violation flag on `uri`.

        raises httpx.HTTPStatusError on a non-success response.
        """
        r = await self._client.post(
            "/admin/resolve",
            json={
                "uri": uri,
                "val": "copyright-violation",
                "reason": reason,
                "notes": notes,
            },
        )
        r.raise_for_status()

    async def create_batch(
        self, uris: list[str], created_by: str | None = None
    ) -> dict:
        """create a review batch and return {id, url, flag_count}."""
        r = await self._client.post(
            "/admin/batches",
            json={"uris": uris, "created_by": created_by},
        )
        r.raise_for_status()
        return r.json()


def get_header(env: str) -> str:
    """return the DM prefix tagging the environment, e.g. [PLYR-MOD:PROD]."""
    return f"[PLYR-MOD:{env.upper()}]"


def create_flag_analyzer(api_key: str, model: str) -> Agent[None, list[FlagAnalysis]]:
    """build the agent that classifies a batch of flags into FlagAnalysis items."""
    from pydantic_ai.providers.anthropic import AnthropicProvider

    return Agent(
        model=AnthropicModel(model, provider=AnthropicProvider(api_key=api_key)),
        output_type=list[FlagAnalysis],
        system_prompt="""\
analyze each copyright flag. categorize as:
- FALSE_POSITIVE: fingerprint noise, uploader is the artist, unrelated matches
- VIOLATION: clearly copyrighted commercial content
- NEEDS_HUMAN: ambiguous, need human review

return a FlagAnalysis for each flag with uri, category, and brief reason.
""",
    )


def _flag_context(flag: dict) -> dict:
    """return the flag's context dict, tolerating a missing or null value."""
    return flag.get("context") or {}


def _describe_flag(flag: dict) -> str:
    """render one flag as the text block shown to the analyzer."""
    ctx = _flag_context(flag)
    # only the first three matches are shown; a match without an artist
    # renders as "?" instead of raising
    artists = ", ".join(
        str(m.get("artist", "?")) for m in (ctx.get("matches") or [])[:3]
    )
    return (
        f"URI: {flag['uri']}\n"
        f"Track: {ctx.get('track_title', '?')}\n"
        f"Uploader: @{ctx.get('artist_handle', '?')}\n"
        f"Matches: {artists}"
    )


def _partition_analyses(
    pending: list[dict], analyses: list[FlagAnalysis]
) -> tuple[list[FlagAnalysis], list[str], int]:
    """reconcile model output with the flags that were actually sent.

    returns (auto, human_uris, ignored):
    - auto: one analysis per uri whose every verdict was FALSE_POSITIVE
    - human_uris: every other pending uri, in original order - including
      flags the model skipped, so nothing is silently dropped
    - ignored: number of analyses whose uri was not among the pending flags
    """
    pending_uris = list(dict.fromkeys(f["uri"] for f in pending))
    known = set(pending_uris)

    verdicts: dict[str, list[FlagAnalysis]] = {}
    ignored = 0
    for analysis in analyses:
        if analysis.uri in known:
            verdicts.setdefault(analysis.uri, []).append(analysis)
        else:
            ignored += 1

    # conflicting verdicts for the same uri are not safe to auto-resolve
    auto = [
        items[0]
        for items in verdicts.values()
        if all(item.category == "FALSE_POSITIVE" for item in items)
    ]
    auto_uris = {a.uri for a in auto}
    human_uris = [uri for uri in pending_uris if uri not in auto_uris]
    return auto, human_uris, ignored


async def _resolve_deleted_tracks(
    pending: list[dict], mod: ModClient, plyr: PlyrClient, *, dry_run: bool
) -> list[dict]:
    """resolve flags whose track is gone; return the flags that remain active."""
    active_flags: list[dict] = []
    deleted_count = 0
    for flag in pending:
        track_id = _flag_context(flag).get("track_id")
        if not track_id or await plyr.track_exists(track_id):
            active_flags.append(flag)
            continue

        # track was deleted - resolve the flag
        if dry_run:
            console.print(
                f" [yellow]would resolve deleted:[/yellow] {flag['uri'][-40:]}"
            )
            deleted_count += 1
            continue

        try:
            await mod.resolve(flag["uri"], "content_deleted", "track no longer exists")
        except Exception as e:
            # keep the flag in play so it still gets analyzed this run
            console.print(f" [red]✗[/red] {e}")
            active_flags.append(flag)
        else:
            console.print(f" [yellow]⌫[/yellow] deleted: {flag['uri'][-40:]}")
            deleted_count += 1

    if deleted_count > 0:
        console.print(f"[yellow]{deleted_count} deleted tracks resolved[/yellow]")
    return active_flags


async def run_loop(
    dry_run: bool = False, limit: int | None = None, env: str = "prod"
) -> None:
    """run one pass of the moderation loop.

    dry_run: print what would happen without resolving flags, creating a batch
        or sending a DM.
    limit: analyze at most this many flags (None or 0 means no limit).
    env: which plyr.fm api to check track existence against; also tags the DM.
    """
    settings = LoopSettings()
    for attr in [
        "moderation_auth_token",
        "anthropic_api_key",
        "bot_handle",
        "bot_password",
        "recipient_handle",
    ]:
        if not getattr(settings, attr):
            console.print(f"[red]missing {attr}[/red]")
            return

    console.print(f"[bold]moderation loop[/bold] ({settings.anthropic_model})")
    if dry_run:
        console.print("[yellow]DRY RUN[/yellow]")

    dm = DMClient(settings.bot_handle, settings.bot_password, settings.recipient_handle)
    mod = ModClient(settings.moderation_service_url, settings.moderation_auth_token)
    plyr = PlyrClient(env=env)

    try:
        await dm.setup()

        # get pending flags
        pending = await mod.list_pending()
        if not pending:
            console.print("[green]no pending flags[/green]")
            return

        console.print(f"[bold]{len(pending)} pending flags[/bold]")

        # check for deleted tracks and auto-resolve them
        console.print("[dim]checking for deleted tracks...[/dim]")
        pending = await _resolve_deleted_tracks(pending, mod, plyr, dry_run=dry_run)
        if not pending:
            console.print("[green]all flags were for deleted tracks[/green]")
            return

        # analyze remaining flags
        if limit:
            pending = pending[:limit]

        analyzer = create_flag_analyzer(
            settings.anthropic_api_key, settings.anthropic_model
        )
        desc = "\n---\n".join(_describe_flag(f) for f in pending)
        result = await analyzer.run(f"analyze {len(pending)} flags:\n\n{desc}")

        # auto-resolve false positives, everything else goes to human review
        auto, human_uris, ignored = _partition_analyses(pending, result.output)
        if ignored:
            console.print(
                f"[yellow]ignored {ignored} analyses for unknown uris[/yellow]"
            )
        console.print(
            f"analysis: {len(auto)} auto-resolve, {len(human_uris)} need human"
        )

        for a in auto:
            if not dry_run:
                try:
                    await mod.resolve(
                        a.uri, "fingerprint_noise", f"auto: {a.reason[:50]}"
                    )
                    console.print(f" [green]✓[/green] {a.uri[-40:]}")
                except Exception as e:
                    # a failed resolve leaves the flag pending for the next run
                    console.print(f" [red]✗[/red] {e}")

        # create batch and send link for needs_human (if any)
        if human_uris:
            console.print(f"[dim]creating batch for {len(human_uris)} flags...[/dim]")

            if not dry_run:
                batch = await mod.create_batch(human_uris, created_by="moderation_loop")
                full_url = f"{mod.base_url.rstrip('/')}{batch['url']}"
                msg = (
                    f"{get_header(env)} {batch['flag_count']} need review:\n{full_url}"
                )
                await dm.send(msg)
                console.print(f"[green]sent batch {batch['id']}[/green]")
            else:
                console.print(
                    f"[yellow]would create batch with {len(human_uris)} flags[/yellow]"
                )

        console.print("[bold]done[/bold]")

    finally:
        # close both clients even if the first close raises
        try:
            await mod.close()
        finally:
            await plyr.close()


def main() -> None:
    """parse command-line flags and run the loop once."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=None)
    parser.add_argument("--env", default="prod", choices=["dev", "staging", "prod"])
    args = parser.parse_args()
    asyncio.run(run_loop(dry_run=args.dry_run, limit=args.limit, env=args.env))


if __name__ == "__main__":
    main()