Buttplug sex toy control library
at dev 13 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8//! Implementation of internal Buttplug Client event loop. 9 10use super::{ 11 ButtplugClientEvent, 12 ButtplugClientMessageFuturePair, 13 ButtplugClientMessageSender, 14 client_message_sorter::ClientMessageSorter, 15 device::{ButtplugClientDevice, ButtplugClientDeviceEvent}, 16}; 17use buttplug_core::{ 18 connector::{ButtplugConnector, ButtplugConnectorStateShared}, 19 errors::ButtplugError, 20 message::{ 21 ButtplugClientMessageV4, 22 ButtplugDeviceMessage, 23 ButtplugMessageValidator, 24 ButtplugServerMessageV4, 25 DeviceListV4, 26 DeviceMessageInfoV4, 27 }, 28}; 29use dashmap::DashMap; 30use log::*; 31use std::sync::{ 32 Arc, 33 atomic::{AtomicBool, Ordering}, 34}; 35use tokio::{ 36 select, 37 sync::{broadcast, mpsc}, 38}; 39 40/// Enum used for communication from the client to the event loop. 41#[derive(Clone)] 42pub enum ButtplugClientRequest { 43 /// Client request to disconnect, via already sent connector instance. 44 Disconnect(ButtplugConnectorStateShared), 45 /// Given a DeviceList message, update the inner loop values and create 46 /// events for additions. 47 HandleDeviceList(DeviceListV4), 48 /// Client request to send a message via the connector. 49 /// 50 /// Bundled future should have reply set and waker called when this is 51 /// finished. 52 Message(ButtplugClientMessageFuturePair), 53} 54 55/// Event loop for running [ButtplugClient] connections. 56/// 57/// Acts as a hub for communication between the connector and [ButtplugClient] 58/// instances. 59/// 60/// Created whenever a new [super::ButtplugClient] is created, the internal loop 61/// handles connection and communication with the server through the connector, 62/// and creation of events received from the server. 63/// 64/// The event_loop does a few different things during its lifetime: 65/// 66/// - It will listen for events from the connector, or messages from the client, 67/// routing them to their proper receivers until either server/client 68/// disconnects. 69/// 70/// - On disconnect, it will tear down, and cannot be used again. All clients 71/// and devices associated with the loop will be invalidated, and connect must 72/// be called on the client again (or a new client should be created). 73/// 74/// # Why an event loop? 75/// 76/// Due to the async nature of Buttplug, we many channels routed to many 77/// different tasks. However, all of those tasks will refer to the same event 78/// loop. This allows us to coordinate and centralize our information while 79/// keeping the API async. 80/// 81/// Note that no async call here should block. Any .await should only be on 82/// async channels, and those channels should never have backpressure. We hope. 83pub(super) struct ButtplugClientEventLoop<ConnectorType> 84where 85 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static, 86{ 87 /// Connected status from client, managed by the event loop in case of disconnect. 88 connected_status: Arc<AtomicBool>, 89 /// Connector the event loop will use to communicate with the [ButtplugServer] 90 connector: ConnectorType, 91 /// Receiver for messages send from the [ButtplugServer] via the connector. 92 from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>, 93 /// Map of devices shared between the client and the event loop 94 device_map: Arc<DashMap<u32, ButtplugClientDevice>>, 95 /// Sends events to the [ButtplugClient] instance. 96 to_client_sender: broadcast::Sender<ButtplugClientEvent>, 97 /// Sends events to the client receiver. Stored here so it can be handed to 98 /// new ButtplugClientDevice instances. 99 from_client_sender: ButtplugClientMessageSender, 100 /// Receives incoming messages from client instances. 101 from_client_receiver: broadcast::Receiver<ButtplugClientRequest>, 102 sorter: ClientMessageSorter, 103} 104 105impl<ConnectorType> ButtplugClientEventLoop<ConnectorType> 106where 107 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static, 108{ 109 /// Creates a new [ButtplugClientEventLoop]. 110 /// 111 /// Given the [ButtplugClientConnector] object, as well as the channels used 112 /// for communicating with the client, creates an event loop structure and 113 /// returns it. 114 pub fn new( 115 connected_status: Arc<AtomicBool>, 116 connector: ConnectorType, 117 from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>, 118 to_client_sender: broadcast::Sender<ButtplugClientEvent>, 119 from_client_sender: ButtplugClientMessageSender, 120 device_map: Arc<DashMap<u32, ButtplugClientDevice>>, 121 ) -> Self { 122 trace!("Creating ButtplugClientEventLoop instance."); 123 Self { 124 connected_status, 125 device_map, 126 from_client_receiver: from_client_sender.subscribe(), 127 from_client_sender, 128 to_client_sender, 129 from_connector_receiver, 130 connector, 131 sorter: ClientMessageSorter::default(), 132 } 133 } 134 135 /// Creates a [ButtplugClientDevice] from [DeviceMessageInfo]. 136 /// 137 /// Given a [DeviceMessageInfo] from a [DeviceAdded] or [DeviceList] message, 138 /// creates a ButtplugClientDevice and adds it the internal device map, then 139 /// returns the instance. 140 fn create_client_device(&mut self, info: &DeviceMessageInfoV4) -> ButtplugClientDevice { 141 debug!( 142 "Trying to create a client device from DeviceMessageInfo: {:?}", 143 info 144 ); 145 match self.device_map.get(&info.device_index()) { 146 // If the device already exists in our map, clone it. 147 Some(dev) => { 148 debug!("Device already exists, creating clone."); 149 dev.clone() 150 } 151 // If it doesn't, insert it. 152 None => { 153 debug!("Device does not exist, creating new entry."); 154 let device = ButtplugClientDevice::new_from_device_info(info, &self.from_client_sender); 155 self.device_map.insert(info.device_index(), device.clone()); 156 device 157 } 158 } 159 } 160 161 fn send_client_event(&mut self, event: ButtplugClientEvent) { 162 trace!("Forwarding event {:?} to client", event); 163 164 if self.to_client_sender.receiver_count() == 0 { 165 error!( 166 "Client event {:?} dropped, no client event listener available.", 167 event 168 ); 169 return; 170 } 171 172 self 173 .to_client_sender 174 .send(event) 175 .expect("Already checked for receivers."); 176 } 177 178 fn disconnect_device(&mut self, device_index: u32) { 179 if !self.device_map.contains_key(&device_index) { 180 return; 181 } 182 183 let device = (*self 184 .device_map 185 .get(&device_index) 186 .expect("Checked for device index already.")) 187 .clone(); 188 device.set_device_connected(false); 189 device.queue_event(ButtplugClientDeviceEvent::DeviceRemoved); 190 // Then remove it from our storage map 191 self.device_map.remove(&device_index); 192 self.send_client_event(ButtplugClientEvent::DeviceRemoved(device)); 193 } 194 195 /// Parse device messages from the connector. 196 /// 197 /// Since the event loop maintains the state of all devices reported from the 198 /// server, it will catch [DeviceAdded]/[DeviceList]/[DeviceRemoved] messages 199 /// and update its map accordingly. After that, it will pass the information 200 /// on as a [ButtplugClientEvent] to the [ButtplugClient]. 201 async fn parse_connector_message(&mut self, msg: ButtplugServerMessageV4) { 202 if self.sorter.maybe_resolve_result(&msg) { 203 trace!("Message future found, returning"); 204 return; 205 } 206 if let Err(e) = msg.is_valid() { 207 error!("Message not valid: {:?} - Error: {}", msg, e); 208 self.send_client_event(ButtplugClientEvent::Error(ButtplugError::from(e))); 209 return; 210 } 211 trace!("Message future not found, assuming server event."); 212 info!("{:?}", msg); 213 match msg { 214 ButtplugServerMessageV4::DeviceList(list) => { 215 trace!("Got device list, devices either added or removed"); 216 for dev in list.devices() { 217 if self.device_map.contains_key(&dev.1.device_index()) { 218 continue; 219 } 220 trace!("Device added, updating map and sending to client"); 221 let info = dev.1.clone(); 222 let device = self.create_client_device(&info); 223 self.send_client_event(ButtplugClientEvent::DeviceAdded(device)); 224 } 225 let new_indexes: Vec<u32> = list.devices().iter().map(|x| x.1.device_index()).collect(); 226 let disconnected_indexes: Vec<u32> = self 227 .device_map 228 .iter() 229 .filter(|x| !new_indexes.contains(x.key())) 230 .map(|x| *x.key()) 231 .collect(); 232 for index in disconnected_indexes { 233 trace!("Device removed, updating map and sending to client"); 234 self.disconnect_device(index); 235 } 236 } 237 ButtplugServerMessageV4::ScanningFinished(_) => { 238 trace!("Scanning finished event received, forwarding to client."); 239 self.send_client_event(ButtplugClientEvent::ScanningFinished); 240 } 241 ButtplugServerMessageV4::InputReading(msg) => { 242 let device_idx = msg.device_index(); 243 if let Some(device) = self.device_map.get(&device_idx) { 244 device 245 .value() 246 .queue_event(ButtplugClientDeviceEvent::Message( 247 ButtplugServerMessageV4::from(msg), 248 )); 249 } 250 } 251 ButtplugServerMessageV4::Error(e) => { 252 self.send_client_event(ButtplugClientEvent::Error(e.into())); 253 } 254 _ => error!("Cannot process message, dropping: {:?}", msg), 255 } 256 } 257 258 /// Send a message from the [ButtplugClient] to the [ButtplugClientConnector]. 259 async fn send_message(&mut self, mut msg_fut: ButtplugClientMessageFuturePair) { 260 if let Err(e) = &msg_fut.msg.is_valid() { 261 error!("Message not valid: {:?} - Error: {}", msg_fut.msg, e); 262 msg_fut 263 .waker 264 .set_reply(Err(ButtplugError::from(e.clone()).into())); 265 return; 266 } 267 268 trace!("Sending message to connector: {:?}", msg_fut.msg); 269 self.sorter.register_future(&mut msg_fut); 270 if self.connector.send(msg_fut.msg).await.is_err() { 271 error!("Sending message failed, connector most likely no longer connected."); 272 } 273 } 274 275 /// Parses message types from the client, returning false when disconnect 276 /// happens. 277 /// 278 /// Takes different messages from the client and handles them: 279 /// 280 /// - For outbound messages to the server, sends them to the connector/server. 281 /// - For disconnections, requests connector disconnect 282 /// - For RequestDeviceList, builds a reply out of its own 283 async fn parse_client_request(&mut self, msg: ButtplugClientRequest) -> bool { 284 match msg { 285 ButtplugClientRequest::Message(msg_fut) => { 286 trace!("Sending message through connector: {:?}", msg_fut.msg); 287 self.send_message(msg_fut).await; 288 true 289 } 290 ButtplugClientRequest::Disconnect(state) => { 291 trace!("Client requested disconnect"); 292 state.set_reply(self.connector.disconnect().await); 293 false 294 } 295 ButtplugClientRequest::HandleDeviceList(device_list) => { 296 trace!("Device list received, updating map."); 297 for (i, device) in device_list.devices() { 298 if self.device_map.contains_key(i) { 299 continue; 300 } 301 let device = self.create_client_device(device); 302 self.send_client_event(ButtplugClientEvent::DeviceAdded(device)); 303 } 304 true 305 } 306 } 307 } 308 309 /// Runs the event loop, returning once either the client or connector drops. 310 pub async fn run(&mut self) { 311 debug!("Running client event loop."); 312 loop { 313 select! { 314 event = self.from_connector_receiver.recv() => match event { 315 None => { 316 info!("Connector disconnected, exiting loop."); 317 break; 318 } 319 Some(msg) => { 320 self.parse_connector_message(msg).await; 321 } 322 }, 323 client = self.from_client_receiver.recv() => match client { 324 Err(_) => { 325 info!("Client disconnected, exiting loop."); 326 break; 327 } 328 Ok(msg) => { 329 if !self.parse_client_request(msg).await { 330 break; 331 } 332 } 333 }, 334 }; 335 } 336 self 337 .device_map 338 .iter() 339 .for_each(|val| val.value().set_client_connected(false)); 340 341 let device_indexes: Vec<u32> = self.device_map.iter().map(|k| *k.key()).collect(); 342 device_indexes 343 .iter() 344 .for_each(|k| self.disconnect_device(*k)); 345 self.connected_status.store(false, Ordering::Relaxed); 346 self.send_client_event(ButtplugClientEvent::ServerDisconnect); 347 348 debug!("Exiting client event loop."); 349 } 350}