Buttplug sex toy control library
at dev 16 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8use async_trait::async_trait; 9use btleplug::api::CharPropFlags; 10use btleplug::{ 11 api::{Central, CentralEvent, Characteristic, Peripheral, ValueNotification, WriteType}, 12 platform::Adapter, 13}; 14use buttplug_core::{errors::ButtplugDeviceError, util::async_manager}; 15use buttplug_server::device::hardware::{ 16 Hardware, 17 HardwareConnector, 18 HardwareEvent, 19 HardwareInternal, 20 HardwareReadCmd, 21 HardwareReading, 22 HardwareSpecializer, 23 HardwareSubscribeCmd, 24 HardwareUnsubscribeCmd, 25 HardwareWriteCmd, 26 communication::HardwareSpecificError, 27}; 28use buttplug_server_device_config::{ 29 BluetoothLESpecifier, 30 Endpoint, 31 ProtocolCommunicationSpecifier, 32}; 33use dashmap::DashSet; 34use futures::{ 35 Stream, 36 StreamExt, 37 future::{self, BoxFuture, FutureExt}, 38}; 39use std::{ 40 collections::HashMap, 41 fmt::{self, Debug}, 42 pin::Pin, 43 sync::Arc, 44 time::Duration, 45}; 46use tokio::{select, sync::broadcast}; 47use uuid::Uuid; 48 49pub(super) struct BtleplugHardwareConnector<T: Peripheral + 'static> { 50 // Passed in and stored as a member because otherwise it's annoying to get (properties require await) 51 name: String, 52 // Passed in and stored as a member because otherwise it's annoying to get (properties require await) 53 manufacturer_data: HashMap<u16, Vec<u8>>, 54 // Passed in and stored as a member because otherwise it's annoying to get (properties require await) 55 services: Vec<Uuid>, 56 device: T, 57 adapter: Adapter, 58 requires_keepalive: bool, 59} 60 61impl<T: Peripheral> BtleplugHardwareConnector<T> { 62 pub fn new( 63 name: &str, 64 manufacturer_data: &HashMap<u16, Vec<u8>>, 65 services: &[Uuid], 66 device: T, 67 adapter: Adapter, 68 requires_keepalive: bool, 69 ) -> Self { 70 Self { 71 name: name.to_owned(), 72 manufacturer_data: manufacturer_data.clone(), 73 services: services.to_vec(), 74 device, 75 adapter, 76 requires_keepalive, 77 } 78 } 79} 80 81impl<T: Peripheral> Debug for BtleplugHardwareConnector<T> { 82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 83 f.debug_struct("BtleplugHardwareCreator") 84 .field("name", &self.name) 85 .field("address", &self.device.id()) 86 .finish() 87 } 88} 89 90#[async_trait] 91impl<T: Peripheral> HardwareConnector for BtleplugHardwareConnector<T> { 92 fn specifier(&self) -> ProtocolCommunicationSpecifier { 93 ProtocolCommunicationSpecifier::BluetoothLE(BluetoothLESpecifier::new_from_device( 94 &self.name, 95 &self.manufacturer_data, 96 &self.services, 97 )) 98 } 99 100 async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> { 101 if !self 102 .device 103 .is_connected() 104 .await 105 .expect("If we crash here it's Bluez's fault. Use something else please.") 106 { 107 if let Err(e) = self.device.connect().await { 108 let return_err = ButtplugDeviceError::DeviceSpecificError( 109 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}")) 110 .to_string(), 111 ); 112 return Err(return_err); 113 } 114 if let Err(err) = self.device.discover_services().await { 115 error!("BTLEPlug error discovering characteristics: {:?}", err); 116 return Err(ButtplugDeviceError::DeviceConnectionError(format!( 117 "BTLEPlug error discovering characteristics: {err:?}" 118 ))); 119 } 120 } 121 Ok(Box::new(BtleplugHardwareSpecializer::new( 122 &self.name, 123 self.device.clone(), 124 self.adapter.clone(), 125 self.requires_keepalive, 126 ))) 127 } 128} 129 130pub struct BtleplugHardwareSpecializer<T: Peripheral + 'static> { 131 name: String, 132 device: T, 133 adapter: Adapter, 134 requires_keepalive: bool, 135} 136 137impl<T: Peripheral> BtleplugHardwareSpecializer<T> { 138 pub(super) fn new(name: &str, device: T, adapter: Adapter, requires_keepalive: bool) -> Self { 139 Self { 140 name: name.to_owned(), 141 device, 142 adapter, 143 requires_keepalive, 144 } 145 } 146} 147 148#[async_trait] 149impl<T: Peripheral> HardwareSpecializer for BtleplugHardwareSpecializer<T> { 150 async fn specialize( 151 &mut self, 152 specifiers: &[ProtocolCommunicationSpecifier], 153 ) -> Result<Hardware, ButtplugDeviceError> { 154 // Map UUIDs to endpoints 155 let mut uuid_map = HashMap::<Uuid, Endpoint>::new(); 156 let mut endpoints = HashMap::<Endpoint, Characteristic>::new(); 157 let address = self.device.id(); 158 159 if let Some(ProtocolCommunicationSpecifier::BluetoothLE(btle)) = specifiers 160 .iter() 161 .find(|x| matches!(x, ProtocolCommunicationSpecifier::BluetoothLE(_))) 162 { 163 for (proto_uuid, proto_service) in btle.services() { 164 for service in self.device.services() { 165 if service.uuid != *proto_uuid { 166 continue; 167 } 168 169 debug!("Found required service {} {:?}", service.uuid, service); 170 for (chr_name, chr_uuid) in proto_service.iter() { 171 if let Some(chr) = service.characteristics.iter().find(|c| c.uuid == *chr_uuid) { 172 debug!( 173 "Found characteristic {} for endpoint {}", 174 chr.uuid, *chr_name 175 ); 176 endpoints.insert(*chr_name, chr.clone()); 177 uuid_map.insert(*chr_uuid, *chr_name); 178 } else { 179 error!( 180 "Characteristic {} ({}) not found, may cause issues in connection.", 181 chr_name, chr_uuid 182 ); 183 } 184 } 185 } 186 } 187 } else { 188 error!( 189 "Can't find btle protocol specifier mapping for device {} {:?}", 190 self.name, address 191 ); 192 return Err(ButtplugDeviceError::DeviceConnectionError(format!( 193 "Can't find btle protocol specifier mapping for device {} {:?}", 194 self.name, address 195 ))); 196 } 197 let notification_stream = self 198 .device 199 .notifications() 200 .await 201 .expect("Should always be able to get notifications"); 202 203 let device_internal_impl = BtlePlugHardware::new( 204 self.device.clone(), 205 &self.name, 206 self 207 .adapter 208 .events() 209 .await 210 .expect("Should always be able to get events"), 211 notification_stream, 212 endpoints.clone(), 213 uuid_map, 214 ); 215 Ok(Hardware::new( 216 &self.name, 217 &format!("{address:?}"), 218 &endpoints.keys().cloned().collect::<Vec<Endpoint>>(), 219 &Some(Duration::from_millis(75)), 220 self.requires_keepalive, 221 Box::new(device_internal_impl), 222 )) 223 } 224} 225 226pub struct BtlePlugHardware<T: Peripheral + 'static> { 227 device: T, 228 event_stream: broadcast::Sender<HardwareEvent>, 229 endpoints: HashMap<Endpoint, Characteristic>, 230 subscribed_endpoints: Arc<DashSet<Endpoint>>, 231} 232 233impl<T: Peripheral + 'static> BtlePlugHardware<T> { 234 pub fn new( 235 device: T, 236 name: &str, 237 mut adapter_event_stream: Pin<Box<dyn Stream<Item = CentralEvent> + Send>>, 238 mut notification_stream: Pin<Box<dyn Stream<Item = ValueNotification> + Send>>, 239 endpoints: HashMap<Endpoint, Characteristic>, 240 uuid_map: HashMap<Uuid, Endpoint>, 241 ) -> Self { 242 let (event_stream, _) = broadcast::channel(256); 243 let event_stream_clone = event_stream.clone(); 244 let address = device.id(); 245 let name_clone = name.to_owned(); 246 async_manager::spawn(async move { 247 let mut error_notification = false; 248 loop { 249 select! { 250 notification = notification_stream.next() => { 251 if let Some(notification) = notification { 252 let endpoint = if let Some(endpoint) = uuid_map.get(&notification.uuid) { 253 *endpoint 254 } else { 255 // Only print the error message once. 256 if !error_notification { 257 error!( 258 "Endpoint for UUID {} not found in map, assuming device has disconnected.", 259 notification.uuid 260 ); 261 error_notification = true; 262 } 263 continue; 264 }; 265 if event_stream_clone.receiver_count() == 0 { 266 continue; 267 } 268 if let Err(err) = event_stream_clone.send(HardwareEvent::Notification( 269 format!("{address:?}"), 270 endpoint, 271 notification.value, 272 )) { 273 error!( 274 "Cannot send notification, device object disappeared: {:?}", 275 err 276 ); 277 break; 278 } 279 } 280 } 281 adapter_event = adapter_event_stream.next() => { 282 if let Some(CentralEvent::DeviceDisconnected(addr)) = adapter_event 283 && address == addr { 284 info!( 285 "Device {:?} disconnected", 286 name_clone 287 ); 288 if event_stream_clone.receiver_count() != 0 289 && let Err(err) = event_stream_clone 290 .send(HardwareEvent::Disconnected( 291 format!("{address:?}") 292 )) { 293 error!( 294 "Cannot send notification, device object disappeared: {:?}", 295 err 296 ); 297 } 298 // At this point, we have nothing left to do because we can't reconnect a device 299 // that's been connected. Exit. 300 break; 301 } 302 } 303 } 304 } 305 info!( 306 "Exiting btleplug notification/event loop for device {:?}", 307 address 308 ) 309 }); 310 Self { 311 device, 312 endpoints, 313 event_stream, 314 subscribed_endpoints: Arc::new(DashSet::new()), 315 } 316 } 317} 318 319impl<T: Peripheral + 'static> HardwareInternal for BtlePlugHardware<T> { 320 fn event_stream(&self) -> broadcast::Receiver<HardwareEvent> { 321 self.event_stream.subscribe() 322 } 323 324 fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 325 let device = self.device.clone(); 326 async move { 327 let _ = device.disconnect().await; 328 Ok(()) 329 } 330 .boxed() 331 } 332 333 fn write_value( 334 &self, 335 msg: &HardwareWriteCmd, 336 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 337 let characteristic = match self.endpoints.get(&msg.endpoint()) { 338 Some(chr) => chr.clone(), 339 None => { 340 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint( 341 msg.endpoint().to_string(), 342 ))) 343 .boxed(); 344 } 345 }; 346 347 let device = self.device.clone(); 348 let mut write_type = if msg.write_with_response() { 349 WriteType::WithResponse 350 } else { 351 WriteType::WithoutResponse 352 }; 353 354 if (write_type == WriteType::WithoutResponse 355 && (characteristic.properties & CharPropFlags::WRITE_WITHOUT_RESPONSE) 356 != CharPropFlags::WRITE_WITHOUT_RESPONSE) 357 || (write_type == WriteType::WithResponse 358 && (characteristic.properties & CharPropFlags::WRITE) != CharPropFlags::WRITE) 359 { 360 if write_type == WriteType::WithoutResponse 361 && (characteristic.properties & CharPropFlags::WRITE) == CharPropFlags::WRITE 362 { 363 warn!( 364 "BTLEPlug device doesn't support write-without-response! Falling back to write-with-response!" 365 ); 366 write_type = WriteType::WithResponse 367 } else if write_type == WriteType::WithResponse 368 && (characteristic.properties & CharPropFlags::WRITE_WITHOUT_RESPONSE) 369 == CharPropFlags::WRITE_WITHOUT_RESPONSE 370 { 371 warn!( 372 "BTLEPlug device doesn't support write-with-response! Falling back to write-without-response!" 373 ); 374 write_type = WriteType::WithoutResponse 375 } else { 376 error!( 377 "BTLEPlug device doesn't support {}! No fallback available!", 378 if write_type == WriteType::WithoutResponse { 379 "write-without-response" 380 } else { 381 "write-with-response" 382 } 383 ); 384 } 385 } 386 387 let data = msg.data().clone(); 388 async move { 389 match device.write(&characteristic, &data, write_type).await { 390 Ok(()) => { 391 trace!( 392 "Sent write: {:?}, {:?} to {:?}", 393 data, write_type, characteristic 394 ); 395 Ok(()) 396 } 397 Err(e) => { 398 error!("BTLEPlug device write error: {:?}", e); 399 Err(ButtplugDeviceError::DeviceSpecificError( 400 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}")) 401 .to_string(), 402 )) 403 } 404 } 405 } 406 .boxed() 407 } 408 409 fn read_value( 410 &self, 411 msg: &HardwareReadCmd, 412 ) -> BoxFuture<'static, Result<HardwareReading, ButtplugDeviceError>> { 413 // Right now we only need read for doing a whitelist check on devices. We 414 // don't care about the data we get back. 415 let characteristic = match self.endpoints.get(&msg.endpoint()) { 416 Some(chr) => chr.clone(), 417 None => { 418 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint( 419 msg.endpoint().to_string(), 420 ))) 421 .boxed(); 422 } 423 }; 424 let device = self.device.clone(); 425 let endpoint = msg.endpoint(); 426 async move { 427 match device.read(&characteristic).await { 428 Ok(data) => { 429 trace!("Got reading: {:?}", data); 430 Ok(HardwareReading::new(endpoint, &data)) 431 } 432 Err(e) => { 433 error!("BTLEPlug device read error: {:?}", e); 434 Err(ButtplugDeviceError::DeviceSpecificError( 435 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}")) 436 .to_string(), 437 )) 438 } 439 } 440 } 441 .boxed() 442 } 443 444 fn subscribe( 445 &self, 446 msg: &HardwareSubscribeCmd, 447 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 448 let endpoint = msg.endpoint(); 449 if self.subscribed_endpoints.contains(&endpoint) { 450 debug!( 451 "Endpoint {} already subscribed, ignoring and returning Ok.", 452 endpoint 453 ); 454 return future::ready(Ok(())).boxed(); 455 } 456 let characteristic = match self.endpoints.get(&endpoint) { 457 Some(chr) => chr.clone(), 458 None => { 459 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint( 460 msg.endpoint().to_string(), 461 ))) 462 .boxed(); 463 } 464 }; 465 let endpoints = self.subscribed_endpoints.clone(); 466 let device = self.device.clone(); 467 async move { 468 device.subscribe(&characteristic).await.map_err(|e| { 469 ButtplugDeviceError::DeviceSpecificError( 470 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}")) 471 .to_string(), 472 ) 473 })?; 474 endpoints.insert(endpoint); 475 Ok(()) 476 } 477 .boxed() 478 } 479 480 fn unsubscribe( 481 &self, 482 msg: &HardwareUnsubscribeCmd, 483 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> { 484 let endpoint = msg.endpoint(); 485 if !self.subscribed_endpoints.contains(&endpoint) { 486 debug!( 487 "Endpoint {} already unsubscribed, ignoring and returning Ok.", 488 endpoint 489 ); 490 return future::ready(Ok(())).boxed(); 491 } 492 let characteristic = match self.endpoints.get(&msg.endpoint()) { 493 Some(chr) => chr.clone(), 494 None => { 495 return future::ready(Err(ButtplugDeviceError::InvalidEndpoint( 496 msg.endpoint().to_string(), 497 ))) 498 .boxed(); 499 } 500 }; 501 let endpoints = self.subscribed_endpoints.clone(); 502 let device = self.device.clone(); 503 async move { 504 device.unsubscribe(&characteristic).await.map_err(|e| { 505 ButtplugDeviceError::DeviceSpecificError( 506 HardwareSpecificError::HardwareSpecificError("btleplug".to_owned(), format!("{e:?}")) 507 .to_string(), 508 ) 509 })?; 510 endpoints.remove(&endpoint); 511 Ok(()) 512 } 513 .boxed() 514 } 515} 516 517impl<T: Peripheral> Drop for BtlePlugHardware<T> { 518 fn drop(&mut self) { 519 let disconnect_fut = self.disconnect(); 520 async_manager::spawn(async move { 521 if let Err(e) = disconnect_fut.await { 522 error!("Error disconnecting btleplug device: {:?}", e); 523 } 524 }); 525 } 526}