Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Implementation of internal Buttplug Client event loop.
9
10use super::{
11 ButtplugClientEvent,
12 ButtplugClientMessageFuturePair,
13 ButtplugClientMessageSender,
14 client_message_sorter::ClientMessageSorter,
15 device::{ButtplugClientDevice, ButtplugClientDeviceEvent},
16};
17use buttplug_core::{
18 connector::{ButtplugConnector, ButtplugConnectorStateShared},
19 errors::ButtplugError,
20 message::{
21 ButtplugClientMessageV4,
22 ButtplugDeviceMessage,
23 ButtplugMessageValidator,
24 ButtplugServerMessageV4,
25 DeviceListV4,
26 DeviceMessageInfoV4,
27 },
28};
29use dashmap::DashMap;
30use log::*;
31use std::sync::{
32 Arc,
33 atomic::{AtomicBool, Ordering},
34};
35use tokio::{
36 select,
37 sync::{broadcast, mpsc},
38};
39
40/// Enum used for communication from the client to the event loop.
41#[derive(Clone)]
42pub enum ButtplugClientRequest {
43 /// Client request to disconnect, via already sent connector instance.
44 Disconnect(ButtplugConnectorStateShared),
45 /// Given a DeviceList message, update the inner loop values and create
46 /// events for additions.
47 HandleDeviceList(DeviceListV4),
48 /// Client request to send a message via the connector.
49 ///
50 /// Bundled future should have reply set and waker called when this is
51 /// finished.
52 Message(ButtplugClientMessageFuturePair),
53}
54
55/// Event loop for running [ButtplugClient] connections.
56///
57/// Acts as a hub for communication between the connector and [ButtplugClient]
58/// instances.
59///
60/// Created whenever a new [super::ButtplugClient] is created, the internal loop
61/// handles connection and communication with the server through the connector,
62/// and creation of events received from the server.
63///
64/// The event_loop does a few different things during its lifetime:
65///
66/// - It will listen for events from the connector, or messages from the client,
67/// routing them to their proper receivers until either server/client
68/// disconnects.
69///
70/// - On disconnect, it will tear down, and cannot be used again. All clients
71/// and devices associated with the loop will be invalidated, and connect must
72/// be called on the client again (or a new client should be created).
73///
74/// # Why an event loop?
75///
76/// Due to the async nature of Buttplug, we many channels routed to many
77/// different tasks. However, all of those tasks will refer to the same event
78/// loop. This allows us to coordinate and centralize our information while
79/// keeping the API async.
80///
81/// Note that no async call here should block. Any .await should only be on
82/// async channels, and those channels should never have backpressure. We hope.
83pub(super) struct ButtplugClientEventLoop<ConnectorType>
84where
85 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
86{
87 /// Connected status from client, managed by the event loop in case of disconnect.
88 connected_status: Arc<AtomicBool>,
89 /// Connector the event loop will use to communicate with the [ButtplugServer]
90 connector: ConnectorType,
91 /// Receiver for messages send from the [ButtplugServer] via the connector.
92 from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>,
93 /// Map of devices shared between the client and the event loop
94 device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
95 /// Sends events to the [ButtplugClient] instance.
96 to_client_sender: broadcast::Sender<ButtplugClientEvent>,
97 /// Sends events to the client receiver. Stored here so it can be handed to
98 /// new ButtplugClientDevice instances.
99 from_client_sender: ButtplugClientMessageSender,
100 /// Receives incoming messages from client instances.
101 from_client_receiver: broadcast::Receiver<ButtplugClientRequest>,
102 sorter: ClientMessageSorter,
103}
104
105impl<ConnectorType> ButtplugClientEventLoop<ConnectorType>
106where
107 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
108{
109 /// Creates a new [ButtplugClientEventLoop].
110 ///
111 /// Given the [ButtplugClientConnector] object, as well as the channels used
112 /// for communicating with the client, creates an event loop structure and
113 /// returns it.
114 pub fn new(
115 connected_status: Arc<AtomicBool>,
116 connector: ConnectorType,
117 from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>,
118 to_client_sender: broadcast::Sender<ButtplugClientEvent>,
119 from_client_sender: ButtplugClientMessageSender,
120 device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
121 ) -> Self {
122 trace!("Creating ButtplugClientEventLoop instance.");
123 Self {
124 connected_status,
125 device_map,
126 from_client_receiver: from_client_sender.subscribe(),
127 from_client_sender,
128 to_client_sender,
129 from_connector_receiver,
130 connector,
131 sorter: ClientMessageSorter::default(),
132 }
133 }
134
135 /// Creates a [ButtplugClientDevice] from [DeviceMessageInfo].
136 ///
137 /// Given a [DeviceMessageInfo] from a [DeviceAdded] or [DeviceList] message,
138 /// creates a ButtplugClientDevice and adds it the internal device map, then
139 /// returns the instance.
140 fn create_client_device(&mut self, info: &DeviceMessageInfoV4) -> ButtplugClientDevice {
141 debug!(
142 "Trying to create a client device from DeviceMessageInfo: {:?}",
143 info
144 );
145 match self.device_map.get(&info.device_index()) {
146 // If the device already exists in our map, clone it.
147 Some(dev) => {
148 debug!("Device already exists, creating clone.");
149 dev.clone()
150 }
151 // If it doesn't, insert it.
152 None => {
153 debug!("Device does not exist, creating new entry.");
154 let device = ButtplugClientDevice::new_from_device_info(info, &self.from_client_sender);
155 self.device_map.insert(info.device_index(), device.clone());
156 device
157 }
158 }
159 }
160
161 fn send_client_event(&mut self, event: ButtplugClientEvent) {
162 trace!("Forwarding event {:?} to client", event);
163
164 if self.to_client_sender.receiver_count() == 0 {
165 error!(
166 "Client event {:?} dropped, no client event listener available.",
167 event
168 );
169 return;
170 }
171
172 self
173 .to_client_sender
174 .send(event)
175 .expect("Already checked for receivers.");
176 }
177
178 fn disconnect_device(&mut self, device_index: u32) {
179 if !self.device_map.contains_key(&device_index) {
180 return;
181 }
182
183 let device = (*self
184 .device_map
185 .get(&device_index)
186 .expect("Checked for device index already."))
187 .clone();
188 device.set_device_connected(false);
189 device.queue_event(ButtplugClientDeviceEvent::DeviceRemoved);
190 // Then remove it from our storage map
191 self.device_map.remove(&device_index);
192 self.send_client_event(ButtplugClientEvent::DeviceRemoved(device));
193 }
194
195 /// Parse device messages from the connector.
196 ///
197 /// Since the event loop maintains the state of all devices reported from the
198 /// server, it will catch [DeviceAdded]/[DeviceList]/[DeviceRemoved] messages
199 /// and update its map accordingly. After that, it will pass the information
200 /// on as a [ButtplugClientEvent] to the [ButtplugClient].
201 async fn parse_connector_message(&mut self, msg: ButtplugServerMessageV4) {
202 if self.sorter.maybe_resolve_result(&msg) {
203 trace!("Message future found, returning");
204 return;
205 }
206 if let Err(e) = msg.is_valid() {
207 error!("Message not valid: {:?} - Error: {}", msg, e);
208 self.send_client_event(ButtplugClientEvent::Error(ButtplugError::from(e)));
209 return;
210 }
211 trace!("Message future not found, assuming server event.");
212 info!("{:?}", msg);
213 match msg {
214 ButtplugServerMessageV4::DeviceList(list) => {
215 trace!("Got device list, devices either added or removed");
216 for dev in list.devices() {
217 if self.device_map.contains_key(&dev.1.device_index()) {
218 continue;
219 }
220 trace!("Device added, updating map and sending to client");
221 let info = dev.1.clone();
222 let device = self.create_client_device(&info);
223 self.send_client_event(ButtplugClientEvent::DeviceAdded(device));
224 }
225 let new_indexes: Vec<u32> = list.devices().iter().map(|x| x.1.device_index()).collect();
226 let disconnected_indexes: Vec<u32> = self
227 .device_map
228 .iter()
229 .filter(|x| !new_indexes.contains(x.key()))
230 .map(|x| *x.key())
231 .collect();
232 for index in disconnected_indexes {
233 trace!("Device removed, updating map and sending to client");
234 self.disconnect_device(index);
235 }
236 }
237 ButtplugServerMessageV4::ScanningFinished(_) => {
238 trace!("Scanning finished event received, forwarding to client.");
239 self.send_client_event(ButtplugClientEvent::ScanningFinished);
240 }
241 ButtplugServerMessageV4::InputReading(msg) => {
242 let device_idx = msg.device_index();
243 if let Some(device) = self.device_map.get(&device_idx) {
244 device
245 .value()
246 .queue_event(ButtplugClientDeviceEvent::Message(
247 ButtplugServerMessageV4::from(msg),
248 ));
249 }
250 }
251 ButtplugServerMessageV4::Error(e) => {
252 self.send_client_event(ButtplugClientEvent::Error(e.into()));
253 }
254 _ => error!("Cannot process message, dropping: {:?}", msg),
255 }
256 }
257
258 /// Send a message from the [ButtplugClient] to the [ButtplugClientConnector].
259 async fn send_message(&mut self, mut msg_fut: ButtplugClientMessageFuturePair) {
260 if let Err(e) = &msg_fut.msg.is_valid() {
261 error!("Message not valid: {:?} - Error: {}", msg_fut.msg, e);
262 msg_fut
263 .waker
264 .set_reply(Err(ButtplugError::from(e.clone()).into()));
265 return;
266 }
267
268 trace!("Sending message to connector: {:?}", msg_fut.msg);
269 self.sorter.register_future(&mut msg_fut);
270 if self.connector.send(msg_fut.msg).await.is_err() {
271 error!("Sending message failed, connector most likely no longer connected.");
272 }
273 }
274
275 /// Parses message types from the client, returning false when disconnect
276 /// happens.
277 ///
278 /// Takes different messages from the client and handles them:
279 ///
280 /// - For outbound messages to the server, sends them to the connector/server.
281 /// - For disconnections, requests connector disconnect
282 /// - For RequestDeviceList, builds a reply out of its own
283 async fn parse_client_request(&mut self, msg: ButtplugClientRequest) -> bool {
284 match msg {
285 ButtplugClientRequest::Message(msg_fut) => {
286 trace!("Sending message through connector: {:?}", msg_fut.msg);
287 self.send_message(msg_fut).await;
288 true
289 }
290 ButtplugClientRequest::Disconnect(state) => {
291 trace!("Client requested disconnect");
292 state.set_reply(self.connector.disconnect().await);
293 false
294 }
295 ButtplugClientRequest::HandleDeviceList(device_list) => {
296 trace!("Device list received, updating map.");
297 for (i, device) in device_list.devices() {
298 if self.device_map.contains_key(i) {
299 continue;
300 }
301 let device = self.create_client_device(device);
302 self.send_client_event(ButtplugClientEvent::DeviceAdded(device));
303 }
304 true
305 }
306 }
307 }
308
309 /// Runs the event loop, returning once either the client or connector drops.
310 pub async fn run(&mut self) {
311 debug!("Running client event loop.");
312 loop {
313 select! {
314 event = self.from_connector_receiver.recv() => match event {
315 None => {
316 info!("Connector disconnected, exiting loop.");
317 break;
318 }
319 Some(msg) => {
320 self.parse_connector_message(msg).await;
321 }
322 },
323 client = self.from_client_receiver.recv() => match client {
324 Err(_) => {
325 info!("Client disconnected, exiting loop.");
326 break;
327 }
328 Ok(msg) => {
329 if !self.parse_client_request(msg).await {
330 break;
331 }
332 }
333 },
334 };
335 }
336 self
337 .device_map
338 .iter()
339 .for_each(|val| val.value().set_client_connected(false));
340
341 let device_indexes: Vec<u32> = self.device_map.iter().map(|k| *k.key()).collect();
342 device_indexes
343 .iter()
344 .for_each(|k| self.disconnect_device(*k));
345 self.connected_status.store(false, Ordering::Relaxed);
346 self.send_client_event(ButtplugClientEvent::ServerDisconnect);
347
348 debug!("Exiting client event loop.");
349 }
350}