// Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use buttplug_core::{
9 message::{ButtplugServerMessageV4, DeviceListV4, ScanningFinishedV0},
10 util::async_manager,
11};
12use buttplug_server_device_config::DeviceConfigurationManager;
13use tracing::info_span;
14
15use crate::device::{
16 ServerDevice,
17 ServerDeviceEvent,
18 hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent},
19 protocol::ProtocolManager,
20};
21use dashmap::{DashMap, DashSet};
22use futures::{FutureExt, StreamExt, future, pin_mut};
23use std::sync::{Arc, atomic::AtomicBool};
24use tokio::sync::{broadcast, mpsc};
25use tokio_util::sync::CancellationToken;
26use tracing_futures::Instrument;
27
28use super::server_device_manager::DeviceManagerCommand;
29
/// State owned by the device manager's event loop: the hardware communication
/// managers, the shared device map, the channels the loop reads from and writes
/// to, and the flags that track the current scanning cycle.
pub(super) struct ServerDeviceManagerEventLoop {
  /// Hardware communication managers. Start/stop scanning requests are fanned
  /// out to every entry, and each entry is polled for its scanning status.
  comm_managers: Vec<Box<dyn HardwareCommunicationManager>>,
  /// Device configuration, consulted for address allow/deny checks and for the
  /// base/user communication specifiers, and handed to device construction.
  device_config_manager: Arc<DeviceConfigurationManager>,
  /// Receiver for commands (start/stop scanning) sent by the device manager
  /// frontend.
  device_command_receiver: mpsc::Receiver<DeviceManagerCommand>,
  /// Maps device index (exposed to the outside world) to actual device objects held by the server.
  device_map: Arc<DashMap<u32, Arc<ServerDevice>>>,
  /// Broadcaster that relays device events in the form of Buttplug Messages to
  /// whoever owns the Buttplug Server.
  server_sender: broadcast::Sender<ButtplugServerMessageV4>,
  /// As the device manager owns the Device Communication Managers, it will have
  /// a receiver that the comm managers all send thru.
  device_comm_receiver: mpsc::Receiver<HardwareCommunicationManagerEvent>,
  /// Sender for device events. Cloned into device creation tasks and into the
  /// per-device tasks that forward device events back to this loop.
  device_event_sender: mpsc::Sender<ServerDeviceEvent>,
  /// Receiver for device events, polled by the event loop so it can handle
  /// them.
  device_event_receiver: mpsc::Receiver<ServerDeviceEvent>,
  /// True only while `handle_start_scanning` is waiting for the comm managers'
  /// `start_scanning` futures to complete.
  scanning_bringup_in_progress: bool,
  /// Denote whether scanning has been started since we last sent a ScanningFinished message.
  scanning_started: bool,
  /// Addresses of devices currently trying to connect.
  connecting_devices: Arc<DashSet<String>>,
  /// Cancellation token for the event loop
  loop_cancellation_token: CancellationToken,
  /// True if a stop scanning command was received since the last start
  /// scanning command, which means we won't send scanning finished.
  stop_scanning_received: AtomicBool,
  /// Protocol map, for mapping user definitions to protocols
  protocol_manager: ProtocolManager,
}
60
61impl ServerDeviceManagerEventLoop {
62 pub fn new(
63 comm_managers: Vec<Box<dyn HardwareCommunicationManager>>,
64 device_config_manager: Arc<DeviceConfigurationManager>,
65 device_map: Arc<DashMap<u32, Arc<ServerDevice>>>,
66 loop_cancellation_token: CancellationToken,
67 server_sender: broadcast::Sender<ButtplugServerMessageV4>,
68 device_comm_receiver: mpsc::Receiver<HardwareCommunicationManagerEvent>,
69 device_command_receiver: mpsc::Receiver<DeviceManagerCommand>,
70 ) -> Self {
71 let (device_event_sender, device_event_receiver) = mpsc::channel(256);
72 Self {
73 comm_managers,
74 device_config_manager,
75 server_sender,
76 device_map,
77 device_comm_receiver,
78 device_event_sender,
79 device_event_receiver,
80 device_command_receiver,
81 scanning_bringup_in_progress: false,
82 scanning_started: false,
83 connecting_devices: Arc::new(DashSet::new()),
84 loop_cancellation_token,
85 stop_scanning_received: AtomicBool::new(false),
86 protocol_manager: ProtocolManager::default(),
87 }
88 }
89
90 fn scanning_status(&self) -> bool {
91 if self.comm_managers.iter().any(|x| x.scanning_status()) {
92 debug!("At least one manager still scanning, continuing event loop.");
93 return true;
94 }
95 false
96 }
97
98 async fn handle_start_scanning(&mut self) {
99 if self.scanning_status() || self.scanning_bringup_in_progress {
100 debug!("System already scanning, ignoring new scanning request");
101 return;
102 }
103 self
104 .stop_scanning_received
105 .store(false, std::sync::atomic::Ordering::Relaxed);
106 info!("No scan currently in progress, starting new scan.");
107 self.scanning_bringup_in_progress = true;
108 self.scanning_started = true;
109 let fut_vec: Vec<_> = self
110 .comm_managers
111 .iter_mut()
112 .map(|guard| guard.start_scanning())
113 .collect();
114 // TODO If start_scanning fails anywhere, this will ignore it. We should maybe at least log?
115 future::join_all(fut_vec).await;
116 debug!("Scanning started for all hardware comm managers.");
117 self.scanning_bringup_in_progress = false;
118 }
119
120 async fn handle_stop_scanning(&mut self) {
121 self
122 .stop_scanning_received
123 .store(true, std::sync::atomic::Ordering::Relaxed);
124 let fut_vec: Vec<_> = self
125 .comm_managers
126 .iter_mut()
127 .map(|guard| guard.stop_scanning())
128 .collect();
129 // TODO If stop_scanning fails anywhere, this will ignore it. We should maybe at least log?
130 future::join_all(fut_vec).await;
131 }
132
133 async fn handle_device_communication(&mut self, event: HardwareCommunicationManagerEvent) {
134 match event {
135 HardwareCommunicationManagerEvent::ScanningFinished => {
136 debug!(
137 "System signaled that scanning was finished, check to see if all managers are finished."
138 );
139 if self.scanning_bringup_in_progress {
140 debug!(
141 "Hardware Comm Manager finished before scanning was fully started, continuing event loop."
142 );
143 return;
144 }
145 // Only send scanning finished if we haven't requested a stop.
146 if !self.scanning_status()
147 && self.scanning_started
148 && !self
149 .stop_scanning_received
150 .load(std::sync::atomic::Ordering::Relaxed)
151 {
152 debug!("All managers finished, emitting ScanningFinished");
153 self.scanning_started = false;
154 if self
155 .server_sender
156 .send(ScanningFinishedV0::default().into())
157 .is_err()
158 {
159 info!("Server disappeared, exiting loop.");
160 }
161 }
162 }
163 HardwareCommunicationManagerEvent::DeviceFound {
164 name,
165 address,
166 creator,
167 } => {
168 info!("Device {} ({}) found.", name, address);
169 // Make sure the device isn't on the deny list, or is on the allow list if anything is on it.
170 if !self.device_config_manager.address_allowed(&address) {
171 return;
172 }
173 debug!(
174 "Device {} allowed via configuration file, continuing.",
175 address
176 );
177
178 // Check to make sure the device isn't already connected. If it is, drop what we've been
179 // sent and return.
180 if self
181 .device_map
182 .iter()
183 .any(|entry| *entry.value().identifier().address() == address)
184 {
185 debug!(
186 "Device {} already connected, ignoring new device event.",
187 address
188 );
189 return;
190 }
191
192 // First off, we need to see if we even have a configuration available for the device we're
193 // trying to create. If we don't, exit, because this isn't actually an error. However, if we
194 // actually *do* have a configuration but something goes wrong after this, then it's an
195 // error.
196 //
197 // We used to do this in build_server_device, but we shouldn't mark devices as actually
198 // connecting until after this happens, so we're moving it back here.
199 let protocol_specializers = self.protocol_manager.protocol_specializers(
200 &creator.specifier(),
201 self.device_config_manager.base_communication_specifiers(),
202 self.device_config_manager.user_communication_specifiers(),
203 );
204
205 // If we have no identifiers, then there's nothing to do here. Throw an error.
206 if protocol_specializers.is_empty() {
207 debug!(
208 "{}",
209 format!(
210 "No viable protocols for hardware {:?}, ignoring.",
211 creator.specifier()
212 )
213 );
214 return;
215 }
216
217 // Some device managers (like bluetooth) can send multiple DeviceFound events for the same
218 // device, due to how things like advertisements work. We'll filter this at the
219 // DeviceManager level to make sure that even if a badly coded DCM throws multiple found
220 // events, we only listen to the first one.
221 if !self.connecting_devices.insert(address.clone()) {
222 info!(
223 "Device {} currently trying to connect, ignoring new device event.",
224 address
225 );
226 return;
227 }
228
229 let device_event_sender_clone = self.device_event_sender.clone();
230
231 let device_config_manager = self.device_config_manager.clone();
232 let connecting_devices = self.connecting_devices.clone();
233 let span = info_span!(
234 "device creation",
235 name = tracing::field::display(name),
236 address = tracing::field::display(address.clone())
237 );
238
239 async_manager::spawn(async move {
240 match ServerDevice::build(device_config_manager, creator, protocol_specializers).await {
241 Ok(device) => {
242 if device_event_sender_clone
243 .send(ServerDeviceEvent::Connected(Arc::new(device)))
244 .await
245 .is_err() {
246 error!("Device manager disappeared before connection established, device will be dropped.");
247 }
248 },
249 Err(e) => {
250 error!("Device errored while trying to connect: {:?}", e);
251 }
252 }
253 connecting_devices.remove(&address);
254 }.instrument(span));
255 }
256 }
257 }
258
259 fn generate_device_list(&self) -> DeviceListV4 {
260 let devices = self
261 .device_map
262 .iter()
263 .map(|device| device.value().as_device_message_info(*device.key()))
264 .collect();
265 DeviceListV4::new(devices)
266 }
267
268 async fn handle_device_event(&mut self, device_event: ServerDeviceEvent) {
269 trace!("Got device event: {:?}", device_event);
270 match device_event {
271 ServerDeviceEvent::Connected(device) => {
272 let span = info_span!(
273 "device registration",
274 name = tracing::field::display(device.name()),
275 identifier = tracing::field::debug(device.identifier())
276 );
277 let _enter = span.enter();
278
279 // Get the index from the device
280 let device_index = device.definition().index();
281 // Since we can now reuse device indexes, this means we might possibly
282 // stomp on devices already in the map if they don't register a
283 // disconnect before we try to insert the new device. If we have a
284 // device already in the map with the same index (and therefore same
285 // address), consider it disconnected and eject it from the map. This
286 // should also trigger a disconnect event before our new DeviceAdded
287 // message goes out, so timing matters here.
288 match self.device_map.remove(&device_index) {
289 Some((_, old_device)) => {
290 info!("Device map contains key {}.", device_index);
291 // After removing the device from the array, manually disconnect it to
292 // make sure the event is thrown.
293 if let Err(err) = old_device.disconnect().await {
294 // If we throw an error during the disconnect, we can't really do
295 // anything with it, but should at least log it.
296 error!("Error during index collision disconnect: {:?}", err);
297 }
298 }
299 _ => {
300 info!("Device map does not contain key {}.", device_index);
301 }
302 }
303
304 // Create event loop for forwarding device events into our selector.
305 let event_listener = device.event_stream();
306 let event_sender = self.device_event_sender.clone();
307 async_manager::spawn(async move {
308 pin_mut!(event_listener);
309 // This can fail if the event_sender loses the server before this loop dies.
310 while let Some(event) = event_listener.next().await {
311 if event_sender.send(event).await.is_err() {
312 info!("Event sending failure in servier device manager event loop, exiting.");
313 break;
314 }
315 }
316 });
317
318 info!("Assigning index {} to {}", device_index, device.name());
319 self.device_map.insert(device_index, device.clone());
320
321 let device_update_message: ButtplugServerMessageV4 = self.generate_device_list().into();
322
323 // After that, we can send out to the server's event listeners to let
324 // them know a device has been added.
325 if self.server_sender.send(device_update_message).is_err() {
326 debug!("Server not currently available, dropping Device Added event.");
327 }
328 }
329 ServerDeviceEvent::Disconnected(identifier) => {
330 let mut device_index = None;
331 for device_pair in self.device_map.iter() {
332 if *device_pair.value().identifier() == identifier {
333 device_index = Some(*device_pair.key());
334 break;
335 }
336 }
337 if let Some(device_index) = device_index {
338 self
339 .device_map
340 .remove(&device_index)
341 .expect("Remove will always work.");
342 let device_update_message: ButtplugServerMessageV4 = self.generate_device_list().into();
343 if self.server_sender.send(device_update_message).is_err() {
344 debug!("Server not currently available, dropping Device Removed event.");
345 }
346 }
347 }
348 ServerDeviceEvent::Notification(_, message) => {
349 if self.server_sender.send(message.into()).is_err() {
350 debug!("Server not currently available, dropping Device Added event.");
351 }
352 }
353 }
354 }
355
356 pub async fn run(&mut self) {
357 debug!("Starting Device Manager Loop");
358 loop {
359 tokio::select! {
360 device_comm_msg = self.device_comm_receiver.recv() => {
361 if let Some(msg) = device_comm_msg {
362 trace!("Got device communication message {:?}", msg);
363 self.handle_device_communication(msg).await;
364 } else {
365 break;
366 }
367 }
368 device_event_msg = self.device_event_receiver.recv() => {
369 if let Some(msg) = device_event_msg {
370 trace!("Got device event message {:?}", msg);
371 self.handle_device_event(msg).await;
372 } else {
373 error!("We shouldn't be able to get here since we also own the sender.");
374 break;
375 }
376 },
377 device_command_msg = self.device_command_receiver.recv() => {
378 if let Some(msg) = device_command_msg {
379 trace!("Got device command message {:?}", msg);
380 match msg {
381 DeviceManagerCommand::StartScanning => self.handle_start_scanning().await,
382 DeviceManagerCommand::StopScanning => self.handle_stop_scanning().await,
383 }
384 } else {
385 debug!("Channel to Device Manager frontend dropped, exiting event loop.");
386 break;
387 }
388 }
389 _ = self.loop_cancellation_token.cancelled().fuse() => {
390 debug!("Device event loop cancelled, exiting.");
391 break;
392 }
393 }
394 }
395 debug!("Exiting Device Manager Loop");
396 }
397}