PC Power metrics ingester for Home Assistant
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

attempt handle mqtt connection error & exponential backoff

+107 -11
+61
src-tauri/Cargo.lock
··· 2149 2149 "tauri-plugin-autostart", 2150 2150 "tauri-plugin-opener", 2151 2151 "tokio", 2152 + "tokio-retry2", 2152 2153 "tokio-util", 2153 2154 "tracing", 2154 2155 "tracing-subscriber", ··· 2998 2999 ] 2999 3000 3000 3001 [[package]] 3002 + name = "pin-project" 3003 + version = "1.1.10" 3004 + source = "registry+https://github.com/rust-lang/crates.io-index" 3005 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 3006 + dependencies = [ 3007 + "pin-project-internal", 3008 + ] 3009 + 3010 + [[package]] 3011 + name = "pin-project-internal" 3012 + version = "1.1.10" 3013 + source = "registry+https://github.com/rust-lang/crates.io-index" 3014 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 3015 + dependencies = [ 3016 + "proc-macro2", 3017 + "quote", 3018 + "syn 2.0.108", 3019 + ] 3020 + 3021 + [[package]] 3001 3022 name = "pin-project-lite" 3002 3023 version = "0.2.16" 3003 3024 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3214 3235 ] 3215 3236 3216 3237 [[package]] 3238 + name = "rand" 3239 + version = "0.9.2" 3240 + source = "registry+https://github.com/rust-lang/crates.io-index" 3241 + checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 3242 + dependencies = [ 3243 + "rand_chacha 0.9.0", 3244 + "rand_core 0.9.3", 3245 + ] 3246 + 3247 + [[package]] 3217 3248 name = "rand_chacha" 3218 3249 version = "0.2.2" 3219 3250 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3234 3265 ] 3235 3266 3236 3267 [[package]] 3268 + name = "rand_chacha" 3269 + version = "0.9.0" 3270 + source = "registry+https://github.com/rust-lang/crates.io-index" 3271 + checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 3272 + dependencies = [ 3273 + "ppv-lite86", 3274 + "rand_core 0.9.3", 3275 + ] 3276 + 3277 + [[package]] 3237 3278 name = "rand_core" 3238 3279 version = "0.5.1" 3239 3280 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3249 3290 checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 3250 3291 dependencies = [ 3251 3292 "getrandom 0.2.16", 3293 + ] 3294 + 3295 + [[package]] 3296 + name = "rand_core" 3297 + version = "0.9.3" 3298 + source = "registry+https://github.com/rust-lang/crates.io-index" 3299 + checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" 3300 + dependencies = [ 3301 + "getrandom 0.3.4", 3252 3302 ] 3253 3303 3254 3304 [[package]] ··· 4625 4675 checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" 4626 4676 dependencies = [ 4627 4677 "native-tls", 4678 + "tokio", 4679 + ] 4680 + 4681 + [[package]] 4682 + name = "tokio-retry2" 4683 + version = "0.7.0" 4684 + source = "registry+https://github.com/rust-lang/crates.io-index" 4685 + checksum = "ae5ed96d30859f64c721d9155b66c53c39867e2ced2f0614ba61da5665440b0a" 4686 + dependencies = [ 4687 + "pin-project", 4688 + "rand 0.9.2", 4628 4689 "tokio", 4629 4690 ] 4630 4691
+1
src-tauri/Cargo.toml
··· 48 48 objc2-io-kit = "0.3.2" 49 49 objc2-core-foundation = "0.3.2" 50 50 objc2-foundation = "0.3.2" 51 + tokio-retry2 = { version = "0.7.0", features = ["jitter"] } 51 52 52 53 [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] 53 54 tauri-plugin-autostart = "2"
-2
src-tauri/src/lib.rs
··· 1 1 #![feature(gethostname)] 2 2 #![feature(result_option_map_or_default)] 3 3 4 - use std::str::FromStr; 5 - 6 4 pub mod app_config; 7 5 pub mod constants; 8 6 pub mod hex_slice;
+3 -1
src-tauri/src/main.rs
··· 8 8 tauri_app, 9 9 }; 10 10 use tokio::{sync::watch, task::JoinHandle}; 11 + use tokio_util::sync::CancellationToken; 11 12 use tracing::error; 12 13 13 14 #[tokio::main] ··· 34 35 tokio::spawn(async move { 35 36 let mut mqtt_loop_handle: Option<JoinHandle<()>> = None; 36 37 let mut smc_sender_handle: Option<JoinHandle<()>> = None; 38 + let ct = CancellationToken::new(); 37 39 38 40 loop { 39 41 // Drop previous handles first ··· 58 60 }; 59 61 60 62 let (mqttc, eventloop) = mqtt::create(host.unwrap(), port, username, password); 61 - mqtt_loop_handle = Some(mqtt::run_loop(eventloop)); 63 + mqtt_loop_handle = Some(mqtt::run_loop(eventloop, ct.clone())); 62 64 63 65 mqtt::interview(&mqttc, None, Some("MacBook Pro")) 64 66 .await
+42 -8
src-tauri/src/mqtt.rs
··· 4 4 use rootcause::{Report, prelude::ResultExt}; 5 5 use rumqttc::{AsyncClient, EventLoop, MqttOptions}; 6 6 use serde_json::{Value, json}; 7 - use tracing::{info, instrument, trace}; 7 + use tokio::select; 8 + use tokio_retry2::strategy::{ExponentialBackoff, ExponentialFactorBackoff, jitter}; 9 + use tokio_util::sync::CancellationToken; 10 + use tracing::{error, info, instrument, trace, warn}; 8 11 9 12 use crate::{hex_slice::HexSlice, smc::PowerData}; 10 13 ··· 272 275 } 273 276 274 277 #[instrument(skip(e))] 275 - pub fn run_loop(mut e: EventLoop) -> tokio::task::JoinHandle<()> { 276 - tokio::spawn(async move { 277 - trace!("Polling MQTT event loop"); 278 - // select! {} 279 - while let Ok(notif) = e.poll().await { 280 - trace!("Received: {notif:?}"); 278 + pub fn run_loop(e: EventLoop, ct: CancellationToken) -> tokio::task::JoinHandle<()> { 279 + trace!("Polling MQTT event loop"); 280 + tokio::spawn(run_loop_inner(e, ct)) 281 + } 282 + 283 + async fn run_loop_inner(mut e: EventLoop, ct: CancellationToken) { 284 + let mut retry_strategy = generate_default_backoff(); 285 + 286 + loop { 287 + select! { 288 + e = e.poll() => { 289 + if e.is_ok() { 290 + retry_strategy = generate_default_backoff(); 291 + continue; 292 + } 293 + 294 + let err = e.unwrap_err(); 295 + match err { 296 + rumqttc::ConnectionError::ConnectionRefused(connect_return_code) => { 297 + error!("MQTT Connection Refused - Error code {connect_return_code:?}"); 298 + break 299 + }, 300 + e => error!("MQTT connection error occurred - {e:?}") 301 + } 302 + 303 + let sleep_time = retry_strategy.next().unwrap(); 304 + error!("Waiting for {} to retry poll", sleep_time.as_secs()); 305 + tokio::time::sleep(sleep_time).await; 306 + } 307 + _ = ct.cancelled() => { 308 + warn!("Caught cancellation token!"); 309 + break 310 + } 281 311 } 282 - }) 312 + } 313 + } 314 + 315 + fn generate_default_backoff() -> ExponentialFactorBackoff { 316 + ExponentialFactorBackoff::from_millis(5_000, 1.5).max_delay(Duration::from_secs(60)) 283 317 }