//! Event processing logic for Blahg ATProtocol events. //! //! Processes different types of ATProtocol events: //! - Blog post records (tools.smokesignal.blahg.content.post) //! - Feed posts that reference blog content (app.bsky.feed.post) //! - Likes on blog posts (app.bsky.feed.like, community.lexicon.interaction.like) use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; use crate::config::Config; use crate::consumer::{BlahgEvent, BlahgEventReceiver}; use crate::errors::{BlahgError, Result}; use crate::identity::CachingIdentityResolver; use crate::lexicon::PostRecord; use crate::storage::{ContentStorage, Post, PostReference, Storage}; use atproto_client::com::atproto::repo::get_blob; use atproto_record::aturi::ATURI; use chrono::Utc; use serde::{Deserialize, Serialize}; use serde_json::Value; use slugify::slugify; use tracing::{error, info}; /// Strong reference to another record (used in posts and likes) #[derive(Debug, Deserialize, Serialize, Default)] struct StrongRef { #[serde(rename = "$type")] type_: Option, uri: String, cid: String, } /// app.bsky.feed.post record structure #[derive(Debug, Deserialize)] struct FeedPostRecord { #[serde(rename = "$type")] type_: Option, #[serde(default)] facets: Vec, #[serde(default)] embed: Option, #[serde(default)] reply: Option, } /// Facet in a feed post (for link detection) #[derive(Debug, Deserialize)] struct Facet { features: Vec, } /// Feature in a facet #[derive(Debug, Deserialize)] struct Feature { #[serde(rename = "$type")] type_: String, uri: Option, } /// Embed in a feed post #[derive(Debug, Deserialize)] struct Embed { #[serde(rename = "$type")] type_: String, external: Option, record: Option, } /// External embed #[derive(Debug, Deserialize)] struct External { uri: String, } /// Reply structure #[derive(Debug, Deserialize)] struct Reply { root: StrongRef, parent: StrongRef, } /// Like record structure (both app.bsky.feed.like and community.lexicon.interaction.like) #[derive(Debug, Deserialize)] struct LikeRecord { subject: StrongRef, } /// Background processor for Blahg events pub struct EventProcessor { storage: Arc, content_storage: Arc, config: Arc, identity_resolver: CachingIdentityResolver, http_client: reqwest::Client, post_prefix_cache: RwLock>>, } impl EventProcessor { /// Create a new event processor with the required dependencies. pub fn new( storage: Arc, content_storage: Arc, config: Arc, identity_resolver: CachingIdentityResolver, http_client: reqwest::Client, ) -> Self { Self { storage, content_storage, config, identity_resolver, http_client, post_prefix_cache: RwLock::new(None), } } /// Start processing events from the queue pub async fn start_processing(&self, mut event_receiver: BlahgEventReceiver) -> Result<()> { info!("Event processor started"); while let Some(event) = event_receiver.recv().await { match &event { BlahgEvent::Commit { did, collection, rkey, cid, record, } => { if let Err(e) = self.handle_commit(did, collection, rkey, cid, record).await { error!("Failed to process commit event: {}", e); } } BlahgEvent::Delete { did, collection, rkey, } => { if let Err(e) = self.handle_delete(did, collection, rkey).await { error!("Failed to process delete event: {}", e); } } } } info!("Event processor finished"); Ok(()) } async fn handle_commit( &self, did: &str, collection: &str, rkey: &str, cid: &str, record: &Value, ) -> Result<()> { // info!("Processing commit: {} {} for {}", collection, rkey, did); match collection { "tools.smokesignal.blahg.content.post" => { self.handle_blog_post_commit(did, rkey, cid, record).await } "app.bsky.feed.post" => self.handle_feed_post_commit(did, rkey, cid, record).await, "app.bsky.feed.like" | "community.lexicon.interaction.like" => { self.handle_like_commit(did, collection, rkey, cid, record) .await } _ => { // Unknown collection type, ignore Ok(()) } } } async fn handle_delete(&self, did: &str, collection: &str, rkey: &str) -> Result<()> { let aturi = format!("at://{}/{}/{}", did, collection, rkey); match collection { "tools.smokesignal.blahg.content.post" => { if (self.storage.delete_post(&aturi).await?).is_some() { info!("Successfully deleted post: {}", aturi); } } "app.bsky.feed.post" | "app.bsky.feed.like" | "community.lexicon.interaction.like" => { self.storage.delete_post_reference(&aturi).await?; } _ => { // Unknown collection type, ignore } } Ok(()) } async fn handle_blog_post_commit( &self, did: &str, rkey: &str, cid: &str, record: &Value, ) -> Result<()> { // Check if the author matches the configured author if self.config.author != did { return Ok(()); } let aturi = format!("at://{}/tools.smokesignal.blahg.content.post/{}", did, rkey); // Parse the post record let post_record: PostRecord = serde_json::from_value(record.clone())?; // Resolve the DID to get PDS endpoint let document = self.identity_resolver.resolve(did).await?; let pds_endpoints = document.pds_endpoints(); let pds_endpoint = pds_endpoints .first() .ok_or_else(|| BlahgError::ProcessIdentityResolutionFailed { did: did.to_string(), details: "No PDS endpoint found in DID document".to_string(), })?; // Download and store the content blob let content_cid = post_record.content.r#ref.link.clone(); if !self.content_storage.content_exists(&content_cid).await? { let content_bytes = get_blob(&self.http_client, pds_endpoint, did, &content_cid).await?; self.content_storage .write_content(&content_cid, &content_bytes) .await?; } // Download and store attachment blobs for attachment in &post_record.attachments { let attachment_cid = attachment.content.r#ref.link.clone(); if !self.content_storage.content_exists(&attachment_cid).await? { let attachment_bytes = get_blob(&self.http_client, pds_endpoint, did, &attachment_cid).await?; self.content_storage .write_content(&attachment_cid, &attachment_bytes) .await?; } } // Generate slug from title let slug = slugify!(&post_record.title); // Create post record let post = Post { aturi: aturi.to_string(), cid: cid.to_string(), title: post_record.title.clone(), slug, content: content_cid, record_key: rkey.to_string(), created_at: post_record.published_at, updated_at: Utc::now(), record: record.clone(), }; // Store post self.storage.upsert_post(&post).await?; // Invalidate the post prefix cache { let mut guard = self.post_prefix_cache.write().await; *guard = None; } info!("Successfully processed blog post: {}", aturi); Ok(()) } async fn handle_feed_post_commit( &self, did: &str, rkey: &str, cid: &str, record: &Value, ) -> Result<()> { let aturi = format!("at://{}/app.bsky.feed.post/{}", did, rkey); // Parse the feed post record let feed_post: FeedPostRecord = match serde_json::from_value(record.clone()) { Ok(post) => post, Err(_) => return Ok(()), // Invalid record format, skip }; // Get the post prefix lookup for URL matching let post_prefix_lookup = self.post_prefix_lookup().await?; let mut referenced_post_aturis = HashSet::new(); // Check facets for post URL prefixes for facet in &feed_post.facets { for feature in &facet.features { if feature.type_ == "app.bsky.richtext.facet#link" { if let Some(uri) = &feature.uri { // Check if this URI starts with any of the post prefix keys for (prefix, post_aturi) in &post_prefix_lookup { if uri.starts_with(prefix) { referenced_post_aturis.insert(post_aturi.clone()); break; } } } } } } // Check embed for post URL prefixes and AT-URIs if let Some(embed) = &feed_post.embed { if embed.type_ == "app.bsky.embed.external" { if let Some(external) = &embed.external { // Check if this external URI starts with any of the post prefix keys for (prefix, post_aturi) in &post_prefix_lookup { if external.uri.starts_with(prefix) { referenced_post_aturis.insert(post_aturi.clone()); break; } } } } // Check for embedded records from known author if embed.type_ == "app.bsky.embed.record" { if let Some(record_ref) = &embed.record { // Check if the record URI is in our post lookup values if post_prefix_lookup.values().any(|v| v == &record_ref.uri) { referenced_post_aturis.insert(record_ref.uri.clone()); } } } } // Check reply for known author posts if let Some(reply) = &feed_post.reply { if post_prefix_lookup.values().any(|v| v == &reply.root.uri) { referenced_post_aturis.insert(reply.root.uri.clone()); } if post_prefix_lookup.values().any(|v| v == &reply.parent.uri) { referenced_post_aturis.insert(reply.parent.uri.clone()); } } // Store references for each referenced post for post_aturi in referenced_post_aturis { let post_reference = PostReference { aturi: aturi.to_string(), cid: cid.to_string(), did: did.to_string(), collection: "app.bsky.feed.post".to_string(), post_aturi: post_aturi.clone(), discovered_at: Utc::now(), record: record.clone(), }; self.storage.upsert_post_reference(&post_reference).await?; info!("Stored feed post reference: {} -> {}", aturi, post_aturi); } Ok(()) } async fn handle_like_commit( &self, did: &str, collection: &str, rkey: &str, cid: &str, record: &Value, ) -> Result<()> { let aturi = format!("at://{}/{}/{}", did, collection, rkey); // Parse the like record let like_record: LikeRecord = match serde_json::from_value(record.clone()) { Ok(like) => like, Err(_) => return Ok(()), // Invalid record format, skip }; // Get the post prefix lookup for AT-URI matching let post_prefix_lookup = self.post_prefix_lookup().await?; // Check if the subject is a known author post if post_prefix_lookup .values() .any(|v| v == &like_record.subject.uri) { let post_reference = PostReference { aturi: aturi.to_string(), cid: cid.to_string(), did: did.to_string(), collection: collection.to_string(), post_aturi: like_record.subject.uri.clone(), discovered_at: Utc::now(), record: record.clone(), }; self.storage.upsert_post_reference(&post_reference).await?; info!( "Stored like reference: {} -> {}", aturi, like_record.subject.uri ); } Ok(()) } /// Creates a post prefix lookup hashmap with external URL prefixes mapped to AT-URIs async fn post_prefix_lookup(&self) -> Result> { // Check if we have a cached value { let guard = self.post_prefix_cache.read().await; if let Some(cached) = guard.as_ref() { return Ok(cached.clone()); } } let posts = self.storage.get_posts().await?; let mut lookup = HashMap::new(); for post in posts { // Extract the record key from the AT-URI // AT-URI format: at://did/tools.smokesignal.blahg.content.post/rkey if let Ok(aturi) = ATURI::from_str(&post.aturi) { let record_key = aturi.record_key; let key = format!("{}/posts/{}-", self.config.external_base, record_key); lookup.insert(key, post.aturi); } } // Cache the result { let mut guard = self.post_prefix_cache.write().await; *guard = Some(lookup.clone()); } Ok(lookup) } }