use std::sync::Arc; use futures_util::future; use ipld_core::ipld::Ipld; use jacquard::{ api::com_atproto::sync::subscribe_repos::{Commit, SubscribeReposMessage, Sync}, types::string::Handle, }; use jacquard_repo::{BlockStore, MemoryBlockStore}; use sqlx::{Pool, Postgres, query}; use thiserror::Error; use tokio::{sync::broadcast, task::JoinHandle}; use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value}; trait Ingest { type Error; async fn ingest(&self, conn: Arc>) -> Result<(), Self::Error>; } #[derive(Debug, Error)] enum CommitError { #[error("Error parsing #commit event: {}", .0)] ParseCarBytes(#[from] jacquard_repo::RepoError), } impl Ingest for Commit<'_> { type Error = CommitError; async fn ingest(&self, conn: Arc>) -> Result<(), Self::Error> { let car = jacquard_repo::car::parse_car_bytes(&self.blocks).await?; let storage = Arc::new(MemoryBlockStore::new_from_blocks(car.blocks)); let ops = future::join_all(self.ops.clone().into_iter().map(|op| async { // get block data by cid, or None if errors/not found if let Some(cid) = &op.cid { if let Ok(cid) = cid.0.to_ipld() && let Ok(contents) = storage.get(&cid).await && let Some(contents) = contents && let Ok(val) = serde_ipld_dagcbor::from_slice::(&contents) { (op, ipld_to_json_value(&val).ok()) } else { (op, None) } } else { (op, None) } })) .await; future::join_all(ops.into_iter().map(|(op, val)| async { let mut path = op.path.split("/"); let Some(collection) = path.next() else { eprintln!("Invalid path ({})", op.path.as_str()); return; }; let Some(rkey) = path.next() else { eprintln!("Invalid path ({})", op.path.as_str()); return; }; // assert the path is only collection/rkey if path.next().is_some() { eprintln!("Invalid path ({})", op.path.as_str()); return; }; match op.action.clone().as_str() { "create" | "update" => { let Some(cid) = op.cid.map(|x| x.0.to_string()) else { eprintln!( "Missing cid for {} {}/{}", op.action.clone().as_str(), collection, rkey ); return; }; let Some(val) = val else { eprintln!( "Missing value for {} {}/{}/{}", op.action.clone().as_str(), collection, rkey, cid ); return; }; if let Err(err) = query!( "INSERT INTO records (collection, rkey, cid, record) VALUES ($1, $2, $3, $4) ON CONFLICT (collection, rkey) DO UPDATE SET cid = EXCLUDED.cid, record = EXCLUDED.record;", collection, rkey, cid, val ) .execute(&*conn) .await { eprintln!( "Error applying {} to {}/{}/{}\n{}", op.action.clone().as_str(), collection, rkey, cid, err ); } else { println!( "{} {}/{}/{}", op.action.clone().as_str(), collection, rkey, cid ); }; } "delete" => { if let Err(err) = query!( "DELETE FROM records WHERE collection = $1 and rkey = $2", collection, rkey, ) .execute(&*conn) .await { eprintln!("Error deleting {}/{}\n{}", collection, rkey, err); } else { println!("delete {}/{}", collection, rkey); }; } _ => { println!( "unknown action {} for {:#?} {:#?}", op.action.as_str(), op, val ) } } })) .await; Ok(()) } } impl Ingest for Sync<'_> { type Error = crate::backfill::Error; async fn ingest(&self, conn: Arc>) -> Result<(), Self::Error> { backfill(conn, None).await } } pub fn ingest( mut reciever: broadcast::Receiver>, conn: Arc>, ) -> JoinHandle<()> { tokio::spawn(async move { loop { let next = match reciever.recv().await { Ok(val) => val, Err(err) => match err { broadcast::error::RecvError::Closed => { eprintln!("Ingestion failed. Quitting"); break; } broadcast::error::RecvError::Lagged(skipped) => { eprintln!("Warning: lagging behind. Skipping {skipped} messages"); continue; } }, }; match next { SubscribeReposMessage::Commit(commit) => { commit.ingest(conn.clone()).await.unwrap_or_else(|err| { eprintln!("error handling #commit({}): {:?}", commit.clone().rev, err) }) } SubscribeReposMessage::Sync(sync) => { sync.ingest(conn.clone()).await.unwrap_or_else(|err| { eprintln!("error handling #sync({}): {:?}", sync.clone().rev, err) }) } SubscribeReposMessage::Identity(identity) => println!( "ignoring #identity({}) event. has user migrated?", identity.handle.unwrap_or(Handle::raw("handle.invalid")) ), SubscribeReposMessage::Account(account) => println!( "ignoring #account({} {}) event. has user deactivated?", account.active, account.status.unwrap_or("unknown".into()) ), SubscribeReposMessage::Info(info) => { println!("ignoring #info({}) event", info.name) } SubscribeReposMessage::Unknown(_) => { println!("ignoring unknown event. is meview outdated?") } }; } }) }