+13
ufos/readme.md
+13
ufos/readme.md
···
28
28
cargo clean
29
29
```
30
30
31
+
for bonilla but 64-bit? (rp4)
32
+
```bash
33
+
cross build --release --target aarch64-unknown-linux-gnu && scp ../target/aarch64-unknown-linux-gnu/release/ufos pi@bonilla.local:ufos
34
+
# ^^ fails due to linker?
35
+
36
+
cross build --release --target aarch64-unknown-linux-musl && scp ../target/aarch64-unknown-linux-musl/release/ufos pi@bonilla.local:ufos
37
+
# seems to work
38
+
39
+
rsync -avhP ufos-bff-rl/ pi@bonilla:/mnt/ufos-db/
40
+
41
+
RUST_LOG=info ./ufos --jetstream us-west-2 --data /mnt/ufos-db/
42
+
```
43
+
31
44
nginx forward proxy for websocket (run this on another host):
32
45
33
46
```nginx
+6
-2
ufos/src/storage.rs
+6
-2
ufos/src/storage.rs
···
53
53
54
54
fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>;
55
55
56
-
fn trim_collection(&mut self, collection: &Nsid, limit: usize)
57
-
-> StorageResult<(usize, usize)>;
56
+
fn trim_collection(
57
+
&mut self,
58
+
collection: &Nsid,
59
+
limit: usize,
60
+
full_scan: bool,
61
+
) -> StorageResult<(usize, usize)>;
58
62
59
63
fn delete_account(&mut self, did: &Did) -> StorageResult<usize>;
60
64
}
+54
-26
ufos/src/storage_fjall.rs
+54
-26
ufos/src/storage_fjall.rs
···
836
836
Err(StorageError::BackgroundAlreadyStarted)
837
837
} else {
838
838
if reroll {
839
+
log::info!("reroll: resetting rollup cursor...");
839
840
insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
841
+
log::info!("reroll: clearing trim cursors...");
842
+
let mut batch = self.keyspace.batch();
843
+
for kv in self
844
+
.global
845
+
.prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
846
+
&Default::default(),
847
+
)?)
848
+
{
849
+
let (k, _) = kv?;
850
+
batch.remove(&self.global, k);
851
+
}
852
+
let n = batch.len();
853
+
batch.commit()?;
854
+
log::info!("reroll: cleared {n} trim cursors.");
840
855
}
841
856
Ok(FjallBackground(self.clone()))
842
857
}
···
985
1000
&mut self,
986
1001
collection: &Nsid,
987
1002
limit: usize,
1003
+
full_scan: bool,
988
1004
) -> StorageResult<(usize, usize)> {
989
1005
let mut dangling_feed_keys_cleaned = 0;
990
1006
let mut records_deleted = 0;
991
1007
992
-
let feed_trim_cursor_key =
993
-
TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
994
-
let trim_cursor = self
995
-
.global
996
-
.get(&feed_trim_cursor_key)?
997
-
.map(|value_bytes| db_complete(&value_bytes))
998
-
.transpose()?
999
-
.unwrap_or(Cursor::from_start());
1000
-
1001
-
let live_range =
1002
-
NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?;
1008
+
let live_range = if full_scan {
1009
+
let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
1010
+
let end = NsidRecordFeedKey::prefix_range_end(collection)?;
1011
+
start..end
1012
+
} else {
1013
+
let feed_trim_cursor_key =
1014
+
TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
1015
+
let trim_cursor = self
1016
+
.global
1017
+
.get(&feed_trim_cursor_key)?
1018
+
.map(|value_bytes| db_complete(&value_bytes))
1019
+
.transpose()?
1020
+
.unwrap_or(Cursor::from_start());
1021
+
NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
1022
+
};
1003
1023
1004
1024
let mut live_records_found = 0;
1005
-
let mut latest_expired_feed_cursor = None;
1025
+
let mut candidate_new_feed_lower_cursor = None;
1026
+
let mut ended_early = false;
1006
1027
let mut batch = self.keyspace.batch();
1007
1028
for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
1008
-
if i > 1_000_000 {
1029
+
if !full_scan && i > 1_000_000 {
1009
1030
log::info!("stopping collection trim early: already scanned 1M elements");
1031
+
ended_early = true;
1010
1032
break;
1011
1033
}
1012
1034
let (key_bytes, val_bytes) = kv?;
···
1047
1069
live_records_found += 1;
1048
1070
if live_records_found <= limit {
1049
1071
continue;
1050
-
} else if latest_expired_feed_cursor.is_none() {
1051
-
latest_expired_feed_cursor = Some(feed_key.cursor());
1052
-
batch.insert(
1053
-
&self.global,
1054
-
&TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
1055
-
&feed_key.cursor().to_db_bytes()?,
1056
-
);
1072
+
}
1073
+
if candidate_new_feed_lower_cursor.is_none() {
1074
+
candidate_new_feed_lower_cursor = Some(feed_key.cursor());
1057
1075
}
1058
1076
1059
1077
batch.remove(&self.feeds, key_bytes);
···
1061
1079
records_deleted += 1;
1062
1080
}
1063
1081
1082
+
if !ended_early {
1083
+
if let Some(new_cursor) = candidate_new_feed_lower_cursor {
1084
+
batch.insert(
1085
+
&self.global,
1086
+
&TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
1087
+
&new_cursor.to_db_bytes()?,
1088
+
);
1089
+
}
1090
+
}
1091
+
1064
1092
batch.commit()?;
1065
1093
1066
-
log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records");
1094
+
log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
1067
1095
Ok((dangling_feed_keys_cleaned, records_deleted))
1068
1096
}
1069
1097
···
1116
1144
let t0 = Instant::now();
1117
1145
let (mut total_danglers, mut total_deleted) = (0, 0);
1118
1146
for collection in &dirty_nsids {
1119
-
let (danglers, deleted) = self.0.trim_collection(collection, 512).inspect_err(|e| log::error!("trim error: {e:?}"))?;
1147
+
let (danglers, deleted) = self.0.trim_collection(collection, 512, false).inspect_err(|e| log::error!("trim error: {e:?}"))?;
1120
1148
total_danglers += danglers;
1121
1149
total_deleted += deleted;
1122
1150
if total_deleted > 1_000_000 {
···
1666
1694
)?;
1667
1695
assert_eq!(records.len(), 0);
1668
1696
1669
-
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?;
1670
-
write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6)?;
1671
-
write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6)?;
1672
-
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?;
1697
+
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
1698
+
write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
1699
+
write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
1700
+
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
1673
1701
1674
1702
let records = read.get_records_by_collections(
1675
1703
&[Nsid::new("a.a.a".to_string()).unwrap()],
+5
-4
ufos/src/storage_mem.rs
+5
-4
ufos/src/storage_mem.rs
···
966
966
&mut self,
967
967
collection: &Nsid,
968
968
limit: usize,
969
+
_full_scan: bool,
969
970
// TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end)
970
971
) -> StorageResult<(usize, usize)> {
971
972
let mut dangling_feed_keys_cleaned = 0;
···
1510
1511
)?;
1511
1512
assert_eq!(records.len(), 0);
1512
1513
1513
-
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?;
1514
-
write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6)?;
1515
-
write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6)?;
1516
-
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?;
1514
+
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
1515
+
write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
1516
+
write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
1517
+
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
1517
1518
1518
1519
let records = read.get_records_by_collections(
1519
1520
&[Nsid::new("a.a.a".to_string()).unwrap()],