// Buttplug sex toy control library
// at dev 16 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use buttplug_core::{ 9 message::{ButtplugServerMessageV4, DeviceListV4, ScanningFinishedV0}, 10 util::async_manager, 11}; 12use buttplug_server_device_config::DeviceConfigurationManager; 13use tracing::info_span; 14 15use crate::device::{ 16 ServerDevice, 17 ServerDeviceEvent, 18 hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent}, 19 protocol::ProtocolManager, 20}; 21use dashmap::{DashMap, DashSet}; 22use futures::{FutureExt, StreamExt, future, pin_mut}; 23use std::sync::{Arc, atomic::AtomicBool}; 24use tokio::sync::{broadcast, mpsc}; 25use tokio_util::sync::CancellationToken; 26use tracing_futures::Instrument; 27 28use super::server_device_manager::DeviceManagerCommand; 29 30pub(super) struct ServerDeviceManagerEventLoop { 31 comm_managers: Vec<Box<dyn HardwareCommunicationManager>>, 32 device_config_manager: Arc<DeviceConfigurationManager>, 33 device_command_receiver: mpsc::Receiver<DeviceManagerCommand>, 34 /// Maps device index (exposed to the outside world) to actual device objects held by the server. 35 device_map: Arc<DashMap<u32, Arc<ServerDevice>>>, 36 /// Broadcaster that relays device events in the form of Buttplug Messages to 37 /// whoever owns the Buttplug Server. 38 server_sender: broadcast::Sender<ButtplugServerMessageV4>, 39 /// As the device manager owns the Device Communication Managers, it will have 40 /// a receiver that the comm managers all send thru. 41 device_comm_receiver: mpsc::Receiver<HardwareCommunicationManagerEvent>, 42 /// Sender for device events, passed to new devices when they are created. 43 device_event_sender: mpsc::Sender<ServerDeviceEvent>, 44 /// Receiver for device events, which the event loops to handle events. 
45 device_event_receiver: mpsc::Receiver<ServerDeviceEvent>, 46 /// True if StartScanning has been called but no ScanningFinished has been 47 /// emitted yet. 48 scanning_bringup_in_progress: bool, 49 /// Denote whether scanning has been started since we last sent a ScanningFinished message. 50 scanning_started: bool, 51 /// Devices currently trying to connect. 52 connecting_devices: Arc<DashSet<String>>, 53 /// Cancellation token for the event loop 54 loop_cancellation_token: CancellationToken, 55 /// True if stop scanning message was sent, means we won't send scanning finished. 56 stop_scanning_received: AtomicBool, 57 /// Protocol map, for mapping user definitions to protocols 58 protocol_manager: ProtocolManager, 59} 60 61impl ServerDeviceManagerEventLoop { 62 pub fn new( 63 comm_managers: Vec<Box<dyn HardwareCommunicationManager>>, 64 device_config_manager: Arc<DeviceConfigurationManager>, 65 device_map: Arc<DashMap<u32, Arc<ServerDevice>>>, 66 loop_cancellation_token: CancellationToken, 67 server_sender: broadcast::Sender<ButtplugServerMessageV4>, 68 device_comm_receiver: mpsc::Receiver<HardwareCommunicationManagerEvent>, 69 device_command_receiver: mpsc::Receiver<DeviceManagerCommand>, 70 ) -> Self { 71 let (device_event_sender, device_event_receiver) = mpsc::channel(256); 72 Self { 73 comm_managers, 74 device_config_manager, 75 server_sender, 76 device_map, 77 device_comm_receiver, 78 device_event_sender, 79 device_event_receiver, 80 device_command_receiver, 81 scanning_bringup_in_progress: false, 82 scanning_started: false, 83 connecting_devices: Arc::new(DashSet::new()), 84 loop_cancellation_token, 85 stop_scanning_received: AtomicBool::new(false), 86 protocol_manager: ProtocolManager::default(), 87 } 88 } 89 90 fn scanning_status(&self) -> bool { 91 if self.comm_managers.iter().any(|x| x.scanning_status()) { 92 debug!("At least one manager still scanning, continuing event loop."); 93 return true; 94 } 95 false 96 } 97 98 async fn 
handle_start_scanning(&mut self) { 99 if self.scanning_status() || self.scanning_bringup_in_progress { 100 debug!("System already scanning, ignoring new scanning request"); 101 return; 102 } 103 self 104 .stop_scanning_received 105 .store(false, std::sync::atomic::Ordering::Relaxed); 106 info!("No scan currently in progress, starting new scan."); 107 self.scanning_bringup_in_progress = true; 108 self.scanning_started = true; 109 let fut_vec: Vec<_> = self 110 .comm_managers 111 .iter_mut() 112 .map(|guard| guard.start_scanning()) 113 .collect(); 114 // TODO If start_scanning fails anywhere, this will ignore it. We should maybe at least log? 115 future::join_all(fut_vec).await; 116 debug!("Scanning started for all hardware comm managers."); 117 self.scanning_bringup_in_progress = false; 118 } 119 120 async fn handle_stop_scanning(&mut self) { 121 self 122 .stop_scanning_received 123 .store(true, std::sync::atomic::Ordering::Relaxed); 124 let fut_vec: Vec<_> = self 125 .comm_managers 126 .iter_mut() 127 .map(|guard| guard.stop_scanning()) 128 .collect(); 129 // TODO If stop_scanning fails anywhere, this will ignore it. We should maybe at least log? 130 future::join_all(fut_vec).await; 131 } 132 133 async fn handle_device_communication(&mut self, event: HardwareCommunicationManagerEvent) { 134 match event { 135 HardwareCommunicationManagerEvent::ScanningFinished => { 136 debug!( 137 "System signaled that scanning was finished, check to see if all managers are finished." 138 ); 139 if self.scanning_bringup_in_progress { 140 debug!( 141 "Hardware Comm Manager finished before scanning was fully started, continuing event loop." 142 ); 143 return; 144 } 145 // Only send scanning finished if we haven't requested a stop. 
146 if !self.scanning_status() 147 && self.scanning_started 148 && !self 149 .stop_scanning_received 150 .load(std::sync::atomic::Ordering::Relaxed) 151 { 152 debug!("All managers finished, emitting ScanningFinished"); 153 self.scanning_started = false; 154 if self 155 .server_sender 156 .send(ScanningFinishedV0::default().into()) 157 .is_err() 158 { 159 info!("Server disappeared, exiting loop."); 160 } 161 } 162 } 163 HardwareCommunicationManagerEvent::DeviceFound { 164 name, 165 address, 166 creator, 167 } => { 168 info!("Device {} ({}) found.", name, address); 169 // Make sure the device isn't on the deny list, or is on the allow list if anything is on it. 170 if !self.device_config_manager.address_allowed(&address) { 171 return; 172 } 173 debug!( 174 "Device {} allowed via configuration file, continuing.", 175 address 176 ); 177 178 // Check to make sure the device isn't already connected. If it is, drop what we've been 179 // sent and return. 180 if self 181 .device_map 182 .iter() 183 .any(|entry| *entry.value().identifier().address() == address) 184 { 185 debug!( 186 "Device {} already connected, ignoring new device event.", 187 address 188 ); 189 return; 190 } 191 192 // First off, we need to see if we even have a configuration available for the device we're 193 // trying to create. If we don't, exit, because this isn't actually an error. However, if we 194 // actually *do* have a configuration but something goes wrong after this, then it's an 195 // error. 196 // 197 // We used to do this in build_server_device, but we shouldn't mark devices as actually 198 // connecting until after this happens, so we're moving it back here. 199 let protocol_specializers = self.protocol_manager.protocol_specializers( 200 &creator.specifier(), 201 self.device_config_manager.base_communication_specifiers(), 202 self.device_config_manager.user_communication_specifiers(), 203 ); 204 205 // If we have no identifiers, then there's nothing to do here. Throw an error. 
206 if protocol_specializers.is_empty() { 207 debug!( 208 "{}", 209 format!( 210 "No viable protocols for hardware {:?}, ignoring.", 211 creator.specifier() 212 ) 213 ); 214 return; 215 } 216 217 // Some device managers (like bluetooth) can send multiple DeviceFound events for the same 218 // device, due to how things like advertisements work. We'll filter this at the 219 // DeviceManager level to make sure that even if a badly coded DCM throws multiple found 220 // events, we only listen to the first one. 221 if !self.connecting_devices.insert(address.clone()) { 222 info!( 223 "Device {} currently trying to connect, ignoring new device event.", 224 address 225 ); 226 return; 227 } 228 229 let device_event_sender_clone = self.device_event_sender.clone(); 230 231 let device_config_manager = self.device_config_manager.clone(); 232 let connecting_devices = self.connecting_devices.clone(); 233 let span = info_span!( 234 "device creation", 235 name = tracing::field::display(name), 236 address = tracing::field::display(address.clone()) 237 ); 238 239 async_manager::spawn(async move { 240 match ServerDevice::build(device_config_manager, creator, protocol_specializers).await { 241 Ok(device) => { 242 if device_event_sender_clone 243 .send(ServerDeviceEvent::Connected(Arc::new(device))) 244 .await 245 .is_err() { 246 error!("Device manager disappeared before connection established, device will be dropped."); 247 } 248 }, 249 Err(e) => { 250 error!("Device errored while trying to connect: {:?}", e); 251 } 252 } 253 connecting_devices.remove(&address); 254 }.instrument(span)); 255 } 256 } 257 } 258 259 fn generate_device_list(&self) -> DeviceListV4 { 260 let devices = self 261 .device_map 262 .iter() 263 .map(|device| device.value().as_device_message_info(*device.key())) 264 .collect(); 265 DeviceListV4::new(devices) 266 } 267 268 async fn handle_device_event(&mut self, device_event: ServerDeviceEvent) { 269 trace!("Got device event: {:?}", device_event); 270 match 
device_event { 271 ServerDeviceEvent::Connected(device) => { 272 let span = info_span!( 273 "device registration", 274 name = tracing::field::display(device.name()), 275 identifier = tracing::field::debug(device.identifier()) 276 ); 277 let _enter = span.enter(); 278 279 // Get the index from the device 280 let device_index = device.definition().index(); 281 // Since we can now reuse device indexes, this means we might possibly 282 // stomp on devices already in the map if they don't register a 283 // disconnect before we try to insert the new device. If we have a 284 // device already in the map with the same index (and therefore same 285 // address), consider it disconnected and eject it from the map. This 286 // should also trigger a disconnect event before our new DeviceAdded 287 // message goes out, so timing matters here. 288 match self.device_map.remove(&device_index) { 289 Some((_, old_device)) => { 290 info!("Device map contains key {}.", device_index); 291 // After removing the device from the array, manually disconnect it to 292 // make sure the event is thrown. 293 if let Err(err) = old_device.disconnect().await { 294 // If we throw an error during the disconnect, we can't really do 295 // anything with it, but should at least log it. 296 error!("Error during index collision disconnect: {:?}", err); 297 } 298 } 299 _ => { 300 info!("Device map does not contain key {}.", device_index); 301 } 302 } 303 304 // Create event loop for forwarding device events into our selector. 305 let event_listener = device.event_stream(); 306 let event_sender = self.device_event_sender.clone(); 307 async_manager::spawn(async move { 308 pin_mut!(event_listener); 309 // This can fail if the event_sender loses the server before this loop dies. 
310 while let Some(event) = event_listener.next().await { 311 if event_sender.send(event).await.is_err() { 312 info!("Event sending failure in servier device manager event loop, exiting."); 313 break; 314 } 315 } 316 }); 317 318 info!("Assigning index {} to {}", device_index, device.name()); 319 self.device_map.insert(device_index, device.clone()); 320 321 let device_update_message: ButtplugServerMessageV4 = self.generate_device_list().into(); 322 323 // After that, we can send out to the server's event listeners to let 324 // them know a device has been added. 325 if self.server_sender.send(device_update_message).is_err() { 326 debug!("Server not currently available, dropping Device Added event."); 327 } 328 } 329 ServerDeviceEvent::Disconnected(identifier) => { 330 let mut device_index = None; 331 for device_pair in self.device_map.iter() { 332 if *device_pair.value().identifier() == identifier { 333 device_index = Some(*device_pair.key()); 334 break; 335 } 336 } 337 if let Some(device_index) = device_index { 338 self 339 .device_map 340 .remove(&device_index) 341 .expect("Remove will always work."); 342 let device_update_message: ButtplugServerMessageV4 = self.generate_device_list().into(); 343 if self.server_sender.send(device_update_message).is_err() { 344 debug!("Server not currently available, dropping Device Removed event."); 345 } 346 } 347 } 348 ServerDeviceEvent::Notification(_, message) => { 349 if self.server_sender.send(message.into()).is_err() { 350 debug!("Server not currently available, dropping Device Added event."); 351 } 352 } 353 } 354 } 355 356 pub async fn run(&mut self) { 357 debug!("Starting Device Manager Loop"); 358 loop { 359 tokio::select! 
{ 360 device_comm_msg = self.device_comm_receiver.recv() => { 361 if let Some(msg) = device_comm_msg { 362 trace!("Got device communication message {:?}", msg); 363 self.handle_device_communication(msg).await; 364 } else { 365 break; 366 } 367 } 368 device_event_msg = self.device_event_receiver.recv() => { 369 if let Some(msg) = device_event_msg { 370 trace!("Got device event message {:?}", msg); 371 self.handle_device_event(msg).await; 372 } else { 373 error!("We shouldn't be able to get here since we also own the sender."); 374 break; 375 } 376 }, 377 device_command_msg = self.device_command_receiver.recv() => { 378 if let Some(msg) = device_command_msg { 379 trace!("Got device command message {:?}", msg); 380 match msg { 381 DeviceManagerCommand::StartScanning => self.handle_start_scanning().await, 382 DeviceManagerCommand::StopScanning => self.handle_stop_scanning().await, 383 } 384 } else { 385 debug!("Channel to Device Manager frontend dropped, exiting event loop."); 386 break; 387 } 388 } 389 _ = self.loop_cancellation_token.cancelled().fuse() => { 390 debug!("Device event loop cancelled, exiting."); 391 break; 392 } 393 } 394 } 395 debug!("Exiting Device Manager Loop"); 396 } 397}