use std::sync::Arc; use async_tungstenite::tungstenite::protocol::frame; use async_tungstenite::tungstenite::{self, Utf8Bytes}; use async_tungstenite::{async_tls::client_async_tls, tungstenite::Message}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use jacquard::{ CloseCode, CloseFrame, StreamError, WebSocketClient, WebSocketConnection, WsMessage, WsSink, WsStream, WsText, client::BasicClient, }; use smol::net::TcpStream; #[derive(Debug, Clone, Default)] pub struct AsyncTungsteniteClient; impl WebSocketClient for AsyncTungsteniteClient { type Error = tungstenite::Error; async fn connect( &self, uri: jacquard::deps::fluent_uri::Uri<&str>, ) -> Result { let uri_str = uri.as_str(); let authority = uri .authority() .ok_or_else(|| tungstenite::Error::Url(tungstenite::error::UrlError::NoHostName))?; let domain = authority.host(); let default_port = if uri_str.starts_with("wss:") { 443 } else { 80 }; let port = authority .port_to_u16() .ok() .flatten() .unwrap_or(default_port); let addr = format!("{domain}:{port}"); let tcp_stream = TcpStream::connect(&addr) .await .map_err(tungstenite::Error::Io)?; let (ws_stream, _response) = client_async_tls(uri_str, tcp_stream).await?; let (sink, stream) = ws_stream.split(); let rx_stream = stream.filter_map(|result| async move { match result { Ok(msg) => convert_to_ws_message(msg).map(Ok), Err(e) => Some(Err(StreamError::transport(e))), } }); let rx = WsStream::new(rx_stream); let tx_sink = sink.with(|msg: WsMessage| async move { Ok::<_, tungstenite::Error>(convert_from_ws_message(msg)) }); let tx_sink_mapped = tx_sink.sink_map_err(StreamError::transport); let tx = WsSink::new(tx_sink_mapped); Ok(WebSocketConnection::new(tx, rx)) } } fn convert_to_ws_message(msg: Message) -> Option { match msg { Message::Text(utf8_bytes) => { // Both Utf8Bytes and WsText wrap Bytes with UTF-8 invariant — zero-copy let bytes: Bytes = utf8_bytes.into(); // Safety: tungstenite already validated UTF-8 Some(WsMessage::Text(unsafe { WsText::from_bytes_unchecked(bytes) })) } Message::Binary(data) => Some(WsMessage::Binary(data)), Message::Close(frame) => { let close_frame = frame.map(|f| { let code = convert_close_code(f.code); let reason_bytes: Bytes = f.reason.into(); // Safety: CloseFrame reason was already UTF-8 validated by tungstenite let reason_str = unsafe { core::str::from_utf8_unchecked(&reason_bytes) }; CloseFrame::new(code, reason_str.to_owned()) }); Some(WsMessage::Close(close_frame)) } _ => None, } } fn convert_from_ws_message(msg: WsMessage) -> Message { match msg { WsMessage::Text(text) => { // WsText → Bytes → Utf8Bytes, both already UTF-8 validated let bytes = text.into_bytes(); // Safety: WsText guarantees UTF-8 Message::Text(unsafe { Utf8Bytes::from_bytes_unchecked(bytes) }) } WsMessage::Binary(bytes) => Message::Binary(bytes), WsMessage::Close(frame) => { let close_frame = frame.map(|f| { let code: u16 = f.code.into(); frame::CloseFrame { code: code.into(), reason: Utf8Bytes::from(f.reason.to_string()), } }); Message::Close(close_frame) } } } fn convert_close_code(code: frame::coding::CloseCode) -> CloseCode { use frame::coding::CloseCode as TC; match code { TC::Normal => CloseCode::Normal, TC::Away => CloseCode::Away, TC::Protocol => CloseCode::Protocol, TC::Unsupported => CloseCode::Unsupported, TC::Invalid => CloseCode::Invalid, TC::Policy => CloseCode::Policy, TC::Size => CloseCode::Size, TC::Extension => CloseCode::Extension, TC::Error => CloseCode::Error, TC::Tls => CloseCode::Tls, other => { let raw: u16 = other.into(); CloseCode::from(raw) } } } use bevy::prelude::*; /// Shared AT Protocol client for XRPC calls. #[derive(Resource, Clone)] pub struct AtpClient(pub Arc); pub fn setup_atp_client(mut commands: Commands) { commands.insert_resource(AtpClient(Arc::new(BasicClient::unauthenticated()))); }