package sync import ( "context" "encoding/json" "errors" "fmt" "iter" "maps" "sync" "testing" "testing/synctest" "time" "github.com/bluesky-social/indigo/atproto/atclient" "tangled.org/karitham.dev/lazuli/atproto" "tangled.org/karitham.dev/lazuli/cache" ) // Mock Storage type mockStorage struct { cache.RecordStore unpublished map[string][]byte published map[string]bool failed map[string]string kv map[string]int mu sync.Mutex } func newMockStorage() *mockStorage { return &mockStorage{ unpublished: make(map[string][]byte), published: make(map[string]bool), failed: make(map[string]string), kv: make(map[string]int), } } func (m *mockStorage) SaveRecords(did string, records map[string][]byte) error { m.mu.Lock() defer m.mu.Unlock() maps.Copy(m.unpublished, records) return nil } func (m *mockStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] { return func(yield func(key string, rec []byte) bool) { m.mu.Lock() keys := make([]string, 0, len(m.unpublished)) for k := range m.unpublished { keys = append(keys, k) } m.mu.Unlock() for _, k := range keys { m.mu.Lock() rec, ok := m.unpublished[k] m.mu.Unlock() if ok { if !yield(k, rec) { return } } } } } func (m *mockStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] { return func(yield func(key string, rec []byte) bool) { m.mu.Lock() keys := make([]string, 0, len(m.published)) for k := range m.published { keys = append(keys, k) } m.mu.Unlock() for _, k := range keys { m.mu.Lock() rec, ok := m.unpublished[k] // Get record data from unpublished map m.mu.Unlock() if ok { if !yield(k, rec) { return } } } } } func (m *mockStorage) MarkPublished(did string, keys ...string) error { m.mu.Lock() defer m.mu.Unlock() for _, k := range keys { delete(m.unpublished, k) m.published[k] = true } return nil } func (m *mockStorage) MarkFailed(did string, keys []string, err string) error { m.mu.Lock() defer m.mu.Unlock() for _, k := range keys { m.failed[k] = err } return nil } func (m *mockStorage) Get(key string) (int, error) { m.mu.Lock() defer m.mu.Unlock() return m.kv[key], nil } func (m *mockStorage) IncrBy(key string, n int) (int, error) { m.mu.Lock() defer m.mu.Unlock() m.kv[key] += n return m.kv[key], nil } // Mock ATProtoClient type mockATProtoClient struct { applyWritesFunc func(ctx context.Context, collection string, records []*PlayRecord) error listRecordsFunc func(ctx context.Context, collection string, limit int, cursor string) ([]atproto.RecordRef[*PlayRecord], string, error) deleteRecordFunc func(ctx context.Context, collection, rkey string) error } func (m *mockATProtoClient) ApplyWrites(ctx context.Context, collection string, records []*PlayRecord) error { if m.applyWritesFunc != nil { return m.applyWritesFunc(ctx, collection, records) } return nil } func (m *mockATProtoClient) ListRecords(ctx context.Context, collection string, limit int, cursor string) ([]atproto.RecordRef[*PlayRecord], string, error) { if m.listRecordsFunc != nil { return m.listRecordsFunc(ctx, collection, limit, cursor) } return nil, "", nil } func (m *mockATProtoClient) DeleteRecord(ctx context.Context, collection, rkey string) error { if m.deleteRecordFunc != nil { return m.deleteRecordFunc(ctx, collection, rkey) } return nil } // Mock AuthClient type mockAuthClient struct { did string } func (m *mockAuthClient) APIClient() *atclient.APIClient { return nil } func (m *mockAuthClient) DID() string { return m.did } type timeoutError struct{} func (e timeoutError) Error() string { return "timeout" } func (e timeoutError) Timeout() bool { return true } func (e timeoutError) Temporary() bool { return true } type failingStorage struct { *mockStorage } func newFailingStorage() *failingStorage { return &failingStorage{ mockStorage: newMockStorage(), } } func (s *failingStorage) SaveRecords(did string, records map[string][]byte) error { return errors.New("failed to save records") } func TestBuildRecordBatches(t *testing.T) { tests := []struct { name string records []PlayRecord batchSize int wantBatches int wantErr bool ctxCancel bool }{ { name: "empty storage", records: []PlayRecord{}, batchSize: 2, wantBatches: 0, wantErr: false, }, { name: "single batch", records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, batchSize: 5, wantBatches: 1, wantErr: false, }, { name: "multiple exact batches", records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, {TrackName: "Song 3"}, {TrackName: "Song 4"}, }, batchSize: 2, wantBatches: 2, wantErr: false, }, { name: "partial final batch", records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, {TrackName: "Song 3"}, }, batchSize: 2, wantBatches: 2, wantErr: false, }, { name: "context cancelled", records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, batchSize: 2, wantBatches: 0, wantErr: true, ctxCancel: true, }, { name: "malformed records skipped", records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, batchSize: 2, wantBatches: 1, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() ctx := t.Context() if tt.ctxCancel { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) cancel() } storage := newMockStorage() did := "did:example:123" // Add records to storage for i, record := range tt.records { data, _ := json.Marshal(record) if tt.name == "malformed records skipped" && i == 1 { data = []byte("invalid json") } storage.unpublished[fmt.Sprintf("key%d", i)] = data } batches, err := batchRecords(ctx, storage, did, tt.batchSize, false) if (err != nil) != tt.wantErr { t.Errorf("BuildRecordBatches() error = %v, wantErr %v", err, tt.wantErr) return } if len(batches) != tt.wantBatches { t.Errorf("BuildRecordBatches() batches = %d, want %d", len(batches), tt.wantBatches) } if !tt.wantErr { totalRecords := 0 for _, batch := range batches { totalRecords += len(batch.Records) } expectedRecords := len(tt.records) if tt.name == "malformed records skipped" { expectedRecords = len(tt.records) - 1 // Skip malformed record } if totalRecords != expectedRecords { t.Errorf("BuildRecordBatches() total records = %d, want %d", totalRecords, expectedRecords) } } }) } } func TestIteratePublished(t *testing.T) { tests := []struct { name string setupStorage func() *mockStorage reverse bool wantKeys []string }{ { name: "returns only published records", setupStorage: func() *mockStorage { s := newMockStorage() s.unpublished["key1"] = []byte(`{"trackName":"a"}`) s.unpublished["key2"] = []byte(`{"trackName":"b"}`) s.unpublished["key3"] = []byte(`{"trackName":"c"}`) s.published["key1"] = true s.published["key3"] = true return s }, reverse: false, wantKeys: []string{"key1", "key3"}, }, { name: "handles empty published set", setupStorage: func() *mockStorage { s := newMockStorage() s.unpublished["key1"] = []byte(`{"trackName":"a"}`) return s }, reverse: false, wantKeys: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() storage := tt.setupStorage() var gotKeys []string for key, rec := range storage.IteratePublished("did:test", tt.reverse) { gotKeys = append(gotKeys, key) _ = rec } if len(gotKeys) != len(tt.wantKeys) { t.Errorf("IteratePublished() returned %d keys, want %d", len(gotKeys), len(tt.wantKeys)) return } wantSet := make(map[string]bool) for _, k := range tt.wantKeys { wantSet[k] = true } for _, k := range gotKeys { if !wantSet[k] { t.Errorf("IteratePublished() got unexpected key %s", k) } } }) } } func TestProcessBatch(t *testing.T) { tests := []struct { name string batch recordBatch processor batchProcessor wantSuccess int wantError int wantErr bool setupClient func() *mockATProtoClient setupStorage func() cache.Storage }{ { name: "empty batch", batch: recordBatch{ Records: []*PlayRecord{}, Keys: []string{}, }, processor: batchProcessor{ Client: &mockATProtoClient{}, Storage: newMockStorage(), }, wantSuccess: 0, wantError: 0, wantErr: false, }, { name: "successful batch", batch: recordBatch{ Records: []*PlayRecord{{TrackName: "Song 1"}, {TrackName: "Song 2"}}, Keys: []string{"key1", "key2"}, }, processor: batchProcessor{ Client: &mockATProtoClient{}, Storage: newMockStorage(), DID: "did:example:123", ClientAgent: "test-agent", }, wantSuccess: 2, wantError: 0, wantErr: false, }, { name: "dry run batch", batch: recordBatch{ Records: []*PlayRecord{{TrackName: "Song 1"}, {TrackName: "Song 2"}}, Keys: []string{"key1", "key2"}, }, processor: batchProcessor{ Client: &mockATProtoClient{}, Storage: newMockStorage(), DID: "did:example:123", ClientAgent: "test-agent", DryRun: true, }, wantSuccess: 2, wantError: 0, wantErr: false, }, { name: "batch with apply writes failure", batch: recordBatch{ Records: []*PlayRecord{{TrackName: "Song 1"}}, Keys: []string{"key1"}, }, processor: batchProcessor{ Client: func() *mockATProtoClient { return &mockATProtoClient{ applyWritesFunc: func(ctx context.Context, collection string, records []*PlayRecord) error { return errors.New("apply writes failed") }, } }(), Storage: newMockStorage(), DID: "did:example:123", ClientAgent: "test-agent", }, wantSuccess: 0, wantError: 1, wantErr: false, }, { name: "batch with storage failure", batch: recordBatch{ Records: []*PlayRecord{{TrackName: "Song 1"}}, Keys: []string{"key1"}, }, processor: batchProcessor{ Client: &mockATProtoClient{}, Storage: newFailingStorage(), DID: "did:example:123", ClientAgent: "test-agent", }, wantSuccess: 0, wantError: 1, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() ctx := t.Context() result := processBatch(ctx, tt.batch, tt.processor) if result.SuccessCount != tt.wantSuccess { t.Errorf("ProcessBatch() success count = %d, want %d", result.SuccessCount, tt.wantSuccess) } if result.ErrorCount != tt.wantError { t.Errorf("ProcessBatch() error count = %d, want %d", result.ErrorCount, tt.wantError) } if tt.wantError > 0 && len(result.Errors) == 0 { t.Error("ProcessBatch() expected errors but got none") } }) } } func TestAggregateResults(t *testing.T) { tests := []struct { name string results []batchResult startTime time.Time wantSuccess int wantErrors int wantTotal int wantDuration bool wantRatePerMin bool }{ { name: "empty results", results: []batchResult{}, wantSuccess: 0, wantErrors: 0, wantTotal: 0, }, { name: "single successful result", results: []batchResult{ {SuccessCount: 5, ErrorCount: 0, Duration: time.Second}, }, wantSuccess: 5, wantErrors: 0, wantTotal: 5, wantDuration: true, wantRatePerMin: true, }, { name: "multiple mixed results", results: []batchResult{ {SuccessCount: 3, ErrorCount: 1, Duration: time.Second}, {SuccessCount: 2, ErrorCount: 0, Duration: time.Second}, {SuccessCount: 0, ErrorCount: 2, Duration: time.Second}, }, wantSuccess: 5, wantErrors: 3, wantTotal: 8, wantDuration: true, wantRatePerMin: true, }, { name: "all errors", results: []batchResult{ {SuccessCount: 0, ErrorCount: 3, Duration: time.Second}, {SuccessCount: 0, ErrorCount: 2, Duration: time.Second}, }, wantSuccess: 0, wantErrors: 5, wantTotal: 5, wantDuration: true, wantRatePerMin: false, // 0 success rate }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() startTime := time.Now() if !tt.startTime.IsZero() { startTime = tt.startTime } result := aggregate(tt.results, startTime) if result.SuccessCount != tt.wantSuccess { t.Errorf("AggregateResults() success = %d, want %d", result.SuccessCount, tt.wantSuccess) } if result.ErrorCount != tt.wantErrors { t.Errorf("AggregateResults() errors = %d, want %d", result.ErrorCount, tt.wantErrors) } if result.TotalRecords != tt.wantTotal { t.Errorf("AggregateResults() total = %d, want %d", result.TotalRecords, tt.wantTotal) } if tt.wantDuration && result.Duration == 0 { t.Error("AggregateResults() expected non-zero duration") } if tt.wantRatePerMin && result.RecordsPerMinute == 0 && tt.wantSuccess > 0 { t.Error("AggregateResults() expected non-zero rate per minute") } }) } } func TestPublish(t *testing.T) { tests := []struct { name string opts PublishOptions records []PlayRecord setupClient func() *mockATProtoClient wantSuccess int wantErrors int wantCancelled bool }{ { name: "successful publish", opts: PublishOptions{ BatchSize: 2, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, wantSuccess: 2, wantErrors: 0, }, { name: "dry run publish", opts: PublishOptions{ BatchSize: 2, DryRun: true, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, wantSuccess: 2, wantErrors: 0, }, { name: "publish with client errors", opts: PublishOptions{ BatchSize: 1, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, }, setupClient: func() *mockATProtoClient { return &mockATProtoClient{ applyWritesFunc: func(ctx context.Context, collection string, records []*PlayRecord) error { return &atclient.APIError{StatusCode: 400} // Non-transient error }, } }, wantSuccess: 0, wantErrors: 2, }, { name: "publish with reverse iteration", opts: PublishOptions{ BatchSize: 2, Reverse: true, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, {TrackName: "Song 2"}, {TrackName: "Song 3"}, }, wantSuccess: 3, wantErrors: 0, }, { name: "publish with empty records", opts: PublishOptions{ BatchSize: 2, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{}, wantSuccess: 0, wantErrors: 0, }, { name: "publish with transient errors and retry", opts: PublishOptions{ BatchSize: 1, Storage: newMockStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, }, setupClient: func() *mockATProtoClient { return &mockATProtoClient{ applyWritesFunc: func(ctx context.Context, collection string, records []*PlayRecord) error { return &atclient.APIError{StatusCode: 500} // Transient error }, } }, wantSuccess: 0, wantErrors: 1, }, { name: "publish with storage failure", opts: PublishOptions{ BatchSize: 1, Storage: newFailingStorage(), ClientAgent: "test-agent", }, records: []PlayRecord{ {TrackName: "Song 1"}, }, wantSuccess: 0, wantErrors: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { ctx := t.Context() did := "did:example:123" var storage *mockStorage if fs, ok := tt.opts.Storage.(*failingStorage); ok { storage = fs.mockStorage } else { storage = tt.opts.Storage.(*mockStorage) } // Add records to storage for i, record := range tt.records { data, _ := json.Marshal(record) storage.unpublished[fmt.Sprintf("key%d", i)] = data } client := &mockAuthClient{did: did} if tt.setupClient != nil { tt.opts.ATProtoClient = tt.setupClient() } else { tt.opts.ATProtoClient = &mockATProtoClient{} } result := Publish(ctx, client, tt.opts) if result.SuccessCount != tt.wantSuccess { t.Errorf("Publish() success = %d, want %d", result.SuccessCount, tt.wantSuccess) } if result.ErrorCount != tt.wantErrors { t.Errorf("Publish() errors = %d, want %d", result.ErrorCount, tt.wantErrors) } if result.Cancelled != tt.wantCancelled { t.Errorf("Publish() cancelled = %v, want %v", result.Cancelled, tt.wantCancelled) } }) }) } } func TestIsTransientError(t *testing.T) { tests := []struct { name string err error want bool }{ {"nil", nil, false}, {"generic error", errors.New("some error"), false}, {"API 400", &atclient.APIError{StatusCode: 400}, false}, {"API 429", &atclient.APIError{StatusCode: 429}, true}, {"API 500", &atclient.APIError{StatusCode: 500}, true}, {"API 503", &atclient.APIError{StatusCode: 503}, true}, {"net timeout", timeoutError{}, true}, {"net non-timeout", errors.New("network is down"), false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() if got := IsTransientError(tt.err); got != tt.want { t.Errorf("IsTransientError() = %v, want %v", got, tt.want) } }) } }