Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Buttplug Device Manager, manages Device Subtype (Platform/Communication bus
9//! specific) Managers
10
11use crate::{
12 ButtplugServerError,
13 ButtplugServerResultFuture,
14 device::{
15 ServerDevice,
16 hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerBuilder},
17 server_device_manager_event_loop::ServerDeviceManagerEventLoop,
18 },
19 message::{
20 server_device_attributes::ServerDeviceAttributes,
21 spec_enums::{
22 ButtplugCheckedClientMessageV4,
23 ButtplugDeviceCommandMessageUnionV4,
24 ButtplugDeviceManagerMessageUnion,
25 },
26 },
27};
28use buttplug_core::{
29 errors::{ButtplugDeviceError, ButtplugMessageError, ButtplugUnknownError},
30 message::{self, ButtplugDeviceMessage, ButtplugMessage, ButtplugServerMessageV4, DeviceListV4},
31 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
32};
33use buttplug_server_device_config::{DeviceConfigurationManager, UserDeviceIdentifier};
34use dashmap::DashMap;
35use futures::{
36 Stream,
37 future::{self, FutureExt},
38};
39use getset::Getters;
40use std::{
41 collections::HashMap,
42 convert::TryFrom,
43 sync::{
44 Arc,
45 atomic::{AtomicBool, Ordering},
46 },
47};
48use tokio::sync::{broadcast, mpsc};
49use tokio_util::sync::CancellationToken;
50
51#[derive(Debug)]
52pub(super) enum DeviceManagerCommand {
53 StartScanning,
54 StopScanning,
55}
56
57#[derive(Debug, Getters)]
58#[getset(get = "pub")]
59pub struct ServerDeviceInfo {
60 identifier: UserDeviceIdentifier,
61 display_name: Option<String>,
62}
63
64pub struct ServerDeviceManagerBuilder {
65 device_configuration_manager: Arc<DeviceConfigurationManager>,
66 comm_managers: Vec<Box<dyn HardwareCommunicationManagerBuilder>>,
67}
68
69impl ServerDeviceManagerBuilder {
70 pub fn new(device_configuration_manager: DeviceConfigurationManager) -> Self {
71 Self {
72 device_configuration_manager: Arc::new(device_configuration_manager),
73 comm_managers: vec![],
74 }
75 }
76
77 /// Use a prebuilt device configuration manager that needs to be shared with the outside world
78 /// (usually for serialization of user configurations to file)
79 pub fn new_with_arc(device_configuration_manager: Arc<DeviceConfigurationManager>) -> Self {
80 Self {
81 device_configuration_manager,
82 comm_managers: vec![],
83 }
84 }
85
86 pub fn comm_manager<T>(&mut self, builder: T) -> &mut Self
87 where
88 T: HardwareCommunicationManagerBuilder + 'static,
89 {
90 self.comm_managers.push(Box::new(builder));
91 self
92 }
93
94 pub fn finish(&mut self) -> Result<ServerDeviceManager, ButtplugServerError> {
95 let (device_command_sender, device_command_receiver) = mpsc::channel(256);
96 let (device_event_sender, device_event_receiver) = mpsc::channel(256);
97 let mut comm_managers: Vec<Box<dyn HardwareCommunicationManager>> = Vec::new();
98 for builder in &mut self.comm_managers {
99 let comm_mgr = builder.finish(device_event_sender.clone());
100
101 if comm_managers
102 .iter()
103 .any(|mgr| mgr.name() == comm_mgr.name())
104 {
105 return Err(
106 ButtplugServerError::DeviceCommunicationManagerTypeAlreadyAdded(
107 comm_mgr.name().to_owned(),
108 ),
109 );
110 }
111
112 comm_managers.push(comm_mgr);
113 }
114
115 let mut colliding_dcms = vec![];
116 for mgr in comm_managers.iter() {
117 info!("{}: {}", mgr.name(), mgr.can_scan());
118 // Hack: Lovense and Bluetooth dongles will fight with each other over devices, possibly
119 // interrupting each other connecting and causing very weird issues for users. Print a
120 // warning message to logs if more than one is active and available to scan.
121 if [
122 "BtlePlugCommunicationManager",
123 "LovenseSerialDongleCommunicationManager",
124 "LovenseHIDDongleCommunicationManager",
125 ]
126 .iter()
127 .any(|x| x == &mgr.name())
128 && mgr.can_scan()
129 {
130 colliding_dcms.push(mgr.name().to_owned());
131 }
132 }
133 if colliding_dcms.len() > 1 {
134 warn!(
135 "The following device connection methods may collide: {}. This may mean you have lovense dongles and bluetooth dongles connected at the same time. Please disconnect the lovense dongles or turn off the Lovense HID/Serial Dongle support in Intiface/Buttplug. Lovense devices will work with the Bluetooth dongle.",
136 colliding_dcms.join(", ")
137 );
138 }
139
140 let devices = Arc::new(DashMap::new());
141 let loop_cancellation_token = CancellationToken::new();
142
143 let output_sender = broadcast::channel(255).0;
144
145 let mut event_loop = ServerDeviceManagerEventLoop::new(
146 comm_managers,
147 self.device_configuration_manager.clone(),
148 devices.clone(),
149 loop_cancellation_token.child_token(),
150 output_sender.clone(),
151 device_event_receiver,
152 device_command_receiver,
153 );
154 async_manager::spawn(async move {
155 event_loop.run().await;
156 });
157 Ok(ServerDeviceManager {
158 device_configuration_manager: self.device_configuration_manager.clone(),
159 devices,
160 device_command_sender,
161 loop_cancellation_token,
162 running: Arc::new(AtomicBool::new(true)),
163 output_sender,
164 })
165 }
166}
167
168#[derive(Getters)]
169pub struct ServerDeviceManager {
170 #[getset(get = "pub")]
171 device_configuration_manager: Arc<DeviceConfigurationManager>,
172 #[getset(get = "pub(crate)")]
173 devices: Arc<DashMap<u32, Arc<ServerDevice>>>,
174 device_command_sender: mpsc::Sender<DeviceManagerCommand>,
175 loop_cancellation_token: CancellationToken,
176 running: Arc<AtomicBool>,
177 output_sender: broadcast::Sender<ButtplugServerMessageV4>,
178}
179
180impl ServerDeviceManager {
181 pub fn event_stream(&self) -> impl Stream<Item = ButtplugServerMessageV4> + use<> {
182 // Unlike the client API, we can expect anyone using the server to pin this
183 // themselves.
184 convert_broadcast_receiver_to_stream(self.output_sender.subscribe())
185 }
186
187 fn start_scanning(&self) -> ButtplugServerResultFuture {
188 let command_sender = self.device_command_sender.clone();
189 async move {
190 if command_sender
191 .send(DeviceManagerCommand::StartScanning)
192 .await
193 .is_err()
194 {
195 // TODO Fill in error.
196 }
197 Ok(message::OkV0::default().into())
198 }
199 .boxed()
200 }
201
202 fn stop_scanning(&self) -> ButtplugServerResultFuture {
203 let command_sender = self.device_command_sender.clone();
204 async move {
205 if command_sender
206 .send(DeviceManagerCommand::StopScanning)
207 .await
208 .is_err()
209 {
210 // TODO Fill in error.
211 }
212 Ok(message::OkV0::default().into())
213 }
214 .boxed()
215 }
216
217 pub(crate) fn stop_all_devices(&self) -> ButtplugServerResultFuture {
218 let device_map = self.devices.clone();
219 // TODO This could use some error reporting.
220 async move {
221 let fut_vec: Vec<_> = device_map
222 .iter()
223 .map(|dev| {
224 let device = dev.value();
225 device.parse_message(message::StopDeviceCmdV0::new(1).into())
226 })
227 .collect();
228 future::join_all(fut_vec).await;
229 Ok(message::OkV0::default().into())
230 }
231 .boxed()
232 }
233
234 fn parse_device_message(
235 &self,
236 device_msg: ButtplugDeviceCommandMessageUnionV4,
237 ) -> ButtplugServerResultFuture {
238 match self.devices.get(&device_msg.device_index()) {
239 Some(device) => {
240 //let fut = device.parse_message(device_msg);
241 device.parse_message(device_msg)
242 // Create a future to run the message through the device, then handle adding the id to the result.
243 //fut.boxed()
244 }
245 None => ButtplugDeviceError::DeviceNotAvailable(device_msg.device_index()).into(),
246 }
247 }
248
249 fn generate_device_list(&self) -> DeviceListV4 {
250 let devices = self
251 .devices
252 .iter()
253 .map(|device| device.value().as_device_message_info(*device.key()))
254 .collect();
255 DeviceListV4::new(devices)
256 }
257
258 fn parse_device_manager_message(
259 &self,
260 manager_msg: ButtplugDeviceManagerMessageUnion,
261 ) -> ButtplugServerResultFuture {
262 match manager_msg {
263 ButtplugDeviceManagerMessageUnion::RequestDeviceList(msg) => {
264 let mut device_list = self.generate_device_list();
265 device_list.set_id(msg.id());
266 future::ready(Ok(device_list.into())).boxed()
267 }
268 ButtplugDeviceManagerMessageUnion::StopAllDevices(_) => self.stop_all_devices(),
269 ButtplugDeviceManagerMessageUnion::StartScanning(_) => self.start_scanning(),
270 ButtplugDeviceManagerMessageUnion::StopScanning(_) => self.stop_scanning(),
271 }
272 }
273
274 pub fn parse_message(&self, msg: ButtplugCheckedClientMessageV4) -> ButtplugServerResultFuture {
275 if !self.running.load(Ordering::Relaxed) {
276 return future::ready(Err(ButtplugUnknownError::DeviceManagerNotRunning.into())).boxed();
277 }
278 // If this is a device command message, just route it directly to the
279 // device.
280 if let Ok(device_msg) = ButtplugDeviceCommandMessageUnionV4::try_from(msg.clone()) {
281 self.parse_device_message(device_msg)
282 } else if let Ok(manager_msg) = ButtplugDeviceManagerMessageUnion::try_from(msg.clone()) {
283 self.parse_device_manager_message(manager_msg)
284 } else {
285 ButtplugMessageError::UnexpectedMessageType(format!("{msg:?}")).into()
286 }
287 }
288
289 pub(crate) fn feature_map(&self) -> HashMap<u32, ServerDeviceAttributes> {
290 self
291 .devices()
292 .iter()
293 .map(|x| (*x.key(), x.legacy_attributes().clone()))
294 .collect()
295 }
296
297 pub fn device_info(&self, index: u32) -> Option<ServerDeviceInfo> {
298 self.devices.get(&index).map(|device| ServerDeviceInfo {
299 identifier: device.value().identifier().clone(),
300 display_name: device.value().definition().display_name().clone(),
301 })
302 }
303
304 // Only a ButtplugServer should be able to call this. We don't want to expose this capability to
305 // the outside world. Note that this could cause issues for lifetimes if someone holds this longer
306 // than the lifetime of the server that originally created it. Ideally we should lock the Server
307 // Device Manager lifetime to the owning ButtplugServer lifetime to ensure that doesn't happen,
308 // but that's going to be complicated.
309 pub(crate) fn shutdown(&self) -> ButtplugServerResultFuture {
310 let devices = self.devices.clone();
311 // Make sure that, once our owning server shuts us down, no one outside can use this manager
312 // again. Otherwise we can have all sorts of ownership weirdness.
313 self.running.store(false, Ordering::Relaxed);
314 let stop_scanning = self.stop_scanning();
315 let stop_devices = self.stop_all_devices();
316 let token = self.loop_cancellation_token.clone();
317 async move {
318 // Force stop scanning, otherwise we can disconnect and instantly try to reconnect while
319 // cleaning up if we're still scanning.
320 let _ = stop_scanning.await;
321 let _ = stop_devices.await;
322 for device in devices.iter() {
323 device.value().disconnect().await?;
324 }
325 token.cancel();
326 Ok(message::OkV0::default().into())
327 }
328 .boxed()
329 }
330}
331
332impl Drop for ServerDeviceManager {
333 fn drop(&mut self) {
334 info!("Dropping device manager!");
335 self.loop_cancellation_token.cancel();
336 }
337}