music on atproto
plyr.fm
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = [
5# "typer>=0.20.0",
6# ]
7# ///
8"""migrate ATProto records from production namespace to environment-specific namespace.
9
10This script migrates tracks and likes from the production `fm.plyr.*` namespace
11to an environment-specific namespace (e.g., `fm.plyr.stg.*` for staging).
12
13IMPORTANT: This script does NOT delete records from the production namespace.
14Deletion must be done manually after verifying the migration succeeded.
15
16## Prerequisites
17
18Set credentials in .env:
19```bash
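ATPROTO_MAIN_HANDLE=your.handle
ATPROTO_MAIN_PASSWORD=your-app-password
NOTIFY_BOT_HANDLE=bot.handle # second ("devlog") account, also required
NOTIFY_BOT_PASSWORD=bot-app-password
DATABASE_URL=postgresql://... # target database (staging)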
23```
24
25## Usage
26
27```bash
28# dry run (default) - shows what would happen without making changes
29uv run scripts/migrate_atproto_namespace.py --target-namespace fm.plyr.stg
30
31# actually perform migration
32uv run scripts/migrate_atproto_namespace.py --target-namespace fm.plyr.stg --execute
33
34# verify migration
35uvx pdsx --pds <pds-url> -r <handle> ls fm.plyr.stg.track
36uvx pdsx --pds <pds-url> -r <handle> ls fm.plyr.stg.like
37```
38
39## What it does
40
411. **Migrate tracks:**
42 - Find tracks in DB with URIs in production namespace (`fm.plyr.track`)
43 - Read existing record from PDS
44 - Create new record in target namespace (e.g., `fm.plyr.stg.track`)
45 - Update database with new URI/CID
46 - Build mapping: old track URI → new track URI
47
482. **Migrate likes:**
49 - Find likes in DB with URIs in production namespace (`fm.plyr.like`)
50 - Get current track URI from database (source of truth, handles stale PDS like records)
51 - Read track from PDS to get current CID
52 - Create new like record in target namespace with updated subject reference
53 - Update database with new like URI
54
55## References
56
57- Database queries: docs/tools/neon.md
58- PDS inspection: docs/tools/pdsx.md
59- Issue tracker: https://github.com/zzstoatzz/plyr.fm/issues/262
60"""
61
62import asyncio
63from datetime import UTC, datetime
64
65import typer
66from atproto import AsyncClient
67from atproto_identity.resolver import AsyncIdResolver
68from pydantic import Field
69from pydantic_settings import BaseSettings, SettingsConfigDict
70from sqlalchemy import select
71
72from backend.models import Track, TrackLike, db_session
73
# typer application object; the `main` command is registered on it via `@app.command()`
app = typer.Typer()
75
76
class MigrationSettings(BaseSettings):
    """credentials for the two accounts handled by the migration script.

    all four fields are required (no defaults); each is populated from the
    environment variable named in its `validation_alias`.
    """

    # read `.env`, match names case-insensitively, ignore unrelated variables
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False, extra="ignore")

    # first account ("main")
    main_handle: str = Field(validation_alias="ATPROTO_MAIN_HANDLE")
    main_password: str = Field(validation_alias="ATPROTO_MAIN_PASSWORD")

    # second account ("devlog"), configured through the NOTIFY_BOT_* variables
    devlog_handle: str = Field(validation_alias="NOTIFY_BOT_HANDLE")
    devlog_password: str = Field(validation_alias="NOTIFY_BOT_PASSWORD")
90
91
async def resolve_pds_url(handle: str) -> tuple[str, str]:
    """resolve PDS URL and DID from handle.

    args:
        handle: ATProto handle to look up

    returns:
        tuple of (did, pds_url)

    raises:
        ValueError: if the handle does not resolve to a DID, no DID document is
            found for that DID, or the document lists no PDS service
    """
    resolver = AsyncIdResolver()

    # resolve handle to DID; the resolver may hand back None instead of raising
    user_did = await resolver.handle.resolve(handle)
    if not user_did:
        raise ValueError(f"could not resolve handle {handle} to a DID")

    # get PDS URL from DID document
    did_doc = await resolver.did.resolve(user_did)
    if did_doc is None:
        raise ValueError(f"no DID document found for {user_did} ({handle})")

    # `service` is optional on a DID document, so treat a missing list as empty;
    # take the first PDS entry, exactly as the original loop-and-break did
    pds_url = next(
        (
            service.service_endpoint
            for service in (did_doc.service or [])
            if service.type == "AtprotoPersonalDataServer"
        ),
        None,
    )

    if not pds_url:
        raise ValueError(f"no PDS found for {handle}")

    return user_did, pds_url
115
116
async def migrate_tracks(
    client: AsyncClient,
    user_did: str,
    source_namespace: str,
    target_namespace: str,
    dry_run: bool,
) -> dict[str, str]:
    """copy one user's track records into the target namespace.

    selects the user's tracks whose stored record URI contains
    `{source_namespace}.track`, re-creates each record under
    `{target_namespace}.track`, and writes the new URI/CID back to the database.
    the source records are never deleted. in dry-run mode nothing is read from
    or written to the PDS or database beyond the initial query, and every track
    maps to a placeholder `.../DRYRUN` URI.

    returns:
        mapping of old track URI → new track URI
    """
    source_collection = f"{source_namespace}.track"
    target_collection = f"{target_namespace}.track"
    mode_prefix = "[DRY RUN] " if dry_run else ""

    typer.echo(f"\n{mode_prefix}Migrating tracks...")
    typer.echo(f" Source: {source_collection}")
    typer.echo(f" Target: {target_collection}")

    # tracks owned by this user that still point at the source collection
    async with db_session() as db:
        pending_query = (
            select(Track)
            .where(
                Track.artist_did == user_did,
                Track.atproto_record_uri.like(f"%{source_collection}%"),
            )
            .order_by(Track.id)
        )
        pending_tracks = (await db.execute(pending_query)).scalars().all()

    typer.echo(f" Found {len(pending_tracks)} tracks to migrate")

    migrated: dict[str, str] = {}
    if not pending_tracks:
        return migrated

    for track in pending_tracks:
        previous_uri = track.atproto_record_uri

        typer.echo(f"\n Track #{track.id}: {track.title}")
        typer.echo(f" Old URI: {previous_uri}")

        if dry_run:
            typer.echo(
                f" [DRY RUN] Would read record and create in {target_collection}"
            )
            # placeholder entry so the likes step still has a mapping to consult
            migrated[previous_uri] = f"at://{user_did}/{target_collection}/DRYRUN"
            continue

        # the record key is the last path segment of the at:// URI
        record_key = previous_uri.rsplit("/", 1)[-1]
        fetched = await client.com.atproto.repo.get_record(
            {
                "repo": user_did,
                "collection": source_collection,
                "rkey": record_key,
            }
        )

        # same payload, re-typed for the target collection
        replacement = {**fetched.value, "$type": target_collection}
        created = await client.com.atproto.repo.create_record(
            {
                "repo": user_did,
                "collection": target_collection,
                "record": replacement,
            }
        )
        created_uri = created.uri
        created_cid = created.cid

        typer.echo(f" New URI: {created_uri}")

        # persist the new identifiers in a fresh session
        async with db_session() as db:
            lookup = await db.execute(select(Track).where(Track.id == track.id))
            row = lookup.scalar_one()
            row.atproto_record_uri = created_uri
            row.atproto_record_cid = created_cid
            await db.commit()

        typer.echo(" ✓ Migrated and updated database")

        migrated[previous_uri] = created_uri

    return migrated
212
213
async def _current_target_track_uri(track_id, target_namespace: str) -> str | None:
    """look up a track's current record URI in the database.

    fallback for likes whose PDS subject URI is absent from the in-memory
    mapping (stale like record, or the track was migrated by an earlier run).

    returns:
        the stored URI when it already points at `{target_namespace}.track`,
        otherwise None (track missing, no URI stored, or not yet migrated)
    """
    async with db_session() as db:
        result = await db.execute(
            select(Track.atproto_record_uri).where(Track.id == track_id)
        )
        current_uri = result.scalar_one_or_none()

    if current_uri and f"/{target_namespace}.track/" in current_uri:
        return current_uri
    return None


async def migrate_likes(
    client: AsyncClient,
    user_did: str,
    source_namespace: str,
    target_namespace: str,
    track_uri_mapping: dict[str, str],
    all_clients: dict[str, AsyncClient],
    dry_run: bool,
) -> None:
    """migrate likes from source to target namespace.

    for each like row of `user_did` whose URI contains `{source_namespace}.like`:
    read the like record from the PDS, determine the liked track's new URI,
    create a like record in `{target_namespace}.like` that references it, and
    store the new like URI in the database. source like records are not deleted.

    the new track URI comes from `track_uri_mapping` when the like's subject is
    listed there; otherwise the database is consulted via the like's `track_id`.
    likes whose track cannot be placed in the target namespace are skipped.

    args:
        client: authenticated client for `user_did`
        user_did: DID of the account whose likes are migrated
        source_namespace: namespace prefix the existing like records live under
        target_namespace: namespace prefix to create the new like records under
        track_uri_mapping: old track URI → new track URI (as built by `migrate_tracks`)
        all_clients: mapping of user DIDs to their authenticated clients (for cross-user likes)
        dry_run: when true, only report which likes would be migrated
    """
    typer.echo(f"\n{'[DRY RUN] ' if dry_run else ''}Migrating likes...")
    typer.echo(f" Source: {source_namespace}.like")
    typer.echo(f" Target: {target_namespace}.like")

    # find likes in DB with URIs in source namespace
    async with db_session() as db:
        stmt = (
            select(TrackLike)
            .where(TrackLike.user_did == user_did)
            .where(TrackLike.atproto_like_uri.like(f"%{source_namespace}.like%"))
            .order_by(TrackLike.id)
        )
        result = await db.execute(stmt)
        likes = result.scalars().all()

    typer.echo(f" Found {len(likes)} likes to migrate")

    if not likes:
        return

    for like in likes:
        old_uri = like.atproto_like_uri

        typer.echo(f"\n Like #{like.id} for track #{like.track_id}")
        typer.echo(f" Old URI: {old_uri}")

        if dry_run:
            typer.echo(
                f" [DRY RUN] Would read record, map subject URI, and create in {target_namespace}.like"
            )
            continue

        # read existing like record from PDS to get old subject URI
        like_response = await client.com.atproto.repo.get_record(
            {
                "repo": user_did,
                "collection": f"{source_namespace}.like",
                "rkey": old_uri.split("/")[-1],
            }
        )
        old_like_record = like_response.value
        old_subject_uri = old_like_record["subject"]["uri"]

        # prefer the mapping built during this run; fall back to the database so
        # stale PDS subjects and re-runs after a partial migration still resolve
        new_subject_uri = track_uri_mapping.get(old_subject_uri)
        if new_subject_uri is None:
            new_subject_uri = await _current_target_track_uri(
                like.track_id, target_namespace
            )
            if new_subject_uri is not None:
                typer.echo(
                    " Subject not in mapping; using current track URI from database"
                )

        if new_subject_uri is None:
            typer.echo(
                f" ✗ ERROR: No mapping found for subject URI: {old_subject_uri}"
            )
            typer.echo(" Skipping this like")
            continue

        # extract track owner DID from new URI to use correct client
        track_owner_did = new_subject_uri.split("/")[2]  # at://did/collection/rkey
        track_owner_client = all_clients.get(track_owner_did, client)

        # get new track CID by reading the new track record
        track_rkey = new_subject_uri.split("/")[-1]
        track_response = await track_owner_client.com.atproto.repo.get_record(
            {
                "repo": track_owner_did,
                "collection": f"{target_namespace}.track",
                "rkey": track_rkey,
            }
        )
        new_subject_cid = track_response.cid

        # create new like record with updated subject; keep the original
        # timestamp when the old record has one
        new_like_record = {
            "$type": f"{target_namespace}.like",
            "subject": {
                "uri": new_subject_uri,
                "cid": new_subject_cid,
            },
            "createdAt": old_like_record.get(
                "createdAt", datetime.now(UTC).isoformat().replace("+00:00", "Z")
            ),
        }

        create_response = await client.com.atproto.repo.create_record(
            {
                "repo": user_did,
                "collection": f"{target_namespace}.like",
                "record": new_like_record,
            }
        )

        new_like_uri = create_response.uri

        typer.echo(f" Subject: {old_subject_uri} → {new_subject_uri}")
        typer.echo(f" New URI: {new_like_uri}")

        # update database
        async with db_session() as db:
            stmt = select(TrackLike).where(TrackLike.id == like.id)
            result = await db.execute(stmt)
            db_like = result.scalar_one()

            db_like.atproto_like_uri = new_like_uri

            await db.commit()

        typer.echo(" ✓ Migrated and updated database")
332
333
@app.command()
def main(
    target_namespace: str = typer.Option(
        ...,
        "--target-namespace",
        help="Target namespace (e.g., 'fm.plyr.stg' for staging)",
    ),
    execute: bool = typer.Option(
        False,
        "--execute",
        help="Actually perform migration (default is dry-run)",
    ),
    source_namespace: str = typer.Option(
        "fm.plyr",
        "--source-namespace",
        help="Source namespace (production)",
    ),
):
    """migrate ATProto records from production namespace to environment-specific namespace."""
    # horizontal rule used around every banner
    rule = "=" * 60

    async def _migrate_everything():
        # load credentials first so a missing variable fails before any prompt
        settings = MigrationSettings()
        dry_run = not execute

        typer.echo(rule)
        if dry_run:
            typer.echo("DRY RUN MODE - No changes will be made")
            typer.echo(rule)
        else:
            typer.echo("EXECUTING MIGRATION")
            typer.echo(rule)
            # abort=True exits the command unless the operator confirms
            typer.confirm("Are you sure you want to proceed?", abort=True)

        # (label, handle, password) for both accounts, in processing order
        accounts = (
            ("main", settings.main_handle, settings.main_password),
            ("devlog", settings.devlog_handle, settings.devlog_password),
        )

        # resolve each account's PDS and log in against it
        authenticated_users = []
        for label, handle, password in accounts:
            typer.echo(f"\nResolving PDS for {handle} ({label})...")
            did, pds_url = await resolve_pds_url(handle)
            typer.echo(f" DID: {did}")
            typer.echo(f" PDS: {pds_url}")

            session_client = AsyncClient(base_url=pds_url)
            await session_client.login(handle, password)
            typer.echo(" ✓ Authenticated")

            authenticated_users.append(
                {
                    "handle": handle,
                    "password": password,
                    "name": label,
                    "did": did,
                    "pds_url": pds_url,
                    "client": session_client,
                }
            )

        # tracks go first for every account so the URI mapping is complete
        # before any like is rewritten
        track_uri_mapping = {}
        for account in authenticated_users:
            typer.echo(
                f"\n{rule}\nProcessing tracks for {account['handle']} ({account['name']})\n{rule}"
            )
            track_uri_mapping.update(
                await migrate_tracks(
                    client=account["client"],
                    user_did=account["did"],
                    source_namespace=source_namespace,
                    target_namespace=target_namespace,
                    dry_run=dry_run,
                )
            )

        # DID → client lookup for likes on another account's tracks
        all_clients = {account["did"]: account["client"] for account in authenticated_users}

        for account in authenticated_users:
            typer.echo(
                f"\n{rule}\nProcessing likes for {account['handle']} ({account['name']})\n{rule}"
            )
            await migrate_likes(
                client=account["client"],
                user_did=account["did"],
                source_namespace=source_namespace,
                target_namespace=target_namespace,
                track_uri_mapping=track_uri_mapping,
                all_clients=all_clients,
                dry_run=dry_run,
            )

        typer.echo(f"\n{rule}")
        if dry_run:
            typer.echo("DRY RUN COMPLETE")
            typer.echo("Run with --execute to perform actual migration")
        else:
            typer.echo("MIGRATION COMPLETE")
            typer.echo("\nIMPORTANT: Old records still exist in production namespace.")
            typer.echo("After verifying migration, manually delete old records.")
        typer.echo(rule)

    asyncio.run(_migrate_everything())
449
450
# invoke the typer app only when this file is executed as a script, not on import
if __name__ == "__main__":
    app()