wip
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

update

+154 -32
+129 -32
internal/plc/scanner.go
··· 57 57 currentBundle++ 58 58 } 59 59 60 - log.Info("Starting from bundle %06d", currentBundle) // Changed from %06x 60 + log.Info("Starting from bundle %06d", currentBundle) 61 61 62 62 // Ensure bundle continuity (all previous bundles exist) 63 63 if currentBundle > 1 { ··· 70 70 totalProcessed := int64(0) 71 71 newPDSCount := int64(0) 72 72 73 - // Process bundles sequentially 73 + // ✅ CHECK MEMPOOL FIRST - if it has data, continue filling it instead of fetching new bundle 74 + mempoolCount, err := s.db.GetMempoolCount(ctx) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + if mempoolCount > 0 { 80 + log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", mempoolCount) 81 + 82 + // Fill mempool until we have 10,000 83 + if err := s.fillMempoolToSize(ctx, &newPDSCount, &totalProcessed); err != nil { 84 + log.Error("Error filling mempool: %v", err) 85 + return err 86 + } 87 + 88 + // Try to create bundles from mempool 89 + if err := s.processMempoolRecursive(ctx, &newPDSCount, &currentBundle, &totalProcessed); err != nil { 90 + log.Error("Error processing mempool: %v", err) 91 + } 92 + 93 + log.Info("PLC scan completed: %d operations, %d new PDS servers in %v", 94 + totalProcessed, newPDSCount, time.Since(startTime)) 95 + return nil 96 + } 97 + 98 + // Process bundles sequentially (normal flow when mempool is empty) 74 99 for { 75 100 select { 76 101 case <-ctx.Done(): ··· 104 129 } 105 130 106 131 if isComplete { 107 - // Complete bundle (1000 operations fetched, even if some were duplicates) 132 + // Complete bundle 108 133 batchNewPDS, err := s.processBatch(ctx, operations) 109 134 if err != nil { 110 135 log.Error("Error processing bundle: %v", err) ··· 135 160 log.Error("Error adding to mempool: %v", err) 136 161 } 137 162 163 + // ✅ Now fill mempool to 10,000 164 + if err := s.fillMempoolToSize(ctx, &newPDSCount, &totalProcessed); err != nil { 165 + log.Error("Error filling mempool: %v", err) 166 + } 167 + 138 168 // Process mempool 139 169 if err := s.processMempoolRecursive(ctx, &newPDSCount, &currentBundle, &totalProcessed); err != nil { 140 170 log.Error("Error processing mempool: %v", err) ··· 150 180 return nil 151 181 } 152 182 183 + func (s *Scanner) fillMempoolToSize(ctx context.Context, newPDSCount *int64, totalProcessed *int64) error { 184 + const fetchLimit = 1000 // PLC directory limit 185 + 186 + for { 187 + countBefore, err := s.db.GetMempoolCount(ctx) 188 + if err != nil { 189 + return err 190 + } 191 + 192 + if countBefore >= BUNDLE_SIZE { 193 + log.Info("✓ Mempool filled to %d operations (target: %d)", countBefore, BUNDLE_SIZE) 194 + return nil 195 + } 196 + 197 + log.Info("→ Mempool has %d/%d operations, fetching more from PLC directory...", countBefore, BUNDLE_SIZE) 198 + 199 + // ✅ Get just the last operation (much faster!) 200 + lastOp, err := s.db.GetLastMempoolOperation(ctx) 201 + if err != nil { 202 + return err 203 + } 204 + 205 + var afterTimestamp string 206 + if lastOp != nil { 207 + afterTimestamp = lastOp.CreatedAt.Format(time.RFC3339Nano) 208 + log.Verbose(" Using cursor: %s", afterTimestamp) 209 + } 210 + 211 + // ✅ Always fetch 1000 (PLC limit) 212 + operations, err := s.client.Export(ctx, ExportOptions{ 213 + Count: fetchLimit, 214 + After: afterTimestamp, 215 + }) 216 + if err != nil { 217 + return fmt.Errorf("failed to fetch from PLC: %w", err) 218 + } 219 + 220 + fetchedCount := len(operations) 221 + log.Verbose(" Fetched %d operations from PLC", fetchedCount) 222 + 223 + // ✅ No data at all - we're done 224 + if fetchedCount == 0 { 225 + log.Info("→ No more data available from PLC directory (mempool has %d/%d)", countBefore, BUNDLE_SIZE) 226 + return nil 227 + } 228 + 229 + // Add to mempool (with duplicate checking) 230 + if err := s.addToMempool(ctx, operations); err != nil { 231 + return err 232 + } 233 + 234 + *totalProcessed += int64(fetchedCount) 235 + 236 + // Check if mempool actually grew 237 + countAfter, err := s.db.GetMempoolCount(ctx) 238 + if err != nil { 239 + return err 240 + } 241 + 242 + newOpsAdded := countAfter - countBefore 243 + duplicateCount := fetchedCount - newOpsAdded 244 + 245 + log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)", 246 + newOpsAdded, duplicateCount) 247 + 248 + // ✅ KEY LOGIC: Only repeat if we got a FULL batch (1000) 249 + // If < 1000, it means we've caught up to the latest data 250 + if fetchedCount < fetchLimit { 251 + log.Info("→ Received incomplete batch (%d/%d), caught up to latest data", 252 + fetchedCount, fetchLimit) 253 + log.Info("→ Stopping fill, mempool has %d/%d operations", countAfter, BUNDLE_SIZE) 254 + return nil 255 + } 256 + 257 + // Got full batch (1000), might be more data - continue loop 258 + log.Verbose(" Received full batch (%d), checking for more data...", fetchLimit) 259 + } 260 + } 261 + 153 262 // addToMempool adds operations to mempool and processes them for PDS discovery 154 263 func (s *Scanner) addToMempool(ctx context.Context, operations []PLCOperation) error { 155 264 mempoolOps := make([]storage.MempoolOperation, len(operations)) ··· 186 295 log.Verbose("Mempool contains %d operations", count) 187 296 188 297 if count < BUNDLE_SIZE { 189 - log.Info("Mempool has < %d operations, waiting for more data", BUNDLE_SIZE) 298 + log.Info("Mempool has %d/%d operations, cannot create bundle yet", count, BUNDLE_SIZE) 190 299 break 191 300 } 192 301 193 - // ✅ Fetch MORE than needed to account for potential duplicates during dedup 194 - // Fetch 1.2x to have buffer (20% extra) 195 - fetchSize := int(float64(BUNDLE_SIZE) * 1.2) 196 - if fetchSize > count { 197 - fetchSize = count 198 - } 302 + log.Info("→ Creating bundle from mempool (%d operations available)...", count) 199 303 200 - mempoolOps, err := s.db.GetMempoolOperations(ctx, fetchSize) 304 + // Get first BUNDLE_SIZE operations ordered by timestamp 305 + mempoolOps, err := s.db.GetMempoolOperations(ctx, BUNDLE_SIZE) 201 306 if err != nil { 202 307 return err 203 308 } 204 309 205 - // ✅ Deduplicate by CID while preserving order 310 + // Convert to PLCOperations and track IDs 311 + operations := make([]PLCOperation, 0, BUNDLE_SIZE) 312 + mempoolIDs := make([]int64, 0, BUNDLE_SIZE) 206 313 seenCIDs := make(map[string]bool) 207 - var uniqueOps []PLCOperation 208 - mempoolIDs := make([]int64, 0, len(mempoolOps)) 209 314 210 315 for _, mop := range mempoolOps { 316 + // ✅ Skip duplicates (shouldn't happen but safety check) 211 317 if seenCIDs[mop.CID] { 212 - // Duplicate - still mark for deletion from mempool 213 - mempoolIDs = append(mempoolIDs, mop.ID) 318 + mempoolIDs = append(mempoolIDs, mop.ID) // Still delete it 214 319 continue 215 320 } 216 - 217 321 seenCIDs[mop.CID] = true 218 322 219 323 var op PLCOperation 220 324 json.Unmarshal([]byte(mop.Operation), &op) 221 - uniqueOps = append(uniqueOps, op) 325 + operations = append(operations, op) 222 326 mempoolIDs = append(mempoolIDs, mop.ID) 223 327 224 - // Stop when we have enough unique operations 225 - if len(uniqueOps) >= BUNDLE_SIZE { 328 + if len(operations) >= BUNDLE_SIZE { 226 329 break 227 330 } 228 331 } 229 332 230 - // ✅ Check if we have enough unique operations 231 - if len(uniqueOps) < BUNDLE_SIZE { 232 - log.Info("Mempool has only %d unique operations after dedup (need %d), waiting for more data", 233 - len(uniqueOps), BUNDLE_SIZE) 333 + // Final check 334 + if len(operations) < BUNDLE_SIZE { 335 + log.Error("⚠ Only got %d unique operations from mempool, need %d", len(operations), BUNDLE_SIZE) 234 336 break 235 337 } 236 338 237 - // Trim to exact size 238 - operations := uniqueOps[:BUNDLE_SIZE] 239 - idsToDelete := mempoolIDs[:len(operations)] 240 - 241 339 // Create bundle from these operations 242 340 bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, operations) 243 341 if err != nil { 244 342 return err 245 343 } 246 344 247 - // Remove from mempool (only the ones we used) 248 - if err := s.db.DeleteFromMempool(ctx, idsToDelete); err != nil { 345 + // Remove from mempool (only what we used) 346 + if err := s.db.DeleteFromMempool(ctx, mempoolIDs[:len(operations)]); err != nil { 249 347 return err 250 348 } 251 349 252 - // Process for PDS (already processed when added, but for consistency) 350 + // Process for PDS 253 351 batchNewPDS, _ := s.processBatch(ctx, operations) 254 352 *newPDSCount += batchNewPDS 255 - *totalProcessed += int64(len(operations)) 256 353 257 354 *currentBundle = bundleNum 258 355
+1
internal/storage/db.go
··· 38 38 GetMempoolCount(ctx context.Context) (int, error) 39 39 GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error) 40 40 DeleteFromMempool(ctx context.Context, ids []int64) error 41 + GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) 41 42 42 43 // Metrics 43 44 StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
+24
internal/storage/sqlite.go
··· 205 205 } 206 206 defer tx.Rollback() 207 207 208 + // ✅ Use ON CONFLICT to skip duplicates 208 209 stmt, err := tx.PrepareContext(ctx, ` 209 210 INSERT INTO plc_mempool (did, operation, cid, created_at) 210 211 VALUES (?, ?, ?, ?) ··· 279 280 280 281 _, err := s.db.ExecContext(ctx, query, args...) 281 282 return err 283 + } 284 + 285 + // GetLastMempoolOperation retrieves the most recent operation from mempool 286 + func (s *SQLiteDB) GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) { 287 + query := ` 288 + SELECT id, did, operation, cid, created_at, added_at 289 + FROM plc_mempool 290 + ORDER BY created_at DESC, id DESC 291 + LIMIT 1 292 + ` 293 + 294 + var op MempoolOperation 295 + err := s.db.QueryRowContext(ctx, query).Scan( 296 + &op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt, 297 + ) 298 + if err == sql.ErrNoRows { 299 + return nil, nil // No operations in mempool 300 + } 301 + if err != nil { 302 + return nil, err 303 + } 304 + 305 + return &op, nil 282 306 } 283 307 284 308 func (s *SQLiteDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error {