An RPi Pico-powered Lightning Detector
1use alloc::vec;
2use embassy_futures::select::{select, select3};
3use embassy_net::{
4 dns,
5 tcp::TcpSocket,
6 udp::{PacketMetadata, UdpSocket},
7};
8use embassy_time::{Duration, Timer, WithTimeout};
9use sachy_fmt::{error, info, unwrap};
10use sachy_mdns::{
11 GROUP_ADDR_V4, GROUP_SOCK_V4, MDNS_PORT,
12 service::{MdnsService, Service},
13 state::MdnsAction,
14};
15use sachy_sntp::SntpSocket;
16
17use crate::{
18 constants::{HOST_NAME, HOST_PORT},
19 rtc::GlobalRtc,
20 updates::{NetDataReceiver, UpdateConnection},
21 utils::try_static_buffer_with,
22};
23
24#[embassy_executor::task]
25pub async fn udp_stack(stack: embassy_net::Stack<'static>, rtc: GlobalRtc<'static>) {
26 let rx_meta = unwrap!(try_static_buffer_with(16, || PacketMetadata::EMPTY));
27 let tx_meta = unwrap!(try_static_buffer_with(16, || PacketMetadata::EMPTY));
28 let rx_buffer = unwrap!(try_static_buffer_with(4096, Default::default));
29 let tx_buffer = unwrap!(try_static_buffer_with(4096, Default::default));
30
31 let mut udp = UdpSocket::new(stack, rx_meta, rx_buffer, tx_meta, tx_buffer);
32
33 stack.wait_config_up().await;
34
35 unwrap!(stack.join_multicast_group(GROUP_ADDR_V4));
36
37 let mut service = MdnsService::new(Service::new(
38 "_picostrike._tcp.local",
39 HOST_NAME,
40 HOST_NAME,
41 stack
42 .config_v4()
43 .map(|config| config.address.address().into()),
44 HOST_PORT,
45 ));
46
47 loop {
48 stack.wait_config_up().await;
49
50 if !rtc.is_ready().await {
51 sntp_loop(&mut udp, stack, rtc).await;
52 }
53
54 select3(
55 stack.wait_link_down(),
56 mdns_loop(&mut service, &mut udp),
57 rtc.wait_for_reset(),
58 )
59 .await;
60
61 udp.close();
62 }
63}
64
65async fn sntp_loop<'device>(
66 udp: &mut UdpSocket<'device>,
67 stack: embassy_net::Stack<'device>,
68 rtc: GlobalRtc<'static>,
69) {
70 loop {
71 let Ok(addr) = stack.dns_query("pool.ntp.org", dns::DnsQueryType::A).await else {
72 error!("Failed to query DNS for an NTP server. Retrying...");
73 continue;
74 };
75
76 match udp.resolve_time(addr.as_slice()).await {
77 Ok(time) => {
78 unwrap!(rtc.set_rtc_datetime(time).await);
79 break;
80 }
81 Err(e) => error!("Failed to resolve SNTP time: {}", e),
82 }
83 }
84}
85
86async fn mdns_loop<'device>(service: &mut MdnsService, udp: &mut UdpSocket<'device>) {
87 unwrap!(udp.bind(MDNS_PORT));
88
89 let mut send_buf = vec![0u8; 2048];
90
91 loop {
92 let ready = match service.next_action() {
93 MdnsAction::Announce => service.send_announcement(&mut send_buf),
94
95 MdnsAction::ListenFor { timeout } => udp
96 .recv_from_with(|buf, _from| service.listen_for_queries(buf, &mut send_buf))
97 .with_timeout(timeout)
98 .await
99 .ok()
100 .flatten(),
101
102 MdnsAction::WaitFor { duration } => {
103 Timer::after(duration).await;
104
105 None
106 }
107 };
108
109 if let Some(bytes) = ready
110 && udp.send_to(bytes, GROUP_SOCK_V4).await.is_ok()
111 {
112 udp.flush().await;
113 }
114 }
115}
116
117#[embassy_executor::task]
118pub async fn tcp_stack(stack: embassy_net::Stack<'static>) {
119 let rx_buffer = unwrap!(try_static_buffer_with(4096, Default::default));
120 let tx_buffer = unwrap!(try_static_buffer_with(4096, Default::default));
121 let mut tcp = TcpSocket::new(stack, rx_buffer, tx_buffer);
122 let net_data = UpdateConnection::get_receiver();
123
124 tcp.set_keep_alive(Some(Duration::from_secs(1)));
125 tcp.set_timeout(Some(Duration::from_secs(10)));
126
127 loop {
128 stack.wait_config_up().await;
129
130 select(stack.wait_link_down(), data_loop(&mut tcp, &net_data)).await;
131
132 UpdateConnection::disconnect();
133 }
134}
135
136async fn data_loop<'connection>(tcp: &mut TcpSocket<'connection>, net_data: &NetDataReceiver) {
137 loop {
138 UpdateConnection::disconnect();
139
140 if tcp.accept(HOST_PORT).await.is_err() {
141 continue;
142 }
143
144 info!("Connected!");
145 UpdateConnection::connect();
146
147 'inner: loop {
148 let data = net_data.receive().await;
149
150 if !tcp.may_send() {
151 // Clear backlog, no point in keeping the updates
152 // if there is nothing to send the updates to
153 net_data.clear();
154 break 'inner;
155 }
156
157 if tcp
158 .write_with(|buf| {
159 let written = unwrap!(postcard::to_slice(
160 &striker_proto::StrikerResponse::Update(data),
161 buf
162 ));
163
164 (written.len(), ())
165 })
166 .await
167 .is_err()
168 {
169 break 'inner;
170 }
171
172 if tcp.flush().await.is_err() {
173 break 'inner;
174 }
175 }
176
177 info!("DISCONNECT");
178 }
179}