Buttplug sex toy control library
1// Copyright 2020 Shift Cryptosecurity AG
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::prelude::*;
16use futures::task::SpawnError;
17use hidapi::{HidDevice, HidError};
18use std::io;
19use std::pin::Pin;
20use std::sync::mpsc;
21use std::sync::{Arc, Mutex};
22use std::task::{Context, Poll, Waker};
23use thiserror::Error;
24
25#[derive(Error, Debug)]
26pub enum HidAsyncDeviceError {
27 #[error("libhid failed")]
28 HidApi(#[from] HidError),
29 #[error("io failed")]
30 Io(#[from] io::Error),
31 #[error("spawn failed")]
32 Spawn(#[from] SpawnError),
33}
34
/// Progress of the read side of the device.
enum ReadState {
    /// No read request is outstanding; the next poll must ask the reader thread for data.
    Idle,
    /// A read request has been issued (or buffered bytes remain) and has not been fully
    /// consumed yet.
    Busy,
}
39
40struct DeviceInner {
41 device: Arc<Mutex<HidDevice>>,
42 read_thread: Option<std::thread::JoinHandle<()>>,
43 rstate: ReadState,
44 data_rx: mpsc::Receiver<Option<[u8; 64]>>, // One message per read
45 req_tx: Option<mpsc::Sender<Waker>>, // One message per expected read
46 buffer: Option<[u8; 64]>,
47 buffer_pos: usize,
48}
49
50pub struct HidAsyncDevice {
51 // store an Option so that `close` works
52 inner: Option<Arc<Mutex<DeviceInner>>>,
53}
54
55impl Clone for HidAsyncDevice {
56 fn clone(&self) -> Self {
57 Self {
58 inner: self.inner.as_ref().map(Arc::clone),
59 }
60 }
61}
62
63impl Drop for HidAsyncDevice {
64 fn drop(&mut self) {
65 //debug!("dropping hid connection");
66 if let Some(inner) = self.inner.take() {
67 if let Ok(mut guard) = inner.lock() {
68 // Take the waker queue and drop it so that the reader thread finihes
69 let req_tx = guard.req_tx.take();
70 drop(req_tx);
71
72 // Wait for the reader thread to finish
73 if let Some(jh) = guard.read_thread.take()
74 && jh.join().is_ok()
75 {
76 info!("device read thread joined")
77 }
78 } else {
79 //error!("Failed to take lock on device");
80 }
81 } else {
82 //error!("there was no inner");
83 }
84 }
85}
86
87impl HidAsyncDevice {
88 pub fn new(device: HidDevice) -> Result<Self, HidAsyncDeviceError> {
89 let (data_tx, data_rx) = mpsc::channel();
90 let (req_tx, req_rx) = mpsc::channel::<Waker>();
91 // set non-blocking so that we can ignore spurious wakeups.
92 //device.set_blocking_mode(false);
93 // Must be accessed from both inner thread and asyn_write
94 let device = Arc::new(Mutex::new(device));
95 let jh = std::thread::spawn({
96 let device = Arc::clone(&device);
97 move || {
98 loop {
99 // Wait for read request
100 //debug!("waiting for request");
101 let waker = match req_rx.recv() {
102 Ok(waker) => waker,
103 Err(_e) => {
104 info!("No more wakers, shutting down");
105 return;
106 }
107 };
108 //debug!("Got notified");
109 match device.lock() {
110 Ok(guard) => {
111 let mut buf = [0u8; 64];
112 //match guard.read_timeout(&mut buf[..], 1000) {
113 match guard.read(&mut buf[..]) {
114 Err(_) => {
115 //error!("hidapi failed: {}", e);
116 drop(data_tx);
117 waker.wake_by_ref();
118 break;
119 }
120 Ok(len) => {
121 if len == 0 {
122 data_tx.send(None).unwrap();
123 waker.wake_by_ref();
124 continue;
125 }
126 //debug!("Read data");
127 if data_tx.send(Some(buf)).is_err() {
128 //error!("Sending internally: {}", e);
129 break;
130 }
131 waker.wake_by_ref();
132 }
133 }
134 }
135 Err(_) => {
136 //error!("Broken lock: {:?}", e);
137 return;
138 }
139 }
140 }
141 }
142 });
143 Ok(Self {
144 inner: Some(Arc::new(Mutex::new(DeviceInner {
145 device,
146 read_thread: Some(jh),
147 rstate: ReadState::Idle,
148 data_rx,
149 req_tx: Some(req_tx),
150 buffer: None,
151 buffer_pos: 0,
152 }))),
153 })
154 }
155}
156
157impl AsyncWrite for HidAsyncDevice {
158 fn poll_write(
159 mut self: Pin<&mut Self>,
160 _cx: &mut Context,
161 mut buf: &[u8],
162 ) -> Poll<Result<usize, io::Error>> {
163 let len = buf.len();
164 if self.inner.is_none() {
165 return Poll::Ready(Err(io::Error::new(
166 io::ErrorKind::InvalidData,
167 "Cannot poll a closed device",
168 )));
169 }
170 loop {
171 let max_len = usize::min(64, buf.len());
172 // The hidapi API requires that you put the report ID in the first byte.
173 // If you don't use report IDs you must put a 0 there.
174 //let mut buf_with_report_id = [0u8; 1 + 64];
175 //(&mut buf_with_report_id[1..1 + max_len]).copy_from_slice(&buf[..max_len]);
176
177 //let this: &mut Self = &mut self;
178 //debug!("Will write {} bytes: {:?}", buf.len(), &buf[..]);
179 match self.inner.as_mut().unwrap().lock() {
180 Ok(guard) => {
181 if let Ok(guard) = guard.device.lock() {
182 guard
183 .write(buf)
184 .map_err(|e| io::Error::other(format!("hidapi failed: {e}")))?;
185 //debug!("Wrote: {:?}", &buf[0..max_len]);
186 }
187 }
188 Err(e) => return Poll::Ready(Err(io::Error::other(format!("Mutex broken: {e:?}")))),
189 }
190 buf = &buf[max_len..];
191 if buf.is_empty() {
192 //debug!("Wrote total {}: {:?}", buf.len(), buf);
193 return Poll::Ready(Ok(len));
194 }
195 }
196 }
197 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> {
198 Poll::Ready(Ok(()))
199 }
200 // TODO cleanup read thread...
201 fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> {
202 let this: &mut Self = &mut self;
203 // take the device and drop it
204 let _device = this.inner.take();
205 Poll::Ready(Ok(()))
206 }
207}
208
209// Will always read out 64 bytes. Make sure to read out all bytes to avoid trailing bytes in next
210// readout.
211// Will store all bytes that did not fit in provided buffer and give them next time.
212impl AsyncRead for HidAsyncDevice {
213 fn poll_read(
214 mut self: Pin<&mut Self>,
215 cx: &mut Context,
216 buf: &mut [u8],
217 ) -> Poll<Result<usize, io::Error>> {
218 if self.inner.is_none() {
219 return Poll::Ready(Err(io::Error::new(
220 io::ErrorKind::InvalidData,
221 "Cannot poll a closed device",
222 )));
223 }
224 let mut this = self
225 .inner
226 .as_mut()
227 .unwrap()
228 .lock()
229 .map_err(|e| io::Error::other(format!("Mutex broken: {e:?}")))?;
230 loop {
231 let waker = cx.waker().clone();
232 match this.rstate {
233 ReadState::Idle => {
234 //debug!("Sending waker");
235 if let Some(req_tx) = &mut this.req_tx {
236 if let Err(_e) = req_tx.send(waker) {
237 //error!("failed to send waker");
238 }
239 } else {
240 return Poll::Ready(Err(io::Error::new(
241 io::ErrorKind::InvalidData,
242 "Failed internal send",
243 )));
244 }
245 this.rstate = ReadState::Busy;
246 }
247 ReadState::Busy => {
248 // First send any bytes from the previous readout
249 if let Some(inner_buf) = this.buffer.take() {
250 let len = usize::min(buf.len(), inner_buf.len());
251 let inner_slice = &inner_buf[this.buffer_pos..this.buffer_pos + len];
252 let buf_slice = &mut buf[..len];
253 buf_slice.copy_from_slice(inner_slice);
254 // Check if there is more data left
255 if this.buffer_pos + inner_slice.len() < inner_buf.len() {
256 this.buffer = Some(inner_buf);
257 this.buffer_pos += inner_slice.len();
258 } else {
259 this.rstate = ReadState::Idle;
260 }
261 return Poll::Ready(Ok(len));
262 }
263
264 // Second try to receive more bytes
265 let vec = match this.data_rx.try_recv() {
266 Ok(Some(vec)) => vec,
267 Ok(None) => {
268 // end of stream?
269 return Poll::Pending;
270 }
271 Err(e) => match e {
272 mpsc::TryRecvError::Disconnected => {
273 return Poll::Ready(Err(io::Error::other("Inner channel dead")));
274 }
275 mpsc::TryRecvError::Empty => {
276 return Poll::Pending;
277 }
278 },
279 };
280 //debug!("Read data {:?}", &vec[..]);
281 let len = usize::min(vec.len(), buf.len());
282 let buf_slice = &mut buf[..len];
283 let vec_slice = &vec[..len];
284 buf_slice.copy_from_slice(vec_slice);
285 if len < vec.len() {
286 // If bytes did not fit in buf, store bytes for next readout
287 this.buffer = Some(vec);
288 this.buffer_pos = 0;
289 } else {
290 this.rstate = ReadState::Idle;
291 }
292 //debug!("returning {}", len);
293 return Poll::Ready(Ok(len));
294 }
295 };
296 }
297 }
298}