Rust AppView - highly experimental!
at experiments 149 lines 4.8 kB view raw
1use super::types::{TapAction, TapEvent, TapRecord}; 2use crate::core::actor_store::ActorIdStore; 3use crate::core::{ActorBackend, Event, StorageBackend, StorageError}; 4use crate::records::{Follow, Like, Post, Profile, Repost}; 5use async_trait::async_trait; 6use std::sync::Arc; 7use tokio::sync::mpsc; 8use tokio::task::JoinHandle; 9use tracing::{error, info}; 10 11pub trait FromTapRecord: Sized { 12 fn from_tap_record(record: &TapRecord, actor_id: i64) -> Result<Self, StorageError>; 13} 14 15#[async_trait] 16pub trait DatabaseWritable { 17 async fn write_to_db<DB: StorageBackend + ?Sized>(&self, db: &DB) -> Result<(), StorageError>; 18} 19 20const COLLECTIONS: &[&str] = &[ 21 "app.bsky.feed.post", 22 "app.bsky.actor.profile", 23 "app.bsky.graph.follow", 24 "app.bsky.feed.like", 25 "app.bsky.feed.repost", 26]; 27 28macro_rules! process_collection { 29 ($event:expr, $db:expr, $store:expr) => { 30 match $event { 31 TapEvent::Record { record, .. } => match record.collection.as_str() { 32 "app.bsky.feed.post" => process_record::<Post, _>($event, $db, $store).await, 33 "app.bsky.actor.profile" => process_record::<Profile, _>($event, $db, $store).await, 34 "app.bsky.graph.follow" => process_record::<Follow, _>($event, $db, $store).await, 35 "app.bsky.feed.like" => process_record::<Like, _>($event, $db, $store).await, 36 "app.bsky.feed.repost" => process_record::<Repost, _>($event, $db, $store).await, 37 _ => Ok(()), 38 }, 39 _ => Ok(()), 40 } 41 }; 42} 43 44pub fn spawn_worker<AB: ActorBackend + 'static>( 45 collection: &'static str, 46 mut rx: mpsc::Receiver<TapEvent>, 47 db: Arc<dyn StorageBackend>, 48 actor_store: Arc<ActorIdStore<AB>>, 49) -> JoinHandle<()> { 50 tokio::spawn(async move { 51 info!("Worker started for collection: {}", collection); 52 while let Some(event) = rx.recv().await { 53 if let Err(e) = process_collection!(&event, db.as_ref(), &actor_store) { 54 error!( 55 "Failed to process {} event {}: {}", 56 collection, 57 event.id(), 58 e 59 ); 60 } 61 } 62 info!("Worker stopped for collection: {}", collection); 63 }) 64} 65 66async fn process_record<T, AB>( 67 event: &TapEvent, 68 db: &dyn StorageBackend, 69 actor_store: &Arc<ActorIdStore<AB>>, 70) -> Result<(), StorageError> 71where 72 T: FromTapRecord + DatabaseWritable + Send + 'static, 73 AB: ActorBackend, 74{ 75 let TapEvent::Record { record, .. } = event else { 76 return Ok(()); 77 }; 78 match record.action { 79 TapAction::Create | TapAction::Update => { 80 T::from_tap_record(record, actor_store.get(&record.did).await?)? 81 .write_to_db(db) 82 .await 83 } 84 TapAction::Delete => { 85 db.delete_record(&format!( 86 "at://{}/{}/{}", 87 record.did, record.collection, record.rkey 88 )) 89 .await 90 } 91 } 92} 93 94pub struct Dispatcher { 95 channels: std::collections::HashMap<String, mpsc::Sender<TapEvent>>, 96 workers: Vec<JoinHandle<()>>, 97} 98 99impl Dispatcher { 100 pub fn new<AB: ActorBackend + 'static>( 101 db: Arc<dyn StorageBackend>, 102 actor_store: Arc<ActorIdStore<AB>>, 103 channel_size: usize, 104 ) -> Self { 105 let (channels, workers) = COLLECTIONS.iter().fold( 106 (std::collections::HashMap::new(), Vec::new()), 107 |(mut ch, mut w), c| { 108 let (tx, rx) = mpsc::channel(channel_size); 109 ch.insert(c.to_string(), tx); 110 w.push(spawn_worker(c, rx, db.clone(), actor_store.clone())); 111 (ch, w) 112 }, 113 ); 114 Self { channels, workers } 115 } 116 117 pub async fn dispatch(&self, event: TapEvent) -> Result<(), StorageError> { 118 let TapEvent::Record { ref record, .. } = event else { 119 return Ok(()); 120 }; 121 if let Some(tx) = self.channels.get(&record.collection) { 122 tx.send(event) 123 .await 124 .map_err(|_| StorageError::Query("Worker channel closed".into()))?; 125 } 126 Ok(()) 127 } 128 129 pub async fn shutdown(self) { 130 drop(self.channels); 131 for worker in self.workers { 132 let _ = worker.await; 133 } 134 } 135} 136 137pub struct EventProcessor<DB: StorageBackend, AB: ActorBackend> { 138 db: Arc<DB>, 139 actor_store: Arc<ActorIdStore<AB>>, 140} 141 142impl<DB: StorageBackend, AB: ActorBackend> EventProcessor<DB, AB> { 143 pub fn new(db: Arc<DB>, actor_store: Arc<ActorIdStore<AB>>) -> Self { 144 Self { db, actor_store } 145 } 146 pub async fn process_event(&self, event: &TapEvent) -> Result<(), StorageError> { 147 process_collection!(event, self.db.as_ref(), &self.actor_store) 148 } 149}