Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Communications API for accessing Buttplug Servers
9pub mod client_event_loop;
10pub mod client_message_sorter;
11pub mod connector;
12pub mod device;
13pub mod serializer;
14
15use buttplug_core::{
16 connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorFuture},
17 errors::{ButtplugError, ButtplugHandshakeError},
18 message::{
19 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
20 BUTTPLUG_CURRENT_API_MINOR_VERSION,
21 ButtplugClientMessageV4,
22 ButtplugServerMessageV4,
23 PingV0,
24 RequestDeviceListV0,
25 RequestServerInfoV4,
26 StartScanningV0,
27 StopAllDevicesV0,
28 StopScanningV0,
29 },
30 util::{
31 async_manager,
32 future::{ButtplugFuture, ButtplugFutureStateShared},
33 stream::convert_broadcast_receiver_to_stream,
34 },
35};
36use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
37use dashmap::DashMap;
38pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
39use futures::{
40 Stream,
41 future::{self, BoxFuture, FutureExt},
42};
43use log::*;
44use std::{
45 collections::BTreeMap,
46 sync::{
47 Arc,
48 atomic::{AtomicBool, Ordering},
49 },
50};
51use strum_macros::Display;
52use thiserror::Error;
53use tokio::sync::{Mutex, broadcast, mpsc};
54use tracing_futures::Instrument;
55
56/// Result type used for public APIs.
57///
58/// Allows us to differentiate between an issue with the connector (as a
59/// [ButtplugConnectorError]) and an issue within Buttplug (as a
60/// [ButtplugError]).
61type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
62type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;
63
64/// Result type used for passing server responses.
65pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
66pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
67/// Future state type for returning server responses across futures.
68pub(crate) type ButtplugServerMessageStateShared =
69 ButtplugFutureStateShared<ButtplugServerMessageResult>;
70/// Future type that expects server responses.
71pub(crate) type ButtplugServerMessageFuture = ButtplugFuture<ButtplugServerMessageResult>;
72
73/// Future state for messages sent from the client that expect a server response.
74///
75/// When a message is sent from the client and expects a response from the server, we'd like to know
76/// when that response arrives, and usually we'll want to wait for it. We can do so by creating a
77/// future that will be resolved when a response is received from the server.
78///
79/// To do this, we build a [ButtplugFuture], then take its waker and pass it along with the message
80/// we send to the connector, using the [ButtplugClientMessageFuturePair] type. We can then expect
81/// the connector to get the response from the server, match it with our message (using something
82/// like the ClientMessageSorter, an internal structure in the Buttplug library), and set the reply
83/// in the waker we've sent along. This will resolve the future we're waiting on and allow us to
84/// continue execution.
85#[derive(Clone)]
86pub struct ButtplugClientMessageFuturePair {
87 msg: ButtplugClientMessageV4,
88 waker: ButtplugServerMessageStateShared,
89}
90
91impl ButtplugClientMessageFuturePair {
92 pub fn new(msg: ButtplugClientMessageV4, waker: ButtplugServerMessageStateShared) -> Self {
93 Self { msg, waker }
94 }
95}
96
97/// Represents all of the different types of errors a ButtplugClient can return.
98///
99/// Clients can return two types of errors:
100///
101/// - [ButtplugConnectorError], which means there was a problem with the connection between the
102/// client and the server, like a network connection issue.
103/// - [ButtplugError], which is an error specific to the Buttplug Protocol.
104#[derive(Debug, Error, Display)]
105pub enum ButtplugClientError {
106 /// Connector error
107 #[error(transparent)]
108 ButtplugConnectorError(#[from] ButtplugConnectorError),
109 /// Protocol error
110 #[error(transparent)]
111 ButtplugError(#[from] ButtplugError),
112 /// Error converting output command: {}
113 ButtplugOutputCommandConversionError(String),
114}
115
116/// Enum representing different events that can be emitted by a client.
117///
118/// These events are created by the server and sent to the client, and represent
119/// unrequested actions that the client will need to respond to, or that
120/// applications using the client may be interested in.
121#[derive(Clone, Debug)]
122pub enum ButtplugClientEvent {
123 /// Emitted when a scanning session (started via a StartScanning call on
124 /// [ButtplugClient]) has finished.
125 ScanningFinished,
126 /// Emitted when a device has been added to the server. Includes a
127 /// [ButtplugClientDevice] object representing the device.
128 DeviceAdded(ButtplugClientDevice),
129 /// Emitted when a device has been removed from the server. Includes a
130 /// [ButtplugClientDevice] object representing the device.
131 DeviceRemoved(ButtplugClientDevice),
132 /// Emitted when a client has not pinged the server in a sufficient amount of
133 /// time.
134 PingTimeout,
135 /// Emitted when the client successfully connects to a server.
136 ServerConnect,
137 /// Emitted when a client connector detects that the server has disconnected.
138 ServerDisconnect,
139 /// Emitted when an error that cannot be matched to a request is received from
140 /// the server.
141 Error(ButtplugError),
142}
143
144impl Unpin for ButtplugClientEvent {
145}
146
147pub(crate) fn create_boxed_future_client_error<T>(
148 err: ButtplugError,
149) -> ButtplugClientResultFuture<T>
150where
151 T: 'static + Send + Sync,
152{
153 future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
154}
155
156#[derive(Clone, Debug)]
157pub(crate) struct ButtplugClientMessageSender {
158 message_sender: broadcast::Sender<ButtplugClientRequest>,
159 connected: Arc<AtomicBool>,
160}
161
162impl ButtplugClientMessageSender {
163 fn new(
164 message_sender: &broadcast::Sender<ButtplugClientRequest>,
165 connected: &Arc<AtomicBool>,
166 ) -> Self {
167 Self {
168 message_sender: message_sender.clone(),
169 connected: connected.clone(),
170 }
171 }
172
173 /// Send message to the internal event loop.
174 ///
175 /// Mostly for handling boilerplate around possible send errors.
176 pub fn send_message_to_event_loop(
177 &self,
178 msg: ButtplugClientRequest,
179 ) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
180 // If we're running the event loop, we should have a message_sender.
181 // Being connected to the server doesn't matter here yet because we use
182 // this function in order to connect also.
183 //
184 // The message sender doesn't require an async send now, but we still want
185 // to delay execution as part of our future in order to keep task coherency.
186 let message_sender = self.message_sender.clone();
187 async move {
188 message_sender
189 .send(msg)
190 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
191 Ok(())
192 }
193 .boxed()
194 }
195
196 pub fn subscribe(&self) -> broadcast::Receiver<ButtplugClientRequest> {
197 self.message_sender.subscribe()
198 }
199
200 pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
201 if !self.connected.load(Ordering::Relaxed) {
202 future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
203 } else {
204 self.send_message_ignore_connect_status(msg)
205 }
206 }
207
208 /// Sends a ButtplugMessage from client to server. Expects to receive a ButtplugMessage back from
209 /// the server.
210 pub fn send_message_ignore_connect_status(
211 &self,
212 msg: ButtplugClientMessageV4,
213 ) -> ButtplugServerMessageResultFuture {
214 // Create a future to pair with the message being resolved.
215 let fut = ButtplugServerMessageFuture::default();
216 let internal_msg = ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(
217 msg,
218 fut.get_state_clone(),
219 ));
220
221 // Send message to internal loop and wait for return.
222 let send_fut = self.send_message_to_event_loop(internal_msg);
223 async move {
224 send_fut.await?;
225 fut.await
226 }
227 .boxed()
228 }
229
230 /// Sends a ButtplugMessage from client to server. Expects to receive an [Ok]
231 /// type ButtplugMessage back from the server.
232 pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
233 let send_fut = self.send_message(msg);
234 async move { send_fut.await.map(|_| ()) }.boxed()
235 }
236}
237
238/// Struct used by applications to communicate with a Buttplug Server.
239///
240/// Buttplug Clients provide an API layer on top of the Buttplug Protocol that
241/// handles boring things like message creation and pairing, protocol ordering,
242/// etc... This allows developers to concentrate on controlling hardware with
243/// the API.
244///
245/// Clients serve a few different purposes:
246/// - Managing connections to servers, thru [ButtplugConnector]s
247/// - Emitting events received from the Server
248/// - Holding state related to the server (i.e. what devices are currently
249/// connected, etc...)
250///
251/// Clients are created by the [ButtplugClient::new()] method, which also
252/// handles spinning up the event loop and connecting the client to the server.
253/// Closures passed to the run() method can access and use the Client object.
254pub struct ButtplugClient {
255 /// The client name. Depending on the connection type and server being used,
256 /// this name is sometimes shown on the server logs or GUI.
257 client_name: String,
258 /// The server name that we're current connected to.
259 server_name: Arc<Mutex<Option<String>>>,
260 event_stream: broadcast::Sender<ButtplugClientEvent>,
261 // Sender to relay messages to the internal client loop
262 message_sender: ButtplugClientMessageSender,
263 connected: Arc<AtomicBool>,
264 device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
265}
266
267impl ButtplugClient {
268 pub fn new(name: &str) -> Self {
269 let (message_sender, _) = broadcast::channel(256);
270 let (event_stream, _) = broadcast::channel(256);
271 let connected = Arc::new(AtomicBool::new(false));
272 Self {
273 client_name: name.to_owned(),
274 server_name: Arc::new(Mutex::new(None)),
275 event_stream,
276 message_sender: ButtplugClientMessageSender::new(&message_sender, &connected),
277 connected,
278 device_map: Arc::new(DashMap::new()),
279 }
280 }
281
282 pub async fn connect<ConnectorType>(
283 &self,
284 mut connector: ConnectorType,
285 ) -> Result<(), ButtplugClientError>
286 where
287 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
288 {
289 if self.connected() {
290 return Err(ButtplugClientError::ButtplugConnectorError(
291 ButtplugConnectorError::ConnectorAlreadyConnected,
292 ));
293 }
294
295 // If connect is being called again, clear out the device map and start over.
296 self.device_map.clear();
297
298 info!("Connecting to server.");
299 let (connector_sender, connector_receiver) = mpsc::channel(256);
300 connector.connect(connector_sender).await.map_err(|e| {
301 error!("Connection to server failed: {:?}", e);
302 ButtplugClientError::from(e)
303 })?;
304 info!("Connection to server succeeded.");
305 let mut client_event_loop = ButtplugClientEventLoop::new(
306 self.connected.clone(),
307 connector,
308 connector_receiver,
309 self.event_stream.clone(),
310 self.message_sender.clone(),
311 self.device_map.clone(),
312 );
313
314 // Start the event loop before we run the handshake.
315 async_manager::spawn(
316 async move {
317 client_event_loop.run().await;
318 }
319 .instrument(tracing::info_span!("Client Loop Span")),
320 );
321 self.run_handshake().await
322 }
323
324 /// Creates the ButtplugClient instance and tries to establish a connection.
325 ///
326 /// Takes all of the components needed to build a [ButtplugClient], creates
327 /// the struct, then tries to run connect and execute the Buttplug protocol
328 /// handshake. Will return a connected and ready to use ButtplugClient is all
329 /// goes well.
330 async fn run_handshake(&self) -> ButtplugClientResult {
331 // Run our handshake
332 info!("Running handshake with server.");
333 let msg = self
334 .message_sender
335 .send_message_ignore_connect_status(
336 RequestServerInfoV4::new(
337 &self.client_name,
338 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
339 BUTTPLUG_CURRENT_API_MINOR_VERSION,
340 )
341 .into(),
342 )
343 .await?;
344
345 debug!("Got ServerInfo return.");
346 if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
347 info!("Connected to {}", server_info.server_name());
348 *self.server_name.lock().await = Some(server_info.server_name().clone());
349 // Don't set ourselves as connected until after ServerInfo has been
350 // received. This means we avoid possible races with the RequestServerInfo
351 // handshake.
352 self.connected.store(true, Ordering::Relaxed);
353
354 // Get currently connected devices. The event loop will
355 // handle sending the message and getting the return, and
356 // will send the client updates as events.
357 let msg = self
358 .message_sender
359 .send_message(RequestDeviceListV0::default().into())
360 .await?;
361 if let ButtplugServerMessageV4::DeviceList(m) = msg {
362 self
363 .message_sender
364 .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
365 .await?;
366 }
367 Ok(())
368 } else {
369 self.disconnect().await?;
370 Err(ButtplugClientError::ButtplugError(
371 ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
372 ))
373 }
374 }
375
376 /// Returns true if client is currently connected.
377 pub fn connected(&self) -> bool {
378 self.connected.load(Ordering::Relaxed)
379 }
380
381 /// Disconnects from server, if connected.
382 ///
383 /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed
384 /// that even on failure, the client will be disconnected.
385 pub fn disconnect(&self) -> ButtplugClientResultFuture {
386 if !self.connected() {
387 return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
388 }
389 // Send the connector to the internal loop for management. Once we throw
390 // the connector over, the internal loop will handle connecting and any
391 // further communications with the server, if connection is successful.
392 let fut = ButtplugConnectorFuture::default();
393 let msg = ButtplugClientRequest::Disconnect(fut.get_state_clone());
394 let send_fut = self.message_sender.send_message_to_event_loop(msg);
395 let connected = self.connected.clone();
396 async move {
397 connected.store(false, Ordering::Relaxed);
398 send_fut.await?;
399 Ok(())
400 }
401 .boxed()
402 }
403
404 /// Tells server to start scanning for devices.
405 ///
406 /// Returns Err([ButtplugClientError]) if request fails due to issues with
407 /// DeviceManagers on the server, disconnection, etc.
408 pub fn start_scanning(&self) -> ButtplugClientResultFuture {
409 self
410 .message_sender
411 .send_message_expect_ok(StartScanningV0::default().into())
412 }
413
414 /// Tells server to stop scanning for devices.
415 ///
416 /// Returns Err([ButtplugClientError]) if request fails due to issues with
417 /// DeviceManagers on the server, disconnection, etc.
418 pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
419 self
420 .message_sender
421 .send_message_expect_ok(StopScanningV0::default().into())
422 }
423
424 /// Tells server to stop all devices.
425 ///
426 /// Returns Err([ButtplugClientError]) if request fails due to issues with
427 /// DeviceManagers on the server, disconnection, etc.
428 pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
429 self
430 .message_sender
431 .send_message_expect_ok(StopAllDevicesV0::default().into())
432 }
433
434 pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
435 let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe());
436 // We can either Box::pin here or force the user to pin_mut!() on their
437 // end. While this does end up with a dynamic dispatch on our end, it
438 // still makes the API nicer for the user, so we'll just eat the perf hit.
439 // Not to mention, this is not a high throughput system really, so it
440 // shouldn't matter.
441 Box::pin(stream)
442 }
443
444 /// Retreives a list of currently connected devices.
445 pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
446 self
447 .device_map
448 .iter()
449 .map(|map_pair| (*map_pair.key(), map_pair.value().clone()))
450 .collect()
451 }
452
453 pub fn ping(&self) -> ButtplugClientResultFuture {
454 let ping_fut = self
455 .message_sender
456 .send_message_expect_ok(PingV0::default().into());
457 ping_fut.boxed()
458 }
459
460 pub fn server_name(&self) -> Option<String> {
461 // We'd have to be calling server_name in an extremely tight, asynchronous
462 // loop for this to return None, so we'll treat this as lockless.
463 //
464 // Dear users actually reading this code: This is not an invitation for you
465 // to get the server name in a tight, asynchronous loop. This will never
466 // change throughout the life to the connection.
467 if let Ok(name) = self.server_name.try_lock() {
468 name.clone()
469 } else {
470 None
471 }
472 }
473}