at main 599 lines 29 kB view raw
1//! Collab coordinator - bridges EditorWorker and authenticated PDS ops. 2//! 3//! This component handles the main-thread side of real-time collaboration: 4//! - Spawns the editor worker and manages its lifecycle 5//! - Performs authenticated PDS operations (session records, peer discovery) 6//! - Forwards local Loro updates to the worker for gossip broadcast 7//! - Receives remote updates from worker and applies to main document 8//! - Provides CollabDebugState context for debug UI 9//! 10//! The worker handles all iroh/gossip networking off the main thread. 11 12// Only compile for WASM - no-op stub provided at end 13 14use super::document::SignalEditorDocument; 15 16use dioxus::prelude::*; 17 18#[cfg(target_arch = "wasm32")] 19use jacquard::smol_str::ToSmolStr; 20#[cfg(target_arch = "wasm32")] 21use jacquard::smol_str::{SmolStr, format_smolstr}; 22#[cfg(target_arch = "wasm32")] 23use jacquard::types::string::AtUri; 24use weaver_common::transport::PresenceSnapshot; 25 26#[cfg(target_arch = "wasm32")] 27use weaver_editor_crdt::{ 28 CoordinatorState, PEER_DISCOVERY_INTERVAL_MS, SESSION_REFRESH_INTERVAL_MS, SESSION_TTL_MINUTES, 29 compute_collab_topic, 30}; 31 32/// Props for the CollabCoordinator component. 33#[derive(Props, Clone, PartialEq)] 34pub struct CollabCoordinatorProps { 35 /// The editor document to sync 36 pub document: SignalEditorDocument, 37 /// Resource URI for the document being edited 38 pub resource_uri: String, 39 /// Presence state signal (updated by coordinator) 40 pub presence: Signal<PresenceSnapshot>, 41 /// Children to render (this component wraps the editor) 42 pub children: Element, 43} 44 45/// Coordinator component that bridges worker and PDS. 46/// 47/// This is a wrapper component that: 48/// 1. Provides CollabDebugState context 49/// 2. Manages collab lifecycle (worker, PDS records, peer discovery) 50/// 3. Renders children 51/// 52/// Lifecycle: 53/// 1. Worker spawned on mount, sends CollabReady with node_id 54/// 2. Coordinator creates session record on PDS 55/// 3. Coordinator discovers existing peers 56/// 4. Worker joins gossip session 57/// 5. Local updates forwarded to worker via subscribe_local_update 58/// 6. Remote updates from worker applied to main document 59/// 7. Session record deleted on unmount 60#[component] 61pub fn CollabCoordinator(props: CollabCoordinatorProps) -> Element { 62 #[cfg(target_arch = "wasm32")] 63 { 64 use crate::collab_context::CollabDebugState; 65 use crate::fetch::Fetcher; 66 use futures_util::stream::SplitSink; 67 use futures_util::{SinkExt, StreamExt}; 68 use gloo_worker::Spawnable; 69 use gloo_worker::reactor::ReactorBridge; 70 use jacquard::IntoStatic; 71 use weaver_common::WeaverExt; 72 use weaver_editor_crdt::{EditorReactor, WorkerInput, WorkerOutput}; 73 74 let fetcher = use_context::<Fetcher>(); 75 76 // Provide debug state context 77 let mut debug_state = use_signal(CollabDebugState::default); 78 use_context_provider(|| debug_state); 79 80 // Coordinator state 81 let mut state: Signal<CoordinatorState> = use_signal(|| CoordinatorState::Initializing); 82 83 // Worker sink for sending messages - Signal persists across renders 84 type WorkerSink = SplitSink<ReactorBridge<EditorReactor>, WorkerInput>; 85 let mut worker_sink: Signal<Option<WorkerSink>> = use_signal(|| None); 86 87 // Session record URI for cleanup 88 let mut session_uri: Signal<Option<AtUri<'static>>> = use_signal(|| None); 89 90 // Loro subscription handle (keep alive) 91 let mut loro_sub: Signal<Option<loro::Subscription>> = use_signal(|| None); 92 93 // Clone for closures 94 let resource_uri = props.resource_uri.clone(); 95 let mut doc = props.document.clone(); 96 let mut presence = props.presence; 97 98 // Spawn worker and set up message handling 99 let fetcher_for_spawn = fetcher.clone(); 100 let resource_uri_for_spawn = resource_uri.clone(); 101 use_effect(move || { 102 let mut worker_sink = worker_sink; 103 let fetcher = fetcher_for_spawn.clone(); 104 let resource_uri = resource_uri_for_spawn.clone(); 105 // Channel for local updates (Loro callback is Send+Sync, but ReactorBridge isn't) 106 let (local_update_tx, mut local_update_rx) = 107 tokio::sync::mpsc::unbounded_channel::<Vec<u8>>(); 108 109 let tx = local_update_tx.clone(); 110 111 // Subscribe to local Loro updates - just send to channel (Send+Sync) 112 let sub = doc 113 .loro_doc() 114 .subscribe_local_update(Box::new(move |update| { 115 let _ = tx.send(update.to_vec()); 116 true // Keep subscription active 117 })); 118 119 loro_sub.set(Some(sub)); 120 121 // Spawn the reactor 122 let bridge = EditorReactor::spawner().spawn("/editor_worker.js"); 123 let (sink, mut stream) = bridge.split(); 124 worker_sink.set(Some(sink)); 125 126 // Initialize worker with current document snapshot 127 let snapshot = doc.export_snapshot(); 128 let draft_key: SmolStr = resource_uri.clone().into(); // Use resource URI as the key 129 spawn(async move { 130 if let Some(ref mut sink) = *worker_sink.write() { 131 if let Err(e) = sink 132 .send(WorkerInput::Init { 133 snapshot, 134 draft_key, 135 }) 136 .await 137 { 138 tracing::error!("Failed to send Init to worker: {e}"); 139 } 140 } 141 }); 142 143 // Task 1: Forward local updates from channel to worker 144 spawn(async move { 145 while let Some(data) = local_update_rx.recv().await { 146 if let Some(ref mut s) = *worker_sink.write() { 147 if let Err(e) = s.send(WorkerInput::BroadcastUpdate { data }).await { 148 tracing::warn!("Failed to send BroadcastUpdate to worker: {e}"); 149 } 150 } 151 } 152 }); 153 154 // Task 2: Handle worker output messages 155 let doc_for_handler = doc.clone(); 156 spawn(async move { 157 let mut doc = doc_for_handler; 158 while let Some(output) = stream.next().await { 159 match output { 160 WorkerOutput::Ready => { 161 tracing::info!("CollabCoordinator: worker ready, starting collab"); 162 163 // Compute topic from resource URI 164 let topic = compute_collab_topic(&resource_uri); 165 166 // Send StartCollab to worker immediately (no blocking on profile fetch) 167 if let Some(ref mut s) = *worker_sink.write() { 168 if let Err(e) = s 169 .send(WorkerInput::StartCollab { 170 topic, 171 bootstrap_peers: vec![], 172 }) 173 .await 174 { 175 tracing::error!("Failed to send StartCollab to worker: {e}"); 176 } 177 } 178 } 179 180 WorkerOutput::CollabReady { node_id, relay_url } => { 181 tracing::info!( 182 node_id = %node_id, 183 relay_url = ?relay_url, 184 "CollabCoordinator: collab node ready" 185 ); 186 187 // Update debug state 188 debug_state.with_mut(|ds| { 189 ds.node_id = Some(node_id.clone()); 190 ds.relay_url = relay_url.clone(); 191 }); 192 193 state.set(CoordinatorState::CreatingSession { 194 node_id: node_id.clone(), 195 relay_url: relay_url.clone(), 196 }); 197 198 // Create session record on PDS 199 let fetcher = fetcher.clone(); 200 let resource_uri = resource_uri.clone(); 201 202 spawn(async move { 203 // Parse resource URI to get StrongRef 204 let uri = match AtUri::new(&resource_uri) { 205 Ok(u) => u.into_static(), 206 Err(e) => { 207 let err = format_smolstr!("Invalid resource URI: {e}"); 208 debug_state 209 .with_mut(|ds| ds.last_error = Some(err.clone())); 210 state.set(CoordinatorState::Error(err)); 211 return; 212 } 213 }; 214 215 // Get StrongRef for the resource 216 let strong_ref = match fetcher.confirm_record_ref(&uri).await { 217 Ok(r) => r, 218 Err(e) => { 219 let err = 220 format_smolstr!("Failed to get resource ref: {e}"); 221 debug_state 222 .with_mut(|ds| ds.last_error = Some(err.clone())); 223 state.set(CoordinatorState::Error(err)); 224 return; 225 } 226 }; 227 228 // Create session record 229 match fetcher 230 .create_collab_session( 231 &strong_ref, 232 &node_id, 233 relay_url.as_deref(), 234 Some(SESSION_TTL_MINUTES), 235 ) 236 .await 237 { 238 Ok(session_record_uri) => { 239 tracing::info!( 240 uri = %session_record_uri, 241 "CollabCoordinator: session record created" 242 ); 243 session_uri.set(Some(session_record_uri.clone())); 244 debug_state.with_mut(|ds| { 245 ds.session_record_uri = 246 Some(session_record_uri.to_string()); 247 }); 248 249 // Discover existing peers 250 #[cfg(feature = "use-index")] 251 let bootstrap_peers = match fetcher 252 .get_resource_sessions(&uri) 253 .await 254 { 255 Ok(output) => { 256 tracing::info!( 257 count = output.sessions.len(), 258 "CollabCoordinator: found peers via index" 259 ); 260 debug_state.with_mut(|ds| { 261 ds.discovered_peers = output.sessions.len(); 262 }); 263 output 264 .sessions 265 .into_iter() 266 .map(|s| s.node_id.as_ref().into()) 267 .collect::<Vec<SmolStr>>() 268 } 269 Err(e) => { 270 tracing::warn!( 271 "CollabCoordinator: peer discovery failed: {e}" 272 ); 273 vec![] 274 } 275 }; 276 277 #[cfg(not(feature = "use-index"))] 278 let bootstrap_peers = match fetcher 279 .find_session_peers(&uri) 280 .await 281 { 282 Ok(peers) => { 283 tracing::info!( 284 count = peers.len(), 285 "CollabCoordinator: found peers" 286 ); 287 debug_state.with_mut(|ds| { 288 ds.discovered_peers = peers.len(); 289 }); 290 peers 291 .into_iter() 292 .map(|p| p.node_id) 293 .collect::<Vec<_>>() 294 } 295 Err(e) => { 296 tracing::warn!( 297 "CollabCoordinator: peer discovery failed: {e}" 298 ); 299 vec![] 300 } 301 }; 302 303 // Send discovered peers to worker 304 if !bootstrap_peers.is_empty() { 305 tracing::info!( 306 count = bootstrap_peers.len(), 307 peers = ?bootstrap_peers, 308 "CollabCoordinator: sending AddPeers to worker" 309 ); 310 if let Some(ref mut s) = *worker_sink.write() { 311 if let Err(e) = s 312 .send(WorkerInput::AddPeers { 313 peers: bootstrap_peers, 314 }) 315 .await 316 { 317 tracing::error!( 318 "CollabCoordinator: AddPeers send failed: {e}" 319 ); 320 } 321 } else { 322 tracing::error!("CollabCoordinator: sink is None!"); 323 } 324 } else { 325 tracing::info!("CollabCoordinator: no peers to add"); 326 } 327 328 state.set(CoordinatorState::Active { 329 session_uri: session_record_uri.to_smolstr(), 330 }); 331 } 332 Err(e) => { 333 let err = format_smolstr!("Failed to create session: {e}"); 334 debug_state 335 .with_mut(|ds| ds.last_error = Some(err.clone())); 336 state.set(CoordinatorState::Error(err)); 337 } 338 } 339 }); 340 } 341 342 WorkerOutput::CollabJoined => { 343 tracing::info!("CollabCoordinator: joined gossip session"); 344 debug_state.with_mut(|ds| ds.is_joined = true); 345 } 346 347 WorkerOutput::RemoteUpdates { data } => { 348 if let Err(e) = doc.import_updates(&data) { 349 tracing::warn!( 350 "CollabCoordinator: failed to import updates: {:?}", 351 e 352 ); 353 } 354 } 355 356 WorkerOutput::PresenceUpdate(snapshot) => { 357 debug_state.with_mut(|ds| { 358 ds.connected_peers = snapshot.peer_count; 359 }); 360 presence.set(snapshot); 361 } 362 363 WorkerOutput::CollabStopped => { 364 tracing::info!("CollabCoordinator: collab stopped"); 365 debug_state.with_mut(|ds| { 366 ds.is_joined = false; 367 ds.connected_peers = 0; 368 }); 369 } 370 371 WorkerOutput::PeerConnected => { 372 tracing::info!("CollabCoordinator: peer connected, sending our Join"); 373 use weaver_api::sh_weaver::actor::ProfileDataViewInner; 374 375 let fetcher = fetcher.clone(); 376 377 // Get our profile info and send BroadcastJoin 378 let (our_did, our_display_name): (SmolStr, SmolStr) = match fetcher 379 .current_did() 380 .await 381 { 382 Some(did) => { 383 let display_name: SmolStr = 384 match fetcher.fetch_profile(&did.clone().into()).await { 385 Ok(profile) => match &profile.inner { 386 ProfileDataViewInner::ProfileView(p) => p 387 .display_name 388 .as_ref() 389 .map(|s| s.as_ref().into()) 390 .unwrap_or_else(|| did.as_ref().into()), 391 ProfileDataViewInner::ProfileViewDetailed(p) => p 392 .display_name 393 .as_ref() 394 .map(|s| s.as_ref().into()) 395 .unwrap_or_else(|| did.as_ref().into()), 396 ProfileDataViewInner::TangledProfileView(p) => { 397 p.handle.as_ref().into() 398 } 399 _ => did.as_ref().into(), 400 }, 401 Err(_) => did.as_ref().into(), 402 }; 403 (did.as_ref().into(), display_name) 404 } 405 None => { 406 tracing::warn!( 407 "CollabCoordinator: no current DID for Join message" 408 ); 409 ("unknown".into(), "Anonymous".into()) 410 } 411 }; 412 413 if let Some(ref mut s) = *worker_sink.write() { 414 if let Err(e) = s 415 .send(WorkerInput::BroadcastJoin { 416 did: our_did, 417 display_name: our_display_name, 418 }) 419 .await 420 { 421 tracing::error!( 422 "CollabCoordinator: BroadcastJoin send failed: {e}" 423 ); 424 } 425 } 426 } 427 428 WorkerOutput::Error { message } => { 429 tracing::error!("CollabCoordinator: worker error: {message}"); 430 debug_state.with_mut(|ds| ds.last_error = Some(message.clone())); 431 state.set(CoordinatorState::Error(message)); 432 } 433 434 WorkerOutput::Snapshot { .. } => {} 435 } 436 } 437 tracing::info!("CollabCoordinator: worker stream ended"); 438 }); 439 440 tracing::info!("CollabCoordinator: spawned worker"); 441 }); 442 443 // Forward cursor updates to worker - memo re-runs when cursor/selection signals change 444 let cursor_signal = props.document.cursor; 445 let selection_signal = props.document.selection; 446 447 let _cursor_broadcaster = use_memo(move || { 448 let cursor = cursor_signal.read(); 449 let selection = *selection_signal.read(); 450 let position = cursor.offset; 451 let sel = selection.map(|s| (s.anchor, s.head)); 452 453 tracing::debug!( 454 position, 455 ?sel, 456 "CollabCoordinator: cursor changed, broadcasting" 457 ); 458 459 spawn(async move { 460 if let Some(ref mut s) = *worker_sink.write() { 461 tracing::debug!( 462 position, 463 "CollabCoordinator: sending BroadcastCursor to worker" 464 ); 465 if let Err(e) = s 466 .send(WorkerInput::BroadcastCursor { 467 position, 468 selection: sel, 469 }) 470 .await 471 { 472 tracing::warn!("Failed to send BroadcastCursor to worker: {e}"); 473 } 474 } else { 475 tracing::debug!( 476 position, 477 "CollabCoordinator: worker sink not ready, skipping cursor broadcast" 478 ); 479 } 480 }); 481 }); 482 483 // Periodic peer discovery 484 let fetcher_for_discovery = fetcher.clone(); 485 let resource_uri_for_discovery = resource_uri.clone(); 486 dioxus_sdk::time::use_interval( 487 std::time::Duration::from_millis(PEER_DISCOVERY_INTERVAL_MS as u64), 488 move |_| { 489 let fetcher = fetcher_for_discovery.clone(); 490 let resource_uri = resource_uri_for_discovery.clone(); 491 492 spawn(async move { 493 let uri = match AtUri::new(&resource_uri) { 494 Ok(u) => u, 495 Err(_) => return, 496 }; 497 498 #[cfg(feature = "use-index")] 499 match fetcher.get_resource_sessions(&uri).await { 500 Ok(output) => { 501 debug_state.with_mut(|ds| ds.discovered_peers = output.sessions.len()); 502 if !output.sessions.is_empty() { 503 let peer_ids: Vec<SmolStr> = output 504 .sessions 505 .into_iter() 506 .map(|s| s.node_id.as_ref().into()) 507 .collect(); 508 509 if let Some(ref mut s) = *worker_sink.write() { 510 if let Err(e) = 511 s.send(WorkerInput::AddPeers { peers: peer_ids }).await 512 { 513 tracing::warn!("Periodic AddPeers send failed: {e}"); 514 } 515 } 516 } 517 } 518 Err(e) => { 519 tracing::debug!("Peer discovery failed: {e}"); 520 } 521 } 522 523 #[cfg(not(feature = "use-index"))] 524 match fetcher.find_session_peers(&uri).await { 525 Ok(peers) => { 526 debug_state.with_mut(|ds| ds.discovered_peers = peers.len()); 527 if !peers.is_empty() { 528 let peer_ids: Vec<SmolStr> = 529 peers.into_iter().map(|p| p.node_id).collect(); 530 531 if let Some(ref mut s) = *worker_sink.write() { 532 if let Err(e) = 533 s.send(WorkerInput::AddPeers { peers: peer_ids }).await 534 { 535 tracing::warn!("Periodic AddPeers send failed: {e}"); 536 } 537 } 538 } 539 } 540 Err(e) => { 541 tracing::debug!("Peer discovery failed: {e}"); 542 } 543 } 544 }); 545 }, 546 ); 547 548 // Periodic session refresh 549 let fetcher_for_refresh = fetcher.clone(); 550 dioxus_sdk::time::use_interval( 551 std::time::Duration::from_millis(SESSION_REFRESH_INTERVAL_MS as u64), 552 move |_| { 553 let fetcher = fetcher_for_refresh.clone(); 554 555 if let Some(ref uri) = *session_uri.peek() { 556 let uri = uri.clone(); 557 spawn(async move { 558 match fetcher 559 .refresh_collab_session(&uri, SESSION_TTL_MINUTES) 560 .await 561 { 562 Ok(_) => { 563 tracing::debug!("Session refreshed"); 564 } 565 Err(e) => { 566 tracing::warn!("Session refresh failed: {e}"); 567 } 568 } 569 }); 570 } 571 }, 572 ); 573 574 // Cleanup on unmount 575 let fetcher_for_cleanup = fetcher.clone(); 576 use_drop(move || { 577 // Stop collab in worker 578 spawn(async move { 579 if let Some(ref mut s) = *worker_sink.write() { 580 if let Err(e) = s.send(WorkerInput::StopCollab).await { 581 tracing::warn!("Failed to send StopCollab to worker: {e}"); 582 } 583 } 584 }); 585 586 // Delete session record 587 if let Some(uri) = session_uri.peek().clone() { 588 let fetcher = fetcher_for_cleanup.clone(); 589 spawn(async move { 590 if let Err(e) = fetcher.delete_collab_session(&uri).await { 591 tracing::warn!("Failed to delete session record: {e}"); 592 } 593 }); 594 } 595 }); 596 } 597 // Render children - this component is a wrapper that provides context 598 rsx! { {props.children} } 599}