A RPi Pico powered Lightning Detector
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Timestamped detection loop

+90 -39
+2 -2
src/adc.rs
··· 1 1 use embassy_rp::{ 2 + Peri, 2 3 adc::{self, Adc, AdcPin, Async, Config}, 3 4 dma, 4 5 peripherals::ADC, 5 - Peri, 6 6 }; 7 7 use embassy_sync::{blocking_mutex::raw::RawMutex, mutex::Mutex}; 8 8 ··· 50 50 /// it tries again until the sampling succeeds. 51 51 pub async fn sample(&self, buf: &mut [u16]) { 52 52 // Gain a lock to the inner ADC state, then do a read. 53 - self.inner.lock().await.read_many(buf.as_mut()).await; 53 + self.inner.lock().await.read_many(buf).await; 54 54 } 55 55 56 56 pub async fn sample_average(&self, buf: &mut [u16]) -> u16 {
+42 -20
src/detector.rs
··· 1 1 mod analysis; 2 2 mod data; 3 3 4 - use core::cell::{Cell, RefCell, RefMut}; 4 + use core::cell::Cell; 5 5 6 6 use alloc::vec::Vec; 7 - use defmt::{info, unwrap}; 7 + use chrono::TimeDelta; 8 + use sachy_fmt::{info, unwrap}; 8 9 9 10 use embassy_futures::select::select3; 10 11 use embassy_rp::peripherals::DMA_CH1; ··· 17 18 use crate::{ 18 19 adc::AdcDriver, 19 20 pwm::PwmDriver, 21 + rtc::GlobalRtc, 20 22 state::{DEVICE_STATE, DeviceState}, 21 23 updates::{NET_CHANNEL, NetDataSender}, 22 - utils::{static_alloc, try_buffer, try_static_block_vecs}, 24 + utils::{static_alloc, try_buffer, try_static_timestamped_block_vecs}, 23 25 }; 24 26 25 27 #[embassy_executor::task] 26 28 pub async fn detector_task( 27 29 adc: AdcDriver<'static, NoopRawMutex, DMA_CH1>, 28 30 pwm: PwmDriver<'static>, 31 + rtc: GlobalRtc<'static>, 29 32 ) { 33 + while !rtc.is_running().await { 34 + info!("Waiting for RTC to start running"); 35 + Timer::after_secs(2).await; 36 + } 30 37 let net_data = NET_CHANNEL.sender(); 31 38 info!("Allocating detector resources"); 32 39 33 40 let blocks = unwrap!( 34 - try_static_block_vecs(4, 512), 41 + try_static_timestamped_block_vecs(4, 512), 35 42 "Couldn't allocate block buffers" 36 43 ); 37 44 38 - let buf_channel: &mut ZChannel<NoopRawMutex, Vec<u16>> = static_alloc(ZChannel::new(blocks)); 45 + let buf_channel: &mut ZChannel<NoopRawMutex, (i64, Vec<u16>)> = 46 + static_alloc(ZChannel::new(blocks)); 39 47 40 48 let (mut sender, mut receiver) = buf_channel.split(); 41 49 ··· 52 60 let average = detector.tune(samples.as_mut_slice()).await; 53 61 54 62 select3( 55 - detector.sample(&mut sender), 63 + detector.sample(&mut sender, rtc), 56 64 detector.analyse(&mut receiver, average, &mut peaks), 57 - detector.tick(), 65 + detector.tick(rtc), 58 66 ) 59 67 .await; 60 68 } ··· 64 72 adc: AdcDriver<'device, NoopRawMutex, DMA_CH1>, 65 73 pwm: PwmDriver<'device>, 66 74 state: DetectorState, 67 - net_data: RefCell<NetDataSender>, 75 + net_data: NetDataSender, 68 76 } 69 77 70 78 #[derive(Debug)] ··· 93 101 Self { 94 102 adc, 95 103 pwm, 96 - net_data: RefCell::new(net_data), 104 + net_data, 97 105 state: DetectorState::default(), 98 106 } 99 107 } ··· 135 143 self.adc.sample_average(samples).await 136 144 } 137 145 138 - async fn sample(&self, data: &mut Sender<'static, NoopRawMutex, Vec<u16>>) { 146 + async fn sample( 147 + &self, 148 + data: &mut Sender<'static, NoopRawMutex, (i64, Vec<u16>)>, 149 + rtc: GlobalRtc<'static>, 150 + ) { 151 + let time = unwrap!(rtc.get_timestamp().await, "Unable to get timestamp").and_utc(); 152 + let now = Instant::now(); 153 + 139 154 loop { 140 - let buf = data.send().await; 155 + let (timestamp, buf) = data.send().await; 156 + let elapsed = now.elapsed().as_micros() as i64; 157 + *timestamp = (time + TimeDelta::microseconds(elapsed)).timestamp(); 141 158 self.adc.sample(buf).await; 142 159 data.send_done(); 143 160 } ··· 145 162 146 163 async fn analyse( 147 164 &self, 148 - data: &mut Receiver<'static, NoopRawMutex, Vec<u16>>, 165 + data: &mut Receiver<'static, NoopRawMutex, (i64, Vec<u16>)>, 149 166 mut average: u16, 150 167 peaks: &mut Vec<u16>, 151 168 ) { 152 169 loop { 153 170 peaks.clear(); 154 171 155 - let buf = data.receive().await; 172 + let (timestamp, buf) = data.receive().await; 156 173 157 174 let new_avg = 158 175 analysis::analyse_buffer_by_stepped_windows(buf.as_slice(), average, peaks); ··· 165 182 .set(self.state.strikes.get().saturating_add(32)); 166 183 167 184 if let Some(net_data) = self.get_data_channel() { 168 - data::transmit_strike(buf.as_slice(), peaks.as_slice(), average, net_data); 185 + data::transmit_strike( 186 + *timestamp, 187 + buf.as_slice(), 188 + peaks.as_slice(), 189 + average, 190 + net_data, 191 + ); 169 192 } 170 193 171 194 info!( ··· 184 207 } 185 208 } 186 209 187 - async fn tick(&self) { 210 + async fn tick(&self, rtc: GlobalRtc<'static>) { 188 211 let mut inactive: Option<Instant> = None; 189 212 let mut interval = Ticker::every(Duration::from_secs(1)); 190 213 ··· 212 235 } 213 236 None => { 214 237 if let Some(net_data) = self.get_data_channel() { 215 - data::transmit_level_update(decay, net_data); 238 + let now = unwrap!(rtc.get_timestamp().await, "Unable to get timestamp"); 239 + data::transmit_level_update(now.and_utc().timestamp(), decay, net_data); 216 240 } 217 241 } 218 242 _ => continue, ··· 220 244 } 221 245 } 222 246 223 - fn get_data_channel(&self) -> Option<RefMut<'_, NetDataSender>> { 247 + fn get_data_channel(&self) -> Option<&NetDataSender> { 224 248 let can_send_data = DEVICE_STATE.lock(|x| x.get() == DeviceState::Connected); 225 249 226 - can_send_data 227 - .then(|| self.net_data.try_borrow_mut().ok()) 228 - .flatten() 250 + can_send_data.then_some(&self.net_data) 229 251 } 230 252 }
+5 -1
src/detector/analysis.rs
··· 6 6 /// needs to be a consistent drop over more than the threshold over a minimum amount of time. 7 7 const BLIP_THRESHOLD: u16 = 5; 8 8 9 - pub(super) fn analyse_buffer_by_stepped_windows(buf: &[u16], average: u16, peaks: &mut Vec<u16>) -> u16 { 9 + pub(super) fn analyse_buffer_by_stepped_windows( 10 + buf: &[u16], 11 + average: u16, 12 + peaks: &mut Vec<u16>, 13 + ) -> u16 { 10 14 const CHUNK_SIZE: usize = BLOCK_SIZE / 32; 11 15 const CHUNK_STEP: usize = CHUNK_SIZE / 2; 12 16
+8 -5
src/detector/data.rs
··· 1 - use core::cell::RefMut; 2 - 3 1 use crate::updates::NetDataSender; 4 2 5 - pub(super) fn transmit_level_update(warn_level: u16, net_data: RefMut<'_, NetDataSender>) { 3 + pub(super) fn transmit_level_update(timestamp: i64, warn_level: u16, net_data: &NetDataSender) { 6 4 net_data 7 - .try_send(crate::updates::Update::Warning(warn_level)) 5 + .try_send(crate::updates::Update::Warning { 6 + timestamp, 7 + level: warn_level, 8 + }) 8 9 .ok(); 9 10 } 10 11 11 12 pub(super) fn transmit_strike( 13 + timestamp: i64, 12 14 samples: &[u16], 13 15 peaks: &[u16], 14 16 average: u16, 15 - net_data: RefMut<'_, NetDataSender>, 17 + net_data: &NetDataSender, 16 18 ) { 17 19 net_data 18 20 .try_send(crate::updates::Update::Strike { 21 + timestamp, 19 22 peaks: peaks.to_vec(), 20 23 samples: samples.to_vec(), 21 24 average,
+1 -1
src/main.rs
··· 86 86 87 87 EXECUTOR1.init_with(Executor::new).run(|spawner| { 88 88 info!("Spawning Detector task"); 89 - spawner.must_spawn(detector::detector_task(adc, pwm)); 89 + spawner.must_spawn(detector::detector_task(adc, pwm, rtc)); 90 90 }) 91 91 }); 92 92
+5 -1
src/net.rs
··· 10 10 use sachy_mdns::{GROUP_ADDR_V4, GROUP_SOCK_V4, MDNS_PORT, MdnsAction, MdnsService, Service}; 11 11 use sachy_sntp::SntpSocket; 12 12 13 - use crate::{rtc::GlobalRtc, state::DEVICE_STATE, updates::{NetDataReceiver, NET_CHANNEL}}; 13 + use crate::{ 14 + rtc::GlobalRtc, 15 + state::DEVICE_STATE, 16 + updates::{NET_CHANNEL, NetDataReceiver}, 17 + }; 14 18 15 19 #[embassy_executor::task] 16 20 pub async fn udp_stack(stack: embassy_net::Stack<'static>, rtc: GlobalRtc<'static>) {
+2 -1
src/pwm.rs
··· 1 1 use embassy_rp::{ 2 - pwm::{ChannelBPin, Config as PwmConfig, Pwm, Slice}, Peri, 2 + Peri, 3 + pwm::{ChannelBPin, Config as PwmConfig, Pwm, Slice}, 3 4 }; 4 5 5 6 pub struct PwmDriver<'device> {
+16 -3
src/rtc.rs
··· 1 1 #![allow(dead_code)] 2 2 3 - use chrono::{Datelike, Timelike}; 3 + use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike}; 4 4 use embassy_rp::{ 5 5 peripherals, 6 6 rtc::{DateTime, DateTimeError, DayOfWeek, Rtc, RtcError}, ··· 30 30 rtc.is_running() 31 31 } 32 32 33 - pub async fn get_timestamp(&self) -> Result<DateTime, RtcError> { 33 + pub async fn get_timestamp(&self) -> Result<NaiveDateTime, RtcError> { 34 34 let rtc = self.0.lock().await; 35 35 36 - rtc.now() 36 + let now = rtc.now()?; 37 + 38 + let date = unwrap!(NaiveDate::from_ymd_opt( 39 + now.year as i32, 40 + now.month as u32, 41 + now.day as u32 42 + )); 43 + let time = unwrap!(NaiveTime::from_hms_opt( 44 + now.hour as u32, 45 + now.minute as u32, 46 + now.second as u32 47 + )); 48 + 49 + Ok(NaiveDateTime::new(date, time)) 37 50 } 38 51 39 52 pub async fn set_rtc_datetime(&self, timestamp: SntpTimestamp) -> Result<(), RtcError> {
+1 -1
src/state.rs
··· 11 11 pub enum DeviceState { 12 12 Disconnected, 13 13 Connected, 14 - } 14 + }
+5 -1
src/updates.rs
··· 10 10 11 11 #[derive(Debug, serde::Serialize)] 12 12 pub enum Update { 13 - Warning(u16), 13 + Warning { 14 + timestamp: i64, 15 + level: u16, 16 + }, 14 17 Strike { 18 + timestamp: i64, 15 19 peaks: Vec<u16>, 16 20 samples: Vec<u16>, 17 21 average: u16,
+3 -3
src/utils.rs
··· 14 14 Ok(buffer) 15 15 } 16 16 17 - pub fn try_static_block_vecs<T: Default + Copy>( 17 + pub fn try_static_timestamped_block_vecs<T: Default>( 18 18 block_num: usize, 19 19 block_capacity: usize, 20 - ) -> Result<&'static mut [Vec<T>], TryReserveError> { 20 + ) -> Result<&'static mut [(i64, Vec<T>)], TryReserveError> { 21 21 let mut blocks = Vec::new(); 22 22 23 23 blocks.try_reserve_exact(block_num)?; ··· 25 25 for _ in 0..block_num { 26 26 let block = try_buffer(block_capacity)?; 27 27 28 - blocks.push(block); 28 + blocks.push((0, block)); 29 29 } 30 30 31 31 Ok(blocks.leak())