+9
-9
Cargo.lock
+9
-9
Cargo.lock
···
992
993
[[package]]
994
name = "clap"
995
-
version = "4.5.47"
996
source = "registry+https://github.com/rust-lang/crates.io-index"
997
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
998
dependencies = [
999
"clap_builder",
1000
"clap_derive",
···
1002
1003
[[package]]
1004
name = "clap_builder"
1005
-
version = "4.5.47"
1006
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
1008
dependencies = [
1009
"anstream",
1010
"anstyle",
···
1375
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1376
dependencies = [
1377
"data-encoding",
1378
-
"syn 1.0.109",
1379
]
1380
1381
[[package]]
···
3045
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
dependencies = [
3047
"cfg-if",
3048
-
"windows-targets 0.48.5",
3049
]
3050
3051
[[package]]
···
4539
4540
[[package]]
4541
name = "reqwest"
4542
-
version = "0.12.22"
4543
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
4545
dependencies = [
4546
"async-compression",
4547
"base64 0.22.1",
···
6440
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
dependencies = [
6443
-
"windows-sys 0.48.0",
6444
]
6445
6446
[[package]]
···
992
993
[[package]]
994
name = "clap"
995
+
version = "4.5.48"
996
source = "registry+https://github.com/rust-lang/crates.io-index"
997
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
998
dependencies = [
999
"clap_builder",
1000
"clap_derive",
···
1002
1003
[[package]]
1004
name = "clap_builder"
1005
+
version = "4.5.48"
1006
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
1008
dependencies = [
1009
"anstream",
1010
"anstyle",
···
1375
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1376
dependencies = [
1377
"data-encoding",
1378
+
"syn 2.0.106",
1379
]
1380
1381
[[package]]
···
3045
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
dependencies = [
3047
"cfg-if",
3048
+
"windows-targets 0.52.6",
3049
]
3050
3051
[[package]]
···
4539
4540
[[package]]
4541
name = "reqwest"
4542
+
version = "0.12.23"
4543
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
+
checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb"
4545
dependencies = [
4546
"async-compression",
4547
"base64 0.22.1",
···
6440
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
dependencies = [
6443
+
"windows-sys 0.59.0",
6444
]
6445
6446
[[package]]
+29
-10
constellation/src/bin/main.rs
+29
-10
constellation/src/bin/main.rs
···
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
#[arg(short, long)]
56
fixture: Option<PathBuf>,
57
}
58
59
#[derive(Debug, Clone, ValueEnum)]
···
115
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
116
}
117
println!("rocks ready.");
118
-
run(
119
-
rocks,
120
-
fixture,
121
-
args.data,
122
-
stream,
123
-
bind,
124
-
metrics_bind,
125
-
stay_alive,
126
-
)
127
}
128
}
129
}
···
213
214
'monitor: loop {
215
match readable.get_stats() {
216
-
Ok(StorageStats { dids, targetables, linking_records }) => {
217
metrics::gauge!("storage.stats.dids").set(dids as f64);
218
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
219
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
#[arg(short, long)]
56
fixture: Option<PathBuf>,
57
+
/// run a scan across the target id table and write all key -> ids to id -> keys
58
+
#[arg(long, action)]
59
+
repair_target_ids: bool,
60
}
61
62
#[derive(Debug, Clone, ValueEnum)]
···
118
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
119
}
120
println!("rocks ready.");
121
+
std::thread::scope(|s| {
122
+
if args.repair_target_ids {
123
+
let rocks = rocks.clone();
124
+
let stay_alive = stay_alive.clone();
125
+
s.spawn(move || {
126
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
127
+
eprintln!("repair finished: {rep:?}");
128
+
rep
129
+
});
130
+
}
131
+
s.spawn(|| {
132
+
let r = run(
133
+
rocks,
134
+
fixture,
135
+
args.data,
136
+
stream,
137
+
bind,
138
+
metrics_bind,
139
+
stay_alive,
140
+
);
141
+
eprintln!("run finished: {r:?}");
142
+
r
143
+
});
144
+
});
145
+
Ok(())
146
}
147
}
148
}
···
232
233
'monitor: loop {
234
match readable.get_stats() {
235
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
236
metrics::gauge!("storage.stats.dids").set(dids as f64);
237
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
238
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
+4
constellation/src/server/filters.rs
+4
constellation/src/server/filters.rs
+145
-8
constellation/src/server/mod.rs
+145
-8
constellation/src/server/mod.rs
···
32
DEFAULT_CURSOR_LIMIT
33
}
34
35
-
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
36
-
37
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
38
-
eprintln!("handler join error: {e}");
39
http::StatusCode::INTERNAL_SERVER_ERROR
40
}
41
···
52
let store = store.clone();
53
move |accept| async {
54
spawn_blocking(|| hello(accept, store))
55
.await
56
.map_err(to500)?
57
}
···
190
#[template(path = "hello.html.j2")]
191
struct HelloReponse {
192
help: &'static str,
193
-
days_indexed: u64,
194
stats: StorageStats,
195
}
196
fn hello(
···
200
let stats = store
201
.get_stats()
202
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
203
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
204
-
.elapsed()
205
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
206
-
.as_secs()
207
-
/ 86400;
208
Ok(acceptable(accept, HelloReponse {
209
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
210
days_indexed,
211
stats,
212
}))
213
}
214
215
#[derive(Clone, Deserialize)]
···
609
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
610
}
611
}
···
32
DEFAULT_CURSOR_LIMIT
33
}
34
35
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36
+
eprintln!("handler error: {e}");
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
}
39
···
50
let store = store.clone();
51
move |accept| async {
52
spawn_blocking(|| hello(accept, store))
53
+
.await
54
+
.map_err(to500)?
55
+
}
56
+
}),
57
+
)
58
+
.route(
59
+
"/xrpc/blue.microcosm.links.getManyToManyCounts",
60
+
get({
61
+
let store = store.clone();
62
+
move |accept, query| async {
63
+
spawn_blocking(|| get_many_to_many_counts(accept, query, store))
64
.await
65
.map_err(to500)?
66
}
···
199
#[template(path = "hello.html.j2")]
200
struct HelloReponse {
201
help: &'static str,
202
+
days_indexed: Option<u64>,
203
stats: StorageStats,
204
}
205
fn hello(
···
209
let stats = store
210
.get_stats()
211
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
212
+
let days_indexed = stats
213
+
.started_at
214
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215
+
.transpose()
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
217
+
.map(|d| d.as_secs() / 86_400);
218
Ok(acceptable(accept, HelloReponse {
219
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
220
days_indexed,
221
stats,
222
}))
223
+
}
224
+
225
+
#[derive(Clone, Deserialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
struct GetManyToManyCountsQuery {
228
+
subject: String,
229
+
source: String,
230
+
/// path to the secondary link in the linking record
231
+
path_to_other: String,
232
+
/// filter to linking records (join of the m2m) by these DIDs
233
+
#[serde(default)]
234
+
did: Vec<String>,
235
+
/// filter to specific secondary records
236
+
#[serde(default)]
237
+
other_subject: Vec<String>,
238
+
cursor: Option<OpaqueApiCursor>,
239
+
/// Set the max number of links to return per page of results
240
+
#[serde(default = "get_default_cursor_limit")]
241
+
limit: u64,
242
+
}
243
+
#[derive(Serialize)]
244
+
struct OtherSubjectCount {
245
+
subject: String,
246
+
total: u64,
247
+
distinct: u64,
248
+
}
249
+
#[derive(Template, Serialize)]
250
+
#[template(path = "get-many-to-many-counts.html.j2")]
251
+
struct GetManyToManyCountsResponse {
252
+
counts_by_other_subject: Vec<OtherSubjectCount>,
253
+
cursor: Option<OpaqueApiCursor>,
254
+
#[serde(skip_serializing)]
255
+
query: GetManyToManyCountsQuery,
256
+
}
257
+
fn get_many_to_many_counts(
258
+
accept: ExtractAccept,
259
+
query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
260
+
store: impl LinkReader,
261
+
) -> Result<impl IntoResponse, http::StatusCode> {
262
+
let cursor_key = query
263
+
.cursor
264
+
.clone()
265
+
.map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
266
+
.transpose()?
267
+
.map(|c| c.next);
268
+
269
+
let limit = query.limit;
270
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
271
+
return Err(http::StatusCode::BAD_REQUEST);
272
+
}
273
+
274
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
275
+
query
276
+
.did
277
+
.iter()
278
+
.map(|d| d.trim())
279
+
.filter(|d| !d.is_empty())
280
+
.map(|d| Did(d.to_string())),
281
+
);
282
+
283
+
let filter_other_subjects: HashSet<String> = HashSet::from_iter(
284
+
query
285
+
.other_subject
286
+
.iter()
287
+
.map(|s| s.trim().to_string())
288
+
.filter(|s| !s.is_empty()),
289
+
);
290
+
291
+
let Some((collection, path)) = query.source.split_once(':') else {
292
+
return Err(http::StatusCode::BAD_REQUEST);
293
+
};
294
+
let path = format!(".{path}");
295
+
296
+
let path_to_other = format!(".{}", query.path_to_other);
297
+
298
+
let paged = store
299
+
.get_many_to_many_counts(
300
+
&query.subject,
301
+
collection,
302
+
&path,
303
+
&path_to_other,
304
+
limit,
305
+
cursor_key,
306
+
&filter_dids,
307
+
&filter_other_subjects,
308
+
)
309
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
+
311
+
let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
312
+
313
+
let items = paged
314
+
.items
315
+
.into_iter()
316
+
.map(|(subject, total, distinct)| OtherSubjectCount {
317
+
subject,
318
+
total,
319
+
distinct,
320
+
})
321
+
.collect();
322
+
323
+
Ok(acceptable(
324
+
accept,
325
+
GetManyToManyCountsResponse {
326
+
counts_by_other_subject: items,
327
+
cursor,
328
+
query: (*query).clone(),
329
+
},
330
+
))
331
}
332
333
#[derive(Clone, Deserialize)]
···
727
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
728
}
729
}
730
+
731
+
#[derive(Serialize, Deserialize)] // for bincode
732
+
struct ApiKeyedCursor {
733
+
next: String, // the key
734
+
}
735
+
736
+
impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
737
+
type Error = bincode::Error;
738
+
739
+
fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
740
+
bincode::DefaultOptions::new().deserialize(&item.0)
741
+
}
742
+
}
743
+
744
+
impl From<ApiKeyedCursor> for OpaqueApiCursor {
745
+
fn from(item: ApiKeyedCursor) -> Self {
746
+
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
747
+
}
748
+
}
+78
-1
constellation/src/storage/mem_store.rs
+78
-1
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
2
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
use anyhow::Result;
4
use links::CollectedLink;
···
132
}
133
134
impl LinkReader for MemStorage {
135
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
136
let data = self.0.lock().unwrap();
137
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
353
dids,
354
targetables,
355
linking_records,
356
})
357
}
358
}
···
1
+
use super::{
2
+
LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
3
+
};
4
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
5
use anyhow::Result;
6
use links::CollectedLink;
···
134
}
135
136
impl LinkReader for MemStorage {
137
+
fn get_many_to_many_counts(
138
+
&self,
139
+
target: &str,
140
+
collection: &str,
141
+
path: &str,
142
+
path_to_other: &str,
143
+
limit: u64,
144
+
after: Option<String>,
145
+
filter_dids: &HashSet<Did>,
146
+
filter_to_targets: &HashSet<String>,
147
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
148
+
let data = self.0.lock().unwrap();
149
+
let Some(paths) = data.targets.get(&Target::new(target)) else {
150
+
return Ok(PagedOrderedCollection::default());
151
+
};
152
+
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
153
+
return Ok(PagedOrderedCollection::default());
154
+
};
155
+
156
+
let path_to_other = RecordPath::new(path_to_other);
157
+
let filter_to_targets: HashSet<Target> =
158
+
HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
159
+
160
+
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
161
+
for (did, rkey) in linkers.iter().flatten().cloned() {
162
+
if !filter_dids.is_empty() && !filter_dids.contains(&did) {
163
+
continue;
164
+
}
165
+
if let Some(fwd_target) = data
166
+
.links
167
+
.get(&did)
168
+
.unwrap_or(&HashMap::new())
169
+
.get(&RepoId {
170
+
collection: collection.to_string(),
171
+
rkey,
172
+
})
173
+
.unwrap_or(&Vec::new())
174
+
.iter()
175
+
.filter_map(|(path, target)| {
176
+
if *path == path_to_other
177
+
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
178
+
{
179
+
Some(target)
180
+
} else {
181
+
None
182
+
}
183
+
})
184
+
.take(1)
185
+
.next()
186
+
{
187
+
let e = grouped_counts.entry(fwd_target.clone()).or_default();
188
+
e.0 += 1;
189
+
e.1.insert(did.clone());
190
+
}
191
+
}
192
+
let mut items: Vec<(String, u64, u64)> = grouped_counts
193
+
.iter()
194
+
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
195
+
.collect();
196
+
items.sort();
197
+
items = items
198
+
.into_iter()
199
+
.skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false))
200
+
.take(limit as usize)
201
+
.collect();
202
+
let next = if items.len() as u64 >= limit {
203
+
items.last().map(|(t, _, _)| t.clone())
204
+
} else {
205
+
None
206
+
};
207
+
Ok(PagedOrderedCollection { items, next })
208
+
}
209
+
210
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
211
let data = self.0.lock().unwrap();
212
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
428
dids,
429
targetables,
430
linking_records,
431
+
started_at: None,
432
+
other_data: Default::default(),
433
})
434
}
435
}
+225
constellation/src/storage/mod.rs
+225
constellation/src/storage/mod.rs
···
19
pub total: u64,
20
}
21
22
#[derive(Debug, Deserialize, Serialize, PartialEq)]
23
pub struct StorageStats {
24
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
33
/// records with multiple links are single-counted.
34
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35
pub linking_records: u64,
36
}
37
38
pub trait LinkStorage: Send + Sync {
···
48
}
49
50
pub trait LinkReader: Clone + Send + Sync + 'static {
51
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
53
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1326
counts
1327
});
1328
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1329
});
1330
}
···
19
pub total: u64,
20
}
21
22
+
/// A paged collection whose keys are sorted instead of indexed
23
+
///
24
+
/// this has weaker guarantees than PagedAppendingCollection: it might
25
+
/// return a totally consistent snapshot. but it should avoid duplicates
26
+
/// and each page should at least be internally consistent.
27
+
#[derive(Debug, PartialEq, Default)]
28
+
pub struct PagedOrderedCollection<T, K: Ord> {
29
+
pub items: Vec<T>,
30
+
pub next: Option<K>,
31
+
}
32
+
33
#[derive(Debug, Deserialize, Serialize, PartialEq)]
34
pub struct StorageStats {
35
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
44
/// records with multiple links are single-counted.
45
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
46
pub linking_records: u64,
47
+
48
+
/// first jetstream cursor when this instance first started
49
+
pub started_at: Option<u64>,
50
+
51
+
/// anything else we want to throw in
52
+
pub other_data: HashMap<String, u64>,
53
}
54
55
pub trait LinkStorage: Send + Sync {
···
65
}
66
67
pub trait LinkReader: Clone + Send + Sync + 'static {
68
+
#[allow(clippy::too_many_arguments)]
69
+
fn get_many_to_many_counts(
70
+
&self,
71
+
target: &str,
72
+
collection: &str,
73
+
path: &str,
74
+
path_to_other: &str,
75
+
limit: u64,
76
+
after: Option<String>,
77
+
filter_dids: &HashSet<Did>,
78
+
filter_to_targets: &HashSet<String>,
79
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
80
+
81
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
82
83
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1356
counts
1357
});
1358
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1359
+
});
1360
+
1361
+
//////// many-to-many /////////
1362
+
1363
+
test_each_storage!(get_m2m_counts_empty, |storage| {
1364
+
assert_eq!(
1365
+
storage.get_many_to_many_counts(
1366
+
"a.com",
1367
+
"a.b.c",
1368
+
".d.e",
1369
+
".f.g",
1370
+
10,
1371
+
None,
1372
+
&HashSet::new(),
1373
+
&HashSet::new(),
1374
+
)?,
1375
+
PagedOrderedCollection {
1376
+
items: vec![],
1377
+
next: None,
1378
+
}
1379
+
);
1380
+
});
1381
+
1382
+
test_each_storage!(get_m2m_counts_single, |storage| {
1383
+
storage.push(
1384
+
&ActionableEvent::CreateLinks {
1385
+
record_id: RecordId {
1386
+
did: "did:plc:asdf".into(),
1387
+
collection: "app.t.c".into(),
1388
+
rkey: "asdf".into(),
1389
+
},
1390
+
links: vec![
1391
+
CollectedLink {
1392
+
target: Link::Uri("a.com".into()),
1393
+
path: ".abc.uri".into(),
1394
+
},
1395
+
CollectedLink {
1396
+
target: Link::Uri("b.com".into()),
1397
+
path: ".def.uri".into(),
1398
+
},
1399
+
CollectedLink {
1400
+
target: Link::Uri("b.com".into()),
1401
+
path: ".ghi.uri".into(),
1402
+
},
1403
+
],
1404
+
},
1405
+
0,
1406
+
)?;
1407
+
assert_eq!(
1408
+
storage.get_many_to_many_counts(
1409
+
"a.com",
1410
+
"app.t.c",
1411
+
".abc.uri",
1412
+
".def.uri",
1413
+
10,
1414
+
None,
1415
+
&HashSet::new(),
1416
+
&HashSet::new(),
1417
+
)?,
1418
+
PagedOrderedCollection {
1419
+
items: vec![("b.com".to_string(), 1, 1)],
1420
+
next: None,
1421
+
}
1422
+
);
1423
+
});
1424
+
1425
+
test_each_storage!(get_m2m_counts_filters, |storage| {
1426
+
storage.push(
1427
+
&ActionableEvent::CreateLinks {
1428
+
record_id: RecordId {
1429
+
did: "did:plc:asdf".into(),
1430
+
collection: "app.t.c".into(),
1431
+
rkey: "asdf".into(),
1432
+
},
1433
+
links: vec![
1434
+
CollectedLink {
1435
+
target: Link::Uri("a.com".into()),
1436
+
path: ".abc.uri".into(),
1437
+
},
1438
+
CollectedLink {
1439
+
target: Link::Uri("b.com".into()),
1440
+
path: ".def.uri".into(),
1441
+
},
1442
+
],
1443
+
},
1444
+
0,
1445
+
)?;
1446
+
storage.push(
1447
+
&ActionableEvent::CreateLinks {
1448
+
record_id: RecordId {
1449
+
did: "did:plc:asdfasdf".into(),
1450
+
collection: "app.t.c".into(),
1451
+
rkey: "asdf".into(),
1452
+
},
1453
+
links: vec![
1454
+
CollectedLink {
1455
+
target: Link::Uri("a.com".into()),
1456
+
path: ".abc.uri".into(),
1457
+
},
1458
+
CollectedLink {
1459
+
target: Link::Uri("b.com".into()),
1460
+
path: ".def.uri".into(),
1461
+
},
1462
+
],
1463
+
},
1464
+
1,
1465
+
)?;
1466
+
storage.push(
1467
+
&ActionableEvent::CreateLinks {
1468
+
record_id: RecordId {
1469
+
did: "did:plc:fdsa".into(),
1470
+
collection: "app.t.c".into(),
1471
+
rkey: "asdf".into(),
1472
+
},
1473
+
links: vec![
1474
+
CollectedLink {
1475
+
target: Link::Uri("a.com".into()),
1476
+
path: ".abc.uri".into(),
1477
+
},
1478
+
CollectedLink {
1479
+
target: Link::Uri("c.com".into()),
1480
+
path: ".def.uri".into(),
1481
+
},
1482
+
],
1483
+
},
1484
+
2,
1485
+
)?;
1486
+
storage.push(
1487
+
&ActionableEvent::CreateLinks {
1488
+
record_id: RecordId {
1489
+
did: "did:plc:fdsa".into(),
1490
+
collection: "app.t.c".into(),
1491
+
rkey: "asdf2".into(),
1492
+
},
1493
+
links: vec![
1494
+
CollectedLink {
1495
+
target: Link::Uri("a.com".into()),
1496
+
path: ".abc.uri".into(),
1497
+
},
1498
+
CollectedLink {
1499
+
target: Link::Uri("c.com".into()),
1500
+
path: ".def.uri".into(),
1501
+
},
1502
+
],
1503
+
},
1504
+
3,
1505
+
)?;
1506
+
assert_eq!(
1507
+
storage.get_many_to_many_counts(
1508
+
"a.com",
1509
+
"app.t.c",
1510
+
".abc.uri",
1511
+
".def.uri",
1512
+
10,
1513
+
None,
1514
+
&HashSet::new(),
1515
+
&HashSet::new(),
1516
+
)?,
1517
+
PagedOrderedCollection {
1518
+
items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1519
+
next: None,
1520
+
}
1521
+
);
1522
+
assert_eq!(
1523
+
storage.get_many_to_many_counts(
1524
+
"a.com",
1525
+
"app.t.c",
1526
+
".abc.uri",
1527
+
".def.uri",
1528
+
10,
1529
+
None,
1530
+
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1531
+
&HashSet::new(),
1532
+
)?,
1533
+
PagedOrderedCollection {
1534
+
items: vec![("c.com".to_string(), 2, 1),],
1535
+
next: None,
1536
+
}
1537
+
);
1538
+
assert_eq!(
1539
+
storage.get_many_to_many_counts(
1540
+
"a.com",
1541
+
"app.t.c",
1542
+
".abc.uri",
1543
+
".def.uri",
1544
+
10,
1545
+
None,
1546
+
&HashSet::new(),
1547
+
&HashSet::from_iter(["b.com".to_string()]),
1548
+
)?,
1549
+
PagedOrderedCollection {
1550
+
items: vec![("b.com".to_string(), 2, 2),],
1551
+
next: None,
1552
+
}
1553
+
);
1554
});
1555
}
+342
-40
constellation/src/storage/rocks_store.rs
+342
-40
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
2
use crate::{CountsByCount, Did, RecordId};
3
use anyhow::{bail, Result};
4
use bincode::Options as BincodeOptions;
···
11
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12
};
13
use serde::{Deserialize, Serialize};
14
-
use std::collections::{HashMap, HashSet};
15
use std::io::Read;
16
use std::marker::PhantomData;
17
use std::path::{Path, PathBuf};
···
20
Arc,
21
};
22
use std::thread;
23
-
use std::time::{Duration, Instant};
24
use tokio_util::sync::CancellationToken;
25
26
static DID_IDS_CF: &str = "did_ids";
···
29
static LINK_TARGETS_CF: &str = "link_targets";
30
31
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
32
33
// todo: actually understand and set these options probably better
34
fn rocks_opts_base() -> Options {
···
56
#[derive(Debug, Clone)]
57
pub struct RocksStorage {
58
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
59
-
did_id_table: IdTable<Did, DidIdValue, true>,
60
-
target_id_table: IdTable<TargetKey, TargetId, false>,
61
is_writer: bool,
62
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
63
}
···
85
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
86
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
87
}
88
-
fn init<const WITH_REVERSE: bool>(
89
-
self,
90
-
db: &DBWithThreadMode<MultiThreaded>,
91
-
) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
92
if db.cf_handle(&self.name).is_none() {
93
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
94
}
···
119
}
120
}
121
#[derive(Debug, Clone)]
122
-
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
123
where
124
Orig: KeyFromRocks,
125
for<'a> &'a Orig: AsRocksKey,
···
127
base: IdTableBase<Orig, IdVal>,
128
priv_id_seq: u64,
129
}
130
-
impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
131
where
132
Orig: KeyFromRocks,
133
for<'v> &'v IdVal: AsRocksValue,
···
139
_key_marker: PhantomData,
140
_val_marker: PhantomData,
141
name: name.into(),
142
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
143
}
144
}
145
fn get_id_val(
···
178
id_value
179
}))
180
}
181
fn estimate_count(&self) -> u64 {
182
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183
}
184
-
}
185
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
186
-
where
187
-
Orig: KeyFromRocks,
188
-
for<'v> &'v IdVal: AsRocksValue,
189
-
for<'k> &'k Orig: AsRocksKey,
190
-
{
191
fn get_or_create_id_val(
192
&mut self,
193
db: &DBWithThreadMode<MultiThreaded>,
···
215
}
216
}
217
}
218
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
219
-
where
220
-
Orig: KeyFromRocks,
221
-
for<'v> &'v IdVal: AsRocksValue,
222
-
for<'k> &'k Orig: AsRocksKey,
223
-
{
224
-
fn get_or_create_id_val(
225
-
&mut self,
226
-
db: &DBWithThreadMode<MultiThreaded>,
227
-
batch: &mut WriteBatch,
228
-
orig: &Orig,
229
-
) -> Result<IdVal> {
230
-
let cf = db.cf_handle(&self.base.name).unwrap();
231
-
self.__get_or_create_id_val(&cf, db, batch, orig)
232
-
}
233
-
}
234
235
impl IdTableValue for DidIdValue {
236
fn new(v: u64) -> Self {
···
249
}
250
}
251
252
impl RocksStorage {
253
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254
Self::describe_metrics();
255
-
RocksStorage::open_readmode(path, false)
256
}
257
258
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
260
}
261
262
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
263
-
let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
264
-
let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);
265
266
let cfs = vec![
267
// id reference tables
268
did_id_table.cf_descriptor(),
···
296
is_writer: !readonly,
297
backup_task: None.into(),
298
})
299
}
300
301
pub fn start_backup(
···
826
}
827
828
impl LinkReader for RocksStorage {
829
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830
let target_key = TargetKey(
831
Target(target.to_string()),
···
1042
.map(|s| s.parse::<u64>())
1043
.transpose()?
1044
.unwrap_or(0);
1045
Ok(StorageStats {
1046
dids,
1047
targetables,
1048
linking_records,
1049
})
1050
}
1051
}
···
1071
impl AsRocksValue for &TargetId {}
1072
impl KeyFromRocks for TargetKey {}
1073
impl ValueFromRocks for TargetId {}
1074
1075
// target_links table
1076
impl AsRocksKey for &TargetId {}
···
1142
}
1143
1144
// target ids
1145
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1146
struct TargetId(u64); // key
1147
1148
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1149
pub struct Target(pub String); // the actual target/uri
1150
1151
// targets (uris, dids, etc.): the reverse index
···
1
+
use super::{
2
+
ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
3
+
StorageStats,
4
+
};
5
use crate::{CountsByCount, Did, RecordId};
6
use anyhow::{bail, Result};
7
use bincode::Options as BincodeOptions;
···
14
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
15
};
16
use serde::{Deserialize, Serialize};
17
+
use std::collections::{BTreeMap, HashMap, HashSet};
18
use std::io::Read;
19
use std::marker::PhantomData;
20
use std::path::{Path, PathBuf};
···
23
Arc,
24
};
25
use std::thread;
26
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
use tokio_util::sync::CancellationToken;
28
29
static DID_IDS_CF: &str = "did_ids";
···
32
static LINK_TARGETS_CF: &str = "link_targets";
33
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
36
+
// add reverse mappings for targets if this db was running before that was a thing
37
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
38
+
39
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
40
+
41
+
#[derive(Debug, Clone, Serialize, Deserialize)]
42
+
struct TargetIdRepairState {
43
+
/// start time for repair, microseconds timestamp
44
+
current_us_started_at: u64,
45
+
/// id table's latest id when repair started
46
+
id_when_started: u64,
47
+
/// id table id
48
+
latest_repaired_i: u64,
49
+
}
50
+
impl AsRocksValue for TargetIdRepairState {}
51
+
impl ValueFromRocks for TargetIdRepairState {}
52
53
// todo: actually understand and set these options probably better
54
fn rocks_opts_base() -> Options {
···
76
#[derive(Debug, Clone)]
77
pub struct RocksStorage {
78
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
79
+
did_id_table: IdTable<Did, DidIdValue>,
80
+
target_id_table: IdTable<TargetKey, TargetId>,
81
is_writer: bool,
82
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
83
}
···
105
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
106
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
107
}
108
+
fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
109
if db.cf_handle(&self.name).is_none() {
110
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
111
}
···
136
}
137
}
138
#[derive(Debug, Clone)]
139
+
struct IdTable<Orig, IdVal: IdTableValue>
140
where
141
Orig: KeyFromRocks,
142
for<'a> &'a Orig: AsRocksKey,
···
144
base: IdTableBase<Orig, IdVal>,
145
priv_id_seq: u64,
146
}
147
+
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
148
where
149
Orig: KeyFromRocks,
150
for<'v> &'v IdVal: AsRocksValue,
···
156
_key_marker: PhantomData,
157
_val_marker: PhantomData,
158
name: name.into(),
159
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
160
}
161
}
162
fn get_id_val(
···
195
id_value
196
}))
197
}
198
+
199
fn estimate_count(&self) -> u64 {
200
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
201
}
202
+
203
fn get_or_create_id_val(
204
&mut self,
205
db: &DBWithThreadMode<MultiThreaded>,
···
227
}
228
}
229
}
230
231
impl IdTableValue for DidIdValue {
232
fn new(v: u64) -> Self {
···
245
}
246
}
247
248
+
fn now() -> u64 {
249
+
SystemTime::now()
250
+
.duration_since(UNIX_EPOCH)
251
+
.unwrap()
252
+
.as_micros() as u64
253
+
}
254
+
255
impl RocksStorage {
256
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
257
Self::describe_metrics();
258
+
let me = RocksStorage::open_readmode(path, false)?;
259
+
me.global_init()?;
260
+
Ok(me)
261
}
262
263
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
265
}
266
267
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
268
+
let did_id_table = IdTable::setup(DID_IDS_CF);
269
+
let target_id_table = IdTable::setup(TARGET_IDS_CF);
270
271
+
// note: global stuff like jetstream cursor goes in the default cf
272
+
// these are bonus extra cfs
273
let cfs = vec![
274
// id reference tables
275
did_id_table.cf_descriptor(),
···
303
is_writer: !readonly,
304
backup_task: None.into(),
305
})
306
+
}
307
+
308
+
fn global_init(&self) -> Result<()> {
309
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
310
+
if first_run {
311
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
312
+
313
+
// hack / temporary: if we're a new db, put in a completed repair
314
+
// state so we don't run repairs (repairs are for old-code dbs)
315
+
let completed = TargetIdRepairState {
316
+
id_when_started: 0,
317
+
current_us_started_at: 0,
318
+
latest_repaired_i: 0,
319
+
};
320
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
321
+
}
322
+
Ok(())
323
+
}
324
+
325
+
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
326
+
let mut state = match self
327
+
.db
328
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
329
+
.map(|s| _vr(&s))
330
+
.transpose()?
331
+
{
332
+
Some(s) => s,
333
+
None => TargetIdRepairState {
334
+
id_when_started: self.did_id_table.priv_id_seq,
335
+
current_us_started_at: now(),
336
+
latest_repaired_i: 0,
337
+
},
338
+
};
339
+
340
+
eprintln!("initial repair state: {state:?}");
341
+
342
+
let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
343
+
344
+
let mut iter = self.db.raw_iterator_cf(&cf);
345
+
iter.seek_to_first();
346
+
347
+
eprintln!("repair iterator sent to first key");
348
+
349
+
// skip ahead if we're done some, or take a single first step
350
+
for _ in 0..state.latest_repaired_i {
351
+
iter.next();
352
+
}
353
+
354
+
eprintln!(
355
+
"repair iterator skipped to {}th key",
356
+
state.latest_repaired_i
357
+
);
358
+
359
+
let mut maybe_done = false;
360
+
361
+
let mut write_fast = rocksdb::WriteOptions::default();
362
+
write_fast.set_sync(false);
363
+
write_fast.disable_wal(true);
364
+
365
+
while !stay_alive.is_cancelled() && !maybe_done {
366
+
// let mut batch = WriteBatch::default();
367
+
368
+
let mut any_written = false;
369
+
370
+
for _ in 0..1000 {
371
+
if state.latest_repaired_i % 1_000_000 == 0 {
372
+
eprintln!("target iter at {}", state.latest_repaired_i);
373
+
}
374
+
state.latest_repaired_i += 1;
375
+
376
+
if !iter.valid() {
377
+
eprintln!("invalid iter, are we done repairing?");
378
+
maybe_done = true;
379
+
break;
380
+
};
381
+
382
+
// eprintln!("iterator seems to be valid! getting the key...");
383
+
let raw_key = iter.key().unwrap();
384
+
if raw_key.len() == 8 {
385
+
// eprintln!("found an 8-byte key, skipping it since it's probably an id...");
386
+
iter.next();
387
+
continue;
388
+
}
389
+
let target: TargetKey = _kr::<TargetKey>(raw_key)?;
390
+
let target_id: TargetId = _vr(iter.value().unwrap())?;
391
+
392
+
self.db
393
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
394
+
any_written = true;
395
+
iter.next();
396
+
}
397
+
398
+
if any_written {
399
+
self.db
400
+
.put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
401
+
std::thread::sleep(breather);
402
+
}
403
+
}
404
+
405
+
eprintln!("repair iterator done.");
406
+
407
+
Ok(false)
408
}
409
410
pub fn start_backup(
···
935
}
936
937
impl LinkReader for RocksStorage {
938
+
fn get_many_to_many_counts(
939
+
&self,
940
+
target: &str,
941
+
collection: &str,
942
+
path: &str,
943
+
path_to_other: &str,
944
+
limit: u64,
945
+
after: Option<String>,
946
+
filter_dids: &HashSet<Did>,
947
+
filter_to_targets: &HashSet<String>,
948
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
949
+
let collection = Collection(collection.to_string());
950
+
let path = RPath(path.to_string());
951
+
952
+
let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());
953
+
954
+
// unfortunately the cursor is a, uh, stringified number.
955
+
// this was easier for the memstore (plain target, not target id), and
956
+
// making it generic is a bit awful.
957
+
// so... parse the number out of a string here :(
958
+
// TODO: this should bubble up to a BAD_REQUEST response
959
+
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
960
+
961
+
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
962
+
eprintln!("nothin doin for this target, {target_key:?}");
963
+
return Ok(Default::default());
964
+
};
965
+
966
+
let filter_did_ids: HashMap<DidId, bool> = filter_dids
967
+
.iter()
968
+
.filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
969
+
.collect::<Result<Vec<DidIdValue>>>()?
970
+
.into_iter()
971
+
.map(|DidIdValue(id, active)| (id, active))
972
+
.collect();
973
+
974
+
// stored targets are keyed by triples of (target, collection, path).
975
+
// target filtering only consideres the target itself, so we actually
976
+
// need to do a prefix iteration of all target ids for this target and
977
+
// keep them all.
978
+
// i *think* the number of keys at a target prefix should usually be
979
+
// pretty small, so this is hopefully fine. but if it turns out to be
980
+
// large, we can push this filtering back into the main links loop and
981
+
// do forward db queries per backlink to get the raw target back out.
982
+
let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
983
+
for t in filter_to_targets {
984
+
for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
985
+
filter_to_target_ids.insert(target_id);
986
+
}
987
+
}
988
+
989
+
let linkers = self.get_target_linkers(&target_id)?;
990
+
991
+
let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();
992
+
993
+
for (did_id, rkey) in linkers.0 {
994
+
if did_id.is_empty() {
995
+
continue;
996
+
}
997
+
998
+
if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
999
+
continue;
1000
+
}
1001
+
1002
+
let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
1003
+
let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
1004
+
continue;
1005
+
};
1006
+
1007
+
let Some(fwd_target) = targets
1008
+
.0
1009
+
.into_iter()
1010
+
.filter_map(|RecordLinkTarget(rpath, target_id)| {
1011
+
if rpath.0 == path_to_other
1012
+
&& (filter_to_target_ids.is_empty()
1013
+
|| filter_to_target_ids.contains(&target_id))
1014
+
{
1015
+
Some(target_id)
1016
+
} else {
1017
+
None
1018
+
}
1019
+
})
1020
+
.take(1)
1021
+
.next()
1022
+
else {
1023
+
eprintln!("no forward match");
1024
+
continue;
1025
+
};
1026
+
1027
+
// small relief: we page over target ids, so we can already bail
1028
+
// reprocessing previous pages here
1029
+
if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
1030
+
continue;
1031
+
}
1032
+
1033
+
// aand we can skip target ids that must be on future pages
1034
+
// (this check continues after the did-lookup, which we have to do)
1035
+
let page_is_full = grouped_counts.len() as u64 >= limit;
1036
+
if page_is_full {
1037
+
let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
1038
+
if fwd_target > *current_max {
1039
+
continue;
1040
+
}
1041
+
}
1042
+
1043
+
// bit painful: 2-step lookup to make sure this did is active
1044
+
let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
1045
+
eprintln!("failed to look up did from did_id {did_id:?}");
1046
+
continue;
1047
+
};
1048
+
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
1049
+
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1050
+
continue;
1051
+
};
1052
+
if !active {
1053
+
continue;
1054
+
}
1055
+
1056
+
// page-management, continued
1057
+
// if we have a full page, and we're inserting a *new* key less than
1058
+
// the current max, then we can evict the current max
1059
+
let mut should_evict = false;
1060
+
let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
1061
+
// this is a *new* key, so kick the max if we're full
1062
+
should_evict = page_is_full;
1063
+
Default::default()
1064
+
});
1065
+
entry.0 += 1;
1066
+
entry.1.insert(did_id);
1067
+
1068
+
if should_evict {
1069
+
grouped_counts.pop_last();
1070
+
}
1071
+
}
1072
+
1073
+
let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
1074
+
for (target_id, (n, dids)) in &grouped_counts {
1075
+
let Some(target) = self
1076
+
.target_id_table
1077
+
.get_val_from_id(&self.db, target_id.0)?
1078
+
else {
1079
+
eprintln!("failed to look up target from target_id {target_id:?}");
1080
+
continue;
1081
+
};
1082
+
items.push((target.0 .0, *n, dids.len() as u64));
1083
+
}
1084
+
1085
+
let next = if grouped_counts.len() as u64 >= limit {
1086
+
// yeah.... it's a number saved as a string......sorry
1087
+
grouped_counts
1088
+
.keys()
1089
+
.next_back()
1090
+
.map(|k| format!("{}", k.0))
1091
+
} else {
1092
+
None
1093
+
};
1094
+
1095
+
Ok(PagedOrderedCollection { items, next })
1096
+
}
1097
+
1098
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
1099
let target_key = TargetKey(
1100
Target(target.to_string()),
···
1311
.map(|s| s.parse::<u64>())
1312
.transpose()?
1313
.unwrap_or(0);
1314
+
let started_at = self
1315
+
.db
1316
+
.get(STARTED_AT_KEY)?
1317
+
.map(|c| _vr(&c))
1318
+
.transpose()?
1319
+
.unwrap_or(COZY_FIRST_CURSOR);
1320
+
1321
+
let other_data = self
1322
+
.db
1323
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
1324
+
.map(|s| _vr(&s))
1325
+
.transpose()?
1326
+
.map(
1327
+
|TargetIdRepairState {
1328
+
current_us_started_at,
1329
+
id_when_started,
1330
+
latest_repaired_i,
1331
+
}| {
1332
+
HashMap::from([
1333
+
("current_us_started_at".to_string(), current_us_started_at),
1334
+
("id_when_started".to_string(), id_when_started),
1335
+
("latest_repaired_i".to_string(), latest_repaired_i),
1336
+
])
1337
+
},
1338
+
)
1339
+
.unwrap_or(HashMap::default());
1340
+
1341
Ok(StorageStats {
1342
dids,
1343
targetables,
1344
linking_records,
1345
+
started_at: Some(started_at),
1346
+
other_data,
1347
})
1348
}
1349
}
···
1369
impl AsRocksValue for &TargetId {}
1370
impl KeyFromRocks for TargetKey {}
1371
impl ValueFromRocks for TargetId {}
1372
+
1373
+
// temp?
1374
+
impl KeyFromRocks for TargetId {}
1375
+
impl AsRocksValue for &TargetKey {}
1376
1377
// target_links table
1378
impl AsRocksKey for &TargetId {}
···
1444
}
1445
1446
// target ids
1447
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
1448
struct TargetId(u64); // key
1449
1450
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
1451
pub struct Target(pub String); // the actual target/uri
1452
1453
// targets (uris, dids, etc.): the reverse index
+1
-1
constellation/templates/get-backlinks.html.j2
+1
-1
constellation/templates/get-backlinks.html.j2
+67
constellation/templates/get-many-to-many-counts.html.j2
+67
constellation/templates/get-many-to-many-counts.html.j2
···
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Many to Many counts{% endblock %}
5
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_many_to_many_counts(
10
+
query.subject,
11
+
query.source,
12
+
query.path_to_other,
13
+
query.did,
14
+
query.other_subject,
15
+
query.limit,
16
+
) %}
17
+
18
+
<h2>
19
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
20
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
21
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
22
+
{% endif %}
23
+
</h2>
24
+
25
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}→{{ query.path_to_other }}</code></p>
26
+
27
+
<ul>
28
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
29
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
30
+
</ul>
31
+
32
+
<h3>Counts by other subject:</h3>
33
+
34
+
{% for counts in counts_by_other_subject %}
35
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
36
+
<strong>Joining records</strong>: {{ counts.total }}
37
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
38
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
39
+
<a href="{{ browseable_uri }}">browse record</a>
40
+
{%- endif %}</pre>
41
+
{% endfor %}
42
+
43
+
{% if let Some(c) = cursor %}
44
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
45
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
46
+
<input type="hidden" name="source" value="{{ query.source }}" />
47
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
48
+
{% for did in query.did %}
49
+
<input type="hidden" name="did" value="{{ did }}" />
50
+
{% endfor %}
51
+
{% for otherSubject in query.other_subject %}
52
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
53
+
{% endfor %}
54
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
55
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
56
+
<button type="submit">next page…</button>
57
+
</form>
58
+
{% else %}
59
+
<button disabled><em>end of results</em></button>
60
+
{% endif %}
61
+
62
+
<details>
63
+
<summary>Raw JSON response</summary>
64
+
<pre class="code">{{ self|tojson }}</pre>
65
+
</details>
66
+
67
+
{% endblock %}
+38
-2
constellation/templates/hello.html.j2
+38
-2
constellation/templates/hello.html.j2
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
23
<small>(indexing new records in real time, backfill coming soon!)</small>
24
</p>
25
26
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
27
28
29
<h2>API Endpoints</h2>
···
43
44
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
45
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
46
47
48
<h3 class="route"><code>GET /links</code></h3>
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
23
+
{%- if let Some(days) = days_indexed %}
24
+
{{ days|human_number }}
25
+
{% else %}
26
+
???
27
+
{% endif -%}
28
+
</span> days.<br/>
29
<small>(indexing new records in real time, backfill coming soon!)</small>
30
</p>
31
32
+
{# {% for k, v in stats.other_data.iter() %}
33
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
34
+
{% endfor %} #}
35
+
36
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
37
38
39
<h2>API Endpoints</h2>
···
53
54
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
55
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
56
+
57
+
58
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
59
+
60
+
<p>TODO: description</p>
61
+
62
+
<h4>Query parameters:</h4>
63
+
64
+
<ul>
65
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
66
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
67
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
68
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
69
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple users. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
70
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
71
+
</ul>
72
+
73
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
74
+
{% call try_it::get_many_to_many_counts(
75
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
76
+
"sh.tangled.label.op:add[].key",
77
+
"subject",
78
+
[""],
79
+
[""],
80
+
25,
81
+
) %}
82
83
84
<h3 class="route"><code>GET /links</code></h3>
+43
-1
constellation/templates/try-it-macros.html.j2
+43
-1
constellation/templates/try-it-macros.html.j2
···
1
{% macro get_backlinks(subject, source, dids, limit) %}
2
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
3
-
<pre class="code"><strong>GET</strong> /links
4
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
5
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
6
{%- for did in dids %}{% if !did.is_empty() %}
···
20
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
21
p.insertBefore(i, didPlaceholder);
22
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
23
});
24
</script>
25
{% endmacro %}
···
1
{% macro get_backlinks(subject, source, dids, limit) %}
2
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
3
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
4
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
5
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
6
{%- for did in dids %}{% if !did.is_empty() %}
···
20
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
21
p.insertBefore(i, didPlaceholder);
22
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
23
+
});
24
+
</script>
25
+
{% endmacro %}
26
+
27
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
28
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
29
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
30
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
31
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
32
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
33
+
{%- for did in dids %}{% if !did.is_empty() %}
34
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
35
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
36
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
37
+
&otherSubject= <input type="text" name="did" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
38
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
39
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
40
+
</form>
41
+
<script>
42
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
43
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
44
+
m2mAddDidButton.addEventListener('click', e => {
45
+
e.preventDefault();
46
+
const i = document.createElement('input');
47
+
i.placeholder = 'did:plc:...';
48
+
i.name = "did"
49
+
const p = m2mAddDidButton.parentNode;
50
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
51
+
p.insertBefore(i, m2mDidPlaceholder);
52
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
53
+
});
54
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
55
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
56
+
m2mAddSubjectButton.addEventListener('click', e => {
57
+
e.preventDefault();
58
+
const i = document.createElement('input');
59
+
i.placeholder = 'at-uri, did, uri...';
60
+
i.name = "otherSubject"
61
+
const p = m2mAddSubjectButton.parentNode;
62
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
63
+
p.insertBefore(i, m2mSubjectPlaceholder);
64
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
65
});
66
</script>
67
{% endmacro %}