[DEPRECATED] Go implementation of plcbundle

Compare changes

Choose any two refs to compare.

Changed files
+359 -192
bundle
cmd
plcbundle
commands
+25 -48
bundle/manager.go
··· 15 "sync/atomic" 16 "time" 17 18 "tangled.org/atscan.net/plcbundle/internal/bundleindex" 19 "tangled.org/atscan.net/plcbundle/internal/didindex" 20 "tangled.org/atscan.net/plcbundle/internal/handleresolver" ··· 75 recentIdx int 76 recentSize int 77 } 78 } 79 80 // NewManager creates a new bundle manager ··· 347 return m, nil 348 } 349 350 // Close cleans up resources 351 func (m *Manager) Close() { 352 if m.operations != nil { 353 m.operations.Close() 354 } ··· 709 return stats 710 } 711 712 - // ExportOperations exports operations from bundles 713 - func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plcclient.PLCOperation, error) { 714 - if count <= 0 { 715 - count = 1000 716 - } 717 - 718 - var result []plcclient.PLCOperation 719 - seenCIDs := make(map[string]bool) 720 - 721 - bundles := m.index.GetBundles() 722 - 723 - for _, meta := range bundles { 724 - if result != nil && len(result) >= count { 725 - break 726 - } 727 - 728 - // Skip bundles before afterTime 729 - if !afterTime.IsZero() && meta.EndTime.Before(afterTime) { 730 - continue 731 - } 732 - 733 - // Load bundle 734 - bundle, err := m.LoadBundle(ctx, meta.BundleNumber) 735 - if err != nil { 736 - m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err) 737 - continue 738 - } 739 - 740 - // Add operations 741 - for _, op := range bundle.Operations { 742 - if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) { 743 - continue 744 - } 745 - 746 - if seenCIDs[op.CID] { 747 - continue 748 - } 749 - 750 - seenCIDs[op.CID] = true 751 - result = append(result, op) 752 - 753 - if len(result) >= count { 754 - break 755 - } 756 - } 757 - } 758 - 759 - return result, nil 760 } 761 762 // IsBundleIndexed checks if a bundle is already in the index
··· 15 "sync/atomic" 16 "time" 17 18 + plcbundle "tangled.org/atscan.net/plcbundle-rs/bindings/go" 19 "tangled.org/atscan.net/plcbundle/internal/bundleindex" 20 "tangled.org/atscan.net/plcbundle/internal/didindex" 21 "tangled.org/atscan.net/plcbundle/internal/handleresolver" ··· 76 recentIdx int 77 recentSize int 78 } 79 + 80 + // Rust-based bundle manager for high-performance operations 81 + rsManager *plcbundle.BundleManager 82 + rsManagerOnce sync.Once 83 + rsManagerErr error 84 } 85 86 // NewManager creates a new bundle manager ··· 353 return m, nil 354 } 355 356 + // getRSManager lazily initializes the Rust bundle manager 357 + func (m *Manager) getRSManager() (*plcbundle.BundleManager, error) { 358 + m.rsManagerOnce.Do(func() { 359 + rsMgr, err := plcbundle.NewBundleManager(m.config.BundleDir) 360 + if err != nil { 361 + m.rsManagerErr = fmt.Errorf("failed to create Rust bundle manager: %w", err) 362 + return 363 + } 364 + m.rsManager = rsMgr 365 + }) 366 + return m.rsManager, m.rsManagerErr 367 + } 368 + 369 // Close cleans up resources 370 func (m *Manager) Close() { 371 + if m.rsManager != nil { 372 + m.rsManager.Close() 373 + } 374 if m.operations != nil { 375 m.operations.Close() 376 } ··· 731 return stats 732 } 733 734 + // GetRSManager returns the Rust bundle manager (proxy method) 735 + func (m *Manager) GetRSManager() (*plcbundle.BundleManager, error) { 736 + return m.getRSManager() 737 } 738 739 // IsBundleIndexed checks if a bundle is already in the index
+7
cmd/plcbundle/commands/common.go
··· 291 } 292 return b 293 }
··· 291 } 292 return b 293 } 294 + 295 + // getVerboseQuiet extracts verbose and quiet flags from command 296 + func getVerboseQuiet(cmd *cobra.Command) (verbose, quiet bool) { 297 + verbose, _ = cmd.Root().PersistentFlags().GetBool("verbose") 298 + quiet, _ = cmd.Root().PersistentFlags().GetBool("quiet") 299 + return verbose, quiet 300 + }
+323 -143
cmd/plcbundle/commands/export.go
··· 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "time" 8 9 "github.com/goccy/go-json" 10 "github.com/spf13/cobra" 11 internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 12 ) 13 ··· 70 plcbundle backfill --all`, 71 72 RunE: func(cmd *cobra.Command, args []string) error { 73 - verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose") 74 - quiet, _ := cmd.Root().PersistentFlags().GetBool("quiet") 75 - 76 mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd, PLCURL: plcURL}) 77 if err != nil { 78 return err 79 } 80 defer mgr.Close() 81 82 - if !quiet { 83 - fmt.Fprintf(os.Stderr, "Exporting from: %s\n", dir) 84 - } 85 - 86 - // Parse after timestamp if provided 87 - var afterTime time.Time 88 - if after != "" { 89 - afterTime, err = time.Parse(time.RFC3339, after) 90 - if err != nil { 91 - return fmt.Errorf("invalid --after timestamp (use RFC3339 format): %w", err) 92 - } 93 - } 94 - 95 - // Determine bundle range 96 - var start, end int 97 - 98 - if all { 99 - index := mgr.GetIndex() 100 - bundles := index.GetBundles() 101 - 102 - if len(bundles) == 0 { 103 - if sync { 104 - // No bundles but sync enabled - start from 1 105 - start = 1 106 - end = 0 // Will be updated after sync 107 - } else { 108 - if !quiet { 109 - fmt.Fprintf(os.Stderr, "No bundles available (use --sync to fetch)\n") 110 - } 111 - return nil 112 - } 113 - } else { 114 - start = bundles[0].BundleNumber 115 - end = bundles[len(bundles)-1].BundleNumber 116 - } 117 - 118 - } else if rangeStr != "" { 119 - var err error 120 - start, end, err = parseBundleRange(rangeStr) 121 - if err != nil { 122 - return err 123 - } 124 - 125 - } else { 126 - return fmt.Errorf("either --all or --range required") 127 - } 128 - 129 - if !quiet { 130 - if sync { 131 - fmt.Fprintf(os.Stderr, "Mode: export existing + sync new bundles\n") 132 - } else { 133 - fmt.Fprintf(os.Stderr, "Mode: export existing only\n") 134 - } 135 - 136 - if count > 0 { 137 - fmt.Fprintf(os.Stderr, "Limit: %d operations\n", count) 138 - } 139 - if after != "" { 140 - fmt.Fprintf(os.Stderr, "After: %s\n", after) 141 - } 142 - fmt.Fprintf(os.Stderr, "\n") 143 - } 144 - 145 - return exportBundles(cmd.Context(), mgr, exportOptions{ 146 - start: start, 147 - end: end, 148 - sync: sync, 149 - count: count, 150 - afterTime: afterTime, 151 - verbose: verbose, 152 - quiet: quiet, 153 }) 154 }, 155 } ··· 164 return cmd 165 } 166 167 type exportOptions struct { 168 - start int 169 - end int 170 - sync bool 171 - count int 172 - afterTime time.Time 173 - verbose bool 174 - quiet bool 175 } 176 177 - func exportBundles(ctx context.Context, mgr BundleManager, opts exportOptions) error { 178 - operationCount := 0 179 - exported := 0 180 181 // Phase 1: Export existing bundles 182 - existingCount := 0 183 - if opts.end > 0 { 184 - existingCount, exported = exportExistingBundles( 185 - ctx, mgr, opts.start, opts.end, 186 - &operationCount, opts.count, opts.afterTime, 187 - opts.verbose, opts.quiet, 188 - ) 189 } 190 191 // Check if we hit the count limit 192 - if opts.count > 0 && exported >= opts.count { 193 if !opts.quiet { 194 fmt.Fprintf(os.Stderr, "\nโœ“ Export complete (limit reached)\n") 195 - fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount) 196 fmt.Fprintf(os.Stderr, " Operations: %d\n", exported) 197 } 198 return nil ··· 200 201 // Phase 2: Sync and export new bundles (if enabled and not at limit) 202 fetchedCount := 0 203 - if opts.sync && (opts.count == 0 || exported < opts.count) { 204 if !opts.quiet { 205 fmt.Fprintf(os.Stderr, "\nSyncing new bundles from PLC...\n") 206 } ··· 215 if bundle, err := mgr.LoadBundle(ctx, bundleNum); err == nil { 216 for _, op := range bundle.Operations { 217 // Apply filters 218 - if !opts.afterTime.IsZero() && op.CreatedAt.Before(opts.afterTime) { 219 - continue 220 } 221 222 - if opts.count > 0 && exported >= opts.count { 223 return // Stop when limit reached 224 } 225 ··· 231 fmt.Println(string(data)) 232 } 233 exported++ 234 - operationCount++ 235 } 236 } 237 }, ··· 253 existingCount+fetchedCount, existingCount, fetchedCount) 254 } else if opts.sync { 255 fmt.Fprintf(os.Stderr, " Bundles: %d (already up to date)\n", existingCount) 256 - } else { 257 fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount) 258 } 259 ··· 267 return nil 268 } 269 270 - func exportExistingBundles( 271 - ctx context.Context, 272 - mgr BundleManager, 273 - start, end int, 274 - operationCount *int, 275 - limit int, 276 - afterTime time.Time, 277 - verbose bool, 278 - quiet bool, 279 - ) (bundleCount int, exported int) { 280 - 281 - processedCount := 0 282 - exportedOps := 0 283 - 284 - for bundleNum := start; bundleNum <= end; bundleNum++ { 285 select { 286 case <-ctx.Done(): 287 - return processedCount, exportedOps 288 default: 289 } 290 291 - bundle, err := mgr.LoadBundle(ctx, bundleNum) 292 if err != nil { 293 - if verbose { 294 - fmt.Fprintf(os.Stderr, "Bundle %06d: not found (skipped)\n", bundleNum) 295 - } 296 continue 297 } 298 299 - // Export operations with filters 300 for _, op := range bundle.Operations { 301 - // Filter by timestamp 302 if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) { 303 continue 304 } 305 - 306 - // Check count limit 307 - if limit > 0 && exportedOps >= limit { 308 - if verbose { 309 - fmt.Fprintf(os.Stderr, "Bundle %06d: limit reached, stopping\n", bundleNum) 310 } 311 - return processedCount, exportedOps 312 } 313 314 - // Output operation to stdout (JSONL) 315 if len(op.RawJSON) > 0 { 316 - fmt.Println(string(op.RawJSON)) 317 } else { 318 - data, _ := json.Marshal(op) 319 - fmt.Println(string(data)) 320 } 321 - exportedOps++ 322 - } 323 324 - *operationCount += len(bundle.Operations) 325 - processedCount++ 326 327 - if verbose { 328 - fmt.Fprintf(os.Stderr, "Bundle %06d: โœ“ (%d ops, %d exported)\n", 329 - bundleNum, len(bundle.Operations), exportedOps) 330 - } else if !quiet && processedCount%100 == 0 { 331 - fmt.Fprintf(os.Stderr, "Exported: %d bundles, %d ops\r", processedCount, exportedOps) 332 } 333 } 334 335 - if !quiet && !verbose && processedCount > 0 { 336 - fmt.Fprintf(os.Stderr, "Existing: %d bundles, %d ops\n", processedCount, exportedOps) 337 } 338 339 - return processedCount, exportedOps 340 } 341 342 type exportLogger struct {
··· 3 import ( 4 "context" 5 "fmt" 6 + "io" 7 "os" 8 "time" 9 10 "github.com/goccy/go-json" 11 "github.com/spf13/cobra" 12 + plcbundle "tangled.org/atscan.net/plcbundle-rs/bindings/go" 13 + "tangled.org/atscan.net/plcbundle/bundle" 14 internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 15 ) 16 ··· 73 plcbundle backfill --all`, 74 75 RunE: func(cmd *cobra.Command, args []string) error { 76 mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd, PLCURL: plcURL}) 77 if err != nil { 78 return err 79 } 80 defer mgr.Close() 81 82 + verbose, quiet := getVerboseQuiet(cmd) 83 + return runExport(cmd.Context(), mgr, dir, exportFlags{ 84 + all: all, 85 + rangeStr: rangeStr, 86 + sync: sync, 87 + count: count, 88 + after: after, 89 + verbose: verbose, 90 + quiet: quiet, 91 }) 92 }, 93 } ··· 102 return cmd 103 } 104 105 + type exportFlags struct { 106 + all bool 107 + rangeStr string 108 + sync bool 109 + count int 110 + after string 111 + verbose bool 112 + quiet bool 113 + } 114 + 115 type exportOptions struct { 116 + start int 117 + end int 118 + exportAll bool 119 + sync bool 120 + count int 121 + afterTimestamp string 122 + verbose bool 123 + quiet bool 124 } 125 126 + // runExport performs the export operation 127 + func runExport(ctx context.Context, mgr BundleManager, dir string, flags exportFlags) error { 128 + // Parse after timestamp if provided 129 + var afterTimestamp string 130 + if flags.after != "" { 131 + afterTime, err := time.Parse(time.RFC3339, flags.after) 132 + if err != nil { 133 + return fmt.Errorf("invalid --after timestamp (use RFC3339 format): %w", err) 134 + } 135 + afterTimestamp = afterTime.Format(time.RFC3339) 136 + } 137 + 138 + // Determine bundle range 139 + var start, end uint32 140 + var exportAll bool 141 + 142 + if flags.all { 143 + index := mgr.GetIndex() 144 + bundles := index.GetBundles() 145 + 146 + if len(bundles) == 0 { 147 + if flags.sync { 148 + // No bundles but sync enabled - start from 1 149 + start = 1 150 + end = 0 // Will be updated after sync 151 + exportAll = false 152 + } else { 153 + if !flags.quiet { 154 + fmt.Fprintf(os.Stderr, "No bundles available (use --sync to fetch)\n") 155 + } 156 + return nil 157 + } 158 + } else { 159 + if flags.sync { 160 + // Export existing bundles, then sync 161 + start = uint32(bundles[0].BundleNumber) 162 + end = uint32(bundles[len(bundles)-1].BundleNumber) 163 + exportAll = false 164 + } else { 165 + // For export only, use export_all flag 166 + exportAll = true 167 + start = 0 168 + end = 0 169 + } 170 + } 171 + 172 + } else if flags.rangeStr != "" { 173 + var err error 174 + startInt, endInt, err := parseBundleRange(flags.rangeStr) 175 + if err != nil { 176 + return err 177 + } 178 + start = uint32(startInt) 179 + end = uint32(endInt) 180 + exportAll = false 181 + 182 + } else { 183 + return fmt.Errorf("either --all or --range required") 184 + } 185 186 + opts := exportOptions{ 187 + start: int(start), 188 + end: int(end), 189 + exportAll: exportAll, 190 + sync: flags.sync, 191 + count: flags.count, 192 + afterTimestamp: afterTimestamp, 193 + verbose: flags.verbose, 194 + quiet: flags.quiet, 195 + } 196 + 197 + if !flags.quiet { 198 + fmt.Fprintf(os.Stderr, "Exporting from: %s\n", dir) 199 + if flags.sync { 200 + fmt.Fprintf(os.Stderr, "Mode: export existing + sync new bundles\n") 201 + } else { 202 + fmt.Fprintf(os.Stderr, "Mode: export existing only (using Rust library)\n") 203 + } 204 + 205 + if flags.count > 0 { 206 + fmt.Fprintf(os.Stderr, "Limit: %d operations\n", flags.count) 207 + } 208 + if flags.after != "" { 209 + fmt.Fprintf(os.Stderr, "After: %s\n", flags.after) 210 + } 211 + fmt.Fprintf(os.Stderr, "\n") 212 + } 213 + 214 + return exportBundles(ctx, mgr, opts) 215 + } 216 + 217 + // ExportSpec defines export parameters 218 + type ExportSpec struct { 219 + BundleStart uint32 220 + BundleEnd uint32 221 + ExportAll bool 222 + CountLimit uint64 223 + AfterTimestamp string 224 + DIDFilter string 225 + OpTypeFilter string 226 + } 227 + 228 + // ExportStats contains export statistics 229 + type ExportStats struct { 230 + RecordsWritten uint64 231 + BytesWritten uint64 232 + BundlesProcessed int 233 + } 234 + 235 + // exportBundles exports bundles using the Rust library via Manager proxy 236 + func exportBundles(ctx context.Context, mgr BundleManager, opts exportOptions) error { 237 // Phase 1: Export existing bundles 238 + var exported uint64 239 + var existingCount int 240 + 241 + if opts.exportAll || opts.end > 0 { 242 + // Build export spec 243 + spec := ExportSpec{ 244 + BundleStart: uint32(opts.start), 245 + BundleEnd: uint32(opts.end), 246 + ExportAll: opts.exportAll, 247 + CountLimit: uint64(opts.count), 248 + AfterTimestamp: opts.afterTimestamp, 249 + DIDFilter: "", // Not supported in current flags 250 + OpTypeFilter: "", // Not supported in current flags 251 + } 252 + 253 + // Progress callback 254 + var progressCallback func(records, bytes uint64) 255 + if !opts.quiet { 256 + progressCallback = func(records, bytes uint64) { 257 + if opts.verbose { 258 + fmt.Fprintf(os.Stderr, "Progress: %d records, %s\r", records, formatBytes(int64(bytes))) 259 + } else if records%10000 == 0 { 260 + fmt.Fprintf(os.Stderr, "Exported: %d records\r", records) 261 + } 262 + } 263 + } 264 + 265 + // Get Rust manager from bundle manager 266 + concreteMgr := mgr.(*bundle.Manager) 267 + rsMgr, err := concreteMgr.GetRSManager() 268 + if err != nil { 269 + // Fallback to Go implementation if Rust manager unavailable 270 + return exportToWriterGo(ctx, concreteMgr, spec, os.Stdout, progressCallback, &exported, &existingCount) 271 + } 272 + 273 + // Convert to Rust export spec 274 + rsSpec := plcbundle.ExportSpec{ 275 + BundleStart: spec.BundleStart, 276 + BundleEnd: spec.BundleEnd, 277 + ExportAll: spec.ExportAll, 278 + Format: 0, // 0 = jsonl 279 + CountLimit: spec.CountLimit, 280 + AfterTimestamp: spec.AfterTimestamp, 281 + DIDFilter: spec.DIDFilter, 282 + OpTypeFilter: spec.OpTypeFilter, 283 + } 284 + 285 + // Export options 286 + rsOpts := &plcbundle.ExportOptions{ 287 + Writer: os.Stdout, 288 + Progress: progressCallback, 289 + } 290 + 291 + // Perform export using Rust library 292 + rsStats, err := rsMgr.Export(rsSpec, rsOpts) 293 + if err != nil { 294 + return fmt.Errorf("export failed: %w", err) 295 + } 296 + 297 + exported = rsStats.RecordsWritten 298 + 299 + // Calculate bundle count 300 + if !spec.ExportAll && spec.BundleEnd >= spec.BundleStart { 301 + existingCount = int(spec.BundleEnd - spec.BundleStart + 1) 302 + } else if spec.ExportAll { 303 + index := mgr.GetIndex() 304 + bundles := index.GetBundles() 305 + existingCount = len(bundles) 306 + } 307 } 308 309 // Check if we hit the count limit 310 + if opts.count > 0 && exported >= uint64(opts.count) { 311 if !opts.quiet { 312 fmt.Fprintf(os.Stderr, "\nโœ“ Export complete (limit reached)\n") 313 + if existingCount > 0 { 314 + fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount) 315 + } 316 fmt.Fprintf(os.Stderr, " Operations: %d\n", exported) 317 } 318 return nil ··· 320 321 // Phase 2: Sync and export new bundles (if enabled and not at limit) 322 fetchedCount := 0 323 + if opts.sync && (opts.count == 0 || exported < uint64(opts.count)) { 324 if !opts.quiet { 325 fmt.Fprintf(os.Stderr, "\nSyncing new bundles from PLC...\n") 326 } ··· 335 if bundle, err := mgr.LoadBundle(ctx, bundleNum); err == nil { 336 for _, op := range bundle.Operations { 337 // Apply filters 338 + if opts.afterTimestamp != "" { 339 + opTime := op.CreatedAt.Format(time.RFC3339) 340 + if opTime < opts.afterTimestamp { 341 + continue 342 + } 343 } 344 345 + if opts.count > 0 && exported >= uint64(opts.count) { 346 return // Stop when limit reached 347 } 348 ··· 354 fmt.Println(string(data)) 355 } 356 exported++ 357 } 358 } 359 }, ··· 375 existingCount+fetchedCount, existingCount, fetchedCount) 376 } else if opts.sync { 377 fmt.Fprintf(os.Stderr, " Bundles: %d (already up to date)\n", existingCount) 378 + } else if existingCount > 0 { 379 fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount) 380 } 381 ··· 389 return nil 390 } 391 392 + // exportToWriterGo is the fallback Go implementation 393 + func exportToWriterGo(ctx context.Context, mgr *bundle.Manager, spec ExportSpec, writer io.Writer, progress func(records, bytes uint64), exported *uint64, existingCount *int) error { 394 + var recordsWritten uint64 395 + var bytesWritten uint64 396 + var bundlesProcessed int 397 + 398 + bundles := mgr.GetIndex().GetBundles() 399 + if len(bundles) == 0 { 400 + *exported = 0 401 + *existingCount = 0 402 + return nil 403 + } 404 + 405 + // Determine bundle range 406 + startBundle := 0 407 + endBundle := len(bundles) - 1 408 + 409 + if spec.ExportAll { 410 + startBundle = 0 411 + endBundle = len(bundles) - 1 412 + } else { 413 + // Find start and end indices 414 + startBundle = -1 415 + endBundle = -1 416 + for i, b := range bundles { 417 + if startBundle == -1 && b.BundleNumber >= int(spec.BundleStart) { 418 + startBundle = i 419 + } 420 + if b.BundleNumber <= int(spec.BundleEnd) { 421 + endBundle = i 422 + } 423 + } 424 + if startBundle == -1 || endBundle == -1 || startBundle > endBundle { 425 + *exported = 0 426 + *existingCount = 0 427 + return nil 428 + } 429 + } 430 + 431 + // Parse after timestamp 432 + var afterTime time.Time 433 + if spec.AfterTimestamp != "" { 434 + var err error 435 + afterTime, err = time.Parse(time.RFC3339, spec.AfterTimestamp) 436 + if err != nil { 437 + return fmt.Errorf("invalid after timestamp: %w", err) 438 + } 439 + } 440 + 441 + // Export bundles 442 + for i := startBundle; i <= endBundle; i++ { 443 select { 444 case <-ctx.Done(): 445 + *exported = recordsWritten 446 + *existingCount = bundlesProcessed 447 + return ctx.Err() 448 default: 449 } 450 451 + meta := bundles[i] 452 + bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber) 453 if err != nil { 454 continue 455 } 456 457 + // Export operations 458 for _, op := range bundle.Operations { 459 + // Apply filters 460 if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) { 461 continue 462 } 463 + if spec.DIDFilter != "" && op.DID != spec.DIDFilter { 464 + continue 465 + } 466 + if spec.OpTypeFilter != "" { 467 + // Operation type is inside the parsed operation data 468 + opData, err := op.GetOperationData() 469 + if err == nil && opData != nil { 470 + if opType, ok := opData["type"].(string); ok && opType != spec.OpTypeFilter { 471 + continue 472 + } else if !ok { 473 + continue // Skip if no type field 474 + } 475 + } else { 476 + continue // Skip if can't parse 477 } 478 + } 479 + if spec.CountLimit > 0 && recordsWritten >= spec.CountLimit { 480 + *exported = recordsWritten 481 + *existingCount = bundlesProcessed 482 + return nil 483 } 484 485 + // Write operation 486 + var data []byte 487 if len(op.RawJSON) > 0 { 488 + data = op.RawJSON 489 } else { 490 + var err error 491 + data, err = json.Marshal(op) 492 + if err != nil { 493 + continue 494 + } 495 + } 496 + data = append(data, '\n') 497 + 498 + if _, err := writer.Write(data); err != nil { 499 + return fmt.Errorf("write failed: %w", err) 500 } 501 502 + recordsWritten++ 503 + bytesWritten += uint64(len(data)) 504 505 + if progress != nil && recordsWritten%1000 == 0 { 506 + progress(recordsWritten, bytesWritten) 507 + } 508 } 509 + 510 + bundlesProcessed++ 511 } 512 513 + if progress != nil { 514 + progress(recordsWritten, bytesWritten) 515 } 516 517 + *exported = recordsWritten 518 + *existingCount = bundlesProcessed 519 + return nil 520 } 521 522 type exportLogger struct {
+4 -1
go.mod
··· 1 module tangled.org/atscan.net/plcbundle 2 3 - go 1.25 4 5 require ( 6 github.com/goccy/go-json v0.10.5 ··· 10 github.com/valyala/gozstd v1.23.2 11 golang.org/x/sys v0.38.0 12 golang.org/x/term v0.36.0 13 ) 14 15 require ( 16 github.com/inconshreveable/mousetrap v1.1.0 // indirect 17 github.com/spf13/pflag v1.0.9 // indirect
··· 1 module tangled.org/atscan.net/plcbundle 2 3 + go 1.25.3 4 5 require ( 6 github.com/goccy/go-json v0.10.5 ··· 10 github.com/valyala/gozstd v1.23.2 11 golang.org/x/sys v0.38.0 12 golang.org/x/term v0.36.0 13 + tangled.org/atscan.net/plcbundle-rs/bindings/go v0.0.0 14 ) 15 16 + replace tangled.org/atscan.net/plcbundle-rs/bindings/go => ../plcbundle-rs/bindings/go 17 + 18 require ( 19 github.com/inconshreveable/mousetrap v1.1.0 // indirect 20 github.com/spf13/pflag v1.0.9 // indirect