Buttplug sex toy control library
at dev 298 lines 9.3 kB view raw
1// Copyright 2020 Shift Cryptosecurity AG 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15use futures::prelude::*; 16use futures::task::SpawnError; 17use hidapi::{HidDevice, HidError}; 18use std::io; 19use std::pin::Pin; 20use std::sync::mpsc; 21use std::sync::{Arc, Mutex}; 22use std::task::{Context, Poll, Waker}; 23use thiserror::Error; 24 25#[derive(Error, Debug)] 26pub enum HidAsyncDeviceError { 27 #[error("libhid failed")] 28 HidApi(#[from] HidError), 29 #[error("io failed")] 30 Io(#[from] io::Error), 31 #[error("spawn failed")] 32 Spawn(#[from] SpawnError), 33} 34 35enum ReadState { 36 Idle, 37 Busy, 38} 39 40struct DeviceInner { 41 device: Arc<Mutex<HidDevice>>, 42 read_thread: Option<std::thread::JoinHandle<()>>, 43 rstate: ReadState, 44 data_rx: mpsc::Receiver<Option<[u8; 64]>>, // One message per read 45 req_tx: Option<mpsc::Sender<Waker>>, // One message per expected read 46 buffer: Option<[u8; 64]>, 47 buffer_pos: usize, 48} 49 50pub struct HidAsyncDevice { 51 // store an Option so that `close` works 52 inner: Option<Arc<Mutex<DeviceInner>>>, 53} 54 55impl Clone for HidAsyncDevice { 56 fn clone(&self) -> Self { 57 Self { 58 inner: self.inner.as_ref().map(Arc::clone), 59 } 60 } 61} 62 63impl Drop for HidAsyncDevice { 64 fn drop(&mut self) { 65 //debug!("dropping hid connection"); 66 if let Some(inner) = self.inner.take() { 67 if let Ok(mut guard) = inner.lock() { 68 // Take the waker queue and drop it so that the reader thread finihes 69 let req_tx = guard.req_tx.take(); 70 drop(req_tx); 71 72 // Wait for the reader thread to finish 73 if let Some(jh) = guard.read_thread.take() 74 && jh.join().is_ok() 75 { 76 info!("device read thread joined") 77 } 78 } else { 79 //error!("Failed to take lock on device"); 80 } 81 } else { 82 //error!("there was no inner"); 83 } 84 } 85} 86 87impl HidAsyncDevice { 88 pub fn new(device: HidDevice) -> Result<Self, HidAsyncDeviceError> { 89 let (data_tx, data_rx) = mpsc::channel(); 90 let (req_tx, req_rx) = mpsc::channel::<Waker>(); 91 // set non-blocking so that we can ignore spurious wakeups. 92 //device.set_blocking_mode(false); 93 // Must be accessed from both inner thread and asyn_write 94 let device = Arc::new(Mutex::new(device)); 95 let jh = std::thread::spawn({ 96 let device = Arc::clone(&device); 97 move || { 98 loop { 99 // Wait for read request 100 //debug!("waiting for request"); 101 let waker = match req_rx.recv() { 102 Ok(waker) => waker, 103 Err(_e) => { 104 info!("No more wakers, shutting down"); 105 return; 106 } 107 }; 108 //debug!("Got notified"); 109 match device.lock() { 110 Ok(guard) => { 111 let mut buf = [0u8; 64]; 112 //match guard.read_timeout(&mut buf[..], 1000) { 113 match guard.read(&mut buf[..]) { 114 Err(_) => { 115 //error!("hidapi failed: {}", e); 116 drop(data_tx); 117 waker.wake_by_ref(); 118 break; 119 } 120 Ok(len) => { 121 if len == 0 { 122 data_tx.send(None).unwrap(); 123 waker.wake_by_ref(); 124 continue; 125 } 126 //debug!("Read data"); 127 if data_tx.send(Some(buf)).is_err() { 128 //error!("Sending internally: {}", e); 129 break; 130 } 131 waker.wake_by_ref(); 132 } 133 } 134 } 135 Err(_) => { 136 //error!("Broken lock: {:?}", e); 137 return; 138 } 139 } 140 } 141 } 142 }); 143 Ok(Self { 144 inner: Some(Arc::new(Mutex::new(DeviceInner { 145 device, 146 read_thread: Some(jh), 147 rstate: ReadState::Idle, 148 data_rx, 149 req_tx: Some(req_tx), 150 buffer: None, 151 buffer_pos: 0, 152 }))), 153 }) 154 } 155} 156 157impl AsyncWrite for HidAsyncDevice { 158 fn poll_write( 159 mut self: Pin<&mut Self>, 160 _cx: &mut Context, 161 mut buf: &[u8], 162 ) -> Poll<Result<usize, io::Error>> { 163 let len = buf.len(); 164 if self.inner.is_none() { 165 return Poll::Ready(Err(io::Error::new( 166 io::ErrorKind::InvalidData, 167 "Cannot poll a closed device", 168 ))); 169 } 170 loop { 171 let max_len = usize::min(64, buf.len()); 172 // The hidapi API requires that you put the report ID in the first byte. 173 // If you don't use report IDs you must put a 0 there. 174 //let mut buf_with_report_id = [0u8; 1 + 64]; 175 //(&mut buf_with_report_id[1..1 + max_len]).copy_from_slice(&buf[..max_len]); 176 177 //let this: &mut Self = &mut self; 178 //debug!("Will write {} bytes: {:?}", buf.len(), &buf[..]); 179 match self.inner.as_mut().unwrap().lock() { 180 Ok(guard) => { 181 if let Ok(guard) = guard.device.lock() { 182 guard 183 .write(buf) 184 .map_err(|e| io::Error::other(format!("hidapi failed: {e}")))?; 185 //debug!("Wrote: {:?}", &buf[0..max_len]); 186 } 187 } 188 Err(e) => return Poll::Ready(Err(io::Error::other(format!("Mutex broken: {e:?}")))), 189 } 190 buf = &buf[max_len..]; 191 if buf.is_empty() { 192 //debug!("Wrote total {}: {:?}", buf.len(), buf); 193 return Poll::Ready(Ok(len)); 194 } 195 } 196 } 197 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> { 198 Poll::Ready(Ok(())) 199 } 200 // TODO cleanup read thread... 201 fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> { 202 let this: &mut Self = &mut self; 203 // take the device and drop it 204 let _device = this.inner.take(); 205 Poll::Ready(Ok(())) 206 } 207} 208 209// Will always read out 64 bytes. Make sure to read out all bytes to avoid trailing bytes in next 210// readout. 211// Will store all bytes that did not fit in provided buffer and give them next time. 212impl AsyncRead for HidAsyncDevice { 213 fn poll_read( 214 mut self: Pin<&mut Self>, 215 cx: &mut Context, 216 buf: &mut [u8], 217 ) -> Poll<Result<usize, io::Error>> { 218 if self.inner.is_none() { 219 return Poll::Ready(Err(io::Error::new( 220 io::ErrorKind::InvalidData, 221 "Cannot poll a closed device", 222 ))); 223 } 224 let mut this = self 225 .inner 226 .as_mut() 227 .unwrap() 228 .lock() 229 .map_err(|e| io::Error::other(format!("Mutex broken: {e:?}")))?; 230 loop { 231 let waker = cx.waker().clone(); 232 match this.rstate { 233 ReadState::Idle => { 234 //debug!("Sending waker"); 235 if let Some(req_tx) = &mut this.req_tx { 236 if let Err(_e) = req_tx.send(waker) { 237 //error!("failed to send waker"); 238 } 239 } else { 240 return Poll::Ready(Err(io::Error::new( 241 io::ErrorKind::InvalidData, 242 "Failed internal send", 243 ))); 244 } 245 this.rstate = ReadState::Busy; 246 } 247 ReadState::Busy => { 248 // First send any bytes from the previous readout 249 if let Some(inner_buf) = this.buffer.take() { 250 let len = usize::min(buf.len(), inner_buf.len()); 251 let inner_slice = &inner_buf[this.buffer_pos..this.buffer_pos + len]; 252 let buf_slice = &mut buf[..len]; 253 buf_slice.copy_from_slice(inner_slice); 254 // Check if there is more data left 255 if this.buffer_pos + inner_slice.len() < inner_buf.len() { 256 this.buffer = Some(inner_buf); 257 this.buffer_pos += inner_slice.len(); 258 } else { 259 this.rstate = ReadState::Idle; 260 } 261 return Poll::Ready(Ok(len)); 262 } 263 264 // Second try to receive more bytes 265 let vec = match this.data_rx.try_recv() { 266 Ok(Some(vec)) => vec, 267 Ok(None) => { 268 // end of stream? 269 return Poll::Pending; 270 } 271 Err(e) => match e { 272 mpsc::TryRecvError::Disconnected => { 273 return Poll::Ready(Err(io::Error::other("Inner channel dead"))); 274 } 275 mpsc::TryRecvError::Empty => { 276 return Poll::Pending; 277 } 278 }, 279 }; 280 //debug!("Read data {:?}", &vec[..]); 281 let len = usize::min(vec.len(), buf.len()); 282 let buf_slice = &mut buf[..len]; 283 let vec_slice = &vec[..len]; 284 buf_slice.copy_from_slice(vec_slice); 285 if len < vec.len() { 286 // If bytes did not fit in buf, store bytes for next readout 287 this.buffer = Some(vec); 288 this.buffer_pos = 0; 289 } else { 290 this.rstate = ReadState::Idle; 291 } 292 //debug!("returning {}", len); 293 return Poll::Ready(Ok(len)); 294 } 295 }; 296 } 297 } 298}