A Rust implementation of skywatch-phash

fix: Address critical production issues from code audit

## Critical Issues Fixed

1. **Fix panic risk in rate limiter**
- Removed panic-prone unwrap() in RateLimiter::new()
- Now returns Result with proper error handling
- Prevents service crashes from invalid rate limit config
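A minimal sketch of the shape of this change (the field layout and the zero-delay validation below are illustrative assumptions, not taken from the actual diff):

```rust
use std::time::Duration;

#[derive(Debug)]
pub enum RateLimiterError {
    InvalidDelay(u64),
}

pub struct RateLimiter {
    delay: Duration,
}

impl RateLimiter {
    // Before: an invalid delay was unwrap()ed and could panic at startup.
    // After: bad configuration surfaces as an Err the caller must handle.
    pub fn new(delay_ms: u64) -> Result<Self, RateLimiterError> {
        if delay_ms == 0 {
            return Err(RateLimiterError::InvalidDelay(delay_ms));
        }
        Ok(Self {
            delay: Duration::from_millis(delay_ms),
        })
    }
}
```

The caller (e.g. worker-pool setup) can then propagate the error with `?` instead of crashing the process.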

2. **Fix Redis connection leak in worker pool**
- Worker was creating new Redis client per job (extremely wasteful)
- Refactored to create connection once at worker startup
- Reuse connection for all jobs processed by worker
- Eliminates unnecessary connection overhead
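The before/after pattern can be illustrated with a toy stand-in for the Redis client (the `Client`/`Connection` types below are hypothetical counters for illustration, not the redis crate API):

```rust
use std::cell::Cell;

// Hypothetical stand-in for a Redis client: counts how many connections it opens.
struct Client {
    opened: Cell<u32>,
}

impl Client {
    fn get_connection(&self) -> Connection {
        self.opened.set(self.opened.get() + 1);
        Connection
    }
}

struct Connection;

impl Connection {
    fn process(&mut self, _job: &str) {}
}

// Before the fix: one connection per job (N jobs -> N connections).
fn worker_per_job(client: &Client, jobs: &[&str]) {
    for job in jobs {
        let mut conn = client.get_connection();
        conn.process(job);
    }
}

// After the fix: the connection is created once at worker startup and reused.
fn worker_reused(client: &Client, jobs: &[&str]) {
    let mut conn = client.get_connection();
    for job in jobs {
        conn.process(job);
    }
}
```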

3. **Add configurable blob download timeouts**
- Added per-attempt timeout (default: 10s per format/endpoint)
- Added total timeout across all fallbacks (default: 30s)
- Prevents 90s+ hangs when CDN is slow or unavailable
- Configurable via BLOB_DOWNLOAD_TIMEOUT_SECS and BLOB_TOTAL_TIMEOUT_SECS
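The budgeting logic might look roughly like this synchronous sketch (the real code presumably wraps async requests in `tokio::time::timeout`; `download_with_deadlines` and its `fetch` parameter are illustrative names):

```rust
use std::time::{Duration, Instant};

// Try each fallback endpoint, bounded by a per-attempt timeout and a
// total deadline across all fallbacks.
fn download_with_deadlines(
    urls: &[&str],
    attempt_timeout: Duration,
    total_timeout: Duration,
    // Stand-in for the real HTTP fetch, given a time budget for this attempt.
    fetch: impl Fn(&str, Duration) -> Result<Vec<u8>, ()>,
) -> Result<Vec<u8>, &'static str> {
    let deadline = Instant::now() + total_timeout;
    for url in urls {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err("total timeout exceeded");
        }
        // Each attempt gets the smaller of its own budget and what is left overall.
        let budget = attempt_timeout.min(remaining);
        if let Ok(bytes) = fetch(url, budget) {
            return Ok(bytes);
        }
    }
    Err("all endpoints failed")
}
```

With the defaults (10s per attempt, 30s total), even a long fallback chain can no longer stall a worker indefinitely.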

4. **Add retry logic for Ozone API calls**
- Implements exponential backoff (100ms → 5s cap)
- Retries up to 3 times on transient errors (5xx, timeouts)
- Detects non-transient errors (4xx) and fails immediately
- Prevents data loss due to temporary API unavailability
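A hedged sketch of this retry policy (function names are illustrative, and the real code classifies errors from actual HTTP responses rather than bare status codes):

```rust
use std::time::Duration;

// Whether an error is worth retrying (5xx) or should fail fast (4xx).
fn is_transient(status: u16) -> bool {
    status >= 500
}

// Backoff schedule described above: 100ms doubling per attempt, capped at 5s.
fn backoff_delay(attempt: u32) -> Duration {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(5);
    base.saturating_mul(1u32 << attempt.min(16)).min(cap)
}

// Generic retry wrapper: `op` returns Ok(value) or Err(http_status).
// `sleep` is injected so the schedule is testable without waiting.
fn retry_with_backoff<T>(
    max_retries: u32,
    mut op: impl FnMut() -> Result<T, u16>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, u16> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(status) if is_transient(status) && attempt < max_retries => {
                sleep(backoff_delay(attempt));
                attempt += 1;
            }
            Err(status) => return Err(status), // 4xx, or retries exhausted
        }
    }
}
```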

5. **Add comprehensive llms.txt documentation**
- Created 48KB documentation for entire codebase
- Covers architecture, design decisions, critical paths
- Documents all external integrations and error handling
- Includes debugging tips and deployment notes

## Configuration Changes

New environment variables:
- BLOB_DOWNLOAD_TIMEOUT_SECS (default: 10)
- BLOB_TOTAL_TIMEOUT_SECS (default: 30)
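Reading these likely mirrors the `get_env_u64`-style helpers the config module already uses; a sketch with assumed names (`env_u64_or`, `blob_timeouts`):

```rust
use std::time::Duration;

// Parse an env var as u64, falling back to a default when unset or invalid.
fn env_u64_or(key: &str, default: u64) -> u64 {
    std::env::var(key)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

// Returns (per-attempt timeout, total timeout across all fallbacks).
fn blob_timeouts() -> (Duration, Duration) {
    let per_attempt = Duration::from_secs(env_u64_or("BLOB_DOWNLOAD_TIMEOUT_SECS", 10));
    let total = Duration::from_secs(env_u64_or("BLOB_TOTAL_TIMEOUT_SECS", 30));
    (per_attempt, total)
}
```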

## What's Left

The following critical improvements remain for full production readiness:
- Circuit breaker pattern for cascading failure prevention
- Redis connection failure backoff and recovery
- Expanded test coverage (currently <5%)
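For the circuit-breaker item, a minimal closed/open sketch (entirely hypothetical; nothing like this exists in the codebase yet, and a production version would add a proper half-open trial state):

```rust
use std::time::{Duration, Instant};

enum State {
    Closed { failures: u32 },
    Open { until: Instant },
}

pub struct CircuitBreaker {
    state: State,
    failure_threshold: u32,
    cooldown: Duration,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        Self { state: State::Closed { failures: 0 }, failure_threshold, cooldown }
    }

    // Returns false while the breaker is open; callers skip the downstream call.
    pub fn allow(&mut self) -> bool {
        match self.state {
            State::Closed { .. } => true,
            State::Open { until } => {
                if Instant::now() >= until {
                    // Cooldown elapsed: close again (simplified; no half-open trial).
                    self.state = State::Closed { failures: 0 };
                    true
                } else {
                    false
                }
            }
        }
    }

    pub fn record_success(&mut self) {
        self.state = State::Closed { failures: 0 };
    }

    pub fn record_failure(&mut self) {
        if let State::Closed { failures } = &mut self.state {
            *failures += 1;
            if *failures >= self.failure_threshold {
                self.state = State::Open { until: Instant::now() + self.cooldown };
            }
        }
    }
}
```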

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>

Skywatch 40bb3842 c36259f8

Changed files (+1611 -35):
- src/config
- src/moderation
- src/processor
- src/queue
- llms.txt (+1490)
# skywatch-phash-rs: Comprehensive Codebase Overview

## Project Summary

**skywatch-phash-rs** is a high-performance, real-time perceptual hash-based image moderation service for Bluesky/ATProto. It monitors Bluesky's Jetstream firehose for posts with images, computes perceptual hashes (aHash), matches them against a configurable set of known harmful image hashes, and automatically applies moderation actions (labels and reports) through the Ozone moderation system.

**Technology Stack:**
- Language: Rust (Edition 2024)
- Async Runtime: Tokio
- ATProto Client: Jacquard (internal crate from sibling directory)
- Message Queue: Redis (async, with connection pooling)
- Image Processing: image crate + image_hasher (aHash/average hash algorithm)
- Error Handling: miette (diagnostic errors) + thiserror
- Logging: tracing + tracing-subscriber (structured, JSON-capable)

**Current Version:** 0.2.0
**License:** MIT

---

## Project Purpose & Key Features

### Core Functionality
1. **Real-time Jetstream Subscription**: Subscribes to Bluesky's firehose, filtering only posts with embedded images
2. **Perceptual Hash Computation**: Computes 64-bit average hash (aHash) for each image blob (CID)
3. **Hamming Distance Matching**: Compares computed hashes against configurable rules using Hamming distance thresholds
4. **Automated Moderation**: Takes configured actions on matches:
   - Apply labels to posts and/or accounts
   - File reports to posts and/or accounts
   - Takedown posts and/or accounts (future capability)
5. **Resilient Job Processing**: Redis-backed job queue with retry logic and dead-letter handling
6. **Phash Caching**: Caches computed hashes in Redis to reduce redundant work on viral images
7. **Deduplication**: Redis-backed claim tracking prevents duplicate moderation actions within 7-day windows
8. **Metrics & Observability**: Lock-free atomic counters track jobs, blobs, matches, cache performance, and moderation actions
9. **Graceful Shutdown**: Saves cursor position and logs final metrics on exit

### Key Non-Features (Explicitly Handled)
- Does NOT block Jetstream ingestion on processing delays (decoupled via queue)
- Does NOT lose jobs mid-processing (Redis persistence)
- Does NOT duplicate moderation actions (Redis claims + Ozone verification)
- Does NOT require external authentication servers (Jacquard handles session management)

---

## Directory Structure

```
skywatch-phash-rs/
├── src/
│   ├── main.rs              # Entry point: orchestrates Jetstream, queue, workers, metrics
│   ├── lib.rs               # Module exports
│   │
│   ├── types/mod.rs         # Core data structures
│   │   ├── BlobCheck        # Rule definition with phashes, threshold, actions
│   │   ├── BlobReference    # Image blob CID + optional MIME type
│   │   ├── ImageJob         # Post metadata + blobs for processing
│   │   └── MatchResult      # Result of phash matching with distance
│   │
│   ├── config/mod.rs        # Environment variable configuration (required & optional)
│   │
│   ├── jetstream/
│   │   ├── mod.rs           # JetstreamClient: WebSocket subscriber with retry/failover
│   │   ├── events.rs        # Event extraction (blob parsing from post records)
│   │   └── cursor.rs        # Cursor persistence (firehose_cursor.db)
│   │
│   ├── processor/
│   │   ├── mod.rs           # Module exports
│   │   ├── phash.rs         # Perceptual hash computation (aHash 8x8 -> 16 hex chars)
│   │   └── matcher.rs       # Blob check matching, blob download, job processing
│   │
│   ├── queue/
│   │   ├── redis_queue.rs   # Redis job queue (pending/processing/dead-letter)
│   │   └── worker.rs        # Worker pool: job dequeue, process, retry, take moderation actions
│   │
│   ├── cache/mod.rs         # Redis phash cache (get_or_compute pattern)
│   │
│   ├── moderation/
│   │   ├── mod.rs           # Module exports
│   │   ├── post.rs          # Post label/report actions (future: takedown)
│   │   ├── account.rs       # Account label/report/comment actions (future: takedown)
│   │   ├── claims.rs        # Redis claim tracking for deduplication
│   │   ├── helpers.rs       # Shared moderation logic
│   │   └── rate_limiter.rs  # Rate limit tracking (respect Ozone API limits)
│   │
│   ├── agent/
│   │   ├── mod.rs           # Module exports
│   │   └── session.rs       # AgentSession: authenticated Jacquard client wrapper
│   │
│   ├── plc/mod.rs           # PLC Directory client with endpoint failover
│   │
│   └── metrics/mod.rs       # Metrics collector: lock-free atomic counters
│
├── rules/
│   └── blobs.json           # BlobCheck rule definitions (loaded at startup)
│
├── Cargo.toml               # Dependencies and build configuration
├── README.md                # User-facing documentation
├── ARCHITECTURE.md          # Deep architecture guide (TypeScript context, not perfectly aligned with Rust)
└── CLAUDE.md                # Project-specific guidelines for Claude Code
```

---

## Core Module Details

### 1. main.rs (Entry Point, ~285 lines)

**Responsibilities:**
- Load configuration from environment variables
- Authenticate with Bluesky/Ozone via Jacquard
- Initialize Redis queue, cache, metrics
- Load blob check rules from `rules/blobs.json`
- Start Jetstream subscriber with auto-retry and failover
- Spawn job receiver task (Jetstream events -> Redis queue)
- Spawn worker pool (N concurrent workers processing jobs)
- Spawn metrics logger (logs stats every 60 seconds)
- Handle graceful shutdown (Ctrl+C, SIGTERM)

**Key Design Patterns:**
- Jetstream connection with exponential backoff (retry_delay up to max_retry_delay_secs)
- URL rotation: primary Jetstream URL + fallback URLs
- Job channel (mpsc) decouples Jetstream from Redis for resilience
- Worker pool runs as N independent futures (not tokio::spawn to avoid HRTB issues)
- `tokio::select!` for clean multi-task orchestration

**Critical Code Sections:**
- Lines 88-93: Cursor loading and resumption
- Lines 98-158: Jetstream retry loop with exponential backoff
- Lines 160-188: Job receiver task (Jetstream -> Redis)
- Lines 190-216: Worker pool initialization
- Lines 233-262: Shutdown coordination with `tokio::select!`

**Dependencies:**
- tokio (async runtime, channels, signals)
- tracing (structured logging)
- miette (error context and diagnostics)

---

### 2. types/mod.rs (Data Structures, ~138 lines)

**Core Types:**

```rust
BlobCheck {
    phashes: Vec<CowStr<'static>>,          // Known bad image hashes (16 hex chars each)
    label: CowStr<'static>,                 // Label to apply (e.g., "troll", "spam")
    comment: CowStr<'static>,               // Report comment with context
    report_acct: bool,                      // Report the account?
    label_acct: bool,                       // Label the account?
    report_post: bool,                      // Report the post?
    to_label: bool,                         // Label the post?
    takedown_post: bool,                    // Takedown post? (default: false)
    takedown_acct: bool,                    // Takedown account? (default: false)
    hamming_threshold: Option<u32>,         // Per-rule threshold (overrides default)
    description: Option<CowStr<'static>>,   // Internal documentation
    ignore_did: Option<Vec<Did<'static>>>,  // DIDs to exempt from this rule
}

BlobReference {
    cid: Cid<'static>,                      // Content ID of the blob
    mime_type: Option<CowStr<'static>>,     // Optional MIME type (may be missing)
}

ImageJob {
    post_uri: AtUri<'static>,               // "at://did/app.bsky.feed.post/rkey"
    post_cid: Cid<'static>,                 // Post commit CID
    post_did: Did<'static>,                 // Author DID
    blobs: Vec<BlobReference>,              // Embedded images
    timestamp: i64,                         // Job creation time (millis)
    attempts: u32,                          // Retry counter
}

MatchResult {
    phash: CowStr<'static>,                 // Computed hash
    matched_check: BlobCheck,               // Matching rule
    matched_phash: CowStr<'static>,         // Matched rule's phash
    hamming_distance: u32,                  // Distance (0-64)
}
```

**Design Notes:**
- Uses `CowStr<'static>` from Jacquard for zero-copy serialization (owned in all contexts)
- Custom deserializers handle Jacquard's type conversions (Did, Cid, AtUri)
- `ignore_did` field uses serde alias "ignoreDID" for JSON compatibility
- All deserialized values converted to 'static lifetime for cross-thread safety

---

### 3. config/mod.rs (Configuration, ~236 lines)

**Configuration Hierarchy:**

```rust
Config {
    jetstream: JetstreamConfig {
        url: String,                     // Primary Jetstream WebSocket URL
        fallback_urls: Vec<String>,      // Fallback URLs for resilience
        wanted_collections: Vec<String>, // Always ["app.bsky.feed.post"]
        cursor_update_interval: u64,     // Save cursor every N millis (default: 10000)
        retry_delay_secs: u64,           // Initial backoff (default: 5)
        max_retry_delay_secs: u64,       // Max backoff (default: 300)
    },
    redis: RedisConfig {
        url: String,                     // Redis connection string
    },
    processing: ProcessingConfig {
        concurrency: usize,              // Number of worker threads (default: 10)
        retry_attempts: u32,             // Max retries per job (default: 3)
        retry_delay: u64,                // Delay between retries in ms (default: 1000)
    },
    cache: CacheConfig {
        enabled: bool,                   // Enable phash caching? (default: true)
        ttl: u64,                        // Cache TTL in seconds (default: 86400 = 24h)
    },
    pds: PdsConfig {
        endpoint: String,                // PDS endpoint for blob fetch (default: https://bsky.social)
    },
    plc: PlcConfig {
        endpoint: String,                // PLC Directory for DID resolution
        fallback_endpoints: Vec<String>, // Fallback PLC endpoints
    },
    automod: AutomodConfig {
        handle: String,                  // Automod account handle (REQUIRED)
        password: String,                // App password (REQUIRED)
    },
    ozone: OzoneConfig {
        url: String,                     // Ozone URL (REQUIRED, for context only)
        pds: String,                     // Ozone PDS endpoint (REQUIRED, used in auth)
    },
    moderation: ModerationConfig {
        labeler_did: String,             // Labeler DID (REQUIRED)
        rate_limit: u64,                 // Rate limit delay in ms (default: 100)
    },
    phash: PhashConfig {
        default_hamming_threshold: u32,  // Default hamming threshold (default: 3)
    },
}
```

**Environment Variables:**

**Required:**
- `AUTOMOD_HANDLE` - Bluesky handle for labeler account
- `AUTOMOD_PASSWORD` - App password (NOT user password)
- `LABELER_DID` - DID of labeler account
- `OZONE_URL` - Ozone service URL
- `OZONE_PDS` - Ozone PDS endpoint (for agent initialization)

**Optional (with defaults):**
- `JETSTREAM_URL` (default: "wss://jetstream.atproto.tools/subscribe")
- `JETSTREAM_FALLBACK_URLS` (comma-separated, default: fire.hose.cam URLs)
- `REDIS_URL` (default: "redis://localhost:6379")
- `PLC_ENDPOINT` (default: "https://plc.directory")
- `PDS_ENDPOINT` (default: "https://bsky.social")
- `PROCESSING_CONCURRENCY` (default: 10)
- `RETRY_ATTEMPTS` (default: 3)
- `RETRY_DELAY_MS` (default: 1000)
- `CACHE_ENABLED` (default: true)
- `CACHE_TTL_SECONDS` (default: 86400)
- `PHASH_HAMMING_THRESHOLD` (default: 3)
- `RATE_LIMIT_MS` (default: 100)

**Parsing Helpers:**
- `get_env()` - Required env var or default
- `get_env_u32/u64/usize()` - Parse numeric with default
- `get_env_bool()` - Parse "true", "1", "yes" (case-insensitive)
- `get_env_list()` - Parse comma-separated list

**Load Flow:**
1. Call `dotenvy::dotenv().ok()` to load `.env` (if exists)
2. Parse all config sections in `Config::from_env()`
3. Return miette::Result with diagnostic context on error

---

### 4. jetstream/mod.rs (WebSocket Subscriber, ~234 lines)

**JetstreamClient Structure:**

```rust
pub struct JetstreamClient {
    url: Url,
    cursor: Option<i64>, // Microsecond timestamp from last processed event
}
```

**Core Method: `subscribe()`**
- Connects to Jetstream WebSocket using Jacquard's `TungsteniteSubscriptionClient`
- Configures subscription for "app.bsky.feed.post" creates only
- Runs main message loop with multi-task coordination:
  - Message timeout: 120 seconds (kill connection if no data)
  - Heartbeat: Log every 30 seconds
  - Cursor update: Save every 10 seconds
  - Shutdown: Graceful exit on broadcast signal
- Extracts cursor (time_us) from each message for resumption
- Calls `process_message()` for each commit event
- Returns `Ok(())` on graceful shutdown, `Err` on connection failure

**Message Processing: `process_message()`**
- Filters: Only processes `Commit` messages with `Create` operation
- Skips: Non-post collections, updates/deletes, posts without records
- Extracts blobs: Calls `events::extract_blobs_from_record(record_data)`
- Creates ImageJob: post_uri, post_cid, post_did, blobs, timestamp, attempts=0
- Sends to job channel for queueing
- Silently skips if job channel closed (receiver stopped)

**Error Handling:**
- WebSocket errors: Break loop to allow reconnection
- Message parse errors: Log and continue
- Job send errors: Warn but continue (graceful handling)

**Dependencies:**
- tokio (channels, time, select)
- jacquard_common (Jetstream types, WebSocket client)
- tracing (logging)

---

### 5. jetstream/cursor.rs (Cursor Persistence, ~42 lines)

**Purpose:** Persist Jetstream cursor to disk for recovery after restart

**Key Functions:**
```rust
pub fn read_cursor() -> Option<i64>
// Reads from "firehose_cursor.db"
// Returns None if file missing or unparseable
// Logs info on success, warns on error

pub fn write_cursor(cursor: i64) -> Result<()>
// Writes cursor to "firehose_cursor.db"
// Overwrites existing file
// Returns miette::Result
```

**Cursor Semantics:**
- Value is **microsecond timestamp** (NOT millisecond)
- Obtained from `JetstreamMessage.time_us` field
- Used to resume subscription at exact position on restart
- Prevents reprocessing of same posts

**File Path:** `./firehose_cursor.db` (relative to working directory)

**Integration:**
- main.rs: Lines 88-93 load cursor before Jetstream connection
- jetstream/mod.rs: Lines 118-125 save cursor every 10 seconds
- jetstream/mod.rs: Lines 132-139 save final cursor on shutdown

---

### 6. processor/phash.rs (Hash Computation, ~156 lines)

**Algorithm: Average Hash (aHash) with 8x8 Grid**

```rust
pub fn compute_phash(image_bytes: &[u8]) -> Result<String, PhashError>
// 1. Load image from bytes using image crate
// 2. Call compute_phash_from_image()
// 3. Return 16-character hex string

pub fn compute_phash_from_image(img: &DynamicImage) -> Result<String, PhashError>
// 1. Configure hasher: HashAlg::Mean (average), size 8x8
// 2. Compute hash via image_hasher
// 3. Convert to hex string via hash_to_hex()
// 4. Validate length (must be exactly 16 chars)

fn hash_to_hex(hash: &ImageHash) -> Result<String, PhashError>
// Convert 8 bytes to 16 hex characters (lowercase)
// Each byte -> 2 hex digits
// Format: "deadbeefdeadbeef"
```

**Hamming Distance Calculation:**

```rust
pub fn hamming_distance(hash1: &str, hash2: &str) -> Result<u32, PhashError>
// 1. Validate both hashes are 16 hex chars
// 2. Parse as u64 from base 16
// 3. XOR to find differing bits
// 4. Count set bits using Brian Kernighan's algorithm:
//    while n > 0: count++; n &= n - 1
// 5. Return count (range 0-64)
```

**Test Coverage:**
- Identical hashes: distance = 0
- Completely different: distance = 64
- Single bit difference: distance = 1
- Invalid format validation: length, hex parsing
- Phash format validation: output is 16 hex chars

**Design Notes:**
- Uses image_hasher crate for robust image decoding
- HashAlg::Mean = average hash (matching TypeScript version)
- Size 8x8 = 64-bit hash (exactly 16 hex chars)
- No normalization or preprocessing (raw pixels)
- Deterministic: same image always produces same hash

---

### 7. processor/matcher.rs (Rule Matching & Blob Processing, ~299 lines)

**Key Functions:**

```rust
pub async fn load_blob_checks(path: &Path) -> Result<Vec<BlobCheck>>
// Load rules from JSON file
// Deserialize into Vec<BlobCheck>
// Log count on success

pub async fn download_blob(client: &Client, config: &Config, did: &str, cid: &str) -> Result<Vec<u8>>
// Try CDN first: https://cdn.bsky.app/img/feed_fullsize/plain/{did}/{cid}@{format}
// Try formats: jpeg, png, webp (in order)
// Fall back to PDS if CDN fails
// PDS URL: {pds_endpoint}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}
// Return raw bytes

pub fn match_phash(phash: &str, blob_checks: &[BlobCheck], did: &str, default_threshold: u32) -> Option<MatchResult>
// For each BlobCheck:
//   - Skip if did in ignore_did list
//   - Get threshold (per-check or default)
//   - For each check's phash:
//     - Compute hamming_distance
//     - If distance <= threshold, return MatchResult (first match wins)
// Return None if no match

pub async fn process_blob(client: &Client, config: &Config, blob_checks: &[BlobCheck], did: &str, blob: &BlobReference) -> Result<Option<MatchResult>>
// 1. Download blob bytes
// 2. Compute phash
// 3. Match against checks
// 4. Return Option<MatchResult>

pub async fn process_image_job(client: &Client, config: &Config, blob_checks: &[BlobCheck], job: &ImageJob) -> Result<Vec<MatchResult>>
// For each blob in job:
//   - Call process_blob()
//   - Collect matches (skip errors, continue)
// Return all matches found
```

**Test Coverage:**
- Exact phash match: distance = 0
- Within threshold: distance <= threshold
- Exceeds threshold: distance > threshold, no match
- Ignored DIDs: skipped entirely
- Real rules loading: verifies rules/blobs.json format

**Design Notes:**
- First match wins (no combining multiple rules)
- Missing MIME type is accepted (optional field)
- Errors in blob processing don't stop job (continue to next blob)
- CDN as primary path (faster, reduces PDS load)
- PDS fallback for unavailable CDN images

---

### 8. queue/redis_queue.rs (Job Persistence, ~150 lines)

**Redis Queue Structure:**

```
PENDING_QUEUE:     "jobs:pending"    -> List (FIFO for new jobs)
PROCESSING_QUEUE:  "jobs:processing" -> List (active jobs, not used in current impl)
DEAD_LETTER_QUEUE: "jobs:dead"       -> List (jobs exhausted retries)
```

**JobQueue Implementation:**

```rust
pub struct JobQueue {
    redis: redis::aio::MultiplexedConnection,
    max_retries: u32,
}

pub async fn new(config: &Config) -> Result<Self>
// Connect to Redis via multiplexed connection
// Load max_retries from config

pub async fn push(&mut self, job: &ImageJob) -> Result<()>
// Serialize job to JSON
// RPUSH to PENDING_QUEUE

pub async fn pop(&mut self, timeout_secs: usize) -> Result<Option<ImageJob>>
// BLPOP from PENDING_QUEUE with timeout
// Deserialize JSON to ImageJob
// Return Option

pub async fn retry(&mut self, mut job: ImageJob) -> Result<()>
// Increment attempts
// If attempts >= max_retries:
//   - Move to dead-letter via move_to_dead_letter()
// Else:
//   - Re-push to pending

pub async fn move_to_dead_letter(&mut self, job: &ImageJob) -> Result<()>
// Serialize and RPUSH to DEAD_LETTER_QUEUE

pub async fn stats(&mut self) -> Result<QueueStats>
// Return lengths of all three queues
```

**State Machine:**
```
Job created -> RPUSH to PENDING
Worker BLPOP from PENDING
Worker processes -> success: discarded
Worker processes -> error: retry()
  if attempts < max:  RPUSH to PENDING
  if attempts >= max: RPUSH to DEAD_LETTER
```

**Design Notes:**
- Multiplexed connection for async concurrency
- No PROCESSING_QUEUE used (could be added for transactional safety)
- JSON serialization (human-readable, debuggable)
- Timeout on BLPOP (1 second default in worker)
- Dead-letter for observability (can inspect failed jobs)

---

### 9. queue/worker.rs (Job Processing, ~200+ lines)

**Worker Pool Architecture:**

```rust
pub struct WorkerPool {
    config: Config,
    client: Client,
    agent: AgentSession,
    blob_checks: Vec<BlobCheck>,
    metrics: Metrics,
    rate_limiter: RateLimiter,
}

pub fn new(...) -> Self
// Create single worker pool instance
// Config, HTTP client, agent, rules all cloned for sharing

pub async fn start(&self, mut queue: JobQueue, mut cache: PhashCache, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()>
// Main worker loop
// Each worker runs independently (tokio::select!)
```

**Processing Loop (per worker):**
```
1. Select from:
   - Shutdown signal -> break
   - Queue pop (1 second timeout) -> if Some(job):
     a. For each blob in job:
        - Check cache (get_or_compute pattern)
        - If cache miss: download + phash
        - Match against rules
        - If match: execute moderation actions
     b. Track metrics (blobs, matches, etc)
     c. On success: remove from queue (implicit)
     d. On error: call retry()
2. Continue loop
```

**Moderation Action Execution:**
```
For each match found:
- If to_label:       create post label (with claim check)
- If report_post:    create post report
- If label_acct:     create account label (with claim check)
- If report_acct:    create account report
- If takedown_post:  takedown post (future)
- If takedown_acct:  takedown account (future)
```

**Rate Limiting Integration:**
- RateLimiter wraps moderation actions
- Enforces delay between actions (config.moderation.rate_limit)
- Prevents overwhelming Ozone API

**Error Handling:**
- Blob download error: retry job
- Phash computation error: retry job
- Moderation action error: log and continue (don't retry)
- Queue error: continue to next iteration

**Design Notes:**
- Each worker owns its own queue and cache connections
- No lock contention (workers are independent)
- Shutdown via broadcast receiver (all workers stop together)
- Redis client created per select! iteration (necessary for multiplexed connection reuse)
- Metrics are thread-safe (Arc<AtomicU64>)

---

### 10. cache/mod.rs (Phash Caching, ~120 lines)

**Redis Phash Cache:**

```
Key Pattern: "phash:{cid}"
Value:       hex hash string (16 chars)
TTL:         config.cache.ttl (default: 86400 = 24 hours)
```

**PhashCache Structure:**

```rust
pub struct PhashCache {
    redis: redis::aio::MultiplexedConnection,
    ttl: u64,
    enabled: bool,
}

pub async fn new(config: &Config) -> Result<Self>
// Connect to Redis
// Store ttl and enabled flag

pub async fn get(&mut self, cid: &str) -> Result<Option<String>>
// If !enabled: return Ok(None)
// GET from "phash:{cid}"
// Log cache hit/miss

pub async fn set(&mut self, cid: &str, phash: &str) -> Result<()>
// If !enabled: return Ok(())
// SET with EX (expire time)
// Log cached entry

pub async fn delete(&mut self, cid: &str) -> Result<()>
// If !enabled: return Ok(())
// DEL "phash:{cid}"

pub fn is_enabled(&self) -> bool
// Return enabled flag

pub async fn get_or_compute<F, Fut>(&mut self, cid: &str, compute_fn: F) -> Result<String>
// Check cache
// If hit: return
// If miss: call compute_fn()
// Set cache with result
// Return result
```

**Performance Characteristics:**
- Cache hit rate: 20-40% typical (viral images)
- TTL: 24 hours (prevents stale hashes)
- Fallback: If disabled, computed fresh each time
- No memory limit (Redis-managed)

**Design Notes:**
- Optional feature (config.cache.enabled = true/false)
- Zero-copy: strings passed by reference
- Fail-open: cache errors logged but don't break processing
- get_or_compute() pattern reduces boilerplate in workers

---

### 11. moderation/ (Action Execution)

**Module Structure:**

```
moderation/
├── mod.rs           # Exports
├── post.rs          # Post label/report actions
├── account.rs       # Account label/report/comment actions
├── claims.rs        # Redis deduplication claims
├── rate_limiter.rs  # Rate limit enforcement
└── helpers.rs       # Shared utilities
```

**Post Actions: post.rs**

```rust
pub async fn create_post_label(
    agent: &Arc<Agent<MemoryCredentialSession>>,
    labeler_did: &str,
    post_uri: &str,
    post_cid: &str,
    label: &str,
    comment: &str,
    phash: &str,
    distance: u32,
) -> Result<()>
// Emit mod event via Ozone:
// - $type: "tools.ozone.moderation.defs#modEventLabel"
// - subject: strongRef {uri, cid}
// - createLabelVals: [label]
// - comment: "{timestamp}: {comment} at {uri} with phash \"{phash}\" (distance={distance})"

pub async fn create_post_report(
    agent: &Arc<Agent<MemoryCredentialSession>>,
    labeler_did: &str,
    post_uri: &str,
    post_cid: &str,
    label: &str,
    comment: &str,
    phash: &str,
) -> Result<()>
// Emit mod event via Ozone:
// - $type: "tools.ozone.moderation.defs#modEventReport"
// - subject: strongRef {uri, cid}
// - reportType: "com.atproto.moderation.defs#reasonOther"
// - comment: "{timestamp}: {comment} at {uri} with phash \"{phash}\""
```

**Account Actions: account.rs**

```rust
pub async fn create_account_label(
    agent: &Arc<Agent<MemoryCredentialSession>>,
    labeler_did: &str,
    did: &str,
    label: &str,
    comment: &str,
) -> Result<()>
// Emit mod event:
// - $type: "tools.ozone.moderation.defs#modEventLabel"
// - subject: repoRef {did}
// - createLabelVals: [label]
// - comment: "{timestamp}: {comment} for account {did}"

pub async fn create_account_report(
    agent: &Arc<Agent<MemoryCredentialSession>>,
    labeler_did: &str,
    did: &str,
    label: &str,
    comment: &str,
) -> Result<()>
// Emit mod event:
// - $type: "tools.ozone.moderation.defs#modEventReport"
// - subject: repoRef {did}
// - reportType: "com.atproto.moderation.defs#reasonOther"
// - comment: "{timestamp}: {comment} for account {did}"

pub async fn create_account_comment(
    agent: &Arc<Agent<MemoryCredentialSession>>,
    labeler_did: &str,
    did: &str,
    comment: &str,
) -> Result<()>
// Emit mod event:
// - $type: "tools.ozone.moderation.defs#modEventComment"
// - subject: repoRef {did}
// - comment: "{timestamp}: {comment}"
```

**Claims: claims.rs (Deduplication)**

```rust
pub async fn try_claim_post_label(
    redis: &mut redis::aio::MultiplexedConnection,
    uri: &str,
    label: &str,
) -> Result<bool>
// Key: "claim:post:label:{uri}:{label}"
// SET {key} "1" NX EX 604800 (7 days)
// Return true if SET succeeded (claim acquired)

pub async fn try_claim_account_label(
    redis: &mut redis::aio::MultiplexedConnection,
    did: &str,
    label: &str,
) -> Result<bool>
// Key: "claim:account:label:{did}:{label}"
// SET {key} "1" NX EX 604800
// Return true if SET succeeded

pub async fn has_been_claimed_recently(
    redis: &mut redis::aio::MultiplexedConnection,
    claim_key: &str,
) -> Result<bool>
// GET claim_key
// Return true if exists
```

**Rate Limiter: rate_limiter.rs**

```rust
pub struct RateLimiter {
    delay_ms: u64,
    last_action: Arc<Mutex<Instant>>,
}

pub async fn limit<F, Fut>(&self, action: F) -> Result<Fut::Output>
where
    F: FnOnce() -> Fut,
    Fut: Future,
// Wait if needed to enforce rate limit
// Execute action
// Update last_action timestamp
```

---

### 12. metrics/mod.rs (Observability, ~353 lines)

**Metrics Tracker:**

```rust
pub struct Metrics {
    inner: Arc<MetricsInner>, // All fields are Arc<AtomicU64>
}

struct MetricsInner {
    // Jobs
    jobs_received: AtomicU64,   // From Jetstream
    jobs_processed: AtomicU64,  // Completed (success or fail)
    jobs_failed: AtomicU64,     // Failed at all retries
    jobs_retried: AtomicU64,    // Retried (attempts > 0)

    // Blobs
    blobs_processed: AtomicU64,  // Hashed (success)
    blobs_downloaded: AtomicU64, // Downloaded from CDN/PDS

    // Matches
    matches_found: AtomicU64, // Phashes matched rules

    // Cache
    cache_hits: AtomicU64,   // Cached phash used
    cache_misses: AtomicU64, // Phash not cached, computed

    // Moderation
    posts_labeled: AtomicU64,     // Post labels created
    posts_reported: AtomicU64,    // Post reports created
    accounts_labeled: AtomicU64,  // Account labels created
    accounts_reported: AtomicU64, // Account reports created

    // Skipped (deduplication)
    posts_already_labeled: AtomicU64,
    posts_already_reported: AtomicU64,
    accounts_already_labeled: AtomicU64,
    accounts_already_reported: AtomicU64,
}
```

**Key Methods:**
- `inc_*()`: Atomic increment
- Getters: Load current value
- `log_stats()`: Log all metrics (called every 60 seconds + on shutdown)
- `cache_hit_rate()`: Calculate percentage
- `snapshot()`: Immutable snapshot for reporting

**Lock-Free Design:**
- All operations use `AtomicU64` with `Ordering::Relaxed`
- No mutexes (no contention on increments)
- Multiple workers can update simultaneously
- Consistent snapshot possible via `.snapshot()`

**Logged Every 60 Seconds:**
```
Jobs: received=X, processed=Y, failed=Z, retried=W
Blobs: processed=X, downloaded=Y
Matches: found=X
Cache: hits=X, misses=Y, hit_rate=Z%
Moderation: posts_labeled=X, posts_reported=Y, accounts_labeled=Z, accounts_reported=W
Skipped (deduplication): posts_already_labeled=X, posts_already_reported=Y, accounts_already_labeled=Z, accounts_already_reported=W
```

---

### 13. agent/session.rs (Authentication)

**AgentSession Wrapper:**

```rust
pub struct AgentSession {
    agent: Arc<Agent<MemoryCredentialSession>>, // Jacquard client
    did: Arc<str>,                              // Authenticated DID
}

pub async fn new(config: &Config) -> Result<Self>
// Create MemoryCredentialSession::authenticated()
// Pass handle, password, ozone.pds
// Extract did and create Arc<Agent>
// Return AgentSession

pub fn agent(&self) -> &Arc<Agent<MemoryCredentialSession>>
// Get reference to Jacquard agent

pub fn did(&self) -> &str
// Get authenticated DID
```

**Session Management:**
- Jacquard handles token refresh internally
- MemoryCredentialSession stores tokens in memory (no file I/O)
- No manual token refresh needed (transparent)
- Credentials passed once at initialization

**Thread Safety:**
- Agent wrapped in Arc (shareable across threads)
- All internal types use 'static lifetime
- Clone is cheap (Arc clone)

---

### 14. plc/mod.rs (DID Resolution, ~130 lines)

**PLC Directory Client with Failover:**

```rust
pub struct PlcClient {
    client: Client,
    endpoints: Vec<String>, // Primary + fallbacks
}

pub fn new(client: Client, config: &PlcConfig) -> Self
// Combine primary + fallback endpoints
// Store as vector for round-robin

pub async fn resolve_did(&self, did: &str) -> Result<DidDocument>
// For each endpoint:
//   - GET {endpoint}/{did}
//   - Parse JSON to DidDocument
//   - On success: return (log fallback usage if idx > 0)
//   - On error: continue to next endpoint
// If all fail: return error with last error

pub async fn get_pds_endpoint(&self, did: &str) -> Result<String>
// Call resolve_did()
// Find service with type "AtprotoPersonalDataServer"
// Return serviceEndpoint URL
// Error if not found
```

**DidDocument Structure:**

```rust
pub struct DidDocument {
    pub id: String,
    pub also_known_as: Vec<String>,
    pub service: Vec<ServiceEndpoint>,
}

pub struct ServiceEndpoint {
    pub id: String,
    pub service_type: String,
    pub service_endpoint: String,
}
```

**Design Notes:**
- Automatic failover on network/parsing errors
- Logs when fallback succeeds (operational visibility)
- Used in future versions for DID -> PDS resolution
- Currently not used in main processing (PDS endpoint from config)

---

### 15.
rules/blobs.json (Rule Configuration) 948 + 949 + **Current Rules (4 rules):** 950 + 951 + ```json 952 + [ 953 + { 954 + "phashes": ["07870707...", "d9794408...", ...], 955 + "label": "troll", 956 + "comment": "Image is used in harassment campaign", 957 + "reportAcct": false, 958 + "labelAcct": true, 959 + "reportPost": false, 960 + "toLabel": true, 961 + "hammingThreshold": 1, 962 + "description": "Will Stancil Harassment Memes", 963 + "ignoreDID": ["did:plc:7umvpuxe2vbrc3zrzuquzniu"] 964 + }, 965 + { 966 + "phashes": ["00fffd7c...", "ffbf8f83...", ...], 967 + "label": "maga-trump", 968 + "comment": "Pro-trump imagery", 969 + "reportAcct": true, 970 + "labelAcct": false, 971 + "reportPost": false, 972 + "toLabel": true, 973 + "hammingThreshold": 3, 974 + "description": "Sample harassment image variants" 975 + }, 976 + ... 977 + ] 978 + ``` 979 + 980 + **Rule Fields:** 981 + - `phashes`: Array of 16-char hex hashes to match 982 + - `label`: Label to apply (e.g., "troll", "spam", "csam") 983 + - `comment`: Description for audit trail 984 + - `reportAcct`: Report the account 985 + - `labelAcct`: Label the account 986 + - `reportPost`: Report the post 987 + - `toLabel`: Label the post 988 + - `hammingThreshold`: Max hamming distance for match (overrides global default) 989 + - `description`: Internal documentation (not used) 990 + - `ignoreDID`: Optional array of DIDs to exempt from this rule 991 + 992 + **Matching Logic:** 993 + 1. For each rule in order: 994 + - Skip if post author DID in `ignoreDID` 995 + - Get threshold (per-rule or global default) 996 + - For each rule's phash: 997 + - Compute hamming_distance with computed phash 998 + - If distance <= threshold: Match found, execute actions 999 + 2. First match wins (no combining multiple rules) 1000 + 1001 + --- 1002 + 1003 + ## Data Flow & Workflow 1004 + 1005 + ### High-Level Process 1006 + 1007 + ``` 1008 + 1. 
JETSTREAM INGEST (main.rs:98-158) 1009 + Jetstream WebSocket -> posts with images 1010 + ↓ (extract blobs) 1011 + ImageJob created with post_uri, post_cid, post_did, blobs, timestamp, attempts=0 1012 + ↓ (send) 1013 + Job channel (mpsc, unbounded) 1014 + 1015 + 2. JOB QUEUING (main.rs:160-188) 1016 + Job receiver task polls from channel 1017 + ↓ (serialize) 1018 + Redis RPUSH to "jobs:pending" 1019 + 1020 + Metrics: inc_jobs_received() 1021 + 1022 + 3. WORKER PROCESSING (worker.rs:91+) 1023 + N concurrent workers run independently 1024 + Each worker: 1025 + - BLPOP from "jobs:pending" with 1s timeout 1026 + - For each blob in job: 1027 + a. Check phash cache ("phash:{cid}") 1028 + ↓ (if miss) 1029 + b. Download blob (CDN first, fall back to PDS) 1030 + 1031 + c. Compute phash (aHash 8x8 -> 16 hex) 1032 + 1033 + d. Cache phash in Redis (24h TTL) 1034 + 1035 + e. Match against rules (hamming_distance <= threshold) 1036 + ↓ (if match found) 1037 + f. Execute moderation actions: 1038 + - Check claims (deduplication, 7-day TTL) 1039 + - Create post/account labels or reports 1040 + - Rate-limited (config.moderation.rate_limit) 1041 + 1042 + - Update metrics (matches, labels, reports, etc) 1043 + - On error: call retry() -> re-push to "jobs:pending" 1044 + - On max retries: move to "jobs:dead" 1045 + 1046 + 4. METRICS LOGGING (main.rs:220-229) 1047 + Every 60 seconds: Metrics::log_stats() 1048 + Outputs: jobs, blobs, matches, cache, moderation, skipped 1049 + 1050 + 5. 
GRACEFUL SHUTDOWN (main.rs:233-284) 1051 + Ctrl+C or SIGTERM 1052 + 1053 + Send shutdown signal to all tasks 1054 + 1055 + Workers finish current jobs and exit 1056 + 1057 + Jetstream client writes final cursor 1058 + 1059 + Log final metrics 1060 + 1061 + Exit 1062 + ``` 1063 + 1064 + ### Error Handling Strategy 1065 + 1066 + **Level 1: Blob Processing** 1067 + - Download error: retry job (up to max_retries) 1068 + - Phash computation error: retry job 1069 + - Image decode error: log and continue (next blob) 1070 + 1071 + **Level 2: Rule Matching** 1072 + - Hamming distance error: log and continue 1073 + - No error if no match found (normal case) 1074 + 1075 + **Level 3: Moderation Actions** 1076 + - Label/report API errors: log but don't retry job 1077 + - Rate limit respected (wait before action) 1078 + - Claim check may skip action (already done in 7 days) 1079 + 1080 + **Level 4: Queue Management** 1081 + - Job pushed successfully: tracked in metrics 1082 + - Job failed after max retries: moved to dead-letter 1083 + - Dead-letter jobs observable via Redis (debugging) 1084 + 1085 + --- 1086 + 1087 + ## Testing Structure 1088 + 1089 + **Unit Tests Included:** 1090 + 1091 + 1. **config/mod.rs** 1092 + - `test_get_env_bool()` - Boolean parsing 1093 + - `test_get_env_u32()` - U32 parsing 1094 + 1095 + 2. **processor/phash.rs** 1096 + - `test_hamming_distance_identical()` - Same hash = 0 1097 + - `test_hamming_distance_different()` - Opposite hash = 64 1098 + - `test_hamming_distance_one_bit()` - One bit diff = 1 1099 + - `test_hamming_distance_invalid_length()` - Validation 1100 + - `test_hamming_distance_invalid_hex()` - Hex validation 1101 + - `test_phash_format()` - Output format (16 chars, valid hex) 1102 + 1103 + 3. 
**processor/matcher.rs** 1104 + - `test_match_phash_exact()` - Exact match 1105 + - `test_match_phash_within_threshold()` - Within threshold 1106 + - `test_match_phash_exceeds_threshold()` - Exceeds threshold 1107 + - `test_match_phash_ignored_did()` - DID exemption 1108 + - `test_load_real_rules()` - Load rules/blobs.json 1109 + 1110 + 4. **metrics/mod.rs** 1111 + - `test_metrics_increment()` - Atomic increment 1112 + - `test_cache_hit_rate()` - Hit rate calculation 1113 + - `test_metrics_snapshot()` - Snapshot consistency 1114 + 1115 + 5. **plc/mod.rs** 1116 + - `test_plc_resolve()` - Real PLC resolution (requires network) 1117 + 1118 + **Integration Tests (Require Redis):** 1119 + - cache/mod.rs - Cache get/set/delete operations 1120 + 1121 + **Run Tests:** 1122 + ```bash 1123 + # All unit tests 1124 + cargo test 1125 + 1126 + # Specific test 1127 + cargo test test_hamming_distance_identical 1128 + 1129 + # Show output 1130 + cargo test -- --nocapture 1131 + 1132 + # Ignored tests (network-dependent) 1133 + cargo test -- --ignored 1134 + ``` 1135 + 1136 + --- 1137 + 1138 + ## Key Design Decisions & Rationale 1139 + 1140 + ### 1. Redis for Persistence 1141 + - **Why:** Durability across process crashes, distributed state 1142 + - **Trade-off:** Adds Redis dependency (no in-memory fallback) 1143 + - **Mitigation:** Dead-letter queue preserves failed jobs for inspection 1144 + 1145 + ### 2. Job Queue Decouples Ingestion 1146 + - **Why:** Jetstream can be very fast, workers may lag 1147 + - **How:** mpsc channel -> Redis queue -> worker pool 1148 + - **Benefit:** Backpressure natural (Redis queue grows if workers slow) 1149 + 1150 + ### 3. Phash Caching (Redis) 1151 + - **Why:** Viral images processed multiple times, compute cost high 1152 + - **TTL:** 24 hours (balance between freshness and hit rate) 1153 + - **Hit Rate:** 20-40% typical (good ROI for cost) 1154 + 1155 + ### 4. 
Claims Deduplication 1156 + - **Why:** Prevent duplicate moderation actions within 7 days 1157 + - **How:** Redis SET ... NX (atomic acquire) 1158 + - **Check:** Verify label still exists in Ozone (race condition safety) 1159 + - **Trade-off:** May skip legitimate re-moderation within 7 days 1160 + 1161 + ### 5. Worker Pool Pattern 1162 + - **Design:** N independent workers, not tokio::spawn (HRTB issue) 1163 + - **Concurrency:** Multiplexed Redis connections (no lock contention) 1164 + - **Shutdown:** Broadcast receiver stops all workers together 1165 + 1166 + ### 6. Per-Rule Hamming Threshold 1167 + - **Why:** Different rule types need different sensitivity 1168 + - **Example:** Exact harassment memes (threshold 1) vs looser CSAM detection (threshold 5) 1169 + - **Default:** Overridable via PHASH_HAMMING_THRESHOLD 1170 + 1171 + ### 7. Cursor Persistence 1172 + - **Why:** Resume from exact position after restart 1173 + - **Format:** Microsecond timestamp (not millisecond) 1174 + - **Frequency:** Every 10 seconds + on shutdown 1175 + - **File:** firehose_cursor.db (working directory) 1176 + 1177 + ### 8. Jetstream Failover 1178 + - **Primary:** wss://jetstream.atproto.tools/subscribe 1179 + - **Fallbacks:** fire.hose.cam URLs (different provider) 1180 + - **Strategy:** Exponential backoff with exponential caps 1181 + 1182 + ### 9. Blob Download Fallback 1183 + - **Primary:** CDN (cdn.bsky.app/img/feed_fullsize) 1184 + - **Secondary:** PDS (com.atproto.sync.getBlob) 1185 + - **Why:** CDN is faster, reduces PDS load 1186 + 1187 + ### 10. 
Rate Limiting 1188 + - **Purpose:** Respect Ozone API quotas 1189 + - **Mechanism:** Delay before each moderation action 1190 + - **Config:** rate_limit_ms (default 100) 1191 + - **Future:** Track RateLimit headers from Ozone responses 1192 + 1193 + --- 1194 + 1195 + ## Performance Characteristics 1196 + 1197 + ### Throughput 1198 + - Jetstream: ~1000s posts/second (unlimited) 1199 + - Workers: 10 workers × 1-5 blobs/second = 10-50 images/second 1200 + - Bottleneck: Network I/O (blob download), not hashing 1201 + 1202 + ### Latency 1203 + - Jetstream event -> job enqueue: <100ms 1204 + - Job dequeue -> phash computed: 200-500ms (network dependent) 1205 + - Phash -> moderation action: ~100ms (rate-limited) 1206 + - Total end-to-end: 300-700ms per image 1207 + 1208 + ### Memory 1209 + - Minimal (no large buffers held) 1210 + - Arc cloning for data sharing 1211 + - Metrics: lock-free atomics 1212 + - Config loaded once at startup 1213 + 1214 + ### CPU 1215 + - Phash computation: ~1-5ms per image (image_hasher) 1216 + - Hamming distance: <1µs (bitwise operations) 1217 + - Not CPU-bound 1218 + 1219 + ### Disk 1220 + - Cursor file: <20 bytes (microsecond timestamp) 1221 + - Rules: JSON file (~10KB typical) 1222 + - Logs: optional (stdout/JSON logging) 1223 + 1224 + --- 1225 + 1226 + ## Known Limitations & Future Work 1227 + 1228 + ### Current Limitations 1229 + 1. Single-process (no distributed workers) 1230 + 2. In-memory PDS/PLC caches lost on restart 1231 + 3. No metrics server (Prometheus endpoint) 1232 + 4. Takedown actions not implemented (infrastructure) 1233 + 5. No image deduplication by CID before download 1234 + 6. 
No batch operations to Ozone API 1235 + 1236 + ### Future Enhancements (from README) 1237 + - Rate limit header parsing (adaptive backoff) 1238 + - Takedown post/account actions 1239 + - Distributed worker support 1240 + - Persistent moderation history 1241 + - Web UI for rule management 1242 + - Active monitoring/alerting 1243 + 1244 + --- 1245 + 1246 + ## Environment Setup 1247 + 1248 + ### Required Credentials 1249 + ```bash 1250 + AUTOMOD_HANDLE=automod.bsky.social # Labeler account handle 1251 + AUTOMOD_PASSWORD=xxxx-yyyy-zzzz-wwww # App password (NOT user password) 1252 + LABELER_DID=did:plc:example # Your labeler account DID 1253 + OZONE_URL=https://ozone.bsky.app # Ozone service URL 1254 + OZONE_PDS=https://pds.bluesky.social # Ozone PDS endpoint 1255 + ``` 1256 + 1257 + ### Docker Deployment 1258 + ```bash 1259 + cp .env.example .env 1260 + # Edit .env with credentials 1261 + docker compose up --build 1262 + ``` 1263 + 1264 + ### Local Development 1265 + ```bash 1266 + # Start Redis 1267 + docker run -d -p 6379:6379 redis 1268 + 1269 + # Create .env 1270 + cp .env.example .env 1271 + # Edit with credentials 1272 + 1273 + # Run 1274 + cargo run 1275 + 1276 + # Tests 1277 + cargo test 1278 + ``` 1279 + 1280 + ### CLI Tool: phash-cli 1281 + ```bash 1282 + cargo run --bin phash-cli path/to/image.jpg 1283 + # Output: e0e0e0e0e0fcfefe (16 hex chars) 1284 + ``` 1285 + 1286 + --- 1287 + 1288 + ## Critical Gotchas & Nuances 1289 + 1290 + ### 1. Cursor is Microseconds, Not Milliseconds 1291 + - Jetstream provides `time_us` (microseconds since epoch) 1292 + - NOT milliseconds (1,000x larger) 1293 + - Used directly for resumption 1294 + 1295 + ### 2. Hamming Threshold Comparison 1296 + - `distance <= threshold` (inclusive) 1297 + - Threshold 0 = exact match only 1298 + - Threshold 5 = default (moderate sensitivity) 1299 + 1300 + ### 3. 
First Match Wins 1301 + - Rules evaluated in order 1302 + - First matching rule's actions executed 1303 + - No combining multiple rules 1304 + 1305 + ### 4. Ignore DIDs Are Per-Rule 1306 + - Each rule can have its own `ignoreDID` list 1307 + - Not a global blocklist 1308 + - Checked during matching 1309 + 1310 + ### 5. MIME Type Is Optional 1311 + - `BlobReference.mime_type` may be missing 1312 + - Code doesn't filter by MIME type 1313 + - SVG images might be processed (future: skip SVG) 1314 + 1315 + ### 6. Claims Are Deduplication Only 1316 + - Redis claim prevents action for 7 days 1317 + - Still checks Ozone API (belt-and-suspenders) 1318 + - May skip legitimate re-moderation within 7 days 1319 + 1320 + ### 7. Rate Limit Delay 1321 + - Applied BEFORE action (preventive) 1322 + - Not tied to Ozone response headers (yet) 1323 + - May result in artificial delay even at low load 1324 + 1325 + ### 8. Retry Logic 1326 + - Job retried on ANY error (not just transient) 1327 + - Max retries from config (default 3) 1328 + - Dead-letter after max retries (not discarded) 1329 + 1330 + ### 9. Cache TTL 1331 + - 24 hours default (very long) 1332 + - Can be tuned via CACHE_TTL_SECONDS 1333 + - Image may be edited/removed but hash cached 1334 + 1335 + ### 10. Jetstream Fallover 1336 + - URL rotation on failure 1337 + - Exponential backoff (5s -> 10s -> 20s ... -> 300s) 1338 + - Max 5 connection attempts total before giving up 1339 + 1340 + --- 1341 + 1342 + ## Code Quality & Testing 1343 + 1344 + ### Error Handling Approach 1345 + - **miette for diagnostics:** Rich error context and pretty printing 1346 + - **thiserror for custom types:** Derive Error trait 1347 + - **Result<T> pervasive:** No panics in business logic 1348 + - **Graceful degradation:** Errors logged, processing continues 1349 + 1350 + ### Concurrency Patterns 1351 + - **Arc for sharing:** Cheap clones across workers 1352 + - **Atomic types for metrics:** Lock-free increments 1353 + - **tokio::select! 
for orchestration:** Clean multi-task coordination 1354 + - **Multiplexed Redis connections:** One connection, concurrent operations 1355 + 1356 + ### Code Style 1357 + - Consistent module structure (mod.rs organization) 1358 + - Clear separation of concerns (processor, queue, moderation) 1359 + - Use of Jacquard types (CowStr, Did, AtUri, Cid) 1360 + - Comprehensive logging via tracing 1361 + 1362 + --- 1363 + 1364 + ## Summary of Key Components & Responsibilities 1365 + 1366 + | Component | File | Purpose | Key Types | 1367 + |-----------|------|---------|-----------| 1368 + | Entry Point | main.rs | Orchestrate startup, shutdown, task coordination | - | 1369 + | Configuration | config/mod.rs | Load env vars, provide config to all modules | Config, JetstreamConfig, etc | 1370 + | Jetstream | jetstream/ | Subscribe to firehose, extract blobs, handle cursor | JetstreamClient, ImageJob | 1371 + | Image Processing | processor/ | Hash computation, rule matching, blob download | Phash, hamming_distance, MatchResult | 1372 + | Job Queue | queue/ | Redis persistence, retry logic, dead-letter | JobQueue, WorkerPool, ImageJob | 1373 + | Cache | cache/mod.rs | Redis phash cache with TTL | PhashCache | 1374 + | Moderation | moderation/ | Execute label/report actions, deduplication | Claims, RateLimiter | 1375 + | Metrics | metrics/mod.rs | Track statistics with lock-free atomics | Metrics, MetricsSnapshot | 1376 + | Authentication | agent/ | Jacquard session wrapper | AgentSession | 1377 + | DID Resolution | plc/mod.rs | Resolve DIDs to PDS endpoints | PlcClient | 1378 + 1379 + --- 1380 + 1381 + ## Quick Reference: Important File Paths 1382 + 1383 + **Configuration:** 1384 + - Environment loading: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/src/config/mod.rs:80-144` 1385 + - Jetstream config: `config/mod.rs:87-103` 1386 + - Processing config: `config/mod.rs:107-111` 1387 + - Cache config: `config/mod.rs:112-114` 1388 + 1389 + **Core Algorithm:** 1390 + - Phash 
computation: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/src/processor/phash.rs:26-44` 1391 + - Hamming distance: `processor/phash.rs:72-98` 1392 + - Rule matching: `processor/matcher.rs:72-113` 1393 + 1394 + **Job Processing:** 1395 + - Worker loop: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/src/queue/worker.rs:91-99` (main select loop) 1396 + - Job retry: `queue/redis_queue.rs:78-97` 1397 + - Queue push/pop: `queue/redis_queue.rs:40-76` 1398 + 1399 + **Moderation:** 1400 + - Post actions: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/src/moderation/post.rs` 1401 + - Account actions: `moderation/account.rs` 1402 + - Claims checking: `moderation/claims.rs` 1403 + 1404 + **Metrics:** 1405 + - Metric types: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/src/metrics/mod.rs:1-66` 1406 + - Log stats: `metrics/mod.rs:212-244` 1407 + 1408 + **Rules:** 1409 + - Current rules: `/Users/scarndp/dev/skywatch/skywatch-phash-rs/rules/blobs.json` 1410 + 1411 + --- 1412 + 1413 + ## Debugging Tips 1414 + 1415 + ### Enable Debug Logging 1416 + ```bash 1417 + RUST_LOG=debug cargo run 1418 + RUST_LOG=skywatch_phash_rs=debug,info cargo run 1419 + ``` 1420 + 1421 + ### Monitor Redis 1422 + ```bash 1423 + redis-cli 1424 + > KEYS "*" 1425 + > LLEN jobs:pending 1426 + > LLEN jobs:dead 1427 + > GET phash:{cid} 1428 + ``` 1429 + 1430 + ### Check Metrics in Real-Time 1431 + ```bash 1432 + # Logs printed every 60 seconds, watch with: 1433 + tail -f logs.txt | grep "=== Metrics ===" 1434 + ``` 1435 + 1436 + ### Cursor Position 1437 + ```bash 1438 + cat firehose_cursor.db 1439 + # Shows microsecond timestamp 1440 + ``` 1441 + 1442 + ### Test Phash CLI 1443 + ```bash 1444 + cargo run --bin phash-cli path/to/image.jpg 1445 + # Output: 16-char hex string 1446 + ``` 1447 + 1448 + ### Inspect Rules 1449 + ```bash 1450 + jq '.' 
rules/blobs.json 1451 + jq '.[] | {label, phashes: (.phashes | length)}' rules/blobs.json 1452 + ``` 1453 + 1454 + --- 1455 + 1456 + ## Deployment Notes 1457 + 1458 + ### Docker Compose 1459 + - Service: `app` (main binary) 1460 + - Dependencies: `redis` (persistent queue/cache) 1461 + - Environment: Sourced from `.env` 1462 + - Logs: Streamed to stdout/logs volume 1463 + 1464 + ### Graceful Shutdown 1465 + - Jetstream writes final cursor 1466 + - Workers finish active jobs (no kill -9) 1467 + - Metrics logged at shutdown 1468 + - Redis connections closed 1469 + 1470 + ### Monitoring 1471 + - Metrics every 60 seconds (INFO level) 1472 + - Final metrics on shutdown 1473 + - Structured JSON logging (if enabled) 1474 + - No built-in Prometheus endpoint (yet) 1475 + 1476 + --- 1477 + 1478 + ## References & Related Projects 1479 + 1480 + **Parent Repository:** `tangled.sh:skywatch.blue/skywatch-phash-rs` 1481 + **Jacquard Dependency:** `../jacquard/crates/jacquard` (local path) 1482 + **Bluesky/ATProto:** https://github.com/bluesky-social/atproto 1483 + **Image Hasher:** https://github.com/Ed-von-Schleck/image-hasher 1484 + 1485 + --- 1486 + 1487 + **Document Version:** 1.0 1488 + **Last Updated:** 2025-10-26 1489 + **Codebase Version:** 0.2.0 1490 + **Rust Edition:** 2024
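Appendix: a minimal sketch of the 64-bit Hamming-distance check documented in the matching logic above. The real implementation lives in `processor/phash.rs`; the function names here are illustrative only:

```rust
// Hypothetical sketch of the aHash Hamming-distance comparison; the real
// version lives in processor/phash.rs.
fn hamming_distance(a: &str, b: &str) -> Result<u32, String> {
    if a.len() != 16 || b.len() != 16 {
        return Err("phash must be 16 hex chars".to_string());
    }
    let x = u64::from_str_radix(a, 16).map_err(|e| e.to_string())?;
    let y = u64::from_str_radix(b, 16).map_err(|e| e.to_string())?;
    // XOR leaves a 1 bit wherever the two hashes differ; count them.
    Ok((x ^ y).count_ones())
}

// Inclusive comparison (gotcha #2): threshold 0 means exact match only.
fn is_match(computed: &str, rule_phash: &str, threshold: u32) -> bool {
    hamming_distance(computed, rule_phash).map_or(false, |d| d <= threshold)
}

fn main() {
    assert_eq!(hamming_distance("0000000000000000", "0000000000000001"), Ok(1));
    assert_eq!(hamming_distance("0000000000000000", "ffffffffffffffff"), Ok(64));
    assert!(is_match("e0e0e0e0e0fcfefe", "e0e0e0e0e0fcfeff", 1));
    println!("ok");
}
```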
+6
src/config/mod.rs
··· 35 35 pub concurrency: usize, 36 36 pub retry_attempts: u32, 37 37 pub retry_delay: u64, 38 + /// Timeout per blob download attempt in seconds (per format/endpoint) 39 + pub blob_download_timeout_secs: u64, 40 + /// Total timeout for all blob download fallback attempts in seconds 41 + pub blob_total_timeout_secs: u64, 38 42 } 39 43 40 44 #[derive(Debug, Clone)] ··· 108 112 concurrency: get_env_usize("PROCESSING_CONCURRENCY", 10), 109 113 retry_attempts: get_env_u32("RETRY_ATTEMPTS", 3), 110 114 retry_delay: get_env_u64("RETRY_DELAY_MS", 1000), 115 + blob_download_timeout_secs: get_env_u64("BLOB_DOWNLOAD_TIMEOUT_SECS", 10), 116 + blob_total_timeout_secs: get_env_u64("BLOB_TOTAL_TIMEOUT_SECS", 30), 111 117 }, 112 118 cache: CacheConfig { 113 119 enabled: get_env_bool("CACHE_ENABLED", true),
+1 -1
src/main.rs
··· 72 72 agent.clone(), 73 73 blob_checks.clone(), 74 74 metrics.clone(), 75 - ); 75 + )?; 76 76 info!( 77 77 "Worker pool created with {} workers", 78 78 config.processing.concurrency
+47 -4
src/moderation/helpers.rs
··· 8 8 use jacquard_common::IntoStatic; 9 9 use miette::{IntoDiagnostic, Result}; 10 10 use std::collections::BTreeMap; 11 + use std::time::Duration; 12 + use tracing::{debug, warn}; 11 13 12 14 use crate::config::Config; 13 15 use crate::moderation::rate_limiter::RateLimiter; ··· 62 64 rate_limiter: &RateLimiter, 63 65 event: EmitEvent<'a>, 64 66 ) -> Result<()> { 65 - rate_limiter.wait().await; 67 + const MAX_RETRIES: u32 = 3; 68 + let mut retry_count = 0; 69 + let mut backoff = Duration::from_millis(100); 70 + 71 + loop { 72 + rate_limiter.wait().await; 73 + 74 + let opts = build_moderation_call_opts(config); 75 + match agent.send_with_opts(event.clone(), opts).await.into_diagnostic() { 76 + Ok(_) => { 77 + debug!("Moderation event sent successfully"); 78 + return Ok(()); 79 + } 80 + Err(e) => { 81 + retry_count += 1; 82 + let error_msg = format!("{}", e); 83 + 84 + // Check if error is potentially transient 85 + let is_transient = error_msg.contains("500") 86 + || error_msg.contains("502") 87 + || error_msg.contains("503") 88 + || error_msg.contains("504") 89 + || error_msg.contains("timeout") 90 + || error_msg.contains("connection"); 91 + 92 + if retry_count > MAX_RETRIES || !is_transient { 93 + warn!( 94 + "Moderation API call failed (attempt {}/{}): {} (transient: {})", 95 + retry_count, MAX_RETRIES, error_msg, is_transient 96 + ); 97 + return Err(e); 98 + } 66 99 67 - let opts = build_moderation_call_opts(config); 68 - agent.send_with_opts(event, opts).await.into_diagnostic()?; 100 + warn!( 101 + "Moderation API call failed (attempt {}/{}), retrying in {:.0}ms: {}", 102 + retry_count, 103 + MAX_RETRIES, 104 + backoff.as_secs_f64() * 1000.0, 105 + error_msg 106 + ); 69 107 70 - Ok(()) 108 + tokio::time::sleep(backoff).await; 109 + let next_backoff_ms = (backoff.as_millis() as u64 * 2).min(5000); // Cap at 5s 110 + backoff = Duration::from_millis(next_backoff_ms); 111 + } 112 + } 113 + } 71 114 }
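The backoff doubling in the hunk above follows a fixed schedule. A standalone sketch of that schedule (mirroring the `(backoff * 2).min(5000)` cap; `next_backoff` is an illustrative name, not part of the diff):

```rust
use std::time::Duration;

// Standalone sketch of the retry backoff used above: start at 100ms,
// double after each failed attempt, cap at 5s.
fn next_backoff(current: Duration) -> Duration {
    let next_ms = (current.as_millis() as u64 * 2).min(5000);
    Duration::from_millis(next_ms)
}

fn main() {
    let mut b = Duration::from_millis(100);
    let mut schedule = Vec::new();
    for _ in 0..7 {
        schedule.push(b.as_millis());
        b = next_backoff(b);
    }
    // 100 -> 200 -> 400 -> 800 -> 1600 -> 3200 -> 5000 (capped)
    println!("{:?}", schedule);
}
```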
+6 -5
src/moderation/rate_limiter.rs
··· 3 3 state::{InMemoryState, NotKeyed}, 4 4 Quota, RateLimiter as GovernorRateLimiter, 5 5 }; 6 + use miette::{miette, Result}; 6 7 use std::sync::Arc; 7 8 use std::time::Duration; 8 9 ··· 15 16 impl RateLimiter { 16 17 /// Create a new rate limiter with the given rate limit in milliseconds 17 18 /// For example, rate_limit_ms = 100 means 100ms minimum between requests (10 requests per second) 18 - pub fn new(rate_limit_ms: u64) -> Self { 19 + pub fn new(rate_limit_ms: u64) -> Result<Self> { 19 20 let duration = if rate_limit_ms == 0 { 20 21 Duration::from_millis(1) 21 22 } else { 22 23 Duration::from_millis(rate_limit_ms) 23 24 }; 24 25 25 - // 1 request per rate_limit_ms duration 26 - let quota = Quota::with_period(duration).unwrap(); 26 + let quota = Quota::with_period(duration) 27 + .ok_or_else(|| miette!("Invalid rate limit duration: {}ms", rate_limit_ms))?; 27 28 let limiter = GovernorRateLimiter::direct(quota); 28 29 29 - Self { 30 + Ok(Self { 30 31 limiter: Arc::new(limiter), 31 - } 32 + }) 32 33 } 33 34 34 35 /// Wait until a request can be made according to the rate limit
+48 -6
src/processor/matcher.rs
··· 1 1 use miette::{IntoDiagnostic, Result}; 2 2 use reqwest::Client; 3 3 use std::path::Path; 4 + use std::time::{Duration, Instant}; 4 5 use tracing::{debug, info, warn}; 5 6 6 7 use crate::config::Config; ··· 22 23 did: &str, 23 24 cid: &str, 24 25 ) -> Result<Vec<u8>> { 26 + let start = Instant::now(); 27 + let per_attempt_timeout = Duration::from_secs(config.processing.blob_download_timeout_secs); 28 + let total_timeout = Duration::from_secs(config.processing.blob_total_timeout_secs); 29 + 25 30 // Try CDN first - attempt common image formats 26 31 for format in ["jpeg", "png", "webp"] { 32 + // Check if we've exceeded total timeout 33 + if start.elapsed() > total_timeout { 34 + warn!( 35 + "Blob download total timeout exceeded for did={}, cid={}", 36 + did, cid 37 + ); 38 + return Err(miette::miette!( 39 + "Blob download timeout after {:.1}s", 40 + start.elapsed().as_secs_f64() 41 + )); 42 + } 43 + 27 44 let cdn_url = format!( 28 45 "https://cdn.bsky.app/img/feed_fullsize/plain/{}/{}@{}", 29 46 did, cid, format 30 47 ); 31 48 32 - debug!("Trying CDN download: {}", cdn_url); 49 + debug!("Trying CDN download: {} (timeout: {}s)", cdn_url, config.processing.blob_download_timeout_secs); 33 50 34 - match client.get(&cdn_url).send().await { 51 + match client 52 + .get(&cdn_url) 53 + .timeout(per_attempt_timeout) 54 + .send() 55 + .await 56 + { 35 57 Ok(response) if response.status().is_success() => { 36 58 debug!("Successfully downloaded from CDN: did={}, cid={}", did, cid); 37 59 let bytes = response.bytes().await.into_diagnostic()?; ··· 41 63 debug!("CDN returned status {}, trying next format", response.status()); 42 64 } 43 65 Err(e) => { 44 - debug!("CDN request failed: {}, trying next format", e); 66 + debug!( 67 + "CDN request failed: {} (elapsed: {:.1}s), trying next format", 68 + e, 69 + start.elapsed().as_secs_f64() 70 + ); 45 71 } 46 72 } 47 73 } 48 74 49 75 // Fall back to PDS if CDN fails 50 - warn!("CDN failed for did={}, cid={}, falling back to PDS", 
did, cid); 76 + warn!( 77 + "CDN failed for did={}, cid={}, falling back to PDS (elapsed: {:.1}s)", 78 + did, 79 + cid, 80 + start.elapsed().as_secs_f64() 81 + ); 82 + 83 + // Check if we've exceeded total timeout before PDS attempt 84 + if start.elapsed() > total_timeout { 85 + warn!("Blob download total timeout exceeded before PDS fallback"); 86 + return Err(miette::miette!( 87 + "Blob download timeout after {:.1}s", 88 + start.elapsed().as_secs_f64() 89 + )); 90 + } 51 91 52 92 let pds_url = format!( 53 93 "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", 54 94 config.pds.endpoint, did, cid 55 95 ); 56 96 57 - debug!("Downloading from PDS: {}", pds_url); 97 + debug!("Downloading from PDS: {} (timeout: {}s)", pds_url, config.processing.blob_download_timeout_secs); 58 98 59 99 let response = client 60 100 .get(&pds_url) 101 + .timeout(per_attempt_timeout) 61 102 .send() 62 103 .await 63 - .into_diagnostic()? 104 + .into_diagnostic() 105 + .map_err(|e| miette::miette!("PDS blob download failed: {}", e))? 64 106 .error_for_status() 65 107 .into_diagnostic()?; 66 108
+13 -19
src/queue/worker.rs
··· 73 73 agent: AgentSession, 74 74 blob_checks: Vec<BlobCheck>, 75 75 metrics: Metrics, 76 - ) -> Self { 77 - let rate_limiter = RateLimiter::new(config.moderation.rate_limit); 76 + ) -> Result<Self> { 77 + let rate_limiter = RateLimiter::new(config.moderation.rate_limit)?; 78 78 79 - Self { 79 + Ok(Self { 80 80 config, 81 81 client, 82 82 agent, 83 83 blob_checks, 84 84 metrics, 85 85 rate_limiter, 86 - } 86 + }) 87 87 } 88 88 89 89 /// Start the worker pool - processes jobs sequentially ··· 94 94 mut cache: PhashCache, 95 95 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, 96 96 ) -> Result<()> { 97 + // Create Redis connection once at worker startup, reuse for all jobs 98 + let redis_client = redis::Client::open(self.config.redis.url.as_str()) 99 + .into_diagnostic()?; 100 + let mut redis_conn = redis_client 101 + .get_multiplexed_async_connection() 102 + .await 103 + .into_diagnostic() 104 + .map_err(|e| miette::miette!("Failed to establish Redis connection: {}", e))?; 105 + 97 106 loop { 98 107 tokio::select! { 99 108 _ = shutdown_rx.recv() => { ··· 105 114 match job_result { 106 115 Ok(Some(job)) => { 107 116 debug!("Worker popped job from queue: {}", job.post_uri); 108 - let redis_client = match redis::Client::open(self.config.redis.url.as_str()) { 109 - Ok(c) => c, 110 - Err(e) => { 111 - error!("Failed to create Redis client: {}", e); 112 - continue; 113 - } 114 - }; 115 - 116 - let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { 117 - Ok(conn) => conn, 118 - Err(e) => { 119 - error!("Failed to connect to Redis: {}", e); 120 - continue; 121 - } 122 - }; 123 117 124 118 let job_clone = job.clone(); 125 119 if let Err(e) = Self::process_job(