feat: add redis cache for copyright label lookups (#566)

* feat: add redis cache for copyright label lookups

the moderation service call was causing 2-3s latency spikes on GET /tracks/
in production. since we run multiple fly.io instances, an in-memory cache
wouldn't work, so this adds a distributed redis cache (reusing docket's redis).

- add backend/utilities/redis.py for async client from docket URL
- cache active label status with 5min TTL (matches queue cache)
- use mget/pipeline for efficient batch operations
- invalidate cache when labels are emitted
- fail closed on errors (treat as active)
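
in sketch form, the read-through pattern this adds (a minimal illustration only - the real code is in the backend/_internal/moderation.py diff below; `query_moderation_service` is a hypothetical stand-in for the labeler call):

```python
import redis.asyncio as async_redis

CACHE_PREFIX = "plyr:copyright-label:"  # mirrors label_cache_prefix default
CACHE_TTL = 300  # seconds - the 5min TTL mentioned above


async def active_labels(redis: async_redis.Redis, uris: list[str]) -> set[str]:
    # one MGET round trip for the whole batch
    # (assumes decode_responses=True so values come back as str)
    cached = await redis.mget([f"{CACHE_PREFIX}{u}" for u in uris])
    active = {u for u, v in zip(uris, cached) if v == "1"}
    misses = [u for u, v in zip(uris, cached) if v is None]
    if not misses:
        return active
    fetched = await query_moderation_service(misses)  # hypothetical stand-in
    # one pipelined round trip to write all results back with a TTL
    async with redis.pipeline() as pipe:
        for u in misses:
            await pipe.set(f"{CACHE_PREFIX}{u}", "1" if u in fetched else "0", ex=CACHE_TTL)
        await pipe.execute()
    return active | fetched
```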

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: move cache settings to config, add async context manager

- add label_cache_prefix and label_cache_ttl_seconds to ModerationSettings
- add async_redis_client() context manager for isolated connections
- add clear_client_cache() for test cleanup
- remove hardcoded cache constants from moderation.py
- fix tests to properly clear client cache between runs
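
usage sketch for the two access patterns (function names here are illustrative; the helpers come from the backend/utilities/redis.py diff below):

```python
from backend.utilities.redis import async_redis_client, get_async_redis_client


async def hot_path() -> None:
    # cached per event loop - cheap to call repeatedly
    redis = get_async_redis_client()
    await redis.set("plyr:example", "1", ex=60)


async def isolated_path() -> None:
    # isolated connection, closed on exit - e.g. one-off scripts or tests
    async with async_redis_client() as redis:
        await redis.ping()
```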

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add redis isolation for xdist parallel tests

- add redis_database fixture that assigns different DB per worker
- use unique URIs in cache tests to avoid cross-test pollution
- document redis test isolation pattern in docs/testing/README.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: handle missing redis gracefully in test fixture

the redis_database fixture now catches ConnectionError and skips
silently when redis is unavailable. tests that don't need redis
still pass, and tests that do need it fail with a more specific error.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: mock settings in moderation cache tests

tests that mock httpx must also mock settings.moderation to avoid the
early return on the auth_token check. without this, tests pass locally
(where MODERATION_AUTH_TOKEN may be set) but fail in CI.
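
for context, the early return in question is the token guard near the top of get_active_copyright_labels (condensed from the moderation.py diff below):

```python
if not settings.moderation.auth_token:
    logger.warning("MODERATION_AUTH_TOKEN not set, treating all as active")
    return set(uris)
```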

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: use D: prefix for DOCKET_URL to allow CI override

pytest-env's D: prefix means "default" - the variable is only set if it
isn't already set in the environment. this lets CI's
DOCKET_URL=redis://localhost:6379 take precedence over pyproject.toml's
6380 (used for local docker-compose).
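
expressed in plain python, D: behaves like a setdefault on the environment:

```python
import os

# CI exports DOCKET_URL=redis://localhost:6379 before pytest starts,
# so the local docker-compose default on port 6380 only applies when unset
os.environ.setdefault("DOCKET_URL", "redis://localhost:6380/0")
```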

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: move deferred imports to top of file, add type hints

- move redis imports to module level in moderation.py
- move asyncio import to module level in redis.py
- add type hint for kwargs dict in redis.py
- move imports to module level in conftest.py
- add rule to AGENTS.md: DO NOT UNNECESSARILY DEFER IMPORTS
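
to illustrate the rule (hypothetical example, not code from this PR):

```python
# bad: deferred purely for "lazy loading" - hides the dependency
def get_client_bad():
    import redis.asyncio as async_redis

    return async_redis.Redis()


# good: module-level import; defer only to break a circular import
import redis.asyncio as async_redis


def get_client_good() -> async_redis.Redis:
    return async_redis.Redis()
```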

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io and Claude, committed by GitHub (1d8942ad, 3fb7e953)

Changed files (+590 -36)

**AGENTS.md** (+2 -1)

````diff
···
 * **Async Everywhere:** Never block the event loop. Use `anyio`/`aiofiles`.
 * **Type Hints:** Required everywhere (Python & TypeScript).
 * **Communication:** Use emojis sparingly and strictly for emphasis.
+* **DO NOT UNNECESSARILY DEFER IMPORTS.** Put imports at the top of the file where they belong. Deferred imports inside functions are only acceptable for avoiding circular imports - not for "lazy loading" or other reasons.

 ## 🛠️ Stack & Tooling
 * **Backend:** FastAPI, Neon (Postgres), Cloudflare R2, Fly.io.
···
 └── STATUS.md # Living status document
 ```

-this file ("AGENTS.md") is symlinked to `CLAUDE.md` and `GEMINI.md` for maximal compatibility.
+this file ("AGENTS.md") is symlinked to `CLAUDE.md` for maximal compatibility.
````

**GEMINI.md** (-1, deleted)

````diff
-AGENTS.md
````

**backend/pyproject.toml** (+4)

````diff
···
     "orjson>=3.11.4",
     "mutagen>=1.47.0",
     "pydocket>=0.15.2",
+    "redis>=7.1.0",
 ]

 requires-python = ">=3.11"
···
     # reduce connection pool for tests to avoid exhausting Neon's connection limit
     "DATABASE_POOL_SIZE=2",
     "DATABASE_MAX_OVERFLOW=0",
+    # redis URL for cache tests (uses test-redis from docker-compose)
+    # D: prefix means don't override if already set (e.g., by CI workflow)
+    "D:DOCKET_URL=redis://localhost:6380/0",
 ]
 markers = [
     "integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
````

**backend/src/backend/_internal/moderation.py** (+93 -7)

````diff
···
 from backend.config import settings
 from backend.models import CopyrightScan, Track
 from backend.utilities.database import db_session
+from backend.utilities.redis import get_async_redis_client

 logger = logging.getLogger(__name__)
···
         )
         response.raise_for_status()

+        # invalidate cache since label status changed
+        await invalidate_label_cache(uri)
+
         logfire.info(
             "copyright label emitted",
             uri=uri,
···
 async def get_active_copyright_labels(uris: list[str]) -> set[str]:
     """check which URIs have active (non-negated) copyright-violation labels.

-    queries the moderation service's labeler to determine which tracks are
-    still actively flagged. this is the source of truth for flag status.
+    uses redis cache (shared across instances) to avoid repeated calls
+    to the moderation service. only URIs not in cache are fetched.

     args:
         uris: list of AT URIs to check
···
         logger.warning("MODERATION_AUTH_TOKEN not set, treating all as active")
         return set(uris)

+    # check redis cache first - partition into cached vs uncached
+    active_from_cache: set[str] = set()
+    uris_to_fetch: list[str] = []
+
+    try:
+        redis = get_async_redis_client()
+        prefix = settings.moderation.label_cache_prefix
+        cache_keys = [f"{prefix}{uri}" for uri in uris]
+        cached_values = await redis.mget(cache_keys)
+
+        for uri, cached_value in zip(uris, cached_values, strict=True):
+            if cached_value is not None:
+                if cached_value == "1":
+                    active_from_cache.add(uri)
+                # else: cached as "0" (not active), skip
+            else:
+                uris_to_fetch.append(uri)
+    except Exception as e:
+        # redis unavailable - fall through to fetch all
+        logger.warning("redis cache unavailable: %s", e)
+        uris_to_fetch = list(uris)
+
+    # if everything was cached, return early
+    if not uris_to_fetch:
+        logfire.debug(
+            "checked active copyright labels (all cached)",
+            total_uris=len(uris),
+            active_count=len(active_from_cache),
+        )
+        return active_from_cache
+
+    # fetch uncached URIs from moderation service
     try:
         async with httpx.AsyncClient(
             timeout=httpx.Timeout(settings.moderation.timeout_seconds)
         ) as client:
             response = await client.post(
                 f"{settings.moderation.labeler_url}/admin/active-labels",
-                json={"uris": uris},
+                json={"uris": uris_to_fetch},
                 headers={"X-Moderation-Key": settings.moderation.auth_token},
             )
             response.raise_for_status()
             data = response.json()
-            active = set(data.get("active_uris", []))
-            logfire.debug(
+            active_from_service = set(data.get("active_uris", []))
+
+            # update redis cache with results
+            try:
+                redis = get_async_redis_client()
+                prefix = settings.moderation.label_cache_prefix
+                ttl = settings.moderation.label_cache_ttl_seconds
+                async with redis.pipeline() as pipe:
+                    for uri in uris_to_fetch:
+                        cache_key = f"{prefix}{uri}"
+                        value = "1" if uri in active_from_service else "0"
+                        await pipe.set(cache_key, value, ex=ttl)
+                    await pipe.execute()
+            except Exception as e:
+                # cache update failed - not critical, just log
+                logger.warning("failed to update redis cache: %s", e)
+
+            logfire.info(
                 "checked active copyright labels",
                 total_uris=len(uris),
-                active_count=len(active),
+                cached_count=len(uris) - len(uris_to_fetch),
+                fetched_count=len(uris_to_fetch),
+                active_count=len(active_from_cache) + len(active_from_service),
             )
-            return active
+            return active_from_cache | active_from_service
+
     except Exception as e:
         # fail closed: if we can't confirm resolution, treat as active
+        # don't cache failures - we want to retry next time
         logger.warning("failed to check active labels, treating all as active: %s", e)
         return set(uris)
+
+
+async def invalidate_label_cache(uri: str) -> None:
+    """invalidate cache entry for a URI when its label status changes.
+
+    call this when emitting or negating labels to ensure fresh data.
+    """
+    try:
+        redis = get_async_redis_client()
+        prefix = settings.moderation.label_cache_prefix
+        await redis.delete(f"{prefix}{uri}")
+    except Exception as e:
+        logger.warning("failed to invalidate label cache for %s: %s", uri, e)
+
+
+async def clear_label_cache() -> None:
+    """clear all label cache entries. primarily for testing."""
+    try:
+        redis = get_async_redis_client()
+        prefix = settings.moderation.label_cache_prefix
+        # scan and delete all keys with our prefix
+        cursor = 0
+        while True:
+            cursor, keys = await redis.scan(cursor, match=f"{prefix}*", count=100)
+            if keys:
+                await redis.delete(*keys)
+            if cursor == 0:
+                break
+    except Exception as e:
+        logger.warning("failed to clear label cache: %s", e)


 async def _store_scan_error(track_id: int, error: str) -> None:
````

**backend/src/backend/config.py** (+8)

````diff
···
         default="https://moderation.plyr.fm",
         description="URL of the ATProto labeler service for emitting labels",
     )
+    label_cache_prefix: str = Field(
+        default="plyr:copyright-label:",
+        description="Redis key prefix for caching copyright label status",
+    )
+    label_cache_ttl_seconds: int = Field(
+        default=300,
+        description="TTL in seconds for cached copyright label status (default 5 min)",
+    )


 class DocketSettings(AppSettingsSection):
````

**backend/src/backend/utilities/redis.py** (+133, new file)

````diff
+"""redis client utilities for distributed caching.
+
+provides async redis client initialized from docket URL settings.
+the client is lazily created and cached per event loop.
+"""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import asynccontextmanager
+from typing import Any
+from urllib.parse import urlparse
+
+import redis.asyncio as async_redis
+
+from backend.config import settings
+
+logger = logging.getLogger(__name__)
+
+# client cache keyed by event loop to handle multiple loops in tests
+_client_cache: dict[int, async_redis.Redis] = {}
+
+
+def _parse_redis_url(url: str) -> dict:
+    """parse a redis URL into connection kwargs.
+
+    supports:
+    - redis://host:port/db
+    - redis://user:password@host:port/db
+    - rediss://... (SSL)
+    """
+    parsed = urlparse(url)
+
+    kwargs: dict[str, Any] = {
+        "host": parsed.hostname or "localhost",
+        "port": parsed.port or 6379,
+        "db": int(parsed.path.lstrip("/") or 0),
+        "decode_responses": True,
+    }
+
+    if parsed.username:
+        kwargs["username"] = parsed.username
+    if parsed.password:
+        kwargs["password"] = parsed.password
+    if parsed.scheme == "rediss":
+        kwargs["ssl"] = True
+
+    return kwargs
+
+
+def get_async_redis_client() -> async_redis.Redis:
+    """get a cached async redis client.
+
+    the client is created lazily on first call and reused.
+    parses connection info from settings.docket.url.
+
+    returns:
+        async redis client
+
+    raises:
+        RuntimeError: if docket URL is not configured
+    """
+    try:
+        loop = asyncio.get_running_loop()
+        loop_id = id(loop)
+    except RuntimeError:
+        loop_id = 0
+
+    if loop_id in _client_cache:
+        return _client_cache[loop_id]
+
+    if not settings.docket.url:
+        raise RuntimeError("docket URL not configured - cannot create redis client")
+
+    kwargs = _parse_redis_url(settings.docket.url)
+
+    client = async_redis.Redis(
+        socket_connect_timeout=5.0,
+        **kwargs,
+    )
+
+    _client_cache[loop_id] = client
+    logger.debug("created async redis client for loop %s", loop_id)
+
+    return client
+
+
+@asynccontextmanager
+async def async_redis_client() -> AsyncGenerator[async_redis.Redis, None]:
+    """async context manager for redis client.
+
+    ensures proper cleanup when used in isolated contexts.
+    for most cases, prefer get_async_redis_client() which caches
+    the connection for reuse.
+
+    yields:
+        async redis client
+
+    raises:
+        RuntimeError: if docket URL is not configured
+    """
+    if not settings.docket.url:
+        raise RuntimeError("docket URL not configured - cannot create redis client")
+
+    kwargs = _parse_redis_url(settings.docket.url)
+    client = async_redis.Redis(
+        socket_connect_timeout=5.0,
+        **kwargs,
+    )
+
+    try:
+        yield client
+    finally:
+        await client.aclose()
+
+
+async def close_redis_client() -> None:
+    """close all cached redis clients."""
+    try:
+        loop = asyncio.get_running_loop()
+        loop_id = id(loop)
+    except RuntimeError:
+        loop_id = 0
+
+    if loop_id in _client_cache:
+        client = _client_cache.pop(loop_id)
+        await client.aclose()
+        logger.debug("closed async redis client for loop %s", loop_id)
+
+
+def clear_client_cache() -> None:
+    """clear the client cache. primarily for testing."""
+    _client_cache.clear()
````

**backend/tests/conftest.py** (+80)

````diff
···
 """pytest configuration for relay tests."""

+import os
 from collections.abc import AsyncGenerator, Generator
 from contextlib import asynccontextmanager
 from datetime import UTC, datetime
···
 import asyncpg
 import pytest
+import redis as sync_redis_lib
 import sqlalchemy as sa
 from fastapi import FastAPI
 from fastapi.testclient import TestClient
···
 from backend.config import settings
 from backend.models import Base
+from backend.utilities.redis import clear_client_cache


 class MockStorage:
···
     """provides a TestClient for testing the FastAPI application."""
     with TestClient(fastapi_app) as tc:
         yield tc
+
+
+def _redis_db_for_worker(worker_id: str) -> int:
+    """determine redis database number based on xdist worker id.
+
+    uses different DB numbers for each worker to isolate parallel tests:
+    - master/gw0: db 1
+    - gw1: db 2
+    - gw2: db 3
+    - etc.
+
+    db 0 is reserved for local development.
+    """
+    if worker_id == "master" or not worker_id:
+        return 1
+    if "gw" in worker_id:
+        return 1 + int(worker_id.replace("gw", ""))
+    return 1
+
+
+def _redis_url_with_db(base_url: str, db: int) -> str:
+    """replace database number in redis URL."""
+    # redis://host:port/db -> redis://host:port/{new_db}
+    if "/" in base_url.rsplit(":", 1)[-1]:
+        # has db number, replace it
+        base = base_url.rsplit("/", 1)[0]
+        return f"{base}/{db}"
+    else:
+        # no db number, append it
+        return f"{base_url}/{db}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def redis_database(worker_id: str) -> Generator[None, None, None]:
+    """use isolated redis databases for parallel test execution.
+
+    each xdist worker gets its own redis database to prevent cache pollution
+    between tests running in parallel. flushes the db before and after tests.
+
+    if redis is not available, silently skips - tests that actually need redis
+    will fail on their own with a more specific error.
+    """
+    # skip if no redis configured
+    if not settings.docket.url:
+        yield
+        return
+
+    db = _redis_db_for_worker(worker_id)
+    new_url = _redis_url_with_db(settings.docket.url, db)
+
+    # patch settings for this worker process
+    settings.docket.url = new_url
+    os.environ["DOCKET_URL"] = new_url
+
+    # clear any cached clients (they have old URL)
+    clear_client_cache()
+
+    # try to flush db before tests - if redis unavailable, skip silently
+    try:
+        client = sync_redis_lib.Redis.from_url(new_url, socket_connect_timeout=1)
+        client.flushdb()
+        client.close()
+    except sync_redis_lib.ConnectionError:
+        # redis not available - tests that need it will fail with specific errors
+        yield
+        return
+
+    yield
+
+    # flush db after tests and clear cached clients
+    clear_client_cache()
+    try:
+        client = sync_redis_lib.Redis.from_url(new_url, socket_connect_timeout=1)
+        client.flushdb()
+        client.close()
+    except sync_redis_lib.ConnectionError:
+        pass  # redis went away during tests, nothing to clean up
````

**backend/tests/test_moderation.py** (+194 -27)

````diff
···
 async def test_get_active_copyright_labels_success() -> None:
     """test successful call to labeler returns active URIs."""
     uris = [
-        "at://did:plc:test/fm.plyr.track/1",
-        "at://did:plc:test/fm.plyr.track/2",
-        "at://did:plc:test/fm.plyr.track/3",
+        "at://did:plc:success/fm.plyr.track/1",
+        "at://did:plc:success/fm.plyr.track/2",
+        "at://did:plc:success/fm.plyr.track/3",
     ]

     mock_response = Mock()
     mock_response.json.return_value = {
-        "active_uris": ["at://did:plc:test/fm.plyr.track/1"]  # only one is active
+        "active_uris": [uris[0]]  # only first is active
     }
     mock_response.raise_for_status.return_value = None

-    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
-        mock_post.return_value = mock_response
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300

-        with patch("backend._internal.moderation.settings") as mock_settings:
-            mock_settings.moderation.enabled = True
-            mock_settings.moderation.auth_token = "test-token"
-            mock_settings.moderation.labeler_url = "https://labeler.example.com"
-            mock_settings.moderation.timeout_seconds = 10
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            mock_post.return_value = mock_response

             result = await get_active_copyright_labels(uris)

             # only the active URI should be in the result
-            assert result == {"at://did:plc:test/fm.plyr.track/1"}
+            assert result == {uris[0]}

             # verify correct endpoint was called
             call_kwargs = mock_post.call_args
             assert "/admin/active-labels" in str(call_kwargs)
             assert call_kwargs.kwargs["json"] == {"uris": uris}


 async def test_get_active_copyright_labels_service_error() -> None:
     """test that service errors return all URIs as active (fail closed)."""
-    uris = ["at://did:plc:test/fm.plyr.track/1", "at://did:plc:test/fm.plyr.track/2"]
+    uris = [
+        "at://did:plc:error/fm.plyr.track/1",
+        "at://did:plc:error/fm.plyr.track/2",
+    ]

-    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
-        mock_post.side_effect = httpx.ConnectError("connection failed")
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300

-        with patch("backend._internal.moderation.settings") as mock_settings:
-            mock_settings.moderation.enabled = True
-            mock_settings.moderation.auth_token = "test-token"
-            mock_settings.moderation.labeler_url = "https://labeler.example.com"
-            mock_settings.moderation.timeout_seconds = 10
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            mock_post.side_effect = httpx.ConnectError("connection failed")

             result = await get_active_copyright_labels(uris)

             # should fail closed - all URIs treated as active
             assert result == set(uris)
+
+
+# tests for active labels caching (using real redis from test docker-compose)
+
+
+async def test_get_active_copyright_labels_caching() -> None:
+    """test that repeated calls use cache instead of calling service."""
+    uris = [
+        "at://did:plc:caching/fm.plyr.track/1",
+        "at://did:plc:caching/fm.plyr.track/2",
+    ]
+
+    mock_response = Mock()
+    mock_response.json.return_value = {
+        "active_uris": [uris[0]]  # only first is active
+    }
+    mock_response.raise_for_status.return_value = None
+
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300
+
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            mock_post.return_value = mock_response
+
+            # first call - should hit service
+            result1 = await get_active_copyright_labels(uris)
+            assert result1 == {uris[0]}
+            assert mock_post.call_count == 1
+
+            # second call with same URIs - should use cache, not call service
+            result2 = await get_active_copyright_labels(uris)
+            assert result2 == {uris[0]}
+            assert mock_post.call_count == 1  # still 1, no new call
+
+
+async def test_get_active_copyright_labels_partial_cache() -> None:
+    """test that cache hits are combined with service calls for new URIs."""
+    uris_batch1 = ["at://did:plc:partial/fm.plyr.track/1"]
+    uris_batch2 = [
+        "at://did:plc:partial/fm.plyr.track/1",  # cached
+        "at://did:plc:partial/fm.plyr.track/2",  # new
+    ]
+
+    mock_response1 = Mock()
+    mock_response1.json.return_value = {
+        "active_uris": ["at://did:plc:partial/fm.plyr.track/1"]
+    }
+    mock_response1.raise_for_status.return_value = None
+
+    mock_response2 = Mock()
+    mock_response2.json.return_value = {
+        "active_uris": []  # uri/2 is not active
+    }
+    mock_response2.raise_for_status.return_value = None
+
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300
+
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            mock_post.side_effect = [mock_response1, mock_response2]
+
+            # first call - cache uri/1 as active
+            result1 = await get_active_copyright_labels(uris_batch1)
+            assert result1 == {"at://did:plc:partial/fm.plyr.track/1"}
+            assert mock_post.call_count == 1
+
+            # second call - uri/1 from cache, only uri/2 fetched
+            result2 = await get_active_copyright_labels(uris_batch2)
+            # uri/1 is active (from cache), uri/2 is not active (from service)
+            assert result2 == {"at://did:plc:partial/fm.plyr.track/1"}
+            assert mock_post.call_count == 2
+
+            # verify second call only requested uri/2
+            second_call_args = mock_post.call_args_list[1]
+            assert second_call_args.kwargs["json"] == {
+                "uris": ["at://did:plc:partial/fm.plyr.track/2"]
+            }
+
+
+async def test_get_active_copyright_labels_cache_invalidation() -> None:
+    """test that invalidate_label_cache clears specific entry."""
+    from backend._internal.moderation import invalidate_label_cache
+
+    uris = ["at://did:plc:invalidate/fm.plyr.track/1"]
+
+    mock_response = Mock()
+    mock_response.json.return_value = {
+        "active_uris": ["at://did:plc:invalidate/fm.plyr.track/1"]
+    }
+    mock_response.raise_for_status.return_value = None
+
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300
+
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            mock_post.return_value = mock_response
+
+            # first call - populate cache
+            result1 = await get_active_copyright_labels(uris)
+            assert result1 == {"at://did:plc:invalidate/fm.plyr.track/1"}
+            assert mock_post.call_count == 1
+
+            # invalidate the cache entry
+            await invalidate_label_cache("at://did:plc:invalidate/fm.plyr.track/1")
+
+            # next call - should hit service again since cache was invalidated
+            result2 = await get_active_copyright_labels(uris)
+            assert result2 == {"at://did:plc:invalidate/fm.plyr.track/1"}
+            assert mock_post.call_count == 2
+
+
+async def test_service_error_does_not_cache() -> None:
+    """test that service errors don't pollute the cache."""
+    # use unique URIs for this test to avoid cache pollution from other tests
+    uris = ["at://did:plc:errnocache/fm.plyr.track/1"]
+
+    mock_success_response = Mock()
+    mock_success_response.json.return_value = {"active_uris": []}
+    mock_success_response.raise_for_status.return_value = None
+
+    with patch("backend._internal.moderation.settings") as mock_settings:
+        mock_settings.moderation.enabled = True
+        mock_settings.moderation.auth_token = "test-token"
+        mock_settings.moderation.labeler_url = "https://test.example.com"
+        mock_settings.moderation.timeout_seconds = 30
+        mock_settings.moderation.label_cache_prefix = "test:label:"
+        mock_settings.moderation.label_cache_ttl_seconds = 300
+
+        with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
+            # first call fails
+            mock_post.side_effect = httpx.ConnectError("connection failed")
+
+            # first call - fails, returns all URIs as active (fail closed)
+            result1 = await get_active_copyright_labels(uris)
+            assert result1 == set(uris)
+            assert mock_post.call_count == 1
+
+            # reset mock to succeed
+            mock_post.side_effect = None
+            mock_post.return_value = mock_success_response
+
+            # second call - should try service again (error wasn't cached)
+            result2 = await get_active_copyright_labels(uris)
+            assert result2 == set()  # now correctly shows not active
+            assert mock_post.call_count == 2
````

**backend/uv.lock** (+2)

````diff
···
     { name = "python-dotenv" },
     { name = "python-jose", extra = ["cryptography"] },
     { name = "python-multipart" },
+    { name = "redis" },
     { name = "slowapi" },
     { name = "sqlalchemy" },
     { name = "uvicorn", extra = ["standard"] },
···
     { name = "python-dotenv", specifier = ">=1.1.0" },
     { name = "python-jose", extras = ["cryptography"], specifier = ">=3.3.0" },
     { name = "python-multipart", specifier = ">=0.0.20" },
+    { name = "redis", specifier = ">=7.1.0" },
     { name = "slowapi", git = "https://github.com/zzstoatzz/slowapi.git?rev=fix-deprecation" },
     { name = "sqlalchemy", specifier = ">=2.0.36" },
     { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" },
````

**docs/testing/README.md** (+74)

````diff
···
 - some ORMs behave differently in uncommitted transactions

 delete-by-timestamp gives us real commits while staying fast.
+
+## redis isolation for parallel tests
+
+tests that use redis (caching, background tasks) need isolation between xdist workers. without isolation, one worker's cache entries pollute another's tests.
+
+### how it works
+
+each xdist worker uses a different redis database number:
+
+| worker | redis db |
+|--------|----------|
+| master/gw0 | 1 |
+| gw1 | 2 |
+| gw2 | 3 |
+| ... | ... |
+
+db 0 is reserved for local development.
+
+### the redis_database fixture
+
+```python
+@pytest.fixture(scope="session", autouse=True)
+def redis_database(worker_id: str) -> Generator[None, None, None]:
+    """use isolated redis databases for parallel test execution."""
+    db = _redis_db_for_worker(worker_id)
+    new_url = _redis_url_with_db(settings.docket.url, db)
+
+    # patch settings for this worker process
+    settings.docket.url = new_url
+    os.environ["DOCKET_URL"] = new_url
+    clear_client_cache()
+
+    # flush db before tests
+    sync_redis = redis.Redis.from_url(new_url)
+    sync_redis.flushdb()
+    sync_redis.close()
+
+    yield
+
+    # flush after tests
+    ...
+```
+
+this fixture is `autouse=True` so it applies to all tests automatically.
+
+### common pitfall: unique URIs in cache tests
+
+even with per-worker database isolation, tests within the same worker share redis state. if multiple tests use the same cache keys, they can interfere with each other.
+
+**wrong**:
+```python
+async def test_caching_first():
+    uris = ["at://did:plc:test/fm.plyr.track/1"]  # generic URI
+    result = await get_active_copyright_labels(uris)
+    # caches the result
+
+async def test_caching_second():
+    uris = ["at://did:plc:test/fm.plyr.track/1"]  # same URI!
+    result = await get_active_copyright_labels(uris)
+    # gets cached value from first test - may fail unexpectedly
+```
+
+**right**:
+```python
+async def test_caching_first():
+    uris = ["at://did:plc:first/fm.plyr.track/1"]  # unique to this test
+    ...
+
+async def test_caching_second():
+    uris = ["at://did:plc:second/fm.plyr.track/1"]  # different URI
+    ...
+```
+
+use unique identifiers (test name, uuid, etc.) in cache keys to avoid cross-test pollution.
````

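
one way to build such unique identifiers, as a hypothetical helper (not part of this PR):

```python
import uuid


def unique_track_uri(tag: str) -> str:
    """build a per-test AT URI so cache keys never collide across tests."""
    return f"at://did:plc:{tag}-{uuid.uuid4().hex[:8]}/fm.plyr.track/1"


# e.g. in a test:
# uris = [unique_track_uri("caching")]
```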