package plc

import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/atscan/atscand/internal/log"
	"github.com/atscan/atscand/internal/storage"
	"github.com/klauspost/compress/zstd"
	plcbundle "tangled.org/atscan.net/plcbundle"
)

// BundleManager wraps the library's manager with database integration
type BundleManager struct {
	libManager *plcbundle.Manager
	db         storage.Database
	bundleDir  string
	indexDIDs  bool
}

// NewBundleManager creates a BundleManager that stores bundles under bundleDir.
// If plcURL is empty, no PLC client is configured.
func NewBundleManager(bundleDir string, plcURL string, db storage.Database, indexDIDs bool) (*BundleManager, error) {
	// Create library config
	config := plcbundle.DefaultConfig(bundleDir)

	// Create PLC client
	var client *plcbundle.PLCClient
	if plcURL != "" {
		client = plcbundle.NewPLCClient(plcURL)
	}

	// Create library manager
	libMgr, err := plcbundle.NewManager(config, client)
	if err != nil {
		return nil, fmt.Errorf("failed to create library manager: %w", err)
	}

	return &BundleManager{
		libManager: libMgr,
		db:         db,
		bundleDir:  bundleDir,
		indexDIDs:  indexDIDs,
	}, nil
}
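// A minimal usage sketch (illustrative only; the directory, the PLC URL and
// the "db" storage.Database implementation below are assumptions, not values
// defined in this package):
//
//	bm, err := NewBundleManager("/data/plc-bundles", "https://plc.directory", db, true)
//	if err != nil {
//		return err
//	}
//	defer bm.Close()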

// Close shuts down the underlying library manager.
func (bm *BundleManager) Close() {
	if bm.libManager != nil {
		bm.libManager.Close()
	}
}

// LoadBundleOperations loads a bundle via the library and returns its operations
func (bm *BundleManager) LoadBundleOperations(ctx context.Context, bundleNum int) ([]PLCOperation, error) {
	bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
	if err != nil {
		return nil, err
	}
	return bundle.Operations, nil
}

// LoadBundle loads a full bundle with metadata
func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNum int) (*plcbundle.Bundle, error) {
	return bm.libManager.LoadBundle(ctx, bundleNum)
}

// FetchAndSaveBundle fetches the next bundle from PLC and saves it
func (bm *BundleManager) FetchAndSaveBundle(ctx context.Context) (*plcbundle.Bundle, error) {
	// Fetch from PLC using library
	bundle, err := bm.libManager.FetchNextBundle(ctx)
	if err != nil {
		return nil, err
	}

	// Save to disk (library handles this)
	if err := bm.libManager.SaveBundle(ctx, bundle); err != nil {
		return nil, fmt.Errorf("failed to save bundle to disk: %w", err)
	}

	// Index DIDs if enabled (still use database for this)
	if bm.indexDIDs && len(bundle.Operations) > 0 {
		if err := bm.indexBundleDIDs(ctx, bundle); err != nil {
			log.Error("Failed to index DIDs for bundle %d: %v", bundle.BundleNumber, err)
		}
	}

	log.Info("✓ Saved bundle %06d", bundle.BundleNumber)

	return bundle, nil
}
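// Sketch of a simple ingest loop built on FetchAndSaveBundle (illustrative
// only; the sleep interval and the decision to retry on every error are
// assumptions, not the library's contract):
//
//	for ctx.Err() == nil {
//		bundle, err := bm.FetchAndSaveBundle(ctx)
//		if err != nil {
//			log.Error("fetch failed: %v", err)
//			time.Sleep(30 * time.Second)
//			continue
//		}
//		log.Verbose("stored bundle %06d (%d operations)", bundle.BundleNumber, len(bundle.Operations))
//	}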

// indexBundleDIDs indexes DIDs from a bundle into the database
func (bm *BundleManager) indexBundleDIDs(ctx context.Context, bundle *plcbundle.Bundle) error {
	start := time.Now()
	log.Verbose("Indexing DIDs for bundle %06d...", bundle.BundleNumber)

	// Extract DID info from operations
	didInfoMap := ExtractDIDInfoMap(bundle.Operations)

	successCount := 0
	errorCount := 0
	invalidHandleCount := 0

	// Upsert each DID
	for did, info := range didInfoMap {
		validHandle := ValidateHandle(info.Handle)
		if info.Handle != "" && validHandle == "" {
			invalidHandleCount++
		}

		if err := bm.db.UpsertDID(ctx, did, bundle.BundleNumber, validHandle, info.PDS); err != nil {
			log.Error("Failed to index DID %s: %v", did, err)
			errorCount++
		} else {
			successCount++
		}
	}

	elapsed := time.Since(start)
	log.Info("✓ Indexed %d DIDs for bundle %06d (%d errors, %d invalid handles) in %v",
		successCount, bundle.BundleNumber, errorCount, invalidHandleCount, elapsed)

	return nil
}

// VerifyChain verifies bundle chain integrity. The endBundle argument is
// currently unused; the library verifies the entire chain.
func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
	result, err := bm.libManager.VerifyChain(ctx)
	if err != nil {
		return err
	}

	if !result.Valid {
		return fmt.Errorf("chain verification failed at bundle %d: %s", result.BrokenAt, result.Error)
	}

	return nil
}

// GetChainInfo returns chain information
func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
	return bm.libManager.GetInfo(), nil
}

// GetMempoolStats returns mempool statistics from the library
func (bm *BundleManager) GetMempoolStats() map[string]interface{} {
	return bm.libManager.GetMempoolStats()
}

// GetMempoolOperations returns all operations currently in the mempool
func (bm *BundleManager) GetMempoolOperations() ([]PLCOperation, error) {
	return bm.libManager.GetMempoolOperations()
}

// GetIndex returns the library's bundle index
func (bm *BundleManager) GetIndex() *plcbundle.Index {
	return bm.libManager.GetIndex()
}

// GetLastBundleNumber returns the last bundle number, or 0 if no bundles exist
func (bm *BundleManager) GetLastBundleNumber() int {
	index := bm.libManager.GetIndex()
	lastBundle := index.GetLastBundle()
	if lastBundle == nil {
		return 0
	}
	return lastBundle.BundleNumber
}

// GetBundleMetadata gets bundle metadata by number
func (bm *BundleManager) GetBundleMetadata(bundleNum int) (*plcbundle.BundleMetadata, error) {
	index := bm.libManager.GetIndex()
	return index.GetBundle(bundleNum)
}

// GetBundles returns the most recent bundles (newest first)
func (bm *BundleManager) GetBundles(limit int) []*plcbundle.BundleMetadata {
	index := bm.libManager.GetIndex()
	allBundles := index.GetBundles()

	// Determine how many bundles to return
	count := limit
	if count <= 0 || count > len(allBundles) {
		count = len(allBundles)
	}

	// Build result in reverse order (newest first)
	result := make([]*plcbundle.BundleMetadata, count)
	for i := 0; i < count; i++ {
		result[i] = allBundles[len(allBundles)-1-i]
	}

	return result
}

// GetBundleStats returns bundle statistics
func (bm *BundleManager) GetBundleStats() map[string]interface{} {
	index := bm.libManager.GetIndex()
	stats := index.GetStats()

	// Convert to the expected format; the index reports last_bundle as an int
	// (or nil when no bundles exist), so normalize it safely to int64.
	lastBundle := int64(0)
	if lb, ok := stats["last_bundle"].(int); ok {
		lastBundle = int64(lb)
	}

	// Calculate total uncompressed size by iterating through all bundles
	totalUncompressedSize := int64(0)
	allBundles := index.GetBundles()
	for _, bundle := range allBundles {
		totalUncompressedSize += bundle.UncompressedSize
	}

	return map[string]interface{}{
		"bundle_count":            int64(stats["bundle_count"].(int)),
		"total_size":              stats["total_size"].(int64),
		"total_uncompressed_size": totalUncompressedSize,
		"last_bundle":             lastBundle,
	}
}

// GetDIDsForBundle gets the unique DIDs from a bundle (loads and extracts)
func (bm *BundleManager) GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, int, error) {
	bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
	if err != nil {
		return nil, 0, err
	}

	// Extract unique DIDs
	didSet := make(map[string]bool)
	for _, op := range bundle.Operations {
		didSet[op.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids, bundle.DIDCount, nil
}

// FindBundleForTimestamp finds the bundle containing a timestamp, falling back
// to the closest bundle ending before it (or bundle 1 if none match)
func (bm *BundleManager) FindBundleForTimestamp(afterTime time.Time) int {
	index := bm.libManager.GetIndex()
	bundles := index.GetBundles()

	// Find a bundle whose time range contains this timestamp
	for _, bundle := range bundles {
		if (bundle.StartTime.Before(afterTime) || bundle.StartTime.Equal(afterTime)) &&
			(bundle.EndTime.After(afterTime) || bundle.EndTime.Equal(afterTime)) {
			return bundle.BundleNumber
		}
	}

	// Otherwise return the closest bundle ending before this time
	for i := len(bundles) - 1; i >= 0; i-- {
		if bundles[i].EndTime.Before(afterTime) {
			return bundles[i].BundleNumber
		}
	}

	return 1 // Default to first bundle
}

// StreamRaw streams raw compressed bundle data
func (bm *BundleManager) StreamRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	return bm.libManager.StreamBundleRaw(ctx, bundleNumber)
}

// StreamDecompressed streams decompressed bundle data
func (bm *BundleManager) StreamDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	return bm.libManager.StreamBundleDecompressed(ctx, bundleNumber)
}

// GetPLCHistory calculates historical statistics from the bundle index
func (bm *BundleManager) GetPLCHistory(ctx context.Context, limit int, fromBundle int) ([]*storage.PLCHistoryPoint, error) {
	index := bm.libManager.GetIndex()
	allBundles := index.GetBundles()

	// Filter bundles >= fromBundle
	var filtered []*plcbundle.BundleMetadata
	for _, b := range allBundles {
		if b.BundleNumber >= fromBundle {
			filtered = append(filtered, b)
		}
	}

	if len(filtered) == 0 {
		return []*storage.PLCHistoryPoint{}, nil
	}

	// Sort bundles by bundle number to ensure proper cumulative calculation
	sort.Slice(filtered, func(i, j int) bool {
		return filtered[i].BundleNumber < filtered[j].BundleNumber
	})

	// Group by date
	type dailyStat struct {
		lastBundle        int
		bundleCount       int
		totalUncompressed int64
		totalCompressed   int64
	}

	dailyStats := make(map[string]*dailyStat)

	// Map to store the cumulative values at the end of each date
	dateCumulatives := make(map[string]struct {
		uncompressed int64
		compressed   int64
	})

	// Calculate cumulative totals as we iterate through sorted bundles
	cumulativeUncompressed := int64(0)
	cumulativeCompressed := int64(0)

	for _, bundle := range filtered {
		dateStr := bundle.StartTime.Format("2006-01-02")

		// Update cumulative totals
		cumulativeUncompressed += bundle.UncompressedSize
		cumulativeCompressed += bundle.CompressedSize

		if stat, exists := dailyStats[dateStr]; exists {
			// Update existing day
			if bundle.BundleNumber > stat.lastBundle {
				stat.lastBundle = bundle.BundleNumber
			}
			stat.bundleCount++
			stat.totalUncompressed += bundle.UncompressedSize
			stat.totalCompressed += bundle.CompressedSize
		} else {
			// Create new day entry
			dailyStats[dateStr] = &dailyStat{
				lastBundle:        bundle.BundleNumber,
				bundleCount:       1,
				totalUncompressed: bundle.UncompressedSize,
				totalCompressed:   bundle.CompressedSize,
			}
		}

		// Store the cumulative values at the end of this date
		// (will be overwritten if there are multiple bundles on the same day)
		dateCumulatives[dateStr] = struct {
			uncompressed int64
			compressed   int64
		}{
			uncompressed: cumulativeUncompressed,
			compressed:   cumulativeCompressed,
		}
	}

	// Convert map to sorted slice by date
	var dates []string
	for date := range dailyStats {
		dates = append(dates, date)
	}
	sort.Strings(dates)

	// Build history points with cumulative operations
	var history []*storage.PLCHistoryPoint
	cumulativeOps := 0

	for _, date := range dates {
		stat := dailyStats[date]
		// Estimate operation counts assuming a fixed 10,000 operations per bundle
		cumulativeOps += stat.bundleCount * 10000
		cumulative := dateCumulatives[date]

		history = append(history, &storage.PLCHistoryPoint{
			Date:                   date,
			BundleNumber:           stat.lastBundle,
			OperationCount:         cumulativeOps,
			UncompressedSize:       stat.totalUncompressed,
			CompressedSize:         stat.totalCompressed,
			CumulativeUncompressed: cumulative.uncompressed,
			CumulativeCompressed:   cumulative.compressed,
		})
	}

	// Apply limit if specified
	if limit > 0 && len(history) > limit {
		history = history[:limit]
	}

	return history, nil
}
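// Example of consuming the history points (illustrative; the limit and
// fromBundle values are arbitrary). Fields mirror the calculation above:
// per-day sizes plus running cumulative totals and an estimated operation count.
//
//	history, err := bm.GetPLCHistory(ctx, 30, 1)
//	if err != nil {
//		return err
//	}
//	for _, p := range history {
//		log.Info("%s: last bundle %06d, ~%d ops, %d bytes/day (cumulative %d)",
//			p.Date, p.BundleNumber, p.OperationCount, p.UncompressedSize, p.CumulativeUncompressed)
//	}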

// GetBundleLabels reads labels from a compressed CSV file for a specific bundle
func (bm *BundleManager) GetBundleLabels(ctx context.Context, bundleNum int) ([]*PLCOpLabel, error) {
	// Define the path to the labels file
	labelsDir := filepath.Join(bm.bundleDir, "labels")
	labelsFile := filepath.Join(labelsDir, fmt.Sprintf("%06d.csv.zst", bundleNum))

	// Check if file exists
	if _, err := os.Stat(labelsFile); os.IsNotExist(err) {
		log.Verbose("No labels file found for bundle %d at %s", bundleNum, labelsFile)
		// Return empty, not an error
		return []*PLCOpLabel{}, nil
	}

	// Open the zstd-compressed file
	file, err := os.Open(labelsFile)
	if err != nil {
		return nil, fmt.Errorf("failed to open labels file: %w", err)
	}
	defer file.Close()

	// Create a zstd reader
	zstdReader, err := zstd.NewReader(file)
	if err != nil {
		return nil, fmt.Errorf("failed to create zstd reader: %w", err)
	}
	defer zstdReader.Close()

	// Create a CSV reader; the labels file has no header row, and the field
	// count is validated in parseLabelRecord
	csvReader := csv.NewReader(zstdReader)

	var labels []*PLCOpLabel

	// Read all records
	for {
		// Check for context cancellation
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		record, err := csvReader.Read()
		if err == io.EOF {
			break // End of file
		}
		if err != nil {
			log.Error("Error reading CSV record in %s: %v", labelsFile, err)
			continue // Skip bad line
		}

		// Parse the CSV record (which is []string)
		label, err := parseLabelRecord(record)
		if err != nil {
			log.Error("Error parsing CSV data for bundle %d: %v", bundleNum, err)
			continue // Skip bad data
		}

		labels = append(labels, label)
	}

	return labels, nil
}
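// Labels are expected under <bundleDir>/labels/<bundle>.csv.zst. A usage
// sketch (illustrative; bundle number 42 is arbitrary):
//
//	labels, err := bm.GetBundleLabels(ctx, 42)
//	if err != nil {
//		return err
//	}
//	for _, l := range labels {
//		log.Verbose("bundle %d pos %d cid=%s labels=%s",
//			l.Bundle, l.Position, l.CID, strings.Join(l.Detectors, ";"))
//	}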

// parseLabelRecord converts a CSV record in the new label format into a PLCOpLabel struct
func parseLabelRecord(record []string) (*PLCOpLabel, error) {
	// New format: 0:bundle, 1:position, 2:cid(short), 3:size, 4:confidence, 5:labels
	if len(record) != 6 {
		err := fmt.Errorf("invalid record length: expected 6, got %d", len(record))
		log.Warn("Skipping malformed CSV line: %v (data: %s)", err, strings.Join(record, ","))
		return nil, err
	}

	// 0:bundle
	bundle, err := strconv.Atoi(record[0])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'bundle' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'bundle': %w", err)
	}

	// 1:position
	position, err := strconv.Atoi(record[1])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'position' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'position': %w", err)
	}

	// 2:cid(short)
	shortCID := record[2]

	// 3:size
	size, err := strconv.Atoi(record[3])
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'size' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'size': %w", err)
	}

	// 4:confidence
	confidence, err := strconv.ParseFloat(record[4], 64)
	if err != nil {
		log.Warn("Skipping malformed CSV line: 'confidence' column: %v (data: %s)", err, strings.Join(record, ","))
		return nil, fmt.Errorf("parsing 'confidence': %w", err)
	}

	// 5:labels (semicolon-separated detector names)
	detectors := strings.Split(record[5], ";")

	label := &PLCOpLabel{
		Bundle:     bundle,
		Position:   position,
		CID:        shortCID,
		Size:       size,
		Confidence: confidence,
		Detectors:  detectors,
	}

	return label, nil
}
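// Illustrative record (hypothetical values) in the expected 6-column layout:
//
//	42,1337,bafyreiabc,512,0.93,detectorA;detectorB
//
// which parses to Bundle=42, Position=1337, CID="bafyreiabc", Size=512,
// Confidence=0.93 and Detectors=["detectorA" "detectorB"].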