+1
ufos/src/main.rs
+1
ufos/src/main.rs
···
162
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
163
loop {
164
interval.tick().await;
165
+
read_store.update_metrics();
166
match read_store.get_consumer_info().await {
167
Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
168
Ok(ConsumerInfo::Jetstream {
+2
ufos/src/storage.rs
+2
ufos/src/storage.rs
+27
-1
ufos/src/storage_fjall.rs
+27
-1
ufos/src/storage_fjall.rs
···
23
Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
24
};
25
use jetstream::events::Cursor;
26
-
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
27
use std::collections::{HashMap, HashSet};
28
use std::iter::Peekable;
29
use std::ops::Bound;
···
227
feeds: feeds.clone(),
228
records: records.clone(),
229
rollups: rollups.clone(),
230
};
231
let writer = FjallWriter {
232
bg_taken: Arc::new(AtomicBool::new(false)),
233
keyspace,
···
250
feeds: PartitionHandle,
251
records: PartitionHandle,
252
rollups: PartitionHandle,
253
}
254
255
/// An iterator that knows how to skip over deleted/invalidated records
···
381
type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
382
383
impl FjallReader {
384
fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
385
let rollup_cursor =
386
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
···
999
impl StoreReader for FjallReader {
1000
fn name(&self) -> String {
1001
"fjall storage v2".into()
1002
}
1003
async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1004
let s = self.clone();
···
23
Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
24
};
25
use jetstream::events::Cursor;
26
+
use lsm_tree::AbstractTree;
27
+
use metrics::{
28
+
counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
29
+
};
30
use std::collections::{HashMap, HashSet};
31
use std::iter::Peekable;
32
use std::ops::Bound;
···
230
feeds: feeds.clone(),
231
records: records.clone(),
232
rollups: rollups.clone(),
233
+
queues: queues.clone(),
234
};
235
+
reader.describe_metrics();
236
let writer = FjallWriter {
237
bg_taken: Arc::new(AtomicBool::new(false)),
238
keyspace,
···
255
feeds: PartitionHandle,
256
records: PartitionHandle,
257
rollups: PartitionHandle,
258
+
queues: PartitionHandle,
259
}
260
261
/// An iterator that knows how to skip over deleted/invalidated records
···
387
type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
388
389
impl FjallReader {
390
+
/// Register static metadata (unit and help text) for the gauges this reader
/// emits, so metrics exporters can render them with proper descriptions.
/// NOTE(review): appears to be invoked once at construction time via
/// `reader.describe_metrics()` — confirm it is not called per-tick.
fn describe_metrics(&self) {
    describe_gauge!(
        "storage_fjall_l0_run_count",
        Unit::Count,
        "number of L0 runs in a partition"
    );
}
397
+
398
fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
399
let rollup_cursor =
400
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
···
1013
impl StoreReader for FjallReader {
1014
/// Human-readable identifier for this storage backend implementation.
fn name(&self) -> String {
    "fjall storage v2".into()
}
1017
+
fn update_metrics(&self) {
1018
+
gauge!("storage_fjall_l0_run_count", "partition" => "global")
1019
+
.set(self.global.tree.l0_run_count() as f64);
1020
+
gauge!("storage_fjall_l0_run_count", "partition" => "feeds")
1021
+
.set(self.feeds.tree.l0_run_count() as f64);
1022
+
gauge!("storage_fjall_l0_run_count", "partition" => "records")
1023
+
.set(self.records.tree.l0_run_count() as f64);
1024
+
gauge!("storage_fjall_l0_run_count", "partition" => "rollups")
1025
+
.set(self.rollups.tree.l0_run_count() as f64);
1026
+
gauge!("storage_fjall_l0_run_count", "partition" => "queues")
1027
+
.set(self.queues.tree.l0_run_count() as f64);
1028
}
1029
async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1030
let s = self.clone();