A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

server debug endpoints

Changed files
+754 -2064
bundle
cmd
plcbundle
commands
docs
internal
didindex
handleresolver
sync
server
+1 -1
README.md
··· 126 126 127 127 ```bash 128 128 # Fetch bundles from plc.directory 129 - plcbundle fetch 129 + plcbundle sync 130 130 131 131 # Clone from remote 132 132 plcbundle clone https://plc.example.com
+214 -3
bundle/manager.go
··· 12 12 "sort" 13 13 "strings" 14 14 "sync" 15 + "sync/atomic" 15 16 "time" 16 17 17 18 "tangled.org/atscan.net/plcbundle/internal/bundleindex" ··· 54 55 bundleCache map[int]*Bundle 55 56 cacheMu sync.RWMutex 56 57 maxCacheSize int 58 + 59 + // NEW: Resolver performance tracking 60 + resolverStats struct { 61 + sync.Mutex 62 + totalResolutions int64 63 + mempoolHits int64 64 + bundleHits int64 65 + errors int64 66 + 67 + // Timing (in microseconds) 68 + totalTime int64 69 + totalMempoolTime int64 70 + totalIndexTime int64 71 + totalLoadOpTime int64 72 + 73 + // Recent timings (circular buffer) 74 + recentTimes []resolverTiming 75 + recentIdx int 76 + recentSize int 77 + } 57 78 } 58 79 59 80 // NewManager creates a new bundle manager ··· 304 325 handleResolver = handleresolver.NewClient(config.HandleResolverURL) 305 326 } 306 327 307 - return &Manager{ 328 + m := &Manager{ 308 329 config: config, 309 330 operations: ops, 310 331 index: index, ··· 318 339 cloner: cloner, 319 340 plcClient: plcClient, 320 341 handleResolver: handleResolver, 321 - }, nil 342 + } 343 + // Initialize resolver stats 344 + m.resolverStats.recentSize = 1000 345 + m.resolverStats.recentTimes = make([]resolverTiming, 1000) 346 + 347 + return m, nil 322 348 } 323 349 324 350 // Close cleans up resources ··· 1375 1401 // ResolveDID resolves a DID to its current document with detailed timing metrics 1376 1402 func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) { 1377 1403 if err := plcclient.ValidateDIDFormat(did); err != nil { 1404 + // Track error 1405 + atomic.AddInt64(&m.resolverStats.errors, 1) 1378 1406 return nil, err 1379 1407 } 1380 1408 ··· 1386 1414 1387 1415 var latestMempoolOp *plcclient.PLCOperation 1388 1416 if m.mempool != nil { 1389 - // Fast backwards search with early exit 1390 1417 latestMempoolOp = m.mempool.FindLatestDIDOperation(did) 1391 1418 } 1392 1419 result.MempoolTime = time.Since(mempoolStart) ··· 1395 1422 if latestMempoolOp != nil { 1396 1423 doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp}) 1397 1424 if err != nil { 1425 + atomic.AddInt64(&m.resolverStats.errors, 1) 1398 1426 return nil, fmt.Errorf("resolution failed: %w", err) 1399 1427 } 1400 1428 1401 1429 result.Document = doc 1402 1430 result.Source = "mempool" 1403 1431 result.TotalTime = time.Since(totalStart) 1432 + 1433 + // Record stats 1434 + m.recordResolverTiming(result, nil) 1435 + 1404 1436 return result, nil 1405 1437 } 1406 1438 1407 1439 // STEP 2: Index lookup 1408 1440 if m.didIndex == nil || !m.didIndex.Exists() { 1441 + atomic.AddInt64(&m.resolverStats.errors, 1) 1409 1442 return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution") 1410 1443 } 1411 1444 ··· 1414 1447 result.IndexTime = time.Since(indexStart) 1415 1448 1416 1449 if err != nil { 1450 + atomic.AddInt64(&m.resolverStats.errors, 1) 1417 1451 return nil, err 1418 1452 } 1419 1453 1420 1454 if len(locations) == 0 { 1455 + atomic.AddInt64(&m.resolverStats.errors, 1) 1421 1456 return nil, fmt.Errorf("DID not found") 1422 1457 } 1423 1458 ··· 1433 1468 } 1434 1469 1435 1470 if latestLoc == nil { 1471 + atomic.AddInt64(&m.resolverStats.errors, 1) 1436 1472 return nil, fmt.Errorf("no valid operations (all nullified)") 1437 1473 } 1438 1474 ··· 1442 1478 result.LoadOpTime = time.Since(opStart) 1443 1479 1444 1480 if err != nil { 1481 + atomic.AddInt64(&m.resolverStats.errors, 1) 1445 1482 return nil, fmt.Errorf("failed to load operation: %w", err) 1446 1483 } 1447 1484 ··· 1451 1488 // STEP 4: Resolve document 1452 1489 doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op}) 1453 1490 if err != nil { 1491 + atomic.AddInt64(&m.resolverStats.errors, 1) 1454 1492 return nil, fmt.Errorf("resolution failed: %w", err) 1455 1493 } 1456 1494 1457 1495 result.Document = doc 1458 1496 result.Source = "bundle" 1459 1497 result.TotalTime = time.Since(totalStart) 1498 + 1499 + // Record stats 1500 + m.recordResolverTiming(result, nil) 1460 1501 1461 1502 return result, nil 1462 1503 } ··· 1631 1672 func (m *Manager) GetHandleResolver() *handleresolver.Client { 1632 1673 return m.handleResolver 1633 1674 } 1675 + 1676 + // recordResolverTiming records resolver performance metrics 1677 + func (m *Manager) recordResolverTiming(result *ResolveDIDResult, _ error) { 1678 + m.resolverStats.Lock() 1679 + defer m.resolverStats.Unlock() 1680 + 1681 + // Increment counters 1682 + atomic.AddInt64(&m.resolverStats.totalResolutions, 1) 1683 + 1684 + switch result.Source { 1685 + case "mempool": 1686 + atomic.AddInt64(&m.resolverStats.mempoolHits, 1) 1687 + case "bundle": 1688 + atomic.AddInt64(&m.resolverStats.bundleHits, 1) 1689 + } 1690 + 1691 + // Record timings 1692 + timing := resolverTiming{ 1693 + totalTime: result.TotalTime.Microseconds(), 1694 + mempoolTime: result.MempoolTime.Microseconds(), 1695 + indexTime: result.IndexTime.Microseconds(), 1696 + loadOpTime: result.LoadOpTime.Microseconds(), 1697 + source: result.Source, 1698 + } 1699 + 1700 + atomic.AddInt64(&m.resolverStats.totalTime, timing.totalTime) 1701 + atomic.AddInt64(&m.resolverStats.totalMempoolTime, timing.mempoolTime) 1702 + atomic.AddInt64(&m.resolverStats.totalIndexTime, timing.indexTime) 1703 + atomic.AddInt64(&m.resolverStats.totalLoadOpTime, timing.loadOpTime) 1704 + 1705 + // Add to circular buffer 1706 + m.resolverStats.recentTimes[m.resolverStats.recentIdx] = timing 1707 + m.resolverStats.recentIdx = (m.resolverStats.recentIdx + 1) % m.resolverStats.recentSize 1708 + } 1709 + 1710 + // GetResolverStats returns resolver performance statistics 1711 + func (m *Manager) GetResolverStats() map[string]interface{} { 1712 + totalResolutions := atomic.LoadInt64(&m.resolverStats.totalResolutions) 1713 + 1714 + if totalResolutions == 0 { 1715 + return map[string]interface{}{ 1716 + "total_resolutions": 0, 1717 + } 1718 + } 1719 + 1720 + mempoolHits := atomic.LoadInt64(&m.resolverStats.mempoolHits) 1721 + bundleHits := atomic.LoadInt64(&m.resolverStats.bundleHits) 1722 + errors := atomic.LoadInt64(&m.resolverStats.errors) 1723 + 1724 + totalTime := atomic.LoadInt64(&m.resolverStats.totalTime) 1725 + totalMempoolTime := atomic.LoadInt64(&m.resolverStats.totalMempoolTime) 1726 + totalIndexTime := atomic.LoadInt64(&m.resolverStats.totalIndexTime) 1727 + totalLoadOpTime := atomic.LoadInt64(&m.resolverStats.totalLoadOpTime) 1728 + 1729 + // Calculate overall averages 1730 + avgTotalMs := float64(totalTime) / float64(totalResolutions) / 1000.0 1731 + avgMempoolMs := float64(totalMempoolTime) / float64(totalResolutions) / 1000.0 1732 + 1733 + stats := map[string]interface{}{ 1734 + "total_resolutions": totalResolutions, 1735 + "mempool_hits": mempoolHits, 1736 + "bundle_hits": bundleHits, 1737 + "errors": errors, 1738 + "success_rate": float64(totalResolutions-errors) / float64(totalResolutions), 1739 + "mempool_hit_rate": float64(mempoolHits) / float64(totalResolutions), 1740 + 1741 + // Overall averages 1742 + "avg_total_time_ms": avgTotalMs, 1743 + "avg_mempool_time_ms": avgMempoolMs, 1744 + } 1745 + 1746 + // Only include bundle-specific stats if we have bundle hits 1747 + if bundleHits > 0 { 1748 + avgIndexMs := float64(totalIndexTime) / float64(bundleHits) / 1000.0 1749 + avgLoadMs := float64(totalLoadOpTime) / float64(bundleHits) / 1000.0 1750 + 1751 + stats["avg_index_time_ms"] = avgIndexMs 1752 + stats["avg_load_op_time_ms"] = avgLoadMs 1753 + } 1754 + 1755 + // Recent statistics 1756 + m.resolverStats.Lock() 1757 + recentCopy := make([]resolverTiming, m.resolverStats.recentSize) 1758 + copy(recentCopy, m.resolverStats.recentTimes) 1759 + m.resolverStats.Unlock() 1760 + 1761 + // Filter valid entries 1762 + validRecent := make([]resolverTiming, 0) 1763 + for _, t := range recentCopy { 1764 + if t.totalTime > 0 { 1765 + validRecent = append(validRecent, t) 1766 + } 1767 + } 1768 + 1769 + if len(validRecent) > 0 { 1770 + // Extract total times for percentiles 1771 + totalTimes := make([]int64, len(validRecent)) 1772 + for i, t := range validRecent { 1773 + totalTimes[i] = t.totalTime 1774 + } 1775 + sort.Slice(totalTimes, func(i, j int) bool { 1776 + return totalTimes[i] < totalTimes[j] 1777 + }) 1778 + 1779 + // Calculate recent average 1780 + var recentSum int64 1781 + var recentMempoolSum int64 1782 + var recentIndexSum int64 1783 + var recentLoadSum int64 1784 + recentBundleCount := 0 1785 + 1786 + for _, t := range validRecent { 1787 + recentSum += t.totalTime 1788 + recentMempoolSum += t.mempoolTime 1789 + if t.source == "bundle" { 1790 + recentIndexSum += t.indexTime 1791 + recentLoadSum += t.loadOpTime 1792 + recentBundleCount++ 1793 + } 1794 + } 1795 + 1796 + stats["recent_avg_total_time_ms"] = float64(recentSum) / float64(len(validRecent)) / 1000.0 1797 + stats["recent_avg_mempool_time_ms"] = float64(recentMempoolSum) / float64(len(validRecent)) / 1000.0 1798 + 1799 + if recentBundleCount > 0 { 1800 + stats["recent_avg_index_time_ms"] = float64(recentIndexSum) / float64(recentBundleCount) / 1000.0 1801 + stats["recent_avg_load_time_ms"] = float64(recentLoadSum) / float64(recentBundleCount) / 1000.0 1802 + } 1803 + 1804 + stats["recent_sample_size"] = len(validRecent) 1805 + 1806 + // Percentiles 1807 + p50idx := len(totalTimes) * 50 / 100 1808 + p95idx := len(totalTimes) * 95 / 100 1809 + p99idx := len(totalTimes) * 99 / 100 1810 + 1811 + stats["min_total_time_ms"] = float64(totalTimes[0]) / 1000.0 1812 + stats["max_total_time_ms"] = float64(totalTimes[len(totalTimes)-1]) / 1000.0 1813 + 1814 + if p50idx < len(totalTimes) { 1815 + stats["p50_total_time_ms"] = float64(totalTimes[p50idx]) / 1000.0 1816 + } 1817 + if p95idx < len(totalTimes) { 1818 + stats["p95_total_time_ms"] = float64(totalTimes[p95idx]) / 1000.0 1819 + } 1820 + if p99idx < len(totalTimes) { 1821 + stats["p99_total_time_ms"] = float64(totalTimes[p99idx]) / 1000.0 1822 + } 1823 + } 1824 + 1825 + return stats 1826 + } 1827 + 1828 + // ResetResolverStats resets resolver performance statistics 1829 + func (m *Manager) ResetResolverStats() { 1830 + m.resolverStats.Lock() 1831 + defer m.resolverStats.Unlock() 1832 + 1833 + atomic.StoreInt64(&m.resolverStats.totalResolutions, 0) 1834 + atomic.StoreInt64(&m.resolverStats.mempoolHits, 0) 1835 + atomic.StoreInt64(&m.resolverStats.bundleHits, 0) 1836 + atomic.StoreInt64(&m.resolverStats.errors, 0) 1837 + atomic.StoreInt64(&m.resolverStats.totalTime, 0) 1838 + atomic.StoreInt64(&m.resolverStats.totalMempoolTime, 0) 1839 + atomic.StoreInt64(&m.resolverStats.totalIndexTime, 0) 1840 + atomic.StoreInt64(&m.resolverStats.totalLoadOpTime, 0) 1841 + 1842 + m.resolverStats.recentTimes = make([]resolverTiming, m.resolverStats.recentSize) 1843 + m.resolverStats.recentIdx = 0 1844 + }
+8
bundle/types.go
··· 207 207 BundleNumber int // if from bundle 208 208 Position int // if from bundle 209 209 } 210 + 211 + type resolverTiming struct { 212 + totalTime int64 213 + mempoolTime int64 214 + indexTime int64 215 + loadOpTime int64 216 + source string // "mempool" or "bundle" 217 + }
+1 -1
cmd/plcbundle/commands/did.go
··· 1217 1217 return nil 1218 1218 } 1219 1219 1220 - func displayLookupResults(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, totalElapsed, lookupElapsed, mempoolElapsed time.Duration, verbose bool, stats map[string]interface{}) error { 1220 + func displayLookupResults(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, totalElapsed, lookupElapsed, mempoolElapsed time.Duration, verbose bool, _ map[string]interface{}) error { 1221 1221 nullifiedCount := 0 1222 1222 for _, owl := range opsWithLoc { 1223 1223 if owl.Operation.IsNullified() {
+1 -331
docs/cli.md
··· 1 1 # CLI Guide 2 2 3 - A concise guide to using the `plcbundle` command-line tool. 4 - 5 - ## Installation 6 - 7 - ```bash 8 - go install tangled.org/atscan.net/plcbundle/cmd/plcbundle@latest 9 - plcbundle version # Verify installation 10 - ``` 11 - 12 - ## Quick Start 13 - 14 - ```bash 15 - mkdir plc_archive && cd plc_archive 16 - plcbundle fetch -count 1 # Fetch one bundle 17 - plcbundle info # Check what you have 18 - ``` 19 - 20 - --- 21 - 22 - ## Commands 23 - 24 - ### `fetch` - Download from PLC Directory 25 - 26 - Fetches operations from PLC directory and creates bundles. 27 - 28 - ```bash 29 - plcbundle fetch -count 1 # Fetch exactly 1 bundle 30 - plcbundle fetch -count 10 # Fetch 10 bundles 31 - plcbundle fetch # Fetch continuously until caught up 32 - ``` 33 - 34 - **Important:** Without `-count`, fetch runs indefinitely. Always use `-count N` for controlled fetching. 35 - 36 - **Options:** 37 - - `-count N` - Number of bundles to fetch (0 = all available) 38 - - `-plc URL` - Custom PLC directory URL (default: `https://plc.directory`) 39 - 40 - --- 41 - 42 - ### `clone` - Download from Remote Server 43 - 44 - Downloads pre-made bundles from another plcbundle server (much faster than fetch). 45 - 46 - ```bash 47 - plcbundle clone https://plc.example.com 48 - plcbundle clone https://plc.example.com -workers 16 # Faster with more workers 49 - ``` 50 - 51 - **Resumable:** Press Ctrl+C to stop, run again to resume. 52 - 53 - **Options:** 54 - - `-workers N` - Concurrent downloads (default: 4) 55 - - `-v` - Verbose output 56 - - `-skip-existing` - Skip existing bundles (default: true) 57 - 58 - --- 59 - 60 - ### `info` - View Archive Status 61 - 62 - Shows bundle count, storage size, time ranges, and chain hashes. 63 - 64 - ```bash 65 - plcbundle info # General overview 66 - plcbundle info -bundle 42 # Specific bundle details 67 - plcbundle info --bundles # List all bundles 68 - plcbundle info --verify # Info + chain verification 69 - ``` 70 - 71 - --- 72 - 73 - ### `verify` - Check Integrity 74 - 75 - Verifies file hashes and chain links. 76 - 77 - ```bash 78 - plcbundle verify # Verify entire chain 79 - plcbundle verify -bundle 42 # Verify one bundle 80 - plcbundle verify -v # Verbose output 81 - ``` 82 - 83 - --- 84 - 85 - ### `rebuild` - Recreate Index 86 - 87 - Scans bundle files and rebuilds `index.json`. 88 - 89 - ```bash 90 - plcbundle rebuild # Auto-detect CPU cores 91 - plcbundle rebuild -workers 8 # Use 8 workers 92 - ``` 93 - 94 - **When to use:** 95 - - Lost/corrupted `index.json` 96 - - Added bundle files manually 97 - - Moved files from another location 98 - 99 - --- 100 - 101 - ### `export` - Extract Operations 102 - 103 - Exports operations as JSONL to stdout. 104 - 105 - ```bash 106 - plcbundle export -count 1000 > ops.jsonl 107 - plcbundle export -after "2024-01-01T00:00:00Z" -count 5000 > jan.jsonl 108 - ``` 109 - 110 - --- 111 - 112 - ### `backfill` - Stream All Operations 113 - 114 - Streams operations from all bundles, fetching missing ones on-demand. 115 - 116 - ```bash 117 - plcbundle backfill > all.jsonl 118 - plcbundle backfill -start 100 -end 200 > range.jsonl 119 - ``` 120 - 121 - --- 122 - 123 - ### `mempool` - Inspect Staging Area 124 - 125 - Shows operations waiting to form a bundle (need 10,000 to create bundle). 126 - 127 - ```bash 128 - plcbundle mempool # Show status 129 - plcbundle mempool -export > mem.jsonl # Export mempool ops 130 - plcbundle mempool -validate # Verify chronological order 131 - plcbundle mempool -clear # Clear (destructive) 132 - ``` 133 - 134 - --- 135 - 136 - ### `serve` - Run HTTP Server 137 - 138 - Starts an HTTP server to share bundles with others. 139 - 140 - ```bash 141 - plcbundle serve # Start on :8080 142 - plcbundle serve -port 9000 -host 0.0.0.0 # Custom port/host 143 - plcbundle serve -sync -sync-interval 5m # Auto-fetch new bundles 144 - plcbundle serve -websocket # Enable WebSocket streaming 145 - ``` 146 - 147 - **Endpoints:** 148 - - `GET /` - Info page 149 - - `GET /index.json` - Bundle index 150 - - `GET /data/:number` - Download bundle 151 - - `WS /ws` - WebSocket stream (if enabled) 152 - 153 - --- 154 - 155 - ### `compare` - Compare with Remote 156 - 157 - Shows differences between local and remote archives. 158 - 159 - ```bash 160 - plcbundle compare https://plc.example.com 161 - plcbundle compare https://plc.example.com --fetch-missing # Auto-fix 162 - ``` 163 - 164 - --- 165 - 166 - ### `version` - Show Version 167 - 168 - ```bash 169 - plcbundle version 170 - ``` 171 - 172 - --- 173 - 174 - ## Important Concepts 175 - 176 - ### Working Directory 177 - 178 - plcbundle operates in your **current directory**. Always `cd` to your archive first: 179 - 180 - ```bash 181 - cd /path/to/plc_archive 182 - plcbundle info 183 - ``` 184 - 185 - ### Files Created 186 - 187 - ``` 188 - plc_archive/ 189 - ├── 000001.jsonl.zst # Bundle files (10k ops each) 190 - ├── 000002.jsonl.zst 191 - ├── index.json # Index (metadata + hashes) 192 - └── plc_mempool_*.jsonl # Mempool (auto-managed, temporary) 193 - ``` 194 - 195 - ### Fetch vs Clone 196 - 197 - **Use `fetch`** when: 198 - - No mirror available 199 - - Want data directly from PLC 200 - - Building from scratch 201 - 202 - **Use `clone`** when: 203 - - A mirror exists 204 - - Want faster setup 205 - - Syncing with known good source 206 - 207 - --- 208 - 209 - ## Common Tasks 210 - 211 - **Initial setup from mirror:** 212 - ```bash 213 - mkdir plc_archive && cd plc_archive 214 - plcbundle clone https://plc.example.com -workers 16 215 - plcbundle verify 216 - ``` 217 - 218 - **Initial setup from PLC:** 219 - ```bash 220 - mkdir plc_archive && cd plc_archive 221 - plcbundle fetch -count 0 # Fetch all (can take hours) 222 - ``` 223 - 224 - **Daily sync (cron):** 225 - ```bash 226 - #!/bin/bash 227 - cd /path/to/plc_archive 228 - plcbundle fetch -count 5 # Fetch up to 5 new bundles 229 - ``` 230 - 231 - **Share your archive:** 232 - ```bash 233 - plcbundle serve -host 0.0.0.0 -sync 234 - ``` 235 - 236 - **Export recent data:** 237 - ```bash 238 - plcbundle export -count 10000 > recent.jsonl 239 - cat recent.jsonl | jq . # Process with jq 240 - ``` 241 - 242 - **Fix corrupted index:** 243 - ```bash 244 - plcbundle rebuild 245 - plcbundle verify 246 - ``` 247 - 248 - --- 249 - 250 - ## Troubleshooting 251 - 252 - **Command not found:** 253 - ```bash 254 - export PATH=$PATH:$(go env GOPATH)/bin 255 - ``` 256 - 257 - **Wrong directory:** 258 - ```bash 259 - pwd # Check where you are 260 - cd /path/to/plc_archive 261 - ``` 262 - 263 - **Fetch doesn't create bundle:** 264 - ```bash 265 - plcbundle mempool # Check if waiting for more ops 266 - # Need 10,000 operations to create a bundle 267 - ``` 268 - 269 - **Port already in use:** 270 - ```bash 271 - plcbundle serve -port 9000 272 - ``` 273 - 274 - **Hash verification failed:** 275 - ```bash 276 - rm 000042.jsonl.zst # Delete corrupted bundle 277 - plcbundle rebuild # Mark as missing 278 - plcbundle fetch -count 1 # Re-fetch 279 - ``` 280 - 281 - **Out of disk space:** 282 - ```bash 283 - df -h . # Check space 284 - # Move to larger disk or delete old bundles 285 - ``` 286 - 287 - --- 288 - 289 - ## Quick Reference 290 - 291 - ```bash 292 - # Fetch 293 - plcbundle fetch -count 1 # One bundle 294 - plcbundle fetch # All available 295 - 296 - # Clone 297 - plcbundle clone <url> # From mirror 298 - plcbundle clone <url> -workers 16 # Faster 299 - 300 - # Info 301 - plcbundle info # Overview 302 - plcbundle info -bundle 42 # Specific bundle 303 - 304 - # Verify 305 - plcbundle verify # Check chain 306 - plcbundle verify -bundle 42 # Check one 307 - 308 - # Rebuild 309 - plcbundle rebuild # Recreate index 310 - 311 - # Export 312 - plcbundle export -count 1000 > ops.jsonl 313 - 314 - # Serve 315 - plcbundle serve # Share bundles 316 - plcbundle serve -sync -websocket # Full-featured 317 - 318 - # Utilities 319 - plcbundle mempool # Check staging 320 - plcbundle compare <url> # Compare with remote 321 - plcbundle backfill > all.jsonl # Export all 322 - ``` 323 - 324 - --- 325 - 326 - ## Getting Help 327 - 328 - ```bash 329 - plcbundle <command> -h # Command-specific help 330 - ``` 331 - 332 - **Report issues:** https://tangled.org/@atscan.net/plcbundle/issues 333 - 3 + TODO
+1 -1666
docs/library.md
··· 14 14 15 15 --- 16 16 17 - ## Getting Started 18 - 19 - ### Installation 20 - 21 - ```bash 22 - go get tangled.org/atscan.net/plcbundle 23 - ``` 24 - 25 - ### Your First Program 26 - 27 - Create a simple program to fetch and display bundle information: 28 - 29 - ```go 30 - package main 31 - 32 - import ( 33 - "context" 34 - "log" 35 - 36 - plcbundle "tangled.org/atscan.net/plcbundle" 37 - ) 38 - 39 - func main() { 40 - // Create a manager 41 - mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 42 - if err != nil { 43 - log.Fatal(err) 44 - } 45 - defer mgr.Close() 46 - 47 - // Get repository info 48 - info := mgr.GetInfo() 49 - log.Printf("Bundle directory: %s", info["bundle_dir"]) 50 - 51 - // Get index stats 52 - index := mgr.GetIndex() 53 - stats := index.GetStats() 54 - log.Printf("Total bundles: %d", stats["bundle_count"]) 55 - } 56 - ``` 57 - 58 - Run it: 59 - ```bash 60 - go run main.go 61 - # 2025/01/15 10:30:00 Bundle directory: ./plc_data 62 - # 2025/01/15 10:30:00 Total bundles: 0 63 - ``` 64 - 65 - ### Fetching Your First Bundle 66 - 67 - Let's fetch a bundle from the PLC directory: 68 - 69 - ```go 70 - package main 71 - 72 - import ( 73 - "context" 74 - "log" 75 - 76 - plcbundle "tangled.org/atscan.net/plcbundle" 77 - ) 78 - 79 - func main() { 80 - mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 81 - if err != nil { 82 - log.Fatal(err) 83 - } 84 - defer mgr.Close() 85 - 86 - ctx := context.Background() 87 - 88 - // Fetch next bundle 89 - log.Println("Fetching bundle...") 90 - bundle, err := mgr.FetchNext(ctx) 91 - if err != nil { 92 - log.Fatal(err) 93 - } 94 - 95 - log.Printf("✓ Fetched bundle %d", bundle.BundleNumber) 96 - log.Printf(" Operations: %d", len(bundle.Operations)) 97 - log.Printf(" Unique DIDs: %d", bundle.DIDCount) 98 - log.Printf(" Time range: %s to %s", 99 - bundle.StartTime.Format("2006-01-02"), 100 - bundle.EndTime.Format("2006-01-02")) 101 - } 102 - ``` 103 - 104 - **What's happening here?** 105 - 106 - 1. `plcbundle.New()` creates a manager that handles all bundle operations 107 - 2. `FetchNext()` automatically: 108 - - Fetches operations from PLC directory 109 - - Creates a bundle when 10,000 operations are collected 110 - - Saves the bundle to disk 111 - - Updates the index 112 - - Returns the bundle object 113 - 114 - ### Reading Bundles 115 - 116 - Once you have bundles, you can load and read them: 117 - 118 - ```go 119 - package main 120 - 121 - import ( 122 - "context" 123 - "log" 124 - 125 - plcbundle "tangled.org/atscan.net/plcbundle" 126 - ) 127 - 128 - func main() { 129 - mgr, err := plcbundle.New("./plc_data", "") 130 - if err != nil { 131 - log.Fatal(err) 132 - } 133 - defer mgr.Close() 134 - 135 - ctx := context.Background() 136 - 137 - // Load bundle 1 138 - bundle, err := mgr.Load(ctx, 1) 139 - if err != nil { 140 - log.Fatal(err) 141 - } 142 - 143 - log.Printf("Bundle %d loaded", bundle.BundleNumber) 144 - 145 - // Iterate through operations 146 - for i, op := range bundle.Operations { 147 - if i >= 5 { 148 - break // Just show first 5 149 - } 150 - log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID) 151 - } 152 - } 153 - ``` 154 - 155 - --- 156 - 157 - ## Core Concepts 158 - 159 - ### The Manager 160 - 161 - The `Manager` is your main entry point. It handles: 162 - - Bundle storage and retrieval 163 - - Index management 164 - - PLC directory synchronization 165 - - Verification 166 - - Mempool management 167 - 168 - **Creating a manager:** 169 - 170 - ```go 171 - // Simple creation 172 - mgr, err := plcbundle.New("./bundles", "https://plc.directory") 173 - 174 - // Custom configuration 175 - config := plcbundle.DefaultConfig("./bundles") 176 - config.VerifyOnLoad = true 177 - config.AutoRebuild = true 178 - 179 - plcClient := plcbundle.NewPLCClient("https://plc.directory") 180 - mgr, err := plcbundle.NewManager(config, plcClient) 181 - ``` 182 - 183 - ### Bundles 184 - 185 - A bundle contains exactly 10,000 operations: 186 - 187 - ```go 188 - type Bundle struct { 189 - BundleNumber int // Sequential number (1, 2, 3...) 190 - StartTime time.Time // First operation timestamp 191 - EndTime time.Time // Last operation timestamp 192 - Operations []plcclient.PLCOperation // The 10,000 operations 193 - DIDCount int // Unique DIDs in bundle 194 - Hash string // Chain hash (includes history) 195 - ContentHash string // This bundle's content hash 196 - Parent string // Previous bundle's chain hash 197 - CompressedSize int64 // File size on disk 198 - UncompressedSize int64 // Original JSONL size 199 - } 200 - ``` 201 - 202 - ### The Index 203 - 204 - The index tracks all bundles and their metadata: 205 - 206 - ```go 207 - index := mgr.GetIndex() 208 - 209 - // Get all bundles 210 - bundles := index.GetBundles() 211 - for _, meta := range bundles { 212 - log.Printf("Bundle %d: %s to %s", 213 - meta.BundleNumber, 214 - meta.StartTime.Format("2006-01-02"), 215 - meta.EndTime.Format("2006-01-02")) 216 - } 217 - 218 - // Get specific bundle metadata 219 - meta, err := index.GetBundle(42) 220 - 221 - // Get last bundle 222 - lastBundle := index.GetLastBundle() 223 - ``` 224 - 225 - ### Operations 226 - 227 - Each operation represents a DID PLC directory event: 228 - 229 - ```go 230 - type PLCOperation struct { 231 - DID string // The DID (did:plc:...) 232 - Operation json.RawMessage // Raw JSON bytes (use GetOperationMap() to parse) 233 - CID string // Content identifier 234 - Nullified interface{} // nil, false, or CID string 235 - CreatedAt time.Time // When it was created 236 - 237 - // Internal fields (populated automatically) 238 - RawJSON []byte // Original JSON line 239 - ParsedOperation map[string]interface{} // Cached parsed data 240 - } 241 - 242 - // Accessing operation data: 243 - operation, err := op.GetOperationMap() // Parses Operation field (cached) 244 - if err != nil || operation == nil { 245 - return 246 - } 247 - 248 - // Now you can access fields 249 - services := operation["services"].(map[string]interface{}) 250 - 251 - // Check if operation was nullified 252 - if op.IsNullified() { 253 - log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID()) 254 - } 255 - ``` 256 - 257 - ### Accessing Operation Data 258 - 259 - The `Operation` field uses lazy parsing for performance. Always parse it before accessing: 260 - 261 - ```go 262 - // ❌ Wrong - won't compile 263 - services := op.Operation["services"] 264 - 265 - // ✅ Correct 266 - operation, err := op.GetOperationMap() 267 - if err != nil || operation == nil { 268 - return 269 - } 270 - services, ok := operation["services"].(map[string]interface{}) 271 - ``` 272 - 273 - The parsed data is cached, so repeated calls are fast: 274 - // First call: parses JSON 275 - data1, _ := op.GetOperationMap() 276 - 277 - // Second call: returns cached data (fast) 278 - data2, _ := op.GetOperationMap() 279 - 280 - --- 281 - 282 - ## Common Patterns 283 - 284 - ### Pattern 1: Transparent Sync Service 285 - 286 - **Goal:** Keep a local PLC mirror continuously synchronized. 287 - 288 - This is the most common use case - maintaining an up-to-date copy of the PLC directory. 289 - 290 - ```go 291 - package main 292 - 293 - import ( 294 - "context" 295 - "log" 296 - "os" 297 - "os/signal" 298 - "syscall" 299 - "time" 300 - 301 - plcbundle "tangled.org/atscan.net/plcbundle" 302 - ) 303 - 304 - type SyncService struct { 305 - mgr *plcbundle.Manager 306 - interval time.Duration 307 - stop chan struct{} 308 - } 309 - 310 - func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) { 311 - mgr, err := plcbundle.New(bundleDir, "https://plc.directory") 312 - if err != nil { 313 - return nil, err 314 - } 315 - 316 - return &SyncService{ 317 - mgr: mgr, 318 - interval: interval, 319 - stop: make(chan struct{}), 320 - }, nil 321 - } 322 - 323 - func (s *SyncService) Start() { 324 - log.Println("Starting sync service...") 325 - 326 - // Initial sync 327 - s.sync() 328 - 329 - // Periodic sync 330 - ticker := time.NewTicker(s.interval) 331 - defer ticker.Stop() 332 - 333 - for { 334 - select { 335 - case <-ticker.C: 336 - s.sync() 337 - case <-s.stop: 338 - log.Println("Sync service stopped") 339 - return 340 - } 341 - } 342 - } 343 - 344 - func (s *SyncService) sync() { 345 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) 346 - defer cancel() 347 - 348 - log.Println("Checking for new bundles...") 349 - 350 - fetched := 0 351 - for { 352 - bundle, err := s.mgr.FetchNext(ctx) 353 - if err != nil { 354 - if isInsufficientOps(err) { 355 - if fetched > 0 { 356 - log.Printf("✓ Synced %d new bundles", fetched) 357 - } else { 358 - log.Println("✓ Up to date") 359 - } 360 - return 361 - } 362 - log.Printf("Error: %v", err) 363 - return 364 - } 365 - 366 - fetched++ 367 - log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)", 368 - bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount) 369 - } 370 - } 371 - 372 - func (s *SyncService) Stop() { 373 - close(s.stop) 374 - s.mgr.Close() 375 - } 376 - 377 - func isInsufficientOps(err error) bool { 378 - return err != nil && 379 - (strings.Contains(err.Error(), "insufficient operations") || 380 - strings.Contains(err.Error(), "no more available")) 381 - } 382 - 383 - func main() { 384 - service, err := NewSyncService("./plc_data", 5*time.Minute) 385 - if err != nil { 386 - log.Fatal(err) 387 - } 388 - 389 - // Start service in background 390 - go service.Start() 391 - 392 - // Wait for interrupt 393 - sigChan := make(chan os.Signal, 1) 394 - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 395 - <-sigChan 396 - 397 - log.Println("Shutting down...") 398 - service.Stop() 399 - } 400 - ``` 401 - 402 - **Usage:** 403 - ```bash 404 - go run main.go 405 - # Starting sync service... 406 - # Checking for new bundles... 407 - # ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs) 408 - # ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs) 409 - # ✓ Up to date 410 - # ... (repeats every 5 minutes) 411 - ``` 412 - 413 - ### Pattern 2: Reading and Processing Operations 414 - 415 - **Goal:** Process all historical operations for analysis. 416 - 417 - ```go 418 - package main 419 - 420 - import ( 421 - "context" 422 - "log" 423 - 424 - plcbundle "tangled.org/atscan.net/plcbundle" 425 - ) 426 - 427 - type OperationProcessor struct { 428 - mgr *plcbundle.Manager 429 - } 430 - 431 - func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) { 432 - mgr, err := plcbundle.New(bundleDir, "") 433 - if err != nil { 434 - return nil, err 435 - } 436 - 437 - return &OperationProcessor{mgr: mgr}, nil 438 - } 439 - 440 - func (p *OperationProcessor) ProcessAll() error { 441 - ctx := context.Background() 442 - 443 - index := p.mgr.GetIndex() 444 - bundles := index.GetBundles() 445 - 446 - log.Printf("Processing %d bundles...", len(bundles)) 447 - 448 - totalOps := 0 449 - uniqueDIDs := make(map[string]bool) 450 - 451 - for _, meta := range bundles { 452 - // Load bundle 453 - bundle, err := p.mgr.Load(ctx, meta.BundleNumber) 454 - if err != nil { 455 - return err 456 - } 457 - 458 - // Process operations 459 - for _, op := range bundle.Operations { 460 - totalOps++ 461 - uniqueDIDs[op.DID] = true 462 - 463 - // Your processing logic here 464 - p.processOperation(op) 465 - } 466 - 467 - if meta.BundleNumber % 100 == 0 { 468 - log.Printf("Processed bundle %d...", meta.BundleNumber) 469 - } 470 - } 471 - 472 - log.Printf("✓ Processed %d operations from %d unique DIDs", 473 - totalOps, len(uniqueDIDs)) 474 - 475 - return nil 476 - } 477 - 478 - func (p *OperationProcessor) processOperation(op plcbundle.PLCOperation) { 479 - // Parse Operation field on-demand 480 - operation, err := op.GetOperationMap() 481 - if err != nil || operation == nil { 482 - return 483 - } 484 - 485 - // Example: Extract PDS endpoints 486 - if services, ok := operation["services"].(map[string]interface{}); ok { 487 - if pds, ok := services["atproto_pds"].(map[string]interface{}); ok { 488 - if endpoint, ok := pds["endpoint"].(string); ok { 489 - log.Printf("DID %s uses PDS: %s", op.DID, endpoint) 490 - } 491 - } 492 - } 493 - } 494 - 495 - 496 - func main() { 497 - processor, err := NewOperationProcessor("./plc_data") 498 - if err != nil { 499 - log.Fatal(err) 500 - } 501 - 502 - if err := processor.ProcessAll(); err != nil { 503 - log.Fatal(err) 504 - } 505 - } 506 - ``` 507 - 508 - ### Pattern 3: Time-Based Queries 509 - 510 - **Goal:** Export operations from a specific time period. 511 - 512 - ```go 513 - package main 514 - 515 - import ( 516 - "context" 517 - "encoding/json" 518 - "log" 519 - "os" 520 - "time" 521 - 522 - plcbundle "tangled.org/atscan.net/plcbundle" 523 - ) 524 - 525 - func exportOperationsSince(bundleDir string, since time.Time, limit int) error { 526 - mgr, err := plcbundle.New(bundleDir, "") 527 - if err != nil { 528 - return err 529 - } 530 - defer mgr.Close() 531 - 532 - ctx := context.Background() 533 - 534 - // Export operations after timestamp 535 - ops, err := mgr.Export(ctx, since, limit) 536 - if err != nil { 537 - return err 538 - } 539 - 540 - log.Printf("Exporting %d operations...", len(ops)) 541 - 542 - // Write as JSONL to stdout 543 - encoder := json.NewEncoder(os.Stdout) 544 - for _, op := range ops { 545 - if err := encoder.Encode(op); err != nil { 546 - return err 547 - } 548 - } 549 - 550 - return nil 551 - } 552 - 553 - func main() { 554 - // Export operations from the last 7 days 555 - since := time.Now().AddDate(0, 0, -7) 556 - 557 - if err := exportOperationsSince("./plc_data", since, 50000); err != nil { 558 - log.Fatal(err) 559 - } 560 - } 561 - ``` 562 - 563 - **Output to file:** 564 - ```bash 565 - go run main.go > last_7_days.jsonl 566 - ``` 567 - 568 - ### Pattern 4: Verification Service 569 - 570 - **Goal:** Periodically verify bundle integrity. 571 - 572 - ```go 573 - package main 574 - 575 - import ( 576 - "context" 577 - "log" 578 - "time" 579 - 580 - plcbundle "tangled.org/atscan.net/plcbundle" 581 - ) 582 - 583 - type VerificationService struct { 584 - mgr *plcbundle.Manager 585 - interval time.Duration 586 - } 587 - 588 - func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) { 589 - mgr, err := plcbundle.New(bundleDir, "") 590 - if err != nil { 591 - return nil, err 592 - } 593 - 594 - return &VerificationService{ 595 - mgr: mgr, 596 - interval: interval, 597 - }, nil 598 - } 599 - 600 - func (v *VerificationService) Start() { 601 - ticker := time.NewTicker(v.interval) 602 - defer ticker.Stop() 603 - 604 - // Verify immediately on start 605 - v.verify() 606 - 607 - for range ticker.C { 608 - v.verify() 609 - } 610 - } 611 - 612 - func (v *VerificationService) verify() { 613 - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) 614 - defer cancel() 615 - 616 - log.Println("Starting chain verification...") 617 - start := time.Now() 618 - 619 - result, err := v.mgr.VerifyChain(ctx) 620 - if err != nil { 621 - log.Printf("❌ Verification error: %v", err) 622 - return 623 - } 624 - 625 - elapsed := time.Since(start) 626 - 627 - if result.Valid { 628 - log.Printf("✅ Chain verified: %d bundles, took %s", 629 - result.ChainLength, elapsed.Round(time.Second)) 630 - 631 - // Get head hash 632 - index := v.mgr.GetIndex() 633 - if last := index.GetLastBundle(); last != nil { 634 - log.Printf(" Head hash: %s...", last.Hash[:16]) 635 - } 636 - } else { 637 - log.Printf("❌ Chain broken at bundle %d: %s", 638 - result.BrokenAt, result.Error) 639 - 640 - // Alert or take action 641 - v.handleBrokenChain(result) 642 - } 643 - } 644 - 645 - func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) { 646 - // Send alert, trigger re-sync, etc. 647 - log.Printf("⚠️ ALERT: Chain integrity compromised!") 648 - // TODO: Implement your alerting logic 649 - } 650 - 651 - func main() { 652 - service, err := NewVerificationService("./plc_data", 24*time.Hour) 653 - if err != nil { 654 - log.Fatal(err) 655 - } 656 - 657 - log.Println("Verification service started (daily checks)") 658 - service.Start() 659 - } 660 - ``` 661 - 662 - ### Pattern 5: Custom HTTP API 663 - 664 - **Goal:** Build a custom API on top of your bundle archive. 665 - 666 - ```go 667 - package main 668 - 669 - import ( 670 - "encoding/json" 671 - "log" 672 - "net/http" 673 - "strconv" 674 - 675 - plcbundle "tangled.org/atscan.net/plcbundle" 676 - ) 677 - 678 - type API struct { 679 - mgr *plcbundle.Manager 680 - } 681 - 682 - func NewAPI(bundleDir string) (*API, error) { 683 - mgr, err := plcbundle.New(bundleDir, "") 684 - if err != nil { 685 - return nil, err 686 - } 687 - 688 - return &API{mgr: mgr}, nil 689 - } 690 - 691 - func (api *API) handleStats(w http.ResponseWriter, r *http.Request) { 692 - index := api.mgr.GetIndex() 693 - stats := index.GetStats() 694 - 695 - response := map[string]interface{}{ 696 - "bundles": stats["bundle_count"], 697 - "first": stats["first_bundle"], 698 - "last": stats["last_bundle"], 699 - "total_size": stats["total_size"], 700 - "start_time": stats["start_time"], 701 - "end_time": stats["end_time"], 702 - "updated_at": stats["updated_at"], 703 - } 704 - 705 - w.Header().Set("Content-Type", "application/json") 706 - json.NewEncoder(w).Encode(response) 707 - } 708 - 709 - func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) { 710 - bundleNumStr := r.URL.Query().Get("bundle") 711 - if bundleNumStr == "" { 712 - http.Error(w, "bundle parameter required", http.StatusBadRequest) 713 - return 714 - } 715 - 716 - bundleNum, err := strconv.Atoi(bundleNumStr) 717 - if err != nil { 718 - http.Error(w, "invalid bundle number", http.StatusBadRequest) 719 - return 720 - } 721 - 722 - ctx := r.Context() 723 - bundle, err := api.mgr.Load(ctx, bundleNum) 724 - if err != nil { 725 - http.Error(w, err.Error(), http.StatusNotFound) 726 - return 727 - } 728 - 729 - w.Header().Set("Content-Type", "application/x-ndjson") 730 - encoder := json.NewEncoder(w) 731 - for _, op := range bundle.Operations { 732 - encoder.Encode(op) 733 - } 734 - } 735 - 736 - func (api *API) handleDID(w http.ResponseWriter, r *http.Request) { 737 - did := r.URL.Query().Get("did") 738 - if did == "" { 739 - http.Error(w, "did parameter required", http.StatusBadRequest) 740 - return 741 - } 742 - 743 - ctx := r.Context() 744 - 745 - // Search through bundles for this DID 746 - var operations []plcbundle.PLCOperation 747 - 748 - index := api.mgr.GetIndex() 749 - bundles := index.GetBundles() 750 - 751 - for _, meta := range bundles { 752 - bundle, err := api.mgr.Load(ctx, meta.BundleNumber) 753 - if err != nil { 754 - continue 755 - } 756 - 757 - for _, op := range bundle.Operations { 758 - if op.DID == did { 759 - operations = append(operations, op) 760 - } 761 - } 762 - } 763 - 764 - w.Header().Set("Content-Type", "application/json") 765 - json.NewEncoder(w).Encode(map[string]interface{}{ 766 - "did": did, 767 - "operations": operations, 768 - "count": len(operations), 769 - }) 770 - } 771 - 772 - func main() { 773 - api, err := NewAPI("./plc_data") 774 - if err != nil { 775 - log.Fatal(err) 776 - } 777 - 778 - http.HandleFunc("/stats", api.handleStats) 779 - http.HandleFunc("/operations", api.handleOperations) 780 - http.HandleFunc("/did", api.handleDID) 781 - 782 - log.Println("API listening on :8080") 783 - log.Fatal(http.ListenAndServe(":8080", nil)) 784 - } 785 - ``` 786 - 787 - **Usage:** 788 - ```bash 789 - # Get stats 790 - curl http://localhost:8080/stats 791 - 792 - # Get operations from bundle 1 793 - curl http://localhost:8080/operations?bundle=1 794 - 795 - # Get all operations for a DID 796 - curl http://localhost:8080/did?did=did:plc:example123 797 - ``` 798 - 799 - --- 800 - 801 - ## Building Applications 802 - 803 - ### Application 1: PDS Discovery Tool 804 - 805 - Find all PDS endpoints in the network: 806 - 807 - ```go 808 - package main 809 - 810 - import ( 811 - "context" 812 - "fmt" 813 - "log" 814 - 815 - plcbundle "tangled.org/atscan.net/plcbundle" 816 - ) 817 - 818 - type PDSTracker struct { 819 - mgr *plcbundle.Manager 820 - endpoints map[string]int // endpoint -> count 821 - } 822 - 823 - func NewPDSTracker(bundleDir string) (*PDSTracker, error) { 824 - mgr, err := plcbundle.New(bundleDir, "") 825 - if err != nil { 826 - return nil, err 827 - } 828 - 829 - return &PDSTracker{ 830 - mgr: mgr, 831 - endpoints: make(map[string]int), 832 - }, nil 833 - } 834 - 835 - func (pt *PDSTracker) Scan() error { 836 - ctx := context.Background() 837 - 838 - index := pt.mgr.GetIndex() 839 - bundles := index.GetBundles() 840 - 841 - log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles)) 842 - 843 - for _, meta := range bundles { 844 - bundle, err := pt.mgr.Load(ctx, meta.BundleNumber) 845 - if err != nil { 846 - return err 847 - } 848 - 849 - for _, op := range bundle.Operations { 850 - if endpoint := pt.extractPDS(op); endpoint != "" { 851 - pt.endpoints[endpoint]++ 852 - } 853 - } 854 - } 855 - 856 - return nil 857 - } 858 - 859 - func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string { 860 - // Parse Operation field on-demand 861 - operation, err := op.GetOperationMap() 862 - if err != nil || operation == nil { 863 - return "" 864 - } 865 - 866 - services, ok := operation["services"].(map[string]interface{}) 867 - if !ok { 868 - return "" 869 - } 870 - 871 - pds, ok := services["atproto_pds"].(map[string]interface{}) 872 - if !ok { 873 - return "" 874 - } 875 - 876 - endpoint, ok := pds["endpoint"].(string) 877 - if !ok { 878 - return "" 879 - } 880 - 881 - return endpoint 882 - } 883 - 884 - 885 - func (pt *PDSTracker) PrintResults() { 886 - log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints)) 887 - 888 - // Sort by count 889 - type endpointCount struct { 890 - endpoint string 891 - count int 892 - } 893 - 894 - var sorted []endpointCount 895 - for endpoint, count := range pt.endpoints { 896 - sorted = append(sorted, endpointCount{endpoint, count}) 897 - } 898 - 899 - sort.Slice(sorted, func(i, j int) bool { 900 - return sorted[i].count > sorted[j].count 901 - }) 902 - 903 - // Print top 20 904 - for i, ec := range sorted { 905 - if i >= 20 { 906 - break 907 - } 908 - fmt.Printf("%3d. %s (%d DIDs)\n", i+1, ec.endpoint, ec.count) 909 - } 910 - } 911 - 912 - func main() { 913 - tracker, err := NewPDSTracker("./plc_data") 914 - if err != nil { 915 - log.Fatal(err) 916 - } 917 - 918 - if err := tracker.Scan(); err != nil { 919 - log.Fatal(err) 920 - } 921 - 922 - tracker.PrintResults() 923 - } 924 - ``` 925 - 926 - ### Application 2: DID History Viewer 927 - 928 - View the complete history of a DID: 929 - 930 - ```go 931 - package main 932 - 933 - import ( 934 - "context" 935 - "encoding/json" 936 - "fmt" 937 - "log" 938 - "os" 939 - 940 - plcbundle "tangled.org/atscan.net/plcbundle" 941 - ) 942 - 943 - type DIDHistory struct { 944 - DID string `json:"did"` 945 - Operations []plcbundle.PLCOperation `json:"operations"` 946 - FirstSeen time.Time `json:"first_seen"` 947 - LastSeen time.Time `json:"last_seen"` 948 - OpCount int `json:"operation_count"` 949 - } 950 - 951 - func getDIDHistory(bundleDir, did string) (*DIDHistory, error) { 952 - mgr, err := plcbundle.New(bundleDir, "") 953 - if err != nil { 954 - return nil, err 955 - } 956 - defer mgr.Close() 957 - 958 - ctx := context.Background() 959 - 960 - history := &DIDHistory{ 961 - DID: did, 962 - Operations: make([]plcbundle.PLCOperation, 0), 963 - } 964 - 965 - index := mgr.GetIndex() 966 - bundles := index.GetBundles() 967 - 968 - log.Printf("Searching for DID %s...", did) 969 - 970 - for _, meta := range bundles { 971 - bundle, err := mgr.Load(ctx, meta.BundleNumber) 972 - if err != nil { 973 - continue 974 - } 975 - 976 - for _, op := range bundle.Operations { 977 - if op.DID == did { 978 - history.Operations = append(history.Operations, op) 979 - } 980 - } 981 - } 982 - 983 - if len(history.Operations) == 0 { 984 - return nil, fmt.Errorf("DID not found") 985 - } 986 - 987 - // Set timestamps 988 - history.FirstSeen = history.Operations[0].CreatedAt 989 - history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt 990 - history.OpCount = len(history.Operations) 991 - 992 - return history, nil 993 - } 994 - 995 - func main() { 996 - if len(os.Args) < 2 { 997 - log.Fatal("Usage: did-history <did>") 998 - } 999 - 1000 - did := os.Args[1] 1001 - 1002 - history, err := getDIDHistory("./plc_data", did) 1003 - if err != nil { 1004 - log.Fatal(err) 1005 - } 1006 - 1007 - // Print as JSON 1008 - encoder := json.NewEncoder(os.Stdout) 1009 - encoder.SetIndent("", " ") 1010 - encoder.Encode(history) 1011 - } 1012 - ``` 1013 - 1014 - ### Application 3: Real-time Monitor 1015 - 1016 - Monitor new operations as they arrive: 1017 - 1018 - ```go 1019 - package main 1020 - 1021 - import ( 1022 - "context" 1023 - "log" 1024 - "time" 1025 - 1026 - plcbundle "tangled.org/atscan.net/plcbundle" 1027 - ) 1028 - 1029 - type Monitor struct { 1030 - mgr *plcbundle.Manager 1031 - lastSeen int // Last bundle number processed 1032 - pollInterval time.Duration 1033 - } 1034 - 1035 - func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) { 1036 - mgr, err := plcbundle.New(bundleDir, "https://plc.directory") 1037 - if err != nil { 1038 - return nil, err 1039 - } 1040 - 1041 - // Get current position 1042 - index := mgr.GetIndex() 1043 - lastBundle := index.GetLastBundle() 1044 - lastSeen := 0 1045 - if lastBundle != nil { 1046 - lastSeen = lastBundle.BundleNumber 1047 - } 1048 - 1049 - return &Monitor{ 1050 - mgr: mgr, 1051 - lastSeen: lastSeen, 1052 - pollInterval: pollInterval, 1053 - }, nil 1054 - } 1055 - 1056 - func (m *Monitor) Start() { 1057 - log.Println("Monitor started, watching for new bundles...") 1058 - 1059 - ticker := time.NewTicker(m.pollInterval) 1060 - defer ticker.Stop() 1061 - 1062 - for range ticker.C { 1063 - m.check() 1064 - } 1065 - } 1066 - 1067 - func (m *Monitor) check() { 1068 - ctx := context.Background() 1069 - 1070 - // Try to fetch next bundle 1071 - bundle, err := m.mgr.FetchNext(ctx) 1072 - if err != nil { 1073 - // Not an error if no new bundle available 1074 - return 1075 - } 1076 - 1077 - // New bundle! 1078 - log.Printf("🔔 New bundle: %d", bundle.BundleNumber) 1079 - log.Printf(" Operations: %d", len(bundle.Operations)) 1080 - log.Printf(" DIDs: %d", bundle.DIDCount) 1081 - log.Printf(" Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05")) 1082 - 1083 - // Process new operations 1084 - m.processNewOperations(bundle) 1085 - 1086 - m.lastSeen = bundle.BundleNumber 1087 - } 1088 - 1089 - func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) { 1090 - for _, op := range bundle.Operations { 1091 - // Check for interesting operations 1092 - if op.IsNullified() { 1093 - log.Printf(" ⚠️ Nullified: %s", op.DID) 1094 - } 1095 - 1096 - // Check for new DIDs (operation type "create") 1097 - operation, err := op.GetOperationMap() 1098 - if err == nil && operation != nil { 1099 - if opType, ok := operation["type"].(string); ok && opType == "create" { 1100 - log.Printf(" ➕ New DID: %s", op.DID) 1101 - } 1102 - } 1103 - } 1104 - } 1105 - 1106 - func main() { 1107 - monitor, err := NewMonitor("./plc_data", 30*time.Second) 1108 - if err != nil { 1109 - log.Fatal(err) 1110 - } 1111 - 1112 - monitor.Start() 1113 - } 1114 - ``` 1115 - 1116 - --- 1117 - 1118 - ## Advanced Usage 1119 - 1120 - ### Custom Configuration 1121 - 1122 - Full control over bundle manager behavior: 1123 - 1124 - ```go 1125 - package main 1126 - 1127 - import ( 1128 - "log" 1129 - "runtime" 1130 - "time" 1131 - 1132 - "tangled.org/atscan.net/plcbundle/bundle" 1133 - "tangled.org/atscan.net/plcbundle/plcclient" 1134 - plcbundle "tangled.org/atscan.net/plcbundle" 1135 - ) 1136 - 1137 - func main() { 1138 - // Custom configuration 1139 - config := &bundle.Config{ 1140 - BundleDir: "./my_bundles", 1141 - VerifyOnLoad: true, // Verify hashes when loading 1142 - AutoRebuild: true, // Auto-rebuild index if needed 1143 - RebuildWorkers: runtime.NumCPU(), // Parallel workers for rebuild 1144 - Logger: &MyCustomLogger{}, // Custom logger 1145 - 1146 - // Progress callback for rebuild 1147 - RebuildProgress: func(current, total int) { 1148 - if current%100 == 0 { 1149 - log.Printf("Rebuild: %d/%d (%.1f%%)", 1150 - current, total, float64(current)/float64(total)*100) 1151 - } 1152 - }, 1153 - } 1154 - 1155 - // Custom PLC client with rate limiting 1156 - plcClient := plcclient.NewClient("https://plc.directory", 1157 - plcclient.WithRateLimit(60, time.Minute), // 60 req/min 1158 - plcclient.WithTimeout(30*time.Second), // 30s timeout 1159 - plcclient.WithLogger(&MyCustomLogger{}), // Custom logger 1160 - ) 1161 - 1162 - // Create manager 1163 - mgr, err := bundle.NewManager(config, plcClient) 1164 - if err != nil { 1165 - log.Fatal(err) 1166 - } 1167 - defer mgr.Close() 1168 - 1169 - log.Println("Manager created with custom configuration") 1170 - } 1171 - 1172 - // Custom logger implementation 1173 - type MyCustomLogger struct{} 1174 - 1175 - func (l *MyCustomLogger) Printf(format string, v ...interface{}) { 1176 - // Add custom formatting, filtering, etc. 1177 - log.Printf("[PLCBUNDLE] "+format, v...) 1178 - } 1179 - 1180 - func (l *MyCustomLogger) Println(v ...interface{}) { 1181 - log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...) 1182 - } 1183 - ``` 1184 - 1185 - ### Streaming Data 1186 - 1187 - Stream bundle data without loading everything into memory: 1188 - 1189 - ```go 1190 - package main 1191 - 1192 - import ( 1193 - "bufio" 1194 - "context" 1195 - "encoding/json" 1196 - "io" 1197 - "log" 1198 - 1199 - plcbundle "tangled.org/atscan.net/plcbundle" 1200 - ) 1201 - 1202 - func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error { 1203 - ctx := context.Background() 1204 - 1205 - // Get decompressed stream 1206 - reader, err := mgr.StreamDecompressed(ctx, bundleNumber) 1207 - if err != nil { 1208 - return err 1209 - } 1210 - defer reader.Close() 1211 - 1212 - // Read line by line (JSONL) 1213 - scanner := bufio.NewScanner(reader) 1214 - 1215 - // Set buffer size for large lines 1216 - buf := make([]byte, 0, 64*1024) 1217 - scanner.Buffer(buf, 1024*1024) 1218 - 1219 - lineNum := 0 1220 - for scanner.Scan() { 1221 - lineNum++ 1222 - 1223 - var op plcbundle.PLCOperation 1224 - if err := json.Unmarshal(scanner.Bytes(), &op); err != nil { 1225 - log.Printf("Warning: failed to parse line %d: %v", lineNum, err) 1226 - continue 1227 - } 1228 - 1229 - // Process operation without storing all in memory 1230 - processOperation(op) 1231 - } 1232 - 1233 - return scanner.Err() 1234 - } 1235 - 1236 - func processOperation(op plcbundle.PLCOperation) { 1237 - // Your processing logic 1238 - log.Printf("Processing: %s", op.DID) 1239 - } 1240 - 1241 - func main() { 1242 - mgr, err := plcbundle.New("./plc_data", "") 1243 - if err != nil { 1244 - log.Fatal(err) 1245 - } 1246 - defer mgr.Close() 1247 - 1248 - // Stream bundle 1 1249 - if err := streamBundle(mgr, 1); err != nil { 1250 - log.Fatal(err) 1251 - } 1252 - } 1253 - ``` 1254 - 1255 - ### Parallel Processing 1256 - 1257 - Process multiple bundles concurrently: 1258 - 1259 - ```go 1260 - package main 1261 - 1262 - import ( 1263 - "context" 1264 - "log" 1265 - "sync" 1266 - 1267 - plcbundle "tangled.org/atscan.net/plcbundle" 1268 - ) 1269 - 1270 - func processParallel(mgr *plcbundle.Manager, workers int) error { 1271 - ctx := context.Background() 1272 - 1273 - index := mgr.GetIndex() 1274 - bundles := index.GetBundles() 1275 - 1276 - // Create job channel 1277 - jobs := make(chan int, len(bundles)) 1278 - results := make(chan error, len(bundles)) 1279 - 1280 - // Start workers 1281 - var wg sync.WaitGroup 1282 - for w := 0; w < workers; w++ { 1283 - wg.Add(1) 1284 - go func() { 1285 - defer wg.Done() 1286 - for bundleNum := range jobs { 1287 - if err := processBundle(ctx, mgr, bundleNum); err != nil { 1288 - results <- err 1289 - } else { 1290 - results <- nil 1291 - } 1292 - } 1293 - }() 1294 - } 1295 - 1296 - // Send jobs 1297 - for _, meta := range bundles { 1298 - jobs <- meta.BundleNumber 1299 - } 1300 - close(jobs) 1301 - 1302 - // Wait for completion 1303 - go func() { 1304 - wg.Wait() 1305 - close(results) 1306 - }() 1307 - 1308 - // Collect results 1309 - errors := 0 1310 - for err := range results { 1311 - if err != nil { 1312 - log.Printf("Error: %v", err) 1313 - errors++ 1314 - } 1315 - } 1316 - 1317 - if errors > 0 { 1318 - return fmt.Errorf("%d bundles failed processing", errors) 1319 - } 1320 - 1321 - return nil 1322 - } 1323 - 1324 - func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error { 1325 - bundle, err := mgr.Load(ctx, bundleNum) 1326 - if err != nil { 1327 - return err 1328 - } 1329 - 1330 - // Process operations 1331 - for _, op := range bundle.Operations { 1332 - // Your logic here 1333 - _ = op 1334 - } 1335 - 1336 - log.Printf("Processed bundle %d", bundleNum) 1337 - return nil 1338 - } 1339 - 1340 - func main() { 1341 - mgr, err := plcbundle.New("./plc_data", "") 1342 - if err != nil { 1343 - log.Fatal(err) 1344 - } 1345 - defer mgr.Close() 1346 - 1347 - // Process with 8 workers 1348 - if err := processParallel(mgr, 8); err != nil { 1349 - log.Fatal(err) 1350 - } 1351 - } 1352 - ``` 1353 - 1354 - ### Working with Mempool 1355 - 1356 - Access operations before they're bundled: 1357 - 1358 - ```go 1359 - package main 1360 - 1361 - import ( 1362 - "log" 1363 - 1364 - plcbundle "tangled.org/atscan.net/plcbundle" 1365 - ) 1366 - 1367 - func main() { 1368 - mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 1369 - if err != nil { 1370 - log.Fatal(err) 1371 - } 1372 - defer mgr.Close() 1373 - 1374 - // Get mempool stats 1375 - stats := mgr.GetMempoolStats() 1376 - 1377 - count := stats["count"].(int) 1378 - targetBundle := stats["target_bundle"].(int) 1379 - canCreate := stats["can_create_bundle"].(bool) 1380 - 1381 - log.Printf("Mempool status:") 1382 - log.Printf(" Target bundle: %d", targetBundle) 1383 - log.Printf(" Operations: %d/%d", count, plcbundle.BUNDLE_SIZE) 1384 - log.Printf(" Ready: %v", canCreate) 1385 - 1386 - if count > 0 { 1387 - // Get mempool operations 1388 - ops, err := mgr.GetMempoolOperations() 1389 - if err != nil { 1390 - log.Fatal(err) 1391 - } 1392 - 1393 - log.Printf("Latest unbundled operations:") 1394 - for i, op := range ops { 1395 - if i >= 5 { 1396 - break 1397 - } 1398 - log.Printf(" %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05")) 1399 - } 1400 - } 1401 - 1402 - // Validate chronological order 1403 - if err := mgr.ValidateMempool(); err != nil { 1404 - log.Printf("⚠️ Mempool validation failed: %v", err) 1405 - } else { 1406 - log.Println("✓ Mempool validated") 1407 - } 1408 - } 1409 - ``` 1410 - 1411 - --- 1412 - 1413 - ## Best Practices 1414 - 1415 - ### 1. Always Close the Manager 1416 - 1417 - Use `defer` to ensure cleanup: 1418 - 1419 - ```go 1420 - mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 1421 - if err != nil { 1422 - return err 1423 - } 1424 - defer mgr.Close() // Always close! 1425 - ``` 1426 - 1427 - ### 2. Handle Context Cancellation 1428 - 1429 - Support graceful shutdown: 1430 - 1431 - ```go 1432 - ctx, cancel := context.WithCancel(context.Background()) 1433 - defer cancel() 1434 - 1435 - // Listen for interrupt 1436 - sigChan := make(chan os.Signal, 1) 1437 - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 1438 - 1439 - go func() { 1440 - <-sigChan 1441 - log.Println("Interrupt received, stopping...") 1442 - cancel() 1443 - }() 1444 - 1445 - // Use context in operations 1446 - bundle, err := mgr.FetchNext(ctx) 1447 - if err == context.Canceled { 1448 - log.Println("Operation cancelled gracefully") 1449 - return nil 1450 - } 1451 - ``` 1452 - 1453 - ### 3. Check Errors Properly 1454 - 1455 - Distinguish between different error types: 1456 - 1457 - ```go 1458 - bundle, err := mgr.FetchNext(ctx) 1459 - if err != nil { 1460 - // Check if it's just "caught up" 1461 - if strings.Contains(err.Error(), "insufficient operations") { 1462 - log.Println("No new bundles available (caught up)") 1463 - return nil 1464 - } 1465 - 1466 - // Real error 1467 - return fmt.Errorf("fetch failed: %w", err) 1468 - } 1469 - ``` 1470 - 1471 - ### 4. Use Streaming for Large Datasets 1472 - 1473 - Don't load everything into memory: 1474 - 1475 - ```go 1476 - // ❌ Bad: Loads all operations into memory 1477 - index := mgr.GetIndex() 1478 - var allOps []plcbundle.PLCOperation 1479 - for _, meta := range index.GetBundles() { 1480 - bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1481 - allOps = append(allOps, bundle.Operations...) 1482 - } 1483 - 1484 - // ✅ Good: Process one bundle at a time 1485 - for _, meta := range index.GetBundles() { 1486 - bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1487 - for _, op := range bundle.Operations { 1488 - processOperation(op) 1489 - } 1490 - } 1491 - ``` 1492 - 1493 - ### 5. Enable Verification in Production 1494 - 1495 - ```go 1496 - config := plcbundle.DefaultConfig("./plc_data") 1497 - config.VerifyOnLoad = true // Verify hashes when loading 1498 - 1499 - mgr, err := plcbundle.NewManager(config, plcClient) 1500 - ``` 1501 - 1502 - ### 6. Log Appropriately 1503 - 1504 - Implement custom logger for production: 1505 - 1506 - ```go 1507 - type ProductionLogger struct { 1508 - logger *zap.Logger 1509 - } 1510 - 1511 - func (l *ProductionLogger) Printf(format string, v ...interface{}) { 1512 - l.logger.Sugar().Infof(format, v...) 1513 - } 1514 - 1515 - func (l *ProductionLogger) Println(v ...interface{}) { 1516 - l.logger.Sugar().Info(v...) 1517 - } 1518 - ``` 1519 - 1520 - ### 7. Handle Rate Limits 1521 - 1522 - Configure PLC client appropriately: 1523 - 1524 - ```go 1525 - // Production: Be conservative 1526 - plcClient := plcclient.NewClient("https://plc.directory", 1527 - plcclient.WithRateLimit(60, time.Minute), // 60 req/min max 1528 - plcclient.WithTimeout(60*time.Second), 1529 - ) 1530 - 1531 - // Development: Can be more aggressive (but respectful) 1532 - plcClient := plcclient.NewClient("https://plc.directory", 1533 - plcclient.WithRateLimit(90, time.Minute), 1534 - plcclient.WithTimeout(30*time.Second), 1535 - ) 1536 - ``` 1537 - 1538 - --- 1539 - 1540 - ## API Reference 1541 - 1542 - ### Manager Methods 1543 - 1544 - ```go 1545 - // Creation 1546 - New(bundleDir, plcURL string) (*Manager, error) 1547 - NewManager(config *Config, plcClient *PLCClient) (*Manager, error) 1548 - 1549 - // Lifecycle 1550 - Close() 1551 - 1552 - // Fetching 1553 - FetchNext(ctx) (*Bundle, error) 1554 - 1555 - // Loading 1556 - Load(ctx, bundleNumber int) (*Bundle, error) 1557 - 1558 - // Verification 1559 - Verify(ctx, bundleNumber int) (*VerificationResult, error) 1560 - VerifyChain(ctx) (*ChainVerificationResult, error) 1561 - 1562 - // Exporting 1563 - Export(ctx, afterTime time.Time, count int) ([]PLCOperation, error) 1564 - 1565 - // Streaming 1566 - StreamRaw(ctx, bundleNumber int) (io.ReadCloser, error) 1567 - StreamDecompressed(ctx, bundleNumber int) (io.ReadCloser, error) 1568 - 1569 - // Index 1570 - GetIndex() *Index 1571 - ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) 1572 - Scan() (*DirectoryScanResult, error) 1573 - 1574 - // Mempool 1575 - GetMempoolStats() map[string]interface{} 1576 - GetMempoolOperations() ([]PLCOperation, error) 1577 - ValidateMempool() error 1578 - ClearMempool() error 1579 - 1580 - // Info 1581 - GetInfo() map[string]interface{} 1582 - IsBundleIndexed(bundleNumber int) bool 1583 - ``` 1584 - 1585 - ### Index Methods 1586 - 1587 - ```go 1588 - // Creation 1589 - NewIndex() *Index 1590 - LoadIndex(path string) (*Index, error) 1591 - 1592 - // Persistence 1593 - Save(path string) error 1594 - 1595 - // Queries 1596 - GetBundle(bundleNumber int) (*BundleMetadata, error) 1597 - GetLastBundle() *BundleMetadata 1598 - GetBundles() []*BundleMetadata 1599 - GetBundleRange(start, end int) []*BundleMetadata 1600 - 1601 - // Stats 1602 - Count() int 1603 - FindGaps() []int 1604 - GetStats() map[string]interface{} 1605 - ``` 1606 - 1607 - ### Configuration Types 1608 - 1609 - ```go 1610 - type Config struct { 1611 - BundleDir string 1612 - VerifyOnLoad bool 1613 - AutoRebuild bool 1614 - RebuildWorkers int 1615 - RebuildProgress func(current, total int) 1616 - Logger Logger 1617 - } 1618 - 1619 - type Logger interface { 1620 - Printf(format string, v ...interface{}) 1621 - Println(v ...interface{}) 1622 - } 1623 - ``` 1624 - 1625 - --- 1626 - 1627 - ## Troubleshooting 1628 - 1629 - ### Bundle Not Found Error 1630 - 1631 - ```go 1632 - bundle, err := mgr.Load(ctx, 999) 1633 - if err != nil { 1634 - if strings.Contains(err.Error(), "not in index") { 1635 - // Bundle doesn't exist 1636 - log.Printf("Bundle 999 hasn't been fetched yet") 1637 - } 1638 - } 1639 - ``` 1640 - 1641 - ### Insufficient Operations Error 1642 - 1643 - ```go 1644 - bundle, err := mgr.FetchNext(ctx) 1645 - if err != nil { 1646 - if strings.Contains(err.Error(), "insufficient operations") { 1647 - // Not enough operations for a complete bundle 1648 - // Check mempool 1649 - stats := mgr.GetMempoolStats() 1650 - count := stats["count"].(int) 1651 - log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE) 1652 - } 1653 - } 1654 - ``` 1655 - 1656 - ### Memory Usage 1657 - 1658 - If processing large numbers of bundles: 1659 - 1660 - ```go 1661 - // Force garbage collection between bundles 1662 - for _, meta := range index.GetBundles() { 1663 - bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1664 - processBundle(bundle) 1665 - 1666 - runtime.GC() // Help garbage collector 1667 - } 1668 - ``` 1669 - 1670 - --- 1671 - 1672 - ## Examples Repository 1673 - 1674 - Find complete, runnable examples at: 1675 - - https://github.com/plcbundle/examples 1676 - 1677 - Including: 1678 - - Complete sync service 1679 - - API server 1680 - - Analysis tools 1681 - - Monitoring services 1682 - 17 + TODO
+146 -12
internal/didindex/manager.go
··· 46 46 evictionThreshold: 5, 47 47 config: config, 48 48 logger: logger, 49 + recentLookupSize: 1000, // Track last 100 lookups 50 + recentLookups: make([]int64, 1000), 49 51 } 50 52 } 51 53 ··· 76 78 dim.verbose = verbose 77 79 } 78 80 79 - // GetDIDLocations returns all bundle+position locations for a DID 81 + // GetDIDLocations returns all bundle+position locations for a DID (with timing) 80 82 func (dim *Manager) GetDIDLocations(did string) ([]OpLocation, error) { 83 + // Start timing 84 + lookupStart := time.Now() 85 + defer func() { 86 + dim.recordLookupTime(time.Since(lookupStart)) 87 + }() 88 + 81 89 identifier, err := extractDIDIdentifier(did) 82 90 if err != nil { 83 91 return nil, err ··· 96 104 return nil, fmt.Errorf("failed to load shard %02x: %w", shardNum, err) 97 105 } 98 106 99 - // CRITICAL: Release shard when done 100 107 defer dim.releaseShard(shard) 101 108 102 109 if shard.data == nil { ··· 110 117 dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data)) 111 118 } 112 119 113 - // Safe to read - refcount prevents eviction 114 120 locations := dim.searchShard(shard, identifier) 115 121 116 122 if dim.verbose { ··· 141 147 atomic.AddInt64(&shard.refCount, 1) 142 148 atomic.StoreInt64(&shard.lastUsed, time.Now().Unix()) 143 149 atomic.AddInt64(&shard.accessCount, 1) 150 + atomic.AddInt64(&dim.cacheHits, 1) 144 151 145 152 return shard, nil 146 153 } 154 + atomic.AddInt64(&dim.cacheMisses, 1) 147 155 148 156 // Cache miss - load from disk 149 157 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) ··· 456 464 } 457 465 } 458 466 459 - // GetStats returns index statistics 467 + // GetStats returns index statistics (updated) 460 468 func (dim *Manager) GetStats() map[string]interface{} { 461 469 cachedShards := make([]int, 0) 462 470 ··· 467 475 468 476 sort.Ints(cachedShards) 469 477 470 - return map[string]interface{}{ 471 - "total_dids": dim.config.TotalDIDs, 472 - "last_bundle": dim.config.LastBundle, 473 - "shard_count": dim.config.ShardCount, 474 - "cached_shards": len(cachedShards), 475 - "cache_limit": dim.maxCache, 476 - "cache_order": cachedShards, 477 - "updated_at": dim.config.UpdatedAt, 478 + // Calculate cache hit rate 479 + hits := atomic.LoadInt64(&dim.cacheHits) 480 + misses := atomic.LoadInt64(&dim.cacheMisses) 481 + total := hits + misses 482 + 483 + cacheHitRate := 0.0 484 + if total > 0 { 485 + cacheHitRate = float64(hits) / float64(total) 486 + } 487 + 488 + baseStats := map[string]interface{}{ 489 + "total_dids": dim.config.TotalDIDs, 490 + "last_bundle": dim.config.LastBundle, 491 + "shard_count": dim.config.ShardCount, 492 + "cached_shards": len(cachedShards), 493 + "cache_limit": dim.maxCache, 494 + "cache_order": cachedShards, 495 + "updated_at": dim.config.UpdatedAt, 496 + "cache_hits": hits, 497 + "cache_misses": misses, 498 + "cache_hit_rate": cacheHitRate, 499 + "total_lookups": total, 500 + } 501 + 502 + // Merge with performance stats 503 + perfStats := dim.calculateLookupStats() 504 + for k, v := range perfStats { 505 + baseStats[k] = v 478 506 } 507 + 508 + return baseStats 479 509 } 480 510 481 511 // Exists checks if index exists ··· 865 895 866 896 atomic.AddInt64(&shard.refCount, -1) 867 897 } 898 + 899 + // ResetCacheStats resets cache statistics (useful for monitoring) 900 + func (dim *Manager) ResetCacheStats() { 901 + atomic.StoreInt64(&dim.cacheHits, 0) 902 + atomic.StoreInt64(&dim.cacheMisses, 0) 903 + } 904 + 905 + // recordLookupTime records a lookup time (thread-safe) 906 + func (dim *Manager) recordLookupTime(duration time.Duration) { 907 + micros := duration.Microseconds() 908 + 909 + // Update totals (atomic) 910 + atomic.AddInt64(&dim.totalLookups, 1) 911 + atomic.AddInt64(&dim.totalLookupTime, micros) 912 + 913 + // Update circular buffer (with lock) 914 + dim.lookupTimeLock.Lock() 915 + dim.recentLookups[dim.recentLookupIdx] = micros 916 + dim.recentLookupIdx = (dim.recentLookupIdx + 1) % dim.recentLookupSize 917 + dim.lookupTimeLock.Unlock() 918 + } 919 + 920 + // calculateLookupStats calculates performance statistics 921 + func (dim *Manager) calculateLookupStats() map[string]interface{} { 922 + totalLookups := atomic.LoadInt64(&dim.totalLookups) 923 + totalTime := atomic.LoadInt64(&dim.totalLookupTime) 924 + 925 + stats := make(map[string]interface{}) 926 + 927 + if totalLookups == 0 { 928 + return stats 929 + } 930 + 931 + // Overall average (all time) 932 + avgMicros := float64(totalTime) / float64(totalLookups) 933 + stats["avg_lookup_time_ms"] = avgMicros / 1000.0 934 + stats["total_lookups"] = totalLookups 935 + 936 + // Recent statistics (last N lookups) 937 + dim.lookupTimeLock.Lock() 938 + recentCopy := make([]int64, dim.recentLookupSize) 939 + copy(recentCopy, dim.recentLookups) 940 + dim.lookupTimeLock.Unlock() 941 + 942 + // Find valid entries (non-zero) 943 + validRecent := make([]int64, 0, dim.recentLookupSize) 944 + for _, t := range recentCopy { 945 + if t > 0 { 946 + validRecent = append(validRecent, t) 947 + } 948 + } 949 + 950 + if len(validRecent) > 0 { 951 + // Sort for percentiles 952 + sortedRecent := make([]int64, len(validRecent)) 953 + copy(sortedRecent, validRecent) 954 + sort.Slice(sortedRecent, func(i, j int) bool { 955 + return sortedRecent[i] < sortedRecent[j] 956 + }) 957 + 958 + // Calculate recent average 959 + var recentSum int64 960 + for _, t := range validRecent { 961 + recentSum += t 962 + } 963 + recentAvg := float64(recentSum) / float64(len(validRecent)) 964 + stats["recent_avg_lookup_time_ms"] = recentAvg / 1000.0 965 + stats["recent_sample_size"] = len(validRecent) 966 + 967 + // Min/Max 968 + stats["min_lookup_time_ms"] = float64(sortedRecent[0]) / 1000.0 969 + stats["max_lookup_time_ms"] = float64(sortedRecent[len(sortedRecent)-1]) / 1000.0 970 + 971 + // Percentiles (p50, p95, p99) 972 + p50idx := len(sortedRecent) * 50 / 100 973 + p95idx := len(sortedRecent) * 95 / 100 974 + p99idx := len(sortedRecent) * 99 / 100 975 + 976 + if p50idx < len(sortedRecent) { 977 + stats["p50_lookup_time_ms"] = float64(sortedRecent[p50idx]) / 1000.0 978 + } 979 + if p95idx < len(sortedRecent) { 980 + stats["p95_lookup_time_ms"] = float64(sortedRecent[p95idx]) / 1000.0 981 + } 982 + if p99idx < len(sortedRecent) { 983 + stats["p99_lookup_time_ms"] = float64(sortedRecent[p99idx]) / 1000.0 984 + } 985 + } 986 + 987 + return stats 988 + } 989 + 990 + // ResetPerformanceStats resets performance statistics (useful for monitoring periods) 991 + func (dim *Manager) ResetPerformanceStats() { 992 + atomic.StoreInt64(&dim.cacheHits, 0) 993 + atomic.StoreInt64(&dim.cacheMisses, 0) 994 + atomic.StoreInt64(&dim.totalLookups, 0) 995 + atomic.StoreInt64(&dim.totalLookupTime, 0) 996 + 997 + dim.lookupTimeLock.Lock() 998 + dim.recentLookups = make([]int64, dim.recentLookupSize) 999 + dim.recentLookupIdx = 0 1000 + dim.lookupTimeLock.Unlock() 1001 + }
+10
internal/didindex/types.go
··· 39 39 shardCache sync.Map 40 40 maxCache int 41 41 evictionThreshold int 42 + cacheHits int64 // atomic counter 43 + cacheMisses int64 // atomic counter 42 44 43 45 config *Config 44 46 logger Logger 45 47 verbose bool 46 48 47 49 indexMu sync.RWMutex 50 + 51 + // Performance tracking 52 + totalLookups int64 // Total number of lookups 53 + totalLookupTime int64 // Total time in microseconds 54 + lookupTimeLock sync.Mutex 55 + recentLookups []int64 // Circular buffer for recent lookup times (microseconds) 56 + recentLookupIdx int 57 + recentLookupSize int 48 58 } 49 59 50 60 // mmapShard represents a memory-mapped shard file
+5
internal/handleresolver/resolver.go
··· 123 123 handle = strings.TrimPrefix(handle, "@") 124 124 return handle 125 125 } 126 + 127 + // GetBaseURL returns the PLC directory base URL 128 + func (c *Client) GetBaseURL() string { 129 + return c.baseURL 130 + }
+3 -1
internal/sync/syncer.go
··· 71 71 72 72 // Success 73 73 fetchedCount++ 74 - totalIndexTime += stats.IndexTime 74 + if stats != nil { 75 + totalIndexTime += stats.IndexTime 76 + } 75 77 76 78 // Callback if provided 77 79 if config.OnBundleSynced != nil {
+224 -1
server/handlers.go
··· 340 340 UptimeSeconds: int(time.Since(s.startTime).Seconds()), 341 341 SyncMode: s.config.SyncMode, 342 342 WebSocketEnabled: s.config.EnableWebSocket, 343 + ResolverEnabled: s.config.EnableResolver, 343 344 Origin: s.manager.GetPLCOrigin(), 344 345 }, 345 346 Bundles: BundleStatus{ ··· 348 349 UncompressedSize: indexStats["total_uncompressed_size"].(int64), 349 350 UpdatedAt: indexStats["updated_at"].(time.Time), 350 351 }, 352 + } 353 + 354 + if resolver := s.manager.GetHandleResolver(); resolver != nil { 355 + response.Server.HandleResolver = resolver.GetBaseURL() 351 356 } 352 357 353 358 if s.config.SyncMode && s.config.SyncInterval > 0 { ··· 428 433 } 429 434 } 430 435 436 + // DID Index stats 437 + didStats := s.manager.GetDIDIndexStats() 438 + if didStats["enabled"].(bool) { 439 + didIndex := &DIDIndexStatus{ 440 + Enabled: true, 441 + Exists: didStats["exists"].(bool), 442 + TotalDIDs: didStats["total_dids"].(int64), 443 + IndexedDIDs: didStats["indexed_dids"].(int64), 444 + LastBundle: didStats["last_bundle"].(int), 445 + ShardCount: didStats["shard_count"].(int), 446 + CachedShards: didStats["cached_shards"].(int), 447 + CacheLimit: didStats["cache_limit"].(int), 448 + UpdatedAt: didStats["updated_at"].(time.Time), 449 + } 450 + 451 + // Mempool DIDs 452 + if mempoolDIDs, ok := didStats["mempool_dids"].(int64); ok && mempoolDIDs > 0 { 453 + didIndex.MempoolDIDs = mempoolDIDs 454 + } 455 + 456 + // Version and format 457 + if s.manager.GetDIDIndex() != nil { 458 + config := s.manager.GetDIDIndex().GetConfig() 459 + didIndex.Version = config.Version 460 + didIndex.Format = config.Format 461 + } 462 + 463 + // Hot shards 464 + if cacheOrder, ok := didStats["cache_order"].([]int); ok && len(cacheOrder) > 0 { 465 + maxShards := 10 466 + if len(cacheOrder) < maxShards { 467 + maxShards = len(cacheOrder) 468 + } 469 + didIndex.HotShards = cacheOrder[:maxShards] 470 + } 471 + 472 + // Cache performance 473 + if cacheHitRate, ok := didStats["cache_hit_rate"].(float64); ok { 474 + didIndex.CacheHitRate = cacheHitRate 475 + } 476 + if cacheHits, ok := didStats["cache_hits"].(int64); ok { 477 + didIndex.CacheHits = cacheHits 478 + } 479 + if cacheMisses, ok := didStats["cache_misses"].(int64); ok { 480 + didIndex.CacheMisses = cacheMisses 481 + } 482 + if totalLookups, ok := didStats["total_lookups"].(int64); ok { 483 + didIndex.TotalLookups = totalLookups 484 + } 485 + 486 + // NEW: Lookup performance metrics 487 + if avgTime, ok := didStats["avg_lookup_time_ms"].(float64); ok { 488 + didIndex.AvgLookupTimeMs = avgTime 489 + } 490 + if recentAvg, ok := didStats["recent_avg_lookup_time_ms"].(float64); ok { 491 + didIndex.RecentAvgLookupTimeMs = recentAvg 492 + } 493 + if minTime, ok := didStats["min_lookup_time_ms"].(float64); ok { 494 + didIndex.MinLookupTimeMs = minTime 495 + } 496 + if maxTime, ok := didStats["max_lookup_time_ms"].(float64); ok { 497 + didIndex.MaxLookupTimeMs = maxTime 498 + } 499 + if p50, ok := didStats["p50_lookup_time_ms"].(float64); ok { 500 + didIndex.P50LookupTimeMs = p50 501 + } 502 + if p95, ok := didStats["p95_lookup_time_ms"].(float64); ok { 503 + didIndex.P95LookupTimeMs = p95 504 + } 505 + if p99, ok := didStats["p99_lookup_time_ms"].(float64); ok { 506 + didIndex.P99LookupTimeMs = p99 507 + } 508 + if sampleSize, ok := didStats["recent_sample_size"].(int); ok { 509 + didIndex.RecentSampleSize = sampleSize 510 + } 511 + 512 + response.DIDIndex = didIndex 513 + } 514 + 515 + // Resolver performance stats 516 + if s.config.EnableResolver { 517 + resolverStats := s.manager.GetResolverStats() 518 + 519 + if totalRes, ok := resolverStats["total_resolutions"].(int64); ok && totalRes > 0 { 520 + resolver := &ResolverStatus{ 521 + Enabled: true, 522 + TotalResolutions: totalRes, 523 + } 524 + 525 + // Handle resolver URL 526 + if hr := s.manager.GetHandleResolver(); hr != nil { 527 + resolver.HandleResolver = hr.GetBaseURL() 528 + } 529 + 530 + // Counts 531 + if v, ok := resolverStats["mempool_hits"].(int64); ok { 532 + resolver.MempoolHits = v 533 + } 534 + if v, ok := resolverStats["bundle_hits"].(int64); ok { 535 + resolver.BundleHits = v 536 + } 537 + if v, ok := resolverStats["errors"].(int64); ok { 538 + resolver.Errors = v 539 + } 540 + if v, ok := resolverStats["success_rate"].(float64); ok { 541 + resolver.SuccessRate = v 542 + } 543 + if v, ok := resolverStats["mempool_hit_rate"].(float64); ok { 544 + resolver.MempoolHitRate = v 545 + } 546 + 547 + // Overall averages 548 + if v, ok := resolverStats["avg_total_time_ms"].(float64); ok { 549 + resolver.AvgTotalTimeMs = v 550 + } 551 + if v, ok := resolverStats["avg_mempool_time_ms"].(float64); ok { 552 + resolver.AvgMempoolTimeMs = v 553 + } 554 + if v, ok := resolverStats["avg_index_time_ms"].(float64); ok { 555 + resolver.AvgIndexTimeMs = v 556 + } 557 + if v, ok := resolverStats["avg_load_op_time_ms"].(float64); ok { 558 + resolver.AvgLoadOpTimeMs = v 559 + } 560 + 561 + // Recent averages 562 + if v, ok := resolverStats["recent_avg_total_time_ms"].(float64); ok { 563 + resolver.RecentAvgTotalTimeMs = v 564 + } 565 + if v, ok := resolverStats["recent_avg_mempool_time_ms"].(float64); ok { 566 + resolver.RecentAvgMempoolTimeMs = v 567 + } 568 + if v, ok := resolverStats["recent_avg_index_time_ms"].(float64); ok { 569 + resolver.RecentAvgIndexTimeMs = v 570 + } 571 + if v, ok := resolverStats["recent_avg_load_time_ms"].(float64); ok { 572 + resolver.RecentAvgLoadTimeMs = v 573 + } 574 + if v, ok := resolverStats["recent_sample_size"].(int); ok { 575 + resolver.RecentSampleSize = v 576 + } 577 + 578 + // Percentiles 579 + if v, ok := resolverStats["min_total_time_ms"].(float64); ok { 580 + resolver.MinTotalTimeMs = v 581 + } 582 + if v, ok := resolverStats["max_total_time_ms"].(float64); ok { 583 + resolver.MaxTotalTimeMs = v 584 + } 585 + if v, ok := resolverStats["p50_total_time_ms"].(float64); ok { 586 + resolver.P50TotalTimeMs = v 587 + } 588 + if v, ok := resolverStats["p95_total_time_ms"].(float64); ok { 589 + resolver.P95TotalTimeMs = v 590 + } 591 + if v, ok := resolverStats["p99_total_time_ms"].(float64); ok { 592 + resolver.P99TotalTimeMs = v 593 + } 594 + if v, ok := resolverStats["p95_index_time_ms"].(float64); ok { 595 + resolver.P95IndexTimeMs = v 596 + } 597 + if v, ok := resolverStats["p95_load_op_time_ms"].(float64); ok { 598 + resolver.P95LoadOpTimeMs = v 599 + } 600 + 601 + response.Resolver = resolver 602 + } else { 603 + // No resolutions yet, but resolver is enabled 604 + response.Resolver = &ResolverStatus{ 605 + Enabled: true, 606 + TotalResolutions: 0, 607 + } 608 + 609 + if hr := s.manager.GetHandleResolver(); hr != nil { 610 + response.Resolver.HandleResolver = hr.GetBaseURL() 611 + } 612 + } 613 + } 614 + 431 615 sendJSON(w, 200, response) 432 616 } 433 617 } ··· 520 704 521 705 func (s *Server) handleDIDDocument(did string) http.HandlerFunc { 522 706 return func(w http.ResponseWriter, r *http.Request) { 707 + // OPTIONS already handled by middleware, but extra safety check 708 + if r.Method == "OPTIONS" { 709 + return 710 + } 711 + 712 + // Track only actual GET requests 523 713 result, err := s.manager.ResolveDID(r.Context(), did) 524 714 if err != nil { 525 715 if strings.Contains(err.Error(), "deactivated") { ··· 532 722 return 533 723 } 534 724 535 - // Add timing headers in MILLISECONDS (float for precision) 725 + // Add timing headers in MILLISECONDS 536 726 w.Header().Set("X-Resolution-Time-Ms", fmt.Sprintf("%.3f", float64(result.TotalTime.Microseconds())/1000.0)) 537 727 w.Header().Set("X-Resolution-Source", result.Source) 538 728 w.Header().Set("X-Mempool-Time-Ms", fmt.Sprintf("%.3f", float64(result.MempoolTime.Microseconds())/1000.0)) ··· 759 949 w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") 760 950 w.Header().Set("ETag", op.CID) // CID is perfect for ETag 761 951 } 952 + 953 + // handleDIDIndexStats returns detailed DID index performance metrics 954 + func (s *Server) handleDebugDIDIndex() http.HandlerFunc { 955 + return func(w http.ResponseWriter, r *http.Request) { 956 + didStats := s.manager.GetDIDIndexStats() 957 + 958 + if !didStats["enabled"].(bool) || !didStats["exists"].(bool) { 959 + sendJSON(w, 404, map[string]string{ 960 + "error": "DID index not available", 961 + }) 962 + return 963 + } 964 + 965 + // Return all stats (more detailed than /status) 966 + sendJSON(w, 200, didStats) 967 + } 968 + } 969 + 970 + func (s *Server) handleDebugResolver() http.HandlerFunc { 971 + return func(w http.ResponseWriter, r *http.Request) { 972 + resolverStats := s.manager.GetResolverStats() 973 + 974 + if resolverStats == nil { 975 + sendJSON(w, 404, map[string]string{ 976 + "error": "Resolver not enabled", 977 + }) 978 + return 979 + } 980 + 981 + // Return all stats (more detailed than /status) 982 + sendJSON(w, 200, resolverStats) 983 + } 984 + }
+11 -11
server/middleware.go
··· 15 15 return 16 16 } 17 17 18 - // Normal CORS handling 18 + // Set CORS headers for all requests 19 19 w.Header().Set("Access-Control-Allow-Origin", "*") 20 - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") 21 - 22 - if requestedHeaders := r.Header.Get("Access-Control-Request-Headers"); requestedHeaders != "" { 23 - w.Header().Set("Access-Control-Allow-Headers", requestedHeaders) 24 - } else { 25 - w.Header().Set("Access-Control-Allow-Headers", "*") 26 - } 27 - 20 + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") 21 + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") 28 22 w.Header().Set("Access-Control-Max-Age", "86400") 23 + w.Header().Set("Access-Control-Expose-Headers", 24 + "X-Bundle-Number, X-Position, X-Global-Position, X-Pointer, "+ 25 + "X-Operation-DID, X-Operation-CID, X-Load-Time-Ms, X-Total-Time-Ms, "+ 26 + "X-Resolution-Time-Ms, X-Resolution-Source, X-Index-Time-Ms") 29 27 28 + // Handle OPTIONS preflight - return immediately WITHOUT calling handler 30 29 if r.Method == "OPTIONS" { 31 - w.WriteHeader(204) 32 - return 30 + w.WriteHeader(http.StatusNoContent) // 204 31 + return // STOP HERE - don't call next 33 32 } 34 33 34 + // Only call handler for non-OPTIONS requests 35 35 next.ServeHTTP(w, r) 36 36 }) 37 37 }
+17 -31
server/server.go
··· 8 8 "tangled.org/atscan.net/plcbundle/bundle" 9 9 ) 10 10 11 - // Server serves bundle data over HTTP 12 - type Server struct { 13 - manager *bundle.Manager 14 - addr string 15 - config *Config 16 - startTime time.Time 17 - httpServer *http.Server 18 - } 19 - 20 - // Config configures the server 21 - type Config struct { 22 - Addr string 23 - SyncMode bool 24 - SyncInterval time.Duration 25 - EnableWebSocket bool 26 - EnableResolver bool 27 - Version string 28 - } 29 - 30 11 // New creates a new HTTP server 31 12 func New(manager *bundle.Manager, config *Config) *Server { 32 13 if config.Version == "" { ··· 50 31 return s 51 32 } 52 33 53 - // ListenAndServe starts the HTTP server 54 - func (s *Server) ListenAndServe() error { 55 - return s.httpServer.ListenAndServe() 56 - } 57 - 58 - // Shutdown gracefully shuts down the server 59 - func (s *Server) Shutdown(ctx context.Context) error { 60 - return s.httpServer.Shutdown(ctx) 61 - } 62 - 63 34 // createHandler creates the HTTP handler with all routes 64 35 func (s *Server) createHandler() http.Handler { 65 36 mux := http.NewServeMux() ··· 72 43 mux.HandleFunc("GET /op/{pointer}", s.handleOperation()) 73 44 mux.HandleFunc("GET /status", s.handleStatus()) 74 45 mux.HandleFunc("GET /debug/memory", s.handleDebugMemory()) 46 + mux.HandleFunc("GET /debug/didindex", s.handleDebugDIDIndex()) 47 + mux.HandleFunc("GET /debug/resolver", s.handleDebugResolver()) 75 48 76 49 // WebSocket 77 50 if s.config.EnableWebSocket { ··· 100 73 sendJSON(w, 404, map[string]string{"error": "not found"}) 101 74 }) 102 75 103 - return corsMiddleware(mux) 76 + // Apply middleware in correct order: 77 + handler := corsMiddleware(mux) 78 + 79 + return handler 80 + } 81 + 82 + // ListenAndServe starts the HTTP server 83 + func (s *Server) ListenAndServe() error { 84 + return s.httpServer.ListenAndServe() 85 + } 86 + 87 + // Shutdown gracefully shuts down the server 88 + func (s *Server) Shutdown(ctx context.Context) error { 89 + return s.httpServer.Shutdown(ctx) 104 90 } 105 91 106 92 // GetStartTime returns when the server started ··· 108 94 return s.startTime 109 95 } 110 96 111 - // Add this method to Server 97 + // Handler returns the configured HTTP handler 112 98 func (s *Server) Handler() http.Handler { 113 99 return s.createHandler() 114 100 }
+112 -6
server/types.go
··· 1 1 package server 2 2 3 - import "time" 3 + import ( 4 + "net/http" 5 + "time" 6 + 7 + "tangled.org/atscan.net/plcbundle/bundle" 8 + ) 9 + 10 + // Server serves bundle data over HTTP 11 + type Server struct { 12 + manager *bundle.Manager 13 + addr string 14 + config *Config 15 + startTime time.Time 16 + httpServer *http.Server 17 + } 18 + 19 + // Config configures the server 20 + type Config struct { 21 + Addr string 22 + SyncMode bool 23 + SyncInterval time.Duration 24 + EnableWebSocket bool 25 + EnableResolver bool 26 + Version string 27 + } 4 28 5 29 // StatusResponse is the /status endpoint response 6 30 type StatusResponse struct { 7 - Bundles BundleStatus `json:"bundles"` 8 - Mempool *MempoolStatus `json:"mempool,omitempty"` 9 - Server ServerStatus `json:"server"` 31 + Server ServerStatus `json:"server"` 32 + Bundles BundleStatus `json:"bundles"` 33 + Mempool *MempoolStatus `json:"mempool,omitempty"` 34 + DIDIndex *DIDIndexStatus `json:"didindex,omitempty"` 35 + Resolver *ResolverStatus `json:"resolver,omitempty"` 10 36 } 11 37 12 38 // ServerStatus contains server information 13 39 type ServerStatus struct { 14 40 Version string `json:"version"` 15 - UptimeSeconds int `json:"uptime_seconds"` 41 + Origin string `json:"origin,omitempty"` 16 42 SyncMode bool `json:"sync_mode"` 17 43 SyncIntervalSeconds int `json:"sync_interval_seconds,omitempty"` 18 44 WebSocketEnabled bool `json:"websocket_enabled"` 19 - Origin string `json:"origin,omitempty"` 45 + ResolverEnabled bool `json:"resolver_enabled"` 46 + HandleResolver string `json:"handle_resolver,omitempty"` 47 + UptimeSeconds int `json:"uptime_seconds"` 48 + } 49 + 50 + // DIDIndexStatus contains DID index statistics 51 + type DIDIndexStatus struct { 52 + Enabled bool `json:"enabled"` 53 + Exists bool `json:"exists"` 54 + TotalDIDs int64 `json:"total_dids"` 55 + IndexedDIDs int64 `json:"indexed_dids"` 56 + MempoolDIDs int64 `json:"mempool_dids,omitempty"` 57 + LastBundle int `json:"last_bundle"` 58 + ShardCount int `json:"shard_count"` 59 + CachedShards int `json:"cached_shards"` 60 + CacheLimit int `json:"cache_limit"` 61 + CacheHitRate float64 `json:"cache_hit_rate"` 62 + CacheHits int64 `json:"cache_hits"` 63 + CacheMisses int64 `json:"cache_misses"` 64 + TotalLookups int64 `json:"total_lookups"` 65 + UpdatedAt time.Time `json:"updated_at"` 66 + Version int `json:"version,omitempty"` 67 + Format string `json:"format,omitempty"` 68 + HotShards []int `json:"hot_shards,omitempty"` 69 + 70 + // Lookup performance metrics 71 + AvgLookupTimeMs float64 `json:"avg_lookup_time_ms"` // All-time average 72 + RecentAvgLookupTimeMs float64 `json:"recent_avg_lookup_time_ms"` // Recent average 73 + MinLookupTimeMs float64 `json:"min_lookup_time_ms,omitempty"` // Fastest 74 + MaxLookupTimeMs float64 `json:"max_lookup_time_ms,omitempty"` // Slowest 75 + P50LookupTimeMs float64 `json:"p50_lookup_time_ms,omitempty"` // Median 76 + P95LookupTimeMs float64 `json:"p95_lookup_time_ms,omitempty"` // 95th percentile 77 + P99LookupTimeMs float64 `json:"p99_lookup_time_ms,omitempty"` // 99th percentile 78 + RecentSampleSize int `json:"recent_sample_size,omitempty"` // How many samples 79 + } 80 + 81 + // ResolverStatus contains DID document resolver performance metrics 82 + type ResolverStatus struct { 83 + Enabled bool `json:"enabled"` 84 + HandleResolver string `json:"handle_resolver,omitempty"` 85 + 86 + // Resolution counts 87 + TotalResolutions int64 `json:"total_resolutions"` 88 + MempoolHits int64 `json:"mempool_hits"` 89 + BundleHits int64 `json:"bundle_hits"` 90 + Errors int64 `json:"errors"` 91 + SuccessRate float64 `json:"success_rate"` 92 + MempoolHitRate float64 `json:"mempool_hit_rate"` 93 + 94 + // Overall timing (all-time averages) 95 + AvgTotalTimeMs float64 `json:"avg_total_time_ms"` 96 + AvgMempoolTimeMs float64 `json:"avg_mempool_time_ms"` 97 + AvgIndexTimeMs float64 `json:"avg_index_time_ms,omitempty"` 98 + AvgLoadOpTimeMs float64 `json:"avg_load_op_time_ms,omitempty"` 99 + 100 + // Recent performance (last N resolutions) 101 + RecentAvgTotalTimeMs float64 `json:"recent_avg_total_time_ms"` 102 + RecentAvgMempoolTimeMs float64 `json:"recent_avg_mempool_time_ms"` 103 + RecentAvgIndexTimeMs float64 `json:"recent_avg_index_time_ms,omitempty"` 104 + RecentAvgLoadTimeMs float64 `json:"recent_avg_load_time_ms,omitempty"` 105 + RecentSampleSize int `json:"recent_sample_size"` 106 + 107 + // Percentiles (total response time) 108 + MinTotalTimeMs float64 `json:"min_total_time_ms,omitempty"` 109 + MaxTotalTimeMs float64 `json:"max_total_time_ms,omitempty"` 110 + P50TotalTimeMs float64 `json:"p50_total_time_ms,omitempty"` 111 + P95TotalTimeMs float64 `json:"p95_total_time_ms,omitempty"` 112 + P99TotalTimeMs float64 `json:"p99_total_time_ms,omitempty"` 113 + 114 + // Breakdown percentiles (for bundle resolutions only) 115 + P95IndexTimeMs float64 `json:"p95_index_time_ms,omitempty"` 116 + P95LoadOpTimeMs float64 `json:"p95_load_op_time_ms,omitempty"` 20 117 } 21 118 22 119 // BundleStatus contains bundle statistics ··· 56 153 LastOpAgeSeconds int `json:"last_op_age_seconds,omitempty"` 57 154 EtaNextBundleSeconds int `json:"eta_next_bundle_seconds,omitempty"` 58 155 } 156 + 157 + // RequestLog represents a logged HTTP request 158 + type RequestLog struct { 159 + Timestamp time.Time `json:"timestamp"` 160 + Method string `json:"method"` 161 + Path string `json:"path"` 162 + UserAgent string `json:"user_agent"` 163 + RemoteAddr string `json:"remote_addr"` 164 + }