Diffdown is a real-time collaborative Markdown editor/previewer built on the AT Protocol diffdown.com

fix: eliminate GetText race in ApplyEdits, add coverage, remove dead ApplyEdit

+415 -26
+4 -26
internal/collaboration/hub.go
··· 102 102 r.broadcast <- &broadcastMsg{data: data, except: except} 103 103 } 104 104 105 - // ApplyEdit applies an operation via the OT engine and returns the broadcast message. 106 - func (r *Room) ApplyEdit(op Operation, sender *Client) { 107 - newText, _ := r.ot.ApplyWithVersion(op) 108 - 109 - type editMsg struct { 110 - Type string `json:"type"` 111 - Delta Operation `json:"delta"` 112 - Author string `json:"author"` 113 - Content string `json:"content"` 114 - } 115 - msg := editMsg{ 116 - Type: "edit", 117 - Delta: op, 118 - Author: sender.DID, 119 - Content: newText, 120 - } 121 - data, err := json.Marshal(msg) 122 - if err != nil { 123 - log.Printf("ApplyEdit: marshal: %v", err) 124 - return 125 - } 126 - r.BroadcastExcept(data, sender) 127 - } 128 - 129 105 // ApplyEdits applies a sequence of operations in order and broadcasts one 130 106 // combined message to all other clients. Each op is applied to the text 131 107 // resulting from the previous op, so positions in each op must be relative ··· 138 114 return 139 115 } 140 116 117 + // Capture the text returned by the final ApplyWithVersion so we don't 118 + // race against another goroutine calling GetText() separately. 119 + var finalText string 141 120 for i := range ops { 142 - r.ot.ApplyWithVersion(ops[i]) 121 + finalText, _ = r.ot.ApplyWithVersion(ops[i]) 143 122 } 144 123 145 124 // Include the full document text so receivers can detect and recover from 146 125 // divergence without a reconnect. 147 - finalText := r.ot.GetText() 148 126 type editsMsg struct { 149 127 Type string `json:"type"` 150 128 Deltas []Operation `json:"deltas"`
+411
internal/collaboration/hub_test.go
··· 1 + package collaboration 2 + 3 + import ( 4 + "encoding/json" 5 + "testing" 6 + "time" 7 + ) 8 + 9 + // stubClient creates a Client with a real send channel but no WebSocket conn. 10 + // It is usable for hub/room tests that don't exercise the network layer. 11 + func stubClient(hub *Hub, did, name, color, roomKey string) *Client { 12 + return &Client{ 13 + hub: hub, 14 + conn: nil, // not used in hub logic 15 + send: make(chan []byte, 256), 16 + DID: did, 17 + Name: name, 18 + Color: color, 19 + roomKey: roomKey, 20 + } 21 + } 22 + 23 + // drain reads all messages from a client's send channel within timeout. 24 + func drain(c *Client, timeout time.Duration) [][]byte { 25 + deadline := time.After(timeout) 26 + var msgs [][]byte 27 + for { 28 + select { 29 + case msg, ok := <-c.send: 30 + if !ok { 31 + return msgs 32 + } 33 + msgs = append(msgs, msg) 34 + case <-deadline: 35 + return msgs 36 + } 37 + } 38 + } 39 + 40 + // waitForMessages blocks until n messages arrive on c.send or timeout. 41 + func waitForMessages(c *Client, n int, timeout time.Duration) [][]byte { 42 + deadline := time.After(timeout) 43 + var msgs [][]byte 44 + for len(msgs) < n { 45 + select { 46 + case msg, ok := <-c.send: 47 + if !ok { 48 + return msgs 49 + } 50 + msgs = append(msgs, msg) 51 + case <-deadline: 52 + return msgs 53 + } 54 + } 55 + return msgs 56 + } 57 + 58 + // --- Hub tests --- 59 + 60 + func TestHub_GetOrCreateRoom_CreatesNew(t *testing.T) { 61 + hub := NewHub() 62 + room := hub.GetOrCreateRoom("doc1") 63 + if room == nil { 64 + t.Fatal("expected non-nil room") 65 + } 66 + } 67 + 68 + func TestHub_GetOrCreateRoom_ReturnsSame(t *testing.T) { 69 + hub := NewHub() 70 + r1 := hub.GetOrCreateRoom("doc1") 71 + r2 := hub.GetOrCreateRoom("doc1") 72 + if r1 != r2 { 73 + t.Error("expected same room instance for same rkey") 74 + } 75 + } 76 + 77 + func TestHub_GetOrCreateRoom_DifferentRooms(t *testing.T) { 78 + hub := NewHub() 79 + r1 := hub.GetOrCreateRoom("doc1") 80 + r2 := hub.GetOrCreateRoom("doc2") 81 + if r1 == r2 { 82 + t.Error("expected different rooms for different rkeys") 83 + } 84 + } 85 + 86 + func TestHub_GetRoom_NilForUnknown(t *testing.T) { 87 + hub := NewHub() 88 + if hub.GetRoom("nonexistent") != nil { 89 + t.Error("expected nil for unknown room") 90 + } 91 + } 92 + 93 + func TestHub_GetRoom_AfterCreate(t *testing.T) { 94 + hub := NewHub() 95 + hub.GetOrCreateRoom("doc1") 96 + if hub.GetRoom("doc1") == nil { 97 + t.Error("expected room to be retrievable after creation") 98 + } 99 + } 100 + 101 + // --- Room presence tests --- 102 + 103 + func TestRoom_RegisterClient_AppearsInPresence(t *testing.T) { 104 + hub := NewHub() 105 + room := hub.GetOrCreateRoom("doc-presence") 106 + c := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-presence") 107 + 108 + room.RegisterClient(c) 109 + 110 + // Wait for presence broadcast (register → broadcastPresence) 111 + msgs := waitForMessages(c, 1, 2*time.Second) 112 + if len(msgs) == 0 { 113 + t.Fatal("expected presence message after register, got none") 114 + } 115 + 116 + var pres PresenceMessage 117 + if err := json.Unmarshal(msgs[0], &pres); err != nil { 118 + t.Fatalf("unmarshal presence: %v", err) 119 + } 120 + if pres.Type != "presence" { 121 + t.Errorf("expected type=presence, got %q", pres.Type) 122 + } 123 + if len(pres.Users) != 1 { 124 + t.Fatalf("expected 1 user in presence, got %d", len(pres.Users)) 125 + } 126 + if pres.Users[0].DID != "did:plc:alice" { 127 + t.Errorf("expected DID did:plc:alice, got %q", pres.Users[0].DID) 128 + } 129 + } 130 + 131 + func TestRoom_MultipleClients_PresenceContainsAll(t *testing.T) { 132 + hub := NewHub() 133 + room := hub.GetOrCreateRoom("doc-multi") 134 + 135 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-multi") 136 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-multi") 137 + 138 + room.RegisterClient(alice) 139 + // Drain alice's initial presence (just herself) 140 + waitForMessages(alice, 1, time.Second) 141 + 142 + room.RegisterClient(bob) 143 + // Both get a new presence broadcast; wait for 1 more on each 144 + aliceMsgs := waitForMessages(alice, 1, 2*time.Second) 145 + bobMsgs := waitForMessages(bob, 1, 2*time.Second) 146 + 147 + checkPresenceCount := func(name string, msgs [][]byte, want int) { 148 + t.Helper() 149 + if len(msgs) == 0 { 150 + t.Fatalf("%s: expected presence message, got none", name) 151 + } 152 + var pres PresenceMessage 153 + if err := json.Unmarshal(msgs[len(msgs)-1], &pres); err != nil { 154 + t.Fatalf("%s: unmarshal: %v", name, err) 155 + } 156 + if len(pres.Users) != want { 157 + t.Errorf("%s: expected %d users in presence, got %d", name, want, len(pres.Users)) 158 + } 159 + } 160 + 161 + checkPresenceCount("alice", aliceMsgs, 2) 162 + checkPresenceCount("bob", bobMsgs, 2) 163 + } 164 + 165 + func TestRoom_UnregisterClient_RemovedFromPresence(t *testing.T) { 166 + hub := NewHub() 167 + room := hub.GetOrCreateRoom("doc-unreg") 168 + 169 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-unreg") 170 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-unreg") 171 + 172 + room.RegisterClient(alice) 173 + room.RegisterClient(bob) 174 + // Drain initial presence messages 175 + time.Sleep(100 * time.Millisecond) 176 + drain(alice, 200*time.Millisecond) 177 + drain(bob, 200*time.Millisecond) 178 + 179 + // Now unregister alice 180 + room.UnregisterClient(alice) 181 + 182 + // Bob should receive a presence update with 1 user 183 + msgs := waitForMessages(bob, 1, 2*time.Second) 184 + if len(msgs) == 0 { 185 + t.Fatal("expected presence update after unregister, got none") 186 + } 187 + var pres PresenceMessage 188 + if err := json.Unmarshal(msgs[len(msgs)-1], &pres); err != nil { 189 + t.Fatalf("unmarshal: %v", err) 190 + } 191 + if len(pres.Users) != 1 { 192 + t.Errorf("expected 1 user after alice unregisters, got %d", len(pres.Users)) 193 + } 194 + if pres.Users[0].DID != "did:plc:bob" { 195 + t.Errorf("expected bob to remain, got %q", pres.Users[0].DID) 196 + } 197 + } 198 + 199 + // --- Room broadcast tests --- 200 + 201 + func TestRoom_Broadcast_SendsToAll(t *testing.T) { 202 + hub := NewHub() 203 + room := hub.GetOrCreateRoom("doc-broadcast") 204 + 205 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-broadcast") 206 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-broadcast") 207 + 208 + room.RegisterClient(alice) 209 + room.RegisterClient(bob) 210 + // Drain presence messages 211 + time.Sleep(100 * time.Millisecond) 212 + drain(alice, 200*time.Millisecond) 213 + drain(bob, 200*time.Millisecond) 214 + 215 + room.Broadcast([]byte(`{"type":"ping"}`)) 216 + 217 + aliceMsgs := waitForMessages(alice, 1, time.Second) 218 + bobMsgs := waitForMessages(bob, 1, time.Second) 219 + 220 + if len(aliceMsgs) == 0 { 221 + t.Error("alice: expected broadcast message") 222 + } 223 + if len(bobMsgs) == 0 { 224 + t.Error("bob: expected broadcast message") 225 + } 226 + } 227 + 228 + func TestRoom_BroadcastExcept_SkipsSender(t *testing.T) { 229 + hub := NewHub() 230 + room := hub.GetOrCreateRoom("doc-except") 231 + 232 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-except") 233 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-except") 234 + 235 + room.RegisterClient(alice) 236 + room.RegisterClient(bob) 237 + time.Sleep(100 * time.Millisecond) 238 + drain(alice, 200*time.Millisecond) 239 + drain(bob, 200*time.Millisecond) 240 + 241 + room.BroadcastExcept([]byte(`{"type":"edit"}`), alice) 242 + 243 + // Bob should receive it; alice should not 244 + bobMsgs := waitForMessages(bob, 1, time.Second) 245 + aliceMsgs := drain(alice, 300*time.Millisecond) 246 + 247 + if len(bobMsgs) == 0 { 248 + t.Error("bob: expected broadcast message") 249 + } 250 + if len(aliceMsgs) > 0 { 251 + t.Errorf("alice: should not receive her own broadcast, got %d messages", len(aliceMsgs)) 252 + } 253 + } 254 + 255 + // --- Room ApplyEdits tests --- 256 + 257 + func TestRoom_ApplyEdit_BroadcastsToOthers(t *testing.T) { 258 + hub := NewHub() 259 + room := hub.GetOrCreateRoom("doc-edit") 260 + 261 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-edit") 262 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-edit") 263 + 264 + room.RegisterClient(alice) 265 + room.RegisterClient(bob) 266 + time.Sleep(100 * time.Millisecond) 267 + drain(alice, 200*time.Millisecond) 268 + drain(bob, 200*time.Millisecond) 269 + 270 + room.ApplyEdits([]Operation{{From: 0, To: -1, Insert: "hello world"}}, alice) 271 + 272 + // Bob should receive the edit message; alice should not 273 + bobMsgs := waitForMessages(bob, 1, time.Second) 274 + aliceMsgs := drain(alice, 300*time.Millisecond) 275 + 276 + if len(bobMsgs) == 0 { 277 + t.Fatal("bob: expected edit message, got none") 278 + } 279 + if len(aliceMsgs) > 0 { 280 + t.Errorf("alice: should not receive her own edit, got %d messages", len(aliceMsgs)) 281 + } 282 + 283 + var msg map[string]interface{} 284 + if err := json.Unmarshal(bobMsgs[0], &msg); err != nil { 285 + t.Fatalf("unmarshal edit msg: %v", err) 286 + } 287 + if msg["type"] != "edit" { 288 + t.Errorf("expected type=edit, got %q", msg["type"]) 289 + } 290 + if msg["author"] != "did:plc:alice" { 291 + t.Errorf("expected author did:plc:alice, got %q", msg["author"]) 292 + } 293 + if msg["content"] != "hello world" { 294 + t.Errorf("expected content %q, got %q", "hello world", msg["content"]) 295 + } 296 + } 297 + 298 + func TestRoom_ApplyEdit_UpdatesOTState(t *testing.T) { 299 + hub := NewHub() 300 + room := hub.GetOrCreateRoom("doc-ot-state") 301 + 302 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-ot-state") 303 + room.RegisterClient(alice) 304 + time.Sleep(100 * time.Millisecond) 305 + drain(alice, 200*time.Millisecond) 306 + 307 + room.ApplyEdits([]Operation{{From: 0, To: -1, Insert: "first"}}, alice) 308 + room.ApplyEdits([]Operation{{From: 5, To: 5, Insert: " second"}}, alice) 309 + 310 + // Give room goroutine time to process 311 + time.Sleep(100 * time.Millisecond) 312 + 313 + if got := room.ot.GetText(); got != "first second" { 314 + t.Errorf("OT state: got %q, want %q", got, "first second") 315 + } 316 + } 317 + 318 + func TestRoom_ApplyEdits_MultipleOpsAppliedInOrder(t *testing.T) { 319 + hub := NewHub() 320 + room := hub.GetOrCreateRoom("doc-multi-ops") 321 + 322 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-multi-ops") 323 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-multi-ops") 324 + room.RegisterClient(alice) 325 + room.RegisterClient(bob) 326 + time.Sleep(100 * time.Millisecond) 327 + drain(alice, 200*time.Millisecond) 328 + drain(bob, 200*time.Millisecond) 329 + 330 + ops := []Operation{ 331 + {From: 0, To: -1, Insert: "hello"}, 332 + {From: 5, To: 5, Insert: " world"}, 333 + } 334 + room.ApplyEdits(ops, alice) 335 + 336 + bobMsgs := waitForMessages(bob, 1, time.Second) 337 + if len(bobMsgs) == 0 { 338 + t.Fatal("bob: expected edit message") 339 + } 340 + 341 + var msg struct { 342 + Type string `json:"type"` 343 + Deltas []Operation `json:"deltas"` 344 + Content string `json:"content"` 345 + Author string `json:"author"` 346 + } 347 + if err := json.Unmarshal(bobMsgs[0], &msg); err != nil { 348 + t.Fatalf("unmarshal: %v", err) 349 + } 350 + if msg.Type != "edit" { 351 + t.Errorf("type: got %q, want edit", msg.Type) 352 + } 353 + if msg.Content != "hello world" { 354 + t.Errorf("content: got %q, want %q", msg.Content, "hello world") 355 + } 356 + if len(msg.Deltas) != 2 { 357 + t.Errorf("deltas: got %d, want 2", len(msg.Deltas)) 358 + } 359 + if msg.Author != "did:plc:alice" { 360 + t.Errorf("author: got %q", msg.Author) 361 + } 362 + } 363 + 364 + func TestRoom_ApplyEdits_UpdatesOTState(t *testing.T) { 365 + hub := NewHub() 366 + room := hub.GetOrCreateRoom("doc-multi-ot") 367 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-multi-ot") 368 + room.RegisterClient(alice) 369 + time.Sleep(100 * time.Millisecond) 370 + drain(alice, 200*time.Millisecond) 371 + 372 + room.ApplyEdits([]Operation{ 373 + {From: 0, To: -1, Insert: "abc"}, 374 + {From: 3, To: 3, Insert: "def"}, 375 + }, alice) 376 + 377 + time.Sleep(50 * time.Millisecond) 378 + if got := room.ot.GetText(); got != "abcdef" { 379 + t.Errorf("OT state: got %q, want %q", got, "abcdef") 380 + } 381 + } 382 + 383 + func TestRoom_ApplyEdits_EmptyOpsIsNoop(t *testing.T) { 384 + hub := NewHub() 385 + room := hub.GetOrCreateRoom("doc-empty-ops") 386 + alice := stubClient(hub, "did:plc:alice", "Alice", "#ff0000", "doc-empty-ops") 387 + bob := stubClient(hub, "did:plc:bob", "Bob", "#0000ff", "doc-empty-ops") 388 + room.RegisterClient(alice) 389 + room.RegisterClient(bob) 390 + time.Sleep(100 * time.Millisecond) 391 + drain(alice, 200*time.Millisecond) 392 + drain(bob, 200*time.Millisecond) 393 + 394 + room.ApplyEdits(nil, alice) 395 + 396 + bobMsgs := drain(bob, 300*time.Millisecond) 397 + if len(bobMsgs) > 0 { 398 + t.Errorf("expected no broadcast for empty ops, got %d messages", len(bobMsgs)) 399 + } 400 + } 401 + 402 + // --- GetPresence --- 403 + 404 + func TestRoom_GetPresence_Empty(t *testing.T) { 405 + hub := NewHub() 406 + room := hub.GetOrCreateRoom("doc-empty-presence") 407 + users := room.GetPresence() 408 + if len(users) != 0 { 409 + t.Errorf("expected 0 users in new room, got %d", len(users)) 410 + } 411 + }