at main 14 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "typer>=0.20.0",
# ]
# ///
# NOTE(review): the inline metadata above only declares `typer`, yet this script
# also imports `atproto`, `atproto_identity`, `pydantic`, `pydantic_settings`,
# `sqlalchemy` and the project-local `backend.models`. `uv run` builds an
# isolated environment from inline metadata, so confirm the script is launched
# from an environment that actually provides those packages.
"""migrate ATProto records from production namespace to environment-specific namespace.

This script migrates tracks and likes from the production `fm.plyr.*` namespace
to an environment-specific namespace (e.g., `fm.plyr.stg.*` for staging).
Two accounts are processed: the main account and the devlog (notify bot) account.

IMPORTANT: This script does NOT delete records from the production namespace.
Deletion must be done manually after verifying the migration succeeded.

## Prerequisites

Set credentials in .env (all four ATProto variables are required):
```bash
ATPROTO_MAIN_HANDLE=your.handle
ATPROTO_MAIN_PASSWORD=your-app-password
NOTIFY_BOT_HANDLE=devlog.handle
NOTIFY_BOT_PASSWORD=devlog-app-password
DATABASE_URL=postgresql://...  # target database (staging)
```

## Usage

```bash
# dry run (default) - shows what would happen without making changes
uv run scripts/migrate_atproto_namespace.py --target-namespace fm.plyr.stg

# actually perform migration
uv run scripts/migrate_atproto_namespace.py --target-namespace fm.plyr.stg --execute

# verify migration
uvx pdsx --pds <pds-url> -r <handle> ls fm.plyr.stg.track
uvx pdsx --pds <pds-url> -r <handle> ls fm.plyr.stg.like
```

## What it does

Tracks for every account are migrated before any likes, so likes that point
at another account's track can be re-pointed at the migrated record.

1. **Migrate tracks:**
   - Find tracks in DB with URIs in production namespace (`fm.plyr.track`)
   - Read existing record from PDS
   - Create new record in target namespace (e.g., `fm.plyr.stg.track`)
   - Update database with new URI/CID
   - Build mapping: old track URI → new track URI

2. **Migrate likes:**
   - Find likes in DB with URIs in production namespace (`fm.plyr.like`)
   - Get current track URI from database (source of truth, handles stale PDS like records)
   - Read track from PDS to get current CID
   - Create new like record in target namespace with updated subject reference
   - Update database with new like URI

## References

- Database queries: docs/tools/neon.md
- PDS inspection: docs/tools/pdsx.md
- Issue tracker: https://github.com/zzstoatzz/plyr.fm/issues/262
"""

import asyncio
from datetime import UTC, datetime

import typer
from atproto import AsyncClient
from atproto_identity.resolver import AsyncIdResolver
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import select

from backend.models import Track, TrackLike, db_session

app = typer.Typer()


class MigrationSettings(BaseSettings):
    """settings for migration script."""

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # main account credentials
    main_handle: str = Field(validation_alias="ATPROTO_MAIN_HANDLE")
    main_password: str = Field(validation_alias="ATPROTO_MAIN_PASSWORD")
    # devlog account credentials (read from the notify-bot variables)
    devlog_handle: str = Field(validation_alias="NOTIFY_BOT_HANDLE")
    devlog_password: str = Field(validation_alias="NOTIFY_BOT_PASSWORD")


async def resolve_pds_url(handle: str) -> tuple[str, str]:
    """resolve PDS URL and DID from handle.

    returns:
        tuple of (did, pds_url)

    raises:
        ValueError: if the handle cannot be resolved to a DID, the DID document
            cannot be fetched, or it lists no `AtprotoPersonalDataServer` service
    """
    resolver = AsyncIdResolver()

    # resolve handle to DID; guard the lookup instead of passing a falsy
    # result on to the DID resolver
    user_did = await resolver.handle.resolve(handle)
    if not user_did:
        raise ValueError(f"could not resolve handle {handle} to a DID")

    # get PDS URL from DID document
    did_doc = await resolver.did.resolve(user_did)
    if did_doc is None:
        raise ValueError(f"could not resolve DID document for {user_did}")

    # `service` may be missing from the document, hence the `or []`
    pds_url = next(
        (
            service.service_endpoint
            for service in did_doc.service or []
            if service.type == "AtprotoPersonalDataServer"
        ),
        None,
    )
    if not pds_url:
        raise ValueError(f"no PDS found for {handle}")

    return user_did, pds_url


async def migrate_tracks(
    client: AsyncClient,
    user_did: str,
    source_namespace: str,
    target_namespace: str,
    dry_run: bool,
) -> dict[str, str]:
    """migrate tracks from source to target namespace.

    Every track owned by `user_did` whose record URI lives in the
    `<source_namespace>.track` collection is copied into
    `<target_namespace>.track` on the PDS, and its database row is updated to
    the new URI/CID. Nothing is written in dry-run mode.

    returns:
        mapping of old track URI → new track URI (placeholder URIs in dry run)
    """
    source_collection = f"{source_namespace}.track"
    target_collection = f"{target_namespace}.track"

    typer.echo(f"\n{'[DRY RUN] ' if dry_run else ''}Migrating tracks...")
    typer.echo(f" Source: {source_collection}")
    typer.echo(f" Target: {target_collection}")

    # find tracks in DB with URIs in the source collection; matching the whole
    # `/<collection>/` path segment keeps e.g. `<ns>.trackfoo` from matching
    async with db_session() as db:
        stmt = (
            select(Track)
            .where(Track.artist_did == user_did)
            .where(Track.atproto_record_uri.like(f"%/{source_collection}/%"))
            .order_by(Track.id)
        )
        result = await db.execute(stmt)
        tracks = result.scalars().all()

    typer.echo(f" Found {len(tracks)} tracks to migrate")

    uri_mapping: dict[str, str] = {}
    if not tracks:
        return uri_mapping

    for track in tracks:
        old_uri = track.atproto_record_uri

        typer.echo(f"\n Track #{track.id}: {track.title}")
        typer.echo(f" Old URI: {old_uri}")

        if dry_run:
            typer.echo(
                f" [DRY RUN] Would read record and create in {target_namespace}.track"
            )
            # in dry run, create fake mapping for likes step
            uri_mapping[old_uri] = f"at://{user_did}/{target_namespace}.track/DRYRUN"
            continue

        # read existing record from PDS (rkey is the last URI path segment)
        response = await client.com.atproto.repo.get_record(
            {
                "repo": user_did,
                "collection": source_collection,
                "rkey": old_uri.split("/")[-1],
            }
        )
        old_record = response.value

        # copy the record into the target collection, re-tagging its `$type`
        new_record = {
            **old_record,
            "$type": target_collection,
        }

        create_response = await client.com.atproto.repo.create_record(
            {
                "repo": user_did,
                "collection": target_collection,
                "record": new_record,
            }
        )

        new_uri = create_response.uri
        new_cid = create_response.cid

        typer.echo(f" New URI: {new_uri}")

        # update database
        async with db_session() as db:
            stmt = select(Track).where(Track.id == track.id)
            result = await db.execute(stmt)
            db_track = result.scalar_one()

            db_track.atproto_record_uri = new_uri
            db_track.atproto_record_cid = new_cid

            await db.commit()

        typer.echo(" ✓ Migrated and updated database")

        # save mapping for likes
        uri_mapping[old_uri] = new_uri

    return uri_mapping


async def migrate_likes(
    client: AsyncClient,
    user_did: str,
    source_namespace: str,
    target_namespace: str,
    track_uri_mapping: dict[str, str],
    all_clients: dict[str, AsyncClient],
    dry_run: bool,
) -> None:
    """migrate likes from source to target namespace.

    The liked track is looked up in the database (via `like.track_id`) rather
    than taken from the PDS like record's `subject.uri`, which can be stale.
    Likes whose track is missing, or has not been migrated to
    `<target_namespace>.track`, are reported and skipped.

    args:
        track_uri_mapping: old track URI → new track URI produced by
            `migrate_tracks`; only used to translate a database URI that is
            still in the source namespace
        all_clients: mapping of user DIDs to their authenticated clients (for cross-user likes)
    """
    source_like_collection = f"{source_namespace}.like"
    target_like_collection = f"{target_namespace}.like"
    target_track_collection = f"{target_namespace}.track"

    typer.echo(f"\n{'[DRY RUN] ' if dry_run else ''}Migrating likes...")
    typer.echo(f" Source: {source_like_collection}")
    typer.echo(f" Target: {target_like_collection}")

    # find likes in DB with URIs in the source collection (whole path segment)
    async with db_session() as db:
        stmt = (
            select(TrackLike)
            .where(TrackLike.user_did == user_did)
            .where(TrackLike.atproto_like_uri.like(f"%/{source_like_collection}/%"))
            .order_by(TrackLike.id)
        )
        result = await db.execute(stmt)
        likes = result.scalars().all()

    typer.echo(f" Found {len(likes)} likes to migrate")

    if not likes:
        return

    for like in likes:
        old_uri = like.atproto_like_uri

        typer.echo(f"\n Like #{like.id} for track #{like.track_id}")
        typer.echo(f" Old URI: {old_uri}")

        if dry_run:
            typer.echo(
                f" [DRY RUN] Would read record, map subject URI, and create in {target_namespace}.like"
            )
            continue

        # read existing like record from PDS: its createdAt is carried over and
        # its (possibly stale) subject URI is only used for logging
        like_response = await client.com.atproto.repo.get_record(
            {
                "repo": user_did,
                "collection": source_like_collection,
                "rkey": old_uri.split("/")[-1],
            }
        )
        old_like_record = like_response.value
        old_subject_uri = old_like_record["subject"]["uri"]

        # the database is the source of truth for the liked track's current URI
        async with db_session() as db:
            result = await db.execute(select(Track).where(Track.id == like.track_id))
            liked_track = result.scalar_one_or_none()

        current_track_uri = liked_track.atproto_record_uri if liked_track else None
        if not current_track_uri:
            typer.echo(
                f" ✗ ERROR: Track #{like.track_id} not found in database or has no record URI"
            )
            typer.echo(" Skipping this like")
            continue

        # migrated tracks already carry their new URI in the database; the
        # mapping only translates a URI that is still in the source namespace
        new_subject_uri = track_uri_mapping.get(current_track_uri, current_track_uri)

        if f"/{target_track_collection}/" not in new_subject_uri:
            typer.echo(
                f" ✗ ERROR: Track #{like.track_id} is not in {target_track_collection}: {new_subject_uri}"
            )
            typer.echo(" Skipping this like")
            continue

        # at://<did>/<collection>/<rkey> → owner DID selects the right client
        uri_parts = new_subject_uri.split("/")
        track_owner_did = uri_parts[2]
        track_rkey = uri_parts[-1]
        track_owner_client = all_clients.get(track_owner_did, client)

        # get new track CID by reading the new track record
        track_response = await track_owner_client.com.atproto.repo.get_record(
            {
                "repo": track_owner_did,
                "collection": target_track_collection,
                "rkey": track_rkey,
            }
        )
        new_subject_cid = track_response.cid

        # create new like record with updated subject
        new_like_record = {
            "$type": target_like_collection,
            "subject": {
                "uri": new_subject_uri,
                "cid": new_subject_cid,
            },
            "createdAt": old_like_record.get(
                "createdAt", datetime.now(UTC).isoformat().replace("+00:00", "Z")
            ),
        }

        create_response = await client.com.atproto.repo.create_record(
            {
                "repo": user_did,
                "collection": target_like_collection,
                "record": new_like_record,
            }
        )

        new_like_uri = create_response.uri

        typer.echo(f" Subject: {old_subject_uri} → {new_subject_uri}")
        typer.echo(f" New URI: {new_like_uri}")

        # update database
        async with db_session() as db:
            stmt = select(TrackLike).where(TrackLike.id == like.id)
            result = await db.execute(stmt)
            db_like = result.scalar_one()

            db_like.atproto_like_uri = new_like_uri

            await db.commit()

        typer.echo(" ✓ Migrated and updated database")


@app.command()
def main(
    target_namespace: str = typer.Option(
        ...,
        "--target-namespace",
        help="Target namespace (e.g., 'fm.plyr.stg' for staging)",
    ),
    execute: bool = typer.Option(
        False,
        "--execute",
        help="Actually perform migration (default is dry-run)",
    ),
    source_namespace: str = typer.Option(
        "fm.plyr",
        "--source-namespace",
        help="Source namespace (production)",
    ),
):
    """migrate ATProto records from production namespace to environment-specific namespace."""

    async def run_migration():
        settings = MigrationSettings()

        dry_run = not execute

        typer.echo("=" * 60)
        if dry_run:
            typer.echo("DRY RUN MODE - No changes will be made")
            typer.echo("=" * 60)
        else:
            typer.echo("EXECUTING MIGRATION")
            typer.echo("=" * 60)
            typer.confirm("Are you sure you want to proceed?", abort=True)

        # set up both users
        users = [
            {
                "handle": settings.main_handle,
                "password": settings.main_password,
                "name": "main",
            },
            {
                "handle": settings.devlog_handle,
                "password": settings.devlog_password,
                "name": "devlog",
            },
        ]

        # resolve and authenticate both users
        authenticated_users = []
        for user in users:
            typer.echo(f"\nResolving PDS for {user['handle']} ({user['name']})...")
            user_did, pds_url = await resolve_pds_url(user["handle"])
            typer.echo(f" DID: {user_did}")
            typer.echo(f" PDS: {pds_url}")

            # authenticate against the user's own PDS
            client = AsyncClient(base_url=pds_url)
            await client.login(user["handle"], user["password"])
            typer.echo(" ✓ Authenticated")

            authenticated_users.append(
                {
                    **user,
                    "did": user_did,
                    "pds_url": pds_url,
                    "client": client,
                }
            )

        # migrate ALL tracks first (builds complete URI mapping)
        track_uri_mapping: dict[str, str] = {}
        for user in authenticated_users:
            typer.echo(
                f"\n{'=' * 60}\nProcessing tracks for {user['handle']} ({user['name']})\n{'=' * 60}"
            )
            mapping = await migrate_tracks(
                client=user["client"],
                user_did=user["did"],
                source_namespace=source_namespace,
                target_namespace=target_namespace,
                dry_run=dry_run,
            )
            track_uri_mapping.update(mapping)

        # build DID → client mapping for cross-user likes
        all_clients = {user["did"]: user["client"] for user in authenticated_users}

        # migrate ALL likes (uses complete URI mapping from all users)
        for user in authenticated_users:
            typer.echo(
                f"\n{'=' * 60}\nProcessing likes for {user['handle']} ({user['name']})\n{'=' * 60}"
            )
            await migrate_likes(
                client=user["client"],
                user_did=user["did"],
                source_namespace=source_namespace,
                target_namespace=target_namespace,
                track_uri_mapping=track_uri_mapping,
                all_clients=all_clients,
                dry_run=dry_run,
            )

        typer.echo("\n" + "=" * 60)
        if dry_run:
            typer.echo("DRY RUN COMPLETE")
            typer.echo("Run with --execute to perform actual migration")
        else:
            typer.echo("MIGRATION COMPLETE")
            typer.echo("\nIMPORTANT: Old records still exist in production namespace.")
            typer.echo("After verifying migration, manually delete old records.")
        typer.echo("=" * 60)

    asyncio.run(run_migration())


if __name__ == "__main__":
    app()