APIs for links and references in the ATmosphere
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 281 lines 7.8 kB view raw
1use bincode::config::Options; 2use clap::Parser; 3use serde::Serialize; 4use std::collections::HashMap; 5use std::path::PathBuf; 6 7use tokio_util::sync::CancellationToken; 8 9use constellation::storage::rocks_store::{ 10 Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts, 11}; 12use constellation::storage::RocksStorage; 13use constellation::Did; 14 15use links::parse_any_link; 16use rocksdb::IteratorMode; 17use std::time; 18 19/// Aggregate links in the at-mosphere 20#[derive(Parser, Debug)] 21#[command(version, about, long_about = None)] 22struct Args { 23 /// where is rocksdb's data 24 #[arg(short, long)] 25 data: PathBuf, 26 /// slow down so we don't kill the firehose consumer, if running concurrently 27 #[arg(short, long)] 28 limit: Option<u64>, 29} 30 31type LinkType = String; 32 33#[derive(Debug, Eq, Hash, PartialEq, Serialize)] 34struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection 35 36#[derive(Debug, Serialize)] 37struct SourceSample { 38 did: String, 39 rkey: String, 40} 41 42#[derive(Debug, Default, Serialize)] 43struct Bucket { 44 count: u64, 45 sum: u64, 46 sample: Option<SourceSample>, 47} 48 49#[derive(Debug, Default, Serialize)] 50struct Buckets([Bucket; 23]); 51 52const BUCKETS: [u64; 23] = [ 53 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535, 54 262_144, 1_048_576, 55]; 56 57// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax 58 59static DID_IDS_CF: &str = "did_ids"; 60static TARGET_IDS_CF: &str = "target_ids"; 61static TARGET_LINKERS_CF: &str = "target_links"; 62 63const REPORT_INTERVAL: usize = 50_000; 64 65type Stats = HashMap<SourceLink, Buckets>; 66 67#[derive(Debug, Serialize)] 68struct Printable { 69 collection: String, 70 path: String, 71 link_type: String, 72 target_collection: Option<String>, 73 buckets: Buckets, 74} 75 76#[derive(Debug, Default)] 77struct ErrStats { 78 failed_to_get_sample: usize, 79 failed_to_read_target_id: usize, 80 failed_to_deserialize_target_key: usize, 81 failed_to_parse_target_as_link: usize, 82 failed_to_get_links: usize, 83 failed_to_deserialize_linkers: usize, 84} 85 86fn thousands(n: usize) -> String { 87 n.to_string() 88 .as_bytes() 89 .rchunks(3) 90 .rev() 91 .map(std::str::from_utf8) 92 .collect::<Result<Vec<&str>, _>>() 93 .unwrap() 94 .join(",") 95} 96 97fn main() { 98 let args = Args::parse(); 99 100 let limit = args.limit.map(|amount| { 101 ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1)) 102 .max_tokens(amount) 103 .initial_available(amount) 104 .build() 105 .unwrap() 106 }); 107 108 eprintln!("starting rocksdb..."); 109 let rocks = RocksStorage::open_readonly(args.data).unwrap(); 110 eprintln!("rocks ready."); 111 112 let RocksStorage { ref db, .. } = rocks; 113 114 let stay_alive = CancellationToken::new(); 115 ctrlc::set_handler({ 116 let mut desperation: u8 = 0; 117 let stay_alive = stay_alive.clone(); 118 move || match desperation { 119 0 => { 120 eprintln!("ok, shutting down..."); 121 stay_alive.cancel(); 122 desperation += 1; 123 } 124 1.. => panic!("fine, panicking!"), 125 } 126 }) 127 .unwrap(); 128 129 let mut stats = Stats::new(); 130 let mut err_stats: ErrStats = Default::default(); 131 132 let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap(); 133 let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap(); 134 let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap(); 135 136 let t0 = time::Instant::now(); 137 let mut t_prev = t0; 138 139 let mut i = 0; 140 for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) { 141 if stay_alive.is_cancelled() { 142 break; 143 } 144 145 if let Some(ref limiter) = limit { 146 if let Err(dur) = limiter.try_wait() { 147 std::thread::sleep(dur) 148 } 149 } 150 151 if i > 0 && i % REPORT_INTERVAL == 0 { 152 let now = time::Instant::now(); 153 let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32()); 154 eprintln!( 155 "{i}\t({}k)\t{:.2}\t{rate:.1}/s", 156 thousands(i / 1000), 157 t0.elapsed().as_secs_f32() 158 ); 159 t_prev = now; 160 } 161 i += 1; 162 163 let Ok((target_key, target_id)) = item else { 164 err_stats.failed_to_read_target_id += 1; 165 continue; 166 }; 167 168 let Ok(TargetKey(Target(target), collection, rpath)) = 169 _bincode_opts().deserialize(&target_key) 170 else { 171 err_stats.failed_to_deserialize_target_key += 1; 172 continue; 173 }; 174 175 let source = { 176 let Some(parsed) = parse_any_link(&target) else { 177 err_stats.failed_to_parse_target_as_link += 1; 178 continue; 179 }; 180 SourceLink( 181 collection, 182 rpath, 183 parsed.name().into(), 184 parsed.at_uri_collection().map(Collection), 185 ) 186 }; 187 188 let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else { 189 err_stats.failed_to_get_links += 1; 190 continue; 191 }; 192 let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else { 193 err_stats.failed_to_deserialize_linkers += 1; 194 continue; 195 }; 196 let (n, _) = linkers.count(); 197 198 if n == 0 { 199 continue; 200 } 201 202 let mut bucket = 0; 203 for edge in BUCKETS { 204 if n <= edge || bucket == 22 { 205 break; 206 } 207 bucket += 1; 208 } 209 210 let b = &mut stats.entry(source).or_default().0[bucket]; 211 b.count += 1; 212 b.sum += n; 213 if b.sample.is_none() { 214 let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize]; 215 if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) { 216 if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) { 217 b.sample = Some(SourceSample { 218 did, 219 rkey: k.clone(), 220 }); 221 } else { 222 err_stats.failed_to_get_sample += 1; 223 } 224 } else { 225 err_stats.failed_to_get_sample += 1; 226 } 227 } 228 229 // if i >= 40_000 { 230 // break; 231 // } 232 } 233 234 let dt = t0.elapsed(); 235 236 eprintln!("gathering stats for output..."); 237 238 let itemified = stats 239 .into_iter() 240 .map( 241 |( 242 SourceLink(Collection(collection), RPath(path), link_type, target_collection), 243 buckets, 244 )| Printable { 245 collection, 246 path, 247 link_type, 248 target_collection: target_collection.map(|Collection(c)| c), 249 buckets, 250 }, 251 ) 252 .collect::<Vec<_>>(); 253 254 match serde_json::to_string(&itemified) { 255 Ok(s) => println!("{s}"), 256 Err(e) => eprintln!("failed to serialize results: {e:?}"), 257 } 258 259 eprintln!( 260 "{} summarizing {} link targets in {:.1}s", 261 if stay_alive.is_cancelled() { 262 "STOPPED" 263 } else { 264 "FINISHED" 265 }, 266 thousands(i), 267 dt.as_secs_f32() 268 ); 269 eprintln!("{err_stats:?}"); 270 eprintln!("bye."); 271} 272 273// scan plan 274 275// buckets (backlink count) 276// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+ 277// by 278// - collection 279// - json path 280// - link type 281// samples for each bucket for each variation