+22
-2
ufos/src/main.rs
+22
-2
ufos/src/main.rs
···
9
9
use ufos::file_consumer;
10
10
use ufos::server;
11
11
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
12
-
use ufos::storage_fjall::FjallStorage;
12
+
use ufos::storage_fjall::{FjallConfig, FjallStorage};
13
13
use ufos::store_types::SketchSecretPrefix;
14
14
use ufos::{nice_duration, ConsumerInfo};
15
15
···
55
55
/// DEBUG: interpret jetstream as a file fixture
56
56
#[arg(long, action)]
57
57
jetstream_fixture: bool,
58
+
/// HOPEFULLY only needed once
59
+
///
60
+
/// brute-force garbage-collect all dangling records because we weren't deleting
61
+
/// them before at all (oops)
62
+
#[arg(long, action)]
63
+
fjall_records_gc: bool,
58
64
}
59
65
60
66
#[tokio::main]
···
67
73
args.data.clone(),
68
74
jetstream,
69
75
args.jetstream_force,
70
-
Default::default(),
76
+
FjallConfig {
77
+
major_compact: !args.fjall_records_gc,
78
+
},
71
79
)?;
80
+
81
+
if args.fjall_records_gc {
82
+
log::info!("beginning brute-force records gc");
83
+
let t0 = std::time::Instant::now();
84
+
let (n, m) = write_store.records_brute_gc_danger()?;
85
+
let dt = t0.elapsed();
86
+
log::info!(
87
+
"completed brute-force records gc in {dt:?}, removed {n} and retained {m} records."
88
+
);
89
+
return Ok(());
90
+
}
91
+
72
92
go(args, read_store, write_store, cursor, sketch_secret).await?;
73
93
Ok(())
74
94
}
+85
-17
ufos/src/storage_fjall.rs
+85
-17
ufos/src/storage_fjall.rs
···
148
148
/// this is only meant for tests
149
149
#[cfg(test)]
150
150
pub temp: bool,
151
+
/// do major compaction on startup
152
+
///
153
+
/// default is false. probably a good thing unless it's too slow.
154
+
pub major_compact: bool,
151
155
}
152
156
153
157
impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
···
155
159
path: impl AsRef<Path>,
156
160
endpoint: String,
157
161
force_endpoint: bool,
158
-
_config: FjallConfig,
162
+
config: FjallConfig,
159
163
) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
160
164
let keyspace = {
161
165
let config = Config::new(path);
···
224
228
sketch_secret
225
229
};
226
230
227
-
for (partition, name) in [
228
-
(&global, "global"),
229
-
(&feeds, "feeds"),
230
-
(&records, "records"),
231
-
(&rollups, "rollups"),
232
-
(&queues, "queues"),
233
-
] {
234
-
let size0 = partition.disk_space();
235
-
log::info!("beggining major compaction for {name} (original size: {size0})");
236
-
let t0 = Instant::now();
237
-
partition.major_compact().expect("compact better work 😬");
238
-
let dt = t0.elapsed();
239
-
let sizef = partition.disk_space();
240
-
let dsize = (sizef as i64) - (size0 as i64);
241
-
log::info!("completed compaction for {name} in {dt:?} (new size: {sizef}, {dsize})");
231
+
if config.major_compact {
232
+
for (partition, name) in [
233
+
(&global, "global"),
234
+
(&feeds, "feeds"),
235
+
(&records, "records"),
236
+
(&rollups, "rollups"),
237
+
(&queues, "queues"),
238
+
] {
239
+
let size0 = partition.disk_space();
240
+
log::info!("beggining major compaction for {name} (original size: {size0})");
241
+
let t0 = Instant::now();
242
+
partition.major_compact().expect("compact better work 😬");
243
+
let dt = t0.elapsed();
244
+
let sizef = partition.disk_space();
245
+
let dsize = (sizef as i64) - (size0 as i64);
246
+
log::info!(
247
+
"completed compaction for {name} in {dt:?} (new size: {sizef}, {dsize})"
248
+
);
249
+
}
250
+
} else {
251
+
log::info!("skipping major compaction on startup");
242
252
}
243
253
244
254
let reader = FjallReader {
···
1366
1376
batch.commit()?;
1367
1377
Ok((cursors_advanced, dirty_nsids))
1368
1378
}
1379
+
pub fn records_brute_gc_danger(&self) -> StorageResult<(usize, usize)> {
1380
+
let (mut removed, mut retained) = (0, 0);
1381
+
let mut to_retain = HashSet::<Vec<u8>>::new();
1382
+
1383
+
// Partition: 'feed'
1384
+
//
1385
+
// - Per-collection list of record references ordered by jetstream cursor
1386
+
// - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
1387
+
// - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
1388
+
//
1389
+
//
1390
+
// Partition: 'records'
1391
+
//
1392
+
// - Actual records by their atproto location
1393
+
// - key: nullstr || nullstr || nullstr (did, collection, rkey)
1394
+
// - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
1395
+
//
1396
+
//
1397
+
1398
+
log::warn!("loading *all* record keys from feed into memory (yikes)");
1399
+
let t0 = Instant::now();
1400
+
for (i, kv) in self.feeds.iter().enumerate() {
1401
+
if i > 0 && (i % 100000 == 0) {
1402
+
log::info!("{i}...");
1403
+
}
1404
+
let (key_bytes, val_bytes) = kv?;
1405
+
let key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
1406
+
let val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
1407
+
let record_key: RecordLocationKey = (&key, &val).into();
1408
+
to_retain.insert(record_key.to_db_bytes()?);
1409
+
}
1410
+
log::warn!(
1411
+
"loaded. wow. took {:?}, found {} keys",
1412
+
t0.elapsed(),
1413
+
to_retain.len()
1414
+
);
1415
+
1416
+
log::warn!("warmup OVER, iterating some billions of record keys now");
1417
+
let t0 = Instant::now();
1418
+
for (i, k) in self.records.keys().enumerate() {
1419
+
let key_bytes = k?;
1420
+
if to_retain.contains(&*key_bytes) {
1421
+
retained += 1;
1422
+
} else {
1423
+
self.records.remove(key_bytes)?;
1424
+
removed += 1;
1425
+
}
1426
+
if i > 0 && (i % 10_000_000) == 0 {
1427
+
log::info!("{i}: {retained} retained, {removed} removed.");
1428
+
}
1429
+
}
1430
+
log::warn!("whew! that took {:?}", t0.elapsed());
1431
+
1432
+
Ok((removed, retained))
1433
+
}
1369
1434
}
1370
1435
1371
1436
impl StoreWriter<FjallBackground> for FjallWriter {
···
1817
1882
tempfile::tempdir().unwrap(),
1818
1883
"offline test (no real jetstream endpoint)".to_string(),
1819
1884
false,
1820
-
FjallConfig { temp: true },
1885
+
FjallConfig {
1886
+
temp: true,
1887
+
..Default::default()
1888
+
},
1821
1889
)
1822
1890
.unwrap();
1823
1891
(read, write)