music on atproto
plyr.fm
1#!/usr/bin/env -S uv run --script --quiet
2"""backfill ATProto records for tracks missing atproto_record_uri.
3
4Creates ATProto records on user's PDS for tracks that:
51. Exist in the database
62. Have no atproto_record_uri (orphaned/never synced)
73. Belong to the configured user (ATPROTO_MAIN_HANDLE)
8
9The script uses the app's namespace configuration (settings.atproto.track_collection)
10to create records in the correct namespace for the current environment.
11
12## Prerequisites
13
14Set credentials in .env:
15```bash
16ATPROTO_MAIN_HANDLE=your.handle
17ATPROTO_MAIN_PASSWORD=your-app-password
18DATABASE_URL=postgresql://... # target database
19```
20
21## Usage
22
23```bash
24uv run scripts/backfill_atproto_records.py
25```
26
27The script will:
281. Resolve user's PDS URL from handle/DID
292. Query database for tracks without atproto_record_uri
303. Create ATProto records on PDS using configured namespace
314. Update database with new URIs and CIDs
32
33## Verification
34
After running, verify success:
```bash
# check ATProto records on PDS (see docs/tools/pdsx.md)
uvx pdsx --pds <pds-url> -r <handle> ls <namespace>
```

```sql
-- check database (see docs/tools/neon.md)
SELECT COUNT(*) FROM tracks WHERE atproto_record_uri IS NOT NULL;
```
43
44## References
45
46- Database queries: docs/tools/neon.md
47- PDS inspection: docs/tools/pdsx.md
48- ATProto records: src/backend/_internal/atproto/records.py
49"""
50
51import asyncio
52from datetime import UTC, datetime
53
54from atproto import AsyncClient
55from atproto_identity.resolver import AsyncIdResolver
56from pydantic import Field
57from pydantic_settings import BaseSettings, SettingsConfigDict
58from sqlalchemy import select
59
60from backend.config import settings as app_settings
61from backend.models import Artist, Track, db_session
62
63
class BackfillSettings(BaseSettings):
    """credentials for the account whose tracks get backfilled.

    values come from the environment or from a local `.env` file; any other
    keys present in that file are ignored rather than rejected.
    """

    model_config = SettingsConfigDict(
        extra="ignore",
        case_sensitive=False,
        env_file=".env",
    )

    # handle used both for identity resolution and for logging in
    handle: str = Field(..., validation_alias="ATPROTO_MAIN_HANDLE")
    # app password for that handle
    password: str = Field(..., validation_alias="ATPROTO_MAIN_PASSWORD")
75
76
# NOTE(review): public R2 bucket for cover art, hard-coded in the original
# script; presumably this belongs in app settings — confirm before reuse.
_R2_PUBLIC_URL = "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"


async def _resolve_identity(handle: str) -> tuple[str, str]:
    """resolve *handle* to its DID and the PDS endpoint from its DID document.

    raises ValueError when the handle, the DID document, or a
    AtprotoPersonalDataServer service entry cannot be found.
    """
    resolver = AsyncIdResolver()

    # first resolve handle to DID (resolve() returns None when unresolvable)
    user_did = await resolver.handle.resolve(handle)
    if not user_did:
        raise ValueError(f"could not resolve handle {handle}")
    print(f"resolved DID: {user_did}")

    # then get PDS URL from DID document
    did_doc = await resolver.did.resolve(user_did)
    if did_doc is None:
        raise ValueError(f"could not resolve DID document for {user_did}")

    pds_url = None
    # `or []` guards against a DID document that lists no services at all
    for service in did_doc.service or []:
        if service.type == "AtprotoPersonalDataServer":
            pds_url = service.service_endpoint
            break

    if not pds_url:
        raise ValueError(f"no PDS found for {handle}")
    return user_did, pds_url


async def _fetch_pending_tracks(user_did: str) -> list:
    """return the user's tracks that have no atproto_record_uri, artist preloaded."""
    from sqlalchemy.orm import selectinload

    async with db_session() as db:
        stmt = (
            select(Track)
            # inner join keeps the original behavior of skipping artist-less tracks
            .join(Artist)
            .where(Track.artist_did == user_did)
            .where(Track.atproto_record_uri.is_(None))
            # one extra query for all artists instead of one refresh per track;
            # the artist must be loaded before the session closes
            .options(selectinload(Track.artist))
            .order_by(Track.id)
        )
        result = await db.execute(stmt)
        return list(result.scalars().all())


def _build_record(track: "Track") -> dict:
    """build the ATProto track record for *track* in the configured collection."""
    record = {
        "$type": app_settings.atproto.track_collection,
        "title": track.title,
        "artist": track.artist.display_name,
        "audioUrl": track.r2_url,
        "fileType": track.file_type,
        # NOTE(review): this stamps the backfill time, not the original upload time
        "createdAt": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
    }

    # add optional fields
    if track.album:
        record["album"] = track.album

    if track.features:
        # convert stored snake_case feature dicts to ATProto camelCase
        record["features"] = [
            {
                "did": f["did"],
                "handle": f["handle"],
                "displayName": f.get("display_name", f["handle"]),
            }
            for f in track.features
        ]

    if track.image_id:
        # image URL is constructed by hand because the images table doesn't
        # exist in prod; the .jpg extension is an assumption, not looked up
        record["imageUrl"] = f"{_R2_PUBLIC_URL}/images/{track.image_id}.jpg"

    return record


async def _save_record_ref(track_id: int, record_uri: str, record_cid: str) -> None:
    """persist the created record's URI and CID on the track row."""
    async with db_session() as db:
        result = await db.execute(select(Track).where(Track.id == track_id))
        db_track = result.scalar_one()
        db_track.atproto_record_uri = record_uri
        db_track.atproto_record_cid = record_cid
        await db.commit()


async def main():
    """backfill ATProto records for orphaned tracks.

    Logs in as the configured account and finds that account's tracks with no
    atproto_record_uri. It then creates one record per track on the user's PDS
    and writes the returned URI/CID back to the database. Record-creation and
    database-update failures are reported per track and do not stop the run;
    the closing summary reports the real success count.

    raises ValueError if the handle's DID or PDS cannot be resolved.
    """
    settings = BackfillSettings()

    print(f"resolving PDS for {settings.handle}...")
    user_did, pds_url = await _resolve_identity(settings.handle)
    print(f"using PDS: {pds_url}")

    # create atproto client with correct PDS
    client = AsyncClient(base_url=pds_url)
    await client.login(settings.handle, settings.password)
    print(f"logged in as {settings.handle} (DID: {user_did})")

    tracks = await _fetch_pending_tracks(user_did)
    print(f"found {len(tracks)} tracks to backfill")

    if not tracks:
        print("no tracks need backfilling!")
        return

    succeeded = 0
    failed = 0
    for track in tracks:
        print(f"\nbackfilling track {track.id}: {track.title}")
        record = _build_record(track)

        try:
            response = await client.com.atproto.repo.create_record(
                {
                    "repo": user_did,
                    "collection": app_settings.atproto.track_collection,
                    "record": record,
                }
            )
        except Exception as e:  # per-track boundary: report and move on
            print(f" ✗ failed: {e}")
            failed += 1
            continue

        print(f" ✓ created record: {response.uri}")

        try:
            await _save_record_ref(track.id, response.uri, response.cid)
        except Exception as e:
            # the PDS record now exists but the DB does not reference it; a
            # rerun would create a duplicate, so surface the URI for repair
            print(f" ✗ failed to update database (orphaned record {response.uri}): {e}")
            failed += 1
            continue

        print(" ✓ updated database")
        succeeded += 1

    print(f"\nbackfilled {succeeded}/{len(tracks)} tracks successfully!")
    if failed:
        print(f"{failed} tracks failed; see errors above")
199
200
if __name__ == "__main__":
    # drive the async entry point on a dedicated event loop that is closed on exit
    with asyncio.Runner() as runner:
        runner.run(main())