Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Server Device Implementation
9//!
10//! This struct manages the trip from buttplug protocol actuator/sensor message to hardware
11//! communication. This involves:
12//!
13//! - Taking buttplug device command messages from the exposed server
14//! - Converting older spec version messages to the newest spec version, which usually requires
15//! device information for actuation/sensor messages.
16//! - Validity checking the messages to make sure they match the capabilities of the hardware
17//! - Turning the buttplug messages into hardware commands via the associated protocol
18//! - Sending them to the hardware
19//! - Possibly receiving back information (in the case of sensors), possibly firing and forgetting
20//! (in terms of almost everything else)
21//!
22//! We make a lot of assumptions in here based on the devices we support right now, including:
23//!
24//! - Devices will only ever have one directional rotation actuator (we have no device that supports
25//! two rotational components currently)
26//! - Devices will only ever have one linear actuator (we have no device that supports multiple
27//! linear actuators currently)
28//! - Devices scalar command ordering is explicitly set by the device config file
29//! - This means that we rely on the config file to know which vibrator is which on a device with
30//! multiple vibrators. In protocols, especially for toy brands that release a large line of
31//! different toys all using the same protocols (lovense, wevibe, etc), the order of features in
32//! the config file MATTERS and needs to be tested against an actual device to make sure we're
33//! controlling the actuator we think we are.
34//! - This situation sucks and we should have better definitions, a problem outlined at
35//! https://github.com/buttplugio/buttplug/issues/646
36//!
37//! In order to handle multiple message spec versions
38
39use std::{
40 collections::{BTreeMap, VecDeque},
41 fmt::{self, Debug},
42 sync::Arc,
43 time::Duration,
44};
45
46use buttplug_core::{
47 ButtplugResultFuture,
48 errors::{ButtplugDeviceError, ButtplugError},
49 message::{
50 self,
51 ButtplugServerMessageV4,
52 DeviceFeature,
53 DeviceMessageInfoV4,
54 InputCommandType,
55 InputType,
56 OutputType,
57 OutputValue,
58 },
59 util::{self, async_manager, stream::convert_broadcast_receiver_to_stream},
60};
61use buttplug_server_device_config::{
62 DeviceConfigurationManager,
63 ServerDeviceDefinition,
64 UserDeviceIdentifier,
65};
66
67use crate::{
68 ButtplugServerResultFuture,
69 device::{
70 hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent},
71 protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer},
72 },
73 message::{
74 ButtplugServerDeviceMessage,
75 checked_input_cmd::CheckedInputCmdV4,
76 checked_output_cmd::CheckedOutputCmdV4,
77 server_device_attributes::ServerDeviceAttributes,
78 spec_enums::ButtplugDeviceCommandMessageUnionV4,
79 },
80};
81use core::hash::{Hash, Hasher};
82use dashmap::DashMap;
83use futures::future::{self, BoxFuture, FutureExt};
84use getset::Getters;
85use tokio::{
86 select,
87 sync::{
88 Mutex,
89 mpsc::{Sender, channel},
90 },
91 time::Instant,
92};
93use tokio_stream::StreamExt;
94use uuid::Uuid;
95
96#[derive(Debug)]
97pub enum ServerDeviceEvent {
98 Connected(Arc<ServerDevice>),
99 Notification(UserDeviceIdentifier, ButtplugServerDeviceMessage),
100 Disconnected(UserDeviceIdentifier),
101}
102
103#[derive(Getters)]
104pub struct ServerDevice {
105 hardware: Arc<Hardware>,
106 handler: Arc<dyn ProtocolHandler>,
107 #[getset(get = "pub")]
108 definition: ServerDeviceDefinition,
109 //output_command_manager: ActuatorCommandManager,
110 /// Unique identifier for the device
111 #[getset(get = "pub")]
112 identifier: UserDeviceIdentifier,
113 #[getset(get = "pub")]
114 legacy_attributes: ServerDeviceAttributes,
115 last_output_command: DashMap<Uuid, CheckedOutputCmdV4>,
116
117 stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4>,
118 internal_hw_msg_sender: Sender<Vec<HardwareCommand>>,
119}
120impl Debug for ServerDevice {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.debug_struct("ButtplugDevice")
123 .field("name", &self.name())
124 .field("identifier", &self.identifier)
125 .finish()
126 }
127}
128
129impl Hash for ServerDevice {
130 fn hash<H: Hasher>(&self, state: &mut H) {
131 self.identifier.hash(state);
132 }
133}
134
135impl Eq for ServerDevice {
136}
137
138impl PartialEq for ServerDevice {
139 fn eq(&self, other: &Self) -> bool {
140 self.identifier == *other.identifier()
141 }
142}
143
144impl ServerDevice {
145 pub(super) async fn build(
146 device_config_manager: Arc<DeviceConfigurationManager>,
147 mut hardware_connector: Box<dyn HardwareConnector>,
148 protocol_specializers: Vec<ProtocolSpecializer>,
149 ) -> Result<Self, ButtplugDeviceError> {
150 // We've already checked to make sure we have specializers in the server device manager event
151 // loop. That check used to be here for sake of continuity in building devices in this method, but
152 // having that done before we get here fixes issues with some device advertisement timing (See
153 // #462 for more info.)
154
155 // At this point, we know we've got hardware that is waiting to connect, and enough protocol
156 // info to actually do something after we connect. So go ahead and connect.
157 trace!("Connecting to {:?}", hardware_connector);
158 let mut hardware_specializer = hardware_connector.connect().await?;
159
160 // We can't run these in parallel because we need to only accept one specializer.
161 let mut protocol_identifier = None;
162 let mut hardware_out = None;
163 for protocol_specializer in protocol_specializers {
164 if let Ok(specialized_hardware) = hardware_specializer
165 .specialize(protocol_specializer.specifiers())
166 .await
167 {
168 protocol_identifier = Some(protocol_specializer.identify());
169 hardware_out = Some(specialized_hardware);
170 break;
171 }
172 }
173
174 if protocol_identifier.is_none() {
175 return Err(ButtplugDeviceError::DeviceConfigurationError(
176 "No protocols with viable communication matches for hardware.".to_owned(),
177 ));
178 }
179
180 let mut protocol_identifier_stage = protocol_identifier.unwrap();
181 let hardware = Arc::new(hardware_out.unwrap());
182
183 let (identifier, mut protocol_initializer) = protocol_identifier_stage
184 .identify(hardware.clone(), hardware_connector.specifier())
185 .await?;
186
187 // Now we have an identifier. After this point, if anything fails, consider it a complete
188 // connection failure, as identify may have already run commands on the device, and therefore
189 // put it in an unknown state if anything fails.
190
191 // Check in the DeviceConfigurationManager to make sure we have attributes for this device.
192 let attrs = if let Some(attrs) = device_config_manager.device_definition(&identifier) {
193 attrs
194 } else {
195 return Err(ButtplugDeviceError::DeviceConfigurationError(format!(
196 "No protocols with viable protocol attributes for hardware {identifier:?}."
197 )));
198 };
199
200 // If we have attributes, go ahead and initialize, handing us back our hardware instance that
201 // is now ready to use with the protocol handler.
202
203 // Build the server device and return.
204 let handler = protocol_initializer
205 .initialize(hardware.clone(), &attrs.clone())
206 .await?;
207
208 let requires_keepalive = hardware.requires_keepalive();
209 let strategy = handler.keepalive_strategy();
210
211 // We now have fully initialized hardware, return a server device.
212 let device = Self::new(identifier, handler, hardware, &attrs);
213
214 // If we need a keepalive with a packet replay, set this up via stopping the device on connect.
215 if ((requires_keepalive
216 && matches!(
217 strategy,
218 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy
219 ))
220 || matches!(
221 strategy,
222 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(_)
223 ))
224 && let Err(e) = device.handle_stop_device_cmd().await
225 {
226 return Err(ButtplugDeviceError::DeviceConnectionError(format!(
227 "Error setting up keepalive: {e}"
228 )));
229 }
230
231 Ok(device)
232 }
233
234 /// Given a protocol and a device impl, create a new ButtplugDevice instance
235 fn new(
236 identifier: UserDeviceIdentifier,
237 handler: Arc<dyn ProtocolHandler>,
238 hardware: Arc<Hardware>,
239 definition: &ServerDeviceDefinition,
240 ) -> Self {
241 let (internal_hw_msg_sender, mut internal_hw_msg_recv) = channel::<Vec<HardwareCommand>>(1024);
242
243 let device_wait_duration = if let Some(gap) = definition.message_gap_ms() {
244 Some(Duration::from_millis(gap as u64))
245 } else {
246 hardware.message_gap()
247 };
248
249 // Set up and start the packet send task
250 {
251 let hardware = hardware.clone();
252 let requires_keepalive = hardware.requires_keepalive();
253 let strategy = handler.keepalive_strategy();
254 let strategy_duration =
255 if let ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(duration) = strategy {
256 Some(duration)
257 } else {
258 None
259 };
260 async_manager::spawn(async move {
261 let mut hardware_events = hardware.event_stream();
262 let keepalive_packet = Mutex::new(None);
263 // TODO This needs to throw system error messages
264 let send_hw_cmd = async |command| {
265 let _ = hardware.parse_message(&command).await;
266 if ((requires_keepalive
267 && matches!(
268 strategy,
269 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy
270 ))
271 || matches!(
272 strategy,
273 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(_)
274 ))
275 && let HardwareCommand::Write(command) = command
276 {
277 *keepalive_packet.lock().await = Some(command);
278 };
279 };
280 loop {
281 let requires_keepalive = hardware.requires_keepalive();
282 let wait_duration_fut = async move {
283 if let Some(duration) = strategy_duration {
284 util::sleep(duration).await;
285 } else if requires_keepalive {
286 // This is really only for iOS Bluetooth
287 util::sleep(Duration::from_secs(5)).await;
288 } else {
289 future::pending::<()>().await;
290 };
291 };
292 select! {
293 hw_event = hardware_events.recv() => {
294 if let Ok(hw_event) = hw_event {
295 if matches!(hw_event, HardwareEvent::Disconnected(_)) {
296 info!("Hardware disconnected, shutting down keepalive");
297 return;
298 }
299 } else {
300 info!("Hardware disconnected, shutting down keepalive");
301 return;
302 }
303 }
304 msg = internal_hw_msg_recv.recv() => {
305 if msg.is_none() {
306 info!("No longer receiving message from device parent, breaking");
307 break;
308 }
309 let hardware_cmd = msg.unwrap();
310 if device_wait_duration.is_none() {
311 trace!("No wait duration specified, sending hardware commands {:?}", hardware_cmd);
312 // send and continue
313 for cmd in hardware_cmd {
314 send_hw_cmd(cmd).await;
315 }
316 continue;
317 }
318 // Run commands in order, otherwise we may end up sending out of order. This may take a while,
319 // but it's what 99% of protocols expect. If they want something else, they can implement it
320 // themselves.
321 //
322 // If anything errors out, just bail on the command series. This most likely means the device
323 // disconnected.
324 let mut local_commands: VecDeque<HardwareCommand> = VecDeque::new();
325 local_commands.append(&mut VecDeque::from(hardware_cmd));
326
327 let sleep_until = Instant::now() + *device_wait_duration.as_ref().unwrap();
328 loop {
329 select! {
330 hw_event = hardware_events.recv() => {
331 if let Ok(hw_event) = hw_event {
332 if matches!(hw_event, HardwareEvent::Disconnected(_)) {
333 info!("Hardware disconnected, shutting down keepalive");
334 return;
335 }
336 } else {
337 info!("Hardware disconnected, shutting down keepalive");
338 return;
339 }
340 }
341 msg = internal_hw_msg_recv.recv() => {
342 if msg.is_none() {
343 info!("No longer receiving message from device parent, breaking");
344 local_commands.clear();
345 break;
346 }
347 // Run commands in order, otherwise we may end up sending out of order. This may take a while,
348 // but it's what 99% of protocols expect. If they want something else, they can implement it
349 // themselves.
350 //
351 // If anything errors out, just bail on the command series. This most likely means the device
352 // disconnected.
353 for command in msg.unwrap() {
354 local_commands.retain(|v| !command.overlaps(v));
355 local_commands.push_back(command);
356 }
357 }
358 _ = util::sleep(sleep_until - Instant::now()) => {
359 break;
360 }
361 }
362 if sleep_until < Instant::now() {
363 break;
364 }
365 }
366 while let Some(command) = local_commands.pop_front() {
367 debug!("Sending hardware command {:?}", command);
368 send_hw_cmd(command).await;
369 }
370 }
371 _ = wait_duration_fut => {
372 let keepalive_packet = keepalive_packet.lock().await.clone();
373 match &strategy {
374 ProtocolKeepaliveStrategy::RepeatLastPacketStrategyWithTiming(duration) => {
375 if hardware.time_since_last_write().await > *duration {
376 if let Some(packet) = keepalive_packet {
377 if let Err(e) = hardware.write_value(&packet).await {
378 warn!("Error writing keepalive packet: {:?}", e);
379 break;
380 }
381 } else {
382 warn!("No keepalive packet available, device may disconnect.");
383 }
384 }
385 }
386 ProtocolKeepaliveStrategy::HardwareRequiredRepeatPacketStrategy(packet) => {
387 if let Err(e) = hardware.write_value(packet).await {
388 warn!("Error writing keepalive packet: {:?}", e);
389 break;
390 }
391 }
392 ProtocolKeepaliveStrategy::HardwareRequiredRepeatLastPacketStrategy => {
393 if let Some(packet) = keepalive_packet
394 && let Err(e) = hardware.write_value(&packet).await {
395 warn!("Error writing keepalive packet: {:?}", e);
396 break;
397 }
398 }
399 }
400 }
401 }
402 }
403 info!("Leaving keepalive task for {}", hardware.name());
404 });
405 }
406
407 let mut stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4> = vec![];
408 // We consider the feature's FeatureType to be the "main" capability of a feature. Use that to
409 // calculate stop commands.
410 for (index, feature) in definition.features().iter().enumerate() {
411 if let Some(output_map) = feature.output() {
412 for actuator_type in output_map.output_types() {
413 let mut stop_cmd = |actuator_cmd| {
414 stop_commands
415 .push(CheckedOutputCmdV4::new(1, 0, index as u32, feature.id(), actuator_cmd).into());
416 };
417
418 // Break out of these if one is found, we only need 1 stop message per output.
419 match actuator_type {
420 OutputType::Constrict => {
421 stop_cmd(message::OutputCommand::Constrict(OutputValue::new(0)));
422 break;
423 }
424 OutputType::Temperature => {
425 stop_cmd(message::OutputCommand::Temperature(OutputValue::new(0)));
426 break;
427 }
428 OutputType::Spray => {
429 stop_cmd(message::OutputCommand::Spray(OutputValue::new(0)));
430 break;
431 }
432 OutputType::Led => {
433 stop_cmd(message::OutputCommand::Led(OutputValue::new(0)));
434 break;
435 }
436 OutputType::Oscillate => {
437 stop_cmd(message::OutputCommand::Oscillate(OutputValue::new(0)));
438 break;
439 }
440 OutputType::Rotate => {
441 stop_cmd(message::OutputCommand::Rotate(OutputValue::new(0)));
442 break;
443 }
444 OutputType::Vibrate => {
445 stop_cmd(message::OutputCommand::Vibrate(OutputValue::new(0)));
446 break;
447 }
448 _ => {
449 // There's not much we can do about position or position w/ duration, so just continue on
450 continue;
451 }
452 }
453 }
454 }
455 }
456 Self {
457 identifier,
458 //output_command_manager: acm,
459 handler,
460 hardware,
461 definition: definition.clone(),
462 // Generating legacy attributes is cheap, just do it right when we create the device.
463 legacy_attributes: ServerDeviceAttributes::new(definition.features()),
464 last_output_command: DashMap::new(),
465 stop_commands,
466 internal_hw_msg_sender,
467 }
468 }
469
470 /// Get the name of the device as set in the Device Configuration File.
471 pub fn name(&self) -> String {
472 self.definition.name().to_owned()
473 }
474
475 /// Disconnect from the device, if it's connected.
476 pub fn disconnect(&self) -> ButtplugResultFuture {
477 let fut = self.hardware.disconnect();
478 async move { fut.await.map_err(|err| err.into()) }.boxed()
479 }
480
481 /// Retreive the event stream for the device.
482 ///
483 /// This will include connections, disconnections, and notification events from subscribed
484 /// endpoints.
485 pub fn event_stream(&self) -> impl futures::Stream<Item = ServerDeviceEvent> + Send + use<> {
486 let identifier = self.identifier.clone();
487 let hardware_stream = convert_broadcast_receiver_to_stream(self.hardware.event_stream())
488 .filter_map(move |hardware_event| {
489 let id = identifier.clone();
490 match hardware_event {
491 HardwareEvent::Disconnected(_) => Some(ServerDeviceEvent::Disconnected(id)),
492 HardwareEvent::Notification(_address, _endpoint, _data) => {
493 // TODO Does this still need to be here? Does this need to be routed to the protocol it's part of?
494 None
495 }
496 }
497 });
498
499 let identifier = self.identifier.clone();
500 let handler_mapped_stream = self.handler.event_stream().map(move |incoming_message| {
501 let id = identifier.clone();
502 ServerDeviceEvent::Notification(id, incoming_message)
503 });
504 hardware_stream.merge(handler_mapped_stream)
505 }
506
507 pub fn needs_update(&self, _command_message: &ButtplugDeviceCommandMessageUnionV4) -> bool {
508 true
509 }
510
511 pub fn as_device_message_info(&self, index: u32) -> DeviceMessageInfoV4 {
512 DeviceMessageInfoV4::new(
513 index,
514 &self.name(),
515 self.definition().display_name(),
516 100,
517 &self
518 .definition
519 .features()
520 .iter()
521 .enumerate()
522 .map(|(i, x)| (i as u32, x.as_device_feature(i as u32).expect("Infallible")))
523 .filter(|(_, x)| x.output().as_ref().is_some() || x.input().as_ref().is_some())
524 .collect::<BTreeMap<u32, DeviceFeature>>(),
525 )
526 }
527
528 // In order to not have to worry about id setting at the protocol level (this
529 // should be taken care of in the server's device manager), we return server
530 // messages but Buttplug errors.
531 pub fn parse_message(
532 &self,
533 command_message: ButtplugDeviceCommandMessageUnionV4,
534 ) -> ButtplugServerResultFuture {
535 match command_message {
536 // Input messages
537 ButtplugDeviceCommandMessageUnionV4::InputCmd(msg) => self.handle_input_cmd(msg),
538 // Actuator messages
539 ButtplugDeviceCommandMessageUnionV4::OutputCmd(msg) => self.handle_outputcmd_v4(&msg),
540 ButtplugDeviceCommandMessageUnionV4::OutputVecCmd(msg) => {
541 let mut futs = vec![];
542 let msg_id = msg.id();
543 for m in msg.value_vec() {
544 futs.push(self.handle_outputcmd_v4(m))
545 }
546 async move {
547 for f in futs {
548 f.await?;
549 }
550 Ok(message::OkV0::new(msg_id).into())
551 }
552 .boxed()
553 }
554 // Other generic messages
555 ButtplugDeviceCommandMessageUnionV4::StopDeviceCmd(_) => self.handle_stop_device_cmd(),
556 }
557 }
558
559 fn handle_outputcmd_v4(&self, msg: &CheckedOutputCmdV4) -> ButtplugServerResultFuture {
560 if let Some(last_msg) = self.last_output_command.get(&msg.feature_id())
561 && *last_msg == *msg
562 {
563 trace!("No commands generated for incoming device packet, skipping and returning success.");
564 return future::ready(Ok(message::OkV0::default().into())).boxed();
565 }
566 self
567 .last_output_command
568 .insert(msg.feature_id(), msg.clone());
569 self.handle_generic_command_result(self.handler.handle_output_cmd(msg))
570 }
571
572 fn handle_hardware_commands(&self, commands: Vec<HardwareCommand>) -> ButtplugServerResultFuture {
573 let sender = self.internal_hw_msg_sender.clone();
574 async move {
575 let _ = sender.send(commands).await;
576 Ok(message::OkV0::default().into())
577 }
578 .boxed()
579 }
580
581 fn handle_generic_command_result(
582 &self,
583 command_result: Result<Vec<HardwareCommand>, ButtplugDeviceError>,
584 ) -> ButtplugServerResultFuture {
585 let hardware_commands = match command_result {
586 Ok(commands) => commands,
587 Err(err) => return future::ready(Err(err.into())).boxed(),
588 };
589
590 self.handle_hardware_commands(hardware_commands)
591 }
592
593 fn handle_stop_device_cmd(&self) -> ButtplugServerResultFuture {
594 let mut fut_vec = vec![];
595 self
596 .stop_commands
597 .iter()
598 .for_each(|msg| fut_vec.push(self.parse_message(msg.clone())));
599 async move {
600 for fut in fut_vec {
601 fut.await?;
602 }
603 Ok(message::OkV0::default().into())
604 }
605 .boxed()
606 }
607
608 fn handle_input_cmd(
609 &self,
610 message: CheckedInputCmdV4,
611 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, ButtplugError>> {
612 match message.input_command() {
613 InputCommandType::Read => self.handle_input_read_cmd(
614 message.device_index(),
615 message.feature_index(),
616 message.feature_id(),
617 message.input_type(),
618 ),
619 InputCommandType::Subscribe => self.handle_input_subscribe_cmd(
620 message.device_index(),
621 message.feature_index(),
622 message.feature_id(),
623 message.input_type(),
624 ),
625 InputCommandType::Unsubscribe => self.handle_input_unsubscribe_cmd(
626 message.feature_index(),
627 message.feature_id(),
628 message.input_type(),
629 ),
630 }
631 }
632
633 fn handle_input_read_cmd(
634 &self,
635 device_index: u32,
636 feature_index: u32,
637 feature_id: Uuid,
638 input_type: InputType,
639 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, ButtplugError>> {
640 let device = self.hardware.clone();
641 let handler = self.handler.clone();
642 async move {
643 handler
644 .handle_input_read_cmd(device_index, device, feature_index, feature_id, input_type)
645 .await
646 .map_err(|e| e.into())
647 .map(|e| e.into())
648 }
649 .boxed()
650 }
651
652 fn handle_input_subscribe_cmd(
653 &self,
654 device_index: u32,
655 feature_index: u32,
656 feature_id: Uuid,
657 input_type: InputType,
658 ) -> ButtplugServerResultFuture {
659 let device = self.hardware.clone();
660 let handler = self.handler.clone();
661 async move {
662 handler
663 .handle_input_subscribe_cmd(device_index, device, feature_index, feature_id, input_type)
664 .await
665 .map(|_| message::OkV0::new(1).into())
666 .map_err(|e| e.into())
667 }
668 .boxed()
669 }
670
671 fn handle_input_unsubscribe_cmd(
672 &self,
673 feature_index: u32,
674 feature_id: Uuid,
675 input_type: InputType,
676 ) -> ButtplugServerResultFuture {
677 let device = self.hardware.clone();
678 let handler = self.handler.clone();
679 async move {
680 handler
681 .handle_input_unsubscribe_cmd(device, feature_index, feature_id, input_type)
682 .await
683 .map(|_| message::OkV0::new(1).into())
684 .map_err(|e| e.into())
685 }
686 .boxed()
687 }
688}