feat: add targeted album list sync background task (#558)

adds sync_album_list task to sync album ATProto list records immediately
after track operations, rather than waiting for the user's next login.

changes:
- add sync_album_list task and schedule_album_list_sync to background_tasks.py
- refactor _register_tasks to use docket.register_collection() per Guidry's advice
- call sync on: track upload, track delete, track album change, record restore
- mock schedule_album_list_sync in affected tests

fixes the issue where albums created during track upload didn't get
ATProto list records until the user logged in again.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io Claude and committed by GitHub 57b60c65 fe1ea78c

Changed files
+140 -25
backend
+3 -24
backend/src/backend/_internal/background.py
··· 96 """register all background task functions with the docket. 97 98 tasks must be registered before they can be executed by workers. 99 - add new task imports here as they're created. 100 """ 101 - # import task functions here to avoid circular imports 102 - from backend._internal.background_tasks import ( 103 - process_export, 104 - scan_copyright, 105 - scrobble_to_teal, 106 - sync_atproto, 107 - ) 108 109 - docket.register(scan_copyright) 110 - docket.register(process_export) 111 - docket.register(sync_atproto) 112 - docket.register(scrobble_to_teal) 113 - 114 - logger.info( 115 - "registered background tasks", 116 - extra={ 117 - "tasks": [ 118 - "scan_copyright", 119 - "process_export", 120 - "sync_atproto", 121 - "scrobble_to_teal", 122 - ] 123 - }, 124 - )
··· 96 """register all background task functions with the docket. 97 98 tasks must be registered before they can be executed by workers. 99 + new tasks should be added to background_tasks.background_tasks list. 100 """ 101 + docket.register_collection("backend._internal.background_tasks:background_tasks") 102 103 + logger.info("registered background tasks")
+94
backend/src/backend/_internal/background_tasks.py
··· 430 session_id, track_id, track_title, artist_name, duration, album_name 431 ) 432 logfire.info("scheduled teal scrobble", track_id=track_id)
··· 430 session_id, track_id, track_title, artist_name, duration, album_name 431 ) 432 logfire.info("scheduled teal scrobble", track_id=track_id) 433 + 434 + 435 + async def sync_album_list(session_id: str, album_id: str) -> None: 436 + """sync a single album's ATProto list record. 437 + 438 + creates or updates the album's list record on the user's PDS. 439 + called after track uploads or album mutations. 440 + 441 + args: 442 + session_id: the user's session ID for authentication 443 + album_id: the album's database ID 444 + """ 445 + from sqlalchemy import select 446 + 447 + from backend._internal.atproto.records.fm_plyr import upsert_album_list_record 448 + from backend._internal.auth import get_session 449 + from backend.models import Album, Track 450 + from backend.utilities.database import db_session 451 + 452 + auth_session = await get_session(session_id) 453 + if not auth_session: 454 + logger.warning(f"sync_album_list: session {session_id[:8]}... not found") 455 + return 456 + 457 + async with db_session() as session: 458 + # fetch album 459 + album_result = await session.execute(select(Album).where(Album.id == album_id)) 460 + album = album_result.scalar_one_or_none() 461 + if not album: 462 + logger.warning(f"sync_album_list: album {album_id} not found") 463 + return 464 + 465 + # verify album belongs to this user 466 + if album.artist_did != auth_session.did: 467 + logger.warning( 468 + f"sync_album_list: album {album_id} does not belong to {auth_session.did}" 469 + ) 470 + return 471 + 472 + # fetch tracks with ATProto records 473 + tracks_result = await session.execute( 474 + select(Track) 475 + .where( 476 + Track.album_id == album_id, 477 + Track.atproto_record_uri.isnot(None), 478 + Track.atproto_record_cid.isnot(None), 479 + ) 480 + .order_by(Track.created_at.asc()) 481 + ) 482 + tracks = tracks_result.scalars().all() 483 + 484 + if not tracks: 485 + logger.debug( 486 + f"sync_album_list: album {album_id} has no tracks with ATProto records" 487 + ) 488 + return 489 + 490 + track_refs = [ 491 + {"uri": t.atproto_record_uri, "cid": t.atproto_record_cid} for t in tracks 492 + ] 493 + 494 + try: 495 + result = await upsert_album_list_record( 496 + auth_session, 497 + album_id=album_id, 498 + album_title=album.title, 499 + track_refs=track_refs, 500 + existing_uri=album.atproto_record_uri, 501 + existing_created_at=album.created_at, 502 + ) 503 + if result: 504 + album.atproto_record_uri = result[0] 505 + album.atproto_record_cid = result[1] 506 + await session.commit() 507 + logger.info(f"synced album list record for {album_id}: {result[0]}") 508 + except Exception as e: 509 + logger.warning(f"failed to sync album list record for {album_id}: {e}") 510 + 511 + 512 + async def schedule_album_list_sync(session_id: str, album_id: str) -> None: 513 + """schedule an album list sync via docket.""" 514 + docket = get_docket() 515 + await docket.add(sync_album_list)(session_id, album_id) 516 + logfire.info("scheduled album list sync", album_id=album_id) 517 + 518 + 519 + # collection of all background task functions for docket registration 520 + background_tasks = [ 521 + scan_copyright, 522 + process_export, 523 + sync_atproto, 524 + scrobble_to_teal, 525 + sync_album_list, 526 + ]
+25
backend/src/backend/api/tracks/mutations.py
··· 23 update_record, 24 ) 25 from backend._internal.atproto.tid import datetime_to_tid 26 from backend.config import settings 27 from backend.models import Artist, Tag, Track, TrackTag, get_db 28 from backend.schemas import TrackResponse ··· 146 f"failed to delete image {track.image_id}: {e}", exc_info=True 147 ) 148 149 # delete track record 150 await db.delete(track) 151 await db.commit() 152 153 return {"message": "track deleted successfully"} 154 ··· 188 track.title = title 189 title_changed = True 190 191 await apply_album_update(db, track, album) 192 193 if features is not None: 194 track.features = await resolve_feature_handles( ··· 259 260 await db.commit() 261 await db.refresh(track) 262 263 # build track_tags dict for response 264 # if tags were updated, use updated_tags; otherwise query for existing ··· 551 track.atproto_record_cid = new_cid 552 await db.commit() 553 await db.refresh(track) 554 555 logger.info(f"restored ATProto record for track {track_id}: {new_uri}") 556
··· 23 update_record, 24 ) 25 from backend._internal.atproto.tid import datetime_to_tid 26 + from backend._internal.background_tasks import schedule_album_list_sync 27 from backend.config import settings 28 from backend.models import Artist, Tag, Track, TrackTag, get_db 29 from backend.schemas import TrackResponse ··· 147 f"failed to delete image {track.image_id}: {e}", exc_info=True 148 ) 149 150 + # capture album_id before deletion for list sync 151 + album_id_to_sync = track.album_id 152 + 153 # delete track record 154 await db.delete(track) 155 await db.commit() 156 + 157 + # sync album list record if track was in an album 158 + if album_id_to_sync: 159 + await schedule_album_list_sync(auth_session.session_id, album_id_to_sync) 160 161 return {"message": "track deleted successfully"} 162 ··· 196 track.title = title 197 title_changed = True 198 199 + # track album changes for list sync 200 + old_album_id = track.album_id 201 await apply_album_update(db, track, album) 202 + new_album_id = track.album_id 203 + album_changed = old_album_id != new_album_id 204 205 if features is not None: 206 track.features = await resolve_feature_handles( ··· 271 272 await db.commit() 273 await db.refresh(track) 274 + 275 + # sync album list records if album changed 276 + if album_changed: 277 + # sync old album (track was removed) 278 + if old_album_id: 279 + await schedule_album_list_sync(auth_session.session_id, old_album_id) 280 + # sync new album (track was added) 281 + if new_album_id: 282 + await schedule_album_list_sync(auth_session.session_id, new_album_id) 283 284 # build track_tags dict for response 285 # if tags were updated, use updated_tags; otherwise query for existing ··· 572 track.atproto_record_cid = new_cid 573 await db.commit() 574 await db.refresh(track) 575 + 576 + # sync album list if track is in an album 577 + if track.album_id: 578 + await schedule_album_list_sync(auth_session.session_id, track.album_id) 579 580 logger.info(f"restored ATProto record for track {track_id}: {new_uri}") 581
+10 -1
backend/src/backend/api/tracks/uploads.py
··· 30 from backend._internal.atproto import create_track_record 31 from backend._internal.atproto.handles import resolve_featured_artists 32 from backend._internal.audio import AudioFormat 33 - from backend._internal.background_tasks import schedule_copyright_scan 34 from backend._internal.image import ImageFormat 35 from backend._internal.jobs import job_service 36 from backend.config import settings ··· 412 413 if r2_url: 414 await schedule_copyright_scan(track.id, r2_url) 415 416 await job_service.update_progress( 417 ctx.upload_id,
··· 30 from backend._internal.atproto import create_track_record 31 from backend._internal.atproto.handles import resolve_featured_artists 32 from backend._internal.audio import AudioFormat 33 + from backend._internal.background_tasks import ( 34 + schedule_album_list_sync, 35 + schedule_copyright_scan, 36 + ) 37 from backend._internal.image import ImageFormat 38 from backend._internal.jobs import job_service 39 from backend.config import settings ··· 415 416 if r2_url: 417 await schedule_copyright_scan(track.id, r2_url) 418 + 419 + # sync album list record if track is in an album 420 + if album_record: 421 + await schedule_album_list_sync( 422 + ctx.auth_session.session_id, album_record.id 423 + ) 424 425 await job_service.update_progress( 426 ctx.upload_id,
+8
backend/tests/api/test_track_deletion.py
··· 317 "backend.api.tracks.mutations.storage.delete", 318 side_effect=mock_delete, 319 ), 320 ): 321 async with AsyncClient( 322 transport=ASGITransport(app=test_app), base_url="http://test" ··· 370 patch( 371 "backend.api.tracks.mutations.storage.delete", 372 side_effect=mock_delete, 373 ), 374 ): 375 async with AsyncClient(
··· 317 "backend.api.tracks.mutations.storage.delete", 318 side_effect=mock_delete, 319 ), 320 + patch( 321 + "backend.api.tracks.mutations.schedule_album_list_sync", 322 + new_callable=AsyncMock, 323 + ), 324 ): 325 async with AsyncClient( 326 transport=ASGITransport(app=test_app), base_url="http://test" ··· 374 patch( 375 "backend.api.tracks.mutations.storage.delete", 376 side_effect=mock_delete, 377 + ), 378 + patch( 379 + "backend.api.tracks.mutations.schedule_album_list_sync", 380 + new_callable=AsyncMock, 381 ), 382 ): 383 async with AsyncClient(