Buttplug sex toy control library

chore: Return error as a variant type from downgrader

Means we can return errors in the version expected by the
client since they need to go to the client anyways, and usually
pass through the connector without parsing.

+80 -42
+5 -1
buttplug/src/core/connector/in_process_connector.rs
··· 166 } else { 167 ButtplugServerMessageV3::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 168 } 169 - Err(e) => e.into() 170 }; 171 sender 172 .send(output)
··· 166 } else { 167 ButtplugServerMessageV3::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 168 } 169 + Err(e) => if let ButtplugServerMessageVariant::V3(msg) = e { 170 + msg 171 + } else { 172 + ButtplugServerMessageV3::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 173 + } 174 }; 175 sender 176 .send(output)
+14 -10
buttplug/src/server/server_downgrade_wrapper.rs
··· 7 8 use std::{fmt, sync::Arc}; 9 10 - use crate::core::message::{self, ButtplugClientMessageVariant, ButtplugMessageSpecVersion, ButtplugServerMessageV4, ButtplugServerMessageVariant}; 11 12 use super::{device::ServerDeviceManager, server_message_conversion::ButtplugServerMessageConverter, ButtplugServer, ButtplugServerResultFuture}; 13 use futures::{future::{self, BoxFuture, FutureExt}, Stream}; ··· 55 self.server.disconnect() 56 } 57 58 pub fn client_version_event_stream(&self) -> impl Stream<Item = ButtplugServerMessageVariant> { 59 let spec_version = self.spec_version.clone(); 60 self.server.event_stream().map(move |m| { ··· 76 pub fn parse_message( 77 &self, 78 msg: ButtplugClientMessageVariant, 79 - ) -> BoxFuture<'static, Result<ButtplugServerMessageVariant, message::ErrorV0>> { 80 error!("{:?}", msg); 81 match msg { 82 ButtplugClientMessageVariant::V4(msg) => { 83 let fut = self.server.parse_message(msg); 84 async move { 85 - Ok(fut.await?.into()) 86 }.boxed() 87 } 88 msg => { 89 let v = msg.version(); 90 let converter = ButtplugServerMessageConverter::new(Some(msg)); 91 match converter.convert_incoming(&self.server.device_manager()) { 92 Ok(converted_msg) => { 93 let fut = self.server.parse_message(converted_msg); 94 - let spec_version = *self.spec_version.get_or_init(|| { 95 - info!("Setting Buttplug Server Message Spec Downgrade version to {}", v); 96 - v 97 - } ); 98 async move { 99 - let result = fut.await?; 100 - converter.convert_outgoing(&result, &spec_version).map_err(|e| e.into()) 101 }.boxed() 102 } 103 - Err(e) => future::ready(Err(e.into())).boxed() 104 } 105 } 106 }
··· 7 8 use std::{fmt, sync::Arc}; 9 10 + use crate::core::message::{self, ButtplugClientMessageVariant, ButtplugMessageSpecVersion, ButtplugServerMessageV4, ButtplugServerMessageVariant, ErrorV0}; 11 12 use super::{device::ServerDeviceManager, server_message_conversion::ButtplugServerMessageConverter, ButtplugServer, ButtplugServerResultFuture}; 13 use futures::{future::{self, BoxFuture, FutureExt}, Stream}; ··· 55 self.server.disconnect() 56 } 57 58 + pub fn spec_version(&self) -> Option<ButtplugMessageSpecVersion> { 59 + self.spec_version.get().copied() 60 + } 61 + 62 pub fn client_version_event_stream(&self) -> impl Stream<Item = ButtplugServerMessageVariant> { 63 let spec_version = self.spec_version.clone(); 64 self.server.event_stream().map(move |m| { ··· 80 pub fn parse_message( 81 &self, 82 msg: ButtplugClientMessageVariant, 83 + ) -> BoxFuture<'static, Result<ButtplugServerMessageVariant, ButtplugServerMessageVariant>> { 84 error!("{:?}", msg); 85 match msg { 86 ButtplugClientMessageVariant::V4(msg) => { 87 let fut = self.server.parse_message(msg); 88 async move { 89 + Ok(fut.await.map_err(|e| ButtplugServerMessageVariant::from(ButtplugServerMessageV4::from(e)))?.into()) 90 }.boxed() 91 } 92 msg => { 93 let v = msg.version(); 94 let converter = ButtplugServerMessageConverter::new(Some(msg)); 95 + let spec_version = *self.spec_version.get_or_init(|| { 96 + info!("Setting Buttplug Server Message Spec Downgrade version to {}", v); 97 + v 98 + } ); 99 match converter.convert_incoming(&self.server.device_manager()) { 100 Ok(converted_msg) => { 101 let fut = self.server.parse_message(converted_msg); 102 async move { 103 + let result = fut.await.map_err(|e| converter.convert_outgoing(&e.into(), &spec_version).unwrap())?; 104 + converter.convert_outgoing(&result, &spec_version).map_err(|e| converter.convert_outgoing(&&ButtplugServerMessageV4::from(ErrorV0::from(e)), &spec_version).unwrap()) 105 }.boxed() 106 } 107 + Err(e) => future::ready(Err(converter.convert_outgoing(&ButtplugServerMessageV4::from(ErrorV0::from(e)), &spec_version).unwrap())).boxed() 108 } 109 } 110 }
+29 -17
buttplug/tests/test_server.rs
··· 29 device::{ 30 hardware::{HardwareCommand, HardwareWriteCmd}, 31 ServerDeviceManagerBuilder, 32 - }, 33 - ButtplugServerBuilder, ButtplugServerDowngradeWrapper, 34 }, 35 }; 36 use futures::{pin_mut, Stream, StreamExt}; ··· 86 // assert_eq!(server.server_name, "Test Server"); 87 let result = server.parse_message(msg.try_into().unwrap()).await; 88 assert!(result.is_err()); 89 - assert!(matches!( 90 - result.unwrap_err().original_error(), 91 - ButtplugError::ButtplugHandshakeError(ButtplugHandshakeError::RequestServerInfoExpected) 92 - )); 93 assert!(!server.connected()); 94 } 95 ··· 232 let (server, _recv) = setup_test_server((msg.clone()).into()).await; 233 assert!(server.connected()); 234 let err = server.parse_message(message::ButtplugClientMessageVariant::V3(msg.into())).await.unwrap_err(); 235 - assert!(matches!( 236 - err.original_error(), 237 - ButtplugError::ButtplugHandshakeError(ButtplugHandshakeError::HandshakeAlreadyHappened) 238 - )); 239 } 240 241 #[tokio::test] 242 async fn test_invalid_device_index() { 243 let msg = message::RequestServerInfoV1::new("Test Client", BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION); 244 let (server, _) = setup_test_server(msg.into()).await; 245 - let reply = server 246 .parse_message(message::ButtplugClientMessageVariant::V3(message::VibrateCmdV1::new(10, vec![]).into())) 247 - .await; 248 - assert!(reply.is_err()); 249 - assert!(matches!( 250 - reply.unwrap_err().original_error(), 251 - ButtplugError::ButtplugDeviceError(ButtplugDeviceError::DeviceNotAvailable(_)) 252 - )); 253 } 254 255 #[tokio::test]
··· 29 device::{ 30 hardware::{HardwareCommand, HardwareWriteCmd}, 31 ServerDeviceManagerBuilder, 32 + }, ButtplugServerBuilder, ButtplugServerDowngradeWrapper 33 }, 34 }; 35 use futures::{pin_mut, Stream, StreamExt}; ··· 85 // assert_eq!(server.server_name, "Test Server"); 86 let result = server.parse_message(msg.try_into().unwrap()).await; 87 assert!(result.is_err()); 88 + if let Err(ButtplugServerMessageVariant::V3(ButtplugServerMessageV3::Error(e))) = result { 89 + assert!(matches!( 90 + e.original_error(), 91 + ButtplugError::ButtplugHandshakeError(ButtplugHandshakeError::RequestServerInfoExpected) 92 + )); 93 + } else { 94 + panic!("Should've gotten error") 95 + } 96 assert!(!server.connected()); 97 } 98 ··· 235 let (server, _recv) = setup_test_server((msg.clone()).into()).await; 236 assert!(server.connected()); 237 let err = server.parse_message(message::ButtplugClientMessageVariant::V3(msg.into())).await.unwrap_err(); 238 + if let ButtplugServerMessageVariant::V3(ButtplugServerMessageV3::Error(e)) = err { 239 + assert!(matches!( 240 + e.original_error(), 241 + ButtplugError::ButtplugHandshakeError(ButtplugHandshakeError::HandshakeAlreadyHappened) 242 + )); 243 + } else { 244 + panic!("Should've gotten error") 245 + } 246 + 247 } 248 249 #[tokio::test] 250 async fn test_invalid_device_index() { 251 let msg = message::RequestServerInfoV1::new("Test Client", BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION); 252 let (server, _) = setup_test_server(msg.into()).await; 253 + let err = server 254 .parse_message(message::ButtplugClientMessageVariant::V3(message::VibrateCmdV1::new(10, vec![]).into())) 255 + .await 256 + .unwrap_err(); 257 + if let ButtplugServerMessageVariant::V3(ButtplugServerMessageV3::Error(e)) = err { 258 + assert!(matches!( 259 + e.original_error(), 260 + ButtplugError::ButtplugDeviceError(ButtplugDeviceError::DeviceNotAvailable(_)) 261 + )); 262 + } else { 263 + panic!("Should've gotten error") 264 + } 265 } 266 267 #[tokio::test]
+12 -12
buttplug/tests/test_server_device.rs
··· 8 mod util; 9 use buttplug::core::{ 10 errors::{ButtplugDeviceError, ButtplugError}, 11 - message::{self, ButtplugClientMessageVariant, ButtplugServerMessageV3, ButtplugServerMessageVariant, Endpoint, BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION}, 12 }; 13 use futures::{pin_mut, StreamExt}; 14 use std::matches; 15 pub use util::test_device_manager::TestDeviceCommunicationManagerBuilder; 16 - use util::test_server_with_device; 17 18 // Test devices that have protocols that support movements not all devices do. 19 // For instance, the Onyx+ is part of a protocol that supports vibration, but ··· 112 113 #[tokio::test] 114 async fn test_reject_on_no_raw_message() { 115 - let (server, _) = test_server_with_device("Massage Demo", false); 116 - let recv = server.client_version_event_stream(); 117 pin_mut!(recv); 118 assert!(server 119 .parse_message( 120 - ButtplugClientMessageVariant::V3(message::RequestServerInfoV1::new("Test Client", BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION).into()) 121 ) 122 .await 123 .is_ok()); 124 assert!(server 125 - .parse_message(ButtplugClientMessageVariant::V3(message::StartScanningV0::default().into())) 126 .await 127 .is_ok()); 128 while let Some(msg) = recv.next().await { 129 - if let ButtplugServerMessageVariant::V3(ButtplugServerMessageV3::ScanningFinished(_)) = msg { 130 continue; 131 - } else if let ButtplugServerMessageVariant::V3(ButtplugServerMessageV3::DeviceAdded(da)) = msg { 132 assert_eq!(da.device_name(), "Aneros Vivi"); 133 let mut should_be_err; 134 should_be_err = server 135 .parse_message( 136 - ButtplugClientMessageVariant::V3(message::RawWriteCmdV2::new(da.device_index(), Endpoint::Tx, &[0x0], false).into()), 137 ) 138 .await; 139 assert!(should_be_err.is_err()); ··· 143 )); 144 145 should_be_err = server 146 - .parse_message(ButtplugClientMessageVariant::V3(message::RawReadCmdV2::new(da.device_index(), Endpoint::Tx, 0, 0).into())) 147 .await; 148 assert!(should_be_err.is_err()); 149 assert!(matches!( ··· 152 )); 153 154 should_be_err = server 155 - .parse_message(ButtplugClientMessageVariant::V3(message::RawSubscribeCmdV2::new(da.device_index(), Endpoint::Tx).into())) 156 .await; 157 assert!(should_be_err.is_err()); 158 assert!(matches!( ··· 161 )); 162 163 should_be_err = server 164 - .parse_message(ButtplugClientMessageVariant::V3(message::RawUnsubscribeCmdV2::new(da.device_index(), Endpoint::Tx).into())) 165 .await; 166 assert!(should_be_err.is_err()); 167 assert!(matches!(
··· 8 mod util; 9 use buttplug::core::{ 10 errors::{ButtplugDeviceError, ButtplugError}, 11 + message::{self, ButtplugClientMessageV4, ButtplugClientMessageVariant, ButtplugServerMessageV3, ButtplugServerMessageV4, ButtplugServerMessageVariant, Endpoint, BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION}, 12 }; 13 use futures::{pin_mut, StreamExt}; 14 use std::matches; 15 pub use util::test_device_manager::TestDeviceCommunicationManagerBuilder; 16 + use util::{test_server_with_device, test_server_v4_with_device}; 17 18 // Test devices that have protocols that support movements not all devices do. 19 // For instance, the Onyx+ is part of a protocol that supports vibration, but ··· 112 113 #[tokio::test] 114 async fn test_reject_on_no_raw_message() { 115 + let (server, _) = test_server_v4_with_device("Massage Demo", false); 116 + let recv = server.event_stream(); 117 pin_mut!(recv); 118 assert!(server 119 .parse_message( 120 + ButtplugClientMessageV4::from(message::RequestServerInfoV1::new("Test Client", BUTTPLUG_CURRENT_MESSAGE_SPEC_VERSION)) 121 ) 122 .await 123 .is_ok()); 124 assert!(server 125 + .parse_message(ButtplugClientMessageV4::from(message::StartScanningV0::default())) 126 .await 127 .is_ok()); 128 while let Some(msg) = recv.next().await { 129 + if let ButtplugServerMessageV4::ScanningFinished(_) = msg { 130 continue; 131 + } else if let ButtplugServerMessageV4::DeviceAdded(da) = msg { 132 assert_eq!(da.device_name(), "Aneros Vivi"); 133 let mut should_be_err; 134 should_be_err = server 135 .parse_message( 136 + ButtplugClientMessageV4::from(message::RawWriteCmdV2::new(da.device_index(), Endpoint::Tx, &[0x0], false)), 137 ) 138 .await; 139 assert!(should_be_err.is_err()); ··· 143 )); 144 145 should_be_err = server 146 + .parse_message(ButtplugClientMessageV4::from(message::RawReadCmdV2::new(da.device_index(), Endpoint::Tx, 0, 0))) 147 .await; 148 assert!(should_be_err.is_err()); 149 assert!(matches!( ··· 152 )); 153 154 should_be_err = server 155 + .parse_message(ButtplugClientMessageV4::from(message::RawSubscribeCmdV2::new(da.device_index(), Endpoint::Tx))) 156 .await; 157 assert!(should_be_err.is_err()); 158 assert!(matches!( ··· 161 )); 162 163 should_be_err = server 164 + .parse_message(ButtplugClientMessageV4::from(message::RawUnsubscribeCmdV2::new(da.device_index(), Endpoint::Tx))) 165 .await; 166 assert!(should_be_err.is_err()); 167 assert!(matches!(
+5 -1
buttplug/tests/util/device_test/client/client_v2/in_process_connector.rs
··· 163 } else { 164 ButtplugServerMessageV2::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 165 } 166 - Err(e) => e.into() 167 }; 168 sender 169 .send(output)
··· 163 } else { 164 ButtplugServerMessageV2::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 165 } 166 + Err(e) => if let ButtplugServerMessageVariant::V2(msg) = e { 167 + msg 168 + } else { 169 + ButtplugServerMessageV2::Error(ButtplugError::from(ButtplugMessageError::MessageConversionError("In-process connector messages should never have differing versions.".to_owned())).into()) 170 + } 171 }; 172 sender 173 .send(output)
+14
buttplug/tests/util/mod.rs
··· 147 device, 148 ) 149 }
··· 147 device, 148 ) 149 } 150 + 151 + #[allow(dead_code)] 152 + pub fn test_server_v4_with_device( 153 + device_type: &str, 154 + allow_raw_message: bool, 155 + ) -> (ButtplugServer, TestDeviceChannelHost) { 156 + let mut builder = TestDeviceCommunicationManagerBuilder::default(); 157 + let device = builder.add_test_device(&TestDeviceIdentifier::new(device_type, None)); 158 + 159 + ( 160 + test_server_with_comm_manager(builder, allow_raw_message), 161 + device, 162 + ) 163 + }
+1 -1
buttplug/tests/util/test_server.rs
··· 71 } 72 }, 73 Err(err_msg) => { 74 - if connector_clone.send(ButtplugServerMessageVariant::V3(err_msg.into())).await.is_err() { 75 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 76 } 77 }
··· 71 } 72 }, 73 Err(err_msg) => { 74 + if connector_clone.send(err_msg).await.is_err() { 75 error!("Cannot send reply to server, dropping and assuming remote server thread has exited."); 76 } 77 }