Browse and listen to thousands of radio stations across the globe right from your terminal 🌎 📻 🎵✨
radio
rust
tokio
web-radio
command-line-tool
tui
1use std::{
2 pin::Pin,
3 sync::{Arc, Mutex},
4 task::{Context, Poll},
5 thread,
6 time::Duration,
7};
8
9use anyhow::Error;
10use futures_util::Future;
11use reqwest::blocking::Response;
12use rodio::{OutputStream, OutputStreamHandle, Sink};
13use tokio::sync::mpsc;
14
15use crate::decoder::Mp3Decoder;
16
17pub struct Player;
18
19impl Player {
20 pub fn new(cmd_rx: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>) -> Self {
21 thread::spawn(move || {
22 let internal = PlayerInternal::new(cmd_rx);
23 futures::executor::block_on(internal);
24 });
25 Self {}
26 }
27}
28
/// Commands accepted by the player over its command channel.
#[derive(Debug)]
pub enum PlayerCommand {
    /// Start streaming from the given URL.
    Play(String),
    /// Resume playback if the sink is paused, otherwise pause it.
    PlayOrPause,
    /// Stop the current sink.
    Stop,
}
35
36struct PlayerInternal {
37 sink: Arc<Mutex<Sink>>,
38 stream: OutputStream,
39 handle: OutputStreamHandle,
40 commands: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>,
41 decoder: Option<Mp3Decoder<Response>>,
42}
43
44impl PlayerInternal {
45 fn new(cmd_rx: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>) -> Self {
46 let (stream, handle) = rodio::OutputStream::try_default().unwrap();
47 Self {
48 sink: Arc::new(Mutex::new(rodio::Sink::try_new(&handle).unwrap())),
49 stream,
50 handle,
51 commands: cmd_rx,
52 decoder: None,
53 }
54 }
55
56 fn handle_play(&mut self, url: String) -> Result<(), Error> {
57 let (stream, handle) = rodio::OutputStream::try_default().unwrap();
58 self.stream = stream;
59 self.sink = Arc::new(Mutex::new(rodio::Sink::try_new(&handle).unwrap()));
60 self.handle = handle;
61 let sink = self.sink.clone();
62
63 thread::spawn(move || {
64 let (frame_tx, _frame_rx) = std::sync::mpsc::channel::<minimp3::Frame>();
65 let client = reqwest::blocking::Client::new();
66
67 let response = client.get(url.clone()).send().unwrap();
68
69 println!("headers: {:#?}", response.headers());
70 let location = response.headers().get("location");
71
72 let response = match location {
73 Some(location) => {
74 let response = client.get(location.to_str().unwrap()).send().unwrap();
75 let location = response.headers().get("location");
76 match location {
77 Some(location) => client.get(location.to_str().unwrap()).send().unwrap(),
78 None => response,
79 }
80 }
81 None => response,
82 };
83 let decoder = Mp3Decoder::new(response, Some(frame_tx)).unwrap();
84
85 {
86 let sink = sink.lock().unwrap();
87 sink.append(decoder);
88 sink.play();
89 }
90
91 loop {
92 let sink = sink.lock().unwrap();
93
94 if sink.empty() {
95 break;
96 }
97
98 drop(sink);
99
100 std::thread::sleep(Duration::from_millis(10));
101 }
102 });
103
104 Ok(())
105 }
106
107 fn handle_play_or_pause(&self) -> Result<(), Error> {
108 let sink = self.sink.lock().unwrap();
109 match sink.is_paused() {
110 true => sink.play(),
111 false => sink.pause(),
112 };
113 Ok(())
114 }
115
116 fn handle_stop(&self) -> Result<(), Error> {
117 let sink = self.sink.lock().unwrap();
118 sink.stop();
119 Ok(())
120 }
121
122 pub fn handle_command(&mut self, cmd: PlayerCommand) -> Result<(), Error> {
123 match cmd {
124 PlayerCommand::Play(url) => self.handle_play(url),
125 PlayerCommand::PlayOrPause => self.handle_play_or_pause(),
126 PlayerCommand::Stop => self.handle_stop(),
127 }
128 }
129}
130
131impl Future for PlayerInternal {
132 type Output = ();
133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134 loop {
135 // Process commands that have been sent to the player
136 let cmd = match self.commands.lock().unwrap().poll_recv(cx) {
137 Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
138 Poll::Ready(Some(cmd)) => Some(cmd),
139 _ => None,
140 };
141
142 if let Some(cmd) = cmd {
143 if let Err(e) = self.handle_command(cmd) {
144 println!("{:?}", e);
145 }
146 }
147
148 thread::sleep(Duration::from_millis(500));
149 }
150 }
151}