Buttplug sex toy control library
at dev 11 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2022 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use buttplug_core::{ 9 connector::ButtplugConnector, 10 errors::ButtplugError, 11 message::ButtplugServerMessageV4, 12 util::{async_manager, stream::convert_broadcast_receiver_to_stream}, 13}; 14use buttplug_server::{ 15 ButtplugServer, ButtplugServerBuilder, 16 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant}, 17}; 18use buttplug_server_device_config::UserDeviceIdentifier; 19use dashmap::DashSet; 20use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select}; 21use getset::Getters; 22use serde::{Deserialize, Serialize}; 23use std::sync::Arc; 24use thiserror::Error; 25use tokio::sync::{ 26 Notify, 27 broadcast::{self, Sender}, 28 mpsc, 29}; 30 31// Clone derived here to satisfy tokio broadcast requirements. 32#[derive(Clone, Debug, Serialize, Deserialize)] 33pub enum ButtplugRemoteServerEvent { 34 ClientConnected(String), 35 ClientDisconnected, 36 DeviceAdded { 37 index: u32, 38 identifier: UserDeviceIdentifier, 39 name: String, 40 display_name: Option<String>, 41 }, 42 DeviceRemoved { 43 index: u32, 44 }, 45 //DeviceCommand(ButtplugDeviceCommandMessageUnion) 46} 47 48#[derive(Error, Debug)] 49pub enum ButtplugServerConnectorError { 50 #[error("Cannot bring up server for connection: {0}")] 51 ConnectorError(String), 52} 53 54#[derive(Getters)] 55pub struct ButtplugRemoteServer { 56 #[getset(get = "pub")] 57 server: Arc<ButtplugServer>, 58 #[getset(get = "pub")] 59 event_sender: broadcast::Sender<ButtplugRemoteServerEvent>, 60 disconnect_notifier: Arc<Notify>, 61} 62 63async fn run_device_event_stream( 64 server: Arc<ButtplugServer>, 65 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>, 66) { 67 let server_receiver = server.server_version_event_stream(); 68 let known_indexes = DashSet::<u32>::default(); 69 70 pin_mut!(server_receiver); 71 loop { 72 match server_receiver.next().await { 73 None => { 74 info!("Server disconnected via server disappearance, exiting loop."); 75 break; 76 } 77 Some(msg) => { 78 if let ButtplugServerMessageV4::DeviceList(dl) = msg 79 && remote_event_sender.receiver_count() > 0 80 { 81 for da in dl.devices() { 82 if known_indexes.contains(&da.1.device_index()) { 83 continue; 84 } 85 if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) { 86 let added_event = ButtplugRemoteServerEvent::DeviceAdded { 87 index: da.1.device_index(), 88 name: da.1.device_name().clone(), 89 identifier: device_info.identifier().clone(), 90 display_name: device_info.display_name().clone(), 91 }; 92 if remote_event_sender.send(added_event).is_err() { 93 error!( 94 "Cannot send event to owner, dropping and assuming local server thread has exited." 95 ); 96 } 97 known_indexes.insert(da.1.device_index()); 98 } 99 } 100 let indexes = known_indexes.clone(); 101 let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect(); 102 for dr in indexes { 103 if current_indexes.contains(&dr) { 104 continue; 105 } 106 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr }; 107 if remote_event_sender.send(removed_event).is_err() { 108 error!( 109 "Cannot send event to owner, dropping and assuming local server thread has exited." 110 ); 111 } 112 known_indexes.remove(&dr); 113 } 114 } 115 } 116 } 117 } 118} 119 120async fn run_server<ConnectorType>( 121 server: Arc<ButtplugServer>, 122 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>, 123 connector: ConnectorType, 124 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>, 125 disconnect_notifier: Arc<Notify>, 126) where 127 ConnectorType: 128 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static, 129{ 130 info!("Starting remote server loop"); 131 let shared_connector = Arc::new(connector); 132 let server_receiver = server.server_version_event_stream(); 133 let client_version_receiver = server.event_stream(); 134 pin_mut!(server_receiver); 135 pin_mut!(client_version_receiver); 136 loop { 137 select! { 138 connector_msg = connector_receiver.recv().fuse() => match connector_msg { 139 None => { 140 info!("Connector disconnected, exiting loop."); 141 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() { 142 warn!("Cannot update remote about client disconnection"); 143 } 144 break; 145 } 146 Some(client_message) => { 147 trace!("Got message from connector: {:?}", client_message); 148 let server_clone = server.clone(); 149 let connected = server_clone.connected(); 150 let connector_clone = shared_connector.clone(); 151 let remote_event_sender_clone = remote_event_sender.clone(); 152 async_manager::spawn(async move { 153 match server_clone.parse_message(client_message.clone()).await { 154 Ok(ret_msg) => { 155 // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick. 156 if !connected && server_clone.connected() 157 && remote_event_sender_clone.receiver_count() > 0 158 && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() { 159 error!("Cannot send event to owner, dropping and assuming local server thread has exited."); 160 } 161 if connector_clone.send(ret_msg).await.is_err() { 162 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 163 } 164 }, 165 Err(err_msg) => { 166 if connector_clone.send(err_msg).await.is_err() { 167 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 168 } 169 } 170 } 171 }); 172 } 173 }, 174 _ = disconnect_notifier.notified().fuse() => { 175 info!("Server disconnected via controller disappearance, exiting loop."); 176 break; 177 }, 178 server_msg = server_receiver.next().fuse() => match server_msg { 179 None => { 180 info!("Server disconnected via server disappearance, exiting loop."); 181 break; 182 } 183 Some(msg) => { 184 /* 185 if remote_event_sender.receiver_count() > 0 { 186 match &msg { 187 ButtplugServerMessageV4::DeviceAdded(da) => { 188 if let Some(device_info) = server.device_manager().device_info(da.device_index()) { 189 let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() }; 190 if remote_event_sender.send(added_event).is_err() { 191 error!("Cannot send event to owner, dropping and assuming local server thread has exited."); 192 } 193 } 194 }, 195 ButtplugServerMessageV4::DeviceRemoved(dr) => { 196 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() }; 197 if remote_event_sender.send(removed_event).is_err() { 198 error!("Cannot send event to owner, dropping and assuming local server thread has exited."); 199 } 200 }, 201 _ => {} 202 } 203 } 204 */ 205 } 206 }, 207 client_msg = client_version_receiver.next().fuse() => match client_msg { 208 None => { 209 info!("Server disconnected via server disappearance, exiting loop."); 210 break; 211 } 212 Some(msg) => { 213 let connector_clone = shared_connector.clone(); 214 if connector_clone.send(msg).await.is_err() { 215 error!("Server disappeared, exiting remote server thread."); 216 } 217 } 218 } 219 }; 220 } 221 if let Err(err) = server.disconnect().await { 222 error!("Error disconnecting server: {:?}", err); 223 } 224 info!("Exiting remote server loop"); 225} 226 227impl Default for ButtplugRemoteServer { 228 fn default() -> Self { 229 Self::new( 230 ButtplugServerBuilder::default() 231 .finish() 232 .expect("Default is infallible"), 233 &None, 234 ) 235 } 236} 237 238impl ButtplugRemoteServer { 239 pub fn new( 240 server: ButtplugServer, 241 event_sender: &Option<Sender<ButtplugRemoteServerEvent>>, 242 ) -> Self { 243 let event_sender = if let Some(sender) = event_sender { 244 sender.clone() 245 } else { 246 broadcast::channel(256).0 247 }; 248 // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to 249 // the RemoteServer instance, not just during client connect. We need to make sure these are 250 // emitted to the frontend. 251 let server = Arc::new(server); 252 { 253 let server = server.clone(); 254 tokio::spawn({ 255 let server = server; 256 let event_sender = event_sender.clone(); 257 async move { 258 run_device_event_stream(server, event_sender).await; 259 } 260 }); 261 } 262 Self { 263 event_sender, 264 server, 265 disconnect_notifier: Arc::new(Notify::new()), 266 } 267 } 268 269 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> { 270 convert_broadcast_receiver_to_stream(self.event_sender.subscribe()) 271 } 272 273 pub fn start<ConnectorType>( 274 &self, 275 mut connector: ConnectorType, 276 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType> 277 where 278 ConnectorType: 279 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static, 280 { 281 let server = self.server.clone(); 282 let event_sender = self.event_sender.clone(); 283 let disconnect_notifier = self.disconnect_notifier.clone(); 284 async move { 285 let (connector_sender, connector_receiver) = mpsc::channel(256); 286 // Due to the connect method requiring a mutable connector, we must connect before starting up 287 // our server loop. Anything that needs to happen outside of the client connection session 288 // should happen around this. This flow is locked. 289 connector 290 .connect(connector_sender) 291 .await 292 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?; 293 run_server( 294 server, 295 event_sender, 296 connector, 297 connector_receiver, 298 disconnect_notifier, 299 ) 300 .await; 301 Ok(()) 302 } 303 } 304 305 pub async fn disconnect(&self) -> Result<(), ButtplugError> { 306 self.disconnect_notifier.notify_waiters(); 307 Ok(()) 308 } 309 310 pub async fn shutdown(&self) -> Result<(), ButtplugError> { 311 self.server.shutdown().await?; 312 Ok(()) 313 } 314} 315 316impl Drop for ButtplugRemoteServer { 317 fn drop(&mut self) { 318 self.disconnect_notifier.notify_waiters(); 319 } 320}