A RPi Pico-powered Lightning Detector
at main 5.0 kB view raw
1use alloc::vec; 2use embassy_futures::select::{select, select3}; 3use embassy_net::{ 4 dns, 5 tcp::TcpSocket, 6 udp::{PacketMetadata, UdpSocket}, 7}; 8use embassy_time::{Duration, Timer, WithTimeout}; 9use sachy_fmt::{error, info, unwrap}; 10use sachy_mdns::{ 11 GROUP_ADDR_V4, GROUP_SOCK_V4, MDNS_PORT, 12 service::{MdnsService, Service}, 13 state::MdnsAction, 14}; 15use sachy_sntp::SntpSocket; 16 17use crate::{ 18 constants::{HOST_NAME, HOST_PORT}, 19 rtc::GlobalRtc, 20 updates::{NetDataReceiver, UpdateConnection}, 21 utils::try_static_buffer_with, 22}; 23 24#[embassy_executor::task] 25pub async fn udp_stack(stack: embassy_net::Stack<'static>, rtc: GlobalRtc<'static>) { 26 let rx_meta = unwrap!(try_static_buffer_with(16, || PacketMetadata::EMPTY)); 27 let tx_meta = unwrap!(try_static_buffer_with(16, || PacketMetadata::EMPTY)); 28 let rx_buffer = unwrap!(try_static_buffer_with(4096, Default::default)); 29 let tx_buffer = unwrap!(try_static_buffer_with(4096, Default::default)); 30 31 let mut udp = UdpSocket::new(stack, rx_meta, rx_buffer, tx_meta, tx_buffer); 32 33 stack.wait_config_up().await; 34 35 unwrap!(stack.join_multicast_group(GROUP_ADDR_V4)); 36 37 let mut service = MdnsService::new(Service::new( 38 "_picostrike._tcp.local", 39 HOST_NAME, 40 HOST_NAME, 41 stack 42 .config_v4() 43 .map(|config| config.address.address().into()), 44 HOST_PORT, 45 )); 46 47 loop { 48 stack.wait_config_up().await; 49 50 if !rtc.is_ready().await { 51 sntp_loop(&mut udp, stack, rtc).await; 52 } 53 54 select3( 55 stack.wait_link_down(), 56 mdns_loop(&mut service, &mut udp), 57 rtc.wait_for_reset(), 58 ) 59 .await; 60 61 udp.close(); 62 } 63} 64 65async fn sntp_loop<'device>( 66 udp: &mut UdpSocket<'device>, 67 stack: embassy_net::Stack<'device>, 68 rtc: GlobalRtc<'static>, 69) { 70 loop { 71 let Ok(addr) = stack.dns_query("pool.ntp.org", dns::DnsQueryType::A).await else { 72 error!("Failed to query DNS for an NTP server. 
Retrying..."); 73 continue; 74 }; 75 76 match udp.resolve_time(addr.as_slice()).await { 77 Ok(time) => { 78 unwrap!(rtc.set_rtc_datetime(time).await); 79 break; 80 } 81 Err(e) => error!("Failed to resolve SNTP time: {}", e), 82 } 83 } 84} 85 86async fn mdns_loop<'device>(service: &mut MdnsService, udp: &mut UdpSocket<'device>) { 87 unwrap!(udp.bind(MDNS_PORT)); 88 89 let mut send_buf = vec![0u8; 2048]; 90 91 loop { 92 let ready = match service.next_action() { 93 MdnsAction::Announce => service.send_announcement(&mut send_buf), 94 95 MdnsAction::ListenFor { timeout } => udp 96 .recv_from_with(|buf, _from| service.listen_for_queries(buf, &mut send_buf)) 97 .with_timeout(timeout) 98 .await 99 .ok() 100 .flatten(), 101 102 MdnsAction::WaitFor { duration } => { 103 Timer::after(duration).await; 104 105 None 106 } 107 }; 108 109 if let Some(bytes) = ready 110 && udp.send_to(bytes, GROUP_SOCK_V4).await.is_ok() 111 { 112 udp.flush().await; 113 } 114 } 115} 116 117#[embassy_executor::task] 118pub async fn tcp_stack(stack: embassy_net::Stack<'static>) { 119 let rx_buffer = unwrap!(try_static_buffer_with(4096, Default::default)); 120 let tx_buffer = unwrap!(try_static_buffer_with(4096, Default::default)); 121 let mut tcp = TcpSocket::new(stack, rx_buffer, tx_buffer); 122 let net_data = UpdateConnection::get_receiver(); 123 124 tcp.set_keep_alive(Some(Duration::from_secs(1))); 125 tcp.set_timeout(Some(Duration::from_secs(10))); 126 127 loop { 128 stack.wait_config_up().await; 129 130 select(stack.wait_link_down(), data_loop(&mut tcp, &net_data)).await; 131 132 UpdateConnection::disconnect(); 133 } 134} 135 136async fn data_loop<'connection>(tcp: &mut TcpSocket<'connection>, net_data: &NetDataReceiver) { 137 loop { 138 UpdateConnection::disconnect(); 139 140 if tcp.accept(HOST_PORT).await.is_err() { 141 continue; 142 } 143 144 info!("Connected!"); 145 UpdateConnection::connect(); 146 147 'inner: loop { 148 let data = net_data.receive().await; 149 150 if !tcp.may_send() { 151 // 
Clear backlog, no point in keeping the updates 152 // if there is nothing to send the updates to 153 net_data.clear(); 154 break 'inner; 155 } 156 157 if tcp 158 .write_with(|buf| { 159 let written = unwrap!(postcard::to_slice( 160 &striker_proto::StrikerResponse::Update(data), 161 buf 162 )); 163 164 (written.len(), ()) 165 }) 166 .await 167 .is_err() 168 { 169 break 'inner; 170 } 171 172 if tcp.flush().await.is_err() { 173 break 'inner; 174 } 175 } 176 177 info!("DISCONNECT"); 178 } 179}