this repo has no description
at main 445 lines 15 kB view raw
1//! Event processing logic for Blahg ATProtocol events. 2//! 3//! Processes different types of ATProtocol events: 4//! - Blog post records (tools.smokesignal.blahg.content.post) 5//! - Feed posts that reference blog content (app.bsky.feed.post) 6//! - Likes on blog posts (app.bsky.feed.like, community.lexicon.interaction.like) 7 8use std::collections::{HashMap, HashSet}; 9use std::str::FromStr; 10use std::sync::Arc; 11use tokio::sync::RwLock; 12 13use crate::config::Config; 14use crate::consumer::{BlahgEvent, BlahgEventReceiver}; 15use crate::errors::{BlahgError, Result}; 16use crate::identity::CachingIdentityResolver; 17use crate::lexicon::PostRecord; 18use crate::storage::{ContentStorage, Post, PostReference, Storage}; 19use atproto_client::com::atproto::repo::get_blob; 20use atproto_record::aturi::ATURI; 21use chrono::Utc; 22use serde::{Deserialize, Serialize}; 23use serde_json::Value; 24use slugify::slugify; 25use tracing::{error, info}; 26 27/// Strong reference to another record (used in posts and likes) 28#[derive(Debug, Deserialize, Serialize, Default)] 29struct StrongRef { 30 #[serde(rename = "$type")] 31 type_: Option<String>, 32 uri: String, 33 cid: String, 34} 35 36/// app.bsky.feed.post record structure 37#[derive(Debug, Deserialize)] 38struct FeedPostRecord { 39 #[serde(rename = "$type")] 40 type_: Option<String>, 41 42 #[serde(default)] 43 facets: Vec<Facet>, 44 45 #[serde(default)] 46 embed: Option<Embed>, 47 48 #[serde(default)] 49 reply: Option<Reply>, 50} 51 52/// Facet in a feed post (for link detection) 53#[derive(Debug, Deserialize)] 54struct Facet { 55 features: Vec<Feature>, 56} 57 58/// Feature in a facet 59#[derive(Debug, Deserialize)] 60struct Feature { 61 #[serde(rename = "$type")] 62 type_: String, 63 uri: Option<String>, 64} 65 66/// Embed in a feed post 67#[derive(Debug, Deserialize)] 68struct Embed { 69 #[serde(rename = "$type")] 70 type_: String, 71 external: Option<External>, 72 record: Option<StrongRef>, 73} 74 75/// External embed 76#[derive(Debug, Deserialize)] 77struct External { 78 uri: String, 79} 80 81/// Reply structure 82#[derive(Debug, Deserialize)] 83struct Reply { 84 root: StrongRef, 85 parent: StrongRef, 86} 87 88/// Like record structure (both app.bsky.feed.like and community.lexicon.interaction.like) 89#[derive(Debug, Deserialize)] 90struct LikeRecord { 91 subject: StrongRef, 92} 93 94/// Background processor for Blahg events 95pub struct EventProcessor { 96 storage: Arc<dyn Storage>, 97 content_storage: Arc<dyn ContentStorage>, 98 config: Arc<Config>, 99 identity_resolver: CachingIdentityResolver<dyn Storage>, 100 http_client: reqwest::Client, 101 post_prefix_cache: RwLock<Option<HashMap<String, String>>>, 102} 103 104impl EventProcessor { 105 /// Create a new event processor with the required dependencies. 106 pub fn new( 107 storage: Arc<dyn Storage>, 108 content_storage: Arc<dyn ContentStorage>, 109 config: Arc<Config>, 110 identity_resolver: CachingIdentityResolver<dyn Storage>, 111 http_client: reqwest::Client, 112 ) -> Self { 113 Self { 114 storage, 115 content_storage, 116 config, 117 identity_resolver, 118 http_client, 119 post_prefix_cache: RwLock::new(None), 120 } 121 } 122 123 /// Start processing events from the queue 124 pub async fn start_processing(&self, mut event_receiver: BlahgEventReceiver) -> Result<()> { 125 info!("Event processor started"); 126 127 while let Some(event) = event_receiver.recv().await { 128 match &event { 129 BlahgEvent::Commit { 130 did, 131 collection, 132 rkey, 133 cid, 134 record, 135 } => { 136 if let Err(e) = self.handle_commit(did, collection, rkey, cid, record).await { 137 error!("Failed to process commit event: {}", e); 138 } 139 } 140 BlahgEvent::Delete { 141 did, 142 collection, 143 rkey, 144 } => { 145 if let Err(e) = self.handle_delete(did, collection, rkey).await { 146 error!("Failed to process delete event: {}", e); 147 } 148 } 149 } 150 } 151 152 info!("Event processor finished"); 153 Ok(()) 154 } 155 156 async fn handle_commit( 157 &self, 158 did: &str, 159 collection: &str, 160 rkey: &str, 161 cid: &str, 162 record: &Value, 163 ) -> Result<()> { 164 // info!("Processing commit: {} {} for {}", collection, rkey, did); 165 166 match collection { 167 "tools.smokesignal.blahg.content.post" => { 168 self.handle_blog_post_commit(did, rkey, cid, record).await 169 } 170 "app.bsky.feed.post" => self.handle_feed_post_commit(did, rkey, cid, record).await, 171 "app.bsky.feed.like" | "community.lexicon.interaction.like" => { 172 self.handle_like_commit(did, collection, rkey, cid, record) 173 .await 174 } 175 _ => { 176 // Unknown collection type, ignore 177 Ok(()) 178 } 179 } 180 } 181 182 async fn handle_delete(&self, did: &str, collection: &str, rkey: &str) -> Result<()> { 183 let aturi = format!("at://{}/{}/{}", did, collection, rkey); 184 185 match collection { 186 "tools.smokesignal.blahg.content.post" => { 187 if (self.storage.delete_post(&aturi).await?).is_some() { 188 info!("Successfully deleted post: {}", aturi); 189 } 190 } 191 "app.bsky.feed.post" | "app.bsky.feed.like" | "community.lexicon.interaction.like" => { 192 self.storage.delete_post_reference(&aturi).await?; 193 } 194 _ => { 195 // Unknown collection type, ignore 196 } 197 } 198 199 Ok(()) 200 } 201 202 async fn handle_blog_post_commit( 203 &self, 204 did: &str, 205 rkey: &str, 206 cid: &str, 207 record: &Value, 208 ) -> Result<()> { 209 // Check if the author matches the configured author 210 if self.config.author != did { 211 return Ok(()); 212 } 213 214 let aturi = format!("at://{}/tools.smokesignal.blahg.content.post/{}", did, rkey); 215 216 // Parse the post record 217 let post_record: PostRecord = serde_json::from_value(record.clone())?; 218 219 // Resolve the DID to get PDS endpoint 220 let document = self.identity_resolver.resolve(did).await?; 221 let pds_endpoints = document.pds_endpoints(); 222 let pds_endpoint = 223 pds_endpoints 224 .first() 225 .ok_or_else(|| BlahgError::ProcessIdentityResolutionFailed { 226 did: did.to_string(), 227 details: "No PDS endpoint found in DID document".to_string(), 228 })?; 229 230 // Download and store the content blob 231 let content_cid = post_record.content.r#ref.link.clone(); 232 if !self.content_storage.content_exists(&content_cid).await? { 233 let content_bytes = 234 get_blob(&self.http_client, pds_endpoint, did, &content_cid).await?; 235 self.content_storage 236 .write_content(&content_cid, &content_bytes) 237 .await?; 238 } 239 240 // Download and store attachment blobs 241 for attachment in &post_record.attachments { 242 let attachment_cid = attachment.content.r#ref.link.clone(); 243 if !self.content_storage.content_exists(&attachment_cid).await? { 244 let attachment_bytes = 245 get_blob(&self.http_client, pds_endpoint, did, &attachment_cid).await?; 246 self.content_storage 247 .write_content(&attachment_cid, &attachment_bytes) 248 .await?; 249 } 250 } 251 252 // Generate slug from title 253 let slug = slugify!(&post_record.title); 254 255 // Create post record 256 let post = Post { 257 aturi: aturi.to_string(), 258 cid: cid.to_string(), 259 title: post_record.title.clone(), 260 slug, 261 content: content_cid, 262 record_key: rkey.to_string(), 263 created_at: post_record.published_at, 264 updated_at: Utc::now(), 265 record: record.clone(), 266 }; 267 268 // Store post 269 self.storage.upsert_post(&post).await?; 270 271 // Invalidate the post prefix cache 272 { 273 let mut guard = self.post_prefix_cache.write().await; 274 *guard = None; 275 } 276 277 info!("Successfully processed blog post: {}", aturi); 278 Ok(()) 279 } 280 281 async fn handle_feed_post_commit( 282 &self, 283 did: &str, 284 rkey: &str, 285 cid: &str, 286 record: &Value, 287 ) -> Result<()> { 288 let aturi = format!("at://{}/app.bsky.feed.post/{}", did, rkey); 289 290 // Parse the feed post record 291 let feed_post: FeedPostRecord = match serde_json::from_value(record.clone()) { 292 Ok(post) => post, 293 Err(_) => return Ok(()), // Invalid record format, skip 294 }; 295 296 // Get the post prefix lookup for URL matching 297 let post_prefix_lookup = self.post_prefix_lookup().await?; 298 let mut referenced_post_aturis = HashSet::new(); 299 300 // Check facets for post URL prefixes 301 for facet in &feed_post.facets { 302 for feature in &facet.features { 303 if feature.type_ == "app.bsky.richtext.facet#link" { 304 if let Some(uri) = &feature.uri { 305 // Check if this URI starts with any of the post prefix keys 306 for (prefix, post_aturi) in &post_prefix_lookup { 307 if uri.starts_with(prefix) { 308 referenced_post_aturis.insert(post_aturi.clone()); 309 break; 310 } 311 } 312 } 313 } 314 } 315 } 316 317 // Check embed for post URL prefixes and AT-URIs 318 if let Some(embed) = &feed_post.embed { 319 if embed.type_ == "app.bsky.embed.external" { 320 if let Some(external) = &embed.external { 321 // Check if this external URI starts with any of the post prefix keys 322 for (prefix, post_aturi) in &post_prefix_lookup { 323 if external.uri.starts_with(prefix) { 324 referenced_post_aturis.insert(post_aturi.clone()); 325 break; 326 } 327 } 328 } 329 } 330 // Check for embedded records from known author 331 if embed.type_ == "app.bsky.embed.record" { 332 if let Some(record_ref) = &embed.record { 333 // Check if the record URI is in our post lookup values 334 if post_prefix_lookup.values().any(|v| v == &record_ref.uri) { 335 referenced_post_aturis.insert(record_ref.uri.clone()); 336 } 337 } 338 } 339 } 340 341 // Check reply for known author posts 342 if let Some(reply) = &feed_post.reply { 343 if post_prefix_lookup.values().any(|v| v == &reply.root.uri) { 344 referenced_post_aturis.insert(reply.root.uri.clone()); 345 } 346 if post_prefix_lookup.values().any(|v| v == &reply.parent.uri) { 347 referenced_post_aturis.insert(reply.parent.uri.clone()); 348 } 349 } 350 351 // Store references for each referenced post 352 for post_aturi in referenced_post_aturis { 353 let post_reference = PostReference { 354 aturi: aturi.to_string(), 355 cid: cid.to_string(), 356 did: did.to_string(), 357 collection: "app.bsky.feed.post".to_string(), 358 post_aturi: post_aturi.clone(), 359 discovered_at: Utc::now(), 360 record: record.clone(), 361 }; 362 363 self.storage.upsert_post_reference(&post_reference).await?; 364 info!("Stored feed post reference: {} -> {}", aturi, post_aturi); 365 } 366 367 Ok(()) 368 } 369 370 async fn handle_like_commit( 371 &self, 372 did: &str, 373 collection: &str, 374 rkey: &str, 375 cid: &str, 376 record: &Value, 377 ) -> Result<()> { 378 let aturi = format!("at://{}/{}/{}", did, collection, rkey); 379 380 // Parse the like record 381 let like_record: LikeRecord = match serde_json::from_value(record.clone()) { 382 Ok(like) => like, 383 Err(_) => return Ok(()), // Invalid record format, skip 384 }; 385 386 // Get the post prefix lookup for AT-URI matching 387 let post_prefix_lookup = self.post_prefix_lookup().await?; 388 389 // Check if the subject is a known author post 390 if post_prefix_lookup 391 .values() 392 .any(|v| v == &like_record.subject.uri) 393 { 394 let post_reference = PostReference { 395 aturi: aturi.to_string(), 396 cid: cid.to_string(), 397 did: did.to_string(), 398 collection: collection.to_string(), 399 post_aturi: like_record.subject.uri.clone(), 400 discovered_at: Utc::now(), 401 record: record.clone(), 402 }; 403 404 self.storage.upsert_post_reference(&post_reference).await?; 405 info!( 406 "Stored like reference: {} -> {}", 407 aturi, like_record.subject.uri 408 ); 409 } 410 411 Ok(()) 412 } 413 414 /// Creates a post prefix lookup hashmap with external URL prefixes mapped to AT-URIs 415 async fn post_prefix_lookup(&self) -> Result<HashMap<String, String>> { 416 // Check if we have a cached value 417 { 418 let guard = self.post_prefix_cache.read().await; 419 if let Some(cached) = guard.as_ref() { 420 return Ok(cached.clone()); 421 } 422 } 423 424 let posts = self.storage.get_posts().await?; 425 let mut lookup = HashMap::new(); 426 427 for post in posts { 428 // Extract the record key from the AT-URI 429 // AT-URI format: at://did/tools.smokesignal.blahg.content.post/rkey 430 if let Ok(aturi) = ATURI::from_str(&post.aturi) { 431 let record_key = aturi.record_key; 432 let key = format!("{}/posts/{}-", self.config.external_base, record_key); 433 lookup.insert(key, post.aturi); 434 } 435 } 436 437 // Cache the result 438 { 439 let mut guard = self.post_prefix_cache.write().await; 440 *guard = Some(lookup.clone()); 441 } 442 443 Ok(lookup) 444 } 445}