A (planned) collection of lightweight tools for streaming.
1use anyhow::{Result, anyhow};
2use cpal::{
3 SampleFormat, StreamConfig,
4 traits::{DeviceTrait, HostTrait, StreamTrait},
5};
6use std::{
7 collections::VecDeque,
8 sync::{
9 Arc, Mutex,
10 atomic::{self, AtomicBool, AtomicU64},
11 },
12 time::{self, SystemTime, UNIX_EPOCH},
13};
14
/// Description of an audio input device and its default stream parameters.
///
/// Derives `PartialEq`/`Eq` so two snapshots can be compared, e.g. to detect
/// that the default input device changed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MicInfo {
    /// Device name as reported by the audio host.
    pub name: String,
    /// Sample rate (samples per second) of the device's default input config.
    pub sample_rate: u32,
    /// Number of channels in the device's default input config.
    pub channels: u16,
}
21
22impl Default for MicInfo {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl MicInfo {
29 pub fn new() -> Self {
30 let host = cpal::default_host();
31 let device = host.default_input_device().expect("no default input device");
32 let supported_config = device.default_input_config().unwrap();
33 let config: cpal::StreamConfig = supported_config.clone().into();
34
35 Self {
36 name: device.name().unwrap_or_else(|_| "Unknown device".into()),
37 sample_rate: config.sample_rate.0,
38 channels: config.channels,
39 }
40 }
41}
42
/// Mutex-guarded queue of recent RMS levels, shared between the audio
/// callback (which appends to it) and whoever reads the levels.
pub type SharedLevels = Arc<Mutex<VecDeque<f32>>>;
/// Shared atomic holding the millisecond timestamp of the last buffer whose
/// RMS level exceeded the activity threshold.
type LastActiveTime = Arc<AtomicU64>;

/// How long (in milliseconds) the microphone keeps being reported as active
/// after the last above-threshold buffer.
const HOLD_TIME_MS: u64 = 300;
47
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Never panics: a system clock set before the epoch yields `0`, and a value
/// that does not fit in `u64` saturates to `u64::MAX`. This matters because the
/// function is called from the audio callback, where a panic is not acceptable.
fn get_current_time_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
51
52fn process_input_f32(data: &[f32], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) {
53 if data.is_empty() {
54 return;
55 }
56
57 let mut sum = 0.0;
58 for &s in data {
59 sum += s * s;
60 }
61 let rms = (sum / data.len() as f32).sqrt();
62
63 let threshold = 0.01;
64 let now = get_current_time_ms();
65
66 if rms > threshold {
67 last_active.store(now, atomic::Ordering::Relaxed);
68 active.store(true, atomic::Ordering::Relaxed);
69 } else {
70 let last_time = last_active.load(atomic::Ordering::Relaxed);
71 let is_active = (now - last_time) < HOLD_TIME_MS;
72 active.store(is_active, atomic::Ordering::Relaxed);
73 }
74
75 push_level(levels, rms);
76}
77
78fn process_input_i16(data: &[i16], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) {
79 if data.is_empty() {
80 return;
81 }
82 let norm = i16::MAX as f32;
83 let mut sum = 0.0;
84 for &s in data {
85 let v = s as f32 / norm;
86 sum += v * v;
87 }
88 let rms = (sum / data.len() as f32).sqrt();
89
90 let threshold = 0.01;
91 let now = get_current_time_ms();
92
93 if rms > threshold {
94 last_active.store(now, atomic::Ordering::Relaxed);
95 active.store(true, atomic::Ordering::Relaxed);
96 } else {
97 let last_time = last_active.load(atomic::Ordering::Relaxed);
98 let is_active = (now - last_time) < HOLD_TIME_MS;
99 active.store(is_active, atomic::Ordering::Relaxed);
100 }
101
102 push_level(levels, rms);
103}
104
105fn process_input_u16(data: &[u16], active: &AtomicBool, levels: &SharedLevels, last_active: &LastActiveTime) {
106 if data.is_empty() {
107 return;
108 }
109 let max = u16::MAX as f32;
110 let mid = max / 2.0;
111 let mut sum = 0.0;
112 for &s in data {
113 let v = (s as f32 - mid) / mid;
114 sum += v * v;
115 }
116 let rms = (sum / data.len() as f32).sqrt();
117
118 let threshold = 0.01;
119 let now = get_current_time_ms();
120
121 if rms > threshold {
122 last_active.store(now, atomic::Ordering::Relaxed);
123 active.store(true, atomic::Ordering::Relaxed);
124 } else {
125 let last_time = last_active.load(atomic::Ordering::Relaxed);
126 let is_active = (now - last_time) < HOLD_TIME_MS;
127 active.store(is_active, atomic::Ordering::Relaxed);
128 }
129
130 push_level(levels, rms);
131}
132
133fn push_level(levels: &SharedLevels, rms: f32) {
134 const MAX_SAMPLES: usize = 64;
135
136 if let Ok(mut buf) = levels.lock() {
137 buf.push_back(rms);
138 if buf.len() > MAX_SAMPLES {
139 buf.pop_front();
140 }
141 }
142}
143
144pub fn spawn_mic_listener(active: Arc<AtomicBool>, levels: SharedLevels) -> Result<()> {
145 std::thread::spawn(move || {
146 if let Err(e) = mic_loop(active, levels) {
147 eprintln!("mic loop error: {e:?}");
148 }
149 });
150 Ok(())
151}
152
153pub fn mic_loop(active: Arc<AtomicBool>, levels: SharedLevels) -> Result<()> {
154 let host = cpal::default_host();
155 let device = host
156 .default_input_device()
157 .ok_or_else(|| anyhow::anyhow!("no default input device"))?;
158 println!("Using input device: {}", device.name()?);
159
160 let supported_config = device
161 .default_input_config()
162 .map_err(|e| anyhow::anyhow!("failed to get default input config: {e}"))?;
163 let sample_format = supported_config.sample_format();
164 let config: StreamConfig = supported_config.into();
165
166 let last_active: LastActiveTime = Arc::new(AtomicU64::new(0));
167
168 let stream = match sample_format {
169 SampleFormat::F32 => {
170 let active = active.clone();
171 let levels = levels.clone();
172 let last_active = last_active.clone();
173 let err_fn = |err| eprintln!("cpal input stream error: {err}");
174 device.build_input_stream(
175 &config,
176 move |data: &[f32], _| process_input_f32(data, &active, &levels, &last_active),
177 err_fn,
178 None,
179 )?
180 }
181 SampleFormat::I16 => {
182 let active = active.clone();
183 let levels = levels.clone();
184 let last_active = last_active.clone();
185 let err_fn = |err| eprintln!("cpal input stream error: {err}");
186 device.build_input_stream(
187 &config,
188 move |data: &[i16], _| process_input_i16(data, &active, &levels, &last_active),
189 err_fn,
190 None,
191 )?
192 }
193 SampleFormat::U16 => {
194 let active = active.clone();
195 let levels = levels.clone();
196 let last_active = last_active.clone();
197 let err_fn = |err| eprintln!("cpal input stream error: {err}");
198 device.build_input_stream(
199 &config,
200 move |data: &[u16], _| process_input_u16(data, &active, &levels, &last_active),
201 err_fn,
202 None,
203 )?
204 }
205
206 other => {
207 return Err(anyhow!("Unsupported sample format: {other:?}"));
208 }
209 };
210
211 stream.play()?;
212
213 loop {
214 std::thread::sleep(time::Duration::from_secs(1));
215 }
216}