[DEPRECATED] Go implementation of plcbundle
1package bundle
2
3import (
4 "fmt"
5 "path/filepath"
6 "sort"
7 "strconv"
8 "strings"
9 "sync"
10 "time"
11
12 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex"
13)
14
15// ScanDirectory scans the bundle directory and rebuilds the index
16func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
17 result := &DirectoryScanResult{
18 BundleDir: m.config.BundleDir,
19 }
20
21 m.logger.Printf("Scanning directory: %s", m.config.BundleDir)
22
23 // Find all bundle files
24 files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
25 if err != nil {
26 return nil, fmt.Errorf("failed to scan directory: %w", err)
27 }
28 files = filterBundleFiles(files)
29
30 if len(files) == 0 {
31 m.logger.Printf("No bundle files found")
32 return result, nil
33 }
34
35 // Parse bundle numbers
36 var bundleNumbers []int
37 for _, file := range files {
38 base := filepath.Base(file)
39 numStr := strings.TrimSuffix(base, ".jsonl.zst")
40 num, err := strconv.Atoi(numStr)
41 if err != nil {
42 m.logger.Printf("Warning: skipping invalid filename: %s", base)
43 continue
44 }
45 bundleNumbers = append(bundleNumbers, num)
46 }
47
48 sort.Ints(bundleNumbers)
49
50 result.BundleCount = len(bundleNumbers)
51 if len(bundleNumbers) > 0 {
52 result.FirstBundle = bundleNumbers[0]
53 result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
54 }
55
56 // Find gaps
57 if len(bundleNumbers) > 1 {
58 for i := result.FirstBundle; i <= result.LastBundle; i++ {
59 found := false
60 for _, num := range bundleNumbers {
61 if num == i {
62 found = true
63 break
64 }
65 }
66 if !found {
67 result.MissingGaps = append(result.MissingGaps, i)
68 }
69 }
70 }
71
72 m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
73
74 // Load each bundle and rebuild index
75 var newMetadata []*bundleindex.BundleMetadata
76 var totalSize int64
77
78 for _, num := range bundleNumbers {
79 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
80
81 // Load bundle
82 ops, err := m.operations.LoadBundle(path)
83 if err != nil {
84 m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
85 continue
86 }
87
88 // Get file size
89 size, _ := m.operations.GetFileSize(path)
90 totalSize += size
91
92 // Calculate parent and cursor from previous bundle
93 var parent string
94 var cursor string
95 if num > 1 && len(newMetadata) > 0 {
96 prevMeta := newMetadata[len(newMetadata)-1]
97 parent = prevMeta.Hash
98 cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
99 }
100
101 // Use the ONE method for metadata calculation
102 meta, err := m.CalculateBundleMetadata(num, path, ops, parent, cursor)
103 if err != nil {
104 m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
105 continue
106 }
107
108 newMetadata = append(newMetadata, meta)
109
110 m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
111 }
112
113 result.TotalSize = totalSize
114
115 // Rebuild index
116 m.index.Rebuild(newMetadata)
117
118 // Save index
119 if err := m.SaveIndex(); err != nil {
120 return nil, fmt.Errorf("failed to save index: %w", err)
121 }
122
123 result.IndexUpdated = true
124
125 m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
126
127 return result, nil
128}
129
130// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
131func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
132 result := &DirectoryScanResult{
133 BundleDir: m.config.BundleDir,
134 }
135
136 m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)
137
138 // Find all bundle files
139 files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
140 if err != nil {
141 return nil, fmt.Errorf("failed to scan directory: %w", err)
142 }
143 files = filterBundleFiles(files)
144
145 if len(files) == 0 {
146 m.logger.Printf("No bundle files found")
147 return result, nil
148 }
149
150 // Parse bundle numbers
151 var bundleNumbers []int
152 for _, file := range files {
153 base := filepath.Base(file)
154 numStr := strings.TrimSuffix(base, ".jsonl.zst")
155 num, err := strconv.Atoi(numStr)
156 if err != nil {
157 m.logger.Printf("Warning: skipping invalid filename: %s", base)
158 continue
159 }
160 bundleNumbers = append(bundleNumbers, num)
161 }
162
163 sort.Ints(bundleNumbers)
164
165 result.BundleCount = len(bundleNumbers)
166 if len(bundleNumbers) > 0 {
167 result.FirstBundle = bundleNumbers[0]
168 result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
169 }
170
171 // Find gaps
172 if len(bundleNumbers) > 1 {
173 for i := result.FirstBundle; i <= result.LastBundle; i++ {
174 found := false
175 for _, num := range bundleNumbers {
176 if num == i {
177 found = true
178 break
179 }
180 }
181 if !found {
182 result.MissingGaps = append(result.MissingGaps, i)
183 }
184 }
185 }
186
187 m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
188
189 // Process bundles in parallel
190 type bundleResult struct {
191 index int
192 meta *bundleindex.BundleMetadata
193 err error
194 }
195
196 jobs := make(chan int, len(bundleNumbers))
197 results := make(chan bundleResult, len(bundleNumbers))
198
199 // Start workers
200 var wg sync.WaitGroup
201 for w := 0; w < workers; w++ {
202 wg.Add(1)
203 go func() {
204 defer wg.Done()
205 for num := range jobs {
206 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
207
208 // Stream metadata WITHOUT loading all operations
209 meta, err := m.CalculateMetadataStreaming(num, path)
210 if err != nil {
211 results <- bundleResult{index: num, err: err}
212 continue
213 }
214
215 results <- bundleResult{index: num, meta: meta}
216 }
217 }()
218 }
219
220 // Send jobs
221 for _, num := range bundleNumbers {
222 jobs <- num
223 }
224 close(jobs)
225
226 // Wait for all workers to finish
227 go func() {
228 wg.Wait()
229 close(results)
230 }()
231
232 // Collect results
233 metadataMap := make(map[int]*bundleindex.BundleMetadata)
234 var totalSize int64
235 var totalUncompressed int64
236 processed := 0
237
238 for result := range results {
239 processed++
240
241 // Update progress WITH bytes
242 if progressCallback != nil {
243 if result.meta != nil {
244 totalUncompressed += result.meta.UncompressedSize
245 }
246 progressCallback(processed, len(bundleNumbers), totalUncompressed)
247 }
248
249 if result.err != nil {
250 m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
251 continue
252 }
253 metadataMap[result.index] = result.meta
254 totalSize += result.meta.CompressedSize
255 }
256
257 // Build ordered metadata slice and calculate chain hashes
258 var newMetadata []*bundleindex.BundleMetadata
259 var parent string
260
261 for i, num := range bundleNumbers {
262 meta, ok := metadataMap[num]
263 if !ok {
264 continue
265 }
266
267 // Set cursor from previous bundle's EndTime
268 if i > 0 && len(newMetadata) > 0 {
269 prevMeta := newMetadata[len(newMetadata)-1]
270 meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
271 }
272
273 // Calculate chain hash (must be done sequentially)
274 meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
275 meta.Parent = parent
276
277 newMetadata = append(newMetadata, meta)
278 parent = meta.Hash
279 }
280
281 result.TotalSize = totalSize
282 result.TotalUncompressed = totalUncompressed
283
284 // Rebuild index
285 m.index.Rebuild(newMetadata)
286
287 // Save index
288 if err := m.SaveIndex(); err != nil {
289 return nil, fmt.Errorf("failed to save index: %w", err)
290 }
291
292 result.IndexUpdated = true
293
294 m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
295
296 return result, nil
297}
298
299// ScanBundle scans a single bundle file and returns its metadata
300func (m *Manager) ScanBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) {
301 // Load bundle file
302 operations, err := m.operations.LoadBundle(path)
303 if err != nil {
304 return nil, fmt.Errorf("failed to load bundle: %w", err)
305 }
306
307 if len(operations) == 0 {
308 return nil, fmt.Errorf("bundle is empty")
309 }
310
311 // Get parent chain hash and cursor from previous bundle
312 var parent string
313 var cursor string
314 if bundleNumber > 1 {
315 if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
316 parent = prevMeta.Hash
317 cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
318 }
319 }
320
321 // Use the ONE method
322 return m.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
323}
324
325// ScanAndIndexBundle scans a bundle file and adds it to the index
326func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) {
327 meta, err := m.ScanBundle(path, bundleNumber)
328 if err != nil {
329 return nil, err
330 }
331
332 // Add to index
333 m.index.AddBundle(meta)
334
335 // Save index
336 if err := m.SaveIndex(); err != nil {
337 return nil, fmt.Errorf("failed to save index: %w", err)
338 }
339
340 return meta, nil
341}