//! Live video on the AT Protocol
1uniffi::setup_scaffolding!();
2
3pub mod c2pa;
4pub mod node_addr;
5pub mod public_key;
6
7use std::sync::LazyLock;
8
9mod db;
10pub use db::*;
11#[cfg(test)]
12mod tests;
13
/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime.
///
/// Built on first access; failing to construct a runtime at startup is
/// unrecoverable, hence the `unwrap`.
static RUNTIME: LazyLock<tokio::runtime::Runtime> =
    LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
17
18use std::{
19 collections::{BTreeMap, BTreeSet, HashSet},
20 str::FromStr,
21 sync::Arc,
22};
23
24use bytes::Bytes;
25use iroh::{NodeId, PublicKey, RelayMode, SecretKey, discovery::static_provider::StaticProvider};
26use iroh_base::ticket::NodeTicket;
27use iroh_gossip::{net::Gossip, proto::TopicId};
28use irpc::{WithChannels, rpc::RemoteService};
29use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
30use n0_future::future::Boxed;
31
mod rpc {
    //! Node-to-node wire protocol: requests exchanged between remote peers.
    use bytes::Bytes;
    use iroh::NodeId;
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// ALPN string identifying version 1 of this protocol during connection setup.
    pub const ALPN: &[u8] = b"/iroh/streamplace/1";

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        /// Stream key to subscribe to.
        pub key: String,
        // TODO: verify
        /// Self-reported id of the subscribing node (unverified, see TODO).
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        /// Stream key to unsubscribe from.
        pub key: String,
        // TODO: verify
        /// Self-reported id of the unsubscribing node (unverified, see TODO).
        pub remote_id: NodeId,
    }

    // #[derive(Debug, Serialize, Deserialize)]
    // pub struct SendSegment {
    //     pub key: String,
    //     pub data: Bytes,
    // }

    /// A stream segment pushed to us for stream `key`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct RecvSegment {
        /// Node that sent this segment.
        pub from: NodeId,
        /// Stream key the segment belongs to.
        pub key: String,
        /// Raw segment payload.
        pub data: Bytes,
    }

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        RecvSegment(RecvSegment),
    }
}
83
mod api {
    //! In-process API between the public `Node` handle and the background actor.
    use bytes::Bytes;
    use iroh::{NodeAddr, NodeId};
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        /// Stream key to subscribe to.
        pub key: String,
        // TODO: verify
        /// Remote node to subscribe at.
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        /// Stream key to unsubscribe from.
        pub key: String,
        // TODO: verify
        /// Remote node to unsubscribe at.
        pub remote_id: NodeId,
    }

    /// Send a segment for stream `key` to all current subscribers.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct SendSegment {
        pub key: String,
        pub data: Bytes,
    }

    /// Add the peers to discovery and join them on the metadata/gossip layer.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct JoinPeers {
        pub peers: Vec<NodeAddr>,
    }

    /// Add peer addresses to static discovery without joining them.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct AddTickets {
        pub peers: Vec<NodeAddr>,
    }

    /// Request this node's current [`NodeAddr`].
    #[derive(Debug, Serialize, Deserialize)]
    pub struct GetNodeAddr;

    /// Request actor shutdown; the reply is sent just before the actor exits.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Shutdown;

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        SendSegment(SendSegment),
        #[rpc(tx=oneshot::Sender<()>)]
        JoinPeers(JoinPeers),
        #[rpc(tx=oneshot::Sender<()>)]
        AddTickets(AddTickets),
        #[rpc(tx=oneshot::Sender<NodeAddr>)]
        GetNodeAddr(GetNodeAddr),
        #[rpc(tx=oneshot::Sender<()>)]
        Shutdown(Shutdown),
    }
}
150use api::{Message as ApiMessage, Protocol as ApiProtocol};
151use n0_future::{FuturesUnordered, StreamExt};
152use rpc::{Message as RpcMessage, Protocol as RpcProtocol};
153use snafu::Snafu;
154use tracing::{Instrument, debug, error, trace, trace_span, warn};
155
156use crate::rpc::RecvSegment;
157
/// How this node handles incoming stream data.
pub(crate) enum HandlerMode {
    /// Originates segments; does not expect to receive any.
    Sender,
    /// Relays received segments on to its own subscribers.
    Forwarder,
    /// Terminal node; delivers received segments to the given callback.
    Receiver(Arc<dyn DataHandler>),
}
163
164impl HandlerMode {
165 pub fn mode_str(&self) -> &'static str {
166 match self {
167 HandlerMode::Sender => "sender",
168 HandlerMode::Forwarder => "forwarder",
169 HandlerMode::Receiver(_) => "receiver",
170 }
171 }
172}
173
/// In-flight forward tasks; each resolves to the target node id plus the
/// outcome of the rpc call to that node.
type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>;
175
/// Actor that contains both a kv db for metadata and a handler for the rpc protocol.
///
/// This can be used for sender, forwarder and receiver nodes; the role is
/// selected via [`HandlerMode`] (sender nodes simply never deliver incoming
/// segments).
struct Actor {
    /// Receiver for rpc messages from remote nodes
    rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>,
    /// Receiver for API messages from the user
    api_rx: tokio::sync::mpsc::Receiver<ApiMessage>,
    /// nodes I need to send to for each stream
    subscribers: BTreeMap<String, BTreeSet<NodeId>>,
    /// nodes I am subscribed to, keyed by stream
    subscriptions: BTreeMap<String, NodeId>,
    /// lightweight typed connection pool
    connections: ConnectionPool,
    /// How to handle incoming data
    handler: HandlerMode,
    /// Static provider for node discovery
    sp: StaticProvider,
    /// Iroh protocol router, I need to keep it around to keep the protocol alive
    router: iroh::protocol::Router,
    /// Metadata db
    client: db::Db,
    /// Write scope for this node for the metadata db
    write: db::WriteScope,
    /// Ongoing tasks
    tasks: Tasks,
    /// Configuration, needed for timeouts etc.
    config: Arc<Config>,
}
206
/// A pooled rpc connection to a single remote node.
#[derive(Debug, Clone)]
struct Connection {
    /// Remote node id this connection points at.
    id: NodeId,
    /// Typed rpc client speaking the streamplace wire protocol.
    rpc: irpc::Client<RpcProtocol>,
}
212
/// Errors produced by a forward task (see `Actor::forward_task`).
#[derive(Debug, Snafu)]
enum RpcTaskError {
    /// The underlying rpc call failed.
    #[snafu(transparent)]
    Task { source: irpc::Error },
    /// The call did not complete within the configured send duration.
    #[snafu(transparent)]
    Timeout { source: tokio::time::error::Elapsed },
}
220
/// Minimal connection pool: at most one cached connection per remote node.
struct ConnectionPool {
    /// Local endpoint used to dial remotes.
    endpoint: iroh::Endpoint,
    /// Cached connections keyed by remote node id.
    connections: BTreeMap<NodeId, Connection>,
}
225
226impl ConnectionPool {
227 fn new(endpoint: iroh::Endpoint) -> Self {
228 Self {
229 endpoint,
230 connections: BTreeMap::new(),
231 }
232 }
233
234 /// Cheap conn pool hack
235 fn get(&mut self, remote: &NodeId) -> Connection {
236 if !self.connections.contains_key(remote) {
237 let conn = IrohRemoteConnection::new(
238 self.endpoint.clone(),
239 (*remote).into(),
240 rpc::ALPN.to_vec(),
241 );
242 let conn = Connection {
243 rpc: irpc::Client::boxed(conn),
244 id: *remote,
245 };
246 self.connections.insert(*remote, conn);
247 }
248 self.connections
249 .get_mut(remote)
250 .expect("just inserted")
251 .clone()
252 }
253
254 fn remove(&mut self, remote: &NodeId) {
255 self.connections.remove(remote);
256 }
257}
258
impl Actor {
    /// Create the actor together with its public [`Node`] handle.
    ///
    /// Wires up gossip, the node-to-node rpc protocol router and the metadata
    /// db. The returned future is the actor's main loop; the caller is
    /// responsible for spawning it onto a runtime.
    pub async fn spawn(
        endpoint: iroh::Endpoint,
        sp: StaticProvider,
        topic: iroh_gossip::proto::TopicId,
        config: Config,
        handler: HandlerMode,
    ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> {
        let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(32);
        let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(32);
        let gossip = Gossip::builder().spawn(endpoint.clone());
        let id = endpoint.node_id();
        let router = iroh::protocol::Router::builder(endpoint.clone())
            .accept(iroh_gossip::ALPN, gossip.clone())
            .accept(
                rpc::ALPN,
                IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())),
            )
            .spawn();
        let topic = gossip.subscribe(topic, vec![]).await?;
        let secret = router.endpoint().secret_key().clone();
        let db_config = Default::default();
        let client = iroh_smol_kv::Client::local(topic, db_config);
        // The node's own secret key signs its writes to the metadata db.
        let write = db::WriteScope::new(client.write(secret.clone()));
        let client = db::Db::new(client);
        let actor = Self {
            rpc_rx,
            api_rx,
            subscribers: BTreeMap::new(),
            subscriptions: BTreeMap::new(),
            connections: ConnectionPool::new(router.endpoint().clone()),
            handler,
            router,
            sp,
            write: write.clone(),
            client: client.clone(),
            tasks: FuturesUnordered::new(),
            config: Arc::new(config),
        };
        let api = Node {
            client: Arc::new(client),
            write: Arc::new(write),
            api: irpc::Client::local(api_tx),
        };
        Ok((
            api,
            actor
                .run()
                .instrument(trace_span!("actor", id=%id.fmt_short())),
        ))
    }

    /// Main loop: multiplex remote rpc messages, local api messages and
    /// completed forward tasks until shutdown or channel closure.
    async fn run(mut self) {
        loop {
            tokio::select! {
                msg = self.rpc_rx.recv() => {
                    let Some(msg) = msg else {
                        error!("rpc channel closed");
                        break;
                    };
                    self.handle_rpc(msg).instrument(trace_span!("rpc")).await;
                }
                msg = self.api_rx.recv() => {
                    let Some(msg) = msg else {
                        break;
                    };
                    // handle_api returns the reply sender only for Shutdown:
                    // ack it and exit the loop.
                    if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await {
                        shutdown.send(()).await.ok();
                        break;
                    }
                }
                // Guard with is_empty: polling an empty FuturesUnordered yields None.
                res = self.tasks.next(), if !self.tasks.is_empty() => {
                    let Some((remote_id, res)) = res else {
                        error!("task finished but no result");
                        break;
                    };
                    match res {
                        Ok(()) => {}
                        Err(RpcTaskError::Timeout { source }) => {
                            warn!("call to {remote_id} timed out: {source}");
                        }
                        Err(RpcTaskError::Task { source }) => {
                            warn!("call to {remote_id} failed: {source}");
                        }
                    }
                    // NOTE(review): the pooled connection is dropped even when
                    // the task succeeded, so every segment redials — confirm
                    // whether removal should happen only on error.
                    self.connections.remove(&remote_id);
                }
            }
        }
    }

    /// Publish the current subscriber count for `key` to the metadata db
    /// (best effort; db errors are ignored).
    async fn update_subscriber_meta(&mut self, key: &str) {
        let n = self
            .subscribers
            .get(key)
            .map(|s| s.len())
            .unwrap_or_default();
        let v = n.to_string().into_bytes();
        self.write
            .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into())
            .await
            .ok();
    }

    /// Requests from remote nodes
    async fn handle_rpc(&mut self, msg: RpcMessage) {
        match msg {
            RpcMessage::Subscribe(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: rpc::Subscribe { key, remote_id },
                    ..
                } = msg;
                self.subscribers
                    .entry(key.clone())
                    .or_default()
                    .insert(remote_id);
                self.update_subscriber_meta(&key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::Unsubscribe(msg) => {
                debug!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: rpc::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                // Warn only when the stream exists but the node was not in it;
                // a completely unknown stream is silently ignored.
                if let Some(e) = self.subscribers.get_mut(&key)
                    && !e.remove(&remote_id)
                {
                    warn!(
                        "unsubscribe: no subscription for {} from {}",
                        key, remote_id
                    );
                }
                // Drop the stream entry entirely once its last subscriber left.
                if let Some(subscriptions) = self.subscribers.get(&key)
                    && subscriptions.is_empty()
                {
                    self.subscribers.remove(&key);
                }
                self.update_subscriber_meta(&key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::RecvSegment(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: rpc::RecvSegment { key, from, data },
                    ..
                } = msg;
                // What happens to an incoming segment depends on our role.
                match &self.handler {
                    HandlerMode::Sender => {
                        warn!("received segment but in sender mode");
                    }
                    HandlerMode::Forwarder => {
                        // Fan the segment out to our own subscribers.
                        if let Some(remotes) = self.subscribers.get(&key) {
                            Self::handle_send(
                                &mut self.tasks,
                                &mut self.connections,
                                &self.config,
                                key,
                                data,
                                remotes,
                            );
                        } else {
                            trace!("no subscribers for stream {}", key);
                        }
                    }
                    HandlerMode::Receiver(handler) => {
                        // Only deliver segments for streams we actually subscribed to.
                        if self.subscriptions.contains_key(&key) {
                            let from = Arc::new(from.into());
                            handler.handle_data(from, key, data.to_vec()).await;
                        } else {
                            warn!("received segment for unsubscribed key: {}", key);
                        }
                    }
                };
                tx.send(()).await.ok();
            }
        }
    }

    /// Requests from the local [`Node`] handle.
    ///
    /// Returns `Some(reply_sender)` only for `Shutdown`, signalling the main
    /// loop to acknowledge and exit; all other requests are acked inline.
    async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> {
        match msg {
            ApiMessage::SendSegment(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: api::SendSegment { key, data },
                    ..
                } = msg;
                if let Some(remotes) = self.subscribers.get(&key) {
                    Self::handle_send(
                        &mut self.tasks,
                        &mut self.connections,
                        &self.config,
                        key,
                        data,
                        remotes,
                    );
                } else {
                    trace!("no subscribers for stream {}", key);
                }
                tx.send(()).await.ok();
            }
            ApiMessage::Subscribe(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: api::Subscribe { key, remote_id },
                    ..
                } = msg;
                // Tell the remote we want this stream; failures are best-effort.
                let conn = self.connections.get(&remote_id);
                conn.rpc
                    .rpc(rpc::Subscribe {
                        key: key.clone(),
                        remote_id: self.node_id(),
                    })
                    .await
                    .ok();
                self.subscriptions.insert(key, remote_id);
                tx.send(()).await.ok();
            }
            ApiMessage::Unsubscribe(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: api::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                let conn = self.connections.get(&remote_id);
                conn.rpc
                    .rpc(rpc::Unsubscribe {
                        key: key.clone(),
                        remote_id: self.node_id(),
                    })
                    .await
                    .ok();
                self.subscriptions.remove(&key);
                tx.send(()).await.ok();
            }
            ApiMessage::AddTickets(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: api::AddTickets { peers },
                    ..
                } = msg;
                // Only feed discovery; unlike JoinPeers we do not join the db swarm.
                for addr in &peers {
                    self.sp.add_node_info(addr.clone());
                }
                // self.client.inner().join_peers(ids).await.ok();
                tx.send(()).await.ok();
            }
            ApiMessage::JoinPeers(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels {
                    tx,
                    inner: api::JoinPeers { peers },
                    ..
                } = msg;
                // Deduplicate and never try to join ourselves.
                let ids = peers
                    .iter()
                    .map(|a| a.node_id)
                    .filter(|id| *id != self.node_id())
                    .collect::<HashSet<_>>();
                for addr in &peers {
                    self.sp.add_node_info(addr.clone());
                }
                self.client.inner().join_peers(ids).await.ok();
                tx.send(()).await.ok();
            }
            ApiMessage::GetNodeAddr(msg) => {
                trace!("{:?}", msg.inner);
                let WithChannels { tx, .. } = msg;
                if !self.config.disable_relay {
                    // don't await home relay if we have disabled relays, this will hang forever
                    self.router.endpoint().online().await;
                }
                let addr = self.router.endpoint().node_addr();
                tx.send(addr).await.ok();
            }
            ApiMessage::Shutdown(msg) => {
                return Some(msg.tx);
            }
        }
        None
    }

    /// Fan a segment out to all `remotes`, spawning one time-bounded forward
    /// task per remote. Associated fn (not `&mut self`) so it can be called
    /// while `self` is partially borrowed.
    fn handle_send(
        tasks: &mut Tasks,
        connections: &mut ConnectionPool,
        config: &Arc<Config>,
        key: String,
        data: Bytes,
        remotes: &BTreeSet<NodeId>,
    ) {
        let me = connections.endpoint.node_id();
        let msg = rpc::RecvSegment {
            key,
            data,
            from: me,
        };
        for remote in remotes {
            trace!("sending to stream {}: {}", msg.key, remote);
            let conn = connections.get(remote);
            tasks.push(Box::pin(Self::forward_task(
                config.clone(),
                conn,
                msg.clone(),
            )));
        }
    }

    /// Send one segment to one remote, bounded by `config.max_send_duration`.
    ///
    /// Always returns the remote's id so the caller can clean up the pooled
    /// connection for that node.
    async fn forward_task(
        config: Arc<Config>,
        conn: Connection,
        msg: RecvSegment,
    ) -> (NodeId, Result<(), RpcTaskError>) {
        let id = conn.id;
        let res = async move {
            tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??;
            Ok(())
        }
        .await;
        (id, res)
    }

    /// This node's public key / node id.
    fn node_id(&self) -> PublicKey {
        self.router.endpoint().node_id()
    }
}
592
/// Iroh-streamplace node that can send, forward or receive stream segments.
///
/// Cheap to clone; all clones talk to the same background actor.
#[derive(Clone, uniffi::Object)]
pub struct Node {
    /// Read handle to the metadata db.
    client: Arc<db::Db>,
    /// Write scope for this node's own metadata entries.
    write: Arc<db::WriteScope>,
    /// Channel to the background actor.
    api: irpc::Client<ApiProtocol>,
}
600
impl Node {
    /// Shared constructor used by the uniffi `sender`/`forwarder`/`receiver`
    /// entry points: bind the endpoint, spawn the actor, then record this
    /// node's mode in the metadata db (best effort).
    ///
    /// Errors with `CreateError::PrivateKey`/`Topic` when `config.key` or
    /// `config.topic` is not exactly 32 bytes, `Bind` when the endpoint
    /// cannot be bound, and `Subscribe` when joining the gossip topic fails.
    pub(crate) async fn new_in_runtime(
        config: Config,
        handler: HandlerMode,
    ) -> Result<Arc<Self>, CreateError> {
        let mode_str = Bytes::from(handler.mode_str());
        // try_from's error is the original Vec, so e.len() is the bad size.
        let secret_key =
            SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| {
                CreateError::PrivateKey {
                    size: e.len() as u64,
                }
            })?);
        let topic =
            TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| {
                CreateError::Topic {
                    size: e.len() as u64,
                }
            })?);
        let relay_mode = if config.disable_relay {
            RelayMode::Disabled
        } else {
            RelayMode::Default
        };
        let sp = StaticProvider::new();
        let endpoint = iroh::Endpoint::builder()
            .secret_key(secret_key)
            .relay_mode(relay_mode)
            .discovery(sp.clone())
            .bind()
            .await
            .map_err(|e| CreateError::Bind {
                message: e.to_string(),
            })?;
        let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler)
            .await
            .map_err(|e| CreateError::Subscribe {
                message: e.to_string(),
            })?;
        // Advertise our role ("sender"/"forwarder"/"receiver") in the db.
        api.node_scope()
            .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str)
            .await
            .ok();
        tokio::spawn(actor);
        Ok(Arc::new(api))
    }
}
647
/// DataHandler trait that is exported to go for receiving data callbacks.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DataHandler: Send + Sync {
    /// Called for each segment received on a subscribed stream.
    ///
    /// `from` is the id of the node that sent the segment, `topic` the
    /// stream key, and `data` the raw segment bytes.
    async fn handle_data(
        &self,
        from: Arc<crate::public_key::PublicKey>,
        topic: String,
        data: Vec<u8>,
    );
}
659
#[uniffi::export]
impl Node {
    /// Create a new streamplace client node.
    ///
    /// NOTE(review): `block_on` inside an async fn blocks the calling thread;
    /// presumably uniffi drives these constructors on a dedicated thread —
    /// confirm.
    #[uniffi::constructor]
    pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender))
    }

    /// Create a node that forwards received segments to its own subscribers.
    #[uniffi::constructor]
    pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder))
    }

    /// Create a node that delivers received segments to `handler`.
    #[uniffi::constructor]
    pub async fn receiver(
        config: Config,
        handler: Arc<dyn DataHandler>,
    ) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler)))
    }

    /// Get a handle to the db to watch for changes locally or globally.
    pub fn db(&self) -> Arc<db::Db> {
        self.client.clone()
    }

    /// Get a handle to the write scope for this node.
    ///
    /// This is equivalent to calling `db.write(...)` with the secret key used to create the node.
    pub fn node_scope(&self) -> Arc<db::WriteScope> {
        self.write.clone()
    }

    /// Subscribe to updates for a given stream from a remote node.
    pub async fn subscribe(
        &self,
        key: String,
        remote_id: Arc<crate::public_key::PublicKey>,
    ) -> Result<(), PutError> {
        self.api
            .rpc(api::Subscribe {
                key,
                remote_id: remote_id.as_ref().into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Unsubscribe from updates for a given stream from a remote node.
    pub async fn unsubscribe(
        &self,
        key: String,
        remote_id: Arc<crate::public_key::PublicKey>,
    ) -> Result<(), PutError> {
        self.api
            .rpc(api::Unsubscribe {
                key,
                remote_id: remote_id.as_ref().into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Send a segment to all subscribers of the given stream.
    pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> {
        self.api
            .rpc(api::SendSegment {
                key,
                data: data.into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Join peers by their node tickets.
    ///
    /// Tickets are parsed first; a single malformed ticket fails the whole
    /// call without joining anything.
    pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
        let peers = peers
            .iter()
            .map(|p| NodeTicket::from_str(p))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| JoinPeersError::Ticket {
                message: e.to_string(),
            })?;
        let addrs = peers
            .iter()
            .map(|t| t.node_addr().clone())
            .collect::<Vec<_>>();
        self.api
            .rpc(api::JoinPeers { peers: addrs })
            .await
            .map_err(|e| JoinPeersError::Irpc {
                message: e.to_string(),
            })
    }

    /// Add tickets for remote peers
    ///
    /// Unlike [`Self::join_peers`], this only feeds the discovery mechanism
    /// and does not join the peers on the db layer.
    pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
        let peers = peers
            .iter()
            .map(|p| NodeTicket::from_str(p))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| JoinPeersError::Ticket {
                message: e.to_string(),
            })?;
        let addrs = peers
            .iter()
            .map(|t| t.node_addr().clone())
            .collect::<Vec<_>>();
        self.api
            .rpc(api::AddTickets { peers: addrs })
            .await
            .map_err(|e| JoinPeersError::Irpc {
                message: e.to_string(),
            })
    }

    /// Get this node's ticket.
    pub async fn ticket(&self) -> Result<String, PutError> {
        let addr = self
            .api
            .rpc(api::GetNodeAddr)
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })?;
        Ok(NodeTicket::from(addr).to_string())
    }

    /// Get this node's node ID.
    pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> {
        let addr = self
            .api
            .rpc(api::GetNodeAddr)
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })?;
        Ok(Arc::new(addr.node_id.into()))
    }

    /// Shutdown the node, including the streaming system and the metadata db.
    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
        // shut down both the streams and the db concurrently, even if one fails
        let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown());
        res1?;
        res2?;
        Ok(())
    }
}
815
816impl Node {
817 async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> {
818 self.api
819 .rpc(api::Shutdown)
820 .await
821 .map_err(|e| ShutdownError::Irpc {
822 message: e.to_string(),
823 })
824 }
825}