Browse and listen to thousands of radio stations across the globe right from your terminal ๐ŸŒŽ ๐Ÿ“ป ๐ŸŽตโœจ
radio rust tokio web-radio command-line-tool tui
at main 6.8 kB view raw
1use std::sync::Arc; 2use std::thread; 3use std::time::Duration; 4 5use anyhow::{Context, Error}; 6use hyper::header::HeaderValue; 7use rodio::{OutputStream, OutputStreamHandle, Sink}; 8use tokio::sync::mpsc; 9 10use crate::decoder::Mp3Decoder; 11use crate::types::Station; 12 13/// Commands sent to the audio worker thread. 14#[derive(Debug)] 15enum AudioCommand { 16 Play { 17 station: Station, 18 volume_percent: f32, 19 }, 20 SetVolume(f32), 21 Stop, 22} 23 24/// Playback events emitted by the audio worker. 25#[derive(Debug, Clone)] 26pub enum PlaybackEvent { 27 Started(PlaybackState), 28 Error(String), 29 Stopped, 30} 31 32/// Public interface for receiving playback events. 33pub struct PlaybackEvents { 34 rx: mpsc::UnboundedReceiver<PlaybackEvent>, 35} 36 37impl PlaybackEvents { 38 pub async fn recv(&mut self) -> Option<PlaybackEvent> { 39 self.rx.recv().await 40 } 41} 42 43/// Snapshot of the current playback metadata. 44#[derive(Debug, Clone)] 45pub struct PlaybackState { 46 pub station: Station, 47 pub stream_name: String, 48 pub now_playing: String, 49 pub genre: String, 50 pub description: String, 51 pub bitrate: String, 52} 53 54/// Controller that owns the command channel to the audio worker. 55pub struct AudioController { 56 cmd_tx: mpsc::UnboundedSender<AudioCommand>, 57} 58 59impl AudioController { 60 /// Spawn a new audio worker thread and return a controller plus event receiver. 61 pub fn new() -> Result<(Self, PlaybackEvents), Error> { 62 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AudioCommand>(); 63 let (event_tx, event_rx) = mpsc::unbounded_channel::<PlaybackEvent>(); 64 65 thread::Builder::new() 66 .name("tunein-audio-worker".into()) 67 .spawn({ 68 let events = event_tx.clone(); 69 move || { 70 let mut worker = AudioWorker::new(event_tx); 71 if let Err(err) = worker.run(&mut cmd_rx) { 72 let _ = events.send(PlaybackEvent::Error(err.to_string())); 73 } 74 } 75 }) 76 .context("failed to spawn audio worker thread")?; 77 78 Ok((Self { cmd_tx }, PlaybackEvents { rx: event_rx })) 79 } 80 81 pub fn play(&self, station: Station, volume_percent: f32) -> Result<(), Error> { 82 self.cmd_tx 83 .send(AudioCommand::Play { 84 station, 85 volume_percent, 86 }) 87 .map_err(|e| Error::msg(e.to_string())) 88 } 89 90 pub fn set_volume(&self, volume_percent: f32) -> Result<(), Error> { 91 self.cmd_tx 92 .send(AudioCommand::SetVolume(volume_percent)) 93 .map_err(|e| Error::msg(e.to_string())) 94 } 95 96 pub fn stop(&self) -> Result<(), Error> { 97 self.cmd_tx 98 .send(AudioCommand::Stop) 99 .map_err(|e| Error::msg(e.to_string())) 100 } 101} 102 103struct AudioWorker { 104 _stream: OutputStream, 105 handle: OutputStreamHandle, 106 sink: Option<Arc<Sink>>, 107 current_volume: f32, 108 events: mpsc::UnboundedSender<PlaybackEvent>, 109} 110 111impl AudioWorker { 112 fn new(events: mpsc::UnboundedSender<PlaybackEvent>) -> Self { 113 let (stream, handle) = 114 OutputStream::try_default().expect("failed to acquire default audio output device"); 115 Self { 116 _stream: stream, 117 handle, 118 sink: None, 119 current_volume: 100.0, 120 events, 121 } 122 } 123 124 fn run(&mut self, cmd_rx: &mut mpsc::UnboundedReceiver<AudioCommand>) -> Result<(), Error> { 125 while let Some(cmd) = cmd_rx.blocking_recv() { 126 match cmd { 127 AudioCommand::Play { 128 station, 129 volume_percent, 130 } => self.handle_play(station, volume_percent)?, 131 AudioCommand::SetVolume(volume_percent) => { 132 self.current_volume = volume_percent.max(0.0); 133 if let Some(sink) = &self.sink { 134 sink.set_volume(self.current_volume / 100.0); 135 } 136 } 137 AudioCommand::Stop => { 138 if let Some(sink) = self.sink.take() { 139 sink.stop(); 140 } 141 let _ = self.events.send(PlaybackEvent::Stopped); 142 } 143 } 144 } 145 146 Ok(()) 147 } 148 149 fn handle_play(&mut self, station: Station, volume_percent: f32) -> Result<(), Error> { 150 if let Some(sink) = self.sink.take() { 151 sink.stop(); 152 thread::sleep(Duration::from_millis(50)); 153 } 154 155 let stream_url = station.stream_url.clone(); 156 let client = reqwest::blocking::Client::new(); 157 let response = client 158 .get(&stream_url) 159 .send() 160 .with_context(|| format!("failed to open stream {}", stream_url))?; 161 162 let headers = response.headers().clone(); 163 let now_playing = station.playing.clone().unwrap_or_default(); 164 165 let display_name = header_to_string(headers.get("icy-name")) 166 .filter(|name| name != "Unknown") 167 .unwrap_or_else(|| station.name.clone()); 168 let genre = header_to_string(headers.get("icy-genre")).unwrap_or_default(); 169 let description = header_to_string(headers.get("icy-description")).unwrap_or_default(); 170 let bitrate = header_to_string(headers.get("icy-br")).unwrap_or_default(); 171 172 let response = follow_redirects(client, response)?; 173 174 let sink = Arc::new(Sink::try_new(&self.handle)?); 175 sink.set_volume(volume_percent.max(0.0) / 100.0); 176 177 let decoder = Mp3Decoder::new(response, None).map_err(|_| { 178 Error::msg("stream is not in MP3 format or failed to initialize decoder") 179 })?; 180 sink.append(decoder); 181 sink.play(); 182 183 self.current_volume = volume_percent; 184 self.sink = Some(sink.clone()); 185 186 let state = PlaybackState { 187 station, 188 stream_name: display_name, 189 now_playing, 190 genre, 191 description, 192 bitrate, 193 }; 194 195 let _ = self.events.send(PlaybackEvent::Started(state)); 196 197 Ok(()) 198 } 199} 200 201fn follow_redirects( 202 client: reqwest::blocking::Client, 203 response: reqwest::blocking::Response, 204) -> Result<reqwest::blocking::Response, Error> { 205 let mut current = response; 206 for _ in 0..3 { 207 if let Some(location) = current.headers().get("location") { 208 let url = location 209 .to_str() 210 .map_err(|_| Error::msg("invalid redirect location header"))?; 211 current = client.get(url).send()?; 212 } else { 213 return Ok(current); 214 } 215 } 216 Ok(current) 217} 218 219fn header_to_string(value: Option<&HeaderValue>) -> Option<String> { 220 value 221 .and_then(|header| header.to_str().ok()) 222 .map(|s| s.to_string()) 223}