Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 203 lines 7.2 kB view raw
1use super::{ 2 types::{CarCommitEntry, CarEntry, CarRecordEntry}, 3 CopyStore, 4}; 5use crate::indexer::records; 6use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7use crate::{db, indexer}; 8use deadpool_postgres::Transaction; 9use ipld_core::cid::Cid; 10use iroh_car::CarReader; 11use metrics::counter; 12use parakeet_index::AggregateType; 13use redis::aio::MultiplexedConnection; 14use std::collections::HashMap; 15use std::path::Path; 16use tokio::io::BufReader; 17 18type BackfillDeltaStore = HashMap<(String, i32), i32>; 19 20pub async fn insert_repo( 21 t: &mut Transaction<'_>, 22 rc: &mut MultiplexedConnection, 23 tmp_dir: &Path, 24 repo: &str, 25) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> { 26 let car = tokio::fs::File::open(tmp_dir.join(repo)).await?; 27 let mut car_stream = CarReader::new(BufReader::new(car)).await?; 28 29 // the root should be the commit block 30 let root = car_stream.header().roots().first().cloned().unwrap(); 31 32 let mut commit = None; 33 let mut mst_nodes: HashMap<Cid, String> = HashMap::new(); 34 let mut records: HashMap<Cid, RecordTypes> = HashMap::new(); 35 let mut deltas = HashMap::new(); 36 let mut copies = CopyStore::default(); 37 38 while let Some((cid, block)) = car_stream.next_block().await? { 39 let Ok(block) = serde_ipld_dagcbor::from_slice::<CarEntry>(&block) else { 40 tracing::warn!("failed to parse block {cid}"); 41 continue; 42 }; 43 44 if root == cid { 45 if let CarEntry::Commit(commit_entry) = block { 46 commit = Some(commit_entry); 47 } else { 48 tracing::warn!("root did not point to a commit entry"); 49 } 50 continue; 51 } 52 53 match block { 54 CarEntry::Commit(_) => { 55 tracing::warn!("got commit entry that was not in root") 56 } 57 CarEntry::Record(CarRecordEntry::Known(record)) => { 58 if let Some(path) = mst_nodes.remove(&cid) { 59 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 60 } else { 61 records.insert(cid, record); 62 } 63 } 64 CarEntry::Record(CarRecordEntry::Other { ty }) => { 65 tracing::debug!("repo contains unknown record type: {ty} ({cid})"); 66 } 67 CarEntry::Mst(mst) => { 68 let mut out = Vec::with_capacity(mst.e.len()); 69 70 for node in mst.e { 71 let ks = String::from_utf8_lossy(&node.k); 72 73 let key = if node.p == 0 { 74 ks.to_string() 75 } else { 76 let (_, prev): &(Cid, String) = out.last().unwrap(); 77 let prefix = &prev[..node.p as usize]; 78 79 format!("{prefix}{ks}") 80 }; 81 82 out.push((node.v, key.to_string())); 83 } 84 85 mst_nodes.extend(out); 86 } 87 } 88 } 89 90 for (cid, record) in records { 91 if let Some(path) = mst_nodes.remove(&cid) { 92 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 93 } else { 94 tracing::warn!("couldn't find MST node for record {cid}") 95 } 96 } 97 98 let commit = commit.unwrap(); 99 100 Ok((commit, deltas, copies)) 101} 102 103async fn record_index( 104 t: &mut Transaction<'_>, 105 rc: &mut MultiplexedConnection, 106 copies: &mut CopyStore, 107 deltas: &mut BackfillDeltaStore, 108 did: &str, 109 path: &str, 110 cid: Cid, 111 record: RecordTypes, 112) -> eyre::Result<()> { 113 let Some((collection_raw, rkey)) = path.split_once("/") else { 114 tracing::warn!("op contained invalid path {path}"); 115 return Ok(()); 116 }; 117 118 counter!("backfilled_commits", "collection" => collection_raw.to_string()).increment(1); 119 120 let at_uri = format!("at://{did}/{path}"); 121 122 match record { 123 RecordTypes::AppBskyFeedLike(rec) => { 124 deltas.incr(&rec.subject.uri, AggregateType::Like).await; 125 126 copies.push_record(&at_uri, cid); 127 copies 128 .likes 129 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 130 } 131 RecordTypes::AppBskyFeedPost(rec) => { 132 let maybe_reply = rec.reply.as_ref().map(|v| v.parent.uri.clone()); 133 let maybe_embed = rec 134 .embed 135 .as_ref() 136 .and_then(|v| v.as_bsky()) 137 .and_then(|v| match v { 138 records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()), 139 records::AppBskyEmbed::RecordWithMedia(r) => Some(r.record.record.uri.clone()), 140 _ => None, 141 }); 142 143 if let Some(labels) = rec.labels.clone() { 144 db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 145 } 146 if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 147 db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?; 148 } 149 150 deltas.incr(did, AggregateType::ProfilePost).await; 151 if let Some(reply) = maybe_reply { 152 deltas.incr(&reply, AggregateType::Reply).await; 153 } 154 if let Some(embed) = maybe_embed { 155 deltas.incr(&embed, AggregateType::Embed).await; 156 } 157 158 copies.push_record(&at_uri, cid); 159 copies.posts.push((at_uri, cid, rec)); 160 } 161 RecordTypes::AppBskyFeedRepost(rec) => { 162 deltas.incr(&rec.subject.uri, AggregateType::Repost).await; 163 164 copies.push_record(&at_uri, cid); 165 copies 166 .reposts 167 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 168 } 169 RecordTypes::AppBskyGraphBlock(rec) => { 170 copies.push_record(&at_uri, cid); 171 copies 172 .blocks 173 .push((rkey.to_string(), rec.subject, rec.created_at)); 174 } 175 RecordTypes::AppBskyGraphFollow(rec) => { 176 deltas.incr(did, AggregateType::Follow).await; 177 deltas.incr(&rec.subject, AggregateType::Follower).await; 178 179 copies.push_record(&at_uri, cid); 180 copies 181 .follows 182 .push((rkey.to_string(), rec.subject, rec.created_at)); 183 } 184 RecordTypes::AppBskyGraphListItem(rec) => { 185 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 186 if did != split_aturi[2] { 187 // it's also probably a bad idea to log *all* the attempts to do this... 188 tracing::warn!("tried to create a listitem on a list we don't control!"); 189 return Ok(()); 190 } 191 192 copies.push_record(&at_uri, cid); 193 copies.list_items.push((at_uri, rec)); 194 } 195 RecordTypes::AppBskyGraphVerification(rec) => { 196 copies.push_record(&at_uri, cid); 197 copies.verifications.push((at_uri, cid, rec)); 198 } 199 _ => indexer::index_op(t, rc, deltas, did, cid, record, &at_uri, rkey).await?, 200 } 201 202 Ok(()) 203}