Buttplug sex toy control library
at dev 7.8 kB view raw
1use crate::{ 2 ButtplugRemoteServer, ButtplugRepeater, 3 backdoor_server::BackdoorServer, 4 buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server}, 5 error::IntifaceEngineError, 6 frontend::{ 7 Frontend, frontend_external_event_loop, frontend_server_event_loop, 8 process_messages::EngineMessage, 9 }, 10 mdns::IntifaceMdns, 11 options::EngineOptions, 12 remote_server::ButtplugRemoteServerEvent, 13 rest_server::IntifaceRestServer, 14}; 15 16use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config}; 17use futures::{StreamExt, pin_mut}; 18use once_cell::sync::OnceCell; 19use std::{path::Path, sync::Arc, time::Duration}; 20use tokio::{fs, select}; 21use tokio_util::sync::CancellationToken; 22 23#[cfg(debug_assertions)] 24pub fn maybe_crash_main_thread(options: &EngineOptions) { 25 if options.crash_main_thread() { 26 panic!("Crashing main thread by request"); 27 } 28} 29 30#[allow(dead_code)] 31#[cfg(debug_assertions)] 32pub fn maybe_crash_task_thread(options: &EngineOptions) { 33 if options.crash_task_thread() { 34 tokio::spawn(async { 35 tokio::time::sleep(Duration::from_millis(100)).await; 36 panic!("Crashing a task thread by request"); 37 }); 38 } 39} 40 41#[derive(Default)] 42pub struct IntifaceEngine { 43 stop_token: Arc<CancellationToken>, 44 backdoor_server: OnceCell<Arc<BackdoorServer>>, 45} 46 47impl IntifaceEngine { 48 pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> { 49 Some(self.backdoor_server.get()?.clone()) 50 } 51 52 pub async fn run( 53 &self, 54 options: &EngineOptions, 55 frontend: Option<Arc<dyn Frontend>>, 56 dcm: &Option<Arc<DeviceConfigurationManager>>, 57 ) -> Result<(), IntifaceEngineError> { 58 // Set up Frontend 59 if let Some(frontend) = &frontend { 60 let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone()); 61 tokio::spawn(async move { 62 frontend_loop.await; 63 }); 64 65 frontend.connect().await.unwrap(); 66 frontend.send(EngineMessage::EngineStarted {}).await; 67 } 68 69 // Set up mDNS 70 let _mdns_server = if options.broadcast_server_mdns() { 71 // TODO Unregister whenever we have a live connection 72 73 // TODO Support different services for engine versus repeater 74 Some(IntifaceMdns::new()) 75 } else { 76 None 77 }; 78 79 // Set up Repeater (if in repeater mode) 80 if options.repeater_mode() { 81 info!("Starting repeater"); 82 83 let repeater = ButtplugRepeater::new( 84 options.repeater_local_port().unwrap(), 85 options.repeater_remote_address().as_ref().unwrap(), 86 self.stop_token.child_token(), 87 ); 88 select! { 89 _ = self.stop_token.cancelled() => { 90 info!("Owner requested process exit, exiting."); 91 } 92 _ = repeater.listen() => { 93 info!("Repeater listener stopped, exiting."); 94 } 95 }; 96 if let Some(frontend) = &frontend { 97 frontend.send(EngineMessage::EngineStopped {}).await; 98 tokio::time::sleep(Duration::from_millis(100)).await; 99 frontend.disconnect(); 100 } 101 return Ok(()); 102 } 103 104 // Set up Engine (if in engine mode) 105 106 // At this point we will have received and validated options. 107 108 // Hang out until those listeners get sick of listening. 109 info!("Intiface CLI Setup finished, running server tasks until all joined."); 110 let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?; 111 let dcm = server 112 .device_manager() 113 .device_configuration_manager() 114 .clone(); 115 116 if let Some(rest_port) = options.rest_api_port() { 117 select! { 118 _ = self.stop_token.cancelled() => { 119 info!("Owner requested process exit, exiting."); 120 } 121 res = IntifaceRestServer::run(rest_port, server) => { 122 info!("Rest API listener stopped, exiting."); 123 if let Err(e) = res { 124 error!("Error running Intiface Central RestAPI Server: {:?}", e); 125 } 126 } 127 }; 128 if let Some(frontend) = &frontend { 129 frontend.send(EngineMessage::EngineStopped {}).await; 130 tokio::time::sleep(Duration::from_millis(100)).await; 131 frontend.disconnect(); 132 } 133 return Ok(()); 134 } 135 136 let mut server = ButtplugRemoteServer::new(server, &None); 137 138 if let Some(config_path) = options.user_device_config_path() { 139 let stream = server.event_stream(); 140 { 141 let config_path = config_path.to_owned(); 142 tokio::spawn(async move { 143 pin_mut!(stream); 144 loop { 145 if let Some(event) = stream.next().await { 146 match event { 147 ButtplugRemoteServerEvent::DeviceAdded { 148 index: _, 149 identifier: _, 150 name: _, 151 display_name: _, 152 } => { 153 if let Ok(config_str) = save_user_config(&dcm) { 154 // Should probably at least log if we fail to write the config file 155 if let Err(e) = fs::write(&Path::new(&config_path), config_str).await { 156 error!("Error saving config file: {:?}", e); 157 } 158 } 159 } 160 _ => continue, 161 } 162 }; 163 } 164 }); 165 } 166 } 167 if let Some(frontend) = &frontend { 168 frontend.send(EngineMessage::EngineServerCreated {}).await; 169 let event_receiver = server.event_stream(); 170 let frontend_clone = frontend.clone(); 171 let stop_child_token = self.stop_token.child_token(); 172 tokio::spawn(async move { 173 frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await; 174 }); 175 } 176 177 loop { 178 let session_connection_token = CancellationToken::new(); 179 info!("Starting server"); 180 181 // Let everything spin up, then try crashing. 182 183 #[cfg(debug_assertions)] 184 maybe_crash_main_thread(options); 185 186 let mut exit_requested = false; 187 select! { 188 _ = self.stop_token.cancelled() => { 189 info!("Owner requested process exit, exiting."); 190 exit_requested = true; 191 } 192 result = run_server(&server, options) => { 193 match result { 194 Ok(_) => info!("Connection dropped, restarting stay open loop."), 195 Err(e) => { 196 error!("{}", format!("Process Error: {:?}", e)); 197 198 if let Some(frontend) = &frontend { 199 frontend 200 .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()}) 201 .await; 202 } 203 } 204 } 205 } 206 }; 207 match server.disconnect().await { 208 Ok(_) => { 209 info!("Client forcefully disconnected from server."); 210 if let Some(frontend) = &frontend { 211 frontend.send(EngineMessage::ClientDisconnected {}).await; 212 } 213 } 214 Err(_) => info!("Client already disconnected from server."), 215 }; 216 session_connection_token.cancel(); 217 if exit_requested { 218 info!("Breaking out of event loop in order to exit"); 219 break; 220 } 221 // We're not exiting, rebuild our server. 222 let dm = server.server().device_manager(); 223 server = reset_buttplug_server(options, &dm, server.event_sender()).await?; 224 info!("Server connection dropped, restarting"); 225 } 226 info!("Shutting down server..."); 227 if let Err(e) = server.shutdown().await { 228 error!("Shutdown failed: {:?}", e); 229 } 230 info!("Exiting"); 231 if let Some(frontend) = &frontend { 232 frontend.send(EngineMessage::EngineStopped {}).await; 233 tokio::time::sleep(Duration::from_millis(100)).await; 234 frontend.disconnect(); 235 } 236 Ok(()) 237 } 238 239 pub fn stop(&self) { 240 info!("Engine stop called, cancelling token."); 241 self.stop_token.cancel(); 242 } 243}