A Rust implementation of skywatch-phash

feat: Implement circuit breaker pattern for external APIs

Implements circuit breaker protection for the Ozone API and PDS blob
downloads to prevent cascading failures when external services degrade.

Changes:
- Created generic CircuitBreaker with states: Closed → Open → Half-Open → Closed
- Implemented three circuit breaker instances (constructed as sketched below):
* Ozone API: 5 consecutive failures → open, 60s timeout, 1 success to close
* PDS Blob: 3 consecutive failures → open, 5m timeout, 1 success to close
* PLC Resolution: 3 consecutive failures → open, 5m timeout (created but not yet integrated)
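For illustration, the thresholds above map onto the `CircuitBreaker` constructor added in this commit. This is only a minimal sketch assuming the code lives inside this crate; the real wiring in src/main.rs uses `CircuitBreaker::with_metrics(...)` to attach the shared `Metrics` handle.

```rust
use crate::resilience::CircuitBreaker;

fn build_breakers() -> (CircuitBreaker, CircuitBreaker, CircuitBreaker) {
    // Arguments: (name, failure_threshold, timeout_secs, half_open_max_calls)
    let ozone = CircuitBreaker::new("ozone-api", 5, 60, 1); // 5 failures → open, 60s cool-down
    let pds = CircuitBreaker::new("pds-blob", 3, 300, 1); // 3 failures → open, 5m cool-down
    let plc = CircuitBreaker::new("plc-resolution", 3, 300, 1); // created but not yet wired into PlcClient
    (ozone, pds, plc)
}
```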

Implementation Details:
- Circuit breaker checks added before all Ozone moderation API calls (call-site pattern sketched after this list)
- Circuit breaker checks added before PDS blob fallback downloads
- Automatic state transitions with configurable timeouts
- Thread-safe design using Arc<RwLock<>>
- Metrics tracking for state transitions and rejections
- Comprehensive unit tests covering all state transitions
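The gate-then-record pattern used at each call site (see src/moderation/helpers.rs and src/processor/matcher.rs in this commit) looks roughly like the sketch below; `guarded_call` and `call_ozone` are placeholders for illustration, not functions in the codebase.

```rust
use crate::resilience::CircuitBreaker;
use miette::Result;

// Hypothetical wrapper showing the gate-then-record pattern.
async fn guarded_call(cb: &CircuitBreaker) -> Result<()> {
    if !cb.is_available().await {
        // Open circuit: fail fast instead of hammering a degraded API.
        return Err(miette::miette!("Circuit breaker '{}' is open", cb.name()));
    }
    match call_ozone().await {
        Ok(resp) => {
            cb.record_success().await; // closes the circuit again when half-open
            Ok(resp)
        }
        Err(e) => {
            cb.record_failure().await; // counts toward the failure threshold
            Err(e)
        }
    }
}

async fn call_ozone() -> Result<()> {
    Ok(()) // stand-in for the real moderation API call
}
```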

Architecture:
- src/resilience/circuit_breaker.rs: Core circuit breaker implementation
- src/main.rs: Initialize three circuit breaker instances with metrics
- src/queue/worker.rs: Pass circuit breakers to moderation and download functions
- src/processor/matcher.rs: Circuit breaker protection for PDS blob downloads
- src/moderation/helpers.rs: Circuit breaker protection for Ozone API calls
- src/metrics/mod.rs: Track circuit_breaker_transitions and circuit_breaker_rejections

Tests:
- All 7 circuit breaker unit tests passing
- Verified state transitions (closed → open → half-open → closed)
- Tested failure threshold triggers opening
- Tested success during half-open closes circuit
- Tested failure during half-open reopens circuit
- Tested timeout calculation and half-open transition
- Tested half_open_max_calls limiting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Skywatch bf796657 f17b2c7f

Changed files (+637 -21)
PLAN_REMAINING.md (+124, new file)

```markdown
# Production Resilience Improvements - Implementation Plan

## Overview
Complete the remaining critical issues needed for full production readiness. Each task includes implementation, testing, and verification.

---

## Task 1: Implement Circuit Breaker Pattern for External APIs

**Objective:** Prevent cascading failures when external APIs (Ozone, PDS, PLC) degrade or fail.

**Requirements:**
1. Create a circuit breaker module (`src/resilience/circuit_breaker.rs`)
2. Implement three independent circuit breakers:
   - **Ozone API** - Opens after 5 consecutive failures, half-opens after 60s
   - **PDS Blob Fetch** - Opens after 3 consecutive failures per endpoint, 5m timeout
   - **PLC Resolution** - Opens after 3 consecutive failures per endpoint, 5m timeout
3. Circuit breaker states: Closed → Open → Half-Open → Closed
4. Add metrics tracking: circuit_breaker_state, circuit_breaker_transitions
5. Update error handling to respect circuit breaker state
6. Add comprehensive tests:
   - State transitions (closed → open → half-open → closed)
   - Failure threshold triggers
   - Success during half-open closes circuit
   - Failure during half-open reopens circuit
   - Timeout calculation

**Files to Modify:**
- Create: `src/resilience/circuit_breaker.rs`
- Create: `src/resilience/mod.rs`
- Modify: `src/moderation/helpers.rs` - Wrap Ozone calls with circuit breaker
- Modify: `src/processor/matcher.rs` - Wrap PDS/CDN calls with circuit breaker
- Modify: `src/plc/mod.rs` - Wrap PLC calls with circuit breaker
- Modify: `src/main.rs` - Initialize circuit breakers
- Modify: `src/metrics/mod.rs` - Add circuit breaker metrics

**Tests Required:**
- Unit tests for circuit breaker state machine
- Integration tests for API call wrapping
- Test timeout and recovery scenarios

---

## Task 2: Add Redis Connection Failure Backoff and Recovery

**Objective:** Handle Redis unavailability gracefully instead of crashing or spinning in error loops.

**Requirements:**
1. Implement connection retry logic with exponential backoff
2. Add connection pooling via `ConnectionManager` (redis crate provides this)
3. Add health check mechanism with configurable interval
4. Implement graceful degradation:
   - First failure: Log and retry with 100ms delay
   - Exponential backoff: Cap at 10s
   - After 5 consecutive failures: Switch to circuit breaker state
5. Add configuration:
   - `REDIS_HEALTH_CHECK_INTERVAL_SECS` (default: 30s)
   - `REDIS_MAX_BACKOFF_SECS` (default: 10s)
6. Add metrics: `redis_connection_failures`, `redis_reconnect_attempts`, `redis_health_status`
7. Add logging for connection state changes

**Files to Modify:**
- Modify: `src/config/mod.rs` - Add redis retry configuration
- Create: `src/redis_pool.rs` - Redis connection manager with backoff
- Modify: `src/queue/redis_queue.rs` - Use connection manager
- Modify: `src/cache/mod.rs` - Use connection manager
- Modify: `src/main.rs` - Initialize redis pool
- Modify: `src/metrics/mod.rs` - Add redis metrics

**Tests Required:**
- Unit tests for backoff calculation
- Connection failure retry tests
- Health check tests
- Graceful degradation tests

---

## Task 3: Add Integration Tests for Critical Paths

**Objective:** Achieve >50% test coverage for critical paths (up from <5% currently).

**Requirements:**
1. Use `testcontainers` for Redis integration tests
2. Test critical paths:
   - **Worker job processing flow** - Mock blob download, verify cache, verify moderation
   - **Jetstream message → Job Queue flow** - Verify message parsing, job creation
   - **Moderation actions** - Test with mock Ozone API, verify retry logic
   - **Blob download fallback** - Test CDN failure → PDS success
   - **Cache operations** - Test cache hit/miss, TTL expiration
3. Tests must be isolated (no shared state between tests)
4. Use `tokio::test` for async tests
5. Mock external services (Ozone, PDS, PLC) where appropriate
6. Test error scenarios:
   - Blob download timeout
   - Cache miss
   - Moderation API retry exhaustion
   - Redis unavailability
7. Organize tests in `tests/integration/` directory

**Files to Create:**
- Create: `tests/integration/mod.rs`
- Create: `tests/integration/worker_test.rs` - Worker job processing
- Create: `tests/integration/cache_test.rs` - Cache operations
- Create: `tests/integration/blob_download_test.rs` - Download with fallback
- Create: `tests/integration/moderation_test.rs` - Moderation actions with retry
- Create: `tests/integration/helpers.rs` - Test fixtures and mocks

**Acceptance Criteria:**
- All 10+ integration tests pass
- >50% code coverage for critical modules (worker, cache, moderation)
- Tests are deterministic (no flakiness)
- Tests complete in <30s total

---

## Success Criteria

- [ ] All tasks implemented and tests passing
- [ ] No panics or unwraps in production code
- [ ] Circuit breaker prevents cascading failures
- [ ] Redis connection failures handled gracefully
- [ ] Integration tests provide >50% coverage
- [ ] Code review passes for all changes
- [ ] All changes committed to `feat/production-resilience-improvements` branch
```
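As a side note on Task 2 above, the backoff schedule it describes (100ms initial delay, doubling per attempt, capped at 10s) would look roughly like the following sketch. The constants mirror the plan's defaults; `redis_backoff` is a hypothetical helper that does not exist in the codebase yet.

```rust
use std::time::Duration;

// Hypothetical helper for Task 2's schedule: 100ms initial delay, doubling per attempt, capped.
fn redis_backoff(attempt: u32, max_backoff_secs: u64) -> Duration {
    let base_ms: u64 = 100;
    // 100ms, 200ms, 400ms, ... with the shift clamped to avoid overflow.
    let delay_ms = base_ms.saturating_mul(1u64 << attempt.min(20));
    Duration::from_millis(delay_ms).min(Duration::from_secs(max_backoff_secs))
}
```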
src/main.rs (+31)

```diff
···
     processor::matcher,
     queue::{JobQueue, WorkerPool},
     redis_pool::RedisPool,
+    resilience::CircuitBreaker,
 };

 #[tokio::main]
···
     let queue = JobQueue::new(redis_pool.clone(), config.processing.retry_attempts);
     info!("Job queue initialized");

+    // Create circuit breakers with metrics
+    let ozone_circuit_breaker = CircuitBreaker::with_metrics(
+        "ozone-api",
+        5,   // 5 consecutive failures
+        60,  // 60s timeout
+        1,   // 1 success to close
+        metrics.clone(),
+    );
+    info!("Ozone API circuit breaker initialized");
+
+    let pds_circuit_breaker = CircuitBreaker::with_metrics(
+        "pds-blob",
+        3,   // 3 consecutive failures
+        300, // 5m timeout
+        1,   // 1 success to close
+        metrics.clone(),
+    );
+    info!("PDS blob circuit breaker initialized");
+
+    let _plc_circuit_breaker = CircuitBreaker::with_metrics(
+        "plc-resolution",
+        3,   // 3 consecutive failures
+        300, // 5m timeout
+        1,   // 1 success to close
+        metrics.clone(),
+    );
+    info!("PLC resolution circuit breaker initialized (not yet integrated)");
+
     // Create worker pool
     let worker_pool = WorkerPool::new(
         config.clone(),
···
         agent.clone(),
         blob_checks.clone(),
         metrics.clone(),
+        ozone_circuit_breaker,
+        pds_circuit_breaker,
     )?;
     info!(
         "Worker pool created with {} workers",
```
src/metrics/mod.rs (+31)

```diff
···
     redis_connection_failures: AtomicU64,
     redis_reconnect_attempts: AtomicU64,
     redis_health_status: AtomicU64,
+
+    // Circuit breaker metrics
+    circuit_breaker_transitions: AtomicU64,
+    circuit_breaker_rejections: AtomicU64,
 }

 impl Metrics {
···
                 redis_connection_failures: AtomicU64::new(0),
                 redis_reconnect_attempts: AtomicU64::new(0),
                 redis_health_status: AtomicU64::new(1),
+                circuit_breaker_transitions: AtomicU64::new(0),
+                circuit_breaker_rejections: AtomicU64::new(0),
             }),
         }
     }
···
         self.inner.redis_health_status.store(if healthy { 1 } else { 0 }, Ordering::Relaxed);
     }

+    // Circuit breaker metrics
+    pub fn inc_circuit_breaker_transitions(&self) {
+        self.inner.circuit_breaker_transitions.fetch_add(1, Ordering::Relaxed);
+    }
+
+    pub fn inc_circuit_breaker_rejections(&self) {
+        self.inner.circuit_breaker_rejections.fetch_add(1, Ordering::Relaxed);
+    }
+
     // Getters
     pub fn jobs_received(&self) -> u64 {
         self.inner.jobs_received.load(Ordering::Relaxed)
···
         self.inner.redis_health_status.load(Ordering::Relaxed) == 1
     }

+    pub fn circuit_breaker_transitions(&self) -> u64 {
+        self.inner.circuit_breaker_transitions.load(Ordering::Relaxed)
+    }
+
+    pub fn circuit_breaker_rejections(&self) -> u64 {
+        self.inner.circuit_breaker_rejections.load(Ordering::Relaxed)
+    }
+
     /// Log current metrics
     pub fn log_stats(&self) {
         info!("=== Metrics ===");
···
             self.redis_reconnect_attempts(),
             if self.redis_health_status() { "healthy" } else { "degraded" }
         );
+        info!("Circuit breakers: transitions={}, rejections={}",
+            self.circuit_breaker_transitions(),
+            self.circuit_breaker_rejections()
+        );
     }

     /// Calculate cache hit rate
···
             redis_connection_failures: self.redis_connection_failures(),
             redis_reconnect_attempts: self.redis_reconnect_attempts(),
             redis_health_status: if self.redis_health_status() { 1 } else { 0 },
+            circuit_breaker_transitions: self.circuit_breaker_transitions(),
+            circuit_breaker_rejections: self.circuit_breaker_rejections(),
         }
     }
 }
···
     pub redis_connection_failures: u64,
     pub redis_reconnect_attempts: u64,
     pub redis_health_status: u64,
+    pub circuit_breaker_transitions: u64,
+    pub circuit_breaker_rejections: u64,
 }

 #[cfg(test)]
```
src/moderation/account.rs (+7 -3)

```diff
···
 use crate::moderation::{
     build_mod_tool_meta, build_timestamped_comment, send_moderation_event,
 };
+use crate::resilience::CircuitBreaker;

 /// Label an account with a specific label via Ozone moderation API
 pub async fn label_account<'a>(
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     did: &Did<'a>,
     label_val: &str,
     check_comment: &str,
···
         })
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully labeled account: {}", did);

···
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     did: &Did<'a>,
     reason: ReasonType<'static>,
     check_comment: &str,
···
         })
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully reported account: {}", did);

···
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     did: &Did<'a>,
     comment: &str,
     created_by: &Did<'a>,
···
         )))
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully took down account: {}", did);
```
src/moderation/helpers.rs (+15)

```diff
···
 use crate::config::Config;
 use crate::moderation::rate_limiter::RateLimiter;
+use crate::resilience::CircuitBreaker;

 pub fn build_timestamped_comment(check_comment: &str, uri: &str, phash: &str) -> String {
     let timestamp = chrono::Utc::now().to_rfc3339();
···
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     event: EmitEvent<'a>,
 ) -> Result<()> {
     const MAX_RETRIES: u32 = 3;
···
     let mut backoff = Duration::from_millis(100);

     loop {
+        if let Some(cb) = circuit_breaker {
+            if !cb.is_available().await {
+                warn!("Circuit breaker '{}' is open, rejecting Ozone API call", cb.name());
+                return Err(miette::miette!("Circuit breaker open for Ozone API"));
+            }
+        }
+
         rate_limiter.wait().await;

         let opts = build_moderation_call_opts(config);
         match agent.send_with_opts(event.clone(), opts).await.into_diagnostic() {
             Ok(_) => {
                 debug!("Moderation event sent successfully");
+                if let Some(cb) = circuit_breaker {
+                    cb.record_success().await;
+                }
                 return Ok(());
             }
             Err(e) => {
···
                     "Moderation API call failed (attempt {}/{}): {} (transient: {})",
                     retry_count, MAX_RETRIES, error_msg, is_transient
                 );
+                if let Some(cb) = circuit_breaker {
+                    cb.record_failure().await;
+                }
                 return Err(e);
```
src/moderation/post.rs (+7 -3)

```diff
···
 use crate::moderation::{
     build_mod_tool_meta, build_timestamped_comment, send_moderation_event,
 };
+use crate::resilience::CircuitBreaker;

 /// Label a post with a specific label via Ozone moderation API
 pub async fn label_post<'a>(
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     post_uri: &AtUri<'a>,
     post_cid: &Cid<'a>,
     label_val: &str,
···
         })
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully labeled post: {}", post_uri);

···
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     post_uri: &AtUri<'a>,
     _post_cid: &Cid<'a>,
     post_did: &Did<'a>,
···
         })
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully reported post: {}", post_uri);

···
     agent: &Agent<MemoryCredentialSession>,
     config: &Config,
     rate_limiter: &RateLimiter,
+    circuit_breaker: Option<&CircuitBreaker>,
     post_uri: &AtUri<'a>,
     post_cid: &Cid<'a>,
     comment: &str,
···
         )))
         .build();

-    send_moderation_event(agent, config, rate_limiter, event).await?;
+    send_moderation_event(agent, config, rate_limiter, circuit_breaker, event).await?;

     debug!("Successfully took down post: {}", post_uri);
```
src/moderation/rate_limiter.rs (+2 -2)

```diff
···
     #[tokio::test]
     async fn test_rate_limiter() {
         // 100ms between requests = 10 requests per second
-        let limiter = RateLimiter::new(100);
+        let limiter = RateLimiter::new(100).unwrap();

         let start = Instant::now();

···
     #[tokio::test]
     async fn test_rate_limiter_concurrent() {
         // 100ms between requests = 10 requests per second
-        let limiter = RateLimiter::new(100);
+        let limiter = RateLimiter::new(100).unwrap();

         let start = Instant::now();
```
src/plc/mod.rs (+37 -1)

```diff
···
 use tracing::{debug, error, info, warn};

 use crate::config::PlcConfig;
+use crate::resilience::CircuitBreaker;

 #[derive(Debug, Deserialize)]
 pub struct DidDocument {
···
 pub struct PlcClient {
     client: Client,
     endpoints: Vec<String>,
+    circuit_breaker: Option<CircuitBreaker>,
 }

 impl PlcClient {
···
         let mut endpoints = vec![config.endpoint.clone()];
         endpoints.extend(config.fallback_endpoints.clone());

-        Self { client, endpoints }
+        Self {
+            client,
+            endpoints,
+            circuit_breaker: None,
+        }
+    }
+
+    /// Create a new PLC client with circuit breaker protection
+    pub fn with_circuit_breaker(
+        client: Client,
+        config: &PlcConfig,
+        circuit_breaker: CircuitBreaker,
+    ) -> Self {
+        let mut endpoints = vec![config.endpoint.clone()];
+        endpoints.extend(config.fallback_endpoints.clone());
+
+        Self {
+            client,
+            endpoints,
+            circuit_breaker: Some(circuit_breaker),
+        }
     }

     /// Resolve a DID to its DID document with automatic failover
     pub async fn resolve_did(&self, did: &str) -> Result<DidDocument> {
+        if let Some(cb) = &self.circuit_breaker {
+            if !cb.is_available().await {
+                warn!("Circuit breaker '{}' is open, rejecting PLC resolution", cb.name());
+                return Err(miette::miette!("Circuit breaker open for PLC resolution"));
+            }
+        }
+
         let mut last_error = None;

         for (idx, endpoint) in self.endpoints.iter().enumerate() {
···
                             idx,
                             did
                         );
+                    }
+                    if let Some(cb) = &self.circuit_breaker {
+                        cb.record_success().await;
                     }
                     return Ok(doc);
                 }
···
             did,
             self.endpoints.len()
         );
+
+        if let Some(cb) = &self.circuit_breaker {
+            cb.record_failure().await;
+        }

         Err(last_error.unwrap_or_else(|| {
             miette::miette!("All PLC endpoints failed for DID: {}", did)
```
src/processor/matcher.rs (+25 -9)

```diff
···
 use crate::config::Config;
 use crate::processor::phash;
+use crate::resilience::CircuitBreaker;
 use crate::types::{BlobCheck, BlobReference, ImageJob, MatchResult};

 /// Load blob checks from a JSON file
···
 pub async fn download_blob(
     client: &Client,
     config: &Config,
+    circuit_breaker: &CircuitBreaker,
     did: &str,
     cid: &str,
 ) -> Result<Vec<u8>> {
···
         start.elapsed().as_secs_f64()
     );

+    // Check circuit breaker before attempting PDS
+    if !circuit_breaker.is_available().await {
+        warn!("Circuit breaker '{}' is open, rejecting PDS blob download", circuit_breaker.name());
+        return Err(miette::miette!("Circuit breaker open for PDS blob download"));
+    }
+
    // Check if we've exceeded total timeout before PDS attempt
    if start.elapsed() > total_timeout {
        warn!("Blob download total timeout exceeded before PDS fallback");
···

     debug!("Downloading from PDS: {} (timeout: {}s)", pds_url, config.processing.blob_download_timeout_secs);

-    let response = client
+    match client
         .get(&pds_url)
         .timeout(per_attempt_timeout)
         .send()
         .await
         .into_diagnostic()
-        .map_err(|e| miette::miette!("PDS blob download failed: {}", e))?
-        .error_for_status()
-        .into_diagnostic()?;
-
-    let bytes = response.bytes().await.into_diagnostic()?;
-    Ok(bytes.to_vec())
+        .and_then(|resp| resp.error_for_status().into_diagnostic())
+    {
+        Ok(response) => {
+            let bytes = response.bytes().await.into_diagnostic()?;
+            circuit_breaker.record_success().await;
+            Ok(bytes.to_vec())
+        }
+        Err(e) => {
+            circuit_breaker.record_failure().await;
+            Err(miette::miette!("PDS blob download failed: {}", e))
+        }
+    }
 }

 /// Match a computed phash against blob checks
···
 pub async fn process_blob(
     client: &Client,
     config: &Config,
+    circuit_breaker: &CircuitBreaker,
     blob_checks: &[BlobCheck],
     did: &str,
     blob: &BlobReference,
 ) -> Result<Option<MatchResult>> {
-    let image_bytes = download_blob(client, config, did, &blob.cid).await?;
+    let image_bytes = download_blob(client, config, circuit_breaker, did, &blob.cid).await?;
     let phash = phash::compute_phash(&image_bytes)?;
     debug!("Computed phash for blob {}: {}", blob.cid, phash);

···
 pub async fn process_image_job(
     client: &Client,
     config: &Config,
+    circuit_breaker: &CircuitBreaker,
     blob_checks: &[BlobCheck],
     job: &ImageJob,
 ) -> Result<Vec<MatchResult>> {
···
     let mut matches = Vec::new();

     for blob in &job.blobs {
-        match process_blob(client, config, blob_checks, &job.post_did, blob).await {
+        match process_blob(client, config, circuit_breaker, blob_checks, &job.post_did, blob).await {
             Ok(Some(result)) => {
                 matches.push(result);
             }
```
src/queue/worker.rs (+23 -3)

```diff
···
 use crate::moderation::{account, claims, post, rate_limiter::RateLimiter};
 use crate::processor::matcher;
 use crate::queue::redis_queue::JobQueue;
+use crate::resilience::CircuitBreaker;
 use crate::types::{BlobCheck, ImageJob, MatchResult};

 /// Macro to handle moderation actions with claim checking
···
     blob_checks: Vec<BlobCheck>,
     metrics: Metrics,
     rate_limiter: RateLimiter,
+    ozone_circuit_breaker: CircuitBreaker,
+    pds_circuit_breaker: CircuitBreaker,
 }

 impl WorkerPool {
···
         agent: AgentSession,
         blob_checks: Vec<BlobCheck>,
         metrics: Metrics,
+        ozone_circuit_breaker: CircuitBreaker,
+        pds_circuit_breaker: CircuitBreaker,
     ) -> Result<Self> {
         let rate_limiter = RateLimiter::new(config.moderation.rate_limit)?;

···
             blob_checks,
             metrics,
             rate_limiter,
+            ozone_circuit_breaker,
+            pds_circuit_breaker,
         })
     }

···
     /// Concurrency is achieved by running multiple instances of this concurrently
     pub async fn start(
         &self,
-        mut queue: JobQueue,
+        queue: JobQueue,
         mut cache: PhashCache,
         mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
     ) -> Result<()> {
···
                 &self.blob_checks,
                 &self.metrics,
                 &self.rate_limiter,
+                &self.ozone_circuit_breaker,
+                &self.pds_circuit_breaker,
                 &mut cache,
                 &mut redis_conn,
                 job,
···
         blob_checks: &[BlobCheck],
         metrics: &Metrics,
         rate_limiter: &RateLimiter,
+        ozone_circuit_breaker: &CircuitBreaker,
+        pds_circuit_breaker: &CircuitBreaker,
         cache: &mut PhashCache,
         redis_conn: &mut redis::aio::MultiplexedConnection,
         job: ImageJob,
···
         debug!("Processing job: {}", job.post_uri);

         let matches =
-            Self::process_job_blobs(config, client, blob_checks, metrics, cache, &job).await?;
+            Self::process_job_blobs(config, client, blob_checks, metrics, pds_circuit_breaker, cache, &job).await?;

         if matches.is_empty() {
             debug!("No matches found for job: {}", job.post_uri);
···
                 agent,
                 metrics,
                 rate_limiter,
+                ozone_circuit_breaker,
                 redis_conn,
                 &job,
                 &match_result,
···
         client: &Client,
         blob_checks: &[BlobCheck],
         metrics: &Metrics,
+        pds_circuit_breaker: &CircuitBreaker,
         cache: &mut PhashCache,
         job: &ImageJob,
     ) -> Result<Vec<MatchResult>> {
···

             // Download and compute
             let image_bytes =
-                matcher::download_blob(client, config, &job.post_did, &blob.cid).await?;
+                matcher::download_blob(client, config, pds_circuit_breaker, &job.post_did, &blob.cid).await?;
             let computed_phash = crate::processor::phash::compute_phash(&image_bytes)?;

             // Store in cache
···
         agent: &Arc<Agent<MemoryCredentialSession>>,
         metrics: &Metrics,
         rate_limiter: &RateLimiter,
+        ozone_circuit_breaker: &CircuitBreaker,
         redis_conn: &mut redis::aio::MultiplexedConnection,
         job: &ImageJob,
         match_result: &MatchResult,
···
                     agent.as_ref(),
                     config,
                     rate_limiter,
+                    Some(ozone_circuit_breaker),
                     &job.post_uri,
                     &job.post_cid,
                     &job.post_did,
···
                     agent.as_ref(),
                     config,
                     rate_limiter,
+                    Some(ozone_circuit_breaker),
                     &job.post_uri,
                     &job.post_cid,
                     &check.label,
···
                     agent.as_ref(),
                     config,
                     rate_limiter,
+                    Some(ozone_circuit_breaker),
                     &job.post_did,
                     ReasonType::ComAtprotoModerationDefsReasonSpam,
                     &check.comment,
···
                     agent.as_ref(),
                     config,
                     rate_limiter,
+                    Some(ozone_circuit_breaker),
                     &job.post_did,
                     &check.label,
                     &check.comment,
···
             blob_checks: self.blob_checks.clone(),
             metrics: self.metrics.clone(),
             rate_limiter: self.rate_limiter.clone(),
+            ozone_circuit_breaker: self.ozone_circuit_breaker.clone(),
+            pds_circuit_breaker: self.pds_circuit_breaker.clone(),
         }
     }
 }
```
src/resilience/circuit_breaker.rs (+332, new file)

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, warn};

use crate::metrics::Metrics;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

#[derive(Debug, Clone)]
struct CircuitBreakerState {
    state: CircuitState,
    failure_count: u32,
    success_count: u32,
    last_failure_time: Option<Instant>,
    half_open_attempts: u32,
}

#[derive(Clone)]
pub struct CircuitBreaker {
    name: String,
    failure_threshold: u32,
    timeout: Duration,
    half_open_max_calls: u32,
    state: Arc<RwLock<CircuitBreakerState>>,
    metrics: Option<Metrics>,
}

impl CircuitBreaker {
    pub fn new(
        name: impl Into<String>,
        failure_threshold: u32,
        timeout_secs: u64,
        half_open_max_calls: u32,
    ) -> Self {
        Self {
            name: name.into(),
            failure_threshold,
            timeout: Duration::from_secs(timeout_secs),
            half_open_max_calls,
            state: Arc::new(RwLock::new(CircuitBreakerState {
                state: CircuitState::Closed,
                failure_count: 0,
                success_count: 0,
                last_failure_time: None,
                half_open_attempts: 0,
            })),
            metrics: None,
        }
    }

    pub fn with_metrics(
        name: impl Into<String>,
        failure_threshold: u32,
        timeout_secs: u64,
        half_open_max_calls: u32,
        metrics: Metrics,
    ) -> Self {
        Self {
            name: name.into(),
            failure_threshold,
            timeout: Duration::from_secs(timeout_secs),
            half_open_max_calls,
            state: Arc::new(RwLock::new(CircuitBreakerState {
                state: CircuitState::Closed,
                failure_count: 0,
                success_count: 0,
                last_failure_time: None,
                half_open_attempts: 0,
            })),
            metrics: Some(metrics),
        }
    }

    pub async fn is_available(&self) -> bool {
        let mut state = self.state.write().await;

        match state.state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                if let Some(last_failure) = state.last_failure_time {
                    if last_failure.elapsed() >= self.timeout {
                        debug!(
                            "Circuit breaker '{}' transitioning from Open to HalfOpen (timeout elapsed)",
                            self.name
                        );
                        state.state = CircuitState::HalfOpen;
                        state.half_open_attempts = 1;
                        state.success_count = 0;
                        if let Some(metrics) = &self.metrics {
                            metrics.inc_circuit_breaker_transitions();
                        }
                        true
                    } else {
                        if let Some(metrics) = &self.metrics {
                            metrics.inc_circuit_breaker_rejections();
                        }
                        false
                    }
                } else {
                    if let Some(metrics) = &self.metrics {
                        metrics.inc_circuit_breaker_rejections();
                    }
                    false
                }
            }
            CircuitState::HalfOpen => {
                if state.half_open_attempts < self.half_open_max_calls {
                    state.half_open_attempts += 1;
                    true
                } else {
                    if let Some(metrics) = &self.metrics {
                        metrics.inc_circuit_breaker_rejections();
                    }
                    false
                }
            }
        }
    }

    pub async fn record_success(&self) {
        let mut state = self.state.write().await;

        match state.state {
            CircuitState::Closed => {
                state.failure_count = 0;
            }
            CircuitState::HalfOpen => {
                state.success_count += 1;
                if state.success_count >= 1 {
                    debug!(
                        "Circuit breaker '{}' transitioning from HalfOpen to Closed (success threshold met)",
                        self.name
                    );
                    state.state = CircuitState::Closed;
                    state.failure_count = 0;
                    state.success_count = 0;
                    state.half_open_attempts = 0;
                    state.last_failure_time = None;
                    if let Some(metrics) = &self.metrics {
                        metrics.inc_circuit_breaker_transitions();
                    }
                }
            }
            CircuitState::Open => {}
        }
    }

    pub async fn record_failure(&self) {
        let mut state = self.state.write().await;

        match state.state {
            CircuitState::Closed => {
                state.failure_count += 1;
                state.last_failure_time = Some(Instant::now());

                if state.failure_count >= self.failure_threshold {
                    warn!(
                        "Circuit breaker '{}' transitioning from Closed to Open (failure threshold {} reached)",
                        self.name, self.failure_threshold
                    );
                    state.state = CircuitState::Open;
                    if let Some(metrics) = &self.metrics {
                        metrics.inc_circuit_breaker_transitions();
                    }
                }
            }
            CircuitState::HalfOpen => {
                warn!(
                    "Circuit breaker '{}' transitioning from HalfOpen to Open (failure during half-open)",
                    self.name
                );
                state.state = CircuitState::Open;
                state.failure_count = self.failure_threshold;
                state.success_count = 0;
                state.half_open_attempts = 0;
                state.last_failure_time = Some(Instant::now());
                if let Some(metrics) = &self.metrics {
                    metrics.inc_circuit_breaker_transitions();
                }
            }
            CircuitState::Open => {
                state.last_failure_time = Some(Instant::now());
            }
        }
    }

    pub async fn get_state(&self) -> CircuitState {
        self.state.read().await.state
    }

    pub fn name(&self) -> &str {
        &self.name
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_circuit_breaker_starts_closed() {
        let cb = CircuitBreaker::new("test", 3, 60, 1);
        assert_eq!(cb.get_state().await, CircuitState::Closed);
        assert!(cb.is_available().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_opens_after_threshold() {
        let cb = CircuitBreaker::new("test", 3, 60, 1);

        assert!(cb.is_available().await);
        cb.record_failure().await;
        assert_eq!(cb.get_state().await, CircuitState::Closed);

        assert!(cb.is_available().await);
        cb.record_failure().await;
        assert_eq!(cb.get_state().await, CircuitState::Closed);

        assert!(cb.is_available().await);
        cb.record_failure().await;
        assert_eq!(cb.get_state().await, CircuitState::Open);

        assert!(!cb.is_available().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_after_timeout() {
        let cb = CircuitBreaker::new("test", 3, 1, 1);

        for _ in 0..3 {
            assert!(cb.is_available().await);
            cb.record_failure().await;
        }

        assert_eq!(cb.get_state().await, CircuitState::Open);
        assert!(!cb.is_available().await);

        tokio::time::sleep(Duration::from_secs(2)).await;

        assert!(cb.is_available().await);
        assert_eq!(cb.get_state().await, CircuitState::HalfOpen);
    }

    #[tokio::test]
    async fn test_circuit_breaker_closes_on_success_during_half_open() {
        let cb = CircuitBreaker::new("test", 3, 1, 1);

        for _ in 0..3 {
            assert!(cb.is_available().await);
            cb.record_failure().await;
        }

        assert_eq!(cb.get_state().await, CircuitState::Open);

        tokio::time::sleep(Duration::from_secs(2)).await;

        assert!(cb.is_available().await);
        assert_eq!(cb.get_state().await, CircuitState::HalfOpen);

        cb.record_success().await;
        assert_eq!(cb.get_state().await, CircuitState::Closed);
        assert!(cb.is_available().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_reopens_on_failure_during_half_open() {
        let cb = CircuitBreaker::new("test", 3, 1, 1);

        for _ in 0..3 {
            assert!(cb.is_available().await);
            cb.record_failure().await;
        }

        assert_eq!(cb.get_state().await, CircuitState::Open);

        tokio::time::sleep(Duration::from_secs(2)).await;

        assert!(cb.is_available().await);
        assert_eq!(cb.get_state().await, CircuitState::HalfOpen);

        cb.record_failure().await;
        assert_eq!(cb.get_state().await, CircuitState::Open);
        assert!(!cb.is_available().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_max_calls() {
        let cb = CircuitBreaker::new("test", 3, 1, 2);

        for _ in 0..3 {
            assert!(cb.is_available().await);
            cb.record_failure().await;
        }

        tokio::time::sleep(Duration::from_secs(2)).await;

        assert!(cb.is_available().await);
        assert_eq!(cb.get_state().await, CircuitState::HalfOpen);

        assert!(cb.is_available().await);
        assert_eq!(cb.get_state().await, CircuitState::HalfOpen);

        assert!(!cb.is_available().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_success_resets_failures_when_closed() {
        let cb = CircuitBreaker::new("test", 3, 60, 1);

        assert!(cb.is_available().await);
        cb.record_failure().await;
        assert!(cb.is_available().await);
        cb.record_failure().await;

        assert_eq!(cb.get_state().await, CircuitState::Closed);

        cb.record_success().await;

        assert!(cb.is_available().await);
        cb.record_failure().await;
        assert!(cb.is_available().await);
        cb.record_failure().await;

        assert_eq!(cb.get_state().await, CircuitState::Closed);
    }
}
```
src/resilience/mod.rs (+3, new file)

```rust
pub mod circuit_breaker;

pub use circuit_breaker::CircuitBreaker;
```