// Buttplug sex toy control library
// (source-browser header: at dev · 11 kB · view raw)
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8//! Buttplug Device Manager, manages Device Subtype (Platform/Communication bus 9//! specific) Managers 10 11use crate::{ 12 ButtplugServerError, 13 ButtplugServerResultFuture, 14 device::{ 15 ServerDevice, 16 hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerBuilder}, 17 server_device_manager_event_loop::ServerDeviceManagerEventLoop, 18 }, 19 message::{ 20 server_device_attributes::ServerDeviceAttributes, 21 spec_enums::{ 22 ButtplugCheckedClientMessageV4, 23 ButtplugDeviceCommandMessageUnionV4, 24 ButtplugDeviceManagerMessageUnion, 25 }, 26 }, 27}; 28use buttplug_core::{ 29 errors::{ButtplugDeviceError, ButtplugMessageError, ButtplugUnknownError}, 30 message::{self, ButtplugDeviceMessage, ButtplugMessage, ButtplugServerMessageV4, DeviceListV4}, 31 util::{async_manager, stream::convert_broadcast_receiver_to_stream}, 32}; 33use buttplug_server_device_config::{DeviceConfigurationManager, UserDeviceIdentifier}; 34use dashmap::DashMap; 35use futures::{ 36 Stream, 37 future::{self, FutureExt}, 38}; 39use getset::Getters; 40use std::{ 41 collections::HashMap, 42 convert::TryFrom, 43 sync::{ 44 Arc, 45 atomic::{AtomicBool, Ordering}, 46 }, 47}; 48use tokio::sync::{broadcast, mpsc}; 49use tokio_util::sync::CancellationToken; 50 51#[derive(Debug)] 52pub(super) enum DeviceManagerCommand { 53 StartScanning, 54 StopScanning, 55} 56 57#[derive(Debug, Getters)] 58#[getset(get = "pub")] 59pub struct ServerDeviceInfo { 60 identifier: UserDeviceIdentifier, 61 display_name: Option<String>, 62} 63 64pub struct ServerDeviceManagerBuilder { 65 device_configuration_manager: Arc<DeviceConfigurationManager>, 66 comm_managers: Vec<Box<dyn HardwareCommunicationManagerBuilder>>, 67} 68 
69impl ServerDeviceManagerBuilder { 70 pub fn new(device_configuration_manager: DeviceConfigurationManager) -> Self { 71 Self { 72 device_configuration_manager: Arc::new(device_configuration_manager), 73 comm_managers: vec![], 74 } 75 } 76 77 /// Use a prebuilt device configuration manager that needs to be shared with the outside world 78 /// (usually for serialization of user configurations to file) 79 pub fn new_with_arc(device_configuration_manager: Arc<DeviceConfigurationManager>) -> Self { 80 Self { 81 device_configuration_manager, 82 comm_managers: vec![], 83 } 84 } 85 86 pub fn comm_manager<T>(&mut self, builder: T) -> &mut Self 87 where 88 T: HardwareCommunicationManagerBuilder + 'static, 89 { 90 self.comm_managers.push(Box::new(builder)); 91 self 92 } 93 94 pub fn finish(&mut self) -> Result<ServerDeviceManager, ButtplugServerError> { 95 let (device_command_sender, device_command_receiver) = mpsc::channel(256); 96 let (device_event_sender, device_event_receiver) = mpsc::channel(256); 97 let mut comm_managers: Vec<Box<dyn HardwareCommunicationManager>> = Vec::new(); 98 for builder in &mut self.comm_managers { 99 let comm_mgr = builder.finish(device_event_sender.clone()); 100 101 if comm_managers 102 .iter() 103 .any(|mgr| mgr.name() == comm_mgr.name()) 104 { 105 return Err( 106 ButtplugServerError::DeviceCommunicationManagerTypeAlreadyAdded( 107 comm_mgr.name().to_owned(), 108 ), 109 ); 110 } 111 112 comm_managers.push(comm_mgr); 113 } 114 115 let mut colliding_dcms = vec![]; 116 for mgr in comm_managers.iter() { 117 info!("{}: {}", mgr.name(), mgr.can_scan()); 118 // Hack: Lovense and Bluetooth dongles will fight with each other over devices, possibly 119 // interrupting each other connecting and causing very weird issues for users. Print a 120 // warning message to logs if more than one is active and available to scan. 
121 if [ 122 "BtlePlugCommunicationManager", 123 "LovenseSerialDongleCommunicationManager", 124 "LovenseHIDDongleCommunicationManager", 125 ] 126 .iter() 127 .any(|x| x == &mgr.name()) 128 && mgr.can_scan() 129 { 130 colliding_dcms.push(mgr.name().to_owned()); 131 } 132 } 133 if colliding_dcms.len() > 1 { 134 warn!( 135 "The following device connection methods may collide: {}. This may mean you have lovense dongles and bluetooth dongles connected at the same time. Please disconnect the lovense dongles or turn off the Lovense HID/Serial Dongle support in Intiface/Buttplug. Lovense devices will work with the Bluetooth dongle.", 136 colliding_dcms.join(", ") 137 ); 138 } 139 140 let devices = Arc::new(DashMap::new()); 141 let loop_cancellation_token = CancellationToken::new(); 142 143 let output_sender = broadcast::channel(255).0; 144 145 let mut event_loop = ServerDeviceManagerEventLoop::new( 146 comm_managers, 147 self.device_configuration_manager.clone(), 148 devices.clone(), 149 loop_cancellation_token.child_token(), 150 output_sender.clone(), 151 device_event_receiver, 152 device_command_receiver, 153 ); 154 async_manager::spawn(async move { 155 event_loop.run().await; 156 }); 157 Ok(ServerDeviceManager { 158 device_configuration_manager: self.device_configuration_manager.clone(), 159 devices, 160 device_command_sender, 161 loop_cancellation_token, 162 running: Arc::new(AtomicBool::new(true)), 163 output_sender, 164 }) 165 } 166} 167 168#[derive(Getters)] 169pub struct ServerDeviceManager { 170 #[getset(get = "pub")] 171 device_configuration_manager: Arc<DeviceConfigurationManager>, 172 #[getset(get = "pub(crate)")] 173 devices: Arc<DashMap<u32, Arc<ServerDevice>>>, 174 device_command_sender: mpsc::Sender<DeviceManagerCommand>, 175 loop_cancellation_token: CancellationToken, 176 running: Arc<AtomicBool>, 177 output_sender: broadcast::Sender<ButtplugServerMessageV4>, 178} 179 180impl ServerDeviceManager { 181 pub fn event_stream(&self) -> impl Stream<Item = 
ButtplugServerMessageV4> + use<> { 182 // Unlike the client API, we can expect anyone using the server to pin this 183 // themselves. 184 convert_broadcast_receiver_to_stream(self.output_sender.subscribe()) 185 } 186 187 fn start_scanning(&self) -> ButtplugServerResultFuture { 188 let command_sender = self.device_command_sender.clone(); 189 async move { 190 if command_sender 191 .send(DeviceManagerCommand::StartScanning) 192 .await 193 .is_err() 194 { 195 // TODO Fill in error. 196 } 197 Ok(message::OkV0::default().into()) 198 } 199 .boxed() 200 } 201 202 fn stop_scanning(&self) -> ButtplugServerResultFuture { 203 let command_sender = self.device_command_sender.clone(); 204 async move { 205 if command_sender 206 .send(DeviceManagerCommand::StopScanning) 207 .await 208 .is_err() 209 { 210 // TODO Fill in error. 211 } 212 Ok(message::OkV0::default().into()) 213 } 214 .boxed() 215 } 216 217 pub(crate) fn stop_all_devices(&self) -> ButtplugServerResultFuture { 218 let device_map = self.devices.clone(); 219 // TODO This could use some error reporting. 220 async move { 221 let fut_vec: Vec<_> = device_map 222 .iter() 223 .map(|dev| { 224 let device = dev.value(); 225 device.parse_message(message::StopDeviceCmdV0::new(1).into()) 226 }) 227 .collect(); 228 future::join_all(fut_vec).await; 229 Ok(message::OkV0::default().into()) 230 } 231 .boxed() 232 } 233 234 fn parse_device_message( 235 &self, 236 device_msg: ButtplugDeviceCommandMessageUnionV4, 237 ) -> ButtplugServerResultFuture { 238 match self.devices.get(&device_msg.device_index()) { 239 Some(device) => { 240 //let fut = device.parse_message(device_msg); 241 device.parse_message(device_msg) 242 // Create a future to run the message through the device, then handle adding the id to the result. 
243 //fut.boxed() 244 } 245 None => ButtplugDeviceError::DeviceNotAvailable(device_msg.device_index()).into(), 246 } 247 } 248 249 fn generate_device_list(&self) -> DeviceListV4 { 250 let devices = self 251 .devices 252 .iter() 253 .map(|device| device.value().as_device_message_info(*device.key())) 254 .collect(); 255 DeviceListV4::new(devices) 256 } 257 258 fn parse_device_manager_message( 259 &self, 260 manager_msg: ButtplugDeviceManagerMessageUnion, 261 ) -> ButtplugServerResultFuture { 262 match manager_msg { 263 ButtplugDeviceManagerMessageUnion::RequestDeviceList(msg) => { 264 let mut device_list = self.generate_device_list(); 265 device_list.set_id(msg.id()); 266 future::ready(Ok(device_list.into())).boxed() 267 } 268 ButtplugDeviceManagerMessageUnion::StopAllDevices(_) => self.stop_all_devices(), 269 ButtplugDeviceManagerMessageUnion::StartScanning(_) => self.start_scanning(), 270 ButtplugDeviceManagerMessageUnion::StopScanning(_) => self.stop_scanning(), 271 } 272 } 273 274 pub fn parse_message(&self, msg: ButtplugCheckedClientMessageV4) -> ButtplugServerResultFuture { 275 if !self.running.load(Ordering::Relaxed) { 276 return future::ready(Err(ButtplugUnknownError::DeviceManagerNotRunning.into())).boxed(); 277 } 278 // If this is a device command message, just route it directly to the 279 // device. 
280 if let Ok(device_msg) = ButtplugDeviceCommandMessageUnionV4::try_from(msg.clone()) { 281 self.parse_device_message(device_msg) 282 } else if let Ok(manager_msg) = ButtplugDeviceManagerMessageUnion::try_from(msg.clone()) { 283 self.parse_device_manager_message(manager_msg) 284 } else { 285 ButtplugMessageError::UnexpectedMessageType(format!("{msg:?}")).into() 286 } 287 } 288 289 pub(crate) fn feature_map(&self) -> HashMap<u32, ServerDeviceAttributes> { 290 self 291 .devices() 292 .iter() 293 .map(|x| (*x.key(), x.legacy_attributes().clone())) 294 .collect() 295 } 296 297 pub fn device_info(&self, index: u32) -> Option<ServerDeviceInfo> { 298 self.devices.get(&index).map(|device| ServerDeviceInfo { 299 identifier: device.value().identifier().clone(), 300 display_name: device.value().definition().display_name().clone(), 301 }) 302 } 303 304 // Only a ButtplugServer should be able to call this. We don't want to expose this capability to 305 // the outside world. Note that this could cause issues for lifetimes if someone holds this longer 306 // than the lifetime of the server that originally created it. Ideally we should lock the Server 307 // Device Manager lifetime to the owning ButtplugServer lifetime to ensure that doesn't happen, 308 // but that's going to be complicated. 309 pub(crate) fn shutdown(&self) -> ButtplugServerResultFuture { 310 let devices = self.devices.clone(); 311 // Make sure that, once our owning server shuts us down, no one outside can use this manager 312 // again. Otherwise we can have all sorts of ownership weirdness. 313 self.running.store(false, Ordering::Relaxed); 314 let stop_scanning = self.stop_scanning(); 315 let stop_devices = self.stop_all_devices(); 316 let token = self.loop_cancellation_token.clone(); 317 async move { 318 // Force stop scanning, otherwise we can disconnect and instantly try to reconnect while 319 // cleaning up if we're still scanning. 
320 let _ = stop_scanning.await; 321 let _ = stop_devices.await; 322 for device in devices.iter() { 323 device.value().disconnect().await?; 324 } 325 token.cancel(); 326 Ok(message::OkV0::default().into()) 327 } 328 .boxed() 329 } 330} 331 332impl Drop for ServerDeviceManager { 333 fn drop(&mut self) { 334 info!("Dropping device manager!"); 335 self.loop_cancellation_token.cancel(); 336 } 337}