at main 232 lines 8.2 kB view raw
1#![cfg(feature = "iroh")] 2 3//! CollabSession - per-resource gossip session for real-time collaboration. 4 5use std::sync::Arc; 6 7use iroh::EndpointId; 8use iroh_gossip::api::{Event, GossipReceiver, GossipSender}; 9use miette::Diagnostic; 10use n0_future::StreamExt; 11use n0_future::boxed::BoxStream; 12use n0_future::stream; 13 14use super::{CollabMessage, CollabNode, SignedMessage}; 15 16/// Topic ID for a gossip session - derived from resource URI. 17pub type TopicId = iroh_gossip::TopicId; 18 19/// Error type for session operations 20#[derive(Debug, thiserror::Error, Diagnostic)] 21#[diagnostic(code(weaver::transport::session))] 22pub enum SessionError { 23 #[error("failed to subscribe to topic")] 24 Subscribe(#[source] Box<dyn std::error::Error + Send + Sync>), 25 26 #[error("failed to broadcast message")] 27 Broadcast(#[source] Box<dyn std::error::Error + Send + Sync>), 28 29 #[error("failed to decode message")] 30 Decode(#[source] Box<dyn std::error::Error + Send + Sync>), 31 32 #[error("session closed")] 33 Closed, 34} 35 36/// Events emitted by a collaboration session. 37#[derive(Debug, Clone)] 38pub enum SessionEvent { 39 /// A collaborator joined the session 40 PeerJoined(EndpointId), 41 42 /// A collaborator left the session 43 PeerLeft(EndpointId), 44 45 /// Received a collaboration message from a peer 46 Message { 47 from: EndpointId, 48 message: CollabMessage, 49 }, 50 51 /// We successfully joined the gossip swarm 52 Joined, 53} 54 55/// A collaboration session for a specific resource. 56/// 57/// Each session manages gossip subscriptions for one resource (e.g., one notebook). 58/// Create via `CollabSession::join()`. 59pub struct CollabSession { 60 topic: TopicId, 61 sender: GossipSender, 62 node: Arc<CollabNode>, 63} 64 65impl CollabSession { 66 /// Derive a topic ID from a resource identifier. 67 /// 68 /// We use blake3 hash of the AT-URI to get a stable 32-byte topic ID. 69 /// Format: `at://{did}/{collection}/{rkey}` 70 pub fn topic_from_uri(uri: &str) -> TopicId { 71 let hash = blake3::hash(uri.as_bytes()); 72 TopicId::from_bytes(*hash.as_bytes()) 73 } 74 75 /// Join a collaboration session for a resource. 76 /// 77 /// Returns the session handle and a stream for receiving events. 78 /// Bootstrap peers are NodeIds of collaborators discovered from session records. 79 pub async fn join( 80 node: Arc<CollabNode>, 81 topic: TopicId, 82 bootstrap_peers: Vec<EndpointId>, 83 ) -> Result<(Self, BoxStream<Result<SessionEvent, SessionError>>), SessionError> { 84 tracing::info!( 85 topic = ?topic, 86 bootstrap_count = bootstrap_peers.len(), 87 "CollabSession: joining topic" 88 ); 89 90 for peer in &bootstrap_peers { 91 tracing::debug!(peer = %peer, "CollabSession: bootstrap peer"); 92 } 93 94 // Subscribe to the gossip topic 95 // Use subscribe (non-blocking) if no bootstrap peers, otherwise subscribe_and_join 96 let (sender, receiver) = if bootstrap_peers.is_empty() { 97 node.gossip() 98 .subscribe(topic, vec![]) 99 .await 100 .map_err(|e| SessionError::Subscribe(Box::new(e)))? 101 .split() 102 } else { 103 node.gossip() 104 .subscribe_and_join(topic, bootstrap_peers) 105 .await 106 .map_err(|e| SessionError::Subscribe(Box::new(e)))? 107 .split() 108 }; 109 110 tracing::info!("CollabSession: subscribed to gossip topic"); 111 112 let session = Self { 113 topic, 114 sender, 115 node: node.clone(), 116 }; 117 118 // Create event stream from the gossip receiver 119 let event_stream = Self::event_stream(receiver); 120 121 Ok((session, event_stream)) 122 } 123 124 /// Convert gossip receiver into a stream of session events. 125 fn event_stream(receiver: GossipReceiver) -> BoxStream<Result<SessionEvent, SessionError>> { 126 let stream = stream::try_unfold(receiver, |mut receiver| async move { 127 loop { 128 let Some(event) = receiver.try_next().await.map_err(|e| { 129 tracing::error!(?e, "CollabSession: gossip receiver error"); 130 SessionError::Decode(Box::new(e)) 131 })? 132 else { 133 tracing::debug!("CollabSession: gossip stream ended"); 134 return Ok(None); 135 }; 136 137 tracing::debug!(?event, "CollabSession: raw gossip event"); 138 let session_event = match event { 139 Event::NeighborUp(peer) => { 140 tracing::info!(peer = %peer, "CollabSession: neighbor up"); 141 SessionEvent::PeerJoined(peer) 142 } 143 Event::NeighborDown(peer) => { 144 tracing::info!(peer = %peer, "CollabSession: neighbor down"); 145 SessionEvent::PeerLeft(peer) 146 } 147 Event::Received(msg) => { 148 tracing::debug!( 149 from = %msg.delivered_from, 150 bytes = msg.content.len(), 151 "CollabSession: received message" 152 ); 153 match SignedMessage::decode_and_verify(&msg.content) { 154 Ok(received) => { 155 // Verify claimed sender matches transport sender 156 if received.from != msg.delivered_from { 157 tracing::warn!( 158 claimed = %received.from, 159 transport = %msg.delivered_from, 160 "sender mismatch - possible spoofing attempt" 161 ); 162 continue; 163 } 164 SessionEvent::Message { 165 from: received.from, 166 message: received.message, 167 } 168 } 169 Err(e) => { 170 tracing::warn!(?e, "failed to verify/decode signed message"); 171 continue; 172 } 173 } 174 } 175 Event::Lagged => { 176 tracing::warn!("gossip receiver lagged, some messages may be lost"); 177 continue; 178 } 179 }; 180 break Ok(Some((session_event, receiver))); 181 } 182 }); 183 184 Box::pin(stream) 185 } 186 187 /// Broadcast a signed message to all peers in the session. 188 pub async fn broadcast(&self, message: &CollabMessage) -> Result<(), SessionError> { 189 let bytes = SignedMessage::sign_and_encode(&self.node.secret_key(), message) 190 .map_err(|e| SessionError::Broadcast(Box::new(e)))?; 191 192 tracing::debug!( 193 bytes = bytes.len(), 194 topic = ?self.topic, 195 "CollabSession: broadcasting signed message" 196 ); 197 198 self.sender 199 .broadcast(bytes.into()) 200 .await 201 .map_err(|e| SessionError::Broadcast(Box::new(e)))?; 202 203 Ok(()) 204 } 205 206 /// Get the topic ID for this session. 207 pub fn topic(&self) -> TopicId { 208 self.topic 209 } 210 211 /// Add new peers to the gossip session. 212 /// 213 /// Use this to add peers discovered after initial subscription. 214 /// The gossip layer will attempt to connect to these peers. 215 pub async fn join_peers(&self, peers: Vec<EndpointId>) -> Result<(), SessionError> { 216 if peers.is_empty() { 217 return Ok(()); 218 } 219 tracing::info!( 220 count = peers.len(), 221 "CollabSession: joining additional peers" 222 ); 223 for peer in &peers { 224 tracing::debug!(peer = %peer, "CollabSession: adding peer"); 225 } 226 self.sender 227 .join_peers(peers) 228 .await 229 .map_err(|e| SessionError::Subscribe(Box::new(e)))?; 230 Ok(()) 231 } 232}