Live video on the AT Protocol
at eli/rtmp-push 872 lines 29 kB view raw
1uniffi::setup_scaffolding!(); 2 3pub mod c2pa; 4pub mod error; 5pub mod node_addr; 6pub mod public_key; 7pub mod streams; 8 9use std::sync::{LazyLock, Once}; 10 11mod db; 12pub use db::*; 13#[cfg(test)] 14mod tests; 15 16#[cfg(test)] 17mod test_stream; 18 19/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime. 20static RUNTIME: LazyLock<tokio::runtime::Runtime> = 21 LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); 22 23/// Ensure logging is only initialized once 24static LOGGING_INIT: Once = Once::new(); 25 26use std::{ 27 collections::{BTreeMap, BTreeSet, HashSet}, 28 str::FromStr, 29 sync::Arc, 30}; 31 32use bytes::Bytes; 33use iroh::{NodeId, PublicKey, RelayMode, SecretKey, discovery::static_provider::StaticProvider}; 34use iroh_base::ticket::NodeTicket; 35use iroh_gossip::{net::Gossip, proto::TopicId}; 36use irpc::{WithChannels, rpc::RemoteService}; 37use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; 38use n0_future::future::Boxed; 39 40mod rpc { 41 //! Protocol API 42 use bytes::Bytes; 43 use iroh::NodeId; 44 use irpc::{channel::oneshot, rpc_requests}; 45 use serde::{Deserialize, Serialize}; 46 47 pub const ALPN: &[u8] = b"/iroh/streamplace/1"; 48 49 /// Subscribe to the given `key` 50 #[derive(Debug, Serialize, Deserialize)] 51 pub struct Subscribe { 52 pub key: String, 53 // TODO: verify 54 pub remote_id: NodeId, 55 } 56 57 /// Unsubscribe from the given `key` 58 #[derive(Debug, Serialize, Deserialize)] 59 pub struct Unsubscribe { 60 pub key: String, 61 // TODO: verify 62 pub remote_id: NodeId, 63 } 64 65 // #[derive(Debug, Serialize, Deserialize)] 66 // pub struct SendSegment { 67 // pub key: String, 68 // pub data: Bytes, 69 // } 70 71 #[derive(Debug, Clone, Serialize, Deserialize)] 72 pub struct RecvSegment { 73 pub from: NodeId, 74 pub key: String, 75 pub data: Bytes, 76 } 77 78 // Use the macro to generate both the Protocol and Message enums 79 // plus implement Channels for each type 80 #[rpc_requests(message = Message)] 81 #[derive(Serialize, Deserialize, Debug)] 82 pub enum Protocol { 83 #[rpc(tx=oneshot::Sender<()>)] 84 Subscribe(Subscribe), 85 #[rpc(tx=oneshot::Sender<()>)] 86 Unsubscribe(Unsubscribe), 87 #[rpc(tx=oneshot::Sender<()>)] 88 RecvSegment(RecvSegment), 89 } 90} 91 92mod api { 93 //! Protocol API 94 use bytes::Bytes; 95 use iroh::{NodeAddr, NodeId}; 96 use irpc::{channel::oneshot, rpc_requests}; 97 use serde::{Deserialize, Serialize}; 98 99 /// Subscribe to the given `key` 100 #[derive(Debug, Serialize, Deserialize)] 101 pub struct Subscribe { 102 pub key: String, 103 // TODO: verify 104 pub remote_id: NodeId, 105 } 106 107 /// Unsubscribe from the given `key` 108 #[derive(Debug, Serialize, Deserialize)] 109 pub struct Unsubscribe { 110 pub key: String, 111 // TODO: verify 112 pub remote_id: NodeId, 113 } 114 115 #[derive(Debug, Serialize, Deserialize)] 116 pub struct SendSegment { 117 pub key: String, 118 pub data: Bytes, 119 } 120 121 #[derive(Debug, Serialize, Deserialize)] 122 pub struct JoinPeers { 123 pub peers: Vec<NodeAddr>, 124 } 125 126 #[derive(Debug, Serialize, Deserialize)] 127 pub struct AddTickets { 128 pub peers: Vec<NodeAddr>, 129 } 130 131 #[derive(Debug, Serialize, Deserialize)] 132 pub struct GetNodeAddr; 133 134 #[derive(Debug, Serialize, Deserialize)] 135 pub struct Shutdown; 136 137 // Use the macro to generate both the Protocol and Message enums 138 // plus implement Channels for each type 139 #[rpc_requests(message = Message)] 140 #[derive(Serialize, Deserialize, Debug)] 141 pub enum Protocol { 142 #[rpc(tx=oneshot::Sender<()>)] 143 Subscribe(Subscribe), 144 #[rpc(tx=oneshot::Sender<()>)] 145 Unsubscribe(Unsubscribe), 146 #[rpc(tx=oneshot::Sender<()>)] 147 SendSegment(SendSegment), 148 #[rpc(tx=oneshot::Sender<()>)] 149 JoinPeers(JoinPeers), 150 #[rpc(tx=oneshot::Sender<()>)] 151 AddTickets(AddTickets), 152 #[rpc(tx=oneshot::Sender<NodeAddr>)] 153 GetNodeAddr(GetNodeAddr), 154 #[rpc(tx=oneshot::Sender<()>)] 155 Shutdown(Shutdown), 156 } 157} 158use api::{Message as ApiMessage, Protocol as ApiProtocol}; 159use n0_future::{FuturesUnordered, StreamExt}; 160use rpc::{Message as RpcMessage, Protocol as RpcProtocol}; 161use snafu::Snafu; 162use tracing::{Instrument, debug, error, trace, trace_span, warn}; 163 164use crate::rpc::RecvSegment; 165 166/// Initialize logging with the default subscriber that respects RUST_LOG environment variable. 167/// This function is safe to call multiple times - it will only initialize logging once. 168#[uniffi::export] 169pub fn init_logging() { 170 LOGGING_INIT.call_once(|| { 171 tracing_subscriber::fmt() 172 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) 173 .init(); 174 }); 175} 176 177/// Initialize logging with a custom log level. 178/// This function is safe to call multiple times - it will only initialize logging once. 179/// 180/// # Arguments 181/// * `level` - Log level as a string (e.g., "trace", "debug", "info", "warn", "error") 182#[uniffi::export] 183pub fn init_logging_with_level(level: String) { 184 LOGGING_INIT.call_once(|| { 185 let filter = tracing_subscriber::EnvFilter::try_from_default_env() 186 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)); 187 188 tracing_subscriber::fmt().with_env_filter(filter).init(); 189 }); 190} 191 192pub(crate) enum HandlerMode { 193 Sender, 194 Forwarder, 195 Receiver(Arc<dyn DataHandler>), 196} 197 198impl HandlerMode { 199 pub fn mode_str(&self) -> &'static str { 200 match self { 201 HandlerMode::Sender => "sender", 202 HandlerMode::Forwarder => "forwarder", 203 HandlerMode::Receiver(_) => "receiver", 204 } 205 } 206} 207 208type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>; 209 210/// Actor that contains both a kv db for metadata and a handler for the rpc protocol. 211/// 212/// This can be used both for sender and receiver nodes. Sender nodes will just set the 213/// handler to None. 214struct Actor { 215 /// Receiver for rpc messages from remote nodes 216 rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>, 217 /// Receiver for API messages from the user 218 api_rx: tokio::sync::mpsc::Receiver<ApiMessage>, 219 /// nodes I need to send to for each stream 220 subscribers: BTreeMap<String, BTreeSet<NodeId>>, 221 /// nodes I am subscribed to 222 subscriptions: BTreeMap<String, NodeId>, 223 /// lightweight typed connection pool 224 connections: ConnectionPool, 225 /// How to handle incoming data 226 handler: HandlerMode, 227 /// Static provider for node discovery 228 sp: StaticProvider, 229 /// Iroh protocol router, I need to keep it around to keep the protocol alive 230 router: iroh::protocol::Router, 231 /// Metadata db 232 client: db::Db, 233 /// Write scope for this node for the metadata db 234 write: db::WriteScope, 235 /// Ongoing tasks 236 tasks: Tasks, 237 /// Configuration, needed for timeouts etc. 238 config: Arc<Config>, 239} 240 241#[derive(Debug, Clone)] 242struct Connection { 243 id: NodeId, 244 rpc: irpc::Client<RpcProtocol>, 245} 246 247#[derive(Debug, Snafu)] 248enum RpcTaskError { 249 #[snafu(transparent)] 250 Task { source: irpc::Error }, 251 #[snafu(transparent)] 252 Timeout { source: tokio::time::error::Elapsed }, 253} 254 255struct ConnectionPool { 256 endpoint: iroh::Endpoint, 257 connections: BTreeMap<NodeId, Connection>, 258} 259 260impl ConnectionPool { 261 fn new(endpoint: iroh::Endpoint) -> Self { 262 Self { 263 endpoint, 264 connections: BTreeMap::new(), 265 } 266 } 267 268 /// Cheap conn pool hack 269 fn get(&mut self, remote: &NodeId) -> Connection { 270 if !self.connections.contains_key(remote) { 271 trace!(remote = %remote.fmt_short(),"ConnectionPool.get is inserting a new remote connection"); 272 let conn = IrohRemoteConnection::new( 273 self.endpoint.clone(), 274 (*remote).into(), 275 rpc::ALPN.to_vec(), 276 ); 277 let conn = Connection { 278 rpc: irpc::Client::boxed(conn), 279 id: *remote, 280 }; 281 self.connections.insert(*remote, conn); 282 } 283 self.connections 284 .get_mut(remote) 285 .expect("just inserted") 286 .clone() 287 } 288 289 fn remove(&mut self, remote: &NodeId) { 290 self.connections.remove(remote); 291 } 292} 293 294impl Actor { 295 pub async fn spawn( 296 endpoint: iroh::Endpoint, 297 sp: StaticProvider, 298 topic: iroh_gossip::proto::TopicId, 299 config: Config, 300 handler: HandlerMode, 301 ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> { 302 let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(512); 303 let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(512); 304 let gossip = Gossip::builder().spawn(endpoint.clone()); 305 let id = endpoint.node_id(); 306 let router = iroh::protocol::Router::builder(endpoint.clone()) 307 .accept(iroh_gossip::ALPN, gossip.clone()) 308 .accept( 309 rpc::ALPN, 310 IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())), 311 ) 312 .spawn(); 313 let topic = gossip.subscribe(topic, vec![]).await?; 314 let secret = router.endpoint().secret_key().clone(); 315 let db_config = Default::default(); 316 let client = iroh_smol_kv::Client::local(topic, db_config); 317 let write = db::WriteScope::new(client.write(secret.clone())); 318 let client = db::Db::new(client); 319 let actor = Self { 320 rpc_rx, 321 api_rx, 322 subscribers: BTreeMap::new(), 323 subscriptions: BTreeMap::new(), 324 connections: ConnectionPool::new(router.endpoint().clone()), 325 handler, 326 router, 327 sp, 328 write: write.clone(), 329 client: client.clone(), 330 tasks: FuturesUnordered::new(), 331 config: Arc::new(config), 332 }; 333 let api = Node { 334 client: Arc::new(client), 335 write: Arc::new(write), 336 api: irpc::Client::local(api_tx), 337 }; 338 Ok(( 339 api, 340 actor 341 .run() 342 .instrument(trace_span!("actor", id=%id.fmt_short())), 343 )) 344 } 345 346 async fn run(mut self) { 347 loop { 348 tokio::select! { 349 msg = self.rpc_rx.recv() => { 350 trace!("received local rpc message"); 351 let Some(msg) = msg else { 352 error!("rpc channel closed"); 353 break; 354 }; 355 self.handle_rpc(msg).instrument(trace_span!("rpc")).await; 356 } 357 msg = self.api_rx.recv() => { 358 trace!("received remote rpc message"); 359 let Some(msg) = msg else { 360 break; 361 }; 362 if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await { 363 shutdown.send(()).await.ok(); 364 break; 365 } 366 } 367 res = self.tasks.next(), if !self.tasks.is_empty() => { 368 trace!("processing task"); 369 let Some((remote_id, res)) = res else { 370 error!("task finished but no result"); 371 break; 372 }; 373 match res { 374 Ok(()) => {} 375 Err(RpcTaskError::Timeout { source }) => { 376 warn!("call to {remote_id} timed out: {source}"); 377 } 378 Err(RpcTaskError::Task { source }) => { 379 warn!("call to {remote_id} failed: {source}"); 380 } 381 } 382 self.connections.remove(&remote_id); 383 } 384 } 385 } 386 warn!("RPC Actor loop has closed"); 387 } 388 389 async fn update_subscriber_meta(&mut self, key: &str) { 390 let n = self 391 .subscribers 392 .get(key) 393 .map(|s| s.len()) 394 .unwrap_or_default(); 395 let v = n.to_string().into_bytes(); 396 self.write 397 .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into()) 398 .await 399 .ok(); 400 } 401 402 /// Requests from remote nodes 403 async fn handle_rpc(&mut self, msg: RpcMessage) { 404 trace!("RPC.handle_rpc"); 405 match msg { 406 RpcMessage::Subscribe(msg) => { 407 trace!(inner = ?msg.inner, "RpcMessage::Subscribe"); 408 let WithChannels { 409 tx, 410 inner: rpc::Subscribe { key, remote_id }, 411 .. 412 } = msg; 413 self.subscribers 414 .entry(key.clone()) 415 .or_default() 416 .insert(remote_id); 417 self.update_subscriber_meta(&key).await; 418 tx.send(()).await.ok(); 419 } 420 RpcMessage::Unsubscribe(msg) => { 421 debug!(inner = ?msg.inner, "RpcMessage::Unsubscribe"); 422 let WithChannels { 423 tx, 424 inner: rpc::Unsubscribe { key, remote_id }, 425 .. 426 } = msg; 427 if let Some(e) = self.subscribers.get_mut(&key) 428 && !e.remove(&remote_id) 429 { 430 warn!( 431 "unsubscribe: no subscription for {} from {}", 432 key, remote_id 433 ); 434 } 435 if let Some(subscriptions) = self.subscribers.get(&key) 436 && subscriptions.is_empty() 437 { 438 self.subscribers.remove(&key); 439 } 440 self.update_subscriber_meta(&key).await; 441 tx.send(()).await.ok(); 442 } 443 RpcMessage::RecvSegment(msg) => { 444 trace!(inner = ?msg.inner, "RpcMessage::RecvSegment"); 445 let WithChannels { 446 tx, 447 inner: rpc::RecvSegment { key, from, data }, 448 .. 449 } = msg; 450 match &self.handler { 451 HandlerMode::Sender => { 452 warn!("received segment but in sender mode"); 453 } 454 HandlerMode::Forwarder => { 455 trace!("forwarding segment"); 456 if let Some(remotes) = self.subscribers.get(&key) { 457 Self::handle_send( 458 &mut self.tasks, 459 &mut self.connections, 460 &self.config, 461 key, 462 data, 463 remotes, 464 ); 465 } else { 466 trace!("no subscribers for stream {}", key); 467 } 468 } 469 HandlerMode::Receiver(handler) => { 470 if self.subscriptions.contains_key(&key) { 471 let from = Arc::new(from.into()); 472 handler.handle_data(from, key, data.to_vec()).await; 473 } else { 474 warn!("received segment for unsubscribed key: {}", key); 475 } 476 } 477 }; 478 tx.send(()).await.ok(); 479 } 480 } 481 } 482 483 async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> { 484 trace!("RPC.handle_api"); 485 match msg { 486 ApiMessage::SendSegment(msg) => { 487 trace!(inner = ?msg.inner, "ApiMessage::SendSegment"); 488 let WithChannels { 489 tx, 490 inner: api::SendSegment { key, data }, 491 .. 492 } = msg; 493 if let Some(remotes) = self.subscribers.get(&key) { 494 Self::handle_send( 495 &mut self.tasks, 496 &mut self.connections, 497 &self.config, 498 key, 499 data, 500 remotes, 501 ); 502 } else { 503 trace!("no subscribers for stream {}", key); 504 } 505 tx.send(()).await.ok(); 506 } 507 ApiMessage::Subscribe(msg) => { 508 trace!(inner = ?msg.inner, "ApiMessage::Subscribe"); 509 let WithChannels { 510 tx, 511 inner: api::Subscribe { key, remote_id }, 512 .. 513 } = msg; 514 let conn = self.connections.get(&remote_id); 515 tx.send(()).await.ok(); 516 trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Subscribe message"); 517 conn.rpc 518 .rpc(rpc::Subscribe { 519 key: key.clone(), 520 remote_id: self.node_id(), 521 }) 522 .await 523 .ok(); 524 trace!(remote = %remote_id.fmt_short(), key = %key, "inserting subscription"); 525 self.subscriptions.insert(key, remote_id); 526 trace!("finished inserting subscription"); 527 } 528 ApiMessage::Unsubscribe(msg) => { 529 trace!(inner = ?msg.inner, "ApiMessage::Unsubscribe"); 530 let WithChannels { 531 tx, 532 inner: api::Unsubscribe { key, remote_id }, 533 .. 534 } = msg; 535 let conn = self.connections.get(&remote_id); 536 tx.send(()).await.ok(); 537 conn.rpc 538 .rpc(rpc::Unsubscribe { 539 key: key.clone(), 540 remote_id: self.node_id(), 541 }) 542 .await 543 .ok(); 544 self.subscriptions.remove(&key); 545 } 546 ApiMessage::AddTickets(msg) => { 547 trace!(inner = ?msg.inner, "ApiMessage::AddTickets"); 548 let WithChannels { 549 tx, 550 inner: api::AddTickets { peers }, 551 .. 552 } = msg; 553 for addr in &peers { 554 self.sp.add_node_info(addr.clone()); 555 } 556 // self.client.inner().join_peers(ids).await.ok(); 557 tx.send(()).await.ok(); 558 } 559 ApiMessage::JoinPeers(msg) => { 560 trace!(inner = ?msg.inner, "ApiMessage::JoinPeers"); 561 let WithChannels { 562 tx, 563 inner: api::JoinPeers { peers }, 564 .. 565 } = msg; 566 let ids = peers 567 .iter() 568 .map(|a| a.node_id) 569 .filter(|id| *id != self.node_id()) 570 .collect::<HashSet<_>>(); 571 for addr in &peers { 572 self.sp.add_node_info(addr.clone()); 573 } 574 self.client.inner().join_peers(ids).await.ok(); 575 tx.send(()).await.ok(); 576 } 577 ApiMessage::GetNodeAddr(msg) => { 578 trace!(inner = ?msg.inner, "ApiMessage::GetNodeAddr"); 579 let WithChannels { tx, .. } = msg; 580 if !self.config.disable_relay { 581 // don't await home relay if we have disabled relays, this will hang forever 582 self.router.endpoint().online().await; 583 } 584 let addr = self.router.endpoint().node_addr(); 585 tx.send(addr).await.ok(); 586 } 587 ApiMessage::Shutdown(msg) => { 588 trace!(inner = ?msg.inner, "ApiMessage::Shutdown"); 589 return Some(msg.tx); 590 } 591 } 592 None 593 } 594 595 fn handle_send( 596 tasks: &mut Tasks, 597 connections: &mut ConnectionPool, 598 config: &Arc<Config>, 599 key: String, 600 data: Bytes, 601 remotes: &BTreeSet<NodeId>, 602 ) { 603 let me = connections.endpoint.node_id(); 604 let msg = rpc::RecvSegment { 605 key, 606 data, 607 from: me, 608 }; 609 for remote in remotes { 610 trace!(remote = %remote.fmt_short(), key = %msg.key, "handle_send to remote"); 611 let conn = connections.get(remote); 612 tasks.push(Box::pin(Self::forward_task( 613 config.clone(), 614 conn, 615 msg.clone(), 616 ))); 617 } 618 } 619 620 async fn forward_task( 621 config: Arc<Config>, 622 conn: Connection, 623 msg: RecvSegment, 624 ) -> (NodeId, Result<(), RpcTaskError>) { 625 let id = conn.id; 626 let res = async move { 627 tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??; 628 Ok(()) 629 } 630 .await; 631 (id, res) 632 } 633 634 fn node_id(&self) -> PublicKey { 635 self.router.endpoint().node_id() 636 } 637} 638 639/// Iroh-streamplace node that can send, forward or receive stream segments. 640#[derive(Clone, uniffi::Object)] 641pub struct Node { 642 client: Arc<db::Db>, 643 write: Arc<db::WriteScope>, 644 api: irpc::Client<ApiProtocol>, 645} 646 647impl Node { 648 pub(crate) async fn new_in_runtime( 649 config: Config, 650 handler: HandlerMode, 651 ) -> Result<Arc<Self>, CreateError> { 652 let mode_str = Bytes::from(handler.mode_str()); 653 let secret_key = 654 SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| { 655 CreateError::PrivateKey { 656 size: e.len() as u64, 657 } 658 })?); 659 let topic = 660 TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| { 661 CreateError::Topic { 662 size: e.len() as u64, 663 } 664 })?); 665 let relay_mode = if config.disable_relay { 666 RelayMode::Disabled 667 } else { 668 RelayMode::Default 669 }; 670 let sp = StaticProvider::new(); 671 let endpoint = iroh::Endpoint::builder() 672 .secret_key(secret_key) 673 .relay_mode(relay_mode) 674 .discovery(sp.clone()) 675 .bind() 676 .await 677 .map_err(|e| CreateError::Bind { 678 message: e.to_string(), 679 })?; 680 let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler) 681 .await 682 .map_err(|e| CreateError::Subscribe { 683 message: e.to_string(), 684 })?; 685 api.node_scope() 686 .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str) 687 .await 688 .ok(); 689 tokio::spawn(actor); 690 Ok(Arc::new(api)) 691 } 692} 693 694/// DataHandler trait that is exported to go for receiving data callbacks. 695#[uniffi::export(with_foreign)] 696#[async_trait::async_trait] 697pub trait DataHandler: Send + Sync { 698 async fn handle_data( 699 &self, 700 from: Arc<crate::public_key::PublicKey>, 701 topic: String, 702 data: Vec<u8>, 703 ); 704} 705 706#[uniffi::export] 707impl Node { 708 /// Create a new streamplace client node. 709 #[uniffi::constructor] 710 pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> { 711 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender)) 712 } 713 714 #[uniffi::constructor] 715 pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> { 716 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder)) 717 } 718 719 #[uniffi::constructor] 720 pub async fn receiver( 721 config: Config, 722 handler: Arc<dyn DataHandler>, 723 ) -> Result<Arc<Self>, CreateError> { 724 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler))) 725 } 726 727 /// Get a handle to the db to watch for changes locally or globally. 728 pub fn db(&self) -> Arc<db::Db> { 729 self.client.clone() 730 } 731 732 /// Get a handle to the write scope for this node. 733 /// 734 /// This is equivalent to calling `db.write(...)` with the secret key used to create the node. 735 pub fn node_scope(&self) -> Arc<db::WriteScope> { 736 self.write.clone() 737 } 738 739 /// Subscribe to updates for a given stream from a remote node. 740 pub async fn subscribe( 741 &self, 742 key: String, 743 remote_id: Arc<crate::public_key::PublicKey>, 744 ) -> Result<(), PutError> { 745 self.api 746 .rpc(api::Subscribe { 747 key, 748 remote_id: remote_id.as_ref().into(), 749 }) 750 .await 751 .map_err(|e| PutError::Irpc { 752 message: e.to_string(), 753 }) 754 } 755 756 /// Unsubscribe from updates for a given stream from a remote node. 757 pub async fn unsubscribe( 758 &self, 759 key: String, 760 remote_id: Arc<crate::public_key::PublicKey>, 761 ) -> Result<(), PutError> { 762 self.api 763 .rpc(api::Unsubscribe { 764 key, 765 remote_id: remote_id.as_ref().into(), 766 }) 767 .await 768 .map_err(|e| PutError::Irpc { 769 message: e.to_string(), 770 }) 771 } 772 773 /// Send a segment to all subscribers of the given stream. 774 pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> { 775 debug!(key = &key, data_len = data.len(), "Node.send_segment"); 776 self.api 777 .rpc(api::SendSegment { 778 key, 779 data: data.into(), 780 }) 781 .await 782 .map_err(|e| PutError::Irpc { 783 message: e.to_string(), 784 }) 785 } 786 787 /// Join peers by their node tickets. 788 pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 789 let peers = peers 790 .iter() 791 .map(|p| NodeTicket::from_str(p)) 792 .collect::<Result<Vec<_>, _>>() 793 .map_err(|e| JoinPeersError::Ticket { 794 message: e.to_string(), 795 })?; 796 let addrs = peers 797 .iter() 798 .map(|t| t.node_addr().clone()) 799 .collect::<Vec<_>>(); 800 self.api 801 .rpc(api::JoinPeers { peers: addrs }) 802 .await 803 .map_err(|e| JoinPeersError::Irpc { 804 message: e.to_string(), 805 }) 806 } 807 808 /// Add tickets for remote peers 809 pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 810 let peers = peers 811 .iter() 812 .map(|p| NodeTicket::from_str(p)) 813 .collect::<Result<Vec<_>, _>>() 814 .map_err(|e| JoinPeersError::Ticket { 815 message: e.to_string(), 816 })?; 817 let addrs = peers 818 .iter() 819 .map(|t| t.node_addr().clone()) 820 .collect::<Vec<_>>(); 821 self.api 822 .rpc(api::AddTickets { peers: addrs }) 823 .await 824 .map_err(|e| JoinPeersError::Irpc { 825 message: e.to_string(), 826 }) 827 } 828 829 /// Get this node's ticket. 830 pub async fn ticket(&self) -> Result<String, PutError> { 831 let addr = self 832 .api 833 .rpc(api::GetNodeAddr) 834 .await 835 .map_err(|e| PutError::Irpc { 836 message: e.to_string(), 837 })?; 838 Ok(NodeTicket::from(addr).to_string()) 839 } 840 841 /// Get this node's node ID. 842 pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> { 843 let addr = self 844 .api 845 .rpc(api::GetNodeAddr) 846 .await 847 .map_err(|e| PutError::Irpc { 848 message: e.to_string(), 849 })?; 850 Ok(Arc::new(addr.node_id.into())) 851 } 852 853 /// Shutdown the node, including the streaming system and the metadata db. 854 pub async fn shutdown(&self) -> Result<(), ShutdownError> { 855 // shut down both the streams and the db concurrently, even if one fails 856 let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown()); 857 res1?; 858 res2?; 859 Ok(()) 860 } 861} 862 863impl Node { 864 async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> { 865 self.api 866 .rpc(api::Shutdown) 867 .await 868 .map_err(|e| ShutdownError::Irpc { 869 message: e.to_string(), 870 }) 871 } 872}