Browse and listen to thousands of radio stations across the globe right from your terminal ๐ŸŒŽ ๐Ÿ“ป ๐ŸŽตโœจ
radio rust tokio web-radio command-line-tool tui
at main 4.4 kB view raw
1use std::{ 2 pin::Pin, 3 sync::{Arc, Mutex}, 4 task::{Context, Poll}, 5 thread, 6 time::Duration, 7}; 8 9use anyhow::Error; 10use futures_util::Future; 11use reqwest::blocking::Response; 12use rodio::{OutputStream, OutputStreamHandle, Sink}; 13use tokio::sync::mpsc; 14 15use crate::decoder::Mp3Decoder; 16 17pub struct Player; 18 19impl Player { 20 pub fn new(cmd_rx: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>) -> Self { 21 thread::spawn(move || { 22 let internal = PlayerInternal::new(cmd_rx); 23 futures::executor::block_on(internal); 24 }); 25 Self {} 26 } 27} 28 29#[derive(Debug)] 30pub enum PlayerCommand { 31 Play(String), 32 PlayOrPause, 33 Stop, 34} 35 36struct PlayerInternal { 37 sink: Arc<Mutex<Sink>>, 38 stream: OutputStream, 39 handle: OutputStreamHandle, 40 commands: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>, 41 decoder: Option<Mp3Decoder<Response>>, 42} 43 44impl PlayerInternal { 45 fn new(cmd_rx: Arc<Mutex<mpsc::UnboundedReceiver<PlayerCommand>>>) -> Self { 46 let (stream, handle) = rodio::OutputStream::try_default().unwrap(); 47 Self { 48 sink: Arc::new(Mutex::new(rodio::Sink::try_new(&handle).unwrap())), 49 stream, 50 handle, 51 commands: cmd_rx, 52 decoder: None, 53 } 54 } 55 56 fn handle_play(&mut self, url: String) -> Result<(), Error> { 57 let (stream, handle) = rodio::OutputStream::try_default().unwrap(); 58 self.stream = stream; 59 self.sink = Arc::new(Mutex::new(rodio::Sink::try_new(&handle).unwrap())); 60 self.handle = handle; 61 let sink = self.sink.clone(); 62 63 thread::spawn(move || { 64 let (frame_tx, _frame_rx) = std::sync::mpsc::channel::<minimp3::Frame>(); 65 let client = reqwest::blocking::Client::new(); 66 67 let response = client.get(url.clone()).send().unwrap(); 68 69 println!("headers: {:#?}", response.headers()); 70 let location = response.headers().get("location"); 71 72 let response = match location { 73 Some(location) => { 74 let response = client.get(location.to_str().unwrap()).send().unwrap(); 75 let location = response.headers().get("location"); 76 match location { 77 Some(location) => client.get(location.to_str().unwrap()).send().unwrap(), 78 None => response, 79 } 80 } 81 None => response, 82 }; 83 let decoder = Mp3Decoder::new(response, Some(frame_tx)).unwrap(); 84 85 { 86 let sink = sink.lock().unwrap(); 87 sink.append(decoder); 88 sink.play(); 89 } 90 91 loop { 92 let sink = sink.lock().unwrap(); 93 94 if sink.empty() { 95 break; 96 } 97 98 drop(sink); 99 100 std::thread::sleep(Duration::from_millis(10)); 101 } 102 }); 103 104 Ok(()) 105 } 106 107 fn handle_play_or_pause(&self) -> Result<(), Error> { 108 let sink = self.sink.lock().unwrap(); 109 match sink.is_paused() { 110 true => sink.play(), 111 false => sink.pause(), 112 }; 113 Ok(()) 114 } 115 116 fn handle_stop(&self) -> Result<(), Error> { 117 let sink = self.sink.lock().unwrap(); 118 sink.stop(); 119 Ok(()) 120 } 121 122 pub fn handle_command(&mut self, cmd: PlayerCommand) -> Result<(), Error> { 123 match cmd { 124 PlayerCommand::Play(url) => self.handle_play(url), 125 PlayerCommand::PlayOrPause => self.handle_play_or_pause(), 126 PlayerCommand::Stop => self.handle_stop(), 127 } 128 } 129} 130 131impl Future for PlayerInternal { 132 type Output = (); 133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 134 loop { 135 // Process commands that have been sent to the player 136 let cmd = match self.commands.lock().unwrap().poll_recv(cx) { 137 Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down. 138 Poll::Ready(Some(cmd)) => Some(cmd), 139 _ => None, 140 }; 141 142 if let Some(cmd) = cmd { 143 if let Err(e) = self.handle_command(cmd) { 144 println!("{:?}", e); 145 } 146 } 147 148 thread::sleep(Duration::from_millis(500)); 149 } 150 } 151}