Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use crate::server_message_conversion::ButtplugServerDeviceEventMessageConverter;
9
10use super::{
11 ButtplugServerResultFuture,
12 device::ServerDeviceManager,
13 message::{
14 ButtplugClientMessageVariant,
15 ButtplugServerMessageVariant,
16 server_device_attributes::TryFromClientMessage,
17 spec_enums::{
18 ButtplugCheckedClientMessageV4,
19 ButtplugDeviceCommandMessageUnionV4,
20 ButtplugDeviceManagerMessageUnion,
21 },
22 },
23 ping_timer::PingTimer,
24 server_message_conversion::ButtplugServerMessageConverter,
25};
26use buttplug_core::{
27 errors::*,
28 message::{
29 self,
30 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
31 ButtplugMessage,
32 ButtplugMessageSpecVersion,
33 ButtplugServerMessageV4,
34 ErrorV0,
35 StopAllDevicesV0,
36 StopScanningV0,
37 },
38 util::stream::convert_broadcast_receiver_to_stream,
39};
40use futures::{
41 Stream,
42 future::{self, BoxFuture, FutureExt},
43};
44use once_cell::sync::OnceCell;
45use std::{
46 fmt,
47 sync::{
48 Arc,
49 atomic::{AtomicBool, Ordering},
50 },
51};
52use tokio::sync::broadcast;
53use tokio_stream::StreamExt;
54use tracing::info_span;
55use tracing_futures::Instrument;
56
57/// The server side of the Buttplug protocol. Frontend for connection to device management and
58/// communication.
59pub struct ButtplugServer {
60 /// The name of the server, which is relayed to the client on connection (mostly for
61 /// confirmation in UI dialogs)
62 server_name: String,
63 /// The maximum ping time, in milliseconds, for the server. If the server does not receive a
64 /// [Ping](buttplug_core::messages::Ping) message in this amount of time after the handshake has
65 /// succeeded, the server will automatically disconnect. If this is not called, the ping timer
66 /// will not be activated.
67 ///
68 /// Note that this has nothing to do with communication medium specific pings, like those built
69 /// into the Websocket protocol. This ping is specific to the Buttplug protocol.
70 max_ping_time: u32,
71 /// Timer for managing ping time tracking, if max_ping_time > 0.
72 ping_timer: Arc<PingTimer>,
73 /// Manages device discovery and communication.
74 device_manager: Arc<ServerDeviceManager>,
75 /// If true, client is currently connected to server
76 connected: Arc<AtomicBool>,
77 /// Broadcaster for server events. Receivers for this are handed out through the
78 /// [ButtplugServer::event_stream()] method.
79 output_sender: broadcast::Sender<ButtplugServerMessageV4>,
80 /// Name of the connected client, assuming there is one.
81 client_name: Arc<OnceCell<String>>,
82 /// Current spec version for the connected client
83 spec_version: Arc<OnceCell<ButtplugMessageSpecVersion>>,
84}
85
86impl std::fmt::Debug for ButtplugServer {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("ButtplugServer")
89 .field("server_name", &self.server_name)
90 .field("max_ping_time", &self.max_ping_time)
91 .field("connected", &self.connected)
92 .finish()
93 }
94}
95
96impl ButtplugServer {
97 pub(super) fn new(
98 server_name: &str,
99 max_ping_time: u32,
100 ping_timer: Arc<PingTimer>,
101 device_manager: Arc<ServerDeviceManager>,
102 connected: Arc<AtomicBool>,
103 output_sender: broadcast::Sender<ButtplugServerMessageV4>,
104 ) -> Self {
105 ButtplugServer {
106 server_name: server_name.to_owned(),
107 max_ping_time,
108 ping_timer,
109 device_manager,
110 connected,
111 output_sender,
112 client_name: Arc::new(OnceCell::new()),
113 spec_version: Arc::new(OnceCell::new()),
114 }
115 }
116
117 pub fn client_name(&self) -> Option<String> {
118 self.client_name.get().cloned()
119 }
120
121 /// Retreive an async stream of ButtplugServerMessages. This is how the server sends out
122 /// non-query-related updates to the system, including information on devices being added/removed,
123 /// client disconnection, etc...
124 pub fn event_stream(&self) -> impl Stream<Item = ButtplugServerMessageVariant> + use<> {
125 let spec_version = self.spec_version.clone();
126 let converter = ButtplugServerMessageConverter::new(None);
127 let device_indexes: Vec<u32> = self
128 .device_manager
129 .devices()
130 .iter()
131 .map(|x| *x.key())
132 .collect();
133 let device_event_converter = ButtplugServerDeviceEventMessageConverter::new(device_indexes);
134 self.server_version_event_stream().map(move |m| {
135 if let ButtplugServerMessageV4::DeviceList(list) = m {
136 device_event_converter.convert_device_list(
137 spec_version
138 .get()
139 .unwrap_or(&ButtplugMessageSpecVersion::Version4),
140 &list,
141 )
142 } else {
143 // If we get an event and don't have a spec version yet, just throw out the latest.
144 converter
145 .convert_outgoing(
146 &m,
147 spec_version
148 .get()
149 .unwrap_or(&ButtplugMessageSpecVersion::Version4),
150 )
151 .unwrap()
152 }
153 })
154 }
155
156 /// Retreive an async stream of ButtplugServerMessages, always at the latest available message
157 /// spec. This is how the server sends out non-query-related updates to the system, including
158 /// information on devices being added/removed, client disconnection, etc...
159 pub fn server_version_event_stream(&self) -> impl Stream<Item = ButtplugServerMessageV4> + use<> {
160 // Unlike the client API, we can expect anyone using the server to pin this
161 // themselves.
162 let server_receiver = convert_broadcast_receiver_to_stream(self.output_sender.subscribe());
163 let device_receiver = self.device_manager.event_stream();
164 device_receiver.merge(server_receiver)
165 }
166
167 /// Returns a references to the internal device manager, for handling configuration.
168 pub fn device_manager(&self) -> Arc<ServerDeviceManager> {
169 self.device_manager.clone()
170 }
171
172 /// If true, client is currently connected to the server.
173 pub fn connected(&self) -> bool {
174 self.connected.load(Ordering::Relaxed)
175 }
176
177 /// Disconnects the server from a client, if it is connected.
178 pub fn disconnect(&self) -> BoxFuture<'_, Result<(), message::ErrorV0>> {
179 debug!("Buttplug Server {} disconnect requested", self.server_name);
180 let ping_timer = self.ping_timer.clone();
181 // As long as StopScanning/StopAllDevices aren't changed across message specs, we can inject
182 // them using parse_checked_message and bypass version checking.
183 let stop_scanning_fut = self.parse_checked_message(
184 ButtplugCheckedClientMessageV4::StopScanning(StopScanningV0::default()),
185 );
186 let stop_fut = self.parse_checked_message(ButtplugCheckedClientMessageV4::StopAllDevices(
187 StopAllDevicesV0::default(),
188 ));
189 let connected = self.connected.clone();
190 async move {
191 connected.store(false, Ordering::Relaxed);
192 ping_timer.stop_ping_timer().await;
193 // Ignore returns here, we just want to stop.
194 info!("Server disconnected, stopping device scanning if it was started...");
195 let _ = stop_scanning_fut.await;
196 info!("Server disconnected, stopping all devices...");
197 let _ = stop_fut.await;
198 Ok(())
199 }
200 .boxed()
201 }
202
203 pub fn shutdown(&self) -> ButtplugServerResultFuture {
204 let device_manager = self.device_manager.clone();
205 //let disconnect_future = self.disconnect();
206 async move { device_manager.shutdown().await }.boxed()
207 }
208
209 /// Sends a [ButtplugClientMessage] to be parsed by the server (for handshake or ping), or passed
210 /// into the server's [DeviceManager] for communication with devices.
211 pub fn parse_message(
212 &self,
213 msg: ButtplugClientMessageVariant,
214 ) -> BoxFuture<'static, Result<ButtplugServerMessageVariant, ButtplugServerMessageVariant>> {
215 let features = self.device_manager().feature_map();
216 let msg_id = msg.id();
217 debug!("Server received: {:?}", msg);
218 match msg {
219 ButtplugClientMessageVariant::V4(msg) => {
220 let internal_msg =
221 match ButtplugCheckedClientMessageV4::try_from_client_message(msg, &features) {
222 Ok(m) => m,
223 Err(e) => {
224 let mut err_msg = ErrorV0::from(e);
225 err_msg.set_id(msg_id);
226 return future::ready(Err(ButtplugServerMessageVariant::from(
227 ButtplugServerMessageV4::from(err_msg),
228 )))
229 .boxed();
230 }
231 };
232 let fut = self.parse_checked_message(internal_msg);
233 async move {
234 Ok(
235 fut
236 .await
237 .map_err(|e| ButtplugServerMessageVariant::from(ButtplugServerMessageV4::from(e)))?
238 .into(),
239 )
240 }
241 .boxed()
242 }
243 msg => {
244 let v = msg.version();
245 let converter = ButtplugServerMessageConverter::new(Some(msg.clone()));
246 let spec_version = *self.spec_version.get_or_init(|| {
247 info!(
248 "Setting Buttplug Server Message Spec Downgrade version to {}",
249 v
250 );
251 v
252 });
253 match ButtplugCheckedClientMessageV4::try_from_client_message(msg, &features) {
254 Ok(converted_msg) => {
255 debug!("Converted message: {:?}", converted_msg);
256 let fut = self.parse_checked_message(converted_msg);
257 async move {
258 let result = fut.await.map_err(|e| {
259 converter
260 .convert_outgoing(&e.into(), &spec_version)
261 .unwrap()
262 })?;
263 let out_msg = converter
264 .convert_outgoing(&result, &spec_version)
265 .map_err(|e| {
266 converter
267 .convert_outgoing(
268 &ButtplugServerMessageV4::from(ErrorV0::from(e)),
269 &spec_version,
270 )
271 .unwrap()
272 });
273 debug!("Server returning: {:?}", out_msg);
274 out_msg
275 }
276 .boxed()
277 }
278 Err(e) => {
279 let mut err_msg = ErrorV0::from(e);
280 err_msg.set_id(msg_id);
281
282 future::ready(Err(
283 converter
284 .convert_outgoing(&ButtplugServerMessageV4::from(err_msg), &spec_version)
285 .unwrap(),
286 ))
287 .boxed()
288 }
289 }
290 }
291 }
292 }
293
294 pub fn parse_checked_message(
295 &self,
296 msg: ButtplugCheckedClientMessageV4,
297 ) -> BoxFuture<'static, Result<ButtplugServerMessageV4, message::ErrorV0>> {
298 trace!(
299 "Buttplug Server {} received message to client parse: {:?}",
300 self.server_name, msg
301 );
302 let id = msg.id();
303 if !self.connected() {
304 // Check for ping timeout first! There's no way we should've pinged out if
305 // we haven't received RequestServerInfo first, but we do want to know if
306 // we pinged out.
307 let error = if self.ping_timer.pinged_out() {
308 Some(message::ErrorV0::from(ButtplugError::from(
309 ButtplugPingError::PingedOut,
310 )))
311 } else if !matches!(msg, ButtplugCheckedClientMessageV4::RequestServerInfo(_)) {
312 Some(message::ErrorV0::from(ButtplugError::from(
313 ButtplugHandshakeError::RequestServerInfoExpected,
314 )))
315 } else {
316 None
317 };
318 if let Some(mut return_error) = error {
319 return_error.set_id(msg.id());
320 return future::ready(Err(return_error)).boxed();
321 }
322 // If we haven't pinged out and we got an RSI message, fall thru.
323 }
324 // Produce whatever future is needed to reply to the message, this may be a
325 // device command future, or something the server handles. All futures will
326 // return Result<ButtplugServerMessage, ButtplugError>, and we'll handle
327 // tagging the result with the message id in the future we put out as the
328 // return value from this method.
329 let out_fut = if ButtplugDeviceManagerMessageUnion::try_from(msg.clone()).is_ok()
330 || ButtplugDeviceCommandMessageUnionV4::try_from(msg.clone()).is_ok()
331 {
332 self.device_manager.parse_message(msg.clone())
333 } else {
334 match msg {
335 ButtplugCheckedClientMessageV4::RequestServerInfo(rsi_msg) => {
336 self.perform_handshake(rsi_msg)
337 }
338 ButtplugCheckedClientMessageV4::Ping(p) => self.handle_ping(p),
339 _ => ButtplugMessageError::UnexpectedMessageType(format!("{msg:?}")).into(),
340 }
341 };
342 // Simple way to set the ID on the way out. Just rewrap
343 // the returned future to make sure it happens.
344 async move {
345 out_fut
346 .await
347 .map(|mut ok_msg| {
348 ok_msg.set_id(id);
349 trace!("Server returning message: {:?}", ok_msg);
350 ok_msg
351 })
352 .map_err(|err| {
353 let mut error = message::ErrorV0::from(err);
354 error.set_id(id);
355 error
356 })
357 }
358 .instrument(info_span!("Buttplug Server Message", id = id))
359 .boxed()
360 }
361
362 /// Performs the [RequestServerInfo]([ServerInfo](buttplug_core::message::RequestServerInfo) /
363 /// [ServerInfo](buttplug_core::message::ServerInfo) handshake, as specified in the [Buttplug
364 /// Protocol Spec](https://buttplug-spec.docs.buttplug.io). This is the first thing that must
365 /// happens upon connection to the server, in order to make sure the server can speak the same
366 /// protocol version as the client.
367 fn perform_handshake(&self, msg: message::RequestServerInfoV4) -> ButtplugServerResultFuture {
368 if self.connected() {
369 return ButtplugHandshakeError::HandshakeAlreadyHappened.into();
370 }
371 if !self.connected() && self.client_name.get().is_some() {
372 return ButtplugHandshakeError::ReconnectDenied.into();
373 }
374 info!(
375 "Performing server handshake check with client {} at message version {}.{}",
376 msg.client_name(),
377 msg.protocol_version_major(),
378 msg.protocol_version_minor()
379 );
380
381 if BUTTPLUG_CURRENT_API_MAJOR_VERSION < msg.protocol_version_major() {
382 return ButtplugHandshakeError::MessageSpecVersionMismatch(
383 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
384 msg.protocol_version_major(),
385 )
386 .into();
387 }
388
389 // Only start the ping timer after we've received the handshake.
390 let ping_timer = self.ping_timer.clone();
391
392 // Due to programming/spec errors in prior versions of the protocol, anything before v4 expected
393 // that it would be back a matching api version of the server. The correct response is to send back whatever the
394 let output_version = if (msg.protocol_version_major() as u32) < 4 {
395 msg.protocol_version_major()
396 } else {
397 BUTTPLUG_CURRENT_API_MAJOR_VERSION
398 };
399 let out_msg =
400 message::ServerInfoV4::new(&self.server_name, output_version, 0, self.max_ping_time);
401 let connected = self.connected.clone();
402 self
403 .client_name
404 .set(msg.client_name().to_owned())
405 .expect("We should never conflict on name access");
406 async move {
407 ping_timer.start_ping_timer().await;
408 connected.store(true, Ordering::Relaxed);
409 debug!("Server handshake check successful.");
410 Result::Ok(out_msg.into())
411 }
412 .boxed()
413 }
414
415 /// Update the [PingTimer] with the latest received ping message.
416 fn handle_ping(&self, msg: message::PingV0) -> ButtplugServerResultFuture {
417 if self.max_ping_time == 0 {
418 return ButtplugPingError::PingTimerNotRunning.into();
419 }
420 let fut = self.ping_timer.update_ping_time();
421 async move {
422 fut.await;
423 Result::Ok(message::OkV0::new(msg.id()).into())
424 }
425 .boxed()
426 }
427}
428
#[cfg(test)]
mod test {
  use crate::ButtplugServerBuilder;
  use buttplug_core::message::{self, BUTTPLUG_CURRENT_API_MAJOR_VERSION};

  /// A server accepts one handshake, rejects a second one while connected, and keeps rejecting
  /// handshakes after it has been disconnected.
  #[tokio::test]
  async fn test_server_deny_reuse() {
    let server = ButtplugServerBuilder::default().finish().unwrap();
    let request =
      message::RequestServerInfoV4::new("Test Client", BUTTPLUG_CURRENT_API_MAJOR_VERSION, 0);
    // Every step sends the same handshake request.
    let send_handshake = || server.parse_checked_message(request.clone().into());

    let first_reply = send_handshake().await;
    assert!(first_reply.is_ok(), "Should get back ok: {:?}", first_reply);

    let second_reply = send_handshake().await;
    assert!(
      second_reply.is_err(),
      "Should get back err on double handshake: {:?}",
      second_reply
    );

    assert!(server.disconnect().await.is_ok(), "Should disconnect ok");

    let reply_after_disconnect = send_handshake().await;
    assert!(
      reply_after_disconnect.is_err(),
      "Should get back err on handshake after disconnect: {:?}",
      reply_after_disconnect
    );
  }
}
456}