A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

server stats

+221 -63
+25 -15
bundle/index.go
··· 20 20 21 21 // Index represents the JSON index file 22 22 type Index struct { 23 - Version string `json:"version"` 24 - LastBundle int `json:"last_bundle"` 25 - UpdatedAt time.Time `json:"updated_at"` 26 - TotalSize int64 `json:"total_size_bytes"` 27 - Bundles []*BundleMetadata `json:"bundles"` 23 + Version string `json:"version"` 24 + LastBundle int `json:"last_bundle"` 25 + UpdatedAt time.Time `json:"updated_at"` 26 + TotalSize int64 `json:"total_size_bytes"` 27 + TotalUncompressedSize int64 `json:"total_uncompressed_size_bytes"` 28 + Bundles []*BundleMetadata `json:"bundles"` 28 29 29 30 mu sync.RWMutex `json:"-"` 30 31 } ··· 54 55 if idx.Version != INDEX_VERSION { 55 56 return nil, fmt.Errorf("unsupported index version: %s (expected %s)", idx.Version, INDEX_VERSION) 56 57 } 58 + 59 + // Recalculate derived fields (handles new fields added to Index) 60 + idx.recalculate() 57 61 58 62 return &idx, nil 59 63 } ··· 197 201 198 202 if len(idx.Bundles) == 0 { 199 203 return map[string]interface{}{ 200 - "bundle_count": 0, 201 - "total_size": 0, 204 + "bundle_count": 0, 205 + "total_size": 0, 206 + "total_uncompressed_size": 0, 202 207 } 203 208 } 204 209 ··· 206 211 last := idx.Bundles[len(idx.Bundles)-1] 207 212 208 213 return map[string]interface{}{ 209 - "bundle_count": len(idx.Bundles), 210 - "first_bundle": first.BundleNumber, 211 - "last_bundle": last.BundleNumber, 212 - "total_size": idx.TotalSize, 213 - "start_time": first.StartTime, 214 - "end_time": last.EndTime, 215 - "updated_at": idx.UpdatedAt, 216 - "gaps": len(idx.FindGaps()), 214 + "bundle_count": len(idx.Bundles), 215 + "first_bundle": first.BundleNumber, 216 + "last_bundle": last.BundleNumber, 217 + "total_size": idx.TotalSize, 218 + "total_uncompressed_size": idx.TotalUncompressedSize, 219 + "start_time": first.StartTime, 220 + "end_time": last.EndTime, 221 + "updated_at": idx.UpdatedAt, 222 + "gaps": len(idx.FindGaps()), 217 223 } 218 224 } 219 225 ··· 229 235 if len(idx.Bundles) == 0 { 230 236 idx.LastBundle = 0 231 237 idx.TotalSize = 0 238 + idx.TotalUncompressedSize = 0 232 239 return 233 240 } 234 241 235 242 // Find last bundle 236 243 maxBundle := 0 237 244 totalSize := int64(0) 245 + totalUncompressed := int64(0) 238 246 239 247 for _, meta := range idx.Bundles { 240 248 if meta.BundleNumber > maxBundle { 241 249 maxBundle = meta.BundleNumber 242 250 } 243 251 totalSize += meta.CompressedSize 252 + totalUncompressed += meta.UncompressedSize 244 253 } 245 254 246 255 idx.LastBundle = maxBundle 247 256 idx.TotalSize = totalSize 257 + idx.TotalUncompressedSize = totalUncompressed 248 258 } 249 259 250 260 // Rebuild rebuilds the index from bundle metadata
+8
bundle/manager.go
··· 1240 1240 } 1241 1241 return m.mempool.Save() 1242 1242 } 1243 + 1244 + // GetPLCOrigin returns the PLC directory origin URL (empty if not configured) 1245 + func (m *Manager) GetPLCOrigin() string { 1246 + if m.plcClient == nil { 1247 + return "" 1248 + } 1249 + return m.plcClient.GetBaseURL() 1250 + }
+4
cmd/plcbundle/main.go
··· 1255 1255 fmt.Printf("File: %s\n", filepath.Join(dir, mempoolFilename)) 1256 1256 } 1257 1257 1258 + var serverStartTime time.Time 1259 + 1258 1260 func cmdServe() { 1259 1261 fs := flag.NewFlagSet("serve", flag.ExitOnError) 1260 1262 port := fs.String("port", "8080", "HTTP server port") ··· 1265 1267 enableWebSocket := fs.Bool("websocket", false, "enable WebSocket endpoint for streaming records") 1266 1268 workers := fs.Int("workers", 4, "number of workers for auto-rebuild (0 = CPU count)") 1267 1269 fs.Parse(os.Args[2:]) 1270 + 1271 + serverStartTime = time.Now() 1268 1272 1269 1273 // Auto-detect CPU count 1270 1274 if *workers == 0 {
+179 -48
cmd/plcbundle/server.go
··· 62 62 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 63 63 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 64 64 65 - plcbundle %s 65 + plcbundle server 66 66 67 67 *~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* 68 68 | ⚠️ Preview Version – Do Not Use In Production! | ··· 73 73 | Please wait for the 1.0 release. | 74 74 |________________________________________________________________| 75 75 76 - `, version) 76 + `) 77 77 78 78 fmt.Fprintf(w, "What is PLC Bundle?\n") 79 79 fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━━\n") ··· 83 83 fmt.Fprintf(w, "the previous bundle, creating a verifiable chain of DID operations.\n\n") 84 84 fmt.Fprintf(w, "More info: https://tangled.org/@atscan.net/plcbundle\n\n") 85 85 86 - fmt.Fprintf(w, "Server Stats\n") 87 - fmt.Fprintf(w, "━━━━━━━━━━━━\n") 88 - fmt.Fprintf(w, " Bundle count: %d\n", bundleCount) 89 - fmt.Fprintf(w, " Sync mode: %v\n", syncMode) 90 - fmt.Fprintf(w, " WebSocket: %v\n", wsEnabled) 86 + if bundleCount > 0 { 87 + fmt.Fprintf(w, "Bundles\n") 88 + fmt.Fprintf(w, "━━━━━━━\n") 89 + fmt.Fprintf(w, " Bundle count: %d\n", bundleCount) 91 90 92 - if bundleCount > 0 { 93 91 firstBundle := stats["first_bundle"].(int) 94 92 lastBundle := stats["last_bundle"].(int) 95 93 totalSize := stats["total_size"].(int64) 94 + totalUncompressed := stats["total_uncompressed_size"].(int64) 96 95 96 + fmt.Fprintf(w, " Last bundle: %d (%s)\n", lastBundle, 97 + stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05")) 97 98 fmt.Fprintf(w, " Range: %06d - %06d\n", firstBundle, lastBundle) 98 99 fmt.Fprintf(w, " Total size: %.2f MB\n", float64(totalSize)/(1000*1000)) 99 - fmt.Fprintf(w, " Updated: %s\n", stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05")) 100 + fmt.Fprintf(w, " Uncompressed: %.2f MB (%.2fx)\n", 101 + float64(totalUncompressed)/(1000*1000), 102 + float64(totalUncompressed)/float64(totalSize)) 100 103 101 104 if gaps, ok := stats["gaps"].(int); ok && gaps > 0 { 102 105 fmt.Fprintf(w, " ⚠ Gaps: %d missing bundles\n", gaps) ··· 123 126 124 127 fmt.Fprintf(w, "\nMempool Stats\n") 125 128 fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 126 - fmt.Fprintf(w, " Target bundle: %06d\n", targetBundle) 129 + fmt.Fprintf(w, " Target bundle: %d\n", targetBundle) 127 130 fmt.Fprintf(w, " Operations: %d / %d\n", count, bundle.BUNDLE_SIZE) 128 131 fmt.Fprintf(w, " Can create bundle: %v\n", canCreate) 129 132 ··· 150 153 fmt.Fprintf(w, " (empty)\n") 151 154 } 152 155 } 156 + fmt.Fprintf(w, "\n") 157 + 158 + fmt.Fprintf(w, "Server Stats\n") 159 + fmt.Fprintf(w, "━━━━━━━━━━━━\n") 160 + fmt.Fprintf(w, " Version: %s\n", version) 161 + if origin := mgr.GetPLCOrigin(); origin != "" { 162 + fmt.Fprintf(w, " Origin: %s\n", origin) 163 + } 164 + fmt.Fprintf(w, " Sync mode: %v\n", syncMode) 165 + fmt.Fprintf(w, " WebSocket: %v\n", wsEnabled) 166 + fmt.Fprintf(w, " Uptime: %s\n", time.Since(serverStartTime).Round(time.Second)) 167 + fmt.Fprintf(w, "\n") 153 168 154 169 fmt.Fprintf(w, "\nAPI Endpoints\n") 155 170 fmt.Fprintf(w, "━━━━━━━━━━━━━\n") ··· 158 173 fmt.Fprintf(w, " GET /bundle/:number Bundle metadata (JSON)\n") 159 174 fmt.Fprintf(w, " GET /data/:number Raw bundle (zstd compressed)\n") 160 175 fmt.Fprintf(w, " GET /jsonl/:number Decompressed JSONL stream\n") 176 + fmt.Fprintf(w, " GET /status Server status\n") 177 + fmt.Fprintf(w, " GET /mempool Mempool operations (JSONL)\n") 161 178 162 179 if wsEnabled { 163 180 fmt.Fprintf(w, "\nWebSocket Endpoints\n") ··· 168 185 fmt.Fprintf(w, " Connection stays open until client closes\n") 169 186 fmt.Fprintf(w, " Cursor: global record number (0-based)\n") 170 187 fmt.Fprintf(w, " Example: 88410345 = bundle 8841, pos 345\n") 171 - } 172 - 173 - if syncMode { 174 - fmt.Fprintf(w, "\nSync Endpoints\n") 175 - fmt.Fprintf(w, "━━━━━━━━━━━━━━\n") 176 - fmt.Fprintf(w, " GET /sync Sync status & mempool info (JSON)\n") 177 - fmt.Fprintf(w, " GET /sync/mempool Mempool operations (JSONL)\n") 178 188 } 179 189 180 190 fmt.Fprintf(w, "\nExamples\n") ··· 205 215 } 206 216 207 217 fmt.Fprintf(w, "\n────────────────────────────────────────────────────────────────\n") 208 - fmt.Fprintf(w, "plcbundle %s | https://tangled.org/@atscan.net/plcbundle\n", version) 218 + fmt.Fprintf(w, "https://tangled.org/@atscan.net/plcbundle\n") 209 219 } 210 220 211 221 // getScheme returns the appropriate HTTP scheme (http or https) ··· 283 293 handleBundleJSONL(w, r, mgr) 284 294 }) 285 295 296 + // Status endpoint 297 + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { 298 + handleStatus(w, mgr, syncMode, wsEnabled) 299 + }) 300 + 286 301 // WebSocket endpoint (if enabled) 287 302 if wsEnabled { 288 303 mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { ··· 292 307 293 308 // Sync endpoints (only if sync mode enabled) 294 309 if syncMode { 295 - mux.HandleFunc("/sync", func(w http.ResponseWriter, r *http.Request) { 296 - handleSync(w, mgr) 297 - }) 298 - 299 - mux.HandleFunc("/sync/mempool", func(w http.ResponseWriter, r *http.Request) { 300 - handleSyncMempool(w, mgr) 310 + mux.HandleFunc("/mempool", func(w http.ResponseWriter, r *http.Request) { 311 + handleMempool(w, mgr) 301 312 }) 302 313 } 303 314 ··· 559 570 return nil 560 571 } 561 572 562 - // handleSync returns sync status and mempool info as JSON 563 - func handleSync(w http.ResponseWriter, mgr *bundle.Manager) { 573 + // StatusResponse represents the /status endpoint response 574 + type StatusResponse struct { 575 + Bundles BundleStatus `json:"bundles"` 576 + Mempool *MempoolStatus `json:"mempool,omitempty"` // nil if sync disabled 577 + Server ServerStatus `json:"server"` 578 + } 579 + 580 + type ServerStatus struct { 581 + Version string `json:"version"` 582 + UptimeSeconds int `json:"uptime_seconds"` 583 + SyncMode bool `json:"sync_mode"` 584 + WebSocketEnabled bool `json:"websocket_enabled"` 585 + Origin string `json:"origin,omitempty"` // PLC directory URL 586 + } 587 + 588 + type BundleStatus struct { 589 + Count int `json:"count"` 590 + FirstBundle int `json:"first_bundle,omitempty"` 591 + LastBundle int `json:"last_bundle,omitempty"` 592 + TotalSize int64 `json:"total_size"` 593 + UncompressedSize int64 `json:"uncompressed_size,omitempty"` 594 + CompressionRatio float64 `json:"compression_ratio,omitempty"` 595 + TotalOperations int `json:"total_operations,omitempty"` 596 + AvgOpsPerHour int `json:"avg_ops_per_hour,omitempty"` 597 + StartTime time.Time `json:"start_time,omitempty"` 598 + EndTime time.Time `json:"end_time,omitempty"` 599 + UpdatedAt time.Time `json:"updated_at"` 600 + HeadAgeSeconds int `json:"head_age_seconds,omitempty"` 601 + RootHash string `json:"root_hash,omitempty"` 602 + HeadHash string `json:"head_hash,omitempty"` 603 + Gaps int `json:"gaps,omitempty"` 604 + HasGaps bool `json:"has_gaps"` 605 + GapNumbers []int `json:"gap_numbers,omitempty"` 606 + } 607 + 608 + type MempoolStatus struct { 609 + Count int `json:"count"` 610 + TargetBundle int `json:"target_bundle"` 611 + CanCreateBundle bool `json:"can_create_bundle"` 612 + MinTimestamp time.Time `json:"min_timestamp"` 613 + Validated bool `json:"validated"` 614 + ProgressPercent float64 `json:"progress_percent"` 615 + BundleSize int `json:"bundle_size"` 616 + OperationsNeeded int `json:"operations_needed"` 617 + FirstTime time.Time `json:"first_time,omitempty"` 618 + LastTime time.Time `json:"last_time,omitempty"` 619 + TimespanSeconds int `json:"timespan_seconds,omitempty"` 620 + LastOpAgeSeconds int `json:"last_op_age_seconds,omitempty"` 621 + EtaNextBundleSeconds int `json:"eta_next_bundle_seconds,omitempty"` 622 + } 623 + 624 + // handleStatus returns repository status and statistics 625 + func handleStatus(w http.ResponseWriter, mgr *bundle.Manager, syncMode bool, wsEnabled bool) { 564 626 w.Header().Set("Content-Type", "application/json") 565 627 w.Header().Set("Access-Control-Allow-Origin", "*") 566 628 567 629 index := mgr.GetIndex() 568 630 indexStats := index.GetStats() 569 - mempoolStats := mgr.GetMempoolStats() 570 631 571 - // Build response 572 - response := map[string]interface{}{ 573 - "bundles": map[string]interface{}{ 574 - "count": indexStats["bundle_count"], 575 - "total_size": indexStats["total_size"], 576 - "updated_at": indexStats["updated_at"], 632 + // Build response with proper types 633 + response := StatusResponse{ 634 + Server: ServerStatus{ 635 + Version: version, 636 + UptimeSeconds: int(time.Since(serverStartTime).Seconds()), 637 + SyncMode: syncMode, 638 + WebSocketEnabled: wsEnabled, 639 + Origin: mgr.GetPLCOrigin(), 640 + }, 641 + Bundles: BundleStatus{ 642 + Count: indexStats["bundle_count"].(int), 643 + TotalSize: indexStats["total_size"].(int64), 644 + UncompressedSize: indexStats["total_uncompressed_size"].(int64), 645 + //UpdatedAt: indexStats["updated_at"].(time.Time), 577 646 }, 578 - "mempool": mempoolStats, 579 647 } 580 648 581 - // Add bundle range if bundles exist 582 - if bundleCount, ok := indexStats["bundle_count"].(int); ok && bundleCount > 0 { 583 - response["bundles"].(map[string]interface{})["first_bundle"] = indexStats["first_bundle"] 584 - response["bundles"].(map[string]interface{})["last_bundle"] = indexStats["last_bundle"] 585 - response["bundles"].(map[string]interface{})["start_time"] = indexStats["start_time"] 586 - response["bundles"].(map[string]interface{})["end_time"] = indexStats["end_time"] 649 + // Add bundle details if bundles exist 650 + if bundleCount := response.Bundles.Count; bundleCount > 0 { 651 + firstBundle := indexStats["first_bundle"].(int) 652 + lastBundle := indexStats["last_bundle"].(int) 653 + 654 + response.Bundles.FirstBundle = firstBundle 655 + response.Bundles.LastBundle = lastBundle 656 + response.Bundles.StartTime = indexStats["start_time"].(time.Time) 657 + response.Bundles.EndTime = indexStats["end_time"].(time.Time) 587 658 659 + // Hashes 660 + if firstMeta, err := index.GetBundle(firstBundle); err == nil { 661 + response.Bundles.RootHash = firstMeta.Hash 662 + } 663 + 664 + if lastMeta, err := index.GetBundle(lastBundle); err == nil { 665 + response.Bundles.HeadHash = lastMeta.Hash 666 + response.Bundles.HeadAgeSeconds = int(time.Since(lastMeta.EndTime).Seconds()) 667 + } 668 + 669 + // Gaps 588 670 if gaps, ok := indexStats["gaps"].(int); ok { 589 - response["bundles"].(map[string]interface{})["gaps"] = gaps 671 + response.Bundles.Gaps = gaps 672 + response.Bundles.HasGaps = gaps > 0 673 + if gaps > 0 { 674 + response.Bundles.GapNumbers = index.FindGaps() 675 + } 676 + } 677 + 678 + // Total operations 679 + totalOps := bundleCount * bundle.BUNDLE_SIZE 680 + response.Bundles.TotalOperations = totalOps 681 + 682 + // Performance metrics 683 + duration := response.Bundles.EndTime.Sub(response.Bundles.StartTime) 684 + if duration.Hours() > 0 { 685 + response.Bundles.AvgOpsPerHour = int(float64(totalOps) / duration.Hours()) 590 686 } 591 687 } 592 688 593 - // Calculate mempool progress percentage 594 - if count, ok := mempoolStats["count"].(int); ok { 595 - progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 596 - response["mempool"].(map[string]interface{})["progress_percent"] = progress 597 - response["mempool"].(map[string]interface{})["bundle_size"] = bundle.BUNDLE_SIZE 689 + // Only include mempool if sync mode is enabled 690 + if syncMode { 691 + mempoolStats := mgr.GetMempoolStats() 692 + 693 + if count, ok := mempoolStats["count"].(int); ok { 694 + mempool := &MempoolStatus{ 695 + Count: count, 696 + TargetBundle: mempoolStats["target_bundle"].(int), 697 + CanCreateBundle: mempoolStats["can_create_bundle"].(bool), 698 + MinTimestamp: mempoolStats["min_timestamp"].(time.Time), 699 + Validated: mempoolStats["validated"].(bool), 700 + ProgressPercent: float64(count) / float64(bundle.BUNDLE_SIZE) * 100, 701 + BundleSize: bundle.BUNDLE_SIZE, 702 + OperationsNeeded: bundle.BUNDLE_SIZE - count, 703 + } 704 + 705 + // Optional time fields 706 + if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 707 + mempool.FirstTime = firstTime 708 + mempool.TimespanSeconds = int(time.Since(firstTime).Seconds()) 709 + } 710 + if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 711 + mempool.LastTime = lastTime 712 + mempool.LastOpAgeSeconds = int(time.Since(lastTime).Seconds()) 713 + } 714 + 715 + // ETA calculation 716 + if count > 100 && count < bundle.BUNDLE_SIZE { 717 + if !mempool.FirstTime.IsZero() && !mempool.LastTime.IsZero() { 718 + timespan := mempool.LastTime.Sub(mempool.FirstTime) 719 + if timespan.Seconds() > 0 { 720 + opsPerSec := float64(count) / timespan.Seconds() 721 + remaining := bundle.BUNDLE_SIZE - count 722 + mempool.EtaNextBundleSeconds = int(float64(remaining) / opsPerSec) 723 + } 724 + } 725 + } 726 + 727 + response.Mempool = mempool 728 + } 598 729 } 599 730 600 731 data, err := json.MarshalIndent(response, "", " ") 601 732 if err != nil { 602 - http.Error(w, "Failed to marshal sync status", http.StatusInternalServerError) 733 + http.Error(w, "Failed to marshal status", http.StatusInternalServerError) 603 734 return 604 735 } 605 736 ··· 607 738 } 608 739 609 740 // handleSyncMempool streams mempool operations as JSONL 610 - func handleSyncMempool(w http.ResponseWriter, mgr *bundle.Manager) { 741 + func handleMempool(w http.ResponseWriter, mgr *bundle.Manager) { 611 742 ops, err := mgr.GetMempoolOperations() 612 743 if err != nil { 613 744 http.Error(w, fmt.Sprintf("Failed to get mempool operations: %v", err), http.StatusInternalServerError)
+5
plc/client.go
··· 284 284 "timeout": c.httpClient.Timeout, 285 285 } 286 286 } 287 + 288 + // GetBaseURL returns the PLC directory base URL 289 + func (c *Client) GetBaseURL() string { 290 + return c.baseURL 291 + }