at main 6.1 kB view raw
#!/usr/bin/env -S uv run --script --quiet
"""backfill ATProto records for tracks missing atproto_record_uri.

Creates ATProto records on user's PDS for tracks that:
1. Exist in the database
2. Have no atproto_record_uri (orphaned/never synced)
3. Belong to the configured user (ATPROTO_MAIN_HANDLE)

The script uses the app's namespace configuration (settings.atproto.track_collection)
to create records in the correct namespace for the current environment.

## Prerequisites

Set credentials in .env:
```bash
ATPROTO_MAIN_HANDLE=your.handle
ATPROTO_MAIN_PASSWORD=your-app-password
DATABASE_URL=postgresql://...  # target database
```

## Usage

```bash
uv run scripts/backfill_atproto_records.py
```

The script will:
1. Resolve user's PDS URL from handle/DID
2. Query database for tracks without atproto_record_uri
3. Create ATProto records on PDS using configured namespace
4. Update database with new URIs and CIDs

## Verification

After running, verify success:
```bash
# check ATProto records on PDS (see docs/tools/pdsx.md)
uvx pdsx --pds <pds-url> -r <handle> ls <namespace>

# check database (see docs/tools/neon.md)
SELECT COUNT(*) FROM tracks WHERE atproto_record_uri IS NOT NULL;
```

## References

- Database queries: docs/tools/neon.md
- PDS inspection: docs/tools/pdsx.md
- ATProto records: src/backend/_internal/atproto/records.py
"""

import asyncio
from datetime import UTC, datetime
from typing import Any

from atproto import AsyncClient
from atproto_identity.resolver import AsyncIdResolver
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import select

from backend.config import settings as app_settings
from backend.models import Artist, Track, db_session

# public R2 bucket used to build image URLs by hand, since the images table
# doesn't exist in prod
R2_PUBLIC_URL = "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"


class BackfillSettings(BaseSettings):
    """settings for backfill script.

    values are read from the environment or from `.env`; unrelated variables
    in that file are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # account whose tracks are backfilled (ATPROTO_MAIN_HANDLE)
    handle: str = Field(validation_alias="ATPROTO_MAIN_HANDLE")
    # password used to log in to the PDS (ATPROTO_MAIN_PASSWORD)
    password: str = Field(validation_alias="ATPROTO_MAIN_PASSWORD")


async def _resolve_pds(handle: str) -> tuple[str, str]:
    """resolve a handle to its DID and PDS endpoint.

    Returns:
        a `(did, pds_url)` tuple.

    Raises:
        ValueError: if the handle has no DID, or the DID document lists no
            `AtprotoPersonalDataServer` service.
    """
    resolver = AsyncIdResolver()

    # first resolve handle to DID - the resolver yields None when it can't
    user_did = await resolver.handle.resolve(handle)
    if not user_did:
        raise ValueError(f"could not resolve DID for {handle}")
    print(f"resolved DID: {user_did}")

    # then get PDS URL from DID document; both the document and its service
    # list may be missing, so treat either as "no services"
    did_doc = await resolver.did.resolve(user_did)
    services = (did_doc.service if did_doc else None) or []
    for service in services:
        if service.type == "AtprotoPersonalDataServer" and service.service_endpoint:
            return user_did, service.service_endpoint

    raise ValueError(f"no PDS found for {handle}")


def _build_track_record(track: Track, collection: str) -> dict[str, Any]:
    """build the ATProto record payload for a single track.

    Raises:
        KeyError: if a feature entry lacks `did` or `handle`.
    """
    record: dict[str, Any] = {
        "$type": collection,
        "title": track.title,
        "artist": track.artist.display_name,
        "audioUrl": track.r2_url,
        "fileType": track.file_type,
        # the record is stamped with the backfill time, not the upload time
        "createdAt": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
    }

    # add optional fields
    if track.album:
        record["album"] = track.album

    if track.features:
        # convert to ATProto format
        record["features"] = [
            {
                "did": f["did"],
                "handle": f["handle"],
                "displayName": f.get("display_name", f["handle"]),
            }
            for f in track.features
        ]

    if track.image_id:
        # assume jpg for now - can be updated later if needed
        record["imageUrl"] = f"{R2_PUBLIC_URL}/images/{track.image_id}.jpg"

    return record


async def main() -> None:
    """backfill ATProto records for orphaned tracks.

    logs in to the configured user's PDS, creates one record per track that
    has no `atproto_record_uri`, and stores the returned URI/CID on the track.
    a failure on one track is reported and skipped; the final summary reports
    how many tracks actually succeeded.

    Raises:
        ValueError: if the handle or its PDS cannot be resolved.
    """
    settings = BackfillSettings()

    # resolve PDS from handle
    print(f"resolving PDS for {settings.handle}...")
    user_did, pds_url = await _resolve_pds(settings.handle)
    print(f"using PDS: {pds_url}")

    # create atproto client with correct PDS
    client = AsyncClient(base_url=pds_url)
    await client.login(settings.handle, settings.password)

    print(f"logged in as {settings.handle} (DID: {user_did})")

    # fetch tracks that need backfilling
    async with db_session() as db:
        stmt = (
            select(Track)
            .join(Artist)
            .where(Track.artist_did == user_did)
            .where(Track.atproto_record_uri.is_(None))
            .order_by(Track.id)
        )
        result = await db.execute(stmt)
        tracks = result.scalars().all()

        # eagerly load artist for each track so it is usable after the
        # session is closed
        for track in tracks:
            await db.refresh(track, ["artist"])

    print(f"found {len(tracks)} tracks to backfill")

    if not tracks:
        print("no tracks need backfilling!")
        return

    collection = app_settings.atproto.track_collection
    succeeded = 0
    failed = 0

    # backfill each track
    for track in tracks:
        print(f"\nbackfilling track {track.id}: {track.title}")

        # build and create the record; a malformed track must not abort the
        # rest of the run, so building happens inside the try as well
        try:
            record = _build_track_record(track, collection)
            response = await client.com.atproto.repo.create_record(
                {
                    "repo": user_did,
                    "collection": collection,
                    "record": record,
                }
            )
        except Exception as e:
            print(f"  ✗ failed: {e}")
            failed += 1
            continue

        record_uri = response.uri
        record_cid = response.cid

        print(f"  ✓ created record: {record_uri}")

        # update database
        try:
            async with db_session() as db:
                stmt = select(Track).where(Track.id == track.id)
                result = await db.execute(stmt)
                db_track = result.scalar_one()

                db_track.atproto_record_uri = record_uri
                db_track.atproto_record_cid = record_cid

                await db.commit()
        except Exception as e:
            # the record already exists on the PDS; surface its URI so it can
            # be reconciled instead of being duplicated by a re-run
            print(f"  ✗ failed: {e}")
            print(f"    record {record_uri} was created but not saved to the database")
            failed += 1
            continue

        print("  ✓ updated database")
        succeeded += 1

    print(f"\nbackfilled {succeeded} of {len(tracks)} tracks successfully!")
    if failed:
        print(f"{failed} tracks failed - see errors above")


if __name__ == "__main__":
    asyncio.run(main())