+201
-147
internal/api/handlers.go
+201
-147
internal/api/handlers.go
···
20
"github.com/klauspost/compress/zstd"
21
)
22
23
-
func (s *Server) handleGetPDSList(w http.ResponseWriter, r *http.Request) {
24
ctx := r.Context()
25
26
-
filter := &storage.PDSFilter{}
27
28
if status := r.URL.Query().Get("status"); status != "" {
29
filter.Status = status
30
}
31
32
if limit := r.URL.Query().Get("limit"); limit != "" {
33
if l, err := strconv.Atoi(limit); err == nil {
34
filter.Limit = l
···
41
}
42
}
43
44
-
servers, err := s.db.GetPDSServers(ctx, filter)
45
if err != nil {
46
http.Error(w, err.Error(), http.StatusInternalServerError)
47
return
48
}
49
50
// Convert status codes to strings for API
51
-
response := make([]map[string]interface{}, len(servers))
52
-
for i, srv := range servers {
53
response[i] = map[string]interface{}{
54
-
"id": srv.ID,
55
-
"endpoint": srv.Endpoint,
56
-
"discovered_at": srv.DiscoveredAt,
57
-
"last_checked": srv.LastChecked,
58
-
"status": statusToString(srv.Status),
59
-
"user_count": srv.UserCount,
60
}
61
}
62
63
respondJSON(w, response)
64
}
65
66
-
func (s *Server) handleGetPDS(w http.ResponseWriter, r *http.Request) {
67
ctx := r.Context()
68
vars := mux.Vars(r)
69
endpoint := vars["endpoint"]
70
71
-
pds, err := s.db.GetPDS(ctx, endpoint)
72
if err != nil {
73
-
http.Error(w, "PDS not found", http.StatusNotFound)
74
return
75
}
76
77
// Get recent scans
78
-
scans, _ := s.db.GetPDSScans(ctx, pds.ID, 10)
79
80
response := map[string]interface{}{
81
-
"id": pds.ID,
82
-
"endpoint": pds.Endpoint,
83
-
"discovered_at": pds.DiscoveredAt,
84
-
"last_checked": pds.LastChecked,
85
-
"status": statusToString(pds.Status),
86
-
"user_count": pds.UserCount,
87
"recent_scans": scans,
88
}
89
90
respondJSON(w, response)
91
}
92
93
-
func (s *Server) handleGetPDSStats(w http.ResponseWriter, r *http.Request) {
94
ctx := r.Context()
95
96
-
stats, err := s.db.GetPDSStats(ctx)
97
if err != nil {
98
http.Error(w, err.Error(), http.StatusInternalServerError)
99
return
···
101
102
respondJSON(w, stats)
103
}
104
105
func (s *Server) handleGetDID(w http.ResponseWriter, r *http.Request) {
106
ctx := r.Context()
···
196
respondJSON(w, history)
197
}
198
199
func (s *Server) handleGetPLCBundle(w http.ResponseWriter, r *http.Request) {
200
ctx := r.Context()
201
vars := mux.Vars(r)
···
218
"end_time": bundle.EndTime,
219
"operation_count": plc.BUNDLE_SIZE,
220
"did_count": len(bundle.DIDs),
221
-
"hash": bundle.Hash, // Uncompressed (verifiable)
222
-
"compressed_hash": bundle.CompressedHash, // File integrity
223
"compressed_size": bundle.CompressedSize,
224
"prev_bundle_hash": bundle.PrevBundleHash,
225
"created_at": bundle.CreatedAt,
···
348
}
349
}
350
351
func (s *Server) handleGetMempoolStats(w http.ResponseWriter, r *http.Request) {
352
ctx := r.Context()
353
···
403
respondJSON(w, response)
404
}
405
406
-
// Helper to load bundle operations - UPDATED FOR JSONL FORMAT
407
-
func (s *Server) loadBundleOperations(path string) ([]plc.PLCOperation, error) {
408
-
decoder, err := zstd.NewReader(nil)
409
-
if err != nil {
410
-
return nil, err
411
-
}
412
-
defer decoder.Close()
413
-
414
-
compressedData, err := os.ReadFile(path)
415
-
if err != nil {
416
-
return nil, err
417
-
}
418
-
419
-
decompressed, err := decoder.DecodeAll(compressedData, nil)
420
-
if err != nil {
421
-
return nil, err
422
-
}
423
-
424
-
// Parse JSONL (newline-delimited JSON)
425
-
var operations []plc.PLCOperation
426
-
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
427
-
428
-
lineNum := 0
429
-
for scanner.Scan() {
430
-
lineNum++
431
-
line := scanner.Bytes()
432
-
433
-
// Skip empty lines
434
-
if len(line) == 0 {
435
-
continue
436
-
}
437
-
438
-
var op plc.PLCOperation
439
-
if err := json.Unmarshal(line, &op); err != nil {
440
-
return nil, fmt.Errorf("failed to parse operation on line %d: %w", lineNum, err)
441
-
}
442
-
443
-
// CRITICAL: Store the original raw JSON bytes
444
-
op.RawJSON = make([]byte, len(line))
445
-
copy(op.RawJSON, line)
446
-
447
-
operations = append(operations, op)
448
-
}
449
-
450
-
if err := scanner.Err(); err != nil {
451
-
return nil, fmt.Errorf("error reading JSONL: %w", err)
452
-
}
453
-
454
-
return operations, nil
455
-
}
456
457
func (s *Server) handleGetPLCMetrics(w http.ResponseWriter, r *http.Request) {
458
ctx := r.Context()
···
473
respondJSON(w, metrics)
474
}
475
476
-
func (s *Server) handleGetPLCBundles(w http.ResponseWriter, r *http.Request) {
477
-
ctx := r.Context()
478
-
479
-
limit := 50
480
-
if l := r.URL.Query().Get("limit"); l != "" {
481
-
if parsed, err := strconv.Atoi(l); err == nil {
482
-
limit = parsed
483
-
}
484
-
}
485
-
486
-
bundles, err := s.db.GetBundles(ctx, limit)
487
-
if err != nil {
488
-
http.Error(w, err.Error(), http.StatusInternalServerError)
489
-
return
490
-
}
491
-
492
-
response := make([]map[string]interface{}, len(bundles))
493
-
for i, bundle := range bundles {
494
-
response[i] = map[string]interface{}{
495
-
"plc_bundle_number": bundle.BundleNumber,
496
-
"start_time": bundle.StartTime,
497
-
"end_time": bundle.EndTime,
498
-
"operation_count": 10000,
499
-
"did_count": len(bundle.DIDs),
500
-
"hash": bundle.Hash,
501
-
"compressed_hash": bundle.CompressedHash,
502
-
"compressed_size": bundle.CompressedSize,
503
-
"prev_bundle_hash": bundle.PrevBundleHash,
504
-
}
505
-
}
506
-
507
-
respondJSON(w, response)
508
-
}
509
-
510
-
func (s *Server) handleGetPLCBundleStats(w http.ResponseWriter, r *http.Request) {
511
-
ctx := r.Context()
512
-
513
-
count, size, err := s.db.GetBundleStats(ctx)
514
-
if err != nil {
515
-
http.Error(w, err.Error(), http.StatusInternalServerError)
516
-
return
517
-
}
518
-
519
-
respondJSON(w, map[string]interface{}{
520
-
"plc_bundle_count": count,
521
-
"total_size": size,
522
-
"total_size_mb": float64(size) / 1024 / 1024,
523
-
})
524
-
}
525
-
526
-
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
527
-
respondJSON(w, map[string]string{"status": "ok"})
528
-
}
529
530
func (s *Server) handleVerifyPLCBundle(w http.ResponseWriter, r *http.Request) {
531
ctx := r.Context()
···
637
"verified": verified,
638
"local_hash": bundle.Hash,
639
"remote_hash": remoteHash,
640
-
"local_op_count": bundle.OperationCount,
641
"remote_op_count": len(allRemoteOps),
642
"boundary_cids_used": len(prevBoundaryCIDs),
643
})
···
736
"chain_start_time": firstBundle.StartTime,
737
"chain_end_time": lastBundleData.EndTime,
738
"chain_head_hash": lastBundleData.Hash,
739
-
"first_prev_hash": firstBundle.PrevBundleHash, // Should be empty
740
"last_prev_hash": lastBundleData.PrevBundleHash,
741
})
742
}
743
744
-
// handlePLCExport simulates PLC directory /export endpoint using cached bundles
745
func (s *Server) handlePLCExport(w http.ResponseWriter, r *http.Request) {
746
ctx := r.Context()
747
···
763
if afterStr != "" {
764
// Try multiple timestamp formats (from most specific to least)
765
formats := []string{
766
-
time.RFC3339Nano, // 2023-11-09T03:55:00.123456789Z
767
-
time.RFC3339, // 2023-11-09T03:55:00Z
768
-
"2006-01-02T15:04:05.000Z", // 2023-11-09T03:55:00.000Z
769
-
"2006-01-02T15:04:05", // 2023-11-09T03:55:00
770
-
"2006-01-02T15:04", // 2023-11-09T03:55
771
-
"2006-01-02", // 2023-11-09
772
}
773
774
var parsed time.Time
775
var parseErr error
776
-
parsed = time.Time{} // zero value
777
778
for _, format := range formats {
779
parsed, parseErr = time.Parse(format, afterStr)
···
867
}
868
}
869
870
-
// computeRemoteOperationsHash - matching format
871
func computeRemoteOperationsHash(ops []plc.PLCOperation) (string, error) {
872
var jsonlData []byte
873
for i, op := range ops {
···
884
return hex.EncodeToString(hash[:]), nil
885
}
886
887
-
func respondJSON(w http.ResponseWriter, data interface{}) {
888
-
w.Header().Set("Content-Type", "application/json")
889
-
json.NewEncoder(w).Encode(data)
890
-
}
891
-
892
-
// Helper function
893
func statusToString(status int) string {
894
switch status {
895
-
case storage.PDSStatusOnline:
896
return "online"
897
-
case storage.PDSStatusOffline:
898
return "offline"
899
default:
900
return "unknown"
901
}
902
}
···
20
"github.com/klauspost/compress/zstd"
21
)
22
23
+
// ====================
24
+
// Endpoint Handlers (new)
25
+
// ====================
26
+
27
+
func (s *Server) handleGetEndpoints(w http.ResponseWriter, r *http.Request) {
28
ctx := r.Context()
29
30
+
filter := &storage.EndpointFilter{}
31
+
32
+
if typ := r.URL.Query().Get("type"); typ != "" {
33
+
filter.Type = typ
34
+
}
35
36
if status := r.URL.Query().Get("status"); status != "" {
37
filter.Status = status
38
}
39
40
+
if minUserCount := r.URL.Query().Get("min_user_count"); minUserCount != "" {
41
+
if count, err := strconv.ParseInt(minUserCount, 10, 64); err == nil {
42
+
filter.MinUserCount = count
43
+
}
44
+
}
45
+
46
if limit := r.URL.Query().Get("limit"); limit != "" {
47
if l, err := strconv.Atoi(limit); err == nil {
48
filter.Limit = l
···
55
}
56
}
57
58
+
endpoints, err := s.db.GetEndpoints(ctx, filter)
59
if err != nil {
60
http.Error(w, err.Error(), http.StatusInternalServerError)
61
return
62
}
63
64
// Convert status codes to strings for API
65
+
response := make([]map[string]interface{}, len(endpoints))
66
+
for i, ep := range endpoints {
67
response[i] = map[string]interface{}{
68
+
"id": ep.ID,
69
+
"endpoint_type": ep.EndpointType,
70
+
"endpoint": ep.Endpoint,
71
+
"discovered_at": ep.DiscoveredAt,
72
+
"last_checked": ep.LastChecked,
73
+
"status": statusToString(ep.Status),
74
+
"user_count": ep.UserCount,
75
}
76
}
77
78
respondJSON(w, response)
79
}
80
81
+
func (s *Server) handleGetEndpoint(w http.ResponseWriter, r *http.Request) {
82
ctx := r.Context()
83
vars := mux.Vars(r)
84
endpoint := vars["endpoint"]
85
86
+
// Get type from query param, default to "pds" for backward compatibility
87
+
endpointType := r.URL.Query().Get("type")
88
+
if endpointType == "" {
89
+
endpointType = "pds"
90
+
}
91
+
92
+
ep, err := s.db.GetEndpoint(ctx, endpoint, endpointType)
93
if err != nil {
94
+
http.Error(w, "Endpoint not found", http.StatusNotFound)
95
return
96
}
97
98
// Get recent scans
99
+
scans, _ := s.db.GetEndpointScans(ctx, ep.ID, 10)
100
101
response := map[string]interface{}{
102
+
"id": ep.ID,
103
+
"endpoint_type": ep.EndpointType,
104
+
"endpoint": ep.Endpoint,
105
+
"discovered_at": ep.DiscoveredAt,
106
+
"last_checked": ep.LastChecked,
107
+
"status": statusToString(ep.Status),
108
+
"user_count": ep.UserCount,
109
"recent_scans": scans,
110
}
111
112
respondJSON(w, response)
113
}
114
115
+
func (s *Server) handleGetEndpointStats(w http.ResponseWriter, r *http.Request) {
116
ctx := r.Context()
117
118
+
stats, err := s.db.GetEndpointStats(ctx)
119
if err != nil {
120
http.Error(w, err.Error(), http.StatusInternalServerError)
121
return
···
123
124
respondJSON(w, stats)
125
}
126
+
127
+
// ====================
128
+
// DID Handlers
129
+
// ====================
130
131
func (s *Server) handleGetDID(w http.ResponseWriter, r *http.Request) {
132
ctx := r.Context()
···
222
respondJSON(w, history)
223
}
224
225
+
// ====================
226
+
// PLC Bundle Handlers
227
+
// ====================
228
+
229
func (s *Server) handleGetPLCBundle(w http.ResponseWriter, r *http.Request) {
230
ctx := r.Context()
231
vars := mux.Vars(r)
···
248
"end_time": bundle.EndTime,
249
"operation_count": plc.BUNDLE_SIZE,
250
"did_count": len(bundle.DIDs),
251
+
"hash": bundle.Hash,
252
+
"compressed_hash": bundle.CompressedHash,
253
"compressed_size": bundle.CompressedSize,
254
"prev_bundle_hash": bundle.PrevBundleHash,
255
"created_at": bundle.CreatedAt,
···
378
}
379
}
380
381
+
func (s *Server) handleGetPLCBundles(w http.ResponseWriter, r *http.Request) {
382
+
ctx := r.Context()
383
+
384
+
limit := 50
385
+
if l := r.URL.Query().Get("limit"); l != "" {
386
+
if parsed, err := strconv.Atoi(l); err == nil {
387
+
limit = parsed
388
+
}
389
+
}
390
+
391
+
bundles, err := s.db.GetBundles(ctx, limit)
392
+
if err != nil {
393
+
http.Error(w, err.Error(), http.StatusInternalServerError)
394
+
return
395
+
}
396
+
397
+
response := make([]map[string]interface{}, len(bundles))
398
+
for i, bundle := range bundles {
399
+
response[i] = map[string]interface{}{
400
+
"plc_bundle_number": bundle.BundleNumber,
401
+
"start_time": bundle.StartTime,
402
+
"end_time": bundle.EndTime,
403
+
"operation_count": plc.BUNDLE_SIZE,
404
+
"did_count": len(bundle.DIDs),
405
+
"hash": bundle.Hash,
406
+
"compressed_hash": bundle.CompressedHash,
407
+
"compressed_size": bundle.CompressedSize,
408
+
"prev_bundle_hash": bundle.PrevBundleHash,
409
+
}
410
+
}
411
+
412
+
respondJSON(w, response)
413
+
}
414
+
415
+
func (s *Server) handleGetPLCBundleStats(w http.ResponseWriter, r *http.Request) {
416
+
ctx := r.Context()
417
+
418
+
count, size, err := s.db.GetBundleStats(ctx)
419
+
if err != nil {
420
+
http.Error(w, err.Error(), http.StatusInternalServerError)
421
+
return
422
+
}
423
+
424
+
respondJSON(w, map[string]interface{}{
425
+
"plc_bundle_count": count,
426
+
"total_size": size,
427
+
"total_size_mb": float64(size) / 1024 / 1024,
428
+
})
429
+
}
430
+
431
+
// ====================
432
+
// Mempool Handlers
433
+
// ====================
434
+
435
func (s *Server) handleGetMempoolStats(w http.ResponseWriter, r *http.Request) {
436
ctx := r.Context()
437
···
487
respondJSON(w, response)
488
}
489
490
+
// ====================
491
+
// PLC Metrics Handlers
492
+
// ====================
493
494
func (s *Server) handleGetPLCMetrics(w http.ResponseWriter, r *http.Request) {
495
ctx := r.Context()
···
510
respondJSON(w, metrics)
511
}
512
513
+
// ====================
514
+
// Verification Handlers
515
+
// ====================
516
517
func (s *Server) handleVerifyPLCBundle(w http.ResponseWriter, r *http.Request) {
518
ctx := r.Context()
···
624
"verified": verified,
625
"local_hash": bundle.Hash,
626
"remote_hash": remoteHash,
627
+
"local_op_count": plc.BUNDLE_SIZE,
628
"remote_op_count": len(allRemoteOps),
629
"boundary_cids_used": len(prevBoundaryCIDs),
630
})
···
723
"chain_start_time": firstBundle.StartTime,
724
"chain_end_time": lastBundleData.EndTime,
725
"chain_head_hash": lastBundleData.Hash,
726
+
"first_prev_hash": firstBundle.PrevBundleHash,
727
"last_prev_hash": lastBundleData.PrevBundleHash,
728
})
729
}
730
731
+
// ====================
732
+
// PLC Export Handler
733
+
// ====================
734
+
735
func (s *Server) handlePLCExport(w http.ResponseWriter, r *http.Request) {
736
ctx := r.Context()
737
···
753
if afterStr != "" {
754
// Try multiple timestamp formats (from most specific to least)
755
formats := []string{
756
+
time.RFC3339Nano,
757
+
time.RFC3339,
758
+
"2006-01-02T15:04:05.000Z",
759
+
"2006-01-02T15:04:05",
760
+
"2006-01-02T15:04",
761
+
"2006-01-02",
762
}
763
764
var parsed time.Time
765
var parseErr error
766
+
parsed = time.Time{}
767
768
for _, format := range formats {
769
parsed, parseErr = time.Parse(format, afterStr)
···
857
}
858
}
859
860
+
// ====================
861
+
// Health Handler
862
+
// ====================
863
+
864
+
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
865
+
respondJSON(w, map[string]string{"status": "ok"})
866
+
}
867
+
868
+
// ====================
869
+
// Helper Functions
870
+
// ====================
871
+
872
+
// loadBundleOperations loads operations from a bundle file
873
+
func (s *Server) loadBundleOperations(path string) ([]plc.PLCOperation, error) {
874
+
decoder, err := zstd.NewReader(nil)
875
+
if err != nil {
876
+
return nil, err
877
+
}
878
+
defer decoder.Close()
879
+
880
+
compressedData, err := os.ReadFile(path)
881
+
if err != nil {
882
+
return nil, err
883
+
}
884
+
885
+
decompressed, err := decoder.DecodeAll(compressedData, nil)
886
+
if err != nil {
887
+
return nil, err
888
+
}
889
+
890
+
// Parse JSONL (newline-delimited JSON)
891
+
var operations []plc.PLCOperation
892
+
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
893
+
894
+
lineNum := 0
895
+
for scanner.Scan() {
896
+
lineNum++
897
+
line := scanner.Bytes()
898
+
899
+
// Skip empty lines
900
+
if len(line) == 0 {
901
+
continue
902
+
}
903
+
904
+
var op plc.PLCOperation
905
+
if err := json.Unmarshal(line, &op); err != nil {
906
+
return nil, fmt.Errorf("failed to parse operation on line %d: %w", lineNum, err)
907
+
}
908
+
909
+
// CRITICAL: Store the original raw JSON bytes
910
+
op.RawJSON = make([]byte, len(line))
911
+
copy(op.RawJSON, line)
912
+
913
+
operations = append(operations, op)
914
+
}
915
+
916
+
if err := scanner.Err(); err != nil {
917
+
return nil, fmt.Errorf("error reading JSONL: %w", err)
918
+
}
919
+
920
+
return operations, nil
921
+
}
922
+
923
+
// computeRemoteOperationsHash computes hash for remote operations
924
func computeRemoteOperationsHash(ops []plc.PLCOperation) (string, error) {
925
var jsonlData []byte
926
for i, op := range ops {
···
937
return hex.EncodeToString(hash[:]), nil
938
}
939
940
+
// statusToString converts status int to string
941
func statusToString(status int) string {
942
switch status {
943
+
case storage.PDSStatusOnline: // Use PDSStatusOnline (alias)
944
return "online"
945
+
case storage.PDSStatusOffline: // Use PDSStatusOffline (alias)
946
return "offline"
947
default:
948
return "unknown"
949
}
950
}
951
+
952
+
// respondJSON writes JSON response
953
+
func respondJSON(w http.ResponseWriter, data interface{}) {
954
+
w.Header().Set("Content-Type", "application/json")
955
+
json.NewEncoder(w).Encode(data)
956
+
}
+13
-12
internal/api/server.go
+13
-12
internal/api/server.go
···
52
func (s *Server) setupRoutes() {
53
api := s.router.PathPrefix("/api/v1").Subrouter()
54
55
-
// PDS endpoints
56
-
api.HandleFunc("/pds", s.handleGetPDSList).Methods("GET")
57
-
api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET")
58
-
api.HandleFunc("/pds/{endpoint}", s.handleGetPDS).Methods("GET")
59
60
-
// Metrics endpoints
61
-
api.HandleFunc("/metrics/plc", s.handleGetPLCMetrics).Methods("GET")
62
-
63
-
// PLC Bundle endpoints
64
api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
65
api.HandleFunc("/plc/bundles/stats", s.handleGetPLCBundleStats).Methods("GET")
66
api.HandleFunc("/plc/bundles/chain", s.handleGetChainInfo).Methods("GET")
67
api.HandleFunc("/plc/bundles/verify-chain", s.handleVerifyChain).Methods("POST")
68
api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
69
api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET")
70
api.HandleFunc("/plc/bundles/{bundleNumber}/verify", s.handleVerifyPLCBundle).Methods("POST")
71
-
api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
72
api.HandleFunc("/plc/export", s.handlePLCExport).Methods("GET")
73
74
-
// PLC/DID endpoints
75
api.HandleFunc("/plc/did/{did}", s.handleGetDID).Methods("GET")
76
api.HandleFunc("/plc/did/{did}/history", s.handleGetDIDHistory).Methods("GET")
77
78
-
// Mempool endpoint - NEW
79
api.HandleFunc("/mempool/stats", s.handleGetMempoolStats).Methods("GET")
80
81
-
// Chain verification - NEW
82
83
// Health check
84
s.router.HandleFunc("/health", s.handleHealth).Methods("GET")
85
}
86
func (s *Server) Start() error {
87
log.Info("API server listening on %s", s.server.Addr)
88
return s.server.ListenAndServe()
···
52
// setupRoutes registers all HTTP routes on the server's router under the
// /api/v1 prefix (except the health check, which lives at the root).
//
// NOTE(review): with gorilla/mux, routes are matched in registration order,
// so the literal /plc/bundles paths (stats, chain, verify-chain) must stay
// registered before the /plc/bundles/{number} wildcards — do not reorder.
func (s *Server) setupRoutes() {
	api := s.router.PathPrefix("/api/v1").Subrouter()

	// Endpoint routes (replaces the old /pds routes).
	api.HandleFunc("/endpoints", s.handleGetEndpoints).Methods("GET")
	api.HandleFunc("/endpoints/stats", s.handleGetEndpointStats).Methods("GET")
	api.HandleFunc("/endpoints/{endpoint}", s.handleGetEndpoint).Methods("GET")

	// PLC bundle routes.
	api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
	api.HandleFunc("/plc/bundles/stats", s.handleGetPLCBundleStats).Methods("GET")
	api.HandleFunc("/plc/bundles/chain", s.handleGetChainInfo).Methods("GET")
	api.HandleFunc("/plc/bundles/verify-chain", s.handleVerifyChain).Methods("POST")
	api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
	api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
	api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET")
	api.HandleFunc("/plc/bundles/{bundleNumber}/verify", s.handleVerifyPLCBundle).Methods("POST")

	// PLC export endpoint (simulates the PLC directory /export endpoint).
	api.HandleFunc("/plc/export", s.handlePLCExport).Methods("GET")

	// DID routes.
	api.HandleFunc("/plc/did/{did}", s.handleGetDID).Methods("GET")
	api.HandleFunc("/plc/did/{did}/history", s.handleGetDIDHistory).Methods("GET")

	// Mempool routes.
	api.HandleFunc("/mempool/stats", s.handleGetMempoolStats).Methods("GET")

	// Metrics routes.
	api.HandleFunc("/metrics/plc", s.handleGetPLCMetrics).Methods("GET")

	// Health check (outside the /api/v1 prefix).
	s.router.HandleFunc("/health", s.handleHealth).Methods("GET")
}
86
+
87
func (s *Server) Start() error {
88
log.Info("API server listening on %s", s.server.Addr)
89
return s.server.ListenAndServe()
+13
-10
internal/pds/scanner.go
+13
-10
internal/pds/scanner.go
···
29
startTime := time.Now()
30
log.Info("Starting PDS availability scan...")
31
32
-
servers, err := s.db.GetPDSServers(ctx, nil)
33
if err != nil {
34
return err
35
}
···
37
log.Info("Scanning %d PDS servers...", len(servers))
38
39
// Worker pool
40
-
jobs := make(chan *storage.PDS, len(servers))
41
results := make(chan *PDSStatus, len(servers))
42
43
var wg sync.WaitGroup
···
74
}
75
76
// Build scan data
77
-
scanData := &storage.PDSScanData{
78
ServerInfo: status.Description,
79
DIDs: status.DIDs,
80
DIDCount: len(status.DIDs),
81
}
82
83
-
// Update using PDS ID
84
-
if err := s.db.UpdatePDSStatus(ctx, status.PDSID, &storage.PDSUpdate{
85
Status: statusCode,
86
LastChecked: status.LastChecked,
87
ResponseTime: status.ResponseTime.Seconds() * 1000, // Convert to ms
88
ScanData: scanData,
89
}); err != nil {
90
-
log.Error("Error updating PDS ID %d: %v", status.PDSID, err)
91
}
92
93
if status.Available {
···
104
return nil
105
}
106
107
-
func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.PDS, results chan<- *PDSStatus) {
108
for server := range jobs {
109
select {
110
case <-ctx.Done():
···
116
}
117
}
118
119
-
func (s *Scanner) scanPDS(ctx context.Context, pdsID int64, endpoint string) *PDSStatus {
120
status := &PDSStatus{
121
-
PDSID: pdsID, // Store ID
122
Endpoint: endpoint,
123
LastChecked: time.Now(),
124
}
···
146
status.Description = desc
147
}
148
149
-
// List repos (DIDs)
150
/*dids, err := s.client.ListRepos(ctx, endpoint)
151
if err != nil {
152
log.Verbose("Warning: failed to list repos for %s: %v", endpoint, err)
···
29
startTime := time.Now()
30
log.Info("Starting PDS availability scan...")
31
32
+
// Get only PDS endpoints
33
+
servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{
34
+
Type: "pds",
35
+
})
36
if err != nil {
37
return err
38
}
···
40
log.Info("Scanning %d PDS servers...", len(servers))
41
42
// Worker pool
43
+
jobs := make(chan *storage.Endpoint, len(servers))
44
results := make(chan *PDSStatus, len(servers))
45
46
var wg sync.WaitGroup
···
77
}
78
79
// Build scan data
80
+
scanData := &storage.EndpointScanData{
81
ServerInfo: status.Description,
82
DIDs: status.DIDs,
83
DIDCount: len(status.DIDs),
84
}
85
86
+
// Update using Endpoint ID
87
+
if err := s.db.UpdateEndpointStatus(ctx, status.EndpointID, &storage.EndpointUpdate{
88
Status: statusCode,
89
LastChecked: status.LastChecked,
90
ResponseTime: status.ResponseTime.Seconds() * 1000, // Convert to ms
91
ScanData: scanData,
92
}); err != nil {
93
+
log.Error("Error updating endpoint ID %d: %v", status.EndpointID, err)
94
}
95
96
if status.Available {
···
107
return nil
108
}
109
110
+
func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.Endpoint, results chan<- *PDSStatus) {
111
for server := range jobs {
112
select {
113
case <-ctx.Done():
···
119
}
120
}
121
122
+
func (s *Scanner) scanPDS(ctx context.Context, endpointID int64, endpoint string) *PDSStatus {
123
status := &PDSStatus{
124
+
EndpointID: endpointID, // Store Endpoint ID
125
Endpoint: endpoint,
126
LastChecked: time.Now(),
127
}
···
149
status.Description = desc
150
}
151
152
+
// Optionally list repos (DIDs) - commented out by default for performance
153
/*dids, err := s.client.ListRepos(ctx, endpoint)
154
if err != nil {
155
log.Verbose("Warning: failed to list repos for %s: %v", endpoint, err)
+1
-1
internal/pds/types.go
+1
-1
internal/pds/types.go
+50
-22
internal/plc/scanner.go
+50
-22
internal/plc/scanner.go
···
4
"context"
5
"encoding/json"
6
"fmt"
7
"time"
8
9
-
"github.com/acarl005/stripansi"
10
"github.com/atscan/atscanner/internal/config"
11
"github.com/atscan/atscanner/internal/log"
12
"github.com/atscan/atscanner/internal/storage"
···
373
374
// processBatch processes operations for PDS discovery
375
func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (int64, error) {
376
-
newPDSCount := int64(0)
377
-
seenInBatch := make(map[string]*PLCOperation)
378
379
for _, op := range operations {
380
if op.IsNullified() {
381
continue
382
}
383
384
-
pdsEndpoint := s.extractPDSFromOperation(op)
385
-
if pdsEndpoint == "" {
386
-
continue
387
}
388
389
-
if _, seen := seenInBatch[pdsEndpoint]; !seen {
390
-
seenInBatch[pdsEndpoint] = &op
391
-
}
392
-
}
393
394
-
for pdsEndpoint, firstOp := range seenInBatch {
395
-
exists, err := s.db.PDSExists(ctx, pdsEndpoint)
396
if err != nil || exists {
397
continue
398
}
399
400
-
if err := s.db.UpsertPDS(ctx, &storage.PDS{
401
-
Endpoint: pdsEndpoint,
402
DiscoveredAt: firstOp.CreatedAt,
403
LastChecked: time.Time{},
404
-
Status: storage.PDSStatusUnknown,
405
}); err != nil {
406
-
log.Error("Error storing PDS %s: %v", stripansi.Strip(pdsEndpoint), err)
407
continue
408
}
409
410
-
log.Info("✓ Discovered new PDS: %s", stripansi.Strip(pdsEndpoint))
411
-
newPDSCount++
412
}
413
414
-
return newPDSCount, nil
415
}
416
417
-
func (s *Scanner) extractPDSFromOperation(op PLCOperation) string {
418
if services, ok := op.Operation["services"].(map[string]interface{}); ok {
419
if atprotoPDS, ok := services["atproto_pds"].(map[string]interface{}); ok {
420
if endpoint, ok := atprotoPDS["endpoint"].(string); ok {
421
if svcType, ok := atprotoPDS["type"].(string); ok {
422
if svcType == "AtprotoPersonalDataServer" {
423
-
return endpoint
424
}
425
}
426
}
427
}
428
}
429
-
return ""
430
}
431
432
func contains(s, substr string) bool {
···
4
"context"
5
"encoding/json"
6
"fmt"
7
+
"strings"
8
"time"
9
10
"github.com/atscan/atscanner/internal/config"
11
"github.com/atscan/atscanner/internal/log"
12
"github.com/atscan/atscanner/internal/storage"
···
373
374
// processBatch processes operations for PDS discovery
375
func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (int64, error) {
376
+
newEndpointCount := int64(0)
377
+
seenInBatch := make(map[string]*PLCOperation) // key: "type:endpoint"
378
379
for _, op := range operations {
380
if op.IsNullified() {
381
continue
382
}
383
384
+
endpoints := s.extractEndpointsFromOperation(op)
385
+
for _, ep := range endpoints {
386
+
key := fmt.Sprintf("%s:%s", ep.Type, ep.Endpoint)
387
+
if _, seen := seenInBatch[key]; !seen {
388
+
seenInBatch[key] = &op
389
+
}
390
}
391
+
}
392
393
+
for key, firstOp := range seenInBatch {
394
+
parts := strings.SplitN(key, ":", 2)
395
+
endpointType := parts[0]
396
+
endpoint := parts[1]
397
398
+
exists, err := s.db.EndpointExists(ctx, endpoint, endpointType)
399
if err != nil || exists {
400
continue
401
}
402
403
+
if err := s.db.UpsertEndpoint(ctx, &storage.Endpoint{
404
+
EndpointType: endpointType,
405
+
Endpoint: endpoint,
406
DiscoveredAt: firstOp.CreatedAt,
407
LastChecked: time.Time{},
408
+
Status: storage.EndpointStatusUnknown,
409
}); err != nil {
410
+
log.Error("Error storing %s endpoint %s: %v", endpointType, endpoint, err)
411
continue
412
}
413
414
+
log.Info("✓ Discovered new %s endpoint: %s", endpointType, endpoint)
415
+
newEndpointCount++
416
}
417
418
+
return newEndpointCount, nil
419
}
420
421
+
// extractEndpointsFromOperation extracts ALL service endpoints
422
+
func (s *Scanner) extractEndpointsFromOperation(op PLCOperation) []EndpointInfo {
423
+
var endpoints []EndpointInfo
424
+
425
if services, ok := op.Operation["services"].(map[string]interface{}); ok {
426
+
// Extract PDS
427
if atprotoPDS, ok := services["atproto_pds"].(map[string]interface{}); ok {
428
if endpoint, ok := atprotoPDS["endpoint"].(string); ok {
429
if svcType, ok := atprotoPDS["type"].(string); ok {
430
if svcType == "AtprotoPersonalDataServer" {
431
+
endpoints = append(endpoints, EndpointInfo{
432
+
Type: "pds",
433
+
Endpoint: endpoint,
434
+
})
435
+
}
436
+
}
437
+
}
438
+
}
439
+
440
+
// Extract Labeler
441
+
if atprotoLabeler, ok := services["atproto_labeler"].(map[string]interface{}); ok {
442
+
if endpoint, ok := atprotoLabeler["endpoint"].(string); ok {
443
+
if svcType, ok := atprotoLabeler["type"].(string); ok {
444
+
if svcType == "AtprotoLabeler" {
445
+
endpoints = append(endpoints, EndpointInfo{
446
+
Type: "labeler",
447
+
Endpoint: endpoint,
448
+
})
449
}
450
}
451
}
452
}
453
+
454
+
// Add more service types as needed...
455
}
456
+
457
+
return endpoints
458
}
459
460
func contains(s, substr string) bool {
+5
internal/plc/types.go
+5
internal/plc/types.go
+10
-11
internal/storage/db.go
+10
-11
internal/storage/db.go
···
9
Close() error
10
Migrate() error
11
12
-
// PDS operations
13
-
UpsertPDS(ctx context.Context, pds *PDS) error
14
-
GetPDS(ctx context.Context, endpoint string) (*PDS, error)
15
-
GetPDSByID(ctx context.Context, id int64) (*PDS, error)
16
-
GetPDSServers(ctx context.Context, filter *PDSFilter) ([]*PDS, error)
17
-
UpdatePDSStatus(ctx context.Context, pdsID int64, update *PDSUpdate) error
18
-
PDSExists(ctx context.Context, endpoint string) (bool, error)
19
-
GetPDSIDByEndpoint(ctx context.Context, endpoint string) (int64, error)
20
-
GetPDSScans(ctx context.Context, pdsID int64, limit int) ([]*PDSScan, error)
21
22
// Cursor operations
23
GetScanCursor(ctx context.Context, source string) (*ScanCursor, error)
···
26
// Bundle operations
27
CreateBundle(ctx context.Context, bundle *PLCBundle) error
28
GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error)
29
-
// GetBundleByID removed - bundle_number IS the ID
30
GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error)
31
GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error)
32
GetBundleStats(ctx context.Context) (int64, int64, error)
···
44
// Metrics
45
StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
46
GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error)
47
-
GetPDSStats(ctx context.Context) (*PDSStats, error)
48
}
···
9
Close() error
10
Migrate() error
11
12
+
// Endpoint operations (renamed from PDS)
13
+
UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error
14
+
GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error)
15
+
GetEndpointByID(ctx context.Context, id int64) (*Endpoint, error)
16
+
GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error)
17
+
UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error
18
+
EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error)
19
+
GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error)
20
+
GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error)
21
22
// Cursor operations
23
GetScanCursor(ctx context.Context, source string) (*ScanCursor, error)
···
26
// Bundle operations
27
CreateBundle(ctx context.Context, bundle *PLCBundle) error
28
GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error)
29
GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error)
30
GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error)
31
GetBundleStats(ctx context.Context) (int64, int64, error)
···
43
// Metrics
44
StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
45
GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error)
46
+
GetEndpointStats(ctx context.Context) (*EndpointStats, error)
47
}
+121
-85
internal/storage/sqlite.go
+121
-85
internal/storage/sqlite.go
···
35
36
func (s *SQLiteDB) Migrate() error {
37
schema := `
38
-
-- PDS tables (same as before)
39
-
CREATE TABLE IF NOT EXISTS pds_servers (
40
id INTEGER PRIMARY KEY AUTOINCREMENT,
41
-
endpoint TEXT UNIQUE NOT NULL,
42
discovered_at TIMESTAMP NOT NULL,
43
last_checked TIMESTAMP,
44
status INTEGER DEFAULT 0,
45
user_count INTEGER DEFAULT 0,
46
-
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
47
);
48
49
-
CREATE INDEX IF NOT EXISTS idx_pds_endpoint ON pds_servers(endpoint);
50
-
CREATE INDEX IF NOT EXISTS idx_pds_status ON pds_servers(status);
51
-
CREATE INDEX IF NOT EXISTS idx_pds_user_count ON pds_servers(user_count);
52
53
CREATE TABLE IF NOT EXISTS pds_scans (
54
id INTEGER PRIMARY KEY AUTOINCREMENT,
55
pds_id INTEGER NOT NULL,
···
57
response_time REAL,
58
scan_data TEXT,
59
scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
60
-
FOREIGN KEY (pds_id) REFERENCES pds_servers(id) ON DELETE CASCADE
61
);
62
63
CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id);
···
501
return count, totalSize, err
502
}
503
504
-
// UpsertPDS inserts or updates a PDS server
505
-
func (s *SQLiteDB) UpsertPDS(ctx context.Context, pds *PDS) error {
506
query := `
507
-
INSERT INTO pds_servers (endpoint, discovered_at, last_checked, status)
508
-
VALUES (?, ?, ?, ?)
509
-
ON CONFLICT(endpoint) DO UPDATE SET
510
last_checked = excluded.last_checked
511
RETURNING id
512
`
513
-
err := s.db.QueryRowContext(ctx, query, pds.Endpoint, pds.DiscoveredAt, pds.LastChecked, pds.Status).Scan(&pds.ID)
514
return err
515
}
516
517
-
// PDSExists checks if a PDS endpoint already exists
518
-
func (s *SQLiteDB) PDSExists(ctx context.Context, endpoint string) (bool, error) {
519
-
query := "SELECT EXISTS(SELECT 1 FROM pds_servers WHERE endpoint = ?)"
520
var exists bool
521
-
err := s.db.QueryRowContext(ctx, query, endpoint).Scan(&exists)
522
return exists, err
523
}
524
525
-
// GetPDSIDByEndpoint gets the ID for an endpoint
526
-
func (s *SQLiteDB) GetPDSIDByEndpoint(ctx context.Context, endpoint string) (int64, error) {
527
-
query := "SELECT id FROM pds_servers WHERE endpoint = ?"
528
var id int64
529
-
err := s.db.QueryRowContext(ctx, query, endpoint).Scan(&id)
530
return id, err
531
}
532
533
-
// GetPDS retrieves a PDS by endpoint
534
-
func (s *SQLiteDB) GetPDS(ctx context.Context, endpoint string) (*PDS, error) {
535
query := `
536
-
SELECT id, endpoint, discovered_at, last_checked, status, user_count, updated_at
537
-
FROM pds_servers
538
-
WHERE endpoint = ?
539
`
540
541
-
var pds PDS
542
var lastChecked sql.NullTime
543
544
-
err := s.db.QueryRowContext(ctx, query, endpoint).Scan(
545
-
&pds.ID, &pds.Endpoint, &pds.DiscoveredAt, &lastChecked,
546
-
&pds.Status, &pds.UserCount, &pds.UpdatedAt,
547
)
548
if err != nil {
549
return nil, err
550
}
551
552
if lastChecked.Valid {
553
-
pds.LastChecked = lastChecked.Time
554
}
555
556
-
return &pds, nil
557
}
558
559
-
// GetPDSByID retrieves a PDS by ID
560
-
func (s *SQLiteDB) GetPDSByID(ctx context.Context, id int64) (*PDS, error) {
561
query := `
562
-
SELECT id, endpoint, discovered_at, last_checked, status, user_count, updated_at
563
-
FROM pds_servers
564
WHERE id = ?
565
`
566
567
-
var pds PDS
568
var lastChecked sql.NullTime
569
570
err := s.db.QueryRowContext(ctx, query, id).Scan(
571
-
&pds.ID, &pds.Endpoint, &pds.DiscoveredAt, &lastChecked,
572
-
&pds.Status, &pds.UserCount, &pds.UpdatedAt,
573
)
574
if err != nil {
575
return nil, err
576
}
577
578
if lastChecked.Valid {
579
-
pds.LastChecked = lastChecked.Time
580
}
581
582
-
return &pds, nil
583
}
584
585
-
// GetPDSServers retrieves multiple PDS servers
586
-
func (s *SQLiteDB) GetPDSServers(ctx context.Context, filter *PDSFilter) ([]*PDS, error) {
587
query := `
588
-
SELECT id, endpoint, discovered_at, last_checked, status, user_count, updated_at
589
-
FROM pds_servers
590
`
591
args := []interface{}{}
592
593
-
if filter != nil && filter.Status != "" {
594
-
// Map string status to int
595
-
statusInt := PDSStatusUnknown
596
-
switch filter.Status {
597
-
case "online":
598
-
statusInt = PDSStatusOnline
599
-
case "offline":
600
-
statusInt = PDSStatusOffline
601
}
602
-
query += " WHERE status = ?"
603
-
args = append(args, statusInt)
604
}
605
606
query += " ORDER BY user_count DESC"
···
615
}
616
defer rows.Close()
617
618
-
var servers []*PDS
619
for rows.Next() {
620
-
var pds PDS
621
var lastChecked sql.NullTime
622
623
err := rows.Scan(
624
-
&pds.ID, &pds.Endpoint, &pds.DiscoveredAt, &lastChecked,
625
-
&pds.Status, &pds.UserCount, &pds.UpdatedAt,
626
)
627
if err != nil {
628
return nil, err
629
}
630
631
if lastChecked.Valid {
632
-
pds.LastChecked = lastChecked.Time
633
}
634
635
-
servers = append(servers, &pds)
636
}
637
638
-
return servers, rows.Err()
639
}
640
641
-
// UpdatePDSStatus updates the status and creates a scan record
642
-
func (s *SQLiteDB) UpdatePDSStatus(ctx context.Context, pdsID int64, update *PDSUpdate) error {
643
tx, err := s.db.BeginTx(ctx, nil)
644
if err != nil {
645
return err
···
652
userCount = update.ScanData.DIDCount
653
}
654
655
-
// Update main pds_servers record
656
query := `
657
-
UPDATE pds_servers
658
SET status = ?, last_checked = ?, user_count = ?, updated_at = ?
659
WHERE id = ?
660
`
661
-
_, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, userCount, time.Now(), pdsID)
662
if err != nil {
663
return err
664
}
···
669
scanDataJSON, _ = json.Marshal(update.ScanData)
670
}
671
672
-
// Insert scan history
673
scanQuery := `
674
INSERT INTO pds_scans (pds_id, status, response_time, scan_data)
675
VALUES (?, ?, ?, ?)
676
`
677
-
_, err = tx.ExecContext(ctx, scanQuery, pdsID, update.Status, update.ResponseTime, string(scanDataJSON))
678
if err != nil {
679
return err
680
}
···
682
return tx.Commit()
683
}
684
685
-
// GetPDSScans retrieves scan history for a PDS
686
-
func (s *SQLiteDB) GetPDSScans(ctx context.Context, pdsID int64, limit int) ([]*PDSScan, error) {
687
query := `
688
SELECT id, pds_id, status, response_time, scan_data, scanned_at
689
FROM pds_scans
···
692
LIMIT ?
693
`
694
695
-
rows, err := s.db.QueryContext(ctx, query, pdsID, limit)
696
if err != nil {
697
return nil, err
698
}
699
defer rows.Close()
700
701
-
var scans []*PDSScan
702
for rows.Next() {
703
-
var scan PDSScan
704
var responseTime sql.NullFloat64
705
var scanDataJSON sql.NullString
706
707
-
err := rows.Scan(&scan.ID, &scan.PDSID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt)
708
if err != nil {
709
return nil, err
710
}
···
714
}
715
716
if scanDataJSON.Valid && scanDataJSON.String != "" {
717
-
var scanData PDSScanData
718
if err := json.Unmarshal([]byte(scanDataJSON.String), &scanData); err == nil {
719
scan.ScanData = &scanData
720
}
···
726
return scans, rows.Err()
727
}
728
729
-
// GetPDSStats returns aggregate statistics
730
-
func (s *SQLiteDB) GetPDSStats(ctx context.Context) (*PDSStats, error) {
731
query := `
732
SELECT
733
-
COUNT(*) as total_pds,
734
-
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_pds,
735
-
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_pds,
736
(SELECT AVG(response_time) FROM pds_scans WHERE response_time > 0
737
AND scanned_at > datetime('now', '-1 hour')) as avg_response_time,
738
SUM(user_count) as total_dids
739
-
FROM pds_servers
740
`
741
742
-
var stats PDSStats
743
var avgResponseTime sql.NullFloat64
744
745
err := s.db.QueryRowContext(ctx, query).Scan(
746
-
&stats.TotalPDS, &stats.OnlinePDS, &stats.OfflinePDS, &avgResponseTime, &stats.TotalDIDs,
747
)
748
749
if avgResponseTime.Valid {
750
stats.AvgResponseTime = avgResponseTime.Float64
751
}
752
753
return &stats, err
···
35
36
func (s *SQLiteDB) Migrate() error {
37
schema := `
38
+
-- Endpoints table (replaces pds_servers)
39
+
CREATE TABLE IF NOT EXISTS endpoints (
40
id INTEGER PRIMARY KEY AUTOINCREMENT,
41
+
endpoint_type TEXT NOT NULL DEFAULT 'pds',
42
+
endpoint TEXT NOT NULL,
43
discovered_at TIMESTAMP NOT NULL,
44
last_checked TIMESTAMP,
45
status INTEGER DEFAULT 0,
46
user_count INTEGER DEFAULT 0,
47
+
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
48
+
UNIQUE(endpoint_type, endpoint)
49
);
50
51
+
CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint);
52
+
CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status);
53
+
CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
54
+
CREATE INDEX IF NOT EXISTS idx_endpoints_user_count ON endpoints(user_count);
55
56
+
-- Keep pds_scans table (or rename to endpoint_scans later)
57
CREATE TABLE IF NOT EXISTS pds_scans (
58
id INTEGER PRIMARY KEY AUTOINCREMENT,
59
pds_id INTEGER NOT NULL,
···
61
response_time REAL,
62
scan_data TEXT,
63
scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
64
+
FOREIGN KEY (pds_id) REFERENCES endpoints(id) ON DELETE CASCADE
65
);
66
67
CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id);
···
505
return count, totalSize, err
506
}
507
508
+
// UpsertEndpoint inserts or updates an endpoint
509
+
func (s *SQLiteDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error {
510
query := `
511
+
INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status)
512
+
VALUES (?, ?, ?, ?, ?)
513
+
ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET
514
last_checked = excluded.last_checked
515
RETURNING id
516
`
517
+
err := s.db.QueryRowContext(ctx, query,
518
+
endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt,
519
+
endpoint.LastChecked, endpoint.Status).Scan(&endpoint.ID)
520
return err
521
}
522
523
+
// EndpointExists checks if an endpoint already exists
524
+
func (s *SQLiteDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) {
525
+
query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = ? AND endpoint_type = ?)"
526
var exists bool
527
+
err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists)
528
return exists, err
529
}
530
531
+
// GetEndpointIDByEndpoint gets the ID for an endpoint
532
+
func (s *SQLiteDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) {
533
+
query := "SELECT id FROM endpoints WHERE endpoint = ? AND endpoint_type = ?"
534
var id int64
535
+
err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id)
536
return id, err
537
}
538
539
+
// GetEndpoint retrieves an endpoint by endpoint string and type
540
+
func (s *SQLiteDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) {
541
query := `
542
+
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
543
+
FROM endpoints
544
+
WHERE endpoint = ? AND endpoint_type = ?
545
`
546
547
+
var ep Endpoint
548
var lastChecked sql.NullTime
549
550
+
err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(
551
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
552
+
&ep.Status, &ep.UserCount, &ep.UpdatedAt,
553
)
554
if err != nil {
555
return nil, err
556
}
557
558
if lastChecked.Valid {
559
+
ep.LastChecked = lastChecked.Time
560
}
561
562
+
return &ep, nil
563
}
564
565
+
// GetEndpointByID retrieves an endpoint by ID
566
+
func (s *SQLiteDB) GetEndpointByID(ctx context.Context, id int64) (*Endpoint, error) {
567
query := `
568
+
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
569
+
FROM endpoints
570
WHERE id = ?
571
`
572
573
+
var ep Endpoint
574
var lastChecked sql.NullTime
575
576
err := s.db.QueryRowContext(ctx, query, id).Scan(
577
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
578
+
&ep.Status, &ep.UserCount, &ep.UpdatedAt,
579
)
580
if err != nil {
581
return nil, err
582
}
583
584
if lastChecked.Valid {
585
+
ep.LastChecked = lastChecked.Time
586
}
587
588
+
return &ep, nil
589
}
590
591
+
// GetEndpoints retrieves multiple endpoints
592
+
func (s *SQLiteDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) {
593
query := `
594
+
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
595
+
FROM endpoints
596
+
WHERE 1=1
597
`
598
args := []interface{}{}
599
600
+
if filter != nil {
601
+
if filter.Type != "" {
602
+
query += " AND endpoint_type = ?"
603
+
args = append(args, filter.Type)
604
}
605
+
if filter.Status != "" {
606
+
statusInt := EndpointStatusUnknown
607
+
switch filter.Status {
608
+
case "online":
609
+
statusInt = EndpointStatusOnline
610
+
case "offline":
611
+
statusInt = EndpointStatusOffline
612
+
}
613
+
query += " AND status = ?"
614
+
args = append(args, statusInt)
615
+
}
616
+
if filter.MinUserCount > 0 {
617
+
query += " AND user_count >= ?"
618
+
args = append(args, filter.MinUserCount)
619
+
}
620
}
621
622
query += " ORDER BY user_count DESC"
···
631
}
632
defer rows.Close()
633
634
+
var endpoints []*Endpoint
635
for rows.Next() {
636
+
var ep Endpoint
637
var lastChecked sql.NullTime
638
639
err := rows.Scan(
640
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
641
+
&ep.Status, &ep.UserCount, &ep.UpdatedAt,
642
)
643
if err != nil {
644
return nil, err
645
}
646
647
if lastChecked.Valid {
648
+
ep.LastChecked = lastChecked.Time
649
}
650
651
+
endpoints = append(endpoints, &ep)
652
}
653
654
+
return endpoints, rows.Err()
655
}
656
657
+
// UpdateEndpointStatus updates the status and creates a scan record
658
+
func (s *SQLiteDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error {
659
tx, err := s.db.BeginTx(ctx, nil)
660
if err != nil {
661
return err
···
668
userCount = update.ScanData.DIDCount
669
}
670
671
+
// Update main endpoints record
672
query := `
673
+
UPDATE endpoints
674
SET status = ?, last_checked = ?, user_count = ?, updated_at = ?
675
WHERE id = ?
676
`
677
+
_, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, userCount, time.Now(), endpointID)
678
if err != nil {
679
return err
680
}
···
685
scanDataJSON, _ = json.Marshal(update.ScanData)
686
}
687
688
+
// Insert scan history (reuse pds_scans table or rename it to endpoint_scans)
689
scanQuery := `
690
INSERT INTO pds_scans (pds_id, status, response_time, scan_data)
691
VALUES (?, ?, ?, ?)
692
`
693
+
_, err = tx.ExecContext(ctx, scanQuery, endpointID, update.Status, update.ResponseTime, string(scanDataJSON))
694
if err != nil {
695
return err
696
}
···
698
return tx.Commit()
699
}
700
701
+
// GetEndpointScans retrieves scan history for an endpoint
702
+
func (s *SQLiteDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) {
703
query := `
704
SELECT id, pds_id, status, response_time, scan_data, scanned_at
705
FROM pds_scans
···
708
LIMIT ?
709
`
710
711
+
rows, err := s.db.QueryContext(ctx, query, endpointID, limit)
712
if err != nil {
713
return nil, err
714
}
715
defer rows.Close()
716
717
+
var scans []*EndpointScan
718
for rows.Next() {
719
+
var scan EndpointScan
720
var responseTime sql.NullFloat64
721
var scanDataJSON sql.NullString
722
723
+
err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt)
724
if err != nil {
725
return nil, err
726
}
···
730
}
731
732
if scanDataJSON.Valid && scanDataJSON.String != "" {
733
+
var scanData EndpointScanData
734
if err := json.Unmarshal([]byte(scanDataJSON.String), &scanData); err == nil {
735
scan.ScanData = &scanData
736
}
···
742
return scans, rows.Err()
743
}
744
745
+
// GetEndpointStats returns aggregate statistics about all endpoints
746
+
func (s *SQLiteDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) {
747
query := `
748
SELECT
749
+
COUNT(*) as total_endpoints,
750
+
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints,
751
+
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints,
752
(SELECT AVG(response_time) FROM pds_scans WHERE response_time > 0
753
AND scanned_at > datetime('now', '-1 hour')) as avg_response_time,
754
SUM(user_count) as total_dids
755
+
FROM endpoints
756
`
757
758
+
var stats EndpointStats
759
var avgResponseTime sql.NullFloat64
760
761
err := s.db.QueryRowContext(ctx, query).Scan(
762
+
&stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints,
763
+
&avgResponseTime, &stats.TotalDIDs,
764
)
765
766
if avgResponseTime.Valid {
767
stats.AvgResponseTime = avgResponseTime.Float64
768
+
}
769
+
770
+
// Get counts by type
771
+
typeQuery := `
772
+
SELECT endpoint_type, COUNT(*)
773
+
FROM endpoints
774
+
GROUP BY endpoint_type
775
+
`
776
+
rows, err := s.db.QueryContext(ctx, typeQuery)
777
+
if err == nil {
778
+
defer rows.Close()
779
+
stats.ByType = make(map[string]int64)
780
+
for rows.Next() {
781
+
var typ string
782
+
var count int64
783
+
if err := rows.Scan(&typ, &count); err == nil {
784
+
stats.ByType[typ] = count
785
+
}
786
+
}
787
}
788
789
return &stats, err
+53
-35
internal/storage/types.go
+53
-35
internal/storage/types.go
···
15
UpdatedAt time.Time
16
}
17
18
-
// PDS represents a Personal Data Server
19
-
type PDS struct {
20
-
ID int64 // NEW: Primary key
21
-
Endpoint string // UNIQUE but not primary key
22
DiscoveredAt time.Time
23
LastChecked time.Time
24
-
Status int // 0=unknown, 1=online, 2=offline
25
UserCount int64
26
UpdatedAt time.Time
27
}
28
29
-
// PDSUpdate contains fields to update for a PDS
30
-
type PDSUpdate struct {
31
Status int
32
LastChecked time.Time
33
-
ResponseTime float64 // milliseconds as float
34
-
ScanData *PDSScanData
35
}
36
37
-
// PDSScanData contains data from a PDS scan
38
-
type PDSScanData struct {
39
ServerInfo interface{} `json:"server_info,omitempty"`
40
DIDs []string `json:"dids,omitempty"`
41
DIDCount int `json:"did_count"`
42
}
43
44
-
// PDSScan represents a historical PDS scan
45
-
type PDSScan struct {
46
ID int64
47
-
PDSID int64
48
Status int
49
ResponseTime float64
50
-
ScanData *PDSScanData
51
ScannedAt time.Time
52
}
53
···
58
PDSStatusOffline = 2
59
)
60
61
-
// PDSFilter for querying PDS servers
62
-
type PDSFilter struct {
63
Status string
64
MinUserCount int64
65
Limit int
66
Offset int
67
}
68
69
-
// PDSStats contains aggregate statistics about PDS servers
70
-
type PDSStats struct {
71
-
TotalPDS int64 `json:"total_pds"`
72
-
UniquePDS int64 `json:"unique_pds"`
73
-
OnlinePDS int64 `json:"online_pds"`
74
-
OfflinePDS int64 `json:"offline_pds"`
75
-
AvgResponseTime float64 `json:"avg_response_time"`
76
-
TotalDIDs int64 `json:"total_dids"`
77
}
78
79
// PLCMetrics contains metrics from PLC directory scans
80
type PLCMetrics struct {
81
TotalDIDs int64 `json:"total_dids"`
···
88
89
// PLCBundle represents a cached bundle of PLC operations
90
type PLCBundle struct {
91
-
BundleNumber int // PRIMARY KEY
92
StartTime time.Time
93
EndTime time.Time
94
BoundaryCIDs []string
95
DIDs []string
96
-
Hash string // SHA256 of uncompressed JSONL (verifiable against PLC)
97
-
CompressedHash string // SHA256 of compressed file on disk
98
-
CompressedSize int64 // Size of compressed file in bytes
99
-
PrevBundleHash string // Hash of previous bundle (for chain)
100
Compressed bool
101
CreatedAt time.Time
102
}
···
106
return filepath.Join(bundleDir, fmt.Sprintf("%06d.jsonl.zst", b.BundleNumber))
107
}
108
109
-
// OperationCount() returns 1000 (all bundles have exactly 1000 operations)
110
func (b *PLCBundle) OperationCount() int {
111
-
return 1000
112
}
113
114
// MempoolOperation represents an operation waiting to be bundled
115
type MempoolOperation struct {
116
ID int64
117
DID string
118
-
Operation string // JSON of the full operation
119
CID string
120
CreatedAt time.Time
121
AddedAt time.Time
122
}
123
124
-
// ScanCursor now stores bundle number
125
type ScanCursor struct {
126
Source string
127
-
LastBundleNumber int // NEW: Last processed bundle number
128
LastScanTime time.Time
129
RecordsProcessed int64
130
}
···
15
UpdatedAt time.Time
16
}
17
18
+
// Endpoint represents any AT Protocol service endpoint tracked by the
// scanner — one row in the endpoints table, identified by the
// (EndpointType, Endpoint) pair.
type Endpoint struct {
	ID           int64     // database primary key
	EndpointType string    // "pds", "labeler", etc.
	Endpoint     string    // service address; unique together with EndpointType
	DiscoveredAt time.Time // when the endpoint was first seen
	LastChecked  time.Time // last scan time; zero value when the column is NULL (never checked)
	Status       int       // 0=unknown, 1=online, 2=offline (see EndpointStatus* constants)
	UserCount    int64     // DID count from the most recent scan
	UpdatedAt    time.Time // last row modification time
}
29
30
+
// EndpointUpdate carries the outcome of a scan to be applied to an endpoint
// row and recorded as a scan-history entry.
type EndpointUpdate struct {
	Status       int               // new status code (0=unknown, 1=online, 2=offline)
	LastChecked  time.Time         // when this scan ran
	ResponseTime float64           // scan response time; milliseconds per the legacy PDSUpdate field
	ScanData     *EndpointScanData // optional detailed payload; nil when unavailable
}
37
38
+
// EndpointScanData is the JSON payload captured during an endpoint scan and
// persisted in the scan history's scan_data column.
type EndpointScanData struct {
	ServerInfo interface{} `json:"server_info,omitempty"` // raw server description; shape depends on endpoint type — TODO confirm
	DIDs       []string    `json:"dids,omitempty"`        // DIDs observed on the endpoint, when enumerated
	DIDCount   int         `json:"did_count"`             // number of DIDs; copied into the endpoint's user_count on update
	Metadata   interface{} `json:"metadata,omitempty"`    // Type-specific metadata
}
45
46
+
// EndpointScan represents a historical endpoint scan — one row of the
// pds_scans history table, keyed by EndpointID (the pds_id column).
type EndpointScan struct {
	ID           int64             // scan row primary key
	EndpointID   int64             // owning endpoint row
	Status       int               // status observed by this scan
	ResponseTime float64           // response time recorded for this scan; zero when the column is NULL
	ScanData     *EndpointScanData // decoded scan_data JSON; nil when absent or undecodable
	ScannedAt    time.Time         // when the scan ran
}
55
···
60
PDSStatusOffline = 2
61
)
62
63
+
// Endpoint status constants (aliases for compatibility). They mirror the
// legacy PDSStatus* values so existing code and stored rows keep the same
// numeric meaning: 0=unknown, 1=online, 2=offline.
const (
	EndpointStatusUnknown = PDSStatusUnknown
	EndpointStatusOnline  = PDSStatusOnline
	EndpointStatusOffline = PDSStatusOffline
)
69
+
70
+
// EndpointFilter restricts GetEndpoints queries. Zero-valued fields are
// ignored (no filtering on that dimension).
type EndpointFilter struct {
	Type         string // endpoint type to match: "pds", "labeler", etc.
	Status       string // "online" or "offline"; any other non-empty value matches the "unknown" status
	MinUserCount int64  // minimum user_count, applied only when > 0
	Limit        int    // maximum rows returned — presumably 0 means unlimited; TODO confirm against the query builder
	Offset       int    // rows to skip — TODO confirm handling alongside Limit
}
78
79
+
// EndpointStats contains aggregate statistics about endpoints, computed by
// GetEndpointStats over the endpoints and pds_scans tables.
type EndpointStats struct {
	TotalEndpoints   int64            `json:"total_endpoints"`   // COUNT(*) over all endpoint rows
	ByType           map[string]int64 `json:"by_type"`           // endpoint count per endpoint_type; may be nil if the breakdown query fails
	OnlineEndpoints  int64            `json:"online_endpoints"`  // rows with status == 1
	OfflineEndpoints int64            `json:"offline_endpoints"` // rows with status == 2
	AvgResponseTime  float64          `json:"avg_response_time"` // mean scan response time over the last hour; 0 when no samples
	TotalDIDs        int64            `json:"total_dids"`        // Only for PDS
}
88
89
+
// Legacy type aliases for backward compatibility in code written before the
// PDS→Endpoint rename. Aliases (not new types), so values convert freely.
//
// Deprecated: use the Endpoint* types directly.
type PDS = Endpoint
type PDSUpdate = EndpointUpdate
type PDSScanData = EndpointScanData
type PDSScan = EndpointScan
type PDSFilter = EndpointFilter
type PDSStats = EndpointStats
96
+
97
// PLCMetrics contains metrics from PLC directory scans
98
type PLCMetrics struct {
99
TotalDIDs int64 `json:"total_dids"`
···
106
107
// PLCBundle represents a cached bundle of PLC operations, stored on disk as
// a zstd-compressed JSONL file named after BundleNumber.
type PLCBundle struct {
	BundleNumber   int       // primary key; also determines the %06d.jsonl.zst filename
	StartTime      time.Time // presumably timestamp of the first operation — TODO confirm
	EndTime        time.Time // presumably timestamp of the last operation — TODO confirm
	BoundaryCIDs   []string  // CIDs at the bundle boundaries — TODO confirm exact semantics
	DIDs           []string  // DIDs touched by operations in this bundle
	Hash           string    // SHA256 of the uncompressed JSONL (verifiable against PLC)
	CompressedHash string    // SHA256 of the compressed file on disk
	CompressedSize int64     // size of the compressed file in bytes
	PrevBundleHash string    // hash of the previous bundle (forms a chain)
	Compressed     bool      // whether the on-disk file is compressed
	CreatedAt      time.Time // when the bundle record was created
}
···
124
return filepath.Join(bundleDir, fmt.Sprintf("%06d.jsonl.zst", b.BundleNumber))
125
}
126
127
+
// OperationCount returns the number of operations in a bundle (always 10000)
func (b *PLCBundle) OperationCount() int {
	// Fixed bundle size; an earlier revision of this file used 1000.
	// NOTE(review): keep this constant in sync with the bundling logic that
	// writes bundles — it is duplicated rather than shared.
	return 10000
}
131
132
// MempoolOperation represents an operation waiting to be bundled.
type MempoolOperation struct {
	ID        int64     // database primary key
	DID       string    // DID the operation applies to
	Operation string    // JSON of the full operation (per the pre-rename field comment)
	CID       string    // content identifier of the operation
	CreatedAt time.Time // presumably when the operation was created upstream — TODO confirm vs AddedAt
	AddedAt   time.Time // presumably when it entered the mempool — TODO confirm
}
141
142
+
// ScanCursor stores scanning progress for a single source, so a scan can
// resume where it left off.
type ScanCursor struct {
	Source           string    // identifier of the scanned source
	LastBundleNumber int       // last bundle number that was fully processed
	LastScanTime     time.Time // when the cursor was last advanced
	RecordsProcessed int64     // running total of records processed for this source
}