Buttplug sex toy control library
at dev 5.5 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use crate::{ 9 device::{ 10 hardware::{Hardware, HardwareEvent, HardwareSubscribeCmd, HardwareUnsubscribeCmd}, 11 protocol::{ProtocolHandler, generic_protocol_setup}, 12 }, 13 message::ButtplugServerDeviceMessage, 14}; 15use buttplug_core::{ 16 errors::ButtplugDeviceError, 17 message::{InputData, InputReadingV4, InputType}, 18 util::{async_manager, stream::convert_broadcast_receiver_to_stream}, 19}; 20use buttplug_server_device_config::Endpoint; 21use dashmap::DashSet; 22use futures::{ 23 FutureExt, 24 StreamExt, 25 future::{self, BoxFuture}, 26}; 27use std::{pin::Pin, sync::Arc}; 28use tokio::sync::broadcast; 29use uuid::Uuid; 30 31generic_protocol_setup!(KGoalBoost, "kgoal-boost"); 32 33pub struct KGoalBoost { 34 // Set of sensors we've subscribed to for updates. 35 subscribed_sensors: Arc<DashSet<u32>>, 36 event_stream: broadcast::Sender<ButtplugServerDeviceMessage>, 37} 38 39impl Default for KGoalBoost { 40 fn default() -> Self { 41 let (sender, _) = broadcast::channel(256); 42 Self { 43 subscribed_sensors: Arc::new(DashSet::new()), 44 event_stream: sender, 45 } 46 } 47} 48 49impl ProtocolHandler for KGoalBoost { 50 fn event_stream( 51 &self, 52 ) -> Pin<Box<dyn futures::Stream<Item = ButtplugServerDeviceMessage> + Send>> { 53 convert_broadcast_receiver_to_stream(self.event_stream.subscribe()).boxed() 54 } 55 56 fn handle_input_subscribe_cmd( 57 &self, 58 device_index: u32, 59 device: Arc<Hardware>, 60 feature_index: u32, 61 feature_id: Uuid, 62 _sensor_type: InputType, 63 ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { 64 if self.subscribed_sensors.contains(&feature_index) { 65 return future::ready(Ok(())).boxed(); 66 } 67 let sensors = self.subscribed_sensors.clone(); 68 // Readout value: 0x000104000005d3 69 // Byte 0: Always 0x00 70 // Byte 1: Always 0x01 71 // Byte 2: Always 0x04 72 // Byte 3-4: Normalized u16 Reading 73 // Byte 5-6: Raw u16 Reading 74 async move { 75 // If we have no sensors we're currently subscribed to, we'll need to bring up our BLE 76 // characteristic subscription. 77 if sensors.is_empty() { 78 device 79 .subscribe(&HardwareSubscribeCmd::new(feature_id, Endpoint::RxPressure)) 80 .await?; 81 let sender = self.event_stream.clone(); 82 let mut hardware_stream = device.event_stream(); 83 let stream_sensors = sensors.clone(); 84 // If we subscribe successfully, we need to set up our event handler. 85 async_manager::spawn(async move { 86 while let Ok(info) = hardware_stream.recv().await { 87 // If we have no receivers, quit. 88 if sender.receiver_count() == 0 || stream_sensors.is_empty() { 89 return; 90 } 91 if let HardwareEvent::Notification(_, endpoint, data) = info 92 && endpoint == Endpoint::RxPressure 93 { 94 if data.len() < 7 { 95 // Not even sure how this would happen, error and continue on. 96 error!("KGoal Boost data not expected length!"); 97 continue; 98 } 99 // Extract our two pressure values. 100 let normalized = (data[3] as u32) << 8 | data[4] as u32; 101 let unnormalized = (data[5] as u32) << 8 | data[6] as u32; 102 if stream_sensors.contains(&0) 103 && sender 104 .send( 105 InputReadingV4::new( 106 device_index, 107 feature_index, 108 buttplug_core::message::InputTypeData::Pressure(InputData::new(normalized)), 109 ) 110 .into(), 111 ) 112 .is_err() 113 { 114 debug!("Hardware device listener for KGoal Boost shut down, returning from task."); 115 return; 116 } 117 if stream_sensors.contains(&1) 118 && sender 119 .send( 120 InputReadingV4::new( 121 device_index, 122 feature_index, 123 buttplug_core::message::InputTypeData::Pressure(InputData::new(unnormalized)), 124 ) 125 .into(), 126 ) 127 .is_err() 128 { 129 debug!("Hardware device listener for KGoal Boost shut down, returning from task."); 130 return; 131 } 132 } 133 } 134 }); 135 } 136 sensors.insert(feature_index); 137 Ok(()) 138 } 139 .boxed() 140 } 141 142 fn handle_input_unsubscribe_cmd( 143 &self, 144 device: Arc<Hardware>, 145 feature_index: u32, 146 feature_id: Uuid, 147 _sensor_type: InputType, 148 ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { 149 if !self.subscribed_sensors.contains(&feature_index) { 150 return future::ready(Ok(())).boxed(); 151 } 152 let sensors = self.subscribed_sensors.clone(); 153 async move { 154 // If we have no sensors we're currently subscribed to, we'll need to bring up our BLE 155 // characteristic subscription. 156 sensors.remove(&feature_index); 157 if sensors.is_empty() { 158 device 159 .unsubscribe(&HardwareUnsubscribeCmd::new( 160 feature_id, 161 Endpoint::RxPressure, 162 )) 163 .await?; 164 } 165 Ok(()) 166 } 167 .boxed() 168 } 169}