Buttplug sex toy control library
1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2024 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Handling of remote message pairing and future resolution.
9
10use super::{
11 ButtplugClientError,
12 ButtplugClientMessageFuturePair,
13 ButtplugServerMessageStateShared,
14};
15use buttplug_core::message::{ButtplugMessage, ButtplugMessageValidator, ButtplugServerMessageV4};
16use dashmap::DashMap;
17use log::*;
18use std::sync::{
19 Arc,
20 atomic::{AtomicU32, Ordering},
21};
22
23/// Message sorting and pairing for remote client connectors.
24///
25/// In order to create reliable connections to remote systems, we need a way to maintain message
26/// coherence. We expect that whenever a client sends the server a request message, the server will
27/// always send back a response message.
28///
29/// For the [in-process][crate::connector::ButtplugInProcessClientConnector] case, where the client and
30/// server are in the same process, we can simply use execution flow to match the client message and
31/// server response. However, when going over IPC or network, we have to wait to hear back from the
32/// server. To match the outgoing client request message with the incoming server response message
33/// in the remote case, we use the `id` field of [ButtplugMessage]. The client's request message
34/// will have a server response with a matching index. Any message that comes from the server
35/// without an originating client message ([DeviceAdded][crate::core::messages::DeviceAdded],
36/// [Log][crate::core::messages::Log], etc...) will have an `id` of 0 and is considered an *event*,
37/// meaning something happened on the server that was not directly tied to a client request.
38///
39/// The ClientConnectionMessageSorter does two things to facilitate this matching:
40///
41/// - Creates and keeps track of the current message `id`, as a [u32]
42/// - Manages a HashMap of indexes to resolvable futures.
43///
44/// Whenever a remote connector sends a [ButtplugMessage], it first puts it through its
45/// ClientMessageSorter to fill in the message `id`. Similarly, when a [ButtplugMessage] is
46/// received, it comes through the sorter, with one of 3 outcomes:
47///
48/// - If there is a future with matching `id` waiting on a response, it resolves that future using
49/// the incoming message
50/// - If the message `id` is 0, the message is emitted as an *event*.
51/// - If the message `id` is not zero but there is no future waiting, the message is dropped and an
52/// error is emitted.
53///
54pub struct ClientMessageSorter {
55 /// Map of message `id`s to their related future.
56 ///
57 /// This is where we store message `id`s that are waiting for a return from the server. Once we
58 /// get back a response with a matching `id`, we remove the entry from this map, and use the waker
59 /// to complete the future with the received response message.
60 future_map: DashMap<u32, ButtplugServerMessageStateShared>,
61
62 /// Message `id` counter
63 ///
64 /// Every time we add a message to the future_map, we need it to have a unique `id`. We assume
65 /// that unsigned 2^32 will be enough (Buttplug isn't THAT chatty), and use it as a monotonically
66 /// increasing counter for setting `id`s.
67 current_id: Arc<AtomicU32>,
68}
69
70impl ClientMessageSorter {
71 /// Registers a future to be resolved when we receive a response.
72 ///
73 /// Given a message and its related future, set the message's `id`, and match that id with the
74 /// future to be resolved when we get a response back.
75 pub fn register_future(&self, msg_fut: &mut ButtplugClientMessageFuturePair) {
76 let id = self.current_id.load(Ordering::Relaxed);
77 trace!("Setting message id to {}", id);
78 msg_fut.msg.set_id(id);
79 self.future_map.insert(id, msg_fut.waker.clone());
80 self.current_id.store(id + 1, Ordering::Relaxed);
81 }
82
83 /// Given a response message from the server, resolve related future if we have one.
84 ///
85 /// Returns true if the response message was resolved to a future via matching `id`, otherwise
86 /// returns false. False returns mean the message should be considered as an *event*.
87 pub fn maybe_resolve_result(&self, msg: &ButtplugServerMessageV4) -> bool {
88 trace!("{:?}", msg);
89 let id = msg.id();
90 trace!("Trying to resolve message future for id {}.", id);
91 match self.future_map.remove(&id) {
92 Some((_, state)) => {
93 trace!("Resolved id {} to a future.", id);
94 if let Err(e) = msg.is_valid() {
95 error!("Message not valid: {:?} - Error: {}", msg, e);
96 state.set_reply(Err(ButtplugClientError::ButtplugError(e.into())));
97 } else if let ButtplugServerMessageV4::Error(e) = msg {
98 state.set_reply(Err(e.original_error().into()))
99 } else {
100 state.set_reply(Ok(msg.clone()))
101 }
102 true
103 }
104 None => {
105 trace!("Message id {} not found, considering it an event.", id);
106 false
107 }
108 }
109 }
110}
111
112impl Default for ClientMessageSorter {
113 /// Create a default implementation of the ClientConnectorMessageSorter
114 ///
115 /// Sets the current_id to 1, since as a client we can't send message `id` of 0 (0 is reserved for
116 /// system incoming messages).
117 fn default() -> Self {
118 Self {
119 future_map: DashMap::new(),
120 current_id: Arc::new(AtomicU32::new(1)),
121 }
122 }
123}