this repo has no description
at main 294 lines 9.6 kB view raw
1//! Example: XRPC over surf with channel-based async dispatch in Bevy 2//! 3//! Demonstrates the channel-based XRPC pattern using surf (no tokio) with 4//! Bevy's IoTaskPool. This was the original approach before switching to 5//! reqwest + bevy-tokio-tasks. 6//! 7//! Run: `cargo run --example surf_xrpc_channel` 8//! 9//! Press Enter to fetch a profile. The app polls each frame for the response. 10 11use std::error::Error; 12use std::marker::PhantomData; 13 14use bevy::prelude::*; 15use bevy::window::WindowResolution; 16use crossbeam_channel::TryRecvError; 17use jacquard::common::xrpc::{ 18 self, CallOptions, XrpcRequest, XrpcResp, build_http_request, process_response, 19}; 20use jacquard::deps::fluent_uri::Uri; 21use jacquard::deps::smol_str::SmolStr; 22use jacquard::http_client::HttpClient; 23 24#[derive(Debug, Clone, Default)] 25struct SurfHttpClient; 26 27#[derive(Debug)] 28struct SurfHttpError(surf::Error); 29 30impl std::fmt::Display for SurfHttpError { 31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 32 write!(f, "{}", self.0) 33 } 34} 35 36impl Error for SurfHttpError {} 37 38impl HttpClient for SurfHttpClient { 39 type Error = SurfHttpError; 40 41 async fn send_http( 42 &self, 43 request: http::Request<Vec<u8>>, 44 ) -> Result<http::Response<Vec<u8>>, Self::Error> { 45 let (parts, body) = request.into_parts(); 46 47 let method: surf::http::Method = match parts.method { 48 http::Method::GET => surf::http::Method::Get, 49 http::Method::POST => surf::http::Method::Post, 50 _ => surf::http::Method::Get, 51 }; 52 53 let url = surf::Url::parse(&parts.uri.to_string()).map_err(|e| { 54 SurfHttpError(surf::Error::from_str( 55 surf::StatusCode::BadRequest, 56 e.to_string(), 57 )) 58 })?; 59 60 let mut surf_req = surf::Request::new(method, url); 61 for (name, value) in parts.headers.iter() { 62 if let Ok(v) = value.to_str() { 63 surf_req.append_header(name.as_str(), v); 64 } 65 } 66 if !body.is_empty() { 67 surf_req.body_bytes(&body); 68 } 69 70 let mut resp = surf::client().send(surf_req).await.map_err(SurfHttpError)?; 71 let status = resp.status() as u16; 72 let body = resp.body_bytes().await.map_err(SurfHttpError)?; 73 74 let mut builder = http::Response::builder().status(status); 75 for (name, values) in resp.iter() { 76 builder = builder.header(name.as_str(), values.as_str()); 77 } 78 builder.body(body).map_err(|e| { 79 SurfHttpError(surf::Error::from_str( 80 surf::StatusCode::InternalServerError, 81 e.to_string(), 82 )) 83 }) 84 } 85} 86 87// --------------------------------------------------------------------------- 88// Channel infrastructure 89// --------------------------------------------------------------------------- 90 91struct HttpRoundtrip { 92 request: http::Request<Vec<u8>>, 93 response_tx: crossbeam_channel::Sender<Result<http::Response<Vec<u8>>, SurfHttpError>>, 94} 95 96#[derive(Resource)] 97struct XrpcChannel { 98 sender: async_channel::Sender<HttpRoundtrip>, 99} 100 101struct PendingXrpc<R> { 102 receiver: crossbeam_channel::Receiver<Result<http::Response<Vec<u8>>, SurfHttpError>>, 103 _marker: PhantomData<R>, 104} 105 106impl<R: XrpcResp> PendingXrpc<R> { 107 fn try_recv(&self) -> Option<Result<xrpc::Response<R>, String>> { 108 match self.receiver.try_recv() { 109 Ok(Ok(http_resp)) => match process_response::<R>(http_resp) { 110 Ok(resp) => Some(Ok(resp)), 111 Err(e) => Some(Err(format!("XRPC error: {e}"))), 112 }, 113 Ok(Err(e)) => Some(Err(format!("HTTP error: {e}"))), 114 Err(TryRecvError::Empty) => None, 115 Err(TryRecvError::Disconnected) => Some(Err("channel disconnected".into())), 116 } 117 } 118} 119 120fn xrpc_request<R>( 121 channel: &XrpcChannel, 122 base: &Uri<&str>, 123 request: &R, 124) -> Result<PendingXrpc<R::Response>, String> 125where 126 R: XrpcRequest + serde::Serialize, 127{ 128 let http_req = build_http_request(base, request, &CallOptions::default()) 129 .map_err(|e| format!("build error: {e}"))?; 130 131 let (resp_tx, resp_rx) = crossbeam_channel::bounded(1); 132 133 channel 134 .sender 135 .try_send(HttpRoundtrip { 136 request: http_req, 137 response_tx: resp_tx, 138 }) 139 .map_err(|e| format!("send error: {e}"))?; 140 141 Ok(PendingXrpc { 142 receiver: resp_rx, 143 _marker: PhantomData, 144 }) 145} 146 147// --------------------------------------------------------------------------- 148// Bevy resources and systems 149// --------------------------------------------------------------------------- 150 151use jacquard::api::app_bsky::actor::get_profile::{GetProfile, GetProfileResponse}; 152use serde::Deserialize; 153 154/// Generic resource wrapping any in-flight XRPC request. 155/// The type parameter R is the XrpcResp type for the expected response. 156#[derive(Resource)] 157struct Pending<R: XrpcResp + Send + Sync + 'static>(Option<PendingXrpc<R>>); 158 159impl<R: XrpcResp + Send + Sync + 'static> Default for Pending<R> { 160 fn default() -> Self { 161 Self(None) 162 } 163} 164 165type PendingProfile = Pending<GetProfileResponse>; 166 167#[derive(Resource, Default)] 168struct FetchRequested(bool); 169 170/// Startup: spawn the surf HTTP handler on IoTaskPool. 171fn setup_channel(mut commands: Commands) { 172 let (sender, receiver) = async_channel::bounded::<HttpRoundtrip>(64); 173 commands.insert_resource(XrpcChannel { sender }); 174 175 bevy::tasks::IoTaskPool::get() 176 .spawn(async move { 177 let client = SurfHttpClient; 178 while let Ok(roundtrip) = receiver.recv().await { 179 let result = client.send_http(roundtrip.request).await; 180 if roundtrip.response_tx.send(result).is_err() { 181 warn!("response channel closed"); 182 } 183 } 184 }) 185 .detach(); 186 187 info!("surf XRPC handler spawned on IoTaskPool"); 188} 189 190/// System: press Enter to trigger a profile fetch. 191fn trigger_fetch(keys: Res<ButtonInput<KeyCode>>, mut requested: ResMut<FetchRequested>) { 192 if keys.just_pressed(KeyCode::Enter) { 193 requested.0 = true; 194 info!("Enter pressed — fetching profile..."); 195 } 196} 197 198/// System: send the XRPC request when triggered. 199fn send_request( 200 channel: Res<XrpcChannel>, 201 mut pending: ResMut<PendingProfile>, 202 mut requested: ResMut<FetchRequested>, 203) { 204 if !requested.0 || pending.0.is_some() { 205 return; 206 } 207 requested.0 = false; 208 209 let base = Uri::parse("https://public.api.bsky.app").expect("valid URI"); 210 211 // Fetch the Bluesky team account as a demo 212 let did = jacquard::types::string::Did::new_static("did:plc:z72i7hdynmk6r22z27h6tvur") 213 .expect("valid DID"); 214 let actor: jacquard::types::ident::AtIdentifier = did.into(); 215 let request = GetProfile::new().actor(actor).build(); 216 217 match xrpc_request(&channel, &base, &request) { 218 Ok(handle) => { 219 info!("request sent, polling..."); 220 pending.0 = Some(handle); 221 } 222 Err(e) => warn!("failed to send request: {e}"), 223 } 224} 225 226/// Generic poll system — works with any Pending<R> where R: XrpcResp. 227/// Returns the parsed response output when ready, or None if still waiting. 228fn poll_pending<R: XrpcResp + Send + Sync + 'static>( 229 pending: &mut Pending<R>, 230) -> Option<Result<<R as XrpcResp>::Output<SmolStr>, String>> 231where 232 for<'de> <R as XrpcResp>::Output<SmolStr>: Deserialize<'de>, 233{ 234 let Some(ref handle) = pending.0 else { 235 return None; 236 }; 237 238 let result = handle.try_recv()?; 239 pending.0 = None; 240 241 Some(match result { 242 Ok(response) => match response.into_output() { 243 Ok(output) => Ok(output), 244 Err(e) => Err(format!("parse error: {e}")), 245 }, 246 Err(e) => Err(e), 247 }) 248} 249 250/// Concrete system that uses the generic poll to handle profile responses. 251fn poll_response(mut pending: ResMut<PendingProfile>) { 252 if let Some(result) = poll_pending(&mut pending) { 253 match result { 254 Ok(profile) => { 255 let profile = profile.value; 256 let name: &str = profile 257 .display_name 258 .as_ref() 259 .map(|s| s.as_ref()) 260 .unwrap_or("(none)"); 261 let handle: &str = profile.handle.as_ref(); 262 263 info!("=== Profile fetched! ==="); 264 info!(" Display name: {name}"); 265 info!(" Handle: @{handle}"); 266 info!(" Followers: {:?}", profile.followers_count); 267 info!(" Posts: {:?}", profile.posts_count); 268 info!("Press Enter to fetch again."); 269 } 270 Err(e) => warn!("request error: {e}"), 271 } 272 } 273} 274 275// --------------------------------------------------------------------------- 276// Main 277// --------------------------------------------------------------------------- 278 279fn main() { 280 App::new() 281 .add_plugins(DefaultPlugins.set(WindowPlugin { 282 primary_window: Some(Window { 283 title: "surf XRPC channel example — press Enter to fetch a profile".into(), 284 resolution: WindowResolution::new(400, 200), 285 ..default() 286 }), 287 ..default() 288 })) 289 .init_resource::<PendingProfile>() 290 .init_resource::<FetchRequested>() 291 .add_systems(Startup, setup_channel) 292 .add_systems(Update, (trigger_fetch, send_request, poll_response).chain()) 293 .run(); 294}