// Buttplug sex toy control library
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.
7
8use crate::{
9 device::{
10 hardware::{Hardware, HardwareEvent, HardwareSubscribeCmd, HardwareUnsubscribeCmd},
11 protocol::{ProtocolHandler, generic_protocol_setup},
12 },
13 message::ButtplugServerDeviceMessage,
14};
15use buttplug_core::{
16 errors::ButtplugDeviceError,
17 message::{InputData, InputReadingV4, InputType},
18 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
19};
20use buttplug_server_device_config::Endpoint;
21use dashmap::DashSet;
22use futures::{
23 FutureExt,
24 StreamExt,
25 future::{self, BoxFuture},
26};
27use std::{pin::Pin, sync::Arc};
28use tokio::sync::broadcast;
29use uuid::Uuid;
30
// Invokes the project's `generic_protocol_setup!` macro for `KGoalBoost` with the protocol
// identifier string "kgoal-boost".
// NOTE(review): presumably this generates the identifier/initializer boilerplate for the
// protocol; the macro definition is not visible here, so confirm against it.
generic_protocol_setup!(KGoalBoost, "kgoal-boost");
32
33pub struct KGoalBoost {
34 // Set of sensors we've subscribed to for updates.
35 subscribed_sensors: Arc<DashSet<u32>>,
36 event_stream: broadcast::Sender<ButtplugServerDeviceMessage>,
37}
38
39impl Default for KGoalBoost {
40 fn default() -> Self {
41 let (sender, _) = broadcast::channel(256);
42 Self {
43 subscribed_sensors: Arc::new(DashSet::new()),
44 event_stream: sender,
45 }
46 }
47}
48
49impl ProtocolHandler for KGoalBoost {
50 fn event_stream(
51 &self,
52 ) -> Pin<Box<dyn futures::Stream<Item = ButtplugServerDeviceMessage> + Send>> {
53 convert_broadcast_receiver_to_stream(self.event_stream.subscribe()).boxed()
54 }
55
56 fn handle_input_subscribe_cmd(
57 &self,
58 device_index: u32,
59 device: Arc<Hardware>,
60 feature_index: u32,
61 feature_id: Uuid,
62 _sensor_type: InputType,
63 ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> {
64 if self.subscribed_sensors.contains(&feature_index) {
65 return future::ready(Ok(())).boxed();
66 }
67 let sensors = self.subscribed_sensors.clone();
68 // Readout value: 0x000104000005d3
69 // Byte 0: Always 0x00
70 // Byte 1: Always 0x01
71 // Byte 2: Always 0x04
72 // Byte 3-4: Normalized u16 Reading
73 // Byte 5-6: Raw u16 Reading
74 async move {
75 // If we have no sensors we're currently subscribed to, we'll need to bring up our BLE
76 // characteristic subscription.
77 if sensors.is_empty() {
78 device
79 .subscribe(&HardwareSubscribeCmd::new(feature_id, Endpoint::RxPressure))
80 .await?;
81 let sender = self.event_stream.clone();
82 let mut hardware_stream = device.event_stream();
83 let stream_sensors = sensors.clone();
84 // If we subscribe successfully, we need to set up our event handler.
85 async_manager::spawn(async move {
86 while let Ok(info) = hardware_stream.recv().await {
87 // If we have no receivers, quit.
88 if sender.receiver_count() == 0 || stream_sensors.is_empty() {
89 return;
90 }
91 if let HardwareEvent::Notification(_, endpoint, data) = info
92 && endpoint == Endpoint::RxPressure
93 {
94 if data.len() < 7 {
95 // Not even sure how this would happen, error and continue on.
96 error!("KGoal Boost data not expected length!");
97 continue;
98 }
99 // Extract our two pressure values.
100 let normalized = (data[3] as u32) << 8 | data[4] as u32;
101 let unnormalized = (data[5] as u32) << 8 | data[6] as u32;
102 if stream_sensors.contains(&0)
103 && sender
104 .send(
105 InputReadingV4::new(
106 device_index,
107 feature_index,
108 buttplug_core::message::InputTypeData::Pressure(InputData::new(normalized)),
109 )
110 .into(),
111 )
112 .is_err()
113 {
114 debug!("Hardware device listener for KGoal Boost shut down, returning from task.");
115 return;
116 }
117 if stream_sensors.contains(&1)
118 && sender
119 .send(
120 InputReadingV4::new(
121 device_index,
122 feature_index,
123 buttplug_core::message::InputTypeData::Pressure(InputData::new(unnormalized)),
124 )
125 .into(),
126 )
127 .is_err()
128 {
129 debug!("Hardware device listener for KGoal Boost shut down, returning from task.");
130 return;
131 }
132 }
133 }
134 });
135 }
136 sensors.insert(feature_index);
137 Ok(())
138 }
139 .boxed()
140 }
141
142 fn handle_input_unsubscribe_cmd(
143 &self,
144 device: Arc<Hardware>,
145 feature_index: u32,
146 feature_id: Uuid,
147 _sensor_type: InputType,
148 ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> {
149 if !self.subscribed_sensors.contains(&feature_index) {
150 return future::ready(Ok(())).boxed();
151 }
152 let sensors = self.subscribed_sensors.clone();
153 async move {
154 // If we have no sensors we're currently subscribed to, we'll need to bring up our BLE
155 // characteristic subscription.
156 sensors.remove(&feature_index);
157 if sensors.is_empty() {
158 device
159 .unsubscribe(&HardwareUnsubscribeCmd::new(
160 feature_id,
161 Endpoint::RxPressure,
162 ))
163 .await?;
164 }
165 Ok(())
166 }
167 .boxed()
168 }
169}