this repo has no description
1use std::sync::Arc;
2
3use futures_util::future;
4use ipld_core::ipld::Ipld;
5use jacquard::{
6 api::com_atproto::sync::subscribe_repos::{Commit, SubscribeReposMessage, Sync},
7 types::string::Handle,
8};
9use jacquard_repo::{BlockStore, MemoryBlockStore};
10use sqlx::{Pool, Postgres, query};
11use thiserror::Error;
12use tokio::{sync::broadcast, task::JoinHandle};
13
14use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value};
15
16trait Ingest {
17 type Error;
18 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error>;
19}
20
21#[derive(Debug, Error)]
22enum CommitError {
23 #[error("Error parsing #commit event: {}", .0)]
24 ParseCarBytes(#[from] jacquard_repo::RepoError),
25}
26
27impl Ingest for Commit<'_> {
28 type Error = CommitError;
29 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> {
30 let car = jacquard_repo::car::parse_car_bytes(&self.blocks).await?;
31 let storage = Arc::new(MemoryBlockStore::new_from_blocks(car.blocks));
32
33 let ops = future::join_all(self.ops.clone().into_iter().map(|op| async {
34 // get block data by cid, or None if errors/not found
35 if let Some(cid) = &op.cid {
36 if let Ok(cid) = cid.0.to_ipld()
37 && let Ok(contents) = storage.get(&cid).await
38 && let Some(contents) = contents
39 && let Ok(val) = serde_ipld_dagcbor::from_slice::<Ipld>(&contents)
40 {
41 (op, ipld_to_json_value(&val).ok())
42 } else {
43 (op, None)
44 }
45 } else {
46 (op, None)
47 }
48 }))
49 .await;
50
51 future::join_all(ops.into_iter().map(|(op, val)| async {
52 let mut path = op.path.split("/");
53 let Some(collection) = path.next() else {
54 eprintln!("Invalid path ({})", op.path.as_str());
55 return;
56 };
57 let Some(rkey) = path.next() else {
58 eprintln!("Invalid path ({})", op.path.as_str());
59 return;
60 };
61 // assert the path is only collection/rkey
62 if path.next().is_some() {
63 eprintln!("Invalid path ({})", op.path.as_str());
64 return;
65 };
66 match op.action.clone().as_str() {
67 "create" | "update" => {
68 let Some(cid) = op.cid.map(|x| x.0.to_string()) else {
69 eprintln!(
70 "Missing cid for {} {}/{}",
71 op.action.clone().as_str(),
72 collection,
73 rkey
74 );
75 return;
76 };
77 let Some(val) = val else {
78 eprintln!(
79 "Missing value for {} {}/{}/{}",
80 op.action.clone().as_str(),
81 collection,
82 rkey,
83 cid
84 );
85 return;
86 };
87 if let Err(err) = query!(
88 "INSERT INTO records (collection, rkey, cid, record)
89 VALUES ($1, $2, $3, $4)
90 ON CONFLICT (collection, rkey)
91 DO UPDATE SET
92 cid = EXCLUDED.cid,
93 record = EXCLUDED.record;",
94 collection,
95 rkey,
96 cid,
97 val
98 )
99 .execute(&*conn)
100 .await
101 {
102 eprintln!(
103 "Error applying {} to {}/{}/{}\n{}",
104 op.action.clone().as_str(),
105 collection,
106 rkey,
107 cid,
108 err
109 );
110 } else {
111 println!(
112 "{} {}/{}/{}",
113 op.action.clone().as_str(),
114 collection,
115 rkey,
116 cid
117 );
118 };
119 }
120 "delete" => {
121 if let Err(err) = query!(
122 "DELETE FROM records WHERE
123 collection = $1
124 and rkey = $2",
125 collection,
126 rkey,
127 )
128 .execute(&*conn)
129 .await
130 {
131 eprintln!("Error deleting {}/{}\n{}", collection, rkey, err);
132 } else {
133 println!("delete {}/{}", collection, rkey);
134 };
135 }
136 _ => {
137 println!(
138 "unknown action {} for {:#?} {:#?}",
139 op.action.as_str(),
140 op,
141 val
142 )
143 }
144 }
145 }))
146 .await;
147
148 Ok(())
149 }
150}
151
152impl Ingest for Sync<'_> {
153 type Error = crate::backfill::Error;
154 async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> {
155 backfill(conn, None).await
156 }
157}
158
159pub fn ingest(
160 mut reciever: broadcast::Receiver<SubscribeReposMessage<'static>>,
161 conn: Arc<Pool<Postgres>>,
162) -> JoinHandle<()> {
163 tokio::spawn(async move {
164 loop {
165 let next = match reciever.recv().await {
166 Ok(val) => val,
167 Err(err) => match err {
168 broadcast::error::RecvError::Closed => {
169 eprintln!("Ingestion failed. Quitting");
170 break;
171 }
172 broadcast::error::RecvError::Lagged(skipped) => {
173 eprintln!("Warning: lagging behind. Skipping {skipped} messages");
174 continue;
175 }
176 },
177 };
178
179 match next {
180 SubscribeReposMessage::Commit(commit) => {
181 commit.ingest(conn.clone()).await.unwrap_or_else(|err| {
182 eprintln!("error handling #commit({}): {:?}", commit.clone().rev, err)
183 })
184 }
185 SubscribeReposMessage::Sync(sync) => {
186 sync.ingest(conn.clone()).await.unwrap_or_else(|err| {
187 eprintln!("error handling #sync({}): {:?}", sync.clone().rev, err)
188 })
189 }
190 SubscribeReposMessage::Identity(identity) => println!(
191 "ignoring #identity({}) event. has user migrated?",
192 identity.handle.unwrap_or(Handle::raw("handle.invalid"))
193 ),
194 SubscribeReposMessage::Account(account) => println!(
195 "ignoring #account({} {}) event. has user deactivated?",
196 account.active,
197 account.status.unwrap_or("unknown".into())
198 ),
199 SubscribeReposMessage::Info(info) => {
200 println!("ignoring #info({}) event", info.name)
201 }
202 SubscribeReposMessage::Unknown(_) => {
203 println!("ignoring unknown event. is meview outdated?")
204 }
205 };
206 }
207 })
208}