A better Rust ATProto crate
1//! Stream abstractions for HTTP request/response bodies
2//!
3//! This module provides platform-agnostic streaming types for handling large
4//! payloads efficiently without loading everything into memory.
5//!
6//! # Features
7//!
8//! - [`ByteStream`]: Streaming response bodies
9//! - [`ByteSink`]: Streaming request bodies
10//! - [`StreamError`]: Concrete error type for streaming operations
11//!
12//! # Platform Support
13//!
14//! Uses `n0-future` for platform-agnostic async streams that work on both
15//! native and WASM targets without requiring `Send` bounds on WASM.
16//!
17//! # Examples
18//!
19//! ## Streaming Download
20//!
21//! ```no_run
22//! # #[cfg(all(feature = "streaming", feature = "reqwest-client"))]
23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24//! use jacquard_common::http_client::{HttpClient, HttpClientExt};
25//! use futures_lite::StreamExt;
26//!
27//! let client = reqwest::Client::new();
28//! let request = http::Request::builder()
29//! .uri("https://example.com/large-file")
30//! .body(vec![])
31//! .unwrap();
32//!
33//! let response = client.send_http_streaming(request).await?;
34//! let (_parts, body) = response.into_parts();
35//! let mut stream = Box::pin(body.into_inner());
36//!
37//! // Use futures_lite::StreamExt for iteration
38//! while let Some(chunk) = stream.as_mut().try_next().await? {
39//! // Process chunk without loading entire file into memory
40//! }
41//! # Ok(())
42//! # }
43//! ```
44
45use alloc::boxed::Box;
46use alloc::string::String;
47use core::error::Error;
48use core::fmt;
49use core::pin::Pin;
50
51/// Boxed error type for streaming operations
52pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
53
54/// Error type for streaming operations
55#[derive(Debug, thiserror::Error, miette::Diagnostic)]
56pub struct StreamError {
57 kind: StreamErrorKind,
58 #[source]
59 source: Option<BoxError>,
60}
61
62/// Categories of streaming errors
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64#[non_exhaustive]
65pub enum StreamErrorKind {
66 /// Network or I/O error
67 Transport,
68 /// Stream or connection closed
69 Closed,
70 /// Protocol violation or framing error
71 Protocol,
72 /// Message deserialization failed
73 Decode,
74 /// Message serialization failed
75 Encode,
76 /// Wrong message format (e.g., text frame when expecting binary)
77 WrongMessageFormat,
78}
79
80impl StreamError {
81 /// Create a new streaming error
82 pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
83 Self { kind, source }
84 }
85
86 /// Get the error kind
87 pub fn kind(&self) -> &StreamErrorKind {
88 &self.kind
89 }
90
91 /// Get the underlying error source
92 pub fn source(&self) -> Option<&BoxError> {
93 self.source.as_ref()
94 }
95
96 /// Create a "connection closed" error
97 pub fn closed() -> Self {
98 Self {
99 kind: StreamErrorKind::Closed,
100 source: None,
101 }
102 }
103
104 /// Create a transport error with source
105 pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
106 Self {
107 kind: StreamErrorKind::Transport,
108 source: Some(Box::new(source)),
109 }
110 }
111
112 /// Create a protocol error
113 pub fn protocol(msg: impl Into<String>) -> Self {
114 Self {
115 kind: StreamErrorKind::Protocol,
116 source: Some(msg.into().into()),
117 }
118 }
119
120 /// Create a decode error with source
121 pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
122 Self {
123 kind: StreamErrorKind::Decode,
124 source: Some(Box::new(source)),
125 }
126 }
127
128 /// Create an encode error with source
129 pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
130 Self {
131 kind: StreamErrorKind::Encode,
132 source: Some(Box::new(source)),
133 }
134 }
135
136 /// Create a wrong message format error
137 pub fn wrong_message_format(msg: impl Into<String>) -> Self {
138 Self {
139 kind: StreamErrorKind::WrongMessageFormat,
140 source: Some(msg.into().into()),
141 }
142 }
143}
144
145impl fmt::Display for StreamError {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 match self.kind {
148 StreamErrorKind::Transport => write!(f, "Transport error"),
149 StreamErrorKind::Closed => write!(f, "Stream closed"),
150 StreamErrorKind::Protocol => write!(f, "Protocol error"),
151 StreamErrorKind::Decode => write!(f, "Decode error"),
152 StreamErrorKind::Encode => write!(f, "Encode error"),
153 StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
154 }?;
155
156 if let Some(source) = &self.source {
157 write!(f, ": {}", source)?;
158 }
159
160 Ok(())
161 }
162}
163
164use bytes::Bytes;
165
166/// Boxed stream type with proper Send bounds for native, no Send for WASM
167#[cfg(not(target_arch = "wasm32"))]
168type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
169
170/// Boxed stream type without Send bound for WASM
171#[cfg(target_arch = "wasm32")]
172type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
173
174/// Platform-agnostic byte stream abstraction
175pub struct ByteStream {
176 inner: Boxed<Result<Bytes, StreamError>>,
177}
178
179impl ByteStream {
180 /// Create a new byte stream from any compatible stream
181 #[cfg(not(target_arch = "wasm32"))]
182 pub fn new<S>(stream: S) -> Self
183 where
184 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
185 {
186 Self {
187 inner: Box::pin(stream),
188 }
189 }
190
191 /// Create a new byte stream from any compatible stream (WASM)
192 #[cfg(target_arch = "wasm32")]
193 pub fn new<S>(stream: S) -> Self
194 where
195 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
196 {
197 Self {
198 inner: Box::pin(stream),
199 }
200 }
201
202 /// Check if stream is known to be empty (always false for dynamic streams)
203 pub fn is_empty(&self) -> bool {
204 false
205 }
206
207 /// Convert into the inner boxed stream
208 pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
209 self.inner
210 }
211
212 /// Split this stream into two streams that both receive all chunks
213 ///
214 /// Chunks are cloned (cheaply via Bytes rc). Spawns a forwarder task.
215 /// Both returned streams will receive all chunks from the original stream.
216 /// The forwarder continues as long as at least one stream is alive.
217 /// If the underlying stream errors, both teed streams will end.
218 pub fn tee(self) -> (ByteStream, ByteStream) {
219 use futures::channel::mpsc;
220 use n0_future::StreamExt as _;
221
222 let (tx1, rx1) = mpsc::unbounded();
223 let (tx2, rx2) = mpsc::unbounded();
224
225 n0_future::task::spawn(async move {
226 let mut stream = self.inner;
227 while let Some(result) = stream.next().await {
228 match result {
229 Ok(chunk) => {
230 // Clone chunk (cheap - Bytes is rc'd)
231 let chunk2 = chunk.clone();
232
233 // Send to both channels, continue if at least one succeeds
234 let send1 = tx1.unbounded_send(Ok(chunk));
235 let send2 = tx2.unbounded_send(Ok(chunk2));
236
237 // Only stop if both channels are closed
238 if send1.is_err() && send2.is_err() {
239 break;
240 }
241 }
242 Err(_e) => {
243 // Underlying stream errored, stop forwarding.
244 // Both channels will close, ending both streams.
245 break;
246 }
247 }
248 }
249 });
250
251 (ByteStream::new(rx1), ByteStream::new(rx2))
252 }
253}
254
255impl fmt::Debug for ByteStream {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 f.debug_struct("ByteStream").finish_non_exhaustive()
258 }
259}
260
261/// Platform-agnostic byte sink abstraction
262pub struct ByteSink {
263 inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
264}
265
266impl ByteSink {
267 /// Create a new byte sink from any compatible sink
268 pub fn new<S>(sink: S) -> Self
269 where
270 S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
271 {
272 Self {
273 inner: Box::pin(sink),
274 }
275 }
276
277 /// Convert into the inner boxed sink
278 pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
279 self.inner
280 }
281}
282
283impl fmt::Debug for ByteSink {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("ByteSink").finish_non_exhaustive()
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use bytes::Bytes;
293
294 #[test]
295 fn stream_error_carries_kind_and_source() {
296 let source = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
297 let err = StreamError::new(StreamErrorKind::Transport, Some(Box::new(source)));
298
299 assert_eq!(err.kind(), &StreamErrorKind::Transport);
300 assert!(err.source().is_some());
301 assert_eq!(format!("{}", err), "Transport error: pipe closed");
302 }
303
304 #[test]
305 fn stream_error_without_source() {
306 let err = StreamError::closed();
307
308 assert_eq!(err.kind(), &StreamErrorKind::Closed);
309 assert!(err.source().is_none());
310 }
311
312 #[tokio::test]
313 async fn byte_stream_can_be_created() {
314 use futures::stream;
315
316 let data = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
317 let stream = stream::iter(data);
318
319 let byte_stream = ByteStream::new(stream);
320 assert!(!byte_stream.is_empty());
321 }
322}