A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

better index rebuilding

Changed files
+205 -82
bundle
cmd
plcbundle
internal
+91 -57
bundle/manager.go
··· 461 461 } 462 462 463 463 // SaveBundle saves a bundle to disk and updates the index 464 - func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats) (time.Duration, error) { 464 + func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) { 465 465 466 466 totalStart := time.Now() 467 467 if err := bundle.ValidateForSave(); err != nil { ··· 574 574 575 575 // DID index update (if enabled) 576 576 var indexUpdateDuration time.Duration 577 - if m.didIndex != nil && m.didIndex.Exists() { 577 + if !skipDIDIndex && m.didIndex != nil && m.didIndex.Exists() { 578 578 indexUpdateStart := time.Now() 579 579 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil { 580 580 m.logger.Printf("Warning: failed to update DID index: %v", err) ··· 1513 1513 return m.mempool.Count() 1514 1514 } 1515 1515 1516 - // FetchAndSaveNextBundle fetches and saves next bundle, returns bundle number and index time 1517 - func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) { 1516 + func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) { 1518 1517 bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet) 1519 1518 if err != nil { 1520 1519 return 0, nil, err 1521 1520 } 1522 1521 1523 - indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats) 1522 + indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats, skipDIDIndex) 1524 1523 if err != nil { 1525 1524 return 0, nil, err 1526 1525 } ··· 1536 1535 } 1537 1536 1538 1537 // RunSyncOnce performs a single sync cycle 1539 - func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig, verbose bool) (int, error) { 1538 + func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) { 1540 1539 // Manager itself implements the SyncManager interface 1541 - return internalsync.SyncOnce(ctx, m, config, verbose) 1540 + return internalsync.SyncOnce(ctx, m, config) 1542 1541 } 1543 1542 1544 1543 // EnsureDIDIndex ensures DID index is built and up-to-date 1545 1544 // Returns true if index was built/rebuilt, false if already up-to-date 1546 1545 func (m *Manager) EnsureDIDIndex(ctx context.Context, progressCallback func(current, total int)) (bool, error) { 1547 - bundleCount := m.index.Count() 1548 - didStats := m.GetDIDIndexStats() 1549 - 1550 - if bundleCount == 0 { 1551 - return false, nil 1552 - } 1553 - 1554 - needsBuild := false 1555 - reason := "" 1556 - 1557 - if !didStats["exists"].(bool) { 1558 - needsBuild = true 1559 - reason = "index does not exist" 1560 - } else { 1561 - // Check version 1562 - if m.didIndex != nil { 1563 - config := m.didIndex.GetConfig() 1564 - if config.Version != didindex.DIDINDEX_VERSION { 1565 - needsBuild = true 1566 - reason = fmt.Sprintf("index version outdated (v%d, need v%d)", 1567 - config.Version, didindex.DIDINDEX_VERSION) 1568 - } else { 1569 - // Check if index is behind bundles 1570 - lastBundle := m.index.GetLastBundle() 1571 - if lastBundle != nil && config.LastBundle < lastBundle.BundleNumber { 1572 - needsBuild = true 1573 - reason = fmt.Sprintf("index is behind (bundle %d, need %d)", 1574 - config.LastBundle, lastBundle.BundleNumber) 1575 - } 1576 - } 1577 - } 1578 - } 1579 - 1580 - if !needsBuild { 1581 - return false, nil 1582 - } 1583 - 1584 1546 // Build index 1585 - m.logger.Printf("Building DID index (%s)", reason) 1586 - m.logger.Printf("This may take several minutes...") 1587 - 1588 - if err := m.BuildDIDIndex(ctx, progressCallback); err != nil { 1589 - return false, fmt.Errorf("failed to build DID index: %w", err) 1590 - } 1591 - 1592 - // Verify index consistency 1593 - m.logger.Printf("Verifying index consistency...") 1594 - if err := m.didIndex.VerifyAndRepairIndex(ctx, m); err != nil { 1595 - return false, fmt.Errorf("index verification/repair failed: %w", err) 1596 - } 1597 - 1547 + m.UpdateDIDIndexSmart(ctx, progressCallback) 1598 1548 return true, nil 1599 1549 } 1600 1550 ··· 1843 1793 func (m *Manager) SetQuiet(quiet bool) { 1844 1794 m.config.Quiet = quiet 1845 1795 } 1796 + 1797 + // ShouldRebuildDIDIndex checks if DID index needs rebuilding 1798 + // Returns: (needsRebuild bool, reason string, canUpdateIncrementally bool) 1799 + func (m *Manager) ShouldRebuildDIDIndex() (bool, string, bool) { 1800 + if m.didIndex == nil { 1801 + return false, "DID index disabled", false 1802 + } 1803 + 1804 + needsRebuild, reason := m.didIndex.NeedsRebuild(m.GetBundleIndex()) 1805 + 1806 + if needsRebuild { 1807 + return true, reason, false 1808 + } 1809 + 1810 + // Check if incremental update is better 1811 + canIncremental, behindBy := m.didIndex.ShouldUpdateIncrementally(m.GetBundleIndex()) 1812 + if canIncremental { 1813 + return false, fmt.Sprintf("can update incrementally (%d bundles)", behindBy), true 1814 + } 1815 + 1816 + return false, "index is up to date", false 1817 + } 1818 + 1819 + // UpdateDIDIndexSmart updates DID index intelligently (rebuild vs incremental) 1820 + func (m *Manager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error { 1821 + needsRebuild, reason, canIncremental := m.ShouldRebuildDIDIndex() 1822 + 1823 + if !needsRebuild && !canIncremental { 1824 + if m.config.Verbose { 1825 + m.logger.Printf("DID index is up to date") 1826 + } 1827 + return nil 1828 + } 1829 + 1830 + if needsRebuild { 1831 + m.logger.Printf("Rebuilding DID index: %s", reason) 1832 + return m.BuildDIDIndex(ctx, progressCallback) 1833 + } 1834 + 1835 + if canIncremental { 1836 + m.logger.Printf("Updating DID index incrementally: %s", reason) 1837 + return m.updateDIDIndexIncremental(ctx, progressCallback) 1838 + } 1839 + 1840 + return nil 1841 + } 1842 + 1843 + // updateDIDIndexIncremental updates index for missing bundles only 1844 + func (m *Manager) updateDIDIndexIncremental(ctx context.Context, progressCallback func(current, total int)) error { 1845 + config := m.didIndex.GetConfig() 1846 + lastBundle := m.index.GetLastBundle() 1847 + 1848 + if lastBundle == nil || config.LastBundle >= lastBundle.BundleNumber { 1849 + return nil 1850 + } 1851 + 1852 + start := config.LastBundle + 1 1853 + end := lastBundle.BundleNumber 1854 + total := end - start + 1 1855 + 1856 + m.logger.Printf("Updating DID index for bundles %d-%d (%d bundles)", start, end, total) 1857 + 1858 + for bundleNum := start; bundleNum <= end; bundleNum++ { 1859 + bundle, err := m.LoadBundle(ctx, bundleNum) 1860 + if err != nil { 1861 + return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err) 1862 + } 1863 + 1864 + bundleData := &didindex.BundleData{ 1865 + BundleNumber: bundle.BundleNumber, 1866 + Operations: bundle.Operations, 1867 + } 1868 + 1869 + if err := m.didIndex.UpdateIndexForBundle(ctx, bundleData); err != nil { 1870 + return fmt.Errorf("failed to update bundle %d: %w", bundleNum, err) 1871 + } 1872 + 1873 + if progressCallback != nil { 1874 + progressCallback(bundleNum-start+1, total) 1875 + } 1876 + } 1877 + 1878 + return nil 1879 + }
+2 -2
cmd/plcbundle/commands/common.go
··· 31 31 RefreshMempool() error 32 32 ClearMempool() error 33 33 FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*bundle.Bundle, types.BundleProductionStats, error) 34 - SaveBundle(ctx context.Context, b *bundle.Bundle, verbose bool, quiet bool, stats types.BundleProductionStats) (time.Duration, error) 34 + SaveBundle(ctx context.Context, bundle *bundle.Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) 35 35 SaveIndex() error 36 36 GetDIDIndexStats() map[string]interface{} 37 37 GetDIDIndex() *didindex.Manager ··· 44 44 LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) 45 45 CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) 46 46 ResolveDID(ctx context.Context, did string) (*bundle.ResolveDIDResult, error) 47 - RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig, verbose bool) (int, error) 47 + RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) 48 48 RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error 49 49 GetBundleIndex() didindex.BundleIndexProvider 50 50 ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*bundle.DirectoryScanResult, error)
+1 -1
cmd/plcbundle/commands/export.go
··· 238 238 } 239 239 240 240 var err error 241 - fetchedCount, err = mgr.RunSyncOnce(ctx, config, opts.verbose) 241 + fetchedCount, err = mgr.RunSyncOnce(ctx, config) 242 242 if err != nil { 243 243 return err 244 244 }
+1 -1
cmd/plcbundle/commands/sync.go
··· 112 112 } 113 113 114 114 // Call manager method (not internal directly) 115 - synced, err := mgr.RunSyncOnce(ctx, config, verbose) 115 + synced, err := mgr.RunSyncOnce(ctx, config) 116 116 if err != nil { 117 117 return err 118 118 }
+66
internal/didindex/manager.go
··· 999 999 dim.recentLookupIdx = 0 1000 1000 dim.lookupTimeLock.Unlock() 1001 1001 } 1002 + 1003 + // NeedsRebuild checks if index needs rebuilding and returns reason 1004 + func (dim *Manager) NeedsRebuild(bundleProvider BundleIndexProvider) (bool, string) { 1005 + // Check if index exists 1006 + if !dim.Exists() { 1007 + return true, "index does not exist" 1008 + } 1009 + 1010 + // Get repository state 1011 + bundles := bundleProvider.GetBundles() 1012 + if len(bundles) == 0 { 1013 + return false, "" // No bundles, no need to rebuild 1014 + } 1015 + 1016 + lastBundleInRepo := bundles[len(bundles)-1].BundleNumber 1017 + 1018 + // Check version 1019 + if dim.config.Version != DIDINDEX_VERSION { 1020 + return true, fmt.Sprintf("index version outdated (v%d, need v%d)", 1021 + dim.config.Version, DIDINDEX_VERSION) 1022 + } 1023 + 1024 + // Check if index is behind 1025 + if dim.config.LastBundle < lastBundleInRepo { 1026 + bundlesBehind := lastBundleInRepo - dim.config.LastBundle 1027 + 1028 + // Smart logic: only rebuild if significantly behind 1029 + // Otherwise can do incremental update 1030 + if bundlesBehind > 100 { 1031 + return true, fmt.Sprintf("index significantly behind (%d bundles)", bundlesBehind) 1032 + } 1033 + 1034 + return false, fmt.Sprintf("index slightly behind (%d bundles) - can update incrementally", bundlesBehind) 1035 + } 1036 + 1037 + // Check if index is ahead (corruption indicator) 1038 + if dim.config.LastBundle > lastBundleInRepo { 1039 + return true, fmt.Sprintf("index is ahead of repository (has %d, repo has %d) - likely corrupted", 1040 + dim.config.LastBundle, lastBundleInRepo) 1041 + } 1042 + 1043 + // Index is up to date 1044 + return false, "" 1045 + } 1046 + 1047 + // ShouldUpdateIncrementally checks if incremental update is appropriate 1048 + func (dim *Manager) ShouldUpdateIncrementally(bundleProvider BundleIndexProvider) (bool, int) { 1049 + if !dim.Exists() { 1050 + return false, 0 1051 + } 1052 + 1053 + bundles := bundleProvider.GetBundles() 1054 + if len(bundles) == 0 { 1055 + return false, 0 1056 + } 1057 + 1058 + lastBundleInRepo := bundles[len(bundles)-1].BundleNumber 1059 + bundlesBehind := lastBundleInRepo - dim.config.LastBundle 1060 + 1061 + // Only do incremental if behind by less than 100 bundles 1062 + if bundlesBehind > 0 && bundlesBehind <= 100 { 1063 + return true, bundlesBehind 1064 + } 1065 + 1066 + return false, 0 1067 + }
+28 -13
internal/sync/sync_test.go
··· 458 458 459 459 logger := &testLogger{t: t} 460 460 config := &internalsync.SyncLoopConfig{ 461 - MaxBundles: 0, 462 - Verbose: false, 463 - Logger: logger, 461 + MaxBundles: 0, 462 + Verbose: false, 463 + Logger: logger, 464 + SkipDIDIndex: false, 464 465 } 465 466 466 467 // First sync should detect "caught up" when no progress 467 - synced, err := internalsync.SyncOnce(context.Background(), mockMgr, config, false) 468 + synced, err := internalsync.SyncOnce(context.Background(), mockMgr, config) 468 469 469 470 if err != nil { 470 471 t.Fatalf("SyncOnce failed: %v", err) ··· 484 485 485 486 logger := &testLogger{t: t} 486 487 config := &internalsync.SyncLoopConfig{ 487 - MaxBundles: 3, 488 - Verbose: false, 489 - Logger: logger, 488 + MaxBundles: 3, 489 + Verbose: false, 490 + Logger: logger, 491 + SkipDIDIndex: false, 490 492 } 491 493 492 494 ctx := context.Background() 493 - synced, err := internalsync.SyncOnce(ctx, mockMgr, config, false) 495 + synced, err := internalsync.SyncOnce(ctx, mockMgr, config) 494 496 495 497 if err != nil { 496 498 t.Fatalf("SyncOnce failed: %v", err) ··· 511 513 512 514 logger := &testLogger{t: t} 513 515 config := &internalsync.SyncLoopConfig{ 514 - Interval: 100 * time.Millisecond, 515 - MaxBundles: 0, 516 - Verbose: false, 517 - Logger: logger, 516 + Interval: 100 * time.Millisecond, 517 + MaxBundles: 0, 518 + Verbose: false, 519 + Logger: logger, 520 + SkipDIDIndex: false, 518 521 } 519 522 520 523 ctx, cancel := context.WithCancel(context.Background()) ··· 684 687 return m.lastBundle 685 688 } 686 689 690 + func (m *mockSyncManager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error { 691 + m.mu.Lock() 692 + defer m.mu.Unlock() 693 + return nil 694 + } 695 + 696 + func (m *mockSyncManager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error { 697 + m.mu.Lock() 698 + defer m.mu.Unlock() 699 + return nil 700 + } 701 + 687 702 func (m *mockSyncManager) GetMempoolCount() int { 688 703 m.mu.Lock() 689 704 defer m.mu.Unlock() 690 705 return m.mempoolCount 691 706 } 692 707 693 - func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) { 708 + func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) { 694 709 m.mu.Lock() 695 710 defer m.mu.Unlock() 696 711
+16 -8
internal/sync/syncer.go
··· 15 15 Verbose bool 16 16 Logger types.Logger 17 17 OnBundleSynced func(bundleNum int, fetchedCount int, mempoolCount int, duration time.Duration, indexTime time.Duration) 18 + SkipDIDIndex bool 19 + Quiet bool 18 20 } 19 21 20 22 // DefaultSyncLoopConfig returns default configuration 21 23 func DefaultSyncLoopConfig() *SyncLoopConfig { 22 24 return &SyncLoopConfig{ 23 - Interval: 1 * time.Minute, 24 - MaxBundles: 0, 25 - Verbose: false, 25 + Interval: 1 * time.Minute, 26 + MaxBundles: 0, 27 + Verbose: false, 28 + SkipDIDIndex: false, 29 + Quiet: false, 26 30 } 27 31 } 28 32 ··· 31 35 GetLastBundleNumber() int 32 36 GetMempoolCount() int 33 37 // Returns: bundleNumber, indexUpdateTime, error 34 - FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) 38 + FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) 35 39 SaveMempool() error 40 + BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error 41 + UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error 36 42 } 37 43 38 44 // SyncOnce performs a single sync cycle - fetches until caught up 39 - func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig, verbose bool) (int, error) { 45 + func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) (int, error) { 40 46 cycleStart := time.Now() 41 47 startMempool := mgr.GetMempoolCount() 42 48 ··· 50 56 mempoolBefore := mgr.GetMempoolCount() 51 57 52 58 // Attempt to fetch and save next bundle 53 - bundleNum, stats, err := mgr.FetchAndSaveNextBundle(ctx, verbose, false) 59 + bundleNum, stats, err := mgr.FetchAndSaveNextBundle(ctx, config.Verbose, config.Quiet, config.SkipDIDIndex) 54 60 55 61 // Check if we made any progress 56 62 bundleAfter := mgr.GetLastBundleNumber() ··· 136 142 config.Logger.Printf("[Sync] Initial sync starting...") 137 143 } 138 144 139 - synced, err := SyncOnce(ctx, mgr, config, config.Verbose) 145 + config.SkipDIDIndex = true 146 + synced, err := SyncOnce(ctx, mgr, config) 140 147 if err != nil { 141 148 return err 142 149 } 143 150 bundlesSynced += synced 151 + mgr.UpdateDIDIndexSmart(ctx, nil) 144 152 145 153 // Check if reached limit 146 154 if config.MaxBundles > 0 && bundlesSynced >= config.MaxBundles { ··· 167 175 168 176 case <-ticker.C: 169 177 // Each tick, do one sync cycle (which fetches until caught up) 170 - synced, err := SyncOnce(ctx, mgr, config, config.Verbose) 178 + synced, err := SyncOnce(ctx, mgr, config) 171 179 if err != nil { 172 180 if config.Logger != nil { 173 181 config.Logger.Printf("[Sync] Error: %v", err)