fork
Configure Feed
Select the types of activity you want to include in your feed.
Buttplug sex toy control library
fork
Configure Feed
Select the types of activity you want to include in your feed.
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2022 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.
7
8use buttplug_core::{
9 connector::ButtplugConnector,
10 errors::ButtplugError,
11 message::ButtplugServerMessageV4,
12 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
13};
14use buttplug_server::{
15 ButtplugServer, ButtplugServerBuilder,
16 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
17};
18use buttplug_server_device_config::UserDeviceIdentifier;
19use dashmap::DashSet;
20use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
21use getset::Getters;
22use serde::{Deserialize, Serialize};
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::sync::{
26 Notify,
27 broadcast::{self, Sender},
28 mpsc,
29};
30
31// Clone derived here to satisfy tokio broadcast requirements.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub enum ButtplugRemoteServerEvent {
34 ClientConnected(String),
35 ClientDisconnected,
36 DeviceAdded {
37 index: u32,
38 identifier: UserDeviceIdentifier,
39 name: String,
40 display_name: Option<String>,
41 },
42 DeviceRemoved {
43 index: u32,
44 },
45 //DeviceCommand(ButtplugDeviceCommandMessageUnion)
46}
47
48#[derive(Error, Debug)]
49pub enum ButtplugServerConnectorError {
50 #[error("Cannot bring up server for connection: {0}")]
51 ConnectorError(String),
52}
53
54#[derive(Getters)]
55pub struct ButtplugRemoteServer {
56 #[getset(get = "pub")]
57 server: Arc<ButtplugServer>,
58 #[getset(get = "pub")]
59 event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
60 disconnect_notifier: Arc<Notify>,
61}
62
63async fn run_device_event_stream(
64 server: Arc<ButtplugServer>,
65 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
66) {
67 let server_receiver = server.server_version_event_stream();
68 let known_indexes = DashSet::<u32>::default();
69
70 pin_mut!(server_receiver);
71 loop {
72 match server_receiver.next().await {
73 None => {
74 info!("Server disconnected via server disappearance, exiting loop.");
75 break;
76 }
77 Some(msg) => {
78 if let ButtplugServerMessageV4::DeviceList(dl) = msg
79 && remote_event_sender.receiver_count() > 0
80 {
81 for da in dl.devices() {
82 if known_indexes.contains(&da.1.device_index()) {
83 continue;
84 }
85 if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
86 let added_event = ButtplugRemoteServerEvent::DeviceAdded {
87 index: da.1.device_index(),
88 name: da.1.device_name().clone(),
89 identifier: device_info.identifier().clone(),
90 display_name: device_info.display_name().clone(),
91 };
92 if remote_event_sender.send(added_event).is_err() {
93 error!(
94 "Cannot send event to owner, dropping and assuming local server thread has exited."
95 );
96 }
97 known_indexes.insert(da.1.device_index());
98 }
99 }
100 let indexes = known_indexes.clone();
101 let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
102 for dr in indexes {
103 if current_indexes.contains(&dr) {
104 continue;
105 }
106 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
107 if remote_event_sender.send(removed_event).is_err() {
108 error!(
109 "Cannot send event to owner, dropping and assuming local server thread has exited."
110 );
111 }
112 known_indexes.remove(&dr);
113 }
114 }
115 }
116 }
117 }
118}
119
120async fn run_server<ConnectorType>(
121 server: Arc<ButtplugServer>,
122 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
123 connector: ConnectorType,
124 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
125 disconnect_notifier: Arc<Notify>,
126) where
127 ConnectorType:
128 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
129{
130 info!("Starting remote server loop");
131 let shared_connector = Arc::new(connector);
132 let server_receiver = server.server_version_event_stream();
133 let client_version_receiver = server.event_stream();
134 pin_mut!(server_receiver);
135 pin_mut!(client_version_receiver);
136 loop {
137 select! {
138 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
139 None => {
140 info!("Connector disconnected, exiting loop.");
141 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
142 warn!("Cannot update remote about client disconnection");
143 }
144 break;
145 }
146 Some(client_message) => {
147 trace!("Got message from connector: {:?}", client_message);
148 let server_clone = server.clone();
149 let connected = server_clone.connected();
150 let connector_clone = shared_connector.clone();
151 let remote_event_sender_clone = remote_event_sender.clone();
152 async_manager::spawn(async move {
153 match server_clone.parse_message(client_message.clone()).await {
154 Ok(ret_msg) => {
155 // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick.
156 if !connected && server_clone.connected()
157 && remote_event_sender_clone.receiver_count() > 0
158 && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
159 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
160 }
161 if connector_clone.send(ret_msg).await.is_err() {
162 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
163 }
164 },
165 Err(err_msg) => {
166 if connector_clone.send(err_msg).await.is_err() {
167 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
168 }
169 }
170 }
171 });
172 }
173 },
174 _ = disconnect_notifier.notified().fuse() => {
175 info!("Server disconnected via controller disappearance, exiting loop.");
176 break;
177 },
178 server_msg = server_receiver.next().fuse() => match server_msg {
179 None => {
180 info!("Server disconnected via server disappearance, exiting loop.");
181 break;
182 }
183 Some(msg) => {
184 /*
185 if remote_event_sender.receiver_count() > 0 {
186 match &msg {
187 ButtplugServerMessageV4::DeviceAdded(da) => {
188 if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
189 let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
190 if remote_event_sender.send(added_event).is_err() {
191 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
192 }
193 }
194 },
195 ButtplugServerMessageV4::DeviceRemoved(dr) => {
196 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
197 if remote_event_sender.send(removed_event).is_err() {
198 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
199 }
200 },
201 _ => {}
202 }
203 }
204 */
205 }
206 },
207 client_msg = client_version_receiver.next().fuse() => match client_msg {
208 None => {
209 info!("Server disconnected via server disappearance, exiting loop.");
210 break;
211 }
212 Some(msg) => {
213 let connector_clone = shared_connector.clone();
214 if connector_clone.send(msg).await.is_err() {
215 error!("Server disappeared, exiting remote server thread.");
216 }
217 }
218 }
219 };
220 }
221 if let Err(err) = server.disconnect().await {
222 error!("Error disconnecting server: {:?}", err);
223 }
224 info!("Exiting remote server loop");
225}
226
227impl Default for ButtplugRemoteServer {
228 fn default() -> Self {
229 Self::new(
230 ButtplugServerBuilder::default()
231 .finish()
232 .expect("Default is infallible"),
233 &None,
234 )
235 }
236}
237
238impl ButtplugRemoteServer {
239 pub fn new(
240 server: ButtplugServer,
241 event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
242 ) -> Self {
243 let event_sender = if let Some(sender) = event_sender {
244 sender.clone()
245 } else {
246 broadcast::channel(256).0
247 };
248 // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to
249 // the RemoteServer instance, not just during client connect. We need to make sure these are
250 // emitted to the frontend.
251 let server = Arc::new(server);
252 {
253 let server = server.clone();
254 tokio::spawn({
255 let server = server;
256 let event_sender = event_sender.clone();
257 async move {
258 run_device_event_stream(server, event_sender).await;
259 }
260 });
261 }
262 Self {
263 event_sender,
264 server,
265 disconnect_notifier: Arc::new(Notify::new()),
266 }
267 }
268
269 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
270 convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
271 }
272
273 pub fn start<ConnectorType>(
274 &self,
275 mut connector: ConnectorType,
276 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
277 where
278 ConnectorType:
279 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
280 {
281 let server = self.server.clone();
282 let event_sender = self.event_sender.clone();
283 let disconnect_notifier = self.disconnect_notifier.clone();
284 async move {
285 let (connector_sender, connector_receiver) = mpsc::channel(256);
286 // Due to the connect method requiring a mutable connector, we must connect before starting up
287 // our server loop. Anything that needs to happen outside of the client connection session
288 // should happen around this. This flow is locked.
289 connector
290 .connect(connector_sender)
291 .await
292 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
293 run_server(
294 server,
295 event_sender,
296 connector,
297 connector_receiver,
298 disconnect_notifier,
299 )
300 .await;
301 Ok(())
302 }
303 }
304
305 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
306 self.disconnect_notifier.notify_waiters();
307 Ok(())
308 }
309
310 pub async fn shutdown(&self) -> Result<(), ButtplugError> {
311 self.server.shutdown().await?;
312 Ok(())
313 }
314}
315
316impl Drop for ButtplugRemoteServer {
317 fn drop(&mut self) {
318 self.disconnect_notifier.notify_waiters();
319 }
320}