+7
-19
constellation/src/server/mod.rs
+7
-19
constellation/src/server/mod.rs
···
66
66
.await
67
67
.map_err(to500)?
68
68
}
69
-
})
69
+
}),
70
70
)
71
71
.route(
72
72
"/links/count",
···
251
251
#[template(path = "get-many-to-many-counts.html.j2")]
252
252
struct GetManyToManyCountsResponse {
253
253
counts_by_other_subject: Vec<OtherSubjectCount>,
254
-
total_other_subjects: u64,
255
254
cursor: Option<OpaqueApiCursor>,
256
255
#[serde(skip_serializing)]
257
256
query: GetManyToManyCountsQuery,
···
308
307
)
309
308
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
309
311
-
let cursor = paged.next.map(|next| {
312
-
ApiKeyedCursor {
313
-
version: paged.total,
314
-
next,
315
-
}
316
-
.into()
317
-
});
310
+
let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
318
311
319
312
let items = paged
320
313
.items
321
314
.into_iter()
322
-
.map(|(subject, total, distinct)|
323
-
OtherSubjectCount {
324
-
subject,
325
-
total,
326
-
distinct,
327
-
})
315
+
.map(|(subject, total, distinct)| OtherSubjectCount {
316
+
subject,
317
+
total,
318
+
distinct,
319
+
})
328
320
.collect();
329
321
330
322
Ok(acceptable(
331
323
accept,
332
324
GetManyToManyCountsResponse {
333
325
counts_by_other_subject: items,
334
-
total_other_subjects: paged.total,
335
326
cursor,
336
327
query: (*query).clone(),
337
328
},
338
329
))
339
330
}
340
-
341
-
342
331
343
332
#[derive(Clone, Deserialize)]
344
333
struct GetLinksCountQuery {
···
740
729
741
730
#[derive(Serialize, Deserialize)] // for bincode
742
731
struct ApiKeyedCursor {
743
-
version: u64, // total length (dirty check)
744
732
next: String, // the key
745
733
}
746
734
+17
-19
constellation/src/storage/mem_store.rs
+17
-19
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats};
1
+
use super::{
2
+
LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
3
+
};
2
4
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
5
use anyhow::Result;
4
6
use links::CollectedLink;
···
143
145
filter_dids: &HashSet<Did>,
144
146
filter_to_targets: &HashSet<String>,
145
147
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
146
-
let empty = || {
147
-
PagedOrderedCollection {
148
-
items: vec![],
149
-
next: None,
150
-
total: 0,
151
-
}
152
-
};
153
148
let data = self.0.lock().unwrap();
154
149
let Some(paths) = data.targets.get(&Target::new(target)) else {
155
-
return Ok(empty());
150
+
return Ok(PagedOrderedCollection::default());
156
151
};
157
152
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
158
-
return Ok(empty());
153
+
return Ok(PagedOrderedCollection::default());
159
154
};
160
155
161
156
let path_to_other = RecordPath::new(path_to_other);
162
-
let filter_to_targets: HashSet::<Target> = HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
157
+
let filter_to_targets: HashSet<Target> =
158
+
HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
163
159
164
160
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
165
161
for (did, rkey) in linkers.into_iter().cloned().filter_map(|l| l) {
···
170
166
.links
171
167
.get(&did)
172
168
.unwrap_or(&HashMap::new())
173
-
.get(&RepoId { collection: collection.to_string(), rkey })
169
+
.get(&RepoId {
170
+
collection: collection.to_string(),
171
+
rkey,
172
+
})
174
173
.unwrap_or(&Vec::new())
175
174
.into_iter()
176
175
.filter_map(|(path, target)| {
177
176
if *path == path_to_other
178
177
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
179
-
{ Some(target) } else { None }
178
+
{
179
+
Some(target)
180
+
} else {
181
+
None
182
+
}
180
183
})
181
184
.take(1)
182
185
.next()
···
186
189
e.1.insert(did.clone());
187
190
}
188
191
}
189
-
let total = grouped_counts.len() as u64;
190
192
let mut items: Vec<(String, u64, u64)> = grouped_counts
191
193
.iter()
192
194
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
···
202
204
} else {
203
205
None
204
206
};
205
-
Ok(PagedOrderedCollection {
206
-
items,
207
-
next,
208
-
total,
209
-
})
207
+
Ok(PagedOrderedCollection { items, next })
210
208
}
211
209
212
210
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
+81
-79
constellation/src/storage/mod.rs
+81
-79
constellation/src/storage/mod.rs
···
24
24
/// this has weaker guarantees than PagedAppendingCollection: it might
25
25
/// return a totally consistent snapshot. but it should avoid duplicates
26
26
/// and each page should at least be internally consistent.
27
-
#[derive(Debug, PartialEq)]
27
+
#[derive(Debug, PartialEq, Default)]
28
28
pub struct PagedOrderedCollection<T, K: Ord> {
29
29
pub items: Vec<T>,
30
30
pub next: Option<K>,
31
-
pub total: u64,
32
31
}
33
32
34
33
#[derive(Debug, Deserialize, Serialize, PartialEq)]
···
1355
1354
//////// many-to-many /////////
1356
1355
1357
1356
test_each_storage!(get_m2m_counts_empty, |storage| {
1358
-
assert_eq!(storage.get_many_to_many_counts(
1359
-
"a.com",
1360
-
"a.b.c",
1361
-
".d.e",
1362
-
".f.g",
1363
-
10,
1364
-
None,
1365
-
&HashSet::new(),
1366
-
&HashSet::new(),
1367
-
)?, PagedOrderedCollection {
1368
-
items: vec![],
1369
-
next: None,
1370
-
total: 0,
1371
-
});
1357
+
assert_eq!(
1358
+
storage.get_many_to_many_counts(
1359
+
"a.com",
1360
+
"a.b.c",
1361
+
".d.e",
1362
+
".f.g",
1363
+
10,
1364
+
None,
1365
+
&HashSet::new(),
1366
+
&HashSet::new(),
1367
+
)?,
1368
+
PagedOrderedCollection {
1369
+
items: vec![],
1370
+
next: None,
1371
+
}
1372
+
);
1372
1373
});
1373
1374
1374
1375
test_each_storage!(get_m2m_counts_single, |storage| {
···
1396
1397
},
1397
1398
0,
1398
1399
)?;
1399
-
assert_eq!(storage.get_many_to_many_counts(
1400
-
"a.com",
1401
-
"app.t.c",
1402
-
".abc.uri",
1403
-
".def.uri",
1404
-
10,
1405
-
None,
1406
-
&HashSet::new(),
1407
-
&HashSet::new(),
1408
-
)?, PagedOrderedCollection {
1409
-
items: vec![("b.com".to_string(), 1, 1)],
1410
-
next: None,
1411
-
total: 1,
1412
-
});
1400
+
assert_eq!(
1401
+
storage.get_many_to_many_counts(
1402
+
"a.com",
1403
+
"app.t.c",
1404
+
".abc.uri",
1405
+
".def.uri",
1406
+
10,
1407
+
None,
1408
+
&HashSet::new(),
1409
+
&HashSet::new(),
1410
+
)?,
1411
+
PagedOrderedCollection {
1412
+
items: vec![("b.com".to_string(), 1, 1)],
1413
+
next: None,
1414
+
}
1415
+
);
1413
1416
});
1414
1417
1415
1418
test_each_storage!(get_m2m_counts_filters, |storage| {
···
1493
1496
},
1494
1497
3,
1495
1498
)?;
1496
-
assert_eq!(storage.get_many_to_many_counts(
1497
-
"a.com",
1498
-
"app.t.c",
1499
-
".abc.uri",
1500
-
".def.uri",
1501
-
10,
1502
-
None,
1503
-
&HashSet::new(),
1504
-
&HashSet::new(),
1505
-
)?, PagedOrderedCollection {
1506
-
items: vec![
1507
-
("b.com".to_string(), 2, 2),
1508
-
("c.com".to_string(), 2, 1),
1509
-
],
1510
-
next: None,
1511
-
total: 2,
1512
-
});
1513
-
assert_eq!(storage.get_many_to_many_counts(
1514
-
"a.com",
1515
-
"app.t.c",
1516
-
".abc.uri",
1517
-
".def.uri",
1518
-
10,
1519
-
None,
1520
-
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1521
-
&HashSet::new(),
1522
-
)?, PagedOrderedCollection {
1523
-
items: vec![
1524
-
("c.com".to_string(), 2, 1),
1525
-
],
1526
-
next: None,
1527
-
total: 1,
1528
-
});
1529
-
assert_eq!(storage.get_many_to_many_counts(
1530
-
"a.com",
1531
-
"app.t.c",
1532
-
".abc.uri",
1533
-
".def.uri",
1534
-
10,
1535
-
None,
1536
-
&HashSet::new(),
1537
-
&HashSet::from_iter(["b.com".to_string()]),
1538
-
)?, PagedOrderedCollection {
1539
-
items: vec![
1540
-
("b.com".to_string(), 2, 2),
1541
-
],
1542
-
next: None,
1543
-
total: 1,
1544
-
});
1499
+
assert_eq!(
1500
+
storage.get_many_to_many_counts(
1501
+
"a.com",
1502
+
"app.t.c",
1503
+
".abc.uri",
1504
+
".def.uri",
1505
+
10,
1506
+
None,
1507
+
&HashSet::new(),
1508
+
&HashSet::new(),
1509
+
)?,
1510
+
PagedOrderedCollection {
1511
+
items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1512
+
next: None,
1513
+
}
1514
+
);
1515
+
assert_eq!(
1516
+
storage.get_many_to_many_counts(
1517
+
"a.com",
1518
+
"app.t.c",
1519
+
".abc.uri",
1520
+
".def.uri",
1521
+
10,
1522
+
None,
1523
+
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1524
+
&HashSet::new(),
1525
+
)?,
1526
+
PagedOrderedCollection {
1527
+
items: vec![("c.com".to_string(), 2, 1),],
1528
+
next: None,
1529
+
}
1530
+
);
1531
+
assert_eq!(
1532
+
storage.get_many_to_many_counts(
1533
+
"a.com",
1534
+
"app.t.c",
1535
+
".abc.uri",
1536
+
".def.uri",
1537
+
10,
1538
+
None,
1539
+
&HashSet::new(),
1540
+
&HashSet::from_iter(["b.com".to_string()]),
1541
+
)?,
1542
+
PagedOrderedCollection {
1543
+
items: vec![("b.com".to_string(), 2, 2),],
1544
+
next: None,
1545
+
}
1546
+
);
1545
1547
});
1546
1548
}
+155
-13
constellation/src/storage/rocks_store.rs
+155
-13
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats};
1
+
use super::{
2
+
ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
3
+
StorageStats,
4
+
};
2
5
use crate::{CountsByCount, Did, RecordId};
3
6
use anyhow::{bail, Result};
4
7
use bincode::Options as BincodeOptions;
···
11
14
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12
15
};
13
16
use serde::{Deserialize, Serialize};
14
-
use std::collections::{HashMap, HashSet};
17
+
use std::collections::{BTreeMap, HashMap, HashSet};
15
18
use std::io::Read;
16
19
use std::marker::PhantomData;
17
20
use std::path::{Path, PathBuf};
···
828
831
impl LinkReader for RocksStorage {
829
832
fn get_many_to_many_counts(
830
833
&self,
831
-
_target: &str,
832
-
_collection: &str,
833
-
_path: &str,
834
-
_path_to_other: &str,
835
-
_limit: u64,
836
-
_after: Option<String>,
837
-
_filter_dids: &HashSet<Did>,
838
-
_filter_to_targets: &HashSet<String>,
834
+
target: &str,
835
+
collection: &str,
836
+
path: &str,
837
+
path_to_other: &str,
838
+
limit: u64,
839
+
after: Option<String>,
840
+
filter_dids: &HashSet<Did>,
841
+
filter_to_targets: &HashSet<String>,
839
842
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
840
-
todo!();
843
+
let collection = Collection(collection.to_string());
844
+
let path = RPath(path.to_string());
845
+
846
+
let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());
847
+
848
+
// unfortunately the cursor is a, uh, stringified number.
849
+
// this was easier for the memstore (plain target, not target id), and
850
+
// making it generic is a bit awful.
851
+
// so... parse the number out of a string here :(
852
+
// TODO: this should bubble up to a BAD_REQUEST response
853
+
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
854
+
855
+
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
856
+
return Ok(Default::default());
857
+
};
858
+
859
+
let filter_did_ids: HashMap<DidId, bool> = filter_dids
860
+
.into_iter()
861
+
.filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
862
+
.collect::<Result<Vec<DidIdValue>>>()?
863
+
.into_iter()
864
+
.map(|DidIdValue(id, active)| (id, active))
865
+
.collect();
866
+
867
+
let filter_to_target_ids = filter_to_targets
868
+
.into_iter()
869
+
.filter_map(|target| {
870
+
self.target_id_table
871
+
.get_id_val(
872
+
&self.db,
873
+
&TargetKey(Target(target.to_string()), collection.clone(), path.clone()),
874
+
)
875
+
.transpose()
876
+
})
877
+
.collect::<Result<HashSet<TargetId>>>()?;
878
+
879
+
let linkers = self.get_target_linkers(&target_id)?;
880
+
881
+
let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();
882
+
883
+
for (did_id, rkey) in linkers.0 {
884
+
if did_id.is_empty() {
885
+
continue;
886
+
}
887
+
888
+
if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
889
+
continue;
890
+
}
891
+
892
+
let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
893
+
let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
894
+
continue;
895
+
};
896
+
897
+
let Some(fwd_target) = targets
898
+
.0
899
+
.into_iter()
900
+
.filter_map(|RecordLinkTarget(rpath, target_id)| {
901
+
if rpath.0 == path_to_other
902
+
&& (filter_to_target_ids.is_empty()
903
+
|| filter_to_target_ids.contains(&target_id))
904
+
{
905
+
Some(target_id)
906
+
} else {
907
+
None
908
+
}
909
+
})
910
+
.take(1)
911
+
.next()
912
+
else {
913
+
continue;
914
+
};
915
+
916
+
// small relief: we page over target ids, so we can already bail
917
+
// reprocessing previous pages here
918
+
if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
919
+
continue;
920
+
}
921
+
922
+
// aand we can skip target ids that must be on future pages
923
+
// (this check continues after the did-lookup, which we have to do)
924
+
let page_is_full = grouped_counts.len() as u64 >= limit;
925
+
if page_is_full {
926
+
let current_max = grouped_counts.keys().rev().next().unwrap(); // limit should be non-zero bleh
927
+
if fwd_target > *current_max {
928
+
continue;
929
+
}
930
+
}
931
+
932
+
// bit painful: 2-step lookup to make sure this did is active
933
+
let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
934
+
eprintln!("failed to look up did from did_id {did_id:?}");
935
+
continue;
936
+
};
937
+
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
938
+
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
939
+
continue;
940
+
};
941
+
if !active {
942
+
continue;
943
+
}
944
+
945
+
// page-management, continued
946
+
// if we have a full page, and we're inserting a *new* key less than
947
+
// the current max, then we can evict the current max
948
+
let mut should_evict = false;
949
+
let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
950
+
// this is a *new* key, so kick the max if we're full
951
+
should_evict = page_is_full;
952
+
Default::default()
953
+
});
954
+
entry.0 += 1;
955
+
entry.1.insert(did_id.clone());
956
+
957
+
if should_evict {
958
+
grouped_counts.pop_last();
959
+
}
960
+
}
961
+
962
+
let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
963
+
for (target_id, (n, dids)) in grouped_counts {
964
+
let Some(target) = self.target_id_table.get_val_from_id(&self.db, target_id)? else {
965
+
eprintln!("failed to look up target from target_id {target_id:?}");
966
+
continue;
967
+
};
968
+
items.push((target, n, dids.len() as u64));
969
+
}
970
+
971
+
let next = if grouped_counts.len() as u64 >= limit {
972
+
// yeah.... it's a number saved as a string......sorry
973
+
grouped_counts
974
+
.keys()
975
+
.rev()
976
+
.next()
977
+
.map(|k| format!("{}", k.0))
978
+
} else {
979
+
None
980
+
};
981
+
982
+
Ok(PagedOrderedCollection { items, next })
841
983
}
842
984
843
985
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
···
1156
1298
}
1157
1299
1158
1300
// target ids
1159
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1301
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
1160
1302
struct TargetId(u64); // key
1161
1303
1162
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1304
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
1163
1305
pub struct Target(pub String); // the actual target/uri
1164
1306
1165
1307
// targets (uris, dids, etc.): the reverse index