Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use super::websocket_server_comm_manager::WebsocketServerDeviceCommManagerInitInfo;
9use async_trait::async_trait;
10use buttplug_core::{errors::ButtplugDeviceError, util::async_manager};
11use buttplug_server::device::hardware::{
12 GenericHardwareSpecializer,
13 Hardware,
14 HardwareConnector,
15 HardwareEvent,
16 HardwareInternal,
17 HardwareReadCmd,
18 HardwareReading,
19 HardwareSpecializer,
20 HardwareSubscribeCmd,
21 HardwareUnsubscribeCmd,
22 HardwareWriteCmd,
23};
24use buttplug_server_device_config::{Endpoint, ProtocolCommunicationSpecifier, WebsocketSpecifier};
25use futures::{
26 FutureExt,
27 SinkExt,
28 StreamExt,
29 future::{self, BoxFuture},
30};
31use std::{
32 fmt::{self, Debug},
33 sync::{
34 Arc,
35 atomic::{AtomicBool, Ordering},
36 },
37 time::Duration,
38};
39use tokio::{
40 net::TcpStream,
41 select,
42 sync::{
43 Mutex,
44 broadcast,
45 mpsc::{Receiver, Sender, channel},
46 },
47 time::sleep,
48};
49use tokio_util::sync::CancellationToken;
50
51async fn run_connection_loop(
52 address: &str,
53 event_sender: broadcast::Sender<HardwareEvent>,
54 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
55 mut request_receiver: Receiver<Vec<u8>>,
56 response_sender: broadcast::Sender<Vec<u8>>,
57) {
58 info!("Starting websocket server connection event loop.");
59
60 let (mut websocket_server_sender, mut websocket_server_receiver) = ws_stream.split();
61
62 // Start pong count at 1, so we'll clear it after sending our first ping.
63 let mut pong_count = 1u32;
64
65 loop {
66 select! {
67 _ = sleep(Duration::from_millis(10000)) => {
68 if pong_count == 0 {
69 error!("No pongs received, considering connection closed.");
70 break;
71 }
72 pong_count = 0;
73 if websocket_server_sender
74 .send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0).into()))
75 .await
76 .is_err() {
77 error!("Cannot send ping to client, considering connection closed.");
78 break;
79 }
80 }
81 ws_msg = request_receiver.recv() => {
82 if let Some(binary_msg) = ws_msg {
83 if websocket_server_sender
84 .send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg.into()))
85 .await
86 .is_err() {
87 error!("Cannot send binary value to client, considering connection closed.");
88 break;
89 }
90 } else {
91 info!("Websocket server connector owner dropped, disconnecting websocket connection.");
92 break;
93 }
94 }
95 websocket_server_msg = websocket_server_receiver.next() => match websocket_server_msg {
96 Some(ws_data) => {
97 match ws_data {
98 Ok(msg) => {
99 match msg {
100 tokio_tungstenite::tungstenite::Message::Text(text_msg) => {
101 // If someone accidentally packs text, politely turn it into binary for them.
102 let _ = response_sender.send(text_msg.as_bytes().to_vec());
103 }
104 tokio_tungstenite::tungstenite::Message::Binary(binary_msg) => {
105 // If no one is listening, ignore output.
106 let _ = response_sender.send(binary_msg.to_vec());
107 }
108 tokio_tungstenite::tungstenite::Message::Close(_) => {
109 // Drop the error if no one receives the message, we're breaking anyways.
110 let _ = event_sender
111 .send(HardwareEvent::Disconnected(
112 address.to_owned()
113 ));
114 break;
115 }
116 tokio_tungstenite::tungstenite::Message::Ping(_) => {
117 // noop
118 continue;
119 }
120 tokio_tungstenite::tungstenite::Message::Frame(_) => {
121 // noop
122 continue;
123 }
124 tokio_tungstenite::tungstenite::Message::Pong(_) => {
125 pong_count += 1;
126 continue;
127 }
128 }
129 },
130 Err(err) => {
131 error!("Error from websocket server, assuming disconnection: {:?}", err);
132 break;
133 }
134 }
135 },
136 None => {
137 error!("Websocket channel closed, breaking");
138 break;
139 }
140 }
141 }
142 }
143
144 if let Err(e) = websocket_server_sender.close().await {
145 error!("Error closing websocket: {}", e);
146 }
147 debug!("Exiting Websocket Server Device control loop.");
148}
149
150impl Debug for WebsocketServerHardwareConnector {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 f.debug_struct("WebsocketServerHardwareConnector")
153 .field("info", &self.info)
154 .finish()
155 }
156}
157
158pub struct WebsocketServerHardwareConnector {
159 info: WebsocketServerDeviceCommManagerInitInfo,
160 outgoing_sender: Sender<Vec<u8>>,
161 incoming_broadcaster: broadcast::Sender<Vec<u8>>,
162 device_event_sender: broadcast::Sender<HardwareEvent>,
163}
164
165impl WebsocketServerHardwareConnector {
166 pub fn new(
167 info: WebsocketServerDeviceCommManagerInitInfo,
168 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
169 ) -> Self {
170 let (outgoing_sender, outgoing_receiver) = channel(256);
171 let (incoming_broadcaster, _) = broadcast::channel(256);
172 let incoming_broadcaster_clone = incoming_broadcaster.clone();
173 let (device_event_sender, _) = broadcast::channel(256);
174 let device_event_sender_clone = device_event_sender.clone();
175 let address = info.address().clone();
176 tokio::spawn(async move {
177 run_connection_loop(
178 &address,
179 device_event_sender_clone,
180 ws_stream,
181 outgoing_receiver,
182 incoming_broadcaster_clone,
183 )
184 .await;
185 });
186 Self {
187 info,
188 outgoing_sender,
189 incoming_broadcaster,
190 device_event_sender,
191 }
192 }
193}
194
195#[async_trait]
196impl HardwareConnector for WebsocketServerHardwareConnector {
197 fn specifier(&self) -> ProtocolCommunicationSpecifier {
198 ProtocolCommunicationSpecifier::Websocket(WebsocketSpecifier::new(self.info.identifier()))
199 }
200
201 async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> {
202 let hardware_internal = WebsocketServerHardware::new(
203 self.device_event_sender.clone(),
204 self.info.clone(),
205 self.outgoing_sender.clone(),
206 self.incoming_broadcaster.clone(),
207 );
208 let hardware = Hardware::new(
209 self.info.identifier(),
210 self.info.address(),
211 &[Endpoint::Rx, Endpoint::Tx],
212 &None,
213 false,
214 Box::new(hardware_internal),
215 );
216 Ok(Box::new(GenericHardwareSpecializer::new(hardware)))
217 }
218}
219
220pub struct WebsocketServerHardware {
221 connected: Arc<AtomicBool>,
222 subscribed: Arc<AtomicBool>,
223 subscribe_token: Arc<Mutex<Option<CancellationToken>>>,
224 info: WebsocketServerDeviceCommManagerInitInfo,
225 outgoing_sender: Sender<Vec<u8>>,
226 incoming_broadcaster: broadcast::Sender<Vec<u8>>,
227 device_event_sender: broadcast::Sender<HardwareEvent>,
228}
229
230impl WebsocketServerHardware {
231 pub fn new(
232 device_event_sender: broadcast::Sender<HardwareEvent>,
233 info: WebsocketServerDeviceCommManagerInitInfo,
234 outgoing_sender: Sender<Vec<u8>>,
235 incoming_broadcaster: broadcast::Sender<Vec<u8>>,
236 ) -> Self {
237 Self {
238 connected: Arc::new(AtomicBool::new(true)),
239 info,
240 outgoing_sender,
241 incoming_broadcaster,
242 device_event_sender,
243 subscribed: Arc::new(AtomicBool::new(false)),
244 subscribe_token: Arc::new(Mutex::new(None)),
245 }
246 }
247}
248
249impl HardwareInternal for WebsocketServerHardware {
250 fn event_stream(&self) -> broadcast::Receiver<HardwareEvent> {
251 self.device_event_sender.subscribe()
252 }
253
254 fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
255 let connected = self.connected.clone();
256 async move {
257 connected.store(false, Ordering::Relaxed);
258 Ok(())
259 }
260 .boxed()
261 }
262
263 fn read_value(
264 &self,
265 _msg: &HardwareReadCmd,
266 ) -> BoxFuture<'static, Result<HardwareReading, ButtplugDeviceError>> {
267 future::ready(Err(ButtplugDeviceError::UnhandledCommand(
268 "Websocket Hardware does not support read".to_owned(),
269 )))
270 .boxed()
271 }
272
273 fn write_value(
274 &self,
275 msg: &HardwareWriteCmd,
276 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
277 let sender = self.outgoing_sender.clone();
278 let data = msg.data().clone();
279 // TODO Should check endpoint validity
280 async move {
281 sender.send(data).await.map_err(|err| {
282 ButtplugDeviceError::DeviceCommunicationError(format!(
283 "Could not write value to websocket device: {err}"
284 ))
285 })
286 }
287 .boxed()
288 }
289
290 fn subscribe(
291 &self,
292 _msg: &HardwareSubscribeCmd,
293 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
294 if self.subscribed.load(Ordering::Relaxed) {
295 error!("Endpoint already subscribed somehow!");
296 return future::ready(Ok(())).boxed();
297 }
298 // TODO Should check endpoint validity
299 let mut data_receiver = self.incoming_broadcaster.subscribe();
300 let event_sender = self.device_event_sender.clone();
301 let address = self.info.address().clone();
302 let subscribed = self.subscribed.clone();
303 let subscribed_token = self.subscribe_token.clone();
304 async move {
305 subscribed.store(true, Ordering::Relaxed);
306 let token = CancellationToken::new();
307 *(subscribed_token.lock().await) = Some(token.child_token());
308 async_manager::spawn(async move {
309 loop {
310 select! {
311 result = data_receiver.recv().fuse() => {
312 match result {
313 Ok(data) => {
314 debug!("Got websocket data! {:?}", data);
315 // We don't really care if there's no one to send the error to here.
316 let _ = event_sender
317 .send(HardwareEvent::Notification(
318 address.clone(),
319 Endpoint::Tx,
320 data,
321 ));
322 },
323 Err(_) => break,
324 }
325 },
326 _ = token.cancelled().fuse() => {
327 break;
328 }
329 }
330 }
331 info!("Data channel closed, ending websocket server device listener task");
332 });
333 Ok(())
334 }
335 .boxed()
336 }
337
338 fn unsubscribe(
339 &self,
340 _msg: &HardwareUnsubscribeCmd,
341 ) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
342 if self.subscribed.load(Ordering::Relaxed) {
343 let subscribed = self.subscribed.clone();
344 let subscribed_token = self.subscribe_token.clone();
345 async move {
346 subscribed.store(false, Ordering::Relaxed);
347 let token = (subscribed_token.lock().await)
348 .take()
349 .expect("If we were subscribed, we'll have a token.");
350 token.cancel();
351 Ok(())
352 }
353 .boxed()
354 } else {
355 future::ready(Err(ButtplugDeviceError::DeviceCommunicationError(
356 "Device not subscribed.".to_owned(),
357 )))
358 .boxed()
359 }
360 }
361}