Buttplug sex toy control library
at dev 25 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8//! Server Device Implementation 9//! 10//! This struct manages the trip from buttplug protocol actuator/sensor message to hardware 11//! communication. This involves: 12//! 13//! - Taking buttplug device command messages from the exposed server 14//! - Converting older spec version messages to the newest spec version, which usually requires 15//! device information for actuation/sensor messages. 16//! - Validity checking the messages to make sure they match the capabilities of the hardware 17//! - Turning the buttplug messages into hardware commands via the associated protocol 18//! - Sending them to the hardware 19//! - Possibly receiving back information (in the case of sensors), possibly firing and forgetting 20//! (in terms of almost everything else) 21//! 22//! We make a lot of assumptions in here based on the devices we support right now, including: 23//! 24//! - Devices will only ever have one directional rotation actuator (we have no device that supports 25//! two rotational components currently) 26//! - Devices will only ever have one linear actuator (we have no device that supports multiple 27//! linear actuators currently) 28//! - Devices scalar command ordering is explicitly set by the device config file 29//! - This means that we rely on the config file to know which vibrator is which on a device with 30//! multiple vibrators. In protocols, especially for toy brands that release a large line of 31//! different toys all using the same protocols (lovense, wevibe, etc), the order of features in 32//! the config file MATTERS and needs to be tested against an actual device to make sure we're 33//! controlling the actuator we think we are. 34//! - This situation sucks and we should have better definitions, a problem outlined at 35//! https://github.com/buttplugio/buttplug/issues/646 36//! 37//! In order to handle multiple message spec versions 38 39use std::{ 40 collections::{BTreeMap, VecDeque}, 41 fmt::{self, Debug}, 42 sync::Arc, 43 time::Duration, 44}; 45 46use buttplug_core::{ 47 ButtplugResultFuture, 48 errors::{ButtplugDeviceError, ButtplugError}, 49 message::{ 50 self, 51 ButtplugServerMessageV4, 52 DeviceFeature, 53 DeviceMessageInfoV4, 54 InputCommandType, 55 InputType, 56 OutputType, 57 OutputValue, 58 }, 59 util::{self, async_manager, stream::convert_broadcast_receiver_to_stream}, 60}; 61use buttplug_server_device_config::{ 62 DeviceConfigurationManager, 63 ServerDeviceDefinition, 64 UserDeviceIdentifier, 65}; 66 67use crate::{ 68 ButtplugServerResultFuture, 69 device::{ 70 hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent}, 71 protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer}, 72 }, 73 message::{ 74 ButtplugServerDeviceMessage, 75 checked_input_cmd::CheckedInputCmdV4, 76 checked_output_cmd::CheckedOutputCmdV4, 77 server_device_attributes::ServerDeviceAttributes, 78 spec_enums::ButtplugDeviceCommandMessageUnionV4, 79 }, 80}; 81use core::hash::{Hash, Hasher}; 82use dashmap::DashMap; 83use futures::future::{self, BoxFuture, FutureExt}; 84use getset::Getters; 85use tokio::{ 86 select, 87 sync::{ 88 Mutex, 89 mpsc::{Sender, channel}, 90 }, 91 time::Instant, 92}; 93use tokio_stream::StreamExt; 94use uuid::Uuid; 95 96#[derive(Debug)] 97pub enum ServerDeviceEvent { 98 Connected(Arc<ServerDevice>), 99 Notification(UserDeviceIdentifier, ButtplugServerDeviceMessage), 100 Disconnected(UserDeviceIdentifier), 101} 102 103#[derive(Getters)] 104pub struct ServerDevice { 105 hardware: Arc<Hardware>, 106 handler: Arc<dyn ProtocolHandler>, 107 #[getset(get = "pub")] 108 definition: ServerDeviceDefinition, 109 //output_command_manager: ActuatorCommandManager, 110 /// Unique identifier for the device 111 #[getset(get = "pub")] 112 identifier: UserDeviceIdentifier, 113 #[getset(get = "pub")] 114 legacy_attributes: ServerDeviceAttributes, 115 last_output_command: DashMap<Uuid, CheckedOutputCmdV4>, 116 117 stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4>, 118 internal_hw_msg_sender: Sender<Vec<HardwareCommand>>, 119} 120impl Debug for ServerDevice { 121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 122 f.debug_struct("ButtplugDevice") 123 .field("name", &self.name()) 124 .field("identifier", &self.identifier) 125 .finish() 126 } 127} 128 129impl Hash for ServerDevice { 130 fn hash<H: Hasher>(&self, state: &mut H) { 131 self.identifier.hash(state); 132 } 133} 134 135impl Eq for ServerDevice { 136} 137 138impl PartialEq for ServerDevice { 139 fn eq(&self, other: &Self) -> bool { 140 self.identifier == *other.identifier() 141 } 142} 143 144impl ServerDevice { 145 pub(super) async fn build( 146 device_config_manager: Arc<DeviceConfigurationManager>, 147 mut hardware_connector: Box<dyn HardwareConnector>, 148 protocol_specializers: Vec<ProtocolSpecializer>, 149 ) -> Result<Self, ButtplugDeviceError> { 150 // We've already checked to make sure we have specializers in the server device manager event 151 // loop. That check used to be here for sake of continuity in building devices in this method, but 152 // having that done before we get here fixes issues with some device advertisement timing (See 153 // #462 for more info.) 154 155 // At this point, we know we've got hardware that is waiting to connect, and enough protocol 156 // info to actually do something after we connect. So go ahead and connect. 157 trace!("Connecting to {:?}", hardware_connector); 158 let mut hardware_specializer = hardware_connector.connect().await?; 159 160 // We can't run these in parallel because we need to only accept one specializer. 161 let mut protocol_identifier = None; 162 let mut hardware_out = None; 163 for protocol_specializer in protocol_specializers { 164 if let Ok(specialized_hardware) = hardware_specializer 165 .specialize(protocol_specializer.specifiers()) 166 .await 167 { 168 protocol_identifier = Some(protocol_specializer.identify()); 169 hardware_out = Some(specialized_hardware); 170 break; 171 } 172 } 173 174 if protocol_identifier.is_none() { 175 return Err(ButtplugDeviceError::DeviceConfigurationError( 176 "No protocols with viable communication matches for hardware.".to_owned(), 177 )); 178 } 179 180 let mut protocol_identifier_stage = protocol_identifier.unwrap(); 181 let hardware = Arc::new(hardware_out.unwrap()); 182 183 let (identifier, mut protocol_initializer) = protocol_identifier_stage 184 .identify(hardware.clone(), hardware_connector.specifier()) 185 .await?; 186 187 // Now we have an identifier. After this point, if anything fails, consider it a complete 188 // connection failure, as identify may have already run commands on the device, and therefore 189 // put it in an unknown state if anything fails. 190 191 // Check in the DeviceConfigurationManager to make sure we have attributes for this device. 192 let attrs = if let Some(attrs) = device_config_manager.device_definition(&identifier) { 193 attrs 194 } else { 195 return Err(ButtplugDeviceError::DeviceConfigurationError(format!( 196 "No protocols with viable protocol attributes for hardware {identifier:?}." 197 ))); 198 }; 199 200 // If we have attributes, go ahead and initialize, handing us back our hardware instance that 201 // is now ready to use with the protocol handler. 202 203 // Build the server device and return. 204 let handler = protocol_initializer 205 .initialize(hardware.clone(), &attrs.clone()) 206 .await?; 207 208 let requires_keepalive = hardware.requires_keepalive(); 209 let strategy = handler.keepalive_strategy(); 210 211 // We now have fully initialized hardware, return a server device. 212 let device = Self::new(identifier, handler, hardware, &attrs); 213 214 // If we need a keepalive with a packet replay, set this up via stopping the device on connect. 215 if ((requires_keepalive 216 && matches!( 217 strategy, 218 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy 219 )) 220 || matches!( 221 strategy, 222 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(_) 223 )) 224 && let Err(e) = device.handle_stop_device_cmd().await 225 { 226 return Err(ButtplugDeviceError::DeviceConnectionError(format!( 227 "Error setting up keepalive: {e}" 228 ))); 229 } 230 231 Ok(device) 232 } 233 234 /// Given a protocol and a device impl, create a new ButtplugDevice instance 235 fn new( 236 identifier: UserDeviceIdentifier, 237 handler: Arc<dyn ProtocolHandler>, 238 hardware: Arc<Hardware>, 239 definition: &ServerDeviceDefinition, 240 ) -> Self { 241 let (internal_hw_msg_sender, mut internal_hw_msg_recv) = channel::<Vec<HardwareCommand>>(1024); 242 243 let device_wait_duration = if let Some(gap) = definition.message_gap_ms() { 244 Some(Duration::from_millis(gap as u64)) 245 } else { 246 hardware.message_gap() 247 }; 248 249 // Set up and start the packet send task 250 { 251 let hardware = hardware.clone(); 252 let requires_keepalive = hardware.requires_keepalive(); 253 let strategy = handler.keepalive_strategy(); 254 let strategy_duration = 255 if let ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(duration) = strategy { 256 Some(duration) 257 } else { 258 None 259 }; 260 async_manager::spawn(async move { 261 let mut hardware_events = hardware.event_stream(); 262 let keepalive_packet = Mutex::new(None); 263 // TODO This needs to throw system error messages 264 let send_hw_cmd = async |command| { 265 let _ = hardware.parse_message(&command).await; 266 if ((requires_keepalive 267 && matches!( 268 strategy, 269 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy 270 )) 271 || matches!( 272 strategy, 273 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(_) 274 )) 275 && let HardwareCommand::Write(command) = command 276 { 277 *keepalive_packet.lock().await = Some(command); 278 }; 279 }; 280 loop { 281 let requires_keepalive = hardware.requires_keepalive(); 282 let wait_duration_fut = async move { 283 if let Some(duration) = strategy_duration { 284 util::sleep(duration).await; 285 } else if requires_keepalive { 286 // This is really only for iOS Bluetooth 287 util::sleep(Duration::from_secs(5)).await; 288 } else { 289 future::pending::<()>().await; 290 }; 291 }; 292 select! { 293 hw_event = hardware_events.recv() => { 294 if let Ok(hw_event) = hw_event { 295 if matches!(hw_event, HardwareEvent::Disconnected(_)) { 296 info!("Hardware disconnected, shutting down keepalive"); 297 return; 298 } 299 } else { 300 info!("Hardware disconnected, shutting down keepalive"); 301 return; 302 } 303 } 304 msg = internal_hw_msg_recv.recv() => { 305 if msg.is_none() { 306 info!("No longer receiving message from device parent, breaking"); 307 break; 308 } 309 let hardware_cmd = msg.unwrap(); 310 if device_wait_duration.is_none() { 311 trace!("No wait duration specified, sending hardware commands {:?}", hardware_cmd); 312 // send and continue 313 for cmd in hardware_cmd { 314 send_hw_cmd(cmd).await; 315 } 316 continue; 317 } 318 // Run commands in order, otherwise we may end up sending out of order. This may take a while, 319 // but it's what 99% of protocols expect. If they want something else, they can implement it 320 // themselves. 321 // 322 // If anything errors out, just bail on the command series. This most likely means the device 323 // disconnected. 324 let mut local_commands: VecDeque<HardwareCommand> = VecDeque::new(); 325 local_commands.append(&mut VecDeque::from(hardware_cmd)); 326 327 let sleep_until = Instant::now() + *device_wait_duration.as_ref().unwrap(); 328 loop { 329 select! { 330 hw_event = hardware_events.recv() => { 331 if let Ok(hw_event) = hw_event { 332 if matches!(hw_event, HardwareEvent::Disconnected(_)) { 333 info!("Hardware disconnected, shutting down keepalive"); 334 return; 335 } 336 } else { 337 info!("Hardware disconnected, shutting down keepalive"); 338 return; 339 } 340 } 341 msg = internal_hw_msg_recv.recv() => { 342 if msg.is_none() { 343 info!("No longer receiving message from device parent, breaking"); 344 local_commands.clear(); 345 break; 346 } 347 // Run commands in order, otherwise we may end up sending out of order. This may take a while, 348 // but it's what 99% of protocols expect. If they want something else, they can implement it 349 // themselves. 350 // 351 // If anything errors out, just bail on the command series. This most likely means the device 352 // disconnected. 353 for command in msg.unwrap() { 354 local_commands.retain(|v| !command.overlaps(v)); 355 local_commands.push_back(command); 356 } 357 } 358 _ = util::sleep(sleep_until - Instant::now()) => { 359 break; 360 } 361 } 362 if sleep_until < Instant::now() { 363 break; 364 } 365 } 366 while let Some(command) = local_commands.pop_front() { 367 debug!("Sending hardware command {:?}", command); 368 send_hw_cmd(command).await; 369 } 370 } 371 _ = wait_duration_fut => { 372 let keepalive_packet = keepalive_packet.lock().await.clone(); 373 match &strategy { 374 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(duration) => { 375 if hardware.time_since_last_write().await > *duration { 376 if let Some(packet) = keepalive_packet { 377 if let Err(e) = hardware.write_value(&packet).await { 378 warn!("Error writing keepalive packet: {:?}", e); 379 break; 380 } 381 } else { 382 warn!("No keepalive packet available, device may disconnect."); 383 } 384 } 385 } 386 ProtocolKeepaliveStrategy::HardwareRequiredRepeatPacketStrategy(packet) => { 387 if let Err(e) = hardware.write_value(packet).await { 388 warn!("Error writing keepalive packet: {:?}", e); 389 break; 390 } 391 } 392 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy => { 393 if let Some(packet) = keepalive_packet 394 && let Err(e) = hardware.write_value(&packet).await { 395 warn!("Error writing keepalive packet: {:?}", e); 396 break; 397 } 398 } 399 } 400 } 401 } 402 } 403 info!("Leaving keepalive task for {}", hardware.name()); 404 }); 405 } 406 407 let mut stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4> = vec![]; 408 // We consider the feature's FeatureType to be the "main" capability of a feature. Use that to 409 // calculate stop commands. 410 for (index, feature) in definition.features().iter().enumerate() { 411 if let Some(output_map) = feature.output() { 412 for actuator_type in output_map.output_types() { 413 let mut stop_cmd = |actuator_cmd| { 414 stop_commands 415 .push(CheckedOutputCmdV4::new(1, 0, index as u32, feature.id(), actuator_cmd).into()); 416 }; 417 418 // Break out of these if one is found, we only need 1 stop message per output. 419 match actuator_type { 420 OutputType::Constrict => { 421 stop_cmd(message::OutputCommand::Constrict(OutputValue::new(0))); 422 break; 423 } 424 OutputType::Temperature => { 425 stop_cmd(message::OutputCommand::Temperature(OutputValue::new(0))); 426 break; 427 } 428 OutputType::Spray => { 429 stop_cmd(message::OutputCommand::Spray(OutputValue::new(0))); 430 break; 431 } 432 OutputType::Led => { 433 stop_cmd(message::OutputCommand::Led(OutputValue::new(0))); 434 break; 435 } 436 OutputType::Oscillate => { 437 stop_cmd(message::OutputCommand::Oscillate(OutputValue::new(0))); 438 break; 439 } 440 OutputType::Rotate => { 441 stop_cmd(message::OutputCommand::Rotate(OutputValue::new(0))); 442 break; 443 } 444 OutputType::Vibrate => { 445 stop_cmd(message::OutputCommand::Vibrate(OutputValue::new(0))); 446 break; 447 } 448 _ => { 449 // There's not much we can do about position or position w/ duration, so just continue on 450 continue; 451 } 452 } 453 } 454 } 455 } 456 Self { 457 identifier, 458 //output_command_manager: acm, 459 handler, 460 hardware, 461 definition: definition.clone(), 462 // Generating legacy attributes is cheap, just do it right when we create the device. 463 legacy_attributes: ServerDeviceAttributes::new(definition.features()), 464 last_output_command: DashMap::new(), 465 stop_commands, 466 internal_hw_msg_sender, 467 } 468 } 469 470 /// Get the name of the device as set in the Device Configuration File. 471 pub fn name(&self) -> String { 472 self.definition.name().to_owned() 473 } 474 475 /// Disconnect from the device, if it's connected. 476 pub fn disconnect(&self) -> ButtplugResultFuture { 477 let fut = self.hardware.disconnect(); 478 async move { fut.await.map_err(|err| err.into()) }.boxed() 479 } 480 481 /// Retreive the event stream for the device. 482 /// 483 /// This will include connections, disconnections, and notification events from subscribed 484 /// endpoints. 485 pub fn event_stream(&self) -> impl futures::Stream<Item = ServerDeviceEvent> + Send + use<> { 486 let identifier = self.identifier.clone(); 487 let hardware_stream = convert_broadcast_receiver_to_stream(self.hardware.event_stream()) 488 .filter_map(move |hardware_event| { 489 let id = identifier.clone(); 490 match hardware_event { 491 HardwareEvent::Disconnected(_) => Some(ServerDeviceEvent::Disconnected(id)), 492 HardwareEvent::Notification(_address, _endpoint, _data) => { 493 // TODO Does this still need to be here? Does this need to be routed to the protocol it's part of? 494 None 495 } 496 } 497 }); 498 499 let identifier = self.identifier.clone(); 500 let handler_mapped_stream = self.handler.event_stream().map(move |incoming_message| { 501 let id = identifier.clone(); 502 ServerDeviceEvent::Notification(id, incoming_message) 503 }); 504 hardware_stream.merge(handler_mapped_stream) 505 } 506 507 pub fn needs_update(&self, _command_message: &ButtplugDeviceCommandMessageUnionV4) -> bool { 508 true 509 } 510 511 pub fn as_device_message_info(&self, index: u32) -> DeviceMessageInfoV4 { 512 DeviceMessageInfoV4::new( 513 index, 514 &self.name(), 515 self.definition().display_name(), 516 100, 517 &self 518 .definition 519 .features() 520 .iter() 521 .enumerate() 522 .map(|(i, x)| (i as u32, x.as_device_feature(i as u32).expect("Infallible"))) 523 .filter(|(_, x)| x.output().as_ref().is_some() || x.input().as_ref().is_some()) 524 .collect::<BTreeMap<u32, DeviceFeature>>(), 525 ) 526 } 527 528 // In order to not have to worry about id setting at the protocol level (this 529 // should be taken care of in the server's device manager), we return server 530 // messages but Buttplug errors. 531 pub fn parse_message( 532 &self, 533 command_message: ButtplugDeviceCommandMessageUnionV4, 534 ) -> ButtplugServerResultFuture { 535 match command_message { 536 // Input messages 537 ButtplugDeviceCommandMessageUnionV4::InputCmd(msg) => self.handle_input_cmd(msg), 538 // Actuator messages 539 ButtplugDeviceCommandMessageUnionV4::OutputCmd(msg) => self.handle_outputcmd_v4(&msg), 540 ButtplugDeviceCommandMessageUnionV4::OutputVecCmd(msg) => { 541 let mut futs = vec![]; 542 let msg_id = msg.id(); 543 for m in msg.value_vec() { 544 futs.push(self.handle_outputcmd_v4(m)) 545 } 546 async move { 547 for f in futs { 548 f.await?; 549 } 550 Ok(message::OkV0::new(msg_id).into()) 551 } 552 .boxed() 553 } 554 // Other generic messages 555 ButtplugDeviceCommandMessageUnionV4::StopDeviceCmd(_) => self.handle_stop_device_cmd(), 556 } 557 } 558 559 fn handle_outputcmd_v4(&self, msg: &CheckedOutputCmdV4) -> ButtplugServerResultFuture { 560 if let Some(last_msg) = self.last_output_command.get(&msg.feature_id()) 561 && *last_msg == *msg 562 { 563 trace!("No commands generated for incoming device packet, skipping and returning success."); 564 return future::ready(Ok(message::OkV0::default().into())).boxed(); 565 } 566 self 567 .last_output_command 568 .insert(msg.feature_id(), msg.clone()); 569 self.handle_generic_command_result(self.handler.handle_output_cmd(msg)) 570 } 571 572 fn handle_hardware_commands(&self, commands: Vec<HardwareCommand>) -> ButtplugServerResultFuture { 573 let sender = self.internal_hw_msg_sender.clone(); 574 async move { 575 let _ = sender.send(commands).await; 576 Ok(message::OkV0::default().into()) 577 } 578 .boxed() 579 } 580 581 fn handle_generic_command_result( 582 &self, 583 command_result: Result<Vec<HardwareCommand>, ButtplugDeviceError>, 584 ) -> ButtplugServerResultFuture { 585 let hardware_commands = match command_result { 586 Ok(commands) => commands, 587 Err(err) => return future::ready(Err(err.into())).boxed(), 588 }; 589 590 self.handle_hardware_commands(hardware_commands) 591 } 592 593 fn handle_stop_device_cmd(&self) -> ButtplugServerResultFuture { 594 let mut fut_vec = vec![]; 595 self 596 .stop_commands 597 .iter() 598 .for_each(|msg| fut_vec.push(self.parse_message(msg.clone()))); 599 async move { 600 for fut in fut_vec { 601 fut.await?; 602 } 603 Ok(message::OkV0::default().into()) 604 } 605 .boxed() 606 } 607 608 fn handle_input_cmd( 609 &self, 610 message: CheckedInputCmdV4, 611 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, ButtplugError>> { 612 match message.input_command() { 613 InputCommandType::Read => self.handle_input_read_cmd( 614 message.device_index(), 615 message.feature_index(), 616 message.feature_id(), 617 message.input_type(), 618 ), 619 InputCommandType::Subscribe => self.handle_input_subscribe_cmd( 620 message.device_index(), 621 message.feature_index(), 622 message.feature_id(), 623 message.input_type(), 624 ), 625 InputCommandType::Unsubscribe => self.handle_input_unsubscribe_cmd( 626 message.feature_index(), 627 message.feature_id(), 628 message.input_type(), 629 ), 630 } 631 } 632 633 fn handle_input_read_cmd( 634 &self, 635 device_index: u32, 636 feature_index: u32, 637 feature_id: Uuid, 638 input_type: InputType, 639 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, ButtplugError>> { 640 let device = self.hardware.clone(); 641 let handler = self.handler.clone(); 642 async move { 643 handler 644 .handle_input_read_cmd(device_index, device, feature_index, feature_id, input_type) 645 .await 646 .map_err(|e| e.into()) 647 .map(|e| e.into()) 648 } 649 .boxed() 650 } 651 652 fn handle_input_subscribe_cmd( 653 &self, 654 device_index: u32, 655 feature_index: u32, 656 feature_id: Uuid, 657 input_type: InputType, 658 ) -> ButtplugServerResultFuture { 659 let device = self.hardware.clone(); 660 let handler = self.handler.clone(); 661 async move { 662 handler 663 .handle_input_subscribe_cmd(device_index, device, feature_index, feature_id, input_type) 664 .await 665 .map(|_| message::OkV0::new(1).into()) 666 .map_err(|e| e.into()) 667 } 668 .boxed() 669 } 670 671 fn handle_input_unsubscribe_cmd( 672 &self, 673 feature_index: u32, 674 feature_id: Uuid, 675 input_type: InputType, 676 ) -> ButtplugServerResultFuture { 677 let device = self.hardware.clone(); 678 let handler = self.handler.clone(); 679 async move { 680 handler 681 .handle_input_unsubscribe_cmd(device, feature_index, feature_id, input_type) 682 .await 683 .map(|_| message::OkV0::new(1).into()) 684 .map_err(|e| e.into()) 685 } 686 .boxed() 687 } 688}