// Buttplug Rust Source Code File - See https://buttplug.io for more info. // // Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. // // Licensed under the BSD 3-Clause license. See LICENSE file in the project root // for full license information. use super::websocket_server_comm_manager::WebsocketServerDeviceCommManagerInitInfo; use async_trait::async_trait; use buttplug_core::{errors::ButtplugDeviceError, util::async_manager}; use buttplug_server::device::hardware::{ GenericHardwareSpecializer, Hardware, HardwareConnector, HardwareEvent, HardwareInternal, HardwareReadCmd, HardwareReading, HardwareSpecializer, HardwareSubscribeCmd, HardwareUnsubscribeCmd, HardwareWriteCmd, }; use buttplug_server_device_config::{Endpoint, ProtocolCommunicationSpecifier, WebsocketSpecifier}; use futures::{ FutureExt, SinkExt, StreamExt, future::{self, BoxFuture}, }; use std::{ fmt::{self, Debug}, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, time::Duration, }; use tokio::{ net::TcpStream, select, sync::{ Mutex, broadcast, mpsc::{Receiver, Sender, channel}, }, time::sleep, }; use tokio_util::sync::CancellationToken; async fn run_connection_loop( address: &str, event_sender: broadcast::Sender, ws_stream: tokio_tungstenite::WebSocketStream, mut request_receiver: Receiver>, response_sender: broadcast::Sender>, ) { info!("Starting websocket server connection event loop."); let (mut websocket_server_sender, mut websocket_server_receiver) = ws_stream.split(); // Start pong count at 1, so we'll clear it after sending our first ping. let mut pong_count = 1u32; loop { select! { _ = sleep(Duration::from_millis(10000)) => { if pong_count == 0 { error!("No pongs received, considering connection closed."); break; } pong_count = 0; if websocket_server_sender .send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0).into())) .await .is_err() { error!("Cannot send ping to client, considering connection closed."); break; } } ws_msg = request_receiver.recv() => { if let Some(binary_msg) = ws_msg { if websocket_server_sender .send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg.into())) .await .is_err() { error!("Cannot send binary value to client, considering connection closed."); break; } } else { info!("Websocket server connector owner dropped, disconnecting websocket connection."); break; } } websocket_server_msg = websocket_server_receiver.next() => match websocket_server_msg { Some(ws_data) => { match ws_data { Ok(msg) => { match msg { tokio_tungstenite::tungstenite::Message::Text(text_msg) => { // If someone accidentally packs text, politely turn it into binary for them. let _ = response_sender.send(text_msg.as_bytes().to_vec()); } tokio_tungstenite::tungstenite::Message::Binary(binary_msg) => { // If no one is listening, ignore output. let _ = response_sender.send(binary_msg.to_vec()); } tokio_tungstenite::tungstenite::Message::Close(_) => { // Drop the error if no one receives the message, we're breaking anyways. let _ = event_sender .send(HardwareEvent::Disconnected( address.to_owned() )); break; } tokio_tungstenite::tungstenite::Message::Ping(_) => { // noop continue; } tokio_tungstenite::tungstenite::Message::Frame(_) => { // noop continue; } tokio_tungstenite::tungstenite::Message::Pong(_) => { pong_count += 1; continue; } } }, Err(err) => { error!("Error from websocket server, assuming disconnection: {:?}", err); break; } } }, None => { error!("Websocket channel closed, breaking"); break; } } } } if let Err(e) = websocket_server_sender.close().await { error!("Error closing websocket: {}", e); } debug!("Exiting Websocket Server Device control loop."); } impl Debug for WebsocketServerHardwareConnector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WebsocketServerHardwareConnector") .field("info", &self.info) .finish() } } pub struct WebsocketServerHardwareConnector { info: WebsocketServerDeviceCommManagerInitInfo, outgoing_sender: Sender>, incoming_broadcaster: broadcast::Sender>, device_event_sender: broadcast::Sender, } impl WebsocketServerHardwareConnector { pub fn new( info: WebsocketServerDeviceCommManagerInitInfo, ws_stream: tokio_tungstenite::WebSocketStream, ) -> Self { let (outgoing_sender, outgoing_receiver) = channel(256); let (incoming_broadcaster, _) = broadcast::channel(256); let incoming_broadcaster_clone = incoming_broadcaster.clone(); let (device_event_sender, _) = broadcast::channel(256); let device_event_sender_clone = device_event_sender.clone(); let address = info.address().clone(); tokio::spawn(async move { run_connection_loop( &address, device_event_sender_clone, ws_stream, outgoing_receiver, incoming_broadcaster_clone, ) .await; }); Self { info, outgoing_sender, incoming_broadcaster, device_event_sender, } } } #[async_trait] impl HardwareConnector for WebsocketServerHardwareConnector { fn specifier(&self) -> ProtocolCommunicationSpecifier { ProtocolCommunicationSpecifier::Websocket(WebsocketSpecifier::new(self.info.identifier())) } async fn connect(&mut self) -> Result, ButtplugDeviceError> { let hardware_internal = WebsocketServerHardware::new( self.device_event_sender.clone(), self.info.clone(), self.outgoing_sender.clone(), self.incoming_broadcaster.clone(), ); let hardware = Hardware::new( self.info.identifier(), self.info.address(), &[Endpoint::Rx, Endpoint::Tx], &None, false, Box::new(hardware_internal), ); Ok(Box::new(GenericHardwareSpecializer::new(hardware))) } } pub struct WebsocketServerHardware { connected: Arc, subscribed: Arc, subscribe_token: Arc>>, info: WebsocketServerDeviceCommManagerInitInfo, outgoing_sender: Sender>, incoming_broadcaster: broadcast::Sender>, device_event_sender: broadcast::Sender, } impl WebsocketServerHardware { pub fn new( device_event_sender: broadcast::Sender, info: WebsocketServerDeviceCommManagerInitInfo, outgoing_sender: Sender>, incoming_broadcaster: broadcast::Sender>, ) -> Self { Self { connected: Arc::new(AtomicBool::new(true)), info, outgoing_sender, incoming_broadcaster, device_event_sender, subscribed: Arc::new(AtomicBool::new(false)), subscribe_token: Arc::new(Mutex::new(None)), } } } impl HardwareInternal for WebsocketServerHardware { fn event_stream(&self) -> broadcast::Receiver { self.device_event_sender.subscribe() } fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { let connected = self.connected.clone(); async move { connected.store(false, Ordering::Relaxed); Ok(()) } .boxed() } fn read_value( &self, _msg: &HardwareReadCmd, ) -> BoxFuture<'static, Result> { future::ready(Err(ButtplugDeviceError::UnhandledCommand( "Websocket Hardware does not support read".to_owned(), ))) .boxed() } fn write_value( &self, msg: &HardwareWriteCmd, ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { let sender = self.outgoing_sender.clone(); let data = msg.data().clone(); // TODO Should check endpoint validity async move { sender.send(data).await.map_err(|err| { ButtplugDeviceError::DeviceCommunicationError(format!( "Could not write value to websocket device: {err}" )) }) } .boxed() } fn subscribe( &self, _msg: &HardwareSubscribeCmd, ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { if self.subscribed.load(Ordering::Relaxed) { error!("Endpoint already subscribed somehow!"); return future::ready(Ok(())).boxed(); } // TODO Should check endpoint validity let mut data_receiver = self.incoming_broadcaster.subscribe(); let event_sender = self.device_event_sender.clone(); let address = self.info.address().clone(); let subscribed = self.subscribed.clone(); let subscribed_token = self.subscribe_token.clone(); async move { subscribed.store(true, Ordering::Relaxed); let token = CancellationToken::new(); *(subscribed_token.lock().await) = Some(token.child_token()); async_manager::spawn(async move { loop { select! { result = data_receiver.recv().fuse() => { match result { Ok(data) => { debug!("Got websocket data! {:?}", data); // We don't really care if there's no one to send the error to here. let _ = event_sender .send(HardwareEvent::Notification( address.clone(), Endpoint::Tx, data, )); }, Err(_) => break, } }, _ = token.cancelled().fuse() => { break; } } } info!("Data channel closed, ending websocket server device listener task"); }); Ok(()) } .boxed() } fn unsubscribe( &self, _msg: &HardwareUnsubscribeCmd, ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { if self.subscribed.load(Ordering::Relaxed) { let subscribed = self.subscribed.clone(); let subscribed_token = self.subscribe_token.clone(); async move { subscribed.store(false, Ordering::Relaxed); let token = (subscribed_token.lock().await) .take() .expect("If we were subscribed, we'll have a token."); token.cancel(); Ok(()) } .boxed() } else { future::ready(Err(ButtplugDeviceError::DeviceCommunicationError( "Device not subscribed.".to_owned(), ))) .boxed() } } }