Mount Jetstream as a userspace character device (CUSE) on Linux.
at main 6.9 kB view raw
1use fuser::{Filesystem, ReplyData}; 2use futures_util::StreamExt; 3use log::{error, info, warn}; 4use std::collections::VecDeque; 5use std::process::Command; 6use std::sync::{Arc, Mutex}; 7use std::thread; 8use tokio::sync::oneshot; 9use tokio_tungstenite::{connect_async, tungstenite::Message}; 10 11const DEV: &str = "jetstream"; 12const URL: &str = "wss://jetstream1.us-east.fire.hose.cam/subscribe"; 13 14struct CuseDevice { 15 buffer: Arc<Mutex<VecDeque<u8>>>, 16 is_running: Arc<Mutex<bool>>, 17 cancel: Arc<Mutex<Option<oneshot::Sender<()>>>>, 18} 19 20impl CuseDevice { 21 fn new() -> Self { 22 Self { 23 buffer: Arc::new(Mutex::new(VecDeque::new())), 24 is_running: Arc::new(Mutex::new(false)), 25 cancel: Arc::new(Mutex::new(None)), 26 } 27 } 28 29 fn begin_stream(&self) { 30 let mut is_running = self.is_running.lock().unwrap(); 31 if !*is_running { 32 *is_running = true; 33 let buffer_clone = Arc::clone(&self.buffer); 34 let (cancel_tx, cancel_rx) = oneshot::channel(); 35 36 *self.cancel.lock().unwrap() = Some(cancel_tx); 37 38 thread::spawn(move || { 39 let rt = tokio::runtime::Runtime::new().unwrap(); 40 rt.block_on(async { 41 Self::websocket_task(buffer_clone, cancel_rx).await; 42 }); 43 }); 44 info!("started websocket connection thread"); 45 } 46 } 47 48 fn end_stream(&self) { 49 let mut shutdown_tx = self.cancel.lock().unwrap(); 50 if let Some(tx) = shutdown_tx.take() { 51 let _ = tx.send(()); 52 info!("sent shutdown signal to websocket"); 53 } 54 *self.is_running.lock().unwrap() = false; 55 } 56 57 async fn websocket_task( 58 buffer: Arc<Mutex<VecDeque<u8>>>, 59 mut shutdown_rx: oneshot::Receiver<()>, 60 ) { 61 loop { 62 info!("connecting to jetstream..."); 63 match connect_async(URL).await { 64 Ok((mut ws, _)) => { 65 info!("connected to jetstream ... "); 66 67 loop { 68 tokio::select! { 69 _ = &mut shutdown_rx => { 70 info!("shutting down ... 
"); 71 let _ = ws.close(None).await; 72 return; 73 } 74 75 msg = ws.next() => { 76 match msg { 77 Some(Ok(Message::Text(text))) => { 78 let mut buf = buffer.lock().unwrap(); 79 80 buf.extend(text.as_bytes()); 81 buf.push_back(b'\n'); 82 83 while buf.len() > 1024 * 1024 { 84 buf.pop_front(); 85 } 86 87 info!("received {} bytes, buffer size: {}", text.len(), buf.len()); 88 } 89 Some(Ok(Message::Close(_))) => { 90 warn!("websocket closed by server"); 91 break; 92 } 93 Some(Err(e)) => { 94 error!("websocket error: {e}"); 95 break; 96 } 97 None => { 98 warn!("websocket stream ended"); 99 break; 100 } 101 _ => {} 102 } 103 } 104 } 105 } 106 } 107 Err(e) => { 108 error!("failed to connect to websocket: {e}"); 109 } 110 } 111 112 if shutdown_rx.try_recv().is_ok() { 113 info!("shutdown requested, stopping websocket task"); 114 return; 115 } 116 117 info!("reconnecting in 5 seconds..."); 118 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 119 } 120 } 121} 122 123impl Filesystem for CuseDevice { 124 fn cuse_init( 125 &mut self, 126 _req: &fuser::Request<'_>, 127 reply: fuser::ReplyCuseInit, 128 ) -> Result<(), libc::c_int> { 129 let name = DEV; 130 let major = 51; 131 let minor = 0; 132 info!("Initializing CUSE at /dev/{name}"); 133 let config = fuser::CuseConfig::new(name, major, minor); 134 reply.reply(config); 135 Ok(()) 136 } 137 138 fn read( 139 &mut self, 140 _req: &fuser::Request<'_>, 141 ino: u64, 142 fh: u64, 143 offset: i64, 144 size: u32, 145 flags: i32, 146 lock_owner: Option<u64>, 147 reply: ReplyData, 148 ) { 149 info!( 150 "read(ino: {ino:#x?}, fh: {fh}, offset: {offset}, size: {size}, flags: {flags:#x?}, lock_owner: {lock_owner:?})", 151 ); 152 153 self.begin_stream(); 154 155 loop { 156 let mut buffer = self.buffer.lock().unwrap(); 157 if !buffer.is_empty() { 158 let bytes_to_read = std::cmp::min(size as usize, buffer.len()); 159 let data = buffer.drain(0..bytes_to_read).collect::<Vec<_>>(); 160 info!( 161 "Returning {} bytes, buffer remaining: 
{}", 162 data.len(), 163 buffer.len() 164 ); 165 reply.data(&data); 166 return; 167 } 168 169 drop(buffer); 170 std::thread::sleep(std::time::Duration::from_millis(10)); 171 } 172 } 173 174 fn release( 175 &mut self, 176 _req: &fuser::Request<'_>, 177 ino: u64, 178 fh: u64, 179 flags: i32, 180 lock_owner: Option<u64>, 181 flush: bool, 182 reply: fuser::ReplyEmpty, 183 ) { 184 info!( 185 "release(ino: {ino:#x?}, fh: {fh}, flags: {flags:#x?}, lock_owner: {lock_owner:?}, flush: {flush})" 186 ); 187 self.end_stream(); 188 reply.ok(); 189 } 190} 191 192fn main() { 193 env_logger::init(); 194 let device = CuseDevice::new(); 195 196 // run cuse in a separate thread 197 let handle = thread::spawn(|| { 198 fuser::cuse(device).unwrap_or_else(|e| { 199 error!( 200 "failed to start cuse device: {e}. try run this example as privileged user" 201 ); 202 std::process::exit(1); 203 }) 204 }); 205 206 // make the device readable without sudo 207 let output = Command::new("chmod") 208 .args(["644", &format!("/dev/{DEV}")]) 209 .output(); 210 211 match output { 212 Ok(result) if result.status.success() => info!("/dev/{DEV} is now 0644"), 213 _ => warn!("failed to execute chmod"), 214 } 215 216 let _ = handle.join(); 217 info!("done"); 218}