// [DEPRECATED] Go implementation of plcbundle
// at rust-test (758 lines, 19 kB)

1package sync_test 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 "net/http/httptest" 8 "sync" 9 "sync/atomic" 10 "testing" 11 "time" 12 13 "github.com/goccy/go-json" 14 "tangled.org/atscan.net/plcbundle/internal/plcclient" 15 "tangled.org/atscan.net/plcbundle/internal/storage" 16 internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 17 "tangled.org/atscan.net/plcbundle/internal/types" 18) 19 20type testLogger struct { 21 t *testing.T 22} 23 24func (l *testLogger) Printf(format string, v ...interface{}) { 25 l.t.Logf(format, v...) 26} 27 28func (l *testLogger) Println(v ...interface{}) { 29 l.t.Log(v...) 30} 31 32// Mock mempool for testing 33type mockMempool struct { 34 operations []plcclient.PLCOperation 35 mu sync.Mutex 36 saveCount int32 37} 38 39func newMockMempool() *mockMempool { 40 return &mockMempool{ 41 operations: make([]plcclient.PLCOperation, 0), 42 } 43} 44 45func (m *mockMempool) Add(ops []plcclient.PLCOperation) (int, error) { 46 m.mu.Lock() 47 defer m.mu.Unlock() 48 49 // Build existing CID set (like real mempool does) 50 existingCIDs := make(map[string]bool) 51 for _, op := range m.operations { 52 existingCIDs[op.CID] = true 53 } 54 55 // Only add new operations (deduplicate by CID) 56 addedCount := 0 57 for _, op := range ops { 58 if !existingCIDs[op.CID] { 59 m.operations = append(m.operations, op) 60 existingCIDs[op.CID] = true 61 addedCount++ 62 } 63 } 64 65 return addedCount, nil 66} 67 68func (m *mockMempool) Save() error { 69 atomic.AddInt32(&m.saveCount, 1) 70 return nil 71} 72 73func (m *mockMempool) SaveIfNeeded() error { 74 return m.Save() 75} 76 77func (m *mockMempool) Count() int { 78 m.mu.Lock() 79 defer m.mu.Unlock() 80 return len(m.operations) 81} 82 83func (m *mockMempool) GetLastTime() string { 84 m.mu.Lock() 85 defer m.mu.Unlock() 86 if len(m.operations) == 0 { 87 return "" 88 } 89 return m.operations[len(m.operations)-1].CreatedAt.Format(time.RFC3339Nano) 90} 91 92// 
==================================================================================== 93// FETCHER TESTS - DEDUPLICATION & RETRY LOGIC 94// ==================================================================================== 95 96func TestFetcherDeduplication(t *testing.T) { 97 t.Run("BoundaryDuplicateHandling", func(t *testing.T) { 98 // Setup mock server 99 baseTime := time.Now() 100 boundaryTime := baseTime.Add(5 * time.Second) 101 102 // Simulate operations at bundle boundary 103 mockOps := []plcclient.PLCOperation{ 104 {DID: "did:plc:001", CID: "cid1", CreatedAt: boundaryTime}, 105 {DID: "did:plc:002", CID: "cid2", CreatedAt: boundaryTime}, 106 {DID: "did:plc:003", CID: "cid3", CreatedAt: boundaryTime.Add(1 * time.Second)}, 107 } 108 109 server := createMockPLCServer(t, mockOps) 110 defer server.Close() 111 112 // Create fetcher 113 client := plcclient.NewClient(server.URL) 114 defer client.Close() 115 116 logger := &testLogger{t: t} 117 ops, _ := storage.NewOperations(logger, false) 118 defer ops.Close() 119 120 fetcher := internalsync.NewFetcher(client, ops, logger) 121 122 // Previous bundle had cid1 and cid2 at boundary 123 prevBoundaryCIDs := map[string]bool{ 124 "cid1": true, 125 "cid2": true, 126 } 127 128 mempool := newMockMempool() 129 130 // Fetch 131 newOps, fetchCount, err := fetcher.FetchToMempool( 132 context.Background(), 133 boundaryTime.Add(-1*time.Second).Format(time.RFC3339Nano), 134 prevBoundaryCIDs, 135 10, 136 true, // quiet 137 mempool, 138 0, 139 ) 140 141 if err != nil { 142 t.Fatalf("FetchToMempool failed: %v", err) 143 } 144 145 // Should have filtered out cid1 and cid2 (duplicates) 146 // Only cid3 should be returned 147 if len(newOps) != 1 { 148 t.Errorf("expected 1 unique operation, got %d", len(newOps)) 149 } 150 151 if len(newOps) > 0 && newOps[0].CID != "cid3" { 152 t.Errorf("expected cid3, got %s", newOps[0].CID) 153 } 154 155 if fetchCount == 0 { 156 t.Error("expected at least one fetch") 157 } 158 }) 159 160 
t.Run("ConcurrentFetchDedup", func(t *testing.T) { 161 baseTime := time.Now() 162 mockOps := make([]plcclient.PLCOperation, 50) 163 for i := 0; i < 50; i++ { 164 mockOps[i] = plcclient.PLCOperation{ 165 DID: fmt.Sprintf("did:plc:%03d", i), 166 CID: fmt.Sprintf("cid%03d", i), 167 CreatedAt: baseTime.Add(time.Duration(i) * time.Second), 168 } 169 } 170 171 server := createMockPLCServer(t, mockOps) 172 defer server.Close() 173 174 client := plcclient.NewClient(server.URL) 175 defer client.Close() 176 177 logger := &testLogger{t: t} 178 storageOps, _ := storage.NewOperations(logger, false) 179 defer storageOps.Close() 180 181 fetcher := internalsync.NewFetcher(client, storageOps, logger) 182 mempool := newMockMempool() 183 184 // First fetch 185 initialCount := mempool.Count() 186 _, _, err := fetcher.FetchToMempool( 187 context.Background(), 188 "", 189 nil, 190 30, 191 true, 192 mempool, 193 0, 194 ) 195 if err != nil { 196 t.Fatalf("First fetch failed: %v", err) 197 } 198 199 countAfterFirst := mempool.Count() 200 addedFirst := countAfterFirst - initialCount 201 202 if addedFirst == 0 { 203 t.Fatal("first fetch should add operations") 204 } 205 206 // Second fetch with same cursor - mempool deduplicates 207 countBeforeSecond := mempool.Count() 208 _, _, err = fetcher.FetchToMempool( 209 context.Background(), 210 "", // Same cursor - fetches same data 211 nil, 212 30, 213 true, 214 mempool, 215 1, 216 ) 217 if err != nil { 218 t.Fatalf("Second fetch failed: %v", err) 219 } 220 221 countAfterSecond := mempool.Count() 222 addedSecond := countAfterSecond - countBeforeSecond 223 224 // Mempool's Add() method deduplicates by CID 225 // So second fetch should add 0 (all duplicates) 226 if addedSecond != 0 { 227 t.Errorf("expected 0 new ops in mempool after second fetch (duplicates), got %d", addedSecond) 228 } 229 230 t.Logf("First fetch: +%d ops, Second fetch: +%d ops (deduped)", addedFirst, addedSecond) 231 }) 232 233 t.Run("EmptyBoundaryCIDs", func(t *testing.T) { 234 
baseTime := time.Now() 235 mockOps := []plcclient.PLCOperation{ 236 {DID: "did:plc:001", CID: "cid1", CreatedAt: baseTime}, 237 } 238 239 server := createMockPLCServer(t, mockOps) 240 defer server.Close() 241 242 client := plcclient.NewClient(server.URL) 243 defer client.Close() 244 245 logger := &testLogger{t: t} 246 storageOps, _ := storage.NewOperations(logger, false) 247 defer storageOps.Close() 248 249 fetcher := internalsync.NewFetcher(client, storageOps, logger) 250 mempool := newMockMempool() 251 252 // Fetch with no boundary CIDs (genesis bundle) 253 newOps, _, err := fetcher.FetchToMempool( 254 context.Background(), 255 "", 256 nil, // No previous boundary 257 10, 258 true, 259 mempool, 260 0, 261 ) 262 263 if err != nil { 264 t.Fatalf("FetchToMempool failed: %v", err) 265 } 266 267 if len(newOps) != 1 { 268 t.Errorf("expected 1 operation, got %d", len(newOps)) 269 } 270 }) 271} 272 273func TestFetcherRetry(t *testing.T) { 274 t.Run("TransientFailures", func(t *testing.T) { 275 attemptCount := 0 276 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 277 attemptCount++ 278 279 if attemptCount < 3 { 280 // Fail first 2 attempts 281 w.WriteHeader(500) 282 return 283 } 284 285 // Succeed on 3rd attempt 286 w.Header().Set("Content-Type", "application/x-ndjson") 287 op := plcclient.PLCOperation{ 288 DID: "did:plc:test", 289 CID: "cid1", 290 CreatedAt: time.Now(), 291 } 292 json.NewEncoder(w).Encode(op) 293 })) 294 defer server.Close() 295 296 client := plcclient.NewClient(server.URL) 297 defer client.Close() 298 299 // Should retry and eventually succeed 300 _, err := client.Export(context.Background(), plcclient.ExportOptions{Count: 1}) 301 if err != nil { 302 t.Fatalf("expected retry to succeed, got error: %v", err) 303 } 304 305 if attemptCount < 3 { 306 t.Errorf("expected at least 3 attempts, got %d", attemptCount) 307 } 308 }) 309 310 t.Run("RateLimitHandling", func(t *testing.T) { 311 attemptCount := 0 312 server 
:= httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 313 attemptCount++ 314 315 if attemptCount == 1 { 316 // Return 429 with Retry-After 317 w.Header().Set("Retry-After", "1") 318 w.WriteHeader(429) 319 return 320 } 321 322 // Success 323 w.Header().Set("Content-Type", "application/x-ndjson") 324 op := plcclient.PLCOperation{ 325 DID: "did:plc:test", 326 CID: "cid1", 327 CreatedAt: time.Now(), 328 } 329 json.NewEncoder(w).Encode(op) 330 })) 331 defer server.Close() 332 333 client := plcclient.NewClient(server.URL) 334 defer client.Close() 335 336 startTime := time.Now() 337 _, err := client.Export(context.Background(), plcclient.ExportOptions{Count: 1}) 338 elapsed := time.Since(startTime) 339 340 if err != nil { 341 t.Fatalf("expected success after rate limit, got: %v", err) 342 } 343 344 // Should have waited at least 1 second 345 if elapsed < 1*time.Second { 346 t.Errorf("expected wait for rate limit, elapsed: %v", elapsed) 347 } 348 349 if attemptCount != 2 { 350 t.Errorf("expected 2 attempts, got %d", attemptCount) 351 } 352 }) 353 354 t.Run("ContextCancellation", func(t *testing.T) { 355 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 356 // Slow response 357 time.Sleep(5 * time.Second) 358 })) 359 defer server.Close() 360 361 client := plcclient.NewClient(server.URL) 362 defer client.Close() 363 364 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) 365 defer cancel() 366 367 _, err := client.Export(ctx, plcclient.ExportOptions{Count: 1}) 368 if err == nil { 369 t.Error("expected timeout error, got nil") 370 } 371 }) 372} 373 374func TestFetcherMempoolIntegration(t *testing.T) { 375 t.Run("AutoSaveAfterFetch", func(t *testing.T) { 376 baseTime := time.Now() 377 mockOps := []plcclient.PLCOperation{ 378 {DID: "did:plc:001", CID: "cid1", CreatedAt: baseTime}, 379 {DID: "did:plc:002", CID: "cid2", CreatedAt: baseTime.Add(1 * time.Second)}, 380 } 381 382 
server := createMockPLCServer(t, mockOps) 383 defer server.Close() 384 385 client := plcclient.NewClient(server.URL) 386 defer client.Close() 387 388 logger := &testLogger{t: t} 389 storageOps, _ := storage.NewOperations(logger, false) 390 defer storageOps.Close() 391 392 fetcher := internalsync.NewFetcher(client, storageOps, logger) 393 mempool := newMockMempool() 394 395 _, _, err := fetcher.FetchToMempool( 396 context.Background(), 397 "", 398 nil, 399 10, 400 true, 401 mempool, 402 0, 403 ) 404 405 if err != nil { 406 t.Fatalf("FetchToMempool failed: %v", err) 407 } 408 409 // Verify mempool.SaveIfNeeded was called 410 if mempool.saveCount == 0 { 411 t.Error("expected mempool to be saved after fetch") 412 } 413 }) 414} 415 416// ==================================================================================== 417// CLONER TESTS 418// ==================================================================================== 419 420func TestClonerAtomicity(t *testing.T) { 421 // Note: Cloner tests would need more complex mocking 422 // Including mock HTTP server, file system operations, etc. 
423 // This is a template showing what to test 424 425 t.Run("InterruptedClone", func(t *testing.T) { 426 // TODO: Test context cancellation mid-download 427 // Verify: 428 // - .tmp files are cleaned up OR kept for resume 429 // - Index not updated for incomplete downloads 430 // - Partial progress can resume with --resume flag 431 }) 432 433 t.Run("HashVerificationFailure", func(t *testing.T) { 434 // TODO: Mock server returns file with wrong hash 435 // Verify: 436 // - File is deleted (or .tmp is not renamed) 437 // - Bundle NOT added to index 438 // - Error returned to user 439 }) 440 441 t.Run("IndexUpdateTiming", func(t *testing.T) { 442 // CRITICAL: Index must only update AFTER file write succeeds 443 // TODO: Implement test that verifies ordering 444 }) 445} 446 447// ==================================================================================== 448// SYNC LOOP TESTS 449// ==================================================================================== 450 451func TestSyncLoopBehavior(t *testing.T) { 452 t.Run("CatchUpDetection", func(t *testing.T) { 453 // Mock manager 454 mockMgr := &mockSyncManager{ 455 lastBundle: 5, 456 mempoolCount: 500, 457 } 458 459 logger := &testLogger{t: t} 460 config := &internalsync.SyncLoopConfig{ 461 MaxBundles: 0, 462 Verbose: false, 463 Logger: logger, 464 SkipDIDIndex: false, 465 } 466 467 // First sync should detect "caught up" when no progress 468 synced, err := internalsync.SyncOnce(context.Background(), mockMgr, config) 469 470 if err != nil { 471 t.Fatalf("SyncOnce failed: %v", err) 472 } 473 474 // Should return 0 if already caught up 475 if synced != 0 { 476 t.Logf("Note: synced %d bundles (manager may not be caught up)", synced) 477 } 478 }) 479 480 t.Run("MaxBundlesLimit", func(t *testing.T) { 481 mockMgr := &mockSyncManager{ 482 lastBundle: 0, 483 mempoolCount: 10000, // Always has enough for bundle 484 } 485 486 logger := &testLogger{t: t} 487 config := &internalsync.SyncLoopConfig{ 488 MaxBundles: 3, 
489 Verbose: false, 490 Logger: logger, 491 SkipDIDIndex: false, 492 } 493 494 ctx := context.Background() 495 synced, err := internalsync.SyncOnce(ctx, mockMgr, config) 496 497 if err != nil { 498 t.Fatalf("SyncOnce failed: %v", err) 499 } 500 501 // Should respect max limit 502 if synced > 3 { 503 t.Errorf("synced %d bundles, but max was 3", synced) 504 } 505 }) 506 507 t.Run("GracefulShutdown", func(t *testing.T) { 508 mockMgr := &mockSyncManager{ 509 lastBundle: 0, 510 mempoolCount: 10000, 511 fetchDelay: 50 * time.Millisecond, 512 } 513 514 logger := &testLogger{t: t} 515 config := &internalsync.SyncLoopConfig{ 516 Interval: 100 * time.Millisecond, 517 MaxBundles: 0, 518 Verbose: false, 519 Logger: logger, 520 SkipDIDIndex: false, 521 } 522 523 ctx, cancel := context.WithCancel(context.Background()) 524 525 // Start sync loop in goroutine 526 done := make(chan error, 1) 527 go func() { 528 done <- internalsync.RunSyncLoop(ctx, mockMgr, config) 529 }() 530 531 // Let it run briefly (should complete at least one cycle) 532 time.Sleep(250 * time.Millisecond) 533 534 // Cancel context 535 cancel() 536 537 // Should exit gracefully with context.Canceled error 538 select { 539 case err := <-done: 540 // Expected: context.Canceled or nil 541 if err != nil && err != context.Canceled { 542 t.Errorf("unexpected error on shutdown: %v", err) 543 } 544 t.Logf("Sync loop stopped cleanly: %v", err) 545 546 case <-time.After(2 * time.Second): 547 t.Error("sync loop did not stop within timeout after context cancellation") 548 } 549 550 // NOTE: Mempool saving on shutdown is handled by the caller (commands/server), 551 // not by the sync loop itself. The sync loop only respects context cancellation. 552 // 553 // For mempool save testing, see command-level tests. 
554 }) 555} 556 557// ==================================================================================== 558// BUNDLER TESTS 559// ==================================================================================== 560 561func TestBundlerCreateBundle(t *testing.T) { 562 logger := &testLogger{t: t} 563 storageOps, _ := storage.NewOperations(logger, false) 564 defer storageOps.Close() 565 566 t.Run("BasicBundleCreation", func(t *testing.T) { 567 operations := makeTestOperations(10000) 568 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano) 569 570 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps) 571 572 if bundle.BundleNumber != 1 { 573 t.Errorf("wrong bundle number: got %d, want 1", bundle.BundleNumber) 574 } 575 576 if len(bundle.Operations) != 10000 { 577 t.Errorf("wrong operation count: got %d, want 10000", len(bundle.Operations)) 578 } 579 580 if bundle.DIDCount == 0 { 581 t.Error("DIDCount should not be zero") 582 } 583 584 if len(bundle.BoundaryCIDs) == 0 { 585 t.Error("BoundaryCIDs should not be empty") 586 } 587 588 if bundle.Cursor != cursor { 589 t.Error("cursor mismatch") 590 } 591 }) 592 593 t.Run("GenesisBundle", func(t *testing.T) { 594 operations := makeTestOperations(10000) 595 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano) 596 597 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps) 598 599 // Genesis should have empty parent 600 if bundle.Parent != "" { 601 t.Errorf("genesis bundle should have empty parent, got %s", bundle.Parent) 602 } 603 }) 604 605 t.Run("ChainedBundle", func(t *testing.T) { 606 operations := makeTestOperations(10000) 607 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano) 608 parentHash := "parent_hash_from_bundle_1" 609 610 bundle := internalsync.CreateBundle(2, operations, cursor, parentHash, storageOps) 611 612 if bundle.Parent != parentHash { 613 t.Errorf("parent mismatch: got %s, want %s", 
bundle.Parent, parentHash) 614 } 615 616 if bundle.BundleNumber != 2 { 617 t.Error("bundle number should be 2") 618 } 619 }) 620 621 t.Run("BoundaryTimestamps", func(t *testing.T) { 622 baseTime := time.Now() 623 624 // Create operations where last 5 share same timestamp 625 operations := makeTestOperations(10000) 626 for i := 9995; i < 10000; i++ { 627 operations[i].CreatedAt = baseTime 628 } 629 630 cursor := baseTime.Format(time.RFC3339Nano) 631 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps) 632 633 // Should capture all 5 CIDs at boundary 634 if len(bundle.BoundaryCIDs) != 5 { 635 t.Errorf("expected 5 boundary CIDs, got %d", len(bundle.BoundaryCIDs)) 636 } 637 }) 638} 639 640// ==================================================================================== 641// MOCK SERVER & HELPERS 642// ==================================================================================== 643 644func createMockPLCServer(_ *testing.T, operations []plcclient.PLCOperation) *httptest.Server { 645 return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 646 if r.URL.Path != "/export" { 647 w.WriteHeader(404) 648 return 649 } 650 651 w.Header().Set("Content-Type", "application/x-ndjson") 652 653 // Return operations as JSONL 654 for _, op := range operations { 655 json.NewEncoder(w).Encode(op) 656 } 657 })) 658} 659 660func makeTestOperations(count int) []plcclient.PLCOperation { 661 ops := make([]plcclient.PLCOperation, count) 662 baseTime := time.Now().Add(-time.Hour) 663 664 for i := 0; i < count; i++ { 665 ops[i] = plcclient.PLCOperation{ 666 DID: fmt.Sprintf("did:plc:test%06d", i), 667 CID: fmt.Sprintf("bafy%06d", i), 668 CreatedAt: baseTime.Add(time.Duration(i) * time.Second), 669 } 670 } 671 672 return ops 673} 674 675// Mock sync manager for testing 676type mockSyncManager struct { 677 lastBundle int 678 mempoolCount int 679 fetchDelay time.Duration 680 mempoolSaveCount int 681 mu sync.Mutex 682} 683 684func 
(m *mockSyncManager) GetLastBundleNumber() int { 685 m.mu.Lock() 686 defer m.mu.Unlock() 687 return m.lastBundle 688} 689 690func (m *mockSyncManager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error { 691 m.mu.Lock() 692 defer m.mu.Unlock() 693 return nil 694} 695 696func (m *mockSyncManager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error { 697 m.mu.Lock() 698 defer m.mu.Unlock() 699 return nil 700} 701 702func (m *mockSyncManager) GetMempoolCount() int { 703 m.mu.Lock() 704 defer m.mu.Unlock() 705 return m.mempoolCount 706} 707 708func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) { 709 m.mu.Lock() 710 defer m.mu.Unlock() 711 712 if m.fetchDelay > 0 { 713 time.Sleep(m.fetchDelay) 714 } 715 716 // Simulate creating bundle if we have enough ops 717 if m.mempoolCount >= 10000 { 718 m.lastBundle++ 719 m.mempoolCount -= 10000 720 return m.lastBundle, nil, nil 721 } 722 723 // Not enough ops 724 return 0, nil, fmt.Errorf("insufficient operations") 725} 726 727func (m *mockSyncManager) SaveMempool() error { 728 m.mu.Lock() 729 defer m.mu.Unlock() 730 m.mempoolSaveCount++ 731 return nil 732} 733 734func TestMockMempoolDeduplication(t *testing.T) { 735 m := newMockMempool() 736 737 op1 := plcclient.PLCOperation{ 738 CID: "duplicate_cid", 739 DID: "did:plc:test", 740 CreatedAt: time.Now(), 741 } 742 743 // Add first time 744 added, _ := m.Add([]plcclient.PLCOperation{op1}) 745 if added != 1 { 746 t.Fatalf("first add should return 1, got %d", added) 747 } 748 749 // Add same CID again 750 added, _ = m.Add([]plcclient.PLCOperation{op1}) 751 if added != 0 { 752 t.Fatalf("duplicate add should return 0, got %d", added) 753 } 754 755 if m.Count() != 1 { 756 t.Fatalf("count should be 1, got %d", m.Count()) 757 } 758}