feat: move like and comment PDS writes to background tasks (#561)

* feat: move like and comment PDS writes to background tasks

Likes:
- Update like/unlike endpoints to use optimistic DB writes
- Schedule PDS record operations via docket background tasks
- Make atproto_like_uri nullable to support async PDS writes
- Update tests to verify background task scheduling

Comments:
- Update create/update/delete comment endpoints to use optimistic DB writes
- Schedule PDS record operations via docket background tasks
- Make atproto_comment_uri nullable to support async PDS writes
- Update tests to verify background task scheduling

This reduces API response times by moving slow PDS writes to the background
while keeping the UI responsive with immediate local database updates.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: move deferred imports to module level

Move imports that were unnecessarily deferred inside PDS task functions
to the top of the module for clarity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io Claude and committed by GitHub dbfc7b70 c9fe8e43

+10
.claude/commands/investigate-report.md
···
··· 1 + --- 2 + description: Investigate the user report and fix it if you find it's a bug. 3 + argumment-hint: [the user report] 4 + --- 5 + 6 + First read the user report: $ARGUMENTS 7 + 8 + Look around the codebase via `tree`, `rg` and perhaps related git history to find the root cause of the bug. 9 + 10 + If you find the root cause, write a regression test, watch the test fail, fix the bug, and watch the test pass, then open a PR with the fix.
+41
backend/alembic/versions/2025_12_10_125948_4febe85be5a4_allow_null_atproto_like_uri_for_.py
···
··· 1 + """allow null atproto_like_uri for optimistic writes 2 + 3 + Revision ID: 4febe85be5a4 4 + Revises: a6069b752a90 5 + Create Date: 2025-12-10 12:59:48.646844 6 + 7 + """ 8 + 9 + from collections.abc import Sequence 10 + 11 + import sqlalchemy as sa 12 + 13 + from alembic import op 14 + 15 + # revision identifiers, used by Alembic. 16 + revision: str = "4febe85be5a4" 17 + down_revision: str | Sequence[str] | None = "a6069b752a90" 18 + branch_labels: str | Sequence[str] | None = None 19 + depends_on: str | Sequence[str] | None = None 20 + 21 + 22 + def upgrade() -> None: 23 + """Make atproto_like_uri nullable for background PDS writes.""" 24 + op.alter_column( 25 + "track_likes", 26 + "atproto_like_uri", 27 + existing_type=sa.VARCHAR(), 28 + nullable=True, 29 + ) 30 + 31 + 32 + def downgrade() -> None: 33 + """Revert atproto_like_uri to non-nullable.""" 34 + # first, delete any rows with NULL atproto_like_uri 35 + op.execute("DELETE FROM track_likes WHERE atproto_like_uri IS NULL") 36 + op.alter_column( 37 + "track_likes", 38 + "atproto_like_uri", 39 + existing_type=sa.VARCHAR(), 40 + nullable=False, 41 + )
+41
backend/alembic/versions/2025_12_10_131338_37cc1d6980c3_allow_null_atproto_comment_uri_for_.py
···
··· 1 + """allow null atproto_comment_uri for optimistic writes 2 + 3 + Revision ID: 37cc1d6980c3 4 + Revises: 4febe85be5a4 5 + Create Date: 2025-12-10 13:13:38.218675 6 + 7 + """ 8 + 9 + from collections.abc import Sequence 10 + 11 + import sqlalchemy as sa 12 + 13 + from alembic import op 14 + 15 + # revision identifiers, used by Alembic. 16 + revision: str = "37cc1d6980c3" 17 + down_revision: str | Sequence[str] | None = "4febe85be5a4" 18 + branch_labels: str | Sequence[str] | None = None 19 + depends_on: str | Sequence[str] | None = None 20 + 21 + 22 + def upgrade() -> None: 23 + """Make atproto_comment_uri nullable for background PDS writes.""" 24 + op.alter_column( 25 + "track_comments", 26 + "atproto_comment_uri", 27 + existing_type=sa.VARCHAR(), 28 + nullable=True, 29 + ) 30 + 31 + 32 + def downgrade() -> None: 33 + """Revert atproto_comment_uri to non-nullable.""" 34 + # first, delete any rows with NULL atproto_comment_uri 35 + op.execute("DELETE FROM track_comments WHERE atproto_comment_uri IS NULL") 36 + op.alter_column( 37 + "track_comments", 38 + "atproto_comment_uri", 39 + existing_type=sa.VARCHAR(), 40 + nullable=False, 41 + )
+285 -1
backend/src/backend/_internal/background_tasks.py
··· 11 import os 12 import tempfile 13 import zipfile 14 - from datetime import datetime 15 from pathlib import Path 16 17 import aioboto3 18 import aiofiles 19 import logfire 20 21 from backend._internal.background import get_docket 22 23 logger = logging.getLogger(__name__) 24 ··· 516 logfire.info("scheduled album list sync", album_id=album_id) 517 518 519 # collection of all background task functions for docket registration 520 background_tasks = [ 521 scan_copyright, ··· 523 sync_atproto, 524 scrobble_to_teal, 525 sync_album_list, 526 ]
··· 11 import os 12 import tempfile 13 import zipfile 14 + from datetime import UTC, datetime 15 from pathlib import Path 16 17 import aioboto3 18 import aiofiles 19 import logfire 20 + from sqlalchemy import select 21 22 + from backend._internal.atproto.records import ( 23 + create_comment_record, 24 + create_like_record, 25 + delete_record_by_uri, 26 + update_comment_record, 27 + ) 28 + from backend._internal.auth import get_session 29 from backend._internal.background import get_docket 30 + from backend.models import TrackComment, TrackLike 31 + from backend.utilities.database import db_session 32 33 logger = logging.getLogger(__name__) 34 ··· 526 logfire.info("scheduled album list sync", album_id=album_id) 527 528 529 + # --------------------------------------------------------------------------- 530 + # PDS record write tasks 531 + # 532 + # these tasks handle writing records to the user's PDS (Personal Data Server) 533 + # in the background, then updating the local database with the result. 534 + # this keeps API responses fast while ensuring PDS and DB stay in sync. 535 + # --------------------------------------------------------------------------- 536 + 537 + 538 + async def pds_create_like( 539 + session_id: str, 540 + like_id: int, 541 + subject_uri: str, 542 + subject_cid: str, 543 + ) -> None: 544 + """create a like record on the user's PDS and update the database. 545 + 546 + args: 547 + session_id: the user's session ID for authentication 548 + like_id: database ID of the TrackLike record to update 549 + subject_uri: AT URI of the track being liked 550 + subject_cid: CID of the track being liked 551 + """ 552 + auth_session = await get_session(session_id) 553 + if not auth_session: 554 + logger.warning(f"pds_create_like: session {session_id[:8]}... not found") 555 + return 556 + 557 + try: 558 + like_uri = await create_like_record( 559 + auth_session=auth_session, 560 + subject_uri=subject_uri, 561 + subject_cid=subject_cid, 562 + ) 563 + 564 + # update database with the ATProto URI 565 + async with db_session() as session: 566 + result = await session.execute( 567 + select(TrackLike).where(TrackLike.id == like_id) 568 + ) 569 + like = result.scalar_one_or_none() 570 + if like: 571 + like.atproto_like_uri = like_uri 572 + await session.commit() 573 + logger.info(f"pds_create_like: created like record {like_uri}") 574 + else: 575 + # like was deleted before we could update it - clean up orphan 576 + logger.warning(f"pds_create_like: like {like_id} no longer exists") 577 + await delete_record_by_uri(auth_session, like_uri) 578 + 579 + except Exception as e: 580 + logger.error(f"pds_create_like failed for like {like_id}: {e}", exc_info=True) 581 + # note: we don't delete the DB record on failure - user still sees "liked" 582 + # and we can retry or fix later. this is better than inconsistent state. 583 + 584 + 585 + async def schedule_pds_create_like( 586 + session_id: str, 587 + like_id: int, 588 + subject_uri: str, 589 + subject_cid: str, 590 + ) -> None: 591 + """schedule a like record creation via docket.""" 592 + docket = get_docket() 593 + await docket.add(pds_create_like)(session_id, like_id, subject_uri, subject_cid) 594 + logfire.info("scheduled pds like creation", like_id=like_id) 595 + 596 + 597 + async def pds_delete_like( 598 + session_id: str, 599 + like_uri: str, 600 + ) -> None: 601 + """delete a like record from the user's PDS. 602 + 603 + args: 604 + session_id: the user's session ID for authentication 605 + like_uri: AT URI of the like record to delete 606 + """ 607 + auth_session = await get_session(session_id) 608 + if not auth_session: 609 + logger.warning(f"pds_delete_like: session {session_id[:8]}... not found") 610 + return 611 + 612 + try: 613 + await delete_record_by_uri(auth_session, like_uri) 614 + logger.info(f"pds_delete_like: deleted like record {like_uri}") 615 + except Exception as e: 616 + logger.error(f"pds_delete_like failed for {like_uri}: {e}", exc_info=True) 617 + # deletion failed - the PDS record may still exist, but DB is already clean 618 + # this is acceptable: orphaned PDS records are harmless 619 + 620 + 621 + async def schedule_pds_delete_like(session_id: str, like_uri: str) -> None: 622 + """schedule a like record deletion via docket.""" 623 + docket = get_docket() 624 + await docket.add(pds_delete_like)(session_id, like_uri) 625 + logfire.info("scheduled pds like deletion", like_uri=like_uri) 626 + 627 + 628 + async def pds_create_comment( 629 + session_id: str, 630 + comment_id: int, 631 + subject_uri: str, 632 + subject_cid: str, 633 + text: str, 634 + timestamp_ms: int, 635 + ) -> None: 636 + """create a comment record on the user's PDS and update the database. 637 + 638 + args: 639 + session_id: the user's session ID for authentication 640 + comment_id: database ID of the TrackComment record to update 641 + subject_uri: AT URI of the track being commented on 642 + subject_cid: CID of the track being commented on 643 + text: comment text 644 + timestamp_ms: playback position when comment was made 645 + """ 646 + auth_session = await get_session(session_id) 647 + if not auth_session: 648 + logger.warning(f"pds_create_comment: session {session_id[:8]}... not found") 649 + return 650 + 651 + try: 652 + comment_uri = await create_comment_record( 653 + auth_session=auth_session, 654 + subject_uri=subject_uri, 655 + subject_cid=subject_cid, 656 + text=text, 657 + timestamp_ms=timestamp_ms, 658 + ) 659 + 660 + # update database with the ATProto URI 661 + async with db_session() as session: 662 + result = await session.execute( 663 + select(TrackComment).where(TrackComment.id == comment_id) 664 + ) 665 + comment = result.scalar_one_or_none() 666 + if comment: 667 + comment.atproto_comment_uri = comment_uri 668 + await session.commit() 669 + logger.info(f"pds_create_comment: created comment record {comment_uri}") 670 + else: 671 + # comment was deleted before we could update it - clean up orphan 672 + logger.warning( 673 + f"pds_create_comment: comment {comment_id} no longer exists" 674 + ) 675 + await delete_record_by_uri(auth_session, comment_uri) 676 + 677 + except Exception as e: 678 + logger.error( 679 + f"pds_create_comment failed for comment {comment_id}: {e}", exc_info=True 680 + ) 681 + 682 + 683 + async def schedule_pds_create_comment( 684 + session_id: str, 685 + comment_id: int, 686 + subject_uri: str, 687 + subject_cid: str, 688 + text: str, 689 + timestamp_ms: int, 690 + ) -> None: 691 + """schedule a comment record creation via docket.""" 692 + docket = get_docket() 693 + await docket.add(pds_create_comment)( 694 + session_id, comment_id, subject_uri, subject_cid, text, timestamp_ms 695 + ) 696 + logfire.info("scheduled pds comment creation", comment_id=comment_id) 697 + 698 + 699 + async def pds_delete_comment( 700 + session_id: str, 701 + comment_uri: str, 702 + ) -> None: 703 + """delete a comment record from the user's PDS. 704 + 705 + args: 706 + session_id: the user's session ID for authentication 707 + comment_uri: AT URI of the comment record to delete 708 + """ 709 + auth_session = await get_session(session_id) 710 + if not auth_session: 711 + logger.warning(f"pds_delete_comment: session {session_id[:8]}... not found") 712 + return 713 + 714 + try: 715 + await delete_record_by_uri(auth_session, comment_uri) 716 + logger.info(f"pds_delete_comment: deleted comment record {comment_uri}") 717 + except Exception as e: 718 + logger.error(f"pds_delete_comment failed for {comment_uri}: {e}", exc_info=True) 719 + 720 + 721 + async def schedule_pds_delete_comment(session_id: str, comment_uri: str) -> None: 722 + """schedule a comment record deletion via docket.""" 723 + docket = get_docket() 724 + await docket.add(pds_delete_comment)(session_id, comment_uri) 725 + logfire.info("scheduled pds comment deletion", comment_uri=comment_uri) 726 + 727 + 728 + async def pds_update_comment( 729 + session_id: str, 730 + comment_id: int, 731 + comment_uri: str, 732 + subject_uri: str, 733 + subject_cid: str, 734 + text: str, 735 + timestamp_ms: int, 736 + created_at: datetime, 737 + ) -> None: 738 + """update a comment record on the user's PDS. 739 + 740 + args: 741 + session_id: the user's session ID for authentication 742 + comment_id: database ID of the TrackComment record 743 + comment_uri: AT URI of the comment record to update 744 + subject_uri: AT URI of the track being commented on 745 + subject_cid: CID of the track being commented on 746 + text: new comment text 747 + timestamp_ms: playback position when comment was made 748 + created_at: original creation timestamp 749 + """ 750 + auth_session = await get_session(session_id) 751 + if not auth_session: 752 + logger.warning(f"pds_update_comment: session {session_id[:8]}... not found") 753 + return 754 + 755 + try: 756 + await update_comment_record( 757 + auth_session=auth_session, 758 + comment_uri=comment_uri, 759 + subject_uri=subject_uri, 760 + subject_cid=subject_cid, 761 + text=text, 762 + timestamp_ms=timestamp_ms, 763 + created_at=created_at, 764 + updated_at=datetime.now(UTC), 765 + ) 766 + logger.info(f"pds_update_comment: updated comment record {comment_uri}") 767 + except Exception as e: 768 + logger.error( 769 + f"pds_update_comment failed for comment {comment_id}: {e}", exc_info=True 770 + ) 771 + 772 + 773 + async def schedule_pds_update_comment( 774 + session_id: str, 775 + comment_id: int, 776 + comment_uri: str, 777 + subject_uri: str, 778 + subject_cid: str, 779 + text: str, 780 + timestamp_ms: int, 781 + created_at: datetime, 782 + ) -> None: 783 + """schedule a comment record update via docket.""" 784 + docket = get_docket() 785 + await docket.add(pds_update_comment)( 786 + session_id, 787 + comment_id, 788 + comment_uri, 789 + subject_uri, 790 + subject_cid, 791 + text, 792 + timestamp_ms, 793 + created_at, 794 + ) 795 + logfire.info("scheduled pds comment update", comment_id=comment_id) 796 + 797 + 798 # collection of all background task functions for docket registration 799 background_tasks = [ 800 scan_copyright, ··· 802 sync_atproto, 803 scrobble_to_teal, 804 sync_album_list, 805 + pds_create_like, 806 + pds_delete_like, 807 + pds_create_comment, 808 + pds_delete_comment, 809 + pds_update_comment, 810 ]
+71 -73
backend/src/backend/api/tracks/comments.py
··· 4 from datetime import UTC, datetime 5 from typing import Annotated 6 7 - from fastapi import Depends, HTTPException 8 from pydantic import BaseModel, Field 9 from sqlalchemy import func, select 10 from sqlalchemy.ext.asyncio import AsyncSession 11 12 from backend._internal import Session as AuthSession 13 from backend._internal import require_auth 14 - from backend._internal.atproto import ( 15 - create_comment_record, 16 - delete_record_by_uri, 17 - update_comment_record, 18 ) 19 from backend.models import Artist, Track, TrackComment, UserPreferences, get_db 20 ··· 121 async def create_comment( 122 track_id: int, 123 body: CommentCreate, 124 db: Annotated[AsyncSession, Depends(get_db)], 125 auth_session: AuthSession = Depends(require_auth), 126 ) -> CommentResponse: 127 """create a timed comment on a track. 128 129 requires auth. track owner must have allow_comments enabled. 130 """ 131 # get track 132 track_result = await db.execute(select(Track).where(Track.id == track_id)) ··· 165 detail=f"track has reached maximum of {MAX_COMMENTS_PER_TRACK} comments", 166 ) 167 168 - # create ATProto record 169 - comment_uri = None 170 - try: 171 - comment_uri = await create_comment_record( 172 - auth_session=auth_session, 173 - subject_uri=track.atproto_record_uri, 174 - subject_cid=track.atproto_record_cid, 175 - text=body.text, 176 - timestamp_ms=body.timestamp_ms, 177 - ) 178 - 179 - # create database record 180 - comment = TrackComment( 181 - track_id=track_id, 182 - user_did=auth_session.did, 183 - text=body.text, 184 - timestamp_ms=body.timestamp_ms, 185 - atproto_comment_uri=comment_uri, 186 - ) 187 - db.add(comment) 188 - await db.commit() 189 - await db.refresh(comment) 190 191 - except Exception as e: 192 - await db.rollback() 193 - logger.error( 194 - f"failed to create comment on track {track_id} by {auth_session.did}: {e}", 195 - exc_info=True, 196 - ) 197 - # cleanup ATProto record if created 198 - if comment_uri: 199 - try: 200 - await delete_record_by_uri(auth_session, comment_uri) 201 - logger.info( 202 - f"cleaned up orphaned ATProto comment record: {comment_uri}" 203 - ) 204 - except Exception as cleanup_exc: 205 - logger.error( 206 - f"failed to cleanup orphaned comment record: {cleanup_exc}" 207 - ) 208 - raise HTTPException(status_code=500, detail="failed to create comment") from e 209 210 # get user info for response 211 artist_result = await db.execute( ··· 229 @router.delete("/comments/{comment_id}") 230 async def delete_comment( 231 comment_id: int, 232 db: Annotated[AsyncSession, Depends(get_db)], 233 auth_session: AuthSession = Depends(require_auth), 234 ) -> dict: 235 - """delete a comment. only the author can delete their own comments.""" 236 comment_result = await db.execute( 237 select(TrackComment).where(TrackComment.id == comment_id) 238 ) ··· 244 if comment.user_did != auth_session.did: 245 raise HTTPException(status_code=403, detail="can only delete your own comments") 246 247 - # delete ATProto record 248 - try: 249 - await delete_record_by_uri(auth_session, comment.atproto_comment_uri) 250 - except Exception as e: 251 - logger.error(f"failed to delete ATProto comment record: {e}") 252 - # continue with DB deletion anyway 253 254 await db.delete(comment) 255 await db.commit() 256 257 return {"deleted": True} 258 259 ··· 261 async def update_comment( 262 comment_id: int, 263 body: CommentUpdate, 264 db: Annotated[AsyncSession, Depends(get_db)], 265 auth_session: AuthSession = Depends(require_auth), 266 ) -> CommentResponse: 267 - """update a comment's text. only the author can edit their own comments.""" 268 comment_result = await db.execute( 269 select(TrackComment).where(TrackComment.id == comment_id) 270 ) ··· 286 detail="track missing ATProto record - cannot update comment", 287 ) 288 289 - # update ATProto record first 290 updated_at = datetime.now(UTC) 291 - try: 292 - await update_comment_record( 293 - auth_session=auth_session, 294 comment_uri=comment.atproto_comment_uri, 295 subject_uri=track.atproto_record_uri, 296 subject_cid=track.atproto_record_cid, 297 text=body.text, 298 timestamp_ms=comment.timestamp_ms, 299 created_at=comment.created_at, 300 - updated_at=updated_at, 301 ) 302 - except Exception as e: 303 - logger.error( 304 - f"failed to update ATProto comment record {comment.atproto_comment_uri}: {e}", 305 - exc_info=True, 306 - ) 307 - raise HTTPException( 308 - status_code=500, detail="failed to update comment on ATProto" 309 - ) from e 310 - 311 - # update database record 312 - comment.text = body.text 313 - comment.updated_at = updated_at 314 - 315 - await db.commit() 316 - await db.refresh(comment) 317 318 # get user info for response 319 artist_result = await db.execute(
··· 4 from datetime import UTC, datetime 5 from typing import Annotated 6 7 + from fastapi import Cookie, Depends, HTTPException, Request 8 from pydantic import BaseModel, Field 9 from sqlalchemy import func, select 10 from sqlalchemy.ext.asyncio import AsyncSession 11 12 from backend._internal import Session as AuthSession 13 from backend._internal import require_auth 14 + from backend._internal.background_tasks import ( 15 + schedule_pds_create_comment, 16 + schedule_pds_delete_comment, 17 + schedule_pds_update_comment, 18 ) 19 from backend.models import Artist, Track, TrackComment, UserPreferences, get_db 20 ··· 121 async def create_comment( 122 track_id: int, 123 body: CommentCreate, 124 + request: Request, 125 db: Annotated[AsyncSession, Depends(get_db)], 126 auth_session: AuthSession = Depends(require_auth), 127 + session_id_cookie: Annotated[str | None, Cookie(alias="session_id")] = None, 128 ) -> CommentResponse: 129 """create a timed comment on a track. 130 131 requires auth. track owner must have allow_comments enabled. 132 + the comment is visible immediately; the ATProto record is created in background. 133 """ 134 # get track 135 track_result = await db.execute(select(Track).where(Track.id == track_id)) ··· 168 detail=f"track has reached maximum of {MAX_COMMENTS_PER_TRACK} comments", 169 ) 170 171 + # create database record immediately (optimistic) 172 + comment = TrackComment( 173 + track_id=track_id, 174 + user_did=auth_session.did, 175 + text=body.text, 176 + timestamp_ms=body.timestamp_ms, 177 + atproto_comment_uri=None, # will be set by background task 178 + ) 179 + db.add(comment) 180 + await db.commit() 181 + await db.refresh(comment) 182 183 + # schedule PDS record creation in background 184 + session_id = session_id_cookie or request.headers.get("authorization", "").replace( 185 + "Bearer ", "" 186 + ) 187 + await schedule_pds_create_comment( 188 + session_id=session_id, 189 + comment_id=comment.id, 190 + subject_uri=track.atproto_record_uri, 191 + subject_cid=track.atproto_record_cid, 192 + text=body.text, 193 + timestamp_ms=body.timestamp_ms, 194 + ) 195 196 # get user info for response 197 artist_result = await db.execute( ··· 215 @router.delete("/comments/{comment_id}") 216 async def delete_comment( 217 comment_id: int, 218 + request: Request, 219 db: Annotated[AsyncSession, Depends(get_db)], 220 auth_session: AuthSession = Depends(require_auth), 221 + session_id_cookie: Annotated[str | None, Cookie(alias="session_id")] = None, 222 ) -> dict: 223 + """delete a comment. only the author can delete their own comments. 224 + 225 + the comment is removed immediately; the ATProto record is deleted in background. 226 + """ 227 comment_result = await db.execute( 228 select(TrackComment).where(TrackComment.id == comment_id) 229 ) ··· 235 if comment.user_did != auth_session.did: 236 raise HTTPException(status_code=403, detail="can only delete your own comments") 237 238 + # capture the ATProto URI before deleting the DB record 239 + comment_uri = comment.atproto_comment_uri 240 241 + # delete database record immediately (optimistic) 242 await db.delete(comment) 243 await db.commit() 244 245 + # schedule PDS record deletion in background (if URI exists) 246 + if comment_uri: 247 + session_id = session_id_cookie or request.headers.get( 248 + "authorization", "" 249 + ).replace("Bearer ", "") 250 + await schedule_pds_delete_comment( 251 + session_id=session_id, 252 + comment_uri=comment_uri, 253 + ) 254 + 255 return {"deleted": True} 256 257 ··· 259 async def update_comment( 260 comment_id: int, 261 body: CommentUpdate, 262 + request: Request, 263 db: Annotated[AsyncSession, Depends(get_db)], 264 auth_session: AuthSession = Depends(require_auth), 265 + session_id_cookie: Annotated[str | None, Cookie(alias="session_id")] = None, 266 ) -> CommentResponse: 267 + """update a comment's text. only the author can edit their own comments. 268 + 269 + the comment is updated immediately; the ATProto record is updated in background. 270 + """ 271 comment_result = await db.execute( 272 select(TrackComment).where(TrackComment.id == comment_id) 273 ) ··· 289 detail="track missing ATProto record - cannot update comment", 290 ) 291 292 + # update database record immediately (optimistic) 293 updated_at = datetime.now(UTC) 294 + comment.text = body.text 295 + comment.updated_at = updated_at 296 + 297 + await db.commit() 298 + await db.refresh(comment) 299 + 300 + # schedule PDS record update in background (if URI exists) 301 + if comment.atproto_comment_uri: 302 + session_id = session_id_cookie or request.headers.get( 303 + "authorization", "" 304 + ).replace("Bearer ", "") 305 + await schedule_pds_update_comment( 306 + session_id=session_id, 307 + comment_id=comment.id, 308 comment_uri=comment.atproto_comment_uri, 309 subject_uri=track.atproto_record_uri, 310 subject_cid=track.atproto_record_cid, 311 text=body.text, 312 timestamp_ms=comment.timestamp_ms, 313 created_at=comment.created_at, 314 ) 315 316 # get user info for response 317 artist_result = await db.execute(
+52 -79
backend/src/backend/api/tracks/likes.py
··· 4 import logging 5 from typing import Annotated 6 7 - from fastapi import Depends, HTTPException 8 from sqlalchemy import select 9 from sqlalchemy.ext.asyncio import AsyncSession 10 from sqlalchemy.orm import selectinload 11 12 from backend._internal import Session as AuthSession 13 from backend._internal import require_auth 14 - from backend._internal.atproto import create_like_record, delete_record_by_uri 15 from backend.models import Artist, Track, TrackLike, get_db 16 from backend.schemas import TrackResponse 17 from backend.utilities.aggregations import get_comment_counts, get_like_counts ··· 64 @router.post("/{track_id}/like") 65 async def like_track( 66 track_id: int, 67 db: Annotated[AsyncSession, Depends(get_db)], 68 auth_session: AuthSession = Depends(require_auth), 69 ) -> dict: 70 - """Like a track - creates ATProto record and stores in index.""" 71 result = await db.execute(select(Track).where(Track.id == track_id)) 72 track = result.scalar_one_or_none() 73 ··· 91 if existing_like.scalar_one_or_none(): 92 return {"liked": True} 93 94 - like_uri = None 95 - try: 96 - like_uri = await create_like_record( 97 - auth_session=auth_session, 98 - subject_uri=track.atproto_record_uri, 99 - subject_cid=track.atproto_record_cid, 100 - ) 101 102 - like = TrackLike( 103 - track_id=track_id, 104 - user_did=auth_session.did, 105 - atproto_like_uri=like_uri, 106 - ) 107 - db.add(like) 108 - await db.commit() 109 - except Exception as e: 110 - logger.error( 111 - f"failed to like track {track_id} for user {auth_session.did}: {e}", 112 - exc_info=True, 113 - ) 114 - if like_uri: 115 - try: 116 - await delete_record_by_uri( 117 - auth_session=auth_session, 118 - record_uri=like_uri, 119 - ) 120 - logger.info(f"cleaned up orphaned ATProto like record: {like_uri}") 121 - except Exception as cleanup_exc: 122 - logger.error( 123 - f"failed to clean up orphaned ATProto like record {like_uri}: {cleanup_exc}" 124 - ) 125 - raise HTTPException( 126 - status_code=500, detail="failed to like track - please try again" 127 - ) from e 128 129 - return {"liked": True, "atproto_uri": like_uri} 130 131 132 @router.delete("/{track_id}/like") 133 async def unlike_track( 134 track_id: int, 135 db: Annotated[AsyncSession, Depends(get_db)], 136 auth_session: AuthSession = Depends(require_auth), 137 ) -> dict: 138 - """Unlike a track - deletes ATProto record and removes from index.""" 139 result = await db.execute( 140 select(TrackLike).where( 141 TrackLike.track_id == track_id, TrackLike.user_did == auth_session.did ··· 146 if not like: 147 return {"liked": False} 148 149 - track_result = await db.execute(select(Track).where(Track.id == track_id)) 150 - track = track_result.scalar_one_or_none() 151 - if not track: 152 - raise HTTPException(status_code=404, detail="track not found") 153 154 - if not track.atproto_record_uri or not track.atproto_record_cid: 155 - raise HTTPException( 156 - status_code=422, 157 - detail={ 158 - "error": "missing_atproto_record", 159 - "message": "this track cannot be unliked because its ATProto record is missing", 160 - }, 161 - ) 162 163 - await delete_record_by_uri( 164 - auth_session=auth_session, 165 - record_uri=like.atproto_like_uri, 166 - ) 167 - 168 - await db.delete(like) 169 - try: 170 - await db.commit() 171 - except Exception as e: 172 - logger.error( 173 - f"failed to commit unlike to database after deleting ATProto record: {e}" 174 ) 175 - try: 176 - recreated_uri = await create_like_record( 177 - auth_session=auth_session, 178 - subject_uri=track.atproto_record_uri, 179 - subject_cid=track.atproto_record_cid, 180 - ) 181 - logger.info( 182 - f"rolled back ATProto deletion by recreating like: {recreated_uri}" 183 - ) 184 - except Exception as rollback_exc: 185 - logger.critical( 186 - f"failed to rollback ATProto deletion for track {track_id}, " 187 - f"user {auth_session.did}: {rollback_exc}. " 188 - "database and ATProto are now inconsistent" 189 - ) 190 - raise HTTPException( 191 - status_code=500, detail="failed to unlike track - please try again" 192 - ) from e 193 194 return {"liked": False} 195
··· 4 import logging 5 from typing import Annotated 6 7 + from fastapi import Cookie, Depends, HTTPException, Request 8 from sqlalchemy import select 9 from sqlalchemy.ext.asyncio import AsyncSession 10 from sqlalchemy.orm import selectinload 11 12 from backend._internal import Session as AuthSession 13 from backend._internal import require_auth 14 + from backend._internal.background_tasks import ( 15 + schedule_pds_create_like, 16 + schedule_pds_delete_like, 17 + ) 18 from backend.models import Artist, Track, TrackLike, get_db 19 from backend.schemas import TrackResponse 20 from backend.utilities.aggregations import get_comment_counts, get_like_counts ··· 67 @router.post("/{track_id}/like") 68 async def like_track( 69 track_id: int, 70 + request: Request, 71 db: Annotated[AsyncSession, Depends(get_db)], 72 auth_session: AuthSession = Depends(require_auth), 73 + session_id_cookie: Annotated[str | None, Cookie(alias="session_id")] = None, 74 ) -> dict: 75 + """Like a track - stores in database immediately, creates ATProto record in background. 76 + 77 + The like is visible immediately in the UI. The ATProto record is created 78 + asynchronously via a background task, keeping the API response fast. 79 + """ 80 result = await db.execute(select(Track).where(Track.id == track_id)) 81 track = result.scalar_one_or_none() 82 ··· 100 if existing_like.scalar_one_or_none(): 101 return {"liked": True} 102 103 + # create database record immediately (optimistic) 104 + like = TrackLike( 105 + track_id=track_id, 106 + user_did=auth_session.did, 107 + atproto_like_uri=None, # will be set by background task 108 + ) 109 + db.add(like) 110 + await db.commit() 111 + await db.refresh(like) 112 113 + # schedule PDS record creation in background 114 + session_id = session_id_cookie or request.headers.get("authorization", "").replace( 115 + "Bearer ", "" 116 + ) 117 + await schedule_pds_create_like( 118 + session_id=session_id, 119 + like_id=like.id, 120 + subject_uri=track.atproto_record_uri, 121 + subject_cid=track.atproto_record_cid, 122 + ) 123 124 + return {"liked": True} 125 126 127 @router.delete("/{track_id}/like") 128 async def unlike_track( 129 track_id: int, 130 + request: Request, 131 db: Annotated[AsyncSession, Depends(get_db)], 132 auth_session: AuthSession = Depends(require_auth), 133 + session_id_cookie: Annotated[str | None, Cookie(alias="session_id")] = None, 134 ) -> dict: 135 + """Unlike a track - removes from database immediately, deletes ATProto record in background. 136 + 137 + The unlike is reflected immediately in the UI. The ATProto record deletion 138 + happens asynchronously via a background task. 139 + """ 140 result = await db.execute( 141 select(TrackLike).where( 142 TrackLike.track_id == track_id, TrackLike.user_did == auth_session.did ··· 147 if not like: 148 return {"liked": False} 149 150 + # capture the ATProto URI before deleting the DB record 151 + like_uri = like.atproto_like_uri 152 153 + # delete database record immediately (optimistic) 154 + await db.delete(like) 155 + await db.commit() 156 157 + # schedule PDS record deletion in background (if URI exists) 158 + if like_uri: 159 + session_id = session_id_cookie or request.headers.get( 160 + "authorization", "" 161 + ).replace("Bearer ", "") 162 + await schedule_pds_delete_like( 163 + session_id=session_id, 164 + like_uri=like_uri, 165 ) 166 167 return {"liked": False} 168
+3 -2
backend/src/backend/models/track_comment.py
··· 42 timestamp_ms: Mapped[int] = mapped_column(Integer, nullable=False) 43 44 # ATProto comment record URI (source of truth) 45 - atproto_comment_uri: Mapped[str] = mapped_column( 46 - String, nullable=False, unique=True, index=True 47 ) 48 49 # when it was created (indexed from ATProto record)
··· 42 timestamp_ms: Mapped[int] = mapped_column(Integer, nullable=False) 43 44 # ATProto comment record URI (source of truth) 45 + # nullable to support optimistic DB writes before PDS record creation 46 + atproto_comment_uri: Mapped[str | None] = mapped_column( 47 + String, nullable=True, unique=True, index=True 48 ) 49 50 # when it was created (indexed from ATProto record)
+3 -2
backend/src/backend/models/track_like.py
··· 36 user_did: Mapped[str] = mapped_column(String, nullable=False, index=True) 37 38 # ATProto like record URI (source of truth) 39 - atproto_like_uri: Mapped[str] = mapped_column( 40 - String, nullable=False, unique=True, index=True 41 ) 42 43 # when it was liked (indexed from ATProto record)
··· 36 user_did: Mapped[str] = mapped_column(String, nullable=False, index=True) 37 38 # ATProto like record URI (source of truth) 39 + # nullable to support optimistic DB writes before PDS record creation 40 + atproto_like_uri: Mapped[str | None] = mapped_column( 41 + String, nullable=True, unique=True, index=True 42 ) 43 44 # when it was liked (indexed from ATProto record)
+116 -49
backend/tests/api/test_track_comments.py
··· 176 artist_with_comments_enabled: UserPreferences, 177 commenter_artist: Artist, 178 ): 179 - """test successful comment creation.""" 180 - with patch("backend.api.tracks.comments.create_comment_record") as mock_create: 181 - mock_create.return_value = "at://did:test:commenter123/fm.plyr.comment/xyz" 182 - 183 async with AsyncClient( 184 transport=ASGITransport(app=test_app), base_url="http://test" 185 ) as client: ··· 194 assert data["timestamp_ms"] == 30000 195 assert data["user_did"] == "did:test:commenter123" 196 197 - # verify ATProto record was created 198 - mock_create.assert_called_once() 199 200 - # verify DB entry 201 result = await db_session.execute( 202 select(TrackComment).where(TrackComment.track_id == test_track.id) 203 ) 204 comment = result.scalar_one() 205 assert comment.text == "awesome drop at this moment!" 206 assert comment.timestamp_ms == 30000 207 208 209 async def test_create_comment_respects_limit( ··· 285 artist_with_comments_enabled: UserPreferences, 286 commenter_artist: Artist, 287 ): 288 - """test that comment owner can edit their comment and ATProto record is updated.""" 289 - from unittest.mock import AsyncMock, patch 290 - 291 comment = TrackComment( 292 track_id=test_track.id, 293 user_did="did:test:commenter123", ··· 300 await db_session.refresh(comment) 301 302 with patch( 303 - "backend.api.tracks.comments.update_comment_record", new_callable=AsyncMock 304 - ) as mock_update: 305 - mock_update.return_value = "bafynewcid" 306 - 307 async with AsyncClient( 308 transport=ASGITransport(app=test_app), base_url="http://test" 309 ) as client: ··· 317 assert data["text"] == "edited text" 318 assert data["updated_at"] is not None 319 320 - # verify ATProto record was updated 321 - mock_update.assert_called_once() 322 - call_kwargs = mock_update.call_args.kwargs 323 assert call_kwargs["comment_uri"] == comment.atproto_comment_uri 324 assert call_kwargs["subject_uri"] == test_track.atproto_record_uri 325 assert call_kwargs["subject_cid"] == test_track.atproto_record_cid ··· 327 assert call_kwargs["timestamp_ms"] == 5000 328 329 330 - async def test_edit_comment_syncs_to_atproto( 331 test_app: FastAPI, 332 db_session: AsyncSession, 333 test_track: Track, 334 artist_with_comments_enabled: UserPreferences, 335 commenter_artist: Artist, 336 ): 337 - """regression test: editing a comment must update the ATProto record.""" 338 - from unittest.mock import AsyncMock, patch 339 - 340 comment = TrackComment( 341 track_id=test_track.id, 342 user_did="did:test:commenter123", 343 text="original text", 344 - timestamp_ms=12345, 345 - atproto_comment_uri="at://did:test:commenter123/fm.plyr.comment/sync1", 346 ) 347 db_session.add(comment) 348 await db_session.commit() 349 await db_session.refresh(comment) 350 - original_created_at = comment.created_at 351 352 with patch( 353 - "backend.api.tracks.comments.update_comment_record", new_callable=AsyncMock 354 - ) as mock_update: 355 - mock_update.return_value = "bafynewcid123" 356 - 357 async with AsyncClient( 358 transport=ASGITransport(app=test_app), base_url="http://test" 359 ) as client: 360 response = await client.patch( 361 f"/tracks/comments/{comment.id}", 362 - json={"text": "updated via edit"}, 363 ) 364 365 assert response.status_code == 200 366 367 - # verify ATProto sync happened with correct data 368 - mock_update.assert_called_once() 369 - call_kwargs = mock_update.call_args.kwargs 370 - 371 - # subject must reference the track's ATProto record 372 - assert call_kwargs["subject_uri"] == test_track.atproto_record_uri 373 - assert call_kwargs["subject_cid"] == test_track.atproto_record_cid 374 - 375 - # original timestamp_ms and created_at preserved 376 - assert call_kwargs["timestamp_ms"] == 12345 377 - assert call_kwargs["created_at"] == original_created_at 378 - 379 - # new text and updated_at passed 380 - assert call_kwargs["text"] == "updated via edit" 381 - assert "updated_at" in call_kwargs # updated_at should be set 382 383 384 async def test_edit_comment_forbidden_for_other_user( ··· 418 artist_with_comments_enabled: UserPreferences, 419 commenter_artist: Artist, 420 ): 421 - """test that comment owner can delete their comment.""" 422 - from unittest.mock import AsyncMock, patch 423 - 424 comment = TrackComment( 425 track_id=test_track.id, 426 user_did="did:test:commenter123", ··· 434 comment_id = comment.id 435 436 with patch( 437 - "backend.api.tracks.comments.delete_record_by_uri", new_callable=AsyncMock 438 - ): 439 async with AsyncClient( 440 transport=ASGITransport(app=test_app), base_url="http://test" 441 ) as client: ··· 444 assert response.status_code == 200 445 assert response.json()["deleted"] is True 446 447 result = await db_session.execute( 448 select(TrackComment).where(TrackComment.id == comment_id) 449 )
··· 176 artist_with_comments_enabled: UserPreferences, 177 commenter_artist: Artist, 178 ): 179 + """test successful comment creation schedules background task.""" 180 + with patch( 181 + "backend.api.tracks.comments.schedule_pds_create_comment" 182 + ) as mock_schedule: 183 async with AsyncClient( 184 transport=ASGITransport(app=test_app), base_url="http://test" 185 ) as client: ··· 194 assert data["timestamp_ms"] == 30000 195 assert data["user_did"] == "did:test:commenter123" 196 197 + # verify background task was scheduled 198 + mock_schedule.assert_called_once() 199 + call_kwargs = mock_schedule.call_args.kwargs 200 + assert call_kwargs["subject_uri"] == test_track.atproto_record_uri 201 + assert call_kwargs["subject_cid"] == test_track.atproto_record_cid 202 + assert call_kwargs["text"] == "awesome drop at this moment!" 203 + assert call_kwargs["timestamp_ms"] == 30000 204 205 + # verify DB entry exists (created immediately, before PDS) 206 result = await db_session.execute( 207 select(TrackComment).where(TrackComment.track_id == test_track.id) 208 ) 209 comment = result.scalar_one() 210 assert comment.text == "awesome drop at this moment!" 211 assert comment.timestamp_ms == 30000 212 + # atproto_comment_uri is None initially - will be set by background task 213 + assert comment.atproto_comment_uri is None 214 + 215 + 216 + async def test_create_comment_db_entry_has_correct_comment_id( 217 + test_app: FastAPI, 218 + db_session: AsyncSession, 219 + test_track: Track, 220 + artist_with_comments_enabled: UserPreferences, 221 + commenter_artist: Artist, 222 + ): 223 + """test that the comment_id passed to background task matches the DB record.""" 224 + with patch( 225 + "backend.api.tracks.comments.schedule_pds_create_comment" 226 + ) as mock_schedule: 227 + async with AsyncClient( 228 + transport=ASGITransport(app=test_app), base_url="http://test" 229 + ) as client: 230 + await client.post( 231 + f"/tracks/{test_track.id}/comments", 232 + json={"text": "test comment", "timestamp_ms": 5000}, 233 + ) 234 + 235 + # get the comment_id from the scheduled call 236 + call_kwargs = mock_schedule.call_args.kwargs 237 + scheduled_comment_id = call_kwargs["comment_id"] 238 + 239 + # verify it matches the DB record 240 + result = await db_session.execute( 241 + select(TrackComment).where(TrackComment.track_id == test_track.id) 242 + ) 243 + comment = result.scalar_one() 244 + assert comment.id == scheduled_comment_id 245 246 247 async def test_create_comment_respects_limit( ··· 323 artist_with_comments_enabled: UserPreferences, 324 commenter_artist: Artist, 325 ): 326 + """test that comment owner can edit their comment and background task is scheduled.""" 327 comment = TrackComment( 328 track_id=test_track.id, 329 user_did="did:test:commenter123", ··· 336 await db_session.refresh(comment) 337 338 with patch( 339 + "backend.api.tracks.comments.schedule_pds_update_comment" 340 + ) as mock_schedule: 341 async with AsyncClient( 342 transport=ASGITransport(app=test_app), base_url="http://test" 343 ) as client: ··· 351 assert data["text"] == "edited text" 352 assert data["updated_at"] is not None 353 354 + # verify background task was scheduled 355 + mock_schedule.assert_called_once() 356 + call_kwargs = mock_schedule.call_args.kwargs 357 assert call_kwargs["comment_uri"] == comment.atproto_comment_uri 358 assert call_kwargs["subject_uri"] == test_track.atproto_record_uri 359 assert call_kwargs["subject_cid"] == test_track.atproto_record_cid ··· 361 assert call_kwargs["timestamp_ms"] == 5000 362 363 364 + async def test_edit_comment_without_atproto_uri( 365 test_app: FastAPI, 366 db_session: AsyncSession, 367 test_track: Track, 368 artist_with_comments_enabled: UserPreferences, 369 commenter_artist: Artist, 370 ): 371 + """test that editing a comment without ATProto URI doesn't schedule update.""" 372 + # comment without ATProto URI (e.g., background task hasn't run yet) 373 comment = TrackComment( 374 track_id=test_track.id, 375 user_did="did:test:commenter123", 376 text="original text", 377 + timestamp_ms=5000, 378 + atproto_comment_uri=None, 379 ) 380 db_session.add(comment) 381 await db_session.commit() 382 await db_session.refresh(comment) 383 384 with patch( 385 + "backend.api.tracks.comments.schedule_pds_update_comment" 386 + ) as mock_schedule: 387 async with AsyncClient( 388 transport=ASGITransport(app=test_app), base_url="http://test" 389 ) as client: 390 response = await client.patch( 391 f"/tracks/comments/{comment.id}", 392 + json={"text": "edited text"}, 393 ) 394 395 assert response.status_code == 200 396 + data = response.json() 397 + assert data["text"] == "edited text" 398 399 + # no PDS update should be scheduled since there's no ATProto record 400 + mock_schedule.assert_not_called() 401 402 403 async def test_edit_comment_forbidden_for_other_user( ··· 437 artist_with_comments_enabled: UserPreferences, 438 commenter_artist: Artist, 439 ): 440 + """test that comment owner can delete their comment and background task is scheduled.""" 441 comment = TrackComment( 442 track_id=test_track.id, 443 user_did="did:test:commenter123", ··· 451 comment_id = comment.id 452 453 with patch( 454 + "backend.api.tracks.comments.schedule_pds_delete_comment" 455 + ) as mock_schedule: 456 async with AsyncClient( 457 transport=ASGITransport(app=test_app), base_url="http://test" 458 ) as client: ··· 461 assert response.status_code == 200 462 assert response.json()["deleted"] is True 463 464 + # verify background task was scheduled with correct URI 465 + mock_schedule.assert_called_once() 466 + call_kwargs = mock_schedule.call_args.kwargs 467 + assert ( 468 + call_kwargs["comment_uri"] == "at://did:test:commenter123/fm.plyr.comment/del1" 469 + ) 470 + 471 + # verify DB entry is gone (deleted immediately, before PDS) 472 + result = await db_session.execute( 473 + select(TrackComment).where(TrackComment.id == comment_id) 474 + ) 475 + assert result.scalar_one_or_none() is None 476 + 477 + 478 + async def test_delete_comment_without_atproto_uri( 479 + test_app: FastAPI, 480 + db_session: AsyncSession, 481 + test_track: Track, 482 + artist_with_comments_enabled: UserPreferences, 483 + commenter_artist: Artist, 484 + ): 485 + """test that deleting a comment without ATProto URI doesn't schedule deletion.""" 486 + # comment without ATProto URI (e.g., background task hasn't run yet) 487 + comment = TrackComment( 488 + track_id=test_track.id, 489 + user_did="did:test:commenter123", 490 + text="to be deleted", 491 + timestamp_ms=5000, 492 + atproto_comment_uri=None, 493 + ) 494 + db_session.add(comment) 495 + await db_session.commit() 496 + await db_session.refresh(comment) 497 + comment_id = comment.id 498 + 499 + with patch( 500 + "backend.api.tracks.comments.schedule_pds_delete_comment" 501 + ) as mock_schedule: 502 + async with AsyncClient( 503 + transport=ASGITransport(app=test_app), base_url="http://test" 504 + ) as client: 505 + response = await client.delete(f"/tracks/comments/{comment_id}") 506 + 507 + assert response.status_code == 200 508 + assert response.json()["deleted"] is True 509 + 510 + # no PDS deletion should be scheduled since there's no ATProto record 511 + mock_schedule.assert_not_called() 512 + 513 + # verify DB entry is still gone 514 result = await db_session.execute( 515 select(TrackComment).where(TrackComment.id == comment_id) 516 )
+104 -98
backend/tests/api/test_track_likes.py
··· 1 """tests for track like api endpoints and error handling.""" 2 3 from collections.abc import Generator 4 - from unittest.mock import AsyncMock, patch 5 6 import pytest 7 from fastapi import FastAPI ··· 83 async def test_like_track_success( 84 test_app: FastAPI, db_session: AsyncSession, test_track: Track 85 ): 86 - """test successful track like creates ATProto record and DB entry.""" 87 - with patch("backend.api.tracks.likes.create_like_record") as mock_create: 88 - mock_create.return_value = "at://did:test:user123/fm.plyr.like/abc123" 89 - 90 async with AsyncClient( 91 transport=ASGITransport(app=test_app), base_url="http://test" 92 ) as client: ··· 95 assert response.status_code == 200 96 assert response.json()["liked"] is True 97 98 - # verify ATProto record was created 99 - mock_create.assert_called_once() 100 101 - # verify DB entry exists 102 result = await db_session.execute( 103 select(TrackLike).where( 104 TrackLike.track_id == test_track.id, ··· 107 ) 108 like = result.scalar_one_or_none() 109 assert like is not None 110 - assert like.atproto_like_uri == "at://did:test:user123/fm.plyr.like/abc123" 111 112 113 - async def test_like_track_cleanup_on_db_failure( 114 test_app: FastAPI, db_session: AsyncSession, test_track: Track 115 ): 116 - """test that ATProto record is cleaned up if DB commit fails.""" 117 - created_uri = "at://did:test:user123/fm.plyr.like/abc123" 118 - 119 - # create a mock session that fails on commit 120 - mock_db = AsyncMock(spec=AsyncSession) 121 - mock_db.execute = db_session.execute 122 - mock_db.add = db_session.add 123 - mock_db.delete = db_session.delete 124 - mock_db.flush = db_session.flush 125 - mock_db.refresh = db_session.refresh 126 - mock_db.commit = AsyncMock(side_effect=Exception("DB error")) 127 - 128 - async def mock_get_db(): 129 - yield mock_db 130 - 131 - from backend.models import get_db 132 - 133 - test_app.dependency_overrides[get_db] = mock_get_db 134 - 135 - with ( 136 - patch("backend.api.tracks.likes.create_like_record") as mock_create, 137 - patch("backend.api.tracks.likes.delete_record_by_uri") as mock_delete, 138 - ): 139 - mock_create.return_value = created_uri 140 - 141 async with AsyncClient( 142 transport=ASGITransport(app=test_app), base_url="http://test" 143 ) as client: 144 - response = await client.post(f"/tracks/{test_track.id}/like") 145 146 - # should return 500 error 147 - assert response.status_code == 500 148 - assert "failed to like track" in response.json()["detail"] 149 150 - # verify cleanup was attempted 151 - mock_delete.assert_called_once() 152 - call_args = mock_delete.call_args 153 - assert call_args.kwargs["record_uri"] == created_uri 154 - 155 - # cleanup override 156 - del test_app.dependency_overrides[get_db] 157 158 159 async def test_unlike_track_success( 160 test_app: FastAPI, db_session: AsyncSession, test_track: Track 161 ): 162 - """test successful track unlike deletes ATProto record and DB entry.""" 163 # create existing like 164 like = TrackLike( 165 track_id=test_track.id, ··· 169 db_session.add(like) 170 await db_session.commit() 171 172 - with patch("backend.api.tracks.likes.delete_record_by_uri") as mock_delete: 173 async with AsyncClient( 174 transport=ASGITransport(app=test_app), base_url="http://test" 175 ) as client: ··· 178 assert response.status_code == 200 179 assert response.json()["liked"] is False 180 181 - # verify ATProto record was deleted 182 - mock_delete.assert_called_once() 183 184 - # verify DB entry is gone 185 result = await db_session.execute( 186 select(TrackLike).where( 187 TrackLike.track_id == test_track.id, ··· 191 assert result.scalar_one_or_none() is None 192 193 194 - async def test_unlike_track_rollback_on_db_failure( 195 test_app: FastAPI, db_session: AsyncSession, test_track: Track 196 ): 197 - """test that ATProto record is recreated if DB commit fails during unlike.""" 198 - like_uri = "at://did:test:user123/fm.plyr.like/abc123" 199 - 200 - # create existing like 201 like = TrackLike( 202 track_id=test_track.id, 203 user_did="did:test:user123", 204 - atproto_like_uri=like_uri, 205 ) 206 db_session.add(like) 207 await db_session.commit() 208 209 - # create a mock session that fails on commit 210 - mock_db = AsyncMock(spec=AsyncSession) 211 - mock_db.execute = db_session.execute 212 - mock_db.add = db_session.add 213 - mock_db.delete = db_session.delete 214 - mock_db.flush = db_session.flush 215 - mock_db.refresh = db_session.refresh 216 - mock_db.commit = AsyncMock(side_effect=Exception("DB error")) 217 - 218 - async def mock_get_db(): 219 - yield mock_db 220 - 221 - from backend.models import get_db 222 - 223 - test_app.dependency_overrides[get_db] = mock_get_db 224 - 225 - with ( 226 - patch("backend.api.tracks.likes.delete_record_by_uri") as mock_delete, 227 - patch("backend.api.tracks.likes.create_like_record") as mock_create, 228 - ): 229 - mock_create.return_value = "at://did:test:user123/fm.plyr.like/new123" 230 - 231 async with AsyncClient( 232 transport=ASGITransport(app=test_app), base_url="http://test" 233 ) as client: 234 response = await client.delete(f"/tracks/{test_track.id}/like") 235 236 - # should return 500 error 237 - assert response.status_code == 500 238 - assert "failed to unlike track" in response.json()["detail"] 239 240 - # verify ATProto record was deleted 241 - mock_delete.assert_called_once() 242 243 - # verify rollback was attempted (recreate the like) 244 - mock_create.assert_called_once() 245 - call_args = mock_create.call_args 246 - assert call_args.kwargs["subject_uri"] == test_track.atproto_record_uri 247 - assert call_args.kwargs["subject_cid"] == test_track.atproto_record_cid 248 - 249 - # cleanup override 250 - del test_app.dependency_overrides[get_db] 251 252 253 async def test_like_already_liked_track_idempotent( ··· 263 db_session.add(like) 264 await db_session.commit() 265 266 - with patch("backend.api.tracks.likes.create_like_record") as mock_create: 267 async with AsyncClient( 268 transport=ASGITransport(app=test_app), base_url="http://test" 269 ) as client: ··· 272 assert response.status_code == 200 273 assert response.json()["liked"] is True 274 275 - # verify no new ATProto record was created 276 - mock_create.assert_not_called() 277 278 279 async def test_unlike_not_liked_track_idempotent( 280 test_app: FastAPI, db_session: AsyncSession, test_track: Track 281 ): 282 """test that unliking a not-liked track is idempotent.""" 283 - with patch("backend.api.tracks.likes.delete_record_by_uri") as mock_delete: 284 async with AsyncClient( 285 transport=ASGITransport(app=test_app), base_url="http://test" 286 ) as client: ··· 289 assert response.status_code == 200 290 assert response.json()["liked"] is False 291 292 - # verify no ATProto record deletion was attempted 293 - mock_delete.assert_not_called()
··· 1 """tests for track like api endpoints and error handling.""" 2 3 from collections.abc import Generator 4 + from unittest.mock import patch 5 6 import pytest 7 from fastapi import FastAPI ··· 83 async def test_like_track_success( 84 test_app: FastAPI, db_session: AsyncSession, test_track: Track 85 ): 86 + """test successful track like creates DB entry and schedules PDS record creation.""" 87 + with patch("backend.api.tracks.likes.schedule_pds_create_like") as mock_schedule: 88 async with AsyncClient( 89 transport=ASGITransport(app=test_app), base_url="http://test" 90 ) as client: ··· 93 assert response.status_code == 200 94 assert response.json()["liked"] is True 95 96 + # verify background task was scheduled 97 + mock_schedule.assert_called_once() 98 + call_kwargs = mock_schedule.call_args.kwargs 99 + assert call_kwargs["subject_uri"] == test_track.atproto_record_uri 100 + assert call_kwargs["subject_cid"] == test_track.atproto_record_cid 101 102 + # verify DB entry exists (created immediately, before PDS) 103 result = await db_session.execute( 104 select(TrackLike).where( 105 TrackLike.track_id == test_track.id, ··· 108 ) 109 like = result.scalar_one_or_none() 110 assert like is not None 111 + # atproto_like_uri is None initially - will be set by background task 112 + assert like.atproto_like_uri is None 113 114 115 + async def test_like_track_db_entry_has_correct_like_id( 116 test_app: FastAPI, db_session: AsyncSession, test_track: Track 117 ): 118 + """test that the like_id passed to background task matches the DB record.""" 119 + with patch("backend.api.tracks.likes.schedule_pds_create_like") as mock_schedule: 120 async with AsyncClient( 121 transport=ASGITransport(app=test_app), base_url="http://test" 122 ) as client: 123 + await client.post(f"/tracks/{test_track.id}/like") 124 125 + # get the like_id from the scheduled call 126 + call_kwargs = mock_schedule.call_args.kwargs 127 + scheduled_like_id = call_kwargs["like_id"] 128 129 + # verify it matches the DB record 130 + result = await db_session.execute( 131 + select(TrackLike).where( 132 + TrackLike.track_id == test_track.id, 133 + TrackLike.user_did == "did:test:user123", 134 + ) 135 + ) 136 + like = result.scalar_one() 137 + assert like.id == scheduled_like_id 138 139 140 async def test_unlike_track_success( 141 test_app: FastAPI, db_session: AsyncSession, test_track: Track 142 ): 143 + """test successful track unlike removes DB entry and schedules PDS record deletion.""" 144 # create existing like 145 like = TrackLike( 146 track_id=test_track.id, ··· 150 db_session.add(like) 151 await db_session.commit() 152 153 + with patch("backend.api.tracks.likes.schedule_pds_delete_like") as mock_schedule: 154 async with AsyncClient( 155 transport=ASGITransport(app=test_app), base_url="http://test" 156 ) as client: ··· 159 assert response.status_code == 200 160 assert response.json()["liked"] is False 161 162 + # verify background task was scheduled with correct URI 163 + mock_schedule.assert_called_once() 164 + call_kwargs = mock_schedule.call_args.kwargs 165 + assert call_kwargs["like_uri"] == "at://did:test:user123/fm.plyr.like/abc123" 166 167 + # verify DB entry is gone (deleted immediately, before PDS) 168 result = await db_session.execute( 169 select(TrackLike).where( 170 TrackLike.track_id == test_track.id, ··· 174 assert result.scalar_one_or_none() is None 175 176 177 + async def test_unlike_track_without_atproto_uri( 178 test_app: FastAPI, db_session: AsyncSession, test_track: Track 179 ): 180 + """test that unliking a track without ATProto URI doesn't schedule deletion.""" 181 + # create like without ATProto URI (e.g., background task hasn't run yet) 182 like = TrackLike( 183 track_id=test_track.id, 184 user_did="did:test:user123", 185 + atproto_like_uri=None, 186 ) 187 db_session.add(like) 188 await db_session.commit() 189 190 + with patch("backend.api.tracks.likes.schedule_pds_delete_like") as mock_schedule: 191 async with AsyncClient( 192 transport=ASGITransport(app=test_app), base_url="http://test" 193 ) as client: 194 response = await client.delete(f"/tracks/{test_track.id}/like") 195 196 + assert response.status_code == 200 197 + assert response.json()["liked"] is False 198 199 + # no PDS deletion should be scheduled since there's no ATProto record 200 + mock_schedule.assert_not_called() 201 202 + # verify DB entry is still gone 203 + result = await db_session.execute( 204 + select(TrackLike).where( 205 + TrackLike.track_id == test_track.id, 206 + TrackLike.user_did == "did:test:user123", 207 + ) 208 + ) 209 + assert result.scalar_one_or_none() is None 210 211 212 async def test_like_already_liked_track_idempotent( ··· 222 db_session.add(like) 223 await db_session.commit() 224 225 + with patch("backend.api.tracks.likes.schedule_pds_create_like") as mock_schedule: 226 async with AsyncClient( 227 transport=ASGITransport(app=test_app), base_url="http://test" 228 ) as client: ··· 231 assert response.status_code == 200 232 assert response.json()["liked"] is True 233 234 + # verify no new background task was scheduled 235 + mock_schedule.assert_not_called() 236 237 238 async def test_unlike_not_liked_track_idempotent( 239 test_app: FastAPI, db_session: AsyncSession, test_track: Track 240 ): 241 """test that unliking a not-liked track is idempotent.""" 242 + with patch("backend.api.tracks.likes.schedule_pds_delete_like") as mock_schedule: 243 async with AsyncClient( 244 transport=ASGITransport(app=test_app), base_url="http://test" 245 ) as client: ··· 248 assert response.status_code == 200 249 assert response.json()["liked"] is False 250 251 + # verify no background task was scheduled 252 + mock_schedule.assert_not_called() 253 + 254 + 255 + async def test_like_track_missing_atproto_record( 256 + test_app: FastAPI, db_session: AsyncSession 257 + ): 258 + """test that liking a track without ATProto record returns 422.""" 259 + # create artist 260 + artist = Artist( 261 + did="did:plc:artist456", 262 + handle="artist2.bsky.social", 263 + display_name="Test Artist 2", 264 + ) 265 + db_session.add(artist) 266 + await db_session.flush() 267 + 268 + # create track WITHOUT ATProto record 269 + track = Track( 270 + title="No ATProto Track", 271 + artist_did=artist.did, 272 + file_id="noatproto123", 273 + file_type="mp3", 274 + atproto_record_uri=None, 275 + atproto_record_cid=None, 276 + ) 277 + db_session.add(track) 278 + await db_session.commit() 279 + await db_session.refresh(track) 280 + 281 + async with AsyncClient( 282 + transport=ASGITransport(app=test_app), base_url="http://test" 283 + ) as client: 284 + response = await client.post(f"/tracks/{track.id}/like") 285 + 286 + assert response.status_code == 422 287 + detail = response.json()["detail"] 288 + assert detail["error"] == "missing_atproto_record" 289 + 290 + 291 + async def test_like_nonexistent_track(test_app: FastAPI): 292 + """test that liking a nonexistent track returns 404.""" 293 + async with AsyncClient( 294 + transport=ASGITransport(app=test_app), base_url="http://test" 295 + ) as client: 296 + response = await client.post("/tracks/99999/like") 297 + 298 + assert response.status_code == 404 299 + assert response.json()["detail"] == "track not found"