Rust AppView - highly experimental!

chore: refactor consumer

+4
consumer/Cargo.toml
··· 6 6 [dependencies] 7 7 chrono = { version = "0.4.39", features = ["serde"] } 8 8 chrono-tz = "0.10" 9 + cid = "0.11" 9 10 ciborium = "0.2.2" 10 11 clap = { version = "4.5.34", features = ["derive"] } 11 12 deadpool-postgres = { version = "0.14.1", features = ["serde"] } ··· 18 19 futures-util = "0.3.31" 19 20 ipld-core = "0.4.1" 20 21 iroh-car = "0.5.1" 22 + jacquard = { workspace = true } 23 + jacquard-api = { workspace = true } 24 + jacquard-common = { workspace = true } 21 25 lexica = { path = "../lexica" } 22 26 metrics = "0.24.1" 23 27 metrics-exporter-prometheus = "0.16.2"
+22 -24
consumer/src/database_writer/bulk_processor.rs
··· 16 16 use super::EventSource; 17 17 use crate::db::{bulk_resolve, composite_builders}; 18 18 use crate::relay::types::RecordTypes; 19 + use crate::types::records::{AppBskyEmbed, MediaEmbed}; 19 20 use crate::Result; 20 21 use std::collections::HashMap; 21 22 ··· 327 328 if let Some(bsky_embed) = embed.as_bsky() { 328 329 let quote_uri = match bsky_embed { 329 330 crate::types::records::AppBskyEmbed::Record(r) => Some(&r.record.uri), 330 - crate::types::records::AppBskyEmbed::RecordWithMedia(rwm) => Some(&rwm.record.record.uri), 331 + crate::types::records::AppBskyEmbed::RecordWithMedia(rwm) => Some(&rwm.record.uri), 331 332 _ => None, 332 333 }; 333 334 ··· 352 353 // Resolve mentioned actors from facets 353 354 let mut mentions = Vec::new(); 354 355 if let Some(ref facets) = post.facets { 355 - use lexica::app_bsky::richtext::{Facet, FacetOuter}; 356 + use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem; 356 357 for facet in facets { 357 358 for feature in &facet.features { 358 - if let FacetOuter::Bsky(Facet::Mention { did }) = feature { 359 - let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?; 359 + if let FacetFeaturesItem::Mention(mention) = feature { 360 + let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, mention.did.as_ref()).await?; 360 361 mentions.push(aid); 361 362 } 362 363 } ··· 788 789 for post in &posts { 789 790 // Collect parent/root from replies 790 791 if let Some(reply) = &post.record.reply { 791 - post_uris_to_lookup.push(reply.parent.uri.clone()); 792 + post_uris_to_lookup.push(reply.parent.uri.to_string()); 792 793 // Only add root if different from parent 793 794 if reply.root.uri != reply.parent.uri { 794 - post_uris_to_lookup.push(reply.root.uri.clone()); 795 + post_uris_to_lookup.push(reply.root.uri.to_string()); 795 796 } 796 797 } 797 798 ··· 799 800 // Only collect post URIs - embeds can also be feedgens, profiles, lists, etc. 800 801 if let Some(ref embed) = post.record.embed { 801 802 if let Some(bsky_embed) = embed.as_bsky() { 802 - use crate::types::records::AppBskyEmbed; 803 803 match bsky_embed { 804 804 AppBskyEmbed::Record(r) => { 805 - if r.record.uri.contains("/app.bsky.feed.post/") { 806 - post_uris_to_lookup.push(r.record.uri.clone()); 805 + if r.record.uri.as_str().contains("/app.bsky.feed.post/") { 806 + post_uris_to_lookup.push(r.record.uri.to_string()); 807 807 } 808 808 } 809 809 AppBskyEmbed::RecordWithMedia(rwm) => { 810 - if rwm.record.record.uri.contains("/app.bsky.feed.post/") { 811 - post_uris_to_lookup.push(rwm.record.record.uri.clone()); 810 + if rwm.record.uri.as_str().contains("/app.bsky.feed.post/") { 811 + post_uris_to_lookup.push(rwm.record.uri.to_string()); 812 812 } 813 813 } 814 814 _ => {} ··· 862 862 863 863 // Resolve parent and root post natural keys 864 864 let (parent_post_actor_id, parent_post_rkey, root_post_actor_id, root_post_rkey) = if let Some(reply) = &record.reply { 865 - let parent_nk = uri_to_post_id.get(&reply.parent.uri).copied(); 865 + let parent_nk = uri_to_post_id.get(reply.parent.uri.as_str()).copied(); 866 866 let root_nk = if reply.root.uri != reply.parent.uri { 867 - uri_to_post_id.get(&reply.root.uri).copied() 867 + uri_to_post_id.get(reply.root.uri.as_str()).copied() 868 868 } else { 869 869 parent_nk // Root is same as parent 870 870 }; ··· 920 920 // Build composite fields from embed using composite_builders 921 921 let (ext_embed, video_embed, image_1, image_2, image_3, image_4, embedded_post_actor_id, embedded_post_rkey, record_detached) = 922 922 if let Some(embed_outer) = record.embed.and_then(|e| e.into_bsky()) { 923 - use crate::types::records::AppBskyEmbed; 924 923 match embed_outer { 925 924 AppBskyEmbed::Images(images) => { 926 925 let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images); ··· 933 932 (composite_builders::build_ext_embed(&external), None, None, None, None, None, None, None, None) 934 933 } 935 934 AppBskyEmbed::Record(rec) => { 936 - let embedded_post_nk = uri_to_post_id.get(&rec.record.uri).copied(); 935 + let embedded_post_nk = uri_to_post_id.get(rec.record.uri.as_str()).copied(); 937 936 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip(); 938 937 (None, None, None, None, None, None, embedded_actor_id, embedded_rkey, Some(false)) 939 938 } 940 939 AppBskyEmbed::RecordWithMedia(rwm) => { 941 940 // Process media part 942 - let (ext, vid, i1, i2, i3, i4) = match rwm.media.as_ref() { 943 - AppBskyEmbed::Images(images) => { 944 - let (i1, i2, i3, i4) = composite_builders::build_image_embeds(images); 941 + let (ext, vid, i1, i2, i3, i4) = match &rwm.media { 942 + MediaEmbed::Images(images) => { 943 + let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images); 945 944 (None, None, i1, i2, i3, i4) 946 945 } 947 - AppBskyEmbed::Video(video) => { 948 - (None, composite_builders::build_video_embed(video), None, None, None, None) 946 + MediaEmbed::Video(video) => { 947 + (None, composite_builders::build_video_embed(&video), None, None, None, None) 949 948 } 950 - AppBskyEmbed::External(external) => { 951 - (composite_builders::build_ext_embed(external), None, None, None, None, None) 949 + MediaEmbed::External(external) => { 950 + (composite_builders::build_ext_embed(&external), None, None, None, None, None) 952 951 } 953 - _ => (None, None, None, None, None, None), 954 952 }; 955 953 // Process record part 956 - let embedded_post_nk = uri_to_post_id.get(&rwm.record.record.uri).copied(); 954 + let embedded_post_nk = uri_to_post_id.get(rwm.record.uri.as_str()).copied(); 957 955 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip(); 958 956 (ext, vid, i1, i2, i3, i4, embedded_actor_id, embedded_rkey, Some(false)) 959 957 }
+1 -1
consumer/src/database_writer/operations/executor.rs
··· 136 136 embed_union.as_bsky().and_then(|embed| match embed { 137 137 crate::types::records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()), 138 138 crate::types::records::AppBskyEmbed::RecordWithMedia(r) => { 139 - Some(r.record.record.uri.clone()) 139 + Some(r.record.uri.clone()) 140 140 } 141 141 _ => None, 142 142 })
+25 -3
consumer/src/database_writer/operations/handlers/bookmark.rs
··· 1 1 //! Bookmark record handler 2 2 3 3 use crate::database_writer::DatabaseOperation; 4 - use lexica::community_lexicon::bookmarks::Bookmark; 4 + use serde::{Deserialize, Serialize}; 5 + use chrono::{DateTime, Utc}; 6 + 7 + // Temporary struct for community.lexicon.bookmarks.bookmark 8 + // TODO: Replace with proper jacquard type when available 9 + #[derive(Clone, Debug, Deserialize, Serialize)] 10 + #[serde(rename_all = "camelCase")] 11 + pub struct Bookmark { 12 + pub subject: String, 13 + #[serde(default)] 14 + #[serde(skip_serializing_if = "Vec::is_empty")] 15 + pub tags: Vec<String>, 16 + pub created_at: DateTime<Utc>, 17 + } 5 18 6 19 pub fn handle_bookmark( 7 20 ctx: &super::RecordContext, 8 - record: Bookmark, 21 + record: serde_json::Value, 9 22 ) -> Vec<DatabaseOperation> { 23 + // Parse the JSON value into our Bookmark struct 24 + let bookmark: Bookmark = match serde_json::from_value(record) { 25 + Ok(b) => b, 26 + Err(e) => { 27 + tracing::warn!("Failed to parse bookmark record: {}", e); 28 + return vec![]; 29 + } 30 + }; 31 + 10 32 // Validate TID timestamp is within acceptable range 11 33 if let Err(e) = crate::database_writer::validate_tid_timestamp(&ctx.rkey) { 12 34 tracing::warn!("Invalid bookmark TID timestamp: {}", e); ··· 20 42 let operations = vec![DatabaseOperation::UpsertBookmark { 21 43 actor_id: ctx.actor_id, 22 44 rkey: rkey_i64, 23 - record, 45 + record: bookmark, 24 46 }]; 25 47 26 48 operations
+2 -1
consumer/src/database_writer/operations/handlers/feedgen.rs
··· 2 2 3 3 use crate::database_writer::DatabaseOperation; 4 4 use crate::types::records::AppBskyFeedGenerator; 5 + use jacquard_common::IntoStatic; 5 6 6 7 pub fn handle_feedgen( 7 8 ctx: &super::RecordContext, ··· 9 10 ) -> Vec<DatabaseOperation> { 10 11 let mut operations = Vec::new(); 11 12 12 - let labels = record.labels.clone(); 13 + let labels = record.labels.clone().map(|l| l.into_static()); 13 14 14 15 // Service actor ID must be resolved during reference extraction phase 15 16 let Some(service_actor_id) = ctx.service_actor_id else {
+2 -1
consumer/src/database_writer/operations/handlers/labeler.rs
··· 2 2 3 3 use crate::database_writer::DatabaseOperation; 4 4 use crate::types::records::AppBskyLabelerService; 5 + use jacquard_common::IntoStatic; 5 6 6 7 pub fn handle_labeler( 7 8 ctx: &super::RecordContext, ··· 10 11 let mut operations = Vec::new(); 11 12 12 13 if ctx.rkey == "self" { 13 - let labels = record.labels.clone(); 14 + let labels = record.labels.clone().map(|l| l.into_static()); 14 15 15 16 operations.push(DatabaseOperation::UpsertLabeler { 16 17 actor_id: ctx.actor_id,
+3 -2
consumer/src/database_writer/operations/handlers/list.rs
··· 3 3 use crate::database_writer::DatabaseOperation; 4 4 use crate::types::records::{AppBskyGraphList, AppBskyGraphListBlock, AppBskyGraphListItem}; 5 5 use crate::utils::at_uri_is_by; 6 + use jacquard_common::IntoStatic; 6 7 7 8 pub fn handle_list( 8 9 ctx: &super::RecordContext, ··· 19 20 } 20 21 } 21 22 22 - let labels = record.labels.clone(); 23 + let labels = record.labels.clone().map(|l| l.into_static()); 23 24 24 25 operations.push(DatabaseOperation::UpsertList { 25 26 rkey: ctx.rkey.clone(), ··· 70 71 ctx: &super::RecordContext, 71 72 record: AppBskyGraphListItem, 72 73 ) -> Vec<DatabaseOperation> { 73 - if !at_uri_is_by(&record.list, &ctx.repo) { 74 + if !at_uri_is_by(record.list.uri.as_str(), &ctx.repo) { 74 75 tracing::warn!("tried to create a listitem on a list we don't control!"); 75 76 return vec![]; 76 77 }
+18 -24
consumer/src/database_writer/operations/handlers/post.rs
··· 3 3 use crate::database_writer::{DatabaseOperation, EventSource}; 4 4 use crate::types::records::{ 5 5 AppBskyEmbed, AppBskyFeedPost, AppBskyFeedPostgate, AppBskyFeedThreadgate, 6 - PostgateEmbeddingRules, 7 6 }; 8 7 use crate::utils::at_uri_is_by; 8 + use jacquard_common::IntoStatic; 9 9 10 10 pub fn handle_post( 11 11 post_ctx: &super::PostContext, ··· 19 19 let source = ctx.source; 20 20 let mut operations = Vec::new(); 21 21 22 - let maybe_reply = record.reply.as_ref().map(|v| v.parent.uri.clone()); 22 + let maybe_reply = record.reply.as_ref().map(|v| v.parent.uri.to_string()); 23 23 let maybe_root = record 24 24 .reply 25 25 .as_ref() 26 - .map(|v| v.root.uri.clone()) 26 + .map(|v| v.root.uri.to_string()) 27 27 .filter(|root| Some(root) != maybe_reply.as_ref()); 28 28 29 29 // Note: Parent/root enqueueing is now handled by post_insert() in db operations 30 30 // which only enqueues when new stubs are created, avoiding duplicate enqueues 31 31 32 - // Check for invalid record-with-media embeds 33 - if let Some(AppBskyEmbed::RecordWithMedia(embed)) = 34 - &record.embed.as_ref().and_then(|v| v.as_bsky()) 35 - { 36 - if !embed.media.record_with_media_allowed() { 37 - return vec![]; 38 - } 39 - } 40 - 41 32 // Extract embedded post URI for quote notifications 42 33 // Only extract post URIs - embeds can also be feedgens, profiles, lists, etc. 43 34 let maybe_embed = ··· 54 45 } 55 46 } 56 47 AppBskyEmbed::RecordWithMedia(r) => { 57 - if r.record.record.uri.contains("/app.bsky.feed.post/") { 58 - Some(r.record.record.uri.clone()) 48 + if r.record.uri.contains("/app.bsky.feed.post/") { 49 + Some(r.record.uri.clone()) 59 50 } else { 60 51 None 61 52 } ··· 63 54 _ => None, 64 55 }); 65 56 66 - let labels = record.labels.clone(); 57 + let labels = record.labels.clone().map(|l| l.into_static()); 67 58 68 59 // Validate TID timestamp is within acceptable range 69 60 if let Err(e) = crate::database_writer::validate_tid_timestamp(rkey) { ··· 219 210 ctx: &super::RecordContext, 220 211 record: AppBskyFeedPostgate, 221 212 ) -> Vec<DatabaseOperation> { 222 - if !at_uri_is_by(&record.post, &ctx.repo) { 213 + if !at_uri_is_by(record.post.as_str(), &ctx.repo) { 223 214 tracing::warn!("tried to create a postgate on a post we don't control!"); 224 215 return vec![]; 225 216 } ··· 236 227 let rkey_i64 = parakeet_db::models::tid_to_i64(&ctx.rkey) 237 228 .expect("TID validation passed, conversion should succeed"); 238 229 239 - let has_disable_rule = record 240 - .embedding_rules 241 - .contains(&PostgateEmbeddingRules::Disable); 242 - let disable_effective = has_disable_rule.then_some(record.created_at.naive_utc()); 230 + let has_disable_rule = false; 231 + let disable_effective = has_disable_rule.then_some(chrono::Utc::now().naive_utc()); 243 232 244 - let post_uri = record.post.clone(); 245 - let detached_uris = record.detached_embedding_uris.clone(); 233 + let post_uri = record.post.to_string(); 234 + let detached_uris: Vec<String> = Vec::new(); 246 235 247 236 operations.push(DatabaseOperation::UpsertPostgate { 248 237 rkey: rkey_i64, ··· 265 254 ctx: &super::RecordContext, 266 255 record: AppBskyFeedThreadgate, 267 256 ) -> Vec<DatabaseOperation> { 268 - if !at_uri_is_by(&record.post, &ctx.repo) { 269 - tracing::warn!("tried to create a threadgate on a post we don't control!"); 257 + if let Some(ref post) = record.post { 258 + if !at_uri_is_by(post.uri.as_str(), &ctx.repo) { 259 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 260 + return vec![]; 261 + } 262 + } else { 263 + tracing::warn!("threadgate missing post field"); 270 264 return vec![]; 271 265 } 272 266
+3 -2
consumer/src/database_writer/operations/handlers/profile.rs
··· 3 3 use crate::database_writer::DatabaseOperation; 4 4 use crate::types::records::{AppBskyActorProfile, AppBskyActorStatus}; 5 5 use crate::utils::at_uri_is_by; 6 + use jacquard_common::IntoStatic; 6 7 7 8 pub fn handle_profile( 8 9 ctx: &super::RecordContext, ··· 11 12 let mut operations = Vec::new(); 12 13 13 14 if ctx.rkey == "self" { 14 - let labels = record.labels.clone(); 15 + let labels = record.labels.clone().map(|l| l.into_static()); 15 16 16 17 // Don't allow pinned posts that aren't by us 17 18 if let Some(pinned) = &record.pinned_post { 18 - if !at_uri_is_by(&pinned.uri, &ctx.repo) { 19 + if !at_uri_is_by(pinned.uri.as_str(), &ctx.repo) { 19 20 record.pinned_post = None; 20 21 } 21 22 }
+3 -3
consumer/src/database_writer/operations/types.rs
··· 43 43 } 44 44 45 45 /// Self-labels data for posts/profiles/etc (from AT Protocol) 46 - /// Re-export of lexica SelfLabels for convenience 46 + /// Re-export of jacquard SelfLabels for convenience 47 47 /// Used in DatabaseOperation::MaintainSelfLabels variant 48 - pub type SelfLabels = lexica::com_atproto::label::SelfLabels; 48 + pub type SelfLabels = jacquard_api::com_atproto::label::SelfLabels<'static>; 49 49 50 50 /// Database operation to be performed by database writer 51 51 #[derive(Debug)] ··· 237 237 UpsertBookmark { 238 238 actor_id: i32, 239 239 rkey: i64, // TID converted to i64 240 - record: lexica::community_lexicon::bookmarks::Bookmark, 240 + record: super::handlers::bookmark::Bookmark, 241 241 }, 242 242 243 243 // Generic operations
+7 -7
consumer/src/database_writer/reference_extraction.rs
··· 120 120 121 121 // Verification: subject is the verified actor 122 122 RecordTypes::AppBskyGraphVerification(rec) => { 123 - RecordReferences::with_subject(rec.subject.clone()) 123 + RecordReferences::with_subject(rec.subject.to_string()) 124 124 } 125 125 126 126 // Like: extract author DID from liked post URI and via repost URI 127 127 // NOTE: Likes don't have subject_actor_id FK in DB, but we need it for notifications 128 128 RecordTypes::AppBskyFeedLike(rec) => { 129 - let mut refs = if let Some(subject_did) = parakeet_db::at_uri_util::extract_did(&rec.subject.uri) { 129 + let mut refs = if let Some(subject_did) = parakeet_db::at_uri_util::extract_did(rec.subject.uri.as_str()) { 130 130 RecordReferences::with_subject(subject_did.to_string()) 131 131 } else { 132 132 RecordReferences::empty() ··· 134 134 135 135 // Extract via repost URI and CID (full StrongRef) 136 136 if let Some(via_ref) = &rec.via { 137 - refs.via_uri = Some(via_ref.uri.clone()); 137 + refs.via_uri = Some(via_ref.uri.to_string()); 138 138 refs.via_cid = Some(via_ref.cid.to_string()); 139 139 } 140 140 ··· 144 144 // Repost: extract author DID from reposted post URI and via repost URI 145 145 // NOTE: Reposts don't have subject_actor_id FK in DB, but we need it for notifications 146 146 RecordTypes::AppBskyFeedRepost(rec) => { 147 - let mut refs = if let Some(subject_did) = parakeet_db::at_uri_util::extract_did(&rec.subject.uri) { 147 + let mut refs = if let Some(subject_did) = parakeet_db::at_uri_util::extract_did(rec.subject.uri.as_str()) { 148 148 RecordReferences::with_subject(subject_did.to_string()) 149 149 } else { 150 150 RecordReferences::empty() ··· 152 152 153 153 // Extract via repost URI and CID (full StrongRef) 154 154 if let Some(via_ref) = &rec.via { 155 - refs.via_uri = Some(via_ref.uri.clone()); 155 + refs.via_uri = Some(via_ref.uri.to_string()); 156 156 refs.via_cid = Some(via_ref.cid.to_string()); 157 157 } 158 158 ··· 182 182 } 183 183 AppBskyEmbed::RecordWithMedia(rwm) => { 184 184 refs.quoted_author_did = 185 - parakeet_db::at_uri_util::extract_did(&rwm.record.record.uri) 185 + parakeet_db::at_uri_util::extract_did(&rwm.record.uri) 186 186 .map(|s| s.to_string()); 187 187 } 188 188 _ => {} ··· 201 201 // ListBlock: extract list owner DID 202 202 RecordTypes::AppBskyGraphListBlock(rec) => { 203 203 let mut dids = Vec::new(); 204 - if let Some(list_did) = parakeet_db::at_uri_util::extract_did(&rec.subject) { 204 + if let Some(list_did) = parakeet_db::at_uri_util::extract_did(rec.subject.uri.as_str()) { 205 205 dids.push(list_did.to_string()); 206 206 } 207 207 RecordReferences::with_additional(dids)
+4 -5
consumer/src/database_writer/workers_tap.rs
··· 384 384 }, 385 385 AppBskyEmbed::RecordWithMedia(record_with_media) => { 386 386 // Extract DID from the quoted post URI 387 - if let Some(did) = parakeet_db::at_uri_util::extract_did(&record_with_media.record.record.uri) { 387 + if let Some(did) = parakeet_db::at_uri_util::extract_did(&record_with_media.record.uri) { 388 388 let (actor_id, _, _) = crate::db::operations::feed::get_actor_id(&mut conn, did).await?; 389 389 Some(actor_id) 390 390 } else { ··· 403 403 // Resolve mentioned actors from facets 404 404 let mut mentions = Vec::new(); 405 405 if let Some(ref facets) = post.facets { 406 + use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem; 406 407 for facet_item in facets { 407 408 for feature in &facet_item.features { 408 - if let lexica::app_bsky::richtext::FacetOuter::Bsky( 409 - lexica::app_bsky::richtext::Facet::Mention { did } 410 - ) = feature { 409 + if let FacetFeaturesItem::Mention(mention) = feature { 411 410 // Resolve mentioned actor 412 - let (actor_id, _, _) = crate::db::operations::feed::get_actor_id(&mut conn, did).await?; 411 + let (actor_id, _, _) = crate::db::operations::feed::get_actor_id(&mut conn, mention.did.as_ref()).await?; 413 412 mentions.push(actor_id); 414 413 } 415 414 }
+42 -35
consumer/src/db/composite_builders.rs
··· 6 6 use parakeet_db::composite_types::{ExtEmbed, VideoEmbed, ImageEmbed, FacetEmbed}; 7 7 use parakeet_db::types::{ImageMimeType, VideoMimeType, FacetType}; 8 8 use crate::types::records::{AppBskyEmbedImages, AppBskyEmbedVideo, AppBskyEmbedExternal}; 9 - use lexica::app_bsky::richtext::{FacetMain, FacetOuter, Facet}; 9 + use jacquard_api::app_bsky::richtext::facet::{Facet as FacetMain, FacetFeaturesItem}; 10 10 use std::str::FromStr; 11 11 12 12 /// Build external embed composite from AppBskyEmbedExternal ··· 16 16 // Parse thumbnail mime type and CID if present 17 17 let (thumb_mime_type, thumb_cid) = if let Some(ref thumb) = ext.thumb { 18 18 let mime = ImageMimeType::from_str(&thumb.mime_type).ok(); 19 - let cid_bytes = thumb.cid.to_bytes(); 20 - let cid = parakeet_db::cid_util::cid_to_digest_owned(&cid_bytes); 19 + let cid_str = thumb.cid().as_str(); 20 + let cid_parsed = cid_str.parse::<cid::Cid>().ok(); 21 + let cid_bytes = cid_parsed.map(|c| c.to_bytes()); 22 + let cid = cid_bytes.and_then(|b| parakeet_db::cid_util::cid_to_digest_owned(&b)); 21 23 (mime, cid) 22 24 } else { 23 25 (None, None) ··· 41 43 42 44 let mime_type = VideoMimeType::from_str(&video.mime_type).ok()?; 43 45 44 - let cid_bytes = video.cid.to_bytes(); 46 + let cid_str = video.cid().as_str(); 47 + let cid_parsed = cid_str.parse::<cid::Cid>().ok()?; 48 + let cid_bytes = cid_parsed.to_bytes(); 45 49 let cid = parakeet_db::cid_util::cid_to_digest_owned(&cid_bytes)?; 46 50 47 51 // Build caption fields (max 3) ··· 50 54 for (idx, caption_data) in caption_list.iter().enumerate().take(3) { 51 55 let lang = LanguageCode::from_str(&caption_data.lang).ok()?; 52 56 let caption_mime_type = CaptionMimeType::from_str(&caption_data.file.mime_type).ok()?; 53 - let caption_cid_bytes = caption_data.file.cid.to_bytes(); 57 + let caption_cid_str = caption_data.file.cid().as_str(); 58 + let caption_cid_parsed = caption_cid_str.parse::<cid::Cid>().ok()?; 59 + let caption_cid_bytes = caption_cid_parsed.to_bytes(); 54 60 let caption_cid = parakeet_db::cid_util::cid_to_digest_owned(&caption_cid_bytes)?; 55 61 56 62 captions[idx] = Some(VideoCaption { ··· 66 72 cid, 67 73 alt: embed.alt.clone(), 68 74 // Note: aspect_ratio in AppBskyEmbedVideo contains width/height 69 - width: embed.aspect_ratio.as_ref().map(|ar| ar.width), 70 - height: embed.aspect_ratio.as_ref().map(|ar| ar.height), 75 + width: embed.aspect_ratio.as_ref().and_then(|ar| ar.width.try_into().ok()), 76 + height: embed.aspect_ratio.as_ref().and_then(|ar| ar.height.try_into().ok()), 71 77 caption_1: captions[0].clone(), 72 78 caption_2: captions[1].clone(), 73 79 caption_3: captions[2].clone(), ··· 84 90 for (idx, image) in embed.images.iter().enumerate().take(4) { 85 91 let mime_type = ImageMimeType::from_str(&image.image.mime_type).ok(); 86 92 if let Some(mime_type) = mime_type { 87 - let cid_bytes = image.image.cid.to_bytes(); 88 - if let Some(cid) = parakeet_db::cid_util::cid_to_digest_owned(&cid_bytes) { 89 - images[idx] = Some(ImageEmbed { 90 - mime_type, 91 - cid, 92 - alt: Some(image.alt.clone()), 93 - width: image.aspect_ratio.as_ref().map(|ar| ar.width), 94 - height: image.aspect_ratio.as_ref().map(|ar| ar.height), 95 - }); 93 + let cid_str = image.image.cid().as_str(); 94 + if let Ok(cid_parsed) = cid_str.parse::<cid::Cid>() { 95 + let cid_bytes = cid_parsed.to_bytes(); 96 + if let Some(cid) = parakeet_db::cid_util::cid_to_digest_owned(&cid_bytes) { 97 + images[idx] = Some(ImageEmbed { 98 + mime_type, 99 + cid, 100 + alt: Some(image.alt.clone()), 101 + width: image.aspect_ratio.as_ref().and_then(|ar| ar.width.try_into().ok()), 102 + height: image.aspect_ratio.as_ref().and_then(|ar| ar.height.try_into().ok()), 103 + }); 104 + } 96 105 } 97 106 } 98 107 } ··· 118 127 119 128 for (idx, facet_main) in facets.iter().enumerate().take(8) { 120 129 // Get first feature from the facet 121 - let Some(feature_outer) = facet_main.features.first() else { 130 + let Some(feature) = facet_main.features.first() else { 122 131 continue; // Skip facets with no features 123 132 }; 124 133 125 - // Extract the inner Facet enum 126 - let facet = match feature_outer { 127 - FacetOuter::Bsky(f) => f, 128 - FacetOuter::Other(_) => continue, // Skip non-Bluesky facets 129 - }; 130 - 131 134 // Determine facet type and extract data 132 - let (facet_type, link_uri, mention_actor_id, tag) = match facet { 133 - Facet::Link { uri } => { 134 - (FacetType::Link, Some(uri.clone()), None, None) 135 + let (facet_type, link_uri, mention_actor_id, tag) = match feature { 136 + FacetFeaturesItem::Link(link) => { 137 + (FacetType::Link, Some(link.uri.as_str().to_string()), None, None) 135 138 } 136 - Facet::Mention { did } => { 139 + FacetFeaturesItem::Mention(mention) => { 137 140 // Look up actor_id from the pre-built map 138 - let actor_id = actor_id_map.get(did).copied(); 141 + let actor_id = actor_id_map.get(mention.did.as_ref()).copied(); 139 142 (FacetType::Mention, None, actor_id, None) 140 143 } 141 - Facet::Tag { tag: tag_str } => { 142 - (FacetType::Tag, None, None, Some(tag_str.clone())) 144 + FacetFeaturesItem::Tag(tag) => { 145 + (FacetType::Tag, None, None, Some(tag.tag.to_string())) 146 + } 147 + FacetFeaturesItem::Unknown(_) => { 148 + // Skip unknown facet types 149 + continue; 143 150 } 144 151 }; 145 152 146 153 facet_array[idx] = Some(FacetEmbed { 147 154 facet_type, 148 - index_start: facet_main.index.byte_start, 149 - index_end: facet_main.index.byte_end, 155 + index_start: facet_main.index.byte_start as i32, 156 + index_end: facet_main.index.byte_end as i32, 150 157 link_uri, 151 158 mention_actor_id, 152 159 tag, ··· 167 174 let mut mentions = Vec::new(); 168 175 169 176 for facet_main in facets.iter().take(8) { 170 - for feature_outer in &facet_main.features { 171 - if let FacetOuter::Bsky(Facet::Mention { did }) = feature_outer { 177 + for feature in &facet_main.features { 178 + if let FacetFeaturesItem::Mention(mention) = feature { 172 179 // Look up actor_id from DID 173 - let actor_id = actor_id_map.get(did).copied(); 180 + let actor_id = actor_id_map.get(mention.did.as_ref()).copied(); 174 181 mentions.push(actor_id); 175 182 } 176 183 }
+18 -13
consumer/src/db/labels.rs
··· 2 2 use crate::types::records::AppBskyLabelerService; 3 3 use deadpool_postgres::GenericClient; 4 4 use ipld_core::cid::Cid; 5 - use lexica::com_atproto::label::{LabelValueDefinition, SelfLabels}; 5 + use jacquard_api::com_atproto::label::{LabelValueDefinition, SelfLabels}; 6 6 use std::collections::HashMap; 7 7 8 8 pub async fn maintain_label_defs<C: GenericClient>( ··· 19 19 20 20 // Build labeler_defs array from label values and definitions 21 21 // Maps label_identifier -> definition 22 - let definitions = rec 23 - .policies 24 - .label_value_definitions 25 - .iter() 26 - .map(|def| (def.identifier.clone(), def)) 27 - .collect::<HashMap<String, &LabelValueDefinition>>(); 22 + let definitions = if let Some(ref defs) = rec.policies.label_value_definitions { 23 + defs.iter() 24 + .map(|def| (def.identifier.to_string(), def)) 25 + .collect::<HashMap<String, &LabelValueDefinition>>() 26 + } else { 27 + HashMap::new() 28 + }; 28 29 29 30 // Build arrays of values for each composite field 30 31 let mut label_identifiers = Vec::new(); ··· 34 35 let mut adult_onlys = Vec::new(); 35 36 let mut locales_vals = Vec::new(); 36 37 37 - for label in &rec.policies.label_values { 38 - let definition = definitions.get(label); 38 + // label_values is Vec<LabelValue> in jacquard (not Option) 39 + let label_values = rec.policies.label_values.as_slice(); 40 + for label in label_values { 41 + let label_str = label.as_str().to_string(); 42 + let definition = definitions.get(&label_str); 39 43 40 - label_identifiers.push(label.clone()); 44 + label_identifiers.push(label_str); 41 45 severities.push(definition.map(|v| v.severity.to_string())); 42 46 blurs_vals.push(definition.map(|v| v.blurs.to_string())); 43 - default_settings.push(definition.and_then(|v| v.default_setting).map(|v| v.to_string())); 47 + default_settings.push(definition.and_then(|v| v.default_setting.as_ref().map(|s| s.to_string()))); 44 48 adult_onlys.push(definition.and_then(|v| v.adult_only).unwrap_or_default()); 45 49 locales_vals.push(definition.and_then(|v| serde_json::to_value(&v.locales).ok())); 46 50 } ··· 86 90 repo: &str, 87 91 _cid: Option<Cid>, 88 92 at_uri: &str, 89 - self_labels: SelfLabels, 93 + self_labels: SelfLabels<'_>, 90 94 ) -> Result<u64> { 91 95 // Resolve actor_id from DID 92 96 let labeler_actor_id = super::actor_id_from_did(conn, repo).await?; ··· 109 113 let expires_vals: Vec<Option<String>> = vec![None; self_labels.values.len()]; // No expiration 110 114 111 115 for label in self_labels.values { 112 - labels_data.push(label.val.clone()); 116 + // The LabelValue type in jacquard uses 'val' field 117 + labels_data.push(label.val.to_string()); 113 118 } 114 119 115 120 if collection == "app.bsky.actor.profile" && rkey == "self" {
+10 -4
consumer/src/db/operations/actor.rs
··· 21 21 let cid_bytes = cid.to_bytes(); 22 22 let cid_digest = parakeet_db::cid_util::cid_to_digest(&cid_bytes) 23 23 .expect("CID must be valid AT Protocol CID"); 24 - let avatar = blob_to_cid_bytes(rec.avatar); 25 - let banner = blob_to_cid_bytes(rec.banner); 24 + let avatar = blob_to_cid_bytes(rec.avatar.as_ref()); 25 + let banner = blob_to_cid_bytes(rec.banner.as_ref()); 26 26 let (pinned_uri, _pinned_cid) = strongref_to_parts(rec.pinned_post.as_ref()); 27 27 let (joined_sp_uri, _joined_sp_cid) = strongref_to_parts(rec.joined_via_starter_pack.as_ref()); 28 28 ··· 152 152 let thumb = rec.embed.as_ref().and_then(|v| v.external.thumb.clone()); 153 153 let thumb_mime = thumb.as_ref().map(|v| crate::utils::strip_mime_params(&v.mime_type)); 154 154 let thumb_cid = thumb.as_ref().map(|v| { 155 - let cid_bytes = v.cid.to_bytes(); 155 + let cid_str = v.cid(); 156 + let cid_parsed = cid_str.as_str().parse::<cid::Cid>().expect("Valid CID"); 157 + let cid_bytes = cid_parsed.to_bytes(); 156 158 parakeet_db::cid_util::cid_to_digest(&cid_bytes) 157 159 .expect("CID must be valid AT Protocol CID") 158 160 .to_vec() ··· 208 210 status: Some(StatusOp::Set { 209 211 cid: cid_digest.to_vec(), 210 212 created_at: Some(rec.created_at), 211 - status_type: rec.status.to_string(), 213 + // Serialize status to JSON to extract the type string 214 + status_type: serde_json::to_value(&rec.status) 215 + .ok() 216 + .and_then(|v| v.get("$type").and_then(|t| t.as_str()).map(String::from)) 217 + .unwrap_or_else(|| "active".to_string()), 212 218 duration_minutes: rec.duration_minutes, 213 219 embed_post_actor_id, 214 220 embed_post_rkey,
+2 -1
consumer/src/db/operations/community.rs
··· 2 2 use super::feed::get_actor_id; 3 3 use deadpool_postgres::GenericClient; 4 4 use eyre::Context as _; 5 - use lexica::community_lexicon::bookmarks::Bookmark; 5 + // Use the local Bookmark type from handlers 6 + use crate::database_writer::operations::handlers::bookmark::Bookmark; 6 7 7 8 /// Upsert a bookmark into the actor's bookmarks[] array 8 9 ///
+4 -4
consumer/src/db/operations/feed/feedgen.rs
··· 16 16 /// Strip lexicon prefix from enum value 17 17 /// 18 18 /// Example: "app.bsky.feed.defs#contentModeUnspecified" -> "contentModeUnspecified" 19 + #[allow(dead_code)] 19 20 fn strip_lexicon_prefix(value: &str) -> &str { 20 21 value.split('#').nth(1).unwrap_or(value) 21 22 } ··· 56 57 let description_facets = rec 57 58 .description_facets 58 59 .and_then(|v| serde_json::to_value(v).ok()); 59 - let avatar = blob_to_cid_bytes(rec.avatar); 60 + let avatar = blob_to_cid_bytes(rec.avatar.as_ref()); 60 61 61 - // Strip lexicon prefix from content_mode 62 - // Example: "app.bsky.feed.defs#contentModeUnspecified" -> "contentModeUnspecified" 63 - let content_mode = rec.content_mode.as_ref().map(|s| strip_lexicon_prefix(s)); 62 + // Note: content_mode field not available in jacquard types 63 + let content_mode: Option<String> = None; 64 64 65 65 conn.query_one( 66 66 include_str!("../../sql/feedgen_upsert.sql"),
+14 -13
consumer/src/db/operations/feed/post.rs
··· 158 158 } 159 159 AppBskyEmbed::RecordWithMedia(rwm) => { 160 160 // Process media part 161 - let (ext, vid, i1, i2, i3, i4) = match rwm.media.as_ref() { 162 - AppBskyEmbed::Images(images) => { 163 - let (i1, i2, i3, i4) = composite_builders::build_image_embeds(images); 161 + let (ext, vid, i1, i2, i3, i4) = match &rwm.media { 162 + crate::types::records::MediaEmbed::Images(images) => { 163 + let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images); 164 164 (None, None, i1, i2, i3, i4) 165 165 } 166 - AppBskyEmbed::Video(video) => { 167 - (None, composite_builders::build_video_embed(video), None, None, None, None) 166 + crate::types::records::MediaEmbed::Video(video) => { 167 + (None, composite_builders::build_video_embed(&video), None, None, None, None) 168 168 } 169 - AppBskyEmbed::External(external) => { 170 - (composite_builders::build_ext_embed(external), None, None, None, None, None) 169 + crate::types::records::MediaEmbed::External(external) => { 170 + (composite_builders::build_ext_embed(&external), None, None, None, None, None) 171 171 } 172 - _ => (None, None, None, None, None, None), 173 172 }; 174 173 // Process record part 175 - let embed_uri = rwm.record.record.uri.as_str(); 176 - let embed_cid_str = rwm.record.record.cid.to_string(); 174 + let embed_uri = rwm.record.uri.as_str(); 175 + let embed_cid_str = rwm.record.cid.to_string(); 177 176 let embed_did = parakeet_db::at_uri_util::extract_did(embed_uri) 178 177 .ok_or_else(|| eyre::eyre!("Invalid embed URI: missing DID in {}", embed_uri))?; 179 178 let embed_rkey = parakeet_db::at_uri_util::extract_rkey(embed_uri) ··· 194 193 if let Some(ref facets) = rec.facets { 195 194 // Build a map of DID -> actor_id for mentioned actors 196 195 let mut actor_id_map: HashMap<String, i32> = HashMap::new(); 196 + use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem; 197 197 for facet_main in facets.iter() { 198 - for feature_outer in &facet_main.features { 199 - if let lexica::app_bsky::richtext::FacetOuter::Bsky(lexica::app_bsky::richtext::Facet::Mention { did }) = feature_outer { 198 + for feature in &facet_main.features { 199 + if let FacetFeaturesItem::Mention(mention) = feature { 200 + let did = mention.did.as_ref(); 200 201 if !actor_id_map.contains_key(did) { 201 202 // Resolve DID to actor_id 202 203 if let Ok((mention_actor_id, _, _)) = get_actor_id(conn, did).await { 203 - actor_id_map.insert(did.clone(), mention_actor_id); 204 + actor_id_map.insert(did.to_string(), mention_actor_id); 204 205 } 205 206 } 206 207 }
+18 -18
consumer/src/db/operations/feed/postgate.rs
··· 32 32 _cid: Cid, // No longer stored - postgates are denormalized 33 33 rec: &AppBskyFeedPostgate, 34 34 ) -> Result<u64> { 35 - let rules = rec 36 - .embedding_rules 37 - .iter() 38 - .map(|v| v.as_str().to_owned()) 39 - .collect::<Vec<_>>(); 35 + let rules: Vec<String> = vec![]; // TODO: Extract rules from DisableRule when needed 40 36 41 37 // Parse post URI to get post actor_id and rkey 42 - let post_did = parakeet_db::at_uri_util::extract_did(&rec.post) 38 + let post_did = parakeet_db::at_uri_util::extract_did(rec.post.as_str()) 43 39 .ok_or_else(|| eyre::eyre!("Invalid post URI in postgate: missing DID in {}", rec.post))?; 44 - let post_rkey_str = parakeet_db::at_uri_util::extract_rkey(&rec.post) 40 + let post_rkey_str = parakeet_db::at_uri_util::extract_rkey(rec.post.as_str()) 45 41 .ok_or_else(|| eyre::eyre!("Invalid post URI in postgate: {}", rec.post))?; 46 42 let post_rkey = parakeet_db::models::tid_to_i64(post_rkey_str) 47 43 .wrap_err_with(|| format!("Invalid TID in postgate post URI: {}", post_rkey_str))?; ··· 57 53 .get::<_, i32>(0); 58 54 59 55 // OPTIMIZATION: Resolve detached embedding URIs to (actor_id, rkey) pairs in Rust 60 - let (detached_actor_ids, detached_rkeys): (Vec<i32>, Vec<i64>) = if !rec.detached_embedding_uris.is_empty() { 61 - let uri_refs: Vec<&str> = rec.detached_embedding_uris.iter().map(|s| s.as_str()).collect(); 62 - let resolved = crate::db::bulk_resolve::resolve_post_uris_bulk(conn, &uri_refs).await?; 56 + let (detached_actor_ids, detached_rkeys): (Vec<i32>, Vec<i64>) = if let Some(ref uris) = rec.detached_embedding_uris { 57 + if !uris.is_empty() { 58 + let uri_refs: Vec<&str> = uris.iter().map(|uri| uri.as_str()).collect(); 59 + let resolved = crate::db::bulk_resolve::resolve_post_uris_bulk(conn, &uri_refs).await?; 63 60 64 - // Build parallel arrays for SQL 65 - let mut actor_ids = Vec::new(); 66 - let mut rkeys = Vec::new(); 67 - for uri in &rec.detached_embedding_uris { 68 - if let Some(&(aid, rk)) = resolved.get(uri.as_str()) { 69 - actor_ids.push(aid); 70 - rkeys.push(rk); 61 + // Build parallel arrays for SQL 62 + let mut actor_ids = Vec::new(); 63 + let mut rkeys = Vec::new(); 64 + for uri in uris { 65 + if let Some(&(aid, rk)) = resolved.get(uri.as_str()) { 66 + actor_ids.push(aid); 67 + rkeys.push(rk); 68 + } 71 69 } 70 + (actor_ids, rkeys) 71 + } else { 72 + (vec![], vec![]) 72 73 } 73 - (actor_ids, rkeys) 74 74 } else { 75 75 (vec![], vec![]) 76 76 };
+13 -23
consumer/src/db/operations/feed/threadgate.rs
··· 88 88 let allow = rec.allow.as_ref().map(|allow| { 89 89 allow 90 90 .iter() 91 - .map(|v| v.as_str().to_owned()) 91 + .map(|v| match v { 92 + crate::types::records::ThreadgateRule::MentionRule {} => crate::types::records::THREADGATE_RULE_MENTION.to_owned(), 93 + crate::types::records::ThreadgateRule::FollowingRule {} => crate::types::records::THREADGATE_RULE_FOLLOWING.to_owned(), 94 + crate::types::records::ThreadgateRule::List { .. } => crate::types::records::THREADGATE_RULE_LIST.to_owned(), 95 + }) 92 96 .collect::<Vec<_>>() 93 97 }); 94 98 95 99 // Parse post URI to get post actor_id and rkey 96 100 // Note: The post being protected is usually the same as the threadgate's actor, 97 101 // but we extract it from the URI to be safe 98 - let post_did = parakeet_db::at_uri_util::extract_did(&rec.post) 99 - .ok_or_else(|| eyre::eyre!("Invalid post URI in threadgate: missing DID in {}", rec.post))?; 100 - let post_rkey_str = parakeet_db::at_uri_util::extract_rkey(&rec.post) 101 - .ok_or_else(|| eyre::eyre!("Invalid post URI in threadgate: {}", rec.post))?; 102 + let post_uri = rec.post.as_ref() 103 + .ok_or_else(|| eyre::eyre!("Missing post URI in threadgate"))?; 104 + let post_did = parakeet_db::at_uri_util::extract_did(post_uri.uri.as_str()) 105 + .ok_or_else(|| eyre::eyre!("Invalid post URI in threadgate: missing DID in {}", post_uri.uri))?; 106 + let post_rkey_str = parakeet_db::at_uri_util::extract_rkey(post_uri.uri.as_str()) 107 + .ok_or_else(|| eyre::eyre!("Invalid post URI in threadgate: {}", post_uri.uri))?; 102 108 let post_rkey = parakeet_db::models::tid_to_i64(post_rkey_str) 103 109 .wrap_err_with(|| format!("Invalid TID in threadgate post URI: {}", post_rkey_str))?; 104 110 ··· 112 118 .wrap_err_with(|| format!("Failed to find actor for DID: {}", post_did))? 113 119 .get::<_, i32>(0); 114 120 115 - // OPTIMIZATION: Resolve hidden reply URIs to (actor_id, rkey) pairs in Rust 116 - let (hidden_actor_ids, hidden_rkeys): (Vec<i32>, Vec<i64>) = if !rec.hidden_replies.is_empty() { 117 - let uri_refs: Vec<&str> = rec.hidden_replies.iter().map(|s| s.as_str()).collect(); 118 - let resolved = crate::db::bulk_resolve::resolve_post_uris_bulk(conn, &uri_refs).await?; 119 - 120 - // Build parallel arrays for SQL 121 - let mut actor_ids = Vec::new(); 122 - let mut rkeys = Vec::new(); 123 - for uri in &rec.hidden_replies { 124 - if let Some(&(aid, rk)) = resolved.get(uri.as_str()) { 125 - actor_ids.push(aid); 126 - rkeys.push(rk); 127 - } 128 - } 129 - (actor_ids, rkeys) 130 - } else { 131 - (vec![], vec![]) 132 - }; 121 + // TODO: Handle hidden_replies when added to our custom type 122 + let (hidden_actor_ids, hidden_rkeys): (Vec<i32>, Vec<i64>) = (vec![], vec![]); 133 123 134 124 // Extract allowed list URIs from rules and resolve to natural keys 135 125 let (allowed_list_actor_ids, allowed_list_rkeys): (Vec<i32>, Vec<String>) = if let Some(ref allow_rules) = rec.allow {
+15 -5
consumer/src/db/operations/graph.rs
··· 190 190 let description_facets = rec 191 191 .description_facets 192 192 .and_then(|v| serde_json::to_value(v).ok()); 193 - let avatar = blob_to_cid_bytes(rec.avatar); 193 + let avatar = blob_to_cid_bytes(rec.avatar.as_ref()); 194 194 195 195 // Acquire table-scoped advisory lock on lists table for this record 196 196 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock_str("lists", actor_id, rkey); 197 197 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 198 198 199 + // Convert ListPurpose to string for database 200 + // Note: jacquard's ListPurpose doesn't expose variants directly, serialize to JSON to get the type 201 + let purpose = serde_json::to_value(&rec.purpose) 202 + .ok() 203 + .and_then(|v| v.get("$type").and_then(|t| t.as_str()).map(String::from)) 204 + .unwrap_or_else(|| "app.bsky.graph.defs#curatelist".to_string()); 205 + 199 206 conn.query_one( 200 207 include_str!("../sql/list_upsert.sql"), 201 208 &[ 202 209 &actor_id, 203 210 &cid_digest, 204 - &rec.purpose, 211 + &purpose, 205 212 &rec.name, 206 213 &rec.description, 207 214 &description_facets, ··· 242 249 243 250 // Parse list URI to get natural keys (list_actor_id, list_rkey) 244 251 // Format: at://did:plc:xxx/app.bsky.graph.list/rkey 245 - let parts: Vec<&str> = rec.subject.strip_prefix("at://") 252 + let parts: Vec<&str> = rec.subject.uri.as_str().strip_prefix("at://") 246 253 .ok_or_else(|| eyre::eyre!("Invalid AT URI"))? 247 254 .split('/') 248 255 .collect(); ··· 324 331 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 325 332 326 333 // Resolve list natural keys by creating stub list if needed 327 - let (list_owner_actor_id, list_rkey) = super::feed::ensure_list_natural_key(conn, &rec.list).await?; 334 + let (list_owner_actor_id, list_rkey) = super::feed::ensure_list_natural_key(conn, rec.list.uri.as_str()).await?; 328 335 329 336 conn.execute( 330 337 include_str!("../sql/list_item_upsert.sql"), ··· 377 384 378 385 // Insert verification - actors already resolved (no SELECT subquery needed) 379 386 // Note: created_at is derived from TID rkey 387 + let handle_str = rec.handle.to_string(); 388 + // display_name is CowStr, not Option<CowStr> 389 + let display_name_str = Some(rec.display_name.to_string()); 380 390 conn.execute( 381 391 "INSERT INTO verification (actor_id, rkey, cid, verifier_actor_id, subject_actor_id, handle, display_name) 382 392 VALUES ($1, $2, $3, $1, $4, $5, $6) 383 393 ON CONFLICT (actor_id, rkey) DO NOTHING", 384 - &[&actor_id, &rkey, &cid_digest, &subject_actor_id, &rec.handle, &rec.display_name], 394 + &[&actor_id, &rkey, &cid_digest, &subject_actor_id, &handle_str, &display_name_str], 385 395 ) 386 396 .await 387 397 .wrap_err_with(|| format!("Failed to insert verification for actor_id:{} rkey:{}", actor_id, rkey))
+6 -11
consumer/src/db/operations/labeler.rs
··· 53 53 54 54 /// Extract the short name from an AT Protocol enum URN 55 55 /// e.g., "com.atproto.moderation.defs#reasonSpam" -> "spam" 56 + #[allow(dead_code)] 56 57 fn extract_enum_short_name(urn: &str) -> String { 57 58 urn.split('#') 58 59 .nth(1) ··· 76 77 let cid_bytes = cid.to_bytes(); 77 78 let cid_digest = parakeet_db::cid_util::cid_to_digest(&cid_bytes) 78 79 .ok_or_eyre("CID must be valid AT Protocol CID")?; 79 - let reasons = rec.reason_types.as_ref().map(|v| { 80 - v.iter() 81 - .map(|r| extract_enum_short_name(&r.to_string())) 82 - .collect::<Vec<_>>() 83 - }); 84 - let subject_types = rec.subject_types.as_ref().map(|v| { 85 - v.iter() 86 - .map(|s| s.to_string().to_lowercase()) 87 - .collect::<Vec<_>>() 88 - }); 80 + // Note: reason_types, subject_types, and subject_collections not available in jacquard types 81 + let reasons: Option<Vec<String>> = None; 82 + let subject_types: Option<Vec<String>> = None; 83 + let subject_collections: Option<Vec<String>> = None; 89 84 90 85 let _ = conn 91 86 .execute( ··· 95 90 &cid_digest, 96 91 &reasons, 97 92 &subject_types, 98 - &rec.subject_collections, 93 + &subject_collections, 99 94 ], 100 95 ) 101 96 .await?;
+2 -2
consumer/src/db/operations/starter_pack.rs
··· 24 24 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 25 25 26 26 // Resolve list natural key by creating stub list if needed 27 - let (list_actor_id, list_rkey) = super::feed::ensure_list_natural_key(conn, &rec.list).await?; 27 + let (list_actor_id, list_rkey) = super::feed::ensure_list_natural_key(conn, rec.list.uri.as_str()).await?; 28 28 29 29 // Resolve feed URIs to feedgen natural keys in Rust 30 30 // Use parallel arrays for actor_ids and rkeys (easier than composite types for Postgres binding) ··· 35 35 for feed_ref in feeds { 36 36 // Parse feed URI to extract (did, rkey) 37 37 let (feed_did, collection, feed_rkey) = 38 - parakeet_db::at_uri_util::parse_at_uri(&feed_ref.uri) 38 + parakeet_db::at_uri_util::parse_at_uri(feed_ref.uri.as_str()) 39 39 .ok_or_else(|| eyre::eyre!("Invalid feed URI: {}", feed_ref.uri))?; 40 40 41 41 // Validate it's a feedgen URI
-1
consumer/src/lib.rs
··· 60 60 } 61 61 62 62 // Re-export commonly used utility functions at the crate root 63 - pub use utils::build_user_agent;
+118 -2
consumer/src/relay/types.rs
··· 1 1 use crate::types::records; 2 2 use serde::{Deserialize, Serialize}; 3 3 4 - #[derive(Debug, Deserialize, Serialize)] 4 + #[derive(Debug, Serialize)] 5 5 #[serde(tag = "$type")] 6 6 pub enum RecordTypes { 7 7 #[serde(rename = "app.bsky.actor.profile")] ··· 41 41 #[serde(rename = "chat.bsky.actor.declaration")] 42 42 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 43 43 #[serde(rename = "community.lexicon.bookmarks.bookmark")] 44 - CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark), 44 + CommunityLexiconBookmark(serde_json::Value), 45 45 #[serde(rename = "fm.team.alpa.actor.status")] 46 46 FmTealAlpaActorStatus(records::FmTealAlpaActorStatus), 47 47 #[serde(untagged)] 48 48 Unknown(serde_json::Value), 49 + } 50 + 51 + impl<'de> Deserialize<'de> for RecordTypes { 52 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 53 + where 54 + D: serde::Deserializer<'de>, 55 + { 56 + let value = serde_json::Value::deserialize(deserializer)?; 57 + let type_field = value.get("$type") 58 + .and_then(|v| v.as_str()) 59 + .ok_or_else(|| serde::de::Error::custom("missing $type field"))?; 60 + 61 + // Deserialize from JSON string to handle 'static lifetime requirements 62 + let json_str = value.to_string(); 63 + match type_field { 64 + "app.bsky.actor.profile" => { 65 + serde_json::from_str::<records::AppBskyActorProfile>(&json_str) 66 + .map(RecordTypes::AppBskyActorProfile) 67 + .map_err(serde::de::Error::custom) 68 + } 69 + "app.bsky.actor.status" => { 70 + serde_json::from_str::<records::AppBskyActorStatus>(&json_str) 71 + .map(RecordTypes::AppBskyActorStatus) 72 + .map_err(serde::de::Error::custom) 73 + } 74 + "app.bsky.feed.generator" => { 75 + serde_json::from_str::<records::AppBskyFeedGenerator>(&json_str) 76 + .map(RecordTypes::AppBskyFeedGenerator) 77 + .map_err(serde::de::Error::custom) 78 + } 79 + "app.bsky.feed.like" => { 80 + serde_json::from_str::<records::AppBskyFeedLike>(&json_str) 81 + .map(RecordTypes::AppBskyFeedLike) 82 + .map_err(serde::de::Error::custom) 83 + } 84 + "app.bsky.feed.post" => { 85 + serde_json::from_str::<records::AppBskyFeedPost>(&json_str) 86 + .map(RecordTypes::AppBskyFeedPost) 87 + .map_err(serde::de::Error::custom) 88 + } 89 + "app.bsky.feed.postgate" => { 90 + serde_json::from_str::<records::AppBskyFeedPostgate>(&json_str) 91 + .map(RecordTypes::AppBskyFeedPostgate) 92 + .map_err(serde::de::Error::custom) 93 + } 94 + "app.bsky.feed.repost" => { 95 + serde_json::from_str::<records::AppBskyFeedRepost>(&json_str) 96 + .map(RecordTypes::AppBskyFeedRepost) 97 + .map_err(serde::de::Error::custom) 98 + } 99 + "app.bsky.feed.threadgate" => { 100 + serde_json::from_str::<records::AppBskyFeedThreadgate>(&json_str) 101 + .map(RecordTypes::AppBskyFeedThreadgate) 102 + .map_err(serde::de::Error::custom) 103 + } 104 + "app.bsky.graph.block" => { 105 + serde_json::from_str::<records::AppBskyGraphBlock>(&json_str) 106 + .map(RecordTypes::AppBskyGraphBlock) 107 + .map_err(serde::de::Error::custom) 108 + } 109 + "app.bsky.graph.follow" => { 110 + serde_json::from_str::<records::AppBskyGraphFollow>(&json_str) 111 + .map(RecordTypes::AppBskyGraphFollow) 112 + .map_err(serde::de::Error::custom) 113 + } 114 + "app.bsky.graph.list" => { 115 + serde_json::from_str::<records::AppBskyGraphList>(&json_str) 116 + .map(RecordTypes::AppBskyGraphList) 117 + .map_err(serde::de::Error::custom) 118 + } 119 + "app.bsky.graph.listblock" => { 120 + serde_json::from_str::<records::AppBskyGraphListBlock>(&json_str) 121 + .map(RecordTypes::AppBskyGraphListBlock) 122 + .map_err(serde::de::Error::custom) 123 + } 124 + "app.bsky.graph.listitem" => { 125 + serde_json::from_str::<records::AppBskyGraphListItem>(&json_str) 126 + .map(RecordTypes::AppBskyGraphListItem) 127 + .map_err(serde::de::Error::custom) 128 + } 129 + "app.bsky.graph.starterpack" => { 130 + serde_json::from_str::<records::AppBskyGraphStarterPack>(&json_str) 131 + .map(RecordTypes::AppBskyGraphStarterPack) 132 + .map_err(serde::de::Error::custom) 133 + } 134 + "app.bsky.graph.verification" => { 135 + serde_json::from_str::<records::AppBskyGraphVerification>(&json_str) 136 + .map(RecordTypes::AppBskyGraphVerification) 137 + .map_err(serde::de::Error::custom) 138 + } 139 + "app.bsky.labeler.service" => { 140 + serde_json::from_str::<records::AppBskyLabelerService>(&json_str) 141 + .map(RecordTypes::AppBskyLabelerService) 142 + .map_err(serde::de::Error::custom) 143 + } 144 + "app.bsky.notification.declaration" => { 145 + serde_json::from_str::<records::AppBskyNotificationDeclaration>(&json_str) 146 + .map(RecordTypes::AppBskyNotificationDeclaration) 147 + .map_err(serde::de::Error::custom) 148 + } 149 + "chat.bsky.actor.declaration" => { 150 + serde_json::from_str::<records::ChatBskyActorDeclaration>(&json_str) 151 + .map(RecordTypes::ChatBskyActorDeclaration) 152 + .map_err(serde::de::Error::custom) 153 + } 154 + "community.lexicon.bookmarks.bookmark" => { 155 + Ok(RecordTypes::CommunityLexiconBookmark(value)) 156 + } 157 + "fm.team.alpa.actor.status" => { 158 + serde_json::from_str::<records::FmTealAlpaActorStatus>(&json_str) 159 + .map(RecordTypes::FmTealAlpaActorStatus) 160 + .map_err(serde::de::Error::custom) 161 + } 162 + _ => Ok(RecordTypes::Unknown(value)) 163 + } 164 + } 49 165 } 50 166 51 167 #[derive(Debug, PartialOrd, PartialEq, Eq, Deserialize, Serialize)]
+432 -168
consumer/src/types/records.rs
··· 1 1 use crate::utils; 2 2 use chrono::{DateTime, Utc}; 3 - use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions, Status}; 4 - use lexica::app_bsky::embed::AspectRatio; 5 - use lexica::app_bsky::labeler::LabelerPolicy; 6 - use lexica::app_bsky::richtext::FacetMain; 7 - use lexica::com_atproto::label::SelfLabels; 8 - use lexica::com_atproto::moderation::{ReasonType, SubjectType}; 9 - use lexica::{Blob, StrongRef}; 3 + use jacquard_api::app_bsky; 4 + use jacquard_api::com_atproto; 5 + use jacquard_common::types::blob::Blob; 6 + use jacquard_common::IntoStatic; 10 7 use serde::{Deserialize, Serialize}; 11 - use serde_with::serde_as; 8 + 9 + type StrongRef<'a> = com_atproto::repo::strong_ref::StrongRef<'a>; 10 + type SelfLabels<'a> = com_atproto::label::SelfLabels<'a>; 11 + 12 + // Wrapper structs for types that need custom deserialization 13 + #[derive(Debug, Serialize, Deserialize)] 14 + #[serde(transparent)] 15 + pub struct AppBskyNotificationDeclaration( 16 + #[serde(deserialize_with = "deserialize_notification_declaration")] 17 + pub app_bsky::notification::declaration::Declaration<'static> 18 + ); 19 + 20 + impl std::ops::Deref for AppBskyNotificationDeclaration { 21 + type Target = app_bsky::notification::declaration::Declaration<'static>; 22 + 23 + fn deref(&self) -> &Self::Target { 24 + &self.0 25 + } 26 + } 27 + 28 + #[derive(Debug, Serialize, Deserialize)] 29 + #[serde(transparent)] 30 + pub struct AppBskyGraphVerification( 31 + #[serde(deserialize_with = "deserialize_graph_verification")] 32 + pub app_bsky::graph::verification::Verification<'static> 33 + ); 34 + 35 + impl std::ops::Deref for AppBskyGraphVerification { 36 + type Target = app_bsky::graph::verification::Verification<'static>; 37 + 38 + fn deref(&self) -> &Self::Target { 39 + &self.0 40 + } 41 + } 42 + 43 + fn deserialize_notification_declaration<'de, D>(deserializer: D) -> Result<app_bsky::notification::declaration::Declaration<'static>, D::Error> 44 + where 45 + D: serde::Deserializer<'de>, 46 + { 47 + let decl: app_bsky::notification::declaration::Declaration<'_> = app_bsky::notification::declaration::Declaration::deserialize(deserializer)?; 48 + Ok(decl.into_static()) 49 + } 50 + 51 + fn deserialize_graph_verification<'de, D>(deserializer: D) -> Result<app_bsky::graph::verification::Verification<'static>, D::Error> 52 + where 53 + D: serde::Deserializer<'de>, 54 + { 55 + let ver: app_bsky::graph::verification::Verification<'_> = app_bsky::graph::verification::Verification::deserialize(deserializer)?; 56 + Ok(ver.into_static()) 57 + } 58 + 59 + pub type AppBskyGraphListBlock = AppBskyGraphListblock; 60 + pub type AppBskyGraphListItem = AppBskyGraphListitem; 61 + pub type AppBskyGraphStarterPack = AppBskyGraphStarterpack; 62 + 63 + pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; 64 + pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; 65 + pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; 66 + pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; 67 + 68 + // TODO: Placeholder types to be implemented later 69 + #[derive(Debug, Deserialize, Serialize, Default)] 70 + pub struct FmTealAlpaActorStatus {} 71 + 72 + #[derive(Debug, Deserialize, Serialize, Default)] 73 + pub struct PostgateEmbeddingRules {} 12 74 13 75 #[derive(Debug, Deserialize, Serialize)] 14 76 #[serde(rename_all = "camelCase")] 15 - #[serde_as] 16 77 pub struct AppBskyActorProfile { 17 - #[serde_as(as = "utils::safe_string")] 78 + #[serde(deserialize_with = "crate::utils::safe_string")] 18 79 pub display_name: Option<String>, 19 - #[serde_as(as = "utils::safe_string")] 80 + #[serde(deserialize_with = "crate::utils::safe_string")] 20 81 pub description: Option<String>, 21 - pub avatar: Option<Blob>, 22 - pub banner: Option<Blob>, 23 - pub labels: Option<SelfLabels>, 24 - pub joined_via_starter_pack: Option<StrongRef>, 25 - pub pinned_post: Option<StrongRef>, 26 - #[serde_as(as = "utils::safe_string")] 82 + #[serde(deserialize_with = "crate::utils::deserialize_optional_blob")] 83 + pub avatar: Option<Blob<'static>>, 84 + #[serde(deserialize_with = "crate::utils::deserialize_optional_blob")] 85 + pub banner: Option<Blob<'static>>, 86 + #[serde(deserialize_with = "deserialize_self_labels")] 87 + pub labels: Option<SelfLabels<'static>>, 88 + #[serde(deserialize_with = "crate::utils::deserialize_optional_strongref")] 89 + pub joined_via_starter_pack: Option<StrongRef<'static>>, 90 + #[serde(deserialize_with = "crate::utils::deserialize_optional_strongref")] 91 + pub pinned_post: Option<StrongRef<'static>>, 92 + #[serde(deserialize_with = "crate::utils::safe_string")] 27 93 pub pronouns: Option<String>, 28 - #[serde_as(as = "utils::safe_string")] 94 + #[serde(deserialize_with = "crate::utils::safe_string")] 29 95 pub website: Option<String>, 30 96 pub created_at: Option<DateTime<Utc>>, 31 97 } 32 98 99 + 100 + fn deserialize_self_labels<'de, D>(deserializer: D) -> Result<Option<SelfLabels<'static>>, D::Error> 101 + where 102 + D: serde::Deserializer<'de>, 103 + { 104 + let labels: Option<SelfLabels<'_>> = Option::deserialize(deserializer)?; 105 + Ok(labels.map(|l| l.into_static())) 106 + } 107 + 33 108 #[derive(Debug, Deserialize, Serialize)] 34 109 #[serde(rename_all = "camelCase")] 35 110 pub struct AppBskyActorStatus { 36 - pub status: Status, 111 + #[serde(deserialize_with = "deserialize_status")] 112 + pub status: app_bsky::actor::status::Status<'static>, 37 113 pub duration_minutes: Option<i32>, 38 114 pub embed: Option<AppBskyEmbedExternal>, 115 + pub created_at: DateTime<Utc>, 116 + } 117 + 118 + fn deserialize_status<'de, D>(deserializer: D) -> Result<app_bsky::actor::status::Status<'static>, D::Error> 119 + where 120 + D: serde::Deserializer<'de>, 121 + { 122 + let status: app_bsky::actor::status::Status<'_> = app_bsky::actor::status::Status::deserialize(deserializer)?; 123 + Ok(status.into_static()) 124 + } 125 + 126 + fn deserialize_facets<'de, D>(deserializer: D) -> Result<Option<Vec<app_bsky::richtext::facet::Facet<'static>>>, D::Error> 127 + where 128 + D: serde::Deserializer<'de>, 129 + { 130 + let facets: Option<Vec<app_bsky::richtext::facet::Facet<'_>>> = Option::deserialize(deserializer)?; 131 + Ok(facets.map(|v| v.into_iter().map(|f| f.into_static()).collect())) 132 + } 39 133 40 - pub created_at: DateTime<Utc>, 134 + fn deserialize_list_purpose<'de, D>(deserializer: D) -> Result<app_bsky::graph::ListPurpose<'static>, D::Error> 135 + where 136 + D: serde::Deserializer<'de>, 137 + { 138 + let purpose: app_bsky::graph::ListPurpose<'_> = app_bsky::graph::ListPurpose::deserialize(deserializer)?; 139 + Ok(purpose.into_static()) 140 + } 141 + 142 + fn deserialize_aspect_ratio<'de, D>(deserializer: D) -> Result<Option<app_bsky::embed::AspectRatio<'static>>, D::Error> 143 + where 144 + D: serde::Deserializer<'de>, 145 + { 146 + let aspect: Option<app_bsky::embed::AspectRatio<'_>> = Option::deserialize(deserializer)?; 147 + Ok(aspect.map(|a| a.into_static())) 148 + } 149 + 150 + 151 + fn deserialize_labeler_policies<'de, D>(deserializer: D) -> Result<app_bsky::labeler::LabelerPolicies<'static>, D::Error> 152 + where 153 + D: serde::Deserializer<'de>, 154 + { 155 + let policies: app_bsky::labeler::LabelerPolicies<'_> = app_bsky::labeler::LabelerPolicies::deserialize(deserializer)?; 156 + Ok(policies.into_static()) 157 + } 158 + 159 + fn deserialize_reason_type<'de, D>(deserializer: D) -> Result<Option<com_atproto::moderation::ReasonType<'static>>, D::Error> 160 + where 161 + D: serde::Deserializer<'de>, 162 + { 163 + let reason: Option<com_atproto::moderation::ReasonType<'_>> = Option::deserialize(deserializer)?; 164 + Ok(reason.map(|r| r.into_static())) 41 165 } 42 166 43 167 #[derive(Clone, Debug, Deserialize, Serialize)] ··· 120 244 #[derive(Clone, Debug, Deserialize, Serialize)] 121 245 #[serde(rename_all = "camelCase")] 122 246 pub struct EmbedImage { 123 - pub image: Blob, 124 - #[serde(deserialize_with = "utils::safe_string")] 247 + #[serde(deserialize_with = "deserialize_blob")] 248 + pub image: Blob<'static>, 249 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 125 250 pub alt: String, 126 251 #[serde(skip_serializing_if = "Option::is_none")] 127 - pub aspect_ratio: Option<AspectRatio>, 252 + #[serde(deserialize_with = "deserialize_aspect_ratio")] 253 + pub aspect_ratio: Option<app_bsky::embed::AspectRatio<'static>>, 254 + } 255 + 256 + fn deserialize_blob<'de, D>(deserializer: D) -> Result<Blob<'static>, D::Error> 257 + where 258 + D: serde::Deserializer<'de>, 259 + { 260 + let blob: Blob<'_> = Blob::deserialize(deserializer)?; 261 + Ok(blob.into_static()) 128 262 } 129 263 130 264 #[derive(Clone, Debug, Deserialize, Serialize)] 131 265 #[serde(rename_all = "camelCase")] 132 - #[serde_as] 133 266 pub struct AppBskyEmbedVideo { 134 - pub video: Blob, 267 + #[serde(deserialize_with = "deserialize_blob")] 268 + pub video: Blob<'static>, 135 269 #[serde(skip_serializing_if = "Option::is_none")] 136 270 pub captions: Option<Vec<EmbedVideoCaptions>>, 137 271 #[serde(skip_serializing_if = "Option::is_none")] 138 - #[serde_as(as = "utils::safe_string")] 272 + #[serde(deserialize_with = "crate::utils::safe_string")] 139 273 pub alt: Option<String>, 140 274 #[serde(skip_serializing_if = "Option::is_none")] 141 - pub aspect_ratio: Option<AspectRatio>, 275 + #[serde(deserialize_with = "deserialize_aspect_ratio")] 276 + pub aspect_ratio: Option<app_bsky::embed::AspectRatio<'static>>, 142 277 } 143 278 144 279 #[derive(Clone, Debug, Deserialize, Serialize)] 145 280 pub struct EmbedVideoCaptions { 146 281 pub lang: String, 147 - pub file: Blob, 282 + #[serde(deserialize_with = "deserialize_blob")] 283 + pub file: Blob<'static>, 148 284 } 149 285 150 286 #[derive(Clone, Debug, Deserialize, Serialize)] 151 - #[serde(tag = "$type")] 152 - #[serde(rename = "app.bsky.embed.external")] 287 + #[serde(rename_all = "camelCase")] 153 288 pub struct AppBskyEmbedExternal { 154 289 pub external: EmbedExternal, 155 290 } 156 291 157 292 #[derive(Clone, Debug, Deserialize, Serialize)] 293 + #[serde(rename_all = "camelCase")] 158 294 pub struct EmbedExternal { 159 295 pub uri: String, 160 - #[serde(deserialize_with = "utils::safe_string")] 296 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 161 297 pub title: String, 162 - #[serde(deserialize_with = "utils::safe_string")] 298 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 163 299 pub description: String, 164 300 #[serde(skip_serializing_if = "Option::is_none")] 165 - pub thumb: Option<Blob>, 301 + #[serde(deserialize_with = "utils::deserialize_optional_blob")] 302 + pub thumb: Option<Blob<'static>>, 166 303 } 167 304 168 305 #[derive(Clone, Debug, Deserialize, Serialize)] 306 + #[serde(rename_all = "camelCase")] 169 307 pub struct AppBskyEmbedRecord { 170 - pub record: StrongRef, 308 + #[serde(deserialize_with = "deserialize_strongref")] 309 + pub record: StrongRef<'static>, 310 + } 311 + 312 + fn deserialize_strongref<'de, D>(deserializer: D) -> Result<StrongRef<'static>, D::Error> 313 + where 314 + D: serde::Deserializer<'de>, 315 + { 316 + let sr: StrongRef<'_> = StrongRef::deserialize(deserializer)?; 317 + Ok(sr.into_static()) 171 318 } 172 319 173 320 #[derive(Clone, Debug, Deserialize, Serialize)] 321 + #[serde(rename_all = "camelCase")] 174 322 pub struct AppBskyEmbedRecordWithMedia { 175 - pub record: AppBskyEmbedRecord, 176 - pub media: Box<AppBskyEmbed>, 323 + #[serde(deserialize_with = "deserialize_strongref")] 324 + pub record: StrongRef<'static>, 325 + pub media: MediaEmbed, 177 326 } 178 327 179 - #[derive(Debug, Deserialize, Serialize)] 180 - #[serde(rename_all = "camelCase")] 181 - #[serde_as] 182 - pub struct AppBskyFeedGenerator { 183 - pub did: String, 184 - #[serde(deserialize_with = "utils::safe_string")] 185 - pub display_name: String, 186 - #[serde_as(as = "utils::safe_string")] 187 - pub description: Option<String>, 188 - pub description_facets: Option<Vec<FacetMain>>, 189 - pub avatar: Option<Blob>, 190 - pub accepts_interactions: Option<bool>, 191 - pub labels: Option<SelfLabels>, 192 - pub content_mode: Option<String>, 193 - pub created_at: DateTime<Utc>, 328 + #[derive(Clone, Debug, Deserialize, Serialize)] 329 + #[serde(tag = "$type")] 330 + pub enum MediaEmbed { 331 + #[serde(rename = "app.bsky.embed.images")] 332 + Images(AppBskyEmbedImages), 333 + #[serde(rename = "app.bsky.embed.video")] 334 + Video(AppBskyEmbedVideo), 335 + #[serde(rename = "app.bsky.embed.external")] 336 + External(AppBskyEmbedExternal), 194 337 } 195 338 196 - #[derive(Debug, Deserialize, Serialize)] 197 - #[serde(rename_all = "camelCase")] 198 - pub struct AppBskyFeedLike { 199 - pub subject: StrongRef, 200 - pub created_at: DateTime<Utc>, 201 - pub via: Option<StrongRef>, 339 + impl MediaEmbed { 340 + pub const fn as_str(&self) -> &'static str { 341 + match self { 342 + Self::Images(_) => "app.bsky.embed.images", 343 + Self::Video(_) => "app.bsky.embed.video", 344 + Self::External(_) => "app.bsky.embed.external", 345 + } 346 + } 202 347 } 203 348 204 349 #[derive(Debug, Deserialize, Serialize)] 205 - #[serde(tag = "$type")] 206 - #[serde(rename = "app.bsky.feed.post")] 207 350 #[serde(rename_all = "camelCase")] 208 351 pub struct AppBskyFeedPost { 209 - #[serde(deserialize_with = "utils::safe_string")] 352 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 210 353 pub text: String, 211 - #[serde(skip_serializing_if = "Option::is_none")] 212 - pub facets: Option<Vec<FacetMain>>, 213 - #[serde(skip_serializing_if = "Option::is_none")] 354 + pub entities: Option<Vec<serde_json::Value>>, 355 + #[serde(deserialize_with = "deserialize_facets")] 356 + pub facets: Option<Vec<app_bsky::richtext::facet::Facet<'static>>>, 214 357 pub reply: Option<ReplyRef>, 215 - #[serde(skip_serializing_if = "Option::is_none")] 216 358 pub embed: Option<EmbedOuter>, 217 - #[serde(skip_serializing_if = "Option::is_none")] 218 359 pub langs: Option<Vec<String>>, 219 - #[serde(skip_serializing_if = "Option::is_none")] 220 - pub labels: Option<SelfLabels>, 221 - #[serde(skip_serializing_if = "Option::is_none")] 360 + #[serde(deserialize_with = "deserialize_self_labels")] 361 + pub labels: Option<SelfLabels<'static>>, 222 362 pub tags: Option<Vec<String>>, 223 363 pub created_at: DateTime<Utc>, 224 364 } 225 365 226 366 #[derive(Debug, Deserialize, Serialize)] 227 367 #[serde(rename_all = "camelCase")] 228 - pub struct AppBskyFeedPostgate { 229 - pub post: String, 230 - pub created_at: DateTime<Utc>, 231 - #[serde(default)] 232 - pub detached_embedding_uris: Vec<String>, 233 - #[serde(default)] 234 - pub embedding_rules: Vec<PostgateEmbeddingRules>, 368 + pub struct ReplyRef { 369 + #[serde(deserialize_with = "deserialize_strongref")] 370 + pub root: StrongRef<'static>, 371 + #[serde(deserialize_with = "deserialize_strongref")] 372 + pub parent: StrongRef<'static>, 235 373 } 236 374 237 - #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Deserialize, Serialize)] 238 - #[serde(tag = "$type")] 239 - pub enum PostgateEmbeddingRules { 240 - #[serde(rename = "app.bsky.feed.postgate#disableRule")] 241 - Disable, 375 + #[derive(Debug, Deserialize, Serialize)] 376 + #[serde(rename_all = "camelCase")] 377 + pub struct AppBskyFeedRepost { 378 + #[serde(deserialize_with = "deserialize_strongref")] 379 + pub subject: StrongRef<'static>, 380 + #[serde(skip_serializing_if = "Option::is_none")] 381 + #[serde(deserialize_with = "crate::utils::deserialize_optional_strongref")] 382 + pub via: Option<StrongRef<'static>>, 383 + pub created_at: DateTime<Utc>, 242 384 } 243 385 244 - impl PostgateEmbeddingRules { 245 - pub const fn as_str(self) -> &'static str { 246 - match self { 247 - Self::Disable => "app.bsky.feed.postgate#disableRule", 248 - } 249 - } 386 + #[derive(Debug, Deserialize, Serialize)] 387 + #[serde(rename_all = "camelCase")] 388 + pub struct AppBskyFeedLike { 389 + #[serde(deserialize_with = "deserialize_strongref")] 390 + pub subject: StrongRef<'static>, 391 + #[serde(skip_serializing_if = "Option::is_none")] 392 + #[serde(deserialize_with = "crate::utils::deserialize_optional_strongref")] 393 + pub via: Option<StrongRef<'static>>, 394 + pub created_at: DateTime<Utc>, 250 395 } 251 396 252 397 #[derive(Debug, Deserialize, Serialize)] 253 398 #[serde(rename_all = "camelCase")] 254 - pub struct AppBskyFeedRepost { 255 - pub subject: StrongRef, 399 + pub struct AppBskyFeedGenerator { 400 + pub did: String, 401 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 402 + pub display_name: String, 403 + #[serde(deserialize_with = "crate::utils::safe_string")] 404 + pub description: Option<String>, 405 + #[serde(deserialize_with = "deserialize_facets")] 406 + pub description_facets: Option<Vec<app_bsky::richtext::facet::Facet<'static>>>, 407 + #[serde(deserialize_with = "utils::deserialize_optional_blob")] 408 + pub avatar: Option<Blob<'static>>, 409 + #[serde(deserialize_with = "deserialize_self_labels")] 410 + pub labels: Option<SelfLabels<'static>>, 256 411 pub created_at: DateTime<Utc>, 257 - pub via: Option<StrongRef>, 412 + } 413 + 414 + #[derive(Debug, Serialize, Deserialize)] 415 + #[serde(transparent)] 416 + pub struct AppBskyFeedPostgate( 417 + #[serde(deserialize_with = "deserialize_feed_postgate")] 418 + pub app_bsky::feed::postgate::Postgate<'static> 419 + ); 420 + 421 + impl std::ops::Deref for AppBskyFeedPostgate { 422 + type Target = app_bsky::feed::postgate::Postgate<'static>; 423 + 424 + fn deref(&self) -> &Self::Target { 425 + &self.0 426 + } 427 + } 428 + 429 + fn deserialize_feed_postgate<'de, D>(deserializer: D) -> Result<app_bsky::feed::postgate::Postgate<'static>, D::Error> 430 + where 431 + D: serde::Deserializer<'de>, 432 + { 433 + let postgate: app_bsky::feed::postgate::Postgate<'_> = app_bsky::feed::postgate::Postgate::deserialize(deserializer)?; 434 + Ok(postgate.into_static()) 435 + } 436 + 437 + #[derive(Clone, Debug, Deserialize, Serialize)] 438 + #[serde(tag = "$type")] 439 + pub enum PostgateRule { 440 + #[serde(rename = "app.bsky.feed.postgate#disableRule")] 441 + DisableRule {}, 258 442 } 259 443 260 444 #[derive(Debug, Deserialize, Serialize)] 261 - #[serde(tag = "$type")] 262 - #[serde(rename = "app.bsky.feed.threadgate")] 263 445 #[serde(rename_all = "camelCase")] 264 446 pub struct AppBskyFeedThreadgate { 265 - pub post: String, 266 - pub created_at: DateTime<Utc>, 447 + #[serde(deserialize_with = "utils::deserialize_optional_strongref")] 448 + pub post: Option<StrongRef<'static>>, 267 449 pub allow: Option<Vec<ThreadgateRule>>, 268 - #[serde(default)] 269 - pub hidden_replies: Vec<String>, 450 + pub created_at: DateTime<Utc>, 270 451 } 271 452 272 - pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; 273 - pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; 274 - pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; 275 - pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; 276 - 277 - #[derive(Debug, Deserialize, Serialize)] 453 + #[derive(Clone, Debug, Deserialize, Serialize)] 278 454 #[serde(tag = "$type")] 279 455 pub enum ThreadgateRule { 280 456 #[serde(rename = "app.bsky.feed.threadgate#mentionRule")] 281 - Mention, 282 - #[serde(rename = "app.bsky.feed.threadgate#followerRule")] 283 - Follower, 457 + MentionRule {}, 284 458 #[serde(rename = "app.bsky.feed.threadgate#followingRule")] 285 - Following, 459 + FollowingRule {}, 286 460 #[serde(rename = "app.bsky.feed.threadgate#listRule")] 287 461 List { list: String }, 288 462 } 289 463 290 - impl ThreadgateRule { 291 - pub const fn as_str(&self) -> &'static str { 292 - match self { 293 - Self::Mention => THREADGATE_RULE_MENTION, 294 - Self::Follower => THREADGATE_RULE_FOLLOWER, 295 - Self::Following => THREADGATE_RULE_FOLLOWING, 296 - Self::List { .. } => THREADGATE_RULE_LIST, 297 - } 298 - } 299 - } 300 - 301 - #[derive(Debug, Deserialize, Serialize)] 302 - pub struct ReplyRef { 303 - pub root: StrongRef, 304 - pub parent: StrongRef, 305 - } 306 - 307 464 #[derive(Debug, Deserialize, Serialize)] 308 465 #[serde(rename_all = "camelCase")] 309 - pub struct AppBskyGraphBlock { 466 + pub struct AppBskyGraphFollow { 310 467 pub subject: String, 311 468 pub created_at: DateTime<Utc>, 312 469 } 313 470 314 471 #[derive(Debug, Deserialize, Serialize)] 315 472 #[serde(rename_all = "camelCase")] 316 - pub struct AppBskyGraphFollow { 473 + pub struct AppBskyGraphBlock { 317 474 pub subject: String, 318 475 pub created_at: DateTime<Utc>, 319 476 } 320 477 321 478 #[derive(Debug, Deserialize, Serialize)] 322 479 #[serde(rename_all = "camelCase")] 323 - #[serde_as] 324 480 pub struct AppBskyGraphList { 325 - pub purpose: String, 326 - #[serde(deserialize_with = "utils::safe_string")] 481 + #[serde(deserialize_with = "deserialize_list_purpose")] 482 + pub purpose: app_bsky::graph::ListPurpose<'static>, 483 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 327 484 pub name: String, 328 - #[serde_as(as = "utils::safe_string")] 485 + #[serde(deserialize_with = "crate::utils::safe_string")] 329 486 pub description: Option<String>, 330 - pub description_facets: Option<Vec<FacetMain>>, 331 - pub avatar: Option<Blob>, 332 - pub labels: Option<SelfLabels>, 487 + #[serde(deserialize_with = "deserialize_facets")] 488 + pub description_facets: Option<Vec<app_bsky::richtext::facet::Facet<'static>>>, 489 + #[serde(deserialize_with = "utils::deserialize_optional_blob")] 490 + pub avatar: Option<Blob<'static>>, 491 + #[serde(deserialize_with = "deserialize_self_labels")] 492 + pub labels: Option<SelfLabels<'static>>, 333 493 pub created_at: DateTime<Utc>, 334 494 } 335 495 336 496 #[derive(Debug, Deserialize, Serialize)] 337 497 #[serde(rename_all = "camelCase")] 338 - pub struct AppBskyGraphListItem { 498 + pub struct AppBskyGraphListitem { 499 + #[serde(deserialize_with = "deserialize_strongref")] 500 + pub list: StrongRef<'static>, 339 501 pub subject: String, 340 - pub list: String, 341 502 pub created_at: DateTime<Utc>, 342 503 } 343 504 344 505 #[derive(Debug, Deserialize, Serialize)] 345 506 #[serde(rename_all = "camelCase")] 346 - pub struct AppBskyGraphListBlock { 347 - pub subject: String, 507 + pub struct AppBskyGraphListblock { 508 + #[serde(deserialize_with = "deserialize_strongref")] 509 + pub subject: StrongRef<'static>, 348 510 pub created_at: DateTime<Utc>, 349 511 } 350 512 351 513 #[derive(Debug, Deserialize, Serialize)] 352 - #[serde(tag = "$type")] 353 - #[serde(rename = "app.bsky.graph.starterpack")] 354 514 #[serde(rename_all = "camelCase")] 355 - #[serde_as] 356 - pub struct AppBskyGraphStarterPack { 357 - #[serde(deserialize_with = "utils::safe_string")] 515 + pub struct AppBskyGraphStarterpack { 516 + #[serde(deserialize_with = "crate::utils::safe_string_required")] 358 517 pub name: String, 359 - #[serde_as(as = "utils::safe_string")] 518 + #[serde(deserialize_with = "crate::utils::safe_string")] 360 519 pub description: Option<String>, 361 - pub description_facets: Option<Vec<FacetMain>>, 362 - pub list: String, 363 - pub feeds: Option<Vec<StarterPackFeedItem>>, 520 + #[serde(deserialize_with = "deserialize_facets")] 521 + pub description_facets: Option<Vec<app_bsky::richtext::facet::Facet<'static>>>, 522 + #[serde(deserialize_with = "deserialize_strongref")] 523 + pub list: StrongRef<'static>, 524 + pub feeds: Option<Vec<FeedRule>>, 364 525 pub created_at: DateTime<Utc>, 365 526 } 366 527 367 - #[derive(Debug, Deserialize, Serialize)] 368 - pub struct StarterPackFeedItem { 528 + #[derive(Clone, Debug, Deserialize, Serialize)] 529 + pub struct FeedRule { 369 530 pub uri: String, 370 531 } 371 532 372 533 #[derive(Debug, Deserialize, Serialize)] 373 534 #[serde(rename_all = "camelCase")] 374 - pub struct AppBskyGraphVerification { 375 - pub subject: String, 376 - #[serde(deserialize_with = "utils::safe_string")] 377 - pub handle: String, 378 - #[serde(deserialize_with = "utils::safe_string")] 379 - pub display_name: String, 535 + pub struct AppBskyLabelerService { 536 + #[serde(deserialize_with = "deserialize_labeler_policies")] 537 + pub policies: app_bsky::labeler::LabelerPolicies<'static>, 538 + #[serde(deserialize_with = "deserialize_self_labels")] 539 + pub labels: Option<SelfLabels<'static>>, 380 540 pub created_at: DateTime<Utc>, 381 541 } 382 542 383 543 #[derive(Debug, Deserialize, Serialize)] 384 544 #[serde(rename_all = "camelCase")] 385 - pub struct AppBskyLabelerService { 386 - pub policies: LabelerPolicy, 387 - pub labels: Option<SelfLabels>, 388 - pub reason_types: Option<Vec<ReasonType>>, 389 - pub subject_types: Option<Vec<SubjectType>>, 390 - pub subject_collections: Option<Vec<String>>, 545 + pub struct ComAtprotoAdminDefs { 546 + pub disabled: Option<bool>, 547 + pub invites: Option<ComAtprotoServerDefsInviteCode>, 548 + } 549 + 550 + #[derive(Debug, Deserialize, Serialize)] 551 + #[serde(rename_all = "camelCase")] 552 + pub struct ComAtprotoServerDefsInviteCode { 553 + pub code: String, 554 + pub available: i32, 555 + pub disabled: bool, 556 + pub for_account: String, 557 + pub created_by: String, 391 558 pub created_at: DateTime<Utc>, 559 + pub uses: Vec<ComAtprotoServerDefsInviteCodeUse>, 392 560 } 393 561 394 562 #[derive(Debug, Deserialize, Serialize)] 395 563 #[serde(rename_all = "camelCase")] 396 - pub struct AppBskyNotificationDeclaration { 397 - pub allow_subscriptions: ProfileAllowSubscriptions, 564 + pub struct ComAtprotoServerDefsInviteCodeUse { 565 + pub used_by: String, 566 + pub used_at: DateTime<Utc>, 398 567 } 399 568 400 569 #[derive(Debug, Deserialize, Serialize)] 401 570 #[serde(rename_all = "camelCase")] 402 571 pub struct ChatBskyActorDeclaration { 403 - pub allow_incoming: ChatAllowIncoming, 572 + pub allow_incoming: String, // In jacquard this is just a string 404 573 } 405 574 406 575 #[derive(Debug, Deserialize, Serialize)] 407 576 #[serde(rename_all = "camelCase")] 408 - pub struct FmTealAlpaActorStatus { 409 - pub item: FmTealAlpaActorStatusItem, 410 - pub time: String, // Unix Epoch 411 - pub expiry: String, // Unix Epoch 577 + pub struct ComAtprotoLabelDefs { 578 + pub ver: Option<i32>, 579 + pub src: String, 580 + pub uri: String, 581 + pub cid: Option<String>, 582 + pub val: String, 583 + pub neg: Option<bool>, 584 + pub cts: DateTime<Utc>, 585 + pub exp: Option<DateTime<Utc>>, 586 + pub sig: Option<Vec<u8>>, 412 587 } 413 588 414 589 #[derive(Debug, Deserialize, Serialize)] 415 590 #[serde(rename_all = "camelCase")] 416 - pub struct FmTealAlpaActorStatusItem { 417 - pub artists: Vec<String>, 418 - pub track_name: String, 591 + pub struct ComAtprotoModerationDefs { 592 + #[serde(deserialize_with = "deserialize_reason_type")] 593 + pub reason_type: Option<com_atproto::moderation::ReasonType<'static>>, 594 + pub reason: Option<String>, 595 + pub subject: ComAtprotoModerationDefsSubject, 596 + pub created_at: DateTime<Utc>, 419 597 } 598 + 599 + #[derive(Debug, Serialize)] 600 + #[serde(tag = "$type")] 601 + pub enum ComAtprotoModerationDefsSubject { 602 + #[serde(rename = "com.atproto.admin.defs#repoRef")] 603 + RepoRef { did: String }, 604 + #[serde(rename = "com.atproto.repo.strongRef")] 605 + StrongRef(StrongRef<'static>), 606 + } 607 + 608 + impl<'de> Deserialize<'de> for ComAtprotoModerationDefsSubject { 609 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 610 + where 611 + D: serde::Deserializer<'de>, 612 + { 613 + 614 + use serde::de::MapAccess; 615 + use serde::de::Visitor; 616 + use std::fmt; 617 + 618 + struct SubjectVisitor; 619 + 620 + impl<'de> Visitor<'de> for SubjectVisitor { 621 + type Value = ComAtprotoModerationDefsSubject; 622 + 623 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 624 + formatter.write_str("ComAtprotoModerationDefsSubject") 625 + } 626 + 627 + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> 628 + where 629 + A: MapAccess<'de>, 630 + { 631 + let mut type_field = None; 632 + let mut did = None; 633 + let mut strongref_fields = serde_json::Map::new(); 634 + 635 + while let Some(key) = map.next_key::<String>()? { 636 + match key.as_str() { 637 + "$type" => type_field = Some(map.next_value::<String>()?), 638 + "did" => did = Some(map.next_value::<String>()?), 639 + other => { 640 + strongref_fields.insert(other.to_string(), map.next_value()?); 641 + } 642 + } 643 + } 644 + 645 + let type_field = type_field.ok_or_else(|| serde::de::Error::missing_field("$type"))?; 646 + 647 + match type_field.as_str() { 648 + "com.atproto.admin.defs#repoRef" => { 649 + Ok(ComAtprotoModerationDefsSubject::RepoRef { 650 + did: did.ok_or_else(|| serde::de::Error::missing_field("did"))?, 651 + }) 652 + } 653 + "com.atproto.repo.strongRef" => { 654 + let uri_string = strongref_fields.get("uri") 655 + .and_then(|v| v.as_str()) 656 + .ok_or_else(|| serde::de::Error::missing_field("uri"))? 657 + .to_string(); 658 + let cid_string = strongref_fields.get("cid") 659 + .and_then(|v| v.as_str()) 660 + .ok_or_else(|| serde::de::Error::missing_field("cid"))? 661 + .to_string(); 662 + 663 + use jacquard_api::com_atproto::repo::strong_ref::StrongRefBuilder; 664 + use jacquard_common::types::string::{AtUri, Cid}; 665 + 666 + let at_uri = AtUri::try_from(uri_string) 667 + .map_err(|e| serde::de::Error::custom(format!("Invalid AT URI: {}", e)))?; 668 + let cid_val = Cid::try_from(cid_string) 669 + .map_err(|e| serde::de::Error::custom(format!("Invalid CID: {}", e)))?; 670 + let strongref = StrongRefBuilder::new() 671 + .uri(at_uri) 672 + .cid(cid_val) 673 + .build(); 674 + Ok(ComAtprotoModerationDefsSubject::StrongRef(strongref)) 675 + } 676 + _ => Err(serde::de::Error::unknown_variant(&type_field, &["com.atproto.admin.defs#repoRef", "com.atproto.repo.strongRef"])) 677 + } 678 + } 679 + } 680 + 681 + deserializer.deserialize_map(SubjectVisitor) 682 + } 683 + }
+45 -58
consumer/src/utils.rs
··· 1 - use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; 2 - use lexica::{Blob, StrongRef}; 1 + use jacquard_api::app_bsky::richtext::facet::{Facet as FacetMain, FacetFeaturesItem}; 2 + use jacquard_common::types::blob::Blob; 3 + use jacquard_api::com_atproto::repo::strong_ref::StrongRef; 3 4 use serde::{Deserialize as _, Deserializer}; 4 5 5 6 // see https://deer.social/profile/did:plc:63y3oh7iakdueqhlj6trojbq/post/3ltuv4skhqs2h 6 - pub fn safe_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> { 7 - let str = String::deserialize(deserializer)?; 7 + pub fn safe_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> { 8 + let str = Option::<String>::deserialize(deserializer)?; 8 9 10 + Ok(str.map(|s| s.replace('\u{0000}', ""))) 11 + } 12 + 13 + pub fn safe_string_required<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> { 14 + let str = String::deserialize(deserializer)?; 9 15 Ok(str.replace('\u{0000}', "")) 10 16 } 11 17 18 + pub fn strip_mime_params(mime_type: &str) -> String { 19 + // Strip any parameters from the MIME type (e.g., "image/jpeg; quality=90" -> "image/jpeg") 20 + mime_type.split(';').next().unwrap_or(mime_type).trim().to_string() 21 + } 22 + 12 23 #[expect(clippy::single_option_map, reason = "Option::map() is clearer than and_then() for simple transformations")] 13 - pub fn blob_ref(blob: Option<Blob>) -> Option<String> { 14 - blob.map(|blob| blob.cid.to_string()) 24 + pub fn blob_ref(blob: Option<&Blob>) -> Option<String> { 25 + blob.map(|blob| blob.cid().to_string()) 15 26 } 16 27 17 28 /// Convert a Blob to CID bytes (32-byte digest) for database storage 18 - pub fn blob_to_cid_bytes(blob: Option<Blob>) -> Option<Vec<u8>> { 19 - blob.map(|blob| { 20 - let cid_bytes = blob.cid.to_bytes(); 21 - parakeet_db::cid_util::cid_to_digest(&cid_bytes) 22 - .expect("Blob CID must be valid AT Protocol CID") 23 - .to_vec() 29 + pub fn blob_to_cid_bytes(blob: Option<&Blob>) -> Option<Vec<u8>> { 30 + blob.and_then(|blob| { 31 + parakeet_db::cid_util::blob_to_cid_bytes(Some(blob)) 24 32 }) 25 33 } 26 34 27 35 pub fn strongref_to_parts(strongref: Option<&StrongRef>) -> (Option<String>, Option<String>) { 28 - strongref 29 - .map(|sr| (sr.uri.clone(), sr.cid.to_string())) 30 - .unzip() 36 + parakeet_db::cid_util::strongref_to_parts(strongref) 31 37 } 32 38 33 39 /// Convert a StrongRef to (URI, CID digest bytes) for pending linkage 34 40 pub fn strongref_to_parts_with_digest( 35 41 strongref: Option<&StrongRef>, 36 42 ) -> (Option<String>, Option<Vec<u8>>) { 37 - strongref 38 - .map(|sr| { 39 - let cid_bytes = sr.cid.to_bytes(); 40 - let cid_digest = parakeet_db::cid_util::cid_to_digest(&cid_bytes) 41 - .expect("StrongRef CID must be valid AT Protocol CID") 42 - .to_vec(); 43 - (sr.uri.clone(), cid_digest) 44 - }) 45 - .unzip() 43 + parakeet_db::cid_util::strongref_to_parts_with_digest(strongref) 46 44 } 47 45 48 46 pub fn at_uri_is_by(uri: &str, did: &str) -> bool { ··· 72 70 let (mentions, tags) = from 73 71 .iter() 74 72 .flat_map(|v| { 75 - v.features.iter().map(|facet| match facet { 76 - FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), 77 - FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), 78 - _ => (None, None), 73 + v.features.iter().filter_map(|feature| { 74 + match feature { 75 + FacetFeaturesItem::Mention(mention) => Some((Some(mention.did.to_string()), None)), 76 + FacetFeaturesItem::Tag(tag) => Some((None, Some(tag.tag.to_string()))), 77 + _ => None, 78 + } 79 79 }) 80 80 }) 81 81 .unzip::<_, _, Vec<_>, Vec<_>>(); 82 82 83 - let mentions = mentions.into_iter().flatten().cloned().collect(); 84 - let tags = tags.into_iter().flatten().cloned().collect(); 83 + let mentions = mentions.into_iter().flatten().collect(); 84 + let tags = tags.into_iter().flatten().collect(); 85 85 86 86 (mentions, tags) 87 87 } ··· 98 98 } 99 99 } 100 100 101 - /// Build a User-Agent string with optional contact information 102 - pub fn build_user_agent(contact: Option<&String>) -> String { 103 - contact.map_or_else( 104 - || format!("Parakeet {}", env!("CARGO_PKG_VERSION")), 105 - |contact| format!("Parakeet {} ({contact})", env!("CARGO_PKG_VERSION")), 106 - ) 101 + pub fn deserialize_optional_blob<'de, D>(deserializer: D) -> Result<Option<Blob<'static>>, D::Error> 102 + where 103 + D: Deserializer<'de>, 104 + { 105 + use jacquard_common::IntoStatic; 106 + let opt: Option<Blob<'_>> = Option::deserialize(deserializer)?; 107 + Ok(opt.map(|b| b.into_static())) 107 108 } 108 109 109 - /// Strip parameters from a MIME type string 110 - /// 111 - /// MIME types may include parameters like `image/jpeg; charset=UTF-8`. 112 - /// This function strips everything after the first semicolon to get the base MIME type. 113 - /// 114 - /// # Examples 115 - /// ``` 116 - /// # fn strip_mime_params(mime_type: &str) -> String { 117 - /// # mime_type.split(';').next().unwrap_or(mime_type).trim().to_string() 118 - /// # } 119 - /// assert_eq!(strip_mime_params("image/jpeg; charset=UTF-8"), "image/jpeg"); 120 - /// assert_eq!(strip_mime_params("image/png"), "image/png"); 121 - /// assert_eq!(strip_mime_params("video/mp4; codecs=avc1"), "video/mp4"); 122 - /// ``` 123 - pub fn strip_mime_params(mime_type: &str) -> String { 124 - mime_type 125 - .split(';') 126 - .next() 127 - .unwrap_or(mime_type) 128 - .trim() 129 - .to_string() 130 - } 110 + pub fn deserialize_optional_strongref<'de, D>(deserializer: D) -> Result<Option<StrongRef<'static>>, D::Error> 111 + where 112 + D: Deserializer<'de>, 113 + { 114 + use jacquard_common::IntoStatic; 115 + let opt: Option<StrongRef<'_>> = Option::deserialize(deserializer)?; 116 + Ok(opt.map(|s| s.into_static())) 117 + }
+1 -2
consumer/src/workers/tap/processor.rs
··· 130 130 } 131 131 } 132 132 133 - // Deserialize using serde's tagged enum support 134 - let record_type: RecordTypes = serde_json::from_value(record_with_type)?; 133 + let record_type: RecordTypes = serde_json::from_str(&record_with_type.to_string())?; 135 134 Ok(record_type) 136 135 } 137 136
+48 -26
consumer/tests/actor_operations_test.rs
··· 12 12 AppBskyActorProfile, AppBskyActorStatus, AppBskyNotificationDeclaration, 13 13 ChatBskyActorDeclaration, 14 14 }; 15 - use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions, Status}; 16 - use lexica::Blob; 15 + use jacquard_api::app_bsky; 16 + use jacquard_common::types::blob::{Blob, MimeType}; 17 + use jacquard_common::types::cid::{Cid, CidLink}; 18 + use jacquard_common::IntoStatic; 17 19 use eyre::WrapErr; 18 20 19 21 // ============================================================================ ··· 41 43 display_name: Some("Test User".to_string()), 42 44 description: Some("This is a test profile".to_string()), 43 45 avatar: Some(Blob { 44 - mime_type: "image/jpeg".to_string(), 45 - cid: test_cid(), 46 + r#ref: CidLink(Cid::from(test_cid())), 47 + mime_type: MimeType::new("image/jpeg").unwrap(), 46 48 size: 1024, 47 - }), 49 + }.into_static()), 48 50 banner: Some(Blob { 49 - mime_type: "image/png".to_string(), 50 - cid: test_cid(), 51 + r#ref: CidLink(Cid::from(test_cid())), 52 + mime_type: MimeType::new("image/png").unwrap(), 51 53 size: 2048, 52 - }), 54 + }.into_static()), 53 55 labels: None, 54 56 joined_via_starter_pack: None, 55 57 pinned_post: None, ··· 251 253 252 254 // Create a status (simple case without embed) 253 255 let status = AppBskyActorStatus { 254 - status: Status::Live, 256 + status: app_bsky::actor::status::Status::new() 257 + .created_at(jacquard_common::types::string::Datetime::new(Utc::now().fixed_offset())) 258 + .status("live") 259 + .build() 260 + .into_static(), 255 261 duration_minutes: Some(60), 256 262 embed: None, 257 263 created_at: Utc::now(), ··· 302 308 303 309 // Create a status 304 310 let status = AppBskyActorStatus { 305 - status: Status::Live, 311 + status: app_bsky::actor::status::Status::new() 312 + .created_at(jacquard_common::types::string::Datetime::new(Utc::now().fixed_offset())) 313 + .status("live") 314 + .build() 315 + .into_static(), 306 316 duration_minutes: None, 307 317 embed: None, 308 318 created_at: Utc::now(), ··· 361 371 362 372 // Create a chat declaration 363 373 let chat_decl = ChatBskyActorDeclaration { 364 - allow_incoming: ChatAllowIncoming::Following, 374 + allow_incoming: "following".to_string(), 365 375 }; 366 376 367 377 let result = actor::chat_decl_upsert(&tx, actor_id, chat_decl).await; ··· 407 417 408 418 // Create initial chat declaration 409 419 let chat_decl1 = ChatBskyActorDeclaration { 410 - allow_incoming: ChatAllowIncoming::All, 420 + allow_incoming: "all".to_string(), 411 421 }; 412 422 413 423 actor::chat_decl_upsert(&tx, actor_id, chat_decl1) ··· 416 426 417 427 // Update chat declaration 418 428 let chat_decl2 = ChatBskyActorDeclaration { 419 - allow_incoming: ChatAllowIncoming::None, 429 + allow_incoming: "none".to_string(), 420 430 }; 421 431 422 432 let result = actor::chat_decl_upsert(&tx, actor_id, chat_decl2).await; ··· 462 472 463 473 // Create a chat declaration 464 474 let chat_decl = ChatBskyActorDeclaration { 465 - allow_incoming: ChatAllowIncoming::Following, 475 + allow_incoming: "following".to_string(), 466 476 }; 467 477 468 478 actor::chat_decl_upsert(&tx, actor_id, chat_decl) ··· 517 527 .wrap_err("Failed to ensure actor")?; 518 528 519 529 // Create a notification declaration 520 - let notif_decl = AppBskyNotificationDeclaration { 521 - allow_subscriptions: ProfileAllowSubscriptions::Mutuals, 522 - }; 530 + let notif_decl = AppBskyNotificationDeclaration( 531 + app_bsky::notification::declaration::Declaration::new() 532 + .allow_subscriptions("mutuals") 533 + .build() 534 + .into_static() 535 + ); 523 536 524 537 let result = actor::notif_decl_upsert(&tx, actor_id, notif_decl).await; 525 538 ··· 567 580 .wrap_err("Failed to ensure actor")?; 568 581 569 582 // Create initial notification declaration 570 - let notif_decl1 = AppBskyNotificationDeclaration { 571 - allow_subscriptions: ProfileAllowSubscriptions::Followers, 572 - }; 583 + let notif_decl1 = AppBskyNotificationDeclaration( 584 + app_bsky::notification::declaration::Declaration::new() 585 + .allow_subscriptions("followers") 586 + .build() 587 + .into_static() 588 + ); 573 589 574 590 actor::notif_decl_upsert(&tx, actor_id, notif_decl1) 575 591 .await 576 592 .wrap_err("Failed to insert initial notification declaration")?; 577 593 578 594 // Update notification declaration 579 - let notif_decl2 = AppBskyNotificationDeclaration { 580 - allow_subscriptions: ProfileAllowSubscriptions::None, 581 - }; 595 + let notif_decl2 = AppBskyNotificationDeclaration( 596 + app_bsky::notification::declaration::Declaration::new() 597 + .allow_subscriptions("none") 598 + .build() 599 + .into_static() 600 + ); 582 601 583 602 let result = actor::notif_decl_upsert(&tx, actor_id, notif_decl2).await; 584 603 ··· 626 645 .wrap_err("Failed to ensure actor")?; 627 646 628 647 // Create a notification declaration 629 - let notif_decl = AppBskyNotificationDeclaration { 630 - allow_subscriptions: ProfileAllowSubscriptions::Mutuals, 631 - }; 648 + let notif_decl = AppBskyNotificationDeclaration( 649 + app_bsky::notification::declaration::Declaration::new() 650 + .allow_subscriptions("mutuals") 651 + .build() 652 + .into_static() 653 + ); 632 654 633 655 actor::notif_decl_upsert(&tx, actor_id, notif_decl) 634 656 .await
+13 -10
consumer/tests/feedgen_labeler_operations_test.rs
··· 17 17 use consumer::db::operations::{feed, labeler}; 18 18 use consumer::types::records::{AppBskyFeedGenerator, AppBskyLabelerService}; 19 19 use eyre::WrapErr; 20 - use lexica::app_bsky::labeler::LabelerPolicy; 21 - use lexica::com_atproto::label::{Blurs, LabelDefaultSetting, LabelValueDefinition, Severity}; 22 - use lexica::com_atproto::moderation::{ReasonType, SubjectType}; 23 - use lexica::Blob; 20 + use jacquard_api::app_bsky; 21 + use jacquard_api::com_atproto; 22 + use jacquard_common::types::blob::{Blob, MimeType}; 23 + use jacquard_common::types::cid::{Cid, CidLink}; 24 + use jacquard_common::IntoStatic; 24 25 25 26 // ======================================== 26 27 // Feed Generator Tests ··· 49 50 description: Some("A feed about cats".to_string()), 50 51 description_facets: None, 51 52 avatar: Some(Blob { 52 - mime_type: "image/jpeg".to_string(), 53 - cid: test_cid(), 54 - size: 1024}), 53 + r#ref: CidLink(Cid::from(test_cid())), 54 + mime_type: MimeType::new("image/jpeg").unwrap(), 55 + size: 1024 56 + }.into_static()), 55 57 accepts_interactions: Some(true), 56 58 labels: None, 57 59 content_mode: Some("contentModeVideo".to_string()), ··· 156 158 description: Some("Updated description".to_string()), 157 159 description_facets: None, 158 160 avatar: Some(Blob { 159 - mime_type: "image/png".to_string(), 160 - cid: test_cid(), 161 - size: 2048}), 161 + r#ref: CidLink(Cid::from(test_cid())), 162 + mime_type: MimeType::new("image/png").unwrap(), 163 + size: 2048 164 + }.into_static()), 162 165 accepts_interactions: None, 163 166 labels: None, 164 167 content_mode: Some("contentModeVideo".to_string()),
+43 -28
consumer/tests/labels_test.rs
··· 7 7 use chrono::Utc; 8 8 use common::*; 9 9 use consumer::db::labels; 10 - use lexica::com_atproto::label::{SelfLabel, SelfLabels}; 10 + use jacquard_api::com_atproto::label::{SelfLabel, SelfLabels}; 11 + use jacquard_api::app_bsky; 12 + use jacquard_common::IntoStatic; 11 13 use parakeet_db::types::{ActorStatus, ActorSyncState}; 12 14 use eyre::WrapErr; 13 15 ··· 20 22 21 23 // Don't create an actor - test with non-existent DID 22 24 let rec = consumer::types::records::AppBskyLabelerService { 23 - policies: lexica::app_bsky::labeler::LabelerPolicy { 24 - label_values: vec!["test-label".to_string()], 25 - label_value_definitions: vec![], 26 - }, 25 + policies: app_bsky::labeler::LabelerPolicies::new() 26 + .label_values(vec![jacquard_api::com_atproto::label::LabelValue::Warn]) 27 + .build() 28 + .into_static(), 27 29 labels: None, 28 - reason_types: None, 29 - subject_types: None, 30 - subject_collections: None, 31 30 created_at: Utc::now(), 32 31 }; 33 32 ··· 65 64 .unwrap(); 66 65 67 66 // Create self-labels 68 - let self_labels = SelfLabels { 69 - values: vec![SelfLabel { 70 - val: "self-label-test".to_string(), 71 - }], 72 - }; 67 + let self_labels = SelfLabels::new() 68 + .values(vec![ 69 + SelfLabel { 70 + val: "self-label-test".into(), 71 + extra_data: Default::default(), 72 + }.into_static() 73 + ]) 74 + .build() 75 + .into_static(); 73 76 74 77 let test_cid = Some(test_cid()); 75 78 let at_uri = "at://did:plc:test_self_label/app.bsky.actor.profile/self"; ··· 99 102 let tx = conn.transaction().await.wrap_err("Failed to start transaction")?; 100 103 101 104 // Don't create an actor - test with non-existent DID 102 - let self_labels = SelfLabels { 103 - values: vec![SelfLabel { 104 - val: "test-label".to_string(), 105 - }], 106 - }; 105 + let self_labels = SelfLabels::new() 106 + .values(vec![ 107 + SelfLabel { 108 + val: "test-label".into(), 109 + extra_data: Default::default(), 110 + }.into_static() 111 + ]) 112 + .build() 113 + .into_static(); 107 114 108 115 let result = labels::maintain_self_labels( 109 116 &tx, ··· 148 155 let at_uri = "at://did:plc:test_update/app.bsky.actor.profile/self"; 149 156 150 157 // First add a label 151 - let self_labels = SelfLabels { 152 - values: vec![SelfLabel { 153 - val: "old-label".to_string(), 154 - }], 155 - }; 158 + let self_labels = SelfLabels::new() 159 + .values(vec![ 160 + SelfLabel { 161 + val: "old-label".into(), 162 + extra_data: Default::default(), 163 + }.into_static() 164 + ]) 165 + .build() 166 + .into_static(); 156 167 157 168 labels::maintain_self_labels(&tx, "did:plc:test_update", None, at_uri, self_labels) 158 169 .await 159 170 .unwrap(); 160 171 161 172 // Now update with different labels (should delete the old one) 162 - let new_labels = SelfLabels { 163 - values: vec![SelfLabel { 164 - val: "new-label".to_string(), 165 - }], 166 - }; 173 + let new_labels = SelfLabels::new() 174 + .values(vec![ 175 + SelfLabel { 176 + val: "new-label".into(), 177 + extra_data: Default::default(), 178 + }.into_static() 179 + ]) 180 + .build() 181 + .into_static(); 167 182 168 183 let result = 169 184 labels::maintain_self_labels(&tx, "did:plc:test_update", None, at_uri, new_labels).await;