+3
-2
Cargo.lock
+3
-2
Cargo.lock
···
534
534
"ratelimit",
535
535
"rocksdb",
536
536
"serde",
537
+
"serde_json",
537
538
"serde_with",
538
539
"tempfile",
539
540
"tinyjson",
···
1848
1849
1849
1850
[[package]]
1850
1851
name = "serde_json"
1851
-
version = "1.0.138"
1852
+
version = "1.0.139"
1852
1853
source = "registry+https://github.com/rust-lang/crates.io-index"
1853
-
checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949"
1854
+
checksum = "44f86c3acccc9c65b153fe1b85a3be07fe5515274ec9f0653b4a0875731c72a6"
1854
1855
dependencies = [
1855
1856
"itoa",
1856
1857
"memchr",
+1
constellation/Cargo.toml
+1
constellation/Cargo.toml
+3
-1
constellation/readme.md
+3
-1
constellation/readme.md
···
153
153
- [ ] read ops (api)
154
154
- [ ] expose internal stats?
155
155
- [ ] figure out what's the right thing to do if merge op fails. happened on startup after an unclean reboot.
156
-
156
+
- [x] backups!
157
+
- [x] manual backup on startup
158
+
- [x] background task to create backups on an interval
157
159
158
160
cache
159
161
- [ ] set api response headers
+77
-34
constellation/src/bin/rocks-target-stats.rs
constellation/src/bin/rocks-link-stats.rs
+77
-34
constellation/src/bin/rocks-target-stats.rs
constellation/src/bin/rocks-link-stats.rs
···
1
1
use bincode::config::Options;
2
2
use clap::Parser;
3
+
use serde::Serialize;
3
4
use std::collections::HashMap;
4
5
use std::path::PathBuf;
5
6
···
27
28
28
29
type LinkType = String;
29
30
30
-
#[derive(Debug, Eq, Hash, PartialEq)]
31
-
struct SourceLink(Collection, RPath, LinkType);
31
+
/// Grouping key for the link stats (it is the key type of the `Stats` map).
///
/// Fields, in order:
/// 0. `Collection` -- presumably the collection of the record containing the link (TODO confirm)
/// 1. `RPath` -- presumably the path of the link within that record (TODO confirm)
/// 2. `LinkType` -- the parsed link's type name (filled from `parsed.name()`)
/// 3. `Option<Collection>` -- the target's collection; only `Some` when the
///    link is an at-uri that has a collection
#[derive(Debug, Eq, Hash, PartialEq, Serialize)]
32
+
struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection
32
33
33
-
#[derive(Debug, Default)]
34
-
struct Buckets([u64; 23]);
34
+
/// One example linker kept alongside a bucket: a DID plus a record key.
///
/// Serialized as part of the JSON output via `Bucket::sample`.
#[derive(Debug, Serialize)]
35
+
struct SourceSample {
36
+
// DID string, deserialized from the value stored under the linker's DID id
did: String,
37
+
// record key of the sampled linker
rkey: String,
38
+
}
39
+
40
+
/// Accumulated stats for one histogram bucket of a single `SourceLink`.
///
/// `Default` gives an empty bucket: zero counts and no sample.
#[derive(Debug, Default, Serialize)]
41
+
struct Bucket {
42
+
// number of observations that landed in this bucket
count: u64,
43
+
// running total of the observed values `n` that landed in this bucket
sum: u64,
44
+
// first successfully resolved example for this bucket; stays `None` until
// a sample lookup succeeds
sample: Option<SourceSample>,
45
+
}
35
46
36
-
const BUCKETS: Buckets = Buckets([
37
-
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144,
38
-
1048576,
39
-
]);
47
+
/// The full histogram for one `SourceLink`: one `Bucket` per edge in `BUCKETS`
/// (same length, indexed by the same bucket position).
#[derive(Debug, Default, Serialize)]
48
+
struct Buckets([Bucket; 23]);
49
+
50
+
// Inclusive upper edges of the histogram buckets, in ascending order.
// A value `n` goes into the first bucket whose edge is >= n; the last bucket
// also catches everything larger than the previous edge, so its own edge is
// never an effective cap.
// NOTE(review): 65_535 breaks the power-of-four progression around it
// (16_384, 65_536, 262_144) -- possibly meant to be 65_536; confirm before
// changing, since it shifts the bucket boundary.
const BUCKETS: [u64; 23] = [
51
+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
52
+
262_144, 1_048_576,
53
+
];
40
54
41
55
// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax
42
56
···
46
60
47
61
const REPORT_INTERVAL: usize = 50_000;
48
62
49
-
type Stats = HashMap<SourceLink, (String, String, Buckets)>;
63
+
/// Histogram per distinct `SourceLink` key.
type Stats = HashMap<SourceLink, Buckets>;
64
+
65
+
/// Flattened, JSON-serializable form of one `Stats` entry: the `SourceLink`
/// newtype fields unwrapped to plain strings, plus that key's buckets.
#[derive(Debug, Serialize)]
66
+
struct Printable {
67
+
// inner string of the key's `Collection`
collection: String,
68
+
// inner string of the key's `RPath`
path: String,
69
+
// link type name from the key
link_type: String,
70
+
// inner string of the key's optional target collection
target_collection: Option<String>,
71
+
// histogram accumulated for this key
buckets: Buckets,
72
+
}
50
73
51
74
#[derive(Debug, Default)]
52
75
struct ErrStats {
···
133
156
err_stats.failed_to_parse_target_as_link += 1;
134
157
continue;
135
158
};
136
-
SourceLink(collection, rpath, parsed.name().into())
159
+
SourceLink(
160
+
collection,
161
+
rpath,
162
+
parsed.name().into(),
163
+
parsed.at_uri_collection().map(Collection),
164
+
)
137
165
};
138
166
139
167
let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
···
151
179
}
152
180
153
181
let mut bucket = 0;
154
-
for edge in BUCKETS.0 {
182
+
for edge in BUCKETS {
155
183
if n <= edge || bucket == 22 {
156
184
break;
157
185
}
158
186
bucket += 1;
159
187
}
160
188
161
-
stats
162
-
.entry(source)
163
-
.or_insert_with(|| {
164
-
let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
165
-
if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
166
-
if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
167
-
return (did, k.clone(), Default::default());
168
-
}
189
+
let b = &mut stats.entry(source).or_default().0[bucket];
190
+
b.count += 1;
191
+
b.sum += n;
192
+
if b.sample.is_none() {
193
+
let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
194
+
if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
195
+
if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
196
+
b.sample = Some(SourceSample {
197
+
did,
198
+
rkey: k.clone(),
199
+
});
200
+
} else {
201
+
err_stats.failed_to_get_sample += 1;
169
202
}
203
+
} else {
170
204
err_stats.failed_to_get_sample += 1;
171
-
("".into(), "".into(), Default::default())
172
-
})
173
-
.2
174
-
.0[bucket] += 1;
205
+
}
206
+
}
175
207
176
-
// if i >= 400_000 { break }
208
+
if i >= 40_000 {
209
+
break;
210
+
}
177
211
}
178
212
179
213
eprintln!(
···
183
217
);
184
218
eprintln!("{err_stats:?}");
185
219
186
-
for (SourceLink(Collection(c), RPath(p), t), (d, r, Buckets(b))) in stats {
187
-
let sample_at_uri = if !(d.is_empty() || r.is_empty()) {
188
-
format!("at://{d}/{c}/{r}")
189
-
} else {
190
-
"".into()
191
-
};
192
-
println!(
193
-
"{c:?}, {p:?}, {t:?}, {sample_at_uri:?}, {}",
194
-
b.map(|n| n.to_string()).join(", ")
195
-
);
220
+
let itemified = stats
221
+
.into_iter()
222
+
.map(
223
+
|(
224
+
SourceLink(Collection(collection), RPath(path), link_type, target_collection),
225
+
buckets,
226
+
)| Printable {
227
+
collection,
228
+
path,
229
+
link_type,
230
+
target_collection: target_collection.map(|Collection(c)| c),
231
+
buckets,
232
+
},
233
+
)
234
+
.collect::<Vec<_>>();
235
+
236
+
match serde_json::to_string(&itemified) {
237
+
Ok(s) => println!("{s}"),
238
+
Err(e) => eprintln!("failed to serialize results: {e:?}"),
196
239
}
197
240
198
241
eprintln!("bye.");
+1
-1
constellation/src/storage/rocks_store.rs
+1
-1
constellation/src/storage/rocks_store.rs
+67
links/src/at_uri.rs
+67
links/src/at_uri.rs
···
137
137
// there's more normalization to do still. ugh.
138
138
}
139
139
140
+
/// Extract the collection segment from an at-uri, if it has one.
///
/// The expected shape is `at://<authority>/<collection>[/<rest>][?<query>][#<fragment>]`.
/// The collection is the first path segment after the authority, and it ends at
/// whichever of `/`, `?` or `#` comes first (or at the end of the input).
///
/// Returns `None` when:
/// - the input does not start with `at://` (compared ASCII case-insensitively),
/// - no `/` path follows the authority (a `?` or `#` right after the authority
///   starts a query/fragment, so any `/` inside it is not a path separator),
/// - the collection segment is empty (e.g. `at://did:plc:abc/`).
///
/// Neither the authority nor the collection is validated here.
pub fn at_uri_collection(at_uri: &str) -> Option<String> {
    /// Characters that terminate the authority or a path segment.
    fn is_delimiter(c: char) -> bool {
        matches!(c, '/' | '?' | '#')
    }

    // `get` returns None (instead of panicking) for inputs shorter than the
    // scheme prefix or where byte 5 is not a char boundary.
    let proto = at_uri.get(..5)?;
    if !proto.eq_ignore_ascii_case("at://") {
        return None;
    }
    let rest = &at_uri[5..];

    // The authority ends at the *earliest* delimiter; only `/` opens a path.
    let authority_end = rest.find(is_delimiter)?;
    let path = rest[authority_end..].strip_prefix('/')?;

    // Likewise the collection ends at the earliest delimiter, so a `/` that
    // appears inside a query or fragment cannot leak into the result.
    let collection = path.find(is_delimiter).map_or(path, |end| &path[..end]);
    if collection.is_empty() {
        None
    } else {
        Some(collection.to_string())
    }
}
157
+
140
158
#[cfg(test)]
141
159
mod tests {
142
160
use super::*;
···
231
249
] {
232
250
assert_eq!(
233
251
parse_at_uri(case),
252
+
expected.map(|s| s.to_string()),
253
+
"{detail}"
254
+
);
255
+
}
256
+
}
257
+
258
+
#[test]
259
+
fn test_at_uri_collection() {
260
+
for (case, expected, detail) in vec![
261
+
("", None, "empty"),
262
+
("at://did:plc:vc7f4oafdgxsihk4cry2xpze", None, "did only"),
263
+
(
264
+
"at://did:plc:vc7f4oafdgxsihk4cry2xpze/collec.tion",
265
+
Some("collec.tion"),
266
+
"no path (weird)",
267
+
),
268
+
(
269
+
"at://did:plc:vc7f4oafdgxsihk4cry2xpze/collec.tion/path",
270
+
Some("collec.tion"),
271
+
"normal at-uri",
272
+
),
273
+
(
274
+
"at://did:plc:vc7f4oafdgxsihk4cry2xpze/collec.tion?query",
275
+
Some("collec.tion"),
276
+
"colleciton with query",
277
+
),
278
+
(
279
+
"at://did:plc:vc7f4oafdgxsihk4cry2xpze/collec.tion#hash",
280
+
Some("collec.tion"),
281
+
"colleciton with hash",
282
+
),
283
+
(
284
+
"at://did:plc:vc7f4oafdgxsihk4cry2xpze/collec.tion/path?query#hash",
285
+
Some("collec.tion"),
286
+
"colleciton with everything",
287
+
),
288
+
(
289
+
"at://did:web:example.com/collec.tion/path",
290
+
Some("collec.tion"),
291
+
"did:web",
292
+
),
293
+
(
294
+
"at://did:web:example.com/col.lec.tio.ns.so.long.going.on.and.on",
295
+
Some("col.lec.tio.ns.so.long.going.on.and.on"),
296
+
"long collection",
297
+
),
298
+
] {
299
+
assert_eq!(
300
+
at_uri_collection(case),
234
301
expected.map(|s| s.to_string()),
235
302
"{detail}"
236
303
);
+29
links/src/lib.rs
+29
links/src/lib.rs
···
35
35
Link::Did(_) => "did",
36
36
}
37
37
}
38
+
pub fn at_uri_collection(&self) -> Option<String> {
39
+
if let Link::AtUri(at_uri) = self {
40
+
at_uri::at_uri_collection(at_uri)
41
+
} else {
42
+
None
43
+
}
44
+
}
38
45
}
39
46
40
47
#[derive(Debug, PartialEq)]
···
99
106
parse_any_link("did:plc:44ybard66vv44zksje25o7dz"),
100
107
Some(Link::Did("did:plc:44ybard66vv44zksje25o7dz".into()))
101
108
)
109
+
}
110
+
111
+
#[test]
fn test_at_uri_collection() {
    // (link text, collection expected from the parsed link)
    let cases: [(&str, Option<String>); 3] = [
        ("https://example.com", None),
        ("did:web:bad-example.com", None),
        (
            "at://did:web:bad-example.com/my.collection/3jwdwj2ctlk26",
            Some("my.collection".into()),
        ),
    ];
    for (input, expected) in cases {
        let link = parse_any_link(input).unwrap();
        assert_eq!(link.at_uri_collection(), expected);
    }
}
103
132
}