forked from
parakeet.at/parakeet
Rust AppView - highly experimental!
1use super::types::{TapAction, TapEvent, TapRecord};
2use crate::core::actor_store::ActorIdStore;
3use crate::core::{ActorBackend, Event, StorageBackend, StorageError};
4use crate::records::{Follow, Like, Post, Profile, Repost};
5use async_trait::async_trait;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio::task::JoinHandle;
9use tracing::{error, info};
10
11pub trait FromTapRecord: Sized {
12 fn from_tap_record(record: &TapRecord, actor_id: i64) -> Result<Self, StorageError>;
13}
14
15#[async_trait]
16pub trait DatabaseWritable {
17 async fn write_to_db<DB: StorageBackend + ?Sized>(&self, db: &DB) -> Result<(), StorageError>;
18}
19
/// Collection identifiers that get a dedicated channel and worker in
/// `Dispatcher::new`.
///
/// The `process_collection!` macro matches on the same string literals, so the
/// two lists must be kept in agreement.
const COLLECTIONS: &[&str] = &[
    "app.bsky.feed.post",
    "app.bsky.actor.profile",
    "app.bsky.graph.follow",
    "app.bsky.feed.like",
    "app.bsky.feed.repost",
];
27
// Routes an event to `process_record` with the record type that matches the
// event's collection string.
//
// Arguments: an expression yielding `&TapEvent`, the storage backend reference,
// and the actor-id store reference. The expansion contains `.await`, so the
// macro may only be used inside an async context. Events that are not
// `TapEvent::Record`, and records whose collection is not listed below,
// evaluate to `Ok(())` without doing any work.
macro_rules! process_collection {
    ($event:expr, $db:expr, $store:expr) => {{
        // Step 1: pull out the collection name, if this is a record event.
        let collection_name = match $event {
            TapEvent::Record { record, .. } => Some(record.collection.as_str()),
            _ => None,
        };
        // Step 2: pick the concrete record type for that collection.
        match collection_name {
            Some("app.bsky.feed.post") => process_record::<Post, _>($event, $db, $store).await,
            Some("app.bsky.actor.profile") => {
                process_record::<Profile, _>($event, $db, $store).await
            }
            Some("app.bsky.graph.follow") => {
                process_record::<Follow, _>($event, $db, $store).await
            }
            Some("app.bsky.feed.like") => process_record::<Like, _>($event, $db, $store).await,
            Some("app.bsky.feed.repost") => {
                process_record::<Repost, _>($event, $db, $store).await
            }
            _ => Ok(()),
        }
    }};
}
43
44pub fn spawn_worker<AB: ActorBackend + 'static>(
45 collection: &'static str,
46 mut rx: mpsc::Receiver<TapEvent>,
47 db: Arc<dyn StorageBackend>,
48 actor_store: Arc<ActorIdStore<AB>>,
49) -> JoinHandle<()> {
50 tokio::spawn(async move {
51 info!("Worker started for collection: {}", collection);
52 while let Some(event) = rx.recv().await {
53 if let Err(e) = process_collection!(&event, db.as_ref(), &actor_store) {
54 error!(
55 "Failed to process {} event {}: {}",
56 collection,
57 event.id(),
58 e
59 );
60 }
61 }
62 info!("Worker stopped for collection: {}", collection);
63 })
64}
65
66async fn process_record<T, AB>(
67 event: &TapEvent,
68 db: &dyn StorageBackend,
69 actor_store: &Arc<ActorIdStore<AB>>,
70) -> Result<(), StorageError>
71where
72 T: FromTapRecord + DatabaseWritable + Send + 'static,
73 AB: ActorBackend,
74{
75 let TapEvent::Record { record, .. } = event else {
76 return Ok(());
77 };
78 match record.action {
79 TapAction::Create | TapAction::Update => {
80 T::from_tap_record(record, actor_store.get(&record.did).await?)?
81 .write_to_db(db)
82 .await
83 }
84 TapAction::Delete => {
85 db.delete_record(&format!(
86 "at://{}/{}/{}",
87 record.did, record.collection, record.rkey
88 ))
89 .await
90 }
91 }
92}
93
94pub struct Dispatcher {
95 channels: std::collections::HashMap<String, mpsc::Sender<TapEvent>>,
96 workers: Vec<JoinHandle<()>>,
97}
98
99impl Dispatcher {
100 pub fn new<AB: ActorBackend + 'static>(
101 db: Arc<dyn StorageBackend>,
102 actor_store: Arc<ActorIdStore<AB>>,
103 channel_size: usize,
104 ) -> Self {
105 let (channels, workers) = COLLECTIONS.iter().fold(
106 (std::collections::HashMap::new(), Vec::new()),
107 |(mut ch, mut w), c| {
108 let (tx, rx) = mpsc::channel(channel_size);
109 ch.insert(c.to_string(), tx);
110 w.push(spawn_worker(c, rx, db.clone(), actor_store.clone()));
111 (ch, w)
112 },
113 );
114 Self { channels, workers }
115 }
116
117 pub async fn dispatch(&self, event: TapEvent) -> Result<(), StorageError> {
118 let TapEvent::Record { ref record, .. } = event else {
119 return Ok(());
120 };
121 if let Some(tx) = self.channels.get(&record.collection) {
122 tx.send(event)
123 .await
124 .map_err(|_| StorageError::Query("Worker channel closed".into()))?;
125 }
126 Ok(())
127 }
128
129 pub async fn shutdown(self) {
130 drop(self.channels);
131 for worker in self.workers {
132 let _ = worker.await;
133 }
134 }
135}
136
137pub struct EventProcessor<DB: StorageBackend, AB: ActorBackend> {
138 db: Arc<DB>,
139 actor_store: Arc<ActorIdStore<AB>>,
140}
141
142impl<DB: StorageBackend, AB: ActorBackend> EventProcessor<DB, AB> {
143 pub fn new(db: Arc<DB>, actor_store: Arc<ActorIdStore<AB>>) -> Self {
144 Self { db, actor_store }
145 }
146 pub async fn process_event(&self, event: &TapEvent) -> Result<(), StorageError> {
147 process_collection!(event, self.db.as_ref(), &self.actor_store)
148 }
149}