Live video on the AT Protocol
1use std::{
2 fmt::{self, Debug},
3 ops::Bound,
4 pin::Pin,
5 sync::Arc,
6 time::Duration,
7};
8
9use bytes::Bytes;
10use n0_future::{Stream, StreamExt};
11use ref_cast::RefCast;
12use snafu::Snafu;
13use tokio::sync::Mutex;
14
15use crate::public_key::PublicKey;
16
17// the files here are just copied from iroh-smol-kv-uniffi/src/code
18mod kv {
19 mod time_bound;
20 pub use time_bound::TimeBound;
21 mod subscribe_mode;
22 pub use subscribe_mode::SubscribeMode;
23}
24pub use kv::{SubscribeMode, TimeBound};
25use util::format_bytes;
26
27/// Error creating a new database node.
28#[derive(Debug, Snafu, uniffi::Error)]
29#[snafu(module)]
30pub enum CreateError {
31 /// The provided private key is invalid (not 32 bytes).
32 PrivateKey { size: u64 },
33 /// The provided gossip topic is invalid (not 32 bytes).
34 Topic { size: u64 },
35 /// Failed to bind the iroh endpoint.
36 Bind { message: String },
37 /// Failed to subscribe to the gossip topic.
38 Subscribe { message: String },
39}
40
41/// Error joining peers.
42#[derive(Debug, Snafu, uniffi::Error)]
43#[snafu(module)]
44pub enum JoinPeersError {
45 /// Failed to parse a provided iroh node ticket.
46 Ticket { message: String },
47 /// Error during the join peers operation.
48 Irpc { message: String },
49}
50
51/// Error joining peers.
52#[derive(Debug, Snafu, uniffi::Error)]
53#[snafu(module)]
54pub enum ParseError {
55 /// Failed to parse a provided iroh node ticket.
56 Ticket { message: String },
57}
58
59/// Error putting a value into the database.
60#[derive(Debug, Snafu, uniffi::Error)]
61#[snafu(module)]
62pub enum PutError {
63 /// Error during the put operation.
64 Irpc { message: String },
65}
66
67/// Configuration for an iroh-streamplace node.
68#[derive(uniffi::Record, Clone)]
69pub struct Config {
70 /// An Ed25519 secret key as a 32 byte array.
71 pub key: Vec<u8>,
72 /// The gossip topic to use. Must be 32 bytes.
73 ///
74 /// You can use e.g. a BLAKE3 hash of a topic string here. This can be used
75 /// as a cheap way to have a shared secret - nodes that do not know the topic
76 /// cannot connect to the swarm.
77 pub topic: Vec<u8>,
78 /// Maximum duration to wait for sending a stream piece to a peer.
79 pub max_send_duration: Duration,
80 /// Disable using relays, for tests.
81 pub disable_relay: bool,
82}
83
84#[derive(uniffi::Enum, Debug, Clone)]
85enum StreamFilter {
86 All,
87 Global,
88 Stream(Vec<u8>),
89}
90
91/// A filter for subscriptions and iteration.
92#[derive(uniffi::Object, Debug, Clone)]
93pub struct Filter {
94 scope: Option<Vec<Arc<PublicKey>>>,
95 stream: StreamFilter,
96 min_time: TimeBound,
97 max_time: TimeBound,
98}
99
100#[uniffi::export]
101impl Filter {
102 /// Creates a new filter that matches everything.
103 #[uniffi::constructor]
104 pub fn new() -> Arc<Self> {
105 Arc::new(Self {
106 scope: None,
107 stream: StreamFilter::All,
108 min_time: TimeBound::Unbounded,
109 max_time: TimeBound::Unbounded,
110 })
111 }
112
113 /// Restrict to the global namespace, no per stream data.
114 pub fn global(mut self: Arc<Self>) -> Arc<Self> {
115 let this = Arc::make_mut(&mut self);
116 this.stream = StreamFilter::Global;
117 self
118 }
119
120 /// Restrict to one specific stream, no global data.
121 pub fn stream(mut self: Arc<Self>, stream: Vec<u8>) -> Arc<Self> {
122 let this = Arc::make_mut(&mut self);
123 this.stream = StreamFilter::Stream(stream);
124 self
125 }
126
127 /// Restrict to a set of scopes.
128 pub fn scopes(mut self: Arc<Self>, scopes: Vec<Arc<PublicKey>>) -> Arc<Self> {
129 let this = Arc::make_mut(&mut self);
130 this.scope = Some(scopes);
131 self
132 }
133
134 /// Restrict to a single scope.
135 pub fn scope(self: Arc<Self>, scope: Arc<PublicKey>) -> Arc<Self> {
136 self.scopes(vec![scope])
137 }
138
139 /// Restrict to a time range.
140 pub fn timestamps(mut self: Arc<Self>, min: TimeBound, max: TimeBound) -> Arc<Self> {
141 let this = Arc::make_mut(&mut self);
142 this.min_time = min;
143 this.max_time = max;
144 self
145 }
146
147 /// Restrict to a time range given in nanoseconds since unix epoch.
148 pub fn time_range(self: Arc<Self>, min: u64, max: u64) -> Arc<Self> {
149 self.timestamps(TimeBound::Included(min), TimeBound::Excluded(max))
150 }
151
152 /// Restrict to a time range starting at min, unbounded at the top.
153 pub fn time_from(self: Arc<Self>, min: u64) -> Arc<Self> {
154 self.timestamps(TimeBound::Included(min), TimeBound::Unbounded)
155 }
156}
157
158impl From<Filter> for iroh_smol_kv::Filter {
159 fn from(value: Filter) -> Self {
160 let mut filter = iroh_smol_kv::Filter::ALL;
161 match value.stream {
162 // everything
163 StreamFilter::All => {}
164 // everything starting with 'g', for the global namespace
165 StreamFilter::Global => {
166 filter = filter.key_prefix(b"g".as_ref());
167 }
168 // a specific stream, everything starting with 's' + escaped stream name
169 StreamFilter::Stream(t) => {
170 let prefix = util::encode_stream_and_key(Some(&t), &[]);
171 filter = filter.key_prefix(prefix);
172 }
173 };
174 filter = filter.timestamps_nanos((
175 Bound::<u64>::from(value.min_time),
176 Bound::<u64>::from(value.max_time),
177 ));
178 if let Some(scopes) = value.scope {
179 let keys = scopes.iter().map(|k| iroh::PublicKey::from(k.as_ref()));
180 filter = filter.scopes(keys);
181 }
182 filter
183 }
184}
185
186/// Error getting the next item from a subscription.
187#[derive(uniffi::Enum, Snafu, Debug)]
188#[snafu(module)]
189pub enum SubscribeNextError {
190 /// Error during the subscribe next operation.
191 Irpc { message: String },
192}
193
194/// Error getting the next item from a subscription.
195#[derive(uniffi::Enum, Snafu, Debug)]
196#[snafu(module)]
197pub enum WriteError {
198 /// The provided private key is invalid (not 32 bytes).
199 PrivateKeySize { size: u64 },
200}
201
202/// Error shutting down the database.
203///
204/// This can occur if the db is already shut down or if there is an internal error.
205#[derive(uniffi::Enum, Snafu, Debug)]
206#[snafu(module)]
207pub enum ShutdownError {
208 /// Error during the shutdown operation.
209 Irpc { message: String },
210}
211
212/// An entry returned from the database.
213#[derive(uniffi::Record, Debug, PartialEq, Eq)]
214pub struct Entry {
215 scope: Arc<PublicKey>,
216 stream: Option<Vec<u8>>,
217 key: Vec<u8>,
218 value: Vec<u8>,
219 timestamp: u64,
220}
221
222/// An item returned from a subscription.
223#[derive(uniffi::Enum, Debug)]
224pub enum SubscribeItem {
225 Entry {
226 scope: Arc<PublicKey>,
227 stream: Option<Vec<u8>>,
228 key: Vec<u8>,
229 value: Vec<u8>,
230 timestamp: u64,
231 },
232 CurrentDone,
233 Expired {
234 scope: Arc<PublicKey>,
235 stream: Option<Vec<u8>>,
236 key: Vec<u8>,
237 timestamp: u64,
238 },
239 Other,
240}
241
242fn fmt_stream(stream: &Option<Vec<u8>>) -> String {
243 match stream {
244 None => "<nil>".to_string(),
245 Some(s) => format_bytes(s),
246 }
247}
248
249#[uniffi::export]
250pub fn subscribe_item_debug(item: &SubscribeItem) -> String {
251 match item {
252 SubscribeItem::Entry {
253 scope,
254 stream,
255 key,
256 value,
257 timestamp,
258 } => format!(
259 "Entry {{ scope: {}, stream: {}, key: {}, value: {}, timestamp: {} }}",
260 scope.fmt_short(),
261 fmt_stream(stream),
262 format_bytes(key),
263 format_bytes(value),
264 timestamp
265 ),
266 SubscribeItem::CurrentDone => "CurrentDone".to_string(),
267 SubscribeItem::Expired {
268 scope,
269 stream,
270 key,
271 timestamp,
272 } => format!(
273 "Expired {{ scope: {}, stream: {}, key: {}, timestamp: {} }}",
274 scope.fmt_short(),
275 fmt_stream(stream),
276 format_bytes(key),
277 timestamp
278 ),
279 SubscribeItem::Other => "Other".to_string(),
280 }
281}
282
283impl From<iroh_smol_kv::SubscribeItem> for SubscribeItem {
284 fn from(item: iroh_smol_kv::SubscribeItem) -> Self {
285 match &item {
286 iroh_smol_kv::SubscribeItem::Entry((scope, key, value)) => {
287 let Some((stream, key)) = util::decode_stream_and_key(key) else {
288 return Self::Other;
289 };
290 Self::Entry {
291 scope: Arc::new((*scope).into()),
292 stream,
293 key,
294 value: value.value.to_vec(),
295 timestamp: value.timestamp,
296 }
297 }
298 iroh_smol_kv::SubscribeItem::CurrentDone => Self::CurrentDone,
299 iroh_smol_kv::SubscribeItem::Expired((scope, topic, timestamp)) => {
300 let (stream, key) = util::decode_stream_and_key(topic).unwrap();
301 Self::Expired {
302 scope: Arc::new((*scope).into()),
303 stream,
304 key,
305 timestamp: *timestamp,
306 }
307 }
308 }
309 }
310}
311
312/// A response to a subscribe request.
313///
314/// This can be used as a stream of [`SubscribeItem`]s.
315#[derive(uniffi::Object)]
316#[uniffi::export(Debug)]
317#[allow(clippy::type_complexity)]
318pub struct SubscribeResponse {
319 inner: Mutex<
320 Pin<
321 Box<
322 dyn Stream<Item = Result<iroh_smol_kv::SubscribeItem, irpc::Error>>
323 + Send
324 + Sync
325 + 'static,
326 >,
327 >,
328 >,
329}
330
331impl fmt::Debug for SubscribeResponse {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 f.debug_struct("SubscribeResponse").finish_non_exhaustive()
334 }
335}
336
337#[uniffi::export]
338impl SubscribeResponse {
339 pub async fn next_raw(&self) -> Result<Option<SubscribeItem>, SubscribeNextError> {
340 let mut this = self.inner.lock().await;
341 match this.as_mut().next().await {
342 None => Ok(None),
343 Some(Ok(item)) => Ok(Some(item.into())),
344 Some(Err(e)) => Err(SubscribeNextError::Irpc {
345 message: e.to_string(),
346 }),
347 }
348 }
349}
350
351/// Options for subscribing.
352///
353/// `filter` specifies what to subscribe to.
354/// `mode` specifies whether to get current items, new items, or both.
355#[derive(uniffi::Record)]
356pub struct SubscribeOpts {
357 pub filter: Arc<Filter>,
358 pub mode: SubscribeMode,
359}
360
361impl From<SubscribeOpts> for iroh_smol_kv::Subscribe {
362 fn from(opts: SubscribeOpts) -> Self {
363 iroh_smol_kv::Subscribe {
364 filter: opts.filter.as_ref().clone().into(),
365 mode: opts.mode.into(),
366 }
367 }
368}
369
370/// A write scope that can be used to put values into the database.
371///
372/// The default write scope is available from the [`Node::node_scope`] method.
373#[derive(Clone, Debug, RefCast, uniffi::Object)]
374#[repr(transparent)]
375pub struct WriteScope(iroh_smol_kv::WriteScope);
376
377#[uniffi::export]
378impl WriteScope {
379 pub async fn put(
380 &self,
381 stream: Option<Vec<u8>>,
382 key: Vec<u8>,
383 value: Vec<u8>,
384 ) -> Result<(), PutError> {
385 self.put_impl(stream, key, value.into())
386 .await
387 .map_err(|e| PutError::Irpc {
388 message: e.to_string(),
389 })
390 }
391}
392
393impl WriteScope {
394 pub fn new(inner: iroh_smol_kv::WriteScope) -> Self {
395 Self(inner)
396 }
397
398 /// Put a value into the database, optionally in a specific stream.
399 pub async fn put_impl(
400 &self,
401 stream: Option<impl AsRef<[u8]>>,
402 key: impl AsRef<[u8]>,
403 value: Bytes,
404 ) -> Result<(), irpc::Error> {
405 let key = key.as_ref();
406 let stream = stream.as_ref().map(|s| s.as_ref());
407 let encoded = util::encode_stream_and_key(stream, key);
408 self.0.put(encoded, value).await?;
409 Ok(())
410 }
411}
412
413/// Iroh-streamplace specific metadata database.
414#[derive(Debug, Clone, RefCast, uniffi::Object)]
415#[repr(transparent)]
416pub struct Db(iroh_smol_kv::Client);
417
418impl Db {
419 pub fn new(inner: iroh_smol_kv::Client) -> Self {
420 Self(inner)
421 }
422
423 pub fn inner(&self) -> &iroh_smol_kv::Client {
424 &self.0
425 }
426}
427
428#[uniffi::export]
429impl Db {
430 pub fn write(&self, secret: Vec<u8>) -> Result<Arc<WriteScope>, WriteError> {
431 let secret = iroh::SecretKey::from_bytes(&secret.try_into().map_err(|e: Vec<u8>| {
432 WriteError::PrivateKeySize {
433 size: e.len() as u64,
434 }
435 })?);
436 let write = self.0.write(secret);
437 Ok(Arc::new(WriteScope::new(write)))
438 }
439
440 pub async fn iter_with_opts(
441 &self,
442 filter: Arc<Filter>,
443 ) -> Result<Vec<Entry>, SubscribeNextError> {
444 let sub = self.subscribe_with_opts(SubscribeOpts {
445 filter,
446 mode: SubscribeMode::Current,
447 });
448 let mut items = Vec::new();
449 while let Some(item) = sub.next_raw().await? {
450 match item {
451 SubscribeItem::Entry {
452 scope,
453 stream,
454 key,
455 value,
456 timestamp,
457 } => {
458 items.push(Entry {
459 scope,
460 stream,
461 key,
462 value,
463 timestamp,
464 });
465 }
466 _ => unreachable!("we used SubscribeMode::Current, so we should only get entries"),
467 }
468 }
469 Ok(items)
470 }
471
472 pub fn subscribe(&self, filter: Arc<Filter>) -> Arc<SubscribeResponse> {
473 self.subscribe_with_opts(SubscribeOpts {
474 filter,
475 mode: SubscribeMode::Both,
476 })
477 }
478
479 /// Subscribe with options.
480 pub fn subscribe_with_opts(&self, opts: SubscribeOpts) -> Arc<SubscribeResponse> {
481 Arc::new(SubscribeResponse {
482 inner: Mutex::new(Box::pin(
483 self.0.subscribe_with_opts(opts.into()).stream_raw(),
484 )),
485 })
486 }
487
488 /// Shutdown the database client and all subscriptions.
489 pub async fn shutdown(&self) -> Result<(), ShutdownError> {
490 Ok(self.0.shutdown().await.map_err(|e| ShutdownError::Irpc {
491 message: e.to_string(),
492 })?)
493 }
494}
495
496mod util {
497
498 pub fn encode_stream_and_key(stream: Option<&[u8]>, key: &[u8]) -> Vec<u8> {
499 let mut result = Vec::new();
500 if let Some(s) = stream {
501 result.push(b's');
502 postcard::to_io(s, &mut result).unwrap();
503 } else {
504 result.push(b'g');
505 }
506 result.extend(key);
507 result
508 }
509
510 pub fn decode_stream_and_key(encoded: &[u8]) -> Option<(Option<Vec<u8>>, Vec<u8>)> {
511 match encoded.split_first() {
512 Some((b's', mut rest)) => {
513 let (stream, rest) = postcard::take_from_bytes(&mut rest).ok()?;
514 Some((Some(stream), rest.to_vec()))
515 }
516 Some((b'g', rest)) => Some((None, rest.to_vec())),
517 _ => None,
518 }
519 }
520
521 pub fn format_bytes(bytes: &[u8]) -> String {
522 if bytes.is_empty() {
523 return "\"\"".to_string();
524 }
525 let Ok(s) = std::str::from_utf8(bytes) else {
526 return hex::encode(bytes);
527 };
528 if s.chars()
529 .any(|c| c.is_control() && c != '\n' && c != '\t' && c != '\r')
530 {
531 return hex::encode(bytes);
532 }
533 format!("\"{}\"", escape_string(s))
534 }
535
536 pub fn escape_string(s: &str) -> String {
537 s.chars()
538 .map(|c| match c {
539 '"' => "\\\"".to_string(),
540 '\\' => "\\\\".to_string(),
541 '\n' => "\\n".to_string(),
542 '\t' => "\\t".to_string(),
543 '\r' => "\\r".to_string(),
544 c => c.to_string(),
545 })
546 .collect()
547 }
548}
549
#[cfg(test)]
mod tests {
    use super::util;

    /// Encoding a (stream, key) pair and decoding it again must give back the
    /// same pair, including empty streams and empty keys.
    #[test]
    fn escape_unescape() {
        let cases: [(Option<&[u8]>, &[u8]); 4] = [
            (None, b"key1"),
            (Some(b""), b""),
            (Some(b""), b"a"),
            (Some(b"a"), b""),
        ];
        for (stream, key) in cases {
            let encoded = util::encode_stream_and_key(stream, key);
            let decoded = util::decode_stream_and_key(&encoded);
            let expected = Some((stream.map(<[u8]>::to_vec), key.to_vec()));
            assert_eq!(decoded, expected);
        }
    }
}
569}