atproto blogging
1#![cfg(feature = "iroh")]
2
3//! CollabNode - iroh endpoint with gossip router for real-time collaboration.
4
5use iroh::Endpoint;
6use iroh::EndpointId;
7use iroh::SecretKey;
8use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
9use jacquard::smol_str::{SmolStr, ToSmolStr};
10use miette::Diagnostic;
11use std::sync::Arc;
12
13/// Error type for transport operations
14#[derive(Debug, thiserror::Error, Diagnostic)]
15#[diagnostic(code(weaver::transport))]
16pub enum TransportError {
17 #[error("failed to bind endpoint")]
18 Bind(#[source] Box<dyn std::error::Error + Send + Sync>),
19
20 #[error("gossip error")]
21 Gossip(#[source] Box<dyn std::error::Error + Send + Sync>),
22}
23
24/// A collaboration node wrapping an iroh endpoint and gossip router.
25///
26/// There should be one CollabNode per application instance. It manages:
27/// - The iroh QUIC endpoint (with automatic relay fallback for browsers)
28/// - The gossip protocol handler
29/// - The protocol router for ALPN dispatch
30pub struct CollabNode {
31 endpoint: Endpoint,
32 gossip: Gossip,
33 #[allow(dead_code)]
34 router: iroh::protocol::Router,
35 secret_key: SecretKey,
36}
37
38impl CollabNode {
39 /// Spawn a new collaboration node.
40 ///
41 /// If no secret key is provided, a new one is generated. For browsers,
42 /// this means each session gets a fresh identity (published to PDS via
43 /// session records for peer discovery).
44 pub async fn spawn(secret_key: Option<SecretKey>) -> Result<Arc<Self>, TransportError> {
45 let secret_key = secret_key.unwrap_or_else(|| SecretKey::generate(&mut rand::rng()));
46
47 // Build endpoint with gossip ALPN
48 // In WASM, this automatically uses relay-only mode
49 // In native, this can do direct P2P with relay fallback
50 let endpoint = Endpoint::builder()
51 .secret_key(secret_key.clone())
52 .alpns(vec![GOSSIP_ALPN.to_vec()])
53 .bind()
54 .await
55 .map_err(|e| TransportError::Bind(Box::new(e)))?;
56
57 // Build gossip protocol handler
58 let gossip = Gossip::builder().spawn(endpoint.clone());
59
60 // Build router to dispatch incoming connections by ALPN
61 let router = iroh::protocol::Router::builder(endpoint.clone())
62 .accept(GOSSIP_ALPN, gossip.clone())
63 .spawn();
64
65 tracing::info!(node_id = %endpoint.id(), "CollabNode started");
66
67 Ok(Arc::new(Self {
68 endpoint,
69 gossip,
70 router,
71 secret_key,
72 }))
73 }
74
75 /// Get this node's public identifier.
76 ///
77 /// This should be published to the user's PDS in a session record
78 /// so other collaborators can discover and connect to this node.
79 pub fn node_id(&self) -> EndpointId {
80 self.endpoint.id()
81 }
82
83 /// Get the node ID as a z-base32 string for storage in AT Protocol records.
84 pub fn node_id_string(&self) -> SmolStr {
85 self.endpoint.id().to_smolstr()
86 }
87
88 /// Get a reference to the gossip handler for joining topics.
89 pub fn gossip(&self) -> &Gossip {
90 &self.gossip
91 }
92
93 /// Get a reference to the underlying endpoint.
94 pub fn endpoint(&self) -> &Endpoint {
95 &self.endpoint
96 }
97
98 /// Get a clone of the secret key (for session persistence if needed).
99 pub fn secret_key(&self) -> SecretKey {
100 self.secret_key.clone()
101 }
102
103 /// Get the relay URL this node is connected to (if any).
104 ///
105 /// This should be published in session records so other peers can connect
106 /// via relay (essential for browser-to-browser connections).
107 pub fn relay_url(&self) -> Option<SmolStr> {
108 self.endpoint
109 .addr()
110 .relay_urls()
111 .next()
112 .map(|url| url.to_smolstr())
113 }
114
115 /// Get the full node address including relay info.
116 ///
117 /// Use this when you need to connect to this node from another peer.
118 pub fn node_addr(&self) -> iroh::EndpointAddr {
119 self.endpoint.addr()
120 }
121
122 /// Wait for the endpoint to be online (relay connected).
123 ///
124 /// This should be called before publishing session records to ensure
125 /// the relay URL is available for peer discovery. For browser clients,
126 /// relay is required - we wait indefinitely since there's no fallback.
127 pub async fn wait_online(&self) {
128 self.endpoint.online().await;
129 }
130
131 /// Wait for relay connection and return the relay URL.
132 ///
133 /// Waits indefinitely for relay - browser clients require relay URLs
134 /// for peer discovery. Returns the relay URL once connected.
135 pub async fn wait_for_relay(&self) -> SmolStr {
136 self.endpoint.online().await;
137 // After online(), relay_url should always be Some for browser clients
138 self.relay_url()
139 .expect("relay URL should be available after online()")
140 }
141
142 /// Watch for address changes (including relay URL changes).
143 ///
144 /// Returns a stream that yields the address on each change.
145 /// Use this to detect relay URL changes and update session records.
146 pub fn watch_addr(&self) -> n0_future::boxed::BoxStream<iroh::EndpointAddr> {
147 use iroh::Watcher;
148 Box::pin(self.endpoint.watch_addr().stream())
149 }
150}