Live video on the AT Protocol
at eli/docker-deployment-docs 825 lines 26 kB view raw
1uniffi::setup_scaffolding!(); 2 3pub mod c2pa; 4pub mod node_addr; 5pub mod public_key; 6 7use std::sync::LazyLock; 8 9mod db; 10pub use db::*; 11#[cfg(test)] 12mod tests; 13 14/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime. 15static RUNTIME: LazyLock<tokio::runtime::Runtime> = 16 LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); 17 18use std::{ 19 collections::{BTreeMap, BTreeSet, HashSet}, 20 str::FromStr, 21 sync::Arc, 22}; 23 24use bytes::Bytes; 25use iroh::{NodeId, PublicKey, RelayMode, SecretKey, discovery::static_provider::StaticProvider}; 26use iroh_base::ticket::NodeTicket; 27use iroh_gossip::{net::Gossip, proto::TopicId}; 28use irpc::{WithChannels, rpc::RemoteService}; 29use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; 30use n0_future::future::Boxed; 31 32mod rpc { 33 //! Protocol API 34 use bytes::Bytes; 35 use iroh::NodeId; 36 use irpc::{channel::oneshot, rpc_requests}; 37 use serde::{Deserialize, Serialize}; 38 39 pub const ALPN: &[u8] = b"/iroh/streamplace/1"; 40 41 /// Subscribe to the given `key` 42 #[derive(Debug, Serialize, Deserialize)] 43 pub struct Subscribe { 44 pub key: String, 45 // TODO: verify 46 pub remote_id: NodeId, 47 } 48 49 /// Unsubscribe from the given `key` 50 #[derive(Debug, Serialize, Deserialize)] 51 pub struct Unsubscribe { 52 pub key: String, 53 // TODO: verify 54 pub remote_id: NodeId, 55 } 56 57 // #[derive(Debug, Serialize, Deserialize)] 58 // pub struct SendSegment { 59 // pub key: String, 60 // pub data: Bytes, 61 // } 62 63 #[derive(Debug, Clone, Serialize, Deserialize)] 64 pub struct RecvSegment { 65 pub from: NodeId, 66 pub key: String, 67 pub data: Bytes, 68 } 69 70 // Use the macro to generate both the Protocol and Message enums 71 // plus implement Channels for each type 72 #[rpc_requests(message = Message)] 73 #[derive(Serialize, Deserialize, Debug)] 74 pub enum Protocol { 75 #[rpc(tx=oneshot::Sender<()>)] 76 Subscribe(Subscribe), 77 #[rpc(tx=oneshot::Sender<()>)] 78 Unsubscribe(Unsubscribe), 79 #[rpc(tx=oneshot::Sender<()>)] 80 RecvSegment(RecvSegment), 81 } 82} 83 84mod api { 85 //! Protocol API 86 use bytes::Bytes; 87 use iroh::{NodeAddr, NodeId}; 88 use irpc::{channel::oneshot, rpc_requests}; 89 use serde::{Deserialize, Serialize}; 90 91 /// Subscribe to the given `key` 92 #[derive(Debug, Serialize, Deserialize)] 93 pub struct Subscribe { 94 pub key: String, 95 // TODO: verify 96 pub remote_id: NodeId, 97 } 98 99 /// Unsubscribe from the given `key` 100 #[derive(Debug, Serialize, Deserialize)] 101 pub struct Unsubscribe { 102 pub key: String, 103 // TODO: verify 104 pub remote_id: NodeId, 105 } 106 107 #[derive(Debug, Serialize, Deserialize)] 108 pub struct SendSegment { 109 pub key: String, 110 pub data: Bytes, 111 } 112 113 #[derive(Debug, Serialize, Deserialize)] 114 pub struct JoinPeers { 115 pub peers: Vec<NodeAddr>, 116 } 117 118 #[derive(Debug, Serialize, Deserialize)] 119 pub struct AddTickets { 120 pub peers: Vec<NodeAddr>, 121 } 122 123 #[derive(Debug, Serialize, Deserialize)] 124 pub struct GetNodeAddr; 125 126 #[derive(Debug, Serialize, Deserialize)] 127 pub struct Shutdown; 128 129 // Use the macro to generate both the Protocol and Message enums 130 // plus implement Channels for each type 131 #[rpc_requests(message = Message)] 132 #[derive(Serialize, Deserialize, Debug)] 133 pub enum Protocol { 134 #[rpc(tx=oneshot::Sender<()>)] 135 Subscribe(Subscribe), 136 #[rpc(tx=oneshot::Sender<()>)] 137 Unsubscribe(Unsubscribe), 138 #[rpc(tx=oneshot::Sender<()>)] 139 SendSegment(SendSegment), 140 #[rpc(tx=oneshot::Sender<()>)] 141 JoinPeers(JoinPeers), 142 #[rpc(tx=oneshot::Sender<()>)] 143 AddTickets(AddTickets), 144 #[rpc(tx=oneshot::Sender<NodeAddr>)] 145 GetNodeAddr(GetNodeAddr), 146 #[rpc(tx=oneshot::Sender<()>)] 147 Shutdown(Shutdown), 148 } 149} 150use api::{Message as ApiMessage, Protocol as ApiProtocol}; 151use n0_future::{FuturesUnordered, StreamExt}; 152use rpc::{Message as RpcMessage, Protocol as RpcProtocol}; 153use snafu::Snafu; 154use tracing::{Instrument, debug, error, trace, trace_span, warn}; 155 156use crate::rpc::RecvSegment; 157 158pub(crate) enum HandlerMode { 159 Sender, 160 Forwarder, 161 Receiver(Arc<dyn DataHandler>), 162} 163 164impl HandlerMode { 165 pub fn mode_str(&self) -> &'static str { 166 match self { 167 HandlerMode::Sender => "sender", 168 HandlerMode::Forwarder => "forwarder", 169 HandlerMode::Receiver(_) => "receiver", 170 } 171 } 172} 173 174type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>; 175 176/// Actor that contains both a kv db for metadata and a handler for the rpc protocol. 177/// 178/// This can be used both for sender and receiver nodes. Sender nodes will just set the 179/// handler to None. 180struct Actor { 181 /// Receiver for rpc messages from remote nodes 182 rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>, 183 /// Receiver for API messages from the user 184 api_rx: tokio::sync::mpsc::Receiver<ApiMessage>, 185 /// nodes I need to send to for each stream 186 subscribers: BTreeMap<String, BTreeSet<NodeId>>, 187 /// nodes I am subscribed to 188 subscriptions: BTreeMap<String, NodeId>, 189 /// lightweight typed connection pool 190 connections: ConnectionPool, 191 /// How to handle incoming data 192 handler: HandlerMode, 193 /// Static provider for node discovery 194 sp: StaticProvider, 195 /// Iroh protocol router, I need to keep it around to keep the protocol alive 196 router: iroh::protocol::Router, 197 /// Metadata db 198 client: db::Db, 199 /// Write scope for this node for the metadata db 200 write: db::WriteScope, 201 /// Ongoing tasks 202 tasks: Tasks, 203 /// Configuration, needed for timeouts etc. 204 config: Arc<Config>, 205} 206 207#[derive(Debug, Clone)] 208struct Connection { 209 id: NodeId, 210 rpc: irpc::Client<RpcProtocol>, 211} 212 213#[derive(Debug, Snafu)] 214enum RpcTaskError { 215 #[snafu(transparent)] 216 Task { source: irpc::Error }, 217 #[snafu(transparent)] 218 Timeout { source: tokio::time::error::Elapsed }, 219} 220 221struct ConnectionPool { 222 endpoint: iroh::Endpoint, 223 connections: BTreeMap<NodeId, Connection>, 224} 225 226impl ConnectionPool { 227 fn new(endpoint: iroh::Endpoint) -> Self { 228 Self { 229 endpoint, 230 connections: BTreeMap::new(), 231 } 232 } 233 234 /// Cheap conn pool hack 235 fn get(&mut self, remote: &NodeId) -> Connection { 236 if !self.connections.contains_key(remote) { 237 let conn = IrohRemoteConnection::new( 238 self.endpoint.clone(), 239 (*remote).into(), 240 rpc::ALPN.to_vec(), 241 ); 242 let conn = Connection { 243 rpc: irpc::Client::boxed(conn), 244 id: *remote, 245 }; 246 self.connections.insert(*remote, conn); 247 } 248 self.connections 249 .get_mut(remote) 250 .expect("just inserted") 251 .clone() 252 } 253 254 fn remove(&mut self, remote: &NodeId) { 255 self.connections.remove(remote); 256 } 257} 258 259impl Actor { 260 pub async fn spawn( 261 endpoint: iroh::Endpoint, 262 sp: StaticProvider, 263 topic: iroh_gossip::proto::TopicId, 264 config: Config, 265 handler: HandlerMode, 266 ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> { 267 let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(32); 268 let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(32); 269 let gossip = Gossip::builder().spawn(endpoint.clone()); 270 let id = endpoint.node_id(); 271 let router = iroh::protocol::Router::builder(endpoint.clone()) 272 .accept(iroh_gossip::ALPN, gossip.clone()) 273 .accept( 274 rpc::ALPN, 275 IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())), 276 ) 277 .spawn(); 278 let topic = gossip.subscribe(topic, vec![]).await?; 279 let secret = router.endpoint().secret_key().clone(); 280 let db_config = Default::default(); 281 let client = iroh_smol_kv::Client::local(topic, db_config); 282 let write = db::WriteScope::new(client.write(secret.clone())); 283 let client = db::Db::new(client); 284 let actor = Self { 285 rpc_rx, 286 api_rx, 287 subscribers: BTreeMap::new(), 288 subscriptions: BTreeMap::new(), 289 connections: ConnectionPool::new(router.endpoint().clone()), 290 handler, 291 router, 292 sp, 293 write: write.clone(), 294 client: client.clone(), 295 tasks: FuturesUnordered::new(), 296 config: Arc::new(config), 297 }; 298 let api = Node { 299 client: Arc::new(client), 300 write: Arc::new(write), 301 api: irpc::Client::local(api_tx), 302 }; 303 Ok(( 304 api, 305 actor 306 .run() 307 .instrument(trace_span!("actor", id=%id.fmt_short())), 308 )) 309 } 310 311 async fn run(mut self) { 312 loop { 313 tokio::select! { 314 msg = self.rpc_rx.recv() => { 315 let Some(msg) = msg else { 316 error!("rpc channel closed"); 317 break; 318 }; 319 self.handle_rpc(msg).instrument(trace_span!("rpc")).await; 320 } 321 msg = self.api_rx.recv() => { 322 let Some(msg) = msg else { 323 break; 324 }; 325 if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await { 326 shutdown.send(()).await.ok(); 327 break; 328 } 329 } 330 res = self.tasks.next(), if !self.tasks.is_empty() => { 331 let Some((remote_id, res)) = res else { 332 error!("task finished but no result"); 333 break; 334 }; 335 match res { 336 Ok(()) => {} 337 Err(RpcTaskError::Timeout { source }) => { 338 warn!("call to {remote_id} timed out: {source}"); 339 } 340 Err(RpcTaskError::Task { source }) => { 341 warn!("call to {remote_id} failed: {source}"); 342 } 343 } 344 self.connections.remove(&remote_id); 345 } 346 } 347 } 348 } 349 350 async fn update_subscriber_meta(&mut self, key: &str) { 351 let n = self 352 .subscribers 353 .get(key) 354 .map(|s| s.len()) 355 .unwrap_or_default(); 356 let v = n.to_string().into_bytes(); 357 self.write 358 .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into()) 359 .await 360 .ok(); 361 } 362 363 /// Requests from remote nodes 364 async fn handle_rpc(&mut self, msg: RpcMessage) { 365 match msg { 366 RpcMessage::Subscribe(msg) => { 367 trace!("{:?}", msg.inner); 368 let WithChannels { 369 tx, 370 inner: rpc::Subscribe { key, remote_id }, 371 .. 372 } = msg; 373 self.subscribers 374 .entry(key.clone()) 375 .or_default() 376 .insert(remote_id); 377 self.update_subscriber_meta(&key).await; 378 tx.send(()).await.ok(); 379 } 380 RpcMessage::Unsubscribe(msg) => { 381 debug!("{:?}", msg.inner); 382 let WithChannels { 383 tx, 384 inner: rpc::Unsubscribe { key, remote_id }, 385 .. 386 } = msg; 387 if let Some(e) = self.subscribers.get_mut(&key) 388 && !e.remove(&remote_id) 389 { 390 warn!( 391 "unsubscribe: no subscription for {} from {}", 392 key, remote_id 393 ); 394 } 395 if let Some(subscriptions) = self.subscribers.get(&key) 396 && subscriptions.is_empty() 397 { 398 self.subscribers.remove(&key); 399 } 400 self.update_subscriber_meta(&key).await; 401 tx.send(()).await.ok(); 402 } 403 RpcMessage::RecvSegment(msg) => { 404 trace!("{:?}", msg.inner); 405 let WithChannels { 406 tx, 407 inner: rpc::RecvSegment { key, from, data }, 408 .. 409 } = msg; 410 match &self.handler { 411 HandlerMode::Sender => { 412 warn!("received segment but in sender mode"); 413 } 414 HandlerMode::Forwarder => { 415 if let Some(remotes) = self.subscribers.get(&key) { 416 Self::handle_send( 417 &mut self.tasks, 418 &mut self.connections, 419 &self.config, 420 key, 421 data, 422 remotes, 423 ); 424 } else { 425 trace!("no subscribers for stream {}", key); 426 } 427 } 428 HandlerMode::Receiver(handler) => { 429 if self.subscriptions.contains_key(&key) { 430 let from = Arc::new(from.into()); 431 handler.handle_data(from, key, data.to_vec()).await; 432 } else { 433 warn!("received segment for unsubscribed key: {}", key); 434 } 435 } 436 }; 437 tx.send(()).await.ok(); 438 } 439 } 440 } 441 442 async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> { 443 match msg { 444 ApiMessage::SendSegment(msg) => { 445 trace!("{:?}", msg.inner); 446 let WithChannels { 447 tx, 448 inner: api::SendSegment { key, data }, 449 .. 450 } = msg; 451 if let Some(remotes) = self.subscribers.get(&key) { 452 Self::handle_send( 453 &mut self.tasks, 454 &mut self.connections, 455 &self.config, 456 key, 457 data, 458 remotes, 459 ); 460 } else { 461 trace!("no subscribers for stream {}", key); 462 } 463 tx.send(()).await.ok(); 464 } 465 ApiMessage::Subscribe(msg) => { 466 trace!("{:?}", msg.inner); 467 let WithChannels { 468 tx, 469 inner: api::Subscribe { key, remote_id }, 470 .. 471 } = msg; 472 let conn = self.connections.get(&remote_id); 473 conn.rpc 474 .rpc(rpc::Subscribe { 475 key: key.clone(), 476 remote_id: self.node_id(), 477 }) 478 .await 479 .ok(); 480 self.subscriptions.insert(key, remote_id); 481 tx.send(()).await.ok(); 482 } 483 ApiMessage::Unsubscribe(msg) => { 484 trace!("{:?}", msg.inner); 485 let WithChannels { 486 tx, 487 inner: api::Unsubscribe { key, remote_id }, 488 .. 489 } = msg; 490 let conn = self.connections.get(&remote_id); 491 conn.rpc 492 .rpc(rpc::Unsubscribe { 493 key: key.clone(), 494 remote_id: self.node_id(), 495 }) 496 .await 497 .ok(); 498 self.subscriptions.remove(&key); 499 tx.send(()).await.ok(); 500 } 501 ApiMessage::AddTickets(msg) => { 502 trace!("{:?}", msg.inner); 503 let WithChannels { 504 tx, 505 inner: api::AddTickets { peers }, 506 .. 507 } = msg; 508 for addr in &peers { 509 self.sp.add_node_info(addr.clone()); 510 } 511 // self.client.inner().join_peers(ids).await.ok(); 512 tx.send(()).await.ok(); 513 } 514 ApiMessage::JoinPeers(msg) => { 515 trace!("{:?}", msg.inner); 516 let WithChannels { 517 tx, 518 inner: api::JoinPeers { peers }, 519 .. 520 } = msg; 521 let ids = peers 522 .iter() 523 .map(|a| a.node_id) 524 .filter(|id| *id != self.node_id()) 525 .collect::<HashSet<_>>(); 526 for addr in &peers { 527 self.sp.add_node_info(addr.clone()); 528 } 529 self.client.inner().join_peers(ids).await.ok(); 530 tx.send(()).await.ok(); 531 } 532 ApiMessage::GetNodeAddr(msg) => { 533 trace!("{:?}", msg.inner); 534 let WithChannels { tx, .. } = msg; 535 if !self.config.disable_relay { 536 // don't await home relay if we have disabled relays, this will hang forever 537 self.router.endpoint().online().await; 538 } 539 let addr = self.router.endpoint().node_addr(); 540 tx.send(addr).await.ok(); 541 } 542 ApiMessage::Shutdown(msg) => { 543 return Some(msg.tx); 544 } 545 } 546 None 547 } 548 549 fn handle_send( 550 tasks: &mut Tasks, 551 connections: &mut ConnectionPool, 552 config: &Arc<Config>, 553 key: String, 554 data: Bytes, 555 remotes: &BTreeSet<NodeId>, 556 ) { 557 let me = connections.endpoint.node_id(); 558 let msg = rpc::RecvSegment { 559 key, 560 data, 561 from: me, 562 }; 563 for remote in remotes { 564 trace!("sending to stream {}: {}", msg.key, remote); 565 let conn = connections.get(remote); 566 tasks.push(Box::pin(Self::forward_task( 567 config.clone(), 568 conn, 569 msg.clone(), 570 ))); 571 } 572 } 573 574 async fn forward_task( 575 config: Arc<Config>, 576 conn: Connection, 577 msg: RecvSegment, 578 ) -> (NodeId, Result<(), RpcTaskError>) { 579 let id = conn.id; 580 let res = async move { 581 tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??; 582 Ok(()) 583 } 584 .await; 585 (id, res) 586 } 587 588 fn node_id(&self) -> PublicKey { 589 self.router.endpoint().node_id() 590 } 591} 592 593/// Iroh-streamplace node that can send, forward or receive stream segments. 594#[derive(Clone, uniffi::Object)] 595pub struct Node { 596 client: Arc<db::Db>, 597 write: Arc<db::WriteScope>, 598 api: irpc::Client<ApiProtocol>, 599} 600 601impl Node { 602 pub(crate) async fn new_in_runtime( 603 config: Config, 604 handler: HandlerMode, 605 ) -> Result<Arc<Self>, CreateError> { 606 let mode_str = Bytes::from(handler.mode_str()); 607 let secret_key = 608 SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| { 609 CreateError::PrivateKey { 610 size: e.len() as u64, 611 } 612 })?); 613 let topic = 614 TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| { 615 CreateError::Topic { 616 size: e.len() as u64, 617 } 618 })?); 619 let relay_mode = if config.disable_relay { 620 RelayMode::Disabled 621 } else { 622 RelayMode::Default 623 }; 624 let sp = StaticProvider::new(); 625 let endpoint = iroh::Endpoint::builder() 626 .secret_key(secret_key) 627 .relay_mode(relay_mode) 628 .discovery(sp.clone()) 629 .bind() 630 .await 631 .map_err(|e| CreateError::Bind { 632 message: e.to_string(), 633 })?; 634 let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler) 635 .await 636 .map_err(|e| CreateError::Subscribe { 637 message: e.to_string(), 638 })?; 639 api.node_scope() 640 .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str) 641 .await 642 .ok(); 643 tokio::spawn(actor); 644 Ok(Arc::new(api)) 645 } 646} 647 648/// DataHandler trait that is exported to go for receiving data callbacks. 649#[uniffi::export(with_foreign)] 650#[async_trait::async_trait] 651pub trait DataHandler: Send + Sync { 652 async fn handle_data( 653 &self, 654 from: Arc<crate::public_key::PublicKey>, 655 topic: String, 656 data: Vec<u8>, 657 ); 658} 659 660#[uniffi::export] 661impl Node { 662 /// Create a new streamplace client node. 663 #[uniffi::constructor] 664 pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> { 665 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender)) 666 } 667 668 #[uniffi::constructor] 669 pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> { 670 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder)) 671 } 672 673 #[uniffi::constructor] 674 pub async fn receiver( 675 config: Config, 676 handler: Arc<dyn DataHandler>, 677 ) -> Result<Arc<Self>, CreateError> { 678 RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler))) 679 } 680 681 /// Get a handle to the db to watch for changes locally or globally. 682 pub fn db(&self) -> Arc<db::Db> { 683 self.client.clone() 684 } 685 686 /// Get a handle to the write scope for this node. 687 /// 688 /// This is equivalent to calling `db.write(...)` with the secret key used to create the node. 689 pub fn node_scope(&self) -> Arc<db::WriteScope> { 690 self.write.clone() 691 } 692 693 /// Subscribe to updates for a given stream from a remote node. 694 pub async fn subscribe( 695 &self, 696 key: String, 697 remote_id: Arc<crate::public_key::PublicKey>, 698 ) -> Result<(), PutError> { 699 self.api 700 .rpc(api::Subscribe { 701 key, 702 remote_id: remote_id.as_ref().into(), 703 }) 704 .await 705 .map_err(|e| PutError::Irpc { 706 message: e.to_string(), 707 }) 708 } 709 710 /// Unsubscribe from updates for a given stream from a remote node. 711 pub async fn unsubscribe( 712 &self, 713 key: String, 714 remote_id: Arc<crate::public_key::PublicKey>, 715 ) -> Result<(), PutError> { 716 self.api 717 .rpc(api::Unsubscribe { 718 key, 719 remote_id: remote_id.as_ref().into(), 720 }) 721 .await 722 .map_err(|e| PutError::Irpc { 723 message: e.to_string(), 724 }) 725 } 726 727 /// Send a segment to all subscribers of the given stream. 728 pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> { 729 self.api 730 .rpc(api::SendSegment { 731 key, 732 data: data.into(), 733 }) 734 .await 735 .map_err(|e| PutError::Irpc { 736 message: e.to_string(), 737 }) 738 } 739 740 /// Join peers by their node tickets. 741 pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 742 let peers = peers 743 .iter() 744 .map(|p| NodeTicket::from_str(p)) 745 .collect::<Result<Vec<_>, _>>() 746 .map_err(|e| JoinPeersError::Ticket { 747 message: e.to_string(), 748 })?; 749 let addrs = peers 750 .iter() 751 .map(|t| t.node_addr().clone()) 752 .collect::<Vec<_>>(); 753 self.api 754 .rpc(api::JoinPeers { peers: addrs }) 755 .await 756 .map_err(|e| JoinPeersError::Irpc { 757 message: e.to_string(), 758 }) 759 } 760 761 /// Add tickets for remote peers 762 pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 763 let peers = peers 764 .iter() 765 .map(|p| NodeTicket::from_str(p)) 766 .collect::<Result<Vec<_>, _>>() 767 .map_err(|e| JoinPeersError::Ticket { 768 message: e.to_string(), 769 })?; 770 let addrs = peers 771 .iter() 772 .map(|t| t.node_addr().clone()) 773 .collect::<Vec<_>>(); 774 self.api 775 .rpc(api::AddTickets { peers: addrs }) 776 .await 777 .map_err(|e| JoinPeersError::Irpc { 778 message: e.to_string(), 779 }) 780 } 781 782 /// Get this node's ticket. 783 pub async fn ticket(&self) -> Result<String, PutError> { 784 let addr = self 785 .api 786 .rpc(api::GetNodeAddr) 787 .await 788 .map_err(|e| PutError::Irpc { 789 message: e.to_string(), 790 })?; 791 Ok(NodeTicket::from(addr).to_string()) 792 } 793 794 /// Get this node's node ID. 795 pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> { 796 let addr = self 797 .api 798 .rpc(api::GetNodeAddr) 799 .await 800 .map_err(|e| PutError::Irpc { 801 message: e.to_string(), 802 })?; 803 Ok(Arc::new(addr.node_id.into())) 804 } 805 806 /// Shutdown the node, including the streaming system and the metadata db. 807 pub async fn shutdown(&self) -> Result<(), ShutdownError> { 808 // shut down both the streams and the db concurrently, even if one fails 809 let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown()); 810 res1?; 811 res2?; 812 Ok(()) 813 } 814} 815 816impl Node { 817 async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> { 818 self.api 819 .rpc(api::Shutdown) 820 .await 821 .map_err(|e| ShutdownError::Irpc { 822 message: e.to_string(), 823 }) 824 } 825}