Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use async_trait::async_trait;
9use btleplug::api::CharPropFlags;
10use btleplug::{
11 api::{Central, CentralEvent, Characteristic, Peripheral, ValueNotification, WriteType},
12 platform::Adapter,
13};
14use buttplug_core::{errors::ButtplugDeviceError, util::async_manager};
15use buttplug_server::device::hardware::{
16 Hardware,
17 HardwareConnector,
18 HardwareEvent,
19 HardwareInternal,
20 HardwareReadCmd,
21 HardwareReading,
22 HardwareSpecializer,
23 HardwareSubscribeCmd,
24 HardwareUnsubscribeCmd,
25 HardwareWriteCmd,
26 communication::HardwareSpecificError,
27};
28use buttplug_server_device_config::{
29 BluetoothLESpecifier,
30 Endpoint,
31 ProtocolCommunicationSpecifier,
32};
33use dashmap::DashSet;
34use futures::{
35 Stream,
36 StreamExt,
37 future::{self, BoxFuture, FutureExt},
38};
39use std::{
40 collections::HashMap,
41 fmt::{self, Debug},
42 pin::Pin,
43 sync::Arc,
44 time::Duration,
45};
46use tokio::{select, sync::broadcast};
47use uuid::Uuid;
48
49pub(super) struct BtleplugHardwareConnector<T: Peripheral + 'static> {
50 // Passed in and stored as a member because otherwise it's annoying to get (properties require await)
51 name: String,
52 // Passed in and stored as a member because otherwise it's annoying to get (properties require await)
53 manufacturer_data: HashMap<u16, Vec<u8>>,
54 // Passed in and stored as a member because otherwise it's annoying to get (properties require await)
55 services: Vec<Uuid>,
56 device: T,
57 adapter: Adapter,
58 requires_keepalive: bool,
59}
60
61impl<T: Peripheral> BtleplugHardwareConnector<T> {
62 pub fn new(
63 name: &str,
64 manufacturer_data: &HashMap<u16, Vec<u8>>,
65 services: &[Uuid],
66 device: T,
67 adapter: Adapter,
68 requires_keepalive: bool,
69 ) -> Self {
70 Self {
71 name: name.to_owned(),
72 manufacturer_data: manufacturer_data.clone(),
73 services: services.to_vec(),
74 device,
75 adapter,
76 requires_keepalive,
77 }
78 }
79}
80
81impl<T: Peripheral> Debug for BtleplugHardwareConnector<T> {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 f.debug_struct("BtleplugHardwareCreator")
84 .field("name", &self.name)
85 .field("address", &self.device.id())
86 .finish()
87 }
88}
89
90#[async_trait]
91impl<T: Peripheral> HardwareConnector for BtleplugHardwareConnector<T> {
92 fn specifier(&self) -> ProtocolCommunicationSpecifier {
93 ProtocolCommunicationSpecifier::BluetoothLE(BluetoothLESpecifier::new_from_device(
94 &self.name,
95 &self.manufacturer_data,
96 &self.services,
97 ))
98 }
99
100 async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> {
101 if !self
102 .device
103 .is_connected()
104 .await
105 .expect("If we crash here it's Bluez's fault. Use something else please.")
106 {
107 if let Err(e) = self.device.connect().await {
108 let return_err = ButtplugDeviceError::DeviceSpecificError(
109 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}"))
110 .to_string(),
111 );
112 return Err(return_err);
113 }
114 if let Err(err) = self.device.discover_services().await {
115 error!("BTLEPlug error discovering characteristics: {:?}", err);
116 return Err(ButtplugDeviceError::DeviceConnectionError(format!(
117 "BTLEPlug error discovering characteristics: {err:?}"
118 )));
119 }
120 }
121 Ok(Box::new(BtleplugHardwareSpecializer::new(
122 &self.name,
123 self.device.clone(),
124 self.adapter.clone(),
125 self.requires_keepalive,
126 )))
127 }
128}
129
130pub struct BtleplugHardwareSpecializer<T: Peripheral + 'static> {
131 name: String,
132 device: T,
133 adapter: Adapter,
134 requires_keepalive: bool,
135}
136
137impl<T: Peripheral> BtleplugHardwareSpecializer<T> {
138 pub(super) fn new(name: &str, device: T, adapter: Adapter, requires_keepalive: bool) -> Self {
139 Self {
140 name: name.to_owned(),
141 device,
142 adapter,
143 requires_keepalive,
144 }
145 }
146}
147
148#[async_trait]
149impl<T: Peripheral> HardwareSpecializer for BtleplugHardwareSpecializer<T> {
150 async fn specialize(
151 &mut self,
152 specifiers: &[ProtocolCommunicationSpecifier],
153 ) -> Result<Hardware, ButtplugDeviceError> {
154 // Map UUIDs to endpoints
155 let mut uuid_map = HashMap::<Uuid, Endpoint>::new();
156 let mut endpoints = HashMap::<Endpoint, Characteristic>::new();
157 let address = self.device.id();
158
159 if let Some(ProtocolCommunicationSpecifier::BluetoothLE(btle)) = specifiers
160 .iter()
161 .find(|x| matches!(x, ProtocolCommunicationSpecifier::BluetoothLE(_)))
162 {
163 for (proto_uuid, proto_service) in btle.services() {
164 for service in self.device.services() {
165 if service.uuid != *proto_uuid {
166 continue;
167 }
168
169 debug!("Found required service {} {:?}", service.uuid, service);
170 for (chr_name, chr_uuid) in proto_service.iter() {
171 if let Some(chr) = service.characteristics.iter().find(|c| c.uuid == *chr_uuid) {
172 debug!(
173 "Found characteristic {} for endpoint {}",
174 chr.uuid, *chr_name
175 );
176 endpoints.insert(*chr_name, chr.clone());
177 uuid_map.insert(*chr_uuid, *chr_name);
178 } else {
179 error!(
180 "Characteristic {} ({}) not found, may cause issues in connection.",
181 chr_name, chr_uuid
182 );
183 }
184 }
185 }
186 }
187 } else {
188 error!(
189 "Can't find btle protocol specifier mapping for device {} {:?}",
190 self.name, address
191 );
192 return Err(ButtplugDeviceError::DeviceConnectionError(format!(
193 "Can't find btle protocol specifier mapping for device {} {:?}",
194 self.name, address
195 )));
196 }
197 let notification_stream = self
198 .device
199 .notifications()
200 .await
201 .expect("Should always be able to get notifications");
202
203 let device_internal_impl = BtlePlugHardware::new(
204 self.device.clone(),
205 &self.name,
206 self
207 .adapter
208 .events()
209 .await
210 .expect("Should always be able to get events"),
211 notification_stream,
212 endpoints.clone(),
213 uuid_map,
214 );
215 Ok(Hardware::new(
216 &self.name,
217 &format!("{address:?}"),
218 &endpoints.keys().cloned().collect::<Vec<Endpoint>>(),
219 &Some(Duration::from_millis(75)),
220 self.requires_keepalive,
221 Box::new(device_internal_impl),
222 ))
223 }
224}
225
226pub struct BtlePlugHardware<T: Peripheral + 'static> {
227 device: T,
228 event_stream: broadcast::Sender<HardwareEvent>,
229 endpoints: HashMap<Endpoint, Characteristic>,
230 subscribed_endpoints: Arc<DashSet<Endpoint>>,
231}
232
233impl<T: Peripheral + 'static> BtlePlugHardware<T> {
234 pub fn new(
235 device: T,
236 name: &str,
237 mut adapter_event_stream: Pin<Box<dyn Stream<Item = CentralEvent> + Send>>,
238 mut notification_stream: Pin<Box<dyn Stream<Item = ValueNotification> + Send>>,
239 endpoints: HashMap<Endpoint, Characteristic>,
240 uuid_map: HashMap<Uuid, Endpoint>,
241 ) -> Self {
242 let (event_stream, _) = broadcast::channel(256);
243 let event_stream_clone = event_stream.clone();
244 let address = device.id();
245 let name_clone = name.to_owned();
246 async_manager::spawn(async move {
247 let mut error_notification = false;
248 loop {
249 select! {
250 notification = notification_stream.next() => {
251 if let Some(notification) = notification {
252 let endpoint = if let Some(endpoint) = uuid_map.get(¬ification.uuid) {
253 *endpoint
254 } else {
255 // Only print the error message once.
256 if !error_notification {
257 error!(
258 "Endpoint for UUID {} not found in map, assuming device has disconnected.",
259 notification.uuid
260 );
261 error_notification = true;
262 }
263 continue;
264 };
265 if event_stream_clone.receiver_count() == 0 {
266 continue;
267 }
268 if let Err(err) = event_stream_clone.send(HardwareEvent::Notification(
269 format!("{address:?}"),
270 endpoint,
271 notification.value,
272 )) {
273 error!(
274 "Cannot send notification, device object disappeared: {:?}",
275 err
276 );
277 break;
278 }
279 }
280 }
281 adapter_event = adapter_event_stream.next() => {
282 if let Some(CentralEvent::DeviceDisconnected(addr)) = adapter_event
283 && address == addr {
284 info!(
285 "Device {:?} disconnected",
286 name_clone
287 );
288 if event_stream_clone.receiver_count() != 0
289 && let Err(err) = event_stream_clone
290 .send(HardwareEvent::Disconnected(
291 format!("{address:?}")
292 )) {
293 error!(
294 "Cannot send notification, device object disappeared: {:?}",
295 err
296 );
297 }
298 // At this point, we have nothing left to do because we can't reconnect a device
299 // that's been connected. Exit.
300 break;
301 }
302 }
303 }
304 }
305 info!(
306 "Exiting btleplug notification/event loop for device {:?}",
307 address
308 )
309 });
310 Self {
311 device,
312 endpoints,
313 event_stream,
314 subscribed_endpoints: Arc::new(DashSet::new()),
315 }
316 }
317}
318
319impl<T: Peripheral + 'static> HardwareInternal for BtlePlugHardware<T> {
320 fn event_stream(&self) -> broadcast::Receiver<HardwareEvent> {
321 self.event_stream.subscribe()
322 }
323
324 fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
325 let device = self.device.clone();
326 async move {
327 let _ = device.disconnect().await;
328 Ok(())
329 }
330 .boxed()
331 }
332
333 fn write_value(
334 &self,
335 msg: &HardwareWriteCmd,
336 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
337 let characteristic = match self.endpoints.get(&msg.endpoint()) {
338 Some(chr) => chr.clone(),
339 None => {
340 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint(
341 msg.endpoint().to_string(),
342 )))
343 .boxed();
344 }
345 };
346
347 let device = self.device.clone();
348 let mut write_type = if msg.write_with_response() {
349 WriteType::WithResponse
350 } else {
351 WriteType::WithoutResponse
352 };
353
354 if (write_type == WriteType::WithoutResponse
355 && (characteristic.properties & CharPropFlags::WRITE_WITHOUT_RESPONSE)
356 != CharPropFlags::WRITE_WITHOUT_RESPONSE)
357 || (write_type == WriteType::WithResponse
358 && (characteristic.properties & CharPropFlags::WRITE) != CharPropFlags::WRITE)
359 {
360 if write_type == WriteType::WithoutResponse
361 && (characteristic.properties & CharPropFlags::WRITE) == CharPropFlags::WRITE
362 {
363 warn!(
364 "BTLEPlug device doesn't support write-without-response! Falling back to write-with-response!"
365 );
366 write_type = WriteType::WithResponse
367 } else if write_type == WriteType::WithResponse
368 && (characteristic.properties & CharPropFlags::WRITE_WITHOUT_RESPONSE)
369 == CharPropFlags::WRITE_WITHOUT_RESPONSE
370 {
371 warn!(
372 "BTLEPlug device doesn't support write-with-response! Falling back to write-without-response!"
373 );
374 write_type = WriteType::WithoutResponse
375 } else {
376 error!(
377 "BTLEPlug device doesn't support {}! No fallback available!",
378 if write_type == WriteType::WithoutResponse {
379 "write-without-response"
380 } else {
381 "write-with-response"
382 }
383 );
384 }
385 }
386
387 let data = msg.data().clone();
388 async move {
389 match device.write(&characteristic, &data, write_type).await {
390 Ok(()) => {
391 trace!(
392 "Sent write: {:?}, {:?} to {:?}",
393 data, write_type, characteristic
394 );
395 Ok(())
396 }
397 Err(e) => {
398 error!("BTLEPlug device write error: {:?}", e);
399 Err(ButtplugDeviceError::DeviceSpecificError(
400 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}"))
401 .to_string(),
402 ))
403 }
404 }
405 }
406 .boxed()
407 }
408
409 fn read_value(
410 &self,
411 msg: &HardwareReadCmd,
412 ) -> BoxFuture<'static, Result<HardwareReading, ButtplugDeviceError>> {
413 // Right now we only need read for doing a whitelist check on devices. We
414 // don't care about the data we get back.
415 let characteristic = match self.endpoints.get(&msg.endpoint()) {
416 Some(chr) => chr.clone(),
417 None => {
418 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint(
419 msg.endpoint().to_string(),
420 )))
421 .boxed();
422 }
423 };
424 let device = self.device.clone();
425 let endpoint = msg.endpoint();
426 async move {
427 match device.read(&characteristic).await {
428 Ok(data) => {
429 trace!("Got reading: {:?}", data);
430 Ok(HardwareReading::new(endpoint, &data))
431 }
432 Err(e) => {
433 error!("BTLEPlug device read error: {:?}", e);
434 Err(ButtplugDeviceError::DeviceSpecificError(
435 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}"))
436 .to_string(),
437 ))
438 }
439 }
440 }
441 .boxed()
442 }
443
444 fn subscribe(
445 &self,
446 msg: &HardwareSubscribeCmd,
447 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
448 let endpoint = msg.endpoint();
449 if self.subscribed_endpoints.contains(&endpoint) {
450 debug!(
451 "Endpoint {} already subscribed, ignoring and returning Ok.",
452 endpoint
453 );
454 return future::ready(Ok(())).boxed();
455 }
456 let characteristic = match self.endpoints.get(&endpoint) {
457 Some(chr) => chr.clone(),
458 None => {
459 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint(
460 msg.endpoint().to_string(),
461 )))
462 .boxed();
463 }
464 };
465 let endpoints = self.subscribed_endpoints.clone();
466 let device = self.device.clone();
467 async move {
468 device.subscribe(&characteristic).await.map_err(|e| {
469 ButtplugDeviceError::DeviceSpecificError(
470 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}"))
471 .to_string(),
472 )
473 })?;
474 endpoints.insert(endpoint);
475 Ok(())
476 }
477 .boxed()
478 }
479
480 fn unsubscribe(
481 &self,
482 msg: &HardwareUnsubscribeCmd,
483 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
484 let endpoint = msg.endpoint();
485 if !self.subscribed_endpoints.contains(&endpoint) {
486 debug!(
487 "Endpoint {} already unsubscribed, ignoring and returning Ok.",
488 endpoint
489 );
490 return future::ready(Ok(())).boxed();
491 }
492 let characteristic = match self.endpoints.get(&msg.endpoint()) {
493 Some(chr) => chr.clone(),
494 None => {
495 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint(
496 msg.endpoint().to_string(),
497 )))
498 .boxed();
499 }
500 };
501 let endpoints = self.subscribed_endpoints.clone();
502 let device = self.device.clone();
503 async move {
504 device.unsubscribe(&characteristic).await.map_err(|e| {
505 ButtplugDeviceError::DeviceSpecificError(
506 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}"))
507 .to_string(),
508 )
509 })?;
510 endpoints.remove(&endpoint);
511 Ok(())
512 }
513 .boxed()
514 }
515}
516
517impl<T: Peripheral> Drop for BtlePlugHardware<T> {
518 fn drop(&mut self) {
519 let disconnect_fut = self.disconnect();
520 async_manager::spawn(async move {
521 if let Err(e) = disconnect_fut.await {
522 error!("Error disconnecting btleplug device: {:?}", e);
523 }
524 });
525 }
526}