atproto blogging
//! Collab coordinator - bridges EditorWorker and authenticated PDS ops.
//!
//! This component handles the main-thread side of real-time collaboration:
//! - Spawns the editor worker and manages its lifecycle
//! - Performs authenticated PDS operations (session records, peer discovery)
//! - Forwards local Loro updates to the worker for gossip broadcast
//! - Receives remote updates from the worker and applies them to the main document
//! - Provides CollabDebugState context for debug UI
//!
//! The worker handles all iroh/gossip networking off the main thread.

// Only compile for WASM - no-op stub provided at end
13
14use super::document::SignalEditorDocument;
15
16use dioxus::prelude::*;
17
18#[cfg(target_arch = "wasm32")]
19use jacquard::smol_str::ToSmolStr;
20#[cfg(target_arch = "wasm32")]
21use jacquard::smol_str::{SmolStr, format_smolstr};
22#[cfg(target_arch = "wasm32")]
23use jacquard::types::string::AtUri;
24use weaver_common::transport::PresenceSnapshot;
25
26#[cfg(target_arch = "wasm32")]
27use weaver_editor_crdt::{
28 CoordinatorState, PEER_DISCOVERY_INTERVAL_MS, SESSION_REFRESH_INTERVAL_MS, SESSION_TTL_MINUTES,
29 compute_collab_topic,
30};
31
32/// Props for the CollabCoordinator component.
33#[derive(Props, Clone, PartialEq)]
34pub struct CollabCoordinatorProps {
35 /// The editor document to sync
36 pub document: SignalEditorDocument,
37 /// Resource URI for the document being edited
38 pub resource_uri: String,
39 /// Presence state signal (updated by coordinator)
40 pub presence: Signal<PresenceSnapshot>,
41 /// Children to render (this component wraps the editor)
42 pub children: Element,
43}
44
45/// Coordinator component that bridges worker and PDS.
46///
47/// This is a wrapper component that:
48/// 1. Provides CollabDebugState context
49/// 2. Manages collab lifecycle (worker, PDS records, peer discovery)
50/// 3. Renders children
51///
52/// Lifecycle:
53/// 1. Worker spawned on mount, sends CollabReady with node_id
54/// 2. Coordinator creates session record on PDS
55/// 3. Coordinator discovers existing peers
56/// 4. Worker joins gossip session
57/// 5. Local updates forwarded to worker via subscribe_local_update
58/// 6. Remote updates from worker applied to main document
59/// 7. Session record deleted on unmount
60#[component]
61pub fn CollabCoordinator(props: CollabCoordinatorProps) -> Element {
62 #[cfg(target_arch = "wasm32")]
63 {
64 use crate::collab_context::CollabDebugState;
65 use crate::fetch::Fetcher;
66 use futures_util::stream::SplitSink;
67 use futures_util::{SinkExt, StreamExt};
68 use gloo_worker::Spawnable;
69 use gloo_worker::reactor::ReactorBridge;
70 use jacquard::IntoStatic;
71 use weaver_common::WeaverExt;
72 use weaver_editor_crdt::{EditorReactor, WorkerInput, WorkerOutput};
73
74 let fetcher = use_context::<Fetcher>();
75
76 // Provide debug state context
77 let mut debug_state = use_signal(CollabDebugState::default);
78 use_context_provider(|| debug_state);
79
80 // Coordinator state
81 let mut state: Signal<CoordinatorState> = use_signal(|| CoordinatorState::Initializing);
82
83 // Worker sink for sending messages - Signal persists across renders
84 type WorkerSink = SplitSink<ReactorBridge<EditorReactor>, WorkerInput>;
85 let mut worker_sink: Signal<Option<WorkerSink>> = use_signal(|| None);
86
87 // Session record URI for cleanup
88 let mut session_uri: Signal<Option<AtUri<'static>>> = use_signal(|| None);
89
90 // Loro subscription handle (keep alive)
91 let mut loro_sub: Signal<Option<loro::Subscription>> = use_signal(|| None);
92
93 // Clone for closures
94 let resource_uri = props.resource_uri.clone();
95 let mut doc = props.document.clone();
96 let mut presence = props.presence;
97
98 // Spawn worker and set up message handling
99 let fetcher_for_spawn = fetcher.clone();
100 let resource_uri_for_spawn = resource_uri.clone();
101 use_effect(move || {
102 let mut worker_sink = worker_sink;
103 let fetcher = fetcher_for_spawn.clone();
104 let resource_uri = resource_uri_for_spawn.clone();
105 // Channel for local updates (Loro callback is Send+Sync, but ReactorBridge isn't)
106 let (local_update_tx, mut local_update_rx) =
107 tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
108
109 let tx = local_update_tx.clone();
110
111 // Subscribe to local Loro updates - just send to channel (Send+Sync)
112 let sub = doc
113 .loro_doc()
114 .subscribe_local_update(Box::new(move |update| {
115 let _ = tx.send(update.to_vec());
116 true // Keep subscription active
117 }));
118
119 loro_sub.set(Some(sub));
120
121 // Spawn the reactor
122 let bridge = EditorReactor::spawner().spawn("/editor_worker.js");
123 let (sink, mut stream) = bridge.split();
124 worker_sink.set(Some(sink));
125
126 // Initialize worker with current document snapshot
127 let snapshot = doc.export_snapshot();
128 let draft_key: SmolStr = resource_uri.clone().into(); // Use resource URI as the key
129 spawn(async move {
130 if let Some(ref mut sink) = *worker_sink.write() {
131 if let Err(e) = sink
132 .send(WorkerInput::Init {
133 snapshot,
134 draft_key,
135 })
136 .await
137 {
138 tracing::error!("Failed to send Init to worker: {e}");
139 }
140 }
141 });
142
143 // Task 1: Forward local updates from channel to worker
144 spawn(async move {
145 while let Some(data) = local_update_rx.recv().await {
146 if let Some(ref mut s) = *worker_sink.write() {
147 if let Err(e) = s.send(WorkerInput::BroadcastUpdate { data }).await {
148 tracing::warn!("Failed to send BroadcastUpdate to worker: {e}");
149 }
150 }
151 }
152 });
153
154 // Task 2: Handle worker output messages
155 let doc_for_handler = doc.clone();
156 spawn(async move {
157 let mut doc = doc_for_handler;
158 while let Some(output) = stream.next().await {
159 match output {
160 WorkerOutput::Ready => {
161 tracing::info!("CollabCoordinator: worker ready, starting collab");
162
163 // Compute topic from resource URI
164 let topic = compute_collab_topic(&resource_uri);
165
166 // Send StartCollab to worker immediately (no blocking on profile fetch)
167 if let Some(ref mut s) = *worker_sink.write() {
168 if let Err(e) = s
169 .send(WorkerInput::StartCollab {
170 topic,
171 bootstrap_peers: vec![],
172 })
173 .await
174 {
175 tracing::error!("Failed to send StartCollab to worker: {e}");
176 }
177 }
178 }
179
180 WorkerOutput::CollabReady { node_id, relay_url } => {
181 tracing::info!(
182 node_id = %node_id,
183 relay_url = ?relay_url,
184 "CollabCoordinator: collab node ready"
185 );
186
187 // Update debug state
188 debug_state.with_mut(|ds| {
189 ds.node_id = Some(node_id.clone());
190 ds.relay_url = relay_url.clone();
191 });
192
193 state.set(CoordinatorState::CreatingSession {
194 node_id: node_id.clone(),
195 relay_url: relay_url.clone(),
196 });
197
198 // Create session record on PDS
199 let fetcher = fetcher.clone();
200 let resource_uri = resource_uri.clone();
201
202 spawn(async move {
203 // Parse resource URI to get StrongRef
204 let uri = match AtUri::new(&resource_uri) {
205 Ok(u) => u.into_static(),
206 Err(e) => {
207 let err = format_smolstr!("Invalid resource URI: {e}");
208 debug_state
209 .with_mut(|ds| ds.last_error = Some(err.clone()));
210 state.set(CoordinatorState::Error(err));
211 return;
212 }
213 };
214
215 // Get StrongRef for the resource
216 let strong_ref = match fetcher.confirm_record_ref(&uri).await {
217 Ok(r) => r,
218 Err(e) => {
219 let err =
220 format_smolstr!("Failed to get resource ref: {e}");
221 debug_state
222 .with_mut(|ds| ds.last_error = Some(err.clone()));
223 state.set(CoordinatorState::Error(err));
224 return;
225 }
226 };
227
228 // Create session record
229 match fetcher
230 .create_collab_session(
231 &strong_ref,
232 &node_id,
233 relay_url.as_deref(),
234 Some(SESSION_TTL_MINUTES),
235 )
236 .await
237 {
238 Ok(session_record_uri) => {
239 tracing::info!(
240 uri = %session_record_uri,
241 "CollabCoordinator: session record created"
242 );
243 session_uri.set(Some(session_record_uri.clone()));
244 debug_state.with_mut(|ds| {
245 ds.session_record_uri =
246 Some(session_record_uri.to_string());
247 });
248
249 // Discover existing peers
250 #[cfg(feature = "use-index")]
251 let bootstrap_peers = match fetcher
252 .get_resource_sessions(&uri)
253 .await
254 {
255 Ok(output) => {
256 tracing::info!(
257 count = output.sessions.len(),
258 "CollabCoordinator: found peers via index"
259 );
260 debug_state.with_mut(|ds| {
261 ds.discovered_peers = output.sessions.len();
262 });
263 output
264 .sessions
265 .into_iter()
266 .map(|s| s.node_id.as_ref().into())
267 .collect::<Vec<SmolStr>>()
268 }
269 Err(e) => {
270 tracing::warn!(
271 "CollabCoordinator: peer discovery failed: {e}"
272 );
273 vec![]
274 }
275 };
276
277 #[cfg(not(feature = "use-index"))]
278 let bootstrap_peers = match fetcher
279 .find_session_peers(&uri)
280 .await
281 {
282 Ok(peers) => {
283 tracing::info!(
284 count = peers.len(),
285 "CollabCoordinator: found peers"
286 );
287 debug_state.with_mut(|ds| {
288 ds.discovered_peers = peers.len();
289 });
290 peers
291 .into_iter()
292 .map(|p| p.node_id)
293 .collect::<Vec<_>>()
294 }
295 Err(e) => {
296 tracing::warn!(
297 "CollabCoordinator: peer discovery failed: {e}"
298 );
299 vec![]
300 }
301 };
302
303 // Send discovered peers to worker
304 if !bootstrap_peers.is_empty() {
305 tracing::info!(
306 count = bootstrap_peers.len(),
307 peers = ?bootstrap_peers,
308 "CollabCoordinator: sending AddPeers to worker"
309 );
310 if let Some(ref mut s) = *worker_sink.write() {
311 if let Err(e) = s
312 .send(WorkerInput::AddPeers {
313 peers: bootstrap_peers,
314 })
315 .await
316 {
317 tracing::error!(
318 "CollabCoordinator: AddPeers send failed: {e}"
319 );
320 }
321 } else {
322 tracing::error!("CollabCoordinator: sink is None!");
323 }
324 } else {
325 tracing::info!("CollabCoordinator: no peers to add");
326 }
327
328 state.set(CoordinatorState::Active {
329 session_uri: session_record_uri.to_smolstr(),
330 });
331 }
332 Err(e) => {
333 let err = format_smolstr!("Failed to create session: {e}");
334 debug_state
335 .with_mut(|ds| ds.last_error = Some(err.clone()));
336 state.set(CoordinatorState::Error(err));
337 }
338 }
339 });
340 }
341
342 WorkerOutput::CollabJoined => {
343 tracing::info!("CollabCoordinator: joined gossip session");
344 debug_state.with_mut(|ds| ds.is_joined = true);
345 }
346
347 WorkerOutput::RemoteUpdates { data } => {
348 if let Err(e) = doc.import_updates(&data) {
349 tracing::warn!(
350 "CollabCoordinator: failed to import updates: {:?}",
351 e
352 );
353 }
354 }
355
356 WorkerOutput::PresenceUpdate(snapshot) => {
357 debug_state.with_mut(|ds| {
358 ds.connected_peers = snapshot.peer_count;
359 });
360 presence.set(snapshot);
361 }
362
363 WorkerOutput::CollabStopped => {
364 tracing::info!("CollabCoordinator: collab stopped");
365 debug_state.with_mut(|ds| {
366 ds.is_joined = false;
367 ds.connected_peers = 0;
368 });
369 }
370
371 WorkerOutput::PeerConnected => {
372 tracing::info!("CollabCoordinator: peer connected, sending our Join");
373 use weaver_api::sh_weaver::actor::ProfileDataViewInner;
374
375 let fetcher = fetcher.clone();
376
377 // Get our profile info and send BroadcastJoin
378 let (our_did, our_display_name): (SmolStr, SmolStr) = match fetcher
379 .current_did()
380 .await
381 {
382 Some(did) => {
383 let display_name: SmolStr =
384 match fetcher.fetch_profile(&did.clone().into()).await {
385 Ok(profile) => match &profile.inner {
386 ProfileDataViewInner::ProfileView(p) => p
387 .display_name
388 .as_ref()
389 .map(|s| s.as_ref().into())
390 .unwrap_or_else(|| did.as_ref().into()),
391 ProfileDataViewInner::ProfileViewDetailed(p) => p
392 .display_name
393 .as_ref()
394 .map(|s| s.as_ref().into())
395 .unwrap_or_else(|| did.as_ref().into()),
396 ProfileDataViewInner::TangledProfileView(p) => {
397 p.handle.as_ref().into()
398 }
399 _ => did.as_ref().into(),
400 },
401 Err(_) => did.as_ref().into(),
402 };
403 (did.as_ref().into(), display_name)
404 }
405 None => {
406 tracing::warn!(
407 "CollabCoordinator: no current DID for Join message"
408 );
409 ("unknown".into(), "Anonymous".into())
410 }
411 };
412
413 if let Some(ref mut s) = *worker_sink.write() {
414 if let Err(e) = s
415 .send(WorkerInput::BroadcastJoin {
416 did: our_did,
417 display_name: our_display_name,
418 })
419 .await
420 {
421 tracing::error!(
422 "CollabCoordinator: BroadcastJoin send failed: {e}"
423 );
424 }
425 }
426 }
427
428 WorkerOutput::Error { message } => {
429 tracing::error!("CollabCoordinator: worker error: {message}");
430 debug_state.with_mut(|ds| ds.last_error = Some(message.clone()));
431 state.set(CoordinatorState::Error(message));
432 }
433
434 WorkerOutput::Snapshot { .. } => {}
435 }
436 }
437 tracing::info!("CollabCoordinator: worker stream ended");
438 });
439
440 tracing::info!("CollabCoordinator: spawned worker");
441 });
442
443 // Forward cursor updates to worker - memo re-runs when cursor/selection signals change
444 let cursor_signal = props.document.cursor;
445 let selection_signal = props.document.selection;
446
447 let _cursor_broadcaster = use_memo(move || {
448 let cursor = cursor_signal.read();
449 let selection = *selection_signal.read();
450 let position = cursor.offset;
451 let sel = selection.map(|s| (s.anchor, s.head));
452
453 tracing::debug!(
454 position,
455 ?sel,
456 "CollabCoordinator: cursor changed, broadcasting"
457 );
458
459 spawn(async move {
460 if let Some(ref mut s) = *worker_sink.write() {
461 tracing::debug!(
462 position,
463 "CollabCoordinator: sending BroadcastCursor to worker"
464 );
465 if let Err(e) = s
466 .send(WorkerInput::BroadcastCursor {
467 position,
468 selection: sel,
469 })
470 .await
471 {
472 tracing::warn!("Failed to send BroadcastCursor to worker: {e}");
473 }
474 } else {
475 tracing::debug!(
476 position,
477 "CollabCoordinator: worker sink not ready, skipping cursor broadcast"
478 );
479 }
480 });
481 });
482
483 // Periodic peer discovery
484 let fetcher_for_discovery = fetcher.clone();
485 let resource_uri_for_discovery = resource_uri.clone();
486 dioxus_sdk::time::use_interval(
487 std::time::Duration::from_millis(PEER_DISCOVERY_INTERVAL_MS as u64),
488 move |_| {
489 let fetcher = fetcher_for_discovery.clone();
490 let resource_uri = resource_uri_for_discovery.clone();
491
492 spawn(async move {
493 let uri = match AtUri::new(&resource_uri) {
494 Ok(u) => u,
495 Err(_) => return,
496 };
497
498 #[cfg(feature = "use-index")]
499 match fetcher.get_resource_sessions(&uri).await {
500 Ok(output) => {
501 debug_state.with_mut(|ds| ds.discovered_peers = output.sessions.len());
502 if !output.sessions.is_empty() {
503 let peer_ids: Vec<SmolStr> = output
504 .sessions
505 .into_iter()
506 .map(|s| s.node_id.as_ref().into())
507 .collect();
508
509 if let Some(ref mut s) = *worker_sink.write() {
510 if let Err(e) =
511 s.send(WorkerInput::AddPeers { peers: peer_ids }).await
512 {
513 tracing::warn!("Periodic AddPeers send failed: {e}");
514 }
515 }
516 }
517 }
518 Err(e) => {
519 tracing::debug!("Peer discovery failed: {e}");
520 }
521 }
522
523 #[cfg(not(feature = "use-index"))]
524 match fetcher.find_session_peers(&uri).await {
525 Ok(peers) => {
526 debug_state.with_mut(|ds| ds.discovered_peers = peers.len());
527 if !peers.is_empty() {
528 let peer_ids: Vec<SmolStr> =
529 peers.into_iter().map(|p| p.node_id).collect();
530
531 if let Some(ref mut s) = *worker_sink.write() {
532 if let Err(e) =
533 s.send(WorkerInput::AddPeers { peers: peer_ids }).await
534 {
535 tracing::warn!("Periodic AddPeers send failed: {e}");
536 }
537 }
538 }
539 }
540 Err(e) => {
541 tracing::debug!("Peer discovery failed: {e}");
542 }
543 }
544 });
545 },
546 );
547
548 // Periodic session refresh
549 let fetcher_for_refresh = fetcher.clone();
550 dioxus_sdk::time::use_interval(
551 std::time::Duration::from_millis(SESSION_REFRESH_INTERVAL_MS as u64),
552 move |_| {
553 let fetcher = fetcher_for_refresh.clone();
554
555 if let Some(ref uri) = *session_uri.peek() {
556 let uri = uri.clone();
557 spawn(async move {
558 match fetcher
559 .refresh_collab_session(&uri, SESSION_TTL_MINUTES)
560 .await
561 {
562 Ok(_) => {
563 tracing::debug!("Session refreshed");
564 }
565 Err(e) => {
566 tracing::warn!("Session refresh failed: {e}");
567 }
568 }
569 });
570 }
571 },
572 );
573
574 // Cleanup on unmount
575 let fetcher_for_cleanup = fetcher.clone();
576 use_drop(move || {
577 // Stop collab in worker
578 spawn(async move {
579 if let Some(ref mut s) = *worker_sink.write() {
580 if let Err(e) = s.send(WorkerInput::StopCollab).await {
581 tracing::warn!("Failed to send StopCollab to worker: {e}");
582 }
583 }
584 });
585
586 // Delete session record
587 if let Some(uri) = session_uri.peek().clone() {
588 let fetcher = fetcher_for_cleanup.clone();
589 spawn(async move {
590 if let Err(e) = fetcher.delete_collab_session(&uri).await {
591 tracing::warn!("Failed to delete session record: {e}");
592 }
593 });
594 }
595 });
596 }
597 // Render children - this component is a wrapper that provides context
598 rsx! { {props.children} }
599}