// [DEPRECATED] Go implementation of plcbundle
// at main 8.5 kB view raw

1package bundle 2 3import ( 4 "fmt" 5 "path/filepath" 6 "sort" 7 "strconv" 8 "strings" 9 "sync" 10 "time" 11 12 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex" 13) 14 15// ScanDirectory scans the bundle directory and rebuilds the index 16func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) { 17 result := &DirectoryScanResult{ 18 BundleDir: m.config.BundleDir, 19 } 20 21 m.logger.Printf("Scanning directory: %s", m.config.BundleDir) 22 23 // Find all bundle files 24 files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst")) 25 if err != nil { 26 return nil, fmt.Errorf("failed to scan directory: %w", err) 27 } 28 files = filterBundleFiles(files) 29 30 if len(files) == 0 { 31 m.logger.Printf("No bundle files found") 32 return result, nil 33 } 34 35 // Parse bundle numbers 36 var bundleNumbers []int 37 for _, file := range files { 38 base := filepath.Base(file) 39 numStr := strings.TrimSuffix(base, ".jsonl.zst") 40 num, err := strconv.Atoi(numStr) 41 if err != nil { 42 m.logger.Printf("Warning: skipping invalid filename: %s", base) 43 continue 44 } 45 bundleNumbers = append(bundleNumbers, num) 46 } 47 48 sort.Ints(bundleNumbers) 49 50 result.BundleCount = len(bundleNumbers) 51 if len(bundleNumbers) > 0 { 52 result.FirstBundle = bundleNumbers[0] 53 result.LastBundle = bundleNumbers[len(bundleNumbers)-1] 54 } 55 56 // Find gaps 57 if len(bundleNumbers) > 1 { 58 for i := result.FirstBundle; i <= result.LastBundle; i++ { 59 found := false 60 for _, num := range bundleNumbers { 61 if num == i { 62 found = true 63 break 64 } 65 } 66 if !found { 67 result.MissingGaps = append(result.MissingGaps, i) 68 } 69 } 70 } 71 72 m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps)) 73 74 // Load each bundle and rebuild index 75 var newMetadata []*bundleindex.BundleMetadata 76 var totalSize int64 77 78 for _, num := range bundleNumbers { 79 path := filepath.Join(m.config.BundleDir, 
fmt.Sprintf("%06d.jsonl.zst", num)) 80 81 // Load bundle 82 ops, err := m.operations.LoadBundle(path) 83 if err != nil { 84 m.logger.Printf("Warning: failed to load bundle %d: %v", num, err) 85 continue 86 } 87 88 // Get file size 89 size, _ := m.operations.GetFileSize(path) 90 totalSize += size 91 92 // Calculate parent and cursor from previous bundle 93 var parent string 94 var cursor string 95 if num > 1 && len(newMetadata) > 0 { 96 prevMeta := newMetadata[len(newMetadata)-1] 97 parent = prevMeta.Hash 98 cursor = prevMeta.EndTime.Format(time.RFC3339Nano) 99 } 100 101 // Use the ONE method for metadata calculation 102 meta, err := m.CalculateBundleMetadata(num, path, ops, parent, cursor) 103 if err != nil { 104 m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err) 105 continue 106 } 107 108 newMetadata = append(newMetadata, meta) 109 110 m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount) 111 } 112 113 result.TotalSize = totalSize 114 115 // Rebuild index 116 m.index.Rebuild(newMetadata) 117 118 // Save index 119 if err := m.SaveIndex(); err != nil { 120 return nil, fmt.Errorf("failed to save index: %w", err) 121 } 122 123 result.IndexUpdated = true 124 125 m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata)) 126 127 return result, nil 128} 129 130// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index 131func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) { 132 result := &DirectoryScanResult{ 133 BundleDir: m.config.BundleDir, 134 } 135 136 m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir) 137 138 // Find all bundle files 139 files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst")) 140 if err != nil { 141 return nil, fmt.Errorf("failed to scan directory: %w", err) 142 } 143 files = 
filterBundleFiles(files) 144 145 if len(files) == 0 { 146 m.logger.Printf("No bundle files found") 147 return result, nil 148 } 149 150 // Parse bundle numbers 151 var bundleNumbers []int 152 for _, file := range files { 153 base := filepath.Base(file) 154 numStr := strings.TrimSuffix(base, ".jsonl.zst") 155 num, err := strconv.Atoi(numStr) 156 if err != nil { 157 m.logger.Printf("Warning: skipping invalid filename: %s", base) 158 continue 159 } 160 bundleNumbers = append(bundleNumbers, num) 161 } 162 163 sort.Ints(bundleNumbers) 164 165 result.BundleCount = len(bundleNumbers) 166 if len(bundleNumbers) > 0 { 167 result.FirstBundle = bundleNumbers[0] 168 result.LastBundle = bundleNumbers[len(bundleNumbers)-1] 169 } 170 171 // Find gaps 172 if len(bundleNumbers) > 1 { 173 for i := result.FirstBundle; i <= result.LastBundle; i++ { 174 found := false 175 for _, num := range bundleNumbers { 176 if num == i { 177 found = true 178 break 179 } 180 } 181 if !found { 182 result.MissingGaps = append(result.MissingGaps, i) 183 } 184 } 185 } 186 187 m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps)) 188 189 // Process bundles in parallel 190 type bundleResult struct { 191 index int 192 meta *bundleindex.BundleMetadata 193 err error 194 } 195 196 jobs := make(chan int, len(bundleNumbers)) 197 results := make(chan bundleResult, len(bundleNumbers)) 198 199 // Start workers 200 var wg sync.WaitGroup 201 for w := 0; w < workers; w++ { 202 wg.Add(1) 203 go func() { 204 defer wg.Done() 205 for num := range jobs { 206 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num)) 207 208 // Stream metadata WITHOUT loading all operations 209 meta, err := m.CalculateMetadataStreaming(num, path) 210 if err != nil { 211 results <- bundleResult{index: num, err: err} 212 continue 213 } 214 215 results <- bundleResult{index: num, meta: meta} 216 } 217 }() 218 } 219 220 // Send jobs 221 for _, num := range bundleNumbers { 222 jobs <- num 
223 } 224 close(jobs) 225 226 // Wait for all workers to finish 227 go func() { 228 wg.Wait() 229 close(results) 230 }() 231 232 // Collect results 233 metadataMap := make(map[int]*bundleindex.BundleMetadata) 234 var totalSize int64 235 var totalUncompressed int64 236 processed := 0 237 238 for result := range results { 239 processed++ 240 241 // Update progress WITH bytes 242 if progressCallback != nil { 243 if result.meta != nil { 244 totalUncompressed += result.meta.UncompressedSize 245 } 246 progressCallback(processed, len(bundleNumbers), totalUncompressed) 247 } 248 249 if result.err != nil { 250 m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err) 251 continue 252 } 253 metadataMap[result.index] = result.meta 254 totalSize += result.meta.CompressedSize 255 } 256 257 // Build ordered metadata slice and calculate chain hashes 258 var newMetadata []*bundleindex.BundleMetadata 259 var parent string 260 261 for i, num := range bundleNumbers { 262 meta, ok := metadataMap[num] 263 if !ok { 264 continue 265 } 266 267 // Set cursor from previous bundle's EndTime 268 if i > 0 && len(newMetadata) > 0 { 269 prevMeta := newMetadata[len(newMetadata)-1] 270 meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano) 271 } 272 273 // Calculate chain hash (must be done sequentially) 274 meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash) 275 meta.Parent = parent 276 277 newMetadata = append(newMetadata, meta) 278 parent = meta.Hash 279 } 280 281 result.TotalSize = totalSize 282 result.TotalUncompressed = totalUncompressed 283 284 // Rebuild index 285 m.index.Rebuild(newMetadata) 286 287 // Save index 288 if err := m.SaveIndex(); err != nil { 289 return nil, fmt.Errorf("failed to save index: %w", err) 290 } 291 292 result.IndexUpdated = true 293 294 m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata)) 295 296 return result, nil 297} 298 299// ScanBundle scans a single bundle file and returns its metadata 300func (m 
*Manager) ScanBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) { 301 // Load bundle file 302 operations, err := m.operations.LoadBundle(path) 303 if err != nil { 304 return nil, fmt.Errorf("failed to load bundle: %w", err) 305 } 306 307 if len(operations) == 0 { 308 return nil, fmt.Errorf("bundle is empty") 309 } 310 311 // Get parent chain hash and cursor from previous bundle 312 var parent string 313 var cursor string 314 if bundleNumber > 1 { 315 if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil { 316 parent = prevMeta.Hash 317 cursor = prevMeta.EndTime.Format(time.RFC3339Nano) 318 } 319 } 320 321 // Use the ONE method 322 return m.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor) 323} 324 325// ScanAndIndexBundle scans a bundle file and adds it to the index 326func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) { 327 meta, err := m.ScanBundle(path, bundleNumber) 328 if err != nil { 329 return nil, err 330 } 331 332 // Add to index 333 m.index.AddBundle(meta) 334 335 // Save index 336 if err := m.SaveIndex(); err != nil { 337 return nil, fmt.Errorf("failed to save index: %w", err) 338 } 339 340 return meta, nil 341}