A multiplayer VR framework w/voice chat

add debug information about voice chat

phaz.uk ea5e764f a8027fc6

verified
Changed files
+207 -55
audio
client
server
+6 -13
audio/src/lib.rs
··· 1 1 use bevy::{app::{App, Plugin, Startup, Update}, ecs::{component::Component, system::{Commands, Query}}, math::{Quat, Vec3}, transform::components::Transform}; 2 - use cpal::{BufferSize, SampleRate, StreamConfig}; 3 - use kira::{AudioManager, AudioManagerSettings, DefaultBackend, Tween, backend::cpal::CpalBackendSettings, listener::ListenerHandle}; 2 + use kira::{AudioManager, AudioManagerSettings, DefaultBackend, Tween, listener::ListenerHandle}; 4 3 5 4 use crate::source::FelixAudioSource; 6 5 ··· 12 11 pub mod source; 13 12 pub mod utils; 14 13 pub mod voice; 14 + 15 + // TODO: Make this use HRTF at some point 16 + // Maybe revisit steam audio? 17 + // Or just find some other library that does HRTFs. 15 18 16 19 fn update_audio_system( 17 20 mut audio_system: Query<&mut FelixAudioComponent>, ··· 47 50 impl Plugin for FelixAudio{ 48 51 fn build(&self, app: &mut App) { 49 52 app.add_systems(Startup, move | mut commands: Commands | { 50 - let mut settings = AudioManagerSettings::default(); 51 - settings.backend_settings = CpalBackendSettings { 52 - config: Some(StreamConfig { 53 - channels: 2, 54 - sample_rate: SampleRate(48_000), 55 - buffer_size: BufferSize::Fixed(512) 56 - }), 57 - ..Default::default() 58 - }; 59 - 60 - let mut manager = AudioManager::<DefaultBackend>::new(settings).unwrap(); 53 + let mut manager = AudioManager::<DefaultBackend>::new(AudioManagerSettings::default()).unwrap(); 61 54 let main_listener = manager.add_listener(Vec3::default(), Quat::default()).unwrap(); 62 55 63 56 let handle = FelixAudioComponent { manager, main_listener };
+18 -3
audio/src/voice/decoder.rs
··· 1 - use std::{collections::VecDeque, sync::{Arc, Mutex, mpsc::{Sender, channel}}, thread}; 1 + use std::{collections::VecDeque, sync::{Arc, Mutex, RwLock, mpsc::{Sender, channel}}, thread}; 2 2 3 3 use opus::{Channels, Decoder}; 4 4 5 5 use crate::{MONO_20MS, SAMPLE_RATE}; 6 6 7 7 pub struct VoiceChatDecoder{ 8 - stream_in: Sender<Vec<u8>> 8 + stream_in: Sender<Vec<u8>>, 9 + last_rms: Arc<RwLock<f32>> 9 10 } 10 11 11 12 impl VoiceChatDecoder{ ··· 13 14 let mut decoder = Decoder::new(SAMPLE_RATE as u32, Channels::Mono).unwrap(); 14 15 let ( stream_in, stream_out ) = channel(); 15 16 17 + let set_rms = Arc::new(RwLock::new(0.0)); 18 + let set_rms_1 = set_rms.clone(); 19 + 16 20 thread::spawn(move || { 17 21 let mut buffer = [0.0; MONO_20MS]; 18 22 ··· 20 24 let packet: Vec<u8> = stream_out.recv().unwrap(); 21 25 decoder.decode_float(&packet, &mut buffer, false).unwrap(); 22 26 27 + let mut total = 0.0; 28 + for sample in buffer{ total += sample.powi(2) } 29 + *set_rms.write().unwrap() = ( total / buffer.len() as f32 ).sqrt(); 30 + 23 31 let mut voice = queue.lock().unwrap(); 24 32 for sample in buffer{ voice.push_back(sample); } 25 33 } 26 34 }); 27 35 28 - Self { stream_in } 36 + Self { 37 + stream_in, 38 + last_rms: set_rms_1 39 + } 29 40 } 30 41 31 42 pub fn decode( &self, packet: Vec<u8> ){ 32 43 self.stream_in.send(packet).unwrap(); 44 + } 45 + 46 + pub fn get_last_rms( &self ) -> f32{ 47 + *self.last_rms.read().unwrap() 33 48 } 34 49 }
+35 -4
audio/src/voice/microphone.rs
··· 1 - use std::{env, net::{ToSocketAddrs, UdpSocket}, sync::{Arc, Mutex}}; 1 + use std::{env, net::{ToSocketAddrs, UdpSocket}, sync::{Arc, Mutex, RwLock}}; 2 2 3 3 use bevy::ecs::component::Component; 4 4 use cpal::{BufferSize, SampleRate, Stream, StreamConfig, traits::{DeviceTrait, HostTrait, StreamTrait}}; ··· 11 11 pub struct VoiceChatMicrophone{ 12 12 stream: Option<Stream>, 13 13 udp: Option<UdpSocket>, 14 - muted: Arc<Mutex<bool>> 14 + muted: Arc<Mutex<bool>>, 15 + last_rms: Arc<RwLock<isize>> 15 16 } 16 17 17 18 impl VoiceChatMicrophone{ ··· 19 20 Self { 20 21 stream: None, 21 22 udp: Some(socket), 22 - muted: Arc::new(Mutex::new(false)) 23 + muted: Arc::new(Mutex::new(false)), // TODO: Default to muted 24 + last_rms: Arc::new(RwLock::new(0)) 23 25 } 24 26 } 25 27 ··· 33 35 let mut encoder = Encoder::new(SAMPLE_RATE as u32, Channels::Mono, Application::Voip)?; 34 36 35 37 let host = cpal::default_host(); 36 - let mic = host.default_input_device().unwrap(); 38 + 39 + let mut i = 0; 40 + let devices = host.input_devices().unwrap(); 41 + for device in devices{ 42 + println!("{}) {:?}", i, device.name()); 43 + i += 1; 44 + } 45 + 46 + let mut devices = host.input_devices().unwrap(); 47 + let mic = if let Ok(device_index) = env::var("MIC_INDEX"){ 48 + devices.nth(device_index.parse()?).unwrap() 49 + } else{ 50 + host.default_input_device().unwrap() 51 + }; 52 + 53 + println!("Using Device: {:?}", mic.name()); 37 54 38 55 let mut output = [0; 512]; 39 56 ··· 45 62 46 63 let addr = env::var("HOST")?.to_socket_addrs()?.nth(0).unwrap(); 47 64 65 + for conf in mic.supported_input_configs().unwrap(){ 66 + println!("{} {:?} {} {:?}", conf.channels(), conf.buffer_size(), conf.sample_format(), conf.with_max_sample_rate()); 67 + } 68 + 69 + let set_rms = self.last_rms.clone(); 70 + 48 71 let stream = mic.build_input_stream( 49 72 &StreamConfig { 50 73 channels: 1, ··· 63 86 64 87 if buffer_indx >= MONO_20MS{ 65 88 buffer_indx = 0; 89 + 90 + let mut total = 0; 91 + for sample in buffer{ total += (sample as isize).pow(2) } 92 + *set_rms.write().unwrap() = ( total / buffer.len() as isize ).isqrt(); 66 93 67 94 let len = encoder.encode(&buffer, &mut output).unwrap(); 68 95 let buf_to_send = output[0..len].to_vec(); ··· 86 113 self.stream = Some(stream); 87 114 88 115 Ok(()) 116 + } 117 + 118 + pub fn get_rms_of_last_packet( &self ) -> f32{ 119 + *self.last_rms.read().unwrap() as f32 / i16::MAX as f32 89 120 } 90 121 }
+60
client/src/components/debug_camera.rs
··· 1 1 use std::f32::consts::PI; 2 2 3 3 use bevy::{ input::mouse::MouseMotion, prelude::* }; 4 + use felix_audio::voice::microphone::VoiceChatMicrophone; 5 + 6 + use crate::net::connection::Connection; 4 7 5 8 #[derive(Component)] 6 9 pub struct DebugCamera{ ··· 25 28 keys: Res<ButtonInput<KeyCode>>, 26 29 mouse_buttons: Res<ButtonInput<MouseButton>>, 27 30 mut mouse_motion: MessageReader<MouseMotion>, 31 + 32 + // Debug Text 33 + mut debug_text: Query<(&mut Text, &DebugText)>, 34 + voice: Query<&VoiceChatMicrophone>, 35 + networking: Query<&Connection>, 28 36 ){ 37 + let net_manager = networking.single().unwrap(); 38 + 29 39 let mut delta_time = time.delta_secs(); 30 40 if keys.pressed(KeyCode::ShiftLeft){ delta_time *= 2.0; } 31 41 ··· 63 73 let dir = -transform.up(); 64 74 transform.translation += dir * delta_time; 65 75 } 76 + 77 + let mut remote_rms = "".to_owned(); 78 + for id in net_manager.get_remote_player_voice_ids(){ 79 + remote_rms += format!("{}: {:.4}\n", id, net_manager.get_remote_player_voice_rms(&id)).as_str(); 80 + } 81 + 82 + let rms = if let Ok(mic) = voice.single(){ 83 + mic.get_rms_of_last_packet() 84 + } else{ 85 + -1.0 86 + }; 87 + 88 + let ( mut text, _ ) = debug_text.single_mut().unwrap(); 89 + text.0 = format!( 90 + "Microphone RMS: {:.4} 91 + {}Position: {:.2} {:.2} {:.2} 92 + Rotation: {:.2} {:.2} {:.2} {:.2}", 93 + rms, 94 + 95 + remote_rms, 96 + 97 + transform.translation.x, 98 + transform.translation.y, 99 + transform.translation.z, 100 + 101 + transform.rotation.x, 102 + transform.rotation.y, 103 + transform.rotation.z, 104 + transform.rotation.w 105 + ); 106 + } 107 + 108 + #[derive(Component)] 109 + pub struct DebugText; 110 + 111 + pub fn setup( 112 + mut commands: Commands 113 + ){ 114 + commands.spawn(( 115 + Node { 116 + position_type: PositionType::Absolute, 117 + bottom: px(5.0), 118 + right: px(5.0), 119 + ..default() 120 + }, 121 + Text::new("Here is some text"), 122 + TextColor(Color::WHITE), 123 + TextLayout::new_with_justify(Justify::Right), 124 + DebugText 125 + )); 66 126 }
+1 -1
client/src/components/network_interface.rs
··· 88 88 let mut lock = listener.lock().unwrap(); 89 89 90 90 for i in 0..buf.len(){ 91 - buf[i] = lock.pop_front().unwrap(); } 91 + buf[i] = lock.pop_front().unwrap_or(0.0); } 92 92 93 93 buf 94 94 }))
+8 -4
client/src/main.rs
··· 8 8 mod net; 9 9 10 10 fn main() { 11 - dotenvy::dotenv().unwrap(); 11 + if let Err(err) = dotenvy::dotenv(){ println!("[WARN] .ENV failed loading {:#?}", err); } 12 12 13 13 App::new() 14 14 .add_plugins(( ··· 17 17 )) 18 18 .add_systems(Startup, setup) 19 19 .add_systems(Startup, move | mut commands: Commands |{ 20 - let ( comp, voice ) = net::handle_net().expect("Network Module Failed."); 21 - 20 + let ( comp, voice ) = net::handle_net().expect("Network Module Failed"); 22 21 commands.spawn(comp); 23 - commands.spawn(voice::init_microphone(voice).expect("Failed to start microphone.")); 22 + 23 + match voice::init_microphone(voice){ 24 + Ok(voice) => { commands.spawn(voice); }, 25 + Err(err) => println!("[WARN] Failed to start microphone: {}", err) 26 + } 24 27 }) 25 28 .add_systems(Update, debug_camera::update) 29 + .add_systems(Startup, debug_camera::setup) 26 30 .add_systems(Update, remote_player::update) 27 31 .add_systems(FixedUpdate, network_interface::fixed_update) 28 32 .run();
+52 -14
client/src/net/connection.rs
··· 1 - use std::{collections::{HashMap, VecDeque}, io::{Read, Write}, net::{SocketAddr, TcpStream, UdpSocket}, sync::{Arc, Mutex, RwLock}, thread}; 1 + use std::{collections::{HashMap, VecDeque}, io::{Read, Write}, net::{Shutdown, SocketAddr, TcpStream, UdpSocket}, sync::{Arc, Mutex, RwLock}, thread}; 2 2 use bevy::ecs::component::Component; 3 3 use felix_audio::voice::decoder::VoiceChatDecoder; 4 4 use tokio::sync::broadcast::{self, Receiver, Sender}; ··· 14 14 udp: UdpSocket, 15 15 udp_server_address: SocketAddr, 16 16 17 + net_send: Sender<NetClientCommand>, 17 18 net_recv: Receiver<NetClientCommand>, 18 19 19 20 voice_queues: Arc<RwLock<HashMap<String, VoiceChatDecoder>>>, 20 - pub id: String 21 + pub id: String, 22 + 23 + killed_signal: broadcast::Sender<()> 21 24 } 22 25 23 26 impl Connection{ 24 27 pub fn new( stream: TcpStream, udp: UdpSocket, udp_server_address: SocketAddr ) -> Self{ 25 28 let ( event_sender, event_recv ) = broadcast::channel(32); 29 + let ( killed_signal_send, killed_signal ) = broadcast::channel(32); 26 30 27 31 let mut conn = Self { 28 32 tcp: stream, ··· 30 34 udp, 31 35 udp_server_address, 32 36 37 + net_send: event_sender.clone(), 33 38 net_recv: event_recv, 34 39 35 40 voice_queues: Arc::new(RwLock::new(HashMap::new())), 36 - id: "".to_owned() 41 + id: "".to_owned(), 42 + 43 + killed_signal: killed_signal_send 37 44 }; 38 45 39 - conn.start_listener_thread(event_sender).unwrap(); 46 + conn.start_listener_thread(event_sender, killed_signal).unwrap(); 40 47 41 48 let packet = NotifyConnectionInfo { id: conn.id.clone() }; 42 49 conn.send_reliable(packet).unwrap(); ··· 44 51 conn 45 52 } 46 53 47 - fn start_listener_thread(&self, cmd: Sender<NetClientCommand>) -> anyhow::Result<()>{ 54 + fn start_listener_thread(&self, cmd: Sender<NetClientCommand>, mut signal: Receiver<()>) -> anyhow::Result<()>{ 48 55 let mut tcp = self.tcp.try_clone()?; 49 56 let udp = self.udp.try_clone()?; 50 57 ··· 54 61 let srv_addr = self.udp_server_address.clone(); 55 62 let voice_queues = self.voice_queues.clone(); 56 63 64 + let mut signal_1 = signal.resubscribe(); 65 + 57 66 thread::spawn(move || { // UDP RECV THREAD 58 67 let mut buf = [0; 1024]; 59 68 60 69 while let Ok((length, addr)) = udp_1.recv_from(&mut buf){ 61 70 if addr != srv_addr{ continue; } 71 + if signal_1.try_recv().is_ok(){ break; } 62 72 63 73 let mut msg: Buffer = (&buf[0..length]).into(); 64 74 65 75 while msg.left() > 0{ 66 76 let packet = packet::parse(&mut msg); 67 - cmd_1.send(NetClientCommand::RecvPacket(packet)).unwrap(); 77 + 78 + match packet{ 79 + PacketTypes::PlayerVoicePacket(packet) => { 80 + let voices = voice_queues.try_read().unwrap(); 81 + if let Some(decoder) = voices.get(&packet.id){ 82 + decoder.decode(packet.packet); } 83 + }, 84 + _ => { 85 + cmd_1.send(NetClientCommand::RecvPacket(packet)).unwrap(); 86 + } 87 + } 68 88 } 69 89 } 70 90 }); ··· 74 94 75 95 while let Ok(length) = tcp.read(&mut buf){ 76 96 if length == 0 { break; } 97 + if signal.try_recv().is_ok(){ break; } 77 98 78 99 let mut msg: Buffer = (&buf[0..length]).into(); 79 100 ··· 87 108 let packet = LinkUDP { id: info.id }; 88 109 let packet: Vec<_> = packet.to_buf().into(); 89 110 90 - udp.send_to(&packet, "127.0.0.1:2603").unwrap(); 91 - }, 92 - PacketTypes::PlayerVoicePacket(packet) => { 93 - let voices = voice_queues.read().unwrap(); 94 - if let Some(decoder) = voices.get(&packet.id){ 95 - decoder.decode(packet.packet); } 111 + udp.send_to(&packet, srv_addr).unwrap(); 96 112 }, 97 113 _ => { 98 114 cmd.send(NetClientCommand::RecvPacket(packet)).unwrap(); ··· 107 123 Ok(()) 108 124 } 109 125 110 - pub fn start_listening_for_player_voice( &mut self, id: &String ) -> Arc<Mutex<VecDeque<f32>>>{ 126 + pub fn get_remote_player_voice_ids( &self ) -> Vec<String>{ 127 + let voices = self.voice_queues.read().unwrap(); 128 + voices.iter().map(| x | x.0.clone()).collect() 129 + } 130 + 131 + pub fn get_remote_player_voice_rms( &self, id: &String ) -> f32{ 132 + let voices = self.voice_queues.read().unwrap(); 133 + let decoder = voices.get(id).unwrap(); 134 + 135 + decoder.get_last_rms() 136 + } 137 + 138 + pub fn start_listening_for_player_voice( &self, id: &String ) -> Arc<Mutex<VecDeque<f32>>>{ 111 139 let voice_queue = Arc::new(Mutex::new(VecDeque::new())); 112 140 let decoder = VoiceChatDecoder::new(voice_queue.clone()); 113 141 ··· 117 145 voice_queue 118 146 } 119 147 120 - pub fn stop_listening_for_player_voice( &mut self, id: &String ){ 148 + pub fn stop_listening_for_player_voice( &self, id: &String ){ 121 149 let mut voices = self.voice_queues.write().unwrap(); 122 150 voices.remove(id); 123 151 } ··· 138 166 self.udp.send_to(&buf, self.udp_server_address)?; 139 167 140 168 Ok(()) 169 + } 170 + } 171 + 172 + impl Drop for Connection{ 173 + fn drop(&mut self) { 174 + println!("Killing connection to: {}", self.udp_server_address); 175 + self.net_send.send(NetClientCommand::Disconnected).unwrap(); 176 + 177 + self.killed_signal.send(()).unwrap(); 178 + self.tcp.shutdown(Shutdown::Both).unwrap(); 141 179 } 142 180 }
+8 -2
client/src/setup.rs
··· 1 1 use bevy::prelude::*; 2 2 use felix_audio::FelixAudioListener; 3 3 4 - use crate::debug_camera; 4 + use crate::components::debug_camera::DebugCamera; 5 5 6 6 pub fn setup( 7 7 mut commands: Commands, 8 8 mut meshes: ResMut<Assets<Mesh>>, 9 9 mut materials: ResMut<Assets<StandardMaterial>> 10 10 ){ 11 + // TODO: World loading from server 12 + 13 + // TODO: World loading format? Could build an asset bundle parser? 14 + 15 + // TODO: UI and Menus 16 + 11 17 commands.spawn(( 12 18 Mesh3d(meshes.add(Cuboid::new(1.0, 1.0, 1.0))), 13 19 MeshMaterial3d(materials.add(Color::WHITE)), ··· 33 39 )); 34 40 35 41 commands.spawn(( 36 - debug_camera::DebugCamera::default(), 42 + DebugCamera::default(), // TODO: Build a proper player controller 37 43 Camera3d::default(), 38 44 Transform::from_xyz(0.0, 0.0, 0.0).looking_at(Vec3::ZERO, Vec3::Y), 39 45 FelixAudioListener
-4
server/src/net/connection.rs
··· 74 74 self.position = packet.position; 75 75 self.rotation = packet.rotation; 76 76 }, 77 - PacketTypes::PlayerVoicePacket(mut voice) => { 78 - voice.id = self.id.clone(); // NOTE: See "audio/src/voice/microphone.rs:71" 79 - self.main_thread_sender.send(NetServerCommand::BroadcastVoice(voice)).unwrap(); 80 - }, 81 77 _ => {} 82 78 } 83 79
+19 -10
server/src/net/mod.rs
··· 1 1 use std::{collections::HashMap, env, net::{SocketAddr, TcpListener, TcpStream, UdpSocket}, thread, time::Duration}; 2 2 3 - use felix_net::{buffer::Buffer, packet::{self, PacketTypes}, packets::{player_join_packet::PlayerJoinPacket, player_leave_packet::PlayerLeavePacket, player_list_packet::PlayerListPacket, player_voice_packet::PlayerVoicePacket, update_clients_positions::UpdateClientsPositions}}; 3 + use felix_net::{buffer::Buffer, packet::{self, PacketTypes}, packets::{player_join_packet::PlayerJoinPacket, player_leave_packet::PlayerLeavePacket, player_list_packet::PlayerListPacket, update_clients_positions::UpdateClientsPositions}}; 4 4 5 5 use crate::net::connection::Connection; 6 6 ··· 13 13 SendOverUDP(Vec<u8>, SocketAddr), 14 14 RecvOverUDP(SocketAddr, PacketTypes), 15 15 RecvOverTCP(String, PacketTypes), 16 - BroadcastVoice(PlayerVoicePacket), 17 16 BroadcastPlayerPositions 18 17 } 19 18 20 19 pub fn handle_net() -> anyhow::Result<()>{ 20 + // TODO: Multiple instances 21 + // Need to decide if i want to run each instance on a seperate thread 22 + 21 23 let port = env::var("HOST_PORT")?; 22 24 23 25 let tcp_listener = TcpListener::bind(format!("0.0.0.0:{}", port))?; ··· 113 115 }, 114 116 NetServerCommand::RecvOverUDP(addr, packet) => { 115 117 if let Some(conn_id) = connections_by_address.get(&addr){ 116 - let conn = connections.get_mut(conn_id).unwrap(); 117 - conn.recv_packet(packet)?; 118 + match packet { 119 + PacketTypes::PlayerVoicePacket(mut voice) => { // Keep voice stuff in one thread to avoid copying a lot of data all over the place 120 + voice.id = conn_id.clone(); // NOTE: See "audio/src/voice/microphone.rs:71" 121 + 122 + for ( remote_id, conn ) in &mut connections{ 123 + if remote_id == &voice.id { continue; } 124 + conn.try_send_unreliable(voice.clone())?; 125 + } 126 + }, 127 + _ => { 128 + let conn = connections.get_mut(conn_id).unwrap(); 129 + conn.recv_packet(packet)?; 130 + } 131 + } 132 + 118 133 } 119 134 }, 120 135 NetServerCommand::RecvOverTCP(conn_id, packet) => { ··· 130 145 131 146 for (_, conn) in &mut connections{ 132 147 conn.try_send_unreliable(packet.clone())?; } 133 - }, 134 - NetServerCommand::BroadcastVoice(packet) => { 135 - for ( remote_id, conn ) in &mut connections{ 136 - if remote_id == &packet.id { continue; } 137 - conn.try_send_unreliable(packet.clone())?; 138 - } 139 148 } 140 149 } 141 150 }