-1
server/Cargo.lock
-1
server/Cargo.lock
-1
server/Cargo.toml
-1
server/Cargo.toml
-19
server/src/db/mod.rs
-19
server/src/db/mod.rs
···
380
380
})
381
381
}
382
382
383
-
// train zstd dict with 100 blocks from every lexicon
384
-
pub fn train_zstd_dict(&self) -> AppResult<Vec<u8>> {
385
-
let samples = self
386
-
.get_nsids()
387
-
.filter_map(|nsid| self.get_handle(&nsid))
388
-
.map(|handle| {
389
-
handle
390
-
.iter()
391
-
.rev()
392
-
.map(|res| {
393
-
res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
394
-
.map(|(_, value)| Cursor::new(value))
395
-
})
396
-
.take(1000)
397
-
})
398
-
.flatten();
399
-
zstd::dict::from_sample_iterator(samples, 1024 * 128).map_err(AppError::from)
400
-
}
401
-
402
383
pub fn get_hits(
403
384
&self,
404
385
nsid: &str,
+501
server/src/db_old/block.rs
+501
server/src/db_old/block.rs
···
1
+
use ordered_varint::Variable;
2
+
use rkyv::{
3
+
Archive, Deserialize, Serialize,
4
+
api::high::{HighSerializer, HighValidator},
5
+
bytecheck::CheckBytes,
6
+
de::Pool,
7
+
rancor::{self, Strategy},
8
+
ser::allocator::ArenaHandle,
9
+
util::AlignedVec,
10
+
};
11
+
use std::{
12
+
io::{self, Read, Write},
13
+
marker::PhantomData,
14
+
};
15
+
16
+
use crate::error::{AppError, AppResult};
17
+
18
+
/// A timestamped, rkyv-serialized payload of type `T`.
///
/// The payload is kept pre-serialized in an `AlignedVec` so it can be written
/// to disk verbatim and accessed zero-copy after being read back.
pub struct Item<T> {
    pub timestamp: u64,
    // rkyv-serialized bytes of `T`; alignment is required for zero-copy access.
    data: AlignedVec,
    phantom: PhantomData<T>,
}

impl<T: Archive> Item<T> {
    /// Zero-copy view of the archived payload.
    ///
    /// SAFETY-relevant: `access_unchecked` performs no validation, so this is
    /// only sound while `data` is always produced by `rkyv::to_bytes` in
    /// `Item::new` or read back from blocks this code wrote itself.
    pub fn access(&self) -> &T::Archived {
        unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
    }
}

impl<T> Item<T>
where
    T: Archive,
    T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
        + Deserialize<T, Strategy<Pool, rancor::Error>>,
{
    /// Validated deserialization into an owned `T` (slower than `access`).
    pub fn deser(&self) -> AppResult<T> {
        rkyv::from_bytes(&self.data).map_err(AppError::from)
    }
}

impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
    /// Serializes `data` eagerly and pairs it with `timestamp`.
    pub fn new(timestamp: u64, data: &T) -> Self {
        Item {
            timestamp,
            // Serializing a plain in-memory value is treated as infallible;
            // unwrap_unchecked skips the unwind path. NOTE(review): a plain
            // `.expect(..)` would be safer if that invariant ever changes.
            data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
            phantom: PhantomData,
        }
    }
}
50
+
51
+
pub struct ItemEncoder<W: Write, T> {
52
+
writer: W,
53
+
prev_timestamp: u64,
54
+
prev_delta: i64,
55
+
_item: PhantomData<T>,
56
+
}
57
+
58
+
impl<W: Write, T> ItemEncoder<W, T> {
59
+
pub fn new(writer: W) -> Self {
60
+
ItemEncoder {
61
+
writer,
62
+
prev_timestamp: 0,
63
+
prev_delta: 0,
64
+
_item: PhantomData,
65
+
}
66
+
}
67
+
68
+
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
69
+
if self.prev_timestamp == 0 {
70
+
// self.writer.write_varint(item.timestamp)?;
71
+
self.prev_timestamp = item.timestamp;
72
+
self.write_data(&item.data)?;
73
+
return Ok(());
74
+
}
75
+
76
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
77
+
78
+
self.writer.write_varint(delta - self.prev_delta)?;
79
+
self.prev_timestamp = item.timestamp;
80
+
self.prev_delta = delta;
81
+
82
+
self.write_data(&item.data)?;
83
+
84
+
Ok(())
85
+
}
86
+
87
+
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
88
+
self.writer.write_varint(data.len())?;
89
+
self.writer.write_all(data)?;
90
+
Ok(())
91
+
}
92
+
93
+
pub fn finish(mut self) -> AppResult<W> {
94
+
self.writer.flush()?;
95
+
Ok(self.writer)
96
+
}
97
+
}
98
+
99
+
/// Streaming decoder for records written by `ItemEncoder`.
///
/// Must be seeded with `start_timestamp` (the first item's timestamp, stored
/// in the block key) because the encoder does not write it into the stream.
pub struct ItemDecoder<R, T> {
    reader: R,
    // Timestamp of the most recently decoded item.
    current_timestamp: u64,
    // Running delta; the stream stores delta-of-deltas.
    current_delta: i64,
    first_item: bool,
    _item: PhantomData<T>,
}

impl<R: Read, T: Archive> ItemDecoder<R, T> {
    pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
        Ok(ItemDecoder {
            reader,
            current_timestamp: start_timestamp,
            current_delta: 0,
            first_item: true,
            _item: PhantomData,
        })
    }

    /// Decodes the next item, or `Ok(None)` at a clean end of stream.
    pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
        if self.first_item {
            // The first record carries no timestamp — only a payload;
            // `current_timestamp` was seeded by the caller.
            let Some(data_raw) = self.read_item()? else {
                return Ok(None);
            };
            self.first_item = false;
            return Ok(Some(Item {
                timestamp: self.current_timestamp,
                data: data_raw,
                phantom: PhantomData,
            }));
        }

        // Later records: delta-of-delta timestamp, then the payload.
        let Some(_delta) = self.read_timestamp()? else {
            return Ok(None);
        };

        // EOF here means a truncated record, not a clean end of stream.
        let data_raw = match self.read_item()? {
            Some(data_raw) => data_raw,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "expected data after delta",
                )
                .into());
            }
        };

        Ok(Some(Item {
            timestamp: self.current_timestamp,
            data: data_raw,
            phantom: PhantomData,
        }))
    }

    // Reconstructs the next timestamp from the stored delta-of-deltas,
    // e.g. timestamps [10, 11, 12, 14] were stored as deltas [1, 1, 2]
    // and delta-of-deltas [0, 1].
    fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
        let delta = match self.reader.read_varint::<i64>() {
            Ok(delta) => delta,
            // EOF at a record boundary is a clean end of stream.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        self.current_delta += delta;
        // Widen to i128 so a negative running delta cannot overflow the sum.
        self.current_timestamp =
            (self.current_timestamp as i128 + self.current_delta as i128) as u64;
        Ok(Some(self.current_timestamp))
    }

    // Reads one length-prefixed payload; `Ok(None)` on EOF at a record start.
    fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
        let data_len = match self.reader.read_varint::<usize>() {
            Ok(data_len) => data_len,
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        // NOTE(review): data_len comes straight from the stream; a corrupt
        // block could request a huge allocation here — consider a sanity cap.
        let mut data_raw = AlignedVec::with_capacity(data_len);
        // Zero-fill so read_exact has an initialized slice to fill into.
        for _ in 0..data_len {
            data_raw.push(0);
        }
        self.reader.read_exact(data_raw.as_mut_slice())?;
        Ok(Some(data_raw))
    }
}

impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
    type Item = AppResult<Item<T>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Ok(None) -> None, Ok(Some(x)) -> Some(Ok(x)), Err(e) -> Some(Err(e)).
        self.decode().transpose()
    }
}
197
+
198
+
/// Extension trait: write any `ordered_varint::Variable` as a varint.
pub trait WriteVariableExt: Write {
    fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
        value.encode_variable(self)
    }
}
impl<W: Write> WriteVariableExt for W {}

/// Extension trait: read any `ordered_varint::Variable` back from a stream.
pub trait ReadVariableExt: Read {
    fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
        T::decode_variable(self)
    }
}
impl<R: Read> ReadVariableExt for R {}
211
+
212
+
#[cfg(test)]
mod test {
    use super::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::io::Cursor;

    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[rkyv(compare(PartialEq))]
    struct TestData {
        id: u32,
        value: String,
    }

    /// Shorthand constructor for test payloads.
    fn td(id: u32, value: impl Into<String>) -> TestData {
        TestData {
            id,
            value: value.into(),
        }
    }

    /// Encodes every item into a fresh byte buffer.
    fn encode_all(items: &[Item<TestData>]) -> Vec<u8> {
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);
        for item in items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();
        buffer
    }

    /// Encodes `items`, then decodes the whole stream back, seeding the
    /// decoder with `start` as the first item's timestamp.
    fn roundtrip(items: &[Item<TestData>], start: u64) -> Vec<Item<TestData>> {
        let buffer = encode_all(items);
        ItemDecoder::<_, TestData>::new(Cursor::new(buffer), start)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_encoder_decoder_single_item() {
        let decoded = roundtrip(&[Item::new(1000, &td(123, "test"))], 1000);
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].timestamp, 1000);
        assert_eq!(decoded[0].access().id, 123);
        assert_eq!(decoded[0].access().value.as_str(), "test");
    }

    #[test]
    fn test_encoder_decoder_multiple_items() {
        let items = vec![
            Item::new(1000, &td(1, "first")),
            Item::new(1010, &td(2, "second")),
            Item::new(1015, &td(3, "third")),
            Item::new(1025, &td(4, "fourth")),
        ];

        let decoded = roundtrip(&items, 1000);
        assert_eq!(decoded.len(), 4);

        for (original, decoded) in items.iter().zip(decoded.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
            assert_eq!(
                original.access().value.as_str(),
                decoded.access().value.as_str()
            );
        }
    }

    #[test]
    fn test_encoder_decoder_with_iterator() {
        let items = vec![
            Item::new(2000, &td(10, "a")),
            Item::new(2005, &td(20, "b")),
            Item::new(2012, &td(30, "c")),
        ];

        let decoded = roundtrip(&items, 2000);

        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0].timestamp, 2000);
        assert_eq!(decoded[1].timestamp, 2005);
        assert_eq!(decoded[2].timestamp, 2012);

        assert_eq!(decoded[0].access().id, 10);
        assert_eq!(decoded[1].access().id, 20);
        assert_eq!(decoded[2].access().id, 30);
    }

    #[test]
    fn test_delta_compression() {
        // Deltas: 10, 10, 5 -> delta-of-deltas: 10, 0, -5.
        let items = vec![
            Item::new(1000, &td(1, "a")),
            Item::new(1010, &td(2, "b")),
            Item::new(1020, &td(3, "c")),
            Item::new(1025, &td(4, "d")),
        ];

        let decoded = roundtrip(&items, 1000);

        for (original, decoded) in items.iter().zip(decoded.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
        }
    }

    #[test]
    fn test_empty_decode() {
        let cursor = Cursor::new(Vec::new());
        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
        assert!(decoder.decode().unwrap().is_none());
    }

    #[test]
    fn test_backwards_timestamp() {
        // Timestamps may go backwards: delta is signed.
        let items = vec![
            Item::new(1000, &td(1, "first")),
            Item::new(900, &td(2, "second")),
        ];

        let decoded = roundtrip(&items, 1000);

        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded[0].timestamp, 1000);
        assert_eq!(decoded[1].timestamp, 900);
    }

    #[test]
    fn test_different_data_sizes() {
        let items = vec![
            Item::new(1000, &td(1, "x")),
            Item::new(1001, &td(2, "a".repeat(1000))),
        ];

        let decoded = roundtrip(&items, 1000);

        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded[0].access().value.as_str(), "x");
        assert_eq!(decoded[1].access().value.len(), 1000);
        assert_eq!(decoded[1].access().value.as_str(), "a".repeat(1000));
    }
}
+424
server/src/db_old/mod.rs
+424
server/src/db_old/mod.rs
···
1
+
use std::{
2
+
io::Cursor,
3
+
ops::{Bound, Deref, RangeBounds},
4
+
path::Path,
5
+
sync::{
6
+
Arc,
7
+
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
8
+
},
9
+
time::{Duration, Instant},
10
+
};
11
+
12
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
13
+
use ordered_varint::Variable;
14
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
15
+
use smol_str::SmolStr;
16
+
use tokio::sync::broadcast;
17
+
18
+
use crate::{
19
+
db_old::block::{ReadVariableExt, WriteVariableExt},
20
+
error::{AppError, AppResult},
21
+
jetstream::JetstreamEvent,
22
+
utils::{DefaultRateTracker, get_time},
23
+
};
24
+
25
+
mod block;
26
+
27
+
/// Aggregate counters per NSID (lexicon), persisted in the `_counts` partition.
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    pub count: u128,
    pub deleted_count: u128,
    // Unix timestamp (seconds) of the most recent event for this NSID.
    pub last_seen: u64,
}

/// One stored hit; its timestamp lives in the block encoding, not here.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

/// A normalized jetstream event, ready for insertion into the database.
#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
    /// Converts a jetstream event into a record; events other than
    /// commit/delete yield `None`.
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                // jetstream reports microseconds; we store seconds.
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}
69
+
70
+
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
71
+
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
72
+
type Item = block::Item<NsidHit>;
73
+
74
+
pub struct LexiconHandle {
75
+
tree: Partition,
76
+
buf: Arc<scc::Queue<EventRecord>>,
77
+
buf_len: AtomicUsize,
78
+
last_insert: AtomicU64,
79
+
eps: DefaultRateTracker,
80
+
block_size: AtomicUsize,
81
+
}
82
+
83
+
impl LexiconHandle {
84
+
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
85
+
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
86
+
Self {
87
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
88
+
buf: Default::default(),
89
+
buf_len: AtomicUsize::new(0),
90
+
last_insert: AtomicU64::new(0),
91
+
eps: DefaultRateTracker::new(Duration::from_secs(5)),
92
+
block_size: AtomicUsize::new(1000),
93
+
}
94
+
}
95
+
96
+
fn item_count(&self) -> usize {
97
+
self.buf_len.load(AtomicOrdering::Acquire)
98
+
}
99
+
100
+
fn last_insert(&self) -> u64 {
101
+
self.last_insert.load(AtomicOrdering::Acquire)
102
+
}
103
+
104
+
fn suggested_block_size(&self) -> usize {
105
+
self.block_size.load(AtomicOrdering::Relaxed)
106
+
}
107
+
108
+
fn insert(&self, event: EventRecord) {
109
+
self.buf.push(event);
110
+
self.buf_len.fetch_add(1, AtomicOrdering::Release);
111
+
self.last_insert
112
+
.store(get_time().as_millis() as u64, AtomicOrdering::Release);
113
+
self.eps.observe(1);
114
+
let rate = self.eps.rate() as usize;
115
+
if rate != 0 {
116
+
self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
117
+
}
118
+
}
119
+
120
+
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
121
+
let mut writer = ItemEncoder::new(Vec::with_capacity(
122
+
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
123
+
));
124
+
let mut start_timestamp = None;
125
+
let mut end_timestamp = None;
126
+
let mut written = 0_usize;
127
+
while let Some(event) = self.buf.pop() {
128
+
let item = Item::new(
129
+
event.timestamp,
130
+
&NsidHit {
131
+
deleted: event.deleted,
132
+
},
133
+
);
134
+
writer.encode(&item)?;
135
+
if start_timestamp.is_none() {
136
+
start_timestamp = Some(event.timestamp);
137
+
}
138
+
end_timestamp = Some(event.timestamp);
139
+
if written >= max_block_size {
140
+
break;
141
+
}
142
+
written += 1;
143
+
}
144
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
145
+
self.buf_len.store(0, AtomicOrdering::Release);
146
+
let value = writer.finish()?;
147
+
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
148
+
key.write_varint(start_timestamp)?;
149
+
key.write_varint(end_timestamp)?;
150
+
self.tree.insert(key, value)?;
151
+
}
152
+
Ok(written)
153
+
}
154
+
}
155
+
156
+
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
/// Top-level database: one fjall partition per lexicon holding encoded hit
/// blocks, plus a `_counts` partition with aggregate counters per NSID.
pub struct Db {
    inner: Keyspace,
    // Lazily-populated cache of per-lexicon handles.
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    counts: Partition,
    // Notifies subscribers of updated counts after each recorded event.
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: DefaultRateTracker,
    min_block_size: usize,
    max_block_size: usize,
    // Idle time after which a lexicon's buffer is flushed regardless of size.
    max_last_activity: Duration,
}

impl Db {
    /// Opens (or creates) the keyspace at `path` and the `_counts` partition.
    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = Config::new(path)
            .cache_size(8 * 1024 * 1024) // from talna
            .open()?;
        Ok(Self {
            hits: Default::default(),
            counts: ks.open_partition(
                "_counts",
                // Counts are tiny and hot; compression would only add CPU.
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: DefaultRateTracker::new(Duration::from_secs(1)),
            min_block_size: 512,
            max_block_size: 100_000,
            max_last_activity: Duration::from_secs(10),
        })
    }

    /// Flushes buffered events to disk for every lexicon that is (a) forced
    /// by `all`, (b) over its suggested block size, or (c) idle too long.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let _guard = scc::ebr::Guard::new();
        for (nsid, tree) in self.hits.iter(&_guard) {
            let count = tree.item_count();
            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
            let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
                > self.max_last_activity.as_millis() as u64;
            if count > 0 && (all || is_max_block_size || is_too_old) {
                // sync() caps each block; loop until the buffer is drained.
                loop {
                    let synced = tree.sync(self.max_block_size)?;
                    if synced == 0 {
                        break;
                    }
                    tracing::info!("synced {synced} of {nsid} to db");
                }
            }
        }
        Ok(())
    }

    /// Current events-per-second rate.
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to per-NSID count updates.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Runs `f` against the handle for `nsid` if its partition exists,
    /// caching the handle on first use; returns `None` for unknown NSIDs
    /// (never creates a partition).
    #[inline(always)]
    fn maybe_run_in_nsid_tree<T>(
        &self,
        nsid: &str,
        f: impl FnOnce(&LexiconHandle) -> T,
    ) -> Option<T> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid, &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.inner.partition_exists(nsid) {
                    let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
                    // Ignore the race where another thread inserted first.
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(f(&handle))
    }

    /// Runs `f` against the handle for `nsid`, creating the partition and
    /// handle on demand (write path).
    #[inline(always)]
    fn run_in_nsid_tree<T>(
        &self,
        nsid: SmolStr,
        f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
    ) -> AppResult<T> {
        f(self
            .hits
            .entry(nsid.clone())
            .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
            .get())
    }

    /// Buffers an event, bumps the NSID's counters, and notifies listeners.
    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
        let EventRecord {
            nsid,
            timestamp,
            deleted,
        } = e.clone();

        // insert event
        self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
        // increment count
        // NOTE(review): this read-modify-write is not atomic — two threads
        // recording the same NSID concurrently could lose an increment.
        let mut counts = self.get_count(&nsid)?;
        counts.last_seen = timestamp;
        if deleted {
            counts.deleted_count += 1;
        } else {
            counts.count += 1;
        }
        self.insert_count(&nsid, counts.clone())?;
        if self.event_broadcaster.receiver_count() > 0 {
            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
        }
        self.eps.observe(1);
        Ok(())
    }

    /// Persists `counts` under `nsid` in the `_counts` partition.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // Serializing an in-memory NsidCounts is treated as infallible.
                unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Fetches counters for `nsid`, defaulting to zeros when absent.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // Unvalidated deserialization: trusts that `_counts` only ever holds
        // bytes written by `insert_count` above.
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates all (nsid, counters) pairs.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // Keys are written from &str, so they are valid UTF-8.
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// Lists every lexicon partition name (excluding the `_counts` partition).
    pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
        self.inner
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Raw (key, value) block pairs for `nsid`, newest block first.
    pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
        self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
            Box::new(
                handle
                    .tree
                    .iter()
                    .rev()
                    .map(|res| res.map_err(AppError::from)),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Streams decoded hits for `nsid` whose timestamps fall in `range`.
    /// Returns an empty iterator for unknown NSIDs.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> BoxedIter<AppResult<Item>> {
        // Bounds are varint-encoded to match the on-disk key prefix
        // (ordered_varint encodings sort like the integers they encode).
        let start = range
            .start_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        let end = range
            .end_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        // Inclusive upper limit applied per-item inside each block.
        let limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        // NOTE(review): the key range compares only against each block's
        // *start* timestamp prefix — a block that starts before `range.start`
        // but overlaps it is skipped entirely. Confirm this is intended.
        self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
            let map_block = move |(key, val)| {
                let mut key_reader = Cursor::new(key);
                // Seed the decoder with the block's start timestamp from its key.
                let start_timestamp = key_reader.read_varint::<u64>()?;
                let items =
                    ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
                        item.as_ref().map_or(true, |item| item.timestamp <= limit)
                    });
                Ok(items)
            };

            Box::new(
                handle
                    .tree
                    .range(TimestampRange { start, end })
                    .map(move |res| res.map_err(AppError::from).and_then(map_block))
                    .flatten()
                    .flatten(),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Earliest tracked timestamp, approximated by the oldest like-event block.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // should be accurate enough
        self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
            let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
                return Ok(0);
            };
            let mut timestamp_reader = Cursor::new(timestamps_raw);
            timestamp_reader
                .read_varint::<u64>()
                .map_err(AppError::from)
        })
        .unwrap_or(Ok(0))
    }
}
387
+
388
+
/// Varint-encoded timestamp key (the current on-disk key format).
type TimestampRepr = Vec<u8>;

/// Byte-range over varint-encoded timestamp keys, passed to `Partition::range`.
struct TimestampRange {
    start: Bound<TimestampRepr>,
    end: Bound<TimestampRepr>,
}

impl RangeBounds<TimestampRepr> for TimestampRange {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampRepr> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampRepr> {
        self.end.as_ref()
    }
}

/// Fixed-width 8-byte timestamp key (legacy on-disk format).
type TimestampReprOld = [u8; 8];

/// Range over legacy fixed-width keys.
/// NOTE(review): appears unused in this file — confirm before removing.
struct TimestampRangeOld {
    start: Bound<TimestampReprOld>,
    end: Bound<TimestampReprOld>,
}

impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampReprOld> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampReprOld> {
        self.end.as_ref()
    }
}
+4
-17
server/src/main.rs
+4
-17
server/src/main.rs
···
17
17
18
18
mod api;
19
19
mod db;
20
+
mod db_old;
20
21
mod error;
21
22
mod jetstream;
22
23
mod utils;
···
51
52
}
52
53
Some("debug") => {
53
54
debug();
54
-
return;
55
-
}
56
-
Some("traindict") => {
57
-
train_zstd_dict();
58
55
return;
59
56
}
60
57
Some(x) => {
···
211
208
db.sync(true).expect("cant sync db");
212
209
}
213
210
214
-
fn train_zstd_dict() {
215
-
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
216
-
let dict_data = db.train_zstd_dict().expect("cant train zstd dict");
217
-
std::fs::write("zstd_dict", dict_data).expect("cant save zstd dict")
218
-
}
219
-
220
211
fn debug() {
221
212
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
222
213
let info = db.info().expect("cant get db info");
···
269
260
270
261
fn migrate() {
271
262
let cancel_token = CancellationToken::new();
272
-
let from = Arc::new(
273
-
Db::new(
274
-
DbConfig::default().path(".fjall_data_from"),
275
-
cancel_token.child_token(),
276
-
)
277
-
.expect("couldnt create db"),
278
-
);
263
+
264
+
let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
265
+
279
266
let to = Arc::new(
280
267
Db::new(
281
268
DbConfig::default().path(".fjall_data_to").ks(|c| {