atproto blogging
1#![cfg(feature = "iroh")]
2
3//! CollabSession - per-resource gossip session for real-time collaboration.
4
5use std::sync::Arc;
6
7use iroh::EndpointId;
8use iroh_gossip::api::{Event, GossipReceiver, GossipSender};
9use miette::Diagnostic;
10use n0_future::StreamExt;
11use n0_future::boxed::BoxStream;
12use n0_future::stream;
13
14use super::{CollabMessage, CollabNode, SignedMessage};
15
16/// Topic ID for a gossip session - derived from resource URI.
17pub type TopicId = iroh_gossip::TopicId;
18
19/// Error type for session operations
20#[derive(Debug, thiserror::Error, Diagnostic)]
21#[diagnostic(code(weaver::transport::session))]
22pub enum SessionError {
23 #[error("failed to subscribe to topic")]
24 Subscribe(#[source] Box<dyn std::error::Error + Send + Sync>),
25
26 #[error("failed to broadcast message")]
27 Broadcast(#[source] Box<dyn std::error::Error + Send + Sync>),
28
29 #[error("failed to decode message")]
30 Decode(#[source] Box<dyn std::error::Error + Send + Sync>),
31
32 #[error("session closed")]
33 Closed,
34}
35
36/// Events emitted by a collaboration session.
37#[derive(Debug, Clone)]
38pub enum SessionEvent {
39 /// A collaborator joined the session
40 PeerJoined(EndpointId),
41
42 /// A collaborator left the session
43 PeerLeft(EndpointId),
44
45 /// Received a collaboration message from a peer
46 Message {
47 from: EndpointId,
48 message: CollabMessage,
49 },
50
51 /// We successfully joined the gossip swarm
52 Joined,
53}
54
55/// A collaboration session for a specific resource.
56///
57/// Each session manages gossip subscriptions for one resource (e.g., one notebook).
58/// Create via `CollabSession::join()`.
59pub struct CollabSession {
60 topic: TopicId,
61 sender: GossipSender,
62 node: Arc<CollabNode>,
63}
64
65impl CollabSession {
66 /// Derive a topic ID from a resource identifier.
67 ///
68 /// We use blake3 hash of the AT-URI to get a stable 32-byte topic ID.
69 /// Format: `at://{did}/{collection}/{rkey}`
70 pub fn topic_from_uri(uri: &str) -> TopicId {
71 let hash = blake3::hash(uri.as_bytes());
72 TopicId::from_bytes(*hash.as_bytes())
73 }
74
75 /// Join a collaboration session for a resource.
76 ///
77 /// Returns the session handle and a stream for receiving events.
78 /// Bootstrap peers are NodeIds of collaborators discovered from session records.
79 pub async fn join(
80 node: Arc<CollabNode>,
81 topic: TopicId,
82 bootstrap_peers: Vec<EndpointId>,
83 ) -> Result<(Self, BoxStream<Result<SessionEvent, SessionError>>), SessionError> {
84 tracing::info!(
85 topic = ?topic,
86 bootstrap_count = bootstrap_peers.len(),
87 "CollabSession: joining topic"
88 );
89
90 for peer in &bootstrap_peers {
91 tracing::debug!(peer = %peer, "CollabSession: bootstrap peer");
92 }
93
94 // Subscribe to the gossip topic
95 // Use subscribe (non-blocking) if no bootstrap peers, otherwise subscribe_and_join
96 let (sender, receiver) = if bootstrap_peers.is_empty() {
97 node.gossip()
98 .subscribe(topic, vec![])
99 .await
100 .map_err(|e| SessionError::Subscribe(Box::new(e)))?
101 .split()
102 } else {
103 node.gossip()
104 .subscribe_and_join(topic, bootstrap_peers)
105 .await
106 .map_err(|e| SessionError::Subscribe(Box::new(e)))?
107 .split()
108 };
109
110 tracing::info!("CollabSession: subscribed to gossip topic");
111
112 let session = Self {
113 topic,
114 sender,
115 node: node.clone(),
116 };
117
118 // Create event stream from the gossip receiver
119 let event_stream = Self::event_stream(receiver);
120
121 Ok((session, event_stream))
122 }
123
124 /// Convert gossip receiver into a stream of session events.
125 fn event_stream(receiver: GossipReceiver) -> BoxStream<Result<SessionEvent, SessionError>> {
126 let stream = stream::try_unfold(receiver, |mut receiver| async move {
127 loop {
128 let Some(event) = receiver.try_next().await.map_err(|e| {
129 tracing::error!(?e, "CollabSession: gossip receiver error");
130 SessionError::Decode(Box::new(e))
131 })?
132 else {
133 tracing::debug!("CollabSession: gossip stream ended");
134 return Ok(None);
135 };
136
137 tracing::debug!(?event, "CollabSession: raw gossip event");
138 let session_event = match event {
139 Event::NeighborUp(peer) => {
140 tracing::info!(peer = %peer, "CollabSession: neighbor up");
141 SessionEvent::PeerJoined(peer)
142 }
143 Event::NeighborDown(peer) => {
144 tracing::info!(peer = %peer, "CollabSession: neighbor down");
145 SessionEvent::PeerLeft(peer)
146 }
147 Event::Received(msg) => {
148 tracing::debug!(
149 from = %msg.delivered_from,
150 bytes = msg.content.len(),
151 "CollabSession: received message"
152 );
153 match SignedMessage::decode_and_verify(&msg.content) {
154 Ok(received) => {
155 // Verify claimed sender matches transport sender
156 if received.from != msg.delivered_from {
157 tracing::warn!(
158 claimed = %received.from,
159 transport = %msg.delivered_from,
160 "sender mismatch - possible spoofing attempt"
161 );
162 continue;
163 }
164 SessionEvent::Message {
165 from: received.from,
166 message: received.message,
167 }
168 }
169 Err(e) => {
170 tracing::warn!(?e, "failed to verify/decode signed message");
171 continue;
172 }
173 }
174 }
175 Event::Lagged => {
176 tracing::warn!("gossip receiver lagged, some messages may be lost");
177 continue;
178 }
179 };
180 break Ok(Some((session_event, receiver)));
181 }
182 });
183
184 Box::pin(stream)
185 }
186
187 /// Broadcast a signed message to all peers in the session.
188 pub async fn broadcast(&self, message: &CollabMessage) -> Result<(), SessionError> {
189 let bytes = SignedMessage::sign_and_encode(&self.node.secret_key(), message)
190 .map_err(|e| SessionError::Broadcast(Box::new(e)))?;
191
192 tracing::debug!(
193 bytes = bytes.len(),
194 topic = ?self.topic,
195 "CollabSession: broadcasting signed message"
196 );
197
198 self.sender
199 .broadcast(bytes.into())
200 .await
201 .map_err(|e| SessionError::Broadcast(Box::new(e)))?;
202
203 Ok(())
204 }
205
206 /// Get the topic ID for this session.
207 pub fn topic(&self) -> TopicId {
208 self.topic
209 }
210
211 /// Add new peers to the gossip session.
212 ///
213 /// Use this to add peers discovered after initial subscription.
214 /// The gossip layer will attempt to connect to these peers.
215 pub async fn join_peers(&self, peers: Vec<EndpointId>) -> Result<(), SessionError> {
216 if peers.is_empty() {
217 return Ok(());
218 }
219 tracing::info!(
220 count = peers.len(),
221 "CollabSession: joining additional peers"
222 );
223 for peer in &peers {
224 tracing::debug!(peer = %peer, "CollabSession: adding peer");
225 }
226 self.sender
227 .join_peers(peers)
228 .await
229 .map_err(|e| SessionError::Subscribe(Box::new(e)))?;
230 Ok(())
231 }
232}