package plc

import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/atscan/atscand/internal/log"
	"github.com/atscan/atscand/internal/storage"
	"github.com/klauspost/compress/zstd"
	plcbundle "tangled.org/atscan.net/plcbundle"
)

// BundleManager wraps the library's manager with database integration
type BundleManager struct {
	libManager *plcbundle.Manager
	db         storage.Database
	bundleDir  string
	indexDIDs  bool
}

func NewBundleManager(bundleDir string, plcURL string, db storage.Database, indexDIDs bool) (*BundleManager, error) {
	// Create library config
	config := plcbundle.DefaultConfig(bundleDir)

	// Create PLC client
	var client *plcbundle.PLCClient
	if plcURL != "" {
		client = plcbundle.NewPLCClient(plcURL)
	}

	// Create library manager
	libMgr, err := plcbundle.NewManager(config, client)
	if err != nil {
		return nil, fmt.Errorf("failed to create library manager: %w", err)
	}

	return &BundleManager{
		libManager: libMgr,
		db:         db,
		bundleDir:  bundleDir,
		indexDIDs:  indexDIDs,
	}, nil
}

func (bm *BundleManager) Close() {
	if bm.libManager != nil {
		bm.libManager.Close()
	}
}

// LoadBundleOperations loads a bundle (via the library) and returns its operations
func (bm *BundleManager) LoadBundleOperations(ctx context.Context, bundleNum int) ([]PLCOperation, error) {
	bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
	if err != nil {
		return nil, err
	}
	return bundle.Operations, nil
}

// LoadBundle loads a full bundle with metadata
func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNum int) (*plcbundle.Bundle, error) {
	return bm.libManager.LoadBundle(ctx, bundleNum)
}

// FetchAndSaveBundle fetches the next bundle from PLC and saves it
func (bm *BundleManager) FetchAndSaveBundle(ctx context.Context) (*plcbundle.Bundle, error) {
	// Fetch from PLC using the library
	bundle, err := bm.libManager.FetchNextBundle(ctx)
	if err != nil {
		return nil, err
	}

	// Save to disk (the library handles this)
	if err := bm.libManager.SaveBundle(ctx, bundle); err != nil {
		return nil, fmt.Errorf("failed to save bundle to disk: %w", err)
	}

	// Index DIDs if enabled (still uses the database for this)
	if bm.indexDIDs && len(bundle.Operations) > 0 {
		if err := bm.indexBundleDIDs(ctx, bundle); err != nil {
			log.Error("Failed to index DIDs for bundle %d: %v", bundle.BundleNumber, err)
		}
	}

	log.Info("✓ Saved bundle %06d", bundle.BundleNumber)

	return bundle, nil
}
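// exampleFetchLoop is an illustrative sketch only (not used by the package):
// it shows how a caller might wire BundleManager into a small sync loop. The
// bundleDir, plcURL and db values are assumed to be supplied by the caller,
// and the error behaviour of repeated FetchAndSaveBundle calls depends on the
// underlying plcbundle library.
func exampleFetchLoop(ctx context.Context, bundleDir, plcURL string, db storage.Database) error {
	bm, err := NewBundleManager(bundleDir, plcURL, db, true)
	if err != nil {
		return err
	}
	defer bm.Close()

	// Fetch a handful of bundles, stopping on cancellation or error.
	for i := 0; i < 5 && ctx.Err() == nil; i++ {
		bundle, err := bm.FetchAndSaveBundle(ctx)
		if err != nil {
			return fmt.Errorf("stopped after bundle %d: %w", bm.GetLastBundleNumber(), err)
		}
		log.Verbose("fetched bundle %06d with %d operations", bundle.BundleNumber, len(bundle.Operations))
	}
	return nil
}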
// indexBundleDIDs indexes DIDs from a bundle into the database
func (bm *BundleManager) indexBundleDIDs(ctx context.Context, bundle *plcbundle.Bundle) error {
	start := time.Now()
	log.Verbose("Indexing DIDs for bundle %06d...", bundle.BundleNumber)

	// Extract DID info from operations
	didInfoMap := ExtractDIDInfoMap(bundle.Operations)

	successCount := 0
	errorCount := 0
	invalidHandleCount := 0

	// Upsert each DID
	for did, info := range didInfoMap {
		validHandle := ValidateHandle(info.Handle)
		if info.Handle != "" && validHandle == "" {
			invalidHandleCount++
		}

		if err := bm.db.UpsertDID(ctx, did, bundle.BundleNumber, validHandle, info.PDS); err != nil {
			log.Error("Failed to index DID %s: %v", did, err)
			errorCount++
		} else {
			successCount++
		}
	}

	elapsed := time.Since(start)
	log.Info("✓ Indexed %d DIDs for bundle %06d (%d errors, %d invalid handles) in %v",
		successCount, bundle.BundleNumber, errorCount, invalidHandleCount, elapsed)

	return nil
}

// VerifyChain verifies bundle chain integrity
func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
	result, err := bm.libManager.VerifyChain(ctx)
	if err != nil {
		return err
	}

	if !result.Valid {
		return fmt.Errorf("chain verification failed at bundle %d: %s", result.BrokenAt, result.Error)
	}

	return nil
}

// GetChainInfo returns chain information
func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
	return bm.libManager.GetInfo(), nil
}

// GetMempoolStats returns mempool statistics from the library
func (bm *BundleManager) GetMempoolStats() map[string]interface{} {
	return bm.libManager.GetMempoolStats()
}

// GetMempoolOperations returns all operations currently in the mempool
func (bm *BundleManager) GetMempoolOperations() ([]PLCOperation, error) {
	return bm.libManager.GetMempoolOperations()
}

// GetIndex returns the library's bundle index
func (bm *BundleManager) GetIndex() *plcbundle.Index {
	return bm.libManager.GetIndex()
}

// GetLastBundleNumber returns the last bundle number
func (bm *BundleManager) GetLastBundleNumber() int {
	index := bm.libManager.GetIndex()
	lastBundle := index.GetLastBundle()
	if lastBundle == nil {
		return 0
	}
	return lastBundle.BundleNumber
}

// GetBundleMetadata gets bundle metadata by number
func (bm *BundleManager) GetBundleMetadata(bundleNum int) (*plcbundle.BundleMetadata, error) {
	index := bm.libManager.GetIndex()
	return index.GetBundle(bundleNum)
}

// GetBundles returns the most recent bundles (newest first)
func (bm *BundleManager) GetBundles(limit int) []*plcbundle.BundleMetadata {
	index := bm.libManager.GetIndex()
	allBundles := index.GetBundles()

	// Determine how many bundles to return
	count := limit
	if count <= 0 || count > len(allBundles) {
		count = len(allBundles)
	}

	// Build result in reverse order (newest first)
	result := make([]*plcbundle.BundleMetadata, count)
	for i := 0; i < count; i++ {
		result[i] = allBundles[len(allBundles)-1-i]
	}

	return result
}

// GetBundleStats returns bundle statistics
func (bm *BundleManager) GetBundleStats() map[string]interface{} {
	index := bm.libManager.GetIndex()
	stats := index.GetStats()

	// Convert to expected format; guard the type assertion so a missing
	// "last_bundle" entry does not panic
	lastBundle := int64(0)
	if lb, ok := stats["last_bundle"].(int); ok {
		lastBundle = int64(lb)
	}

	// Calculate total uncompressed size by iterating through all bundles
	totalUncompressedSize := int64(0)
	allBundles := index.GetBundles()
	for _, bundle := range allBundles {
		totalUncompressedSize += bundle.UncompressedSize
	}

	return map[string]interface{}{
		"bundle_count":            int64(stats["bundle_count"].(int)),
		"total_size":              stats["total_size"].(int64),
		"total_uncompressed_size": totalUncompressedSize,
		"last_bundle":             lastBundle,
	}
}
// GetDIDsForBundle gets DIDs from a bundle (loads and extracts)
func (bm *BundleManager) GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, int, error) {
	bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
	if err != nil {
		return nil, 0, err
	}

	// Extract unique DIDs
	didSet := make(map[string]bool)
	for _, op := range bundle.Operations {
		didSet[op.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids, bundle.DIDCount, nil
}

// FindBundleForTimestamp finds the bundle containing a timestamp
func (bm *BundleManager) FindBundleForTimestamp(afterTime time.Time) int {
	index := bm.libManager.GetIndex()
	bundles := index.GetBundles()

	// Find the bundle whose time range contains this timestamp
	for _, bundle := range bundles {
		if (bundle.StartTime.Before(afterTime) || bundle.StartTime.Equal(afterTime)) &&
			(bundle.EndTime.After(afterTime) || bundle.EndTime.Equal(afterTime)) {
			return bundle.BundleNumber
		}
	}

	// Otherwise return the closest bundle before this time
	for i := len(bundles) - 1; i >= 0; i-- {
		if bundles[i].EndTime.Before(afterTime) {
			return bundles[i].BundleNumber
		}
	}

	return 1 // Default to the first bundle
}

// StreamRaw streams raw compressed bundle data
func (bm *BundleManager) StreamRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	return bm.libManager.StreamBundleRaw(ctx, bundleNumber)
}

// StreamDecompressed streams decompressed bundle data
func (bm *BundleManager) StreamDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	return bm.libManager.StreamBundleDecompressed(ctx, bundleNumber)
}
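// exampleDumpBundle is an illustrative sketch only (not used by the package):
// it copies one decompressed bundle to any io.Writer, for example an HTTP
// response or a file opened by the caller. The destination writer is an
// assumption of the caller, not something this package provides.
func exampleDumpBundle(ctx context.Context, bm *BundleManager, bundleNum int, w io.Writer) error {
	rc, err := bm.StreamDecompressed(ctx, bundleNum)
	if err != nil {
		return err
	}
	defer rc.Close()

	// Stream straight through without buffering the whole bundle in memory.
	_, err = io.Copy(w, rc)
	return err
}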
// GetPLCHistory calculates historical statistics from the bundle index
func (bm *BundleManager) GetPLCHistory(ctx context.Context, limit int, fromBundle int) ([]*storage.PLCHistoryPoint, error) {
	index := bm.libManager.GetIndex()
	allBundles := index.GetBundles()

	// Filter bundles >= fromBundle
	var filtered []*plcbundle.BundleMetadata
	for _, b := range allBundles {
		if b.BundleNumber >= fromBundle {
			filtered = append(filtered, b)
		}
	}

	if len(filtered) == 0 {
		return []*storage.PLCHistoryPoint{}, nil
	}

	// Sort bundles by bundle number to ensure proper cumulative calculation
	sort.Slice(filtered, func(i, j int) bool {
		return filtered[i].BundleNumber < filtered[j].BundleNumber
	})

	// Group by date
	type dailyStat struct {
		lastBundle        int
		bundleCount       int
		totalUncompressed int64
		totalCompressed   int64
	}

	dailyStats := make(map[string]*dailyStat)

	// Map storing the cumulative values at the end of each date
	dateCumulatives := make(map[string]struct {
		uncompressed int64
		compressed   int64
	})

	// Calculate cumulative totals while iterating through the sorted bundles
	cumulativeUncompressed := int64(0)
	cumulativeCompressed := int64(0)

	for _, bundle := range filtered {
		dateStr := bundle.StartTime.Format("2006-01-02")

		// Update cumulative totals
		cumulativeUncompressed += bundle.UncompressedSize
		cumulativeCompressed += bundle.CompressedSize

		if stat, exists := dailyStats[dateStr]; exists {
			// Update existing day
			if bundle.BundleNumber > stat.lastBundle {
				stat.lastBundle = bundle.BundleNumber
			}
			stat.bundleCount++
			stat.totalUncompressed += bundle.UncompressedSize
			stat.totalCompressed += bundle.CompressedSize
		} else {
			// Create new day entry
			dailyStats[dateStr] = &dailyStat{
				lastBundle:        bundle.BundleNumber,
				bundleCount:       1,
				totalUncompressed: bundle.UncompressedSize,
				totalCompressed:   bundle.CompressedSize,
			}
		}

		// Store the cumulative values at the end of this date
		// (overwritten if there are multiple bundles on the same day)
		dateCumulatives[dateStr] = struct {
			uncompressed int64
			compressed   int64
		}{
			uncompressed: cumulativeUncompressed,
			compressed:   cumulativeCompressed,
		}
	}

	// Convert the map to a slice of dates sorted ascending
	var dates []string
	for date := range dailyStats {
		dates = append(dates, date)
	}
	sort.Strings(dates)

	// Build history points with cumulative operation counts
	var history []*storage.PLCHistoryPoint
	cumulativeOps := 0

	for _, date := range dates {
		stat := dailyStats[date]
		// Assumes a fixed 10,000 operations per bundle
		cumulativeOps += stat.bundleCount * 10000
		cumulative := dateCumulatives[date]

		history = append(history, &storage.PLCHistoryPoint{
			Date:                   date,
			BundleNumber:           stat.lastBundle,
			OperationCount:         cumulativeOps,
			UncompressedSize:       stat.totalUncompressed,
			CompressedSize:         stat.totalCompressed,
			CumulativeUncompressed: cumulative.uncompressed,
			CumulativeCompressed:   cumulative.compressed,
		})
	}

	// Apply limit if specified
	if limit > 0 && len(history) > limit {
		history = history[:limit]
	}

	return history, nil
}

// GetBundleLabels reads labels from a compressed CSV file for a specific bundle
func (bm *BundleManager) GetBundleLabels(ctx context.Context, bundleNum int) ([]*PLCOpLabel, error) {
	// Build the path to the labels file
	labelsDir := filepath.Join(bm.bundleDir, "labels")
	labelsFile := filepath.Join(labelsDir, fmt.Sprintf("%06d.csv.zst", bundleNum))

	// Check whether the file exists
	if _, err := os.Stat(labelsFile); os.IsNotExist(err) {
		log.Verbose("No labels file found for bundle %d at %s", bundleNum, labelsFile)
		// Return empty, not an error
		return []*PLCOpLabel{}, nil
	}

	// Open the zstd-compressed file
	file, err := os.Open(labelsFile)
	if err != nil {
		return nil, fmt.Errorf("failed to open labels file: %w", err)
	}
	defer file.Close()

	// Create a zstd reader
	zstdReader, err := zstd.NewReader(file)
	if err != nil {
		return nil, fmt.Errorf("failed to create zstd reader: %w", err)
	}
	defer zstdReader.Close()

	// Create a CSV reader; the file is written without a header row
	csvReader := csv.NewReader(zstdReader)
	// FieldsPerRecord could be set to 6 to validate the expected column count:
	//csvReader.FieldsPerRecord = 6

	var labels []*PLCOpLabel

	// Read all records
	for {
		// Check for context cancellation
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		record, err := csvReader.Read()
		if err == io.EOF {
			break // End of file
		}
		if err != nil {
			log.Error("Error reading CSV record in %s: %v", labelsFile, err)
			continue // Skip bad line
		}

		// Parse the CSV record (a []string)
		label, err := parseLabelRecord(record)
		if err != nil {
			log.Error("Error parsing CSV data for bundle %d: %v", bundleNum, err)
			continue // Skip bad data
		}

		labels = append(labels, label)
	}

	return labels, nil
}
// parseLabelRecord converts a new-format CSV record into a PLCOpLabel struct
func parseLabelRecord(record []string) (*PLCOpLabel, error) {
	// New format: 0:bundle, 1:position, 2:cid(short), 3:size, 4:confidence, 5:labels
	if len(record) != 6 {
		err := fmt.Errorf("invalid record length: expected 6, got %d", len(record))
		log.Warn("Skipping malformed CSV line: %v (data: %s)", err, strings.Join(record, ","))
		return nil, err
	}

	// 0:bundle
	bundle, err := strconv.Atoi(record[0])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'bundle' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'bundle': %w", err)
	}

	// 1:position
	position, err := strconv.Atoi(record[1])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'position' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'position': %w", err)
	}

	// 2:cid (short form)
	shortCID := record[2]

	// 3:size
	size, err := strconv.Atoi(record[3])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'size' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'size': %w", err)
	}

	// 4:confidence
	confidence, err := strconv.ParseFloat(record[4], 64)
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'confidence' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'confidence': %w", err)
	}

	// 5:labels (detector names separated by ';')
	detectors := strings.Split(record[5], ";")

	label := &PLCOpLabel{
		Bundle:     bundle,
		Position:   position,
		CID:        shortCID,
		Size:       size,
		Confidence: confidence,
		Detectors:  detectors,
	}

	return label, nil
}
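// formatLabelRecord is an illustrative sketch only (not used by the package):
// it is the inverse of parseLabelRecord, included to document the expected
// on-disk row shape (bundle,position,cid,size,confidence,labels), with the
// detector names joined by ';'.
func formatLabelRecord(label *PLCOpLabel) []string {
	return []string{
		strconv.Itoa(label.Bundle),
		strconv.Itoa(label.Position),
		label.CID,
		strconv.Itoa(label.Size),
		strconv.FormatFloat(label.Confidence, 'f', -1, 64),
		strings.Join(label.Detectors, ";"),
	}
}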