Buttplug sex toy control library
1use crate::{
2 ButtplugRemoteServer, ButtplugRepeater,
3 backdoor_server::BackdoorServer,
4 buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server},
5 error::IntifaceEngineError,
6 frontend::{
7 Frontend, frontend_external_event_loop, frontend_server_event_loop,
8 process_messages::EngineMessage,
9 },
10 mdns::IntifaceMdns,
11 options::EngineOptions,
12 remote_server::ButtplugRemoteServerEvent,
13 rest_server::IntifaceRestServer,
14};
15
16use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
17use futures::{StreamExt, pin_mut};
18use once_cell::sync::OnceCell;
19use std::{path::Path, sync::Arc, time::Duration};
20use tokio::{fs, select};
21use tokio_util::sync::CancellationToken;
22
23#[cfg(debug_assertions)]
24pub fn maybe_crash_main_thread(options: &EngineOptions) {
25 if options.crash_main_thread() {
26 panic!("Crashing main thread by request");
27 }
28}
29
30#[allow(dead_code)]
31#[cfg(debug_assertions)]
32pub fn maybe_crash_task_thread(options: &EngineOptions) {
33 if options.crash_task_thread() {
34 tokio::spawn(async {
35 tokio::time::sleep(Duration::from_millis(100)).await;
36 panic!("Crashing a task thread by request");
37 });
38 }
39}
40
41#[derive(Default)]
42pub struct IntifaceEngine {
43 stop_token: Arc<CancellationToken>,
44 backdoor_server: OnceCell<Arc<BackdoorServer>>,
45}
46
47impl IntifaceEngine {
48 pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
49 Some(self.backdoor_server.get()?.clone())
50 }
51
52 pub async fn run(
53 &self,
54 options: &EngineOptions,
55 frontend: Option<Arc<dyn Frontend>>,
56 dcm: &Option<Arc<DeviceConfigurationManager>>,
57 ) -> Result<(), IntifaceEngineError> {
58 // Set up Frontend
59 if let Some(frontend) = &frontend {
60 let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
61 tokio::spawn(async move {
62 frontend_loop.await;
63 });
64
65 frontend.connect().await.unwrap();
66 frontend.send(EngineMessage::EngineStarted {}).await;
67 }
68
69 // Set up mDNS
70 let _mdns_server = if options.broadcast_server_mdns() {
71 // TODO Unregister whenever we have a live connection
72
73 // TODO Support different services for engine versus repeater
74 Some(IntifaceMdns::new())
75 } else {
76 None
77 };
78
79 // Set up Repeater (if in repeater mode)
80 if options.repeater_mode() {
81 info!("Starting repeater");
82
83 let repeater = ButtplugRepeater::new(
84 options.repeater_local_port().unwrap(),
85 options.repeater_remote_address().as_ref().unwrap(),
86 self.stop_token.child_token(),
87 );
88 select! {
89 _ = self.stop_token.cancelled() => {
90 info!("Owner requested process exit, exiting.");
91 }
92 _ = repeater.listen() => {
93 info!("Repeater listener stopped, exiting.");
94 }
95 };
96 if let Some(frontend) = &frontend {
97 frontend.send(EngineMessage::EngineStopped {}).await;
98 tokio::time::sleep(Duration::from_millis(100)).await;
99 frontend.disconnect();
100 }
101 return Ok(());
102 }
103
104 // Set up Engine (if in engine mode)
105
106 // At this point we will have received and validated options.
107
108 // Hang out until those listeners get sick of listening.
109 info!("Intiface CLI Setup finished, running server tasks until all joined.");
110 let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?;
111 let dcm = server
112 .device_manager()
113 .device_configuration_manager()
114 .clone();
115
116 if let Some(rest_port) = options.rest_api_port() {
117 select! {
118 _ = self.stop_token.cancelled() => {
119 info!("Owner requested process exit, exiting.");
120 }
121 res = IntifaceRestServer::run(rest_port, server) => {
122 info!("Rest API listener stopped, exiting.");
123 if let Err(e) = res {
124 error!("Error running Intiface Central RestAPI Server: {:?}", e);
125 }
126 }
127 };
128 if let Some(frontend) = &frontend {
129 frontend.send(EngineMessage::EngineStopped {}).await;
130 tokio::time::sleep(Duration::from_millis(100)).await;
131 frontend.disconnect();
132 }
133 return Ok(());
134 }
135
136 let mut server = ButtplugRemoteServer::new(server, &None);
137
138 if let Some(config_path) = options.user_device_config_path() {
139 let stream = server.event_stream();
140 {
141 let config_path = config_path.to_owned();
142 tokio::spawn(async move {
143 pin_mut!(stream);
144 loop {
145 if let Some(event) = stream.next().await {
146 match event {
147 ButtplugRemoteServerEvent::DeviceAdded {
148 index: _,
149 identifier: _,
150 name: _,
151 display_name: _,
152 } => {
153 if let Ok(config_str) = save_user_config(&dcm) {
154 // Should probably at least log if we fail to write the config file
155 if let Err(e) = fs::write(&Path::new(&config_path), config_str).await {
156 error!("Error saving config file: {:?}", e);
157 }
158 }
159 }
160 _ => continue,
161 }
162 };
163 }
164 });
165 }
166 }
167 if let Some(frontend) = &frontend {
168 frontend.send(EngineMessage::EngineServerCreated {}).await;
169 let event_receiver = server.event_stream();
170 let frontend_clone = frontend.clone();
171 let stop_child_token = self.stop_token.child_token();
172 tokio::spawn(async move {
173 frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
174 });
175 }
176
177 loop {
178 let session_connection_token = CancellationToken::new();
179 info!("Starting server");
180
181 // Let everything spin up, then try crashing.
182
183 #[cfg(debug_assertions)]
184 maybe_crash_main_thread(options);
185
186 let mut exit_requested = false;
187 select! {
188 _ = self.stop_token.cancelled() => {
189 info!("Owner requested process exit, exiting.");
190 exit_requested = true;
191 }
192 result = run_server(&server, options) => {
193 match result {
194 Ok(_) => info!("Connection dropped, restarting stay open loop."),
195 Err(e) => {
196 error!("{}", format!("Process Error: {:?}", e));
197
198 if let Some(frontend) = &frontend {
199 frontend
200 .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
201 .await;
202 }
203 }
204 }
205 }
206 };
207 match server.disconnect().await {
208 Ok(_) => {
209 info!("Client forcefully disconnected from server.");
210 if let Some(frontend) = &frontend {
211 frontend.send(EngineMessage::ClientDisconnected {}).await;
212 }
213 }
214 Err(_) => info!("Client already disconnected from server."),
215 };
216 session_connection_token.cancel();
217 if exit_requested {
218 info!("Breaking out of event loop in order to exit");
219 break;
220 }
221 // We're not exiting, rebuild our server.
222 let dm = server.server().device_manager();
223 server = reset_buttplug_server(options, &dm, server.event_sender()).await?;
224 info!("Server connection dropped, restarting");
225 }
226 info!("Shutting down server...");
227 if let Err(e) = server.shutdown().await {
228 error!("Shutdown failed: {:?}", e);
229 }
230 info!("Exiting");
231 if let Some(frontend) = &frontend {
232 frontend.send(EngineMessage::EngineStopped {}).await;
233 tokio::time::sleep(Duration::from_millis(100)).await;
234 frontend.disconnect();
235 }
236 Ok(())
237 }
238
239 pub fn stop(&self) {
240 info!("Engine stop called, cancelling token.");
241 self.stop_token.cancel();
242 }
243}