Live video on the AT Protocol
at natb/command-errors 569 lines 16 kB view raw
1use std::{ 2 fmt::{self, Debug}, 3 ops::Bound, 4 pin::Pin, 5 sync::Arc, 6 time::Duration, 7}; 8 9use bytes::Bytes; 10use n0_future::{Stream, StreamExt}; 11use ref_cast::RefCast; 12use snafu::Snafu; 13use tokio::sync::Mutex; 14 15use crate::public_key::PublicKey; 16 17// the files here are just copied from iroh-smol-kv-uniffi/src/code 18mod kv { 19 mod time_bound; 20 pub use time_bound::TimeBound; 21 mod subscribe_mode; 22 pub use subscribe_mode::SubscribeMode; 23} 24pub use kv::{SubscribeMode, TimeBound}; 25use util::format_bytes; 26 27/// Error creating a new database node. 28#[derive(Debug, Snafu, uniffi::Error)] 29#[snafu(module)] 30pub enum CreateError { 31 /// The provided private key is invalid (not 32 bytes). 32 PrivateKey { size: u64 }, 33 /// The provided gossip topic is invalid (not 32 bytes). 34 Topic { size: u64 }, 35 /// Failed to bind the iroh endpoint. 36 Bind { message: String }, 37 /// Failed to subscribe to the gossip topic. 38 Subscribe { message: String }, 39} 40 41/// Error joining peers. 42#[derive(Debug, Snafu, uniffi::Error)] 43#[snafu(module)] 44pub enum JoinPeersError { 45 /// Failed to parse a provided iroh node ticket. 46 Ticket { message: String }, 47 /// Error during the join peers operation. 48 Irpc { message: String }, 49} 50 51/// Error joining peers. 52#[derive(Debug, Snafu, uniffi::Error)] 53#[snafu(module)] 54pub enum ParseError { 55 /// Failed to parse a provided iroh node ticket. 56 Ticket { message: String }, 57} 58 59/// Error putting a value into the database. 60#[derive(Debug, Snafu, uniffi::Error)] 61#[snafu(module)] 62pub enum PutError { 63 /// Error during the put operation. 64 Irpc { message: String }, 65} 66 67/// Configuration for an iroh-streamplace node. 68#[derive(uniffi::Record, Clone)] 69pub struct Config { 70 /// An Ed25519 secret key as a 32 byte array. 71 pub key: Vec<u8>, 72 /// The gossip topic to use. Must be 32 bytes. 73 /// 74 /// You can use e.g. a BLAKE3 hash of a topic string here. This can be used 75 /// as a cheap way to have a shared secret - nodes that do not know the topic 76 /// cannot connect to the swarm. 77 pub topic: Vec<u8>, 78 /// Maximum duration to wait for sending a stream piece to a peer. 79 pub max_send_duration: Duration, 80 /// Disable using relays, for tests. 81 pub disable_relay: bool, 82} 83 84#[derive(uniffi::Enum, Debug, Clone)] 85enum StreamFilter { 86 All, 87 Global, 88 Stream(Vec<u8>), 89} 90 91/// A filter for subscriptions and iteration. 92#[derive(uniffi::Object, Debug, Clone)] 93pub struct Filter { 94 scope: Option<Vec<Arc<PublicKey>>>, 95 stream: StreamFilter, 96 min_time: TimeBound, 97 max_time: TimeBound, 98} 99 100#[uniffi::export] 101impl Filter { 102 /// Creates a new filter that matches everything. 103 #[uniffi::constructor] 104 pub fn new() -> Arc<Self> { 105 Arc::new(Self { 106 scope: None, 107 stream: StreamFilter::All, 108 min_time: TimeBound::Unbounded, 109 max_time: TimeBound::Unbounded, 110 }) 111 } 112 113 /// Restrict to the global namespace, no per stream data. 114 pub fn global(mut self: Arc<Self>) -> Arc<Self> { 115 let this = Arc::make_mut(&mut self); 116 this.stream = StreamFilter::Global; 117 self 118 } 119 120 /// Restrict to one specific stream, no global data. 121 pub fn stream(mut self: Arc<Self>, stream: Vec<u8>) -> Arc<Self> { 122 let this = Arc::make_mut(&mut self); 123 this.stream = StreamFilter::Stream(stream); 124 self 125 } 126 127 /// Restrict to a set of scopes. 128 pub fn scopes(mut self: Arc<Self>, scopes: Vec<Arc<PublicKey>>) -> Arc<Self> { 129 let this = Arc::make_mut(&mut self); 130 this.scope = Some(scopes); 131 self 132 } 133 134 /// Restrict to a single scope. 135 pub fn scope(self: Arc<Self>, scope: Arc<PublicKey>) -> Arc<Self> { 136 self.scopes(vec![scope]) 137 } 138 139 /// Restrict to a time range. 140 pub fn timestamps(mut self: Arc<Self>, min: TimeBound, max: TimeBound) -> Arc<Self> { 141 let this = Arc::make_mut(&mut self); 142 this.min_time = min; 143 this.max_time = max; 144 self 145 } 146 147 /// Restrict to a time range given in nanoseconds since unix epoch. 148 pub fn time_range(self: Arc<Self>, min: u64, max: u64) -> Arc<Self> { 149 self.timestamps(TimeBound::Included(min), TimeBound::Excluded(max)) 150 } 151 152 /// Restrict to a time range starting at min, unbounded at the top. 153 pub fn time_from(self: Arc<Self>, min: u64) -> Arc<Self> { 154 self.timestamps(TimeBound::Included(min), TimeBound::Unbounded) 155 } 156} 157 158impl From<Filter> for iroh_smol_kv::Filter { 159 fn from(value: Filter) -> Self { 160 let mut filter = iroh_smol_kv::Filter::ALL; 161 match value.stream { 162 // everything 163 StreamFilter::All => {} 164 // everything starting with 'g', for the global namespace 165 StreamFilter::Global => { 166 filter = filter.key_prefix(b"g".as_ref()); 167 } 168 // a specific stream, everything starting with 's' + escaped stream name 169 StreamFilter::Stream(t) => { 170 let prefix = util::encode_stream_and_key(Some(&t), &[]); 171 filter = filter.key_prefix(prefix); 172 } 173 }; 174 filter = filter.timestamps_nanos(( 175 Bound::<u64>::from(value.min_time), 176 Bound::<u64>::from(value.max_time), 177 )); 178 if let Some(scopes) = value.scope { 179 let keys = scopes.iter().map(|k| iroh::PublicKey::from(k.as_ref())); 180 filter = filter.scopes(keys); 181 } 182 filter 183 } 184} 185 186/// Error getting the next item from a subscription. 187#[derive(uniffi::Enum, Snafu, Debug)] 188#[snafu(module)] 189pub enum SubscribeNextError { 190 /// Error during the subscribe next operation. 191 Irpc { message: String }, 192} 193 194/// Error getting the next item from a subscription. 195#[derive(uniffi::Enum, Snafu, Debug)] 196#[snafu(module)] 197pub enum WriteError { 198 /// The provided private key is invalid (not 32 bytes). 199 PrivateKeySize { size: u64 }, 200} 201 202/// Error shutting down the database. 203/// 204/// This can occur if the db is already shut down or if there is an internal error. 205#[derive(uniffi::Enum, Snafu, Debug)] 206#[snafu(module)] 207pub enum ShutdownError { 208 /// Error during the shutdown operation. 209 Irpc { message: String }, 210} 211 212/// An entry returned from the database. 213#[derive(uniffi::Record, Debug, PartialEq, Eq)] 214pub struct Entry { 215 scope: Arc<PublicKey>, 216 stream: Option<Vec<u8>>, 217 key: Vec<u8>, 218 value: Vec<u8>, 219 timestamp: u64, 220} 221 222/// An item returned from a subscription. 223#[derive(uniffi::Enum, Debug)] 224pub enum SubscribeItem { 225 Entry { 226 scope: Arc<PublicKey>, 227 stream: Option<Vec<u8>>, 228 key: Vec<u8>, 229 value: Vec<u8>, 230 timestamp: u64, 231 }, 232 CurrentDone, 233 Expired { 234 scope: Arc<PublicKey>, 235 stream: Option<Vec<u8>>, 236 key: Vec<u8>, 237 timestamp: u64, 238 }, 239 Other, 240} 241 242fn fmt_stream(stream: &Option<Vec<u8>>) -> String { 243 match stream { 244 None => "<nil>".to_string(), 245 Some(s) => format_bytes(s), 246 } 247} 248 249#[uniffi::export] 250pub fn subscribe_item_debug(item: &SubscribeItem) -> String { 251 match item { 252 SubscribeItem::Entry { 253 scope, 254 stream, 255 key, 256 value, 257 timestamp, 258 } => format!( 259 "Entry {{ scope: {}, stream: {}, key: {}, value: {}, timestamp: {} }}", 260 scope.fmt_short(), 261 fmt_stream(stream), 262 format_bytes(key), 263 format_bytes(value), 264 timestamp 265 ), 266 SubscribeItem::CurrentDone => "CurrentDone".to_string(), 267 SubscribeItem::Expired { 268 scope, 269 stream, 270 key, 271 timestamp, 272 } => format!( 273 "Expired {{ scope: {}, stream: {}, key: {}, timestamp: {} }}", 274 scope.fmt_short(), 275 fmt_stream(stream), 276 format_bytes(key), 277 timestamp 278 ), 279 SubscribeItem::Other => "Other".to_string(), 280 } 281} 282 283impl From<iroh_smol_kv::SubscribeItem> for SubscribeItem { 284 fn from(item: iroh_smol_kv::SubscribeItem) -> Self { 285 match &item { 286 iroh_smol_kv::SubscribeItem::Entry((scope, key, value)) => { 287 let Some((stream, key)) = util::decode_stream_and_key(key) else { 288 return Self::Other; 289 }; 290 Self::Entry { 291 scope: Arc::new((*scope).into()), 292 stream, 293 key, 294 value: value.value.to_vec(), 295 timestamp: value.timestamp, 296 } 297 } 298 iroh_smol_kv::SubscribeItem::CurrentDone => Self::CurrentDone, 299 iroh_smol_kv::SubscribeItem::Expired((scope, topic, timestamp)) => { 300 let (stream, key) = util::decode_stream_and_key(topic).unwrap(); 301 Self::Expired { 302 scope: Arc::new((*scope).into()), 303 stream, 304 key, 305 timestamp: *timestamp, 306 } 307 } 308 } 309 } 310} 311 312/// A response to a subscribe request. 313/// 314/// This can be used as a stream of [`SubscribeItem`]s. 315#[derive(uniffi::Object)] 316#[uniffi::export(Debug)] 317#[allow(clippy::type_complexity)] 318pub struct SubscribeResponse { 319 inner: Mutex< 320 Pin< 321 Box< 322 dyn Stream<Item = Result<iroh_smol_kv::SubscribeItem, irpc::Error>> 323 + Send 324 + Sync 325 + 'static, 326 >, 327 >, 328 >, 329} 330 331impl fmt::Debug for SubscribeResponse { 332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 333 f.debug_struct("SubscribeResponse").finish_non_exhaustive() 334 } 335} 336 337#[uniffi::export] 338impl SubscribeResponse { 339 pub async fn next_raw(&self) -> Result<Option<SubscribeItem>, SubscribeNextError> { 340 let mut this = self.inner.lock().await; 341 match this.as_mut().next().await { 342 None => Ok(None), 343 Some(Ok(item)) => Ok(Some(item.into())), 344 Some(Err(e)) => Err(SubscribeNextError::Irpc { 345 message: e.to_string(), 346 }), 347 } 348 } 349} 350 351/// Options for subscribing. 352/// 353/// `filter` specifies what to subscribe to. 354/// `mode` specifies whether to get current items, new items, or both. 355#[derive(uniffi::Record)] 356pub struct SubscribeOpts { 357 pub filter: Arc<Filter>, 358 pub mode: SubscribeMode, 359} 360 361impl From<SubscribeOpts> for iroh_smol_kv::Subscribe { 362 fn from(opts: SubscribeOpts) -> Self { 363 iroh_smol_kv::Subscribe { 364 filter: opts.filter.as_ref().clone().into(), 365 mode: opts.mode.into(), 366 } 367 } 368} 369 370/// A write scope that can be used to put values into the database. 371/// 372/// The default write scope is available from the [`Node::node_scope`] method. 373#[derive(Clone, Debug, RefCast, uniffi::Object)] 374#[repr(transparent)] 375pub struct WriteScope(iroh_smol_kv::WriteScope); 376 377#[uniffi::export] 378impl WriteScope { 379 pub async fn put( 380 &self, 381 stream: Option<Vec<u8>>, 382 key: Vec<u8>, 383 value: Vec<u8>, 384 ) -> Result<(), PutError> { 385 self.put_impl(stream, key, value.into()) 386 .await 387 .map_err(|e| PutError::Irpc { 388 message: e.to_string(), 389 }) 390 } 391} 392 393impl WriteScope { 394 pub fn new(inner: iroh_smol_kv::WriteScope) -> Self { 395 Self(inner) 396 } 397 398 /// Put a value into the database, optionally in a specific stream. 399 pub async fn put_impl( 400 &self, 401 stream: Option<impl AsRef<[u8]>>, 402 key: impl AsRef<[u8]>, 403 value: Bytes, 404 ) -> Result<(), irpc::Error> { 405 let key = key.as_ref(); 406 let stream = stream.as_ref().map(|s| s.as_ref()); 407 let encoded = util::encode_stream_and_key(stream, key); 408 self.0.put(encoded, value).await?; 409 Ok(()) 410 } 411} 412 413/// Iroh-streamplace specific metadata database. 414#[derive(Debug, Clone, RefCast, uniffi::Object)] 415#[repr(transparent)] 416pub struct Db(iroh_smol_kv::Client); 417 418impl Db { 419 pub fn new(inner: iroh_smol_kv::Client) -> Self { 420 Self(inner) 421 } 422 423 pub fn inner(&self) -> &iroh_smol_kv::Client { 424 &self.0 425 } 426} 427 428#[uniffi::export] 429impl Db { 430 pub fn write(&self, secret: Vec<u8>) -> Result<Arc<WriteScope>, WriteError> { 431 let secret = iroh::SecretKey::from_bytes(&secret.try_into().map_err(|e: Vec<u8>| { 432 WriteError::PrivateKeySize { 433 size: e.len() as u64, 434 } 435 })?); 436 let write = self.0.write(secret); 437 Ok(Arc::new(WriteScope::new(write))) 438 } 439 440 pub async fn iter_with_opts( 441 &self, 442 filter: Arc<Filter>, 443 ) -> Result<Vec<Entry>, SubscribeNextError> { 444 let sub = self.subscribe_with_opts(SubscribeOpts { 445 filter, 446 mode: SubscribeMode::Current, 447 }); 448 let mut items = Vec::new(); 449 while let Some(item) = sub.next_raw().await? { 450 match item { 451 SubscribeItem::Entry { 452 scope, 453 stream, 454 key, 455 value, 456 timestamp, 457 } => { 458 items.push(Entry { 459 scope, 460 stream, 461 key, 462 value, 463 timestamp, 464 }); 465 } 466 _ => unreachable!("we used SubscribeMode::Current, so we should only get entries"), 467 } 468 } 469 Ok(items) 470 } 471 472 pub fn subscribe(&self, filter: Arc<Filter>) -> Arc<SubscribeResponse> { 473 self.subscribe_with_opts(SubscribeOpts { 474 filter, 475 mode: SubscribeMode::Both, 476 }) 477 } 478 479 /// Subscribe with options. 480 pub fn subscribe_with_opts(&self, opts: SubscribeOpts) -> Arc<SubscribeResponse> { 481 Arc::new(SubscribeResponse { 482 inner: Mutex::new(Box::pin( 483 self.0.subscribe_with_opts(opts.into()).stream_raw(), 484 )), 485 }) 486 } 487 488 /// Shutdown the database client and all subscriptions. 489 pub async fn shutdown(&self) -> Result<(), ShutdownError> { 490 Ok(self.0.shutdown().await.map_err(|e| ShutdownError::Irpc { 491 message: e.to_string(), 492 })?) 493 } 494} 495 496mod util { 497 498 pub fn encode_stream_and_key(stream: Option<&[u8]>, key: &[u8]) -> Vec<u8> { 499 let mut result = Vec::new(); 500 if let Some(s) = stream { 501 result.push(b's'); 502 postcard::to_io(s, &mut result).unwrap(); 503 } else { 504 result.push(b'g'); 505 } 506 result.extend(key); 507 result 508 } 509 510 pub fn decode_stream_and_key(encoded: &[u8]) -> Option<(Option<Vec<u8>>, Vec<u8>)> { 511 match encoded.split_first() { 512 Some((b's', mut rest)) => { 513 let (stream, rest) = postcard::take_from_bytes(&mut rest).ok()?; 514 Some((Some(stream), rest.to_vec())) 515 } 516 Some((b'g', rest)) => Some((None, rest.to_vec())), 517 _ => None, 518 } 519 } 520 521 pub fn format_bytes(bytes: &[u8]) -> String { 522 if bytes.is_empty() { 523 return "\"\"".to_string(); 524 } 525 let Ok(s) = std::str::from_utf8(bytes) else { 526 return hex::encode(bytes); 527 }; 528 if s.chars() 529 .any(|c| c.is_control() && c != '\n' && c != '\t' && c != '\r') 530 { 531 return hex::encode(bytes); 532 } 533 format!("\"{}\"", escape_string(s)) 534 } 535 536 pub fn escape_string(s: &str) -> String { 537 s.chars() 538 .map(|c| match c { 539 '"' => "\\\"".to_string(), 540 '\\' => "\\\\".to_string(), 541 '\n' => "\\n".to_string(), 542 '\t' => "\\t".to_string(), 543 '\r' => "\\r".to_string(), 544 c => c.to_string(), 545 }) 546 .collect() 547 } 548} 549 550#[cfg(test)] 551mod tests { 552 use super::util; 553 554 #[test] 555 fn escape_unescape() { 556 let cases: Vec<(Option<&[u8]>, &[u8])> = vec![ 557 (None, b"key1"), 558 (Some(b""), b""), 559 (Some(b""), b"a"), 560 (Some(b"a"), b""), 561 ]; 562 for (stream, key) in cases { 563 let encoded = util::encode_stream_and_key(stream, key); 564 let (decoded_stream, decoded_key) = util::decode_stream_and_key(&encoded).unwrap(); 565 assert_eq!(decoded_stream.as_deref(), stream); 566 assert_eq!(decoded_key.as_slice(), key); 567 } 568 } 569}