Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/rust-testing 1049 lines 37 kB view raw
1uniffi::setup_scaffolding!(); 2 3pub mod c2pa; 4pub mod node_addr; 5pub mod public_key; 6 7use std::sync::{LazyLock, Once}; 8 9mod db; 10pub use db::*; 11#[cfg(test)] 12mod tests; 13 14/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime. 15static RUNTIME: LazyLock<tokio::runtime::Runtime> = 16 LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); 17 18/// Ensure logging is only initialized once 19static LOGGING_INIT: Once = Once::new(); 20 21use std::{ 22 collections::{BTreeMap, BTreeSet, HashSet}, 23 str::FromStr, 24 sync::Arc, 25}; 26use tokio::sync::Mutex; 27 28use bytes::Bytes; 29use iroh::{NodeId, RelayMode, SecretKey, discovery::static_provider::StaticProvider}; 30use iroh_base::ticket::NodeTicket; 31use iroh_gossip::{net::Gossip, proto::TopicId}; 32use irpc::{WithChannels, rpc::RemoteService}; 33use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; 34use n0_future::future::Boxed; 35 36mod rpc { 37 //! Protocol API 38 use bytes::Bytes; 39 use iroh::NodeId; 40 use irpc::{channel::oneshot, rpc_requests}; 41 use serde::{Deserialize, Serialize}; 42 43 pub const ALPN: &[u8] = b"/iroh/streamplace/1"; 44 45 /// Subscribe to the given `key` 46 #[derive(Debug, Serialize, Deserialize)] 47 pub struct Subscribe { 48 pub key: String, 49 // TODO: verify 50 pub remote_id: NodeId, 51 } 52 53 /// Unsubscribe from the given `key` 54 #[derive(Debug, Serialize, Deserialize)] 55 pub struct Unsubscribe { 56 pub key: String, 57 // TODO: verify 58 pub remote_id: NodeId, 59 } 60 61 // #[derive(Debug, Serialize, Deserialize)] 62 // pub struct SendSegment { 63 // pub key: String, 64 // pub data: Bytes, 65 // } 66 67 #[derive(Debug, Clone, Serialize, Deserialize)] 68 pub struct RecvSegment { 69 pub from: NodeId, 70 pub key: String, 71 pub data: Bytes, 72 } 73 74 // Use the macro to generate both the Protocol and Message enums 75 // plus implement Channels for each type 76 #[rpc_requests(message = Message)] 77 #[derive(Serialize, Deserialize, Debug)] 78 pub enum Protocol { 79 #[rpc(tx=oneshot::Sender<()>)] 80 Subscribe(Subscribe), 81 #[rpc(tx=oneshot::Sender<()>)] 82 Unsubscribe(Unsubscribe), 83 #[rpc(tx=oneshot::Sender<()>)] 84 RecvSegment(RecvSegment), 85 } 86} 87 88mod api { 89 //! Protocol API 90 use bytes::Bytes; 91 use iroh::{NodeAddr, NodeId}; 92 use irpc::{channel::oneshot, rpc_requests}; 93 use serde::{Deserialize, Serialize}; 94 95 /// Subscribe to the given `key` 96 #[derive(Debug, Serialize, Deserialize)] 97 pub struct Subscribe { 98 pub key: String, 99 // TODO: verify 100 pub remote_id: NodeId, 101 } 102 103 /// Unsubscribe from the given `key` 104 #[derive(Debug, Serialize, Deserialize)] 105 pub struct Unsubscribe { 106 pub key: String, 107 // TODO: verify 108 pub remote_id: NodeId, 109 } 110 111 #[derive(Debug, Serialize, Deserialize)] 112 pub struct SendSegment { 113 pub key: String, 114 pub data: Bytes, 115 } 116 117 #[derive(Debug, Serialize, Deserialize)] 118 pub struct JoinPeers { 119 pub peers: Vec<NodeAddr>, 120 } 121 122 #[derive(Debug, Serialize, Deserialize)] 123 pub struct AddTickets { 124 pub peers: Vec<NodeAddr>, 125 } 126 127 /// Subscribe to a key with a ticket - handles discovery, dialing, and subscription atomically 128 #[derive(Debug, Serialize, Deserialize)] 129 pub struct SubscribeWithTicket { 130 pub key: String, 131 pub ticket: NodeAddr, 132 } 133 134 #[derive(Debug, Serialize, Deserialize)] 135 pub struct GetNodeAddr; 136 137 #[derive(Debug, Serialize, Deserialize)] 138 pub struct Shutdown; 139 140 // Use the macro to generate both the Protocol and Message enums 141 // plus implement Channels for each type 142 #[rpc_requests(message = Message)] 143 #[derive(Serialize, Deserialize, Debug)] 144 pub enum Protocol { 145 #[rpc(tx=oneshot::Sender<()>)] 146 Subscribe(Subscribe), 147 #[rpc(tx=oneshot::Sender<()>)] 148 Unsubscribe(Unsubscribe), 149 #[rpc(tx=oneshot::Sender<()>)] 150 SendSegment(SendSegment), 151 #[rpc(tx=oneshot::Sender<()>)] 152 JoinPeers(JoinPeers), 153 #[rpc(tx=oneshot::Sender<()>)] 154 AddTickets(AddTickets), 155 #[rpc(tx=oneshot::Sender<()>)] 156 SubscribeWithTicket(SubscribeWithTicket), 157 #[rpc(tx=oneshot::Sender<NodeAddr>)] 158 GetNodeAddr(GetNodeAddr), 159 #[rpc(tx=oneshot::Sender<()>)] 160 Shutdown(Shutdown), 161 } 162} 163use api::{Message as ApiMessage, Protocol as ApiProtocol}; 164use n0_future::{FuturesUnordered, StreamExt}; 165use rpc::{Message as RpcMessage, Protocol as RpcProtocol}; 166use snafu::Snafu; 167use tracing::{Instrument, debug, error, trace, trace_span, warn}; 168 169use crate::rpc::RecvSegment; 170 171/// Initialize logging with the default subscriber that respects RUST_LOG environment variable. 172/// This function is safe to call multiple times - it will only initialize logging once. 173#[uniffi::export] 174pub fn init_logging() { 175 LOGGING_INIT.call_once(|| { 176 tracing_subscriber::fmt() 177 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) 178 .init(); 179 }); 180} 181 182/// Initialize logging with a custom log level. 183/// This function is safe to call multiple times - it will only initialize logging once. 184/// 185/// # Arguments 186/// * `level` - Log level as a string (e.g., "trace", "debug", "info", "warn", "error") 187#[uniffi::export] 188pub fn init_logging_with_level(level: String) { 189 LOGGING_INIT.call_once(|| { 190 let filter = tracing_subscriber::EnvFilter::try_from_default_env() 191 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)); 192 193 tracing_subscriber::fmt().with_env_filter(filter).init(); 194 }); 195} 196 197pub(crate) enum HandlerMode { 198 Sender, 199 Forwarder, 200 Receiver(Arc<dyn DataHandler>), 201} 202 203impl Clone for HandlerMode { 204 fn clone(&self) -> Self { 205 match self { 206 HandlerMode::Sender => HandlerMode::Sender, 207 HandlerMode::Forwarder => HandlerMode::Forwarder, 208 HandlerMode::Receiver(h) => HandlerMode::Receiver(h.clone()), 209 } 210 } 211} 212 213impl HandlerMode { 214 pub fn mode_str(&self) -> &'static str { 215 match self { 216 HandlerMode::Sender => "sender", 217 HandlerMode::Forwarder => "forwarder", 218 HandlerMode::Receiver(_) => "receiver", 219 } 220 } 221} 222 223type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>; 224 225/// Actor that contains both a kv db for metadata and a handler for the rpc protocol. 226/// 227/// This can be used both for sender and receiver nodes. Sender nodes will just set the 228/// handler to None. 229struct Actor { 230 /// Receiver for rpc messages from remote nodes 231 rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>, 232 /// Receiver for API messages from the user 233 api_rx: tokio::sync::mpsc::Receiver<ApiMessage>, 234 /// Shared state wrapped for concurrent access 235 state: Arc<Mutex<ActorState>>, 236} 237 238/// Shared mutable state for the actor 239struct ActorState { 240 /// nodes I need to send to for each stream 241 subscribers: BTreeMap<String, BTreeSet<NodeId>>, 242 /// nodes I am subscribed to 243 subscriptions: BTreeMap<String, NodeId>, 244 /// lightweight typed connection pool 245 connections: ConnectionPool, 246 /// How to handle incoming data 247 handler: HandlerMode, 248 /// Static provider for node discovery 249 sp: StaticProvider, 250 /// Iroh protocol router, I need to keep it around to keep the protocol alive 251 router: iroh::protocol::Router, 252 /// Metadata db 253 client: db::Db, 254 /// Write scope for this node for the metadata db 255 write: db::WriteScope, 256 /// Ongoing tasks 257 tasks: Tasks, 258 /// Configuration, needed for timeouts etc. 259 config: Arc<Config>, 260} 261 262#[derive(Debug, Clone)] 263struct Connection { 264 id: NodeId, 265 rpc: irpc::Client<RpcProtocol>, 266} 267 268#[derive(Debug, Snafu)] 269enum RpcTaskError { 270 #[snafu(transparent)] 271 Task { source: irpc::Error }, 272 #[snafu(transparent)] 273 Timeout { source: tokio::time::error::Elapsed }, 274} 275 276struct ConnectionPool { 277 endpoint: iroh::Endpoint, 278 connections: BTreeMap<NodeId, Connection>, 279} 280 281impl ConnectionPool { 282 fn new(endpoint: iroh::Endpoint) -> Self { 283 Self { 284 endpoint, 285 connections: BTreeMap::new(), 286 } 287 } 288 289 /// Cheap conn pool hack 290 fn get(&mut self, remote: &NodeId) -> Connection { 291 if !self.connections.contains_key(remote) { 292 trace!(remote = %remote.fmt_short(),"ConnectionPool.get is inserting a new remote connection"); 293 let conn = IrohRemoteConnection::new( 294 self.endpoint.clone(), 295 (*remote).into(), 296 rpc::ALPN.to_vec(), 297 ); 298 let conn = Connection { 299 rpc: irpc::Client::boxed(conn), 300 id: *remote, 301 }; 302 self.connections.insert(*remote, conn); 303 } 304 self.connections 305 .get_mut(remote) 306 .expect("just inserted") 307 .clone() 308 } 309 310 fn remove(&mut self, remote: &NodeId) { 311 self.connections.remove(remote); 312 } 313} 314 315impl Actor { 316 pub async fn spawn( 317 endpoint: iroh::Endpoint, 318 sp: StaticProvider, 319 topic: iroh_gossip::proto::TopicId, 320 config: Config, 321 handler: HandlerMode, 322 ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> { 323 let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(512); 324 let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(512); 325 let gossip = Gossip::builder().spawn(endpoint.clone()); 326 let id = endpoint.node_id(); 327 let router = iroh::protocol::Router::builder(endpoint.clone()) 328 .accept(iroh_gossip::ALPN, gossip.clone()) 329 .accept( 330 rpc::ALPN, 331 IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())), 332 ) 333 .spawn(); 334 let topic = gossip.subscribe(topic, vec![]).await?; 335 let secret = router.endpoint().secret_key().clone(); 336 let db_config = Default::default(); 337 let client = iroh_smol_kv::Client::local(topic, db_config); 338 let write = db::WriteScope::new(client.write(secret.clone())); 339 let client = db::Db::new(client); 340 let state = Arc::new(Mutex::new(ActorState { 341 subscribers: BTreeMap::new(), 342 subscriptions: BTreeMap::new(), 343 connections: ConnectionPool::new(router.endpoint().clone()), 344 handler, 345 router, 346 sp, 347 write: write.clone(), 348 client: client.clone(), 349 tasks: FuturesUnordered::new(), 350 config: Arc::new(config), 351 })); 352 let actor = Self { 353 rpc_rx, 354 api_rx, 355 state, 356 }; 357 let api = Node { 358 client: Arc::new(client), 359 write: Arc::new(write), 360 api: irpc::Client::local(api_tx), 361 }; 362 Ok(( 363 api, 364 actor 365 .run() 366 .instrument(trace_span!("actor", id=%id.fmt_short())), 367 )) 368 } 369 370 async fn run(mut self) { 371 loop { 372 tokio::select! { 373 msg = self.rpc_rx.recv() => { 374 trace!("received local rpc message"); 375 let Some(msg) = msg else { 376 error!("rpc channel closed"); 377 break; 378 }; 379 let state = self.state.clone(); 380 tokio::spawn(async move { 381 Self::handle_rpc(state, msg).instrument(trace_span!("rpc")).await; 382 }); 383 } 384 msg = self.api_rx.recv() => { 385 trace!("received remote rpc message"); 386 let Some(msg) = msg else { 387 break; 388 }; 389 // check if it's a shutdown message before spawning 390 if matches!(msg, ApiMessage::Shutdown(_)) { 391 let state = self.state.clone(); 392 let shutdown = tokio::spawn(async move { 393 Self::handle_api(state, msg).instrument(trace_span!("api")).await 394 }).await.ok().flatten(); 395 if let Some(shutdown) = shutdown { 396 shutdown.send(()).await.ok(); 397 } 398 break; 399 } else { 400 let state = self.state.clone(); 401 tokio::spawn(async move { 402 Self::handle_api(state, msg).instrument(trace_span!("api")).await; 403 }); 404 } 405 } 406 else => { 407 trace!("processing task"); 408 // poll tasks - briefly lock to check and poll 409 let poll_result = { 410 let mut state = self.state.lock().await; 411 if !state.tasks.is_empty() { 412 state.tasks.next().await 413 } else { 414 None 415 } 416 }; 417 if let Some((remote_id, res)) = poll_result { 418 match res { 419 Ok(()) => {} 420 Err(RpcTaskError::Timeout { source }) => { 421 warn!("call to {remote_id} timed out: {source}"); 422 } 423 Err(RpcTaskError::Task { source }) => { 424 warn!("call to {remote_id} failed: {source}"); 425 } 426 } 427 self.state.lock().await.connections.remove(&remote_id); 428 } 429 } 430 } 431 } 432 warn!("RPC Actor loop has closed"); 433 } 434 435 /// Update subscriber meta in the gossip database. Will lock internally to fetch subscribers. 436 async fn update_subscriber_meta_unlocked(state: Arc<Mutex<ActorState>>, key: &str) { 437 let (n, write) = { 438 let state = state.lock().await; 439 let n = state 440 .subscribers 441 .get(key) 442 .map(|s| s.len()) 443 .unwrap_or_default(); 444 (n, state.write.clone()) 445 }; 446 let v = n.to_string().into_bytes(); 447 write 448 .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into()) 449 .await 450 .ok(); 451 } 452 453 /// Requests from remote nodes 454 async fn handle_rpc(state: Arc<Mutex<ActorState>>, msg: RpcMessage) { 455 trace!("RPC.handle_rpc"); 456 match msg { 457 RpcMessage::Subscribe(msg) => { 458 trace!(inner = ?msg.inner, "RpcMessage::Subscribe"); 459 let WithChannels { 460 tx, 461 inner: rpc::Subscribe { key, remote_id }, 462 .. 463 } = msg; 464 { 465 let mut state = state.lock().await; 466 state 467 .subscribers 468 .entry(key.clone()) 469 .or_default() 470 .insert(remote_id); 471 debug!(key = %key, remote = %remote_id.fmt_short(), count = state.subscribers.get(&key).map(|s| s.len()).unwrap_or(0), "added subscriber"); 472 } 473 Self::update_subscriber_meta_unlocked(state.clone(), &key).await; 474 tx.send(()).await.ok(); 475 } 476 RpcMessage::Unsubscribe(msg) => { 477 debug!(inner = ?msg.inner, "RpcMessage::Unsubscribe"); 478 let WithChannels { 479 tx, 480 inner: rpc::Unsubscribe { key, remote_id }, 481 .. 482 } = msg; 483 { 484 let mut state = state.lock().await; 485 if let Some(e) = state.subscribers.get_mut(&key) 486 && !e.remove(&remote_id) 487 { 488 warn!( 489 "unsubscribe: no subscription for {} from {}", 490 key, remote_id 491 ); 492 } 493 if let Some(subscriptions) = state.subscribers.get(&key) 494 && subscriptions.is_empty() 495 { 496 state.subscribers.remove(&key); 497 } 498 } 499 Self::update_subscriber_meta_unlocked(state.clone(), &key).await; 500 tx.send(()).await.ok(); 501 } 502 RpcMessage::RecvSegment(msg) => { 503 trace!(inner = ?msg.inner, "RpcMessage::RecvSegment"); 504 let WithChannels { 505 tx, 506 inner: rpc::RecvSegment { key, from, data }, 507 .. 508 } = msg; 509 let mut state = state.lock().await; 510 match &state.handler { 511 HandlerMode::Sender => { 512 warn!("received segment but in sender mode"); 513 } 514 HandlerMode::Forwarder => { 515 trace!("forwarding segment"); 516 if let Some(remotes) = state.subscribers.get(&key).cloned() { 517 Self::handle_send(&mut state, key, data, &remotes); 518 } else { 519 trace!("no subscribers for stream {}", key); 520 } 521 drop(state); // release lock 522 } 523 HandlerMode::Receiver(handler) => { 524 if state.subscriptions.contains_key(&key) { 525 let from = Arc::new(from.into()); 526 let handler = handler.clone(); 527 drop(state); // release lock before async call 528 handler.handle_data(from, key, data.to_vec()).await; 529 } else { 530 warn!("received segment for unsubscribed key: {}", key); 531 drop(state); 532 } 533 } 534 }; 535 tx.send(()).await.ok(); 536 } 537 } 538 } 539 540 async fn handle_api( 541 state: Arc<Mutex<ActorState>>, 542 msg: ApiMessage, 543 ) -> Option<irpc::channel::oneshot::Sender<()>> { 544 trace!("RPC.handle_api"); 545 match msg { 546 ApiMessage::SendSegment(msg) => { 547 trace!(inner = ?msg.inner, "ApiMessage::SendSegment"); 548 let WithChannels { 549 tx, 550 inner: api::SendSegment { key, data }, 551 .. 552 } = msg; 553 { 554 let mut state = state.lock().await; 555 if let Some(remotes) = state.subscribers.get(&key).cloned() { 556 Self::handle_send(&mut state, key, data, &remotes); 557 } else { 558 trace!("no subscribers for stream {}", key); 559 } 560 } 561 tx.send(()).await.ok(); 562 } 563 ApiMessage::Subscribe(msg) => { 564 trace!(inner = ?msg.inner, "ApiMessage::Subscribe"); 565 let WithChannels { 566 tx, 567 inner: api::Subscribe { key, remote_id }, 568 .. 569 } = msg; 570 let (conn, node_id) = { 571 let mut state = state.lock().await; 572 ( 573 state.connections.get(&remote_id), 574 state.router.endpoint().node_id(), 575 ) 576 }; 577 trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Subscribe message"); 578 let result = conn 579 .rpc 580 .rpc(rpc::Subscribe { 581 key: key.clone(), 582 remote_id: node_id, 583 }) 584 .await; 585 match &result { 586 Ok(_) => { 587 debug!(remote = %remote_id.fmt_short(), key = %key, "subscribe RPC succeeded"); 588 state.lock().await.subscriptions.insert(key, remote_id); 589 } 590 Err(e) => { 591 error!(remote = %remote_id.fmt_short(), key = %key, error = %e, "subscribe RPC failed"); 592 } 593 } 594 tx.send(()).await.ok(); 595 } 596 ApiMessage::Unsubscribe(msg) => { 597 trace!(inner = ?msg.inner, "ApiMessage::Unsubscribe"); 598 let WithChannels { 599 tx, 600 inner: api::Unsubscribe { key, remote_id }, 601 .. 602 } = msg; 603 let (conn, node_id) = { 604 let mut state = state.lock().await; 605 ( 606 state.connections.get(&remote_id), 607 state.router.endpoint().node_id(), 608 ) 609 }; 610 trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Unsubscribe message"); 611 conn.rpc 612 .rpc(rpc::Unsubscribe { 613 key: key.clone(), 614 remote_id: node_id, 615 }) 616 .await 617 .ok(); 618 state.lock().await.subscriptions.remove(&key); 619 tx.send(()).await.ok(); 620 } 621 ApiMessage::AddTickets(msg) => { 622 trace!(inner = ?msg.inner, "ApiMessage::AddTickets"); 623 let WithChannels { 624 tx, 625 inner: api::AddTickets { peers }, 626 .. 627 } = msg; 628 let sp = { 629 let state = state.lock().await; 630 state.sp.clone() 631 }; 632 for addr in &peers { 633 sp.add_node_info(addr.clone()); 634 } 635 tx.send(()).await.ok(); 636 } 637 ApiMessage::JoinPeers(msg) => { 638 trace!(inner = ?msg.inner, "ApiMessage::JoinPeers"); 639 let WithChannels { 640 tx, 641 inner: api::JoinPeers { peers }, 642 .. 643 } = msg; 644 let state = state.lock().await; 645 let node_id = state.router.endpoint().node_id(); 646 let ids = peers 647 .iter() 648 .map(|a| a.node_id) 649 .filter(|id| *id != node_id) 650 .collect::<HashSet<_>>(); 651 for addr in &peers { 652 state.sp.add_node_info(addr.clone()); 653 } 654 let client = state.client.clone(); 655 drop(state); 656 client.inner().join_peers(ids).await.ok(); 657 tx.send(()).await.ok(); 658 } 659 ApiMessage::SubscribeWithTicket(msg) => { 660 trace!(inner = ?msg.inner, "ApiMessage::SubscribeWithTicket"); 661 let WithChannels { 662 tx, 663 inner: api::SubscribeWithTicket { key, ticket }, 664 .. 665 } = msg; 666 667 // Extract remote node ID from ticket 668 let remote_id = ticket.node_id; 669 let node_id = { 670 let state = state.lock().await; 671 state.router.endpoint().node_id() 672 }; 673 674 // Don't subscribe to ourselves 675 if remote_id == node_id { 676 debug!(key = %key, "skipping self-subscription"); 677 tx.send(()).await.ok(); 678 return None; 679 } 680 681 // Add to discovery and join peers atomically 682 { 683 let state = state.lock().await; 684 state.sp.add_node_info(ticket.clone()); 685 let client = state.client.clone(); 686 drop(state); 687 688 // Wait for join to complete before proceeding 689 if let Err(e) = client.inner().join_peers([remote_id]).await { 690 error!(remote = %remote_id.fmt_short(), key = %key, error = ?e, "failed to join peer"); 691 tx.send(()).await.ok(); 692 return None; 693 } 694 } 695 696 debug!(remote = %remote_id.fmt_short(), key = %key, "joined peer, now subscribing with retries"); 697 698 // Retry subscribe with backoff to handle connection timing 699 const MAX_RETRIES: usize = 5; 700 let mut attempt = 0; 701 let mut backoff = std::time::Duration::from_millis(200); 702 let mut last_error; 703 704 while attempt < MAX_RETRIES { 705 attempt += 1; 706 707 // Get connection and subscribe 708 let result = { 709 let mut state = state.lock().await; 710 let conn = state.connections.get(&remote_id); 711 conn.rpc 712 .rpc(rpc::Subscribe { 713 key: key.clone(), 714 remote_id: node_id, 715 }) 716 .await 717 }; 718 719 match result { 720 Ok(_) => { 721 debug!(remote = %remote_id.fmt_short(), key = %key, attempt = attempt, "SubscribeWithTicket succeeded"); 722 state.lock().await.subscriptions.insert(key, remote_id); 723 break; 724 } 725 Err(e) => { 726 last_error = Some(e); 727 if attempt >= MAX_RETRIES { 728 if let Some(ref err) = last_error { 729 error!(remote = %remote_id.fmt_short(), key = %key, error = ?err, "SubscribeWithTicket failed after all retries"); 730 } 731 } else { 732 if let Some(ref err) = last_error { 733 debug!(remote = %remote_id.fmt_short(), key = %key, attempt = attempt, error = ?err, backoff_ms = backoff.as_millis(), "SubscribeWithTicket attempt failed, retrying"); 734 } 735 tokio::time::sleep(backoff).await; 736 // Exponential backoff with cap at 2s 737 if backoff < std::time::Duration::from_secs(2) { 738 backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(2)); 739 } 740 } 741 } 742 } 743 } 744 745 tx.send(()).await.ok(); 746 } 747 ApiMessage::GetNodeAddr(msg) => { 748 trace!(inner = ?msg.inner, "ApiMessage::GetNodeAddr"); 749 let WithChannels { tx, .. } = msg; 750 let (disable_relay, endpoint) = { 751 let state = state.lock().await; 752 (state.config.disable_relay, state.router.endpoint().clone()) 753 }; 754 if !disable_relay { 755 endpoint.online().await; 756 } 757 let addr = endpoint.node_addr(); 758 tx.send(addr).await.ok(); 759 } 760 ApiMessage::Shutdown(msg) => { 761 trace!(inner = ?msg.inner, "ApiMessage::Shutdown"); 762 return Some(msg.tx); 763 } 764 } 765 None 766 } 767 768 fn handle_send(state: &mut ActorState, key: String, data: Bytes, remotes: &BTreeSet<NodeId>) { 769 let me = state.connections.endpoint.node_id(); 770 let msg = rpc::RecvSegment { 771 key, 772 data, 773 from: me, 774 }; 775 for remote in remotes { 776 trace!(remote = %remote.fmt_short(), key = %msg.key, "handle_send to remote"); 777 let conn = state.connections.get(remote); 778 state.tasks.push(Box::pin(Self::forward_task( 779 state.config.clone(), 780 conn, 781 msg.clone(), 782 ))); 783 } 784 } 785 786 async fn forward_task( 787 config: Arc<Config>, 788 conn: Connection, 789 msg: RecvSegment, 790 ) -> (NodeId, Result<(), RpcTaskError>) { 791 let id = conn.id; 792 let res = async move { 793 tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??; 794 Ok(()) 795 } 796 .await; 797 (id, res) 798 } 799} 800 801/// Iroh-streamplace node that can send, forward or receive stream segments. 802#[derive(Clone, uniffi::Object)] 803pub struct Node { 804 client: Arc<db::Db>, 805 write: Arc<db::WriteScope>, 806 api: irpc::Client<ApiProtocol>, 807} 808 809impl Node { 810 pub(crate) async fn new_in_runtime( 811 config: Config, 812 handler: HandlerMode, 813 ) -> Result<Arc<Self>, CreateError> { 814 let mode_str = Bytes::from(handler.mode_str()); 815 let secret_key = 816 SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| { 817 CreateError::PrivateKey { 818 size: e.len() as u64, 819 } 820 })?); 821 let topic = 822 TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| { 823 CreateError::Topic { 824 size: e.len() as u64, 825 } 826 })?); 827 let relay_mode = if config.disable_relay { 828 RelayMode::Disabled 829 } else { 830 RelayMode::Default 831 }; 832 let sp = StaticProvider::new(); 833 let endpoint = iroh::Endpoint::builder() 834 .secret_key(secret_key) 835 .relay_mode(relay_mode) 836 .discovery(sp.clone()) 837 .bind() 838 .await 839 .map_err(|e| CreateError::Bind { 840 message: e.to_string(), 841 })?; 842 let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler) 843 .await 844 .map_err(|e| CreateError::Subscribe { 845 message: e.to_string(), 846 })?; 847 api.node_scope() 848 .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str) 849 .await 850 .ok(); 851 tokio::spawn(actor); 852 Ok(Arc::new(api)) 853 } 854} 855 856/// DataHandler trait that is exported to go for receiving data callbacks. 857#[uniffi::export(with_foreign)] 858#[async_trait::async_trait] 859pub trait DataHandler: Send + Sync { 860 async fn handle_data( 861 &self, 862 from: Arc<crate::public_key::PublicKey>, 863 topic: String, 864 data: Vec<u8>, 865 ); 866} 867 868#[uniffi::export] 869impl Node { 870 /// Create a new streamplace client node. 871 #[uniffi::constructor] 872 pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> { 873 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender)) 874 } 875 876 #[uniffi::constructor] 877 pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> { 878 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder)) 879 } 880 881 #[uniffi::constructor] 882 pub async fn receiver( 883 config: Config, 884 handler: Arc<dyn DataHandler>, 885 ) -> Result<Arc<Self>, CreateError> { 886 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler))) 887 } 888 889 /// Get a handle to the db to watch for changes locally or globally. 890 pub fn db(&self) -> Arc<db::Db> { 891 self.client.clone() 892 } 893 894 /// Get a handle to the write scope for this node. 895 /// 896 /// This is equivalent to calling `db.write(...)` with the secret key used to create the node. 897 pub fn node_scope(&self) -> Arc<db::WriteScope> { 898 self.write.clone() 899 } 900 901 /// Subscribe to updates for a given stream from a remote node. 902 pub async fn subscribe( 903 &self, 904 key: String, 905 remote_id: Arc<crate::public_key::PublicKey>, 906 ) -> Result<(), PutError> { 907 self.api 908 .rpc(api::Subscribe { 909 key, 910 remote_id: remote_id.as_ref().into(), 911 }) 912 .await 913 .map_err(|e| PutError::Irpc { 914 message: e.to_string(), 915 }) 916 } 917 918 /// Unsubscribe from updates for a given stream from a remote node. 919 pub async fn unsubscribe( 920 &self, 921 key: String, 922 remote_id: Arc<crate::public_key::PublicKey>, 923 ) -> Result<(), PutError> { 924 self.api 925 .rpc(api::Unsubscribe { 926 key, 927 remote_id: remote_id.as_ref().into(), 928 }) 929 .await 930 .map_err(|e| PutError::Irpc { 931 message: e.to_string(), 932 }) 933 } 934 935 /// Send a segment to all subscribers of the given stream. 936 pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> { 937 debug!(key = &key, data_len = data.len(), "Node.send_segment"); 938 self.api 939 .rpc(api::SendSegment { 940 key, 941 data: data.into(), 942 }) 943 .await 944 .map_err(|e| PutError::Irpc { 945 message: e.to_string(), 946 }) 947 } 948 949 /// Join peers by their node tickets. 950 pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 951 let peers = peers 952 .iter() 953 .map(|p| NodeTicket::from_str(p)) 954 .collect::<Result<Vec<_>, _>>() 955 .map_err(|e| JoinPeersError::Ticket { 956 message: e.to_string(), 957 })?; 958 let addrs = peers 959 .iter() 960 .map(|t| t.node_addr().clone()) 961 .collect::<Vec<_>>(); 962 self.api 963 .rpc(api::JoinPeers { peers: addrs }) 964 .await 965 .map_err(|e| JoinPeersError::Irpc { 966 message: e.to_string(), 967 }) 968 } 969 970 /// Add tickets for remote peers 971 pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 972 let peers = peers 973 .iter() 974 .map(|p| NodeTicket::from_str(p)) 975 .collect::<Result<Vec<_>, _>>() 976 .map_err(|e| JoinPeersError::Ticket { 977 message: e.to_string(), 978 })?; 979 let addrs = peers 980 .iter() 981 .map(|t| t.node_addr().clone()) 982 .collect::<Vec<_>>(); 983 self.api 984 .rpc(api::AddTickets { peers: addrs }) 985 .await 986 .map_err(|e| JoinPeersError::Irpc { 987 message: e.to_string(), 988 }) 989 } 990 991 /// Subscribe to a stream with a ticket - handles discovery, dialing, and subscription atomically 992 pub async fn subscribe_with_ticket(&self, key: String, ticket: String) -> Result<(), JoinPeersError> { 993 let node_ticket = NodeTicket::from_str(&ticket) 994 .map_err(|e| JoinPeersError::Ticket { 995 message: e.to_string(), 996 })?; 997 let addr = node_ticket.node_addr().clone(); 998 self.api 999 .rpc(api::SubscribeWithTicket { key, ticket: addr }) 1000 .await 1001 .map_err(|e| JoinPeersError::Irpc { 1002 message: e.to_string(), 1003 }) 1004 } 1005 1006 /// Get this node's ticket. 1007 pub async fn ticket(&self) -> Result<String, PutError> { 1008 let addr = self 1009 .api 1010 .rpc(api::GetNodeAddr) 1011 .await 1012 .map_err(|e| PutError::Irpc { 1013 message: e.to_string(), 1014 })?; 1015 Ok(NodeTicket::from(addr).to_string()) 1016 } 1017 1018 /// Get this node's node ID. 1019 pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> { 1020 let addr = self 1021 .api 1022 .rpc(api::GetNodeAddr) 1023 .await 1024 .map_err(|e| PutError::Irpc { 1025 message: e.to_string(), 1026 })?; 1027 Ok(Arc::new(addr.node_id.into())) 1028 } 1029 1030 /// Shutdown the node, including the streaming system and the metadata db. 1031 pub async fn shutdown(&self) -> Result<(), ShutdownError> { 1032 // shut down both the streams and the db concurrently, even if one fails 1033 let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown()); 1034 res1?; 1035 res2?; 1036 Ok(()) 1037 } 1038} 1039 1040impl Node { 1041 async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> { 1042 self.api 1043 .rpc(api::Shutdown) 1044 .await 1045 .map_err(|e| ShutdownError::Irpc { 1046 message: e.to_string(), 1047 }) 1048 } 1049}