// [DEPRECATED] Go implementation of plcbundle
// at main 11 kB view raw
1package bundle_test 2 3import ( 4 "path/filepath" 5 "testing" 6 "time" 7 8 "tangled.org/atscan.net/plcbundle-go/bundle" 9 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex" 10 "tangled.org/atscan.net/plcbundle-go/internal/mempool" 11 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 12 "tangled.org/atscan.net/plcbundle-go/internal/storage" 13 "tangled.org/atscan.net/plcbundle-go/internal/types" 14) 15 16var ( 17 bundleInfo = &storage.BundleInfo{ 18 BundleNumber: 1, 19 Origin: "test-origin", 20 ParentHash: "", 21 Cursor: "", 22 CreatedBy: "test", 23 Hostname: "test-host", 24 } 25) 26 27// TestIndex tests index operations 28func TestIndex(t *testing.T) { 29 30 t.Run("CreateNewIndex", func(t *testing.T) { 31 idx := bundleindex.NewIndex("test-origin") 32 if idx == nil { 33 t.Fatal("NewIndex returned nil") 34 } 35 if idx.Version != types.INDEX_VERSION { 36 t.Errorf("expected version %s, got %s", types.INDEX_VERSION, idx.Version) 37 } 38 if idx.Count() != 0 { 39 t.Errorf("expected empty index, got count %d", idx.Count()) 40 } 41 }) 42 43 t.Run("AddBundle", func(t *testing.T) { 44 idx := bundleindex.NewIndex("test-origin") 45 meta := &bundleindex.BundleMetadata{ 46 BundleNumber: 1, 47 StartTime: time.Now(), 48 EndTime: time.Now().Add(time.Hour), 49 OperationCount: types.BUNDLE_SIZE, 50 DIDCount: 1000, 51 Hash: "abc123", 52 CompressedHash: "def456", 53 } 54 55 idx.AddBundle(meta) 56 57 if idx.Count() != 1 { 58 t.Errorf("expected count 1, got %d", idx.Count()) 59 } 60 61 retrieved, err := idx.GetBundle(1) 62 if err != nil { 63 t.Fatalf("GetBundle failed: %v", err) 64 } 65 if retrieved.Hash != meta.Hash { 66 t.Errorf("expected hash %s, got %s", meta.Hash, retrieved.Hash) 67 } 68 }) 69 70 t.Run("SaveAndLoad", func(t *testing.T) { 71 tmpDir := t.TempDir() 72 indexPath := filepath.Join(tmpDir, "test_index.json") 73 74 // Create and save 75 idx := bundleindex.NewIndex("test-origin") 76 idx.AddBundle(&bundleindex.BundleMetadata{ 77 BundleNumber: 1, 78 StartTime: 
time.Now(), 79 EndTime: time.Now().Add(time.Hour), 80 OperationCount: types.BUNDLE_SIZE, 81 Hash: "test123", 82 }) 83 84 if err := idx.Save(indexPath); err != nil { 85 t.Fatalf("Save failed: %v", err) 86 } 87 88 // Load 89 loaded, err := bundleindex.LoadIndex(indexPath) 90 if err != nil { 91 t.Fatalf("LoadIndex failed: %v", err) 92 } 93 94 if loaded.Count() != 1 { 95 t.Errorf("expected count 1, got %d", loaded.Count()) 96 } 97 }) 98 99 t.Run("GetBundleRange", func(t *testing.T) { 100 idx := bundleindex.NewIndex("test-origin") 101 for i := 1; i <= 5; i++ { 102 idx.AddBundle(&bundleindex.BundleMetadata{ 103 BundleNumber: i, 104 StartTime: time.Now(), 105 EndTime: time.Now().Add(time.Hour), 106 OperationCount: types.BUNDLE_SIZE, 107 }) 108 } 109 110 bundles := idx.GetBundleRange(2, 4) 111 if len(bundles) != 3 { 112 t.Errorf("expected 3 bundles, got %d", len(bundles)) 113 } 114 if bundles[0].BundleNumber != 2 || bundles[2].BundleNumber != 4 { 115 t.Errorf("unexpected bundle range") 116 } 117 }) 118 119 t.Run("FindGaps", func(t *testing.T) { 120 idx := bundleindex.NewIndex("test-origin") 121 // Add bundles 1, 2, 4, 5 (missing 3) 122 for _, num := range []int{1, 2, 4, 5} { 123 idx.AddBundle(&bundleindex.BundleMetadata{ 124 BundleNumber: num, 125 StartTime: time.Now(), 126 EndTime: time.Now().Add(time.Hour), 127 OperationCount: types.BUNDLE_SIZE, 128 }) 129 } 130 131 gaps := idx.FindGaps() 132 if len(gaps) != 1 { 133 t.Errorf("expected 1 gap, got %d", len(gaps)) 134 } 135 if len(gaps) > 0 && gaps[0] != 3 { 136 t.Errorf("expected gap at 3, got %d", gaps[0]) 137 } 138 }) 139} 140 141// TestBundle tests bundle operations 142func TestBundle(t *testing.T) { 143 t.Run("ValidateForSave", func(t *testing.T) { 144 tests := []struct { 145 name string 146 bundle *bundle.Bundle 147 wantErr bool 148 }{ 149 { 150 name: "valid bundle", 151 bundle: &bundle.Bundle{ 152 BundleNumber: 1, 153 StartTime: time.Now(), 154 EndTime: time.Now().Add(time.Hour), 155 Operations: 
makeTestOperations(types.BUNDLE_SIZE), 156 }, 157 wantErr: false, 158 }, 159 { 160 name: "invalid bundle number", 161 bundle: &bundle.Bundle{ 162 BundleNumber: 0, 163 Operations: makeTestOperations(types.BUNDLE_SIZE), 164 }, 165 wantErr: true, 166 }, 167 { 168 name: "wrong operation count", 169 bundle: &bundle.Bundle{ 170 BundleNumber: 1, 171 Operations: makeTestOperations(100), 172 }, 173 wantErr: true, 174 }, 175 { 176 name: "start after end", 177 bundle: &bundle.Bundle{ 178 BundleNumber: 1, 179 StartTime: time.Now().Add(time.Hour), 180 EndTime: time.Now(), 181 Operations: makeTestOperations(types.BUNDLE_SIZE), 182 }, 183 wantErr: true, 184 }, 185 } 186 187 for _, tt := range tests { 188 t.Run(tt.name, func(t *testing.T) { 189 err := tt.bundle.ValidateForSave() 190 if (err != nil) != tt.wantErr { 191 t.Errorf("ValidateForSave() error = %v, wantErr %v", err, tt.wantErr) 192 } 193 }) 194 } 195 }) 196 197 t.Run("CompressionRatio", func(t *testing.T) { 198 b := &bundle.Bundle{ 199 CompressedSize: 1000, 200 UncompressedSize: 5000, 201 } 202 ratio := b.CompressionRatio() 203 if ratio != 5.0 { 204 t.Errorf("expected ratio 5.0, got %f", ratio) 205 } 206 }) 207} 208 209// TestMempool tests mempool operations 210func TestMempool(t *testing.T) { 211 tmpDir := t.TempDir() 212 logger := &testLogger{t: t} 213 214 t.Run("CreateAndAdd", func(t *testing.T) { 215 minTime := time.Now().Add(-time.Hour) 216 m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false) 217 if err != nil { 218 t.Fatalf("NewMempool failed: %v", err) 219 } 220 221 ops := makeTestOperations(100) 222 added, err := m.Add(ops) 223 if err != nil { 224 t.Fatalf("Add failed: %v", err) 225 } 226 if added != 100 { 227 t.Errorf("expected 100 added, got %d", added) 228 } 229 if m.Count() != 100 { 230 t.Errorf("expected count 100, got %d", m.Count()) 231 } 232 }) 233 234 t.Run("ChronologicalValidation", func(t *testing.T) { 235 minTime := time.Now().Add(-time.Hour) 236 m, err := mempool.NewMempool(tmpDir, 2, 
minTime, logger, false) 237 if err != nil { 238 t.Fatalf("NewMempool failed: %v", err) 239 } 240 241 // Add operations in order 242 ops := makeTestOperations(10) 243 _, err = m.Add(ops) 244 if err != nil { 245 t.Fatalf("Add failed: %v", err) 246 } 247 248 // Try to add operation before last one (should fail) 249 oldOp := []plcclient.PLCOperation{ 250 { 251 DID: "did:plc:old", 252 CID: "old123", 253 CreatedAt: time.Now().Add(-2 * time.Hour), 254 }, 255 } 256 _, err = m.Add(oldOp) 257 if err == nil { 258 t.Error("expected chronological validation error") 259 } 260 }) 261 262 t.Run("TakeOperations", func(t *testing.T) { 263 minTime := time.Now().Add(-time.Hour) 264 m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false) 265 if err != nil { 266 t.Fatalf("NewMempool failed: %v", err) 267 } 268 269 ops := makeTestOperations(100) 270 m.Add(ops) 271 272 taken, err := m.Take(50) 273 if err != nil { 274 t.Fatalf("Take failed: %v", err) 275 } 276 if len(taken) != 50 { 277 t.Errorf("expected 50 operations, got %d", len(taken)) 278 } 279 if m.Count() != 50 { 280 t.Errorf("expected 50 remaining, got %d", m.Count()) 281 } 282 }) 283 284 t.Run("SaveAndLoad", func(t *testing.T) { 285 minTime := time.Now().Add(-time.Hour) 286 m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false) 287 if err != nil { 288 t.Fatalf("NewMempool failed: %v", err) 289 } 290 291 ops := makeTestOperations(50) 292 m.Add(ops) 293 294 if err := m.Save(); err != nil { 295 t.Fatalf("Save failed: %v", err) 296 } 297 298 // Create new mempool and load 299 m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false) 300 if err != nil { 301 t.Fatalf("NewMempool failed: %v", err) 302 } 303 304 if m2.Count() != 50 { 305 t.Errorf("expected 50 operations after load, got %d", m2.Count()) 306 } 307 }) 308 309 t.Run("Validate", func(t *testing.T) { 310 minTime := time.Now().Add(-time.Hour) 311 m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false) 312 if err != nil { 313 t.Fatalf("NewMempool 
failed: %v", err) 314 } 315 316 ops := makeTestOperations(10) 317 m.Add(ops) 318 319 if err := m.Validate(); err != nil { 320 t.Errorf("Validate failed: %v", err) 321 } 322 }) 323} 324 325// TestOperations tests low-level operations 326func TestOperations(t *testing.T) { 327 tmpDir := t.TempDir() 328 logger := &testLogger{t: t} 329 330 ops, err := storage.NewOperations(logger, false) 331 if err != nil { 332 t.Fatalf("NewOperations failed: %v", err) 333 } 334 defer ops.Close() 335 336 t.Run("SerializeJSONL", func(t *testing.T) { 337 operations := makeTestOperations(10) 338 data := ops.SerializeJSONL(operations) 339 if len(data) == 0 { 340 t.Error("SerializeJSONL returned empty data") 341 } 342 }) 343 344 t.Run("Hash", func(t *testing.T) { 345 data := []byte("test data") 346 hash := ops.Hash(data) 347 if len(hash) != 64 { // SHA256 hex = 64 chars 348 t.Errorf("expected hash length 64, got %d", len(hash)) 349 } 350 351 // Same data should produce same hash 352 hash2 := ops.Hash(data) 353 if hash != hash2 { 354 t.Error("same data produced different hashes") 355 } 356 }) 357 358 t.Run("SaveAndLoadBundle", func(t *testing.T) { 359 operations := makeTestOperations(types.BUNDLE_SIZE) 360 path := filepath.Join(tmpDir, "test_bundle.jsonl.zst") 361 362 // Save 363 uncompHash, compHash, uncompSize, compSize, err := ops.SaveBundle(path, operations, bundleInfo) 364 if err != nil { 365 t.Fatalf("SaveBundle failed: %v", err) 366 } 367 368 if uncompHash == "" || compHash == "" { 369 t.Error("empty hashes returned") 370 } 371 if uncompSize == 0 || compSize == 0 { 372 t.Error("zero sizes returned") 373 } 374 if compSize >= uncompSize { 375 t.Error("compressed size should be smaller than uncompressed") 376 } 377 378 // Load 379 loaded, err := ops.LoadBundle(path) 380 if err != nil { 381 t.Fatalf("LoadBundle failed: %v", err) 382 } 383 384 if len(loaded) != len(operations) { 385 t.Errorf("expected %d operations, got %d", len(operations), len(loaded)) 386 } 387 }) 388 389 
t.Run("ExtractUniqueDIDs", func(t *testing.T) { 390 operations := []plcclient.PLCOperation{ 391 {DID: "did:plc:1"}, 392 {DID: "did:plc:2"}, 393 {DID: "did:plc:1"}, // duplicate 394 {DID: "did:plc:3"}, 395 } 396 397 dids := ops.ExtractUniqueDIDs(operations) 398 if len(dids) != 3 { 399 t.Errorf("expected 3 unique DIDs, got %d", len(dids)) 400 } 401 }) 402 403 t.Run("GetBoundaryCIDs", func(t *testing.T) { 404 baseTime := time.Now() 405 operations := []plcclient.PLCOperation{ 406 {CID: "cid1", CreatedAt: baseTime}, 407 {CID: "cid2", CreatedAt: baseTime.Add(time.Second)}, 408 {CID: "cid3", CreatedAt: baseTime.Add(2 * time.Second)}, 409 {CID: "cid4", CreatedAt: baseTime.Add(2 * time.Second)}, // same as cid3 410 {CID: "cid5", CreatedAt: baseTime.Add(2 * time.Second)}, // same as cid3 411 } 412 413 boundaryTime, cids := ops.GetBoundaryCIDs(operations) 414 if !boundaryTime.Equal(baseTime.Add(2 * time.Second)) { 415 t.Error("unexpected boundary time") 416 } 417 if len(cids) != 3 { // cid3, cid4, cid5 418 t.Errorf("expected 3 boundary CIDs, got %d", len(cids)) 419 } 420 }) 421} 422 423// Helper functions 424 425func makeTestOperations(count int) []plcclient.PLCOperation { 426 ops := make([]plcclient.PLCOperation, count) 427 baseTime := time.Now().Add(-time.Hour) 428 429 for i := 0; i < count; i++ { 430 ops[i] = plcclient.PLCOperation{ 431 DID: "did:plc:test" + string(rune(i)), 432 CID: "bafytest" + string(rune(i)), 433 CreatedAt: baseTime.Add(time.Duration(i) * time.Second), 434 /*Operation: map[string]interface{}{ 435 "type": "create", 436 },*/ 437 } 438 } 439 440 return ops 441} 442 443type testLogger struct { 444 t *testing.T 445} 446 447func (l *testLogger) Printf(format string, v ...interface{}) { 448 l.t.Logf(format, v...) 449} 450 451func (l *testLogger) Println(v ...interface{}) { 452 l.t.Log(v...) 453}