Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'feat-caching' into 'main'

Caching

See merge request parakeet-social/parakeet!19

+650 -187
+3
Cargo.lock
··· 2770 2770 "multibase", 2771 2771 "parakeet-db", 2772 2772 "parakeet-index", 2773 + "redis", 2773 2774 "reqwest", 2774 2775 "serde", 2776 + "serde_ipld_dagcbor", 2775 2777 "serde_json", 2776 2778 "tokio", 2777 2779 "tower-http", ··· 2786 2788 "chrono", 2787 2789 "diesel", 2788 2790 "postgres-types", 2791 + "serde", 2789 2792 "serde_json", 2790 2793 ] 2791 2794
+12 -7
consumer/src/backfill/mod.rs
··· 97 97 98 98 let mut inner = self.inner.clone(); 99 99 let mut conn = self.pool.get().await?; 100 + let mut rc = self.redis.clone(); 100 101 101 102 tracker.spawn(async move { 102 103 let _p = p; 103 104 tracing::trace!("backfilling {job}"); 104 105 105 - if let Err(e) = backfill_actor(&mut conn, &mut inner, &job).await { 106 + if let Err(e) = backfill_actor(&mut conn, &mut rc, &mut inner, &job).await { 106 107 tracing::error!(did = &job, "backfill failed: {e}"); 107 108 counter!("backfill_failure").increment(1); 108 109 ··· 132 133 #[instrument(skip(conn, inner))] 133 134 async fn backfill_actor( 134 135 conn: &mut Object, 136 + rc: &mut MultiplexedConnection, 135 137 inner: &mut BackfillManagerInner, 136 138 did: &str, 137 139 ) -> eyre::Result<()> { ··· 140 142 141 143 tracing::trace!("loading repo"); 142 144 143 - let (commit, mut deltas, copies) = repo::insert_repo(&mut t, &inner.tmp_dir, did).await?; 145 + let (commit, mut deltas, copies) = repo::insert_repo(&mut t, rc, &inner.tmp_dir, did).await?; 144 146 145 147 db::actor_set_repo_state(&mut t, did, &commit.rev, commit.data).await?; 146 148 ··· 152 154 ) 153 155 .await?; 154 156 155 - handle_backfill_rows(&mut t, &mut deltas, did, &commit.rev).await?; 157 + handle_backfill_rows(&mut t, rc, &mut deltas, did, &commit.rev).await?; 156 158 157 159 tracing::trace!("insertion finished"); 158 160 ··· 192 194 193 195 async fn handle_backfill_rows( 194 196 conn: &mut Transaction<'_>, 197 + rc: &mut MultiplexedConnection, 195 198 deltas: &mut impl AggregateDeltaStore, 196 199 repo: &str, 197 200 rev: &str, 198 - ) -> Result<(), tokio_postgres::Error> { 201 + ) -> eyre::Result<()> { 199 202 // `pull_backfill_rows` filters out anything before the last commit we pulled 200 203 let backfill_rows = db::backfill_rows_get(conn, repo, rev).await?; 201 204 202 205 for row in backfill_rows { 203 206 // blindly unwrap-ing this CID as we've already parsed it and re-serialized it 204 - let repo_cid = Cid::from_str(&row.cid).unwrap(); 207 + let repo_cid = Cid::from_str(&row.cid)?; 205 208 db::actor_set_repo_state(conn, repo, &row.repo_ver, repo_cid).await?; 206 209 207 210 // again, we've serialized this. 208 - let items: Vec<BackfillItem> = serde_json::from_value(row.data).unwrap(); 211 + let items: Vec<BackfillItem> = serde_json::from_value(row.data)?; 209 212 210 213 for item in items { 211 214 let Some((_, rkey)) = item.at_uri.rsplit_once("/") else { ··· 218 221 continue; 219 222 }; 220 223 221 - indexer::index_op(conn, deltas, repo, cid, record, &item.at_uri, rkey).await? 224 + indexer::index_op(conn, rc, deltas, repo, cid, record, &item.at_uri, rkey) 225 + .await? 222 226 } 223 227 BackfillItemInner::Delete => { 224 228 indexer::index_op_delete( 225 229 conn, 230 + rc, 226 231 deltas, 227 232 repo, 228 233 item.collection,
+6 -3
consumer/src/backfill/repo.rs
··· 10 10 use iroh_car::CarReader; 11 11 use metrics::counter; 12 12 use parakeet_index::AggregateType; 13 + use redis::aio::MultiplexedConnection; 13 14 use std::collections::HashMap; 14 15 use std::path::Path; 15 16 use tokio::io::BufReader; ··· 18 19 19 20 pub async fn insert_repo( 20 21 t: &mut Transaction<'_>, 22 + rc: &mut MultiplexedConnection, 21 23 tmp_dir: &Path, 22 24 repo: &str, 23 25 ) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> { ··· 54 56 } 55 57 CarEntry::Record(record) => { 56 58 if let Some(path) = mst_nodes.remove(&cid) { 57 - record_index(t, &mut copies, &mut deltas, repo, &path, cid, record).await?; 59 + record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 58 60 } else { 59 61 records.insert(cid, record); 60 62 } ··· 84 86 85 87 for (cid, record) in records { 86 88 if let Some(path) = mst_nodes.remove(&cid) { 87 - record_index(t, &mut copies, &mut deltas, repo, &path, cid, record).await?; 89 + record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 88 90 } else { 89 91 tracing::warn!("couldn't find MST node for record {cid}") 90 92 } ··· 97 99 98 100 async fn record_index( 99 101 t: &mut Transaction<'_>, 102 + rc: &mut MultiplexedConnection, 100 103 copies: &mut CopyStore, 101 104 deltas: &mut BackfillDeltaStore, 102 105 did: &str, ··· 190 193 copies.push_record(&at_uri, cid); 191 194 copies.verifications.push((at_uri, cid, rec)); 192 195 } 193 - _ => indexer::index_op(t, deltas, did, cid, record, &at_uri, rkey).await?, 196 + _ => indexer::index_op(t, rc, deltas, did, cid, record, &at_uri, rkey).await?, 194 197 } 195 198 196 199 Ok(())
+59 -13
consumer/src/indexer/mod.rs
··· 394 394 db::actor_set_repo_state(&mut t, &commit.repo, &commit.rev, commit.commit).await?; 395 395 396 396 for op in &commit.ops { 397 - process_op(&mut t, &mut state.idxc_tx, &commit.repo, op, &blocks).await?; 397 + process_op(&mut t, rc, &mut state.idxc_tx, &commit.repo, op, &blocks).await?; 398 398 } 399 399 400 400 t.commit().await?; ··· 461 461 #[inline(always)] 462 462 async fn process_op( 463 463 conn: &mut Transaction<'_>, 464 + rc: &mut MultiplexedConnection, 464 465 deltas: &mut impl AggregateDeltaStore, 465 466 repo: &str, 466 467 op: &CommitOp, 467 468 blocks: &HashMap<Cid, Vec<u8>>, 468 - ) -> Result<(), tokio_postgres::Error> { 469 + ) -> eyre::Result<()> { 469 470 let Some((collection_raw, rkey)) = op.path.split_once("/") else { 470 471 tracing::warn!("op contained invalid path {}", op.path); 471 472 return Ok(()); ··· 490 491 return Ok(()); 491 492 }; 492 493 493 - index_op(conn, deltas, repo, cid, decoded, &full_path, rkey).await?; 494 + index_op(conn, rc, deltas, repo, cid, decoded, &full_path, rkey).await?; 494 495 } else if op.action == "delete" { 495 - index_op_delete(conn, deltas, repo, collection, &full_path, rkey).await?; 496 + index_op_delete(conn, rc, deltas, repo, collection, &full_path, rkey).await?; 496 497 } else { 497 498 tracing::warn!("op contained invalid action {}", op.action); 498 499 } ··· 517 518 518 519 pub async fn index_op( 519 520 conn: &mut Transaction<'_>, 521 + rc: &mut MultiplexedConnection, 520 522 deltas: &mut impl AggregateDeltaStore, 521 523 repo: &str, 522 524 cid: Cid, 523 525 record: RecordTypes, 524 526 at_uri: &str, 525 527 rkey: &str, 526 - ) -> Result<(), tokio_postgres::Error> { 528 + ) -> eyre::Result<()> { 527 529 match record { 528 530 RecordTypes::AppBskyActorProfile(record) => { 529 531 if rkey == "self" { ··· 533 535 if let Some(labels) = labels { 534 536 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 535 537 } 538 + 539 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 536 540 } 537 541 } 538 542 RecordTypes::AppBskyActorStatus(record) => { 539 543 if rkey == "self" { 540 544 db::status_upsert(conn, repo, record).await?; 545 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 541 546 } 542 547 } 543 548 RecordTypes::AppBskyFeedGenerator(record) => { ··· 550 555 551 556 if did_insert { 552 557 deltas.incr(repo, AggregateType::ProfileFeed).await; 558 + } else { 559 + redis::AsyncTypedCommands::del(rc, format!("feedgen#{at_uri}")).await?; 553 560 } 554 561 } 555 562 RecordTypes::AppBskyFeedLike(record) => { ··· 618 625 disable_effective, 619 626 ) 620 627 .await?; 628 + 629 + // TODO: should we purge embed#{at_uri} for everything in detached_embeding_uris? 630 + // maybe postgate_maintain_detaches should return a list of uris? 621 631 } 622 632 RecordTypes::AppBskyFeedRepost(record) => { 623 633 deltas ··· 633 643 } 634 644 635 645 db::threadgate_upsert(conn, at_uri, cid, record).await?; 646 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 636 647 } 637 648 RecordTypes::AppBskyGraphBlock(record) => { 638 649 db::block_insert(conn, rkey, repo, record).await?; ··· 658 669 659 670 if did_insert { 660 671 deltas.incr(repo, AggregateType::ProfileList).await; 672 + } else { 673 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 661 674 } 662 675 } 663 676 RecordTypes::AppBskyGraphListBlock(record) => { ··· 671 684 return Ok(()); 672 685 } 673 686 687 + redis::AsyncTypedCommands::del(rc, format!("list#{}", &record.list)).await?; 674 688 db::list_item_insert(conn, at_uri, record).await?; 675 689 } 676 690 RecordTypes::AppBskyGraphStarterPack(record) => { ··· 678 692 679 693 if did_insert { 680 694 deltas.incr(repo, AggregateType::ProfileStarterpack).await; 695 + } else { 696 + redis::AsyncTypedCommands::del(rc, format!("starterpacks#{at_uri}")).await?; 681 697 } 682 698 } 683 699 RecordTypes::AppBskyGraphVerification(record) => { ··· 691 707 if let Some(labels) = labels { 692 708 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 693 709 } 710 + 711 + redis::AsyncTypedCommands::del(rc, format!("labeler#{repo}")).await?; 694 712 } 695 713 } 696 714 RecordTypes::AppBskyNotificationDeclaration(record) => { 697 715 if rkey == "self" { 698 716 db::notif_decl_upsert(conn, repo, record).await?; 717 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 699 718 } 700 719 } 701 720 RecordTypes::ChatBskyActorDeclaration(record) => { 702 721 if rkey == "self" { 703 722 db::chat_decl_upsert(conn, repo, record).await?; 723 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 704 724 } 705 725 } 706 726 } ··· 712 732 713 733 pub async fn index_op_delete( 714 734 conn: &mut Transaction<'_>, 735 + rc: &mut MultiplexedConnection, 715 736 deltas: &mut impl AggregateDeltaStore, 716 737 repo: &str, 717 738 collection: CollectionType, 718 739 at_uri: &str, 719 740 rkey: &str, 720 - ) -> Result<(), tokio_postgres::Error> { 741 + ) -> eyre::Result<()> { 721 742 match collection { 722 - CollectionType::BskyProfile => db::profile_delete(conn, repo).await?, 723 - CollectionType::BskyStatus => db::status_delete(conn, repo).await?, 743 + CollectionType::BskyProfile => { 744 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 745 + db::profile_delete(conn, repo).await? 746 + } 747 + CollectionType::BskyStatus => { 748 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 749 + db::status_delete(conn, repo).await? 750 + } 724 751 CollectionType::BskyBlock => db::block_delete(conn, rkey, repo).await?, 725 752 CollectionType::BskyFeedGen => { 753 + redis::AsyncTypedCommands::del(rc, format!("feedgen#{at_uri}")).await?; 726 754 let count = db::feedgen_delete(conn, at_uri).await?; 727 755 deltas 728 756 .add_delta(repo, AggregateType::ProfileFeed, -(count as i32)) ··· 739 767 let post_info = db::post_get_info_for_delete(conn, at_uri).await?; 740 768 741 769 db::post_delete(conn, at_uri).await?; 770 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 742 771 743 772 if let Some((reply_to, embed)) = post_info { 744 773 deltas.decr(repo, AggregateType::ProfilePost).await; ··· 759 788 } 760 789 0 761 790 } 762 - CollectionType::BskyFeedThreadgate => db::threadgate_delete(conn, at_uri).await?, 791 + CollectionType::BskyFeedThreadgate => { 792 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 793 + db::threadgate_delete(conn, at_uri).await? 794 + } 763 795 CollectionType::BskyFollow => { 764 796 if let Some(followee) = db::follow_delete(conn, rkey, repo).await? { 765 797 deltas.decr(&followee, AggregateType::Follower).await; ··· 768 800 0 769 801 } 770 802 CollectionType::BskyList => { 803 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 771 804 let count = db::list_delete(conn, at_uri).await?; 772 805 deltas 773 806 .add_delta(repo, AggregateType::ProfileList, -(count as i32)) ··· 775 808 count 776 809 } 777 810 CollectionType::BskyListBlock => db::list_block_delete(conn, at_uri).await?, 778 - CollectionType::BskyListItem => db::list_item_delete(conn, at_uri).await?, 811 + CollectionType::BskyListItem => { 812 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 813 + db::list_item_delete(conn, at_uri).await? 814 + } 779 815 CollectionType::BskyStarterPack => { 816 + redis::AsyncTypedCommands::del(rc, format!("starterpacks#{at_uri}")).await?; 780 817 let count = db::starter_pack_delete(conn, at_uri).await?; 781 818 deltas 782 819 .add_delta(repo, AggregateType::ProfileStarterpack, -(count as i32)) ··· 784 821 count 785 822 } 786 823 CollectionType::BskyVerification => db::verification_delete(conn, at_uri).await?, 787 - CollectionType::BskyLabelerService => db::labeler_delete(conn, at_uri).await?, 788 - CollectionType::BskyNotificationDeclaration => db::notif_decl_delete(conn, repo).await?, 789 - CollectionType::ChatActorDecl => db::chat_decl_delete(conn, repo).await?, 824 + CollectionType::BskyLabelerService => { 825 + redis::AsyncTypedCommands::del(rc, format!("labeler#{repo}")).await?; 826 + db::labeler_delete(conn, at_uri).await? 827 + } 828 + CollectionType::BskyNotificationDeclaration => { 829 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 830 + db::notif_decl_delete(conn, repo).await? 831 + } 832 + CollectionType::ChatActorDecl => { 833 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 + db::chat_decl_delete(conn, repo).await? 835 + } 790 836 _ => unreachable!(), 791 837 }; 792 838
+200
dataloader-rs/src/async_cached.rs
··· 1 + use crate::runtime::{Arc, Mutex}; 2 + use crate::{yield_fn, BatchFn, WaitForWorkFn}; 3 + use std::collections::{HashMap, HashSet}; 4 + use std::hash::Hash; 5 + use std::iter::IntoIterator; 6 + 7 + pub trait AsyncCache { 8 + type Key; 9 + type Val; 10 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val>; 11 + async fn insert(&mut self, key: Self::Key, val: Self::Val); 12 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>; 13 + async fn clear(&mut self); 14 + } 15 + 16 + struct State<K, V, C> 17 + where 18 + C: AsyncCache<Key = K, Val = V>, 19 + { 20 + completed: C, 21 + pending: HashSet<K>, 22 + } 23 + 24 + impl<K: Eq + Hash, V, C> State<K, V, C> 25 + where 26 + C: AsyncCache<Key = K, Val = V>, 27 + { 28 + fn with_cache(cache: C) -> Self { 29 + State { 30 + completed: cache, 31 + pending: HashSet::new(), 32 + } 33 + } 34 + } 35 + 36 + #[derive(Clone)] 37 + pub struct Loader<K, V, F, C> 38 + where 39 + K: Eq + Hash + Clone, 40 + V: Clone, 41 + F: BatchFn<K, V>, 42 + C: AsyncCache<Key = K, Val = V>, 43 + { 44 + state: Arc<Mutex<State<K, V, C>>>, 45 + load_fn: Arc<Mutex<F>>, 46 + wait_for_work_fn: Arc<dyn WaitForWorkFn>, 47 + max_batch_size: usize, 48 + } 49 + 50 + impl<K, V, F, C> Loader<K, V, F, C> 51 + where 52 + K: Eq + Hash + Clone, 53 + V: Clone, 54 + F: BatchFn<K, V>, 55 + C: AsyncCache<Key = K, Val = V>, 56 + { 57 + pub fn new(load_fn: F, cache: C) -> Self { 58 + Loader { 59 + state: Arc::new(Mutex::new(State::with_cache(cache))), 60 + load_fn: Arc::new(Mutex::new(load_fn)), 61 + max_batch_size: 200, 62 + wait_for_work_fn: Arc::new(yield_fn(10)), 63 + } 64 + } 65 + 66 + pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self { 67 + self.max_batch_size = max_batch_size; 68 + self 69 + } 70 + 71 + pub fn with_yield_count(mut self, yield_count: usize) -> Self { 72 + self.wait_for_work_fn = Arc::new(yield_fn(yield_count)); 73 + self 74 + } 75 + 76 + /// Replaces the yielding for work behavior with an arbitrary future. Rather than yielding 77 + /// the runtime repeatedly this will generate and `.await` a future of your choice. 78 + /// ***This is incompatible with*** [`Self::with_yield_count()`]. 79 + pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self { 80 + self.wait_for_work_fn = Arc::new(wait_for_work_fn); 81 + self 82 + } 83 + 84 + pub fn max_batch_size(&self) -> usize { 85 + self.max_batch_size 86 + } 87 + 88 + pub async fn load(&self, key: K) -> Option<V> { 89 + let mut state = self.state.lock().await; 90 + if let Some(v) = state.completed.get(&key).await { 91 + return Some(v.clone()); 92 + } 93 + 94 + if !state.pending.contains(&key) { 95 + state.pending.insert(key.clone()); 96 + if state.pending.len() >= self.max_batch_size { 97 + let keys = state.pending.drain().collect::<Vec<K>>(); 98 + let mut load_fn = self.load_fn.lock().await; 99 + let load_ret = load_fn.load(keys.as_ref()).await; 100 + drop(load_fn); 101 + for (k, v) in load_ret.into_iter() { 102 + state.completed.insert(k, v).await; 103 + } 104 + return state.completed.get(&key).await.clone(); 105 + } 106 + } 107 + drop(state); 108 + 109 + (self.wait_for_work_fn)().await; 110 + 111 + let mut state = self.state.lock().await; 112 + if let Some(v) = state.completed.get(&key).await { 113 + return Some(v.clone()); 114 + } 115 + 116 + if !state.pending.is_empty() { 117 + let keys = state.pending.drain().collect::<Vec<K>>(); 118 + let mut load_fn = self.load_fn.lock().await; 119 + let load_ret = load_fn.load(keys.as_ref()).await; 120 + drop(load_fn); 121 + for (k, v) in load_ret.into_iter() { 122 + state.completed.insert(k, v).await; 123 + } 124 + } 125 + 126 + state.completed.get(&key).await.clone() 127 + } 128 + 129 + pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> { 130 + let mut state = self.state.lock().await; 131 + let mut ret = HashMap::new(); 132 + let mut rest = Vec::new(); 133 + for key in keys.into_iter() { 134 + if let Some(v) = state.completed.get(&key).await.clone() { 135 + ret.insert(key, v); 136 + continue; 137 + } 138 + if !state.pending.contains(&key) { 139 + state.pending.insert(key.clone()); 140 + 141 + if state.pending.len() >= self.max_batch_size { 142 + let keys = state.pending.drain().collect::<Vec<K>>(); 143 + let mut load_fn = self.load_fn.lock().await; 144 + let load_ret = load_fn.load(keys.as_ref()).await; 145 + drop(load_fn); 146 + for (k, v) in load_ret.into_iter() { 147 + state.completed.insert(k, v).await; 148 + } 149 + } 150 + } 151 + rest.push(key); 152 + } 153 + drop(state); 154 + 155 + (self.wait_for_work_fn)().await; 156 + 157 + if !rest.is_empty() { 158 + let mut state = self.state.lock().await; 159 + if !state.pending.is_empty() { 160 + let keys = state.pending.drain().collect::<Vec<K>>(); 161 + let mut load_fn = self.load_fn.lock().await; 162 + let load_ret = load_fn.load(keys.as_ref()).await; 163 + drop(load_fn); 164 + for (k, v) in load_ret.into_iter() { 165 + state.completed.insert(k, v).await; 166 + } 167 + } 168 + 169 + for key in rest.into_iter() { 170 + if let Some(v) = state.completed.get(&key).await.clone() { 171 + ret.insert(key, v); 172 + } 173 + } 174 + } 175 + 176 + ret 177 + } 178 + 179 + pub async fn prime(&self, key: K, val: V) { 180 + let mut state = self.state.lock().await; 181 + state.completed.insert(key, val).await; 182 + } 183 + 184 + pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) { 185 + let mut state = self.state.lock().await; 186 + for (k, v) in values.into_iter() { 187 + state.completed.insert(k, v).await; 188 + } 189 + } 190 + 191 + pub async fn clear(&self, key: K) { 192 + let mut state = self.state.lock().await; 193 + state.completed.remove(&key).await; 194 + } 195 + 196 + pub async fn clear_all(&self) { 197 + let mut state = self.state.lock().await; 198 + state.completed.clear().await 199 + } 200 + }
+1
dataloader-rs/src/lib.rs
··· 1 1 #![allow(async_fn_in_trait)] 2 2 3 + pub mod async_cached; 3 4 mod batch_fn; 4 5 pub mod cached; 5 6 pub mod non_cached;
+1
parakeet-db/Cargo.toml
··· 7 7 chrono = { version = "0.4.39", features = ["serde"] } 8 8 diesel = { version = "2.2.6", features = ["chrono", "serde_json"], optional = true } 9 9 postgres-types = { version = "0.2.9", optional = true } 10 + serde = { version = "1.0.217", features = ["derive"] } 10 11 serde_json = "1.0.134" 11 12 12 13 [features]
+16 -15
parakeet-db/src/models.rs
··· 1 1 use crate::types::*; 2 2 use chrono::prelude::*; 3 3 use diesel::prelude::*; 4 + use serde::{Deserialize, Serialize}; 4 5 5 6 #[derive(Debug, Queryable, Selectable, Identifiable)] 6 7 #[diesel(table_name = crate::schema::actors)] ··· 16 17 pub last_indexed: Option<NaiveDateTime>, 17 18 } 18 19 19 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 20 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 20 21 #[diesel(table_name = crate::schema::profiles)] 21 22 #[diesel(primary_key(did))] 22 23 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 40 41 pub indexed_at: NaiveDateTime, 41 42 } 42 43 43 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 44 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 44 45 #[diesel(table_name = crate::schema::lists)] 45 46 #[diesel(primary_key(at_uri))] 46 47 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 85 86 pub indexed_at: NaiveDateTime, 86 87 } 87 88 88 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 89 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 89 90 #[diesel(table_name = crate::schema::feedgens)] 90 91 #[diesel(primary_key(at_uri))] 91 92 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 121 122 pub indexed_at: NaiveDateTime, 122 123 } 123 124 124 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 125 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 125 126 #[diesel(table_name = crate::schema::posts)] 126 127 #[diesel(primary_key(at_uri))] 127 128 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 148 149 pub indexed_at: NaiveDateTime, 149 150 } 150 151 151 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 152 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 152 153 #[diesel(table_name = crate::schema::post_embed_images)] 153 154 #[diesel(primary_key(post_uri, seq))] 154 155 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 164 165 pub height: Option<i32>, 165 166 } 166 167 167 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 168 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 168 169 #[diesel(table_name = crate::schema::post_embed_video)] 169 170 #[diesel(primary_key(post_uri))] 170 171 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 179 180 pub height: Option<i32>, 180 181 } 181 182 182 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 183 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 183 184 #[diesel(table_name = crate::schema::post_embed_video_captions)] 184 185 #[diesel(primary_key(post_uri, language))] 185 186 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 191 192 pub cid: String, 192 193 } 193 194 194 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 195 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 195 196 #[diesel(table_name = crate::schema::post_embed_ext)] 196 197 #[diesel(primary_key(post_uri))] 197 198 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 206 207 pub thumb_cid: Option<String>, 207 208 } 208 209 209 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 210 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 210 211 #[diesel(table_name = crate::schema::post_embed_record)] 211 212 #[diesel(primary_key(post_uri))] 212 213 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 236 237 pub indexed_at: NaiveDateTime, 237 238 } 238 239 239 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 240 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 240 241 #[diesel(table_name = crate::schema::threadgates)] 241 242 #[diesel(primary_key(post_uri))] 242 243 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 255 256 pub indexed_at: NaiveDateTime, 256 257 } 257 258 258 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 259 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 259 260 #[diesel(table_name = crate::schema::starterpacks)] 260 261 #[diesel(primary_key(at_uri))] 261 262 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 275 276 pub indexed_at: NaiveDateTime, 276 277 } 277 278 278 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 279 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 279 280 #[diesel(table_name = crate::schema::labelers)] 280 281 #[diesel(primary_key(did))] 281 282 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 291 292 pub indexed_at: NaiveDateTime, 292 293 } 293 294 294 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable, Associations)] 295 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable, Associations)] 295 296 #[diesel(table_name = crate::schema::labeler_defs)] 296 297 #[diesel(belongs_to(LabelerService, foreign_key = labeler))] 297 298 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 329 330 pub indexed_at: NaiveDateTime, 330 331 } 331 332 332 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 333 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 333 334 #[diesel(table_name = crate::schema::verification)] 334 335 #[diesel(primary_key(at_uri))] 335 336 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 346 347 pub indexed_at: NaiveDateTime, 347 348 } 348 349 349 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 350 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 350 351 #[diesel(table_name = crate::schema::statuses)] 351 352 #[diesel(primary_key(did))] 352 353 #[diesel(check_for_backend(diesel::pg::Pg))]
+2
parakeet/Cargo.toml
··· 23 23 multibase = "0.9.1" 24 24 parakeet-db = { path = "../parakeet-db" } 25 25 parakeet-index = { path = "../parakeet-index" } 26 + redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 26 27 reqwest = { version = "0.12", features = ["json"] } 27 28 serde = { version = "1.0.217", features = ["derive"] } 29 + serde_ipld_dagcbor = "0.6.1" 28 30 serde_json = "1.0.134" 29 31 tokio = { version = "1.42.0", features = ["full"] } 30 32 tower-http = { version = "0.6.2", features = ["cors", "trace"] }
+150
parakeet/src/cache.rs
··· 1 + use dataloader::async_cached::AsyncCache; 2 + use redis::aio::MultiplexedConnection; 3 + use redis::AsyncTypedCommands; 4 + use serde::{Deserialize, Serialize}; 5 + use std::marker::PhantomData; 6 + 7 + /// General Loader Cache 8 + pub struct LoaderCache<V> { 9 + conn: MultiplexedConnection, 10 + exp: Option<u64>, 11 + _phantom: PhantomData<V>, 12 + } 13 + 14 + impl<V> LoaderCache<V> { 15 + pub fn new(conn: &MultiplexedConnection, exp: Option<u64>) -> Self { 16 + LoaderCache::<V> { 17 + conn: conn.clone(), 18 + exp, 19 + _phantom: PhantomData, 20 + } 21 + } 22 + } 23 + 24 + impl<V> AsyncCache for LoaderCache<V> 25 + where 26 + V: for<'a> Deserialize<'a> + Serialize, 27 + { 28 + type Key = String; 29 + type Val = V; 30 + 31 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 33 + 34 + match serde_ipld_dagcbor::from_slice(&res?) { 35 + Ok(v) => Some(v), 36 + Err(err) => { 37 + tracing::error!(key, "failed to decode cache value: {err}"); 38 + None 39 + } 40 + } 41 + } 42 + 43 + async fn insert(&mut self, key: Self::Key, val: Self::Val) { 44 + let data = match serde_ipld_dagcbor::to_vec(&val) { 45 + Ok(data) => data, 46 + Err(err) => { 47 + tracing::error!(key, "failed to encode cache value: {err}"); 48 + return; 49 + } 50 + }; 51 + 52 + if let Some(exp) = self.exp { 53 + self.conn.set_ex(key, data, exp).await.unwrap(); 54 + } else { 55 + self.conn.set(key, data).await.unwrap(); 56 + } 57 + } 58 + 59 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 61 + .await 62 + .ok()?; 63 + 64 + match serde_ipld_dagcbor::from_slice(&res?) { 65 + Ok(v) => Some(v), 66 + Err(err) => { 67 + tracing::error!(key, "failed to decode cache value: {err}"); 68 + None 69 + } 70 + } 71 + } 72 + 73 + async fn clear(&mut self) {} 74 + } 75 + 76 + /// A Loader Cache in with a key prefix 77 + pub struct PrefixedLoaderCache<V> { 78 + conn: MultiplexedConnection, 79 + prefix: String, 80 + exp: Option<u64>, 81 + _phantom: PhantomData<V>, 82 + } 83 + 84 + impl<V> PrefixedLoaderCache<V> { 85 + pub fn new(conn: &MultiplexedConnection, prefix: String, exp: Option<u64>) -> Self { 86 + PrefixedLoaderCache { 87 + conn: conn.clone(), 88 + prefix, 89 + exp, 90 + _phantom: PhantomData, 91 + } 92 + } 93 + } 94 + 95 + impl<V> AsyncCache for PrefixedLoaderCache<V> 96 + where 97 + V: for<'a> Deserialize<'a> + Serialize, 98 + { 99 + type Key = String; 100 + type Val = V; 101 + 102 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 103 + let key = format!("{}#{}", self.prefix, key); 104 + 105 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 106 + 107 + match serde_ipld_dagcbor::from_slice(&res?) { 108 + Ok(v) => Some(v), 109 + Err(err) => { 110 + tracing::error!(key, "failed to decode cache value: {err}"); 111 + None 112 + } 113 + } 114 + } 115 + 116 + async fn insert(&mut self, key: Self::Key, val: Self::Val) { 117 + let key = format!("{}#{}", self.prefix, key); 118 + let data = match serde_ipld_dagcbor::to_vec(&val) { 119 + Ok(data) => data, 120 + Err(err) => { 121 + tracing::error!(key = &key, "failed to encode cache value: {err}"); 122 + return; 123 + } 124 + }; 125 + 126 + if let Some(exp) = self.exp { 127 + self.conn.set_ex(key, data, exp).await.unwrap(); 128 + } else { 129 + self.conn.set(key, data).await.unwrap(); 130 + } 131 + } 132 + 133 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 134 + let key = format!("{}#{}", self.prefix, key); 135 + 136 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 137 + .await 138 + .ok()?; 139 + 140 + match serde_ipld_dagcbor::from_slice(&res?) { 141 + Ok(v) => Some(v), 142 + Err(err) => { 143 + tracing::error!(key, "failed to decode cache value: {err}"); 144 + None 145 + } 146 + } 147 + } 148 + 149 + async fn clear(&mut self) {} 150 + }
+1
parakeet/src/config.rs
··· 15 15 pub struct Config { 16 16 pub index_uri: String, 17 17 pub database_url: String, 18 + pub cache_uri: String, 18 19 #[serde(default)] 19 20 pub server: ConfigServer, 20 21 pub service: ConfigService,
+6 -3
parakeet/src/hydration/feedgen.rs
··· 43 43 impl super::StatefulHydrator<'_> { 44 44 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 45 45 let labels = self.get_label(&feedgen).await; 46 - let (feedgen, likes) = self.loaders.feedgen.load(feedgen).await?; 46 + let likes = self.loaders.like.load(feedgen.clone()).await; 47 + let feedgen = self.loaders.feedgen.load(feedgen).await?; 47 48 let profile = self.hydrate_profile(feedgen.owner.clone()).await?; 48 49 49 50 Some(build_feedgen(feedgen, profile, labels, likes, &self.cdn)) ··· 51 52 52 53 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 53 54 let labels = self.get_label_many(&feedgens).await; 55 + let mut likes = self.loaders.like.load_many(feedgens.clone()).await; 54 56 let feedgens = self.loaders.feedgen.load_many(feedgens).await; 55 57 56 58 let creators = feedgens 57 59 .values() 58 - .map(|(feedgen, _)| feedgen.owner.clone()) 60 + .map(|feedgen| feedgen.owner.clone()) 59 61 .collect(); 60 62 61 63 let creators = self.hydrate_profiles(creators).await; 62 64 63 65 feedgens 64 66 .into_iter() 65 - .filter_map(|(uri, (feedgen, likes))| { 67 + .filter_map(|(uri, feedgen)| { 66 68 let creator = creators.get(&feedgen.owner).cloned()?; 67 69 let labels = labels.get(&uri).cloned().unwrap_or_default(); 70 + let likes = likes.remove(&uri); 68 71 69 72 Some(( 70 73 uri,
+23 -14
parakeet/src/hydration/labeler.rs
··· 92 92 impl StatefulHydrator<'_> { 93 93 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 94 94 let labels = self.get_label(&labeler).await; 95 - let (labeler, _, likes) = self.loaders.labeler.load(labeler).await?; 95 + let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 96 + let (labeler, _) = self.loaders.labeler.load(labeler).await?; 96 97 let creator = self.hydrate_profile(labeler.did.clone()).await?; 97 98 98 99 Some(build_view(labeler, creator, labels, likes)) ··· 102 103 let labels = self.get_label_many(&labelers).await; 103 104 let labelers = self.loaders.labeler.load_many(labelers).await; 104 105 105 - let creators = labelers 106 + let (creators, uris) = labelers 106 107 .values() 107 - .map(|(labeler, _, _)| labeler.did.clone()) 108 - .collect(); 108 + .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 109 + .unzip::<_, _, Vec<_>, Vec<_>>(); 109 110 let creators = self.hydrate_profiles(creators).await; 111 + let mut likes = self.loaders.like.load_many(uris.clone()).await; 110 112 111 113 labelers 112 114 .into_iter() 113 - .filter_map(|(k, (labeler, _, likes))| { 115 + .filter_map(|(k, (labeler, _))| { 114 116 let creator = creators.get(&labeler.did).cloned()?; 115 117 let labels = labels.get(&k).cloned().unwrap_or_default(); 118 + let likes = likes.remove(&make_labeler_uri(&labeler.did)); 116 119 117 120 Some((k, build_view(labeler, creator, labels, likes))) 118 121 }) ··· 121 124 122 125 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 123 126 let labels = self.get_label(&labeler).await; 124 - let (labeler, defs, likes) = self.loaders.labeler.load(labeler).await?; 127 + let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 128 + let (labeler, defs) = self.loaders.labeler.load(labeler).await?; 125 129 let creator = self.hydrate_profile(labeler.did.clone()).await?; 126 130 127 131 Some(build_view_detailed(labeler, defs, creator, labels, likes)) ··· 134 138 let labels = self.get_label_many(&labelers).await; 135 139 let labelers = self.loaders.labeler.load_many(labelers).await; 136 140 137 - let creators = labelers 141 + let (creators, uris) = labelers 138 142 .values() 139 - .map(|(labeler, _, _)| labeler.did.clone()) 140 - .collect(); 143 + .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 144 + .unzip::<_, _, Vec<_>, Vec<_>>(); 141 145 let creators = self.hydrate_profiles(creators).await; 146 + let mut likes = self.loaders.like.load_many(uris.clone()).await; 142 147 143 148 labelers 144 149 .into_iter() 145 - .filter_map(|(k, (labeler, defs, likes))| { 150 + .filter_map(|(k, (labeler, defs))| { 146 151 let creator = creators.get(&labeler.did).cloned()?; 147 152 let labels = labels.get(&k).cloned().unwrap_or_default(); 153 + let likes = likes.remove(&make_labeler_uri(&labeler.did)); 148 154 149 - Some(( 150 - k, 151 - build_view_detailed(labeler, defs, creator, labels, likes), 152 - )) 155 + let view = build_view_detailed(labeler, defs, creator, labels, likes); 156 + 157 + Some((k, view)) 153 158 }) 154 159 .collect() 155 160 } 156 161 } 162 + 163 + fn make_labeler_uri(did: &str) -> String { 164 + format!("at://{did}/app.bsky.labeler.service/self") 165 + }
+12 -7
parakeet/src/hydration/posts.rs
··· 99 99 } 100 100 101 101 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 102 - let (post, threadgate, stats) = self.loaders.posts.load(post).await?; 102 + let stats = self.loaders.post_stats.load(post.clone()).await; 103 + let (post, threadgate) = self.loaders.posts.load(post).await?; 103 104 let embed = self.hydrate_embed(post.at_uri.clone()).await; 104 105 let author = self.hydrate_profile_basic(post.did.clone()).await?; 105 106 let threadgate = self.hydrate_threadgate(threadgate).await; ··· 111 112 } 112 113 113 114 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 115 + let stats = self.loaders.post_stats.load_many(posts.clone()).await; 114 116 let posts = self.loaders.posts.load_many(posts).await; 115 117 116 118 let (authors, post_uris) = posts 117 119 .values() 118 - .map(|(post, _, _)| (post.did.clone(), post.at_uri.clone())) 120 + .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 119 121 .unzip::<_, _, Vec<_>, Vec<_>>(); 120 122 let authors = self.hydrate_profiles_basic(authors).await; 121 123 ··· 123 125 124 126 let threadgates = posts 125 127 .values() 126 - .filter_map(|(_, threadgate, _)| threadgate.clone()) 128 + .filter_map(|(_, threadgate)| threadgate.clone()) 127 129 .collect(); 128 130 let threadgates = self.hydrate_threadgates(threadgates).await; 129 131 ··· 131 133 132 134 posts 133 135 .into_iter() 134 - .filter_map(|(uri, (post, threadgate, stats))| { 136 + .filter_map(|(uri, (post, threadgate))| { 135 137 let author = authors.get(&post.did)?; 136 138 let embed = embeds.get(&uri).cloned(); 137 139 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 138 140 let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 141 + let stats = stats.get(&uri).cloned(); 139 142 140 143 Some(( 141 144 uri, ··· 146 149 } 147 150 148 151 pub async fn hydrate_feed_posts(&self, posts: Vec<String>) -> HashMap<String, FeedViewPost> { 152 + let stats = self.loaders.post_stats.load_many(posts.clone()).await; 149 153 let posts = self.loaders.posts.load_many(posts).await; 150 154 151 155 let (authors, post_uris) = posts 152 156 .values() 153 - .map(|(post, _, _)| (post.did.clone(), post.at_uri.clone())) 157 + .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 154 158 .unzip::<_, _, Vec<_>, Vec<_>>(); 155 159 let authors = self.hydrate_profiles_basic(authors).await; 156 160 ··· 160 164 161 165 let reply_refs = posts 162 166 .values() 163 - .flat_map(|(post, _, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 167 + .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 164 168 .flatten() 165 169 .collect::<Vec<_>>(); 166 170 ··· 168 172 169 173 posts 170 174 .into_iter() 171 - .filter_map(|(post_uri, (post, _, stats))| { 175 + .filter_map(|(post_uri, (post, _))| { 172 176 let author = authors.get(&post.did)?; 173 177 174 178 let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); ··· 199 203 200 204 let embed = embeds.get(&post_uri).cloned(); 201 205 let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 206 + let stats = stats.get(&post_uri).cloned(); 202 207 let post = build_postview(post, author.to_owned(), labels, embed, None, stats); 203 208 204 209 Some((
+27 -11
parakeet/src/hydration/profile.rs
··· 152 152 } 153 153 154 154 fn build_basic( 155 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 155 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 156 + stats: Option<ProfileStats>, 156 157 labels: Vec<models::Label>, 157 158 verifications: Option<Vec<models::VerificationEntry>>, 158 159 cdn: &BskyCdn, ··· 176 177 } 177 178 178 179 fn build_profile( 179 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 180 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 181 + stats: Option<ProfileStats>, 180 182 labels: Vec<models::Label>, 181 183 verifications: Option<Vec<models::VerificationEntry>>, 182 184 cdn: &BskyCdn, ··· 202 204 } 203 205 204 206 fn build_detailed( 205 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 207 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 208 + stats: Option<ProfileStats>, 206 209 labels: Vec<models::Label>, 207 210 verifications: Option<Vec<models::VerificationEntry>>, 208 211 cdn: &BskyCdn, ··· 235 238 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 236 239 let labels = self.get_profile_label(&did).await; 237 240 let verif = self.loaders.verification.load(did.clone()).await; 241 + let stats = self.loaders.profile_stats.load(did.clone()).await; 238 242 let profile_info = self.loaders.profile.load(did).await?; 239 243 240 - Some(build_basic(profile_info, labels, verif, &self.cdn)) 244 + Some(build_basic(profile_info, stats, labels, verif, &self.cdn)) 241 245 } 242 246 243 247 pub async fn hydrate_profiles_basic( ··· 246 250 ) -> HashMap<String, ProfileViewBasic> { 247 251 let labels = self.get_profile_label_many(&dids).await; 248 252 let verif = self.loaders.verification.load_many(dids.clone()).await; 253 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 249 254 let profiles = self.loaders.profile.load_many(dids).await; 250 255 251 256 profiles ··· 253 258 .map(|(k, profile_info)| { 254 259 let labels = labels.get(&k).cloned().unwrap_or_default(); 255 260 let verif = verif.get(&k).cloned(); 261 + let stats = stats.get(&k).cloned(); 256 262 257 - let v = build_basic(profile_info, labels, verif, &self.cdn); 263 + let v = build_basic(profile_info, stats, labels, verif, &self.cdn); 258 264 (k, v) 259 265 }) 260 266 .collect() ··· 262 268 263 269 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 264 270 let labels = self.get_profile_label(&did).await; 265 - 266 271 let verif = self.loaders.verification.load(did.clone()).await; 272 + let stats = self.loaders.profile_stats.load(did.clone()).await; 267 273 let profile_info = self.loaders.profile.load(did).await?; 268 274 269 - Some(build_profile(profile_info, labels, verif, &self.cdn)) 275 + Some(build_profile(profile_info, stats, labels, verif, &self.cdn)) 270 276 } 271 277 272 278 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 273 279 let labels = self.get_profile_label_many(&dids).await; 274 280 let verif = self.loaders.verification.load_many(dids.clone()).await; 281 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 275 282 let profiles = self.loaders.profile.load_many(dids).await; 276 283 277 284 profiles ··· 279 286 .map(|(k, profile_info)| { 280 287 let labels = labels.get(&k).cloned().unwrap_or_default(); 281 288 let verif = verif.get(&k).cloned(); 289 + let stats = stats.get(&k).cloned(); 282 290 283 - let v = build_profile(profile_info, labels, verif, &self.cdn); 291 + let v = build_profile(profile_info, stats, labels, verif, &self.cdn); 284 292 (k, v) 285 293 }) 286 294 .collect() ··· 288 296 289 297 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 290 298 let labels = self.get_profile_label(&did).await; 291 - 292 299 let verif = self.loaders.verification.load(did.clone()).await; 300 + let stats = self.loaders.profile_stats.load(did.clone()).await; 293 301 let profile_info = self.loaders.profile.load(did).await?; 294 302 295 - Some(build_detailed(profile_info, labels, verif, &self.cdn)) 303 + Some(build_detailed( 304 + profile_info, 305 + stats, 306 + labels, 307 + verif, 308 + &self.cdn, 309 + )) 296 310 } 297 311 298 312 pub async fn hydrate_profiles_detailed( ··· 301 315 ) -> HashMap<String, ProfileViewDetailed> { 302 316 let labels = self.get_profile_label_many(&dids).await; 303 317 let verif = self.loaders.verification.load_many(dids.clone()).await; 318 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 304 319 let profiles = self.loaders.profile.load_many(dids).await; 305 320 306 321 profiles ··· 308 323 .map(|(k, profile_info)| { 309 324 let labels = labels.get(&k).cloned().unwrap_or_default(); 310 325 let verif = verif.get(&k).cloned(); 326 + let stats = stats.get(&k).cloned(); 311 327 312 - let v = build_detailed(profile_info, labels, verif, &self.cdn); 328 + let v = build_detailed(profile_info, stats, labels, verif, &self.cdn); 313 329 (k, v) 314 330 }) 315 331 .collect()
+123 -114
parakeet/src/loaders.rs
··· 1 + use crate::cache::PrefixedLoaderCache; 1 2 use crate::xrpc::extract::LabelConfigItem; 2 - use dataloader::cached::Loader; 3 + use dataloader::async_cached::Loader; 4 + use dataloader::non_cached::Loader as NonCachedLoader; 3 5 use dataloader::BatchFn; 4 6 use diesel::prelude::*; 5 7 use diesel_async::pooled_connection::deadpool::Pool; ··· 7 9 use itertools::Itertools; 8 10 use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions}; 9 11 use parakeet_db::{models, schema}; 12 + use redis::aio::MultiplexedConnection; 13 + use serde::{Deserialize, Serialize}; 10 14 use std::collections::HashMap; 11 15 use std::str::FromStr; 12 16 17 + type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 18 + 19 + fn new_plc_loader<V, F>( 20 + load_fn: F, 21 + conn: &MultiplexedConnection, 22 + prefix: &str, 23 + exp: u64, 24 + ) -> Loader<String, V, F, PrefixedLoaderCache<V>> 25 + where 26 + V: Clone + Serialize + for<'a> Deserialize<'a>, 27 + F: BatchFn<String, V>, 28 + { 29 + Loader::new( 30 + load_fn, 31 + PrefixedLoaderCache::new(conn, prefix.to_string(), Some(exp)), 32 + ) 33 + } 34 + 13 35 pub struct Dataloaders { 14 - pub embed: Loader<String, (EmbedLoaderRet, String), EmbedLoader>, 15 - pub feedgen: Loader<String, FeedGenLoaderRet, FeedGenLoader>, 16 - pub handle: Loader<String, String, HandleLoader>, 36 + pub embed: CachingLoader<String, (EmbedLoaderRet, String), EmbedLoader>, 37 + pub feedgen: CachingLoader<String, models::FeedGen, FeedGenLoader>, 38 + pub handle: CachingLoader<String, String, HandleLoader>, 17 39 pub label: LabelLoader, 18 - pub labeler: Loader<String, LabelServiceLoaderRet, LabelServiceLoader>, 19 - pub list: Loader<String, ListLoaderRet, ListLoader>, 20 - pub posts: Loader<String, PostLoaderRet, PostLoader>, 21 - pub profile: Loader<String, ProfileLoaderRet, ProfileLoader>, 22 - pub starterpacks: Loader<String, StarterPackLoaderRet, StarterPackLoader>, 23 - pub verification: Loader<String, Vec<models::VerificationEntry>, VerificationLoader>, 40 + pub labeler: CachingLoader<String, LabelServiceLoaderRet, LabelServiceLoader>, 41 + pub list: CachingLoader<String, ListLoaderRet, ListLoader>, 42 + pub like: NonCachedLoader<String, i32, LikeLoader>, 43 + pub posts: CachingLoader<String, PostLoaderRet, PostLoader>, 44 + pub post_stats: NonCachedLoader<String, parakeet_index::PostStats, PostStatsLoader>, 45 + pub profile: CachingLoader<String, ProfileLoaderRet, ProfileLoader>, 46 + pub profile_stats: NonCachedLoader<String, parakeet_index::ProfileStats, ProfileStatsLoader>, 47 + pub starterpacks: CachingLoader<String, StarterPackLoaderRet, StarterPackLoader>, 48 + pub verification: CachingLoader<String, Vec<models::VerificationEntry>, VerificationLoader>, 24 49 } 25 50 26 51 impl Dataloaders { 27 - // for the moment, we set up memory cached loaders 28 - // we should build a redis/valkey backend at some point in the future. 29 - pub fn new(pool: Pool<AsyncPgConnection>, idxc: parakeet_index::Client) -> Dataloaders { 52 + #[rustfmt::skip] 53 + pub fn new( 54 + pool: Pool<AsyncPgConnection>, 55 + rc: MultiplexedConnection, 56 + idxc: parakeet_index::Client, 57 + ) -> Dataloaders { 30 58 Dataloaders { 31 - embed: Loader::new(EmbedLoader(pool.clone())), 32 - feedgen: Loader::new(FeedGenLoader(pool.clone(), idxc.clone())), 33 - handle: Loader::new(HandleLoader(pool.clone())), 59 + embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 60 + feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 61 + handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 34 62 label: LabelLoader(pool.clone()), // CARE: never cache this. 35 - labeler: Loader::new(LabelServiceLoader(pool.clone(), idxc.clone())), 36 - list: Loader::new(ListLoader(pool.clone())), 37 - posts: Loader::new(PostLoader(pool.clone(), idxc.clone())), 38 - profile: Loader::new(ProfileLoader(pool.clone(), idxc.clone())), 39 - starterpacks: Loader::new(StarterPackLoader(pool.clone())), 40 - verification: Loader::new(VerificationLoader(pool.clone())), 63 + labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 64 + like: NonCachedLoader::new(LikeLoader(idxc.clone())), 65 + list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), 66 + posts: new_plc_loader(PostLoader(pool.clone()), &rc, "post", 3600), 67 + post_stats: NonCachedLoader::new(PostStatsLoader(idxc.clone())), 68 + profile: new_plc_loader(ProfileLoader(pool.clone()), &rc, "profile", 3600), 69 + profile_stats: NonCachedLoader::new(ProfileStatsLoader(idxc.clone())), 70 + starterpacks: new_plc_loader(StarterPackLoader(pool.clone()), &rc, "starterpacks", 600), 71 + verification: new_plc_loader(VerificationLoader(pool.clone()), &rc, "verification", 60), 72 + } 73 + } 74 + } 75 + 76 + pub struct LikeLoader(parakeet_index::Client); 77 + impl BatchFn<String, i32> for LikeLoader { 78 + async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 79 + let res = self 80 + .0 81 + .get_like_count_many(parakeet_index::GetStatsManyReq { 82 + uris: keys.to_vec(), 83 + }) 84 + .await 85 + .map(|v| v.into_inner()); 86 + 87 + match res { 88 + Ok(data) => data 89 + .entries 90 + .into_iter() 91 + .map(|(k, v)| (k, v.likes)) 92 + .collect(), 93 + Err(_) => HashMap::new(), 41 94 } 42 95 } 43 96 } ··· 66 119 } 67 120 } 68 121 69 - pub struct ProfileLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 122 + pub struct ProfileLoader(Pool<AsyncPgConnection>); 70 123 pub type ProfileLoaderRet = ( 71 124 Option<String>, 72 125 models::Profile, 73 126 Option<ChatAllowIncoming>, 74 127 bool, 75 - Option<parakeet_index::ProfileStats>, 76 128 Option<models::Status>, 77 129 Option<ProfileAllowSubscriptions>, 78 130 ); ··· 115 167 )>(&mut conn) 116 168 .await; 117 169 118 - let stats_req = parakeet_index::GetStatsManyReq { 119 - uris: keys.to_vec(), 120 - }; 121 - let mut stats = self 122 - .1 123 - .get_profile_stats_many(stats_req) 124 - .await 125 - .unwrap() 126 - .into_inner() 127 - .entries; 128 - 129 170 match res { 130 171 Ok(res) => HashMap::from_iter(res.into_iter().map( 131 172 |(did, handle, profile, chat_decl, labeler_cid, status, notif_decl)| { ··· 133 174 let notif_decl = 134 175 notif_decl.and_then(|v| ProfileAllowSubscriptions::from_str(&v).ok()); 135 176 let is_labeler = labeler_cid.is_some(); 136 - let maybe_stats = stats.remove(&did); 137 177 138 - let val = ( 139 - handle, 140 - profile, 141 - chat_decl, 142 - is_labeler, 143 - maybe_stats, 144 - status, 145 - notif_decl, 146 - ); 178 + let val = (handle, profile, chat_decl, is_labeler, status, notif_decl); 147 179 148 180 (did, val) 149 181 }, ··· 153 185 HashMap::new() 154 186 } 155 187 } 188 + } 189 + } 190 + 191 + pub struct ProfileStatsLoader(parakeet_index::Client); 192 + impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 193 + async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 194 + let stats_req = parakeet_index::GetStatsManyReq { 195 + uris: keys.to_vec(), 196 + }; 197 + 198 + self.0 199 + .get_profile_stats_many(stats_req) 200 + .await 201 + .unwrap() 202 + .into_inner() 203 + .entries 156 204 } 157 205 } 158 206 ··· 189 237 } 190 238 191 239 pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 192 - type FeedGenLoaderRet = (models::FeedGen, Option<i32>); 193 - impl BatchFn<String, FeedGenLoaderRet> for FeedGenLoader { 194 - async fn load(&mut self, keys: &[String]) -> HashMap<String, FeedGenLoaderRet> { 240 + impl BatchFn<String, models::FeedGen> for FeedGenLoader { 241 + async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 195 242 let mut conn = self.0.get().await.unwrap(); 196 243 197 244 let res = schema::feedgens::table ··· 200 247 .load(&mut conn) 201 248 .await; 202 249 203 - let stats_req = parakeet_index::GetStatsManyReq { 204 - uris: keys.to_vec(), 205 - }; 206 - let mut stats = self 207 - .1 208 - .get_like_count_many(stats_req) 209 - .await 210 - .unwrap() 211 - .into_inner() 212 - .entries; 213 - 214 250 match res { 215 - Ok(res) => HashMap::from_iter(res.into_iter().map(|feedgen| { 216 - let likes = stats.remove(&feedgen.at_uri).map(|v| v.likes); 217 - 218 - (feedgen.at_uri.clone(), (feedgen, likes)) 219 - })), 251 + Ok(res) => HashMap::from_iter( 252 + res.into_iter() 253 + .map(|feedgen| (feedgen.at_uri.clone(), feedgen)), 254 + ), 220 255 Err(e) => { 221 256 tracing::error!("feedgen load failed: {e}"); 222 257 HashMap::new() ··· 225 260 } 226 261 } 227 262 228 - pub struct PostLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 229 - type PostLoaderRet = ( 230 - models::Post, 231 - Option<models::Threadgate>, 232 - Option<parakeet_index::PostStats>, 233 - ); 263 + pub struct PostLoader(Pool<AsyncPgConnection>); 264 + type PostLoaderRet = (models::Post, Option<models::Threadgate>); 234 265 impl BatchFn<String, PostLoaderRet> for PostLoader { 235 266 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 236 267 let mut conn = self.0.get().await.unwrap(); ··· 245 276 .load(&mut conn) 246 277 .await; 247 278 279 + match res { 280 + Ok(res) => HashMap::from_iter( 281 + res.into_iter() 282 + .map(|(post, threadgate)| (post.at_uri.clone(), (post, threadgate))), 283 + ), 284 + Err(e) => { 285 + tracing::error!("post load failed: {e}"); 286 + HashMap::new() 287 + } 288 + } 289 + } 290 + } 291 + 292 + pub struct PostStatsLoader(parakeet_index::Client); 293 + impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 294 + async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 248 295 let stats_req = parakeet_index::GetStatsManyReq { 249 296 uris: keys.to_vec(), 250 297 }; 251 - let mut stats = self 252 - .1 298 + 299 + self.0 253 300 .get_post_stats_many(stats_req) 254 301 .await 255 302 .unwrap() 256 303 .into_inner() 257 - .entries; 258 - 259 - match res { 260 - Ok(res) => HashMap::from_iter(res.into_iter().map(|(post, threadgate)| { 261 - let maybe_stats = stats.remove(&post.at_uri); 262 - 263 - (post.at_uri.clone(), (post, threadgate, maybe_stats)) 264 - })), 265 - Err(e) => { 266 - tracing::error!("post load failed: {e}"); 267 - HashMap::new() 268 - } 269 - } 304 + .entries 270 305 } 271 306 } 272 307 273 308 pub struct EmbedLoader(Pool<AsyncPgConnection>); 274 - #[derive(Debug, Clone)] 309 + #[derive(Debug, Clone, Serialize, Deserialize)] 275 310 pub enum EmbedLoaderRet { 276 311 Images(Vec<models::PostEmbedImage>), 277 312 Video(models::PostEmbedVideo), ··· 385 420 } 386 421 387 422 pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 388 - type LabelServiceLoaderRet = ( 389 - models::LabelerService, 390 - Vec<models::LabelDefinition>, 391 - Option<i32>, 392 - ); 423 + type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 393 424 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 394 425 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 395 426 let mut conn = self.0.get().await.unwrap(); ··· 408 439 409 440 let defs = defs.grouped_by(&labelers); 410 441 411 - let uris = keys 412 - .iter() 413 - .map(|v| format!("at://{v}/app.bsky.labeler.service/self")) 414 - .collect(); 415 - let stats_req = parakeet_index::GetStatsManyReq { uris }; 416 - let mut stats = self 417 - .1 418 - .get_like_count_many(stats_req) 419 - .await 420 - .unwrap() 421 - .into_inner() 422 - .entries; 423 - 424 442 labelers 425 443 .into_iter() 426 444 .zip(defs) 427 - .map(|(labeler, defs)| { 428 - let likes = stats 429 - .remove(&format!( 430 - "at://{}/app.bsky.labeler.service/self", 431 - &labeler.did 432 - )) 433 - .map(|v| v.likes); 434 - 435 - (labeler.did.clone(), (labeler, defs, likes)) 436 - }) 445 + .map(|(labeler, defs)| (labeler.did.clone(), (labeler, defs))) 437 446 .collect() 438 447 } 439 448 }
+8
parakeet/src/main.rs
··· 3 3 use diesel_async::pooled_connection::AsyncDieselConnectionManager; 4 4 use diesel_async::AsyncPgConnection; 5 5 use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; 6 + use redis::aio::MultiplexedConnection; 6 7 use std::sync::Arc; 7 8 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer}; 8 9 use tower_http::trace::TraceLayer; 9 10 10 11 const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); 11 12 13 + mod cache; 12 14 mod config; 13 15 mod db; 14 16 mod hydration; ··· 18 20 #[derive(Clone)] 19 21 pub struct GlobalState { 20 22 pub pool: Pool<AsyncPgConnection>, 23 + pub redis_mp: MultiplexedConnection, 21 24 pub dataloaders: Arc<loaders::Dataloaders>, 22 25 pub resolver: Arc<did_resolver::Resolver>, 23 26 pub index_client: parakeet_index::Client, ··· 46 49 tracing::info!("database migrations complete"); 47 50 } 48 51 52 + let redis_client = redis::Client::open(conf.cache_uri)?; 53 + let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 54 + 49 55 let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 50 56 51 57 let dataloaders = Arc::new(loaders::Dataloaders::new( 52 58 pool.clone(), 59 + redis_mp.clone(), 53 60 index_client.clone(), 54 61 )); 55 62 let resolver = Arc::new(did_resolver::Resolver::new(did_resolver::ResolverOpts { ··· 82 89 .layer(cors) 83 90 .with_state(GlobalState { 84 91 pool, 92 + redis_mp, 85 93 dataloaders, 86 94 resolver, 87 95 index_client,