uniffi::setup_scaffolding!();

-pub mod c2pa;
-pub mod error;
pub mod node_addr;
pub mod public_key;
-pub mod streams;

use std::sync::{LazyLock, Once};

-mod db;
-pub use db::*;
-#[cfg(test)]
-mod tests;
-
-#[cfg(test)]
-mod test_stream;

mod socket;
pub use socket::*;

···
/// Ensure logging is only initialized once
static LOGGING_INIT: Once = Once::new();
-
-use std::{
-    collections::{BTreeMap, BTreeSet, HashSet},
-    str::FromStr,
-    sync::Arc,
-};
-
-use bytes::Bytes;
-use iroh::{NodeId, PublicKey, RelayMode, SecretKey, discovery::static_provider::StaticProvider};
-use iroh_base::ticket::NodeTicket;
-use iroh_gossip::{net::Gossip, proto::TopicId};
-use irpc::{WithChannels, rpc::RemoteService};
-use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
-use n0_future::future::Boxed;
-
-mod rpc {
-    //! Protocol API
-    use bytes::Bytes;
-    use iroh::NodeId;
-    use irpc::{channel::oneshot, rpc_requests};
-    use serde::{Deserialize, Serialize};
-
-    pub const ALPN: &[u8] = b"/iroh/streamplace/1";
-
-    /// Subscribe to the given `key`
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct Subscribe {
-        pub key: String,
-        // TODO: verify
-        pub remote_id: NodeId,
-    }
-
-    /// Unsubscribe from the given `key`
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct Unsubscribe {
-        pub key: String,
-        // TODO: verify
-        pub remote_id: NodeId,
-    }
-
-    // #[derive(Debug, Serialize, Deserialize)]
-    // pub struct SendSegment {
-    //     pub key: String,
-    //     pub data: Bytes,
-    // }
-
-    #[derive(Debug, Clone, Serialize, Deserialize)]
-    pub struct RecvSegment {
-        pub from: NodeId,
-        pub key: String,
-        pub data: Bytes,
-    }
-
-    // Use the macro to generate both the Protocol and Message enums
-    // plus implement Channels for each type
-    #[rpc_requests(message = Message)]
-    #[derive(Serialize, Deserialize, Debug)]
-    pub enum Protocol {
-        #[rpc(tx=oneshot::Sender<()>)]
-        Subscribe(Subscribe),
-        #[rpc(tx=oneshot::Sender<()>)]
-        Unsubscribe(Unsubscribe),
-        #[rpc(tx=oneshot::Sender<()>)]
-        RecvSegment(RecvSegment),
-    }
-}
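As a rough sketch (not part of the diff) of how the macro-generated types above are driven, mirroring the calls made further down in this file; `client` and `subscribe_on_remote` are assumed names, with `client` being an `irpc::Client<Protocol>` connected to the remote node:

// Sketch only: driving the generated Protocol from the caller side.
async fn subscribe_on_remote(
    client: &irpc::Client<Protocol>,
    key: String,
    me: NodeId,
) -> Result<(), irpc::Error> {
    // The oneshot reply channel declared via #[rpc(tx = oneshot::Sender<()>)]
    // surfaces here as an awaitable unit result.
    client.rpc(Subscribe { key, remote_id: me }).await
}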
-
-mod api {
-    //! Protocol API
-    use bytes::Bytes;
-    use iroh::{NodeAddr, NodeId};
-    use irpc::{channel::oneshot, rpc_requests};
-    use serde::{Deserialize, Serialize};
-
-    /// Subscribe to the given `key`
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct Subscribe {
-        pub key: String,
-        // TODO: verify
-        pub remote_id: NodeId,
-    }
-
-    /// Unsubscribe from the given `key`
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct Unsubscribe {
-        pub key: String,
-        // TODO: verify
-        pub remote_id: NodeId,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct SendSegment {
-        pub key: String,
-        pub data: Bytes,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct JoinPeers {
-        pub peers: Vec<NodeAddr>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct AddTickets {
-        pub peers: Vec<NodeAddr>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct GetNodeAddr;
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub struct Shutdown;
-
-    // Use the macro to generate both the Protocol and Message enums
-    // plus implement Channels for each type
-    #[rpc_requests(message = Message)]
-    #[derive(Serialize, Deserialize, Debug)]
-    pub enum Protocol {
-        #[rpc(tx=oneshot::Sender<()>)]
-        Subscribe(Subscribe),
-        #[rpc(tx=oneshot::Sender<()>)]
-        Unsubscribe(Unsubscribe),
-        #[rpc(tx=oneshot::Sender<()>)]
-        SendSegment(SendSegment),
-        #[rpc(tx=oneshot::Sender<()>)]
-        JoinPeers(JoinPeers),
-        #[rpc(tx=oneshot::Sender<()>)]
-        AddTickets(AddTickets),
-        #[rpc(tx=oneshot::Sender<NodeAddr>)]
-        GetNodeAddr(GetNodeAddr),
-        #[rpc(tx=oneshot::Sender<()>)]
-        Shutdown(Shutdown),
-    }
-}
-use api::{Message as ApiMessage, Protocol as ApiProtocol};
-use n0_future::{FuturesUnordered, StreamExt};
-use rpc::{Message as RpcMessage, Protocol as RpcProtocol};
-use snafu::Snafu;
-use tracing::{Instrument, debug, error, trace, trace_span, warn};
-
-use crate::rpc::RecvSegment;

/// Initialize logging with the default subscriber that respects the RUST_LOG environment variable.
/// This function is safe to call multiple times - it will only initialize logging once.
#[uniffi::export]
···
        tracing_subscriber::fmt().with_env_filter(filter).init();
    });
}
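The body of `init_logging` is collapsed in this diff. As a hedged sketch only, assuming the usual `tracing_subscriber` pattern implied by the visible lines and the `LOGGING_INIT` guard above (`init_logging_sketch` is a hypothetical name, not the real function):

// Sketch only; the real body is collapsed above.
use tracing_subscriber::EnvFilter;

pub fn init_logging_sketch() {
    LOGGING_INIT.call_once(|| {
        // Respect RUST_LOG, falling back to a default level when it is unset.
        let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
        tracing_subscriber::fmt().with_env_filter(filter).init();
    });
}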
-
-pub(crate) enum HandlerMode {
-    Sender,
-    Forwarder,
-    Receiver(Arc<dyn DataHandler>),
-}
-
-impl HandlerMode {
-    pub fn mode_str(&self) -> &'static str {
-        match self {
-            HandlerMode::Sender => "sender",
-            HandlerMode::Forwarder => "forwarder",
-            HandlerMode::Receiver(_) => "receiver",
-        }
-    }
-}
-
-type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>;
-
-/// Actor that contains both a kv db for metadata and a handler for the rpc protocol.
-///
-/// This can be used for sender, forwarder and receiver nodes; the role is selected by
-/// the `HandlerMode` passed in (sender nodes simply use `HandlerMode::Sender`).
-struct Actor {
-    /// Receiver for rpc messages from remote nodes
-    rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>,
-    /// Receiver for API messages from the user
-    api_rx: tokio::sync::mpsc::Receiver<ApiMessage>,
-    /// nodes I need to send to for each stream
-    subscribers: BTreeMap<String, BTreeSet<NodeId>>,
-    /// nodes I am subscribed to
-    subscriptions: BTreeMap<String, NodeId>,
-    /// lightweight typed connection pool
-    connections: ConnectionPool,
-    /// How to handle incoming data
-    handler: HandlerMode,
-    /// Iroh protocol router; we keep it around to keep the protocol alive
-    router: iroh::protocol::Router,
-    /// Static provider for node discovery
-    sp: StaticProvider,
-    /// Metadata db
-    client: db::Db,
-    /// Write scope for this node for the metadata db
-    write: db::WriteScope,
-    /// Ongoing tasks
-    tasks: Tasks,
-    /// Configuration, needed for timeouts etc.
-    config: Arc<Config>,
-}
-
-#[derive(Debug, Clone)]
-struct Connection {
-    id: NodeId,
-    rpc: irpc::Client<RpcProtocol>,
-}
-
-#[derive(Debug, Snafu)]
-enum RpcTaskError {
-    #[snafu(transparent)]
-    Task { source: irpc::Error },
-    #[snafu(transparent)]
-    Timeout { source: tokio::time::error::Elapsed },
-}
-
-struct ConnectionPool {
-    endpoint: iroh::Endpoint,
-    connections: BTreeMap<NodeId, Connection>,
-}
-
-impl ConnectionPool {
-    fn new(endpoint: iroh::Endpoint) -> Self {
-        Self {
-            endpoint,
-            connections: BTreeMap::new(),
-        }
-    }
-
-    /// Cheap conn pool hack: create a connection on first use and cache it.
-    fn get(&mut self, remote: &NodeId) -> Connection {
-        if !self.connections.contains_key(remote) {
-            trace!(remote = %remote.fmt_short(), "ConnectionPool.get is inserting a new remote connection");
-            let conn = IrohRemoteConnection::new(
-                self.endpoint.clone(),
-                (*remote).into(),
-                rpc::ALPN.to_vec(),
-            );
-            let conn = Connection {
-                rpc: irpc::Client::boxed(conn),
-                id: *remote,
-            };
-            self.connections.insert(*remote, conn);
-        }
-        self.connections
-            .get_mut(remote)
-            .expect("just inserted")
-            .clone()
-    }
-
-    fn remove(&mut self, remote: &NodeId) {
-        self.connections.remove(remote);
-    }
-}
-
-impl Actor {
-    pub async fn spawn(
-        endpoint: iroh::Endpoint,
-        sp: StaticProvider,
-        topic: iroh_gossip::proto::TopicId,
-        config: Config,
-        handler: HandlerMode,
-    ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> {
-        let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(512);
-        let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(512);
-        let gossip = Gossip::builder().spawn(endpoint.clone());
-        let id = endpoint.node_id();
-        let router = iroh::protocol::Router::builder(endpoint.clone())
-            .accept(iroh_gossip::ALPN, gossip.clone())
-            .accept(
-                rpc::ALPN,
-                IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())),
-            )
-            .spawn();
-        let topic = gossip.subscribe(topic, vec![]).await?;
-        let secret = router.endpoint().secret_key().clone();
-        let db_config = Default::default();
-        let client = iroh_smol_kv::Client::local(topic, db_config);
-        let write = db::WriteScope::new(client.write(secret.clone()));
-        let client = db::Db::new(client);
-        let actor = Self {
-            rpc_rx,
-            api_rx,
-            subscribers: BTreeMap::new(),
-            subscriptions: BTreeMap::new(),
-            connections: ConnectionPool::new(router.endpoint().clone()),
-            handler,
-            router,
-            sp,
-            write: write.clone(),
-            client: client.clone(),
-            tasks: FuturesUnordered::new(),
-            config: Arc::new(config),
-        };
-        let api = Node {
-            client: Arc::new(client),
-            write: Arc::new(write),
-            api: irpc::Client::local(api_tx),
-        };
-        Ok((
-            api,
-            actor
-                .run()
-                .instrument(trace_span!("actor", id=%id.fmt_short())),
-        ))
-    }
-
-    async fn run(mut self) {
-        loop {
-            tokio::select! {
-                msg = self.rpc_rx.recv() => {
-                    trace!("received rpc message from remote node");
-                    let Some(msg) = msg else {
-                        error!("rpc channel closed");
-                        break;
-                    };
-                    self.handle_rpc(msg).instrument(trace_span!("rpc")).await;
-                }
-                msg = self.api_rx.recv() => {
-                    trace!("received local api message");
-                    let Some(msg) = msg else {
-                        break;
-                    };
-                    if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await {
-                        shutdown.send(()).await.ok();
-                        break;
-                    }
-                }
-                res = self.tasks.next(), if !self.tasks.is_empty() => {
-                    trace!("processing task");
-                    let Some((remote_id, res)) = res else {
-                        error!("task finished but no result");
-                        break;
-                    };
-                    match res {
-                        Ok(()) => {}
-                        Err(RpcTaskError::Timeout { source }) => {
-                            warn!("call to {remote_id} timed out: {source}");
-                        }
-                        Err(RpcTaskError::Task { source }) => {
-                            warn!("call to {remote_id} failed: {source}");
-                        }
-                    }
-                    self.connections.remove(&remote_id);
-                }
-            }
-        }
-        warn!("RPC Actor loop has closed");
-    }
-
-    async fn update_subscriber_meta(&mut self, key: &str) {
-        let n = self
-            .subscribers
-            .get(key)
-            .map(|s| s.len())
-            .unwrap_or_default();
-        let v = n.to_string().into_bytes();
-        self.write
-            .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into())
-            .await
-            .ok();
-    }
-
-    /// Requests from remote nodes
-    async fn handle_rpc(&mut self, msg: RpcMessage) {
-        trace!("RPC.handle_rpc");
-        match msg {
-            RpcMessage::Subscribe(msg) => {
-                trace!(inner = ?msg.inner, "RpcMessage::Subscribe");
-                let WithChannels {
-                    tx,
-                    inner: rpc::Subscribe { key, remote_id },
-                    ..
-                } = msg;
-                self.subscribers
-                    .entry(key.clone())
-                    .or_default()
-                    .insert(remote_id);
-                self.update_subscriber_meta(&key).await;
-                tx.send(()).await.ok();
-            }
-            RpcMessage::Unsubscribe(msg) => {
-                debug!(inner = ?msg.inner, "RpcMessage::Unsubscribe");
-                let WithChannels {
-                    tx,
-                    inner: rpc::Unsubscribe { key, remote_id },
-                    ..
-                } = msg;
-                if let Some(e) = self.subscribers.get_mut(&key)
-                    && !e.remove(&remote_id)
-                {
-                    warn!(
-                        "unsubscribe: no subscription for {} from {}",
-                        key, remote_id
-                    );
-                }
-                if let Some(subscriptions) = self.subscribers.get(&key)
-                    && subscriptions.is_empty()
-                {
-                    self.subscribers.remove(&key);
-                }
-                self.update_subscriber_meta(&key).await;
-                tx.send(()).await.ok();
-            }
-            RpcMessage::RecvSegment(msg) => {
-                trace!(inner = ?msg.inner, "RpcMessage::RecvSegment");
-                let WithChannels {
-                    tx,
-                    inner: rpc::RecvSegment { key, from, data },
-                    ..
-                } = msg;
-                match &self.handler {
-                    HandlerMode::Sender => {
-                        warn!("received segment but in sender mode");
-                    }
-                    HandlerMode::Forwarder => {
-                        trace!("forwarding segment");
-                        if let Some(remotes) = self.subscribers.get(&key) {
-                            Self::handle_send(
-                                &mut self.tasks,
-                                &mut self.connections,
-                                &self.config,
-                                key,
-                                data,
-                                remotes,
-                            );
-                        } else {
-                            trace!("no subscribers for stream {}", key);
-                        }
-                    }
-                    HandlerMode::Receiver(handler) => {
-                        if self.subscriptions.contains_key(&key) {
-                            let from = Arc::new(from.into());
-                            handler.handle_data(from, key, data.to_vec()).await;
-                        } else {
-                            warn!("received segment for unsubscribed key: {}", key);
-                        }
-                    }
-                };
-                tx.send(()).await.ok();
-            }
-        }
-    }
-
-    async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> {
-        trace!("RPC.handle_api");
-        match msg {
-            ApiMessage::SendSegment(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::SendSegment");
-                let WithChannels {
-                    tx,
-                    inner: api::SendSegment { key, data },
-                    ..
-                } = msg;
-                if let Some(remotes) = self.subscribers.get(&key) {
-                    Self::handle_send(
-                        &mut self.tasks,
-                        &mut self.connections,
-                        &self.config,
-                        key,
-                        data,
-                        remotes,
-                    );
-                } else {
-                    trace!("no subscribers for stream {}", key);
-                }
-                tx.send(()).await.ok();
-            }
-            ApiMessage::Subscribe(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::Subscribe");
-                let WithChannels {
-                    tx,
-                    inner: api::Subscribe { key, remote_id },
-                    ..
-                } = msg;
-                let conn = self.connections.get(&remote_id);
-                tx.send(()).await.ok();
-                trace!(remote = %remote_id.fmt_short(), key = %key, "send rpc::Subscribe message");
-                conn.rpc
-                    .rpc(rpc::Subscribe {
-                        key: key.clone(),
-                        remote_id: self.node_id(),
-                    })
-                    .await
-                    .ok();
-                trace!(remote = %remote_id.fmt_short(), key = %key, "inserting subscription");
-                self.subscriptions.insert(key, remote_id);
-                trace!("finished inserting subscription");
-            }
-            ApiMessage::Unsubscribe(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::Unsubscribe");
-                let WithChannels {
-                    tx,
-                    inner: api::Unsubscribe { key, remote_id },
-                    ..
-                } = msg;
-                let conn = self.connections.get(&remote_id);
-                tx.send(()).await.ok();
-                conn.rpc
-                    .rpc(rpc::Unsubscribe {
-                        key: key.clone(),
-                        remote_id: self.node_id(),
-                    })
-                    .await
-                    .ok();
-                self.subscriptions.remove(&key);
-            }
-            ApiMessage::AddTickets(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::AddTickets");
-                let WithChannels {
-                    tx,
-                    inner: api::AddTickets { peers },
-                    ..
-                } = msg;
-                for addr in &peers {
-                    self.sp.add_node_info(addr.clone());
-                }
-                // self.client.inner().join_peers(ids).await.ok();
-                tx.send(()).await.ok();
-            }
-            ApiMessage::JoinPeers(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::JoinPeers");
-                let WithChannels {
-                    tx,
-                    inner: api::JoinPeers { peers },
-                    ..
-                } = msg;
-                let ids = peers
-                    .iter()
-                    .map(|a| a.node_id)
-                    .filter(|id| *id != self.node_id())
-                    .collect::<HashSet<_>>();
-                for addr in &peers {
-                    self.sp.add_node_info(addr.clone());
-                }
-                self.client.inner().join_peers(ids).await.ok();
-                tx.send(()).await.ok();
-            }
-            ApiMessage::GetNodeAddr(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::GetNodeAddr");
-                let WithChannels { tx, .. } = msg;
-                if !self.config.disable_relay {
-                    // don't await the home relay if we have disabled relays, this will hang forever
-                    self.router.endpoint().online().await;
-                }
-                let addr = self.router.endpoint().node_addr();
-                tx.send(addr).await.ok();
-            }
-            ApiMessage::Shutdown(msg) => {
-                trace!(inner = ?msg.inner, "ApiMessage::Shutdown");
-                return Some(msg.tx);
-            }
-        }
-        None
-    }
-
-    fn handle_send(
-        tasks: &mut Tasks,
-        connections: &mut ConnectionPool,
-        config: &Arc<Config>,
-        key: String,
-        data: Bytes,
-        remotes: &BTreeSet<NodeId>,
-    ) {
-        let me = connections.endpoint.node_id();
-        let msg = rpc::RecvSegment {
-            key,
-            data,
-            from: me,
-        };
-        for remote in remotes {
-            trace!(remote = %remote.fmt_short(), key = %msg.key, "handle_send to remote");
-            let conn = connections.get(remote);
-            tasks.push(Box::pin(Self::forward_task(
-                config.clone(),
-                conn,
-                msg.clone(),
-            )));
-        }
-    }
-
-    async fn forward_task(
-        config: Arc<Config>,
-        conn: Connection,
-        msg: RecvSegment,
-    ) -> (NodeId, Result<(), RpcTaskError>) {
-        let id = conn.id;
-        let res = async move {
-            tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??;
-            Ok(())
-        }
-        .await;
-        (id, res)
-    }
-
-    fn node_id(&self) -> PublicKey {
-        self.router.endpoint().node_id()
-    }
-}
-
-/// Iroh-streamplace node that can send, forward or receive stream segments.
-#[derive(Clone, uniffi::Object)]
-pub struct Node {
-    client: Arc<db::Db>,
-    write: Arc<db::WriteScope>,
-    api: irpc::Client<ApiProtocol>,
-}
-
-impl Node {
-    pub(crate) async fn new_in_runtime(
-        config: Config,
-        handler: HandlerMode,
-    ) -> Result<Arc<Self>, CreateError> {
-        let mode_str = Bytes::from(handler.mode_str());
-        let secret_key =
-            SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| {
-                CreateError::PrivateKey {
-                    size: e.len() as u64,
-                }
-            })?);
-        let topic =
-            TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| {
-                CreateError::Topic {
-                    size: e.len() as u64,
-                }
-            })?);
-        let relay_mode = if config.disable_relay {
-            RelayMode::Disabled
-        } else {
-            RelayMode::Default
-        };
-        let sp = StaticProvider::new();
-        let endpoint = iroh::Endpoint::builder()
-            .secret_key(secret_key)
-            .relay_mode(relay_mode)
-            .discovery(sp.clone())
-            .bind()
-            .await
-            .map_err(|e| CreateError::Bind {
-                message: e.to_string(),
-            })?;
-        let (api, actor) = Actor::spawn(endpoint, sp, topic, config, handler)
-            .await
-            .map_err(|e| CreateError::Subscribe {
-                message: e.to_string(),
-            })?;
-        api.node_scope()
-            .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str)
-            .await
-            .ok();
-        tokio::spawn(actor);
-        Ok(Arc::new(api))
-    }
-}
-
-/// DataHandler trait that is exported to Go for receiving data callbacks.
-#[uniffi::export(with_foreign)]
-#[async_trait::async_trait]
-pub trait DataHandler: Send + Sync {
-    async fn handle_data(
-        &self,
-        from: Arc<crate::public_key::PublicKey>,
-        topic: String,
-        data: Vec<u8>,
-    );
-}
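For illustration only (consumers normally implement this on the foreign-language side via uniffi), a minimal Rust `DataHandler` might look like the hypothetical sketch below; `PrintHandler` is an assumed name, not part of the crate:

// Hypothetical example, not part of the diff.
struct PrintHandler;

#[async_trait::async_trait]
impl DataHandler for PrintHandler {
    async fn handle_data(
        &self,
        from: Arc<crate::public_key::PublicKey>,
        topic: String,
        data: Vec<u8>,
    ) {
        // A real handler would feed the segment into a player or file; here we just log it.
        let _ = from;
        println!("received {} bytes on stream {}", data.len(), topic);
    }
}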
-
-#[uniffi::export]
-impl Node {
-    /// Create a new streamplace sender node.
-    #[uniffi::constructor]
-    pub async fn sender(config: Config) -> Result<Arc<Self>, CreateError> {
-        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender))
-    }
-
-    /// Create a new streamplace forwarder node.
-    #[uniffi::constructor]
-    pub async fn forwarder(config: Config) -> Result<Arc<Self>, CreateError> {
-        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder))
-    }
-
-    /// Create a new streamplace receiver node that passes incoming segments to `handler`.
-    #[uniffi::constructor]
-    pub async fn receiver(
-        config: Config,
-        handler: Arc<dyn DataHandler>,
-    ) -> Result<Arc<Self>, CreateError> {
-        RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Receiver(handler)))
-    }
-
-    /// Get a handle to the db to watch for changes locally or globally.
-    pub fn db(&self) -> Arc<db::Db> {
-        self.client.clone()
-    }
-
-    /// Get a handle to the write scope for this node.
-    ///
-    /// This is equivalent to calling `db.write(...)` with the secret key used to create the node.
-    pub fn node_scope(&self) -> Arc<db::WriteScope> {
-        self.write.clone()
-    }
-
-    /// Subscribe to updates for a given stream from a remote node.
-    pub async fn subscribe(
-        &self,
-        key: String,
-        remote_id: Arc<crate::public_key::PublicKey>,
-    ) -> Result<(), PutError> {
-        self.api
-            .rpc(api::Subscribe {
-                key,
-                remote_id: remote_id.as_ref().into(),
-            })
-            .await
-            .map_err(|e| PutError::Irpc {
-                message: e.to_string(),
-            })
-    }
-
-    /// Unsubscribe from updates for a given stream from a remote node.
-    pub async fn unsubscribe(
-        &self,
-        key: String,
-        remote_id: Arc<crate::public_key::PublicKey>,
-    ) -> Result<(), PutError> {
-        self.api
-            .rpc(api::Unsubscribe {
-                key,
-                remote_id: remote_id.as_ref().into(),
-            })
-            .await
-            .map_err(|e| PutError::Irpc {
-                message: e.to_string(),
-            })
-    }
-
-    /// Send a segment to all subscribers of the given stream.
-    pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> {
-        debug!(key = &key, data_len = data.len(), "Node.send_segment");
-        self.api
-            .rpc(api::SendSegment {
-                key,
-                data: data.into(),
-            })
-            .await
-            .map_err(|e| PutError::Irpc {
-                message: e.to_string(),
-            })
-    }
-
-    /// Join peers by their node tickets.
-    pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
-        let peers = peers
-            .iter()
-            .map(|p| NodeTicket::from_str(p))
-            .collect::<Result<Vec<_>, _>>()
-            .map_err(|e| JoinPeersError::Ticket {
-                message: e.to_string(),
-            })?;
-        let addrs = peers
-            .iter()
-            .map(|t| t.node_addr().clone())
-            .collect::<Vec<_>>();
-        self.api
-            .rpc(api::JoinPeers { peers: addrs })
-            .await
-            .map_err(|e| JoinPeersError::Irpc {
-                message: e.to_string(),
-            })
-    }
-
-    /// Add tickets for remote peers.
-    pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
-        let peers = peers
-            .iter()
-            .map(|p| NodeTicket::from_str(p))
-            .collect::<Result<Vec<_>, _>>()
-            .map_err(|e| JoinPeersError::Ticket {
-                message: e.to_string(),
-            })?;
-        let addrs = peers
-            .iter()
-            .map(|t| t.node_addr().clone())
-            .collect::<Vec<_>>();
-        self.api
-            .rpc(api::AddTickets { peers: addrs })
-            .await
-            .map_err(|e| JoinPeersError::Irpc {
-                message: e.to_string(),
-            })
-    }
-
-    /// Get this node's ticket.
-    pub async fn ticket(&self) -> Result<String, PutError> {
-        let addr = self
-            .api
-            .rpc(api::GetNodeAddr)
-            .await
-            .map_err(|e| PutError::Irpc {
-                message: e.to_string(),
-            })?;
-        Ok(NodeTicket::from(addr).to_string())
-    }
-
-    /// Get this node's node ID.
-    pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> {
-        let addr = self
-            .api
-            .rpc(api::GetNodeAddr)
-            .await
-            .map_err(|e| PutError::Irpc {
-                message: e.to_string(),
-            })?;
-        Ok(Arc::new(addr.node_id.into()))
-    }
-
-    /// Shutdown the node, including the streaming system and the metadata db.
-    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
-        // shut down both the streams and the db concurrently, even if one fails
-        let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown());
-        res1?;
-        res2?;
-        Ok(())
-    }
-}
-
-impl Node {
-    async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> {
-        self.api
-            .rpc(api::Shutdown)
-            .await
-            .map_err(|e| ShutdownError::Irpc {
-                message: e.to_string(),
-            })
-    }
-}
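Putting the constructors and the API methods above together, a hedged end-to-end sketch (the stream key "camera-1", the function name `demo`, and the config values are hypothetical; the error types are assumed to implement std::error::Error, as the thiserror-based ones elsewhere in this diff do):

// Hypothetical wiring of one sender and one receiver.
async fn demo(
    sender_config: Config,
    receiver_config: Config,
    handler: Arc<dyn DataHandler>,
) -> Result<(), Box<dyn std::error::Error>> {
    let sender = Node::sender(sender_config).await?;
    let receiver = Node::receiver(receiver_config, handler).await?;

    // Exchange addressing information via a node ticket.
    let ticket = sender.ticket().await?;
    receiver.join_peers(vec![ticket]).await?;

    // Subscribe to a stream on the sender, then push a segment into it.
    let sender_id = sender.node_id().await?;
    receiver.subscribe("camera-1".to_string(), sender_id).await?;
    sender.send_segment("camera-1".to_string(), vec![0u8; 188]).await?;
    Ok(())
}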
···
        self.alpn.clone()
    }

+    /// Accept an incoming connection and return a [`Stream`].
+    pub async fn accept(&self) -> Result<Arc<Stream>, AcceptError> {
        RUNTIME.block_on(self.accept0())
    }

+    /// Connect to a peer at the given [`NodeAddr`] and return a [`Stream`].
    pub async fn connect(
        &self,
        addr: Arc<crate::node_addr::NodeAddr>,
+    ) -> Result<Arc<Stream>, ConnectError> {
        RUNTIME.block_on(self.connect0(addr))
    }

···
        })
    }

+    async fn accept0(&self) -> Result<Arc<Stream>, AcceptError> {
        let incoming = self
            .endpoint
            .accept()
···
        let (send, recv) = conn.accept_bi().await.map_err(|e| AcceptError::Other {
            message: e.to_string(),
        })?;
+        Ok(Arc::new(Stream {
            recv: tokio::sync::Mutex::new(recv),
            send: tokio::sync::Mutex::new(send),
            conn,
···
    async fn connect0(
        &self,
        addr: Arc<crate::node_addr::NodeAddr>,
+    ) -> Result<Arc<Stream>, ConnectError> {
        let node_addr: iroh::NodeAddr =
            (*addr)
                .clone()
···
        let (send, recv) = conn.open_bi().await.map_err(|e| ConnectError::Other {
            message: e.to_string(),
        })?;
+        Ok(Arc::new(Stream {
            recv: tokio::sync::Mutex::new(recv),
            send: tokio::sync::Mutex::new(send),
            conn,
···

#[derive(Debug, thiserror::Error, uniffi::Error)]
#[uniffi(flat_error)]
+pub enum WriteError {
    #[error("Other error: {message}")]
    Other { message: String },
}
···
    Other { message: String },
}

+/// A bidirectional stream over an iroh connection.
+///
+/// In QUIC, streams and connections are separate concepts, and a connection can have multiple streams.
+/// For simplicity we expose a single bidirectional stream per connection here.
#[derive(Debug, uniffi::Object)]
+pub struct Stream {
    recv: tokio::sync::Mutex<iroh::endpoint::RecvStream>,
    send: tokio::sync::Mutex<iroh::endpoint::SendStream>,
    conn: iroh::endpoint::Connection,
}

#[uniffi::export]
+impl Stream {
+    /// Read up to n bytes from the stream.
+    ///
+    /// Due to the way uniffi works, this can't have the signature that is
+    /// usually used in golang code. Instead of taking a mutable buffer and
+    /// returning the number of bytes read, it takes the number of bytes to read
+    /// and returns a vector with the data read.
+    ///
+    /// Wrapping this into a more idiomatic golang interface needs a few lines
+    /// on the golang side.
    pub async fn read(&self, n: u64) -> Result<Vec<u8>, ReadError> {
        let mut buf = vec![0u8; n as usize];
        let n = self
···
        Ok(buf)
    }

+    /// Write all bytes in buf to the stream.
+    pub async fn write_all(&self, buf: &[u8]) -> Result<(), WriteError> {
+        self.send.lock().await.write_all(buf).await.map_err(|e| WriteError::Other {
            message: e.to_string(),
        })
    }

+    /// Write bytes from buf to the stream, returning how many bytes were written.
+    pub async fn write(&self, buf: &[u8]) -> Result<u32, WriteError> {
        let n = self.send
            .lock()
            .await
            .write(&buf)
            .await
+            .map_err(|e| WriteError::Other {
                message: e.to_string(),
            })?;
        Ok(n as u32)
    }

+    /// Close the write side of the stream.
+    ///
+    /// Note: this does not close the underlying connection.
+    pub async fn close_write(&self) -> Result<(), WriteError> {
+        self.send.lock().await.finish().map_err(|e| WriteError::Other {
            message: e.to_string(),
        })?;
        Ok(())
    }

+    /// Close the read side of the stream.
+    ///
+    /// Note: this does not close the underlying connection.
    pub async fn close_read(&self) -> Result<(), ReadError> {
        self.recv.lock().await.stop(0u32.into()).map_err(|e| ReadError::Other {
            message: e.to_string(),
···
        Ok(())
    }

+    /// Close the underlying connection.
    pub fn close(&self) {
        self.conn.close(0u32.into(), b"");
    }
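As a hedged usage sketch of the methods above (not part of the diff; `request_reply` is a hypothetical helper and `stream` is assumed to come from the connect/accept calls earlier in this file):

// Hypothetical helper showing a simple request/reply round trip.
async fn request_reply(stream: Arc<Stream>) -> Result<Vec<u8>, ReadError> {
    // Write-side errors are ignored here to keep the sketch short.
    stream.write_all(b"hello").await.ok();
    stream.close_write().await.ok(); // tell the peer we are done writing
    let reply = stream.read(1024).await?; // read up to 1024 bytes of the reply
    stream.close(); // close the underlying connection
    Ok(reply)
}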

+    /// Wait until the connection is closed.
    pub async fn closed(&self) {
        RUNTIME.block_on(async {
            let _reason = self.conn.closed().await;
rust/iroh-streamplace/src/streams.rs (deleted, -168 lines)
-// Copyright 2023 Adobe. All rights reserved.
-// This file is licensed to you under the Apache License,
-// Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
-// or the MIT license (http://opensource.org/licenses/MIT),
-// at your option.
-
-// Unless required by applicable law or agreed to in writing,
-// this software is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or
-// implied. See the LICENSE-MIT and LICENSE-APACHE files for the
-// specific language governing permissions and limitations under
-// each license.
-
-use std::{
-    io::{Read, Seek, SeekFrom, Write},
-    sync::Arc,
-};
-
-use crate::error::SPError;
-
-// #[repr(C)]
-// #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-// pub enum SeekMode {
-//     Start = 0,
-//     End = 1,
-//     Current = 2,
-// }
-
-/// This allows for a callback stream over the Uniffi interface.
-/// Implement these stream functions in the foreign language
-/// and this will provide Rust Stream trait implementations.
-/// This is necessary since the Rust traits cannot be implemented directly
-/// as uniffi callbacks.
-#[uniffi::export(with_foreign)]
-pub trait Stream: Send + Sync {
-    /// Read a stream of bytes from the stream
-    fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError>;
-    /// Seek to a position in the stream
-    fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError>;
-    /// Write a stream of bytes to the stream
-    fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError>;
-}
-
-impl Stream for Arc<dyn Stream> {
-    fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError> {
-        (**self).read_stream(length)
-    }
-
-    fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError> {
-        (**self).seek_stream(pos, mode)
-    }
-
-    fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError> {
-        (**self).write_stream(data)
-    }
-}
-
-impl AsMut<dyn Stream> for dyn Stream {
-    fn as_mut(&mut self) -> &mut Self {
-        self
-    }
-}
-
-pub struct StreamAdapter<'a> {
-    pub stream: &'a dyn Stream,
-}
-
-impl<'a> StreamAdapter<'a> {
-    pub fn from_stream_mut(stream: &'a mut dyn Stream) -> Self {
-        Self { stream }
-    }
-}
-
-impl<'a> From<&'a dyn Stream> for StreamAdapter<'a> {
-    fn from(stream: &'a dyn Stream) -> Self {
-        Self { stream }
-    }
-}
-
-impl<'a> Read for StreamAdapter<'a> {
-    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        let mut bytes = self
-            .stream
-            .read_stream(buf.len() as u64)
-            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-        let len = bytes.len();
-        buf.iter_mut().zip(bytes.drain(..)).for_each(|(dest, src)| {
-            *dest = src;
-        });
-        //println!("read: {:?}", len);
-        Ok(len)
-    }
-}
-
-impl<'a> Seek for StreamAdapter<'a> {
-    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
-        let (pos, mode) = match pos {
-            SeekFrom::Current(pos) => (pos, 2),
-            SeekFrom::Start(pos) => (pos as i64, 0),
-            SeekFrom::End(pos) => (pos, 1),
-        };
-        //println!("Stream Seek {}", pos);
-        self.stream
-            .seek_stream(pos, mode)
-            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
-    }
-}
-
-impl<'a> Write for StreamAdapter<'a> {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let len = self
-            .stream
-            .write_stream(buf.to_vec())
-            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-        Ok(len as usize)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        Ok(())
-    }
-}
-
-#[uniffi::export(with_foreign)]
-pub trait ManyStreams: Send + Sync {
-    /// Get the next stream from the many streams
-    fn next(&self) -> Option<Arc<dyn Stream>>;
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test_stream::TestStream;
-
-    #[test]
-    fn test_stream_read() {
-        let mut test = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-        let mut stream = StreamAdapter::from_stream_mut(&mut test);
-        let mut buf = [0u8; 5];
-        let len = stream.read(&mut buf).unwrap();
-        assert_eq!(len, 5);
-        assert_eq!(buf, [0, 1, 2, 3, 4]);
-    }
-
-    #[test]
-    fn test_stream_seek() {
-        let mut test = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-        let mut stream = StreamAdapter { stream: &mut test };
-        let pos = stream.seek(SeekFrom::Start(5)).unwrap();
-        assert_eq!(pos, 5);
-        let mut buf = [0u8; 5];
-        let len = stream.read(&mut buf).unwrap();
-        assert_eq!(len, 5);
-        assert_eq!(buf, [5, 6, 7, 8, 9]);
-    }
-
-    #[test]
-    fn test_stream_write() {
-        let mut test = TestStream::new();
-        let mut stream = StreamAdapter { stream: &mut test };
-        let len = stream.write(&[0, 1, 2, 3, 4]).unwrap();
-        assert_eq!(len, 5);
-        stream.seek(SeekFrom::Start(0)).unwrap();
-        let mut buf = [0u8; 5];
-        let len = stream.read(&mut buf).unwrap();
-        assert_eq!(len, 5);
-        assert_eq!(buf, [0, 1, 2, 3, 4]);
-    }
-}