//! Example: XRPC over surf with channel-based async dispatch in Bevy //! //! Demonstrates the channel-based XRPC pattern using surf (no tokio) with //! Bevy's IoTaskPool. This was the original approach before switching to //! reqwest + bevy-tokio-tasks. //! //! Run: `cargo run --example surf_xrpc_channel` //! //! Press Enter to fetch a profile. The app polls each frame for the response. use std::error::Error; use std::marker::PhantomData; use bevy::prelude::*; use bevy::window::WindowResolution; use crossbeam_channel::TryRecvError; use jacquard::common::xrpc::{ self, CallOptions, XrpcRequest, XrpcResp, build_http_request, process_response, }; use jacquard::deps::fluent_uri::Uri; use jacquard::deps::smol_str::SmolStr; use jacquard::http_client::HttpClient; #[derive(Debug, Clone, Default)] struct SurfHttpClient; #[derive(Debug)] struct SurfHttpError(surf::Error); impl std::fmt::Display for SurfHttpError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } } impl Error for SurfHttpError {} impl HttpClient for SurfHttpClient { type Error = SurfHttpError; async fn send_http( &self, request: http::Request>, ) -> Result>, Self::Error> { let (parts, body) = request.into_parts(); let method: surf::http::Method = match parts.method { http::Method::GET => surf::http::Method::Get, http::Method::POST => surf::http::Method::Post, _ => surf::http::Method::Get, }; let url = surf::Url::parse(&parts.uri.to_string()).map_err(|e| { SurfHttpError(surf::Error::from_str( surf::StatusCode::BadRequest, e.to_string(), )) })?; let mut surf_req = surf::Request::new(method, url); for (name, value) in parts.headers.iter() { if let Ok(v) = value.to_str() { surf_req.append_header(name.as_str(), v); } } if !body.is_empty() { surf_req.body_bytes(&body); } let mut resp = surf::client().send(surf_req).await.map_err(SurfHttpError)?; let status = resp.status() as u16; let body = resp.body_bytes().await.map_err(SurfHttpError)?; let mut builder = http::Response::builder().status(status); for (name, values) in resp.iter() { builder = builder.header(name.as_str(), values.as_str()); } builder.body(body).map_err(|e| { SurfHttpError(surf::Error::from_str( surf::StatusCode::InternalServerError, e.to_string(), )) }) } } // --------------------------------------------------------------------------- // Channel infrastructure // --------------------------------------------------------------------------- struct HttpRoundtrip { request: http::Request>, response_tx: crossbeam_channel::Sender>, SurfHttpError>>, } #[derive(Resource)] struct XrpcChannel { sender: async_channel::Sender, } struct PendingXrpc { receiver: crossbeam_channel::Receiver>, SurfHttpError>>, _marker: PhantomData, } impl PendingXrpc { fn try_recv(&self) -> Option, String>> { match self.receiver.try_recv() { Ok(Ok(http_resp)) => match process_response::(http_resp) { Ok(resp) => Some(Ok(resp)), Err(e) => Some(Err(format!("XRPC error: {e}"))), }, Ok(Err(e)) => Some(Err(format!("HTTP error: {e}"))), Err(TryRecvError::Empty) => None, Err(TryRecvError::Disconnected) => Some(Err("channel disconnected".into())), } } } fn xrpc_request( channel: &XrpcChannel, base: &Uri<&str>, request: &R, ) -> Result, String> where R: XrpcRequest + serde::Serialize, { let http_req = build_http_request(base, request, &CallOptions::default()) .map_err(|e| format!("build error: {e}"))?; let (resp_tx, resp_rx) = crossbeam_channel::bounded(1); channel .sender .try_send(HttpRoundtrip { request: http_req, response_tx: resp_tx, }) .map_err(|e| format!("send error: {e}"))?; Ok(PendingXrpc { receiver: resp_rx, _marker: PhantomData, }) } // --------------------------------------------------------------------------- // Bevy resources and systems // --------------------------------------------------------------------------- use jacquard::api::app_bsky::actor::get_profile::{GetProfile, GetProfileResponse}; use serde::Deserialize; /// Generic resource wrapping any in-flight XRPC request. /// The type parameter R is the XrpcResp type for the expected response. #[derive(Resource)] struct Pending(Option>); impl Default for Pending { fn default() -> Self { Self(None) } } type PendingProfile = Pending; #[derive(Resource, Default)] struct FetchRequested(bool); /// Startup: spawn the surf HTTP handler on IoTaskPool. fn setup_channel(mut commands: Commands) { let (sender, receiver) = async_channel::bounded::(64); commands.insert_resource(XrpcChannel { sender }); bevy::tasks::IoTaskPool::get() .spawn(async move { let client = SurfHttpClient; while let Ok(roundtrip) = receiver.recv().await { let result = client.send_http(roundtrip.request).await; if roundtrip.response_tx.send(result).is_err() { warn!("response channel closed"); } } }) .detach(); info!("surf XRPC handler spawned on IoTaskPool"); } /// System: press Enter to trigger a profile fetch. fn trigger_fetch(keys: Res>, mut requested: ResMut) { if keys.just_pressed(KeyCode::Enter) { requested.0 = true; info!("Enter pressed — fetching profile..."); } } /// System: send the XRPC request when triggered. fn send_request( channel: Res, mut pending: ResMut, mut requested: ResMut, ) { if !requested.0 || pending.0.is_some() { return; } requested.0 = false; let base = Uri::parse("https://public.api.bsky.app").expect("valid URI"); // Fetch the Bluesky team account as a demo let did = jacquard::types::string::Did::new_static("did:plc:z72i7hdynmk6r22z27h6tvur") .expect("valid DID"); let actor: jacquard::types::ident::AtIdentifier = did.into(); let request = GetProfile::new().actor(actor).build(); match xrpc_request(&channel, &base, &request) { Ok(handle) => { info!("request sent, polling..."); pending.0 = Some(handle); } Err(e) => warn!("failed to send request: {e}"), } } /// Generic poll system — works with any Pending where R: XrpcResp. /// Returns the parsed response output when ready, or None if still waiting. fn poll_pending( pending: &mut Pending, ) -> Option::Output, String>> where for<'de> ::Output: Deserialize<'de>, { let Some(ref handle) = pending.0 else { return None; }; let result = handle.try_recv()?; pending.0 = None; Some(match result { Ok(response) => match response.into_output() { Ok(output) => Ok(output), Err(e) => Err(format!("parse error: {e}")), }, Err(e) => Err(e), }) } /// Concrete system that uses the generic poll to handle profile responses. fn poll_response(mut pending: ResMut) { if let Some(result) = poll_pending(&mut pending) { match result { Ok(profile) => { let profile = profile.value; let name: &str = profile .display_name .as_ref() .map(|s| s.as_ref()) .unwrap_or("(none)"); let handle: &str = profile.handle.as_ref(); info!("=== Profile fetched! ==="); info!(" Display name: {name}"); info!(" Handle: @{handle}"); info!(" Followers: {:?}", profile.followers_count); info!(" Posts: {:?}", profile.posts_count); info!("Press Enter to fetch again."); } Err(e) => warn!("request error: {e}"), } } } // --------------------------------------------------------------------------- // Main // --------------------------------------------------------------------------- fn main() { App::new() .add_plugins(DefaultPlugins.set(WindowPlugin { primary_window: Some(Window { title: "surf XRPC channel example — press Enter to fetch a profile".into(), resolution: WindowResolution::new(400, 200), ..default() }), ..default() })) .init_resource::() .init_resource::() .add_systems(Startup, setup_channel) .add_systems(Update, (trigger_fetch, send_request, poll_response).chain()) .run(); }