Buttplug sex toy control library
at dev 361 lines 11 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use super::websocket_server_comm_manager::WebsocketServerDeviceCommManagerInitInfo; 9use async_trait::async_trait; 10use buttplug_core::{errors::ButtplugDeviceError, util::async_manager}; 11use buttplug_server::device::hardware::{ 12 GenericHardwareSpecializer, 13 Hardware, 14 HardwareConnector, 15 HardwareEvent, 16 HardwareInternal, 17 HardwareReadCmd, 18 HardwareReading, 19 HardwareSpecializer, 20 HardwareSubscribeCmd, 21 HardwareUnsubscribeCmd, 22 HardwareWriteCmd, 23}; 24use buttplug_server_device_config::{Endpoint, ProtocolCommunicationSpecifier, WebsocketSpecifier}; 25use futures::{ 26 FutureExt, 27 SinkExt, 28 StreamExt, 29 future::{self, BoxFuture}, 30}; 31use std::{ 32 fmt::{self, Debug}, 33 sync::{ 34 Arc, 35 atomic::{AtomicBool, Ordering}, 36 }, 37 time::Duration, 38}; 39use tokio::{ 40 net::TcpStream, 41 select, 42 sync::{ 43 Mutex, 44 broadcast, 45 mpsc::{Receiver, Sender, channel}, 46 }, 47 time::sleep, 48}; 49use tokio_util::sync::CancellationToken; 50 51async fn run_connection_loop( 52 address: &str, 53 event_sender: broadcast::Sender<HardwareEvent>, 54 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>, 55 mut request_receiver: Receiver<Vec<u8>>, 56 response_sender: broadcast::Sender<Vec<u8>>, 57) { 58 info!("Starting websocket server connection event loop."); 59 60 let (mut websocket_server_sender, mut websocket_server_receiver) = ws_stream.split(); 61 62 // Start pong count at 1, so we'll clear it after sending our first ping. 63 let mut pong_count = 1u32; 64 65 loop { 66 select! { 67 _ = sleep(Duration::from_millis(10000)) => { 68 if pong_count == 0 { 69 error!("No pongs received, considering connection closed."); 70 break; 71 } 72 pong_count = 0; 73 if websocket_server_sender 74 .send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0).into())) 75 .await 76 .is_err() { 77 error!("Cannot send ping to client, considering connection closed."); 78 break; 79 } 80 } 81 ws_msg = request_receiver.recv() => { 82 if let Some(binary_msg) = ws_msg { 83 if websocket_server_sender 84 .send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg.into())) 85 .await 86 .is_err() { 87 error!("Cannot send binary value to client, considering connection closed."); 88 break; 89 } 90 } else { 91 info!("Websocket server connector owner dropped, disconnecting websocket connection."); 92 break; 93 } 94 } 95 websocket_server_msg = websocket_server_receiver.next() => match websocket_server_msg { 96 Some(ws_data) => { 97 match ws_data { 98 Ok(msg) => { 99 match msg { 100 tokio_tungstenite::tungstenite::Message::Text(text_msg) => { 101 // If someone accidentally packs text, politely turn it into binary for them. 102 let _ = response_sender.send(text_msg.as_bytes().to_vec()); 103 } 104 tokio_tungstenite::tungstenite::Message::Binary(binary_msg) => { 105 // If no one is listening, ignore output. 106 let _ = response_sender.send(binary_msg.to_vec()); 107 } 108 tokio_tungstenite::tungstenite::Message::Close(_) => { 109 // Drop the error if no one receives the message, we're breaking anyways. 110 let _ = event_sender 111 .send(HardwareEvent::Disconnected( 112 address.to_owned() 113 )); 114 break; 115 } 116 tokio_tungstenite::tungstenite::Message::Ping(_) => { 117 // noop 118 continue; 119 } 120 tokio_tungstenite::tungstenite::Message::Frame(_) => { 121 // noop 122 continue; 123 } 124 tokio_tungstenite::tungstenite::Message::Pong(_) => { 125 pong_count += 1; 126 continue; 127 } 128 } 129 }, 130 Err(err) => { 131 error!("Error from websocket server, assuming disconnection: {:?}", err); 132 break; 133 } 134 } 135 }, 136 None => { 137 error!("Websocket channel closed, breaking"); 138 break; 139 } 140 } 141 } 142 } 143 144 if let Err(e) = websocket_server_sender.close().await { 145 error!("Error closing websocket: {}", e); 146 } 147 debug!("Exiting Websocket Server Device control loop."); 148} 149 150impl Debug for WebsocketServerHardwareConnector { 151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 152 f.debug_struct("WebsocketServerHardwareConnector") 153 .field("info", &self.info) 154 .finish() 155 } 156} 157 158pub struct WebsocketServerHardwareConnector { 159 info: WebsocketServerDeviceCommManagerInitInfo, 160 outgoing_sender: Sender<Vec<u8>>, 161 incoming_broadcaster: broadcast::Sender<Vec<u8>>, 162 device_event_sender: broadcast::Sender<HardwareEvent>, 163} 164 165impl WebsocketServerHardwareConnector { 166 pub fn new( 167 info: WebsocketServerDeviceCommManagerInitInfo, 168 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>, 169 ) -> Self { 170 let (outgoing_sender, outgoing_receiver) = channel(256); 171 let (incoming_broadcaster, _) = broadcast::channel(256); 172 let incoming_broadcaster_clone = incoming_broadcaster.clone(); 173 let (device_event_sender, _) = broadcast::channel(256); 174 let device_event_sender_clone = device_event_sender.clone(); 175 let address = info.address().clone(); 176 tokio::spawn(async move { 177 run_connection_loop( 178 &address, 179 device_event_sender_clone, 180 ws_stream, 181 outgoing_receiver, 182 incoming_broadcaster_clone, 183 ) 184 .await; 185 }); 186 Self { 187 info, 188 outgoing_sender, 189 incoming_broadcaster, 190 device_event_sender, 191 } 192 } 193} 194 195#[async_trait] 196impl HardwareConnector for WebsocketServerHardwareConnector { 197 fn specifier(&self) -> ProtocolCommunicationSpecifier { 198 ProtocolCommunicationSpecifier::Websocket(WebsocketSpecifier::new(self.info.identifier())) 199 } 200 201 async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> { 202 let hardware_internal = WebsocketServerHardware::new( 203 self.device_event_sender.clone(), 204 self.info.clone(), 205 self.outgoing_sender.clone(), 206 self.incoming_broadcaster.clone(), 207 ); 208 let hardware = Hardware::new( 209 self.info.identifier(), 210 self.info.address(), 211 &[Endpoint::Rx, Endpoint::Tx], 212 &None, 213 false, 214 Box::new(hardware_internal), 215 ); 216 Ok(Box::new(GenericHardwareSpecializer::new(hardware))) 217 } 218} 219 220pub struct WebsocketServerHardware { 221 connected: Arc<AtomicBool>, 222 subscribed: Arc<AtomicBool>, 223 subscribe_token: Arc<Mutex<Option<CancellationToken>>>, 224 info: WebsocketServerDeviceCommManagerInitInfo, 225 outgoing_sender: Sender<Vec<u8>>, 226 incoming_broadcaster: broadcast::Sender<Vec<u8>>, 227 device_event_sender: broadcast::Sender<HardwareEvent>, 228} 229 230impl WebsocketServerHardware { 231 pub fn new( 232 device_event_sender: broadcast::Sender<HardwareEvent>, 233 info: WebsocketServerDeviceCommManagerInitInfo, 234 outgoing_sender: Sender<Vec<u8>>, 235 incoming_broadcaster: broadcast::Sender<Vec<u8>>, 236 ) -> Self { 237 Self { 238 connected: Arc::new(AtomicBool::new(true)), 239 info, 240 outgoing_sender, 241 incoming_broadcaster, 242 device_event_sender, 243 subscribed: Arc::new(AtomicBool::new(false)), 244 subscribe_token: Arc::new(Mutex::new(None)), 245 } 246 } 247} 248 249impl HardwareInternal for WebsocketServerHardware { 250 fn event_stream(&self) -> broadcast::Receiver<HardwareEvent> { 251 self.device_event_sender.subscribe() 252 } 253 254 fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 255 let connected = self.connected.clone(); 256 async move { 257 connected.store(false, Ordering::Relaxed); 258 Ok(()) 259 } 260 .boxed() 261 } 262 263 fn read_value( 264 &self, 265 _msg: &HardwareReadCmd, 266 ) -> BoxFuture<'static, Result<HardwareReading, ButtplugDeviceError>> { 267 future::ready(Err(ButtplugDeviceError::UnhandledCommand( 268 "Websocket Hardware does not support read".to_owned(), 269 ))) 270 .boxed() 271 } 272 273 fn write_value( 274 &self, 275 msg: &HardwareWriteCmd, 276 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 277 let sender = self.outgoing_sender.clone(); 278 let data = msg.data().clone(); 279 // TODO Should check endpoint validity 280 async move { 281 sender.send(data).await.map_err(|err| { 282 ButtplugDeviceError::DeviceCommunicationError(format!( 283 "Could not write value to websocket device: {err}" 284 )) 285 }) 286 } 287 .boxed() 288 } 289 290 fn subscribe( 291 &self, 292 _msg: &HardwareSubscribeCmd, 293 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 294 if self.subscribed.load(Ordering::Relaxed) { 295 error!("Endpoint already subscribed somehow!"); 296 return future::ready(Ok(())).boxed(); 297 } 298 // TODO Should check endpoint validity 299 let mut data_receiver = self.incoming_broadcaster.subscribe(); 300 let event_sender = self.device_event_sender.clone(); 301 let address = self.info.address().clone(); 302 let subscribed = self.subscribed.clone(); 303 let subscribed_token = self.subscribe_token.clone(); 304 async move { 305 subscribed.store(true, Ordering::Relaxed); 306 let token = CancellationToken::new(); 307 *(subscribed_token.lock().await) = Some(token.child_token()); 308 async_manager::spawn(async move { 309 loop { 310 select! { 311 result = data_receiver.recv().fuse() => { 312 match result { 313 Ok(data) => { 314 debug!("Got websocket data! {:?}", data); 315 // We don't really care if there's no one to send the error to here. 316 let _ = event_sender 317 .send(HardwareEvent::Notification( 318 address.clone(), 319 Endpoint::Tx, 320 data, 321 )); 322 }, 323 Err(_) => break, 324 } 325 }, 326 _ = token.cancelled().fuse() => { 327 break; 328 } 329 } 330 } 331 info!("Data channel closed, ending websocket server device listener task"); 332 }); 333 Ok(()) 334 } 335 .boxed() 336 } 337 338 fn unsubscribe( 339 &self, 340 _msg: &HardwareUnsubscribeCmd, 341 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 342 if self.subscribed.load(Ordering::Relaxed) { 343 let subscribed = self.subscribed.clone(); 344 let subscribed_token = self.subscribe_token.clone(); 345 async move { 346 subscribed.store(false, Ordering::Relaxed); 347 let token = (subscribed_token.lock().await) 348 .take() 349 .expect("If we were subscribed, we'll have a token."); 350 token.cancel(); 351 Ok(()) 352 } 353 .boxed() 354 } else { 355 future::ready(Err(ButtplugDeviceError::DeviceCommunicationError( 356 "Device not subscribed.".to_owned(), 357 ))) 358 .boxed() 359 } 360 } 361}