Buttplug sex toy control library
at dev 17 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8//! Communications API for accessing Buttplug Servers 9pub mod client_event_loop; 10pub mod client_message_sorter; 11pub mod connector; 12pub mod device; 13pub mod serializer; 14 15use buttplug_core::{ 16 connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorFuture}, 17 errors::{ButtplugError, ButtplugHandshakeError}, 18 message::{ 19 BUTTPLUG_CURRENT_API_MAJOR_VERSION, 20 BUTTPLUG_CURRENT_API_MINOR_VERSION, 21 ButtplugClientMessageV4, 22 ButtplugServerMessageV4, 23 PingV0, 24 RequestDeviceListV0, 25 RequestServerInfoV4, 26 StartScanningV0, 27 StopAllDevicesV0, 28 StopScanningV0, 29 }, 30 util::{ 31 async_manager, 32 future::{ButtplugFuture, ButtplugFutureStateShared}, 33 stream::convert_broadcast_receiver_to_stream, 34 }, 35}; 36use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest}; 37use dashmap::DashMap; 38pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent}; 39use futures::{ 40 Stream, 41 future::{self, BoxFuture, FutureExt}, 42}; 43use log::*; 44use std::{ 45 collections::BTreeMap, 46 sync::{ 47 Arc, 48 atomic::{AtomicBool, Ordering}, 49 }, 50}; 51use strum_macros::Display; 52use thiserror::Error; 53use tokio::sync::{Mutex, broadcast, mpsc}; 54use tracing_futures::Instrument; 55 56/// Result type used for public APIs. 57/// 58/// Allows us to differentiate between an issue with the connector (as a 59/// [ButtplugConnectorError]) and an issue within Buttplug (as a 60/// [ButtplugError]). 61type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>; 62type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>; 63 64/// Result type used for passing server responses. 65pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>; 66pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>; 67/// Future state type for returning server responses across futures. 68pub(crate) type ButtplugServerMessageStateShared = 69 ButtplugFutureStateShared<ButtplugServerMessageResult>; 70/// Future type that expects server responses. 71pub(crate) type ButtplugServerMessageFuture = ButtplugFuture<ButtplugServerMessageResult>; 72 73/// Future state for messages sent from the client that expect a server response. 74/// 75/// When a message is sent from the client and expects a response from the server, we'd like to know 76/// when that response arrives, and usually we'll want to wait for it. We can do so by creating a 77/// future that will be resolved when a response is received from the server. 78/// 79/// To do this, we build a [ButtplugFuture], then take its waker and pass it along with the message 80/// we send to the connector, using the [ButtplugClientMessageFuturePair] type. We can then expect 81/// the connector to get the response from the server, match it with our message (using something 82/// like the ClientMessageSorter, an internal structure in the Buttplug library), and set the reply 83/// in the waker we've sent along. This will resolve the future we're waiting on and allow us to 84/// continue execution. 85#[derive(Clone)] 86pub struct ButtplugClientMessageFuturePair { 87 msg: ButtplugClientMessageV4, 88 waker: ButtplugServerMessageStateShared, 89} 90 91impl ButtplugClientMessageFuturePair { 92 pub fn new(msg: ButtplugClientMessageV4, waker: ButtplugServerMessageStateShared) -> Self { 93 Self { msg, waker } 94 } 95} 96 97/// Represents all of the different types of errors a ButtplugClient can return. 98/// 99/// Clients can return two types of errors: 100/// 101/// - [ButtplugConnectorError], which means there was a problem with the connection between the 102/// client and the server, like a network connection issue. 103/// - [ButtplugError], which is an error specific to the Buttplug Protocol. 104#[derive(Debug, Error, Display)] 105pub enum ButtplugClientError { 106 /// Connector error 107 #[error(transparent)] 108 ButtplugConnectorError(#[from] ButtplugConnectorError), 109 /// Protocol error 110 #[error(transparent)] 111 ButtplugError(#[from] ButtplugError), 112 /// Error converting output command: {} 113 ButtplugOutputCommandConversionError(String), 114} 115 116/// Enum representing different events that can be emitted by a client. 117/// 118/// These events are created by the server and sent to the client, and represent 119/// unrequested actions that the client will need to respond to, or that 120/// applications using the client may be interested in. 121#[derive(Clone, Debug)] 122pub enum ButtplugClientEvent { 123 /// Emitted when a scanning session (started via a StartScanning call on 124 /// [ButtplugClient]) has finished. 125 ScanningFinished, 126 /// Emitted when a device has been added to the server. Includes a 127 /// [ButtplugClientDevice] object representing the device. 128 DeviceAdded(ButtplugClientDevice), 129 /// Emitted when a device has been removed from the server. Includes a 130 /// [ButtplugClientDevice] object representing the device. 131 DeviceRemoved(ButtplugClientDevice), 132 /// Emitted when a client has not pinged the server in a sufficient amount of 133 /// time. 134 PingTimeout, 135 /// Emitted when the client successfully connects to a server. 136 ServerConnect, 137 /// Emitted when a client connector detects that the server has disconnected. 138 ServerDisconnect, 139 /// Emitted when an error that cannot be matched to a request is received from 140 /// the server. 141 Error(ButtplugError), 142} 143 144impl Unpin for ButtplugClientEvent { 145} 146 147pub(crate) fn create_boxed_future_client_error<T>( 148 err: ButtplugError, 149) -> ButtplugClientResultFuture<T> 150where 151 T: 'static + Send + Sync, 152{ 153 future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed() 154} 155 156#[derive(Clone, Debug)] 157pub(crate) struct ButtplugClientMessageSender { 158 message_sender: broadcast::Sender<ButtplugClientRequest>, 159 connected: Arc<AtomicBool>, 160} 161 162impl ButtplugClientMessageSender { 163 fn new( 164 message_sender: &broadcast::Sender<ButtplugClientRequest>, 165 connected: &Arc<AtomicBool>, 166 ) -> Self { 167 Self { 168 message_sender: message_sender.clone(), 169 connected: connected.clone(), 170 } 171 } 172 173 /// Send message to the internal event loop. 174 /// 175 /// Mostly for handling boilerplate around possible send errors. 176 pub fn send_message_to_event_loop( 177 &self, 178 msg: ButtplugClientRequest, 179 ) -> BoxFuture<'static, Result<(), ButtplugClientError>> { 180 // If we're running the event loop, we should have a message_sender. 181 // Being connected to the server doesn't matter here yet because we use 182 // this function in order to connect also. 183 // 184 // The message sender doesn't require an async send now, but we still want 185 // to delay execution as part of our future in order to keep task coherency. 186 let message_sender = self.message_sender.clone(); 187 async move { 188 message_sender 189 .send(msg) 190 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?; 191 Ok(()) 192 } 193 .boxed() 194 } 195 196 pub fn subscribe(&self) -> broadcast::Receiver<ButtplugClientRequest> { 197 self.message_sender.subscribe() 198 } 199 200 pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture { 201 if !self.connected.load(Ordering::Relaxed) { 202 future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed() 203 } else { 204 self.send_message_ignore_connect_status(msg) 205 } 206 } 207 208 /// Sends a ButtplugMessage from client to server. Expects to receive a ButtplugMessage back from 209 /// the server. 210 pub fn send_message_ignore_connect_status( 211 &self, 212 msg: ButtplugClientMessageV4, 213 ) -> ButtplugServerMessageResultFuture { 214 // Create a future to pair with the message being resolved. 215 let fut = ButtplugServerMessageFuture::default(); 216 let internal_msg = ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new( 217 msg, 218 fut.get_state_clone(), 219 )); 220 221 // Send message to internal loop and wait for return. 222 let send_fut = self.send_message_to_event_loop(internal_msg); 223 async move { 224 send_fut.await?; 225 fut.await 226 } 227 .boxed() 228 } 229 230 /// Sends a ButtplugMessage from client to server. Expects to receive an [Ok] 231 /// type ButtplugMessage back from the server. 232 pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture { 233 let send_fut = self.send_message(msg); 234 async move { send_fut.await.map(|_| ()) }.boxed() 235 } 236} 237 238/// Struct used by applications to communicate with a Buttplug Server. 239/// 240/// Buttplug Clients provide an API layer on top of the Buttplug Protocol that 241/// handles boring things like message creation and pairing, protocol ordering, 242/// etc... This allows developers to concentrate on controlling hardware with 243/// the API. 244/// 245/// Clients serve a few different purposes: 246/// - Managing connections to servers, thru [ButtplugConnector]s 247/// - Emitting events received from the Server 248/// - Holding state related to the server (i.e. what devices are currently 249/// connected, etc...) 250/// 251/// Clients are created by the [ButtplugClient::new()] method, which also 252/// handles spinning up the event loop and connecting the client to the server. 253/// Closures passed to the run() method can access and use the Client object. 254pub struct ButtplugClient { 255 /// The client name. Depending on the connection type and server being used, 256 /// this name is sometimes shown on the server logs or GUI. 257 client_name: String, 258 /// The server name that we're current connected to. 259 server_name: Arc<Mutex<Option<String>>>, 260 event_stream: broadcast::Sender<ButtplugClientEvent>, 261 // Sender to relay messages to the internal client loop 262 message_sender: ButtplugClientMessageSender, 263 connected: Arc<AtomicBool>, 264 device_map: Arc<DashMap<u32, ButtplugClientDevice>>, 265} 266 267impl ButtplugClient { 268 pub fn new(name: &str) -> Self { 269 let (message_sender, _) = broadcast::channel(256); 270 let (event_stream, _) = broadcast::channel(256); 271 let connected = Arc::new(AtomicBool::new(false)); 272 Self { 273 client_name: name.to_owned(), 274 server_name: Arc::new(Mutex::new(None)), 275 event_stream, 276 message_sender: ButtplugClientMessageSender::new(&message_sender, &connected), 277 connected, 278 device_map: Arc::new(DashMap::new()), 279 } 280 } 281 282 pub async fn connect<ConnectorType>( 283 &self, 284 mut connector: ConnectorType, 285 ) -> Result<(), ButtplugClientError> 286 where 287 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static, 288 { 289 if self.connected() { 290 return Err(ButtplugClientError::ButtplugConnectorError( 291 ButtplugConnectorError::ConnectorAlreadyConnected, 292 )); 293 } 294 295 // If connect is being called again, clear out the device map and start over. 296 self.device_map.clear(); 297 298 info!("Connecting to server."); 299 let (connector_sender, connector_receiver) = mpsc::channel(256); 300 connector.connect(connector_sender).await.map_err(|e| { 301 error!("Connection to server failed: {:?}", e); 302 ButtplugClientError::from(e) 303 })?; 304 info!("Connection to server succeeded."); 305 let mut client_event_loop = ButtplugClientEventLoop::new( 306 self.connected.clone(), 307 connector, 308 connector_receiver, 309 self.event_stream.clone(), 310 self.message_sender.clone(), 311 self.device_map.clone(), 312 ); 313 314 // Start the event loop before we run the handshake. 315 async_manager::spawn( 316 async move { 317 client_event_loop.run().await; 318 } 319 .instrument(tracing::info_span!("Client Loop Span")), 320 ); 321 self.run_handshake().await 322 } 323 324 /// Creates the ButtplugClient instance and tries to establish a connection. 325 /// 326 /// Takes all of the components needed to build a [ButtplugClient], creates 327 /// the struct, then tries to run connect and execute the Buttplug protocol 328 /// handshake. Will return a connected and ready to use ButtplugClient is all 329 /// goes well. 330 async fn run_handshake(&self) -> ButtplugClientResult { 331 // Run our handshake 332 info!("Running handshake with server."); 333 let msg = self 334 .message_sender 335 .send_message_ignore_connect_status( 336 RequestServerInfoV4::new( 337 &self.client_name, 338 BUTTPLUG_CURRENT_API_MAJOR_VERSION, 339 BUTTPLUG_CURRENT_API_MINOR_VERSION, 340 ) 341 .into(), 342 ) 343 .await?; 344 345 debug!("Got ServerInfo return."); 346 if let ButtplugServerMessageV4::ServerInfo(server_info) = msg { 347 info!("Connected to {}", server_info.server_name()); 348 *self.server_name.lock().await = Some(server_info.server_name().clone()); 349 // Don't set ourselves as connected until after ServerInfo has been 350 // received. This means we avoid possible races with the RequestServerInfo 351 // handshake. 352 self.connected.store(true, Ordering::Relaxed); 353 354 // Get currently connected devices. The event loop will 355 // handle sending the message and getting the return, and 356 // will send the client updates as events. 357 let msg = self 358 .message_sender 359 .send_message(RequestDeviceListV0::default().into()) 360 .await?; 361 if let ButtplugServerMessageV4::DeviceList(m) = msg { 362 self 363 .message_sender 364 .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m)) 365 .await?; 366 } 367 Ok(()) 368 } else { 369 self.disconnect().await?; 370 Err(ButtplugClientError::ButtplugError( 371 ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(), 372 )) 373 } 374 } 375 376 /// Returns true if client is currently connected. 377 pub fn connected(&self) -> bool { 378 self.connected.load(Ordering::Relaxed) 379 } 380 381 /// Disconnects from server, if connected. 382 /// 383 /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed 384 /// that even on failure, the client will be disconnected. 385 pub fn disconnect(&self) -> ButtplugClientResultFuture { 386 if !self.connected() { 387 return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed(); 388 } 389 // Send the connector to the internal loop for management. Once we throw 390 // the connector over, the internal loop will handle connecting and any 391 // further communications with the server, if connection is successful. 392 let fut = ButtplugConnectorFuture::default(); 393 let msg = ButtplugClientRequest::Disconnect(fut.get_state_clone()); 394 let send_fut = self.message_sender.send_message_to_event_loop(msg); 395 let connected = self.connected.clone(); 396 async move { 397 connected.store(false, Ordering::Relaxed); 398 send_fut.await?; 399 Ok(()) 400 } 401 .boxed() 402 } 403 404 /// Tells server to start scanning for devices. 405 /// 406 /// Returns Err([ButtplugClientError]) if request fails due to issues with 407 /// DeviceManagers on the server, disconnection, etc. 408 pub fn start_scanning(&self) -> ButtplugClientResultFuture { 409 self 410 .message_sender 411 .send_message_expect_ok(StartScanningV0::default().into()) 412 } 413 414 /// Tells server to stop scanning for devices. 415 /// 416 /// Returns Err([ButtplugClientError]) if request fails due to issues with 417 /// DeviceManagers on the server, disconnection, etc. 418 pub fn stop_scanning(&self) -> ButtplugClientResultFuture { 419 self 420 .message_sender 421 .send_message_expect_ok(StopScanningV0::default().into()) 422 } 423 424 /// Tells server to stop all devices. 425 /// 426 /// Returns Err([ButtplugClientError]) if request fails due to issues with 427 /// DeviceManagers on the server, disconnection, etc. 428 pub fn stop_all_devices(&self) -> ButtplugClientResultFuture { 429 self 430 .message_sender 431 .send_message_expect_ok(StopAllDevicesV0::default().into()) 432 } 433 434 pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> { 435 let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe()); 436 // We can either Box::pin here or force the user to pin_mut!() on their 437 // end. While this does end up with a dynamic dispatch on our end, it 438 // still makes the API nicer for the user, so we'll just eat the perf hit. 439 // Not to mention, this is not a high throughput system really, so it 440 // shouldn't matter. 441 Box::pin(stream) 442 } 443 444 /// Retreives a list of currently connected devices. 445 pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> { 446 self 447 .device_map 448 .iter() 449 .map(|map_pair| (*map_pair.key(), map_pair.value().clone())) 450 .collect() 451 } 452 453 pub fn ping(&self) -> ButtplugClientResultFuture { 454 let ping_fut = self 455 .message_sender 456 .send_message_expect_ok(PingV0::default().into()); 457 ping_fut.boxed() 458 } 459 460 pub fn server_name(&self) -> Option<String> { 461 // We'd have to be calling server_name in an extremely tight, asynchronous 462 // loop for this to return None, so we'll treat this as lockless. 463 // 464 // Dear users actually reading this code: This is not an invitation for you 465 // to get the server name in a tight, asynchronous loop. This will never 466 // change throughout the life to the connection. 467 if let Ok(name) = self.server_name.try_lock() { 468 name.clone() 469 } else { 470 None 471 } 472 } 473}