Browse and listen to thousands of radio stations across the globe right from your terminal ๐ ๐ป ๐ตโจ
radio
rust
tokio
web-radio
command-line-tool
tui
1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use anyhow::{Context, Error};
6use hyper::header::HeaderValue;
7use rodio::{OutputStream, OutputStreamHandle, Sink};
8use tokio::sync::mpsc;
9
10use crate::decoder::Mp3Decoder;
11use crate::types::Station;
12
13/// Commands sent to the audio worker thread.
14#[derive(Debug)]
15enum AudioCommand {
16 Play {
17 station: Station,
18 volume_percent: f32,
19 },
20 SetVolume(f32),
21 Stop,
22}
23
24/// Playback events emitted by the audio worker.
25#[derive(Debug, Clone)]
26pub enum PlaybackEvent {
27 Started(PlaybackState),
28 Error(String),
29 Stopped,
30}
31
32/// Public interface for receiving playback events.
33pub struct PlaybackEvents {
34 rx: mpsc::UnboundedReceiver<PlaybackEvent>,
35}
36
37impl PlaybackEvents {
38 pub async fn recv(&mut self) -> Option<PlaybackEvent> {
39 self.rx.recv().await
40 }
41}
42
43/// Snapshot of the current playback metadata.
44#[derive(Debug, Clone)]
45pub struct PlaybackState {
46 pub station: Station,
47 pub stream_name: String,
48 pub now_playing: String,
49 pub genre: String,
50 pub description: String,
51 pub bitrate: String,
52}
53
54/// Controller that owns the command channel to the audio worker.
55pub struct AudioController {
56 cmd_tx: mpsc::UnboundedSender<AudioCommand>,
57}
58
59impl AudioController {
60 /// Spawn a new audio worker thread and return a controller plus event receiver.
61 pub fn new() -> Result<(Self, PlaybackEvents), Error> {
62 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AudioCommand>();
63 let (event_tx, event_rx) = mpsc::unbounded_channel::<PlaybackEvent>();
64
65 thread::Builder::new()
66 .name("tunein-audio-worker".into())
67 .spawn({
68 let events = event_tx.clone();
69 move || {
70 let mut worker = AudioWorker::new(event_tx);
71 if let Err(err) = worker.run(&mut cmd_rx) {
72 let _ = events.send(PlaybackEvent::Error(err.to_string()));
73 }
74 }
75 })
76 .context("failed to spawn audio worker thread")?;
77
78 Ok((Self { cmd_tx }, PlaybackEvents { rx: event_rx }))
79 }
80
81 pub fn play(&self, station: Station, volume_percent: f32) -> Result<(), Error> {
82 self.cmd_tx
83 .send(AudioCommand::Play {
84 station,
85 volume_percent,
86 })
87 .map_err(|e| Error::msg(e.to_string()))
88 }
89
90 pub fn set_volume(&self, volume_percent: f32) -> Result<(), Error> {
91 self.cmd_tx
92 .send(AudioCommand::SetVolume(volume_percent))
93 .map_err(|e| Error::msg(e.to_string()))
94 }
95
96 pub fn stop(&self) -> Result<(), Error> {
97 self.cmd_tx
98 .send(AudioCommand::Stop)
99 .map_err(|e| Error::msg(e.to_string()))
100 }
101}
102
103struct AudioWorker {
104 _stream: OutputStream,
105 handle: OutputStreamHandle,
106 sink: Option<Arc<Sink>>,
107 current_volume: f32,
108 events: mpsc::UnboundedSender<PlaybackEvent>,
109}
110
111impl AudioWorker {
112 fn new(events: mpsc::UnboundedSender<PlaybackEvent>) -> Self {
113 let (stream, handle) =
114 OutputStream::try_default().expect("failed to acquire default audio output device");
115 Self {
116 _stream: stream,
117 handle,
118 sink: None,
119 current_volume: 100.0,
120 events,
121 }
122 }
123
124 fn run(&mut self, cmd_rx: &mut mpsc::UnboundedReceiver<AudioCommand>) -> Result<(), Error> {
125 while let Some(cmd) = cmd_rx.blocking_recv() {
126 match cmd {
127 AudioCommand::Play {
128 station,
129 volume_percent,
130 } => self.handle_play(station, volume_percent)?,
131 AudioCommand::SetVolume(volume_percent) => {
132 self.current_volume = volume_percent.max(0.0);
133 if let Some(sink) = &self.sink {
134 sink.set_volume(self.current_volume / 100.0);
135 }
136 }
137 AudioCommand::Stop => {
138 if let Some(sink) = self.sink.take() {
139 sink.stop();
140 }
141 let _ = self.events.send(PlaybackEvent::Stopped);
142 }
143 }
144 }
145
146 Ok(())
147 }
148
149 fn handle_play(&mut self, station: Station, volume_percent: f32) -> Result<(), Error> {
150 if let Some(sink) = self.sink.take() {
151 sink.stop();
152 thread::sleep(Duration::from_millis(50));
153 }
154
155 let stream_url = station.stream_url.clone();
156 let client = reqwest::blocking::Client::new();
157 let response = client
158 .get(&stream_url)
159 .send()
160 .with_context(|| format!("failed to open stream {}", stream_url))?;
161
162 let headers = response.headers().clone();
163 let now_playing = station.playing.clone().unwrap_or_default();
164
165 let display_name = header_to_string(headers.get("icy-name"))
166 .filter(|name| name != "Unknown")
167 .unwrap_or_else(|| station.name.clone());
168 let genre = header_to_string(headers.get("icy-genre")).unwrap_or_default();
169 let description = header_to_string(headers.get("icy-description")).unwrap_or_default();
170 let bitrate = header_to_string(headers.get("icy-br")).unwrap_or_default();
171
172 let response = follow_redirects(client, response)?;
173
174 let sink = Arc::new(Sink::try_new(&self.handle)?);
175 sink.set_volume(volume_percent.max(0.0) / 100.0);
176
177 let decoder = Mp3Decoder::new(response, None).map_err(|_| {
178 Error::msg("stream is not in MP3 format or failed to initialize decoder")
179 })?;
180 sink.append(decoder);
181 sink.play();
182
183 self.current_volume = volume_percent;
184 self.sink = Some(sink.clone());
185
186 let state = PlaybackState {
187 station,
188 stream_name: display_name,
189 now_playing,
190 genre,
191 description,
192 bitrate,
193 };
194
195 let _ = self.events.send(PlaybackEvent::Started(state));
196
197 Ok(())
198 }
199}
200
201fn follow_redirects(
202 client: reqwest::blocking::Client,
203 response: reqwest::blocking::Response,
204) -> Result<reqwest::blocking::Response, Error> {
205 let mut current = response;
206 for _ in 0..3 {
207 if let Some(location) = current.headers().get("location") {
208 let url = location
209 .to_str()
210 .map_err(|_| Error::msg("invalid redirect location header"))?;
211 current = client.get(url).send()?;
212 } else {
213 return Ok(current);
214 }
215 }
216 Ok(current)
217}
218
219fn header_to_string(value: Option<&HeaderValue>) -> Option<String> {
220 value
221 .and_then(|header| header.to_str().ok())
222 .map(|s| s.to_string())
223}