A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1package bundle
2
3import (
4 "bufio"
5 "bytes"
6 "encoding/json"
7 "fmt"
8 "os"
9 "path/filepath"
10 "sync"
11 "time"
12
13 "tangled.org/atscan.net/plcbundle/plc"
14)
15
// MEMPOOL_FILE_PREFIX is the filename prefix of on-disk mempool files. The
// full name is this prefix, the target bundle number zero-padded to six
// digits, and a ".jsonl" extension (see NewMempool).
const (
	MEMPOOL_FILE_PREFIX = "plc_mempool_"
)
17
// Mempool stores operations waiting to be bundled.
//
// Operations are kept in chronological order: each operation's CreatedAt must
// not be earlier than the previous one's (equal timestamps are accepted), and
// none may be earlier than minTimestamp. The mutating and reading methods in
// this file take mu around operations and validated; targetBundle,
// minTimestamp and file are only assigned in NewMempool here and are read
// without locking (NOTE(review): confirm no other file mutates them).
type Mempool struct {
	operations   []plc.PLCOperation // pending operations, in chronological order
	targetBundle int                // bundle number these operations are for
	minTimestamp time.Time          // operations must not be earlier than this time
	file         string             // full path of the JSONL file backing this mempool
	mu           sync.RWMutex       // guards operations and validated
	logger       Logger             // used by Load to report how many operations were read
	validated    bool               // true once the current contents passed the chronological check; validateLocked skips re-checking while set
}
29
30// NewMempool creates a new mempool for a specific bundle number
31func NewMempool(bundleDir string, targetBundle int, minTimestamp time.Time, logger Logger) (*Mempool, error) {
32 filename := fmt.Sprintf("%s%06d.jsonl", MEMPOOL_FILE_PREFIX, targetBundle)
33
34 m := &Mempool{
35 file: filepath.Join(bundleDir, filename),
36 targetBundle: targetBundle,
37 minTimestamp: minTimestamp,
38 operations: make([]plc.PLCOperation, 0),
39 logger: logger,
40 validated: false,
41 }
42
43 // Load existing mempool from disk if it exists
44 if err := m.Load(); err != nil {
45 // If file doesn't exist, that's OK
46 if !os.IsNotExist(err) {
47 return nil, fmt.Errorf("failed to load mempool: %w", err)
48 }
49 }
50
51 return m, nil
52}
53
54// Add adds operations to the mempool with strict validation
55func (m *Mempool) Add(ops []plc.PLCOperation) (int, error) {
56 m.mu.Lock()
57 defer m.mu.Unlock()
58
59 if len(ops) == 0 {
60 return 0, nil
61 }
62
63 // Build existing CID set
64 existingCIDs := make(map[string]bool)
65 for _, op := range m.operations {
66 existingCIDs[op.CID] = true
67 }
68
69 // Validate and add operations
70 var newOps []plc.PLCOperation
71 var lastTime time.Time
72
73 // Start from last operation time if we have any
74 if len(m.operations) > 0 {
75 lastTime = m.operations[len(m.operations)-1].CreatedAt
76 } else {
77 lastTime = m.minTimestamp
78 }
79
80 for _, op := range ops {
81 // Skip duplicates
82 if existingCIDs[op.CID] {
83 continue
84 }
85
86 // CRITICAL: Validate chronological order
87 if !op.CreatedAt.After(lastTime) && !op.CreatedAt.Equal(lastTime) {
88 return len(newOps), fmt.Errorf(
89 "chronological violation: operation %s at %s is not after %s",
90 op.CID, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano),
91 )
92 }
93
94 // Validate operation is after minimum timestamp
95 if op.CreatedAt.Before(m.minTimestamp) {
96 return len(newOps), fmt.Errorf(
97 "operation %s at %s is before minimum timestamp %s (belongs in earlier bundle)",
98 op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano),
99 )
100 }
101
102 newOps = append(newOps, op)
103 existingCIDs[op.CID] = true
104 lastTime = op.CreatedAt
105 }
106
107 // Add new operations
108 m.operations = append(m.operations, newOps...)
109 m.validated = true
110
111 return len(newOps), nil
112}
113
114// Validate performs a full chronological validation of all operations
115func (m *Mempool) Validate() error {
116 m.mu.RLock()
117 defer m.mu.RUnlock()
118
119 if len(m.operations) == 0 {
120 return nil
121 }
122
123 // Check all operations are after minimum timestamp
124 for i, op := range m.operations {
125 if op.CreatedAt.Before(m.minTimestamp) {
126 return fmt.Errorf(
127 "operation %d (CID: %s) at %s is before minimum timestamp %s",
128 i, op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano),
129 )
130 }
131 }
132
133 // Check chronological order
134 for i := 1; i < len(m.operations); i++ {
135 prev := m.operations[i-1]
136 curr := m.operations[i]
137
138 if curr.CreatedAt.Before(prev.CreatedAt) {
139 return fmt.Errorf(
140 "chronological violation at index %d: %s (%s) is before %s (%s)",
141 i, curr.CID, curr.CreatedAt.Format(time.RFC3339Nano),
142 prev.CID, prev.CreatedAt.Format(time.RFC3339Nano),
143 )
144 }
145 }
146
147 // Check for duplicate CIDs
148 cidSet := make(map[string]int)
149 for i, op := range m.operations {
150 if prevIdx, exists := cidSet[op.CID]; exists {
151 return fmt.Errorf(
152 "duplicate CID %s at indices %d and %d",
153 op.CID, prevIdx, i,
154 )
155 }
156 cidSet[op.CID] = i
157 }
158
159 return nil
160}
161
162// Count returns the number of operations in mempool
163func (m *Mempool) Count() int {
164 m.mu.RLock()
165 defer m.mu.RUnlock()
166 return len(m.operations)
167}
168
169// Take removes and returns up to n operations from the front
170func (m *Mempool) Take(n int) ([]plc.PLCOperation, error) {
171 m.mu.Lock()
172 defer m.mu.Unlock()
173
174 // Validate before taking
175 if err := m.validateLocked(); err != nil {
176 return nil, fmt.Errorf("mempool validation failed: %w", err)
177 }
178
179 if n > len(m.operations) {
180 n = len(m.operations)
181 }
182
183 result := make([]plc.PLCOperation, n)
184 copy(result, m.operations[:n])
185
186 // Remove taken operations
187 m.operations = m.operations[n:]
188
189 return result, nil
190}
191
192// validateLocked performs validation with lock already held
193func (m *Mempool) validateLocked() error {
194 if m.validated {
195 return nil
196 }
197
198 if len(m.operations) == 0 {
199 return nil
200 }
201
202 // Check chronological order
203 lastTime := m.minTimestamp
204 for i, op := range m.operations {
205 if op.CreatedAt.Before(lastTime) {
206 return fmt.Errorf(
207 "chronological violation at index %d: %s is before %s",
208 i, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano),
209 )
210 }
211 lastTime = op.CreatedAt
212 }
213
214 m.validated = true
215 return nil
216}
217
218// Peek returns up to n operations without removing them
219func (m *Mempool) Peek(n int) []plc.PLCOperation {
220 m.mu.RLock()
221 defer m.mu.RUnlock()
222
223 if n > len(m.operations) {
224 n = len(m.operations)
225 }
226
227 result := make([]plc.PLCOperation, n)
228 copy(result, m.operations[:n])
229
230 return result
231}
232
233// Clear removes all operations
234func (m *Mempool) Clear() {
235 m.mu.Lock()
236 defer m.mu.Unlock()
237 m.operations = make([]plc.PLCOperation, 0)
238 m.validated = false
239}
240
241// Save persists mempool to disk
242func (m *Mempool) Save() error {
243 m.mu.RLock()
244 defer m.mu.RUnlock()
245
246 if len(m.operations) == 0 {
247 // Remove file if empty
248 os.Remove(m.file)
249 return nil
250 }
251
252 // Validate before saving
253 if err := m.validateLocked(); err != nil {
254 return fmt.Errorf("mempool validation failed, refusing to save: %w", err)
255 }
256
257 // Serialize to JSONL
258 var buf bytes.Buffer
259 for _, op := range m.operations {
260 if len(op.RawJSON) > 0 {
261 buf.Write(op.RawJSON)
262 } else {
263 data, _ := json.Marshal(op)
264 buf.Write(data)
265 }
266 buf.WriteByte('\n')
267 }
268
269 // Write atomically
270 tempFile := m.file + ".tmp"
271 if err := os.WriteFile(tempFile, buf.Bytes(), 0644); err != nil {
272 return fmt.Errorf("failed to write mempool: %w", err)
273 }
274
275 if err := os.Rename(tempFile, m.file); err != nil {
276 os.Remove(tempFile)
277 return fmt.Errorf("failed to rename mempool file: %w", err)
278 }
279
280 return nil
281}
282
283// Load reads mempool from disk and validates it
284func (m *Mempool) Load() error {
285 data, err := os.ReadFile(m.file)
286 if err != nil {
287 return err
288 }
289
290 m.mu.Lock()
291 defer m.mu.Unlock()
292
293 // Parse JSONL
294 scanner := bufio.NewScanner(bytes.NewReader(data))
295 buf := make([]byte, 0, 64*1024)
296 scanner.Buffer(buf, 1024*1024)
297
298 m.operations = make([]plc.PLCOperation, 0)
299
300 for scanner.Scan() {
301 line := scanner.Bytes()
302 if len(line) == 0 {
303 continue
304 }
305
306 var op plc.PLCOperation
307 if err := json.Unmarshal(line, &op); err != nil {
308 return fmt.Errorf("failed to parse mempool operation: %w", err)
309 }
310
311 op.RawJSON = make([]byte, len(line))
312 copy(op.RawJSON, line)
313
314 m.operations = append(m.operations, op)
315 }
316
317 if err := scanner.Err(); err != nil {
318 return fmt.Errorf("scanner error: %w", err)
319 }
320
321 // CRITICAL: Validate loaded data
322 if err := m.validateLocked(); err != nil {
323 return fmt.Errorf("loaded mempool failed validation: %w", err)
324 }
325
326 if len(m.operations) > 0 {
327 m.logger.Printf("Loaded %d operations from mempool for bundle %06d", len(m.operations), m.targetBundle)
328 }
329
330 return nil
331}
332
333// GetFirstTime returns the created_at of the first operation
334func (m *Mempool) GetFirstTime() string {
335 m.mu.RLock()
336 defer m.mu.RUnlock()
337
338 if len(m.operations) == 0 {
339 return ""
340 }
341
342 return m.operations[0].CreatedAt.Format(time.RFC3339Nano)
343}
344
345// GetLastTime returns the created_at of the last operation
346func (m *Mempool) GetLastTime() string {
347 m.mu.RLock()
348 defer m.mu.RUnlock()
349
350 if len(m.operations) == 0 {
351 return ""
352 }
353
354 return m.operations[len(m.operations)-1].CreatedAt.Format(time.RFC3339Nano)
355}
356
357// GetTargetBundle returns the bundle number this mempool is for
358func (m *Mempool) GetTargetBundle() int {
359 return m.targetBundle
360}
361
362// GetMinTimestamp returns the minimum timestamp for operations
363func (m *Mempool) GetMinTimestamp() time.Time {
364 return m.minTimestamp
365}
366
367// Stats returns mempool statistics
368func (m *Mempool) Stats() map[string]interface{} {
369 m.mu.RLock()
370 defer m.mu.RUnlock()
371
372 count := len(m.operations)
373
374 stats := map[string]interface{}{
375 "count": count,
376 "can_create_bundle": count >= BUNDLE_SIZE,
377 "target_bundle": m.targetBundle,
378 "min_timestamp": m.minTimestamp,
379 "validated": m.validated,
380 }
381
382 if count > 0 {
383 stats["first_time"] = m.operations[0].CreatedAt
384 stats["last_time"] = m.operations[len(m.operations)-1].CreatedAt
385
386 // Calculate size and unique DIDs
387 totalSize := 0
388 didSet := make(map[string]bool)
389 for _, op := range m.operations {
390 totalSize += len(op.RawJSON)
391 didSet[op.DID] = true
392 }
393 stats["size_bytes"] = totalSize
394 stats["did_count"] = len(didSet) // ← ADDED
395 }
396
397 return stats
398}
399
400// Delete removes the mempool file
401func (m *Mempool) Delete() error {
402 if err := os.Remove(m.file); err != nil && !os.IsNotExist(err) {
403 return fmt.Errorf("failed to delete mempool file: %w", err)
404 }
405 return nil
406}
407
408// GetFilename returns the mempool filename
409func (m *Mempool) GetFilename() string {
410 return filepath.Base(m.file)
411}