+149
-1
constellation/src/server/mod.rs
+149
-1
constellation/src/server/mod.rs
···
35
35
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
36
36
37
37
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
38
-
eprintln!("handler join error: {e}");
38
+
eprintln!("handler error: {e}");
39
39
http::StatusCode::INTERNAL_SERVER_ERROR
40
40
}
41
41
···
56
56
.map_err(to500)?
57
57
}
58
58
}),
59
+
)
60
+
.route(
61
+
"/xrpc/blue.microcosm.links.getManyToManyCounts",
62
+
get({
63
+
let store = store.clone();
64
+
move |accept, query| async {
65
+
spawn_blocking(|| get_many_to_many_counts(accept, query, store))
66
+
.await
67
+
.map_err(to500)?
68
+
}
69
+
})
59
70
)
60
71
.route(
61
72
"/links/count",
···
211
222
stats,
212
223
}))
213
224
}
225
+
226
+
#[derive(Clone, Deserialize)]
227
+
#[serde(rename_all = "camelCase")]
228
+
struct GetManyToManyCountsQuery {
229
+
subject: String,
230
+
source: String,
231
+
/// path to the secondary link in the linking record
232
+
path_to_other: String,
233
+
/// filter to linking records (join of the m2m) by these DIDs
234
+
#[serde(default)]
235
+
did: Vec<String>,
236
+
/// filter to specific secondary records
237
+
#[serde(default)]
238
+
other_subject: Vec<String>,
239
+
cursor: Option<OpaqueApiCursor>,
240
+
/// Set the max number of links to return per page of results
241
+
#[serde(default = "get_default_cursor_limit")]
242
+
limit: u64,
243
+
}
244
+
#[derive(Serialize)]
245
+
struct OtherSubjectCount {
246
+
subject: String,
247
+
total: u64,
248
+
distinct: u64,
249
+
}
250
+
#[derive(Template, Serialize)]
251
+
#[template(path = "get-many-to-many-counts.html.j2")]
252
+
struct GetManyToManyCountsResponse {
253
+
counts_by_other_subject: Vec<OtherSubjectCount>,
254
+
total_other_subjects: u64,
255
+
cursor: Option<OpaqueApiCursor>,
256
+
#[serde(skip_serializing)]
257
+
query: GetManyToManyCountsQuery,
258
+
}
259
+
fn get_many_to_many_counts(
260
+
accept: ExtractAccept,
261
+
query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
262
+
store: impl LinkReader,
263
+
) -> Result<impl IntoResponse, http::StatusCode> {
264
+
let cursor_key = query
265
+
.cursor
266
+
.clone()
267
+
.map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
268
+
.transpose()?
269
+
.map(|c| c.next);
270
+
271
+
let limit = query.limit;
272
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
273
+
return Err(http::StatusCode::BAD_REQUEST);
274
+
}
275
+
276
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
277
+
query
278
+
.did
279
+
.iter()
280
+
.map(|d| d.trim())
281
+
.filter(|d| !d.is_empty())
282
+
.map(|d| Did(d.to_string())),
283
+
);
284
+
285
+
let filter_other_subjects: HashSet<String> = HashSet::from_iter(
286
+
query
287
+
.other_subject
288
+
.iter()
289
+
.map(|s| s.trim().to_string())
290
+
.filter(|s| !s.is_empty()),
291
+
);
292
+
293
+
let Some((collection, path)) = query.source.split_once(':') else {
294
+
return Err(http::StatusCode::BAD_REQUEST);
295
+
};
296
+
let path = format!(".{path}");
297
+
298
+
let paged = store
299
+
.get_many_to_many_counts(
300
+
&query.subject,
301
+
collection,
302
+
&path,
303
+
&query.path_to_other,
304
+
limit,
305
+
cursor_key,
306
+
&filter_dids,
307
+
&filter_other_subjects,
308
+
)
309
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
+
311
+
let cursor = paged.next.map(|next| {
312
+
ApiKeyedCursor {
313
+
version: paged.total,
314
+
next,
315
+
}
316
+
.into()
317
+
});
318
+
319
+
let items = paged
320
+
.items
321
+
.into_iter()
322
+
.map(|(subject, total, distinct)|
323
+
OtherSubjectCount {
324
+
subject,
325
+
total,
326
+
distinct,
327
+
})
328
+
.collect();
329
+
330
+
Ok(acceptable(
331
+
accept,
332
+
GetManyToManyCountsResponse {
333
+
counts_by_other_subject: items,
334
+
total_other_subjects: paged.total,
335
+
cursor,
336
+
query: (*query).clone(),
337
+
},
338
+
))
339
+
}
340
+
341
+
214
342
215
343
#[derive(Clone, Deserialize)]
216
344
struct GetLinksCountQuery {
···
609
737
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
610
738
}
611
739
}
740
+
741
+
#[derive(Serialize, Deserialize)] // for bincode
742
+
struct ApiKeyedCursor {
743
+
version: u64, // total length (dirty check)
744
+
next: String, // the key
745
+
}
746
+
747
+
impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
748
+
type Error = bincode::Error;
749
+
750
+
fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
751
+
bincode::DefaultOptions::new().deserialize(&item.0)
752
+
}
753
+
}
754
+
755
+
impl From<ApiKeyedCursor> for OpaqueApiCursor {
756
+
fn from(item: ApiKeyedCursor) -> Self {
757
+
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
758
+
}
759
+
}
+78
-1
constellation/src/storage/mem_store.rs
+78
-1
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats};
2
2
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
3
use anyhow::Result;
4
4
use links::CollectedLink;
···
132
132
}
133
133
134
134
impl LinkReader for MemStorage {
135
+
fn get_many_to_many_counts(
136
+
&self,
137
+
target: &str,
138
+
collection: &str,
139
+
path: &str,
140
+
path_to_other: &str,
141
+
limit: u64,
142
+
after: Option<String>,
143
+
filter_dids: &HashSet<Did>,
144
+
filter_to_targets: &HashSet<String>,
145
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
146
+
let empty = || {
147
+
PagedOrderedCollection {
148
+
items: vec![],
149
+
next: None,
150
+
total: 0,
151
+
}
152
+
};
153
+
let data = self.0.lock().unwrap();
154
+
let Some(paths) = data.targets.get(&Target::new(target)) else {
155
+
return Ok(empty());
156
+
};
157
+
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
158
+
return Ok(empty());
159
+
};
160
+
161
+
let path_to_other = RecordPath::new(path_to_other);
162
+
let filter_to_targets: HashSet::<Target> = HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
163
+
164
+
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
165
+
for (did, rkey) in linkers.into_iter().cloned().filter_map(|l| l) {
166
+
if !filter_dids.is_empty() && !filter_dids.contains(&did) {
167
+
continue;
168
+
}
169
+
if let Some(fwd_target) = data
170
+
.links
171
+
.get(&did)
172
+
.unwrap_or(&HashMap::new())
173
+
.get(&RepoId { collection: collection.to_string(), rkey })
174
+
.unwrap_or(&Vec::new())
175
+
.into_iter()
176
+
.filter_map(|(path, target)| {
177
+
if *path == path_to_other
178
+
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
179
+
{ Some(target) } else { None }
180
+
})
181
+
.take(1)
182
+
.next()
183
+
{
184
+
let e = grouped_counts.entry(fwd_target.clone()).or_default();
185
+
e.0 += 1;
186
+
e.1.insert(did.clone());
187
+
}
188
+
}
189
+
let total = grouped_counts.len() as u64;
190
+
let mut items: Vec<(String, u64, u64)> = grouped_counts
191
+
.iter()
192
+
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
193
+
.collect();
194
+
items.sort();
195
+
items = items
196
+
.into_iter()
197
+
.skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false))
198
+
.take(limit as usize)
199
+
.collect();
200
+
let next = if items.len() as u64 >= limit {
201
+
items.last().map(|(t, _, _)| t.clone())
202
+
} else {
203
+
None
204
+
};
205
+
Ok(PagedOrderedCollection {
206
+
items,
207
+
next,
208
+
total,
209
+
})
210
+
}
211
+
135
212
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
136
213
let data = self.0.lock().unwrap();
137
214
let Some(paths) = data.targets.get(&Target::new(target)) else {
+216
constellation/src/storage/mod.rs
+216
constellation/src/storage/mod.rs
···
19
19
pub total: u64,
20
20
}
21
21
22
+
/// A paged collection whose keys are sorted instead of indexed
23
+
///
24
+
/// this has weaker guarantees than PagedAppendingCollection: it might
25
+
/// return a totally consistent snapshot. but it should avoid duplicates
26
+
/// and each page should at least be internally consistent.
27
+
#[derive(Debug, PartialEq)]
28
+
pub struct PagedOrderedCollection<T, K: Ord> {
29
+
pub items: Vec<T>,
30
+
pub next: Option<K>,
31
+
pub total: u64,
32
+
}
33
+
22
34
#[derive(Debug, Deserialize, Serialize, PartialEq)]
23
35
pub struct StorageStats {
24
36
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
48
60
}
49
61
50
62
pub trait LinkReader: Clone + Send + Sync + 'static {
63
+
fn get_many_to_many_counts(
64
+
&self,
65
+
target: &str,
66
+
collection: &str,
67
+
path: &str,
68
+
path_to_other: &str,
69
+
limit: u64,
70
+
after: Option<String>,
71
+
filter_dids: &HashSet<Did>,
72
+
filter_to_targets: &HashSet<String>,
73
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
74
+
51
75
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
76
53
77
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1326
1350
counts
1327
1351
});
1328
1352
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1353
+
});
1354
+
1355
+
//////// many-to-many /////////
1356
+
1357
+
test_each_storage!(get_m2m_counts_empty, |storage| {
1358
+
assert_eq!(storage.get_many_to_many_counts(
1359
+
"a.com",
1360
+
"a.b.c",
1361
+
".d.e",
1362
+
".f.g",
1363
+
10,
1364
+
None,
1365
+
&HashSet::new(),
1366
+
&HashSet::new(),
1367
+
)?, PagedOrderedCollection {
1368
+
items: vec![],
1369
+
next: None,
1370
+
total: 0,
1371
+
});
1372
+
});
1373
+
1374
+
test_each_storage!(get_m2m_counts_single, |storage| {
1375
+
storage.push(
1376
+
&ActionableEvent::CreateLinks {
1377
+
record_id: RecordId {
1378
+
did: "did:plc:asdf".into(),
1379
+
collection: "app.t.c".into(),
1380
+
rkey: "asdf".into(),
1381
+
},
1382
+
links: vec![
1383
+
CollectedLink {
1384
+
target: Link::Uri("a.com".into()),
1385
+
path: ".abc.uri".into(),
1386
+
},
1387
+
CollectedLink {
1388
+
target: Link::Uri("b.com".into()),
1389
+
path: ".def.uri".into(),
1390
+
},
1391
+
CollectedLink {
1392
+
target: Link::Uri("b.com".into()),
1393
+
path: ".ghi.uri".into(),
1394
+
},
1395
+
],
1396
+
},
1397
+
0,
1398
+
)?;
1399
+
assert_eq!(storage.get_many_to_many_counts(
1400
+
"a.com",
1401
+
"app.t.c",
1402
+
".abc.uri",
1403
+
".def.uri",
1404
+
10,
1405
+
None,
1406
+
&HashSet::new(),
1407
+
&HashSet::new(),
1408
+
)?, PagedOrderedCollection {
1409
+
items: vec![("b.com".to_string(), 1, 1)],
1410
+
next: None,
1411
+
total: 1,
1412
+
});
1413
+
});
1414
+
1415
+
test_each_storage!(get_m2m_counts_filters, |storage| {
1416
+
storage.push(
1417
+
&ActionableEvent::CreateLinks {
1418
+
record_id: RecordId {
1419
+
did: "did:plc:asdf".into(),
1420
+
collection: "app.t.c".into(),
1421
+
rkey: "asdf".into(),
1422
+
},
1423
+
links: vec![
1424
+
CollectedLink {
1425
+
target: Link::Uri("a.com".into()),
1426
+
path: ".abc.uri".into(),
1427
+
},
1428
+
CollectedLink {
1429
+
target: Link::Uri("b.com".into()),
1430
+
path: ".def.uri".into(),
1431
+
},
1432
+
],
1433
+
},
1434
+
0,
1435
+
)?;
1436
+
storage.push(
1437
+
&ActionableEvent::CreateLinks {
1438
+
record_id: RecordId {
1439
+
did: "did:plc:asdfasdf".into(),
1440
+
collection: "app.t.c".into(),
1441
+
rkey: "asdf".into(),
1442
+
},
1443
+
links: vec![
1444
+
CollectedLink {
1445
+
target: Link::Uri("a.com".into()),
1446
+
path: ".abc.uri".into(),
1447
+
},
1448
+
CollectedLink {
1449
+
target: Link::Uri("b.com".into()),
1450
+
path: ".def.uri".into(),
1451
+
},
1452
+
],
1453
+
},
1454
+
1,
1455
+
)?;
1456
+
storage.push(
1457
+
&ActionableEvent::CreateLinks {
1458
+
record_id: RecordId {
1459
+
did: "did:plc:fdsa".into(),
1460
+
collection: "app.t.c".into(),
1461
+
rkey: "asdf".into(),
1462
+
},
1463
+
links: vec![
1464
+
CollectedLink {
1465
+
target: Link::Uri("a.com".into()),
1466
+
path: ".abc.uri".into(),
1467
+
},
1468
+
CollectedLink {
1469
+
target: Link::Uri("c.com".into()),
1470
+
path: ".def.uri".into(),
1471
+
},
1472
+
],
1473
+
},
1474
+
2,
1475
+
)?;
1476
+
storage.push(
1477
+
&ActionableEvent::CreateLinks {
1478
+
record_id: RecordId {
1479
+
did: "did:plc:fdsa".into(),
1480
+
collection: "app.t.c".into(),
1481
+
rkey: "asdf2".into(),
1482
+
},
1483
+
links: vec![
1484
+
CollectedLink {
1485
+
target: Link::Uri("a.com".into()),
1486
+
path: ".abc.uri".into(),
1487
+
},
1488
+
CollectedLink {
1489
+
target: Link::Uri("c.com".into()),
1490
+
path: ".def.uri".into(),
1491
+
},
1492
+
],
1493
+
},
1494
+
3,
1495
+
)?;
1496
+
assert_eq!(storage.get_many_to_many_counts(
1497
+
"a.com",
1498
+
"app.t.c",
1499
+
".abc.uri",
1500
+
".def.uri",
1501
+
10,
1502
+
None,
1503
+
&HashSet::new(),
1504
+
&HashSet::new(),
1505
+
)?, PagedOrderedCollection {
1506
+
items: vec![
1507
+
("b.com".to_string(), 2, 2),
1508
+
("c.com".to_string(), 2, 1),
1509
+
],
1510
+
next: None,
1511
+
total: 2,
1512
+
});
1513
+
assert_eq!(storage.get_many_to_many_counts(
1514
+
"a.com",
1515
+
"app.t.c",
1516
+
".abc.uri",
1517
+
".def.uri",
1518
+
10,
1519
+
None,
1520
+
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1521
+
&HashSet::new(),
1522
+
)?, PagedOrderedCollection {
1523
+
items: vec![
1524
+
("c.com".to_string(), 2, 1),
1525
+
],
1526
+
next: None,
1527
+
total: 1,
1528
+
});
1529
+
assert_eq!(storage.get_many_to_many_counts(
1530
+
"a.com",
1531
+
"app.t.c",
1532
+
".abc.uri",
1533
+
".def.uri",
1534
+
10,
1535
+
None,
1536
+
&HashSet::new(),
1537
+
&HashSet::from_iter(["b.com".to_string()]),
1538
+
)?, PagedOrderedCollection {
1539
+
items: vec![
1540
+
("b.com".to_string(), 2, 2),
1541
+
],
1542
+
next: None,
1543
+
total: 1,
1544
+
});
1329
1545
});
1330
1546
}
+15
-1
constellation/src/storage/rocks_store.rs
+15
-1
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats};
2
2
use crate::{CountsByCount, Did, RecordId};
3
3
use anyhow::{bail, Result};
4
4
use bincode::Options as BincodeOptions;
···
826
826
}
827
827
828
828
impl LinkReader for RocksStorage {
829
+
fn get_many_to_many_counts(
830
+
&self,
831
+
_target: &str,
832
+
_collection: &str,
833
+
_path: &str,
834
+
_path_to_other: &str,
835
+
_limit: u64,
836
+
_after: Option<String>,
837
+
_filter_dids: &HashSet<Did>,
838
+
_filter_to_targets: &HashSet<String>,
839
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
840
+
todo!();
841
+
}
842
+
829
843
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830
844
let target_key = TargetKey(
831
845
Target(target.to_string()),
constellation/templates/get-many-to-many-counts.html.j2
constellation/templates/get-many-to-many-counts.html.j2
This is a binary file and will not be displayed.