A (planned) collection of lightweight tools for streaming.
at main 6.4 kB view raw
1use anyhow::{Result, anyhow}; 2use cpal::{ 3 SampleFormat, StreamConfig, 4 traits::{DeviceTrait, HostTrait, StreamTrait}, 5}; 6use std::{ 7 collections::VecDeque, 8 sync::{ 9 Arc, Mutex, 10 atomic::{self, AtomicBool, AtomicU64}, 11 }, 12 time::{self, SystemTime, UNIX_EPOCH}, 13}; 14 15#[derive(Debug, Clone)] 16pub struct MicInfo { 17 pub name: String, 18 pub sample_rate: u32, 19 pub channels: u16, 20} 21 22impl Default for MicInfo { 23 fn default() -> Self { 24 Self::new() 25 } 26} 27 28impl MicInfo { 29 pub fn new() -> Self { 30 let host = cpal::default_host(); 31 let device = host.default_input_device().expect("no default input device"); 32 let supported_config = device.default_input_config().unwrap(); 33 let config: cpal::StreamConfig = supported_config.clone().into(); 34 35 Self { 36 name: device.name().unwrap_or_else(|_| "Unknown device".into()), 37 sample_rate: config.sample_rate.0, 38 channels: config.channels, 39 } 40 } 41} 42 43pub type SharedLevels = Arc<Mutex<VecDeque<f32>>>; 44type LastActiveTime = Arc<AtomicU64>; 45 46const HOLD_TIME_MS: u64 = 300; 47 48fn get_current_time_ms() -> u64 { 49 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 50} 51 52fn process_input_f32(data: &[f32], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) { 53 if data.is_empty() { 54 return; 55 } 56 57 let mut sum = 0.0; 58 for &s in data { 59 sum += s * s; 60 } 61 let rms = (sum / data.len() as f32).sqrt(); 62 63 let threshold = 0.01; 64 let now = get_current_time_ms(); 65 66 if rms > threshold { 67 last_active.store(now, atomic::Ordering::Relaxed); 68 active.store(true, atomic::Ordering::Relaxed); 69 } else { 70 let last_time = last_active.load(atomic::Ordering::Relaxed); 71 let is_active = (now - last_time) < HOLD_TIME_MS; 72 active.store(is_active, atomic::Ordering::Relaxed); 73 } 74 75 push_level(levels, rms); 76} 77 78fn process_input_i16(data: &[i16], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) { 79 if data.is_empty() { 80 return; 81 } 82 let norm = i16::MAX as f32; 83 let mut sum = 0.0; 84 for &s in data { 85 let v = s as f32 / norm; 86 sum += v * v; 87 } 88 let rms = (sum / data.len() as f32).sqrt(); 89 90 let threshold = 0.01; 91 let now = get_current_time_ms(); 92 93 if rms > threshold { 94 last_active.store(now, atomic::Ordering::Relaxed); 95 active.store(true, atomic::Ordering::Relaxed); 96 } else { 97 let last_time = last_active.load(atomic::Ordering::Relaxed); 98 let is_active = (now - last_time) < HOLD_TIME_MS; 99 active.store(is_active, atomic::Ordering::Relaxed); 100 } 101 102 push_level(levels, rms); 103} 104 105fn process_input_u16(data: &[u16], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) { 106 if data.is_empty() { 107 return; 108 } 109 let max = u16::MAX as f32; 110 let mid = max / 2.0; 111 let mut sum = 0.0; 112 for &s in data { 113 let v = (s as f32 - mid) / mid; 114 sum += v * v; 115 } 116 let rms = (sum / data.len() as f32).sqrt(); 117 118 let threshold = 0.01; 119 let now = get_current_time_ms(); 120 121 if rms > threshold { 122 last_active.store(now, atomic::Ordering::Relaxed); 123 active.store(true, atomic::Ordering::Relaxed); 124 } else { 125 let last_time = last_active.load(atomic::Ordering::Relaxed); 126 let is_active = (now - last_time) < HOLD_TIME_MS; 127 active.store(is_active, atomic::Ordering::Relaxed); 128 } 129 130 push_level(levels, rms); 131} 132 133fn push_level(levels: &SharedLevels, rms: f32) { 134 const MAX_SAMPLES: usize = 64; 135 136 if let Ok(mut buf) = levels.lock() { 137 buf.push_back(rms); 138 if buf.len() > MAX_SAMPLES { 139 buf.pop_front(); 140 } 141 } 142} 143 144pub fn spawn_mic_listener(active: Arc<AtomicBool>, levels: SharedLevels) -> Result<()> { 145 std::thread::spawn(move || { 146 if let Err(e) = mic_loop(active, levels) { 147 eprintln!("mic loop error: {e:?}"); 148 } 149 }); 150 Ok(()) 151} 152 153pub fn mic_loop(active: Arc<AtomicBool>, levels: SharedLevels) -> Result<()> { 154 let host = cpal::default_host(); 155 let device = host 156 .default_input_device() 157 .ok_or_else(|| anyhow::anyhow!("no default input device"))?; 158 println!("Using input device: {}", device.name()?); 159 160 let supported_config = device 161 .default_input_config() 162 .map_err(|e| anyhow::anyhow!("failed to get default input config: {e}"))?; 163 let sample_format = supported_config.sample_format(); 164 let config: StreamConfig = supported_config.into(); 165 166 let last_active: LastActiveTime = Arc::new(AtomicU64::new(0)); 167 168 let stream = match sample_format { 169 SampleFormat::F32 => { 170 let active = active.clone(); 171 let levels = levels.clone(); 172 let last_active = last_active.clone(); 173 let err_fn = |err| eprintln!("cpal input stream error: {err}"); 174 device.build_input_stream( 175 &config, 176 move |data: &[f32], _| process_input_f32(data, &active, &levels, &last_active), 177 err_fn, 178 None, 179 )? 180 } 181 SampleFormat::I16 => { 182 let active = active.clone(); 183 let levels = levels.clone(); 184 let last_active = last_active.clone(); 185 let err_fn = |err| eprintln!("cpal input stream error: {err}"); 186 device.build_input_stream( 187 &config, 188 move |data: &[i16], _| process_input_i16(data, &active, &levels, &last_active), 189 err_fn, 190 None, 191 )? 192 } 193 SampleFormat::U16 => { 194 let active = active.clone(); 195 let levels = levels.clone(); 196 let last_active = last_active.clone(); 197 let err_fn = |err| eprintln!("cpal input stream error: {err}"); 198 device.build_input_stream( 199 &config, 200 move |data: &[u16], _| process_input_u16(data, &active, &levels, &last_active), 201 err_fn, 202 None, 203 )? 204 } 205 206 other => { 207 return Err(anyhow!("Unsupported sample format: {other:?}")); 208 } 209 }; 210 211 stream.play()?; 212 213 loop { 214 std::thread::sleep(time::Duration::from_secs(1)); 215 } 216}