[DEPRECATED] Go implementation of plcbundle
1package sync_test
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 "net/http/httptest"
8 "sync"
9 "sync/atomic"
10 "testing"
11 "time"
12
13 "github.com/goccy/go-json"
14 "tangled.org/atscan.net/plcbundle/internal/plcclient"
15 "tangled.org/atscan.net/plcbundle/internal/storage"
16 internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
17 "tangled.org/atscan.net/plcbundle/internal/types"
18)
19
// testLogger routes log output produced by the code under test into the
// owning test's log via its Printf and Println methods.
type testLogger struct {
	t *testing.T // test that receives every message
}
23
24func (l *testLogger) Printf(format string, v ...interface{}) {
25 l.t.Logf(format, v...)
26}
27
28func (l *testLogger) Println(v ...interface{}) {
29 l.t.Log(v...)
30}
31
32// Mock mempool for testing
33type mockMempool struct {
34 operations []plcclient.PLCOperation
35 mu sync.Mutex
36 saveCount int32
37}
38
39func newMockMempool() *mockMempool {
40 return &mockMempool{
41 operations: make([]plcclient.PLCOperation, 0),
42 }
43}
44
45func (m *mockMempool) Add(ops []plcclient.PLCOperation) (int, error) {
46 m.mu.Lock()
47 defer m.mu.Unlock()
48
49 // Build existing CID set (like real mempool does)
50 existingCIDs := make(map[string]bool)
51 for _, op := range m.operations {
52 existingCIDs[op.CID] = true
53 }
54
55 // Only add new operations (deduplicate by CID)
56 addedCount := 0
57 for _, op := range ops {
58 if !existingCIDs[op.CID] {
59 m.operations = append(m.operations, op)
60 existingCIDs[op.CID] = true
61 addedCount++
62 }
63 }
64
65 return addedCount, nil
66}
67
68func (m *mockMempool) Save() error {
69 atomic.AddInt32(&m.saveCount, 1)
70 return nil
71}
72
73func (m *mockMempool) SaveIfNeeded() error {
74 return m.Save()
75}
76
77func (m *mockMempool) Count() int {
78 m.mu.Lock()
79 defer m.mu.Unlock()
80 return len(m.operations)
81}
82
83func (m *mockMempool) GetLastTime() string {
84 m.mu.Lock()
85 defer m.mu.Unlock()
86 if len(m.operations) == 0 {
87 return ""
88 }
89 return m.operations[len(m.operations)-1].CreatedAt.Format(time.RFC3339Nano)
90}
91
92// ====================================================================================
93// FETCHER TESTS - DEDUPLICATION & RETRY LOGIC
94// ====================================================================================
95
96func TestFetcherDeduplication(t *testing.T) {
97 t.Run("BoundaryDuplicateHandling", func(t *testing.T) {
98 // Setup mock server
99 baseTime := time.Now()
100 boundaryTime := baseTime.Add(5 * time.Second)
101
102 // Simulate operations at bundle boundary
103 mockOps := []plcclient.PLCOperation{
104 {DID: "did:plc:001", CID: "cid1", CreatedAt: boundaryTime},
105 {DID: "did:plc:002", CID: "cid2", CreatedAt: boundaryTime},
106 {DID: "did:plc:003", CID: "cid3", CreatedAt: boundaryTime.Add(1 * time.Second)},
107 }
108
109 server := createMockPLCServer(t, mockOps)
110 defer server.Close()
111
112 // Create fetcher
113 client := plcclient.NewClient(server.URL)
114 defer client.Close()
115
116 logger := &testLogger{t: t}
117 ops, _ := storage.NewOperations(logger, false)
118 defer ops.Close()
119
120 fetcher := internalsync.NewFetcher(client, ops, logger)
121
122 // Previous bundle had cid1 and cid2 at boundary
123 prevBoundaryCIDs := map[string]bool{
124 "cid1": true,
125 "cid2": true,
126 }
127
128 mempool := newMockMempool()
129
130 // Fetch
131 newOps, fetchCount, err := fetcher.FetchToMempool(
132 context.Background(),
133 boundaryTime.Add(-1*time.Second).Format(time.RFC3339Nano),
134 prevBoundaryCIDs,
135 10,
136 true, // quiet
137 mempool,
138 0,
139 )
140
141 if err != nil {
142 t.Fatalf("FetchToMempool failed: %v", err)
143 }
144
145 // Should have filtered out cid1 and cid2 (duplicates)
146 // Only cid3 should be returned
147 if len(newOps) != 1 {
148 t.Errorf("expected 1 unique operation, got %d", len(newOps))
149 }
150
151 if len(newOps) > 0 && newOps[0].CID != "cid3" {
152 t.Errorf("expected cid3, got %s", newOps[0].CID)
153 }
154
155 if fetchCount == 0 {
156 t.Error("expected at least one fetch")
157 }
158 })
159
160 t.Run("ConcurrentFetchDedup", func(t *testing.T) {
161 baseTime := time.Now()
162 mockOps := make([]plcclient.PLCOperation, 50)
163 for i := 0; i < 50; i++ {
164 mockOps[i] = plcclient.PLCOperation{
165 DID: fmt.Sprintf("did:plc:%03d", i),
166 CID: fmt.Sprintf("cid%03d", i),
167 CreatedAt: baseTime.Add(time.Duration(i) * time.Second),
168 }
169 }
170
171 server := createMockPLCServer(t, mockOps)
172 defer server.Close()
173
174 client := plcclient.NewClient(server.URL)
175 defer client.Close()
176
177 logger := &testLogger{t: t}
178 storageOps, _ := storage.NewOperations(logger, false)
179 defer storageOps.Close()
180
181 fetcher := internalsync.NewFetcher(client, storageOps, logger)
182 mempool := newMockMempool()
183
184 // First fetch
185 initialCount := mempool.Count()
186 _, _, err := fetcher.FetchToMempool(
187 context.Background(),
188 "",
189 nil,
190 30,
191 true,
192 mempool,
193 0,
194 )
195 if err != nil {
196 t.Fatalf("First fetch failed: %v", err)
197 }
198
199 countAfterFirst := mempool.Count()
200 addedFirst := countAfterFirst - initialCount
201
202 if addedFirst == 0 {
203 t.Fatal("first fetch should add operations")
204 }
205
206 // Second fetch with same cursor - mempool deduplicates
207 countBeforeSecond := mempool.Count()
208 _, _, err = fetcher.FetchToMempool(
209 context.Background(),
210 "", // Same cursor - fetches same data
211 nil,
212 30,
213 true,
214 mempool,
215 1,
216 )
217 if err != nil {
218 t.Fatalf("Second fetch failed: %v", err)
219 }
220
221 countAfterSecond := mempool.Count()
222 addedSecond := countAfterSecond - countBeforeSecond
223
224 // Mempool's Add() method deduplicates by CID
225 // So second fetch should add 0 (all duplicates)
226 if addedSecond != 0 {
227 t.Errorf("expected 0 new ops in mempool after second fetch (duplicates), got %d", addedSecond)
228 }
229
230 t.Logf("First fetch: +%d ops, Second fetch: +%d ops (deduped)", addedFirst, addedSecond)
231 })
232
233 t.Run("EmptyBoundaryCIDs", func(t *testing.T) {
234 baseTime := time.Now()
235 mockOps := []plcclient.PLCOperation{
236 {DID: "did:plc:001", CID: "cid1", CreatedAt: baseTime},
237 }
238
239 server := createMockPLCServer(t, mockOps)
240 defer server.Close()
241
242 client := plcclient.NewClient(server.URL)
243 defer client.Close()
244
245 logger := &testLogger{t: t}
246 storageOps, _ := storage.NewOperations(logger, false)
247 defer storageOps.Close()
248
249 fetcher := internalsync.NewFetcher(client, storageOps, logger)
250 mempool := newMockMempool()
251
252 // Fetch with no boundary CIDs (genesis bundle)
253 newOps, _, err := fetcher.FetchToMempool(
254 context.Background(),
255 "",
256 nil, // No previous boundary
257 10,
258 true,
259 mempool,
260 0,
261 )
262
263 if err != nil {
264 t.Fatalf("FetchToMempool failed: %v", err)
265 }
266
267 if len(newOps) != 1 {
268 t.Errorf("expected 1 operation, got %d", len(newOps))
269 }
270 })
271}
272
273func TestFetcherRetry(t *testing.T) {
274 t.Run("TransientFailures", func(t *testing.T) {
275 attemptCount := 0
276 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
277 attemptCount++
278
279 if attemptCount < 3 {
280 // Fail first 2 attempts
281 w.WriteHeader(500)
282 return
283 }
284
285 // Succeed on 3rd attempt
286 w.Header().Set("Content-Type", "application/x-ndjson")
287 op := plcclient.PLCOperation{
288 DID: "did:plc:test",
289 CID: "cid1",
290 CreatedAt: time.Now(),
291 }
292 json.NewEncoder(w).Encode(op)
293 }))
294 defer server.Close()
295
296 client := plcclient.NewClient(server.URL)
297 defer client.Close()
298
299 // Should retry and eventually succeed
300 _, err := client.Export(context.Background(), plcclient.ExportOptions{Count: 1})
301 if err != nil {
302 t.Fatalf("expected retry to succeed, got error: %v", err)
303 }
304
305 if attemptCount < 3 {
306 t.Errorf("expected at least 3 attempts, got %d", attemptCount)
307 }
308 })
309
310 t.Run("RateLimitHandling", func(t *testing.T) {
311 attemptCount := 0
312 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
313 attemptCount++
314
315 if attemptCount == 1 {
316 // Return 429 with Retry-After
317 w.Header().Set("Retry-After", "1")
318 w.WriteHeader(429)
319 return
320 }
321
322 // Success
323 w.Header().Set("Content-Type", "application/x-ndjson")
324 op := plcclient.PLCOperation{
325 DID: "did:plc:test",
326 CID: "cid1",
327 CreatedAt: time.Now(),
328 }
329 json.NewEncoder(w).Encode(op)
330 }))
331 defer server.Close()
332
333 client := plcclient.NewClient(server.URL)
334 defer client.Close()
335
336 startTime := time.Now()
337 _, err := client.Export(context.Background(), plcclient.ExportOptions{Count: 1})
338 elapsed := time.Since(startTime)
339
340 if err != nil {
341 t.Fatalf("expected success after rate limit, got: %v", err)
342 }
343
344 // Should have waited at least 1 second
345 if elapsed < 1*time.Second {
346 t.Errorf("expected wait for rate limit, elapsed: %v", elapsed)
347 }
348
349 if attemptCount != 2 {
350 t.Errorf("expected 2 attempts, got %d", attemptCount)
351 }
352 })
353
354 t.Run("ContextCancellation", func(t *testing.T) {
355 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
356 // Slow response
357 time.Sleep(5 * time.Second)
358 }))
359 defer server.Close()
360
361 client := plcclient.NewClient(server.URL)
362 defer client.Close()
363
364 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
365 defer cancel()
366
367 _, err := client.Export(ctx, plcclient.ExportOptions{Count: 1})
368 if err == nil {
369 t.Error("expected timeout error, got nil")
370 }
371 })
372}
373
374func TestFetcherMempoolIntegration(t *testing.T) {
375 t.Run("AutoSaveAfterFetch", func(t *testing.T) {
376 baseTime := time.Now()
377 mockOps := []plcclient.PLCOperation{
378 {DID: "did:plc:001", CID: "cid1", CreatedAt: baseTime},
379 {DID: "did:plc:002", CID: "cid2", CreatedAt: baseTime.Add(1 * time.Second)},
380 }
381
382 server := createMockPLCServer(t, mockOps)
383 defer server.Close()
384
385 client := plcclient.NewClient(server.URL)
386 defer client.Close()
387
388 logger := &testLogger{t: t}
389 storageOps, _ := storage.NewOperations(logger, false)
390 defer storageOps.Close()
391
392 fetcher := internalsync.NewFetcher(client, storageOps, logger)
393 mempool := newMockMempool()
394
395 _, _, err := fetcher.FetchToMempool(
396 context.Background(),
397 "",
398 nil,
399 10,
400 true,
401 mempool,
402 0,
403 )
404
405 if err != nil {
406 t.Fatalf("FetchToMempool failed: %v", err)
407 }
408
409 // Verify mempool.SaveIfNeeded was called
410 if mempool.saveCount == 0 {
411 t.Error("expected mempool to be saved after fetch")
412 }
413 })
414}
415
416// ====================================================================================
417// CLONER TESTS
418// ====================================================================================
419
// TestClonerAtomicity is a template listing the cloner behaviors that still
// need coverage. Each subtest is skipped explicitly so the missing coverage
// shows up as SKIP in test output instead of a vacuous PASS.
//
// Note: Cloner tests would need more complex mocking, including a mock HTTP
// server, file system operations, etc.
func TestClonerAtomicity(t *testing.T) {
	t.Run("InterruptedClone", func(t *testing.T) {
		// TODO: Test context cancellation mid-download
		// Verify:
		// - .tmp files are cleaned up OR kept for resume
		// - Index not updated for incomplete downloads
		// - Partial progress can resume with --resume flag
		t.Skip("TODO: not implemented")
	})

	t.Run("HashVerificationFailure", func(t *testing.T) {
		// TODO: Mock server returns file with wrong hash
		// Verify:
		// - File is deleted (or .tmp is not renamed)
		// - Bundle NOT added to index
		// - Error returned to user
		t.Skip("TODO: not implemented")
	})

	t.Run("IndexUpdateTiming", func(t *testing.T) {
		// CRITICAL: Index must only update AFTER file write succeeds
		// TODO: Implement test that verifies ordering
		t.Skip("TODO: not implemented")
	})
}
446
447// ====================================================================================
448// SYNC LOOP TESTS
449// ====================================================================================
450
451func TestSyncLoopBehavior(t *testing.T) {
452 t.Run("CatchUpDetection", func(t *testing.T) {
453 // Mock manager
454 mockMgr := &mockSyncManager{
455 lastBundle: 5,
456 mempoolCount: 500,
457 }
458
459 logger := &testLogger{t: t}
460 config := &internalsync.SyncLoopConfig{
461 MaxBundles: 0,
462 Verbose: false,
463 Logger: logger,
464 SkipDIDIndex: false,
465 }
466
467 // First sync should detect "caught up" when no progress
468 synced, err := internalsync.SyncOnce(context.Background(), mockMgr, config)
469
470 if err != nil {
471 t.Fatalf("SyncOnce failed: %v", err)
472 }
473
474 // Should return 0 if already caught up
475 if synced != 0 {
476 t.Logf("Note: synced %d bundles (manager may not be caught up)", synced)
477 }
478 })
479
480 t.Run("MaxBundlesLimit", func(t *testing.T) {
481 mockMgr := &mockSyncManager{
482 lastBundle: 0,
483 mempoolCount: 10000, // Always has enough for bundle
484 }
485
486 logger := &testLogger{t: t}
487 config := &internalsync.SyncLoopConfig{
488 MaxBundles: 3,
489 Verbose: false,
490 Logger: logger,
491 SkipDIDIndex: false,
492 }
493
494 ctx := context.Background()
495 synced, err := internalsync.SyncOnce(ctx, mockMgr, config)
496
497 if err != nil {
498 t.Fatalf("SyncOnce failed: %v", err)
499 }
500
501 // Should respect max limit
502 if synced > 3 {
503 t.Errorf("synced %d bundles, but max was 3", synced)
504 }
505 })
506
507 t.Run("GracefulShutdown", func(t *testing.T) {
508 mockMgr := &mockSyncManager{
509 lastBundle: 0,
510 mempoolCount: 10000,
511 fetchDelay: 50 * time.Millisecond,
512 }
513
514 logger := &testLogger{t: t}
515 config := &internalsync.SyncLoopConfig{
516 Interval: 100 * time.Millisecond,
517 MaxBundles: 0,
518 Verbose: false,
519 Logger: logger,
520 SkipDIDIndex: false,
521 }
522
523 ctx, cancel := context.WithCancel(context.Background())
524
525 // Start sync loop in goroutine
526 done := make(chan error, 1)
527 go func() {
528 done <- internalsync.RunSyncLoop(ctx, mockMgr, config)
529 }()
530
531 // Let it run briefly (should complete at least one cycle)
532 time.Sleep(250 * time.Millisecond)
533
534 // Cancel context
535 cancel()
536
537 // Should exit gracefully with context.Canceled error
538 select {
539 case err := <-done:
540 // Expected: context.Canceled or nil
541 if err != nil && err != context.Canceled {
542 t.Errorf("unexpected error on shutdown: %v", err)
543 }
544 t.Logf("Sync loop stopped cleanly: %v", err)
545
546 case <-time.After(2 * time.Second):
547 t.Error("sync loop did not stop within timeout after context cancellation")
548 }
549
550 // NOTE: Mempool saving on shutdown is handled by the caller (commands/server),
551 // not by the sync loop itself. The sync loop only respects context cancellation.
552 //
553 // For mempool save testing, see command-level tests.
554 })
555}
556
557// ====================================================================================
558// BUNDLER TESTS
559// ====================================================================================
560
561func TestBundlerCreateBundle(t *testing.T) {
562 logger := &testLogger{t: t}
563 storageOps, _ := storage.NewOperations(logger, false)
564 defer storageOps.Close()
565
566 t.Run("BasicBundleCreation", func(t *testing.T) {
567 operations := makeTestOperations(10000)
568 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano)
569
570 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps)
571
572 if bundle.BundleNumber != 1 {
573 t.Errorf("wrong bundle number: got %d, want 1", bundle.BundleNumber)
574 }
575
576 if len(bundle.Operations) != 10000 {
577 t.Errorf("wrong operation count: got %d, want 10000", len(bundle.Operations))
578 }
579
580 if bundle.DIDCount == 0 {
581 t.Error("DIDCount should not be zero")
582 }
583
584 if len(bundle.BoundaryCIDs) == 0 {
585 t.Error("BoundaryCIDs should not be empty")
586 }
587
588 if bundle.Cursor != cursor {
589 t.Error("cursor mismatch")
590 }
591 })
592
593 t.Run("GenesisBundle", func(t *testing.T) {
594 operations := makeTestOperations(10000)
595 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano)
596
597 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps)
598
599 // Genesis should have empty parent
600 if bundle.Parent != "" {
601 t.Errorf("genesis bundle should have empty parent, got %s", bundle.Parent)
602 }
603 })
604
605 t.Run("ChainedBundle", func(t *testing.T) {
606 operations := makeTestOperations(10000)
607 cursor := operations[len(operations)-1].CreatedAt.Format(time.RFC3339Nano)
608 parentHash := "parent_hash_from_bundle_1"
609
610 bundle := internalsync.CreateBundle(2, operations, cursor, parentHash, storageOps)
611
612 if bundle.Parent != parentHash {
613 t.Errorf("parent mismatch: got %s, want %s", bundle.Parent, parentHash)
614 }
615
616 if bundle.BundleNumber != 2 {
617 t.Error("bundle number should be 2")
618 }
619 })
620
621 t.Run("BoundaryTimestamps", func(t *testing.T) {
622 baseTime := time.Now()
623
624 // Create operations where last 5 share same timestamp
625 operations := makeTestOperations(10000)
626 for i := 9995; i < 10000; i++ {
627 operations[i].CreatedAt = baseTime
628 }
629
630 cursor := baseTime.Format(time.RFC3339Nano)
631 bundle := internalsync.CreateBundle(1, operations, cursor, "", storageOps)
632
633 // Should capture all 5 CIDs at boundary
634 if len(bundle.BoundaryCIDs) != 5 {
635 t.Errorf("expected 5 boundary CIDs, got %d", len(bundle.BoundaryCIDs))
636 }
637 })
638}
639
640// ====================================================================================
641// MOCK SERVER & HELPERS
642// ====================================================================================
643
644func createMockPLCServer(_ *testing.T, operations []plcclient.PLCOperation) *httptest.Server {
645 return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
646 if r.URL.Path != "/export" {
647 w.WriteHeader(404)
648 return
649 }
650
651 w.Header().Set("Content-Type", "application/x-ndjson")
652
653 // Return operations as JSONL
654 for _, op := range operations {
655 json.NewEncoder(w).Encode(op)
656 }
657 }))
658}
659
660func makeTestOperations(count int) []plcclient.PLCOperation {
661 ops := make([]plcclient.PLCOperation, count)
662 baseTime := time.Now().Add(-time.Hour)
663
664 for i := 0; i < count; i++ {
665 ops[i] = plcclient.PLCOperation{
666 DID: fmt.Sprintf("did:plc:test%06d", i),
667 CID: fmt.Sprintf("bafy%06d", i),
668 CreatedAt: baseTime.Add(time.Duration(i) * time.Second),
669 }
670 }
671
672 return ops
673}
674
// mockSyncManager is a scripted sync manager for the sync loop tests. It
// tracks a bundle counter and a mempool size, and can delay each fetch.
type mockSyncManager struct {
	mu sync.Mutex // locked by every method of the mock

	lastBundle       int           // number of the most recently "created" bundle
	mempoolCount     int           // operations currently available for bundling
	mempoolSaveCount int           // how many times SaveMempool was called
	fetchDelay       time.Duration // optional sleep inside FetchAndSaveNextBundle
}
683
684func (m *mockSyncManager) GetLastBundleNumber() int {
685 m.mu.Lock()
686 defer m.mu.Unlock()
687 return m.lastBundle
688}
689
690func (m *mockSyncManager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error {
691 m.mu.Lock()
692 defer m.mu.Unlock()
693 return nil
694}
695
696func (m *mockSyncManager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error {
697 m.mu.Lock()
698 defer m.mu.Unlock()
699 return nil
700}
701
702func (m *mockSyncManager) GetMempoolCount() int {
703 m.mu.Lock()
704 defer m.mu.Unlock()
705 return m.mempoolCount
706}
707
708func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) {
709 m.mu.Lock()
710 defer m.mu.Unlock()
711
712 if m.fetchDelay > 0 {
713 time.Sleep(m.fetchDelay)
714 }
715
716 // Simulate creating bundle if we have enough ops
717 if m.mempoolCount >= 10000 {
718 m.lastBundle++
719 m.mempoolCount -= 10000
720 return m.lastBundle, nil, nil
721 }
722
723 // Not enough ops
724 return 0, nil, fmt.Errorf("insufficient operations")
725}
726
727func (m *mockSyncManager) SaveMempool() error {
728 m.mu.Lock()
729 defer m.mu.Unlock()
730 m.mempoolSaveCount++
731 return nil
732}
733
734func TestMockMempoolDeduplication(t *testing.T) {
735 m := newMockMempool()
736
737 op1 := plcclient.PLCOperation{
738 CID: "duplicate_cid",
739 DID: "did:plc:test",
740 CreatedAt: time.Now(),
741 }
742
743 // Add first time
744 added, _ := m.Add([]plcclient.PLCOperation{op1})
745 if added != 1 {
746 t.Fatalf("first add should return 1, got %d", added)
747 }
748
749 // Add same CID again
750 added, _ = m.Add([]plcclient.PLCOperation{op1})
751 if added != 0 {
752 t.Fatalf("duplicate add should return 0, got %d", added)
753 }
754
755 if m.Count() != 1 {
756 t.Fatalf("count should be 1, got %d", m.Count())
757 }
758}