Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use buttplug_core::{
9 connector::ButtplugConnector,
10 errors::ButtplugError,
11 message::{ButtplugMessage, ButtplugMessageValidator, ErrorV0},
12 util::async_manager,
13};
14use buttplug_server::{
15 ButtplugServer,
16 ButtplugServerBuilder,
17 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
18};
19use futures::{FutureExt, StreamExt, future::Future, pin_mut, select};
20use log::*;
21use std::sync::Arc;
22use thiserror::Error;
23use tokio::sync::{Notify, mpsc};
24
25#[derive(Error, Debug)]
26pub enum ButtplugServerConnectorError {
27 #[error("Cannot bring up server for connection: {0}")]
28 ConnectorError(String),
29}
30
31pub struct ButtplugTestServer {
32 server: Arc<ButtplugServer>,
33 disconnect_notifier: Arc<Notify>,
34}
35
36async fn run_server<ConnectorType>(
37 server: Arc<ButtplugServer>,
38 connector: ConnectorType,
39 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
40 disconnect_notifier: Arc<Notify>,
41) where
42 ConnectorType:
43 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
44{
45 info!("Starting remote server loop");
46 let shared_connector = Arc::new(connector);
47 let server_receiver = server.event_stream();
48 pin_mut!(server_receiver);
49 loop {
50 select! {
51 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
52 None => {
53 info!("Connector disconnected, exiting loop.");
54 break;
55 }
56 Some(client_message) => {
57 trace!("Got message from connector: {:?}", client_message);
58 let server_clone = server.clone();
59 let connector_clone = shared_connector.clone();
60 async_manager::spawn(async move {
61 if let Err(e) = client_message.is_valid() {
62 error!("Message not valid: {:?} - Error: {}", client_message, e);
63 let mut err_msg = ErrorV0::from(ButtplugError::from(e));
64 err_msg.set_id(client_message.id());
65 let _ = connector_clone.send(ButtplugServerMessageVariant::V3(err_msg.into())).await;
66 return;
67 }
68 match server_clone.parse_message(client_message.clone()).await {
69 Ok(ret_msg) => {
70 if connector_clone.send(ret_msg).await.is_err() {
71 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
72 }
73 },
74 Err(err_msg) => {
75 if connector_clone.send(err_msg).await.is_err() {
76 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
77 }
78 }
79 }
80 });
81 }
82 },
83 _ = disconnect_notifier.notified().fuse() => {
84 info!("Server disconnected via controller disappearance, exiting loop.");
85 break;
86 },
87 server_msg = server_receiver.next().fuse() => match server_msg {
88 None => {
89 info!("Server disconnected via server disappearance, exiting loop.");
90 break;
91 }
92 Some(msg) => {
93 if shared_connector.send(msg).await.is_err() {
94 error!("Server disappeared, exiting remote server thread.");
95 }
96 }
97 },
98 };
99 }
100 if let Err(err) = server.disconnect().await {
101 error!("Error disconnecting server: {:?}", err);
102 }
103 info!("Exiting remote server loop");
104}
105
106impl Default for ButtplugTestServer {
107 fn default() -> Self {
108 Self::new(
109 ButtplugServerBuilder::default()
110 .finish()
111 .expect("Default is infallible"),
112 )
113 }
114}
115
116impl ButtplugTestServer {
117 pub fn new(server: ButtplugServer) -> Self {
118 Self {
119 server: Arc::new(server),
120 disconnect_notifier: Arc::new(Notify::new()),
121 }
122 }
123
124 pub fn start<ConnectorType>(
125 &self,
126 mut connector: ConnectorType,
127 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>>
128 where
129 ConnectorType:
130 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
131 {
132 let server_clone = self.server.clone();
133 let disconnect_notifier = self.disconnect_notifier.clone();
134 async move {
135 let (connector_sender, connector_receiver) = mpsc::channel(256);
136 connector
137 .connect(connector_sender)
138 .await
139 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
140 run_server(
141 server_clone,
142 connector,
143 connector_receiver,
144 disconnect_notifier,
145 )
146 .await;
147 Ok(())
148 }
149 }
150
151 #[allow(dead_code)]
152 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
153 self.disconnect_notifier.notify_waiters();
154 Ok(())
155 }
156
157 #[allow(dead_code)]
158 pub async fn shutdown(self) -> Result<(), ButtplugError> {
159 self.server.shutdown().await?;
160 Ok(())
161 }
162}
163
164impl Drop for ButtplugTestServer {
165 fn drop(&mut self) {
166 self.disconnect_notifier.notify_waiters();
167 }
168}