Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

rocks: fix target filtering, clippy

Changed files
+35 -53
constellation
+2 -2
constellation/src/storage/mem_store.rs
···
                 HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
 
         let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
-        for (did, rkey) in linkers.into_iter().cloned().filter_map(|l| l) {
+        for (did, rkey) in linkers.iter().flatten().cloned() {
            if !filter_dids.is_empty() && !filter_dids.contains(&did) {
                continue;
            }
···
                        rkey,
                    })
                    .unwrap_or(&Vec::new())
-                   .into_iter()
+                   .iter()
                    .filter_map(|(path, target)| {
                        if *path == path_to_other
                            && (filter_to_targets.is_empty() || filter_to_targets.contains(target))
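A note on the iterator change above: `linkers` holds optional entries, and the old `.into_iter().cloned().filter_map(|l| l)` cloned every `Option` only to discard the `None`s afterward. `.iter().flatten()` skips the `None`s by reference first, so only the surviving pairs get cloned. A minimal standalone sketch of the idiom, with plain `String` pairs standing in for the crate's actual did/rkey types:

fn main() {
    // Stand-in data: optional (did, rkey) pairs, where None marks an
    // empty linker slot (types simplified from the real crate).
    let linkers: Vec<Option<(String, String)>> = vec![
        Some(("did:plc:alice".to_string(), "rkey-1".to_string())),
        None,
        Some(("did:plc:bob".to_string(), "rkey-2".to_string())),
    ];

    // .iter().flatten() yields only the Some values by reference, and
    // .cloned() then copies just those -- same output as the old chain,
    // minus the clones of entries that were about to be thrown away.
    for (did, rkey) in linkers.iter().flatten().cloned() {
        println!("{did} {rkey}");
    }
}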
+1
constellation/src/storage/mod.rs
···
 }
 
 pub trait LinkReader: Clone + Send + Sync + 'static {
+    #[allow(clippy::too_many_arguments)]
     fn get_many_to_many_counts(
         &self,
         target: &str,
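For context on the lint silenced above: clippy's `too_many_arguments` fires once a signature passes seven parameters, and placing the `allow` on the method scopes the exception to this one item instead of the whole module. A hedged sketch of the shape; everything after `target` in the parameter list below is invented for illustration, since the diff only shows the first lines of the real signature:

pub trait LinkReader: Clone + Send + Sync + 'static {
    // The allow applies only to the item it annotates, so any other
    // trait methods still get linted normally.
    #[allow(clippy::too_many_arguments)]
    fn get_many_to_many_counts(
        &self,
        target: &str,
        // hypothetical parameters from here down:
        collection: &str,
        path: &str,
        path_to_other: &str,
        filter_dids: &[String],
        filter_to_targets: &[String],
        limit: u64,
        cursor: Option<&str>,
    ) -> anyhow::Result<Vec<(String, u64, u64)>>;
}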
+32 -51
constellation/src/storage/rocks_store.rs
···
 #[derive(Debug, Clone)]
 pub struct RocksStorage {
     pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
-    did_id_table: IdTable<Did, DidIdValue, true>,
-    target_id_table: IdTable<TargetKey, TargetId, true>,
+    did_id_table: IdTable<Did, DidIdValue>,
+    target_id_table: IdTable<TargetKey, TargetId>,
     is_writer: bool,
     backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
 }
···
     fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
         ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
     }
-    fn init<const WITH_REVERSE: bool>(
-        self,
-        db: &DBWithThreadMode<MultiThreaded>,
-    ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
+    fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
         if db.cf_handle(&self.name).is_none() {
             bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
         }
···
     }
 }
 #[derive(Debug, Clone)]
-struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
+struct IdTable<Orig, IdVal: IdTableValue>
 where
     Orig: KeyFromRocks,
     for<'a> &'a Orig: AsRocksKey,
···
     base: IdTableBase<Orig, IdVal>,
     priv_id_seq: u64,
 }
-impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
+impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
 where
     Orig: KeyFromRocks,
     for<'v> &'v IdVal: AsRocksValue,
···
             id_value
         }))
     }
+
     fn estimate_count(&self) -> u64 {
         self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
     }
-}
-impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
-where
-    Orig: KeyFromRocks,
-    for<'v> &'v IdVal: AsRocksValue,
-    for<'k> &'k Orig: AsRocksKey,
-{
+
     fn get_or_create_id_val(
         &mut self,
         db: &DBWithThreadMode<MultiThreaded>,
···
         }
     }
 }
-impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
-where
-    Orig: KeyFromRocks,
-    for<'v> &'v IdVal: AsRocksValue,
-    for<'k> &'k Orig: AsRocksKey,
-{
-    fn get_or_create_id_val(
-        &mut self,
-        db: &DBWithThreadMode<MultiThreaded>,
-        batch: &mut WriteBatch,
-        orig: &Orig,
-    ) -> Result<IdVal> {
-        let cf = db.cf_handle(&self.base.name).unwrap();
-        self.__get_or_create_id_val(&cf, db, batch, orig)
-    }
-}
 
 impl IdTableValue for DidIdValue {
     fn new(v: u64) -> Self {
···
     }
 
     fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
-        let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
-        let target_id_table = IdTable::<_, _, true>::setup(TARGET_IDS_CF);
+        let did_id_table = IdTable::setup(DID_IDS_CF);
+        let target_id_table = IdTable::setup(TARGET_IDS_CF);
 
         let cfs = vec![
             // id reference tables
···
         };
 
         let filter_did_ids: HashMap<DidId, bool> = filter_dids
-            .into_iter()
+            .iter()
             .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
             .collect::<Result<Vec<DidIdValue>>>()?
             .into_iter()
             .map(|DidIdValue(id, active)| (id, active))
             .collect();
 
-        let filter_to_target_ids = filter_to_targets
-            .into_iter()
-            .filter_map(|target| {
-                self.target_id_table
-                    .get_id_val(
-                        &self.db,
-                        &TargetKey(Target(target.to_string()), collection.clone(), path.clone()),
-                    )
-                    .transpose()
-            })
-            .collect::<Result<HashSet<TargetId>>>()?;
+        // stored targets are keyed by triples of (target, collection, path).
+        // target filtering only considers the target itself, so we actually
+        // need to do a prefix iteration of all target ids for this target and
+        // keep them all.
+        // i *think* the number of keys at a target prefix should usually be
+        // pretty small, so this is hopefully fine. but if it turns out to be
+        // large, we can push this filtering back into the main links loop and
+        // do forward db queries per backlink to get the raw target back out.
+        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
+        for t in filter_to_targets {
+            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
+                filter_to_target_ids.insert(target_id);
+            }
+        }
 
         let linkers = self.get_target_linkers(&target_id)?;
···
                 // (this check continues after the did-lookup, which we have to do)
                 let page_is_full = grouped_counts.len() as u64 >= limit;
                 if page_is_full {
-                    let current_max = grouped_counts.keys().rev().next().unwrap(); // limit should be non-zero bleh
+                    let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
                     if fwd_target > *current_max {
                         continue;
                     }
···
                     Default::default()
                 });
                 entry.0 += 1;
-                entry.1.insert(did_id.clone());
+                entry.1.insert(did_id);
 
                 if should_evict {
                     grouped_counts.pop_last();
···
         let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
         for (target_id, (n, dids)) in &grouped_counts {
-            let Some(target) = self.target_id_table.get_val_from_id(&self.db, target_id.0)? else {
+            let Some(target) = self
+                .target_id_table
+                .get_val_from_id(&self.db, target_id.0)?
+            else {
                 eprintln!("failed to look up target from target_id {target_id:?}");
                 continue;
             };
-            items.push((target.0.0, *n, dids.len() as u64));
+            items.push((target.0 .0, *n, dids.len() as u64));
         }
 
         let next = if grouped_counts.len() as u64 >= limit {
             // yeah.... it's a number saved as a string......sorry
             grouped_counts
                 .keys()
-                .rev()
-                .next()
+                .next_back()
                 .map(|k| format!("{}", k.0))
         } else {
             None
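The substantive fix in this file is the `filter_to_target_ids` change: the old code looked up exactly one `TargetKey` per filter target, pinned to the current `collection` and `path`, so backlinks reaching the same target through any other (collection, path) pair silently escaped the filter. The new code instead collects every target id stored under the filter target's key prefix. A sketch of how such a prefix scan can look on the rocksdb crate's iterator API; the column family name, key layout, and u64 value encoding below are assumptions for illustration, not the crate's real `iter_targets_for_target`:

use rocksdb::{DBWithThreadMode, MultiThreaded};

// Hypothetical helper: assumes target-id keys are laid out as
// [target bytes | collection | path] -> little-endian u64 id.
fn target_ids_for_target(
    db: &DBWithThreadMode<MultiThreaded>,
    cf_name: &str,
    target: &str,
) -> anyhow::Result<Vec<u64>> {
    let cf = db
        .cf_handle(cf_name)
        .ok_or_else(|| anyhow::anyhow!("missing column family {cf_name}"))?;
    let mut ids = Vec::new();
    // prefix_iterator_cf seeks to the first key >= the prefix and walks
    // forward; the caller still has to stop once keys leave the prefix.
    for kv in db.prefix_iterator_cf(&cf, target.as_bytes()) {
        let (key, value) = kv?;
        if !key.starts_with(target.as_bytes()) {
            break; // past the last (target, collection, path) triple
        }
        ids.push(u64::from_le_bytes(value[..8].try_into()?));
    }
    Ok(ids)
}

Correctness of this shape rests on the target serializing as the leading bytes of `TargetKey`; the commit's own comment hedges the cost the same way, expecting the number of keys under one target prefix to stay small.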
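The remaining clippy items ride along in the same file: `keys().next_back()` replaces the `keys().rev().next()` detour to the greatest key, `insert(did_id)` drops a clone of an id that is cheap to pass by value, and `target.0 .0` is rustfmt's spacing for a nested tuple index. The paging structure those keys belong to is a bounded top-k over a `BTreeMap`; a simplified standalone sketch of that pattern, with integer keys standing in for target ids and the per-target DID sets omitted:

use std::collections::BTreeMap;

// Count incoming keys while never holding more than `limit` distinct
// keys, keeping the smallest ones (the BTreeMap stays sorted).
fn count_first_page(values: &[u64], limit: usize) -> BTreeMap<u64, u64> {
    assert!(limit > 0, "limit should be non-zero");
    let mut counts: BTreeMap<u64, u64> = BTreeMap::new();
    for &v in values {
        if counts.len() >= limit {
            // next_back() reaches the greatest key directly; clippy flags
            // the equivalent .rev().next() as a needless detour.
            let current_max = *counts.keys().next_back().unwrap();
            if v > current_max {
                continue; // sorts after the whole page, skip it
            }
        }
        *counts.entry(v).or_default() += 1;
        while counts.len() > limit {
            counts.pop_last(); // evict the largest key to keep the page bounded
        }
    }
    counts
}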