Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! In-process communication between clients and servers
9
10use buttplug_core::{
11 connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
12 errors::{ButtplugError, ButtplugMessageError},
13 message::{ButtplugClientMessageV4, ButtplugServerMessageV4},
14 util::async_manager,
15};
16use buttplug_server::{
17 ButtplugServer,
18 ButtplugServerBuilder,
19 message::ButtplugServerMessageVariant,
20};
21
22use futures::{
23 StreamExt,
24 future::{self, BoxFuture, FutureExt},
25};
26use futures_util::pin_mut;
27use std::sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30};
31use tokio::sync::mpsc::{Sender, channel};
32use tracing_futures::Instrument;
33
34#[derive(Default)]
35pub struct ButtplugInProcessClientConnectorBuilder {
36 server: Option<ButtplugServer>,
37}
38
39impl ButtplugInProcessClientConnectorBuilder {
40 pub fn server(&mut self, server: ButtplugServer) -> &mut Self {
41 self.server = Some(server);
42 self
43 }
44
45 pub fn finish(&mut self) -> ButtplugInProcessClientConnector {
46 ButtplugInProcessClientConnector::new(self.server.take())
47 }
48}
49
50/// In-process Buttplug Server Connector
51///
52/// The In-Process Connector contains a [ButtplugServer], meaning that both the
53/// [ButtplugClient][crate::client::ButtplugClient] and [ButtplugServer] will exist in the same
54/// process. This is useful for developing applications, or for distributing an applications without
55/// requiring access to an outside [ButtplugServer].
56///
57/// # Notes
58///
59/// Buttplug is built in a way that tries to make sure all programs will work with new versions of
60/// the library. This is why we have [ButtplugClient][crate::client::ButtplugClient] for
61/// applications, and Connectors to access out-of-process [ButtplugServer]s over IPC, network, etc.
62/// It means that the out-of-process server can be upgraded by the user at any time, even if the
63/// [ButtplugClient][crate::client::ButtplugClient] using application hasn't been upgraded. This
64/// allows the program to support hardware that may not have even been released when it was
65/// published.
66///
67/// While including an EmbeddedConnector in your application is the quickest and easiest way to
68/// develop (and we highly recommend developing that way), and also an easy way to get users up and
69/// running as quickly as possible, we recommend also including some sort of IPC Connector in order
70/// for your application to connect to newer servers when they come out.
71#[derive(Clone)]
72pub struct ButtplugInProcessClientConnector {
73 /// Internal server object for the embedded connector.
74 server: Arc<ButtplugServer>,
75 server_outbound_sender: Sender<ButtplugServerMessageV4>,
76 connected: Arc<AtomicBool>,
77}
78
79impl Default for ButtplugInProcessClientConnector {
80 fn default() -> Self {
81 ButtplugInProcessClientConnectorBuilder::default().finish()
82 }
83}
84
85impl ButtplugInProcessClientConnector {
86 /// Creates a new in-process connector, with a server instance.
87 ///
88 /// Sets up a server, using the basic [ButtplugServer] construction arguments.
89 /// Takes the server's name and the ping time it should use, with a ping time
90 /// of 0 meaning infinite ping.
91 fn new(server: Option<ButtplugServer>) -> Self {
92 // Create a dummy channel, will just be overwritten on connect.
93 let (server_outbound_sender, _) = channel(256);
94 Self {
95 server_outbound_sender,
96 server: Arc::new(server.unwrap_or_else(|| {
97 ButtplugServerBuilder::default()
98 .finish()
99 .expect("Default server builder should always work.")
100 })),
101 connected: Arc::new(AtomicBool::new(false)),
102 }
103 }
104}
105
106impl ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4>
107 for ButtplugInProcessClientConnector
108{
109 fn connect(
110 &mut self,
111 message_sender: Sender<ButtplugServerMessageV4>,
112 ) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
113 if !self.connected.load(Ordering::Relaxed) {
114 let connected = self.connected.clone();
115 let send = message_sender.clone();
116 self.server_outbound_sender = message_sender;
117 let server_recv = self.server.server_version_event_stream();
118 async move {
119 async_manager::spawn(async move {
120 info!("Starting In Process Client Connector Event Sender Loop");
121 pin_mut!(server_recv);
122 while let Some(event) = server_recv.next().await {
123 // If we get an error back, it means the client dropped our event
124 // handler, so just stop trying. Otherwise, since this is an
125 // in-process conversion, we can unwrap because we know our
126 // try_into() will always succeed (which may not be the case with
127 // remote connections that have different spec versions).
128 if send.send(event).await.is_err() {
129 break;
130 }
131 }
132 info!("Stopping In Process Client Connector Event Sender Loop, due to channel receiver being dropped.");
133 }.instrument(tracing::info_span!("InProcessClientConnectorEventSenderLoop")));
134 connected.store(true, Ordering::Relaxed);
135 Ok(())
136 }.boxed()
137 } else {
138 ButtplugConnectorError::ConnectorAlreadyConnected.into()
139 }
140 }
141
142 fn disconnect(&self) -> ButtplugConnectorResultFuture {
143 if self.connected.load(Ordering::Relaxed) {
144 self.connected.store(false, Ordering::Relaxed);
145 future::ready(Ok(())).boxed()
146 } else {
147 ButtplugConnectorError::ConnectorNotConnected.into()
148 }
149 }
150
151 fn send(&self, msg: ButtplugClientMessageV4) -> ButtplugConnectorResultFuture {
152 if !self.connected.load(Ordering::Relaxed) {
153 return ButtplugConnectorError::ConnectorNotConnected.into();
154 }
155 let input = msg.into();
156 let output_fut = self.server.parse_message(input);
157 let sender = self.server_outbound_sender.clone();
158 async move {
159 let output = match output_fut.await {
160 Ok(m) => {
161 if let ButtplugServerMessageVariant::V4(msg) = m {
162 msg
163 } else {
164 ButtplugServerMessageV4::Error(
165 ButtplugError::from(ButtplugMessageError::MessageConversionError(
166 "In-process connector messages should never have differing versions.".to_owned(),
167 ))
168 .into(),
169 )
170 }
171 }
172 Err(e) => {
173 if let ButtplugServerMessageVariant::V4(msg) = e {
174 msg
175 } else {
176 ButtplugServerMessageV4::Error(
177 ButtplugError::from(ButtplugMessageError::MessageConversionError(
178 "In-process connector messages should never have differing versions.".to_owned(),
179 ))
180 .into(),
181 )
182 }
183 }
184 };
185 sender
186 .send(output)
187 .await
188 .map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
189 }
190 .boxed()
191 }
192}