forked from
parakeet.at/parakeet
Rust AppView - highly experimental!
1use super::{
2 types::{CarCommitEntry, CarEntry, CarRecordEntry},
3 CopyStore,
4};
5use crate::indexer::records;
6use crate::indexer::types::{AggregateDeltaStore, RecordTypes};
7use crate::{db, indexer};
8use deadpool_postgres::Transaction;
9use ipld_core::cid::Cid;
10use iroh_car::CarReader;
11use metrics::counter;
12use parakeet_index::AggregateType;
13use redis::aio::MultiplexedConnection;
14use std::collections::HashMap;
15use std::path::Path;
16use tokio::io::BufReader;
17
/// In-memory accumulator of aggregate-count changes collected while a single
/// repo is being backfilled.
///
/// NOTE(review): the key appears to be `(subject URI or DID, aggregate type as i32)`
/// and the value the pending delta; confirm against the `AggregateDeltaStore`
/// implementation for this type.
type BackfillDeltaStore = HashMap<(String, i32), i32>;
19
20pub async fn insert_repo(
21 t: &mut Transaction<'_>,
22 rc: &mut MultiplexedConnection,
23 tmp_dir: &Path,
24 repo: &str,
25) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> {
26 let car = tokio::fs::File::open(tmp_dir.join(repo)).await?;
27 let mut car_stream = CarReader::new(BufReader::new(car)).await?;
28
29 // the root should be the commit block
30 let root = car_stream.header().roots().first().cloned().unwrap();
31
32 let mut commit = None;
33 let mut mst_nodes: HashMap<Cid, String> = HashMap::new();
34 let mut records: HashMap<Cid, RecordTypes> = HashMap::new();
35 let mut deltas = HashMap::new();
36 let mut copies = CopyStore::default();
37
38 while let Some((cid, block)) = car_stream.next_block().await? {
39 let Ok(block) = serde_ipld_dagcbor::from_slice::<CarEntry>(&block) else {
40 tracing::warn!("failed to parse block {cid}");
41 continue;
42 };
43
44 if root == cid {
45 if let CarEntry::Commit(commit_entry) = block {
46 commit = Some(commit_entry);
47 } else {
48 tracing::warn!("root did not point to a commit entry");
49 }
50 continue;
51 }
52
53 match block {
54 CarEntry::Commit(_) => {
55 tracing::warn!("got commit entry that was not in root")
56 }
57 CarEntry::Record(CarRecordEntry::Known(record)) => {
58 if let Some(path) = mst_nodes.remove(&cid) {
59 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
60 } else {
61 records.insert(cid, record);
62 }
63 }
64 CarEntry::Record(CarRecordEntry::Other { ty }) => {
65 tracing::debug!("repo contains unknown record type: {ty} ({cid})");
66 }
67 CarEntry::Mst(mst) => {
68 let mut out = Vec::with_capacity(mst.e.len());
69
70 for node in mst.e {
71 let ks = String::from_utf8_lossy(&node.k);
72
73 let key = if node.p == 0 {
74 ks.to_string()
75 } else {
76 let (_, prev): &(Cid, String) = out.last().unwrap();
77 let prefix = &prev[..node.p as usize];
78
79 format!("{prefix}{ks}")
80 };
81
82 out.push((node.v, key.to_string()));
83 }
84
85 mst_nodes.extend(out);
86 }
87 }
88 }
89
90 for (cid, record) in records {
91 if let Some(path) = mst_nodes.remove(&cid) {
92 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
93 } else {
94 tracing::warn!("couldn't find MST node for record {cid}")
95 }
96 }
97
98 let commit = commit.unwrap();
99
100 Ok((commit, deltas, copies))
101}
102
103async fn record_index(
104 t: &mut Transaction<'_>,
105 rc: &mut MultiplexedConnection,
106 copies: &mut CopyStore,
107 deltas: &mut BackfillDeltaStore,
108 did: &str,
109 path: &str,
110 cid: Cid,
111 record: RecordTypes,
112) -> eyre::Result<()> {
113 let Some((collection_raw, rkey)) = path.split_once("/") else {
114 tracing::warn!("op contained invalid path {path}");
115 return Ok(());
116 };
117
118 counter!("backfilled_commits", "collection" => collection_raw.to_string()).increment(1);
119
120 let at_uri = format!("at://{did}/{path}");
121
122 match record {
123 RecordTypes::AppBskyFeedLike(rec) => {
124 deltas.incr(&rec.subject.uri, AggregateType::Like).await;
125
126 copies.push_record(&at_uri, cid);
127 copies
128 .likes
129 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at));
130 }
131 RecordTypes::AppBskyFeedPost(rec) => {
132 let maybe_reply = rec.reply.as_ref().map(|v| v.parent.uri.clone());
133 let maybe_embed = rec
134 .embed
135 .as_ref()
136 .and_then(|v| v.as_bsky())
137 .and_then(|v| match v {
138 records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()),
139 records::AppBskyEmbed::RecordWithMedia(r) => Some(r.record.record.uri.clone()),
140 _ => None,
141 });
142
143 if let Some(labels) = rec.labels.clone() {
144 db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?;
145 }
146 if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) {
147 db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?;
148 }
149
150 deltas.incr(did, AggregateType::ProfilePost).await;
151 if let Some(reply) = maybe_reply {
152 deltas.incr(&reply, AggregateType::Reply).await;
153 }
154 if let Some(embed) = maybe_embed {
155 deltas.incr(&embed, AggregateType::Embed).await;
156 }
157
158 copies.push_record(&at_uri, cid);
159 copies.posts.push((at_uri, cid, rec));
160 }
161 RecordTypes::AppBskyFeedRepost(rec) => {
162 deltas.incr(&rec.subject.uri, AggregateType::Repost).await;
163
164 copies.push_record(&at_uri, cid);
165 copies
166 .reposts
167 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at));
168 }
169 RecordTypes::AppBskyGraphBlock(rec) => {
170 copies.push_record(&at_uri, cid);
171 copies
172 .blocks
173 .push((rkey.to_string(), rec.subject, rec.created_at));
174 }
175 RecordTypes::AppBskyGraphFollow(rec) => {
176 deltas.incr(did, AggregateType::Follow).await;
177 deltas.incr(&rec.subject, AggregateType::Follower).await;
178
179 copies.push_record(&at_uri, cid);
180 copies
181 .follows
182 .push((rkey.to_string(), rec.subject, rec.created_at));
183 }
184 RecordTypes::AppBskyGraphListItem(rec) => {
185 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>();
186 if did != split_aturi[2] {
187 // it's also probably a bad idea to log *all* the attempts to do this...
188 tracing::warn!("tried to create a listitem on a list we don't control!");
189 return Ok(());
190 }
191
192 copies.push_record(&at_uri, cid);
193 copies.list_items.push((at_uri, rec));
194 }
195 RecordTypes::AppBskyGraphVerification(rec) => {
196 copies.push_record(&at_uri, cid);
197 copies.verifications.push((at_uri, cid, rec));
198 }
199 _ => indexer::index_op(t, rc, deltas, did, cid, record, &at_uri, rkey).await?,
200 }
201
202 Ok(())
203}