Buttplug sex toy control library
at master 17 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use crate::server_message_conversion::ButtplugServerDeviceEventMessageConverter; 9 10use super::{ 11 ButtplugServerResultFuture, 12 device::ServerDeviceManager, 13 message::{ 14 ButtplugClientMessageVariant, 15 ButtplugServerMessageVariant, 16 server_device_attributes::TryFromClientMessage, 17 spec_enums::{ 18 ButtplugCheckedClientMessageV4, 19 ButtplugDeviceCommandMessageUnionV4, 20 ButtplugDeviceManagerMessageUnion, 21 }, 22 }, 23 ping_timer::PingTimer, 24 server_message_conversion::ButtplugServerMessageConverter, 25}; 26use buttplug_core::{ 27 errors::*, 28 message::{ 29 self, 30 BUTTPLUG_CURRENT_API_MAJOR_VERSION, 31 ButtplugMessage, 32 ButtplugMessageSpecVersion, 33 ButtplugServerMessageV4, 34 ErrorV0, 35 StopAllDevicesV0, 36 StopScanningV0, 37 }, 38 util::stream::convert_broadcast_receiver_to_stream, 39}; 40use futures::{ 41 Stream, 42 future::{self, BoxFuture, FutureExt}, 43}; 44use once_cell::sync::OnceCell; 45use std::{ 46 fmt, 47 sync::{ 48 Arc, 49 atomic::{AtomicBool, Ordering}, 50 }, 51}; 52use tokio::sync::broadcast; 53use tokio_stream::StreamExt; 54use tracing::info_span; 55use tracing_futures::Instrument; 56 57/// The server side of the Buttplug protocol. Frontend for connection to device management and 58/// communication. 59pub struct ButtplugServer { 60 /// The name of the server, which is relayed to the client on connection (mostly for 61 /// confirmation in UI dialogs) 62 server_name: String, 63 /// The maximum ping time, in milliseconds, for the server. If the server does not receive a 64 /// [Ping](buttplug_core::messages::Ping) message in this amount of time after the handshake has 65 /// succeeded, the server will automatically disconnect. If this is not called, the ping timer 66 /// will not be activated. 67 /// 68 /// Note that this has nothing to do with communication medium specific pings, like those built 69 /// into the Websocket protocol. This ping is specific to the Buttplug protocol. 70 max_ping_time: u32, 71 /// Timer for managing ping time tracking, if max_ping_time > 0. 72 ping_timer: Arc<PingTimer>, 73 /// Manages device discovery and communication. 74 device_manager: Arc<ServerDeviceManager>, 75 /// If true, client is currently connected to server 76 connected: Arc<AtomicBool>, 77 /// Broadcaster for server events. Receivers for this are handed out through the 78 /// [ButtplugServer::event_stream()] method. 79 output_sender: broadcast::Sender<ButtplugServerMessageV4>, 80 /// Name of the connected client, assuming there is one. 81 client_name: Arc<OnceCell<String>>, 82 /// Current spec version for the connected client 83 spec_version: Arc<OnceCell<ButtplugMessageSpecVersion>>, 84} 85 86impl std::fmt::Debug for ButtplugServer { 87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 88 f.debug_struct("ButtplugServer") 89 .field("server_name", &self.server_name) 90 .field("max_ping_time", &self.max_ping_time) 91 .field("connected", &self.connected) 92 .finish() 93 } 94} 95 96impl ButtplugServer { 97 pub(super) fn new( 98 server_name: &str, 99 max_ping_time: u32, 100 ping_timer: Arc<PingTimer>, 101 device_manager: Arc<ServerDeviceManager>, 102 connected: Arc<AtomicBool>, 103 output_sender: broadcast::Sender<ButtplugServerMessageV4>, 104 ) -> Self { 105 ButtplugServer { 106 server_name: server_name.to_owned(), 107 max_ping_time, 108 ping_timer, 109 device_manager, 110 connected, 111 output_sender, 112 client_name: Arc::new(OnceCell::new()), 113 spec_version: Arc::new(OnceCell::new()), 114 } 115 } 116 117 pub fn client_name(&self) -> Option<String> { 118 self.client_name.get().cloned() 119 } 120 121 /// Retreive an async stream of ButtplugServerMessages. This is how the server sends out 122 /// non-query-related updates to the system, including information on devices being added/removed, 123 /// client disconnection, etc... 124 pub fn event_stream(&self) -> impl Stream<Item = ButtplugServerMessageVariant> + use<> { 125 let spec_version = self.spec_version.clone(); 126 let converter = ButtplugServerMessageConverter::new(None); 127 let device_indexes: Vec<u32> = self 128 .device_manager 129 .devices() 130 .iter() 131 .map(|x| *x.key()) 132 .collect(); 133 let device_event_converter = ButtplugServerDeviceEventMessageConverter::new(device_indexes); 134 self.server_version_event_stream().map(move |m| { 135 if let ButtplugServerMessageV4::DeviceList(list) = m { 136 device_event_converter.convert_device_list( 137 spec_version 138 .get() 139 .unwrap_or(&ButtplugMessageSpecVersion::Version4), 140 &list, 141 ) 142 } else { 143 // If we get an event and don't have a spec version yet, just throw out the latest. 144 converter 145 .convert_outgoing( 146 &m, 147 spec_version 148 .get() 149 .unwrap_or(&ButtplugMessageSpecVersion::Version4), 150 ) 151 .unwrap() 152 } 153 }) 154 } 155 156 /// Retreive an async stream of ButtplugServerMessages, always at the latest available message 157 /// spec. This is how the server sends out non-query-related updates to the system, including 158 /// information on devices being added/removed, client disconnection, etc... 159 pub fn server_version_event_stream(&self) -> impl Stream<Item = ButtplugServerMessageV4> + use<> { 160 // Unlike the client API, we can expect anyone using the server to pin this 161 // themselves. 162 let server_receiver = convert_broadcast_receiver_to_stream(self.output_sender.subscribe()); 163 let device_receiver = self.device_manager.event_stream(); 164 device_receiver.merge(server_receiver) 165 } 166 167 /// Returns a references to the internal device manager, for handling configuration. 168 pub fn device_manager(&self) -> Arc<ServerDeviceManager> { 169 self.device_manager.clone() 170 } 171 172 /// If true, client is currently connected to the server. 173 pub fn connected(&self) -> bool { 174 self.connected.load(Ordering::Relaxed) 175 } 176 177 /// Disconnects the server from a client, if it is connected. 178 pub fn disconnect(&self) -> BoxFuture<'_, Result<(), message::ErrorV0>> { 179 debug!("Buttplug Server {} disconnect requested", self.server_name); 180 let ping_timer = self.ping_timer.clone(); 181 // As long as StopScanning/StopAllDevices aren't changed across message specs, we can inject 182 // them using parse_checked_message and bypass version checking. 183 let stop_scanning_fut = self.parse_checked_message( 184 ButtplugCheckedClientMessageV4::StopScanning(StopScanningV0::default()), 185 ); 186 let stop_fut = self.parse_checked_message(ButtplugCheckedClientMessageV4::StopAllDevices( 187 StopAllDevicesV0::default(), 188 )); 189 let connected = self.connected.clone(); 190 async move { 191 connected.store(false, Ordering::Relaxed); 192 ping_timer.stop_ping_timer().await; 193 // Ignore returns here, we just want to stop. 194 info!("Server disconnected, stopping device scanning if it was started..."); 195 let _ = stop_scanning_fut.await; 196 info!("Server disconnected, stopping all devices..."); 197 let _ = stop_fut.await; 198 Ok(()) 199 } 200 .boxed() 201 } 202 203 pub fn shutdown(&self) -> ButtplugServerResultFuture { 204 let device_manager = self.device_manager.clone(); 205 //let disconnect_future = self.disconnect(); 206 async move { device_manager.shutdown().await }.boxed() 207 } 208 209 /// Sends a [ButtplugClientMessage] to be parsed by the server (for handshake or ping), or passed 210 /// into the server's [DeviceManager] for communication with devices. 211 pub fn parse_message( 212 &self, 213 msg: ButtplugClientMessageVariant, 214 ) -> BoxFuture<'static, Result<ButtplugServerMessageVariant, ButtplugServerMessageVariant>> { 215 let features = self.device_manager().feature_map(); 216 let msg_id = msg.id(); 217 debug!("Server received: {:?}", msg); 218 match msg { 219 ButtplugClientMessageVariant::V4(msg) => { 220 let internal_msg = 221 match ButtplugCheckedClientMessageV4::try_from_client_message(msg, &features) { 222 Ok(m) => m, 223 Err(e) => { 224 let mut err_msg = ErrorV0::from(e); 225 err_msg.set_id(msg_id); 226 return future::ready(Err(ButtplugServerMessageVariant::from( 227 ButtplugServerMessageV4::from(err_msg), 228 ))) 229 .boxed(); 230 } 231 }; 232 let fut = self.parse_checked_message(internal_msg); 233 async move { 234 Ok( 235 fut 236 .await 237 .map_err(|e| ButtplugServerMessageVariant::from(ButtplugServerMessageV4::from(e)))? 238 .into(), 239 ) 240 } 241 .boxed() 242 } 243 msg => { 244 let v = msg.version(); 245 let converter = ButtplugServerMessageConverter::new(Some(msg.clone())); 246 let spec_version = *self.spec_version.get_or_init(|| { 247 info!( 248 "Setting Buttplug Server Message Spec Downgrade version to {}", 249 v 250 ); 251 v 252 }); 253 match ButtplugCheckedClientMessageV4::try_from_client_message(msg, &features) { 254 Ok(converted_msg) => { 255 debug!("Converted message: {:?}", converted_msg); 256 let fut = self.parse_checked_message(converted_msg); 257 async move { 258 let result = fut.await.map_err(|e| { 259 converter 260 .convert_outgoing(&e.into(), &spec_version) 261 .unwrap() 262 })?; 263 let out_msg = converter 264 .convert_outgoing(&result, &spec_version) 265 .map_err(|e| { 266 converter 267 .convert_outgoing( 268 &ButtplugServerMessageV4::from(ErrorV0::from(e)), 269 &spec_version, 270 ) 271 .unwrap() 272 }); 273 debug!("Server returning: {:?}", out_msg); 274 out_msg 275 } 276 .boxed() 277 } 278 Err(e) => { 279 let mut err_msg = ErrorV0::from(e); 280 err_msg.set_id(msg_id); 281 282 future::ready(Err( 283 converter 284 .convert_outgoing(&ButtplugServerMessageV4::from(err_msg), &spec_version) 285 .unwrap(), 286 )) 287 .boxed() 288 } 289 } 290 } 291 } 292 } 293 294 pub fn parse_checked_message( 295 &self, 296 msg: ButtplugCheckedClientMessageV4, 297 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, message::ErrorV0>> { 298 trace!( 299 "Buttplug Server {} received message to client parse: {:?}", 300 self.server_name, msg 301 ); 302 let id = msg.id(); 303 if !self.connected() { 304 // Check for ping timeout first! There's no way we should've pinged out if 305 // we haven't received RequestServerInfo first, but we do want to know if 306 // we pinged out. 307 let error = if self.ping_timer.pinged_out() { 308 Some(message::ErrorV0::from(ButtplugError::from( 309 ButtplugPingError::PingedOut, 310 ))) 311 } else if !matches!(msg, ButtplugCheckedClientMessageV4::RequestServerInfo(_)) { 312 Some(message::ErrorV0::from(ButtplugError::from( 313 ButtplugHandshakeError::RequestServerInfoExpected, 314 ))) 315 } else { 316 None 317 }; 318 if let Some(mut return_error) = error { 319 return_error.set_id(msg.id()); 320 return future::ready(Err(return_error)).boxed(); 321 } 322 // If we haven't pinged out and we got an RSI message, fall thru. 323 } 324 // Produce whatever future is needed to reply to the message, this may be a 325 // device command future, or something the server handles. All futures will 326 // return Result<ButtplugServerMessage, ButtplugError>, and we'll handle 327 // tagging the result with the message id in the future we put out as the 328 // return value from this method. 329 let out_fut = if ButtplugDeviceManagerMessageUnion::try_from(msg.clone()).is_ok() 330 || ButtplugDeviceCommandMessageUnionV4::try_from(msg.clone()).is_ok() 331 { 332 self.device_manager.parse_message(msg.clone()) 333 } else { 334 match msg { 335 ButtplugCheckedClientMessageV4::RequestServerInfo(rsi_msg) => { 336 self.perform_handshake(rsi_msg) 337 } 338 ButtplugCheckedClientMessageV4::Ping(p) => self.handle_ping(p), 339 _ => ButtplugMessageError::UnexpectedMessageType(format!("{msg:?}")).into(), 340 } 341 }; 342 // Simple way to set the ID on the way out. Just rewrap 343 // the returned future to make sure it happens. 344 async move { 345 out_fut 346 .await 347 .map(|mut ok_msg| { 348 ok_msg.set_id(id); 349 trace!("Server returning message: {:?}", ok_msg); 350 ok_msg 351 }) 352 .map_err(|err| { 353 let mut error = message::ErrorV0::from(err); 354 error.set_id(id); 355 error 356 }) 357 } 358 .instrument(info_span!("Buttplug Server Message", id = id)) 359 .boxed() 360 } 361 362 /// Performs the [RequestServerInfo]([ServerInfo](buttplug_core::message::RequestServerInfo) / 363 /// [ServerInfo](buttplug_core::message::ServerInfo) handshake, as specified in the [Buttplug 364 /// Protocol Spec](https://buttplug-spec.docs.buttplug.io). This is the first thing that must 365 /// happens upon connection to the server, in order to make sure the server can speak the same 366 /// protocol version as the client. 367 fn perform_handshake(&self, msg: message::RequestServerInfoV4) -> ButtplugServerResultFuture { 368 if self.connected() { 369 return ButtplugHandshakeError::HandshakeAlreadyHappened.into(); 370 } 371 if !self.connected() && self.client_name.get().is_some() { 372 return ButtplugHandshakeError::ReconnectDenied.into(); 373 } 374 info!( 375 "Performing server handshake check with client {} at message version {}.{}", 376 msg.client_name(), 377 msg.protocol_version_major(), 378 msg.protocol_version_minor() 379 ); 380 381 if BUTTPLUG_CURRENT_API_MAJOR_VERSION < msg.protocol_version_major() { 382 return ButtplugHandshakeError::MessageSpecVersionMismatch( 383 BUTTPLUG_CURRENT_API_MAJOR_VERSION, 384 msg.protocol_version_major(), 385 ) 386 .into(); 387 } 388 389 // Only start the ping timer after we've received the handshake. 390 let ping_timer = self.ping_timer.clone(); 391 392 // Due to programming/spec errors in prior versions of the protocol, anything before v4 expected 393 // that it would be back a matching api version of the server. The correct response is to send back whatever the 394 let output_version = if (msg.protocol_version_major() as u32) < 4 { 395 msg.protocol_version_major() 396 } else { 397 BUTTPLUG_CURRENT_API_MAJOR_VERSION 398 }; 399 let out_msg = 400 message::ServerInfoV4::new(&self.server_name, output_version, 0, self.max_ping_time); 401 let connected = self.connected.clone(); 402 self 403 .client_name 404 .set(msg.client_name().to_owned()) 405 .expect("We should never conflict on name access"); 406 async move { 407 ping_timer.start_ping_timer().await; 408 connected.store(true, Ordering::Relaxed); 409 debug!("Server handshake check successful."); 410 Result::Ok(out_msg.into()) 411 } 412 .boxed() 413 } 414 415 /// Update the [PingTimer] with the latest received ping message. 416 fn handle_ping(&self, msg: message::PingV0) -> ButtplugServerResultFuture { 417 if self.max_ping_time == 0 { 418 return ButtplugPingError::PingTimerNotRunning.into(); 419 } 420 let fut = self.ping_timer.update_ping_time(); 421 async move { 422 fut.await; 423 Result::Ok(message::OkV0::new(msg.id()).into()) 424 } 425 .boxed() 426 } 427} 428 429#[cfg(test)] 430mod test { 431 use crate::ButtplugServerBuilder; 432 use buttplug_core::message::{self, BUTTPLUG_CURRENT_API_MAJOR_VERSION}; 433 #[tokio::test] 434 async fn test_server_deny_reuse() { 435 let server = ButtplugServerBuilder::default().finish().unwrap(); 436 let msg = 437 message::RequestServerInfoV4::new("Test Client", BUTTPLUG_CURRENT_API_MAJOR_VERSION, 0); 438 let mut reply = server.parse_checked_message(msg.clone().into()).await; 439 assert!(reply.is_ok(), "Should get back ok: {:?}", reply); 440 441 reply = server.parse_checked_message(msg.clone().into()).await; 442 assert!( 443 reply.is_err(), 444 "Should get back err on double handshake: {:?}", 445 reply 446 ); 447 assert!(server.disconnect().await.is_ok(), "Should disconnect ok"); 448 449 reply = server.parse_checked_message(msg.clone().into()).await; 450 assert!( 451 reply.is_err(), 452 "Should get back err on handshake after disconnect: {:?}", 453 reply 454 ); 455 } 456}