1//! Event processing logic for Blahg ATProtocol events.
2//!
3//! Processes different types of ATProtocol events:
4//! - Blog post records (tools.smokesignal.blahg.content.post)
5//! - Feed posts that reference blog content (app.bsky.feed.post)
6//! - Likes on blog posts (app.bsky.feed.like, community.lexicon.interaction.like)
7
8use std::collections::{HashMap, HashSet};
9use std::str::FromStr;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13use crate::config::Config;
14use crate::consumer::{BlahgEvent, BlahgEventReceiver};
15use crate::errors::{BlahgError, Result};
16use crate::identity::CachingIdentityResolver;
17use crate::lexicon::PostRecord;
18use crate::storage::{ContentStorage, Post, PostReference, Storage};
19use atproto_client::com::atproto::repo::get_blob;
20use atproto_record::aturi::ATURI;
21use chrono::Utc;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use slugify::slugify;
25use tracing::{error, info};
26
27/// Strong reference to another record (used in posts and likes)
28#[derive(Debug, Deserialize, Serialize, Default)]
29struct StrongRef {
30 #[serde(rename = "$type")]
31 type_: Option<String>,
32 uri: String,
33 cid: String,
34}
35
36/// app.bsky.feed.post record structure
37#[derive(Debug, Deserialize)]
38struct FeedPostRecord {
39 #[serde(rename = "$type")]
40 type_: Option<String>,
41
42 #[serde(default)]
43 facets: Vec<Facet>,
44
45 #[serde(default)]
46 embed: Option<Embed>,
47
48 #[serde(default)]
49 reply: Option<Reply>,
50}
51
52/// Facet in a feed post (for link detection)
53#[derive(Debug, Deserialize)]
54struct Facet {
55 features: Vec<Feature>,
56}
57
58/// Feature in a facet
59#[derive(Debug, Deserialize)]
60struct Feature {
61 #[serde(rename = "$type")]
62 type_: String,
63 uri: Option<String>,
64}
65
66/// Embed in a feed post
67#[derive(Debug, Deserialize)]
68struct Embed {
69 #[serde(rename = "$type")]
70 type_: String,
71 external: Option<External>,
72 record: Option<StrongRef>,
73}
74
75/// External embed
76#[derive(Debug, Deserialize)]
77struct External {
78 uri: String,
79}
80
81/// Reply structure
82#[derive(Debug, Deserialize)]
83struct Reply {
84 root: StrongRef,
85 parent: StrongRef,
86}
87
88/// Like record structure (both app.bsky.feed.like and community.lexicon.interaction.like)
89#[derive(Debug, Deserialize)]
90struct LikeRecord {
91 subject: StrongRef,
92}
93
/// Background processor for Blahg events.
///
/// Consumes [`BlahgEvent`]s and persists blog posts, their blobs, and references to them
/// (feed posts and likes) through the configured storage backends.
pub struct EventProcessor {
    /// Storage for posts and post references.
    storage: Arc<dyn Storage>,
    /// Blob storage for post content and attachments, keyed by CID.
    content_storage: Arc<dyn ContentStorage>,
    /// Application configuration (author DID, external base URL, ...).
    config: Arc<Config>,
    /// Resolves DIDs to documents so blobs can be fetched from the author's PDS.
    identity_resolver: CachingIdentityResolver<dyn Storage>,
    /// HTTP client used for blob downloads.
    http_client: reqwest::Client,
    /// Lazily built map from public post URL prefix to post AT-URI; `None` means it must be
    /// rebuilt from storage on next use.
    post_prefix_cache: RwLock<Option<HashMap<String, String>>>,
}
103
104impl EventProcessor {
105 /// Create a new event processor with the required dependencies.
106 pub fn new(
107 storage: Arc<dyn Storage>,
108 content_storage: Arc<dyn ContentStorage>,
109 config: Arc<Config>,
110 identity_resolver: CachingIdentityResolver<dyn Storage>,
111 http_client: reqwest::Client,
112 ) -> Self {
113 Self {
114 storage,
115 content_storage,
116 config,
117 identity_resolver,
118 http_client,
119 post_prefix_cache: RwLock::new(None),
120 }
121 }
122
123 /// Start processing events from the queue
124 pub async fn start_processing(&self, mut event_receiver: BlahgEventReceiver) -> Result<()> {
125 info!("Event processor started");
126
127 while let Some(event) = event_receiver.recv().await {
128 match &event {
129 BlahgEvent::Commit {
130 did,
131 collection,
132 rkey,
133 cid,
134 record,
135 } => {
136 if let Err(e) = self.handle_commit(did, collection, rkey, cid, record).await {
137 error!("Failed to process commit event: {}", e);
138 }
139 }
140 BlahgEvent::Delete {
141 did,
142 collection,
143 rkey,
144 } => {
145 if let Err(e) = self.handle_delete(did, collection, rkey).await {
146 error!("Failed to process delete event: {}", e);
147 }
148 }
149 }
150 }
151
152 info!("Event processor finished");
153 Ok(())
154 }
155
156 async fn handle_commit(
157 &self,
158 did: &str,
159 collection: &str,
160 rkey: &str,
161 cid: &str,
162 record: &Value,
163 ) -> Result<()> {
164 // info!("Processing commit: {} {} for {}", collection, rkey, did);
165
166 match collection {
167 "tools.smokesignal.blahg.content.post" => {
168 self.handle_blog_post_commit(did, rkey, cid, record).await
169 }
170 "app.bsky.feed.post" => self.handle_feed_post_commit(did, rkey, cid, record).await,
171 "app.bsky.feed.like" | "community.lexicon.interaction.like" => {
172 self.handle_like_commit(did, collection, rkey, cid, record)
173 .await
174 }
175 _ => {
176 // Unknown collection type, ignore
177 Ok(())
178 }
179 }
180 }
181
182 async fn handle_delete(&self, did: &str, collection: &str, rkey: &str) -> Result<()> {
183 let aturi = format!("at://{}/{}/{}", did, collection, rkey);
184
185 match collection {
186 "tools.smokesignal.blahg.content.post" => {
187 if (self.storage.delete_post(&aturi).await?).is_some() {
188 info!("Successfully deleted post: {}", aturi);
189 }
190 }
191 "app.bsky.feed.post" | "app.bsky.feed.like" | "community.lexicon.interaction.like" => {
192 self.storage.delete_post_reference(&aturi).await?;
193 }
194 _ => {
195 // Unknown collection type, ignore
196 }
197 }
198
199 Ok(())
200 }
201
202 async fn handle_blog_post_commit(
203 &self,
204 did: &str,
205 rkey: &str,
206 cid: &str,
207 record: &Value,
208 ) -> Result<()> {
209 // Check if the author matches the configured author
210 if self.config.author != did {
211 return Ok(());
212 }
213
214 let aturi = format!("at://{}/tools.smokesignal.blahg.content.post/{}", did, rkey);
215
216 // Parse the post record
217 let post_record: PostRecord = serde_json::from_value(record.clone())?;
218
219 // Resolve the DID to get PDS endpoint
220 let document = self.identity_resolver.resolve(did).await?;
221 let pds_endpoints = document.pds_endpoints();
222 let pds_endpoint =
223 pds_endpoints
224 .first()
225 .ok_or_else(|| BlahgError::ProcessIdentityResolutionFailed {
226 did: did.to_string(),
227 details: "No PDS endpoint found in DID document".to_string(),
228 })?;
229
230 // Download and store the content blob
231 let content_cid = post_record.content.r#ref.link.clone();
232 if !self.content_storage.content_exists(&content_cid).await? {
233 let content_bytes =
234 get_blob(&self.http_client, pds_endpoint, did, &content_cid).await?;
235 self.content_storage
236 .write_content(&content_cid, &content_bytes)
237 .await?;
238 }
239
240 // Download and store attachment blobs
241 for attachment in &post_record.attachments {
242 let attachment_cid = attachment.content.r#ref.link.clone();
243 if !self.content_storage.content_exists(&attachment_cid).await? {
244 let attachment_bytes =
245 get_blob(&self.http_client, pds_endpoint, did, &attachment_cid).await?;
246 self.content_storage
247 .write_content(&attachment_cid, &attachment_bytes)
248 .await?;
249 }
250 }
251
252 // Generate slug from title
253 let slug = slugify!(&post_record.title);
254
255 // Create post record
256 let post = Post {
257 aturi: aturi.to_string(),
258 cid: cid.to_string(),
259 title: post_record.title.clone(),
260 slug,
261 content: content_cid,
262 record_key: rkey.to_string(),
263 created_at: post_record.published_at,
264 updated_at: Utc::now(),
265 record: record.clone(),
266 };
267
268 // Store post
269 self.storage.upsert_post(&post).await?;
270
271 // Invalidate the post prefix cache
272 {
273 let mut guard = self.post_prefix_cache.write().await;
274 *guard = None;
275 }
276
277 info!("Successfully processed blog post: {}", aturi);
278 Ok(())
279 }
280
281 async fn handle_feed_post_commit(
282 &self,
283 did: &str,
284 rkey: &str,
285 cid: &str,
286 record: &Value,
287 ) -> Result<()> {
288 let aturi = format!("at://{}/app.bsky.feed.post/{}", did, rkey);
289
290 // Parse the feed post record
291 let feed_post: FeedPostRecord = match serde_json::from_value(record.clone()) {
292 Ok(post) => post,
293 Err(_) => return Ok(()), // Invalid record format, skip
294 };
295
296 // Get the post prefix lookup for URL matching
297 let post_prefix_lookup = self.post_prefix_lookup().await?;
298 let mut referenced_post_aturis = HashSet::new();
299
300 // Check facets for post URL prefixes
301 for facet in &feed_post.facets {
302 for feature in &facet.features {
303 if feature.type_ == "app.bsky.richtext.facet#link" {
304 if let Some(uri) = &feature.uri {
305 // Check if this URI starts with any of the post prefix keys
306 for (prefix, post_aturi) in &post_prefix_lookup {
307 if uri.starts_with(prefix) {
308 referenced_post_aturis.insert(post_aturi.clone());
309 break;
310 }
311 }
312 }
313 }
314 }
315 }
316
317 // Check embed for post URL prefixes and AT-URIs
318 if let Some(embed) = &feed_post.embed {
319 if embed.type_ == "app.bsky.embed.external" {
320 if let Some(external) = &embed.external {
321 // Check if this external URI starts with any of the post prefix keys
322 for (prefix, post_aturi) in &post_prefix_lookup {
323 if external.uri.starts_with(prefix) {
324 referenced_post_aturis.insert(post_aturi.clone());
325 break;
326 }
327 }
328 }
329 }
330 // Check for embedded records from known author
331 if embed.type_ == "app.bsky.embed.record" {
332 if let Some(record_ref) = &embed.record {
333 // Check if the record URI is in our post lookup values
334 if post_prefix_lookup.values().any(|v| v == &record_ref.uri) {
335 referenced_post_aturis.insert(record_ref.uri.clone());
336 }
337 }
338 }
339 }
340
341 // Check reply for known author posts
342 if let Some(reply) = &feed_post.reply {
343 if post_prefix_lookup.values().any(|v| v == &reply.root.uri) {
344 referenced_post_aturis.insert(reply.root.uri.clone());
345 }
346 if post_prefix_lookup.values().any(|v| v == &reply.parent.uri) {
347 referenced_post_aturis.insert(reply.parent.uri.clone());
348 }
349 }
350
351 // Store references for each referenced post
352 for post_aturi in referenced_post_aturis {
353 let post_reference = PostReference {
354 aturi: aturi.to_string(),
355 cid: cid.to_string(),
356 did: did.to_string(),
357 collection: "app.bsky.feed.post".to_string(),
358 post_aturi: post_aturi.clone(),
359 discovered_at: Utc::now(),
360 record: record.clone(),
361 };
362
363 self.storage.upsert_post_reference(&post_reference).await?;
364 info!("Stored feed post reference: {} -> {}", aturi, post_aturi);
365 }
366
367 Ok(())
368 }
369
370 async fn handle_like_commit(
371 &self,
372 did: &str,
373 collection: &str,
374 rkey: &str,
375 cid: &str,
376 record: &Value,
377 ) -> Result<()> {
378 let aturi = format!("at://{}/{}/{}", did, collection, rkey);
379
380 // Parse the like record
381 let like_record: LikeRecord = match serde_json::from_value(record.clone()) {
382 Ok(like) => like,
383 Err(_) => return Ok(()), // Invalid record format, skip
384 };
385
386 // Get the post prefix lookup for AT-URI matching
387 let post_prefix_lookup = self.post_prefix_lookup().await?;
388
389 // Check if the subject is a known author post
390 if post_prefix_lookup
391 .values()
392 .any(|v| v == &like_record.subject.uri)
393 {
394 let post_reference = PostReference {
395 aturi: aturi.to_string(),
396 cid: cid.to_string(),
397 did: did.to_string(),
398 collection: collection.to_string(),
399 post_aturi: like_record.subject.uri.clone(),
400 discovered_at: Utc::now(),
401 record: record.clone(),
402 };
403
404 self.storage.upsert_post_reference(&post_reference).await?;
405 info!(
406 "Stored like reference: {} -> {}",
407 aturi, like_record.subject.uri
408 );
409 }
410
411 Ok(())
412 }
413
414 /// Creates a post prefix lookup hashmap with external URL prefixes mapped to AT-URIs
415 async fn post_prefix_lookup(&self) -> Result<HashMap<String, String>> {
416 // Check if we have a cached value
417 {
418 let guard = self.post_prefix_cache.read().await;
419 if let Some(cached) = guard.as_ref() {
420 return Ok(cached.clone());
421 }
422 }
423
424 let posts = self.storage.get_posts().await?;
425 let mut lookup = HashMap::new();
426
427 for post in posts {
428 // Extract the record key from the AT-URI
429 // AT-URI format: at://did/tools.smokesignal.blahg.content.post/rkey
430 if let Ok(aturi) = ATURI::from_str(&post.aturi) {
431 let record_key = aturi.record_key;
432 let key = format!("{}/posts/{}-", self.config.external_base, record_key);
433 lookup.insert(key, post.aturi);
434 }
435 }
436
437 // Cache the result
438 {
439 let mut guard = self.post_prefix_cache.write().await;
440 *guard = Some(lookup.clone());
441 }
442
443 Ok(lookup)
444 }
445}