Buttplug sex toy control library
at dev 5.2 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use buttplug_core::{ 9 connector::ButtplugConnector, 10 errors::ButtplugError, 11 message::{ButtplugMessage, ButtplugMessageValidator, ErrorV0}, 12 util::async_manager, 13}; 14use buttplug_server::{ 15 ButtplugServer, 16 ButtplugServerBuilder, 17 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant}, 18}; 19use futures::{FutureExt, StreamExt, future::Future, pin_mut, select}; 20use log::*; 21use std::sync::Arc; 22use thiserror::Error; 23use tokio::sync::{Notify, mpsc}; 24 25#[derive(Error, Debug)] 26pub enum ButtplugServerConnectorError { 27 #[error("Cannot bring up server for connection: {0}")] 28 ConnectorError(String), 29} 30 31pub struct ButtplugTestServer { 32 server: Arc<ButtplugServer>, 33 disconnect_notifier: Arc<Notify>, 34} 35 36async fn run_server<ConnectorType>( 37 server: Arc<ButtplugServer>, 38 connector: ConnectorType, 39 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>, 40 disconnect_notifier: Arc<Notify>, 41) where 42 ConnectorType: 43 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static, 44{ 45 info!("Starting remote server loop"); 46 let shared_connector = Arc::new(connector); 47 let server_receiver = server.event_stream(); 48 pin_mut!(server_receiver); 49 loop { 50 select! { 51 connector_msg = connector_receiver.recv().fuse() => match connector_msg { 52 None => { 53 info!("Connector disconnected, exiting loop."); 54 break; 55 } 56 Some(client_message) => { 57 trace!("Got message from connector: {:?}", client_message); 58 let server_clone = server.clone(); 59 let connector_clone = shared_connector.clone(); 60 async_manager::spawn(async move { 61 if let Err(e) = client_message.is_valid() { 62 error!("Message not valid: {:?} - Error: {}", client_message, e); 63 let mut err_msg = ErrorV0::from(ButtplugError::from(e)); 64 err_msg.set_id(client_message.id()); 65 let _ = connector_clone.send(ButtplugServerMessageVariant::V3(err_msg.into())).await; 66 return; 67 } 68 match server_clone.parse_message(client_message.clone()).await { 69 Ok(ret_msg) => { 70 if connector_clone.send(ret_msg).await.is_err() { 71 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 72 } 73 }, 74 Err(err_msg) => { 75 if connector_clone.send(err_msg).await.is_err() { 76 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 77 } 78 } 79 } 80 }); 81 } 82 }, 83 _ = disconnect_notifier.notified().fuse() => { 84 info!("Server disconnected via controller disappearance, exiting loop."); 85 break; 86 }, 87 server_msg = server_receiver.next().fuse() => match server_msg { 88 None => { 89 info!("Server disconnected via server disappearance, exiting loop."); 90 break; 91 } 92 Some(msg) => { 93 if shared_connector.send(msg).await.is_err() { 94 error!("Server disappeared, exiting remote server thread."); 95 } 96 } 97 }, 98 }; 99 } 100 if let Err(err) = server.disconnect().await { 101 error!("Error disconnecting server: {:?}", err); 102 } 103 info!("Exiting remote server loop"); 104} 105 106impl Default for ButtplugTestServer { 107 fn default() -> Self { 108 Self::new( 109 ButtplugServerBuilder::default() 110 .finish() 111 .expect("Default is infallible"), 112 ) 113 } 114} 115 116impl ButtplugTestServer { 117 pub fn new(server: ButtplugServer) -> Self { 118 Self { 119 server: Arc::new(server), 120 disconnect_notifier: Arc::new(Notify::new()), 121 } 122 } 123 124 pub fn start<ConnectorType>( 125 &self, 126 mut connector: ConnectorType, 127 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> 128 where 129 ConnectorType: 130 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static, 131 { 132 let server_clone = self.server.clone(); 133 let disconnect_notifier = self.disconnect_notifier.clone(); 134 async move { 135 let (connector_sender, connector_receiver) = mpsc::channel(256); 136 connector 137 .connect(connector_sender) 138 .await 139 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?; 140 run_server( 141 server_clone, 142 connector, 143 connector_receiver, 144 disconnect_notifier, 145 ) 146 .await; 147 Ok(()) 148 } 149 } 150 151 #[allow(dead_code)] 152 pub async fn disconnect(&self) -> Result<(), ButtplugError> { 153 self.disconnect_notifier.notify_waiters(); 154 Ok(()) 155 } 156 157 #[allow(dead_code)] 158 pub async fn shutdown(self) -> Result<(), ButtplugError> { 159 self.server.shutdown().await?; 160 Ok(()) 161 } 162} 163 164impl Drop for ButtplugTestServer { 165 fn drop(&mut self) { 166 self.disconnect_notifier.notify_waiters(); 167 } 168}