Buttplug sex toy control library
at dev 5.4 kB view raw
1// Buttplug Rust Source Code File - See https://buttplug.io for more info. 2// 3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved. 4// 5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root 6// for full license information. 7 8//! Handling of remote message pairing and future resolution. 9 10use super::{ 11 ButtplugClientError, 12 ButtplugClientMessageFuturePair, 13 ButtplugServerMessageStateShared, 14}; 15use buttplug_core::message::{ButtplugMessage, ButtplugMessageValidator, ButtplugServerMessageV4}; 16use dashmap::DashMap; 17use log::*; 18use std::sync::{ 19 Arc, 20 atomic::{AtomicU32, Ordering}, 21}; 22 23/// Message sorting and pairing for remote client connectors. 24/// 25/// In order to create reliable connections to remote systems, we need a way to maintain message 26/// coherence. We expect that whenever a client sends the server a request message, the server will 27/// always send back a response message. 28/// 29/// For the [in-process][crate::connector::ButtplugInProcessClientConnector] case, where the client and 30/// server are in the same process, we can simply use execution flow to match the client message and 31/// server response. However, when going over IPC or network, we have to wait to hear back from the 32/// server. To match the outgoing client request message with the incoming server response message 33/// in the remote case, we use the `id` field of [ButtplugMessage]. The client's request message 34/// will have a server response with a matching index. Any message that comes from the server 35/// without an originating client message ([DeviceAdded][crate::core::messages::DeviceAdded], 36/// [Log][crate::core::messages::Log], etc...) will have an `id` of 0 and is considered an *event*, 37/// meaning something happened on the server that was not directly tied to a client request. 38/// 39/// The ClientConnectionMessageSorter does two things to facilitate this matching: 40/// 41/// - Creates and keeps track of the current message `id`, as a [u32] 42/// - Manages a HashMap of indexes to resolvable futures. 43/// 44/// Whenever a remote connector sends a [ButtplugMessage], it first puts it through its 45/// ClientMessageSorter to fill in the message `id`. Similarly, when a [ButtplugMessage] is 46/// received, it comes through the sorter, with one of 3 outcomes: 47/// 48/// - If there is a future with matching `id` waiting on a response, it resolves that future using 49/// the incoming message 50/// - If the message `id` is 0, the message is emitted as an *event*. 51/// - If the message `id` is not zero but there is no future waiting, the message is dropped and an 52/// error is emitted. 53/// 54pub struct ClientMessageSorter { 55 /// Map of message `id`s to their related future. 56 /// 57 /// This is where we store message `id`s that are waiting for a return from the server. Once we 58 /// get back a response with a matching `id`, we remove the entry from this map, and use the waker 59 /// to complete the future with the received response message. 60 future_map: DashMap<u32, ButtplugServerMessageStateShared>, 61 62 /// Message `id` counter 63 /// 64 /// Every time we add a message to the future_map, we need it to have a unique `id`. We assume 65 /// that unsigned 2^32 will be enough (Buttplug isn't THAT chatty), and use it as a monotonically 66 /// increasing counter for setting `id`s. 67 current_id: Arc<AtomicU32>, 68} 69 70impl ClientMessageSorter { 71 /// Registers a future to be resolved when we receive a response. 72 /// 73 /// Given a message and its related future, set the message's `id`, and match that id with the 74 /// future to be resolved when we get a response back. 75 pub fn register_future(&self, msg_fut: &mut ButtplugClientMessageFuturePair) { 76 let id = self.current_id.load(Ordering::Relaxed); 77 trace!("Setting message id to {}", id); 78 msg_fut.msg.set_id(id); 79 self.future_map.insert(id, msg_fut.waker.clone()); 80 self.current_id.store(id + 1, Ordering::Relaxed); 81 } 82 83 /// Given a response message from the server, resolve related future if we have one. 84 /// 85 /// Returns true if the response message was resolved to a future via matching `id`, otherwise 86 /// returns false. False returns mean the message should be considered as an *event*. 87 pub fn maybe_resolve_result(&self, msg: &ButtplugServerMessageV4) -> bool { 88 trace!("{:?}", msg); 89 let id = msg.id(); 90 trace!("Trying to resolve message future for id {}.", id); 91 match self.future_map.remove(&id) { 92 Some((_, state)) => { 93 trace!("Resolved id {} to a future.", id); 94 if let Err(e) = msg.is_valid() { 95 error!("Message not valid: {:?} - Error: {}", msg, e); 96 state.set_reply(Err(ButtplugClientError::ButtplugError(e.into()))); 97 } else if let ButtplugServerMessageV4::Error(e) = msg { 98 state.set_reply(Err(e.original_error().into())) 99 } else { 100 state.set_reply(Ok(msg.clone())) 101 } 102 true 103 } 104 None => { 105 trace!("Message id {} not found, considering it an event.", id); 106 false 107 } 108 } 109 } 110} 111 112impl Default for ClientMessageSorter { 113 /// Create a default implementation of the ClientConnectorMessageSorter 114 /// 115 /// Sets the current_id to 1, since as a client we can't send message `id` of 0 (0 is reserved for 116 /// system incoming messages). 117 fn default() -> Self { 118 Self { 119 future_map: DashMap::new(), 120 current_id: Arc::new(AtomicU32::new(1)), 121 } 122 } 123}