/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use std::{collections::VecDeque, ops::Deref, sync::Arc}; use anyhow::{Result, bail}; use clap::Parser; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{ TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}, }, sync::Mutex, }; use tracing::{debug, error, warn}; use tracing_subscriber::{EnvFilter, prelude::*}; #[derive(Parser, Debug)] #[command(version)] struct Args { #[arg(short, long, default_value_t = 6667)] port: u16, /// The server to proxy. host: String, } type BufReadHalf = BufReader; struct Connection { connected: Mutex, read: Mutex, write: Mutex, } struct InnerBouncer { client: Connection, server: Connection, // TODO: persist message_queue: std::sync::Mutex>>, } struct Bouncer(Arc); #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() .with(EnvFilter::from_default_env()) .with(tracing_subscriber::fmt::layer()) .init(); let args = Args::parse(); let client_listener = TcpListener::bind("127.0.0.1:6667").await?; let client_conn = client_listener.accept().await?.0; let server_conn = TcpStream::connect(args.host).await?; let bouncer = Bouncer::new(client_conn, server_conn); let its_2_am_idk_what_to_name_this_bazinga = bouncer.clone(); tokio::spawn(async move { its_2_am_idk_what_to_name_this_bazinga .clientbound_task() .await .unwrap(); }); let its_2_am_idk_what_to_name_this_bazinga = bouncer.clone(); tokio::spawn(async move { its_2_am_idk_what_to_name_this_bazinga .serverbound_task() .await .unwrap(); }); tokio::signal::ctrl_c().await?; Ok(()) } impl Clone for Bouncer { fn clone(&self) -> Self { Self(self.0.clone()) } } impl Deref for Bouncer { type Target = InnerBouncer; fn deref(&self) -> &Self::Target { &self.0 } } impl Bouncer { fn new(client: TcpStream, server: TcpStream) -> Self { let (client_read, client_write) = client.into_split(); let (server_read, server_write) = server.into_split(); Self( InnerBouncer { client: Connection { connected: true.into(), read: BufReader::new(client_read).into(), write: client_write.into(), }, server: Connection { connected: true.into(), read: BufReader::new(server_read).into(), write: server_write.into(), }, message_queue: Default::default(), } .into(), ) } pub async fn clientbound_task(&self) -> Result<()> { let mut buf = Vec::new(); loop { // TODO: don't re-lock read on each loop? let bytes_read = { let mut read = self.server.read.lock().await; read.read_until(b'\n', &mut buf).await? }; if bytes_read == 0 { bail!("EOF from server, what?"); } let line = String::from_utf8_lossy(&buf); debug!("SERVER->CLIENT: {}", line.trim_end_matches("\r\n")); self.write_to_client(&buf); buf.clear(); } } pub async fn serverbound_task(&self) -> Result<()> { let mut buf = Vec::new(); let mut read = self.connections.client_read.lock().await; let mut write = self.connections.server_write.lock().await; loop { let read = read.read_until(b'\n', &mut buf).await?; if read == 0 { warn!("EOF from client, shutting down serverbound task"); let mut connected = self.connections.client_connected.write().await; *connected = false; return Ok(()); } let line = String::from_utf8_lossy(&buf); let line = line.trim_end_matches("\r\n"); debug!("CLIENT->SERVER: {}", line); write.write(&buf).await?; buf.clear(); } } // writes to the client. if disconnected, enqueues instead and marks the client as disconnected. async fn write_to_client(&self, buf: &[u8]) -> std::io::Result<()> { let mut connected = self.client.connected.lock().await; if *connected { let mut write = self.client.write.lock().await; // TODO: handle backlog here? match write.write(buf).await { Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => { warn!("Client disconnected"); *connected = false; } Err(why) => return Err(why), Ok(_) => return Ok(()), } } // always run if not initially connected or disconnected above^^^, so OK to return afterwards if !*connected { debug!("writing to queue instead..."); self.message_queue.lock().unwrap().push_back(buf.to_vec()); } // it's 5am. i'll clean this up later Ok(()) } }