//! Live video on the AT Protocol
1uniffi::setup_scaffolding!();
2
3pub mod c2pa;
4pub mod error;
5pub mod node_addr;
6pub mod public_key;
7pub mod streams;
8
9use std::sync::{LazyLock, Once};
10
11mod db;
12pub use db::*;
13#[cfg(test)]
14mod tests;
15
16#[cfg(test)]
17mod test_stream;
18
/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime.
///
/// Created on first access; `unwrap` here treats runtime construction failure
/// as an unrecoverable startup bug.
static RUNTIME: LazyLock<tokio::runtime::Runtime> =
    LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());

/// Ensure logging is only initialized once (a global tracing subscriber may
/// only be installed once per process).
static LOGGING_INIT: Once = Once::new();
25
26use std::{
27 collections::{BTreeMap, BTreeSet, HashSet},
28 str::FromStr,
29 sync::Arc,
30};
31
32use bytes::Bytes;
33use iroh::{NodeId, PublicKey, RelayMode, SecretKey, discovery::static_provider::StaticProvider};
34use iroh_base::ticket::NodeTicket;
35use iroh_gossip::{net::Gossip, proto::TopicId};
36use irpc::{WithChannels, rpc::RemoteService};
37use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
38use n0_future::future::Boxed;
39
mod rpc {
    //! Wire protocol spoken between nodes (remote rpc).
    //!
    //! See the sibling `api` module for the local, in-process API.
    use bytes::Bytes;
    use iroh::NodeId;
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// ALPN identifier for this protocol, negotiated on every iroh connection.
    pub const ALPN: &[u8] = b"/iroh/streamplace/1";

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        /// Stream key to subscribe to
        pub key: String,
        // TODO: verify
        /// Node id of the subscriber (currently taken at face value, see TODO)
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        /// Stream key to unsubscribe from
        pub key: String,
        // TODO: verify
        /// Node id of the unsubscribing node (currently taken at face value, see TODO)
        pub remote_id: NodeId,
    }

    // #[derive(Debug, Serialize, Deserialize)]
    // pub struct SendSegment {
    //     pub key: String,
    //     pub data: Bytes,
    // }

    /// A stream segment pushed from `from` for stream `key`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct RecvSegment {
        /// Node id of the sending node
        pub from: NodeId,
        /// Stream key the segment belongs to
        pub key: String,
        /// Raw segment bytes
        pub data: Bytes,
    }

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        RecvSegment(RecvSegment),
    }
}
91
mod api {
    //! Local, in-process API between the public [`Node`](super::Node) handle
    //! and the [`Actor`](super::Actor).
    //!
    //! See the sibling `rpc` module for the node-to-node wire protocol.
    use bytes::Bytes;
    use iroh::{NodeAddr, NodeId};
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        /// Stream key to subscribe to
        pub key: String,
        // TODO: verify
        /// Remote node to subscribe at
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        /// Stream key to unsubscribe from
        pub key: String,
        // TODO: verify
        /// Remote node to unsubscribe from
        pub remote_id: NodeId,
    }

    /// Send a segment to all current subscribers of `key`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct SendSegment {
        /// Stream key the segment belongs to
        pub key: String,
        /// Raw segment bytes
        pub data: Bytes,
    }

    /// Join the given peers in the gossip/metadata layer (also adds their addresses).
    #[derive(Debug, Serialize, Deserialize)]
    pub struct JoinPeers {
        pub peers: Vec<NodeAddr>,
    }

    /// Add peer addresses to node discovery without joining them.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct AddTickets {
        pub peers: Vec<NodeAddr>,
    }

    /// Request this node's own [`NodeAddr`].
    #[derive(Debug, Serialize, Deserialize)]
    pub struct GetNodeAddr;

    /// Ask the actor to shut down.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Shutdown;

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        SendSegment(SendSegment),
        #[rpc(tx=oneshot::Sender<()>)]
        JoinPeers(JoinPeers),
        #[rpc(tx=oneshot::Sender<()>)]
        AddTickets(AddTickets),
        #[rpc(tx=oneshot::Sender<NodeAddr>)]
        GetNodeAddr(GetNodeAddr),
        #[rpc(tx=oneshot::Sender<()>)]
        Shutdown(Shutdown),
    }
}
158use api::{Message as ApiMessage, Protocol as ApiProtocol};
159use n0_future::{FuturesUnordered, StreamExt};
160use rpc::{Message as RpcMessage, Protocol as RpcProtocol};
161use snafu::Snafu;
162use tracing::{Instrument, debug, error, trace, trace_span, warn};
163
164use crate::rpc::RecvSegment;
165
166/// Initialize logging with the default subscriber that respects RUST_LOG environment variable.
167/// This function is safe to call multiple times - it will only initialize logging once.
168#[uniffi::export]
169pub fn init_logging() {
170 LOGGING_INIT.call_once(|| {
171 tracing_subscriber::fmt()
172 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
173 .init();
174 });
175}
176
177/// Initialize logging with a custom log level.
178/// This function is safe to call multiple times - it will only initialize logging once.
179///
180/// # Arguments
181/// * `level` - Log level as a string (e.g., "trace", "debug", "info", "warn", "error")
182#[uniffi::export]
183pub fn init_logging_with_level(level: String) {
184 LOGGING_INIT.call_once(|| {
185 let filter = tracing_subscriber::EnvFilter::try_from_default_env()
186 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level));
187
188 tracing_subscriber::fmt().with_env_filter(filter).init();
189 });
190}
191
/// Role of a node: how incoming segments are handled.
pub(crate) enum HandlerMode {
    /// Only publishes segments; incoming segments are rejected with a warning.
    Sender,
    /// Relays incoming segments to this node's own subscribers.
    Forwarder,
    /// Delivers incoming segments to a foreign-language callback.
    Receiver(Arc<dyn DataHandler>),
}
197
198impl HandlerMode {
199 pub fn mode_str(&self) -> &'static str {
200 match self {
201 HandlerMode::Sender => "sender",
202 HandlerMode::Forwarder => "forwarder",
203 HandlerMode::Receiver(_) => "receiver",
204 }
205 }
206}
207
/// Set of in-flight per-remote send tasks; each task resolves to the remote's
/// node id plus the outcome, so failed connections can be evicted from the pool.
type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>;
209
/// Actor that contains both a kv db for metadata and a handler for the rpc protocol.
///
/// This can be used for sender, forwarder and receiver nodes; the role is
/// selected via [`HandlerMode`] (sender nodes use [`HandlerMode::Sender`]).
struct Actor {
    /// Receiver for rpc messages from remote nodes
    rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>,
    /// Receiver for API messages from the user
    api_rx: tokio::sync::mpsc::Receiver<ApiMessage>,
    /// nodes I need to send to for each stream
    subscribers: BTreeMap<String, BTreeSet<NodeId>>,
    /// nodes I am subscribed to
    subscriptions: BTreeMap<String, NodeId>,
    /// lightweight typed connection pool
    connections: ConnectionPool,
    /// How to handle incoming data
    handler: HandlerMode,
    /// Static provider for node discovery
    sp: StaticProvider,
    /// Iroh protocol router, I need to keep it around to keep the protocol alive
    router: iroh::protocol::Router,
    /// Metadata db
    client: db::Db,
    /// Write scope for this node for the metadata db
    write: db::WriteScope,
    /// Ongoing tasks
    tasks: Tasks,
    /// Configuration, needed for timeouts etc.
    config: Arc<Config>,
}
240
/// A pooled, typed rpc connection to a single remote node.
// NOTE(review): `Clone` is derived; presumably the inner `irpc::Client` is
// cheaply cloneable (shared) — confirm against irpc docs.
#[derive(Debug, Clone)]
struct Connection {
    /// Node id of the remote this connection points at
    id: NodeId,
    /// Typed rpc client speaking [`rpc::Protocol`]
    rpc: irpc::Client<RpcProtocol>,
}
246
/// Errors produced by a per-remote send task (see [`Actor::forward_task`]).
#[derive(Debug, Snafu)]
enum RpcTaskError {
    /// The rpc call itself failed
    #[snafu(transparent)]
    Task { source: irpc::Error },
    /// The rpc call did not complete within `config.max_send_duration`
    #[snafu(transparent)]
    Timeout { source: tokio::time::error::Elapsed },
}
254
/// Minimal connection pool: one cached rpc connection per remote node id.
struct ConnectionPool {
    /// Endpoint used to dial remotes
    endpoint: iroh::Endpoint,
    /// Cached connections, keyed by remote node id
    connections: BTreeMap<NodeId, Connection>,
}
259
260impl ConnectionPool {
261 fn new(endpoint: iroh::Endpoint) -> Self {
262 Self {
263 endpoint,
264 connections: BTreeMap::new(),
265 }
266 }
267
268 /// Cheap conn pool hack
269 fn get(&mut self, remote: &NodeId) -> Connection {
270 if !self.connections.contains_key(remote) {
271 trace!(remote = %remote.fmt_short(),"ConnectionPool.get is inserting a new remote connection");
272 let conn = IrohRemoteConnection::new(
273 self.endpoint.clone(),
274 (*remote).into(),
275 rpc::ALPN.to_vec(),
276 );
277 let conn = Connection {
278 rpc: irpc::Client::boxed(conn),
279 id: *remote,
280 };
281 self.connections.insert(*remote, conn);
282 }
283 self.connections
284 .get_mut(remote)
285 .expect("just inserted")
286 .clone()
287 }
288
289 fn remove(&mut self, remote: &NodeId) {
290 self.connections.remove(remote);
291 }
292}
293
294impl Actor {
    /// Spawn all parts of a node: gossip, rpc protocol, metadata db and the actor.
    ///
    /// Returns the public [`Node`] handle plus the actor's run future; the
    /// caller is responsible for driving (spawning) that future.
    pub async fn spawn(
        endpoint: iroh::Endpoint,
        sp: StaticProvider,
        topic: iroh_gossip::proto::TopicId,
        config: Config,
        handler: HandlerMode,
    ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> {
        // bounded channels between the protocol/API front-ends and the actor loop
        let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(512);
        let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(512);
        let gossip = Gossip::builder().spawn(endpoint.clone());
        let id = endpoint.node_id();
        // accept both gossip (for the metadata db) and our rpc protocol
        let router = iroh::protocol::Router::builder(endpoint.clone())
            .accept(iroh_gossip::ALPN, gossip.clone())
            .accept(
                rpc::ALPN,
                IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())),
            )
            .spawn();
        // join the gossip topic with no initial bootstrap peers
        let topic = gossip.subscribe(topic, vec![]).await?;
        let secret = router.endpoint().secret_key().clone();
        let db_config = Default::default();
        // the kv client runs on top of the gossip topic
        let client = iroh_smol_kv::Client::local(topic, db_config);
        let write = db::WriteScope::new(client.write(secret.clone()));
        let client = db::Db::new(client);
        let actor = Self {
            rpc_rx,
            api_rx,
            subscribers: BTreeMap::new(),
            subscriptions: BTreeMap::new(),
            connections: ConnectionPool::new(router.endpoint().clone()),
            handler,
            router,
            sp,
            write: write.clone(),
            client: client.clone(),
            tasks: FuturesUnordered::new(),
            config: Arc::new(config),
        };
        let api = Node {
            client: Arc::new(client),
            write: Arc::new(write),
            api: irpc::Client::local(api_tx),
        };
        Ok((
            api,
            actor
                .run()
                .instrument(trace_span!("actor", id=%id.fmt_short())),
        ))
    }
345
346 async fn run(mut self) {
347 loop {
348 tokio::select! {
349 msg = self.rpc_rx.recv() => {
350 trace!("received local rpc message");
351 let Some(msg) = msg else {
352 error!("rpc channel closed");
353 break;
354 };
355 self.handle_rpc(msg).instrument(trace_span!("rpc")).await;
356 }
357 msg = self.api_rx.recv() => {
358 trace!("received remote rpc message");
359 let Some(msg) = msg else {
360 break;
361 };
362 if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await {
363 shutdown.send(()).await.ok();
364 break;
365 }
366 }
367 res = self.tasks.next(), if !self.tasks.is_empty() => {
368 trace!("processing task");
369 let Some((remote_id, res)) = res else {
370 error!("task finished but no result");
371 break;
372 };
373 match res {
374 Ok(()) => {}
375 Err(RpcTaskError::Timeout { source }) => {
376 warn!("call to {remote_id} timed out: {source}");
377 }
378 Err(RpcTaskError::Task { source }) => {
379 warn!("call to {remote_id} failed: {source}");
380 }
381 }
382 self.connections.remove(&remote_id);
383 }
384 }
385 }
386 warn!("RPC Actor loop has closed");
387 }
388
389 async fn update_subscriber_meta(&mut self, key: &str) {
390 let n = self
391 .subscribers
392 .get(key)
393 .map(|s| s.len())
394 .unwrap_or_default();
395 let v = n.to_string().into_bytes();
396 self.write
397 .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into())
398 .await
399 .ok();
400 }
401
    /// Requests from remote nodes
    ///
    /// Handles subscribe/unsubscribe bookkeeping and incoming segments; every
    /// request is acknowledged via its oneshot channel (best effort).
    async fn handle_rpc(&mut self, msg: RpcMessage) {
        trace!("RPC.handle_rpc");
        match msg {
            RpcMessage::Subscribe(msg) => {
                trace!(inner = ?msg.inner, "RpcMessage::Subscribe");
                let WithChannels {
                    tx,
                    inner: rpc::Subscribe { key, remote_id },
                    ..
                } = msg;
                // record the remote as a subscriber of this stream
                self.subscribers
                    .entry(key.clone())
                    .or_default()
                    .insert(remote_id);
                // publish the new subscriber count to the metadata db
                self.update_subscriber_meta(&key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::Unsubscribe(msg) => {
                debug!(inner = ?msg.inner, "RpcMessage::Unsubscribe");
                let WithChannels {
                    tx,
                    inner: rpc::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                // warn when the remote was not actually subscribed
                if let Some(e) = self.subscribers.get_mut(&key)
                    && !e.remove(&remote_id)
                {
                    warn!(
                        "unsubscribe: no subscription for {} from {}",
                        key, remote_id
                    );
                }
                // drop the key entirely once its subscriber set is empty
                // (NOTE(review): local binding is named `subscriptions` but holds
                // the *subscribers* set for this key)
                if let Some(subscriptions) = self.subscribers.get(&key)
                    && subscriptions.is_empty()
                {
                    self.subscribers.remove(&key);
                }
                self.update_subscriber_meta(&key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::RecvSegment(msg) => {
                trace!(inner = ?msg.inner, "RpcMessage::RecvSegment");
                let WithChannels {
                    tx,
                    inner: rpc::RecvSegment { key, from, data },
                    ..
                } = msg;
                // what happens to a segment depends on this node's role
                match &self.handler {
                    HandlerMode::Sender => {
                        // senders never expect inbound segments
                        warn!("received segment but in sender mode");
                    }
                    HandlerMode::Forwarder => {
                        // relay the segment to our own subscribers, if any
                        trace!("forwarding segment");
                        if let Some(remotes) = self.subscribers.get(&key) {
                            Self::handle_send(
                                &mut self.tasks,
                                &mut self.connections,
                                &self.config,
                                key,
                                data,
                                remotes,
                            );
                        } else {
                            trace!("no subscribers for stream {}", key);
                        }
                    }
                    HandlerMode::Receiver(handler) => {
                        // only deliver segments for streams we actually subscribed to
                        if self.subscriptions.contains_key(&key) {
                            let from = Arc::new(from.into());
                            handler.handle_data(from, key, data.to_vec()).await;
                        } else {
                            warn!("received segment for unsubscribed key: {}", key);
                        }
                    }
                };
                tx.send(()).await.ok();
            }
        }
    }
482
    /// Requests from the local user-facing API.
    ///
    /// Returns `Some(ack)` only for [`api::Shutdown`]; the caller sends the
    /// ack and breaks out of the actor loop.
    async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> {
        trace!("RPC.handle_api");
        match msg {
            ApiMessage::SendSegment(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::SendSegment");
                let WithChannels {
                    tx,
                    inner: api::SendSegment { key, data },
                    ..
                } = msg;
                // fan out to all current subscribers of this stream
                if let Some(remotes) = self.subscribers.get(&key) {
                    Self::handle_send(
                        &mut self.tasks,
                        &mut self.connections,
                        &self.config,
                        key,
                        data,
                        remotes,
                    );
                } else {
                    trace!("no subscribers for stream {}", key);
                }
                tx.send(()).await.ok();
            }
            ApiMessage::Subscribe(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Subscribe");
                let WithChannels {
                    tx,
                    inner: api::Subscribe { key, remote_id },
                    ..
                } = msg;
                let conn = self.connections.get(&remote_id);
                // ack the local caller immediately; the remote subscribe below
                // is best effort and may still be in flight
                tx.send(()).await.ok();
                trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Subscribe message");
                conn.rpc
                    .rpc(rpc::Subscribe {
                        key: key.clone(),
                        remote_id: self.node_id(),
                    })
                    .await
                    .ok();
                trace!(remote = %remote_id.fmt_short(), key = %key, "inserting subscription");
                self.subscriptions.insert(key, remote_id);
                trace!("finished inserting subscription");
            }
            ApiMessage::Unsubscribe(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Unsubscribe");
                let WithChannels {
                    tx,
                    inner: api::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                let conn = self.connections.get(&remote_id);
                // ack first, remote unsubscribe is best effort
                tx.send(()).await.ok();
                conn.rpc
                    .rpc(rpc::Unsubscribe {
                        key: key.clone(),
                        remote_id: self.node_id(),
                    })
                    .await
                    .ok();
                self.subscriptions.remove(&key);
            }
            ApiMessage::AddTickets(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::AddTickets");
                let WithChannels {
                    tx,
                    inner: api::AddTickets { peers },
                    ..
                } = msg;
                // only feed discovery; unlike JoinPeers this does not join them
                for addr in &peers {
                    self.sp.add_node_info(addr.clone());
                }
                // self.client.inner().join_peers(ids).await.ok();
                tx.send(()).await.ok();
            }
            ApiMessage::JoinPeers(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::JoinPeers");
                let WithChannels {
                    tx,
                    inner: api::JoinPeers { peers },
                    ..
                } = msg;
                // dedupe and never try to join ourselves
                let ids = peers
                    .iter()
                    .map(|a| a.node_id)
                    .filter(|id| *id != self.node_id())
                    .collect::<HashSet<_>>();
                for addr in &peers {
                    self.sp.add_node_info(addr.clone());
                }
                self.client.inner().join_peers(ids).await.ok();
                tx.send(()).await.ok();
            }
            ApiMessage::GetNodeAddr(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::GetNodeAddr");
                let WithChannels { tx, .. } = msg;
                if !self.config.disable_relay {
                    // don't await home relay if we have disabled relays, this will hang forever
                    self.router.endpoint().online().await;
                }
                let addr = self.router.endpoint().node_addr();
                tx.send(addr).await.ok();
            }
            ApiMessage::Shutdown(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Shutdown");
                // hand the ack channel back to run(); it acks after breaking the loop
                return Some(msg.tx);
            }
        }
        None
    }
594
595 fn handle_send(
596 tasks: &mut Tasks,
597 connections: &mut ConnectionPool,
598 config: &Arc<Config>,
599 key: String,
600 data: Bytes,
601 remotes: &BTreeSet<NodeId>,
602 ) {
603 let me = connections.endpoint.node_id();
604 let msg = rpc::RecvSegment {
605 key,
606 data,
607 from: me,
608 };
609 for remote in remotes {
610 trace!(remote = %remote.fmt_short(), key = %msg.key, "handle_send to remote");
611 let conn = connections.get(remote);
612 tasks.push(Box::pin(Self::forward_task(
613 config.clone(),
614 conn,
615 msg.clone(),
616 )));
617 }
618 }
619
620 async fn forward_task(
621 config: Arc<Config>,
622 conn: Connection,
623 msg: RecvSegment,
624 ) -> (NodeId, Result<(), RpcTaskError>) {
625 let id = conn.id;
626 let res = async move {
627 tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??;
628 Ok(())
629 }
630 .await;
631 (id, res)
632 }
633
    /// This node's id (public key), as reported by the endpoint.
    fn node_id(&self) -> PublicKey {
        self.router.endpoint().node_id()
    }
637}
638
/// Iroh-streamplace node that can send, forward or receive stream segments.
#[derive(Clone, uniffi::Object)]
pub struct Node {
    /// Handle to the metadata db
    client: Arc<db::Db>,
    /// Write scope for this node's own entries in the metadata db
    write: Arc<db::WriteScope>,
    /// Client for sending [`api::Protocol`] requests to the actor
    api: irpc::Client<ApiProtocol>,
}
646
impl Node {
    /// Shared construction path for [`Node::sender`], [`Node::forwarder`] and
    /// [`Node::receiver`].
    ///
    /// Validates the key and topic sizes, binds the iroh endpoint, spawns the
    /// actor and records this node's mode in the metadata db (best effort).
    pub(crate) async fn new_in_runtime(
        config: Config,
        handler: HandlerMode,
    ) -> Result<Arc<Self>, CreateError> {
        let mode_str = Bytes::from(handler.mode_str());
        // the secret key must be exactly 32 bytes; report the actual size otherwise
        let secret_key =
            SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| {
                CreateError::PrivateKey {
                    size: e.len() as u64,
                }
            })?);
        // the gossip topic must be exactly 32 bytes as well
        let topic =
            TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| {
                CreateError::Topic {
                    size: e.len() as u64,
                }
            })?);
        let relay_mode = if config.disable_relay {
            RelayMode::Disabled
        } else {
            RelayMode::Default
        };
        // static discovery provider; peer addresses are added later via tickets
        let sp = StaticProvider::new();
        let endpoint = iroh::Endpoint::builder()
            .secret_key(secret_key)
            .relay_mode(relay_mode)
            .discovery(sp.clone())
            .bind()
            .await
            .map_err(|e| CreateError::Bind {
                message: e.to_string(),
            })?;
        let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler)
            .await
            .map_err(|e| CreateError::Subscribe {
                message: e.to_string(),
            })?;
        // publish our mode ("sender"/"forwarder"/"receiver"), best effort
        api.node_scope()
            .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str)
            .await
            .ok();
        tokio::spawn(actor);
        Ok(Arc::new(api))
    }
}
693
/// DataHandler trait that is exported to go for receiving data callbacks.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DataHandler: Send + Sync {
    /// Called for every segment received on a subscribed stream.
    ///
    /// `from` is the sender's public key, `topic` the stream key, and `data`
    /// the raw segment bytes.
    async fn handle_data(
        &self,
        from: Arc<crate::public_key::PublicKey>,
        topic: String,
        data: Vec<u8>,
    );
}
705
706#[uniffi::export]
707impl Node {
    /// Create a new streamplace client node.
    ///
    /// Sender nodes publish segments and do not receive or forward them.
    // NOTE(review): `RUNTIME.block_on` inside an async fn — presumably these
    // constructors run on uniffi's foreign executor, not on the Tokio runtime
    // itself (block_on would panic there); confirm.
    #[uniffi::constructor]
    pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender))
    }
713
    /// Create a node that relays received segments to its own subscribers.
    #[uniffi::constructor]
    pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder))
    }
718
    /// Create a node that delivers received segments to `handler` (a foreign
    /// language callback, see [`DataHandler`]).
    #[uniffi::constructor]
    pub async fn receiver(
        config: Config,
        handler: Arc<dyn DataHandler>,
    ) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler)))
    }
726
    /// Get a handle to the db to watch for changes locally or globally.
    ///
    /// The returned handle is reference counted and cheap to clone.
    pub fn db(&self) -> Arc<db::Db> {
        self.client.clone()
    }
731
    /// Get a handle to the write scope for this node.
    ///
    /// This is equivalent to calling `db.write(...)` with the secret key used to create the node.
    /// The returned handle is reference counted and cheap to clone.
    pub fn node_scope(&self) -> Arc<db::WriteScope> {
        self.write.clone()
    }
738
739 /// Subscribe to updates for a given stream from a remote node.
740 pub async fn subscribe(
741 &self,
742 key: String,
743 remote_id: Arc<crate::public_key::PublicKey>,
744 ) -> Result<(), PutError> {
745 self.api
746 .rpc(api::Subscribe {
747 key,
748 remote_id: remote_id.as_ref().into(),
749 })
750 .await
751 .map_err(|e| PutError::Irpc {
752 message: e.to_string(),
753 })
754 }
755
756 /// Unsubscribe from updates for a given stream from a remote node.
757 pub async fn unsubscribe(
758 &self,
759 key: String,
760 remote_id: Arc<crate::public_key::PublicKey>,
761 ) -> Result<(), PutError> {
762 self.api
763 .rpc(api::Unsubscribe {
764 key,
765 remote_id: remote_id.as_ref().into(),
766 })
767 .await
768 .map_err(|e| PutError::Irpc {
769 message: e.to_string(),
770 })
771 }
772
773 /// Send a segment to all subscribers of the given stream.
774 pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> {
775 debug!(key = &key, data_len = data.len(), "Node.send_segment");
776 self.api
777 .rpc(api::SendSegment {
778 key,
779 data: data.into(),
780 })
781 .await
782 .map_err(|e| PutError::Irpc {
783 message: e.to_string(),
784 })
785 }
786
787 /// Join peers by their node tickets.
788 pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
789 let peers = peers
790 .iter()
791 .map(|p| NodeTicket::from_str(p))
792 .collect::<Result<Vec<_>, _>>()
793 .map_err(|e| JoinPeersError::Ticket {
794 message: e.to_string(),
795 })?;
796 let addrs = peers
797 .iter()
798 .map(|t| t.node_addr().clone())
799 .collect::<Vec<_>>();
800 self.api
801 .rpc(api::JoinPeers { peers: addrs })
802 .await
803 .map_err(|e| JoinPeersError::Irpc {
804 message: e.to_string(),
805 })
806 }
807
808 /// Add tickets for remote peers
809 pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
810 let peers = peers
811 .iter()
812 .map(|p| NodeTicket::from_str(p))
813 .collect::<Result<Vec<_>, _>>()
814 .map_err(|e| JoinPeersError::Ticket {
815 message: e.to_string(),
816 })?;
817 let addrs = peers
818 .iter()
819 .map(|t| t.node_addr().clone())
820 .collect::<Vec<_>>();
821 self.api
822 .rpc(api::AddTickets { peers: addrs })
823 .await
824 .map_err(|e| JoinPeersError::Irpc {
825 message: e.to_string(),
826 })
827 }
828
829 /// Get this node's ticket.
830 pub async fn ticket(&self) -> Result<String, PutError> {
831 let addr = self
832 .api
833 .rpc(api::GetNodeAddr)
834 .await
835 .map_err(|e| PutError::Irpc {
836 message: e.to_string(),
837 })?;
838 Ok(NodeTicket::from(addr).to_string())
839 }
840
841 /// Get this node's node ID.
842 pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> {
843 let addr = self
844 .api
845 .rpc(api::GetNodeAddr)
846 .await
847 .map_err(|e| PutError::Irpc {
848 message: e.to_string(),
849 })?;
850 Ok(Arc::new(addr.node_id.into()))
851 }
852
    /// Shutdown the node, including the streaming system and the metadata db.
    ///
    /// Both parts are shut down even if one fails; the first error (streams
    /// before db) is returned.
    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
        // shut down both the streams and the db concurrently, even if one fails
        let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown());
        res1?;
        res2?;
        Ok(())
    }
861}
862
impl Node {
    /// Ask the actor to shut down the streaming side; resolves once the actor
    /// acknowledges (see `ApiMessage::Shutdown` handling in the actor loop).
    async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> {
        self.api
            .rpc(api::Shutdown)
            .await
            .map_err(|e| ShutdownError::Irpc {
                message: e.to_string(),
            })
    }
}