A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
Ref: test-validate · 411 lines · 9.6 kB · view raw
1package bundle 2 3import ( 4 "bufio" 5 "bytes" 6 "encoding/json" 7 "fmt" 8 "os" 9 "path/filepath" 10 "sync" 11 "time" 12 13 "tangled.org/atscan.net/plcbundle/plc" 14) 15 16const MEMPOOL_FILE_PREFIX = "plc_mempool_" 17 18// Mempool stores operations waiting to be bundled 19// Operations must be strictly chronological 20type Mempool struct { 21 operations []plc.PLCOperation 22 targetBundle int // Which bundle number these operations are for 23 minTimestamp time.Time // Operations must be after this time 24 file string 25 mu sync.RWMutex 26 logger Logger 27 validated bool // Track if we've validated chronological order 28} 29 30// NewMempool creates a new mempool for a specific bundle number 31func NewMempool(bundleDir string, targetBundle int, minTimestamp time.Time, logger Logger) (*Mempool, error) { 32 filename := fmt.Sprintf("%s%06d.jsonl", MEMPOOL_FILE_PREFIX, targetBundle) 33 34 m := &Mempool{ 35 file: filepath.Join(bundleDir, filename), 36 targetBundle: targetBundle, 37 minTimestamp: minTimestamp, 38 operations: make([]plc.PLCOperation, 0), 39 logger: logger, 40 validated: false, 41 } 42 43 // Load existing mempool from disk if it exists 44 if err := m.Load(); err != nil { 45 // If file doesn't exist, that's OK 46 if !os.IsNotExist(err) { 47 return nil, fmt.Errorf("failed to load mempool: %w", err) 48 } 49 } 50 51 return m, nil 52} 53 54// Add adds operations to the mempool with strict validation 55func (m *Mempool) Add(ops []plc.PLCOperation) (int, error) { 56 m.mu.Lock() 57 defer m.mu.Unlock() 58 59 if len(ops) == 0 { 60 return 0, nil 61 } 62 63 // Build existing CID set 64 existingCIDs := make(map[string]bool) 65 for _, op := range m.operations { 66 existingCIDs[op.CID] = true 67 } 68 69 // Validate and add operations 70 var newOps []plc.PLCOperation 71 var lastTime time.Time 72 73 // Start from last operation time if we have any 74 if len(m.operations) > 0 { 75 lastTime = m.operations[len(m.operations)-1].CreatedAt 76 } else { 77 lastTime = m.minTimestamp 
78 } 79 80 for _, op := range ops { 81 // Skip duplicates 82 if existingCIDs[op.CID] { 83 continue 84 } 85 86 // CRITICAL: Validate chronological order 87 if !op.CreatedAt.After(lastTime) && !op.CreatedAt.Equal(lastTime) { 88 return len(newOps), fmt.Errorf( 89 "chronological violation: operation %s at %s is not after %s", 90 op.CID, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano), 91 ) 92 } 93 94 // Validate operation is after minimum timestamp 95 if op.CreatedAt.Before(m.minTimestamp) { 96 return len(newOps), fmt.Errorf( 97 "operation %s at %s is before minimum timestamp %s (belongs in earlier bundle)", 98 op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano), 99 ) 100 } 101 102 newOps = append(newOps, op) 103 existingCIDs[op.CID] = true 104 lastTime = op.CreatedAt 105 } 106 107 // Add new operations 108 m.operations = append(m.operations, newOps...) 109 m.validated = true 110 111 return len(newOps), nil 112} 113 114// Validate performs a full chronological validation of all operations 115func (m *Mempool) Validate() error { 116 m.mu.RLock() 117 defer m.mu.RUnlock() 118 119 if len(m.operations) == 0 { 120 return nil 121 } 122 123 // Check all operations are after minimum timestamp 124 for i, op := range m.operations { 125 if op.CreatedAt.Before(m.minTimestamp) { 126 return fmt.Errorf( 127 "operation %d (CID: %s) at %s is before minimum timestamp %s", 128 i, op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano), 129 ) 130 } 131 } 132 133 // Check chronological order 134 for i := 1; i < len(m.operations); i++ { 135 prev := m.operations[i-1] 136 curr := m.operations[i] 137 138 if curr.CreatedAt.Before(prev.CreatedAt) { 139 return fmt.Errorf( 140 "chronological violation at index %d: %s (%s) is before %s (%s)", 141 i, curr.CID, curr.CreatedAt.Format(time.RFC3339Nano), 142 prev.CID, prev.CreatedAt.Format(time.RFC3339Nano), 143 ) 144 } 145 } 146 147 // Check for duplicate CIDs 148 
cidSet := make(map[string]int) 149 for i, op := range m.operations { 150 if prevIdx, exists := cidSet[op.CID]; exists { 151 return fmt.Errorf( 152 "duplicate CID %s at indices %d and %d", 153 op.CID, prevIdx, i, 154 ) 155 } 156 cidSet[op.CID] = i 157 } 158 159 return nil 160} 161 162// Count returns the number of operations in mempool 163func (m *Mempool) Count() int { 164 m.mu.RLock() 165 defer m.mu.RUnlock() 166 return len(m.operations) 167} 168 169// Take removes and returns up to n operations from the front 170func (m *Mempool) Take(n int) ([]plc.PLCOperation, error) { 171 m.mu.Lock() 172 defer m.mu.Unlock() 173 174 // Validate before taking 175 if err := m.validateLocked(); err != nil { 176 return nil, fmt.Errorf("mempool validation failed: %w", err) 177 } 178 179 if n > len(m.operations) { 180 n = len(m.operations) 181 } 182 183 result := make([]plc.PLCOperation, n) 184 copy(result, m.operations[:n]) 185 186 // Remove taken operations 187 m.operations = m.operations[n:] 188 189 return result, nil 190} 191 192// validateLocked performs validation with lock already held 193func (m *Mempool) validateLocked() error { 194 if m.validated { 195 return nil 196 } 197 198 if len(m.operations) == 0 { 199 return nil 200 } 201 202 // Check chronological order 203 lastTime := m.minTimestamp 204 for i, op := range m.operations { 205 if op.CreatedAt.Before(lastTime) { 206 return fmt.Errorf( 207 "chronological violation at index %d: %s is before %s", 208 i, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano), 209 ) 210 } 211 lastTime = op.CreatedAt 212 } 213 214 m.validated = true 215 return nil 216} 217 218// Peek returns up to n operations without removing them 219func (m *Mempool) Peek(n int) []plc.PLCOperation { 220 m.mu.RLock() 221 defer m.mu.RUnlock() 222 223 if n > len(m.operations) { 224 n = len(m.operations) 225 } 226 227 result := make([]plc.PLCOperation, n) 228 copy(result, m.operations[:n]) 229 230 return result 231} 232 233// Clear removes 
all operations 234func (m *Mempool) Clear() { 235 m.mu.Lock() 236 defer m.mu.Unlock() 237 m.operations = make([]plc.PLCOperation, 0) 238 m.validated = false 239} 240 241// Save persists mempool to disk 242func (m *Mempool) Save() error { 243 m.mu.RLock() 244 defer m.mu.RUnlock() 245 246 if len(m.operations) == 0 { 247 // Remove file if empty 248 os.Remove(m.file) 249 return nil 250 } 251 252 // Validate before saving 253 if err := m.validateLocked(); err != nil { 254 return fmt.Errorf("mempool validation failed, refusing to save: %w", err) 255 } 256 257 // Serialize to JSONL 258 var buf bytes.Buffer 259 for _, op := range m.operations { 260 if len(op.RawJSON) > 0 { 261 buf.Write(op.RawJSON) 262 } else { 263 data, _ := json.Marshal(op) 264 buf.Write(data) 265 } 266 buf.WriteByte('\n') 267 } 268 269 // Write atomically 270 tempFile := m.file + ".tmp" 271 if err := os.WriteFile(tempFile, buf.Bytes(), 0644); err != nil { 272 return fmt.Errorf("failed to write mempool: %w", err) 273 } 274 275 if err := os.Rename(tempFile, m.file); err != nil { 276 os.Remove(tempFile) 277 return fmt.Errorf("failed to rename mempool file: %w", err) 278 } 279 280 return nil 281} 282 283// Load reads mempool from disk and validates it 284func (m *Mempool) Load() error { 285 data, err := os.ReadFile(m.file) 286 if err != nil { 287 return err 288 } 289 290 m.mu.Lock() 291 defer m.mu.Unlock() 292 293 // Parse JSONL 294 scanner := bufio.NewScanner(bytes.NewReader(data)) 295 buf := make([]byte, 0, 64*1024) 296 scanner.Buffer(buf, 1024*1024) 297 298 m.operations = make([]plc.PLCOperation, 0) 299 300 for scanner.Scan() { 301 line := scanner.Bytes() 302 if len(line) == 0 { 303 continue 304 } 305 306 var op plc.PLCOperation 307 if err := json.Unmarshal(line, &op); err != nil { 308 return fmt.Errorf("failed to parse mempool operation: %w", err) 309 } 310 311 op.RawJSON = make([]byte, len(line)) 312 copy(op.RawJSON, line) 313 314 m.operations = append(m.operations, op) 315 } 316 317 if err := 
scanner.Err(); err != nil { 318 return fmt.Errorf("scanner error: %w", err) 319 } 320 321 // CRITICAL: Validate loaded data 322 if err := m.validateLocked(); err != nil { 323 return fmt.Errorf("loaded mempool failed validation: %w", err) 324 } 325 326 if len(m.operations) > 0 { 327 m.logger.Printf("Loaded %d operations from mempool for bundle %06d", len(m.operations), m.targetBundle) 328 } 329 330 return nil 331} 332 333// GetFirstTime returns the created_at of the first operation 334func (m *Mempool) GetFirstTime() string { 335 m.mu.RLock() 336 defer m.mu.RUnlock() 337 338 if len(m.operations) == 0 { 339 return "" 340 } 341 342 return m.operations[0].CreatedAt.Format(time.RFC3339Nano) 343} 344 345// GetLastTime returns the created_at of the last operation 346func (m *Mempool) GetLastTime() string { 347 m.mu.RLock() 348 defer m.mu.RUnlock() 349 350 if len(m.operations) == 0 { 351 return "" 352 } 353 354 return m.operations[len(m.operations)-1].CreatedAt.Format(time.RFC3339Nano) 355} 356 357// GetTargetBundle returns the bundle number this mempool is for 358func (m *Mempool) GetTargetBundle() int { 359 return m.targetBundle 360} 361 362// GetMinTimestamp returns the minimum timestamp for operations 363func (m *Mempool) GetMinTimestamp() time.Time { 364 return m.minTimestamp 365} 366 367// Stats returns mempool statistics 368func (m *Mempool) Stats() map[string]interface{} { 369 m.mu.RLock() 370 defer m.mu.RUnlock() 371 372 count := len(m.operations) 373 374 stats := map[string]interface{}{ 375 "count": count, 376 "can_create_bundle": count >= BUNDLE_SIZE, 377 "target_bundle": m.targetBundle, 378 "min_timestamp": m.minTimestamp, 379 "validated": m.validated, 380 } 381 382 if count > 0 { 383 stats["first_time"] = m.operations[0].CreatedAt 384 stats["last_time"] = m.operations[len(m.operations)-1].CreatedAt 385 386 // Calculate size and unique DIDs 387 totalSize := 0 388 didSet := make(map[string]bool) 389 for _, op := range m.operations { 390 totalSize += 
len(op.RawJSON) 391 didSet[op.DID] = true 392 } 393 stats["size_bytes"] = totalSize 394 stats["did_count"] = len(didSet) // ← ADDED 395 } 396 397 return stats 398} 399 400// Delete removes the mempool file 401func (m *Mempool) Delete() error { 402 if err := os.Remove(m.file); err != nil && !os.IsNotExist(err) { 403 return fmt.Errorf("failed to delete mempool file: %w", err) 404 } 405 return nil 406} 407 408// GetFilename returns the mempool filename 409func (m *Mempool) GetFilename() string { 410 return filepath.Base(m.file) 411}