Buttplug sex toy control library
at dev 3.0 kB view raw
1// Is this just two examples from tokio_tungstenite glued together? 2// 3// It absolute is! 4 5use futures_util::{StreamExt, TryStreamExt, future}; 6use log::info; 7use tokio::{ 8 net::{TcpListener, TcpStream}, 9 select, 10}; 11use tokio_tungstenite::connect_async; 12use tokio_util::sync::CancellationToken; 13 14pub struct ButtplugRepeater { 15 local_port: u16, 16 remote_address: String, 17 stop_token: CancellationToken, 18} 19 20impl ButtplugRepeater { 21 pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self { 22 Self { 23 local_port, 24 remote_address: remote_address.to_owned(), 25 stop_token, 26 } 27 } 28 29 pub async fn listen(&self) { 30 info!("Repeater loop starting"); 31 let addr = format!("127.0.0.1:{}", self.local_port); 32 33 let try_socket = TcpListener::bind(&addr).await; 34 let listener = try_socket.expect("Failed to bind"); 35 info!("Listening on: {}", addr); 36 37 loop { 38 select! { 39 stream_result = listener.accept() => { 40 match stream_result { 41 Ok((stream, _)) => { 42 let mut remote_address = self.remote_address.clone(); 43 if !remote_address.starts_with("ws://") { 44 remote_address.insert_str(0, "ws://"); 45 } 46 tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream)); 47 }, 48 Err(e) => { 49 error!("Error accepting new websocket for repeater: {:?}", e); 50 break; 51 } 52 } 53 }, 54 _ = self.stop_token.cancelled() => { 55 info!("Repeater loop requested to stop, breaking."); 56 break; 57 } 58 } 59 } 60 info!("Repeater loop exiting"); 61 } 62 63 async fn accept_connection(server_addr: String, stream: TcpStream) { 64 let client_addr = stream 65 .peer_addr() 66 .expect("connected streams should have a peer address"); 67 info!("Client address: {}", client_addr); 68 69 let client_ws_stream = tokio_tungstenite::accept_async(stream) 70 .await 71 .expect("Error during the websocket handshake occurred"); 72 73 info!("New WebSocket connection: {}", client_addr); 74 75 info!("Connecting to server {}", server_addr); 76 77 let server_url = url::Url::parse(&server_addr).unwrap(); 78 79 let ws_stream = match connect_async(&server_url).await { 80 Ok((stream, _)) => stream, 81 Err(e) => { 82 error!("Cannot connect: {:?}", e); 83 return; 84 } 85 }; 86 info!("WebSocket handshake has been successfully completed"); 87 88 let (server_write, server_read) = ws_stream.split(); 89 90 let (client_write, client_read) = client_ws_stream.split(); 91 92 let client_fut = client_read 93 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) 94 .forward(server_write); 95 let server_fut = server_read 96 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) 97 .forward(client_write); 98 future::select(client_fut, server_fut).await; 99 info!("Closing repeater connection."); 100 } 101}