music on atproto
plyr.fm
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pydantic-ai>=0.1.0",
#     "anthropic",
#     "httpx",
#     "pydantic>=2.0",
#     "pydantic-settings",
#     "atproto>=0.0.55",
#     "rich",
# ]
# ///
14"""autonomous moderation loop for plyr.fm.
15
workflow:
1. fetch pending flags from moderation service
2. auto-resolve flags whose track no longer exists
3. analyze each remaining flag with LLM (FALSE_POSITIVE, VIOLATION, NEEDS_HUMAN)
4. auto-resolve false positives
5. create review batch for everything else (VIOLATION and NEEDS_HUMAN)
6. send DM with link to review UI

the review UI handles human decisions - DM is just a notification channel.
24"""
25
26import argparse
27import asyncio
28from dataclasses import dataclass, field
29from pathlib import Path
30
31import httpx
32from atproto import AsyncClient, models
33from pydantic import BaseModel, Field
34from pydantic_ai import Agent
35from pydantic_ai.models.anthropic import AnthropicModel
36from pydantic_settings import BaseSettings, SettingsConfigDict
37from rich.console import Console
38
# shared rich console used for all status output in this script
console = Console()
40
41
class LoopSettings(BaseSettings):
    """configuration for the moderation loop.

    every field is populated through its upper-case validation alias; values
    may also come from the `.env` file two directories above this script.
    """

    model_config = SettingsConfigDict(
        env_file=Path(__file__).parents[1] / ".env",
        case_sensitive=False,
        extra="ignore",
    )

    # moderation service
    moderation_service_url: str = Field(
        "https://moderation.plyr.fm",
        validation_alias="MODERATION_SERVICE_URL",
    )
    moderation_auth_token: str = Field("", validation_alias="MODERATION_AUTH_TOKEN")

    # LLM used to analyze flags
    anthropic_api_key: str = Field("", validation_alias="ANTHROPIC_API_KEY")
    anthropic_model: str = Field(
        "claude-sonnet-4-20250514",
        validation_alias="ANTHROPIC_MODEL",
    )

    # DM notification channel (bot account and the handle it messages)
    bot_handle: str = Field("", validation_alias="NOTIFY_BOT_HANDLE")
    bot_password: str = Field("", validation_alias="NOTIFY_BOT_PASSWORD")
    recipient_handle: str = Field("", validation_alias="NOTIFY_RECIPIENT_HANDLE")
63
64
class FlagAnalysis(BaseModel):
    """result of analyzing a single flag."""

    # identifier of the flag this verdict is about
    uri: str
    # free-form string; callers compare it against "FALSE_POSITIVE"
    category: str = Field(
        description="FALSE_POSITIVE, VIOLATION, or NEEDS_HUMAN",
    )
    # short justification for the chosen category
    reason: str
71
72
@dataclass
class DMClient:
    """DM channel between the bot account and a single recipient.

    `setup()` must be awaited before `get_messages()` or `send()`: the
    underscore-prefixed fields have no defaults and are only assigned there.
    """

    handle: str
    password: str
    recipient_handle: str
    _client: AsyncClient = field(init=False, repr=False)
    _dm_client: AsyncClient = field(init=False, repr=False)
    _recipient_did: str = field(init=False, repr=False)
    _convo_id: str = field(init=False, repr=False)

    async def setup(self) -> None:
        """log in, resolve the recipient's DID and look up the conversation id."""
        self._client = AsyncClient()
        await self._client.login(self.handle, self.password)
        self._dm_client = self._client.with_bsky_chat_proxy()

        # the profile lookup goes through the regular client, not the chat proxy
        lookup = {"actor": self.recipient_handle}
        profile = await self._client.app.bsky.actor.get_profile(lookup)
        self._recipient_did = profile.did

        members = models.ChatBskyConvoGetConvoForMembers.Params(
            members=[self._recipient_did]
        )
        found = await self._dm_client.chat.bsky.convo.get_convo_for_members(members)
        self._convo_id = found.convo.id

    async def get_messages(self, limit: int = 30) -> list[dict]:
        """fetch up to `limit` messages from the conversation.

        each entry is a dict with `text`, `is_bot` (true when the sender is
        anyone other than the recipient) and `sent_at` (None when absent).
        entries lacking a `text` or `sender` attribute are skipped.
        """
        query = models.ChatBskyConvoGetMessages.Params(
            convo_id=self._convo_id, limit=limit
        )
        response = await self._dm_client.chat.bsky.convo.get_messages(query)

        collected: list[dict] = []
        for msg in response.messages:
            if not (hasattr(msg, "text") and hasattr(msg, "sender")):
                continue
            collected.append(
                {
                    "text": msg.text,
                    "is_bot": msg.sender.did != self._recipient_did,
                    "sent_at": getattr(msg, "sent_at", None),
                }
            )
        return collected

    async def send(self, text: str) -> None:
        """send `text` as a message in the conversation found by `setup()`."""
        payload = models.ChatBskyConvoSendMessage.Data(
            convo_id=self._convo_id,
            message=models.ChatBskyConvoDefs.MessageInput(text=text),
        )
        await self._dm_client.chat.bsky.convo.send_message(payload)
117
118
@dataclass
class PlyrClient:
    """client for checking track existence in plyr.fm."""

    # one of "prod", "staging", "dev"; anything else falls back to the prod API
    env: str = "prod"
    _client: httpx.AsyncClient = field(init=False, repr=False)

    def __post_init__(self) -> None:
        """create the HTTP client for the API that matches `env`."""
        base_url = {
            "prod": "https://api.plyr.fm",
            "staging": "https://api-stg.plyr.fm",
            "dev": "http://localhost:8001",
        }.get(self.env, "https://api.plyr.fm")
        self._client = httpx.AsyncClient(base_url=base_url, timeout=10.0)

    async def close(self) -> None:
        """close the underlying HTTP client."""
        await self._client.aclose()

    async def track_exists(self, track_id: int) -> bool:
        """check if a track exists (returns False only on a 404).

        callers resolve moderation flags for tracks reported as missing, so
        every inconclusive outcome is reported as "exists": request failures
        as well as any non-404 status (5xx, 429, redirects, auth errors).
        """
        try:
            r = await self._client.get(f"/tracks/{track_id}")
        except Exception:
            return True  # assume exists on error (don't accidentally delete labels)
        # only an explicit 404 proves the track is gone; comparing against 200
        # would treat a server error or rate limit as a deletion.
        return r.status_code != 404
144
145
@dataclass
class ModClient:
    """HTTP client for the moderation service admin endpoints.

    every request carries `auth_token` in the `X-Moderation-Key` header.
    """

    base_url: str
    auth_token: str
    _client: httpx.AsyncClient = field(init=False, repr=False)

    def __post_init__(self) -> None:
        """build the shared HTTP client with the auth header attached."""
        auth_headers = {"X-Moderation-Key": self.auth_token}
        self._client = httpx.AsyncClient(
            base_url=self.base_url, headers=auth_headers, timeout=30.0
        )

    async def close(self) -> None:
        """close the underlying HTTP client."""
        await self._client.aclose()

    async def list_pending(self) -> list[dict]:
        """return the `tracks` list of pending flags (empty list if absent).

        raises an httpx status error on a non-success response.
        """
        response = await self._client.get("/admin/flags", params={"filter": "pending"})
        response.raise_for_status()
        body = response.json()
        return body.get("tracks", [])

    async def resolve(self, uri: str, reason: str, notes: str = "") -> None:
        """resolve the flag on `uri`; the label value sent is always copyright-violation.

        raises an httpx status error on a non-success response.
        """
        payload = {
            "uri": uri,
            "val": "copyright-violation",
            "reason": reason,
            "notes": notes,
        }
        response = await self._client.post("/admin/resolve", json=payload)
        response.raise_for_status()

    async def create_batch(
        self, uris: list[str], created_by: str | None = None
    ) -> dict:
        """create a review batch and return {id, url, flag_count}."""
        payload = {"uris": uris, "created_by": created_by}
        response = await self._client.post("/admin/batches", json=payload)
        response.raise_for_status()
        return response.json()
189
190
def get_header(env: str) -> str:
    """return the DM prefix tag for `env`, e.g. ``[PLYR-MOD:PROD]``."""
    tag = env.upper()
    return "[PLYR-MOD:" + tag + "]"
193
194
def create_flag_analyzer(api_key: str, model: str) -> Agent[None, list[FlagAnalysis]]:
    """build the agent that categorizes copyright flags.

    args:
        api_key: anthropic API key handed to the provider.
        model: anthropic model name.

    returns:
        an agent whose output is a list of FlagAnalysis.
    """
    # imported here rather than at module level, as in the original layout
    from pydantic_ai.providers.anthropic import AnthropicProvider

    instructions = """\
analyze each copyright flag. categorize as:
- FALSE_POSITIVE: fingerprint noise, uploader is the artist, unrelated matches
- VIOLATION: clearly copyrighted commercial content
- NEEDS_HUMAN: ambiguous, need human review

return a FlagAnalysis for each flag with uri, category, and brief reason.
"""
    provider = AnthropicProvider(api_key=api_key)
    llm = AnthropicModel(model, provider=provider)
    return Agent(
        model=llm,
        output_type=list[FlagAnalysis],
        system_prompt=instructions,
    )
210
211
def _describe_flag(flag: dict) -> str:
    """render one pending flag as the text block shown to the analyzer."""
    # tolerate a missing/null context or matches list and matches without an
    # artist: one malformed flag should not abort the whole run.
    ctx = flag.get("context") or {}
    artists = ", ".join(
        str(m.get("artist", "?")) for m in (ctx.get("matches") or [])[:3]
    )
    return (
        f"URI: {flag['uri']}\nTrack: {ctx.get('track_title', '?')}\n"
        f"Uploader: @{ctx.get('artist_handle', '?')}\n"
        f"Matches: {artists}"
    )


def _triage_analyses(
    pending_uris: list[str], analyses: list["FlagAnalysis"]
) -> tuple[list["FlagAnalysis"], list[str], list[str]]:
    """split analyzer output into (auto-resolvable, uris for review, uris it skipped)."""
    known = set(pending_uris)
    seen: set[str] = set()
    auto: list["FlagAnalysis"] = []
    human: list[str] = []
    for analysis in analyses:
        # never act on a uri the model invented, and only on the first verdict
        # when it repeats one
        if analysis.uri not in known or analysis.uri in seen:
            continue
        seen.add(analysis.uri)
        if analysis.category == "FALSE_POSITIVE":
            auto.append(analysis)
        else:
            human.append(analysis.uri)
    skipped = [uri for uri in dict.fromkeys(pending_uris) if uri not in seen]
    return auto, human, skipped


async def run_loop(
    dry_run: bool = False, limit: int | None = None, env: str = "prod"
) -> None:
    """run one pass of the moderation loop.

    steps: check required settings, log the DM bot in, fetch pending flags,
    resolve flags whose track no longer exists, let the LLM categorize the
    rest, auto-resolve false positives, then put everything else in a review
    batch and DM its link. returns early (after printing why) when a setting
    is missing or nothing is left to process.

    args:
        dry_run: print what would be resolved/batched instead of doing it.
            the DM login, track lookups and LLM call still happen.
        limit: cap on the number of flags sent to the LLM, applied after the
            deleted-track check; a falsy value means no cap.
        env: selects the plyr.fm API used for track lookups and the tag in
            the DM header.
    """
    settings = LoopSettings()
    required = (
        "moderation_auth_token",
        "anthropic_api_key",
        "bot_handle",
        "bot_password",
        "recipient_handle",
    )
    for attr in required:
        if not getattr(settings, attr):
            console.print(f"[red]missing {attr}[/red]")
            return

    console.print(f"[bold]moderation loop[/bold] ({settings.anthropic_model})")
    if dry_run:
        console.print("[yellow]DRY RUN[/yellow]")

    dm = DMClient(settings.bot_handle, settings.bot_password, settings.recipient_handle)
    mod = ModClient(settings.moderation_service_url, settings.moderation_auth_token)
    plyr = PlyrClient(env=env)

    try:
        # log in first so bad DM credentials fail before anything is resolved
        await dm.setup()

        # get pending flags
        pending = await mod.list_pending()
        if not pending:
            console.print("[green]no pending flags[/green]")
            return

        console.print(f"[bold]{len(pending)} pending flags[/bold]")

        # check for deleted tracks and auto-resolve them
        console.print("[dim]checking for deleted tracks...[/dim]")
        active_flags = []
        deleted_count = 0
        for flag in pending:
            track_id = (flag.get("context") or {}).get("track_id")
            if not track_id or await plyr.track_exists(track_id):
                active_flags.append(flag)
                continue

            # track was deleted - resolve the flag
            if dry_run:
                console.print(
                    f" [yellow]would resolve deleted:[/yellow] {flag['uri'][-40:]}"
                )
                deleted_count += 1
                continue
            try:
                await mod.resolve(
                    flag["uri"], "content_deleted", "track no longer exists"
                )
                console.print(f" [yellow]⌫[/yellow] deleted: {flag['uri'][-40:]}")
                deleted_count += 1
            except Exception as e:
                # resolving failed: keep the flag so it still gets analyzed
                console.print(f" [red]✗[/red] {e}")
                active_flags.append(flag)

        if deleted_count > 0:
            console.print(f"[yellow]{deleted_count} deleted tracks resolved[/yellow]")

        pending = active_flags
        if not pending:
            console.print("[green]all flags were for deleted tracks[/green]")
            return

        # analyze remaining flags
        if limit:
            pending = pending[:limit]

        analyzer = create_flag_analyzer(
            settings.anthropic_api_key, settings.anthropic_model
        )
        desc = "\n---\n".join(_describe_flag(f) for f in pending)
        result = await analyzer.run(f"analyze {len(pending)} flags:\n\n{desc}")
        analyses = result.output

        # auto-resolve false positives, everything else goes to human review.
        # the model's output is reconciled against what was actually sent:
        # unknown/repeated uris are dropped and omitted flags default to review.
        auto, human_uris, skipped = _triage_analyses(
            [f["uri"] for f in pending], analyses
        )
        ignored = len(analyses) - len(auto) - len(human_uris)
        if ignored:
            console.print(
                f"[yellow]ignored {ignored} analyses with unknown or repeated uris[/yellow]"
            )
        if skipped:
            console.print(
                f"[yellow]{len(skipped)} flags missing from analysis, sending to human review[/yellow]"
            )
            human_uris.extend(skipped)
        console.print(
            f"analysis: {len(auto)} auto-resolve, {len(human_uris)} need human"
        )

        for a in auto:
            if dry_run:
                console.print(f" [yellow]would auto-resolve:[/yellow] {a.uri[-40:]}")
                continue
            try:
                await mod.resolve(a.uri, "fingerprint_noise", f"auto: {a.reason[:50]}")
                console.print(f" [green]✓[/green] {a.uri[-40:]}")
            except Exception as e:
                # best effort: a failed resolve leaves the flag pending
                console.print(f" [red]✗[/red] {e}")

        # create batch and send link for needs_human (if any)
        if human_uris:
            console.print(f"[dim]creating batch for {len(human_uris)} flags...[/dim]")

            if not dry_run:
                batch = await mod.create_batch(human_uris, created_by="moderation_loop")
                # batch["url"] is appended to the service base url as-is
                full_url = f"{mod.base_url.rstrip('/')}{batch['url']}"
                msg = (
                    f"{get_header(env)} {batch['flag_count']} need review:\n{full_url}"
                )
                await dm.send(msg)
                console.print(f"[green]sent batch {batch['id']}[/green]")
            else:
                console.print(
                    f"[yellow]would create batch with {len(human_uris)} flags[/yellow]"
                )

        console.print("[bold]done[/bold]")

    finally:
        # close both HTTP clients even if closing the first one fails
        try:
            await mod.close()
        finally:
            await plyr.close()
336
337
def main() -> None:
    """parse the command line and run one pass of the moderation loop."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument("--limit", type=int, default=None)
    cli.add_argument("--env", default="prod", choices=["dev", "staging", "prod"])
    opts = cli.parse_args()

    pass_coro = run_loop(dry_run=opts.dry_run, limit=opts.limit, env=opts.env)
    asyncio.run(pass_coro)
345
346
# script entry point: only start the loop when executed directly
if __name__ == "__main__":
    main()