A rust implementation of skywatch-phash

feat(deps): Update jacquard dependencies to 0.8.0

This commit updates the following jacquard dependencies:

* jacquard * jacquard-api * jacquard-common * jacquard-derive *
jacquard-identity * jacquard-oauth

This brings in bug fixes, new features, and performance improvements
from the latest versions of these crates.

feat(deps): Update jacquard dependencies to 0.8.0

Skywatch 2795508e 188ba7d9

Changed files
+289 -316
rules
src
agent
cache
jetstream
moderation
processor
queue
types
+18 -19
ARCHITECTURE.md
··· 64 64 cursor = fs.readFileSync("cursor.txt") || Math.floor(Date.now() * 1000) 65 65 66 66 // 2. Connect to Jetstream 67 - jetstream = new Jetstream({ 67 + jetstream = new Jetstream({ 68 68 endpoint: "wss://jetstream1.us-east.fire.hose.cam/subscribe", 69 69 cursor, 70 70 wantedCollections: ["app.bsky.feed.post"] ··· 74 74 jetstream.onCreate("app.bsky.feed.post", async (event) => { 75 75 // Extract image blobs (CIDs) from post 76 76 const blobs = extractBlobsFromEvent(event) 77 - 77 + 78 78 // Enqueue for processing 79 79 const job = { postUri, postCid, postDid, blobs, timestamp, attempts: 0 } 80 80 await queue.enqueue(job) ··· 86 86 87 87 // 5. On match found, execute moderation actions 88 88 worker.onMatchFound((postUri, postCid, postDid, match) => { 89 - if (match.matchedCheck.toLabel) 89 + if (match.matchedCheck.toLabel) 90 90 await createPostLabel(...) 91 91 if (match.matchedCheck.reportPost) 92 92 await createPostReport(...) ··· 149 149 // Step 1: Decode image via Sharp 150 150 const image = sharp(buffer) 151 151 const metadata = await image.metadata() 152 - 152 + 153 153 // Step 2: Resize to 8x8 grayscale 154 154 // CRITICAL: fit: "fill" preserves aspect ratio, may add padding 155 155 const resized = await image ··· 157 157 .grayscale() 158 158 .raw() 159 159 .toBuffer() 160 - 160 + 161 161 // Step 3: Extract pixel values (Uint8Array, 0-255 range) 162 162 const pixels = new Uint8Array(resized) 163 - 163 + 164 164 // Step 4: Compute average brightness 165 165 const avg = pixels.reduce((sum, val) => sum + val, 0) / pixels.length 166 - 166 + 167 167 // Step 5: Create 64-bit hash (8x8 = 64 pixels) 168 168 let hash = "" 169 169 for (let i = 0; i < pixels.length; i++) { 170 170 hash += pixels[i] > avg ? "1" : "0" 171 171 } 172 - 172 + 173 173 // Step 6: Convert binary string to hex (16 character string) 174 174 return BigInt(`0b${hash}`).toString(16).padStart(16, "0") 175 175 } ··· 193 193 - Sharp converts RGB to single channel (standard luminosity formula) 194 194 - Range: 0-255 195 195 196 - 3. **No normalization:** 196 + 3. **No normalization:** 197 197 - Raw pixel values compared to mean (not normalized) 198 198 - This is correct for perceptual hashing 199 199 ··· 213 213 // Convert hex to BigInt 214 214 const a = BigInt(`0x${hash1}`) 215 215 const b = BigInt(`0x${hash2}`) 216 - 216 + 217 217 // XOR finds differing bits 218 218 const xor = a ^ b 219 - 219 + 220 220 // Count set bits (Brian Kernighan's algorithm) 221 221 let count = 0 222 222 let n = xor ··· 224 224 count++ 225 225 n &= n - 1n // Remove rightmost set bit 226 226 } 227 - 227 + 228 228 return count 229 229 } 230 230 231 231 function findMatch(phash: string, checks: BlobCheck[]): BlobCheck | null { 232 232 for (const check of checks) { 233 233 const threshold = check.hammingThreshold ?? 5 234 - 234 + 235 235 for (const checkPhash of check.phashes) { 236 236 const distance = hammingDistance(phash, checkPhash) 237 - 237 + 238 238 if (distance <= threshold) { 239 239 return check // First match wins 240 240 } 241 241 } 242 242 } 243 - 243 + 244 244 return null 245 245 } 246 246 ``` ··· 363 363 // GET https://plc.directory/{did} 364 364 // Extract service with id="atproto_pds" and type="AtprotoPersonalDataServer" 365 365 // Return serviceEndpoint URL 366 - 366 + 367 367 // Cached per DID to avoid repeated lookups 368 368 } 369 369 ``` ··· 390 390 for (const checkPhash of check.phashes) { 391 391 const distance = hammingDistance(phash, checkPhash) 392 392 const threshold = check.hammingThreshold ?? defaultThreshold 393 - 393 + 394 394 if (distance <= threshold) { 395 395 return { 396 396 phash, ··· 680 680 ```typescript 681 681 class MetricsCollector { 682 682 counters: Map<string, number> 683 - 683 + 684 684 increment(metric: string, value?: number) 685 685 get(metric: string): number 686 686 getAll(): Record<string, number> ··· 1015 1015 8. **Graceful shutdown** saves cursor and exits cleanly 1016 1016 1017 1017 This is a **real-time, event-driven system** - no polling, no batch processing. Every post on Bluesky is potentially seen and processed within seconds. 1018 -
+6 -6
Cargo.lock
··· 1922 1922 1923 1923 [[package]] 1924 1924 name = "jacquard" 1925 - version = "0.7.0" 1925 + version = "0.8.0" 1926 1926 dependencies = [ 1927 1927 "bon", 1928 1928 "bytes", ··· 1954 1954 1955 1955 [[package]] 1956 1956 name = "jacquard-api" 1957 - version = "0.7.1" 1957 + version = "0.8.0" 1958 1958 dependencies = [ 1959 1959 "bon", 1960 1960 "bytes", ··· 1968 1968 1969 1969 [[package]] 1970 1970 name = "jacquard-common" 1971 - version = "0.7.0" 1971 + version = "0.8.0" 1972 1972 dependencies = [ 1973 1973 "base64 0.22.1", 1974 1974 "bon", ··· 2008 2008 2009 2009 [[package]] 2010 2010 name = "jacquard-derive" 2011 - version = "0.7.0" 2011 + version = "0.8.0" 2012 2012 dependencies = [ 2013 2013 "proc-macro2", 2014 2014 "quote", ··· 2017 2017 2018 2018 [[package]] 2019 2019 name = "jacquard-identity" 2020 - version = "0.7.0" 2020 + version = "0.8.0" 2021 2021 dependencies = [ 2022 2022 "bon", 2023 2023 "bytes", ··· 2040 2040 2041 2041 [[package]] 2042 2042 name = "jacquard-oauth" 2043 - version = "0.7.0" 2043 + version = "0.8.0" 2044 2044 dependencies = [ 2045 2045 "base64 0.22.1", 2046 2046 "bytes",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "skywatch-phash-rs" 3 - version = "0.1.0" 3 + version = "0.2.0" 4 4 edition = "2024" 5 5 authors = ["Giulia <skywatch@skywatch.blue"] 6 6 description = "Perceptual hash-based image moderation for Bluesky (Rust rewrite)"
+4 -2
rules/blobs.json
··· 6 6 "0f093139797b7967", 7 7 "fdedc3030100c0fd", 8 8 "0f7f707dcc0c0600", 9 - "87030303199dff81" 9 + "87030303199dff81", 10 + "3f17070707077f7e", 11 + "f95d1f1ffcf8fbff" 10 12 ], 11 13 "label": "troll", 12 14 "comment": "Image is used in harrassment campaign targeting Will Stancil", ··· 19 21 "ignoreDID": ["did:plc:7umvpuxe2vbrc3zrzuquzniu"] 20 22 }, 21 23 { 22 - "phashes": ["00fffd7cd8da5000"], 24 + "phashes": ["00fffd7cd8da5000", "ffbf8f83999b9b00", "00ffe300ff8000ff"], 23 25 "label": "maga-trump", 24 26 "comment": "Pro-trump imagery", 25 27 "reportAcct": true,
+1 -1
src/agent/session.rs
··· 26 26 27 27 tracing::info!("Successfully logged in as {} ({})", auth.handle, auth.did); 28 28 29 - let did: Arc<str> = Arc::from(auth.did.to_string()); 29 + let did = Arc::from(auth.did.as_str()); 30 30 let agent = Arc::new(Agent::from(session)); 31 31 32 32 Ok(Self { agent, did })
-4
src/cache/mod.rs
··· 99 99 F: FnOnce() -> Fut, 100 100 Fut: std::future::Future<Output = Result<String>>, 101 101 { 102 - // Try to get from cache 103 102 if let Some(cached) = self.get(cid).await? { 104 103 return Ok(cached); 105 104 } 106 105 107 - // Compute if not cached 108 106 let phash = compute_fn().await?; 109 - 110 - // Store in cache 111 107 self.set(cid, &phash).await?; 112 108 113 109 Ok(phash)
+31 -20
src/jetstream/events.rs
··· 1 - use jacquard_common::types::string::{AtprotoStr, Cid}; 1 + use jacquard_common::types::string::Cid; 2 2 use jacquard_common::types::value::Data; 3 3 use miette::Result; 4 4 ··· 6 6 7 7 /// Extract blob references from a post record 8 8 /// 9 - /// Handles two cases: 10 - /// 1. Direct images: record.embed.images[].image.ref.$link 11 - /// 2. Quote posts with media: record.embed.media.images[].image.ref.$link 9 + /// Handles: 10 + /// 1. Direct images: record.embed.images[].image (Blob type) 11 + /// 2. Quote posts with media: record.embed.media.images[].image (Blob type) 12 + /// 3. External link cards: record.embed.external.thumb (Blob type) 13 + /// 14 + /// TODO: Add support for app.bsky.embed.video (embed.video blob) 12 15 pub fn extract_blobs_from_record(record: &Data) -> Result<Vec<BlobReference>> { 13 16 let mut blobs = Vec::new(); 14 17 ··· 40 43 } 41 44 } 42 45 46 + // Case 3: External link card thumbnail (embed.external.thumb) 47 + if let Some(Data::Object(external)) = embed.0.get("external") { 48 + if let Some(blob_ref) = extract_blob_direct(external.0.get("thumb")) { 49 + blobs.push(blob_ref); 50 + } 51 + } 52 + 43 53 Ok(blobs) 44 54 } 45 55 46 - /// Extract a single blob reference from an image object 56 + /// Extract a single blob reference from an image object (embed.images[].image) 47 57 fn extract_blob_from_image(img: &Data) -> Option<BlobReference> { 48 58 use jacquard_common::IntoStatic; 59 + use jacquard_common::types::blob::Blob; 49 60 50 61 let Data::Object(img_obj) = img else { 51 62 return None; 52 63 }; 53 64 54 - let Data::Object(image_obj) = img_obj.0.get("image")? else { 65 + // The image field is directly a Blob type in jacquard 66 + let Data::Blob(Blob { r#ref, mime_type, .. }) = img_obj.0.get("image")? else { 55 67 return None; 56 68 }; 57 69 58 - let Data::Object(ref_obj) = image_obj.0.get("ref")? else { 59 - return None; 60 - }; 70 + Some(BlobReference { 71 + cid: Cid::str(r#ref.as_str()).into_static(), 72 + mime_type: Some(mime_type.to_string().into()), 73 + }) 74 + } 75 + 76 + /// Extract a blob reference directly from a Data::Blob (for external.thumb, etc.) 77 + fn extract_blob_direct(data: Option<&Data>) -> Option<BlobReference> { 78 + use jacquard_common::IntoStatic; 79 + use jacquard_common::types::blob::Blob; 61 80 62 - let Data::String(AtprotoStr::String(cid_str)) = ref_obj.0.get("$link")? else { 81 + let Data::Blob(Blob { r#ref, mime_type, .. }) = data? else { 63 82 return None; 64 83 }; 65 84 66 - let mime_type = image_obj 67 - .0 68 - .get("mimeType") 69 - .and_then(|v| match v { 70 - Data::String(AtprotoStr::String(s)) => Some(s.to_string().into()), 71 - _ => None, 72 - }); 73 - 74 85 Some(BlobReference { 75 - cid: Cid::str(cid_str.as_str()).into_static(), 76 - mime_type, 86 + cid: Cid::str(r#ref.as_str()).into_static(), 87 + mime_type: Some(mime_type.to_string().into()), 77 88 }) 78 89 } 79 90
+54 -34
src/jetstream/mod.rs
··· 1 + use futures::StreamExt; 2 + use jacquard_common::IntoStatic; 1 3 use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 2 4 use jacquard_common::types::string::AtUri; 3 5 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 4 - use jacquard_common::IntoStatic; 5 6 use miette::{IntoDiagnostic, Result}; 6 - use futures::StreamExt; 7 7 use tokio::sync::mpsc; 8 8 use tracing::{debug, error, info, warn}; 9 9 use url::Url; ··· 54 54 let mut message_count = 0u64; 55 55 let mut last_cursor: Option<i64> = None; 56 56 let mut cursor_update_interval = tokio::time::interval(std::time::Duration::from_secs(10)); 57 + let mut heartbeat_interval = tokio::time::interval(std::time::Duration::from_secs(30)); 57 58 58 59 loop { 59 60 tokio::select! { 60 - Some(result) = messages.next() => { 61 - match result { 62 - Ok(msg) => { 63 - message_count += 1; 64 - if message_count % 1000 == 0 { 65 - debug!("Processed {} messages", message_count); 66 - } 61 + _ = heartbeat_interval.tick() => { 62 + info!("Jetstream heartbeat: {} messages received so far", message_count); 63 + } 64 + msg_option = messages.next() => { 65 + match msg_option { 66 + Some(result) => { 67 + match result { 68 + Ok(msg) => { 69 + message_count += 1; 70 + if message_count % 100 == 0 { 71 + debug!("Processed {} messages", message_count); 72 + } 73 + if message_count == 1 { 74 + info!("First message received!"); 75 + } 67 76 68 - // Extract cursor from message 69 - let cursor = match &msg { 70 - JetstreamMessage::Commit { time_us, .. } => Some(*time_us), 71 - JetstreamMessage::Identity { time_us, .. } => Some(*time_us), 72 - JetstreamMessage::Account { time_us, .. } => Some(*time_us), 73 - }; 77 + // Extract cursor from message 78 + let cursor = match &msg { 79 + JetstreamMessage::Commit { time_us, .. } => Some(*time_us), 80 + JetstreamMessage::Identity { time_us, .. } => Some(*time_us), 81 + JetstreamMessage::Account { time_us, .. } => Some(*time_us), 82 + }; 74 83 75 - if let Some(c) = cursor { 76 - last_cursor = Some(c); 77 - } 84 + if let Some(c) = cursor { 85 + last_cursor = Some(c); 86 + } 78 87 79 - if let Err(e) = self.process_message(msg, &job_sender) { 80 - error!("Error processing message: {}", e); 88 + if let Err(e) = self.process_message(msg, &job_sender) { 89 + error!("Error processing message: {}", e); 90 + } 91 + } 92 + Err(e) => { 93 + error!("Jetstream error: {}", e); 94 + } 81 95 } 82 96 } 83 - Err(e) => { 84 - error!("Jetstream error: {}", e); 97 + None => { 98 + warn!("Jetstream stream ended unexpectedly"); 99 + break; 85 100 } 86 101 } 87 102 } ··· 128 143 } => { 129 144 // Only process create operations on posts 130 145 if commit.collection.as_ref() != "app.bsky.feed.post" { 146 + debug!("Skipping non-post collection: {}", commit.collection); 131 147 return Ok(()); 132 148 } 133 149 134 150 if !matches!(commit.operation, CommitOperation::Create) { 151 + debug!("Skipping non-create operation"); 135 152 return Ok(()); 136 153 } 137 154 138 155 // Parse record to extract blobs (skip if no record) 139 156 let Some(record_data) = &commit.record else { 157 + debug!("Skipping post with no record"); 140 158 return Ok(()); 141 159 }; 142 160 143 161 let blobs = events::extract_blobs_from_record(record_data)?; 144 162 145 163 if blobs.is_empty() { 164 + debug!("Post has no blobs"); 146 165 return Ok(()); 147 166 } 148 167 168 + debug!("Found post with {} blob(s)!", blobs.len()); 169 + 149 170 let post_uri_str = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 150 - let post_uri = AtUri::new(&post_uri_str) 151 - .into_diagnostic()? 152 - .into_static(); 171 + let post_uri = AtUri::new(&post_uri_str).into_diagnostic()?.into_static(); 153 172 154 - debug!( 155 - "Post with {} blob(s): {}", 156 - blobs.len(), 157 - post_uri 158 - ); 173 + debug!("Post with {} blob(s): {}", blobs.len(), post_uri); 159 174 160 175 // Create job 161 176 let post_cid = commit ··· 174 189 }; 175 190 176 191 // Send to queue 177 - if let Err(e) = job_sender.send(job) { 178 - warn!("Failed to send job to queue: {}", e); 192 + match job_sender.send(job) { 193 + Ok(_) => { 194 + debug!("Sent job to channel: {}", post_uri_str); 195 + Ok(()) 196 + } 197 + Err(e) => { 198 + warn!("Failed to send job to queue: {}", e); 199 + Ok(()) 200 + } 179 201 } 180 - 181 - Ok(()) 182 202 } 183 203 JetstreamMessage::Identity { .. } => { 184 204 // Ignore identity updates for now
+28 -7
src/main.rs
··· 4 4 use std::time::Duration; 5 5 use tokio::sync::mpsc; 6 6 use tokio::time::interval; 7 - use tracing::{error, info}; 7 + use tracing::{debug, error, info}; 8 8 9 9 use skywatch_phash_rs::{ 10 10 agent::AgentSession, ··· 73 73 blob_checks.clone(), 74 74 metrics.clone(), 75 75 ); 76 - info!("Worker pool created with {} workers", config.processing.concurrency); 76 + info!( 77 + "Worker pool created with {} workers", 78 + config.processing.concurrency 79 + ); 77 80 78 81 // Create shutdown channels 79 82 let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); ··· 102 105 103 106 // Start job receiver (receives from jetstream, pushes to queue) 104 107 info!("Starting job receiver..."); 105 - let mut queue_for_receiver = queue.clone(); 106 108 let receiver_metrics = metrics.clone(); 109 + let receiver_config = config.clone(); 107 110 let receiver_handle = tokio::spawn(async move { 111 + // Create fresh queue connection for receiver 112 + let mut queue_for_receiver = match JobQueue::new(&receiver_config).await { 113 + Ok(q) => q, 114 + Err(e) => { 115 + error!("Failed to create queue for receiver: {}", e); 116 + return; 117 + } 118 + }; 119 + 108 120 while let Some(job) = job_rx.recv().await { 121 + debug!("Job receiver got job: {}", job.post_uri); 109 122 receiver_metrics.inc_jobs_received(); 110 - if let Err(e) = queue_for_receiver.push(&job).await { 111 - error!("Failed to push job to queue: {}", e); 112 - receiver_metrics.inc_jobs_failed(); 123 + match queue_for_receiver.push(&job).await { 124 + Ok(_) => { 125 + debug!("Pushed job to Redis queue: {}", job.post_uri); 126 + } 127 + Err(e) => { 128 + error!("Failed to push job to queue: {}", e); 129 + receiver_metrics.inc_jobs_failed(); 130 + } 113 131 } 114 132 } 115 133 info!("Job receiver stopped"); ··· 131 149 132 150 let worker_future = async move { 133 151 info!("Worker {} starting", worker_id); 134 - if let Err(e) = worker_pool_clone.start(queue_clone, cache_clone, worker_shutdown).await { 152 + if let Err(e) = worker_pool_clone 153 + .start(queue_clone, cache_clone, worker_shutdown) 154 + .await 155 + { 135 156 error!("Worker {} failed: {}", worker_id, e); 136 157 } 137 158 info!("Worker {} stopped", worker_id);
+26 -47
src/moderation/account.rs
··· 7 7 use jacquard_api::tools_ozone::moderation::{ 8 8 ModEventLabel, ModEventReport, ModEventTakedown, ModTool, 9 9 }; 10 - use jacquard_common::CowStr; 11 - use jacquard_common::xrpc::XrpcClient; 10 + use jacquard_common::{CowStr, IntoStatic}; 11 + use jacquard_common::types::string::{AtUri, Did}; 12 12 13 - use miette::{IntoDiagnostic, Result}; 13 + use miette::Result; 14 14 use std::collections::BTreeMap; 15 15 use tracing::{debug, info}; 16 16 17 17 use crate::config::Config; 18 18 use crate::moderation::rate_limiter::RateLimiter; 19 19 use crate::moderation::{ 20 - build_mod_tool_meta, build_moderation_call_opts, build_timestamped_comment, parse_did, 20 + build_mod_tool_meta, build_timestamped_comment, send_moderation_event, 21 21 }; 22 22 23 23 /// Label an account with a specific label via Ozone moderation API 24 - pub async fn label_account( 24 + pub async fn label_account<'a>( 25 25 agent: &Agent<MemoryCredentialSession>, 26 26 config: &Config, 27 27 rate_limiter: &RateLimiter, 28 - did: &str, 28 + did: &Did<'a>, 29 29 label_val: &str, 30 30 check_comment: &str, 31 - post_uri: &str, 31 + post_uri: &AtUri<'a>, 32 32 phash: &str, 33 - created_by: &str, 33 + created_by: &Did<'a>, 34 34 ) -> Result<()> { 35 - // Wait for rate limiter before making request 36 - rate_limiter.wait().await; 37 - 38 35 info!("Labeling account {} with label: {}", did, label_val); 39 36 40 37 let comment = build_timestamped_comment(check_comment); 41 - let meta = build_mod_tool_meta(post_uri, phash); 38 + let meta = build_mod_tool_meta(post_uri.as_str(), phash); 42 39 43 - // Create moderation label event using jacquard-api types 44 40 let event = EmitEvent::new() 45 - .created_by(parse_did(created_by)?) 41 + .created_by(created_by.clone().into_static()) 46 42 .event(EmitEventEvent::ModEventLabel(Box::new( 47 43 ModEventLabel::builder() 48 44 .create_label_vals(vec![CowStr::from(label_val)]) ··· 52 48 ))) 53 49 .subject(EmitEventSubject::RepoRef(Box::new( 54 50 RepoRef::builder() 55 - .did(parse_did(did)?) 51 + .did(did.clone().into_static()) 56 52 .build(), 57 53 ))) 58 54 .mod_tool(ModTool { ··· 62 58 }) 63 59 .build(); 64 60 65 - let opts = build_moderation_call_opts(config); 66 - 67 - // Send request via jacquard agent 68 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 61 + send_moderation_event(agent, config, rate_limiter, event).await?; 69 62 70 63 debug!("Successfully labeled account: {}", did); 71 64 ··· 73 66 } 74 67 75 68 /// Report an account to ozone moderation 76 - pub async fn report_account( 69 + pub async fn report_account<'a>( 77 70 agent: &Agent<MemoryCredentialSession>, 78 71 config: &Config, 79 72 rate_limiter: &RateLimiter, 80 - did: &str, 73 + did: &Did<'a>, 81 74 reason: ReasonType<'static>, 82 75 check_comment: &str, 83 - post_uri: &str, 76 + post_uri: &AtUri<'a>, 84 77 phash: &str, 85 - created_by: &str, 78 + created_by: &Did<'a>, 86 79 ) -> Result<()> { 87 - // Wait for rate limiter before making request 88 - rate_limiter.wait().await; 89 - 90 80 info!("Reporting account {} to ozone: {:?}", did, reason); 91 81 92 82 let comment = build_timestamped_comment(check_comment); 93 - let meta = build_mod_tool_meta(post_uri, phash); 83 + let meta = build_mod_tool_meta(post_uri.as_str(), phash); 94 84 95 - // Create moderation report event using jacquard-api types 96 85 let event = EmitEvent::new() 97 - .created_by(parse_did(created_by)?) 86 + .created_by(created_by.clone().into_static()) 98 87 .event(EmitEventEvent::ModEventReport(Box::new( 99 88 ModEventReport::builder() 100 89 .report_type(reason) ··· 103 92 ))) 104 93 .subject(EmitEventSubject::RepoRef(Box::new( 105 94 RepoRef::builder() 106 - .did(parse_did(did)?) 95 + .did(did.clone().into_static()) 107 96 .build(), 108 97 ))) 109 98 .subject_blob_cids(vec![]) ··· 114 103 }) 115 104 .build(); 116 105 117 - let opts = build_moderation_call_opts(config); 118 - 119 - // Send request via jacquard agent 120 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 106 + send_moderation_event(agent, config, rate_limiter, event).await?; 121 107 122 108 debug!("Successfully reported account: {}", did); 123 109 ··· 125 111 } 126 112 127 113 /// Takedown an account via Ozone moderation API 128 - pub async fn takedown_account( 114 + pub async fn takedown_account<'a>( 129 115 agent: &Agent<MemoryCredentialSession>, 130 116 config: &Config, 131 117 rate_limiter: &RateLimiter, 132 - did: &str, 118 + did: &Did<'a>, 133 119 comment: &str, 134 - created_by: &str, 120 + created_by: &Did<'a>, 135 121 ) -> Result<()> { 136 - // Wait for rate limiter before making request 137 - rate_limiter.wait().await; 138 - 139 122 info!("Taking down account: {}", did); 140 123 141 - // Create moderation takedown event using jacquard-api types 142 124 let event = EmitEvent::new() 143 - .created_by(parse_did(created_by)?) 125 + .created_by(created_by.clone().into_static()) 144 126 .event(EmitEventEvent::ModEventTakedown(Box::new( 145 127 ModEventTakedown { 146 128 comment: Some(CowStr::from(comment)), ··· 149 131 ))) 150 132 .subject(EmitEventSubject::RepoRef(Box::new( 151 133 RepoRef::builder() 152 - .did(parse_did(did)?) 134 + .did(did.clone().into_static()) 153 135 .build(), 154 136 ))) 155 137 .build(); 156 138 157 - let opts = build_moderation_call_opts(config); 158 - 159 - // Send request via jacquard agent 160 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 139 + send_moderation_event(agent, config, rate_limiter, event).await?; 161 140 162 141 debug!("Successfully took down account: {}", did); 163 142
+18 -1
src/moderation/helpers.rs
··· 1 + use jacquard::client::{Agent, MemoryCredentialSession}; 2 + use jacquard_api::tools_ozone::moderation::emit_event::EmitEvent; 1 3 use jacquard_common::types::string::{AtprotoStr, Did}; 2 4 use jacquard_common::types::value::{Data, Object}; 3 - use jacquard_common::xrpc::CallOptions; 5 + use jacquard_common::xrpc::{CallOptions, XrpcClient}; 4 6 use jacquard_common::CowStr; 5 7 use jacquard_common::smol_str::SmolStr; 6 8 use jacquard_common::IntoStatic; ··· 8 10 use std::collections::BTreeMap; 9 11 10 12 use crate::config::Config; 13 + use crate::moderation::rate_limiter::RateLimiter; 11 14 12 15 pub fn build_timestamped_comment(check_comment: &str) -> String { 13 16 let timestamp = chrono::Utc::now().to_rfc3339(); ··· 49 52 .into_diagnostic() 50 53 .map(|did| did.into_static()) 51 54 } 55 + 56 + pub async fn send_moderation_event<'a>( 57 + agent: &Agent<MemoryCredentialSession>, 58 + config: &Config, 59 + rate_limiter: &RateLimiter, 60 + event: EmitEvent<'a>, 61 + ) -> Result<()> { 62 + rate_limiter.wait().await; 63 + 64 + let opts = build_moderation_call_opts(config); 65 + agent.send_with_opts(event, opts).await.into_diagnostic()?; 66 + 67 + Ok(()) 68 + }
+31 -105
src/moderation/post.rs
··· 8 8 use jacquard_api::tools_ozone::moderation::{ 9 9 ModEventLabel, ModEventReport, ModEventTakedown, ModTool, 10 10 }; 11 - use jacquard_common::CowStr; 12 - use jacquard_common::types::string::{AtUri, Cid}; 13 - use jacquard_common::xrpc::XrpcClient; 14 - use miette::{IntoDiagnostic, Result}; 11 + use jacquard_common::{CowStr, IntoStatic}; 12 + use jacquard_common::types::string::{AtUri, Cid, Did}; 13 + use miette::Result; 15 14 use std::collections::BTreeMap; 16 15 use tracing::{debug, info}; 17 16 18 17 use crate::config::Config; 19 18 use crate::moderation::rate_limiter::RateLimiter; 20 19 use crate::moderation::{ 21 - build_mod_tool_meta, build_moderation_call_opts, build_timestamped_comment, parse_did, 20 + build_mod_tool_meta, build_timestamped_comment, send_moderation_event, 22 21 }; 23 22 24 23 /// Label a post with a specific label via Ozone moderation API 25 - pub async fn label_post( 24 + pub async fn label_post<'a>( 26 25 agent: &Agent<MemoryCredentialSession>, 27 26 config: &Config, 28 27 rate_limiter: &RateLimiter, 29 - post_uri: &str, 30 - post_cid: &str, 28 + post_uri: &AtUri<'a>, 29 + post_cid: &Cid<'a>, 31 30 label_val: &str, 32 31 check_comment: &str, 33 32 phash: &str, 34 - created_by: &str, 33 + created_by: &Did<'a>, 35 34 ) -> Result<()> { 36 - // Wait for rate limiter before making request 37 - rate_limiter.wait().await; 38 - 39 35 info!("Labeling post {} with label: {}", post_uri, label_val); 40 36 41 37 let comment = build_timestamped_comment(check_comment); 42 - let meta = build_mod_tool_meta(post_uri, phash); 38 + let meta = build_mod_tool_meta(post_uri.as_str(), phash); 43 39 44 - // Create moderation label event using jacquard-api types 45 40 let event = EmitEvent::new() 46 - .created_by(parse_did(created_by)?) 41 + .created_by(created_by.clone().into_static()) 47 42 .event(EmitEventEvent::ModEventLabel(Box::new( 48 43 ModEventLabel::builder() 49 44 .create_label_vals(vec![CowStr::from(label_val)]) ··· 53 48 ))) 54 49 .subject(EmitEventSubject::StrongRef(Box::new( 55 50 StrongRef::builder() 56 - .uri(AtUri::new(post_uri).into_diagnostic()?) 57 - .cid(Cid::str(post_cid)) 51 + .uri(post_uri.clone().into_static()) 52 + .cid(post_cid.clone().into_static()) 58 53 .build(), 59 54 ))) 60 55 .mod_tool(ModTool { ··· 64 59 }) 65 60 .build(); 66 61 67 - let opts = build_moderation_call_opts(config); 68 - 69 - // Send request via jacquard agent 70 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 62 + send_moderation_event(agent, config, rate_limiter, event).await?; 71 63 72 64 debug!("Successfully labeled post: {}", post_uri); 73 65 ··· 75 67 } 76 68 77 69 /// Report a post to ozone moderation 78 - pub async fn report_post( 70 + pub async fn report_post<'a>( 79 71 agent: &Agent<MemoryCredentialSession>, 80 72 config: &Config, 81 73 rate_limiter: &RateLimiter, 82 - post_uri: &str, 83 - _post_cid: &str, 74 + post_uri: &AtUri<'a>, 75 + _post_cid: &Cid<'a>, 76 + post_did: &Did<'a>, 84 77 reason: ReasonType<'static>, 85 78 check_comment: &str, 86 79 phash: &str, 87 - created_by: &str, 80 + created_by: &Did<'a>, 88 81 ) -> Result<()> { 89 - // Wait for rate limiter before making request 90 - rate_limiter.wait().await; 91 - 92 82 info!("Reporting post {} to ozone: {:?}", post_uri, reason); 93 83 94 84 let comment = build_timestamped_comment(check_comment); 95 - let meta = build_mod_tool_meta(post_uri, phash); 96 - let did_str = extract_did_from_uri(post_uri)?; 85 + let meta = build_mod_tool_meta(post_uri.as_str(), phash); 97 86 98 - // Create moderation report event using jacquard-api types 99 87 let event = EmitEvent::new() 100 - .created_by(parse_did(created_by)?) 88 + .created_by(created_by.clone().into_static()) 101 89 .event(EmitEventEvent::ModEventReport(Box::new( 102 90 ModEventReport::builder() 103 91 .report_type(reason) ··· 106 94 ))) 107 95 .subject(EmitEventSubject::RepoRef(Box::new( 108 96 RepoRef::builder() 109 - .did(parse_did(&did_str)?) 97 + .did(post_did.clone().into_static()) 110 98 .build(), 111 99 ))) 112 100 .subject_blob_cids(vec![]) ··· 117 105 }) 118 106 .build(); 119 107 120 - let opts = build_moderation_call_opts(config); 121 - 122 - // Send request via jacquard agent 123 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 108 + send_moderation_event(agent, config, rate_limiter, event).await?; 124 109 125 110 debug!("Successfully reported post: {}", post_uri); 126 111 ··· 128 113 } 129 114 130 115 /// Takedown a post via Ozone moderation API 131 - pub async fn takedown_post( 116 + pub async fn takedown_post<'a>( 132 117 agent: &Agent<MemoryCredentialSession>, 133 118 config: &Config, 134 119 rate_limiter: &RateLimiter, 135 - post_uri: &str, 136 - post_cid: &str, 120 + post_uri: &AtUri<'a>, 121 + post_cid: &Cid<'a>, 137 122 comment: &str, 138 - created_by: &str, 123 + created_by: &Did<'a>, 139 124 ) -> Result<()> { 140 - // Wait for rate limiter before making request 141 - rate_limiter.wait().await; 142 - 143 125 info!("Taking down post: {}", post_uri); 144 126 145 - // Create moderation takedown event using jacquard-api types 146 127 let event = EmitEvent::new() 147 - .created_by(parse_did(created_by)?) 128 + .created_by(created_by.clone().into_static()) 148 129 .event(EmitEventEvent::ModEventTakedown(Box::new( 149 130 ModEventTakedown { 150 131 comment: Some(CowStr::from(comment)), ··· 153 134 ))) 154 135 .subject(EmitEventSubject::StrongRef(Box::new( 155 136 StrongRef::builder() 156 - .uri(AtUri::new(post_uri).into_diagnostic()?) 157 - .cid(Cid::str(post_cid)) 137 + .uri(post_uri.clone().into_static()) 138 + .cid(post_cid.clone().into_static()) 158 139 .build(), 159 140 ))) 160 141 .build(); 161 142 162 - let opts = build_moderation_call_opts(config); 163 - 164 - // Send request via jacquard agent 165 - let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; 143 + send_moderation_event(agent, config, rate_limiter, event).await?; 166 144 167 145 debug!("Successfully took down post: {}", post_uri); 168 146 169 147 Ok(()) 170 148 } 171 149 172 - /// Parse an AT URI into its components 173 - /// Format: at://did:plc:xxx/app.bsky.feed.post/rkey 174 - fn parse_at_uri(uri: &str) -> Result<(String, String, String)> { 175 - let uri = uri 176 - .strip_prefix("at://") 177 - .ok_or_else(|| miette::miette!("Invalid AT URI format: missing 'at://' prefix"))?; 178 - 179 - let parts: Vec<&str> = uri.split('/').collect(); 180 - if parts.len() != 3 { 181 - return Err(miette::miette!( 182 - "Invalid AT URI format: expected 3 parts, got {}", 183 - parts.len() 184 - )); 185 - } 186 - 187 - Ok(( 188 - parts[0].to_string(), // repo (DID) 189 - parts[1].to_string(), // collection 190 - parts[2].to_string(), // rkey 191 - )) 192 - } 193 - 194 - /// Extract DID from AT URI 195 - fn extract_did_from_uri(uri: &str) -> Result<String> { 196 - let (did, _, _) = parse_at_uri(uri)?; 197 - Ok(did) 198 - } 199 - 200 150 #[cfg(test)] 201 - mod tests { 202 - use super::*; 203 - 204 - #[test] 205 - fn test_parse_at_uri() { 206 - let uri = "at://did:plc:xyz123/app.bsky.feed.post/abc456"; 207 - let (repo, collection, rkey) = parse_at_uri(uri).unwrap(); 208 - assert_eq!(repo, "did:plc:xyz123"); 209 - assert_eq!(collection, "app.bsky.feed.post"); 210 - assert_eq!(rkey, "abc456"); 211 - } 212 - 213 - #[test] 214 - fn test_parse_at_uri_invalid() { 215 - let uri = "https://example.com/post/123"; 216 - assert!(parse_at_uri(uri).is_err()); 217 - } 218 - 219 - #[test] 220 - fn test_extract_did_from_uri() { 221 - let uri = "at://did:plc:xyz123/app.bsky.feed.post/abc456"; 222 - let did = extract_did_from_uri(uri).unwrap(); 223 - assert_eq!(did, "did:plc:xyz123"); 224 - } 225 - } 151 + mod tests {}
-6
src/processor/matcher.rs
··· 76 76 default_threshold: u32, 77 77 ) -> Option<MatchResult> { 78 78 for check in blob_checks { 79 - // Check if DID is in ignore list 80 79 if let Some(ignore_list) = &check.ignore_did { 81 80 if ignore_list.iter().any(|ignored_did| ignored_did.as_str() == did) { 82 81 debug!("Skipping check '{}' for ignored DID: {}", check.label, did); ··· 86 85 87 86 let threshold = check.hamming_threshold.unwrap_or(default_threshold); 88 87 89 - // Check each phash in the check 90 88 for check_phash in &check.phashes { 91 89 match phash::hamming_distance(phash, check_phash.as_str()) { 92 90 Ok(distance) => { ··· 122 120 did: &str, 123 121 blob: &BlobReference, 124 122 ) -> Result<Option<MatchResult>> { 125 - // Download the blob 126 123 let image_bytes = download_blob(client, config, did, &blob.cid).await?; 127 - 128 - // Compute phash 129 124 let phash = phash::compute_phash(&image_bytes)?; 130 125 debug!("Computed phash for blob {}: {}", blob.cid, phash); 131 126 132 - // Match against checks 133 127 let match_result = match_phash(&phash, blob_checks, did, config.phash.default_hamming_threshold); 134 128 135 129 Ok(match_result)
+13 -23
src/processor/phash.rs
··· 1 1 use image::DynamicImage; 2 2 use image_hasher::{HashAlg, HasherConfig, ImageHash}; 3 - use miette::{Diagnostic, IntoDiagnostic, Result}; 3 + use miette::Diagnostic; 4 4 use thiserror::Error; 5 5 6 6 #[derive(Debug, Error, Diagnostic)] 7 7 pub enum PhashError { 8 - #[error("Failed to decode image: {0}")] 9 - ImageDecodeError(String), 10 - 11 - #[error("Failed to compute hash: {0}")] 12 - HashComputeError(String), 8 + #[error("Failed to decode image")] 9 + ImageDecode(#[from] image::ImageError), 13 10 14 11 #[error("Invalid hash format: {0}")] 15 12 InvalidHashFormat(String), 13 + 14 + #[error("Invalid hex string")] 15 + ParseInt(#[from] std::num::ParseIntError), 16 16 } 17 17 18 18 /// Compute perceptual hash for an image using average hash (aHash) algorithm ··· 23 23 /// 3. Compute average pixel value 24 24 /// 4. Create 64-bit binary: 1 if pixel > avg, 0 otherwise 25 25 /// 5. Convert to hex string (16 chars) 26 - pub fn compute_phash(image_bytes: &[u8]) -> Result<String> { 27 - // Decode image from bytes 28 - let img = image::load_from_memory(image_bytes) 29 - .into_diagnostic() 30 - .map_err(|e| PhashError::ImageDecodeError(e.to_string()))?; 31 - 26 + pub fn compute_phash(image_bytes: &[u8]) -> Result<String, PhashError> { 27 + let img = image::load_from_memory(image_bytes)?; 32 28 compute_phash_from_image(&img) 33 29 } 34 30 35 31 /// Compute perceptual hash from a DynamicImage 36 - pub fn compute_phash_from_image(img: &DynamicImage) -> Result<String> { 32 + pub fn compute_phash_from_image(img: &DynamicImage) -> Result<String, PhashError> { 37 33 // Configure hasher with aHash (Mean) algorithm and 8x8 size 38 34 let hasher = HasherConfig::new() 39 35 .hash_alg(HashAlg::Mean) // average hash ··· 48 44 } 49 45 50 46 /// Convert ImageHash to hex string format (16 chars, matching TS output) 51 - fn hash_to_hex(hash: &ImageHash) -> Result<String> { 47 + fn hash_to_hex(hash: &ImageHash) -> Result<String, PhashError> { 52 48 // Get hash bytes 53 49 let bytes = hash.as_bytes(); 54 50 ··· 73 69 /// Compute hamming distance between two phash hex strings 74 70 /// 75 71 /// Uses Brian Kernighan's algorithm to count set bits 76 - pub fn hamming_distance(hash1: &str, hash2: &str) -> Result<u32> { 72 + pub fn hamming_distance(hash1: &str, hash2: &str) -> Result<u32, PhashError> { 77 73 // Validate input lengths 78 74 if hash1.len() != 16 || hash2.len() != 16 { 79 75 return Err(PhashError::InvalidHashFormat(format!( ··· 84 80 .into()); 85 81 } 86 82 87 - // Parse hex strings to u64 88 - let a = u64::from_str_radix(hash1, 16) 89 - .into_diagnostic() 90 - .map_err(|_| PhashError::InvalidHashFormat(format!("Invalid hex string: {}", hash1)))?; 91 - 92 - let b = u64::from_str_radix(hash2, 16) 93 - .into_diagnostic() 94 - .map_err(|_| PhashError::InvalidHashFormat(format!("Invalid hex string: {}", hash2)))?; 83 + let a = u64::from_str_radix(hash1, 16)?; 84 + let b = u64::from_str_radix(hash2, 16)?; 95 85 96 86 // XOR to find differing bits 97 87 let xor = a ^ b;
+44 -26
src/queue/worker.rs
··· 1 - use miette::Result; 1 + use jacquard::client::{Agent, MemoryCredentialSession}; 2 + use jacquard_api::com_atproto::moderation::ReasonType; 3 + use jacquard_common::types::string::Did; 4 + use miette::{IntoDiagnostic, Result}; 2 5 use reqwest::Client; 3 6 use std::sync::Arc; 4 7 use std::time::Duration; 5 8 use tokio::time::sleep; 6 - use tracing::{error, info}; 7 - use jacquard::client::{Agent, MemoryCredentialSession}; 8 - use jacquard_api::com_atproto::moderation::ReasonType; 9 + use tracing::{debug, error, info}; 9 10 10 11 use crate::agent::AgentSession; 11 12 use crate::cache::PhashCache; ··· 27 28 info!(concat!($action_name, " completed for: {}"), $subject); 28 29 } else { 29 30 $metrics_skip; 30 - info!(concat!($action_name, " already done, skipping: {}"), $subject); 31 + info!( 32 + concat!($action_name, " already done, skipping: {}"), 33 + $subject 34 + ); 31 35 } 32 36 } 33 37 }; ··· 42 46 info!(concat!($action_name, " completed for: {}"), $subject); 43 47 } else { 44 48 $metrics_skip; 45 - info!(concat!($action_name, " already done, skipping: {}"), $subject); 49 + info!( 50 + concat!($action_name, " already done, skipping: {}"), 51 + $subject 52 + ); 46 53 } 47 54 } 48 55 }; ··· 97 104 job_result = queue.pop(1) => { 98 105 match job_result { 99 106 Ok(Some(job)) => { 100 - // Create new redis connection for this job 107 + debug!("Worker popped job from queue: {}", job.post_uri); 101 108 let redis_client = match redis::Client::open(self.config.redis.url.as_str()) { 102 109 Ok(c) => c, 103 110 Err(e) => { ··· 114 121 } 115 122 }; 116 123 117 - // Process job 124 + let job_clone = job.clone(); 118 125 if let Err(e) = Self::process_job( 119 126 &self.config, 120 127 &self.client, ··· 125 132 &mut cache, 126 133 &mut redis_conn, 127 134 job, 128 - &mut queue, 129 135 self.agent.did(), 130 136 ) 131 137 .await 132 138 { 133 139 error!("Worker task failed: {}", e); 140 + self.metrics.inc_jobs_failed(); 141 + if let Err(retry_err) = queue.retry(job_clone).await { 142 + error!("Failed to retry job: {}", retry_err); 143 + } 134 144 } 135 145 } 136 146 Ok(None) => { ··· 159 169 cache: &mut PhashCache, 160 170 redis_conn: &mut redis::aio::MultiplexedConnection, 161 171 job: ImageJob, 162 - queue: &mut JobQueue, 163 172 created_by: &str, 164 173 ) -> Result<()> { 165 - info!("Processing job: {}", job.post_uri); 174 + debug!("Processing job: {}", job.post_uri); 166 175 167 - // Process all blobs and find matches 168 - let matches = Self::process_job_blobs(config, client, blob_checks, metrics, cache, &job).await?; 176 + let matches = 177 + Self::process_job_blobs(config, client, blob_checks, metrics, cache, &job).await?; 169 178 170 179 if matches.is_empty() { 171 - info!("No matches found for job: {}", job.post_uri); 180 + debug!("No matches found for job: {}", job.post_uri); 172 181 metrics.inc_jobs_processed(); 173 182 return Ok(()); 174 183 } 175 184 176 185 // Take moderation actions for each match 177 186 for match_result in matches { 178 - if let Err(e) = 179 - Self::take_moderation_action(config, agent, metrics, rate_limiter, redis_conn, &job, &match_result, created_by) 180 - .await 187 + if let Err(e) = Self::take_moderation_action( 188 + config, 189 + agent, 190 + metrics, 191 + rate_limiter, 192 + redis_conn, 193 + &job, 194 + &match_result, 195 + created_by, 196 + ) 197 + .await 181 198 { 182 199 error!("Failed to take moderation action: {}", e); 183 - metrics.inc_jobs_failed(); 184 - // Retry the job 185 - queue.retry(job.clone()).await?; 200 + // Don't retry here - worker will handle it 186 201 return Err(e); 187 202 } 188 203 } 189 204 190 - info!("Successfully processed job: {}", job.post_uri); 205 + debug!("Successfully processed job: {}", job.post_uri); 191 206 metrics.inc_jobs_processed(); 192 207 193 208 Ok(()) ··· 217 232 metrics.inc_blobs_downloaded(); 218 233 219 234 // Download and compute 220 - let image_bytes = matcher::download_blob(client, config, &job.post_did, &blob.cid).await?; 235 + let image_bytes = 236 + matcher::download_blob(client, config, &job.post_did, &blob.cid).await?; 221 237 let computed_phash = crate::processor::phash::compute_phash(&image_bytes)?; 222 238 223 239 // Store in cache ··· 252 268 created_by: &str, 253 269 ) -> Result<()> { 254 270 let check = &match_result.matched_check; 271 + let created_by_did = Did::new(created_by).into_diagnostic()?; 255 272 256 273 info!( 257 274 "Taking moderation action for label '{}' on post: {}", ··· 268 285 rate_limiter, 269 286 &job.post_uri, 270 287 &job.post_cid, 288 + &job.post_did, 271 289 ReasonType::ComAtprotoModerationDefsReasonSpam, 272 290 &check.comment, 273 291 &match_result.phash, 274 - created_by, 292 + &created_by_did, 275 293 ), 276 294 metrics.inc_posts_reported(), 277 295 metrics.inc_posts_already_reported(), ··· 291 309 &check.label, 292 310 &check.comment, 293 311 &match_result.phash, 294 - created_by, 312 + &created_by_did, 295 313 ), 296 314 claims::set_label(redis_conn, &job.post_uri, &check.label, None), 297 315 metrics.inc_posts_labeled(), ··· 312 330 &check.comment, 313 331 &job.post_uri, 314 332 &match_result.phash, 315 - created_by, 333 + &created_by_did, 316 334 ), 317 335 metrics.inc_accounts_reported(), 318 336 metrics.inc_accounts_already_reported(), ··· 332 350 &check.comment, 333 351 &job.post_uri, 334 352 &match_result.phash, 335 - created_by, 353 + &created_by_did, 336 354 ), 337 355 claims::set_label(redis_conn, &job.post_did, &check.label, None), 338 356 metrics.inc_accounts_labeled(),
+14 -14
src/types/mod.rs
··· 69 69 where 70 70 D: Deserializer<'de>, 71 71 { 72 - let s = String::deserialize(deserializer)?; 73 - Ok(CowStr::from(s)) 72 + let s: &str = Deserialize::deserialize(deserializer)?; 73 + Ok(CowStr::from(s).into_static()) 74 74 } 75 75 76 76 fn deserialize_cowstr_vec<'de, D>(deserializer: D) -> Result<Vec<CowStr<'static>>, D::Error> 77 77 where 78 78 D: Deserializer<'de>, 79 79 { 80 - let strings: Vec<String> = Vec::deserialize(deserializer)?; 81 - Ok(strings.into_iter().map(CowStr::from).collect()) 80 + let strings: Vec<&str> = Vec::deserialize(deserializer)?; 81 + Ok(strings.into_iter().map(|s| CowStr::from(s).into_static()).collect()) 82 82 } 83 83 84 84 fn deserialize_option_cowstr<'de, D>(deserializer: D) -> Result<Option<CowStr<'static>>, D::Error> 85 85 where 86 86 D: Deserializer<'de>, 87 87 { 88 - let opt: Option<String> = Option::deserialize(deserializer)?; 89 - Ok(opt.map(CowStr::from)) 88 + let opt: Option<&str> = Option::deserialize(deserializer)?; 89 + Ok(opt.map(|s| CowStr::from(s).into_static())) 90 90 } 91 91 92 92 fn deserialize_cid<'de, D>(deserializer: D) -> Result<Cid<'static>, D::Error> 93 93 where 94 94 D: Deserializer<'de>, 95 95 { 96 - let s = String::deserialize(deserializer)?; 97 - Ok(Cid::str(&s).into_static()) 96 + let s: &str = Deserialize::deserialize(deserializer)?; 97 + Ok(Cid::str(s).into_static()) 98 98 } 99 99 100 100 fn deserialize_did<'de, D>(deserializer: D) -> Result<Did<'static>, D::Error> 101 101 where 102 102 D: Deserializer<'de>, 103 103 { 104 - let s = String::deserialize(deserializer)?; 105 - Did::new(&s) 104 + let s: &str = Deserialize::deserialize(deserializer)?; 105 + Did::new(s) 106 106 .map(|d| d.into_static()) 107 107 .map_err(serde::de::Error::custom) 108 108 } ··· 113 113 where 114 114 D: Deserializer<'de>, 115 115 { 116 - let opt: Option<Vec<String>> = Option::deserialize(deserializer)?; 116 + let opt: Option<Vec<&str>> = Option::deserialize(deserializer)?; 117 117 match opt { 118 118 Some(strings) => { 119 119 let dids: Result<Vec<Did<'static>>, _> = strings 120 - .iter() 120 + .into_iter() 121 121 .map(|s| Did::new(s).map(|d| d.into_static())) 122 122 .collect(); 123 123 Ok(Some(dids.map_err(serde::de::Error::custom)?)) ··· 130 130 where 131 131 D: Deserializer<'de>, 132 132 { 133 - let s = String::deserialize(deserializer)?; 134 - AtUri::new(&s) 133 + let s: &str = Deserialize::deserialize(deserializer)?; 134 + AtUri::new(s) 135 135 .map(|u| u.into_static()) 136 136 .map_err(serde::de::Error::custom) 137 137 }