use std::{collections::{HashMap, VecDeque}, io::{Read, Write}, net::{Shutdown, SocketAddr, TcpStream, UdpSocket}, sync::{Arc, Mutex, RwLock}, thread}; use bevy::ecs::component::Component; use felix_audio::voice::decoder::VoiceChatDecoder; use tokio::sync::broadcast::{self, Receiver, Sender}; use felix_net::{buffer::Buffer, packet::{self, Packet, PacketTypes}, packets::{link_udp::LinkUDP, notify_connection_info::NotifyConnectionInfo}}; use crate::net::NetClientCommand; #[derive(Component)] pub struct Connection{ tcp: TcpStream, udp: UdpSocket, udp_server_address: SocketAddr, net_send: Sender, net_recv: Receiver, voice_queues: Arc>>, pub id: String, killed_signal: broadcast::Sender<()> } impl Connection{ pub fn new( stream: TcpStream, udp: UdpSocket, udp_server_address: SocketAddr ) -> Self{ let ( event_sender, event_recv ) = broadcast::channel(32); let ( killed_signal_send, killed_signal ) = broadcast::channel(32); let mut conn = Self { tcp: stream, udp, udp_server_address, net_send: event_sender.clone(), net_recv: event_recv, voice_queues: Arc::new(RwLock::new(HashMap::new())), id: "".to_owned(), killed_signal: killed_signal_send }; conn.start_listener_thread(event_sender, killed_signal).unwrap(); let packet = NotifyConnectionInfo { id: conn.id.clone() }; conn.send_reliable(packet).unwrap(); conn } fn start_listener_thread(&self, cmd: Sender, mut signal: Receiver<()>) -> anyhow::Result<()>{ let mut tcp = self.tcp.try_clone()?; let udp = self.udp.try_clone()?; let udp_1 = self.udp.try_clone()?; let cmd_1 = cmd.clone(); let srv_addr = self.udp_server_address.clone(); let voice_queues = self.voice_queues.clone(); let mut signal_1 = signal.resubscribe(); thread::spawn(move || { // UDP RECV THREAD let mut buf = [0; 1024]; while let Ok((length, addr)) = udp_1.recv_from(&mut buf){ if addr != srv_addr{ continue; } if signal_1.try_recv().is_ok(){ break; } let mut msg: Buffer = (&buf[0..length]).into(); while msg.left() > 0{ let packet = packet::parse(&mut msg); match packet{ PacketTypes::PlayerVoicePacket(packet) => { let voices = voice_queues.try_read().unwrap(); if let Some(decoder) = voices.get(&packet.id){ decoder.decode(packet.packet); } }, _ => { cmd_1.send(NetClientCommand::RecvPacket(packet)).unwrap(); } } } } }); thread::spawn(move || { // TCP RECV THREAD let mut buf = [0; 1024]; while let Ok(length) = tcp.read(&mut buf){ if length == 0 { break; } if signal.try_recv().is_ok(){ break; } let mut msg: Buffer = (&buf[0..length]).into(); while msg.left() > 0{ let packet = packet::parse(&mut msg); match packet{ PacketTypes::NotifyConnectionInfo(info) => { cmd.send(NetClientCommand::Connected(info.id.clone())).unwrap(); let packet = LinkUDP { id: info.id }; let packet: Vec<_> = packet.to_buf().into(); udp.send_to(&packet, srv_addr).unwrap(); }, _ => { cmd.send(NetClientCommand::RecvPacket(packet)).unwrap(); } } } } cmd.send(NetClientCommand::Disconnected).unwrap(); }); Ok(()) } pub fn get_remote_player_voice_ids( &self ) -> Vec{ let voices = self.voice_queues.read().unwrap(); voices.iter().map(| x | x.0.clone()).collect() } pub fn get_remote_player_voice_rms( &self, id: &String ) -> f32{ let voices = self.voice_queues.read().unwrap(); let decoder = voices.get(id).unwrap(); decoder.get_last_rms() } pub fn start_listening_for_player_voice( &self, id: &String ) -> Arc>>{ let voice_queue = Arc::new(Mutex::new(VecDeque::new())); let decoder = VoiceChatDecoder::new(voice_queue.clone()); let mut voices = self.voice_queues.write().unwrap(); voices.insert(id.clone(), decoder); voice_queue } pub fn stop_listening_for_player_voice( &self, id: &String ){ let mut voices = self.voice_queues.write().unwrap(); voices.remove(id); } pub fn recv_event(&mut self) -> anyhow::Result{ Ok(self.net_recv.try_recv()?) } pub fn send_reliable(&mut self, packet: impl Packet) -> anyhow::Result<()>{ let buf: Vec = packet.to_buf().into(); self.tcp.write(&buf)?; Ok(()) } pub fn try_send_unreliable(&mut self, packet: impl Packet) -> anyhow::Result<()>{ let buf: Vec = packet.to_buf().into(); self.udp.send_to(&buf, self.udp_server_address)?; Ok(()) } } impl Drop for Connection{ fn drop(&mut self) { println!("Killing connection to: {}", self.udp_server_address); self.net_send.send(NetClientCommand::Disconnected).unwrap(); self.killed_signal.send(()).unwrap(); self.tcp.shutdown(Shutdown::Both).unwrap(); } }