A better Rust ATProto crate
at main 322 lines 9.7 kB view raw
1//! Stream abstractions for HTTP request/response bodies 2//! 3//! This module provides platform-agnostic streaming types for handling large 4//! payloads efficiently without loading everything into memory. 5//! 6//! # Features 7//! 8//! - [`ByteStream`]: Streaming response bodies 9//! - [`ByteSink`]: Streaming request bodies 10//! - [`StreamError`]: Concrete error type for streaming operations 11//! 12//! # Platform Support 13//! 14//! Uses `n0-future` for platform-agnostic async streams that work on both 15//! native and WASM targets without requiring `Send` bounds on WASM. 16//! 17//! # Examples 18//! 19//! ## Streaming Download 20//! 21//! ```no_run 22//! # #[cfg(all(feature = "streaming", feature = "reqwest-client"))] 23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> { 24//! use jacquard_common::http_client::{HttpClient, HttpClientExt}; 25//! use futures_lite::StreamExt; 26//! 27//! let client = reqwest::Client::new(); 28//! let request = http::Request::builder() 29//! .uri("https://example.com/large-file") 30//! .body(vec![]) 31//! .unwrap(); 32//! 33//! let response = client.send_http_streaming(request).await?; 34//! let (_parts, body) = response.into_parts(); 35//! let mut stream = Box::pin(body.into_inner()); 36//! 37//! // Use futures_lite::StreamExt for iteration 38//! while let Some(chunk) = stream.as_mut().try_next().await? { 39//! // Process chunk without loading entire file into memory 40//! } 41//! # Ok(()) 42//! # } 43//! ``` 44 45use alloc::boxed::Box; 46use alloc::string::String; 47use core::error::Error; 48use core::fmt; 49use core::pin::Pin; 50 51/// Boxed error type for streaming operations 52pub type BoxError = Box<dyn Error + Send + Sync + 'static>; 53 54/// Error type for streaming operations 55#[derive(Debug, thiserror::Error, miette::Diagnostic)] 56pub struct StreamError { 57 kind: StreamErrorKind, 58 #[source] 59 source: Option<BoxError>, 60} 61 62/// Categories of streaming errors 63#[derive(Debug, Clone, Copy, PartialEq, Eq)] 64#[non_exhaustive] 65pub enum StreamErrorKind { 66 /// Network or I/O error 67 Transport, 68 /// Stream or connection closed 69 Closed, 70 /// Protocol violation or framing error 71 Protocol, 72 /// Message deserialization failed 73 Decode, 74 /// Message serialization failed 75 Encode, 76 /// Wrong message format (e.g., text frame when expecting binary) 77 WrongMessageFormat, 78} 79 80impl StreamError { 81 /// Create a new streaming error 82 pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self { 83 Self { kind, source } 84 } 85 86 /// Get the error kind 87 pub fn kind(&self) -> &StreamErrorKind { 88 &self.kind 89 } 90 91 /// Get the underlying error source 92 pub fn source(&self) -> Option<&BoxError> { 93 self.source.as_ref() 94 } 95 96 /// Create a "connection closed" error 97 pub fn closed() -> Self { 98 Self { 99 kind: StreamErrorKind::Closed, 100 source: None, 101 } 102 } 103 104 /// Create a transport error with source 105 pub fn transport(source: impl Error + Send + Sync + 'static) -> Self { 106 Self { 107 kind: StreamErrorKind::Transport, 108 source: Some(Box::new(source)), 109 } 110 } 111 112 /// Create a protocol error 113 pub fn protocol(msg: impl Into<String>) -> Self { 114 Self { 115 kind: StreamErrorKind::Protocol, 116 source: Some(msg.into().into()), 117 } 118 } 119 120 /// Create a decode error with source 121 pub fn decode(source: impl Error + Send + Sync + 'static) -> Self { 122 Self { 123 kind: StreamErrorKind::Decode, 124 source: Some(Box::new(source)), 125 } 126 } 127 128 /// Create an encode error with source 129 pub fn encode(source: impl Error + Send + Sync + 'static) -> Self { 130 Self { 131 kind: StreamErrorKind::Encode, 132 source: Some(Box::new(source)), 133 } 134 } 135 136 /// Create a wrong message format error 137 pub fn wrong_message_format(msg: impl Into<String>) -> Self { 138 Self { 139 kind: StreamErrorKind::WrongMessageFormat, 140 source: Some(msg.into().into()), 141 } 142 } 143} 144 145impl fmt::Display for StreamError { 146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 147 match self.kind { 148 StreamErrorKind::Transport => write!(f, "Transport error"), 149 StreamErrorKind::Closed => write!(f, "Stream closed"), 150 StreamErrorKind::Protocol => write!(f, "Protocol error"), 151 StreamErrorKind::Decode => write!(f, "Decode error"), 152 StreamErrorKind::Encode => write!(f, "Encode error"), 153 StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"), 154 }?; 155 156 if let Some(source) = &self.source { 157 write!(f, ": {}", source)?; 158 } 159 160 Ok(()) 161 } 162} 163 164use bytes::Bytes; 165 166/// Boxed stream type with proper Send bounds for native, no Send for WASM 167#[cfg(not(target_arch = "wasm32"))] 168type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>; 169 170/// Boxed stream type without Send bound for WASM 171#[cfg(target_arch = "wasm32")] 172type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>; 173 174/// Platform-agnostic byte stream abstraction 175pub struct ByteStream { 176 inner: Boxed<Result<Bytes, StreamError>>, 177} 178 179impl ByteStream { 180 /// Create a new byte stream from any compatible stream 181 #[cfg(not(target_arch = "wasm32"))] 182 pub fn new<S>(stream: S) -> Self 183 where 184 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static, 185 { 186 Self { 187 inner: Box::pin(stream), 188 } 189 } 190 191 /// Create a new byte stream from any compatible stream (WASM) 192 #[cfg(target_arch = "wasm32")] 193 pub fn new<S>(stream: S) -> Self 194 where 195 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static, 196 { 197 Self { 198 inner: Box::pin(stream), 199 } 200 } 201 202 /// Check if stream is known to be empty (always false for dynamic streams) 203 pub fn is_empty(&self) -> bool { 204 false 205 } 206 207 /// Convert into the inner boxed stream 208 pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> { 209 self.inner 210 } 211 212 /// Split this stream into two streams that both receive all chunks 213 /// 214 /// Chunks are cloned (cheaply via Bytes rc). Spawns a forwarder task. 215 /// Both returned streams will receive all chunks from the original stream. 216 /// The forwarder continues as long as at least one stream is alive. 217 /// If the underlying stream errors, both teed streams will end. 218 pub fn tee(self) -> (ByteStream, ByteStream) { 219 use futures::channel::mpsc; 220 use n0_future::StreamExt as _; 221 222 let (tx1, rx1) = mpsc::unbounded(); 223 let (tx2, rx2) = mpsc::unbounded(); 224 225 n0_future::task::spawn(async move { 226 let mut stream = self.inner; 227 while let Some(result) = stream.next().await { 228 match result { 229 Ok(chunk) => { 230 // Clone chunk (cheap - Bytes is rc'd) 231 let chunk2 = chunk.clone(); 232 233 // Send to both channels, continue if at least one succeeds 234 let send1 = tx1.unbounded_send(Ok(chunk)); 235 let send2 = tx2.unbounded_send(Ok(chunk2)); 236 237 // Only stop if both channels are closed 238 if send1.is_err() && send2.is_err() { 239 break; 240 } 241 } 242 Err(_e) => { 243 // Underlying stream errored, stop forwarding. 244 // Both channels will close, ending both streams. 245 break; 246 } 247 } 248 } 249 }); 250 251 (ByteStream::new(rx1), ByteStream::new(rx2)) 252 } 253} 254 255impl fmt::Debug for ByteStream { 256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 257 f.debug_struct("ByteStream").finish_non_exhaustive() 258 } 259} 260 261/// Platform-agnostic byte sink abstraction 262pub struct ByteSink { 263 inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>, 264} 265 266impl ByteSink { 267 /// Create a new byte sink from any compatible sink 268 pub fn new<S>(sink: S) -> Self 269 where 270 S: n0_future::Sink<Bytes, Error = StreamError> + 'static, 271 { 272 Self { 273 inner: Box::pin(sink), 274 } 275 } 276 277 /// Convert into the inner boxed sink 278 pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> { 279 self.inner 280 } 281} 282 283impl fmt::Debug for ByteSink { 284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 285 f.debug_struct("ByteSink").finish_non_exhaustive() 286 } 287} 288 289#[cfg(test)] 290mod tests { 291 use super::*; 292 use bytes::Bytes; 293 294 #[test] 295 fn stream_error_carries_kind_and_source() { 296 let source = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed"); 297 let err = StreamError::new(StreamErrorKind::Transport, Some(Box::new(source))); 298 299 assert_eq!(err.kind(), &StreamErrorKind::Transport); 300 assert!(err.source().is_some()); 301 assert_eq!(format!("{}", err), "Transport error: pipe closed"); 302 } 303 304 #[test] 305 fn stream_error_without_source() { 306 let err = StreamError::closed(); 307 308 assert_eq!(err.kind(), &StreamErrorKind::Closed); 309 assert!(err.source().is_none()); 310 } 311 312 #[tokio::test] 313 async fn byte_stream_can_be_created() { 314 use futures::stream; 315 316 let data = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))]; 317 let stream = stream::iter(data); 318 319 let byte_stream = ByteStream::new(stream); 320 assert!(!byte_stream.is_empty()); 321 } 322}