// This repo has no description.
// Viewed at `main`: 208 lines, 7.5 kB (raw view).
1use std::sync::Arc; 2 3use futures_util::future; 4use ipld_core::ipld::Ipld; 5use jacquard::{ 6 api::com_atproto::sync::subscribe_repos::{Commit, SubscribeReposMessage, Sync}, 7 types::string::Handle, 8}; 9use jacquard_repo::{BlockStore, MemoryBlockStore}; 10use sqlx::{Pool, Postgres, query}; 11use thiserror::Error; 12use tokio::{sync::broadcast, task::JoinHandle}; 13 14use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value}; 15 16trait Ingest { 17 type Error; 18 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error>; 19} 20 21#[derive(Debug, Error)] 22enum CommitError { 23 #[error("Error parsing #commit event: {}", .0)] 24 ParseCarBytes(#[from] jacquard_repo::RepoError), 25} 26 27impl Ingest for Commit<'_> { 28 type Error = CommitError; 29 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 30 let car = jacquard_repo::car::parse_car_bytes(&self.blocks).await?; 31 let storage = Arc::new(MemoryBlockStore::new_from_blocks(car.blocks)); 32 33 let ops = future::join_all(self.ops.clone().into_iter().map(|op| async { 34 // get block data by cid, or None if errors/not found 35 if let Some(cid) = &op.cid { 36 if let Ok(cid) = cid.0.to_ipld() 37 && let Ok(contents) = storage.get(&cid).await 38 && let Some(contents) = contents 39 && let Ok(val) = serde_ipld_dagcbor::from_slice::<Ipld>(&contents) 40 { 41 (op, ipld_to_json_value(&val).ok()) 42 } else { 43 (op, None) 44 } 45 } else { 46 (op, None) 47 } 48 })) 49 .await; 50 51 future::join_all(ops.into_iter().map(|(op, val)| async { 52 let mut path = op.path.split("/"); 53 let Some(collection) = path.next() else { 54 eprintln!("Invalid path ({})", op.path.as_str()); 55 return; 56 }; 57 let Some(rkey) = path.next() else { 58 eprintln!("Invalid path ({})", op.path.as_str()); 59 return; 60 }; 61 // assert the path is only collection/rkey 62 if path.next().is_some() { 63 eprintln!("Invalid path ({})", op.path.as_str()); 64 return; 65 }; 66 match 
op.action.clone().as_str() { 67 "create" | "update" => { 68 let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 69 eprintln!( 70 "Missing cid for {} {}/{}", 71 op.action.clone().as_str(), 72 collection, 73 rkey 74 ); 75 return; 76 }; 77 let Some(val) = val else { 78 eprintln!( 79 "Missing value for {} {}/{}/{}", 80 op.action.clone().as_str(), 81 collection, 82 rkey, 83 cid 84 ); 85 return; 86 }; 87 if let Err(err) = query!( 88 "INSERT INTO records (collection, rkey, cid, record) 89 VALUES ($1, $2, $3, $4) 90 ON CONFLICT (collection, rkey) 91 DO UPDATE SET 92 cid = EXCLUDED.cid, 93 record = EXCLUDED.record;", 94 collection, 95 rkey, 96 cid, 97 val 98 ) 99 .execute(&*conn) 100 .await 101 { 102 eprintln!( 103 "Error applying {} to {}/{}/{}\n{}", 104 op.action.clone().as_str(), 105 collection, 106 rkey, 107 cid, 108 err 109 ); 110 } else { 111 println!( 112 "{} {}/{}/{}", 113 op.action.clone().as_str(), 114 collection, 115 rkey, 116 cid 117 ); 118 }; 119 } 120 "delete" => { 121 if let Err(err) = query!( 122 "DELETE FROM records WHERE 123 collection = $1 124 and rkey = $2", 125 collection, 126 rkey, 127 ) 128 .execute(&*conn) 129 .await 130 { 131 eprintln!("Error deleting {}/{}\n{}", collection, rkey, err); 132 } else { 133 println!("delete {}/{}", collection, rkey); 134 }; 135 } 136 _ => { 137 println!( 138 "unknown action {} for {:#?} {:#?}", 139 op.action.as_str(), 140 op, 141 val 142 ) 143 } 144 } 145 })) 146 .await; 147 148 Ok(()) 149 } 150} 151 152impl Ingest for Sync<'_> { 153 type Error = crate::backfill::Error; 154 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 155 backfill(conn, None).await 156 } 157} 158 159pub fn ingest( 160 mut reciever: broadcast::Receiver<SubscribeReposMessage<'static>>, 161 conn: Arc<Pool<Postgres>>, 162) -> JoinHandle<()> { 163 tokio::spawn(async move { 164 loop { 165 let next = match reciever.recv().await { 166 Ok(val) => val, 167 Err(err) => match err { 168 broadcast::error::RecvError::Closed => { 
169 eprintln!("Ingestion failed. Quitting"); 170 break; 171 } 172 broadcast::error::RecvError::Lagged(skipped) => { 173 eprintln!("Warning: lagging behind. Skipping {skipped} messages"); 174 continue; 175 } 176 }, 177 }; 178 179 match next { 180 SubscribeReposMessage::Commit(commit) => { 181 commit.ingest(conn.clone()).await.unwrap_or_else(|err| { 182 eprintln!("error handling #commit({}): {:?}", commit.clone().rev, err) 183 }) 184 } 185 SubscribeReposMessage::Sync(sync) => { 186 sync.ingest(conn.clone()).await.unwrap_or_else(|err| { 187 eprintln!("error handling #sync({}): {:?}", sync.clone().rev, err) 188 }) 189 } 190 SubscribeReposMessage::Identity(identity) => println!( 191 "ignoring #identity({}) event. has user migrated?", 192 identity.handle.unwrap_or(Handle::raw("handle.invalid")) 193 ), 194 SubscribeReposMessage::Account(account) => println!( 195 "ignoring #account({} {}) event. has user deactivated?", 196 account.active, 197 account.status.unwrap_or("unknown".into()) 198 ), 199 SubscribeReposMessage::Info(info) => { 200 println!("ignoring #info({}) event", info.name) 201 } 202 SubscribeReposMessage::Unknown(_) => { 203 println!("ignoring unknown event. is meview outdated?") 204 } 205 }; 206 } 207 }) 208}