refactor: moderation cleanup (#541, #542, #543) (#617)

* refactor: moderation cleanup (#541, #542, #543)

this PR consolidates the moderation architecture across three related issues:

**#541: extract ModerationClient class**
- new `moderation_client.py` with centralized client for all moderation service calls
- replaces scattered `httpx.AsyncClient` instantiation with a singleton pattern
- consistent timeout, auth, and caching behavior

**#542: move lazy resolution to background task**
- add `sync_copyright_resolutions()` background task
- removes lazy reconciliation from read paths
- runs periodically to sync labeler negation status
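
the core of the sync reduces to this (an in-memory sketch under simplified assumptions — the real task joins `CopyrightScan` against `Track` and asks the labeler for active labels, as in the diff below):

```python
# in-memory sketch of the resolution sync: any flagged scan whose AT URI
# no longer has an active label (i.e. the label was negated) gets unflagged
def sync_resolutions(scans: dict[str, bool], active_uris: set[str]) -> int:
    """scans maps AT URI -> is_flagged; returns how many scans were resolved."""
    resolved = 0
    for uri, is_flagged in scans.items():
        if is_flagged and uri not in active_uris:
            scans[uri] = False
            resolved += 1
    return resolved
```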

**#543: simplify get_copyright_info to pure read**
- remove write-on-read pattern from aggregations
- `is_flagged` is now the source of truth (synced by background task)
- labeler remains authoritative; we just don't query it on every read

**breaking: removes resolution columns from copyright_scans**
- `resolution`, `reviewed_at`, `reviewed_by`, `review_notes` removed
- labeler is now the single source of truth for resolution status
- migration included to drop columns
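
the drop looks roughly like this as an Alembic upgrade step (a sketch only — revision boilerplate and downgrade omitted, not the actual migration file):

```python
# hypothetical Alembic upgrade dropping the removed review columns
from alembic import op


def upgrade() -> None:
    for column in ("resolution", "reviewed_at", "reviewed_by", "review_notes"):
        op.drop_column("copyright_scans", column)
```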

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat: add research/plan/implement workflow commands

inspired by HumanLayer patterns, this adds a three-phase workflow for complex tasks:

- `/research [topic]` - deep dive on a topic, persist to docs/research/
- `/plan [issue or description]` - create implementation plan, persist to docs/plans/
- `/implement [plan path]` - execute a plan phase by phase

also improves `/consider-review` to properly fetch and process PR feedback.

the workflow encourages:
- exploring before implementing
- persisting knowledge for future reference
- no open questions in plans
- systematic verification during implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

authored by zzstoatzz.io (Claude Opus 4.5) and committed by GitHub (babe9121, commit 711b4ab7)

+36 -5
.claude/commands/consider-review.md
··· 1 - Check the current PR for comments, reviews, AND inline review comments via the gh cli. 1 + --- 2 + description: Review PR feedback and address comments 3 + argument-hint: [PR number, optional] 4 + --- 5 + 6 + # consider review 7 + 8 + check PR feedback and address it. 9 + 10 + ## process 11 + 12 + 1. **find the PR** 13 + - if number provided: use it 14 + - otherwise: `gh pr view --json number,title,url` 15 + 16 + 2. **gather all feedback** in parallel: 17 + ```bash 18 + # top-level review comments 19 + gh pr view NNN --comments 20 + 21 + # inline code review comments 22 + gh api repos/zzstoatzz/plyr.fm/pulls/NNN/comments --jq '.[] | {path: .path, line: .line, body: .body, author: .user.login}' 2 23 3 - For example, to get the review comments for PR #246: 24 + # review status 25 + gh pr view NNN --json reviews --jq '.reviews[] | {author: .author.login, state: .state, body: .body}' 26 + ``` 4 27 5 - ```bash 6 - gh api repos/zzstoatzz/plyr.fm/pulls/246/comments --jq '.[] | {path: .path, line: .line, body: .body}' 7 - ``` 28 + 3. **summarize feedback**: 29 + - blocking issues (changes requested) 30 + - suggestions (nice to have) 31 + - questions needing response 32 + 33 + 4. **for each item**: 34 + - if code change needed: make the fix 35 + - if clarification needed: draft a response 36 + - if disagreement: explain your reasoning 37 + 38 + 5. **report** what you addressed and what needs discussion
+54
.claude/commands/implement.md
··· 1 + --- 2 + description: Execute an implementation plan phase by phase 3 + argument-hint: [path to plan in docs/plans/] 4 + --- 5 + 6 + # implement 7 + 8 + execute a plan systematically, phase by phase. 9 + 10 + ## process 11 + 12 + 1. **read the plan**: $ARGUMENTS 13 + - read it fully 14 + - check for existing checkmarks (prior progress) 15 + - read all files mentioned in the plan 16 + 17 + 2. **pick up from first unchecked item** 18 + - if resuming, trust that completed work is done 19 + - start with the next pending phase 20 + 21 + 3. **implement each phase**: 22 + - make the changes described 23 + - run the success criteria checks 24 + - fix any issues before proceeding 25 + - check off completed items in the plan file 26 + 27 + 4. **pause for verification** after each phase: 28 + ``` 29 + phase N complete. 30 + 31 + automated checks passed: 32 + - [list what passed] 33 + 34 + ready for manual verification: 35 + - [list manual checks from plan] 36 + 37 + continue to phase N+1? 38 + ``` 39 + 40 + 5. **continue or stop** based on user feedback 41 + 42 + ## guidelines 43 + 44 + - follow the plan's intent, adapt to what you find 45 + - if something doesn't match the plan, stop and explain: 46 + ``` 47 + issue in phase N: 48 + expected: [what plan says] 49 + found: [actual situation] 50 + how should I proceed? 51 + ``` 52 + - run `just backend test` and `just backend lint` frequently 53 + - commit after each phase if changes are substantial 54 + - update the plan file checkboxes as you complete items
+87
.claude/commands/plan.md
··· 1 + --- 2 + description: Create an implementation plan before coding 3 + argument-hint: [issue number, description, or path to research doc] 4 + --- 5 + 6 + # plan 7 + 8 + think before coding. create an implementation plan and get alignment. 9 + 10 + ## process 11 + 12 + 1. **understand the task**: $ARGUMENTS 13 + - if issue number given, fetch it with `gh issue view` 14 + - if research doc referenced, read it fully 15 + - read any related code 16 + 17 + 2. **research if needed** - if you don't understand the problem space: 18 + - spawn sub-tasks to explore the codebase 19 + - find similar patterns we can follow 20 + - identify integration points and constraints 21 + 22 + 3. **propose approach** - present to user: 23 + ``` 24 + based on [context], I understand we need to [goal]. 25 + 26 + current state: 27 + - [what exists now] 28 + 29 + proposed approach: 30 + - [high-level strategy] 31 + 32 + questions: 33 + - [anything unclear] 34 + ``` 35 + 36 + 4. **resolve all questions** - don't proceed with open questions 37 + 38 + 5. **write the plan** to `docs/plans/YYYY-MM-DD-description.md`: 39 + 40 + ```markdown 41 + # plan: [feature/task name] 42 + 43 + **date**: YYYY-MM-DD 44 + **issue**: #NNN (if applicable) 45 + 46 + ## goal 47 + 48 + [what we're trying to accomplish] 49 + 50 + ## current state 51 + 52 + [what exists now, constraints discovered] 53 + 54 + ## not doing 55 + 56 + [explicitly out of scope] 57 + 58 + ## phases 59 + 60 + ### phase 1: [name] 61 + 62 + **changes**: 63 + - `path/to/file.py` - [what to change] 64 + - `another/file.ts` - [what to change] 65 + 66 + **success criteria**: 67 + - [ ] tests pass: `just backend test` 68 + - [ ] [specific behavior to verify] 69 + 70 + ### phase 2: [name] 71 + ... 72 + 73 + ## testing 74 + 75 + - [key scenarios to test] 76 + - [edge cases] 77 + ``` 78 + 79 + 6. 
**ask for confirmation** before finalizing 80 + 81 + ## guidelines 82 + 83 + - no open questions in the final plan - resolve everything first 84 + - keep phases small and testable 85 + - include specific file paths 86 + - success criteria should be verifiable 87 + - if the task is small, skip the formal plan and just do it
+63
.claude/commands/research.md
··· 1 + --- 2 + description: Research a topic thoroughly and persist findings 3 + argument-hint: [topic or question to research] 4 + --- 5 + 6 + # research 7 + 8 + deep dive on a topic, persist findings to `docs/research/`. 9 + 10 + ## process 11 + 12 + 1. **understand the question**: $ARGUMENTS 13 + 14 + 2. **gather context** - spawn sub-tasks in parallel to: 15 + - grep for relevant keywords 16 + - find related files and directories 17 + - read key implementation files 18 + - check git history if relevant 19 + 20 + 3. **synthesize findings** - after sub-tasks complete: 21 + - summarize what you learned 22 + - include file:line references for key discoveries 23 + - note any open questions or uncertainties 24 + 25 + 4. **persist to docs/research/** - write findings to `docs/research/YYYY-MM-DD-topic.md`: 26 + 27 + ```markdown 28 + # research: [topic] 29 + 30 + **date**: YYYY-MM-DD 31 + **question**: [the original question] 32 + 33 + ## summary 34 + 35 + [2-3 sentences on what you found] 36 + 37 + ## findings 38 + 39 + ### [area 1] 40 + - finding with reference (`file.py:123`) 41 + - another finding 42 + 43 + ### [area 2] 44 + ... 45 + 46 + ## code references 47 + 48 + - `path/to/file.py:45` - description 49 + - `another/file.ts:12-30` - description 50 + 51 + ## open questions 52 + 53 + - [anything unresolved] 54 + ``` 55 + 56 + 5. **present summary** to the user with key takeaways 57 + 58 + ## guidelines 59 + 60 + - spawn sub-tasks for broad searches, read files yourself for focused analysis 61 + - always include file:line references - make findings actionable 62 + - be honest about what you don't know 63 + - keep the output concise - this is a working document, not a thesis
+69 -1
backend/src/backend/_internal/background_tasks.py
··· 27 27 ) 28 28 from backend._internal.auth import get_session 29 29 from backend._internal.background import get_docket 30 - from backend.models import TrackComment, TrackLike 30 + from backend.models import CopyrightScan, Track, TrackComment, TrackLike 31 31 from backend.utilities.database import db_session 32 32 33 33 logger = logging.getLogger(__name__) ··· 50 50 docket = get_docket() 51 51 await docket.add(scan_copyright)(track_id, audio_url) 52 52 logfire.info("scheduled copyright scan", track_id=track_id) 53 + 54 + 55 + async def sync_copyright_resolutions() -> None: 56 + """sync resolution status from labeler to backend database. 57 + 58 + finds tracks that are flagged but have no resolution, checks the labeler 59 + to see if the labels were negated (dismissed), and marks them as resolved. 60 + 61 + this replaces the lazy reconciliation that was happening on read paths. 62 + should be scheduled periodically (e.g., every 5 minutes). 63 + """ 64 + from backend._internal.moderation_client import get_moderation_client 65 + 66 + async with db_session() as db: 67 + # find flagged scans with AT URIs that haven't been resolved 68 + result = await db.execute( 69 + select(CopyrightScan, Track.atproto_record_uri) 70 + .join(Track, CopyrightScan.track_id == Track.id) 71 + .where( 72 + CopyrightScan.is_flagged == True, # noqa: E712 73 + Track.atproto_record_uri.isnot(None), 74 + ) 75 + ) 76 + rows = result.all() 77 + 78 + if not rows: 79 + logfire.debug("sync_copyright_resolutions: no flagged scans to check") 80 + return 81 + 82 + # batch check with labeler 83 + scan_by_uri: dict[str, CopyrightScan] = {} 84 + for scan, uri in rows: 85 + if uri: 86 + scan_by_uri[uri] = scan 87 + 88 + if not scan_by_uri: 89 + return 90 + 91 + client = get_moderation_client() 92 + active_uris = await client.get_active_labels(list(scan_by_uri.keys())) 93 + 94 + # find scans that are no longer active (label was negated) 95 + resolved_count = 0 96 + for uri, scan in scan_by_uri.items(): 97 
+ if uri not in active_uris: 98 + # label was negated - track is no longer flagged 99 + scan.is_flagged = False 100 + resolved_count += 1 101 + 102 + if resolved_count > 0: 103 + await db.commit() 104 + logfire.info( 105 + "sync_copyright_resolutions: resolved {count} scans", 106 + count=resolved_count, 107 + ) 108 + else: 109 + logfire.debug( 110 + "sync_copyright_resolutions: checked {count} scans, none resolved", 111 + count=len(scan_by_uri), 112 + ) 113 + 114 + 115 + async def schedule_copyright_resolution_sync() -> None: 116 + """schedule a copyright resolution sync via docket.""" 117 + docket = get_docket() 118 + await docket.add(sync_copyright_resolutions)() 119 + logfire.info("scheduled copyright resolution sync") 53 120 54 121 55 122 async def process_export(export_id: str, artist_did: str) -> None: ··· 798 865 # collection of all background task functions for docket registration 799 866 background_tasks = [ 800 867 scan_copyright, 868 + sync_copyright_resolutions, 801 869 process_export, 802 870 sync_atproto, 803 871 scrobble_to_teal,
+53 -234
backend/src/backend/_internal/moderation.py
··· 1 - """moderation service client for copyright scanning.""" 1 + """moderation service integration for copyright scanning.""" 2 2 3 3 import logging 4 4 from typing import Any 5 5 6 - import httpx 7 6 import logfire 8 7 from sqlalchemy import select 8 + from sqlalchemy.orm import joinedload 9 9 10 + from backend._internal.moderation_client import get_moderation_client 10 11 from backend.config import settings 11 12 from backend.models import CopyrightScan, Track 12 13 from backend.utilities.database import db_session 13 - from backend.utilities.redis import get_async_redis_client 14 14 15 15 logger = logging.getLogger(__name__) 16 16 ··· 42 42 audio_url=audio_url, 43 43 ): 44 44 try: 45 - result = await _call_moderation_service(audio_url) 45 + client = get_moderation_client() 46 + result = await client.scan(audio_url) 46 47 await _store_scan_result(track_id, result) 47 48 except Exception as e: 48 49 logger.warning( ··· 50 51 track_id, 51 52 e, 52 53 ) 53 - # store as "clear" with error info so track doesn't stay unscanned 54 - # this handles cases like: audio too short, unreadable format, etc. 55 54 await _store_scan_error(track_id, str(e)) 56 - # don't re-raise - this is fire-and-forget 57 55 58 56 59 - async def _call_moderation_service(audio_url: str) -> dict[str, Any]: 60 - """call the moderation service /scan endpoint. 
61 - 62 - args: 63 - audio_url: public URL of the audio file 64 - 65 - returns: 66 - scan result from moderation service 67 - 68 - raises: 69 - httpx.HTTPStatusError: on non-2xx response 70 - httpx.TimeoutException: on timeout 71 - """ 72 - async with httpx.AsyncClient( 73 - timeout=httpx.Timeout(settings.moderation.timeout_seconds) 74 - ) as client: 75 - response = await client.post( 76 - f"{settings.moderation.service_url}/scan", 77 - json={"audio_url": audio_url}, 78 - headers={"X-Moderation-Key": settings.moderation.auth_token}, 79 - ) 80 - response.raise_for_status() 81 - return response.json() 82 - 83 - 84 - async def _store_scan_result(track_id: int, result: dict[str, Any]) -> None: 57 + async def _store_scan_result(track_id: int, result: Any) -> None: 85 58 """store scan result in the database. 86 59 87 60 args: 88 61 track_id: database ID of the track 89 - result: scan result from moderation service 62 + result: ScanResult from moderation client 90 63 """ 91 - from sqlalchemy.orm import joinedload 92 - 93 64 async with db_session() as db: 94 - is_flagged = result.get("is_flagged", False) 95 - 96 65 scan = CopyrightScan( 97 66 track_id=track_id, 98 - is_flagged=is_flagged, 99 - highest_score=result.get("highest_score", 0), 100 - matches=result.get("matches", []), 101 - raw_response=result.get("raw_response", {}), 67 + is_flagged=result.is_flagged, 68 + highest_score=result.highest_score, 69 + matches=result.matches, 70 + raw_response=result.raw_response, 102 71 ) 103 72 db.add(scan) 104 73 await db.commit() ··· 112 81 ) 113 82 114 83 # emit ATProto label if flagged 115 - if is_flagged: 84 + if result.is_flagged: 116 85 track = await db.scalar( 117 86 select(Track) 118 87 .options(joinedload(Track.artist)) ··· 141 110 highest_score: float | None = None, 142 111 matches: list[dict[str, Any]] | None = None, 143 112 ) -> None: 144 - """emit a copyright-violation label to the ATProto labeler service. 
145 - 146 - this is fire-and-forget - failures are logged but don't affect the scan result. 147 - 148 - args: 149 - uri: AT URI of the track record 150 - cid: optional CID of the record 151 - track_id: database ID of the track (for admin UI links) 152 - track_title: title of the track (for admin UI context) 153 - artist_handle: handle of the artist (for admin UI context) 154 - artist_did: DID of the artist (for admin UI context) 155 - highest_score: highest match score (for admin UI context) 156 - matches: list of copyright matches (for admin UI context) 157 - """ 158 - try: 159 - # build context for admin UI display 160 - context: dict[str, Any] | None = None 161 - if track_id or track_title or artist_handle or matches: 162 - context = { 163 - "track_id": track_id, 164 - "track_title": track_title, 165 - "artist_handle": artist_handle, 166 - "artist_did": artist_did, 167 - "highest_score": highest_score, 168 - "matches": matches, 169 - } 170 - 171 - payload: dict[str, Any] = { 172 - "uri": uri, 173 - "val": "copyright-violation", 174 - "cid": cid, 113 + """emit a copyright-violation label to the ATProto labeler service.""" 114 + context: dict[str, Any] | None = None 115 + if track_id or track_title or artist_handle or matches: 116 + context = { 117 + "track_id": track_id, 118 + "track_title": track_title, 119 + "artist_handle": artist_handle, 120 + "artist_did": artist_did, 121 + "highest_score": highest_score, 122 + "matches": matches, 175 123 } 176 - if context: 177 - payload["context"] = context 178 124 179 - async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client: 180 - response = await client.post( 181 - f"{settings.moderation.labeler_url}/emit-label", 182 - json=payload, 183 - headers={"X-Moderation-Key": settings.moderation.auth_token}, 184 - ) 185 - response.raise_for_status() 186 - 187 - # invalidate cache since label status changed 188 - await invalidate_label_cache(uri) 125 + client = get_moderation_client() 126 + await 
client.emit_label(uri=uri, cid=cid, context=context) 189 127 190 - logfire.info( 191 - "copyright label emitted", 192 - uri=uri, 193 - cid=cid, 194 - ) 195 - except Exception as e: 196 - logger.warning("failed to emit copyright label for %s: %s", uri, e) 197 128 129 + async def _store_scan_error(track_id: int, error: str) -> None: 130 + """store a scan error as a clear result.""" 131 + async with db_session() as db: 132 + scan = CopyrightScan( 133 + track_id=track_id, 134 + is_flagged=False, 135 + highest_score=0, 136 + matches=[], 137 + raw_response={"error": error, "status": "scan_failed"}, 138 + ) 139 + db.add(scan) 140 + await db.commit() 198 141 199 - async def get_active_copyright_labels(uris: list[str]) -> set[str]: 200 - """check which URIs have active (non-negated) copyright-violation labels. 142 + logfire.info( 143 + "copyright scan error stored as clear", 144 + track_id=track_id, 145 + error=error, 146 + ) 201 147 202 - uses redis cache (shared across instances) to avoid repeated calls 203 - to the moderation service. only URIs not in cache are fetched. 204 148 205 - args: 206 - uris: list of AT URIs to check 149 + # re-export for backwards compatibility 150 + async def get_active_copyright_labels(uris: list[str]) -> set[str]: 151 + """check which URIs have active copyright-violation labels. 207 152 208 - returns: 209 - set of URIs that are still actively flagged 210 - 211 - note: 212 - fails closed (returns all URIs as active) if moderation service is unreachable 213 - to avoid accidentally hiding real violations. 153 + this is a convenience wrapper around the moderation client. 
214 154 """ 215 - if not uris: 216 - return set() 217 - 218 155 if not settings.moderation.enabled: 219 156 logger.debug("moderation disabled, treating all as active") 220 157 return set(uris) ··· 223 160 logger.warning("MODERATION_AUTH_TOKEN not set, treating all as active") 224 161 return set(uris) 225 162 226 - # check redis cache first - partition into cached vs uncached 227 - active_from_cache: set[str] = set() 228 - uris_to_fetch: list[str] = [] 229 - 230 - try: 231 - redis = get_async_redis_client() 232 - prefix = settings.moderation.label_cache_prefix 233 - cache_keys = [f"{prefix}{uri}" for uri in uris] 234 - cached_values = await redis.mget(cache_keys) 235 - 236 - for uri, cached_value in zip(uris, cached_values, strict=True): 237 - if cached_value is not None: 238 - if cached_value == "1": 239 - active_from_cache.add(uri) 240 - # else: cached as "0" (not active), skip 241 - else: 242 - uris_to_fetch.append(uri) 243 - except Exception as e: 244 - # redis unavailable - fall through to fetch all 245 - logger.warning("redis cache unavailable: %s", e) 246 - uris_to_fetch = list(uris) 247 - 248 - # if everything was cached, return early 249 - if not uris_to_fetch: 250 - logfire.debug( 251 - "checked active copyright labels (all cached)", 252 - total_uris=len(uris), 253 - active_count=len(active_from_cache), 254 - ) 255 - return active_from_cache 256 - 257 - # fetch uncached URIs from moderation service 258 - try: 259 - async with httpx.AsyncClient( 260 - timeout=httpx.Timeout(settings.moderation.timeout_seconds) 261 - ) as client: 262 - response = await client.post( 263 - f"{settings.moderation.labeler_url}/admin/active-labels", 264 - json={"uris": uris_to_fetch}, 265 - headers={"X-Moderation-Key": settings.moderation.auth_token}, 266 - ) 267 - response.raise_for_status() 268 - data = response.json() 269 - active_from_service = set(data.get("active_uris", [])) 270 - 271 - # update redis cache with results 272 - try: 273 - redis = get_async_redis_client() 274 - 
prefix = settings.moderation.label_cache_prefix 275 - ttl = settings.moderation.label_cache_ttl_seconds 276 - async with redis.pipeline() as pipe: 277 - for uri in uris_to_fetch: 278 - cache_key = f"{prefix}{uri}" 279 - value = "1" if uri in active_from_service else "0" 280 - await pipe.set(cache_key, value, ex=ttl) 281 - await pipe.execute() 282 - except Exception as e: 283 - # cache update failed - not critical, just log 284 - logger.warning("failed to update redis cache: %s", e) 285 - 286 - logfire.info( 287 - "checked active copyright labels", 288 - total_uris=len(uris), 289 - cached_count=len(uris) - len(uris_to_fetch), 290 - fetched_count=len(uris_to_fetch), 291 - active_count=len(active_from_cache) + len(active_from_service), 292 - ) 293 - return active_from_cache | active_from_service 294 - 295 - except Exception as e: 296 - # fail closed: if we can't confirm resolution, treat as active 297 - # don't cache failures - we want to retry next time 298 - logger.warning("failed to check active labels, treating all as active: %s", e) 299 - return set(uris) 163 + client = get_moderation_client() 164 + return await client.get_active_labels(uris) 300 165 301 166 302 167 async def invalidate_label_cache(uri: str) -> None: 303 - """invalidate cache entry for a URI when its label status changes. 304 - 305 - call this when emitting or negating labels to ensure fresh data. 306 - """ 307 - try: 308 - redis = get_async_redis_client() 309 - prefix = settings.moderation.label_cache_prefix 310 - await redis.delete(f"{prefix}{uri}") 311 - except Exception as e: 312 - logger.warning("failed to invalidate label cache for %s: %s", uri, e) 168 + """invalidate cache entry for a URI.""" 169 + client = get_moderation_client() 170 + await client.invalidate_cache(uri) 313 171 314 172 315 173 async def clear_label_cache() -> None: 316 - """clear all label cache entries. 
primarily for testing.""" 317 - try: 318 - redis = get_async_redis_client() 319 - prefix = settings.moderation.label_cache_prefix 320 - # scan and delete all keys with our prefix 321 - cursor = 0 322 - while True: 323 - cursor, keys = await redis.scan(cursor, match=f"{prefix}*", count=100) 324 - if keys: 325 - await redis.delete(*keys) 326 - if cursor == 0: 327 - break 328 - except Exception as e: 329 - logger.warning("failed to clear label cache: %s", e) 330 - 331 - 332 - async def _store_scan_error(track_id: int, error: str) -> None: 333 - """store a scan error as a clear result. 334 - 335 - when the moderation service can't process a file (too short, bad format, etc.), 336 - we still want to record that we tried so the track isn't stuck in limbo. 337 - 338 - args: 339 - track_id: database ID of the track 340 - error: error message from the failed scan 341 - """ 342 - async with db_session() as db: 343 - scan = CopyrightScan( 344 - track_id=track_id, 345 - is_flagged=False, 346 - highest_score=0, 347 - matches=[], 348 - raw_response={"error": error, "status": "scan_failed"}, 349 - ) 350 - db.add(scan) 351 - await db.commit() 352 - 353 - logfire.info( 354 - "copyright scan error stored as clear", 355 - track_id=track_id, 356 - error=error, 357 - ) 174 + """clear all label cache entries.""" 175 + client = get_moderation_client() 176 + await client.clear_cache()
+281
backend/src/backend/_internal/moderation_client.py
··· 1 + """moderation service client. 2 + 3 + centralized client for all moderation service interactions. 4 + replaces scattered httpx calls with a single, testable interface. 5 + """ 6 + 7 + import logging 8 + from dataclasses import dataclass 9 + from typing import Any 10 + 11 + import httpx 12 + import logfire 13 + 14 + from backend.config import settings 15 + from backend.utilities.redis import get_async_redis_client 16 + 17 + logger = logging.getLogger(__name__) 18 + 19 + 20 + @dataclass 21 + class ScanResult: 22 + """result from a copyright scan.""" 23 + 24 + is_flagged: bool 25 + highest_score: int 26 + matches: list[dict[str, Any]] 27 + raw_response: dict[str, Any] 28 + 29 + 30 + @dataclass 31 + class EmitLabelResult: 32 + """result from emitting a label.""" 33 + 34 + success: bool 35 + error: str | None = None 36 + 37 + 38 + class ModerationClient: 39 + """client for the plyr.fm moderation service. 40 + 41 + provides a clean interface for: 42 + - scanning audio for copyright matches 43 + - emitting ATProto labels 44 + - checking active labels 45 + - caching label status in redis 46 + 47 + usage: 48 + client = ModerationClient.from_settings() 49 + result = await client.scan(audio_url) 50 + """ 51 + 52 + def __init__( 53 + self, 54 + service_url: str, 55 + labeler_url: str, 56 + auth_token: str, 57 + timeout_seconds: float, 58 + label_cache_prefix: str, 59 + label_cache_ttl_seconds: int, 60 + ) -> None: 61 + self.service_url = service_url 62 + self.labeler_url = labeler_url 63 + self.auth_token = auth_token 64 + self.timeout = httpx.Timeout(timeout_seconds) 65 + self.label_cache_prefix = label_cache_prefix 66 + self.label_cache_ttl_seconds = label_cache_ttl_seconds 67 + 68 + @classmethod 69 + def from_settings(cls) -> "ModerationClient": 70 + """create a client from application settings.""" 71 + return cls( 72 + service_url=settings.moderation.service_url, 73 + labeler_url=settings.moderation.labeler_url, 74 + auth_token=settings.moderation.auth_token, 75 + 
timeout_seconds=settings.moderation.timeout_seconds, 76 + label_cache_prefix=settings.moderation.label_cache_prefix, 77 + label_cache_ttl_seconds=settings.moderation.label_cache_ttl_seconds, 78 + ) 79 + 80 + def _headers(self) -> dict[str, str]: 81 + """common auth headers.""" 82 + return {"X-Moderation-Key": self.auth_token} 83 + 84 + async def scan(self, audio_url: str) -> ScanResult: 85 + """scan audio for potential copyright matches. 86 + 87 + args: 88 + audio_url: public URL of the audio file 89 + 90 + returns: 91 + ScanResult with match details 92 + 93 + raises: 94 + httpx.HTTPStatusError: on non-2xx response 95 + httpx.TimeoutException: on timeout 96 + """ 97 + async with httpx.AsyncClient(timeout=self.timeout) as client: 98 + response = await client.post( 99 + f"{self.service_url}/scan", 100 + json={"audio_url": audio_url}, 101 + headers=self._headers(), 102 + ) 103 + response.raise_for_status() 104 + data = response.json() 105 + 106 + return ScanResult( 107 + is_flagged=data.get("is_flagged", False), 108 + highest_score=data.get("highest_score", 0), 109 + matches=data.get("matches", []), 110 + raw_response=data.get("raw_response", {}), 111 + ) 112 + 113 + async def emit_label( 114 + self, 115 + uri: str, 116 + cid: str | None = None, 117 + val: str = "copyright-violation", 118 + context: dict[str, Any] | None = None, 119 + ) -> EmitLabelResult: 120 + """emit an ATProto label to the labeler service. 
121 + 122 + args: 123 + uri: AT URI of the record to label 124 + cid: optional CID of the record 125 + val: label value (default: copyright-violation) 126 + context: optional metadata for admin UI display 127 + 128 + returns: 129 + EmitLabelResult indicating success/failure 130 + """ 131 + payload: dict[str, Any] = {"uri": uri, "val": val} 132 + if cid: 133 + payload["cid"] = cid 134 + if context: 135 + payload["context"] = context 136 + 137 + try: 138 + async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client: 139 + response = await client.post( 140 + f"{self.labeler_url}/emit-label", 141 + json=payload, 142 + headers=self._headers(), 143 + ) 144 + response.raise_for_status() 145 + 146 + # invalidate cache since label status changed 147 + await self.invalidate_cache(uri) 148 + 149 + logfire.info("copyright label emitted", uri=uri, cid=cid) 150 + return EmitLabelResult(success=True) 151 + 152 + except Exception as e: 153 + logger.warning("failed to emit copyright label for %s: %s", uri, e) 154 + return EmitLabelResult(success=False, error=str(e)) 155 + 156 + async def get_active_labels(self, uris: list[str]) -> set[str]: 157 + """check which URIs have active (non-negated) copyright-violation labels. 158 + 159 + uses redis cache to avoid repeated calls to the labeler service. 160 + fails closed (returns all URIs as active) if labeler is unreachable. 
161 + 162 + args: 163 + uris: list of AT URIs to check 164 + 165 + returns: 166 + set of URIs that are still actively flagged 167 + """ 168 + if not uris: 169 + return set() 170 + 171 + # check redis cache first 172 + active_from_cache: set[str] = set() 173 + uris_to_fetch: list[str] = [] 174 + 175 + try: 176 + redis = get_async_redis_client() 177 + cache_keys = [f"{self.label_cache_prefix}{uri}" for uri in uris] 178 + cached_values = await redis.mget(cache_keys) 179 + 180 + for uri, cached_value in zip(uris, cached_values, strict=True): 181 + if cached_value is not None: 182 + if cached_value == "1": 183 + active_from_cache.add(uri) 184 + # else: cached as "0" (not active), skip 185 + else: 186 + uris_to_fetch.append(uri) 187 + except Exception as e: 188 + logger.warning("redis cache unavailable: %s", e) 189 + uris_to_fetch = list(uris) 190 + 191 + # if everything was cached, return early 192 + if not uris_to_fetch: 193 + logfire.debug( 194 + "checked active copyright labels (all cached)", 195 + total_uris=len(uris), 196 + active_count=len(active_from_cache), 197 + ) 198 + return active_from_cache 199 + 200 + # fetch uncached URIs from labeler 201 + try: 202 + async with httpx.AsyncClient(timeout=self.timeout) as client: 203 + response = await client.post( 204 + f"{self.labeler_url}/admin/active-labels", 205 + json={"uris": uris_to_fetch}, 206 + headers=self._headers(), 207 + ) 208 + response.raise_for_status() 209 + data = response.json() 210 + active_from_service = set(data.get("active_uris", [])) 211 + 212 + # update redis cache 213 + await self._cache_label_status(uris_to_fetch, active_from_service) 214 + 215 + logfire.info( 216 + "checked active copyright labels", 217 + total_uris=len(uris), 218 + cached_count=len(uris) - len(uris_to_fetch), 219 + fetched_count=len(uris_to_fetch), 220 + active_count=len(active_from_cache) + len(active_from_service), 221 + ) 222 + return active_from_cache | active_from_service 223 + 224 + except Exception as e: 225 + # fail 
closed: if we can't confirm resolution, treat as active 226 + logger.warning( 227 + "failed to check active labels, treating all as active: %s", e 228 + ) 229 + return set(uris) 230 + 231 + async def _cache_label_status(self, uris: list[str], active_uris: set[str]) -> None: 232 + """cache label status in redis.""" 233 + try: 234 + redis = get_async_redis_client() 235 + async with redis.pipeline() as pipe: 236 + for uri in uris: 237 + cache_key = f"{self.label_cache_prefix}{uri}" 238 + value = "1" if uri in active_uris else "0" 239 + await pipe.set(cache_key, value, ex=self.label_cache_ttl_seconds) 240 + await pipe.execute() 241 + except Exception as e: 242 + logger.warning("failed to update redis cache: %s", e) 243 + 244 + async def invalidate_cache(self, uri: str) -> None: 245 + """invalidate cache entry for a URI when its label status changes.""" 246 + try: 247 + redis = get_async_redis_client() 248 + await redis.delete(f"{self.label_cache_prefix}{uri}") 249 + except Exception as e: 250 + logger.warning("failed to invalidate label cache for %s: %s", uri, e) 251 + 252 + async def clear_cache(self) -> None: 253 + """clear all label cache entries. primarily for testing.""" 254 + try: 255 + redis = get_async_redis_client() 256 + cursor = 0 257 + while True: 258 + cursor, keys = await redis.scan( 259 + cursor, match=f"{self.label_cache_prefix}*", count=100 260 + ) 261 + if keys: 262 + await redis.delete(*keys) 263 + if cursor == 0: 264 + break 265 + except Exception as e: 266 + logger.warning("failed to clear label cache: %s", e) 267 + 268 + 269 + # module-level singleton 270 + _client: ModerationClient | None = None 271 + 272 + 273 + def get_moderation_client() -> ModerationClient: 274 + """get the moderation client singleton. 275 + 276 + creates the client on first call, reuses on subsequent calls. 277 + """ 278 + global _client 279 + if _client is None: 280 + _client = ModerationClient.from_settings() 281 + return _client
+1 -2
backend/src/backend/models/__init__.py
···
2 2
3 3 from backend.models.album import Album
4 4 from backend.models.artist import Artist
5 - from backend.models.copyright_scan import CopyrightScan, ScanResolution
5 + from backend.models.copyright_scan import CopyrightScan
6 6 from backend.models.database import Base
7 7 from backend.models.sensitive_image import SensitiveImage
8 8 from backend.models.exchange_token import ExchangeToken
···
32 32     "PendingScopeUpgrade",
33 33     "Playlist",
34 34     "QueueState",
35 -     "ScanResolution",
36 35     "SensitiveImage",
37 36     "Tag",
38 37     "Track",
+7 -21
backend/src/backend/models/copyright_scan.py
···
1 1 """copyright scan model for tracking moderation results."""
2 2
3 3 from datetime import UTC, datetime
4 - from enum import Enum
5 4 from typing import Any
6 5
7 - from sqlalchemy import DateTime, ForeignKey, Index, Integer, String
6 + from sqlalchemy import DateTime, ForeignKey, Index, Integer
8 7 from sqlalchemy.dialects.postgresql import JSONB
9 8 from sqlalchemy.orm import Mapped, mapped_column
10 9
11 10 from backend.models.database import Base
12 11
13 12
14 - class ScanResolution(str, Enum):
15 -     """resolution status for a flagged scan."""
16 -
17 -     PENDING = "pending"  # awaiting review
18 -     DISMISSED = "dismissed"  # false positive, no action needed
19 -     REMOVED = "removed"  # track was removed
20 -     LICENSED = "licensed"  # verified to be properly licensed
21 -
22 -
23 13 class CopyrightScan(Base):
24 14     """copyright scan result from moderation service.
25 15
26 -     stores scan results from AuDD API for tracking potential
27 -     copyright issues without immediate enforcement (ozone pattern).
16 +     stores scan results from AuDD API. the labeler service is the source
17 +     of truth for whether a track is actively flagged (label not negated).
18 +
19 +     the is_flagged field here indicates the initial scan result. the
20 +     sync_copyright_resolutions background task updates it when labels
21 +     are negated in the labeler.
28 22     """
29 23
30 24     __tablename__ = "copyright_scans"
···
61 55         default=dict,
62 56         server_default="{}",
63 57     )
64 -
65 -     # review tracking (for later human review)
66 -     resolution: Mapped[str | None] = mapped_column(String, nullable=True)
67 -     reviewed_at: Mapped[datetime | None] = mapped_column(
68 -         DateTime(timezone=True), nullable=True
69 -     )
70 -     reviewed_by: Mapped[str | None] = mapped_column(String, nullable=True)  # DID
71 -     review_notes: Mapped[str | None] = mapped_column(String, nullable=True)
72 58
73 59     __table_args__ = (
74 60         Index("idx_copyright_scans_flagged", "is_flagged"),
+14 -83
backend/src/backend/utilities/aggregations.py
···
3 3 import logging
4 4 from collections import Counter
5 5 from dataclasses import dataclass
6 - from datetime import UTC, datetime
7 6 from typing import Any
8 7
9 - from sqlalchemy import select, update
8 + from sqlalchemy import select
10 9 from sqlalchemy.ext.asyncio import AsyncSession
11 10 from sqlalchemy.sql import func
12 11
13 - from backend.models import CopyrightScan, Tag, Track, TrackComment, TrackLike, TrackTag
12 + from backend.models import CopyrightScan, Tag, TrackComment, TrackLike, TrackTag
14 13
15 14 logger = logging.getLogger(__name__)
···
75 74 ) -> dict[int, CopyrightInfo]:
76 75     """get copyright scan info for multiple tracks in a single query.
77 76
78 -     checks the moderation service's labeler for the true resolution status.
79 -     if a track was resolved (negation label exists), treats it as not flagged
80 -     and lazily updates the backend's resolution field.
77 +     this is a pure read - no reconciliation with the labeler.
78 +     resolution sync happens via background task (sync_copyright_resolutions).
81 79
82 80     args:
83 81         db: database session
···
89 87     if not track_ids:
90 88         return {}
91 89
92 -     # get scans with track AT URIs for labeler lookup
93 -     stmt = (
94 -         select(
95 -             CopyrightScan.id,
96 -             CopyrightScan.track_id,
97 -             CopyrightScan.is_flagged,
98 -             CopyrightScan.matches,
99 -             CopyrightScan.resolution,
100 -             Track.atproto_record_uri,
101 -         )
102 -         .join(Track, CopyrightScan.track_id == Track.id)
103 -         .where(CopyrightScan.track_id.in_(track_ids))
104 -     )
90 +     stmt = select(
91 +         CopyrightScan.track_id,
92 +         CopyrightScan.is_flagged,
93 +         CopyrightScan.matches,
94 +     ).where(CopyrightScan.track_id.in_(track_ids))
105 95
106 96     result = await db.execute(stmt)
107 97     rows = result.all()
108 98
109 -     # separate flagged scans that need labeler check vs already resolved
110 -     needs_labeler_check: list[
111 -         tuple[int, int, str, list]
112 -     ] = []  # scan_id, track_id, uri, matches
113 99     copyright_info: dict[int, CopyrightInfo] = {}
114 -
115 -     for scan_id, track_id, is_flagged, matches, resolution, uri in rows:
116 -         if not is_flagged or resolution is not None:
117 -             # not flagged or already resolved - no need to check labeler
118 -             copyright_info[track_id] = CopyrightInfo(
119 -                 is_flagged=False if resolution else is_flagged,
120 -                 primary_match=_extract_primary_match(matches)
121 -                 if is_flagged and not resolution
122 -                 else None,
123 -             )
124 -         elif uri:
125 -             # flagged with no resolution - need to check labeler
126 -             needs_labeler_check.append((scan_id, track_id, uri, matches))
127 -         else:
128 -             # flagged but no AT URI - can't check labeler, treat as flagged
129 -             copyright_info[track_id] = CopyrightInfo(
130 -                 is_flagged=True,
131 -                 primary_match=_extract_primary_match(matches),
132 -             )
133 -
134 -     # check labeler for tracks that need it
135 -     if needs_labeler_check:
136 -         from backend._internal.moderation import get_active_copyright_labels
137 -
138 -         uris = [uri for _, _, uri, _ in needs_labeler_check]
139 -         active_uris = await get_active_copyright_labels(uris)
140 -
141 -         # process results and lazily update DB for resolved tracks
142 -         resolved_scan_ids: list[int] = []
143 -         for scan_id, track_id, uri, matches in needs_labeler_check:
144 -             if uri in active_uris:
145 -                 # still actively flagged
146 -                 copyright_info[track_id] = CopyrightInfo(
147 -                     is_flagged=True,
148 -                     primary_match=_extract_primary_match(matches),
149 -                 )
150 -             else:
151 -                 # resolved in labeler - treat as not flagged
152 -                 copyright_info[track_id] = CopyrightInfo(
153 -                     is_flagged=False,
154 -                     primary_match=None,
155 -                 )
156 -                 resolved_scan_ids.append(scan_id)
157 -
158 -         # lazily update resolution for newly discovered resolved scans
159 -         if resolved_scan_ids:
160 -             try:
161 -                 await db.execute(
162 -                     update(CopyrightScan)
163 -                     .where(CopyrightScan.id.in_(resolved_scan_ids))
164 -                     .values(resolution="dismissed", reviewed_at=datetime.now(UTC))
165 -                 )
166 -                 await db.commit()
167 -                 logger.info(
168 -                     "lazily updated %d copyright scans as dismissed",
169 -                     len(resolved_scan_ids),
170 -                 )
171 -             except Exception as e:
172 -                 logger.warning("failed to lazily update copyright resolutions: %s", e)
173 -                 await db.rollback()
100 +     for track_id, is_flagged, matches in rows:
101 +         copyright_info[track_id] = CopyrightInfo(
102 +             is_flagged=is_flagged,
103 +             primary_match=_extract_primary_match(matches) if is_flagged else None,
104 +         )
174 105
175 106     return copyright_info
176 107
+215 -326
backend/tests/test_moderation.py
··· 8 8 from sqlalchemy.ext.asyncio import AsyncSession 9 9 10 10 from backend._internal.moderation import ( 11 - _call_moderation_service, 12 - _store_scan_result, 13 11 get_active_copyright_labels, 14 12 scan_track_for_copyright, 15 13 ) 14 + from backend._internal.moderation_client import ModerationClient, ScanResult 16 15 from backend.models import Artist, CopyrightScan, Track 17 16 18 17 19 18 @pytest.fixture 20 - def mock_moderation_response() -> dict: 21 - """typical response from moderation service.""" 22 - return { 23 - "matches": [ 19 + def mock_scan_result() -> ScanResult: 20 + """typical scan result from moderation client.""" 21 + return ScanResult( 22 + is_flagged=True, 23 + highest_score=85, 24 + matches=[ 24 25 { 25 26 "artist": "Test Artist", 26 27 "title": "Test Song", ··· 28 29 "isrc": "USRC12345678", 29 30 } 30 31 ], 31 - "is_flagged": True, 32 - "highest_score": 85, 33 - "raw_response": {"status": "success", "result": []}, 34 - } 32 + raw_response={"status": "success", "result": []}, 33 + ) 35 34 36 35 37 36 @pytest.fixture 38 - def mock_clear_response() -> dict: 39 - """response when no copyright matches found.""" 40 - return { 41 - "matches": [], 42 - "is_flagged": False, 43 - "highest_score": 0, 44 - "raw_response": {"status": "success", "result": None}, 45 - } 37 + def mock_clear_result() -> ScanResult: 38 + """scan result when no copyright matches found.""" 39 + return ScanResult( 40 + is_flagged=False, 41 + highest_score=0, 42 + matches=[], 43 + raw_response={"status": "success", "result": None}, 44 + ) 46 45 47 46 48 - async def test_call_moderation_service_success( 49 - mock_moderation_response: dict, 50 - ) -> None: 51 - """test successful call to moderation service.""" 52 - # use regular Mock for response since httpx Response methods are sync 47 + async def test_moderation_client_scan_success() -> None: 48 + """test ModerationClient.scan() with successful response.""" 53 49 mock_response = Mock() 54 - mock_response.json.return_value = 
mock_moderation_response 50 + mock_response.json.return_value = { 51 + "is_flagged": True, 52 + "highest_score": 85, 53 + "matches": [{"artist": "Test", "title": "Song", "score": 85}], 54 + "raw_response": {"status": "success"}, 55 + } 55 56 mock_response.raise_for_status.return_value = None 56 57 58 + client = ModerationClient( 59 + service_url="https://test.example.com", 60 + labeler_url="https://labeler.example.com", 61 + auth_token="test-token", 62 + timeout_seconds=30, 63 + label_cache_prefix="test:label:", 64 + label_cache_ttl_seconds=300, 65 + ) 66 + 57 67 with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 58 68 mock_post.return_value = mock_response 59 69 60 - with patch("backend._internal.moderation.settings") as mock_settings: 61 - mock_settings.moderation.service_url = "https://test.example.com" 62 - mock_settings.moderation.auth_token = "test-token" 63 - mock_settings.moderation.timeout_seconds = 30 70 + result = await client.scan("https://example.com/audio.mp3") 64 71 65 - result = await _call_moderation_service("https://example.com/audio.mp3") 66 - 67 - assert result == mock_moderation_response 72 + assert result.is_flagged is True 73 + assert result.highest_score == 85 74 + assert len(result.matches) == 1 68 75 mock_post.assert_called_once() 69 - call_kwargs = mock_post.call_args 70 - assert call_kwargs.kwargs["json"] == {"audio_url": "https://example.com/audio.mp3"} 71 - assert call_kwargs.kwargs["headers"] == {"X-Moderation-Key": "test-token"} 72 76 73 77 74 - async def test_call_moderation_service_timeout() -> None: 75 - """test timeout handling.""" 78 + async def test_moderation_client_scan_timeout() -> None: 79 + """test ModerationClient.scan() timeout handling.""" 80 + client = ModerationClient( 81 + service_url="https://test.example.com", 82 + labeler_url="https://labeler.example.com", 83 + auth_token="test-token", 84 + timeout_seconds=30, 85 + label_cache_prefix="test:label:", 86 + label_cache_ttl_seconds=300, 87 + ) 
88 + 76 89 with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 77 90 mock_post.side_effect = httpx.TimeoutException("timeout") 78 91 79 - with patch("backend._internal.moderation.settings") as mock_settings: 80 - mock_settings.moderation.service_url = "https://test.example.com" 81 - mock_settings.moderation.auth_token = "test-token" 82 - mock_settings.moderation.timeout_seconds = 30 83 - 84 - with pytest.raises(httpx.TimeoutException): 85 - await _call_moderation_service("https://example.com/audio.mp3") 92 + with pytest.raises(httpx.TimeoutException): 93 + await client.scan("https://example.com/audio.mp3") 86 94 87 95 88 - async def test_store_scan_result_flagged( 96 + async def test_scan_track_stores_flagged_result( 89 97 db_session: AsyncSession, 90 - mock_moderation_response: dict, 98 + mock_scan_result: ScanResult, 91 99 ) -> None: 92 100 """test storing a flagged scan result.""" 93 - # create test artist and track 94 101 artist = Artist( 95 102 did="did:plc:test123", 96 103 handle="test.bsky.social", ··· 109 116 db_session.add(track) 110 117 await db_session.commit() 111 118 112 - await _store_scan_result(track.id, mock_moderation_response) 119 + with patch("backend._internal.moderation.settings") as mock_settings: 120 + mock_settings.moderation.enabled = True 121 + mock_settings.moderation.auth_token = "test-token" 113 122 114 - # verify scan was stored 123 + with patch( 124 + "backend._internal.moderation.get_moderation_client" 125 + ) as mock_get_client: 126 + mock_client = AsyncMock() 127 + mock_client.scan.return_value = mock_scan_result 128 + mock_get_client.return_value = mock_client 129 + 130 + assert track.r2_url is not None 131 + await scan_track_for_copyright(track.id, track.r2_url) 132 + 115 133 result = await db_session.execute( 116 134 select(CopyrightScan).where(CopyrightScan.track_id == track.id) 117 135 ) ··· 123 141 assert scan.matches[0]["artist"] == "Test Artist" 124 142 125 143 126 - async def 
test_store_scan_result_flagged_emits_label( 144 + async def test_scan_track_emits_label_when_flagged( 127 145 db_session: AsyncSession, 128 - mock_moderation_response: dict, 146 + mock_scan_result: ScanResult, 129 147 ) -> None: 130 148 """test that flagged scan result emits ATProto label.""" 131 - # create test artist and track with ATProto URI 132 149 artist = Artist( 133 150 did="did:plc:labelertest", 134 151 handle="labeler.bsky.social", ··· 149 166 db_session.add(track) 150 167 await db_session.commit() 151 168 152 - with patch( 153 - "backend._internal.moderation._emit_copyright_label", 154 - new_callable=AsyncMock, 155 - ) as mock_emit: 156 - await _store_scan_result(track.id, mock_moderation_response) 169 + with patch("backend._internal.moderation.settings") as mock_settings: 170 + mock_settings.moderation.enabled = True 171 + mock_settings.moderation.auth_token = "test-token" 172 + 173 + with patch( 174 + "backend._internal.moderation.get_moderation_client" 175 + ) as mock_get_client: 176 + mock_client = AsyncMock() 177 + mock_client.scan.return_value = mock_scan_result 178 + mock_client.emit_label = AsyncMock() 179 + mock_get_client.return_value = mock_client 180 + 181 + assert track.r2_url is not None 182 + await scan_track_for_copyright(track.id, track.r2_url) 157 183 158 - # verify label emission was called with full context 159 - mock_emit.assert_called_once_with( 160 - uri="at://did:plc:labelertest/fm.plyr.track/abc123", 161 - cid="bafyreiabc123", 162 - track_id=track.id, 163 - track_title="Labeler Test Track", 164 - artist_handle="labeler.bsky.social", 165 - artist_did="did:plc:labelertest", 166 - highest_score=85, 167 - matches=[ 168 - { 169 - "artist": "Test Artist", 170 - "title": "Test Song", 171 - "score": 85, 172 - "isrc": "USRC12345678", 173 - } 174 - ], 175 - ) 184 + # verify label was emitted 185 + mock_client.emit_label.assert_called_once() 186 + call_kwargs = mock_client.emit_label.call_args.kwargs 187 + assert call_kwargs["uri"] == 
"at://did:plc:labelertest/fm.plyr.track/abc123" 188 + assert call_kwargs["cid"] == "bafyreiabc123" 176 189 177 190 178 - async def test_store_scan_result_flagged_no_atproto_uri_skips_label( 191 + async def test_scan_track_no_label_without_atproto_uri( 179 192 db_session: AsyncSession, 180 - mock_moderation_response: dict, 193 + mock_scan_result: ScanResult, 181 194 ) -> None: 182 195 """test that flagged scan without ATProto URI skips label emission.""" 183 - # create test artist and track without ATProto URI 184 196 artist = Artist( 185 197 did="did:plc:nouri", 186 198 handle="nouri.bsky.social", ··· 200 212 db_session.add(track) 201 213 await db_session.commit() 202 214 203 - with patch( 204 - "backend._internal.moderation._emit_copyright_label", 205 - new_callable=AsyncMock, 206 - ) as mock_emit: 207 - await _store_scan_result(track.id, mock_moderation_response) 215 + with patch("backend._internal.moderation.settings") as mock_settings: 216 + mock_settings.moderation.enabled = True 217 + mock_settings.moderation.auth_token = "test-token" 208 218 209 - # label emission should not be called 210 - mock_emit.assert_not_called() 219 + with patch( 220 + "backend._internal.moderation.get_moderation_client" 221 + ) as mock_get_client: 222 + mock_client = AsyncMock() 223 + mock_client.scan.return_value = mock_scan_result 224 + mock_client.emit_label = AsyncMock() 225 + mock_get_client.return_value = mock_client 211 226 227 + assert track.r2_url is not None 228 + await scan_track_for_copyright(track.id, track.r2_url) 212 229 213 - async def test_store_scan_result_clear( 230 + # label emission should not be called 231 + mock_client.emit_label.assert_not_called() 232 + 233 + 234 + async def test_scan_track_stores_clear_result( 214 235 db_session: AsyncSession, 215 - mock_clear_response: dict, 236 + mock_clear_result: ScanResult, 216 237 ) -> None: 217 238 """test storing a clear (no matches) scan result.""" 218 - # create test artist and track 219 239 artist = Artist( 220 
240 did="did:plc:test456", 221 241 handle="clear.bsky.social", ··· 234 254 db_session.add(track) 235 255 await db_session.commit() 236 256 237 - await _store_scan_result(track.id, mock_clear_response) 257 + with patch("backend._internal.moderation.settings") as mock_settings: 258 + mock_settings.moderation.enabled = True 259 + mock_settings.moderation.auth_token = "test-token" 260 + 261 + with patch( 262 + "backend._internal.moderation.get_moderation_client" 263 + ) as mock_get_client: 264 + mock_client = AsyncMock() 265 + mock_client.scan.return_value = mock_clear_result 266 + mock_get_client.return_value = mock_client 267 + 268 + assert track.r2_url is not None 269 + await scan_track_for_copyright(track.id, track.r2_url) 238 270 239 - # verify scan was stored 240 271 result = await db_session.execute( 241 272 select(CopyrightScan).where(CopyrightScan.track_id == track.id) 242 273 ) ··· 253 284 mock_settings.moderation.enabled = False 254 285 255 286 with patch( 256 - "backend._internal.moderation._call_moderation_service" 257 - ) as mock_call: 287 + "backend._internal.moderation.get_moderation_client" 288 + ) as mock_get_client: 258 289 await scan_track_for_copyright(1, "https://example.com/audio.mp3") 259 290 260 - # should not call the service when disabled 261 - mock_call.assert_not_called() 291 + # should not even get the client when disabled 292 + mock_get_client.assert_not_called() 262 293 263 294 264 295 async def test_scan_track_no_auth_token() -> None: ··· 268 299 mock_settings.moderation.auth_token = "" 269 300 270 301 with patch( 271 - "backend._internal.moderation._call_moderation_service" 272 - ) as mock_call: 302 + "backend._internal.moderation.get_moderation_client" 303 + ) as mock_get_client: 273 304 await scan_track_for_copyright(1, "https://example.com/audio.mp3") 274 305 275 - # should not call the service without auth token 276 - mock_call.assert_not_called() 306 + # should not even get the client without auth token 307 + 
mock_get_client.assert_not_called() 277 308 278 309 279 310 async def test_scan_track_service_error_stores_as_clear( 280 311 db_session: AsyncSession, 281 312 ) -> None: 282 313 """test that service errors are stored as clear results.""" 283 - # create test artist and track 284 314 artist = Artist( 285 315 did="did:plc:errortest", 286 316 handle="errortest.bsky.social", ··· 304 334 mock_settings.moderation.auth_token = "test-token" 305 335 306 336 with patch( 307 - "backend._internal.moderation._call_moderation_service", 308 - new_callable=AsyncMock, 309 - ) as mock_call: 310 - mock_call.side_effect = httpx.HTTPStatusError( 337 + "backend._internal.moderation.get_moderation_client" 338 + ) as mock_get_client: 339 + mock_client = AsyncMock() 340 + mock_client.scan.side_effect = httpx.HTTPStatusError( 311 341 "502 error", 312 342 request=AsyncMock(), 313 343 response=AsyncMock(status_code=502), 314 344 ) 345 + mock_get_client.return_value = mock_client 315 346 316 347 # should not raise - stores error as clear 317 348 await scan_track_for_copyright(track.id, "https://example.com/short.mp3") 318 349 319 - # verify scan was stored as clear with error info 320 350 result = await db_session.execute( 321 351 select(CopyrightScan).where(CopyrightScan.track_id == track.id) 322 352 ) ··· 329 359 assert scan.raw_response["status"] == "scan_failed" 330 360 331 361 332 - async def test_scan_track_full_flow( 333 - db_session: AsyncSession, 334 - mock_moderation_response: dict, 335 - ) -> None: 336 - """test complete scan flow from track to stored result.""" 337 - # create test artist and track 338 - artist = Artist( 339 - did="did:plc:fullflow", 340 - handle="fullflow.bsky.social", 341 - display_name="Full Flow User", 342 - ) 343 - db_session.add(artist) 344 - await db_session.commit() 345 - 346 - track = Track( 347 - title="Full Flow Track", 348 - file_id="fullflow_file", 349 - file_type="flac", 350 - artist_did=artist.did, 351 - r2_url="https://example.com/fullflow.flac", 352 
- ) 353 - db_session.add(track) 354 - await db_session.commit() 355 - 356 - with patch("backend._internal.moderation.settings") as mock_settings: 357 - mock_settings.moderation.enabled = True 358 - mock_settings.moderation.auth_token = "test-token" 359 - mock_settings.moderation.service_url = "https://test.example.com" 360 - mock_settings.moderation.timeout_seconds = 30 361 - 362 - with patch( 363 - "backend._internal.moderation._call_moderation_service", 364 - new_callable=AsyncMock, 365 - ) as mock_call: 366 - mock_call.return_value = mock_moderation_response 367 - 368 - assert track.r2_url is not None 369 - await scan_track_for_copyright(track.id, track.r2_url) 370 - 371 - # verify scan was stored (need fresh session query) 372 - result = await db_session.execute( 373 - select(CopyrightScan).where(CopyrightScan.track_id == track.id) 374 - ) 375 - scan = result.scalar_one() 376 - 377 - assert scan.is_flagged is True 378 - assert scan.highest_score == 85 379 - 380 - 381 362 # tests for get_active_copyright_labels 382 363 383 364 ··· 420 401 "at://did:plc:success/fm.plyr.track/3", 421 402 ] 422 403 423 - mock_response = Mock() 424 - mock_response.json.return_value = { 425 - "active_uris": [uris[0]] # only first is active 426 - } 427 - mock_response.raise_for_status.return_value = None 428 - 429 404 with patch("backend._internal.moderation.settings") as mock_settings: 430 405 mock_settings.moderation.enabled = True 431 406 mock_settings.moderation.auth_token = "test-token" 432 - mock_settings.moderation.labeler_url = "https://test.example.com" 433 - mock_settings.moderation.timeout_seconds = 30 434 - mock_settings.moderation.label_cache_prefix = "test:label:" 435 - mock_settings.moderation.label_cache_ttl_seconds = 300 436 407 437 - with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 438 - mock_post.return_value = mock_response 408 + with patch( 409 + "backend._internal.moderation.get_moderation_client" 410 + ) as mock_get_client: 411 + 
mock_client = AsyncMock() 412 + mock_client.get_active_labels.return_value = {uris[0]} # only first active 413 + mock_get_client.return_value = mock_client 439 414 440 415 result = await get_active_copyright_labels(uris) 441 416 442 - # only the active URI should be in the result 443 - assert result == {uris[0]} 444 - 445 - # verify correct endpoint was called 446 - call_kwargs = mock_post.call_args 447 - assert "/admin/active-labels" in str(call_kwargs) 448 - assert call_kwargs.kwargs["json"] == {"uris": uris} 417 + assert result == {uris[0]} 449 418 450 419 451 420 async def test_get_active_copyright_labels_service_error() -> None: ··· 458 427 with patch("backend._internal.moderation.settings") as mock_settings: 459 428 mock_settings.moderation.enabled = True 460 429 mock_settings.moderation.auth_token = "test-token" 461 - mock_settings.moderation.labeler_url = "https://test.example.com" 462 - mock_settings.moderation.timeout_seconds = 30 463 - mock_settings.moderation.label_cache_prefix = "test:label:" 464 - mock_settings.moderation.label_cache_ttl_seconds = 300 465 430 466 - with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 467 - mock_post.side_effect = httpx.ConnectError("connection failed") 431 + with patch( 432 + "backend._internal.moderation.get_moderation_client" 433 + ) as mock_get_client: 434 + mock_client = AsyncMock() 435 + # client's get_active_labels fails closed internally 436 + mock_client.get_active_labels.return_value = set(uris) 437 + mock_get_client.return_value = mock_client 468 438 469 439 result = await get_active_copyright_labels(uris) 470 440 471 - # should fail closed - all URIs treated as active 472 - assert result == set(uris) 473 - 474 - 475 - # tests for active labels caching (using real redis from test docker-compose) 476 - 477 - 478 - async def test_get_active_copyright_labels_caching() -> None: 479 - """test that repeated calls use cache instead of calling service.""" 480 - uris = [ 481 - 
"at://did:plc:caching/fm.plyr.track/1", 482 - "at://did:plc:caching/fm.plyr.track/2", 483 - ] 484 - 485 - mock_response = Mock() 486 - mock_response.json.return_value = { 487 - "active_uris": [uris[0]] # only first is active 488 - } 489 - mock_response.raise_for_status.return_value = None 490 - 491 - with patch("backend._internal.moderation.settings") as mock_settings: 492 - mock_settings.moderation.enabled = True 493 - mock_settings.moderation.auth_token = "test-token" 494 - mock_settings.moderation.labeler_url = "https://test.example.com" 495 - mock_settings.moderation.timeout_seconds = 30 496 - mock_settings.moderation.label_cache_prefix = "test:label:" 497 - mock_settings.moderation.label_cache_ttl_seconds = 300 498 - 499 - with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 500 - mock_post.return_value = mock_response 501 - 502 - # first call - should hit service 503 - result1 = await get_active_copyright_labels(uris) 504 - assert result1 == {uris[0]} 505 - assert mock_post.call_count == 1 506 - 507 - # second call with same URIs - should use cache, not call service 508 - result2 = await get_active_copyright_labels(uris) 509 - assert result2 == {uris[0]} 510 - assert mock_post.call_count == 1 # still 1, no new call 511 - 512 - 513 - async def test_get_active_copyright_labels_partial_cache() -> None: 514 - """test that cache hits are combined with service calls for new URIs.""" 515 - uris_batch1 = ["at://did:plc:partial/fm.plyr.track/1"] 516 - uris_batch2 = [ 517 - "at://did:plc:partial/fm.plyr.track/1", # cached 518 - "at://did:plc:partial/fm.plyr.track/2", # new 519 - ] 520 - 521 - mock_response1 = Mock() 522 - mock_response1.json.return_value = { 523 - "active_uris": ["at://did:plc:partial/fm.plyr.track/1"] 524 - } 525 - mock_response1.raise_for_status.return_value = None 526 - 527 - mock_response2 = Mock() 528 - mock_response2.json.return_value = { 529 - "active_uris": [] # uri/2 is not active 530 - } 531 - 
mock_response2.raise_for_status.return_value = None 532 - 533 - with patch("backend._internal.moderation.settings") as mock_settings: 534 - mock_settings.moderation.enabled = True 535 - mock_settings.moderation.auth_token = "test-token" 536 - mock_settings.moderation.labeler_url = "https://test.example.com" 537 - mock_settings.moderation.timeout_seconds = 30 538 - mock_settings.moderation.label_cache_prefix = "test:label:" 539 - mock_settings.moderation.label_cache_ttl_seconds = 300 540 - 541 - with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: 542 - mock_post.side_effect = [mock_response1, mock_response2] 543 - 544 - # first call - cache uri/1 as active 545 - result1 = await get_active_copyright_labels(uris_batch1) 546 - assert result1 == {"at://did:plc:partial/fm.plyr.track/1"} 547 - assert mock_post.call_count == 1 548 - 549 - # second call - uri/1 from cache, only uri/2 fetched 550 - result2 = await get_active_copyright_labels(uris_batch2) 551 - # uri/1 is active (from cache), uri/2 is not active (from service) 552 - assert result2 == {"at://did:plc:partial/fm.plyr.track/1"} 553 - assert mock_post.call_count == 2 441 + assert result == set(uris) 554 442 555 - # verify second call only requested uri/2 556 - second_call_args = mock_post.call_args_list[1] 557 - assert second_call_args.kwargs["json"] == { 558 - "uris": ["at://did:plc:partial/fm.plyr.track/2"] 559 - } 560 443 444 + # tests for background task 561 445 562 - async def test_get_active_copyright_labels_cache_invalidation() -> None: 563 - """test that invalidate_label_cache clears specific entry.""" 564 - from backend._internal.moderation import invalidate_label_cache 565 446 566 - uris = ["at://did:plc:invalidate/fm.plyr.track/1"] 447 + async def test_sync_copyright_resolutions(db_session: AsyncSession) -> None: 448 + """test that sync_copyright_resolutions updates flagged scans.""" 449 + from backend._internal.background_tasks import sync_copyright_resolutions 567 450 568 - 
-    mock_response = Mock()
-    mock_response.json.return_value = {
-        "active_uris": ["at://did:plc:invalidate/fm.plyr.track/1"]
-    }
-    mock_response.raise_for_status.return_value = None
-
-    with patch("backend._internal.moderation.settings") as mock_settings:
-        mock_settings.moderation.enabled = True
-        mock_settings.moderation.auth_token = "test-token"
-        mock_settings.moderation.labeler_url = "https://test.example.com"
-        mock_settings.moderation.timeout_seconds = 30
-        mock_settings.moderation.label_cache_prefix = "test:label:"
-        mock_settings.moderation.label_cache_ttl_seconds = 300
-
-        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
-            mock_post.return_value = mock_response
-
-            # first call - populate cache
-            result1 = await get_active_copyright_labels(uris)
-            assert result1 == {"at://did:plc:invalidate/fm.plyr.track/1"}
-            assert mock_post.call_count == 1
-
-            # invalidate the cache entry
-            await invalidate_label_cache("at://did:plc:invalidate/fm.plyr.track/1")
-
-            # next call - should hit service again since cache was invalidated
-            result2 = await get_active_copyright_labels(uris)
-            assert result2 == {"at://did:plc:invalidate/fm.plyr.track/1"}
-            assert mock_post.call_count == 2
-
-
-async def test_service_error_does_not_cache() -> None:
-    """test that service errors don't pollute the cache."""
-    # use unique URIs for this test to avoid cache pollution from other tests
-    uris = ["at://did:plc:errnocache/fm.plyr.track/1"]
-
-    mock_success_response = Mock()
-    mock_success_response.json.return_value = {"active_uris": []}
-    mock_success_response.raise_for_status.return_value = None
-
-    with patch("backend._internal.moderation.settings") as mock_settings:
-        mock_settings.moderation.enabled = True
-        mock_settings.moderation.auth_token = "test-token"
-        mock_settings.moderation.labeler_url = "https://test.example.com"
-        mock_settings.moderation.timeout_seconds = 30
-        mock_settings.moderation.label_cache_prefix = "test:label:"
-        mock_settings.moderation.label_cache_ttl_seconds = 300
-
-        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
-            # first call fails
-            mock_post.side_effect = httpx.ConnectError("connection failed")
-
-            # first call - fails, returns all URIs as active (fail closed)
-            result1 = await get_active_copyright_labels(uris)
-            assert result1 == set(uris)
-            assert mock_post.call_count == 1
-
-            # reset mock to succeed
-            mock_post.side_effect = None
-            mock_post.return_value = mock_success_response
-
-            # second call - should try service again (error wasn't cached)
-            result2 = await get_active_copyright_labels(uris)
-            assert result2 == set()  # now correctly shows not active
-            assert mock_post.call_count == 2
+    # create test artist and tracks
+    artist = Artist(
+        did="did:plc:synctest",
+        handle="synctest.bsky.social",
+        display_name="Sync Test User",
+    )
+    db_session.add(artist)
+    await db_session.commit()
+
+    # track 1: flagged, will be resolved
+    track1 = Track(
+        title="Flagged Track 1",
+        file_id="flagged_1",
+        file_type="mp3",
+        artist_did=artist.did,
+        r2_url="https://example.com/flagged1.mp3",
+        atproto_record_uri="at://did:plc:synctest/fm.plyr.track/1",
+    )
+    db_session.add(track1)
+
+    # track 2: flagged, will stay flagged
+    track2 = Track(
+        title="Flagged Track 2",
+        file_id="flagged_2",
+        file_type="mp3",
+        artist_did=artist.did,
+        r2_url="https://example.com/flagged2.mp3",
+        atproto_record_uri="at://did:plc:synctest/fm.plyr.track/2",
+    )
+    db_session.add(track2)
+    await db_session.commit()
+
+    # create flagged scans
+    scan1 = CopyrightScan(
+        track_id=track1.id,
+        is_flagged=True,
+        highest_score=85,
+        matches=[{"artist": "Test", "title": "Song"}],
+        raw_response={},
+    )
+    scan2 = CopyrightScan(
+        track_id=track2.id,
+        is_flagged=True,
+        highest_score=90,
+        matches=[{"artist": "Test", "title": "Song2"}],
+        raw_response={},
+    )
+    db_session.add_all([scan1, scan2])
+    await db_session.commit()
+
+    with patch(
+        "backend._internal.moderation_client.get_moderation_client"
+    ) as mock_get_client:
+        mock_client = AsyncMock()
+        # only track2's URI is still active
+        mock_client.get_active_labels.return_value = {
+            "at://did:plc:synctest/fm.plyr.track/2"
+        }
+        mock_get_client.return_value = mock_client
+
+        await sync_copyright_resolutions()
+
+    # refresh from db
+    await db_session.refresh(scan1)
+    await db_session.refresh(scan2)
+
+    # scan1 should no longer be flagged (label was negated)
+    assert scan1.is_flagged is False
+
+    # scan2 should still be flagged
+    assert scan2.is_flagged is True
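The new test above pins down the behavior of `sync_copyright_resolutions()`: any flagged scan whose URI is no longer in the labeler's active set gets unflagged. The core reconciliation step can be sketched as a pure function over scan rows; `ScanState` and `reconcile_flags` are illustrative stand-ins here, not the actual implementation, which operates on SQLAlchemy rows.

```python
from dataclasses import dataclass


@dataclass
class ScanState:
    """minimal stand-in for a CopyrightScan row joined to its track's URI."""
    uri: str
    is_flagged: bool


def reconcile_flags(scans: list[ScanState], active_uris: set[str]) -> list[ScanState]:
    """clear is_flagged on scans whose label has been negated in the labeler.

    active_uris is the set of URIs that still carry a non-negated label,
    as returned by the moderation service's /admin/active-labels endpoint.
    """
    changed = []
    for scan in scans:
        if scan.is_flagged and scan.uri not in active_uris:
            scan.is_flagged = False  # label negated -> resolved
            changed.append(scan)
    return changed


# mirrors the test: track 1 was resolved in the labeler, track 2 is still active
scans = [
    ScanState("at://did:plc:synctest/fm.plyr.track/1", True),
    ScanState("at://did:plc:synctest/fm.plyr.track/2", True),
]
changed = reconcile_flags(scans, {"at://did:plc:synctest/fm.plyr.track/2"})
```

Keeping the decision logic pure like this is what lets the background task be tested with a mocked client and no labeler round-trip.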
+20 -70
backend/tests/utilities/test_aggregations.py
···
  """tests for aggregation utilities."""

- from unittest.mock import AsyncMock, patch
-
  import pytest
- from sqlalchemy import select
  from sqlalchemy.ext.asyncio import AsyncSession

  from backend.models import Artist, CopyrightScan, Track, TrackLike
···
      return track


- async def test_get_copyright_info_already_resolved(
+ async def test_get_copyright_info_flagged(
      db_session: AsyncSession, flagged_track: Track
  ) -> None:
-     """test that already resolved scans are treated as not flagged."""
-     # update scan to have resolution set
-     scan = await db_session.scalar(
-         select(CopyrightScan).where(CopyrightScan.track_id == flagged_track.id)
-     )
-     assert scan is not None
-     scan.resolution = "dismissed"
-     await db_session.commit()
-
-     # should NOT call labeler since resolution is already set
-     with patch(
-         "backend._internal.moderation.get_active_copyright_labels",
-         new_callable=AsyncMock,
-     ) as mock_labeler:
-         result = await get_copyright_info(db_session, [flagged_track.id])
-
-     # labeler should not be called for already-resolved scans
-     mock_labeler.assert_not_called()
-
-     # track should show as not flagged
-     assert flagged_track.id in result
-     assert result[flagged_track.id].is_flagged is False
-
-
- async def test_get_copyright_info_checks_labeler_for_pending(
-     db_session: AsyncSession, flagged_track: Track
- ) -> None:
-     """test that pending flagged scans query the labeler."""
-     # mock labeler returning this URI as active (still flagged)
-     with patch(
-         "backend._internal.moderation.get_active_copyright_labels",
-         new_callable=AsyncMock,
-     ) as mock_labeler:
-         mock_labeler.return_value = {flagged_track.atproto_record_uri}
-
-         result = await get_copyright_info(db_session, [flagged_track.id])
-
-         # labeler should be called
-         mock_labeler.assert_called_once()
-         call_args = mock_labeler.call_args[0][0]
-         assert flagged_track.atproto_record_uri in call_args
-
-         # track should still show as flagged
+     """test that flagged scans are returned as flagged.
+
+     get_copyright_info is now a pure read - it reads the is_flagged state
+     directly from the database. the sync_copyright_resolutions background
+     task is responsible for updating is_flagged based on labeler state.
+     """
+     result = await get_copyright_info(db_session, [flagged_track.id])
+
      assert flagged_track.id in result
      assert result[flagged_track.id].is_flagged is True
      assert result[flagged_track.id].primary_match == "Copyrighted Song by Famous Artist"


- async def test_get_copyright_info_resolved_in_labeler(
+ async def test_get_copyright_info_not_flagged(
      db_session: AsyncSession, flagged_track: Track
  ) -> None:
-     """test that labeler resolution clears the flag and updates DB."""
-     # mock labeler returning empty set (all resolved)
-     with patch(
-         "backend._internal.moderation.get_active_copyright_labels",
-         new_callable=AsyncMock,
-     ) as mock_labeler:
-         mock_labeler.return_value = set()  # not active = resolved
-
-         result = await get_copyright_info(db_session, [flagged_track.id])
-
-     # track should show as not flagged
-     assert flagged_track.id in result
-     assert result[flagged_track.id].is_flagged is False
-
-     # verify lazy update: resolution should be set in DB
+     """test that resolved scans (is_flagged=False) are returned as not flagged."""
+     from sqlalchemy import select
+
+     # update scan to be not flagged (simulates sync_copyright_resolutions running)
      scan = await db_session.scalar(
          select(CopyrightScan).where(CopyrightScan.track_id == flagged_track.id)
      )
      assert scan is not None
-     assert scan.resolution == "dismissed"
-     assert scan.reviewed_at is not None
+     scan.is_flagged = False
+     await db_session.commit()
+
+     result = await get_copyright_info(db_session, [flagged_track.id])
+
+     assert flagged_track.id in result
+     assert result[flagged_track.id].is_flagged is False
+     assert result[flagged_track.id].primary_match is None


  async def test_get_copyright_info_empty_list(db_session: AsyncSession) -> None:
···
      # test_tracks fixture doesn't create copyright scans
      track_ids = [track.id for track in test_tracks]

-     with patch(
-         "backend._internal.moderation.get_active_copyright_labels",
-         new_callable=AsyncMock,
-     ) as mock_labeler:
-         result = await get_copyright_info(db_session, track_ids)
-
-     # labeler should not be called since no flagged tracks
-     mock_labeler.assert_not_called()
+     result = await get_copyright_info(db_session, track_ids)

      # no tracks should be in result since none have scans
      assert result == {}
docs/plans/.gitkeep


docs/research/.gitkeep


+122
docs/research/2025-12-18-moderation-cleanup.md
# research: moderation cleanup

**date**: 2025-12-18
**question**: understand issues #541-544 and how the moderation system works to inform cleanup

## summary

the moderation system is split between backend (Python/FastAPI) and moderation service (Rust). copyright scanning uses AudD API, stores results in backend's `copyright_scans` table, and emits ATProto labels via the moderation service. there's a "lazy reconciliation" pattern on read paths that adds complexity. sensitive images are entirely in backend. the 4 issues propose consolidating this into a cleaner architecture.

## findings

### current architecture

```
┌─────────────────────────────────────────────────────────────────┐
│ BACKEND (Python)                                                │
├─────────────────────────────────────────────────────────────────┤
│ _internal/moderation.py                                         │
│ - scan_track_for_copyright() → calls moderation service /scan   │
│ - _emit_copyright_label() → POST /emit-label                    │
│ - get_active_copyright_labels() → POST /admin/active-labels     │
│ (each creates its own httpx.AsyncClient)                        │
├─────────────────────────────────────────────────────────────────┤
│ models/copyright_scan.py                                        │
│ - is_flagged, resolution, matches, raw_response                 │
│ - resolution field tries to mirror labeler state                │
├─────────────────────────────────────────────────────────────────┤
│ models/sensitive_image.py                                       │
│ - image_id or url, reason, flagged_at, flagged_by               │
├─────────────────────────────────────────────────────────────────┤
│ utilities/aggregations.py:73-175                                │
│ - get_copyright_info() does lazy reconciliation                 │
│ - read path calls labeler, then WRITES to DB if resolved        │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ MODERATION SERVICE (Rust)                                       │
├─────────────────────────────────────────────────────────────────┤
│ /scan - calls AudD API, returns scan result                     │
│ /emit-label - creates ATProto label in labeler DB               │
│ /admin/active-labels - returns URIs with non-negated labels     │
│ /admin/* - htmx admin UI for reviewing flags                    │
├─────────────────────────────────────────────────────────────────┤
│ labels table - ATProto labels with negation support             │
│ label_context table - track metadata for admin UI display       │
└─────────────────────────────────────────────────────────────────┘
```

### issue #541: ModerationClient class

**problem**: 3 functions in `moderation.py` each create their own `httpx.AsyncClient`:
- `_call_moderation_service()` (lines 72-81)
- `_emit_copyright_label()` (lines 179-185)
- `get_active_copyright_labels()` (lines 259-268)

**solution**: extract a `ModerationClient` class with shared client, auth, and timeout handling. could use a singleton pattern like `get_docket()` or store on `app.state`.

### issue #542: lazy resolution sync

**problem**: `get_copyright_info()` in `aggregations.py:73-175` does:
1. fetch scans from backend DB
2. for flagged tracks without resolution, call labeler
3. if label was negated, UPDATE the backend DB inline

this means read paths do writes, adding latency and complexity.

**solution**: move to a docket background task that periodically syncs resolutions. the read path becomes a pure read.

### issue #543: dual storage source of truth

**problem**: copyright flag status is stored in TWO places:
1. backend `copyright_scans.resolution` field
2. moderation service labeler (negation labels)

they can get out of sync, requiring reconciliation logic.
**options proposed**:
- A: labeler is source of truth (remove `resolution` from backend)
- B: backend is source of truth (labeler just signs labels)
- C: webhook sync (labeler notifies backend on changes)

### issue #544: SensitiveImage in wrong place

**problem**: `SensitiveImage` model and `/moderation/sensitive-images` endpoint are in backend, but all other moderation (copyright) is in moderation service.

**solution**: move to moderation service for consistency. frontend just changes the URL it fetches from.

## code references

- `backend/src/backend/_internal/moderation.py:59-81` - `_call_moderation_service()` with inline httpx client
- `backend/src/backend/_internal/moderation.py:134-196` - `_emit_copyright_label()` with inline httpx client
- `backend/src/backend/_internal/moderation.py:199-299` - `get_active_copyright_labels()` with redis caching
- `backend/src/backend/utilities/aggregations.py:73-175` - `get_copyright_info()` with lazy reconciliation
- `backend/src/backend/models/copyright_scan.py:23-76` - `CopyrightScan` model with `resolution` field
- `backend/src/backend/models/sensitive_image.py:11-38` - `SensitiveImage` model
- `backend/src/backend/api/moderation.py:24-39` - `/moderation/sensitive-images` endpoint

## dependencies between issues

```
#541 (ModerationClient)

#542 (background sync) - uses ModerationClient

#543 (source of truth) - depends on sync strategy

#544 (SensitiveImage) - independent, can be done anytime
```

## recommended order

1. **#541 first** - extract ModerationClient, improves testability, no behavior change
2. **#542 next** - move lazy sync to background task using new client
3. **#543 then** - once sync is background, decide source of truth (likely option A: labeler owns resolution)
4. **#544 anytime** - independent refactor, lower priority

## open questions

- should moderation service expose a webhook for label changes? (would eliminate need for polling in #542)
- is the 5-minute redis cache TTL for labels appropriate? (currently in settings)
- does the admin UI need to stay in moderation service or could it move to main frontend `/admin` routes?