//! Live video on the AT Protocol
1uniffi::setup_scaffolding!();
2
3pub mod c2pa;
4pub mod node_addr;
5pub mod public_key;
6
7use std::sync::{LazyLock, Once};
8
9mod db;
10pub use db::*;
11#[cfg(test)]
12mod tests;
13
/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime.
// Panics on first use if runtime creation fails; there is no reasonable
// fallback at an FFI boundary.
static RUNTIME: LazyLock<tokio::runtime::Runtime> =
    LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());

/// Ensure logging is only initialized once
// Guards the global tracing subscriber installation in `init_logging*`.
static LOGGING_INIT: Once = Once::new();
20
21use std::{
22 collections::{BTreeMap, BTreeSet, HashSet},
23 str::FromStr,
24 sync::Arc,
25};
26use tokio::sync::Mutex;
27
28use bytes::Bytes;
29use iroh::{NodeId, RelayMode, SecretKey, discovery::static_provider::StaticProvider};
30use iroh_base::ticket::NodeTicket;
31use iroh_gossip::{net::Gossip, proto::TopicId};
32use irpc::{WithChannels, rpc::RemoteService};
33use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
34use n0_future::future::Boxed;
35
mod rpc {
    //! Node-to-node wire protocol, served on [`ALPN`].
    use bytes::Bytes;
    use iroh::NodeId;
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// ALPN identifier under which this protocol is registered on the router.
    pub const ALPN: &[u8] = b"/iroh/streamplace/1";

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        pub key: String,
        // TODO: verify
        // NOTE(review): `remote_id` is taken at face value from the caller;
        // nothing here proves the sender actually owns this NodeId.
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        pub key: String,
        // TODO: verify
        pub remote_id: NodeId,
    }

    // #[derive(Debug, Serialize, Deserialize)]
    // pub struct SendSegment {
    //     pub key: String,
    //     pub data: Bytes,
    // }

    /// A stream segment pushed from `from` for stream `key`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct RecvSegment {
        pub from: NodeId,
        pub key: String,
        pub data: Bytes,
    }

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        RecvSegment(RecvSegment),
    }
}
87
mod api {
    //! In-process API between the public `Node` handle and the actor.
    use bytes::Bytes;
    use iroh::{NodeAddr, NodeId};
    use irpc::{channel::oneshot, rpc_requests};
    use serde::{Deserialize, Serialize};

    /// Subscribe to the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Subscribe {
        pub key: String,
        // TODO: verify
        pub remote_id: NodeId,
    }

    /// Unsubscribe from the given `key`
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Unsubscribe {
        pub key: String,
        // TODO: verify
        pub remote_id: NodeId,
    }

    /// Push one segment of stream `key` to all current subscribers.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct SendSegment {
        pub key: String,
        pub data: Bytes,
    }

    /// Add `peers` to static discovery and join them on the db/gossip layer.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct JoinPeers {
        pub peers: Vec<NodeAddr>,
    }

    /// Add `peers` to static discovery only (no join/dial).
    #[derive(Debug, Serialize, Deserialize)]
    pub struct AddTickets {
        pub peers: Vec<NodeAddr>,
    }

    /// Subscribe to a key with a ticket - handles discovery, dialing, and subscription atomically
    #[derive(Debug, Serialize, Deserialize)]
    pub struct SubscribeWithTicket {
        pub key: String,
        pub ticket: NodeAddr,
    }

    /// Request this node's current [`NodeAddr`].
    #[derive(Debug, Serialize, Deserialize)]
    pub struct GetNodeAddr;

    /// Ask the actor to stop its loop.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Shutdown;

    // Use the macro to generate both the Protocol and Message enums
    // plus implement Channels for each type
    #[rpc_requests(message = Message)]
    #[derive(Serialize, Deserialize, Debug)]
    pub enum Protocol {
        #[rpc(tx=oneshot::Sender<()>)]
        Subscribe(Subscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        Unsubscribe(Unsubscribe),
        #[rpc(tx=oneshot::Sender<()>)]
        SendSegment(SendSegment),
        #[rpc(tx=oneshot::Sender<()>)]
        JoinPeers(JoinPeers),
        #[rpc(tx=oneshot::Sender<()>)]
        AddTickets(AddTickets),
        #[rpc(tx=oneshot::Sender<()>)]
        SubscribeWithTicket(SubscribeWithTicket),
        #[rpc(tx=oneshot::Sender<NodeAddr>)]
        GetNodeAddr(GetNodeAddr),
        #[rpc(tx=oneshot::Sender<()>)]
        Shutdown(Shutdown),
    }
}
163use api::{Message as ApiMessage, Protocol as ApiProtocol};
164use n0_future::{FuturesUnordered, StreamExt};
165use rpc::{Message as RpcMessage, Protocol as RpcProtocol};
166use snafu::Snafu;
167use tracing::{Instrument, debug, error, trace, trace_span, warn};
168
169use crate::rpc::RecvSegment;
170
171/// Initialize logging with the default subscriber that respects RUST_LOG environment variable.
172/// This function is safe to call multiple times - it will only initialize logging once.
173#[uniffi::export]
174pub fn init_logging() {
175 LOGGING_INIT.call_once(|| {
176 tracing_subscriber::fmt()
177 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
178 .init();
179 });
180}
181
182/// Initialize logging with a custom log level.
183/// This function is safe to call multiple times - it will only initialize logging once.
184///
185/// # Arguments
186/// * `level` - Log level as a string (e.g., "trace", "debug", "info", "warn", "error")
187#[uniffi::export]
188pub fn init_logging_with_level(level: String) {
189 LOGGING_INIT.call_once(|| {
190 let filter = tracing_subscriber::EnvFilter::try_from_default_env()
191 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level));
192
193 tracing_subscriber::fmt().with_env_filter(filter).init();
194 });
195}
196
/// How a node treats incoming segments.
pub(crate) enum HandlerMode {
    /// Originates segments only; warns if a segment is received.
    Sender,
    /// Re-sends received segments to this node's own subscribers.
    Forwarder,
    /// Delivers received segments to a foreign (uniffi) callback.
    Receiver(Arc<dyn DataHandler>),
}
202
203impl Clone for HandlerMode {
204 fn clone(&self) -> Self {
205 match self {
206 HandlerMode::Sender => HandlerMode::Sender,
207 HandlerMode::Forwarder => HandlerMode::Forwarder,
208 HandlerMode::Receiver(h) => HandlerMode::Receiver(h.clone()),
209 }
210 }
211}
212
213impl HandlerMode {
214 pub fn mode_str(&self) -> &'static str {
215 match self {
216 HandlerMode::Sender => "sender",
217 HandlerMode::Forwarder => "forwarder",
218 HandlerMode::Receiver(_) => "receiver",
219 }
220 }
221}
222
/// In-flight forwarding tasks; each resolves to the target node id plus the
/// outcome so the actor can evict failed connections from the pool.
type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>;
224
/// Actor that contains both a kv db for metadata and a handler for the rpc protocol.
///
/// This can be used both for sender and receiver nodes. Sender nodes will just set the
/// handler to `HandlerMode::Sender`.
struct Actor {
    /// Receiver for rpc messages from remote nodes
    rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>,
    /// Receiver for API messages from the user
    api_rx: tokio::sync::mpsc::Receiver<ApiMessage>,
    /// Shared state wrapped for concurrent access
    state: Arc<Mutex<ActorState>>,
}
237
/// Shared mutable state for the actor, guarded by a `tokio::sync::Mutex`
/// so it can be held across `.await` points.
struct ActorState {
    /// Nodes I need to send to, per stream key
    subscribers: BTreeMap<String, BTreeSet<NodeId>>,
    /// Nodes I am subscribed to, per stream key
    subscriptions: BTreeMap<String, NodeId>,
    /// lightweight typed connection pool
    connections: ConnectionPool,
    /// How to handle incoming data
    handler: HandlerMode,
    /// Static provider for node discovery
    sp: StaticProvider,
    /// Iroh protocol router, I need to keep it around to keep the protocol alive
    router: iroh::protocol::Router,
    /// Metadata db
    client: db::Db,
    /// Write scope for this node for the metadata db
    write: db::WriteScope,
    /// Ongoing tasks
    tasks: Tasks,
    /// Configuration, needed for timeouts etc.
    config: Arc<Config>,
}
261
/// A cached typed rpc client for one remote node.
#[derive(Debug, Clone)]
struct Connection {
    /// The remote node this connection targets.
    id: NodeId,
    /// Typed irpc client speaking [`rpc::Protocol`].
    rpc: irpc::Client<RpcProtocol>,
}
267
/// Failure of a background forwarding task (see `Actor::forward_task`).
#[derive(Debug, Snafu)]
enum RpcTaskError {
    /// The underlying irpc call failed.
    #[snafu(transparent)]
    Task { source: irpc::Error },
    /// The call exceeded the configured `max_send_duration`.
    #[snafu(transparent)]
    Timeout { source: tokio::time::error::Elapsed },
}
275
/// Minimal connection pool: at most one lazily-created rpc connection per
/// remote node id.
struct ConnectionPool {
    /// Endpoint used to dial remotes.
    endpoint: iroh::Endpoint,
    /// Cached connections, keyed by remote node id.
    connections: BTreeMap<NodeId, Connection>,
}
280
281impl ConnectionPool {
282 fn new(endpoint: iroh::Endpoint) -> Self {
283 Self {
284 endpoint,
285 connections: BTreeMap::new(),
286 }
287 }
288
289 /// Cheap conn pool hack
290 fn get(&mut self, remote: &NodeId) -> Connection {
291 if !self.connections.contains_key(remote) {
292 trace!(remote = %remote.fmt_short(),"ConnectionPool.get is inserting a new remote connection");
293 let conn = IrohRemoteConnection::new(
294 self.endpoint.clone(),
295 (*remote).into(),
296 rpc::ALPN.to_vec(),
297 );
298 let conn = Connection {
299 rpc: irpc::Client::boxed(conn),
300 id: *remote,
301 };
302 self.connections.insert(*remote, conn);
303 }
304 self.connections
305 .get_mut(remote)
306 .expect("just inserted")
307 .clone()
308 }
309
310 fn remove(&mut self, remote: &NodeId) {
311 self.connections.remove(remote);
312 }
313}
314
impl Actor {
    /// Build the actor together with its public [`Node`] handle.
    ///
    /// Wires up gossip, the rpc protocol router and the metadata db, then
    /// returns the `Node` API handle plus the actor future. The future is
    /// NOT spawned here; the caller must drive it (see `Node::new_in_runtime`).
    pub async fn spawn(
        endpoint: iroh::Endpoint,
        sp: StaticProvider,
        topic: iroh_gossip::proto::TopicId,
        config: Config,
        handler: HandlerMode,
    ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> {
        // Bounded channels feeding the actor: one from the protocol router
        // (remote rpc) and one from the local Node handle (api).
        let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(512);
        let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(512);
        let gossip = Gossip::builder().spawn(endpoint.clone());
        let id = endpoint.node_id();
        let router = iroh::protocol::Router::builder(endpoint.clone())
            .accept(iroh_gossip::ALPN, gossip.clone())
            .accept(
                rpc::ALPN,
                IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())),
            )
            .spawn();
        let topic = gossip.subscribe(topic, vec![]).await?;
        let secret = router.endpoint().secret_key().clone();
        let db_config = Default::default();
        // The kv db piggybacks on the gossip topic; writes are signed with
        // this node's secret key via the write scope.
        let client = iroh_smol_kv::Client::local(topic, db_config);
        let write = db::WriteScope::new(client.write(secret.clone()));
        let client = db::Db::new(client);
        let state = Arc::new(Mutex::new(ActorState {
            subscribers: BTreeMap::new(),
            subscriptions: BTreeMap::new(),
            connections: ConnectionPool::new(router.endpoint().clone()),
            handler,
            router,
            sp,
            write: write.clone(),
            client: client.clone(),
            tasks: FuturesUnordered::new(),
            config: Arc::new(config),
        }));
        let actor = Self {
            rpc_rx,
            api_rx,
            state,
        };
        let api = Node {
            client: Arc::new(client),
            write: Arc::new(write),
            api: irpc::Client::local(api_tx),
        };
        Ok((
            api,
            actor
                .run()
                .instrument(trace_span!("actor", id=%id.fmt_short())),
        ))
    }

    /// Main actor loop.
    ///
    /// Dispatches each incoming rpc/api message onto a fresh task so one slow
    /// handler cannot stall the loop. Exits when either channel closes or a
    /// Shutdown api message arrives.
    async fn run(mut self) {
        loop {
            tokio::select! {
                msg = self.rpc_rx.recv() => {
                    trace!("received local rpc message");
                    let Some(msg) = msg else {
                        error!("rpc channel closed");
                        break;
                    };
                    let state = self.state.clone();
                    tokio::spawn(async move {
                        Self::handle_rpc(state, msg).instrument(trace_span!("rpc")).await;
                    });
                }
                msg = self.api_rx.recv() => {
                    trace!("received remote rpc message");
                    let Some(msg) = msg else {
                        break;
                    };
                    // check if it's a shutdown message before spawning
                    if matches!(msg, ApiMessage::Shutdown(_)) {
                        // Run the handler to completion, then ack the caller
                        // before breaking out of the loop.
                        let state = self.state.clone();
                        let shutdown = tokio::spawn(async move {
                            Self::handle_api(state, msg).instrument(trace_span!("api")).await
                        }).await.ok().flatten();
                        if let Some(shutdown) = shutdown {
                            shutdown.send(()).await.ok();
                        }
                        break;
                    } else {
                        let state = self.state.clone();
                        tokio::spawn(async move {
                            Self::handle_api(state, msg).instrument(trace_span!("api")).await;
                        });
                    }
                }
                else => {
                    // NOTE(review): per tokio's select! semantics the `else`
                    // arm only runs once all other branches are disabled, and
                    // the two branches above use irrefutable patterns, so
                    // this arm looks unreachable — meaning `state.tasks` may
                    // never be polled/evicted here. Confirm and consider
                    // polling tasks as a regular select! branch instead.
                    trace!("processing task");
                    // poll tasks - briefly lock to check and poll
                    let poll_result = {
                        let mut state = self.state.lock().await;
                        if !state.tasks.is_empty() {
                            state.tasks.next().await
                        } else {
                            None
                        }
                    };
                    if let Some((remote_id, res)) = poll_result {
                        match res {
                            Ok(()) => {}
                            Err(RpcTaskError::Timeout { source }) => {
                                warn!("call to {remote_id} timed out: {source}");
                            }
                            Err(RpcTaskError::Task { source }) => {
                                warn!("call to {remote_id} failed: {source}");
                            }
                        }
                        // Any failure drops the pooled connection so the next
                        // send re-dials the remote.
                        self.state.lock().await.connections.remove(&remote_id);
                    }
                }
            }
        }
        warn!("RPC Actor loop has closed");
    }

    /// Update subscriber meta in the gossip database. Will lock internally to fetch subscribers.
    ///
    /// Publishes the current subscriber count for `key` under the
    /// `subscribers` attribute of this node's write scope; errors are ignored
    /// (best effort metadata).
    async fn update_subscriber_meta_unlocked(state: Arc<Mutex<ActorState>>, key: &str) {
        let (n, write) = {
            let state = state.lock().await;
            let n = state
                .subscribers
                .get(key)
                .map(|s| s.len())
                .unwrap_or_default();
            (n, state.write.clone())
        };
        let v = n.to_string().into_bytes();
        write
            .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into())
            .await
            .ok();
    }

    /// Requests from remote nodes.
    ///
    /// Each arm locks `state` only for the short mutation window and always
    /// acks the remote via the oneshot `tx` (send errors are ignored).
    async fn handle_rpc(state: Arc<Mutex<ActorState>>, msg: RpcMessage) {
        trace!("RPC.handle_rpc");
        match msg {
            RpcMessage::Subscribe(msg) => {
                trace!(inner = ?msg.inner, "RpcMessage::Subscribe");
                let WithChannels {
                    tx,
                    inner: rpc::Subscribe { key, remote_id },
                    ..
                } = msg;
                {
                    let mut state = state.lock().await;
                    state
                        .subscribers
                        .entry(key.clone())
                        .or_default()
                        .insert(remote_id);
                    debug!(key = %key, remote = %remote_id.fmt_short(), count = state.subscribers.get(&key).map(|s| s.len()).unwrap_or(0), "added subscriber");
                }
                // Publish the new subscriber count (lock released above).
                Self::update_subscriber_meta_unlocked(state.clone(), &key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::Unsubscribe(msg) => {
                debug!(inner = ?msg.inner, "RpcMessage::Unsubscribe");
                let WithChannels {
                    tx,
                    inner: rpc::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                {
                    let mut state = state.lock().await;
                    // Removing an id that was never subscribed is only worth
                    // a warning, not an error.
                    if let Some(e) = state.subscribers.get_mut(&key)
                        && !e.remove(&remote_id)
                    {
                        warn!(
                            "unsubscribe: no subscription for {} from {}",
                            key, remote_id
                        );
                    }
                    // Drop the key entirely once its subscriber set is empty.
                    if let Some(subscriptions) = state.subscribers.get(&key)
                        && subscriptions.is_empty()
                    {
                        state.subscribers.remove(&key);
                    }
                }
                Self::update_subscriber_meta_unlocked(state.clone(), &key).await;
                tx.send(()).await.ok();
            }
            RpcMessage::RecvSegment(msg) => {
                trace!(inner = ?msg.inner, "RpcMessage::RecvSegment");
                let WithChannels {
                    tx,
                    inner: rpc::RecvSegment { key, from, data },
                    ..
                } = msg;
                let mut state = state.lock().await;
                // Behavior depends on this node's mode: drop, forward, or
                // deliver to the foreign handler.
                match &state.handler {
                    HandlerMode::Sender => {
                        warn!("received segment but in sender mode");
                    }
                    HandlerMode::Forwarder => {
                        trace!("forwarding segment");
                        if let Some(remotes) = state.subscribers.get(&key).cloned() {
                            Self::handle_send(&mut state, key, data, &remotes);
                        } else {
                            trace!("no subscribers for stream {}", key);
                        }
                        drop(state); // release lock
                    }
                    HandlerMode::Receiver(handler) => {
                        // Only deliver segments for keys we actually
                        // subscribed to.
                        if state.subscriptions.contains_key(&key) {
                            let from = Arc::new(from.into());
                            let handler = handler.clone();
                            drop(state); // release lock before async call
                            handler.handle_data(from, key, data.to_vec()).await;
                        } else {
                            warn!("received segment for unsubscribed key: {}", key);
                            drop(state);
                        }
                    }
                };
                tx.send(()).await.ok();
            }
        }
    }

    /// Requests from the local `Node` handle.
    ///
    /// Returns `Some(tx)` only for `Shutdown`, so `run` can ack the caller
    /// after breaking out of its loop; all other arms ack internally and
    /// return `None`.
    async fn handle_api(
        state: Arc<Mutex<ActorState>>,
        msg: ApiMessage,
    ) -> Option<irpc::channel::oneshot::Sender<()>> {
        trace!("RPC.handle_api");
        match msg {
            ApiMessage::SendSegment(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::SendSegment");
                let WithChannels {
                    tx,
                    inner: api::SendSegment { key, data },
                    ..
                } = msg;
                {
                    let mut state = state.lock().await;
                    if let Some(remotes) = state.subscribers.get(&key).cloned() {
                        Self::handle_send(&mut state, key, data, &remotes);
                    } else {
                        trace!("no subscribers for stream {}", key);
                    }
                }
                tx.send(()).await.ok();
            }
            ApiMessage::Subscribe(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Subscribe");
                let WithChannels {
                    tx,
                    inner: api::Subscribe { key, remote_id },
                    ..
                } = msg;
                // Grab the connection and our own id, then release the lock
                // before the network round trip.
                let (conn, node_id) = {
                    let mut state = state.lock().await;
                    (
                        state.connections.get(&remote_id),
                        state.router.endpoint().node_id(),
                    )
                };
                trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Subscribe message");
                let result = conn
                    .rpc
                    .rpc(rpc::Subscribe {
                        key: key.clone(),
                        remote_id: node_id,
                    })
                    .await;
                match &result {
                    Ok(_) => {
                        debug!(remote = %remote_id.fmt_short(), key = %key, "subscribe RPC succeeded");
                        // Only record the subscription once the remote acked.
                        state.lock().await.subscriptions.insert(key, remote_id);
                    }
                    Err(e) => {
                        error!(remote = %remote_id.fmt_short(), key = %key, error = %e, "subscribe RPC failed");
                    }
                }
                tx.send(()).await.ok();
            }
            ApiMessage::Unsubscribe(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Unsubscribe");
                let WithChannels {
                    tx,
                    inner: api::Unsubscribe { key, remote_id },
                    ..
                } = msg;
                let (conn, node_id) = {
                    let mut state = state.lock().await;
                    (
                        state.connections.get(&remote_id),
                        state.router.endpoint().node_id(),
                    )
                };
                trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Unsubscribe message");
                // Best effort: the local subscription is removed even if the
                // remote call fails.
                conn.rpc
                    .rpc(rpc::Unsubscribe {
                        key: key.clone(),
                        remote_id: node_id,
                    })
                    .await
                    .ok();
                state.lock().await.subscriptions.remove(&key);
                tx.send(()).await.ok();
            }
            ApiMessage::AddTickets(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::AddTickets");
                let WithChannels {
                    tx,
                    inner: api::AddTickets { peers },
                    ..
                } = msg;
                let sp = {
                    let state = state.lock().await;
                    state.sp.clone()
                };
                // Discovery only; no dialing or joining here.
                for addr in &peers {
                    sp.add_node_info(addr.clone());
                }
                tx.send(()).await.ok();
            }
            ApiMessage::JoinPeers(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::JoinPeers");
                let WithChannels {
                    tx,
                    inner: api::JoinPeers { peers },
                    ..
                } = msg;
                let state = state.lock().await;
                let node_id = state.router.endpoint().node_id();
                // Never try to join ourselves.
                let ids = peers
                    .iter()
                    .map(|a| a.node_id)
                    .filter(|id| *id != node_id)
                    .collect::<HashSet<_>>();
                for addr in &peers {
                    state.sp.add_node_info(addr.clone());
                }
                let client = state.client.clone();
                drop(state);
                client.inner().join_peers(ids).await.ok();
                tx.send(()).await.ok();
            }
            ApiMessage::SubscribeWithTicket(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::SubscribeWithTicket");
                let WithChannels {
                    tx,
                    inner: api::SubscribeWithTicket { key, ticket },
                    ..
                } = msg;

                // Extract remote node ID from ticket
                let remote_id = ticket.node_id;
                let node_id = {
                    let state = state.lock().await;
                    state.router.endpoint().node_id()
                };

                // Don't subscribe to ourselves
                if remote_id == node_id {
                    debug!(key = %key, "skipping self-subscription");
                    tx.send(()).await.ok();
                    return None;
                }

                // Add to discovery and join peers atomically
                {
                    let state = state.lock().await;
                    state.sp.add_node_info(ticket.clone());
                    let client = state.client.clone();
                    drop(state);

                    // Wait for join to complete before proceeding
                    if let Err(e) = client.inner().join_peers([remote_id]).await {
                        error!(remote = %remote_id.fmt_short(), key = %key, error = ?e, "failed to join peer");
                        tx.send(()).await.ok();
                        return None;
                    }
                }

                debug!(remote = %remote_id.fmt_short(), key = %key, "joined peer, now subscribing with retries");

                // Retry subscribe with backoff to handle connection timing
                const MAX_RETRIES: usize = 5;
                let mut attempt = 0;
                let mut backoff = std::time::Duration::from_millis(200);
                let mut last_error;

                while attempt < MAX_RETRIES {
                    attempt += 1;

                    // Get connection and subscribe
                    // NOTE(review): unlike ApiMessage::Subscribe above, this
                    // holds the state lock across the rpc await — confirm
                    // that is intentional (it serializes all state access
                    // for the duration of the network call).
                    let result = {
                        let mut state = state.lock().await;
                        let conn = state.connections.get(&remote_id);
                        conn.rpc
                            .rpc(rpc::Subscribe {
                                key: key.clone(),
                                remote_id: node_id,
                            })
                            .await
                    };

                    match result {
                        Ok(_) => {
                            debug!(remote = %remote_id.fmt_short(), key = %key, attempt = attempt, "SubscribeWithTicket succeeded");
                            state.lock().await.subscriptions.insert(key, remote_id);
                            break;
                        }
                        Err(e) => {
                            last_error = Some(e);
                            if attempt >= MAX_RETRIES {
                                if let Some(ref err) = last_error {
                                    error!(remote = %remote_id.fmt_short(), key = %key, error = ?err, "SubscribeWithTicket failed after all retries");
                                }
                            } else {
                                if let Some(ref err) = last_error {
                                    debug!(remote = %remote_id.fmt_short(), key = %key, attempt = attempt, error = ?err, backoff_ms = backoff.as_millis(), "SubscribeWithTicket attempt failed, retrying");
                                }
                                tokio::time::sleep(backoff).await;
                                // Exponential backoff with cap at 2s
                                if backoff < std::time::Duration::from_secs(2) {
                                    backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(2));
                                }
                            }
                        }
                    }
                }

                tx.send(()).await.ok();
            }
            ApiMessage::GetNodeAddr(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::GetNodeAddr");
                let WithChannels { tx, .. } = msg;
                let (disable_relay, endpoint) = {
                    let state = state.lock().await;
                    (state.config.disable_relay, state.router.endpoint().clone())
                };
                // With relays enabled, wait until the endpoint is online so
                // the returned address contains relay info.
                if !disable_relay {
                    endpoint.online().await;
                }
                let addr = endpoint.node_addr();
                tx.send(addr).await.ok();
            }
            ApiMessage::Shutdown(msg) => {
                trace!(inner = ?msg.inner, "ApiMessage::Shutdown");
                // Hand the ack channel back to `run`, which sends after
                // breaking its loop.
                return Some(msg.tx);
            }
        }
        None
    }

    /// Fan one segment out to every node in `remotes`.
    ///
    /// Synchronous: the caller already holds the state lock. Each remote gets
    /// its own timed forwarding task pushed onto `state.tasks`.
    fn handle_send(state: &mut ActorState, key: String, data: Bytes, remotes: &BTreeSet<NodeId>) {
        let me = state.connections.endpoint.node_id();
        let msg = rpc::RecvSegment {
            key,
            data,
            from: me,
        };
        for remote in remotes {
            trace!(remote = %remote.fmt_short(), key = %msg.key, "handle_send to remote");
            let conn = state.connections.get(remote);
            state.tasks.push(Box::pin(Self::forward_task(
                state.config.clone(),
                conn,
                msg.clone(),
            )));
        }
    }

    /// Send one segment to one remote with a timeout.
    ///
    /// Always returns the remote's id alongside the outcome so the caller can
    /// evict the pooled connection on failure.
    async fn forward_task(
        config: Arc<Config>,
        conn: Connection,
        msg: RecvSegment,
    ) -> (NodeId, Result<(), RpcTaskError>) {
        let id = conn.id;
        let res = async move {
            // `??`: the outer `?` handles the timeout, the inner the rpc error.
            tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??;
            Ok(())
        }
        .await;
        (id, res)
    }
}
800
/// Iroh-streamplace node that can send, forward or receive stream segments.
///
/// Cheap to clone: all fields are shared handles. Dropping it does not stop
/// the detached actor task; call `shutdown` for a clean stop.
#[derive(Clone, uniffi::Object)]
pub struct Node {
    /// Handle to the metadata db.
    client: Arc<db::Db>,
    /// Write scope bound to this node's secret key.
    write: Arc<db::WriteScope>,
    /// Channel to the actor for the in-process api protocol.
    api: irpc::Client<ApiProtocol>,
}
808
impl Node {
    /// Construct a node; must run inside the shared Tokio runtime.
    ///
    /// Decodes the 32-byte secret key and gossip topic from `config`, binds an
    /// iroh endpoint (optionally without relays), spawns the actor loop as a
    /// detached task, and records this node's mode string in the metadata db
    /// (best effort).
    pub(crate) async fn new_in_runtime(
        config: Config,
        handler: HandlerMode,
    ) -> Result<Arc<Self>, CreateError> {
        let mode_str = Bytes::from(handler.mode_str());
        // The key must be exactly 32 bytes; otherwise report the actual size.
        let secret_key =
            SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| {
                CreateError::PrivateKey {
                    size: e.len() as u64,
                }
            })?);
        // Same 32-byte requirement for the gossip topic id.
        let topic =
            TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| {
                CreateError::Topic {
                    size: e.len() as u64,
                }
            })?);
        let relay_mode = if config.disable_relay {
            RelayMode::Disabled
        } else {
            RelayMode::Default
        };
        let sp = StaticProvider::new();
        let endpoint = iroh::Endpoint::builder()
            .secret_key(secret_key)
            .relay_mode(relay_mode)
            .discovery(sp.clone())
            .bind()
            .await
            .map_err(|e| CreateError::Bind {
                message: e.to_string(),
            })?;
        let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler)
            .await
            .map_err(|e| CreateError::Subscribe {
                message: e.to_string(),
            })?;
        // Record the node mode under this node's write scope; non-fatal.
        api.node_scope()
            .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str)
            .await
            .ok();
        // Drive the actor loop in the background for the node's lifetime.
        tokio::spawn(actor);
        Ok(Arc::new(api))
    }
}
855
/// DataHandler trait that is exported to go for receiving data callbacks.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DataHandler: Send + Sync {
    /// Called once per received segment.
    ///
    /// `from` is the sending node's public key, `topic` the stream key the
    /// segment belongs to, and `data` the raw segment bytes.
    async fn handle_data(
        &self,
        from: Arc<crate::public_key::PublicKey>,
        topic: String,
        data: Vec<u8>,
    );
}
867
#[uniffi::export]
impl Node {
    /// Create a new streamplace client node.
    // NOTE(review): `block_on` inside an async fn panics if polled from
    // within a tokio runtime context; presumably uniffi drives these
    // constructors from outside tokio — confirm.
    #[uniffi::constructor]
    pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender))
    }

    /// Create a node that re-sends received segments to its own subscribers.
    #[uniffi::constructor]
    pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder))
    }

    /// Create a node that delivers received segments to `handler`.
    #[uniffi::constructor]
    pub async fn receiver(
        config: Config,
        handler: Arc<dyn DataHandler>,
    ) -> Result<Arc<Self>, CreateError> {
        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler)))
    }

    /// Get a handle to the db to watch for changes locally or globally.
    pub fn db(&self) -> Arc<db::Db> {
        self.client.clone()
    }

    /// Get a handle to the write scope for this node.
    ///
    /// This is equivalent to calling `db.write(...)` with the secret key used to create the node.
    pub fn node_scope(&self) -> Arc<db::WriteScope> {
        self.write.clone()
    }

    /// Subscribe to updates for a given stream from a remote node.
    pub async fn subscribe(
        &self,
        key: String,
        remote_id: Arc<crate::public_key::PublicKey>,
    ) -> Result<(), PutError> {
        self.api
            .rpc(api::Subscribe {
                key,
                remote_id: remote_id.as_ref().into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Unsubscribe from updates for a given stream from a remote node.
    pub async fn unsubscribe(
        &self,
        key: String,
        remote_id: Arc<crate::public_key::PublicKey>,
    ) -> Result<(), PutError> {
        self.api
            .rpc(api::Unsubscribe {
                key,
                remote_id: remote_id.as_ref().into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Send a segment to all subscribers of the given stream.
    ///
    /// A no-op (still Ok) when the stream currently has no subscribers.
    pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> {
        debug!(key = &key, data_len = data.len(), "Node.send_segment");
        self.api
            .rpc(api::SendSegment {
                key,
                data: data.into(),
            })
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })
    }

    /// Join peers by their node tickets.
    ///
    /// Parses every ticket first; a single malformed ticket fails the whole
    /// call before anything is joined.
    pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
        let peers = peers
            .iter()
            .map(|p| NodeTicket::from_str(p))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| JoinPeersError::Ticket {
                message: e.to_string(),
            })?;
        let addrs = peers
            .iter()
            .map(|t| t.node_addr().clone())
            .collect::<Vec<_>>();
        self.api
            .rpc(api::JoinPeers { peers: addrs })
            .await
            .map_err(|e| JoinPeersError::Irpc {
                message: e.to_string(),
            })
    }

    /// Add tickets for remote peers
    ///
    /// Unlike [`Node::join_peers`], this only feeds the addresses into static
    /// discovery; it does not join them.
    pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
        let peers = peers
            .iter()
            .map(|p| NodeTicket::from_str(p))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| JoinPeersError::Ticket {
                message: e.to_string(),
            })?;
        let addrs = peers
            .iter()
            .map(|t| t.node_addr().clone())
            .collect::<Vec<_>>();
        self.api
            .rpc(api::AddTickets { peers: addrs })
            .await
            .map_err(|e| JoinPeersError::Irpc {
                message: e.to_string(),
            })
    }

    /// Subscribe to a stream with a ticket - handles discovery, dialing, and subscription atomically
    pub async fn subscribe_with_ticket(&self, key: String, ticket: String) -> Result<(), JoinPeersError> {
        let node_ticket = NodeTicket::from_str(&ticket)
            .map_err(|e| JoinPeersError::Ticket {
                message: e.to_string(),
            })?;
        let addr = node_ticket.node_addr().clone();
        self.api
            .rpc(api::SubscribeWithTicket { key, ticket: addr })
            .await
            .map_err(|e| JoinPeersError::Irpc {
                message: e.to_string(),
            })
    }

    /// Get this node's ticket.
    ///
    /// May wait for the endpoint to come online (relay info) unless relays
    /// are disabled in the config.
    pub async fn ticket(&self) -> Result<String, PutError> {
        let addr = self
            .api
            .rpc(api::GetNodeAddr)
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })?;
        Ok(NodeTicket::from(addr).to_string())
    }

    /// Get this node's node ID.
    pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> {
        let addr = self
            .api
            .rpc(api::GetNodeAddr)
            .await
            .map_err(|e| PutError::Irpc {
                message: e.to_string(),
            })?;
        Ok(Arc::new(addr.node_id.into()))
    }

    /// Shutdown the node, including the streaming system and the metadata db.
    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
        // shut down both the streams and the db concurrently, even if one fails
        let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown());
        res1?;
        res2?;
        Ok(())
    }
}
1039
1040impl Node {
1041 async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> {
1042 self.api
1043 .rpc(api::Shutdown)
1044 .await
1045 .map_err(|e| ShutdownError::Irpc {
1046 message: e.to_string(),
1047 })
1048 }
1049}