feat: wire up copyright scan integration with moderation service (#382)

* feat: wire up copyright scan integration with moderation service

- add ModerationSettings to config (service URL, auth token, timeout)
- create moderation client module to call /scan endpoint
- hook into upload flow with fire-and-forget background task
- scan runs after successful track commit, doesn't affect upload status
- add 8 tests covering success, timeout, disabled, and error scenarios

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: add missing logfire[sqlalchemy] dep to copyright scan script

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: store scan errors as clear results instead of leaving tracks unscanned

when the moderation service can't process a file (too short, bad format, etc.),
we now store a "clear" result with the error info in raw_response so the track
isn't stuck in limbo forever.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io Claude and committed by GitHub 434fa34f 222975aa

Changed files
+467
backend
src
backend
_internal
api
tracks
tests
scripts
+134
backend/src/backend/_internal/moderation.py
"""moderation service client for copyright scanning."""

import logging
from typing import Any

import httpx
import logfire

from backend.config import settings
from backend.models import CopyrightScan
from backend.utilities.database import db_session

logger = logging.getLogger(__name__)


async def scan_track_for_copyright(track_id: int, audio_url: str) -> None:
    """scan a track for potential copyright matches.

    this runs as a fire-and-forget background task. failures are logged
    but never raised, so they cannot affect the upload flow.

    if the scan fails (e.g., audio too short, unreadable format), we store
    a "clear" result with the error info so the track isn't stuck unscanned.

    args:
        track_id: database ID of the track to scan
        audio_url: public URL of the audio file (R2)
    """
    if not settings.moderation.enabled:
        logger.debug("moderation disabled, skipping copyright scan")
        return

    if not settings.moderation.auth_token:
        logger.warning("MODERATION_AUTH_TOKEN not set, skipping copyright scan")
        return

    with logfire.span(
        "copyright scan",
        track_id=track_id,
        audio_url=audio_url,
    ):
        try:
            result = await _call_moderation_service(audio_url)
            await _store_scan_result(track_id, result)
        except Exception as e:
            # some exceptions stringify to "", which would leave a blank
            # error field; fall back to the exception class name
            error = str(e) or type(e).__name__
            logger.warning(
                "copyright scan failed for track %d: %s - storing as clear",
                track_id,
                error,
            )
            # store as "clear" with error info so track doesn't stay unscanned
            # this handles cases like: audio too short, unreadable format, etc.
            try:
                await _store_scan_error(track_id, error)
            except Exception:
                # the fallback write can fail too (e.g. database unavailable).
                # nothing awaits this task, so swallow and log instead of
                # letting the exception escape
                logger.exception(
                    "failed to store copyright scan error for track %d",
                    track_id,
                )
            # don't re-raise - this is fire-and-forget


async def _call_moderation_service(audio_url: str) -> dict[str, Any]:
    """call the moderation service /scan endpoint.

    args:
        audio_url: public URL of the audio file

    returns:
        scan result from moderation service (the decoded JSON body)

    raises:
        httpx.HTTPStatusError: on non-2xx response
        httpx.TimeoutException: on timeout
    """
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(settings.moderation.timeout_seconds)
    ) as client:
        response = await client.post(
            f"{settings.moderation.service_url}/scan",
            json={"audio_url": audio_url},
            headers={"X-Moderation-Key": settings.moderation.auth_token},
        )
        response.raise_for_status()
        return response.json()


async def _persist_scan(
    track_id: int,
    *,
    is_flagged: bool,
    highest_score: int,
    matches: list[Any],
    raw_response: dict[str, Any],
) -> None:
    """insert one CopyrightScan row for a track and commit it."""
    async with db_session() as db:
        db.add(
            CopyrightScan(
                track_id=track_id,
                is_flagged=is_flagged,
                highest_score=highest_score,
                matches=matches,
                raw_response=raw_response,
            )
        )
        await db.commit()


async def _store_scan_result(track_id: int, result: dict[str, Any]) -> None:
    """store scan result in the database.

    missing keys fall back to a "clear" value (not flagged, score 0, no
    matches, empty raw response).

    args:
        track_id: database ID of the track
        result: scan result from moderation service
    """
    is_flagged = result.get("is_flagged", False)
    highest_score = result.get("highest_score", 0)
    # treat an explicit null like a missing key so len() below cannot fail
    # after the row has already been committed
    matches = result.get("matches") or []
    raw_response = result.get("raw_response", {})

    await _persist_scan(
        track_id,
        is_flagged=is_flagged,
        highest_score=highest_score,
        matches=matches,
        raw_response=raw_response,
    )

    # log from the local values rather than the ORM object.
    # NOTE(review): whether db_session expires attributes on commit is not
    # visible from this module; using locals sidesteps the question.
    logfire.info(
        "copyright scan stored",
        track_id=track_id,
        is_flagged=is_flagged,
        highest_score=highest_score,
        match_count=len(matches),
    )


async def _store_scan_error(track_id: int, error: str) -> None:
    """store a scan error as a clear result.

    when the moderation service can't process a file (too short, bad format, etc.),
    we still want to record that we tried so the track isn't stuck in limbo.

    args:
        track_id: database ID of the track
        error: error message from the failed scan
    """
    await _persist_scan(
        track_id,
        is_flagged=False,
        highest_score=0,
        matches=[],
        raw_response={"error": error, "status": "scan_failed"},
    )

    logfire.info(
        "copyright scan error stored as clear",
        track_id=track_id,
        error=error,
    )
+10
backend/src/backend/api/tracks/uploads.py
··· 29 29 from backend._internal.audio import AudioFormat 30 30 from backend._internal.image import ImageFormat 31 31 from backend._internal.jobs import job_service 32 + from backend._internal.moderation import scan_track_for_copyright 32 33 from backend.config import settings 33 34 from backend.models import Artist, Track 34 35 from backend.models.job import JobStatus, JobType ··· 345 346 except Exception as e: 346 347 logger.warning( 347 348 f"failed to send notification for track {track.id}: {e}" 349 + ) 350 + 351 + # kick off copyright scan in background (fire-and-forget) 352 + # this runs independently and doesn't affect the upload result 353 + if r2_url: 354 + # intentionally not storing reference - scan failures are logged 355 + # but shouldn't affect the upload result 356 + asyncio.create_task( # noqa: RUF006 357 + scan_track_for_copyright(track.id, r2_url) 348 358 ) 349 359 350 360 await job_service.update_progress(
+32
backend/src/backend/config.py
··· 373 373 ) 374 374 375 375 376 + class ModerationSettings(AppSettingsSection): 377 + """Moderation service configuration.""" 378 + 379 + model_config = SettingsConfigDict( 380 + env_prefix="MODERATION_", 381 + env_file=".env", 382 + case_sensitive=False, 383 + extra="ignore", 384 + ) 385 + 386 + enabled: bool = Field( 387 + default=True, 388 + description="Enable copyright scanning on upload", 389 + ) 390 + service_url: str = Field( 391 + default="https://plyr-moderation.fly.dev", 392 + description="URL of the moderation service", 393 + ) 394 + auth_token: str = Field( 395 + default="", 396 + description="Auth token for moderation service (X-Moderation-Key header)", 397 + ) 398 + timeout_seconds: int = Field( 399 + default=300, 400 + description="Timeout for moderation service requests", 401 + ) 402 + 403 + 376 404 class RateLimitSettings(AppSettingsSection): 377 405 """Rate limiting configuration.""" 378 406 ··· 449 477 notify: NotificationSettings = Field( 450 478 default_factory=NotificationSettings, 451 479 description="Notification settings", 480 + ) 481 + moderation: ModerationSettings = Field( 482 + default_factory=ModerationSettings, 483 + description="Moderation service settings", 452 484 ) 453 485 454 486
+290
backend/tests/test_moderation.py
"""tests for copyright moderation integration."""

from unittest.mock import AsyncMock, Mock, patch

import httpx
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from backend._internal.moderation import (
    _call_moderation_service,
    _store_scan_result,
    scan_track_for_copyright,
)
from backend.models import Artist, CopyrightScan, Track


async def _create_track(
    db: AsyncSession,
    *,
    did: str,
    handle: str,
    display_name: str,
    title: str,
    file_id: str,
    file_type: str,
    r2_url: str,
) -> Track:
    """create and commit an artist plus one track owned by that artist."""
    artist = Artist(did=did, handle=handle, display_name=display_name)
    db.add(artist)
    await db.commit()

    track = Track(
        title=title,
        file_id=file_id,
        file_type=file_type,
        artist_did=artist.did,
        r2_url=r2_url,
    )
    db.add(track)
    await db.commit()
    return track


async def _get_scan(db: AsyncSession, track_id: int) -> CopyrightScan:
    """fetch the single CopyrightScan row for a track (fails if not exactly one)."""
    result = await db.execute(
        select(CopyrightScan).where(CopyrightScan.track_id == track_id)
    )
    return result.scalar_one()


@pytest.fixture
def mock_moderation_response() -> dict:
    """typical response from moderation service."""
    return {
        "matches": [
            {
                "artist": "Test Artist",
                "title": "Test Song",
                "score": 85,
                "isrc": "USRC12345678",
            }
        ],
        "is_flagged": True,
        "highest_score": 85,
        "raw_response": {"status": "success", "result": []},
    }


@pytest.fixture
def mock_clear_response() -> dict:
    """response when no copyright matches found."""
    return {
        "matches": [],
        "is_flagged": False,
        "highest_score": 0,
        "raw_response": {"status": "success", "result": None},
    }


async def test_call_moderation_service_success(
    mock_moderation_response: dict,
) -> None:
    """test successful call to moderation service."""
    # use regular Mock for response since httpx Response methods are sync
    mock_response = Mock()
    mock_response.json.return_value = mock_moderation_response
    mock_response.raise_for_status.return_value = None

    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
        mock_post.return_value = mock_response

        with patch("backend._internal.moderation.settings") as mock_settings:
            mock_settings.moderation.service_url = "https://test.example.com"
            mock_settings.moderation.auth_token = "test-token"
            mock_settings.moderation.timeout_seconds = 30

            result = await _call_moderation_service("https://example.com/audio.mp3")

    assert result == mock_moderation_response
    mock_post.assert_called_once()
    call_kwargs = mock_post.call_args
    assert call_kwargs.kwargs["json"] == {"audio_url": "https://example.com/audio.mp3"}
    assert call_kwargs.kwargs["headers"] == {"X-Moderation-Key": "test-token"}


async def test_call_moderation_service_timeout() -> None:
    """test timeout handling."""
    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
        mock_post.side_effect = httpx.TimeoutException("timeout")

        with patch("backend._internal.moderation.settings") as mock_settings:
            mock_settings.moderation.service_url = "https://test.example.com"
            mock_settings.moderation.auth_token = "test-token"
            mock_settings.moderation.timeout_seconds = 30

            with pytest.raises(httpx.TimeoutException):
                await _call_moderation_service("https://example.com/audio.mp3")


async def test_store_scan_result_flagged(
    db_session: AsyncSession,
    mock_moderation_response: dict,
) -> None:
    """test storing a flagged scan result."""
    track = await _create_track(
        db_session,
        did="did:plc:test123",
        handle="test.bsky.social",
        display_name="Test User",
        title="Test Track",
        file_id="test_file_123",
        file_type="mp3",
        r2_url="https://example.com/audio.mp3",
    )

    await _store_scan_result(track.id, mock_moderation_response)

    # verify scan was stored
    scan = await _get_scan(db_session, track.id)

    assert scan.is_flagged is True
    assert scan.highest_score == 85
    assert len(scan.matches) == 1
    assert scan.matches[0]["artist"] == "Test Artist"


async def test_store_scan_result_clear(
    db_session: AsyncSession,
    mock_clear_response: dict,
) -> None:
    """test storing a clear (no matches) scan result."""
    track = await _create_track(
        db_session,
        did="did:plc:test456",
        handle="clear.bsky.social",
        display_name="Clear User",
        title="Original Track",
        file_id="original_file_456",
        file_type="wav",
        r2_url="https://example.com/original.wav",
    )

    await _store_scan_result(track.id, mock_clear_response)

    # verify scan was stored
    scan = await _get_scan(db_session, track.id)

    assert scan.is_flagged is False
    assert scan.highest_score == 0
    assert scan.matches == []


async def test_scan_track_disabled() -> None:
    """test that scanning is skipped when disabled."""
    with patch("backend._internal.moderation.settings") as mock_settings:
        mock_settings.moderation.enabled = False

        with patch(
            "backend._internal.moderation._call_moderation_service"
        ) as mock_call:
            await scan_track_for_copyright(1, "https://example.com/audio.mp3")

            # should not call the service when disabled
            mock_call.assert_not_called()


async def test_scan_track_no_auth_token() -> None:
    """test that scanning is skipped when auth token not configured."""
    with patch("backend._internal.moderation.settings") as mock_settings:
        mock_settings.moderation.enabled = True
        mock_settings.moderation.auth_token = ""

        with patch(
            "backend._internal.moderation._call_moderation_service"
        ) as mock_call:
            await scan_track_for_copyright(1, "https://example.com/audio.mp3")

            # should not call the service without auth token
            mock_call.assert_not_called()


async def test_scan_track_service_error_stores_as_clear(
    db_session: AsyncSession,
) -> None:
    """test that service errors are stored as clear results."""
    track = await _create_track(
        db_session,
        did="did:plc:errortest",
        handle="errortest.bsky.social",
        display_name="Error Test User",
        title="Error Test Track",
        file_id="error_test_file",
        file_type="mp3",
        r2_url="https://example.com/short.mp3",
    )

    # build the error from real httpx objects; request/response are plain
    # synchronous objects, so AsyncMock is the wrong stand-in for them
    request = httpx.Request("POST", "https://test.example.com/scan")
    response = httpx.Response(502, request=request)

    with patch("backend._internal.moderation.settings") as mock_settings:
        mock_settings.moderation.enabled = True
        mock_settings.moderation.auth_token = "test-token"

        with patch(
            "backend._internal.moderation._call_moderation_service",
            new_callable=AsyncMock,
        ) as mock_call:
            mock_call.side_effect = httpx.HTTPStatusError(
                "502 error",
                request=request,
                response=response,
            )

            # should not raise - stores error as clear
            await scan_track_for_copyright(track.id, "https://example.com/short.mp3")

    # verify scan was stored as clear with error info
    scan = await _get_scan(db_session, track.id)

    assert scan.is_flagged is False
    assert scan.highest_score == 0
    assert scan.matches == []
    assert "error" in scan.raw_response
    assert scan.raw_response["status"] == "scan_failed"


async def test_scan_track_full_flow(
    db_session: AsyncSession,
    mock_moderation_response: dict,
) -> None:
    """test complete scan flow from track to stored result."""
    track = await _create_track(
        db_session,
        did="did:plc:fullflow",
        handle="fullflow.bsky.social",
        display_name="Full Flow User",
        title="Full Flow Track",
        file_id="fullflow_file",
        file_type="flac",
        r2_url="https://example.com/fullflow.flac",
    )

    with patch("backend._internal.moderation.settings") as mock_settings:
        mock_settings.moderation.enabled = True
        mock_settings.moderation.auth_token = "test-token"
        mock_settings.moderation.service_url = "https://test.example.com"
        mock_settings.moderation.timeout_seconds = 30

        with patch(
            "backend._internal.moderation._call_moderation_service",
            new_callable=AsyncMock,
        ) as mock_call:
            mock_call.return_value = mock_moderation_response

            assert track.r2_url is not None
            await scan_track_for_copyright(track.id, track.r2_url)

    # verify scan was stored (need fresh session query)
    scan = await _get_scan(db_session, track.id)

    assert scan.is_flagged is True
    assert scan.highest_score == 85
+1
scripts/scan_tracks_copyright.py
··· 6 6 # "pydantic-settings", 7 7 # "sqlalchemy[asyncio]", 8 8 # "psycopg[binary]", 9 + # "logfire[sqlalchemy]", 9 10 # ] 10 11 # /// 11 12 """scan all tracks for copyright using the moderation service.