Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2022 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use buttplug_core::{
9 connector::ButtplugConnector,
10 errors::ButtplugError,
11 message::ButtplugServerMessageV4,
12 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
13};
14use buttplug_server::{
15 ButtplugServer, ButtplugServerBuilder,
16 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
17};
18use buttplug_server_device_config::UserDeviceIdentifier;
19use dashmap::DashSet;
20use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
21use getset::Getters;
22use serde::{Deserialize, Serialize};
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::sync::{
26 Notify,
27 broadcast::{self, Sender},
28 mpsc,
29};
30
31// Clone derived here to satisfy tokio broadcast requirements.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub enum ButtplugRemoteServerEvent {
34 ClientConnected(String),
35 ClientDisconnected,
36 DeviceAdded {
37 index: u32,
38 identifier: UserDeviceIdentifier,
39 name: String,
40 display_name: Option<String>,
41 },
42 DeviceRemoved {
43 index: u32,
44 },
45 //DeviceCommand(ButtplugDeviceCommandMessageUnion)
46}
47
48#[derive(Error, Debug)]
49pub enum ButtplugServerConnectorError {
50 #[error("Cannot bring up server for connection: {0}")]
51 ConnectorError(String),
52}
53
54#[derive(Getters)]
55pub struct ButtplugRemoteServer {
56 #[getset(get = "pub")]
57 server: Arc<ButtplugServer>,
58 #[getset(get = "pub")]
59 event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
60 disconnect_notifier: Arc<Notify>,
61}
62
63async fn run_device_event_stream(
64 server: Arc<ButtplugServer>,
65 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
66) {
67 let server_receiver = server.server_version_event_stream();
68 let known_indexes = DashSet::<u32>::default();
69
70 pin_mut!(server_receiver);
71 loop {
72 match server_receiver.next().await {
73 None => {
74 info!("Server disconnected via server disappearance, exiting loop.");
75 break;
76 }
77 Some(msg) => {
78 if let ButtplugServerMessageV4::DeviceList(dl) = msg
79 && remote_event_sender.receiver_count() > 0
80 {
81 for da in dl.devices() {
82 if known_indexes.contains(&da.1.device_index()) {
83 continue;
84 }
85 if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
86 let added_event = ButtplugRemoteServerEvent::DeviceAdded {
87 index: da.1.device_index(),
88 name: da.1.device_name().clone(),
89 identifier: device_info.identifier().clone(),
90 display_name: device_info.display_name().clone(),
91 };
92 if remote_event_sender.send(added_event).is_err() {
93 error!(
94 "Cannot send event to owner, dropping and assuming local server thread has exited."
95 );
96 }
97 known_indexes.insert(da.1.device_index());
98 }
99 }
100 let indexes = known_indexes.clone();
101 let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
102 for dr in indexes {
103 if current_indexes.contains(&dr) {
104 continue;
105 }
106 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
107 if remote_event_sender.send(removed_event).is_err() {
108 error!(
109 "Cannot send event to owner, dropping and assuming local server thread has exited."
110 );
111 }
112 known_indexes.remove(&dr);
113 }
114 }
115 }
116 }
117 }
118}
119
120async fn run_server<ConnectorType>(
121 server: Arc<ButtplugServer>,
122 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
123 connector: ConnectorType,
124 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
125 disconnect_notifier: Arc<Notify>,
126) where
127 ConnectorType:
128 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
129{
130 info!("Starting remote server loop");
131 let shared_connector = Arc::new(connector);
132 let server_receiver = server.server_version_event_stream();
133 let client_version_receiver = server.event_stream();
134 pin_mut!(server_receiver);
135 pin_mut!(client_version_receiver);
136 loop {
137 select! {
138 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
139 None => {
140 info!("Connector disconnected, exiting loop.");
141 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
142 warn!("Cannot update remote about client disconnection");
143 }
144 break;
145 }
146 Some(client_message) => {
147 trace!("Got message from connector: {:?}", client_message);
148 let server_clone = server.clone();
149 let connected = server_clone.connected();
150 let connector_clone = shared_connector.clone();
151 let remote_event_sender_clone = remote_event_sender.clone();
152 async_manager::spawn(async move {
153 match server_clone.parse_message(client_message.clone()).await {
154 Ok(ret_msg) => {
155 // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick.
156 if !connected && server_clone.connected()
157 && remote_event_sender_clone.receiver_count() > 0
158 && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
159 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
160 }
161 if connector_clone.send(ret_msg).await.is_err() {
162 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
163 }
164 },
165 Err(err_msg) => {
166 if connector_clone.send(err_msg).await.is_err() {
167 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
168 }
169 }
170 }
171 });
172 }
173 },
174 _ = disconnect_notifier.notified().fuse() => {
175 info!("Server disconnected via controller disappearance, exiting loop.");
176 break;
177 },
178 server_msg = server_receiver.next().fuse() => match server_msg {
179 None => {
180 info!("Server disconnected via server disappearance, exiting loop.");
181 break;
182 }
183 Some(msg) => {
184 /*
185 if remote_event_sender.receiver_count() > 0 {
186 match &msg {
187 ButtplugServerMessageV4::DeviceAdded(da) => {
188 if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
189 let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
190 if remote_event_sender.send(added_event).is_err() {
191 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
192 }
193 }
194 },
195 ButtplugServerMessageV4::DeviceRemoved(dr) => {
196 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
197 if remote_event_sender.send(removed_event).is_err() {
198 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
199 }
200 },
201 _ => {}
202 }
203 }
204 */
205 }
206 },
207 client_msg = client_version_receiver.next().fuse() => match client_msg {
208 None => {
209 info!("Server disconnected via server disappearance, exiting loop.");
210 break;
211 }
212 Some(msg) => {
213 let connector_clone = shared_connector.clone();
214 if connector_clone.send(msg).await.is_err() {
215 error!("Server disappeared, exiting remote server thread.");
216 }
217 }
218 }
219 };
220 }
221 if let Err(err) = server.disconnect().await {
222 error!("Error disconnecting server: {:?}", err);
223 }
224 info!("Exiting remote server loop");
225}
226
227impl Default for ButtplugRemoteServer {
228 fn default() -> Self {
229 Self::new(
230 ButtplugServerBuilder::default()
231 .finish()
232 .expect("Default is infallible"),
233 &None,
234 )
235 }
236}
237
238impl ButtplugRemoteServer {
239 pub fn new(
240 server: ButtplugServer,
241 event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
242 ) -> Self {
243 let event_sender = if let Some(sender) = event_sender {
244 sender.clone()
245 } else {
246 broadcast::channel(256).0
247 };
248 // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to
249 // the RemoteServer instance, not just during client connect. We need to make sure these are
250 // emitted to the frontend.
251 let server = Arc::new(server);
252 {
253 let server = server.clone();
254 tokio::spawn({
255 let server = server;
256 let event_sender = event_sender.clone();
257 async move {
258 run_device_event_stream(server, event_sender).await;
259 }
260 });
261 }
262 Self {
263 event_sender,
264 server,
265 disconnect_notifier: Arc::new(Notify::new()),
266 }
267 }
268
269 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
270 convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
271 }
272
273 pub fn start<ConnectorType>(
274 &self,
275 mut connector: ConnectorType,
276 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
277 where
278 ConnectorType:
279 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
280 {
281 let server = self.server.clone();
282 let event_sender = self.event_sender.clone();
283 let disconnect_notifier = self.disconnect_notifier.clone();
284 async move {
285 let (connector_sender, connector_receiver) = mpsc::channel(256);
286 // Due to the connect method requiring a mutable connector, we must connect before starting up
287 // our server loop. Anything that needs to happen outside of the client connection session
288 // should happen around this. This flow is locked.
289 connector
290 .connect(connector_sender)
291 .await
292 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
293 run_server(
294 server,
295 event_sender,
296 connector,
297 connector_receiver,
298 disconnect_notifier,
299 )
300 .await;
301 Ok(())
302 }
303 }
304
305 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
306 self.disconnect_notifier.notify_waiters();
307 Ok(())
308 }
309
310 pub async fn shutdown(&self) -> Result<(), ButtplugError> {
311 self.server.shutdown().await?;
312 Ok(())
313 }
314}
315
316impl Drop for ButtplugRemoteServer {
317 fn drop(&mut self) {
318 self.disconnect_notifier.notify_waiters();
319 }
320}