use super::types::{TapAction, TapEvent, TapRecord}; use crate::core::actor_store::ActorIdStore; use crate::core::{ActorBackend, Event, StorageBackend, StorageError}; use crate::records::{Follow, Like, Post, Profile, Repost}; use async_trait::async_trait; use std::sync::Arc; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tracing::{error, info}; pub trait FromTapRecord: Sized { fn from_tap_record(record: &TapRecord, actor_id: i64) -> Result; } #[async_trait] pub trait DatabaseWritable { async fn write_to_db(&self, db: &DB) -> Result<(), StorageError>; } const COLLECTIONS: &[&str] = &[ "app.bsky.feed.post", "app.bsky.actor.profile", "app.bsky.graph.follow", "app.bsky.feed.like", "app.bsky.feed.repost", ]; macro_rules! process_collection { ($event:expr, $db:expr, $store:expr) => { match $event { TapEvent::Record { record, .. } => match record.collection.as_str() { "app.bsky.feed.post" => process_record::($event, $db, $store).await, "app.bsky.actor.profile" => process_record::($event, $db, $store).await, "app.bsky.graph.follow" => process_record::($event, $db, $store).await, "app.bsky.feed.like" => process_record::($event, $db, $store).await, "app.bsky.feed.repost" => process_record::($event, $db, $store).await, _ => Ok(()), }, _ => Ok(()), } }; } pub fn spawn_worker( collection: &'static str, mut rx: mpsc::Receiver, db: Arc, actor_store: Arc>, ) -> JoinHandle<()> { tokio::spawn(async move { info!("Worker started for collection: {}", collection); while let Some(event) = rx.recv().await { if let Err(e) = process_collection!(&event, db.as_ref(), &actor_store) { error!( "Failed to process {} event {}: {}", collection, event.id(), e ); } } info!("Worker stopped for collection: {}", collection); }) } async fn process_record( event: &TapEvent, db: &dyn StorageBackend, actor_store: &Arc>, ) -> Result<(), StorageError> where T: FromTapRecord + DatabaseWritable + Send + 'static, AB: ActorBackend, { let TapEvent::Record { record, .. } = event else { return Ok(()); }; match record.action { TapAction::Create | TapAction::Update => { T::from_tap_record(record, actor_store.get(&record.did).await?)? .write_to_db(db) .await } TapAction::Delete => { db.delete_record(&format!( "at://{}/{}/{}", record.did, record.collection, record.rkey )) .await } } } pub struct Dispatcher { channels: std::collections::HashMap>, workers: Vec>, } impl Dispatcher { pub fn new( db: Arc, actor_store: Arc>, channel_size: usize, ) -> Self { let (channels, workers) = COLLECTIONS.iter().fold( (std::collections::HashMap::new(), Vec::new()), |(mut ch, mut w), c| { let (tx, rx) = mpsc::channel(channel_size); ch.insert(c.to_string(), tx); w.push(spawn_worker(c, rx, db.clone(), actor_store.clone())); (ch, w) }, ); Self { channels, workers } } pub async fn dispatch(&self, event: TapEvent) -> Result<(), StorageError> { let TapEvent::Record { ref record, .. } = event else { return Ok(()); }; if let Some(tx) = self.channels.get(&record.collection) { tx.send(event) .await .map_err(|_| StorageError::Query("Worker channel closed".into()))?; } Ok(()) } pub async fn shutdown(self) { drop(self.channels); for worker in self.workers { let _ = worker.await; } } } pub struct EventProcessor { db: Arc, actor_store: Arc>, } impl EventProcessor { pub fn new(db: Arc, actor_store: Arc>) -> Self { Self { db, actor_store } } pub async fn process_event(&self, event: &TapEvent) -> Result<(), StorageError> { process_collection!(event, self.db.as_ref(), &self.actor_store) } }