+4
-3
crates/jacquard-common/src/stream.rs
+4
-3
crates/jacquard-common/src/stream.rs
···
44
45
use std::error::Error;
46
use std::fmt;
47
48
/// Boxed error type for streaming operations
49
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
···
237
238
/// Platform-agnostic byte sink abstraction
239
pub struct ByteSink {
240
-
inner: Box<dyn n0_future::Sink<Bytes, Error = StreamError>>,
241
}
242
243
impl ByteSink {
···
247
S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
248
{
249
Self {
250
-
inner: Box::new(sink),
251
}
252
}
253
254
/// Convert into the inner boxed sink
255
-
pub fn into_inner(self) -> Box<dyn n0_future::Sink<Bytes, Error = StreamError>> {
256
self.inner
257
}
258
}
···
44
45
use std::error::Error;
46
use std::fmt;
47
+
use std::pin::Pin;
48
49
/// Boxed error type for streaming operations
50
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
···
238
239
/// Platform-agnostic byte sink abstraction
240
pub struct ByteSink {
241
+
inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
242
}
243
244
impl ByteSink {
···
248
S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
249
{
250
Self {
251
+
inner: Box::pin(sink),
252
}
253
}
254
255
/// Convert into the inner boxed sink
256
+
pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
257
self.inner
258
}
259
}
+42
-6
crates/jacquard-common/src/websocket.rs
+42
-6
crates/jacquard-common/src/websocket.rs
···
4
use crate::stream::StreamError;
5
use bytes::Bytes;
6
use n0_future::Stream;
7
-
use n0_future::stream::Boxed;
8
use std::borrow::Borrow;
9
use std::fmt::{self, Display};
10
use std::future::Future;
11
use std::ops::Deref;
12
use url::Url;
13
14
/// UTF-8 validated bytes for WebSocket text messages
···
282
}
283
284
/// WebSocket message stream
285
-
pub struct WsStream(Boxed<Result<WsMessage, StreamError>>);
286
287
impl WsStream {
288
/// Create a new message stream
···
304
}
305
306
/// Convert into the inner pinned boxed stream
307
-
pub fn into_inner(self) -> Boxed<Result<WsMessage, StreamError>> {
308
self.0
309
}
310
···
358
}
359
360
/// WebSocket message sink
361
-
pub struct WsSink(Box<dyn n0_future::Sink<WsMessage, Error = StreamError>>);
362
363
impl WsSink {
364
/// Create a new message sink
365
pub fn new<S>(sink: S) -> Self
366
where
367
S: n0_future::Sink<WsMessage, Error = StreamError> + Send + 'static,
368
{
369
-
Self(Box::new(sink))
370
}
371
372
/// Convert into the inner boxed sink
373
-
pub fn into_inner(self) -> Box<dyn n0_future::Sink<WsMessage, Error = StreamError>> {
374
self.0
375
}
376
}
···
4
use crate::stream::StreamError;
5
use bytes::Bytes;
6
use n0_future::Stream;
7
use std::borrow::Borrow;
8
use std::fmt::{self, Display};
9
use std::future::Future;
10
use std::ops::Deref;
11
+
use std::pin::Pin;
12
use url::Url;
13
14
/// UTF-8 validated bytes for WebSocket text messages
···
282
}
283
284
/// WebSocket message stream
285
+
#[cfg(not(target_arch = "wasm32"))]
286
+
pub struct WsStream(Pin<Box<dyn Stream<Item = Result<WsMessage, StreamError>> + Send>>);
287
+
288
+
/// WebSocket message stream
289
+
#[cfg(target_arch = "wasm32")]
290
+
pub struct WsStream(Pin<Box<dyn Stream<Item = Result<WsMessage, StreamError>>>>);
291
292
impl WsStream {
293
/// Create a new message stream
···
309
}
310
311
/// Convert into the inner pinned boxed stream
312
+
#[cfg(not(target_arch = "wasm32"))]
313
+
pub fn into_inner(self) -> Pin<Box<dyn Stream<Item = Result<WsMessage, StreamError>> + Send>> {
314
+
self.0
315
+
}
316
+
317
+
/// Convert into the inner pinned boxed stream
318
+
#[cfg(target_arch = "wasm32")]
319
+
pub fn into_inner(self) -> Pin<Box<dyn Stream<Item = Result<WsMessage, StreamError>>>> {
320
self.0
321
}
322
···
370
}
371
372
/// WebSocket message sink
373
+
#[cfg(not(target_arch = "wasm32"))]
374
+
pub struct WsSink(Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError> + Send>>);
375
+
376
+
/// WebSocket message sink
377
+
#[cfg(target_arch = "wasm32")]
378
+
pub struct WsSink(Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError>>>);
379
380
impl WsSink {
381
/// Create a new message sink
382
+
#[cfg(not(target_arch = "wasm32"))]
383
pub fn new<S>(sink: S) -> Self
384
where
385
S: n0_future::Sink<WsMessage, Error = StreamError> + Send + 'static,
386
{
387
+
Self(Box::pin(sink))
388
+
}
389
+
390
+
/// Create a new message sink
391
+
#[cfg(target_arch = "wasm32")]
392
+
pub fn new<S>(sink: S) -> Self
393
+
where
394
+
S: n0_future::Sink<WsMessage, Error = StreamError> + 'static,
395
+
{
396
+
Self(Box::pin(sink))
397
+
}
398
+
399
+
/// Convert into the inner boxed sink
400
+
#[cfg(not(target_arch = "wasm32"))]
401
+
pub fn into_inner(
402
+
self,
403
+
) -> Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError> + Send>> {
404
+
self.0
405
}
406
407
/// Convert into the inner boxed sink
408
+
#[cfg(target_arch = "wasm32")]
409
+
pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError>>> {
410
self.0
411
}
412
}
+146
-2
crates/jacquard-common/src/xrpc/subscription.rs
+146
-2
crates/jacquard-common/src/xrpc/subscription.rs
···
3
//! This module defines traits and types for typed WebSocket subscriptions,
4
//! mirroring the request/response pattern used for HTTP XRPC endpoints.
5
6
use n0_future::stream::Boxed;
7
use serde::{Deserialize, Serialize};
8
use std::error::Error;
9
use std::future::Future;
···
258
259
let (tx, rx) = self.connection.split();
260
261
let stream = match S::ENCODING {
262
MessageEncoding::Json => rx
263
.into_inner()
···
269
.boxed(),
270
};
271
272
(tx, stream)
273
}
274
···
288
serde_ipld_dagcbor::from_slice(bytes)
289
}
290
291
let stream = match S::ENCODING {
292
MessageEncoding::Json => rx
293
.into_inner()
···
343
.boxed(),
344
};
345
346
(tx, stream)
347
}
348
···
361
serde_ipld_dagcbor::from_slice(bytes)
362
}
363
364
let stream = match S::ENCODING {
365
MessageEncoding::Json => rx
366
.into_inner()
···
416
.boxed(),
417
};
418
419
(tx, stream)
420
}
421
···
442
// Put the raw stream back
443
*rx = raw_rx;
444
445
-
match S::ENCODING {
446
MessageEncoding::Json => typed_rx_source
447
.into_inner()
448
.filter_map(|msg| decode_json_msg::<S>(msg))
···
451
.into_inner()
452
.filter_map(|msg| decode_cbor_msg::<S>(msg))
453
.boxed(),
454
-
}
455
}
456
}
457
···
3
//! This module defines traits and types for typed WebSocket subscriptions,
4
//! mirroring the request/response pattern used for HTTP XRPC endpoints.
5
6
+
#[cfg(not(target_arch = "wasm32"))]
7
use n0_future::stream::Boxed;
8
+
#[cfg(target_arch = "wasm32")]
9
+
use n0_future::stream::BoxedLocal as Boxed;
10
use serde::{Deserialize, Serialize};
11
use std::error::Error;
12
use std::future::Future;
···
261
262
let (tx, rx) = self.connection.split();
263
264
+
#[cfg(not(target_arch = "wasm32"))]
265
let stream = match S::ENCODING {
266
MessageEncoding::Json => rx
267
.into_inner()
···
273
.boxed(),
274
};
275
276
+
#[cfg(target_arch = "wasm32")]
277
+
let stream = match S::ENCODING {
278
+
MessageEncoding::Json => rx
279
+
.into_inner()
280
+
.filter_map(|msg| decode_json_msg::<S>(msg))
281
+
.boxed_local(),
282
+
MessageEncoding::DagCbor => rx
283
+
.into_inner()
284
+
.filter_map(|msg| decode_cbor_msg::<S>(msg))
285
+
.boxed_local(),
286
+
};
287
+
288
(tx, stream)
289
}
290
···
304
serde_ipld_dagcbor::from_slice(bytes)
305
}
306
307
+
#[cfg(not(target_arch = "wasm32"))]
308
let stream = match S::ENCODING {
309
MessageEncoding::Json => rx
310
.into_inner()
···
360
.boxed(),
361
};
362
363
+
#[cfg(target_arch = "wasm32")]
364
+
let stream = match S::ENCODING {
365
+
MessageEncoding::Json => rx
366
+
.into_inner()
367
+
.filter_map(|msg_result| match msg_result {
368
+
Ok(WsMessage::Text(text)) => Some(
369
+
parse_msg(text.as_ref())
370
+
.map(|v| v.into_static())
371
+
.map_err(StreamError::decode),
372
+
),
373
+
Ok(WsMessage::Binary(bytes)) => {
374
+
#[cfg(feature = "zstd")]
375
+
{
376
+
match decompress_zstd(&bytes) {
377
+
Ok(decompressed) => Some(
378
+
parse_msg(&decompressed)
379
+
.map(|v| v.into_static())
380
+
.map_err(StreamError::decode),
381
+
),
382
+
Err(_) => Some(
383
+
parse_msg(&bytes)
384
+
.map(|v| v.into_static())
385
+
.map_err(StreamError::decode),
386
+
),
387
+
}
388
+
}
389
+
#[cfg(not(feature = "zstd"))]
390
+
{
391
+
Some(
392
+
parse_msg(&bytes)
393
+
.map(|v| v.into_static())
394
+
.map_err(StreamError::decode),
395
+
)
396
+
}
397
+
}
398
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
399
+
Err(e) => Some(Err(e)),
400
+
})
401
+
.boxed_local(),
402
+
MessageEncoding::DagCbor => rx
403
+
.into_inner()
404
+
.filter_map(|msg_result| match msg_result {
405
+
Ok(WsMessage::Binary(bytes)) => Some(
406
+
parse_cbor(&bytes)
407
+
.map(|v| v.into_static())
408
+
.map_err(|e| StreamError::decode(crate::error::DecodeError::from(e))),
409
+
),
410
+
Ok(WsMessage::Text(_)) => Some(Err(StreamError::wrong_message_format(
411
+
"expected binary frame for CBOR, got text",
412
+
))),
413
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
414
+
Err(e) => Some(Err(e)),
415
+
})
416
+
.boxed_local(),
417
+
};
418
+
419
(tx, stream)
420
}
421
···
434
serde_ipld_dagcbor::from_slice(bytes)
435
}
436
437
+
#[cfg(not(target_arch = "wasm32"))]
438
let stream = match S::ENCODING {
439
MessageEncoding::Json => rx
440
.into_inner()
···
490
.boxed(),
491
};
492
493
+
#[cfg(target_arch = "wasm32")]
494
+
let stream = match S::ENCODING {
495
+
MessageEncoding::Json => rx
496
+
.into_inner()
497
+
.filter_map(|msg_result| match msg_result {
498
+
Ok(WsMessage::Text(text)) => Some(
499
+
parse_msg(text.as_ref())
500
+
.map(|v| v.into_static())
501
+
.map_err(StreamError::decode),
502
+
),
503
+
Ok(WsMessage::Binary(bytes)) => {
504
+
#[cfg(feature = "zstd")]
505
+
{
506
+
match decompress_zstd(&bytes) {
507
+
Ok(decompressed) => Some(
508
+
parse_msg(&decompressed)
509
+
.map(|v| v.into_static())
510
+
.map_err(StreamError::decode),
511
+
),
512
+
Err(_) => Some(
513
+
parse_msg(&bytes)
514
+
.map(|v| v.into_static())
515
+
.map_err(StreamError::decode),
516
+
),
517
+
}
518
+
}
519
+
#[cfg(not(feature = "zstd"))]
520
+
{
521
+
Some(
522
+
parse_msg(&bytes)
523
+
.map(|v| v.into_static())
524
+
.map_err(StreamError::decode),
525
+
)
526
+
}
527
+
}
528
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
529
+
Err(e) => Some(Err(e)),
530
+
})
531
+
.boxed_local(),
532
+
MessageEncoding::DagCbor => rx
533
+
.into_inner()
534
+
.filter_map(|msg_result| match msg_result {
535
+
Ok(WsMessage::Binary(bytes)) => Some(
536
+
parse_cbor(&bytes)
537
+
.map(|v| v.into_static())
538
+
.map_err(|e| StreamError::decode(crate::error::DecodeError::from(e))),
539
+
),
540
+
Ok(WsMessage::Text(_)) => Some(Err(StreamError::wrong_message_format(
541
+
"expected binary frame for CBOR, got text",
542
+
))),
543
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
544
+
Err(e) => Some(Err(e)),
545
+
})
546
+
.boxed_local(),
547
+
};
548
+
549
(tx, stream)
550
}
551
···
572
// Put the raw stream back
573
*rx = raw_rx;
574
575
+
#[cfg(not(target_arch = "wasm32"))]
576
+
let stream = match S::ENCODING {
577
MessageEncoding::Json => typed_rx_source
578
.into_inner()
579
.filter_map(|msg| decode_json_msg::<S>(msg))
···
582
.into_inner()
583
.filter_map(|msg| decode_cbor_msg::<S>(msg))
584
.boxed(),
585
+
};
586
+
587
+
#[cfg(target_arch = "wasm32")]
588
+
let stream = match S::ENCODING {
589
+
MessageEncoding::Json => typed_rx_source
590
+
.into_inner()
591
+
.filter_map(|msg| decode_json_msg::<S>(msg))
592
+
.boxed_local(),
593
+
MessageEncoding::DagCbor => typed_rx_source
594
+
.into_inner()
595
+
.filter_map(|msg| decode_cbor_msg::<S>(msg))
596
+
.boxed_local(),
597
+
};
598
+
stream
599
}
600
}
601
+1
crates/jacquard/Cargo.toml
+1
crates/jacquard/Cargo.toml
+4
-6
examples/subscribe_jetstream.rs
+4
-6
examples/subscribe_jetstream.rs
···
85
let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
86
87
// Subscribe with no filters (firehose mode)
88
-
let mut params_builder = JetstreamParams::new();
89
-
90
// Enable compression if zstd feature is available
91
#[cfg(feature = "zstd")]
92
-
{
93
-
params_builder = params_builder.compress(true);
94
-
}
95
96
-
let params = params_builder.build();
97
let stream = client.subscribe(¶ms).await.into_diagnostic()?;
98
99
println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
···
85
let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
86
87
// Subscribe with no filters (firehose mode)
88
// Enable compression if zstd feature is available
89
#[cfg(feature = "zstd")]
90
+
let params = { JetstreamParams::new().compress(true).build() };
91
92
+
#[cfg(not(feature = "zstd"))]
93
+
let params = { JetstreamParams::new().build() };
94
+
95
let stream = client.subscribe(¶ms).await.into_diagnostic()?;
96
97
println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
+1
-1
justfile
+1
-1
justfile