Your locally hosted lumina server for IDAPro
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Metrics for prometheus (#105)

* add metrics for prometheus

* metrics: rename metrics

* metrics: add lumen_queried_total

authored by

Naim A and committed by
GitHub
8f6fd977 2d41a925

+139 -12
+31
Cargo.lock
··· 237 237 "log", 238 238 "native-tls", 239 239 "postgres-native-tls", 240 + "prometheus-client", 240 241 "serde", 241 242 "tokio", 242 243 "tokio-postgres", ··· 344 345 "crypto-common", 345 346 "subtle", 346 347 ] 348 + 349 + [[package]] 350 + name = "dtoa" 351 + version = "1.0.9" 352 + source = "registry+https://github.com/rust-lang/crates.io-index" 353 + checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" 347 354 348 355 [[package]] 349 356 name = "encoding_rs" ··· 777 784 "log", 778 785 "native-tls", 779 786 "pretty_env_logger", 787 + "prometheus-client", 780 788 "tokio", 781 789 "tokio-native-tls", 782 790 "warp", ··· 1088 1096 checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" 1089 1097 dependencies = [ 1090 1098 "unicode-ident", 1099 + ] 1100 + 1101 + [[package]] 1102 + name = "prometheus-client" 1103 + version = "0.21.2" 1104 + source = "registry+https://github.com/rust-lang/crates.io-index" 1105 + checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" 1106 + dependencies = [ 1107 + "dtoa", 1108 + "itoa", 1109 + "parking_lot", 1110 + "prometheus-client-derive-encode", 1111 + ] 1112 + 1113 + [[package]] 1114 + name = "prometheus-client-derive-encode" 1115 + version = "0.4.2" 1116 + source = "registry+https://github.com/rust-lang/crates.io-index" 1117 + checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" 1118 + dependencies = [ 1119 + "proc-macro2", 1120 + "quote", 1121 + "syn", 1091 1122 ] 1092 1123 1093 1124 [[package]]
+1
common/Cargo.toml
··· 21 21 diesel = {version = "2.1", optional = true, default-features = false, features = ["postgres_backend", "time"]} 22 22 diesel-async = {version = "0.3", optional = true, features = ["postgres", "bb8"]} 23 23 anyhow = "1.0" 24 + prometheus-client = "0.21.2" 24 25 25 26 [features] 26 27 default = ["web", "db"]
+2
common/src/lib.rs
··· 9 9 pub mod rpc; 10 10 pub mod web; 11 11 pub mod async_drop; 12 + pub mod metrics; 12 13 13 14 pub struct SharedState_ { 14 15 pub db: db::Database, 15 16 pub config: std::sync::Arc<config::Config>, 16 17 pub server_name: String, 18 + pub metrics: metrics::Metrics, 17 19 } 18 20 19 21 pub type SharedState = std::sync::Arc<SharedState_>;
+64
common/src/metrics.rs
··· 1 + use std::sync::atomic::AtomicI64; 2 + 3 + use prometheus_client::{registry::Registry, metrics::{gauge::Gauge, family::Family, counter::Counter}, encoding::EncodeLabelSet}; 4 + 5 + pub struct Metrics { 6 + pub registry: Registry, 7 + 8 + /// Count active lumina connections 9 + pub active_connections: Gauge<i64, AtomicI64>, 10 + 11 + /// Record connected client versions 12 + pub lumina_version: Family<LuminaVersion, Gauge>, 13 + 14 + /// Count new functions pushes 15 + pub new_funcs: Counter<u64>, 16 + 17 + /// Count pushed functions 18 + pub pushes: Counter<u64>, 19 + 20 + /// Count pulled functions (only found) 21 + pub pulls: Counter<u64>, 22 + 23 + /// Queried functions 24 + pub queried_funcs: Counter<u64>, 25 + } 26 + 27 + #[derive(EncodeLabelSet, Debug, Hash, Eq, PartialEq, Clone)] 28 + pub struct LuminaVersion { 29 + pub protocol_version: u32, 30 + } 31 + 32 + impl Default for Metrics { 33 + fn default() -> Self { 34 + let mut registry = Registry::default(); 35 + 36 + let active_connections = Gauge::default(); 37 + registry.register("lumen_active_connections", "Active Lumina connections", active_connections.clone()); 38 + 39 + let lumina_version = Family::<LuminaVersion, Gauge>::default(); 40 + registry.register("lumen_protocol_version", "Version of Lumina protocol being used", lumina_version.clone()); 41 + 42 + let new_funcs = Counter::default(); 43 + registry.register("lumen_new_funcs", "Pushes previously unknown functions", new_funcs.clone()); 44 + 45 + let pushes = Counter::default(); 46 + registry.register("lumen_pushes_total", "Total pushes functions", pushes.clone()); 47 + 48 + let pulls = Counter::default(); 49 + registry.register("lumen_pulls_total", "Total pulled functions", pulls.clone()); 50 + 51 + let queried_funcs = Counter::default(); 52 + registry.register("lumen_queried_total", "Total Queried functions", queried_funcs.clone()); 53 + 54 + Metrics { 55 + registry, 56 + active_connections, 57 + lumina_version, 58 + new_funcs, 59 + pushes, 60 + pulls, 61 + queried_funcs, 62 + } 63 + } 64 + }
+1
lumen/Cargo.toml
··· 15 15 tokio-native-tls = "0.3" 16 16 native-tls = {version = "0.2"} 17 17 warp = "0.3" 18 + prometheus-client = "0.21.2"
+20 -8
lumen/src/main.rs
··· 5 5 #![deny(clippy::all)] 6 6 7 7 use common::async_drop::AsyncDropper; 8 + use common::metrics::LuminaVersion; 8 9 use common::rpc::{RpcHello, RpcFail, HelloResult}; 9 10 use native_tls::Identity; 10 11 use clap::Arg; ··· 12 13 use tokio::time::timeout; 13 14 use std::collections::HashMap; 14 15 use std::mem::discriminant; 15 - use std::sync::atomic::{AtomicU32, Ordering}; 16 16 use std::time::{Duration, Instant}; 17 17 use std::{borrow::Cow, sync::Arc}; 18 18 use tokio::{net::TcpListener, io::AsyncWrite, io::AsyncRead}; ··· 87 87 return Err(Error::Timeout); 88 88 } 89 89 }; 90 - debug!("pull {}/{} funcs ended after {:?}", funcs.iter().filter(|v| v.is_some()).count(), md.funcs.len(), start.elapsed()); 90 + let pulled_funcs = funcs.iter().filter(|v| v.is_some()).count(); 91 + state.metrics.pulls.inc_by(pulled_funcs as _); 92 + state.metrics.queried_funcs.inc_by(md.funcs.len() as _); 93 + debug!("pull {pulled_funcs}/{} funcs ended after {:?}", md.funcs.len(), start.elapsed()); 91 94 92 95 let statuses: Vec<u32> = funcs.iter().map(|v| u32::from(v.is_none())).collect(); 93 96 let found = funcs ··· 127 130 return Ok(()); 128 131 } 129 132 }; 130 - debug!("push {} funcs ended after {:?}", status.len(), start.elapsed()); 133 + state.metrics.pushes.inc_by(status.len() as _); 134 + let new_funcs = status 135 + .iter() 136 + .fold(0u64, |counter, &v| if v > 0 { counter + 1 } else {counter}); 137 + state.metrics.new_funcs.inc_by(new_funcs); 138 + debug!("push {} funcs ended after {:?} ({new_funcs} new)", status.len(), start.elapsed()); 131 139 132 140 RpcMessage::PushMetadataResult(rpc::PushMetadataResult { 133 141 status: Cow::Owned(status), ··· 187 195 return Ok(()); 188 196 } 189 197 }; 198 + state.metrics.lumina_version.get_or_create(&LuminaVersion { 199 + protocol_version: hello.protocol_version, 200 + }).inc(); 190 201 191 202 if let Some(ref creds) = creds { 192 203 if creds.username != "guest" { ··· 229 240 } 230 241 231 242 async fn serve(listener: TcpListener, accpt: Option<tokio_native_tls::TlsAcceptor>, state: SharedState, mut shutdown_signal: tokio::sync::oneshot::Receiver<()>) { 232 - static COUNTER: AtomicU32 = AtomicU32::new(0); 233 243 let accpt = accpt.map(Arc::new); 234 244 235 245 let (async_drop, worker) = AsyncDropper::new(); ··· 264 274 let accpt = accpt.clone(); 265 275 266 276 let conns2 = connections.clone(); 277 + let counter = state.metrics.active_connections.clone(); 267 278 let guard = async_drop.defer(async move { 268 - let count = { 269 - COUNTER.fetch_sub(1, Ordering::Relaxed) - 1 270 - }; 279 + let count = counter.dec() - 1; 271 280 debug!("connection with {:?} ended after {:?}; {} active connections", addr, start.elapsed(), count); 272 281 273 282 let mut guard = conns2.lock().await; ··· 275 284 error!("Couldn't remove connection from set {addr}"); 276 285 } 277 286 }); 287 + 288 + let counter = state.metrics.active_connections.clone(); 278 289 let handle = tokio::spawn(async move { 279 290 let _guard = guard; 280 291 let count = { 281 - COUNTER.fetch_add(1, Ordering::Relaxed) + 1 292 + counter.inc() + 1 282 293 }; 283 294 let protocol = if accpt.is_some() {" [TLS]"} else {""}; 284 295 debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count); ··· 353 364 db, 354 365 config, 355 366 server_name, 367 + metrics: common::metrics::Metrics::default(), 356 368 }); 357 369 358 370 let tls_acceptor;
+20 -4
lumen/src/web.rs
··· 1 1 use std::net::SocketAddr; 2 2 3 - use warp::{self, Filter}; 3 + use log::error; 4 + use warp::{Filter, hyper::StatusCode, reply::Response}; 4 5 use common::{SharedState, web::api::api_root}; 5 6 6 7 pub async fn start_webserver<A: Into<SocketAddr> + 'static>(bind_addr: A, shared_state: SharedState) { 7 8 let root = warp::get() 8 9 .and(warp::path::end()) 9 10 .map(|| warp::reply::html(include_str!("home.html"))); 10 - 11 + 12 + let shared_state1 = shared_state.clone(); 11 13 let api = warp::path("api") 12 - .and(api_root(shared_state)); 14 + .and(api_root(shared_state1)); 15 + 16 + let metrics = warp::get().and(warp::path("metrics")).and(warp::path::end()) 17 + .map(move || { 18 + let mut res = String::new(); 19 + if let Err(err) = prometheus_client::encoding::text::encode(&mut res, &shared_state.metrics.registry) { 20 + error!("failed to encode metrics: {err}"); 21 + let mut r = Response::default(); 22 + *r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; 23 + r 24 + } else { 25 + warp::reply::Response::new(res.into()) 26 + } 27 + }); 13 28 14 29 let routes = root 15 - .or(api); 30 + .or(api) 31 + .or(metrics); 16 32 17 33 warp::serve(routes) 18 34 .run(bind_addr).await;