this repo has no description
1//! Example: XRPC over surf with channel-based async dispatch in Bevy
2//!
3//! Demonstrates the channel-based XRPC pattern using surf (no tokio) with
4//! Bevy's IoTaskPool. This was the original approach before switching to
5//! reqwest + bevy-tokio-tasks.
6//!
7//! Run: `cargo run --example surf_xrpc_channel`
8//!
9//! Press Enter to fetch a profile. The app polls each frame for the response.
10
11use std::error::Error;
12use std::marker::PhantomData;
13
14use bevy::prelude::*;
15use bevy::window::WindowResolution;
16use crossbeam_channel::TryRecvError;
17use jacquard::common::xrpc::{
18 self, CallOptions, XrpcRequest, XrpcResp, build_http_request, process_response,
19};
20use jacquard::deps::fluent_uri::Uri;
21use jacquard::deps::smol_str::SmolStr;
22use jacquard::http_client::HttpClient;
23
24#[derive(Debug, Clone, Default)]
25struct SurfHttpClient;
26
27#[derive(Debug)]
28struct SurfHttpError(surf::Error);
29
30impl std::fmt::Display for SurfHttpError {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "{}", self.0)
33 }
34}
35
36impl Error for SurfHttpError {}
37
38impl HttpClient for SurfHttpClient {
39 type Error = SurfHttpError;
40
41 async fn send_http(
42 &self,
43 request: http::Request<Vec<u8>>,
44 ) -> Result<http::Response<Vec<u8>>, Self::Error> {
45 let (parts, body) = request.into_parts();
46
47 let method: surf::http::Method = match parts.method {
48 http::Method::GET => surf::http::Method::Get,
49 http::Method::POST => surf::http::Method::Post,
50 _ => surf::http::Method::Get,
51 };
52
53 let url = surf::Url::parse(&parts.uri.to_string()).map_err(|e| {
54 SurfHttpError(surf::Error::from_str(
55 surf::StatusCode::BadRequest,
56 e.to_string(),
57 ))
58 })?;
59
60 let mut surf_req = surf::Request::new(method, url);
61 for (name, value) in parts.headers.iter() {
62 if let Ok(v) = value.to_str() {
63 surf_req.append_header(name.as_str(), v);
64 }
65 }
66 if !body.is_empty() {
67 surf_req.body_bytes(&body);
68 }
69
70 let mut resp = surf::client().send(surf_req).await.map_err(SurfHttpError)?;
71 let status = resp.status() as u16;
72 let body = resp.body_bytes().await.map_err(SurfHttpError)?;
73
74 let mut builder = http::Response::builder().status(status);
75 for (name, values) in resp.iter() {
76 builder = builder.header(name.as_str(), values.as_str());
77 }
78 builder.body(body).map_err(|e| {
79 SurfHttpError(surf::Error::from_str(
80 surf::StatusCode::InternalServerError,
81 e.to_string(),
82 ))
83 })
84 }
85}
86
87// ---------------------------------------------------------------------------
88// Channel infrastructure
89// ---------------------------------------------------------------------------
90
91struct HttpRoundtrip {
92 request: http::Request<Vec<u8>>,
93 response_tx: crossbeam_channel::Sender<Result<http::Response<Vec<u8>>, SurfHttpError>>,
94}
95
96#[derive(Resource)]
97struct XrpcChannel {
98 sender: async_channel::Sender<HttpRoundtrip>,
99}
100
101struct PendingXrpc<R> {
102 receiver: crossbeam_channel::Receiver<Result<http::Response<Vec<u8>>, SurfHttpError>>,
103 _marker: PhantomData<R>,
104}
105
106impl<R: XrpcResp> PendingXrpc<R> {
107 fn try_recv(&self) -> Option<Result<xrpc::Response<R>, String>> {
108 match self.receiver.try_recv() {
109 Ok(Ok(http_resp)) => match process_response::<R>(http_resp) {
110 Ok(resp) => Some(Ok(resp)),
111 Err(e) => Some(Err(format!("XRPC error: {e}"))),
112 },
113 Ok(Err(e)) => Some(Err(format!("HTTP error: {e}"))),
114 Err(TryRecvError::Empty) => None,
115 Err(TryRecvError::Disconnected) => Some(Err("channel disconnected".into())),
116 }
117 }
118}
119
120fn xrpc_request<R>(
121 channel: &XrpcChannel,
122 base: &Uri<&str>,
123 request: &R,
124) -> Result<PendingXrpc<R::Response>, String>
125where
126 R: XrpcRequest + serde::Serialize,
127{
128 let http_req = build_http_request(base, request, &CallOptions::default())
129 .map_err(|e| format!("build error: {e}"))?;
130
131 let (resp_tx, resp_rx) = crossbeam_channel::bounded(1);
132
133 channel
134 .sender
135 .try_send(HttpRoundtrip {
136 request: http_req,
137 response_tx: resp_tx,
138 })
139 .map_err(|e| format!("send error: {e}"))?;
140
141 Ok(PendingXrpc {
142 receiver: resp_rx,
143 _marker: PhantomData,
144 })
145}
146
147// ---------------------------------------------------------------------------
148// Bevy resources and systems
149// ---------------------------------------------------------------------------
150
151use jacquard::api::app_bsky::actor::get_profile::{GetProfile, GetProfileResponse};
152use serde::Deserialize;
153
154/// Generic resource wrapping any in-flight XRPC request.
155/// The type parameter R is the XrpcResp type for the expected response.
156#[derive(Resource)]
157struct Pending<R: XrpcResp + Send + Sync + 'static>(Option<PendingXrpc<R>>);
158
159impl<R: XrpcResp + Send + Sync + 'static> Default for Pending<R> {
160 fn default() -> Self {
161 Self(None)
162 }
163}
164
165type PendingProfile = Pending<GetProfileResponse>;
166
167#[derive(Resource, Default)]
168struct FetchRequested(bool);
169
170/// Startup: spawn the surf HTTP handler on IoTaskPool.
171fn setup_channel(mut commands: Commands) {
172 let (sender, receiver) = async_channel::bounded::<HttpRoundtrip>(64);
173 commands.insert_resource(XrpcChannel { sender });
174
175 bevy::tasks::IoTaskPool::get()
176 .spawn(async move {
177 let client = SurfHttpClient;
178 while let Ok(roundtrip) = receiver.recv().await {
179 let result = client.send_http(roundtrip.request).await;
180 if roundtrip.response_tx.send(result).is_err() {
181 warn!("response channel closed");
182 }
183 }
184 })
185 .detach();
186
187 info!("surf XRPC handler spawned on IoTaskPool");
188}
189
190/// System: press Enter to trigger a profile fetch.
191fn trigger_fetch(keys: Res<ButtonInput<KeyCode>>, mut requested: ResMut<FetchRequested>) {
192 if keys.just_pressed(KeyCode::Enter) {
193 requested.0 = true;
194 info!("Enter pressed — fetching profile...");
195 }
196}
197
198/// System: send the XRPC request when triggered.
199fn send_request(
200 channel: Res<XrpcChannel>,
201 mut pending: ResMut<PendingProfile>,
202 mut requested: ResMut<FetchRequested>,
203) {
204 if !requested.0 || pending.0.is_some() {
205 return;
206 }
207 requested.0 = false;
208
209 let base = Uri::parse("https://public.api.bsky.app").expect("valid URI");
210
211 // Fetch the Bluesky team account as a demo
212 let did = jacquard::types::string::Did::new_static("did:plc:z72i7hdynmk6r22z27h6tvur")
213 .expect("valid DID");
214 let actor: jacquard::types::ident::AtIdentifier = did.into();
215 let request = GetProfile::new().actor(actor).build();
216
217 match xrpc_request(&channel, &base, &request) {
218 Ok(handle) => {
219 info!("request sent, polling...");
220 pending.0 = Some(handle);
221 }
222 Err(e) => warn!("failed to send request: {e}"),
223 }
224}
225
226/// Generic poll system — works with any Pending<R> where R: XrpcResp.
227/// Returns the parsed response output when ready, or None if still waiting.
228fn poll_pending<R: XrpcResp + Send + Sync + 'static>(
229 pending: &mut Pending<R>,
230) -> Option<Result<<R as XrpcResp>::Output<SmolStr>, String>>
231where
232 for<'de> <R as XrpcResp>::Output<SmolStr>: Deserialize<'de>,
233{
234 let Some(ref handle) = pending.0 else {
235 return None;
236 };
237
238 let result = handle.try_recv()?;
239 pending.0 = None;
240
241 Some(match result {
242 Ok(response) => match response.into_output() {
243 Ok(output) => Ok(output),
244 Err(e) => Err(format!("parse error: {e}")),
245 },
246 Err(e) => Err(e),
247 })
248}
249
250/// Concrete system that uses the generic poll to handle profile responses.
251fn poll_response(mut pending: ResMut<PendingProfile>) {
252 if let Some(result) = poll_pending(&mut pending) {
253 match result {
254 Ok(profile) => {
255 let profile = profile.value;
256 let name: &str = profile
257 .display_name
258 .as_ref()
259 .map(|s| s.as_ref())
260 .unwrap_or("(none)");
261 let handle: &str = profile.handle.as_ref();
262
263 info!("=== Profile fetched! ===");
264 info!(" Display name: {name}");
265 info!(" Handle: @{handle}");
266 info!(" Followers: {:?}", profile.followers_count);
267 info!(" Posts: {:?}", profile.posts_count);
268 info!("Press Enter to fetch again.");
269 }
270 Err(e) => warn!("request error: {e}"),
271 }
272 }
273}
274
275// ---------------------------------------------------------------------------
276// Main
277// ---------------------------------------------------------------------------
278
279fn main() {
280 App::new()
281 .add_plugins(DefaultPlugins.set(WindowPlugin {
282 primary_window: Some(Window {
283 title: "surf XRPC channel example — press Enter to fetch a profile".into(),
284 resolution: WindowResolution::new(400, 200),
285 ..default()
286 }),
287 ..default()
288 }))
289 .init_resource::<PendingProfile>()
290 .init_resource::<FetchRequested>()
291 .add_systems(Startup, setup_channel)
292 .add_systems(Update, (trigger_fetch, send_request, poll_response).chain())
293 .run();
294}