at main 150 lines 5.2 kB view raw
1#![cfg(feature = "iroh")] 2 3//! CollabNode - iroh endpoint with gossip router for real-time collaboration. 4 5use iroh::Endpoint; 6use iroh::EndpointId; 7use iroh::SecretKey; 8use iroh_gossip::net::{GOSSIP_ALPN, Gossip}; 9use jacquard::smol_str::{SmolStr, ToSmolStr}; 10use miette::Diagnostic; 11use std::sync::Arc; 12 13/// Error type for transport operations 14#[derive(Debug, thiserror::Error, Diagnostic)] 15#[diagnostic(code(weaver::transport))] 16pub enum TransportError { 17 #[error("failed to bind endpoint")] 18 Bind(#[source] Box<dyn std::error::Error + Send + Sync>), 19 20 #[error("gossip error")] 21 Gossip(#[source] Box<dyn std::error::Error + Send + Sync>), 22} 23 24/// A collaboration node wrapping an iroh endpoint and gossip router. 25/// 26/// There should be one CollabNode per application instance. It manages: 27/// - The iroh QUIC endpoint (with automatic relay fallback for browsers) 28/// - The gossip protocol handler 29/// - The protocol router for ALPN dispatch 30pub struct CollabNode { 31 endpoint: Endpoint, 32 gossip: Gossip, 33 #[allow(dead_code)] 34 router: iroh::protocol::Router, 35 secret_key: SecretKey, 36} 37 38impl CollabNode { 39 /// Spawn a new collaboration node. 40 /// 41 /// If no secret key is provided, a new one is generated. For browsers, 42 /// this means each session gets a fresh identity (published to PDS via 43 /// session records for peer discovery). 44 pub async fn spawn(secret_key: Option<SecretKey>) -> Result<Arc<Self>, TransportError> { 45 let secret_key = secret_key.unwrap_or_else(|| SecretKey::generate(&mut rand::rng())); 46 47 // Build endpoint with gossip ALPN 48 // In WASM, this automatically uses relay-only mode 49 // In native, this can do direct P2P with relay fallback 50 let endpoint = Endpoint::builder() 51 .secret_key(secret_key.clone()) 52 .alpns(vec![GOSSIP_ALPN.to_vec()]) 53 .bind() 54 .await 55 .map_err(|e| TransportError::Bind(Box::new(e)))?; 56 57 // Build gossip protocol handler 58 let gossip = Gossip::builder().spawn(endpoint.clone()); 59 60 // Build router to dispatch incoming connections by ALPN 61 let router = iroh::protocol::Router::builder(endpoint.clone()) 62 .accept(GOSSIP_ALPN, gossip.clone()) 63 .spawn(); 64 65 tracing::info!(node_id = %endpoint.id(), "CollabNode started"); 66 67 Ok(Arc::new(Self { 68 endpoint, 69 gossip, 70 router, 71 secret_key, 72 })) 73 } 74 75 /// Get this node's public identifier. 76 /// 77 /// This should be published to the user's PDS in a session record 78 /// so other collaborators can discover and connect to this node. 79 pub fn node_id(&self) -> EndpointId { 80 self.endpoint.id() 81 } 82 83 /// Get the node ID as a z-base32 string for storage in AT Protocol records. 84 pub fn node_id_string(&self) -> SmolStr { 85 self.endpoint.id().to_smolstr() 86 } 87 88 /// Get a reference to the gossip handler for joining topics. 89 pub fn gossip(&self) -> &Gossip { 90 &self.gossip 91 } 92 93 /// Get a reference to the underlying endpoint. 94 pub fn endpoint(&self) -> &Endpoint { 95 &self.endpoint 96 } 97 98 /// Get a clone of the secret key (for session persistence if needed). 99 pub fn secret_key(&self) -> SecretKey { 100 self.secret_key.clone() 101 } 102 103 /// Get the relay URL this node is connected to (if any). 104 /// 105 /// This should be published in session records so other peers can connect 106 /// via relay (essential for browser-to-browser connections). 107 pub fn relay_url(&self) -> Option<SmolStr> { 108 self.endpoint 109 .addr() 110 .relay_urls() 111 .next() 112 .map(|url| url.to_smolstr()) 113 } 114 115 /// Get the full node address including relay info. 116 /// 117 /// Use this when you need to connect to this node from another peer. 118 pub fn node_addr(&self) -> iroh::EndpointAddr { 119 self.endpoint.addr() 120 } 121 122 /// Wait for the endpoint to be online (relay connected). 123 /// 124 /// This should be called before publishing session records to ensure 125 /// the relay URL is available for peer discovery. For browser clients, 126 /// relay is required - we wait indefinitely since there's no fallback. 127 pub async fn wait_online(&self) { 128 self.endpoint.online().await; 129 } 130 131 /// Wait for relay connection and return the relay URL. 132 /// 133 /// Waits indefinitely for relay - browser clients require relay URLs 134 /// for peer discovery. Returns the relay URL once connected. 135 pub async fn wait_for_relay(&self) -> SmolStr { 136 self.endpoint.online().await; 137 // After online(), relay_url should always be Some for browser clients 138 self.relay_url() 139 .expect("relay URL should be available after online()") 140 } 141 142 /// Watch for address changes (including relay URL changes). 143 /// 144 /// Returns a stream that yields the address on each change. 145 /// Use this to detect relay URL changes and update session records. 146 pub fn watch_addr(&self) -> n0_future::boxed::BoxStream<iroh::EndpointAddr> { 147 use iroh::Watcher; 148 Box::pin(self.endpoint.watch_addr().stream()) 149 } 150}