use fuser::{Filesystem, ReplyData}; use futures_util::StreamExt; use log::{error, info, warn}; use std::collections::VecDeque; use std::process::Command; use std::sync::{Arc, Mutex}; use std::thread; use tokio::sync::oneshot; use tokio_tungstenite::{connect_async, tungstenite::Message}; const DEV: &str = "jetstream"; const URL: &str = "wss://jetstream1.us-east.fire.hose.cam/subscribe"; struct CuseDevice { buffer: Arc>>, is_running: Arc>, cancel: Arc>>>, } impl CuseDevice { fn new() -> Self { Self { buffer: Arc::new(Mutex::new(VecDeque::new())), is_running: Arc::new(Mutex::new(false)), cancel: Arc::new(Mutex::new(None)), } } fn begin_stream(&self) { let mut is_running = self.is_running.lock().unwrap(); if !*is_running { *is_running = true; let buffer_clone = Arc::clone(&self.buffer); let (cancel_tx, cancel_rx) = oneshot::channel(); *self.cancel.lock().unwrap() = Some(cancel_tx); thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { Self::websocket_task(buffer_clone, cancel_rx).await; }); }); info!("started websocket connection thread"); } } fn end_stream(&self) { let mut shutdown_tx = self.cancel.lock().unwrap(); if let Some(tx) = shutdown_tx.take() { let _ = tx.send(()); info!("sent shutdown signal to websocket"); } *self.is_running.lock().unwrap() = false; } async fn websocket_task( buffer: Arc>>, mut shutdown_rx: oneshot::Receiver<()>, ) { loop { info!("connecting to jetstream..."); match connect_async(URL).await { Ok((mut ws, _)) => { info!("connected to jetstream ... "); loop { tokio::select! { _ = &mut shutdown_rx => { info!("shutting down ... "); let _ = ws.close(None).await; return; } msg = ws.next() => { match msg { Some(Ok(Message::Text(text))) => { let mut buf = buffer.lock().unwrap(); buf.extend(text.as_bytes()); buf.push_back(b'\n'); while buf.len() > 1024 * 1024 { buf.pop_front(); } info!("received {} bytes, buffer size: {}", text.len(), buf.len()); } Some(Ok(Message::Close(_))) => { warn!("websocket closed by server"); break; } Some(Err(e)) => { error!("websocket error: {e}"); break; } None => { warn!("websocket stream ended"); break; } _ => {} } } } } } Err(e) => { error!("failed to connect to websocket: {e}"); } } if shutdown_rx.try_recv().is_ok() { info!("shutdown requested, stopping websocket task"); return; } info!("reconnecting in 5 seconds..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } } } impl Filesystem for CuseDevice { fn cuse_init( &mut self, _req: &fuser::Request<'_>, reply: fuser::ReplyCuseInit, ) -> Result<(), libc::c_int> { let name = DEV; let major = 51; let minor = 0; info!("Initializing CUSE at /dev/{name}"); let config = fuser::CuseConfig::new(name, major, minor); reply.reply(config); Ok(()) } fn read( &mut self, _req: &fuser::Request<'_>, ino: u64, fh: u64, offset: i64, size: u32, flags: i32, lock_owner: Option, reply: ReplyData, ) { info!( "read(ino: {ino:#x?}, fh: {fh}, offset: {offset}, size: {size}, flags: {flags:#x?}, lock_owner: {lock_owner:?})", ); self.begin_stream(); loop { let mut buffer = self.buffer.lock().unwrap(); if !buffer.is_empty() { let bytes_to_read = std::cmp::min(size as usize, buffer.len()); let data = buffer.drain(0..bytes_to_read).collect::>(); info!( "Returning {} bytes, buffer remaining: {}", data.len(), buffer.len() ); reply.data(&data); return; } drop(buffer); std::thread::sleep(std::time::Duration::from_millis(10)); } } fn release( &mut self, _req: &fuser::Request<'_>, ino: u64, fh: u64, flags: i32, lock_owner: Option, flush: bool, reply: fuser::ReplyEmpty, ) { info!( "release(ino: {ino:#x?}, fh: {fh}, flags: {flags:#x?}, lock_owner: {lock_owner:?}, flush: {flush})" ); self.end_stream(); reply.ok(); } } fn main() { env_logger::init(); let device = CuseDevice::new(); // run cuse in a separate thread let handle = thread::spawn(|| { fuser::cuse(device).unwrap_or_else(|e| { error!( "failed to start cuse device: {e}. try run this example as privileged user" ); std::process::exit(1); }) }); // make the device readable without sudo let output = Command::new("chmod") .args(["644", &format!("/dev/{DEV}")]) .output(); match output { Ok(result) if result.status.success() => info!("/dev/{DEV} is now 0644"), _ => warn!("failed to execute chmod"), } let _ = handle.join(); info!("done"); }