//! APIs for links and references in the ATmosphere
1use bincode::config::Options;
2use clap::Parser;
3use serde::Serialize;
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use tokio_util::sync::CancellationToken;
8
9use constellation::storage::rocks_store::{
10 Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
11};
12use constellation::storage::RocksStorage;
13use constellation::Did;
14
15use links::parse_any_link;
16use rocksdb::IteratorMode;
17use std::time;
18
/// Aggregate links in the at-mosphere
///
/// One-shot CLI: opens an existing constellation RocksDB store read-only,
/// scans every link target, and prints bucketed backlink-count stats as JSON.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// where is rocksdb's data
    ///
    /// Path to the RocksDB directory (opened read-only).
    #[arg(short, long)]
    data: PathBuf,
    /// slow down so we don't kill the firehose consumer, if running concurrently
    ///
    /// Maximum targets scanned per second; unlimited when omitted.
    #[arg(short, long)]
    limit: Option<u64>,
}
30
/// Name of a link's kind, as produced by `parse_any_link(..).name()`.
type LinkType = String;

/// Aggregation key: which collection+path a link came from, what kind of
/// link it is, and (when it's an at-uri link) which collection it points at.
#[derive(Debug, Eq, Hash, PartialEq, Serialize)]
struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection
35
/// One example linking record, used to illustrate a histogram bucket.
#[derive(Debug, Serialize)]
struct SourceSample {
    /// DID of the linking repo (resolved from its `DidId` via the did_ids CF)
    did: String,
    /// record key of the linking record
    rkey: String,
}
41
/// Accumulated stats for a single histogram bucket.
#[derive(Debug, Default, Serialize)]
struct Bucket {
    /// how many targets had a linker count falling in this bucket
    count: u64,
    /// total linkers summed across those targets
    sum: u64,
    /// first example captured for this bucket, if any
    sample: Option<SourceSample>,
}
48
/// Histogram over backlink counts: one slot per edge in [`BUCKETS`].
#[derive(Debug, Default, Serialize)]
struct Buckets([Bucket; 23]);

/// Inclusive upper edges of the histogram buckets; counts above the last
/// edge are clamped into the final bucket (index 22) by the scan loop.
const BUCKETS: [u64; 23] = [
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
    262_144, 1_048_576,
];
56
// Bucket labels, in order, for reading the serialized Buckets array:
// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax

// RocksDB column-family names used by the constellation store.
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
// NOTE(review): the on-disk CF really is "target_links" despite the
// LINKERS const name — keep the string in sync with the store's schema.
static TARGET_LINKERS_CF: &str = "target_links";

/// Emit a progress line to stderr every this many scanned targets.
const REPORT_INTERVAL: usize = 50_000;

/// All histograms gathered so far, keyed by link source shape.
type Stats = HashMap<SourceLink, Buckets>;
66
/// JSON output row: a [`SourceLink`] flattened into plain strings, plus its
/// histogram. This is what gets serialized to stdout at the end of the run.
#[derive(Debug, Serialize)]
struct Printable {
    collection: String,
    path: String,
    link_type: String,
    target_collection: Option<String>,
    buckets: Buckets,
}
75
/// Counters for entries skipped because of read/deserialize/parse failures.
/// Each failure increments exactly one counter and the entry is skipped;
/// the totals are printed (via Debug) when the scan finishes.
#[derive(Debug, Default)]
struct ErrStats {
    failed_to_get_sample: usize,
    failed_to_read_target_id: usize,
    failed_to_deserialize_target_key: usize,
    failed_to_parse_target_as_link: usize,
    failed_to_get_links: usize,
    failed_to_deserialize_linkers: usize,
}
85
/// Format `n` with comma thousands separators, e.g. `1234567` -> `"1,234,567"`.
fn thousands(n: usize) -> String {
    let digits = n.to_string();
    // Group the decimal digits from the right in chunks of three, then put
    // the groups back into left-to-right order and join with commas.
    let mut groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .map(|chunk| std::str::from_utf8(chunk).expect("decimal digits are valid UTF-8"))
        .collect();
    groups.reverse();
    groups.join(",")
}
96
fn main() {
    let args = Args::parse();

    // Optional rate limiter: `--limit N` allows N scan steps per second,
    // with an initial burst of N tokens.
    let limit = args.limit.map(|amount| {
        ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1))
            .max_tokens(amount)
            .initial_available(amount)
            .build()
            .unwrap()
    });

    eprintln!("starting rocksdb...");
    // Read-only open so a concurrently-running firehose consumer keeps the
    // primary handle.
    let rocks = RocksStorage::open_readonly(args.data).unwrap();
    eprintln!("rocks ready.");

    let RocksStorage { ref db, .. } = rocks;

    // Cooperative shutdown: first ctrl-c cancels the token so the scan loop
    // exits cleanly (and partial results still get printed); a second ctrl-c
    // panics out of the handler to force-quit.
    let stay_alive = CancellationToken::new();
    ctrlc::set_handler({
        let mut desperation: u8 = 0;
        let stay_alive = stay_alive.clone();
        move || match desperation {
            0 => {
                eprintln!("ok, shutting down...");
                stay_alive.cancel();
                desperation += 1;
            }
            1.. => panic!("fine, panicking!"),
        }
    })
    .unwrap();

    let mut stats = Stats::new();
    let mut err_stats: ErrStats = Default::default();

    // Column-family handles; unwrap is acceptable here — a missing CF means
    // the store isn't a constellation DB at all.
    let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap();
    let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap();
    let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap();

    let t0 = time::Instant::now();
    let mut t_prev = t0;

    // Full scan of the target_ids CF: each entry maps a serialized TargetKey
    // to the compact target id used by the target_links CF.
    let mut i = 0;
    for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
        if stay_alive.is_cancelled() {
            break;
        }

        // try_wait() errs with the duration until the next token is
        // available; sleeping that long enforces the --limit rate.
        if let Some(ref limiter) = limit {
            if let Err(dur) = limiter.try_wait() {
                std::thread::sleep(dur)
            }
        }

        // Periodic progress line: total scanned, elapsed, and the rate over
        // the last reporting window.
        if i > 0 && i % REPORT_INTERVAL == 0 {
            let now = time::Instant::now();
            let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32());
            eprintln!(
                "{i}\t({}k)\t{:.2}\t{rate:.1}/s",
                thousands(i / 1000),
                t0.elapsed().as_secs_f32()
            );
            t_prev = now;
        }
        i += 1;

        // Each failure mode below bumps its ErrStats counter and skips the
        // entry; the scan itself keeps going.
        let Ok((target_key, target_id)) = item else {
            err_stats.failed_to_read_target_id += 1;
            continue;
        };

        let Ok(TargetKey(Target(target), collection, rpath)) =
            _bincode_opts().deserialize(&target_key)
        else {
            err_stats.failed_to_deserialize_target_key += 1;
            continue;
        };

        // Build the aggregation key from the target: source collection+path,
        // the parsed link kind, and the pointed-at collection for at-uris.
        let source = {
            let Some(parsed) = parse_any_link(&target) else {
                err_stats.failed_to_parse_target_as_link += 1;
                continue;
            };
            SourceLink(
                collection,
                rpath,
                parsed.name().into(),
                parsed.at_uri_collection().map(Collection),
            )
        };

        // Look up who links to this target and count them.
        let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
            err_stats.failed_to_get_links += 1;
            continue;
        };
        let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else {
            err_stats.failed_to_deserialize_linkers += 1;
            continue;
        };
        let (n, _) = linkers.count();

        if n == 0 {
            continue;
        }

        // Find the histogram slot: first edge with n <= edge wins; anything
        // above the last edge is clamped into the final bucket (index 22).
        let mut bucket = 0;
        for edge in BUCKETS {
            if n <= edge || bucket == 22 {
                break;
            }
            bucket += 1;
        }

        let b = &mut stats.entry(source).or_default().0[bucket];
        b.count += 1;
        b.sum += n;
        // Capture one sample per bucket: resolve the linker's DidId back to
        // its DID string via the did_ids CF.
        // NOTE(review): indexing linkers.0[n - 1] assumes linkers.count()'s
        // first component equals linkers.0.len() — TODO confirm, otherwise
        // this can panic on a short vec.
        if b.sample.is_none() {
            let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
            if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
                if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
                    b.sample = Some(SourceSample {
                        did,
                        rkey: k.clone(),
                    });
                } else {
                    err_stats.failed_to_get_sample += 1;
                }
            } else {
                err_stats.failed_to_get_sample += 1;
            }
        }

        // Debugging aid: uncomment to stop after a fixed number of entries.
        // if i >= 40_000 {
        //     break;
        // }
    }

    let dt = t0.elapsed();

    eprintln!("gathering stats for output...");

    // Flatten the stats map into serializable rows.
    let itemified = stats
        .into_iter()
        .map(
            |(
                SourceLink(Collection(collection), RPath(path), link_type, target_collection),
                buckets,
            )| Printable {
                collection,
                path,
                link_type,
                target_collection: target_collection.map(|Collection(c)| c),
                buckets,
            },
        )
        .collect::<Vec<_>>();

    // Results go to stdout as a single JSON array; all status/progress text
    // above went to stderr, so stdout can be piped straight to a file.
    match serde_json::to_string(&itemified) {
        Ok(s) => println!("{s}"),
        Err(e) => eprintln!("failed to serialize results: {e:?}"),
    }

    eprintln!(
        "{} summarizing {} link targets in {:.1}s",
        if stay_alive.is_cancelled() {
            "STOPPED"
        } else {
            "FINISHED"
        },
        thousands(i),
        dt.as_secs_f32()
    );
    eprintln!("{err_stats:?}");
    eprintln!("bye.");
}
272
273// scan plan
274
275// buckets (backlink count)
276// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+
277// by
278// - collection
279// - json path
280// - link type
281// samples for each bucket for each variation