+3
-2
ufos/src/consumer.rs
+3
-2
ufos/src/consumer.rs
···
121
created_collection = true;
122
Default::default()
123
});
124
+
collection.total_seen += 1;
125
+
collection.samples.push_front(record);
126
+
collection.samples.truncate(MAX_BATCHED_RECORDS);
127
128
if created_collection {
129
self.current_batch.records.len() >= MAX_BATCHED_COLLECTIONS // full if we have collections to the max
+7
-1
ufos/src/lib.rs
+7
-1
ufos/src/lib.rs
···
12
pub record: serde_json::Value,
13
}
14
15
#[derive(Debug)]
16
pub struct DeleteRecord {
17
pub did: Did,
···
21
22
#[derive(Debug, Default)]
23
pub struct EventBatch {
24
-
pub records: HashMap<Nsid, VecDeque<SetRecord>>,
25
pub record_deletes: Vec<DeleteRecord>,
26
pub account_removes: Vec<Did>,
27
}
···
12
pub record: serde_json::Value,
13
}
14
15
+
#[derive(Debug, Default)]
16
+
pub struct CollectionSamples {
17
+
pub total_seen: usize,
18
+
pub samples: VecDeque<SetRecord>,
19
+
}
20
+
21
#[derive(Debug)]
22
pub struct DeleteRecord {
23
pub did: Did,
···
27
28
#[derive(Debug, Default)]
29
pub struct EventBatch {
30
+
pub records: HashMap<Nsid, CollectionSamples>,
31
pub record_deletes: Vec<DeleteRecord>,
32
pub account_removes: Vec<Did>,
33
}
+3
-2
ufos/src/store.rs
+3
-2
ufos/src/store.rs
···
20
record_deletes,
21
account_removes,
22
} = batch;
23
-
let total_records: usize = records.values().map(|v| v.len()).sum();
24
println!(
25
-
"got batch with {total_records} records in {} collections, {} record deletes, {} account removes",
26
records.len(),
27
record_deletes.len(),
28
account_removes.len()
···
20
record_deletes,
21
account_removes,
22
} = batch;
23
+
let total_records: usize = records.values().map(|v| v.total_seen).sum();
24
+
let total_samples: usize = records.values().map(|v| v.samples.len()).sum();
25
println!(
26
+
"got batch of {total_samples: >3} samples from {total_records: >3} records in {: >2} collections, {: >2} record deletes, {} account removes",
27
records.len(),
28
record_deletes.len(),
29
account_removes.len()