use core::net::IpAddr; use std::{ io::{Read, Write}, net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpStream, UdpSocket}, time::Duration, }; use async_channel::{Receiver, Sender}; use async_io::{Async, Timer}; use bevy::{ app::{Plugin, Startup}, ecs::{error::Result, resource::Resource, system::Commands}, tasks::IoTaskPool, }; use futures_concurrency::future::Race; use sachy_mdns::{ GROUP_SOCK_V4, MDNS_PORT, client::query_service, dns::{ records::{QType, Record}, reqres::Response, traits::DnsParse, }, }; use socket2::{Domain, Protocol, Socket, Type}; #[derive(Debug, Resource)] pub struct DiscoverResponse(pub Receiver); pub enum StrikeUpdateState { Disconnected, Connecting, Connected, Updating(striker_proto::StrikerResponse), } #[derive(Debug, Resource)] pub struct StrikeUpdates(pub Receiver); #[derive(Debug, Resource)] pub struct StrikeRequests(pub Sender); #[derive(Debug, Resource)] pub struct StrikeActions(pub Sender); pub enum StrikeAction { Connect(SocketAddr), Disconnect, } pub struct InstanceDetails { pub host: String, pub address: String, pub port: u16, pub ip: IpAddr, } #[derive(Debug, Resource)] pub struct MdnsSignaler(pub Sender<()>); fn create_mdns_socket() -> std::io::Result> { let sock = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; sock.set_reuse_address(true)?; sock.set_nonblocking(true)?; sock.bind(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MDNS_PORT)).into())?; let udp_socket = UdpSocket::from(sock); Async::new_nonblocking(udp_socket) } pub fn setup_mdns_task(mut commands: Commands) -> Result { let io = IoTaskPool::get(); let (signal_tx, signal_rx) = async_channel::bounded(1); let (resp_tx, resp_rx) = async_channel::bounded(64); let udp_socket = create_mdns_socket()?; io.spawn(async move { let mut buf = vec![0u8; 1028]; let mut query_buf = vec![0u8; 128]; let query = query_service("_picostrike._tcp.local", &mut query_buf).unwrap(); while signal_rx.recv().await.is_ok() { let send_fut = async { // Retry three times in case packets get lost coz UDP things for _ in 0..3 { udp_socket.send_to(query, GROUP_SOCK_V4).await.ok(); Timer::after(Duration::from_millis(250)).await; } }; let recv_fut = async { while let Ok((read, _)) = udp_socket.recv_from(&mut buf).await { let input = &buf[..read]; let Ok(resp) = Response::parse(&mut &*input, input) else { continue; }; if resp.answers.iter().any(|answer| { answer.atype == QType::PTR && answer.name == "_picostrike._tcp.local" }) && let Some(ip) = resp.additional.iter().find_map(|answer| match &answer .record { Record::A(a) => Some(IpAddr::V4(a.address)), Record::AAAA(aaaa) => Some(IpAddr::V6(aaaa.address)), _ => None, }) && let Some((name, srv)) = resp.additional.iter().find_map(|answer| { if let Record::SRV(srv) = &answer.record { Some((answer.name, srv)) } else { None } }) { resp_tx .send(InstanceDetails { host: name.to_string(), address: srv.target.to_string(), port: srv.port, ip, }) .await .ok(); } } }; let timer = async { Timer::after(Duration::from_millis(1000)).await; }; let cancel = async { signal_rx.recv().await.ok(); }; (send_fut, recv_fut, timer, cancel).race().await; } }) .detach(); commands.insert_resource(DiscoverResponse(resp_rx)); commands.insert_resource(MdnsSignaler(signal_tx)); Ok(()) } pub fn setup_strike_connection(mut commands: Commands) { let io = IoTaskPool::get(); let (signal_tx, signal_rx) = async_channel::bounded(2); let (req_tx, req_rx) = async_channel::bounded(1); let (resp_tx, resp_rx) = async_channel::bounded(64); io.spawn(async move { let mut read_buf = vec![0u8; 4096]; let mut write_buf = vec![0u8; 4096]; while let Ok(StrikeAction::Connect(addr)) = signal_rx.recv().await { let net_fut = async { loop { resp_tx.send(StrikeUpdateState::Connecting).await.ok(); let Ok(stream) = Async::::connect(addr).await else { Timer::after(Duration::from_secs(1)).await; continue; }; resp_tx.send(StrikeUpdateState::Connected).await.ok(); stream.write_with(|s| s.set_nodelay(true)).await.ok(); let read_fut = async { while let Ok(read) = stream.read_with(|mut a| a.read(&mut read_buf)).await { let Ok(data) = striker_proto::receive_response(&mut read_buf[..read]) else { continue; }; if resp_tx.send(StrikeUpdateState::Updating(data)).await.is_err() { break; } } }; let write_fut = async { while let Ok(req) = req_rx.recv().await { let Ok(payload) = striker_proto::send_request(req, &mut write_buf) else { continue; }; if stream.write_with(|mut s| s.write(payload)).await.is_err() { break; } } }; (read_fut, write_fut).race().await; stream .write_with(|s| s.shutdown(std::net::Shutdown::Both)) .await .ok(); break; } }; let cancel_fut = async { while signal_rx .recv() .await .is_ok_and(|strike| !matches!(strike, StrikeAction::Disconnect)) { } }; (net_fut, cancel_fut).race().await; resp_tx.send(StrikeUpdateState::Disconnected).await.ok(); } }) .detach(); commands.insert_resource(StrikeActions(signal_tx)); commands.insert_resource(StrikeUpdates(resp_rx)); commands.insert_resource(StrikeRequests(req_tx)); } pub struct NetPlugin; impl Plugin for NetPlugin { fn build(&self, app: &mut bevy::app::App) { app.add_systems(Startup, (setup_mdns_task, setup_strike_connection)); } }