1use fuser::{Filesystem, ReplyData};
2use futures_util::StreamExt;
3use log::{error, info, warn};
4use std::collections::VecDeque;
5use std::process::Command;
6use std::sync::{Arc, Mutex};
7use std::thread;
8use tokio::sync::oneshot;
9use tokio_tungstenite::{connect_async, tungstenite::Message};
10
/// Websocket endpoint the streaming task connects to.
const URL: &str = "wss://jetstream1.us-east.fire.hose.cam/subscribe";

/// Name of the character device; it is passed to the CUSE configuration and
/// used to build the `/dev/<name>` path that `main` runs `chmod` on.
const DEV: &str = "jetstream";
13
14struct CuseDevice {
15 buffer: Arc<Mutex<VecDeque<u8>>>,
16 is_running: Arc<Mutex<bool>>,
17 cancel: Arc<Mutex<Option<oneshot::Sender<()>>>>,
18}
19
20impl CuseDevice {
21 fn new() -> Self {
22 Self {
23 buffer: Arc::new(Mutex::new(VecDeque::new())),
24 is_running: Arc::new(Mutex::new(false)),
25 cancel: Arc::new(Mutex::new(None)),
26 }
27 }
28
29 fn begin_stream(&self) {
30 let mut is_running = self.is_running.lock().unwrap();
31 if !*is_running {
32 *is_running = true;
33 let buffer_clone = Arc::clone(&self.buffer);
34 let (cancel_tx, cancel_rx) = oneshot::channel();
35
36 *self.cancel.lock().unwrap() = Some(cancel_tx);
37
38 thread::spawn(move || {
39 let rt = tokio::runtime::Runtime::new().unwrap();
40 rt.block_on(async {
41 Self::websocket_task(buffer_clone, cancel_rx).await;
42 });
43 });
44 info!("started websocket connection thread");
45 }
46 }
47
48 fn end_stream(&self) {
49 let mut shutdown_tx = self.cancel.lock().unwrap();
50 if let Some(tx) = shutdown_tx.take() {
51 let _ = tx.send(());
52 info!("sent shutdown signal to websocket");
53 }
54 *self.is_running.lock().unwrap() = false;
55 }
56
57 async fn websocket_task(
58 buffer: Arc<Mutex<VecDeque<u8>>>,
59 mut shutdown_rx: oneshot::Receiver<()>,
60 ) {
61 loop {
62 info!("connecting to jetstream...");
63 match connect_async(URL).await {
64 Ok((mut ws, _)) => {
65 info!("connected to jetstream ... ");
66
67 loop {
68 tokio::select! {
69 _ = &mut shutdown_rx => {
70 info!("shutting down ... ");
71 let _ = ws.close(None).await;
72 return;
73 }
74
75 msg = ws.next() => {
76 match msg {
77 Some(Ok(Message::Text(text))) => {
78 let mut buf = buffer.lock().unwrap();
79
80 buf.extend(text.as_bytes());
81 buf.push_back(b'\n');
82
83 while buf.len() > 1024 * 1024 {
84 buf.pop_front();
85 }
86
87 info!("received {} bytes, buffer size: {}", text.len(), buf.len());
88 }
89 Some(Ok(Message::Close(_))) => {
90 warn!("websocket closed by server");
91 break;
92 }
93 Some(Err(e)) => {
94 error!("websocket error: {e}");
95 break;
96 }
97 None => {
98 warn!("websocket stream ended");
99 break;
100 }
101 _ => {}
102 }
103 }
104 }
105 }
106 }
107 Err(e) => {
108 error!("failed to connect to websocket: {e}");
109 }
110 }
111
112 if shutdown_rx.try_recv().is_ok() {
113 info!("shutdown requested, stopping websocket task");
114 return;
115 }
116
117 info!("reconnecting in 5 seconds...");
118 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
119 }
120 }
121}
122
123impl Filesystem for CuseDevice {
124 fn cuse_init(
125 &mut self,
126 _req: &fuser::Request<'_>,
127 reply: fuser::ReplyCuseInit,
128 ) -> Result<(), libc::c_int> {
129 let name = DEV;
130 let major = 51;
131 let minor = 0;
132 info!("Initializing CUSE at /dev/{name}");
133 let config = fuser::CuseConfig::new(name, major, minor);
134 reply.reply(config);
135 Ok(())
136 }
137
138 fn read(
139 &mut self,
140 _req: &fuser::Request<'_>,
141 ino: u64,
142 fh: u64,
143 offset: i64,
144 size: u32,
145 flags: i32,
146 lock_owner: Option<u64>,
147 reply: ReplyData,
148 ) {
149 info!(
150 "read(ino: {ino:#x?}, fh: {fh}, offset: {offset}, size: {size}, flags: {flags:#x?}, lock_owner: {lock_owner:?})",
151 );
152
153 self.begin_stream();
154
155 loop {
156 let mut buffer = self.buffer.lock().unwrap();
157 if !buffer.is_empty() {
158 let bytes_to_read = std::cmp::min(size as usize, buffer.len());
159 let data = buffer.drain(0..bytes_to_read).collect::<Vec<_>>();
160 info!(
161 "Returning {} bytes, buffer remaining: {}",
162 data.len(),
163 buffer.len()
164 );
165 reply.data(&data);
166 return;
167 }
168
169 drop(buffer);
170 std::thread::sleep(std::time::Duration::from_millis(10));
171 }
172 }
173
174 fn release(
175 &mut self,
176 _req: &fuser::Request<'_>,
177 ino: u64,
178 fh: u64,
179 flags: i32,
180 lock_owner: Option<u64>,
181 flush: bool,
182 reply: fuser::ReplyEmpty,
183 ) {
184 info!(
185 "release(ino: {ino:#x?}, fh: {fh}, flags: {flags:#x?}, lock_owner: {lock_owner:?}, flush: {flush})"
186 );
187 self.end_stream();
188 reply.ok();
189 }
190}
191
192fn main() {
193 env_logger::init();
194 let device = CuseDevice::new();
195
196 // run cuse in a separate thread
197 let handle = thread::spawn(|| {
198 fuser::cuse(device).unwrap_or_else(|e| {
199 error!(
200 "failed to start cuse device: {e}. try run this example as privileged user"
201 );
202 std::process::exit(1);
203 })
204 });
205
206 // make the device readable without sudo
207 let output = Command::new("chmod")
208 .args(["644", &format!("/dev/{DEV}")])
209 .output();
210
211 match output {
212 Ok(result) if result.status.success() => info!("/dev/{DEV} is now 0644"),
213 _ => warn!("failed to execute chmod"),
214 }
215
216 let _ = handle.join();
217 info!("done");
218}