···99use std::sync::Arc;
1010use weaver_common::transport::CollabNode;
11111212+/// Debug state for the collab session, displayed in editor debug panel.
1313+#[derive(Clone, Default)]
1414+pub struct CollabDebugState {
1515+ /// Our node ID
1616+ pub node_id: Option<String>,
1717+ /// Our relay URL
1818+ pub relay_url: Option<String>,
1919+ /// URI of our published session record
2020+ pub session_record_uri: Option<String>,
2121+ /// Number of discovered peers
2222+ pub discovered_peers: usize,
2323+ /// Number of connected peers
2424+ pub connected_peers: usize,
2525+ /// Whether we've joined the gossip swarm
2626+ pub is_joined: bool,
2727+ /// Last error message
2828+ pub last_error: Option<String>,
2929+}
3030+1231/// Context state for the collaboration node.
1332///
1433/// This is provided as a Dioxus context and can be accessed by editor components
···3958#[component]
4059pub fn CollabProvider(children: Element) -> Element {
4160 let mut collab_ctx = use_signal(CollabContext::default);
6161+ let debug_state = use_signal(CollabDebugState::default);
42624363 // Spawn the CollabNode on mount
4464 let _spawn_result = use_resource(move || async move {
···6282 }
6383 });
64846565- // Provide the context
8585+ // Provide the contexts
6686 use_context_provider(|| collab_ctx);
8787+ use_context_provider(|| debug_state);
67886889 rsx! { {children} }
6990}
···7495pub fn CollabProvider(children: Element) -> Element {
7596 // On server/native, provide an empty context (collab happens in browser)
7697 let collab_ctx = use_signal(CollabContext::default);
9898+ let debug_state = use_signal(CollabDebugState::default);
7799 use_context_provider(|| collab_ctx);
100100+ use_context_provider(|| debug_state);
78101 rsx! { {children} }
79102}
80103···91114 let ctx = use_context::<Signal<CollabContext>>();
92115 ctx.read().node.is_some()
93116}
117117+118118+/// Hook to get the collab debug state signal.
119119+/// Returns None if called outside CollabProvider.
120120+pub fn try_use_collab_debug() -> Option<Signal<CollabDebugState>> {
121121+ try_use_context::<Signal<CollabDebugState>>()
122122+}
···561561 let mut doc_for_dom = document.clone();
562562 #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
563563 use_effect(move || {
564564- tracing::debug!("DOM update effect triggered");
565565-566566- tracing::debug!(
567567- composition_active = doc_for_dom.composition.read().is_some(),
568568- cursor = doc_for_dom.cursor.read().offset,
569569- "DOM update: checking state"
570570- );
564564+ // tracing::debug!(
565565+ // composition_active = doc_for_dom.composition.read().is_some(),
566566+ // cursor = doc_for_dom.cursor.read().offset,
567567+ // "DOM update: checking state"
568568+ // );
571569572570 // Skip DOM updates during IME composition - browser controls the preview
573571 if doc_for_dom.composition.read().is_some() {
···575573 return;
576574 }
577575578578- tracing::debug!(
576576+ tracing::trace!(
579577 cursor = doc_for_dom.cursor.read().offset,
580578 len = doc_for_dom.len_chars(),
581579 "DOM update proceeding (not in composition)"
···10871085 onselect: {
10881086 let mut doc = document.clone();
10891087 move |_evt| {
10901090- tracing::debug!("onselect fired");
10881088+ tracing::trace!("onselect fired");
10911089 let paras = cached_paragraphs();
10921090 sync_cursor_from_dom(&mut doc, editor_id, ¶s);
10931091 let spans = syntax_spans();
···11051103 onselectstart: {
11061104 let mut doc = document.clone();
11071105 move |_evt| {
11081108- tracing::debug!("onselectstart fired");
11061106+ tracing::trace!("onselectstart fired");
11091107 let paras = cached_paragraphs();
11101108 sync_cursor_from_dom(&mut doc, editor_id, ¶s);
11111109 let spans = syntax_spans();
···11231121 onselectionchange: {
11241122 let mut doc = document.clone();
11251123 move |_evt| {
11261126- tracing::debug!("onselectionchange fired");
11241124+ tracing::trace!("onselectionchange fired");
11271125 let paras = cached_paragraphs();
11281126 sync_cursor_from_dom(&mut doc, editor_id, ¶s);
11291127 let spans = syntax_spans();
···11411139 onclick: {
11421140 let mut doc = document.clone();
11431141 move |evt| {
11441144- tracing::debug!("onclick fired");
11421142+ tracing::trace!("onclick fired");
11451143 let paras = cached_paragraphs();
1146114411471145 // Check if click target is a math-clickable element
···12551253 let mut doc = document.clone();
12561254 move |evt: CompositionEvent| {
12571255 let data = evt.data().data();
12581258- tracing::debug!(
12561256+ tracing::trace!(
12591257 data = %data,
12601258 "compositionstart"
12611259 );
···12641262 if let Some(sel) = sel {
12651263 let (start, end) =
12661264 (sel.anchor.min(sel.head), sel.anchor.max(sel.head));
12671267- tracing::debug!(
12651265+ tracing::trace!(
12681266 start,
12691267 end,
12701268 "compositionstart: deleting selection"
···12741272 }
1275127312761274 let cursor_offset = doc.cursor.read().offset;
12771277- tracing::debug!(
12751275+ tracing::trace!(
12781276 cursor = cursor_offset,
12791277 "compositionstart: setting composition state"
12801278 );
···12891287 let mut doc = document.clone();
12901288 move |evt: CompositionEvent| {
12911289 let data = evt.data().data();
12921292- tracing::debug!(
12901290+ tracing::trace!(
12931291 data = %data,
12941292 "compositionupdate"
12951293 );
···13061304 let mut doc = document.clone();
13071305 move |evt: CompositionEvent| {
13081306 let final_text = evt.data().data();
13091309- tracing::debug!(
13071307+ tracing::trace!(
13101308 data = %final_text,
13111309 "compositionend"
13121310 );
···13541352 }
13551353 div { class: "editor-debug",
13561354 div { "Cursor: {document.cursor.read().offset}, Chars: {document.len_chars()}" },
13551355+ // Collab debug info
13561356+ {
13571357+ if let Some(debug_state) = crate::collab_context::try_use_collab_debug() {
13581358+ let ds = debug_state.read();
13591359+ rsx! {
13601360+ div { class: "collab-debug",
13611361+ if let Some(ref node_id) = ds.node_id {
13621362+ span { title: "{node_id}", "Node: {&node_id[..8.min(node_id.len())]}…" }
13631363+ }
13641364+ if ds.is_joined {
13651365+ span { class: "joined", "✓ Joined" }
13661366+ }
13671367+ span { "Peers: {ds.discovered_peers}" }
13681368+ if let Some(ref err) = ds.last_error {
13691369+ span { class: "error", title: "{err}", "⚠" }
13701370+ }
13711371+ }
13721372+ }
13731373+ } else {
13741374+ rsx! {}
13751375+ }
13761376+ },
13571377 ReportButton {
13581378 email: "editor-bugs@weaver.sh".to_string(),
13591379 editor_id: "markdown-editor".to_string(),
···15161536 render_cache: Signal<render::RenderCache>,
15171537) -> Element {
15181538 let presence_read = presence.read();
15391539+ let cursor_count = presence_read.len();
15191540 let cursors: Vec<_> = presence_read
15201541 .cursors()
15211542 .map(|(c, cur)| (c.display_name.clone(), c.color, cur.position, cur.selection))
15221543 .collect();
15441544+15451545+ if cursor_count > 0 {
15461546+ tracing::debug!(
15471547+ "RemoteCursors: {} collaborators, {} with cursors",
15481548+ cursor_count,
15491549+ cursors.len()
15501550+ );
15511551+ }
1523155215241553 if cursors.is_empty() {
15251554 return rsx! {};
···15581587 color: u32,
15591588 offset_map: Vec<super::offset_map::OffsetMapping>,
15601589) -> Element {
15611561- use super::cursor::{get_cursor_rect_relative, CursorRect};
15901590+ use super::cursor::{get_cursor_rect_relative, get_selection_rects_relative};
1562159115631563- // Convert RGBA u32 to CSS color
15921592+ // Convert RGBA u32 to CSS color (fully opaque for cursor)
15641593 let r = (color >> 24) & 0xFF;
15651594 let g = (color >> 16) & 0xFF;
15661595 let b = (color >> 8) & 0xFF;
15671596 let a = (color & 0xFF) as f32 / 255.0;
15681597 let color_css = format!("rgba({}, {}, {}, {})", r, g, b, a);
15981598+ // Semi-transparent version for selection highlight
15991599+ let selection_color_css = format!("rgba({}, {}, {}, 0.25)", r, g, b);
1569160015701601 // Get cursor position relative to editor
15711602 let rect = get_cursor_rect_relative(position, &offset_map, "markdown-editor");
16031603+16041604+ // Get selection rectangles if there's a selection
16051605+ let selection_rects = if let Some((start, end)) = selection {
16061606+ let (start, end) = if start <= end { (start, end) } else { (end, start) };
16071607+ get_selection_rects_relative(start, end, &offset_map, "markdown-editor")
16081608+ } else {
16091609+ vec![]
16101610+ };
1572161115731612 let Some(rect) = rect else {
16131613+ tracing::debug!(
16141614+ "RemoteCursorIndicator: no rect for position {} (offset_map len: {})",
16151615+ position,
16161616+ offset_map.len()
16171617+ );
15741618 return rsx! {};
15751619 };
1576162016211621+ tracing::trace!(
16221622+ "RemoteCursorIndicator: {} at ({}, {}) h={}, selection_rects={}",
16231623+ display_name,
16241624+ rect.x,
16251625+ rect.y,
16261626+ rect.height,
16271627+ selection_rects.len()
16281628+ );
16291629+15771630 let style = format!(
15781631 "left: {}px; top: {}px; --cursor-height: {}px; --cursor-color: {};",
15791632 rect.x, rect.y, rect.height, color_css
15801633 );
1581163415821635 rsx! {
16361636+ // Selection highlight rectangles (rendered behind cursor)
16371637+ for (i, sel_rect) in selection_rects.iter().enumerate() {
16381638+ div {
16391639+ key: "sel-{i}",
16401640+ class: "remote-selection",
16411641+ style: "left: {sel_rect.x}px; top: {sel_rect.y}px; width: {sel_rect.width}px; height: {sel_rect.height}px; background-color: {selection_color_css};",
16421642+ }
16431643+ }
16441644+15831645 div {
15841646 class: "remote-cursor",
15851647 style: "{style}",
+120
crates/weaver-app/src/components/editor/cursor.rs
···307307) -> Option<CursorRect> {
308308 None
309309}
310310+311311+/// A rectangle for part of a selection (one per line).
312312+#[derive(Debug, Clone, Copy)]
313313+pub struct SelectionRect {
314314+ pub x: f64,
315315+ pub y: f64,
316316+ pub width: f64,
317317+ pub height: f64,
318318+}
319319+320320+/// Get screen rectangles for a selection range, relative to editor.
321321+///
322322+/// Returns multiple rects if selection spans multiple lines.
323323+#[cfg(all(target_family = "wasm", target_os = "unknown"))]
324324+pub fn get_selection_rects_relative(
325325+ start: usize,
326326+ end: usize,
327327+ offset_map: &[OffsetMapping],
328328+ editor_id: &str,
329329+) -> Vec<SelectionRect> {
330330+ use wasm_bindgen::JsCast;
331331+332332+ if offset_map.is_empty() || start >= end {
333333+ return vec![];
334334+ }
335335+336336+ let Some(window) = web_sys::window() else {
337337+ return vec![];
338338+ };
339339+ let Some(document) = window.document() else {
340340+ return vec![];
341341+ };
342342+ let Some(editor) = document.get_element_by_id(editor_id) else {
343343+ return vec![];
344344+ };
345345+ let editor_rect = editor.get_bounding_client_rect();
346346+347347+ // Find mappings for start and end
348348+ let Some((start_mapping, _)) = find_mapping_for_char(offset_map, start) else {
349349+ return vec![];
350350+ };
351351+ let Some((end_mapping, _)) = find_mapping_for_char(offset_map, end) else {
352352+ return vec![];
353353+ };
354354+355355+ // Get containers
356356+ let start_container = document
357357+ .get_element_by_id(&start_mapping.node_id)
358358+ .or_else(|| {
359359+ let selector = format!("[data-node-id='{}']", start_mapping.node_id);
360360+ document.query_selector(&selector).ok().flatten()
361361+ });
362362+ let end_container = document
363363+ .get_element_by_id(&end_mapping.node_id)
364364+ .or_else(|| {
365365+ let selector = format!("[data-node-id='{}']", end_mapping.node_id);
366366+ document.query_selector(&selector).ok().flatten()
367367+ });
368368+369369+ let (Some(start_container), Some(end_container)) = (start_container, end_container) else {
370370+ return vec![];
371371+ };
372372+373373+ // Create range
374374+ let Ok(range) = document.create_range() else {
375375+ return vec![];
376376+ };
377377+378378+ // Set start
379379+ if let Some(child_index) = start_mapping.child_index {
380380+ let _ = range.set_start(&start_container, child_index as u32);
381381+ } else if let Ok(container_element) = start_container.clone().dyn_into::<web_sys::HtmlElement>() {
382382+ let offset_in_range = start - start_mapping.char_range.start;
383383+ let target_utf16_offset = start_mapping.char_offset_in_node + offset_in_range;
384384+ if let Ok((text_node, node_offset)) = find_text_node_at_offset(&container_element, target_utf16_offset) {
385385+ let _ = range.set_start(&text_node, node_offset as u32);
386386+ }
387387+ }
388388+389389+ // Set end
390390+ if let Some(child_index) = end_mapping.child_index {
391391+ let _ = range.set_end(&end_container, child_index as u32);
392392+ } else if let Ok(container_element) = end_container.dyn_into::<web_sys::HtmlElement>() {
393393+ let offset_in_range = end - end_mapping.char_range.start;
394394+ let target_utf16_offset = end_mapping.char_offset_in_node + offset_in_range;
395395+ if let Ok((text_node, node_offset)) = find_text_node_at_offset(&container_element, target_utf16_offset) {
396396+ let _ = range.set_end(&text_node, node_offset as u32);
397397+ }
398398+ }
399399+400400+ // Get all rects (one per line)
401401+ let Some(rects) = range.get_client_rects() else {
402402+ return vec![];
403403+ };
404404+ let mut result = Vec::new();
405405+406406+ for i in 0..rects.length() {
407407+ if let Some(rect) = rects.get(i) {
408408+ let rect: web_sys::DomRect = rect;
409409+ result.push(SelectionRect {
410410+ x: rect.x() - editor_rect.x(),
411411+ y: rect.y() - editor_rect.y(),
412412+ width: rect.width(),
413413+ height: rect.height().max(16.0),
414414+ });
415415+ }
416416+ }
417417+418418+ result
419419+}
420420+421421+#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
422422+pub fn get_selection_rects_relative(
423423+ _start: usize,
424424+ _end: usize,
425425+ _offset_map: &[OffsetMapping],
426426+ _editor_id: &str,
427427+) -> Vec<SelectionRect> {
428428+ vec![]
429429+}
+272-30
crates/weaver-app/src/components/editor/sync.rs
···103103 if let Ok(records_container) =
104104 embeds_map.get_or_create_container("records", loro::LoroList::new())
105105 {
106106- tracing::debug!("Loro embeds map records len: {}", records_container.len());
107107-108106 for i in 0..records_container.len() {
109107 let Some(value) = records_container.get(i) else {
110108 continue;
···132130 .await
133131 {
134132 Ok(html) => {
135135- tracing::debug!("Pre-fetched embed from Loro map: {}", key_uri);
136133 resolved.add_embed(key_uri, html, None);
137134 }
138135 Err(e) => {
···148145149146 // Strategy 2: If no embeds found in Loro map, parse markdown text
150147 if resolved.embed_content.is_empty() {
151151- use weaver_common::{collect_refs_from_markdown, ExtractedRef};
148148+ use weaver_common::{ExtractedRef, collect_refs_from_markdown};
152149153150 let text = doc.get_text("content");
154151 let markdown = text.to_string();
155152156153 if !markdown.is_empty() {
157157- tracing::debug!("Falling back to markdown parsing for embeds");
158154 let refs = collect_refs_from_markdown(&markdown);
159155160156 for extracted in refs {
···166162167163 match weaver_renderer::atproto::fetch_and_render(&key_uri, fetcher).await {
168164 Ok(html) => {
169169- tracing::debug!("Pre-fetched embed from markdown: {}", uri);
170165 resolved.add_embed(key_uri, html, None);
171166 }
172167 Err(e) => {
···707702 // Use inline for small diffs, blob for larger ones
708703 let (blob_ref, inline_diff): (Option<jacquard::types::blob::BlobRef<'static>>, _) =
709704 if updates.len() <= INLINE_THRESHOLD {
710710- tracing::debug!("Using inline diff ({} bytes)", updates.len());
711705 (None, Some(jacquard::bytes::Bytes::from(updates)))
712706 } else {
713713- tracing::debug!("Using blob diff ({} bytes)", updates.len());
714707 let mime_type = MimeType::new_static("application/octet-stream");
715708 let blob = client.upload_blob(updates, mime_type).await.map_err(|e| {
716709 WeaverError::InvalidNotebook(format!("Failed to upload diff: {}", e))
···976969977970 // Get the last seen diff rkey for this root (if any)
978971 let after_rkey = root_uri.as_ref().and_then(|uri| {
979979- last_seen_diffs.get(uri).and_then(|diff_uri| {
980980- diff_uri.rkey().map(|rk| rk.0.to_string())
981981- })
972972+ last_seen_diffs
973973+ .get(uri)
974974+ .and_then(|diff_uri| diff_uri.rkey().map(|rk| rk.0.to_string()))
982975 });
983976984977 // Load state from this root (skipping already-seen diffs)
985985- if let Some(pds_state) = load_edit_state_from_root_id(fetcher, root_id, after_rkey.as_deref()).await? {
978978+ if let Some(pds_state) =
979979+ load_edit_state_from_root_id(fetcher, root_id, after_rkey.as_deref()).await?
980980+ {
986981 // Import root snapshot into merged doc
987982 if let Err(e) = merged_doc.import(&pds_state.root_snapshot) {
988983 tracing::warn!("Failed to import root snapshot from {}: {:?}", root_did, e);
···11121107 // Skip diffs we've already seen (rkey/TID is lexicographically sortable by time)
11131108 if let Some(after) = after_rkey {
11141109 if rkey_str <= after {
11151115- tracing::debug!("Skipping already-seen diff rkey: {}", rkey_str);
11101110+ tracing::trace!("Skipping already-seen diff rkey: {}", rkey_str);
11161111 continue;
11171112 }
11181113 }
···13651360#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
13661361#[component]
13671362pub fn RealTimeSync(props: RealTimeSyncProps) -> Element {
13631363+ use crate::collab_context::try_use_collab_debug;
13681364 use tokio::sync::mpsc;
13651365+ use weaver_common::WeaverExt;
13691366 use weaver_common::transport::{CollabMessage, CollabSession, SessionEvent};
13701370- use weaver_common::WeaverExt;
1371136713721368 let collab_node = use_collab_node();
13731369 let fetcher = use_context::<crate::fetch::Fetcher>();
13741370 let mut session: Signal<Option<Arc<CollabSession>>> = use_signal(|| None);
13751371 // URI of our published session record (for cleanup)
13761372 let mut session_record_uri: Signal<Option<AtUri<'static>>> = use_signal(|| None);
13731373+ // Debug state for display in editor debug panel (optional, may not be provided)
13741374+ let debug_state = try_use_collab_debug();
13771375 // Channel for sending local updates from Loro callback to async broadcast task
13781376 let mut update_tx: Signal<Option<mpsc::UnboundedSender<Vec<u8>>>> = use_signal(|| None);
13791377 // Channel for sending cursor updates
···14381436 cursor_tx.set(Some(ctx));
1439143714401438 // Subscribe to local updates from Loro - fires when local changes are committed
14411441- let sub = doc_for_join.loro_doc().subscribe_local_update(Box::new(move |update| {
14421442- tracing::debug!("RealTimeSync: local update ({} bytes)", update.len());
14431443- if let Err(e) = tx.send(update.to_vec()) {
14441444- tracing::warn!("RealTimeSync: failed to queue update: {}", e);
14451445- }
14461446- true // Keep subscription active
14471447- }));
14391439+ let sub = doc_for_join
14401440+ .loro_doc()
14411441+ .subscribe_local_update(Box::new(move |update| {
14421442+ tracing::debug!("RealTimeSync: local update ({} bytes)", update.len());
14431443+ if let Err(e) = tx.send(update.to_vec()) {
14441444+ tracing::warn!("RealTimeSync: failed to queue update: {}", e);
14451445+ }
14461446+ true // Keep subscription active
14471447+ }));
14481448 _subscription.set(Some(sub));
1449144914501450 let doc_for_recv = doc_for_join.clone();
···14551455 // Derive topic from resource URI
14561456 let topic = CollabSession::topic_from_uri(uri.as_str());
1457145714581458+ // Wait for relay connection before discovering peers or publishing session
14591459+ // Browser clients REQUIRE relay for peer connectivity
14601460+ let relay_url = node.wait_for_relay().await;
14611461+ tracing::info!(
14621462+ relay_url = %relay_url,
14631463+ "RealTimeSync: relay connection ready"
14641464+ );
14651465+14661466+ // Update debug state with node info
14671467+ if let Some(mut ds) = debug_state {
14681468+ ds.with_mut(|s| {
14691469+ s.node_id = Some(node.node_id_string());
14701470+ s.relay_url = Some(relay_url.clone());
14711471+ });
14721472+ }
14731473+14581474 // Discover existing session peers for bootstrap
14591475 let bootstrap_peers = match fetcher.find_session_peers(&uri).await {
14601476 Ok(peers) => {
14611477 tracing::info!("RealTimeSync: found {} existing peers", peers.len());
14781478+ if let Some(mut ds) = debug_state {
14791479+ ds.with_mut(|s| s.discovered_peers = peers.len());
14801480+ }
14811481+ for p in &peers {
14821482+ tracing::info!(
14831483+ did = %p.did,
14841484+ node_id = %p.node_id,
14851485+ relay_url = ?p.relay_url,
14861486+ expires_at = ?p.expires_at,
14871487+ "RealTimeSync: discovered peer"
14881488+ );
14891489+ }
14621490 peers
14631491 .into_iter()
14641492 .filter_map(|p| {
···14681496 }
14691497 Err(e) => {
14701498 tracing::warn!("RealTimeSync: failed to find peers: {}", e);
14991499+ if let Some(mut ds) = debug_state {
15001500+ ds.with_mut(|s| s.last_error = Some(format!("peer discovery: {}", e)));
15011501+ }
14711502 vec![]
14721503 }
14731504 };
···14781509 .create_collab_session(
14791510 &resource_ref_for_spawn,
14801511 &node_id_str,
14811481- None, // relay_url - could add if needed
15121512+ Some(&relay_url),
14821513 Some(SESSION_TTL_MINUTES),
14831514 )
14841515 .await
14851516 {
14861517 Ok(uri) => {
14871518 tracing::info!("RealTimeSync: published session record: {}", uri);
15191519+ if let Some(mut ds) = debug_state {
15201520+ ds.with_mut(|s| s.session_record_uri = Some(uri.to_string()));
15211521+ }
14881522 session_record_uri.set(Some(uri));
14891523 }
14901524 Err(e) => {
14911525 tracing::warn!("RealTimeSync: failed to publish session record: {}", e);
15261526+ if let Some(mut ds) = debug_state {
15271527+ ds.with_mut(|s| s.last_error = Some(format!("publish session: {}", e)));
15281528+ }
14921529 }
14931530 }
1494153115321532+ // Clone before join() consumes them
15331533+ let node_for_discovery = node.clone();
15341534+ let bootstrap_peers_set = bootstrap_peers.clone();
15351535+14951536 match CollabSession::join(node, topic, bootstrap_peers).await {
14961537 Ok((collab_session, mut event_stream)) => {
14971538 let collab_session = Arc::new(collab_session);
14981539 session.set(Some(collab_session.clone()));
15401540+ if let Some(mut ds) = debug_state {
15411541+ ds.with_mut(|s| s.is_joined = true);
15421542+ }
1499154315001544 tracing::info!("RealTimeSync: joined session for {}", uri);
1501154515461546+ // Broadcast Join message to announce ourselves
15471547+ let our_did = fetcher.current_did().await;
15481548+ let display_name = if let Some(ref did) = our_did {
15491549+ use jacquard::types::ident::AtIdentifier;
15501550+ use weaver_api::sh_weaver::actor::ProfileDataViewInner;
15511551+15521552+ let ident = AtIdentifier::Did(did.clone());
15531553+ fetcher
15541554+ .fetch_profile(&ident)
15551555+ .await
15561556+ .ok()
15571557+ .and_then(|p| match &p.inner {
15581558+ ProfileDataViewInner::ProfileView(pv) => {
15591559+ pv.display_name.as_ref().map(|s| s.to_string())
15601560+ }
15611561+ ProfileDataViewInner::ProfileViewDetailed(pv) => {
15621562+ pv.display_name.as_ref().map(|s| s.to_string())
15631563+ }
15641564+ _ => None,
15651565+ })
15661566+ .unwrap_or_else(|| "Collaborator".to_string())
15671567+ } else {
15681568+ "Collaborator".to_string()
15691569+ };
15701570+ let join_msg = CollabMessage::Join {
15711571+ did: our_did.map(|d| d.to_string()).unwrap_or_default(),
15721572+ display_name,
15731573+ };
15741574+ if let Err(e) = collab_session.broadcast(&join_msg).await {
15751575+ tracing::warn!("RealTimeSync: failed to broadcast Join: {}", e);
15761576+ }
15771577+15781578+ // Request sync from existing peers
15791579+ // Convert our version vector to wire format
15801580+ let our_vv = doc_for_recv.version_vector();
15811581+ let have_version: Vec<(u64, u64)> = our_vv
15821582+ .iter()
15831583+ .map(|(peer, counter)| (*peer, *counter as u64))
15841584+ .collect();
15851585+ let sync_request = CollabMessage::SyncRequest { have_version };
15861586+ if let Err(e) = collab_session.broadcast(&sync_request).await {
15871587+ tracing::warn!("RealTimeSync: failed to broadcast SyncRequest: {}", e);
15881588+ } else {
15891589+ tracing::debug!("RealTimeSync: sent sync request ({} vv entries)", our_vv.len());
15901590+ }
15911591+15921592+ // Spawn TTL refresh task - keeps our session record alive
15931593+ let session_uri_for_refresh = session_record_uri.clone();
15941594+ let fetcher_for_refresh = fetcher.clone();
15951595+ spawn(async move {
15961596+ // Refresh every 5 minutes (TTL is 15 min, so plenty of buffer)
15971597+ let mut interval = n0_future::time::interval(
15981598+ n0_future::time::Duration::from_secs(5 * 60),
15991599+ );
16001600+ loop {
16011601+ interval.tick().await;
16021602+ if let Some(uri) = session_uri_for_refresh.peek().clone() {
16031603+ tracing::debug!("RealTimeSync: refreshing session TTL");
16041604+ if let Err(e) = fetcher_for_refresh
16051605+ .refresh_collab_session(&uri, SESSION_TTL_MINUTES)
16061606+ .await
16071607+ {
16081608+ tracing::warn!(
16091609+ "RealTimeSync: failed to refresh session: {}",
16101610+ e
16111611+ );
16121612+ }
16131613+ }
16141614+ }
16151615+ });
16161616+15021617 // Spawn broadcast task - sends local updates to gossip
15031618 let session_for_broadcast = collab_session.clone();
15041619 spawn(async move {
···15321647 }
15331648 });
1534164916501650+ // Spawn periodic peer discovery task
16511651+ // This handles the race condition where peers publish sessions
16521652+ // at different times and might miss each other on initial discovery
16531653+ let session_for_discovery = collab_session.clone();
16541654+ let fetcher_for_discovery = fetcher.clone();
16551655+ let uri_for_discovery = uri.clone();
16561656+ let our_node_id = node_for_discovery.node_id();
16571657+ let mut known_peers: std::collections::HashSet<weaver_common::transport::EndpointId> =
16581658+ bootstrap_peers_set.iter().cloned().collect();
16591659+ spawn(async move {
16601660+ // Check for new peers every 30 seconds
16611661+ let mut interval =
16621662+ n0_future::time::interval(n0_future::time::Duration::from_secs(30));
16631663+ loop {
16641664+ interval.tick().await;
16651665+ tracing::debug!("RealTimeSync: periodic discovery tick");
16661666+ match fetcher_for_discovery
16671667+ .find_session_peers(&uri_for_discovery)
16681668+ .await
16691669+ {
16701670+ Ok(peers) => {
16711671+ tracing::info!(
16721672+ "RealTimeSync: periodic discovery found {} session records",
16731673+ peers.len()
16741674+ );
16751675+ for p in &peers {
16761676+ tracing::debug!(
16771677+ " - peer: {} (relay: {:?}, expires: {:?})",
16781678+ p.node_id,
16791679+ p.relay_url,
16801680+ p.expires_at
16811681+ );
16821682+ }
16831683+ // Filter: parse node ID, exclude ourselves, exclude already known
16841684+ let new_peers: Vec<_> = peers
16851685+ .into_iter()
16861686+ .filter_map(|p| {
16871687+ weaver_common::transport::parse_node_id(&p.node_id)
16881688+ .ok()
16891689+ })
16901690+ .filter(|id| *id != our_node_id)
16911691+ .filter(|id| !known_peers.contains(id))
16921692+ .collect();
16931693+16941694+ if !new_peers.is_empty() {
16951695+ tracing::info!(
16961696+ "RealTimeSync: periodic discovery found {} NEW peers",
16971697+ new_peers.len()
16981698+ );
16991699+ for p in &new_peers {
17001700+ known_peers.insert(*p);
17011701+ }
17021702+ if let Err(e) =
17031703+ session_for_discovery.join_peers(new_peers).await
17041704+ {
17051705+ tracing::warn!(
17061706+ "RealTimeSync: failed to join discovered peers: {}",
17071707+ e
17081708+ );
17091709+ }
17101710+ }
17111711+ }
17121712+ Err(e) => {
17131713+ tracing::warn!(
17141714+ "RealTimeSync: periodic peer discovery failed: {}",
17151715+ e
17161716+ );
17171717+ }
17181718+ }
17191719+ }
17201720+ });
17211721+15351722 // Spawn event receiver task - receives updates from peers
15361723 let mut doc_for_recv = doc_for_recv.clone();
17241724+ let session_for_sync = collab_session.clone();
15371725 spawn(async move {
15381726 use n0_future::StreamExt;
1539172715401540- while let Some(event) = event_stream.next().await {
17281728+ while let Some(result) = event_stream.next().await {
17291729+ let event = match result {
17301730+ Ok(e) => e,
17311731+ Err(e) => {
17321732+ tracing::error!("RealTimeSync: event stream error: {}", e);
17331733+ break;
17341734+ }
17351735+ };
15411736 match event {
15421737 SessionEvent::Message { from, message } => {
15431738 match message {
···15591754 selection,
15601755 ..
15611756 } => {
15621562- presence.write().update_cursor(
15631563- &from,
15641564- position,
15651565- selection,
15661566- );
17571757+ // Add peer if not known (cursor might arrive before Join)
17581758+ let mut p = presence.write();
17591759+ if !p.contains(&from) {
17601760+ p.add_collaborator(
17611761+ from,
17621762+ "unknown".into(),
17631763+ "Peer".into(),
17641764+ );
17651765+ }
17661766+ p.update_cursor(&from, position, selection);
15671767 }
15681768 CollabMessage::Join { did, display_name } => {
15691769 tracing::info!(
···15861786 "RealTimeSync: sync request (have {} entries)",
15871787 have_version.len()
15881788 );
15891589- // TODO: Send snapshot or updates based on their version
17891789+ // Convert their version vector from wire format
17901790+ let their_vv: loro::VersionVector = have_version
17911791+ .into_iter()
17921792+ .map(|(peer, counter)| (peer, counter as i32))
17931793+ .collect();
17941794+17951795+ // Export updates they don't have
17961796+ if let Some(data) =
17971797+ doc_for_recv.export_updates_from(&their_vv)
17981798+ {
17991799+ tracing::info!(
18001800+ "RealTimeSync: sending {} bytes to sync peer",
18011801+ data.len()
18021802+ );
18031803+ let response = CollabMessage::SyncResponse {
18041804+ data,
18051805+ is_snapshot: false,
18061806+ };
18071807+ if let Err(e) =
18081808+ session_for_sync.broadcast(&response).await
18091809+ {
18101810+ tracing::warn!(
18111811+ "RealTimeSync: failed to send sync response: {}",
18121812+ e
18131813+ );
18141814+ }
18151815+ } else {
18161816+ tracing::debug!(
18171817+ "RealTimeSync: no updates to send (peer is up to date)"
18181818+ );
18191819+ }
15901820 }
15911591- _ => {}
18211821+ CollabMessage::SyncResponse { data, is_snapshot } => {
18221822+ tracing::info!(
18231823+ "RealTimeSync: received sync response ({} bytes, snapshot: {})",
18241824+ data.len(),
18251825+ is_snapshot
18261826+ );
18271827+ if let Err(e) = doc_for_recv.import_updates(&data) {
18281828+ tracing::warn!(
18291829+ "RealTimeSync: failed to import sync response: {:?}",
18301830+ e
18311831+ );
18321832+ }
18331833+ }
15921834 }
15931835 }
15941836 SessionEvent::PeerJoined(peer) => {
+2-1
crates/weaver-app/src/main.rs
···148148 );
149149150150 // Filter out noisy crates
151151- let filter = EnvFilter::new("debug,loro_internal=warn");
151151+ let filter =
152152+ EnvFilter::new("debug,loro_internal=warn,jacquard_identity=info,jacquard_common=info");
152153153154 let reg = Registry::default()
154155 .with(filter)
-1
crates/weaver-common/Cargo.toml
···5858send_wrapper = "0.6"
5959wasmworker = "0.1"
6060wasmworker-proc-macro = "0.1"
6161-iroh-quinn = { version = "0.14", default-features = false }
6261ring = { version = "0.17", default-features = false, features = ["wasm32_unknown_unknown_js"]}
6362getrandom = { version = "0.3", default-features = false, features = ["wasm_js"] }
6463
+120
crates/weaver-common/src/agent.rs
···14201420 };
1421142114221422 if !accept_output.records.is_empty() {
14231423+ // Both parties in a valid invite+accept pair are authorized
14241424+ let inviter_did = record_id.did.clone().into_static();
14251425+ collaborators.push(inviter_did);
14231426 collaborators.push(invitee_did);
14241427 }
14251428 }
14291429+14301430+ // Deduplicate (someone might appear in multiple pairs)
14311431+ collaborators.sort();
14321432+ collaborators.dedup();
1426143314271434 Ok(collaborators)
14281435 }
···19061913 use jacquard::types::string::Datetime;
19071914 use weaver_api::sh_weaver::collab::session::Session;
1908191519161916+ // Clean up any expired sessions first
19171917+ let _ = self.cleanup_expired_sessions().await;
19181918+19091919 let now_chrono = chrono::Utc::now().fixed_offset();
19101920 let now = Datetime::new(now_chrono);
19111921 let expires_at = ttl_minutes.map(|mins| {
···19731983 }
19741984 }
1975198519861986+ /// Update the relay URL in an existing session record.
19871987+ ///
19881988+ /// Called when the relay connection changes during a session.
19891989+ fn update_collab_session_relay<'a>(
19901990+ &'a self,
19911991+ session_uri: &'a AtUri<'a>,
19921992+ relay_url: Option<&'a str>,
19931993+ ) -> impl Future<Output = Result<(), WeaverError>> + 'a {
19941994+ async move {
19951995+ use weaver_api::sh_weaver::collab::session::Session;
19961996+19971997+ let relay_uri = relay_url
19981998+ .map(|url| jacquard::types::string::Uri::new(url))
19991999+ .transpose()
20002000+ .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid relay URL")))?;
20012001+20022002+ self.update_record::<Session>(session_uri, |session| {
20032003+ session.relay_url = relay_uri.clone();
20042004+ })
20052005+ .await?;
20062006+ Ok(())
20072007+ }
20082008+ }
20092009+20102010+ /// Delete all expired session records for the current user.
20112011+ ///
20122012+ /// Called before creating a new session to clean up stale records.
20132013+ fn cleanup_expired_sessions<'a>(
20142014+ &'a self,
20152015+ ) -> impl Future<Output = Result<u32, WeaverError>> + 'a
20162016+ where
20172017+ Self: Sized,
20182018+ {
20192019+ async move {
20202020+ use jacquard::types::nsid::Nsid;
20212021+ use weaver_api::com_atproto::repo::list_records::ListRecords;
20222022+ use weaver_api::sh_weaver::collab::session::Session;
20232023+20242024+ let (did, _) = self.session_info().await.ok_or_else(|| {
20252025+ AgentError::from(ClientError::invalid_request("No active session"))
20262026+ })?;
20272027+ let now = chrono::Utc::now();
20282028+ let mut deleted = 0u32;
20292029+20302030+ // List all our session records
20312031+ let collection =
20322032+ Nsid::new("sh.weaver.collab.session").map_err(WeaverError::AtprotoString)?;
20332033+ let request = ListRecords::new()
20342034+ .repo(did.clone())
20352035+ .collection(collection)
20362036+ .limit(100)
20372037+ .build();
20382038+20392039+ let response = self.send(request).await.map_err(AgentError::from)?;
20402040+ let output = response.into_output().map_err(|e| {
20412041+ AgentError::from(ClientError::invalid_request(format!(
20422042+ "Failed to list sessions: {}",
20432043+ e
20442044+ )))
20452045+ })?;
20462046+20472047+ for record in output.records {
20482048+ if let Ok(session) = jacquard::from_data::<Session>(&record.value) {
20492049+ // Check if expired
20502050+ if let Some(ref expires_at) = session.expires_at {
20512051+ let expires_str = expires_at.as_str();
20522052+ if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires_str) {
20532053+ if expires.with_timezone(&chrono::Utc) < now {
20542054+ // Delete expired session
20552055+ if let Some(rkey) = record.uri.rkey() {
20562056+ if let Err(e) =
20572057+ self.delete_record::<Session>(rkey.clone()).await
20582058+ {
20592059+ tracing::warn!("Failed to delete expired session: {}", e);
20602060+ } else {
20612061+ deleted += 1;
20622062+ }
20632063+ }
20642064+ }
20652065+ }
20662066+ }
20672067+ }
20682068+ }
20692069+20702070+ if deleted > 0 {
20712071+ tracing::info!("Cleaned up {} expired session records", deleted);
20722072+ }
20732073+20742074+ Ok(deleted)
20752075+ }
20762076+ }
20772077+19762078 /// Find active collaboration sessions for a resource.
19772079 ///
19782080 /// Queries Constellation for session records referencing the given resource,
···19912093 use weaver_api::sh_weaver::collab::session::Session;
1992209419932095 const SESSION_NSID: &str = "sh.weaver.collab.session";
20962096+20972097+ // Get authorized collaborators (owner is checked separately via URI authority)
20982098+ let collaborators: std::collections::HashSet<Did<'static>> = self
20992099+ .find_collaborators_for_resource(resource_uri)
21002100+ .await
21012101+ .unwrap_or_default()
21022102+ .into_iter()
21032103+ .collect();
1994210419952105 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
19962106 AgentError::from(ClientError::invalid_request(format!(
···20532163 if *expires_at < now {
20542164 continue; // Session expired
20552165 }
21662166+ }
21672167+21682168+ // Check if peer is authorized (has valid invite+accept pair)
21692169+ let peer_did = record_id.did.clone().into_static();
21702170+ if !collaborators.contains(&peer_did) {
21712171+ tracing::debug!(
21722172+ peer = %peer_did,
21732173+ "Filtering out unauthorized session peer"
21742174+ );
21752175+ continue;
20562176 }
2057217720582178 peers.push(SessionPeer {
+75-2
crates/weaver-common/src/transport/messages.rs
···11//! Wire protocol for collaborative editing messages.
2233+use iroh::{PublicKey, SecretKey, Signature};
34use serde::{Deserialize, Serialize};
4556/// Messages exchanged between collaborators over gossip.
···5354}
54555556impl CollabMessage {
5656- /// Serialize message to CBOR bytes for wire transmission
5757+ /// Serialize message to postcard bytes for wire transmission.
5758 pub fn to_bytes(&self) -> Result<Vec<u8>, postcard::Error> {
5859 postcard::to_stdvec(self)
5960 }
60616161- /// Deserialize message from CBOR bytes
6262+ /// Deserialize message from postcard bytes.
6263 pub fn from_bytes(bytes: &[u8]) -> Result<Self, postcard::Error> {
6364 postcard::from_bytes(bytes)
6565+ }
6666+}
6767+6868+/// A signed message wrapper for authenticated transport.
6969+///
7070+/// Includes the sender's public key so receivers can verify without context.
7171+#[derive(Debug, Clone, Serialize, Deserialize)]
7272+pub struct SignedMessage {
7373+ /// Sender's public key (also their EndpointId).
7474+ pub from: PublicKey,
7575+ /// The serialized TimestampedMessage (postcard bytes).
7676+ pub data: Vec<u8>,
7777+ /// Ed25519 signature over data.
7878+ pub signature: Signature,
7979+}
8080+8181+/// Versioned wire format with timestamp.
8282+#[derive(Debug, Clone, Serialize, Deserialize)]
8383+enum WireMessage {
8484+ V0 { timestamp: u64, message: CollabMessage },
8585+}
8686+8787+/// A verified message with sender and timestamp info.
8888+#[derive(Debug, Clone)]
8989+pub struct ReceivedMessage {
9090+ /// Sender's public key.
9191+ pub from: PublicKey,
9292+ /// When the message was sent (micros since epoch).
9393+ pub timestamp: u64,
9494+ /// The decoded message.
9595+ pub message: CollabMessage,
9696+}
9797+9898+/// Error type for signed message operations.
9999+#[derive(Debug, thiserror::Error)]
100100+pub enum SignedMessageError {
101101+ #[error("serialization failed: {0}")]
102102+ Serialization(#[from] postcard::Error),
103103+ #[error("signature verification failed")]
104104+ InvalidSignature,
105105+}
106106+107107+impl SignedMessage {
108108+ /// Sign a message and encode to bytes for wire transmission.
109109+ pub fn sign_and_encode(secret_key: &SecretKey, message: &CollabMessage) -> Result<Vec<u8>, SignedMessageError> {
110110+ use web_time::SystemTime;
111111+112112+ let timestamp = SystemTime::now()
113113+ .duration_since(SystemTime::UNIX_EPOCH)
114114+ .unwrap()
115115+ .as_micros() as u64;
116116+ let wire = WireMessage::V0 { timestamp, message: message.clone() };
117117+ let data = postcard::to_stdvec(&wire)?;
118118+ let signature = secret_key.sign(&data);
119119+ let from = secret_key.public();
120120+ let signed = Self { from, data, signature };
121121+ Ok(postcard::to_stdvec(&signed)?)
122122+ }
123123+124124+ /// Decode from bytes and verify signature.
125125+ pub fn decode_and_verify(bytes: &[u8]) -> Result<ReceivedMessage, SignedMessageError> {
126126+ let signed: Self = postcard::from_bytes(bytes)?;
127127+ signed.from
128128+ .verify(&signed.data, &signed.signature)
129129+ .map_err(|_| SignedMessageError::InvalidSignature)?;
130130+ let wire: WireMessage = postcard::from_bytes(&signed.data)?;
131131+ let WireMessage::V0 { timestamp, message } = wire;
132132+ Ok(ReceivedMessage {
133133+ from: signed.from,
134134+ timestamp,
135135+ message,
136136+ })
64137 }
65138}
66139
+1-4
crates/weaver-common/src/transport/mod.rs
···14141515pub use discovery::{node_id_to_string, parse_node_id, DiscoveredPeer, DiscoveryError};
1616pub use iroh::EndpointId;
1717-pub use messages::CollabMessage;
1717+pub use messages::{CollabMessage, ReceivedMessage, SignedMessage, SignedMessageError};
1818pub use node::{CollabNode, TransportError};
1919pub use presence::{Collaborator, PresenceTracker, RemoteCursor};
2020pub use session::{CollabSession, SessionError, SessionEvent, TopicId};
2121-2222-/// ALPN protocol identifier for weaver collaboration
2323-pub const WEAVER_GOSSIP_ALPN: &[u8] = b"weaver/collab/0";
+51-5
crates/weaver-common/src/transport/node.rs
···33use iroh::Endpoint;
44use iroh::EndpointId;
55use iroh::SecretKey;
66-use iroh_gossip::net::Gossip;
66+use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
77use miette::Diagnostic;
88use std::sync::Arc;
99-1010-use super::WEAVER_GOSSIP_ALPN;
1191210/// Error type for transport operations
1311#[derive(Debug, thiserror::Error, Diagnostic)]
···4846 // In native, this can do direct P2P with relay fallback
4947 let endpoint = Endpoint::builder()
5048 .secret_key(secret_key.clone())
5151- .alpns(vec![WEAVER_GOSSIP_ALPN.to_vec()])
4949+ .alpns(vec![GOSSIP_ALPN.to_vec()])
5250 .bind()
5351 .await
5452 .map_err(|e| TransportError::Bind(Box::new(e)))?;
···58565957 // Build router to dispatch incoming connections by ALPN
6058 let router = iroh::protocol::Router::builder(endpoint.clone())
6161- .accept(WEAVER_GOSSIP_ALPN, gossip.clone())
5959+ .accept(GOSSIP_ALPN, gossip.clone())
6260 .spawn();
63616462 tracing::info!(node_id = %endpoint.id(), "CollabNode started");
···9795 /// Get a clone of the secret key (for session persistence if needed).
9896 pub fn secret_key(&self) -> SecretKey {
9997 self.secret_key.clone()
9898+ }
9999+100100+ /// Get the relay URL this node is connected to (if any).
101101+ ///
102102+ /// This should be published in session records so other peers can connect
103103+ /// via relay (essential for browser-to-browser connections).
104104+ pub fn relay_url(&self) -> Option<String> {
105105+ self.endpoint
106106+ .addr()
107107+ .relay_urls()
108108+ .next()
109109+ .map(|url| url.to_string())
110110+ }
111111+112112+ /// Get the full node address including relay info.
113113+ ///
114114+ /// Use this when you need to connect to this node from another peer.
115115+ pub fn node_addr(&self) -> iroh::EndpointAddr {
116116+ self.endpoint.addr()
117117+ }
118118+119119+ /// Wait for the endpoint to be online (relay connected).
120120+ ///
121121+ /// This should be called before publishing session records to ensure
122122+ /// the relay URL is available for peer discovery. For browser clients,
123123+ /// relay is required - we wait indefinitely since there's no fallback.
124124+ pub async fn wait_online(&self) {
125125+ self.endpoint.online().await;
126126+ }
127127+128128+ /// Wait for relay connection and return the relay URL.
129129+ ///
130130+ /// Waits indefinitely for relay - browser clients require relay URLs
131131+ /// for peer discovery. Returns the relay URL once connected.
132132+ pub async fn wait_for_relay(&self) -> String {
133133+ self.endpoint.online().await;
134134+ // After online(), relay_url should always be Some for browser clients
135135+ self.relay_url()
136136+ .expect("relay URL should be available after online()")
137137+ }
138138+139139+ /// Watch for address changes (including relay URL changes).
140140+ ///
141141+ /// Returns a stream that yields the address on each change.
142142+ /// Use this to detect relay URL changes and update session records.
143143+ pub fn watch_addr(&self) -> n0_future::boxed::BoxStream<iroh::EndpointAddr> {
144144+ use iroh::Watcher;
145145+ Box::pin(self.endpoint.watch_addr().stream())
100146 }
101147}
+93-30
crates/weaver-common/src/transport/session.rs
···99use n0_future::boxed::BoxStream;
1010use n0_future::stream;
11111212-use super::{CollabMessage, CollabNode};
1212+use super::{CollabMessage, CollabNode, SignedMessage};
13131414/// Topic ID for a gossip session - derived from resource URI.
1515pub type TopicId = iroh_gossip::TopicId;
···5757pub struct CollabSession {
5858 topic: TopicId,
5959 sender: GossipSender,
6060- #[allow(dead_code)]
6160 node: Arc<CollabNode>,
6261}
6362···7978 node: Arc<CollabNode>,
8079 topic: TopicId,
8180 bootstrap_peers: Vec<EndpointId>,
8282- ) -> Result<(Self, BoxStream<SessionEvent>), SessionError> {
8181+ ) -> Result<(Self, BoxStream<Result<SessionEvent, SessionError>>), SessionError> {
8282+ tracing::info!(
8383+ topic = ?topic,
8484+ bootstrap_count = bootstrap_peers.len(),
8585+ "CollabSession: joining topic"
8686+ );
8787+8888+ for peer in &bootstrap_peers {
8989+ tracing::debug!(peer = %peer, "CollabSession: bootstrap peer");
9090+ }
9191+8392 // Subscribe to the gossip topic
8493 let (sender, receiver) = node
8594 .gossip()
8686- .subscribe(topic, bootstrap_peers)
9595+ .subscribe_and_join(topic, bootstrap_peers)
8796 .await
8897 .map_err(|e| SessionError::Subscribe(Box::new(e)))?
8998 .split();
9999+100100+ tracing::info!("CollabSession: subscribed to gossip topic");
9010191102 let session = Self {
92103 topic,
···101112 }
102113103114 /// Convert gossip receiver into a stream of session events.
104104- fn event_stream(receiver: GossipReceiver) -> BoxStream<SessionEvent> {
105105- let stream = stream::unfold(receiver, |mut receiver| async move {
115115+ fn event_stream(receiver: GossipReceiver) -> BoxStream<Result<SessionEvent, SessionError>> {
116116+ let stream = stream::try_unfold(receiver, |mut receiver| async move {
106117 loop {
107107- match receiver.next().await {
108108- Some(Ok(event)) => {
109109- let session_event = match event {
110110- Event::NeighborUp(peer) => SessionEvent::PeerJoined(peer),
111111- Event::NeighborDown(peer) => SessionEvent::PeerLeft(peer),
112112- Event::Received(msg) => match CollabMessage::from_bytes(&msg.content) {
113113- Ok(message) => SessionEvent::Message {
114114- from: msg.delivered_from,
115115- message,
116116- },
117117- Err(e) => {
118118- tracing::warn!(?e, "failed to decode collab message");
118118+ let Some(event) = receiver.try_next().await.map_err(|e| {
119119+ tracing::error!(?e, "CollabSession: gossip receiver error");
120120+ SessionError::Decode(Box::new(e))
121121+ })?
122122+ else {
123123+ tracing::debug!("CollabSession: gossip stream ended");
124124+ return Ok(None);
125125+ };
126126+127127+ tracing::debug!(?event, "CollabSession: raw gossip event");
128128+ let session_event = match event {
129129+ Event::NeighborUp(peer) => {
130130+ tracing::info!(peer = %peer, "CollabSession: neighbor up");
131131+ SessionEvent::PeerJoined(peer)
132132+ }
133133+ Event::NeighborDown(peer) => {
134134+ tracing::info!(peer = %peer, "CollabSession: neighbor down");
135135+ SessionEvent::PeerLeft(peer)
136136+ }
137137+ Event::Received(msg) => {
138138+ tracing::debug!(
139139+ from = %msg.delivered_from,
140140+ bytes = msg.content.len(),
141141+ "CollabSession: received message"
142142+ );
143143+ match SignedMessage::decode_and_verify(&msg.content) {
144144+ Ok(received) => {
145145+ // Verify claimed sender matches transport sender
146146+ if received.from != msg.delivered_from {
147147+ tracing::warn!(
148148+ claimed = %received.from,
149149+ transport = %msg.delivered_from,
150150+ "sender mismatch - possible spoofing attempt"
151151+ );
119152 continue;
120153 }
121121- },
122122- Event::Lagged => {
123123- tracing::warn!("gossip receiver lagged, some messages may be lost");
154154+ SessionEvent::Message {
155155+ from: received.from,
156156+ message: received.message,
157157+ }
158158+ }
159159+ Err(e) => {
160160+ tracing::warn!(?e, "failed to verify/decode signed message");
124161 continue;
125162 }
126126- };
127127- return Some((session_event, receiver));
163163+ }
128164 }
129129- Some(Err(e)) => {
130130- tracing::warn!(?e, "gossip receiver error");
165165+ Event::Lagged => {
166166+ tracing::warn!("gossip receiver lagged, some messages may be lost");
131167 continue;
132168 }
133133- None => return None,
134134- }
169169+ };
170170+ break Ok(Some((session_event, receiver)));
135171 }
136172 });
137173138174 Box::pin(stream)
139175 }
140176141141- /// Broadcast a message to all peers in the session.
177177+ /// Broadcast a signed message to all peers in the session.
142178 pub async fn broadcast(&self, message: &CollabMessage) -> Result<(), SessionError> {
143143- let bytes = message
144144- .to_bytes()
179179+ let bytes = SignedMessage::sign_and_encode(&self.node.secret_key(), message)
145180 .map_err(|e| SessionError::Broadcast(Box::new(e)))?;
181181+182182+ tracing::debug!(
183183+ bytes = bytes.len(),
184184+ topic = ?self.topic,
185185+ "CollabSession: broadcasting signed message"
186186+ );
146187147188 self.sender
148189 .broadcast(bytes.into())
···155196 /// Get the topic ID for this session.
156197 pub fn topic(&self) -> TopicId {
157198 self.topic
199199+ }
200200+201201+ /// Add new peers to the gossip session.
202202+ ///
203203+ /// Use this to add peers discovered after initial subscription.
204204+ /// The gossip layer will attempt to connect to these peers.
205205+ pub async fn join_peers(&self, peers: Vec<EndpointId>) -> Result<(), SessionError> {
206206+ if peers.is_empty() {
207207+ return Ok(());
208208+ }
209209+ tracing::info!(
210210+ count = peers.len(),
211211+ "CollabSession: joining additional peers"
212212+ );
213213+ for peer in &peers {
214214+ tracing::debug!(peer = %peer, "CollabSession: adding peer");
215215+ }
216216+ self.sender
217217+ .join_peers(peers)
218218+ .await
219219+ .map_err(|e| SessionError::Subscribe(Box::new(e)))?;
220220+ Ok(())
158221 }
159222}