[DEPRECATED] Go implementation of plcbundle
1package bundle_test
2
3import (
4 "path/filepath"
5 "testing"
6 "time"
7
8 "tangled.org/atscan.net/plcbundle-go/bundle"
9 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex"
10 "tangled.org/atscan.net/plcbundle-go/internal/mempool"
11 "tangled.org/atscan.net/plcbundle-go/internal/plcclient"
12 "tangled.org/atscan.net/plcbundle-go/internal/storage"
13 "tangled.org/atscan.net/plcbundle-go/internal/types"
14)
15
16var (
17 bundleInfo = &storage.BundleInfo{
18 BundleNumber: 1,
19 Origin: "test-origin",
20 ParentHash: "",
21 Cursor: "",
22 CreatedBy: "test",
23 Hostname: "test-host",
24 }
25)
26
27// TestIndex tests index operations
28func TestIndex(t *testing.T) {
29
30 t.Run("CreateNewIndex", func(t *testing.T) {
31 idx := bundleindex.NewIndex("test-origin")
32 if idx == nil {
33 t.Fatal("NewIndex returned nil")
34 }
35 if idx.Version != types.INDEX_VERSION {
36 t.Errorf("expected version %s, got %s", types.INDEX_VERSION, idx.Version)
37 }
38 if idx.Count() != 0 {
39 t.Errorf("expected empty index, got count %d", idx.Count())
40 }
41 })
42
43 t.Run("AddBundle", func(t *testing.T) {
44 idx := bundleindex.NewIndex("test-origin")
45 meta := &bundleindex.BundleMetadata{
46 BundleNumber: 1,
47 StartTime: time.Now(),
48 EndTime: time.Now().Add(time.Hour),
49 OperationCount: types.BUNDLE_SIZE,
50 DIDCount: 1000,
51 Hash: "abc123",
52 CompressedHash: "def456",
53 }
54
55 idx.AddBundle(meta)
56
57 if idx.Count() != 1 {
58 t.Errorf("expected count 1, got %d", idx.Count())
59 }
60
61 retrieved, err := idx.GetBundle(1)
62 if err != nil {
63 t.Fatalf("GetBundle failed: %v", err)
64 }
65 if retrieved.Hash != meta.Hash {
66 t.Errorf("expected hash %s, got %s", meta.Hash, retrieved.Hash)
67 }
68 })
69
70 t.Run("SaveAndLoad", func(t *testing.T) {
71 tmpDir := t.TempDir()
72 indexPath := filepath.Join(tmpDir, "test_index.json")
73
74 // Create and save
75 idx := bundleindex.NewIndex("test-origin")
76 idx.AddBundle(&bundleindex.BundleMetadata{
77 BundleNumber: 1,
78 StartTime: time.Now(),
79 EndTime: time.Now().Add(time.Hour),
80 OperationCount: types.BUNDLE_SIZE,
81 Hash: "test123",
82 })
83
84 if err := idx.Save(indexPath); err != nil {
85 t.Fatalf("Save failed: %v", err)
86 }
87
88 // Load
89 loaded, err := bundleindex.LoadIndex(indexPath)
90 if err != nil {
91 t.Fatalf("LoadIndex failed: %v", err)
92 }
93
94 if loaded.Count() != 1 {
95 t.Errorf("expected count 1, got %d", loaded.Count())
96 }
97 })
98
99 t.Run("GetBundleRange", func(t *testing.T) {
100 idx := bundleindex.NewIndex("test-origin")
101 for i := 1; i <= 5; i++ {
102 idx.AddBundle(&bundleindex.BundleMetadata{
103 BundleNumber: i,
104 StartTime: time.Now(),
105 EndTime: time.Now().Add(time.Hour),
106 OperationCount: types.BUNDLE_SIZE,
107 })
108 }
109
110 bundles := idx.GetBundleRange(2, 4)
111 if len(bundles) != 3 {
112 t.Errorf("expected 3 bundles, got %d", len(bundles))
113 }
114 if bundles[0].BundleNumber != 2 || bundles[2].BundleNumber != 4 {
115 t.Errorf("unexpected bundle range")
116 }
117 })
118
119 t.Run("FindGaps", func(t *testing.T) {
120 idx := bundleindex.NewIndex("test-origin")
121 // Add bundles 1, 2, 4, 5 (missing 3)
122 for _, num := range []int{1, 2, 4, 5} {
123 idx.AddBundle(&bundleindex.BundleMetadata{
124 BundleNumber: num,
125 StartTime: time.Now(),
126 EndTime: time.Now().Add(time.Hour),
127 OperationCount: types.BUNDLE_SIZE,
128 })
129 }
130
131 gaps := idx.FindGaps()
132 if len(gaps) != 1 {
133 t.Errorf("expected 1 gap, got %d", len(gaps))
134 }
135 if len(gaps) > 0 && gaps[0] != 3 {
136 t.Errorf("expected gap at 3, got %d", gaps[0])
137 }
138 })
139}
140
141// TestBundle tests bundle operations
142func TestBundle(t *testing.T) {
143 t.Run("ValidateForSave", func(t *testing.T) {
144 tests := []struct {
145 name string
146 bundle *bundle.Bundle
147 wantErr bool
148 }{
149 {
150 name: "valid bundle",
151 bundle: &bundle.Bundle{
152 BundleNumber: 1,
153 StartTime: time.Now(),
154 EndTime: time.Now().Add(time.Hour),
155 Operations: makeTestOperations(types.BUNDLE_SIZE),
156 },
157 wantErr: false,
158 },
159 {
160 name: "invalid bundle number",
161 bundle: &bundle.Bundle{
162 BundleNumber: 0,
163 Operations: makeTestOperations(types.BUNDLE_SIZE),
164 },
165 wantErr: true,
166 },
167 {
168 name: "wrong operation count",
169 bundle: &bundle.Bundle{
170 BundleNumber: 1,
171 Operations: makeTestOperations(100),
172 },
173 wantErr: true,
174 },
175 {
176 name: "start after end",
177 bundle: &bundle.Bundle{
178 BundleNumber: 1,
179 StartTime: time.Now().Add(time.Hour),
180 EndTime: time.Now(),
181 Operations: makeTestOperations(types.BUNDLE_SIZE),
182 },
183 wantErr: true,
184 },
185 }
186
187 for _, tt := range tests {
188 t.Run(tt.name, func(t *testing.T) {
189 err := tt.bundle.ValidateForSave()
190 if (err != nil) != tt.wantErr {
191 t.Errorf("ValidateForSave() error = %v, wantErr %v", err, tt.wantErr)
192 }
193 })
194 }
195 })
196
197 t.Run("CompressionRatio", func(t *testing.T) {
198 b := &bundle.Bundle{
199 CompressedSize: 1000,
200 UncompressedSize: 5000,
201 }
202 ratio := b.CompressionRatio()
203 if ratio != 5.0 {
204 t.Errorf("expected ratio 5.0, got %f", ratio)
205 }
206 })
207}
208
209// TestMempool tests mempool operations
210func TestMempool(t *testing.T) {
211 tmpDir := t.TempDir()
212 logger := &testLogger{t: t}
213
214 t.Run("CreateAndAdd", func(t *testing.T) {
215 minTime := time.Now().Add(-time.Hour)
216 m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false)
217 if err != nil {
218 t.Fatalf("NewMempool failed: %v", err)
219 }
220
221 ops := makeTestOperations(100)
222 added, err := m.Add(ops)
223 if err != nil {
224 t.Fatalf("Add failed: %v", err)
225 }
226 if added != 100 {
227 t.Errorf("expected 100 added, got %d", added)
228 }
229 if m.Count() != 100 {
230 t.Errorf("expected count 100, got %d", m.Count())
231 }
232 })
233
234 t.Run("ChronologicalValidation", func(t *testing.T) {
235 minTime := time.Now().Add(-time.Hour)
236 m, err := mempool.NewMempool(tmpDir, 2, minTime, logger, false)
237 if err != nil {
238 t.Fatalf("NewMempool failed: %v", err)
239 }
240
241 // Add operations in order
242 ops := makeTestOperations(10)
243 _, err = m.Add(ops)
244 if err != nil {
245 t.Fatalf("Add failed: %v", err)
246 }
247
248 // Try to add operation before last one (should fail)
249 oldOp := []plcclient.PLCOperation{
250 {
251 DID: "did:plc:old",
252 CID: "old123",
253 CreatedAt: time.Now().Add(-2 * time.Hour),
254 },
255 }
256 _, err = m.Add(oldOp)
257 if err == nil {
258 t.Error("expected chronological validation error")
259 }
260 })
261
262 t.Run("TakeOperations", func(t *testing.T) {
263 minTime := time.Now().Add(-time.Hour)
264 m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false)
265 if err != nil {
266 t.Fatalf("NewMempool failed: %v", err)
267 }
268
269 ops := makeTestOperations(100)
270 m.Add(ops)
271
272 taken, err := m.Take(50)
273 if err != nil {
274 t.Fatalf("Take failed: %v", err)
275 }
276 if len(taken) != 50 {
277 t.Errorf("expected 50 operations, got %d", len(taken))
278 }
279 if m.Count() != 50 {
280 t.Errorf("expected 50 remaining, got %d", m.Count())
281 }
282 })
283
284 t.Run("SaveAndLoad", func(t *testing.T) {
285 minTime := time.Now().Add(-time.Hour)
286 m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
287 if err != nil {
288 t.Fatalf("NewMempool failed: %v", err)
289 }
290
291 ops := makeTestOperations(50)
292 m.Add(ops)
293
294 if err := m.Save(); err != nil {
295 t.Fatalf("Save failed: %v", err)
296 }
297
298 // Create new mempool and load
299 m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
300 if err != nil {
301 t.Fatalf("NewMempool failed: %v", err)
302 }
303
304 if m2.Count() != 50 {
305 t.Errorf("expected 50 operations after load, got %d", m2.Count())
306 }
307 })
308
309 t.Run("Validate", func(t *testing.T) {
310 minTime := time.Now().Add(-time.Hour)
311 m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false)
312 if err != nil {
313 t.Fatalf("NewMempool failed: %v", err)
314 }
315
316 ops := makeTestOperations(10)
317 m.Add(ops)
318
319 if err := m.Validate(); err != nil {
320 t.Errorf("Validate failed: %v", err)
321 }
322 })
323}
324
325// TestOperations tests low-level operations
326func TestOperations(t *testing.T) {
327 tmpDir := t.TempDir()
328 logger := &testLogger{t: t}
329
330 ops, err := storage.NewOperations(logger, false)
331 if err != nil {
332 t.Fatalf("NewOperations failed: %v", err)
333 }
334 defer ops.Close()
335
336 t.Run("SerializeJSONL", func(t *testing.T) {
337 operations := makeTestOperations(10)
338 data := ops.SerializeJSONL(operations)
339 if len(data) == 0 {
340 t.Error("SerializeJSONL returned empty data")
341 }
342 })
343
344 t.Run("Hash", func(t *testing.T) {
345 data := []byte("test data")
346 hash := ops.Hash(data)
347 if len(hash) != 64 { // SHA256 hex = 64 chars
348 t.Errorf("expected hash length 64, got %d", len(hash))
349 }
350
351 // Same data should produce same hash
352 hash2 := ops.Hash(data)
353 if hash != hash2 {
354 t.Error("same data produced different hashes")
355 }
356 })
357
358 t.Run("SaveAndLoadBundle", func(t *testing.T) {
359 operations := makeTestOperations(types.BUNDLE_SIZE)
360 path := filepath.Join(tmpDir, "test_bundle.jsonl.zst")
361
362 // Save
363 uncompHash, compHash, uncompSize, compSize, err := ops.SaveBundle(path, operations, bundleInfo)
364 if err != nil {
365 t.Fatalf("SaveBundle failed: %v", err)
366 }
367
368 if uncompHash == "" || compHash == "" {
369 t.Error("empty hashes returned")
370 }
371 if uncompSize == 0 || compSize == 0 {
372 t.Error("zero sizes returned")
373 }
374 if compSize >= uncompSize {
375 t.Error("compressed size should be smaller than uncompressed")
376 }
377
378 // Load
379 loaded, err := ops.LoadBundle(path)
380 if err != nil {
381 t.Fatalf("LoadBundle failed: %v", err)
382 }
383
384 if len(loaded) != len(operations) {
385 t.Errorf("expected %d operations, got %d", len(operations), len(loaded))
386 }
387 })
388
389 t.Run("ExtractUniqueDIDs", func(t *testing.T) {
390 operations := []plcclient.PLCOperation{
391 {DID: "did:plc:1"},
392 {DID: "did:plc:2"},
393 {DID: "did:plc:1"}, // duplicate
394 {DID: "did:plc:3"},
395 }
396
397 dids := ops.ExtractUniqueDIDs(operations)
398 if len(dids) != 3 {
399 t.Errorf("expected 3 unique DIDs, got %d", len(dids))
400 }
401 })
402
403 t.Run("GetBoundaryCIDs", func(t *testing.T) {
404 baseTime := time.Now()
405 operations := []plcclient.PLCOperation{
406 {CID: "cid1", CreatedAt: baseTime},
407 {CID: "cid2", CreatedAt: baseTime.Add(time.Second)},
408 {CID: "cid3", CreatedAt: baseTime.Add(2 * time.Second)},
409 {CID: "cid4", CreatedAt: baseTime.Add(2 * time.Second)}, // same as cid3
410 {CID: "cid5", CreatedAt: baseTime.Add(2 * time.Second)}, // same as cid3
411 }
412
413 boundaryTime, cids := ops.GetBoundaryCIDs(operations)
414 if !boundaryTime.Equal(baseTime.Add(2 * time.Second)) {
415 t.Error("unexpected boundary time")
416 }
417 if len(cids) != 3 { // cid3, cid4, cid5
418 t.Errorf("expected 3 boundary CIDs, got %d", len(cids))
419 }
420 })
421}
422
423// Helper functions
424
425func makeTestOperations(count int) []plcclient.PLCOperation {
426 ops := make([]plcclient.PLCOperation, count)
427 baseTime := time.Now().Add(-time.Hour)
428
429 for i := 0; i < count; i++ {
430 ops[i] = plcclient.PLCOperation{
431 DID: "did:plc:test" + string(rune(i)),
432 CID: "bafytest" + string(rune(i)),
433 CreatedAt: baseTime.Add(time.Duration(i) * time.Second),
434 /*Operation: map[string]interface{}{
435 "type": "create",
436 },*/
437 }
438 }
439
440 return ops
441}
442
443type testLogger struct {
444 t *testing.T
445}
446
447func (l *testLogger) Printf(format string, v ...interface{}) {
448 l.t.Logf(format, v...)
449}
450
451func (l *testLogger) Println(v ...interface{}) {
452 l.t.Log(v...)
453}