update

Changed files
+246 -103
internal
utils
+33 -48
internal/api/handlers.go
··· 56 56 return 57 57 } 58 58 59 - respondJSON(w, pds) 59 + // Optionally include DIDs list or just count 60 + includeDIDs := r.URL.Query().Get("include_dids") == "true" 61 + 62 + response := map[string]interface{}{ 63 + "endpoint": pds.Endpoint, 64 + "discovered_at": pds.DiscoveredAt, 65 + "last_checked": pds.LastChecked, 66 + "status": pds.Status, 67 + "user_count": pds.UserCount, 68 + } 69 + 70 + if includeDIDs { 71 + response["dids"] = pds.DIDs 72 + } 73 + 74 + respondJSON(w, response) 60 75 } 61 76 62 77 func (s *Server) handleGetPDSStats(w http.ResponseWriter, r *http.Request) { ··· 75 90 vars := mux.Vars(r) 76 91 did := vars["did"] 77 92 78 - // Get all bundles and search for DID 79 - bundles, err := s.db.GetAllBundles(ctx) 93 + // Use SQL JSON query to find bundles 94 + bundles, err := s.db.GetBundlesForDID(ctx, did) 80 95 if err != nil { 81 96 http.Error(w, err.Error(), http.StatusInternalServerError) 82 97 return 83 98 } 84 99 85 - // Find bundles containing this DID 86 - var relevantBundles []*storage.PLCBundle 87 - for _, bundle := range bundles { 88 - for _, bundleDID := range bundle.DIDs { 89 - if bundleDID == did { 90 - relevantBundles = append(relevantBundles, bundle) 91 - break 92 - } 93 - } 94 - } 95 - 96 - if len(relevantBundles) == 0 { 100 + if len(bundles) == 0 { 97 101 http.Error(w, "DID not found in bundles", http.StatusNotFound) 98 102 return 99 103 } 100 104 101 - // Load the last bundle and find latest operation 102 - lastBundle := relevantBundles[len(relevantBundles)-1] 105 + // Load the last bundle (most recent) 106 + lastBundle := bundles[len(bundles)-1] 103 107 104 108 operations, err := s.loadBundleOperations(lastBundle.FilePath) 105 109 if err != nil { ··· 129 133 vars := mux.Vars(r) 130 134 did := vars["did"] 131 135 132 - // Get all bundles 133 - bundles, err := s.db.GetAllBundles(ctx) 136 + // Use SQL JSON query 137 + bundles, err := s.db.GetBundlesForDID(ctx, did) 134 138 if err != nil { 135 139 http.Error(w, err.Error(), 
http.StatusInternalServerError) 136 140 return 137 141 } 138 142 139 - // Find bundles containing this DID 140 - var relevantBundles []*storage.PLCBundle 141 - for _, bundle := range bundles { 142 - for _, bundleDID := range bundle.DIDs { 143 - if bundleDID == did { 144 - relevantBundles = append(relevantBundles, bundle) 145 - break 146 - } 147 - } 148 - } 149 - 150 - if len(relevantBundles) == 0 { 143 + if len(bundles) == 0 { 151 144 http.Error(w, "DID not found in bundles", http.StatusNotFound) 152 145 return 153 146 } ··· 155 148 var allOperations []plc.DIDHistoryEntry 156 149 var currentOp *plc.PLCOperation 157 150 158 - // Load relevant bundles 159 - for _, bundle := range relevantBundles { 151 + // Load all relevant bundles 152 + for _, bundle := range bundles { 160 153 operations, err := s.loadBundleOperations(bundle.FilePath) 161 154 if err != nil { 162 155 log.Printf("Warning: failed to load bundle: %v", err) ··· 194 187 return 195 188 } 196 189 197 - // Get all bundles and find the one we want 198 - bundles, err := s.db.GetAllBundles(ctx) 190 + bundle, err := s.db.GetBundleByID(ctx, bundleID) 199 191 if err != nil { 200 - http.Error(w, err.Error(), http.StatusInternalServerError) 192 + http.Error(w, "bundle not found", http.StatusNotFound) 201 193 return 202 194 } 203 195 204 - for _, bundle := range bundles { 205 - if bundle.ID == bundleID { 206 - respondJSON(w, map[string]interface{}{ 207 - "bundle_id": bundleID, 208 - "did_count": len(bundle.DIDs), 209 - "dids": bundle.DIDs, 210 - }) 211 - return 212 - } 213 - } 214 - 215 - http.Error(w, "bundle not found", http.StatusNotFound) 196 + respondJSON(w, map[string]interface{}{ 197 + "bundle_id": bundleID, 198 + "did_count": len(bundle.DIDs), 199 + "dids": bundle.DIDs, 200 + }) 216 201 } 217 202 218 203 // Helper to load bundle operations
+62
internal/pds/client.go
··· 20 20 } 21 21 } 22 22 23 + // ListReposResponse represents the response from com.atproto.sync.listRepos 24 + type ListReposResponse struct { 25 + Repos []Repo `json:"repos"` 26 + Cursor *string `json:"cursor,omitempty"` 27 + } 28 + 29 + // Repo represents a repository in the list 30 + type Repo struct { 31 + DID string `json:"did"` 32 + Head string `json:"head,omitempty"` 33 + Rev string `json:"rev,omitempty"` 34 + } 35 + 36 + // ListRepos fetches all repositories from a PDS with pagination 37 + func (c *Client) ListRepos(ctx context.Context, endpoint string) ([]string, error) { 38 + var allDIDs []string 39 + var cursor *string 40 + 41 + for { 42 + // Build URL 43 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000", endpoint) 44 + if cursor != nil { 45 + url += fmt.Sprintf("&cursor=%s", *cursor) 46 + } 47 + 48 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 49 + if err != nil { 50 + return nil, err 51 + } 52 + 53 + resp, err := c.httpClient.Do(req) 54 + if err != nil { 55 + return nil, err 56 + } 57 + 58 + if resp.StatusCode != http.StatusOK { 59 + resp.Body.Close() 60 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 61 + } 62 + 63 + var result ListReposResponse 64 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 65 + resp.Body.Close() 66 + return nil, err 67 + } 68 + resp.Body.Close() 69 + 70 + // Collect DIDs 71 + for _, repo := range result.Repos { 72 + allDIDs = append(allDIDs, repo.DID) 73 + } 74 + 75 + // Check if there are more pages 76 + if result.Cursor == nil || *result.Cursor == "" { 77 + break 78 + } 79 + cursor = result.Cursor 80 + } 81 + 82 + return allDIDs, nil 83 + } 84 + 23 85 // DescribeServer fetches com.atproto.server.describeServer 24 86 func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, error) { 25 87 url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint)
+16 -4
internal/pds/scanner.go
··· 66 66 // Process results 67 67 successCount := 0 68 68 failureCount := 0 69 + totalUsers := int64(0) 69 70 70 71 for status := range results { 71 72 if err := s.db.UpdatePDSStatus(ctx, status.Endpoint, &storage.PDSUpdate{ ··· 74 75 ResponseTime: status.ResponseTime.Milliseconds(), 75 76 ErrorMessage: status.ErrorMessage, 76 77 ServerInfo: status.Description, 78 + DIDs: status.DIDs, // NEW: Store DIDs 77 79 }); err != nil { 78 80 log.Printf("Error updating PDS %s: %v", status.Endpoint, err) 79 81 } 80 82 81 83 if status.Available { 82 84 successCount++ 85 + totalUsers += int64(len(status.DIDs)) 83 86 } else { 84 87 failureCount++ 85 88 } 86 89 } 87 90 88 - log.Printf("PDS scan completed: %d available, %d unavailable in %v", 89 - successCount, failureCount, time.Since(startTime)) 91 + log.Printf("PDS scan completed: %d available, %d unavailable, %d total users in %v", 92 + successCount, failureCount, totalUsers, time.Since(startTime)) 90 93 91 94 return nil 92 95 } ··· 127 130 // Describe server 128 131 desc, err := s.client.DescribeServer(ctx, endpoint) 129 132 if err != nil { 130 - log.Printf("Error describing server %s: %v", endpoint, err) 131 - // Still mark as available if health check passed 133 + log.Printf("Warning: failed to describe server %s: %v", endpoint, err) 132 134 } else { 133 135 status.Description = desc 136 + } 137 + 138 + // List repos (DIDs) - NEW 139 + dids, err := s.client.ListRepos(ctx, endpoint) 140 + if err != nil { 141 + log.Printf("Warning: failed to list repos for %s: %v", endpoint, err) 142 + status.DIDs = []string{} // Empty if failed 143 + } else { 144 + status.DIDs = dids 145 + log.Printf(" → Found %d users on %s", len(dids), endpoint) 134 146 } 135 147 136 148 return status
+1
internal/pds/types.go
··· 27 27 LastChecked time.Time 28 28 ErrorMessage string 29 29 Description *ServerDescription 30 + DIDs []string 30 31 }
+11 -6
internal/plc/scanner.go
··· 259 259 260 260 func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (int64, error) { 261 261 newPDSCount := int64(0) 262 - seenInBatch := make(map[string]bool) 262 + seenInBatch := make(map[string]*PLCOperation) // Store operation, not just bool 263 263 264 264 for _, op := range operations { 265 265 if op.IsNullified() { ··· 271 271 continue 272 272 } 273 273 274 - if seenInBatch[pdsEndpoint] { 275 - continue 274 + // Track first occurrence in this batch 275 + if _, seen := seenInBatch[pdsEndpoint]; !seen { 276 + seenInBatch[pdsEndpoint] = &op 276 277 } 277 - seenInBatch[pdsEndpoint] = true 278 + } 278 279 280 + // Now process unique PDS endpoints 281 + for pdsEndpoint, firstOp := range seenInBatch { 279 282 exists, err := s.db.PDSExists(ctx, pdsEndpoint) 280 283 if err != nil { 281 284 continue ··· 285 288 continue 286 289 } 287 290 291 + // New PDS! Use the operation timestamp as discovered_at 288 292 if err := s.db.UpsertPDS(ctx, &storage.PDS{ 289 293 Endpoint: pdsEndpoint, 290 - DiscoveredAt: time.Now(), 294 + DiscoveredAt: firstOp.CreatedAt, // Use operation timestamp! 291 295 LastChecked: time.Time{}, 292 296 Status: "unknown", 293 297 }); err != nil { ··· 295 299 continue 296 300 } 297 301 298 - log.Printf("✓ Discovered new PDS: %s", pdsEndpoint) 302 + log.Printf("✓ Discovered new PDS: %s (first seen: %s)", 303 + pdsEndpoint, firstOp.CreatedAt.Format("2006-01-02 15:04")) 299 304 newPDSCount++ 300 305 } 301 306
+2 -1
internal/storage/db.go
··· 23 23 // Bundle operations 24 24 CreateBundle(ctx context.Context, bundle *PLCBundle) error 25 25 GetBundle(ctx context.Context, afterTime time.Time) (*PLCBundle, error) 26 + GetBundleByID(ctx context.Context, bundleID int64) (*PLCBundle, error) 26 27 GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) 28 + GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) // JSON query 27 29 GetBundleStats(ctx context.Context) (int64, int64, error) 28 - GetAllBundles(ctx context.Context) ([]*PLCBundle, error) // For DID search 29 30 30 31 // Metrics 31 32 StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
+111 -43
internal/storage/sqlite.go
··· 43 43 response_time_ms INTEGER, 44 44 error_message TEXT, 45 45 server_info TEXT, 46 + dids TEXT DEFAULT '[]', 46 47 user_count INTEGER DEFAULT 0, 47 48 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 48 49 ); 49 50 50 51 CREATE INDEX IF NOT EXISTS idx_pds_status ON pds_servers(status); 51 52 CREATE INDEX IF NOT EXISTS idx_pds_last_checked ON pds_servers(last_checked); 53 + CREATE INDEX IF NOT EXISTS idx_pds_user_count ON pds_servers(user_count); 52 54 53 55 CREATE TABLE IF NOT EXISTS plc_metrics ( 54 56 id INTEGER PRIMARY KEY AUTOINCREMENT, ··· 93 95 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 94 96 ); 95 97 96 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time); 97 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_created ON plc_bundles(created_at); 98 + -- CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time); 99 + -- CREATE INDEX IF NOT EXISTS idx_plc_bundles_created ON plc_bundles(created_at); 98 100 ` 99 101 100 102 _, err := s.db.Exec(schema) ··· 206 208 return s.scanBundles(rows) 207 209 } 208 210 211 + // GetBundlesForDID finds bundles containing a specific DID using JSON functions 212 + func (s *SQLiteDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) { 213 + query := ` 214 + SELECT id, start_time, end_time, operation_count, dids, file_path, file_size, compressed, created_at 215 + FROM plc_bundles 216 + WHERE EXISTS ( 217 + SELECT 1 FROM json_each(dids) 218 + WHERE json_each.value = ? 
219 + ) 220 + ORDER BY start_time ASC 221 + ` 222 + 223 + rows, err := s.db.QueryContext(ctx, query, did) 224 + if err != nil { 225 + return nil, err 226 + } 227 + defer rows.Close() 228 + 229 + return s.scanBundles(rows) 230 + } 231 + 232 + // GetBundleByID retrieves a single bundle by ID 233 + func (s *SQLiteDB) GetBundleByID(ctx context.Context, bundleID int64) (*PLCBundle, error) { 234 + query := ` 235 + SELECT id, start_time, end_time, operation_count, dids, file_path, file_size, compressed, created_at 236 + FROM plc_bundles 237 + WHERE id = ? 238 + ` 239 + 240 + var bundle PLCBundle 241 + var didsJSON string 242 + 243 + err := s.db.QueryRowContext(ctx, query, bundleID).Scan( 244 + &bundle.ID, &bundle.StartTime, &bundle.EndTime, &bundle.OperationCount, 245 + &didsJSON, &bundle.FilePath, &bundle.FileSize, &bundle.Compressed, &bundle.CreatedAt, 246 + ) 247 + if err != nil { 248 + return nil, err 249 + } 250 + 251 + if err := json.Unmarshal([]byte(didsJSON), &bundle.DIDs); err != nil { 252 + return nil, fmt.Errorf("failed to unmarshal DIDs: %w", err) 253 + } 254 + 255 + return &bundle, nil 256 + } 257 + 209 258 // Helper to scan bundle rows 210 259 func (s *SQLiteDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) { 211 260 var bundles []*PLCBundle ··· 266 315 return err 267 316 } 268 317 318 + // UpdatePDSStatus updates the status, metrics, and DIDs of a PDS server 319 + func (s *SQLiteDB) UpdatePDSStatus(ctx context.Context, endpoint string, update *PDSUpdate) error { 320 + tx, err := s.db.BeginTx(ctx, nil) 321 + if err != nil { 322 + return err 323 + } 324 + defer tx.Rollback() 325 + 326 + // Marshal server info 327 + var serverInfoJSON []byte 328 + if update.ServerInfo != nil { 329 + serverInfoJSON, _ = json.Marshal(update.ServerInfo) 330 + } 331 + 332 + // Marshal DIDs 333 + var didsJSON []byte 334 + if update.DIDs != nil { 335 + didsJSON, _ = json.Marshal(update.DIDs) 336 + } else { 337 + didsJSON = []byte("[]") 338 + } 339 + 340 + // Calculate user count 
341 + userCount := len(update.DIDs) 342 + 343 + query := ` 344 + UPDATE pds_servers 345 + SET status = ?, last_checked = ?, response_time_ms = ?, error_message = ?, 346 + server_info = ?, dids = ?, user_count = ?, updated_at = ? 347 + WHERE endpoint = ? 348 + ` 349 + _, err = tx.ExecContext(ctx, query, 350 + update.Status, update.LastChecked, update.ResponseTime, 351 + update.ErrorMessage, string(serverInfoJSON), string(didsJSON), userCount, time.Now(), endpoint) 352 + if err != nil { 353 + return err 354 + } 355 + 356 + // Insert scan history 357 + scanQuery := ` 358 + INSERT INTO pds_scans (endpoint, status, response_time_ms, error_message, server_info) 359 + VALUES (?, ?, ?, ?, ?) 360 + ` 361 + _, err = tx.ExecContext(ctx, scanQuery, endpoint, update.Status, update.ResponseTime, 362 + update.ErrorMessage, string(serverInfoJSON)) 363 + if err != nil { 364 + return err 365 + } 366 + 367 + return tx.Commit() 368 + } 369 + 269 370 // GetPDS retrieves a single PDS server by endpoint 270 371 func (s *SQLiteDB) GetPDS(ctx context.Context, endpoint string) (*PDS, error) { 271 372 query := ` 272 373 SELECT endpoint, discovered_at, last_checked, status, response_time_ms, 273 - error_message, server_info, user_count 374 + error_message, server_info, dids, user_count 274 375 FROM pds_servers 275 376 WHERE endpoint = ? 
276 377 ` ··· 280 381 var responseTime sql.NullInt64 281 382 var errorMsg sql.NullString 282 383 var serverInfo sql.NullString 384 + var didsJSON string 283 385 284 386 err := s.db.QueryRowContext(ctx, query, endpoint).Scan( 285 387 &pds.Endpoint, &pds.DiscoveredAt, &lastChecked, &pds.Status, 286 - &responseTime, &errorMsg, &serverInfo, &pds.UserCount, 388 + &responseTime, &errorMsg, &serverInfo, &didsJSON, &pds.UserCount, 287 389 ) 288 390 if err != nil { 289 391 return nil, err ··· 300 402 } 301 403 if serverInfo.Valid && serverInfo.String != "" { 302 404 json.Unmarshal([]byte(serverInfo.String), &pds.ServerInfo) 405 + } 406 + 407 + // Unmarshal DIDs 408 + if didsJSON != "" { 409 + json.Unmarshal([]byte(didsJSON), &pds.DIDs) 303 410 } 304 411 305 412 return &pds, nil ··· 360 467 } 361 468 362 469 return servers, rows.Err() 363 - } 364 - 365 - // UpdatePDSStatus updates the status and metrics of a PDS server 366 - func (s *SQLiteDB) UpdatePDSStatus(ctx context.Context, endpoint string, update *PDSUpdate) error { 367 - tx, err := s.db.BeginTx(ctx, nil) 368 - if err != nil { 369 - return err 370 - } 371 - defer tx.Rollback() 372 - 373 - // Update main record 374 - var serverInfoJSON []byte 375 - if update.ServerInfo != nil { 376 - serverInfoJSON, _ = json.Marshal(update.ServerInfo) 377 - } 378 - 379 - query := ` 380 - UPDATE pds_servers 381 - SET status = ?, last_checked = ?, response_time_ms = ?, error_message = ?, server_info = ?, updated_at = ? 382 - WHERE endpoint = ? 383 - ` 384 - _, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, update.ResponseTime, 385 - update.ErrorMessage, string(serverInfoJSON), time.Now(), endpoint) 386 - if err != nil { 387 - return err 388 - } 389 - 390 - // Insert scan history 391 - scanQuery := ` 392 - INSERT INTO pds_scans (endpoint, status, response_time_ms, error_message, server_info) 393 - VALUES (?, ?, ?, ?, ?) 
394 - ` 395 - _, err = tx.ExecContext(ctx, scanQuery, endpoint, update.Status, update.ResponseTime, 396 - update.ErrorMessage, string(serverInfoJSON)) 397 - if err != nil { 398 - return err 399 - } 400 - 401 - return tx.Commit() 402 470 } 403 471 404 472 // GetScanCursor retrieves the last scan cursor for a source
+3 -1
internal/storage/types.go
··· 20 20 ResponseTime int64 21 21 ErrorMessage string 22 22 ServerInfo interface{} 23 - UserCount int64 23 + DIDs []string // NEW: List of DIDs hosted on this PDS 24 + UserCount int64 // Calculated from len(DIDs) 24 25 } 25 26 26 27 // PDSUpdate contains fields to update for a PDS ··· 30 31 ResponseTime int64 31 32 ErrorMessage string 32 33 ServerInfo interface{} 34 + DIDs []string 33 35 } 34 36 35 37 // PDSFilter for querying PDS servers
+7
utils/plc-cache-size.sh
#!/bin/sh
# Sum the decompressed size, in bytes, of every zstd archive in plc_cache/.
#
# Usage:  run from the directory that contains plc_cache/.
# Output: "Total uncompressed size: <N> Bytes" on stdout.
#         Files whose size cannot be determined are reported on stderr and
#         left out of the total.

total_uncompressed_size=0

for file in plc_cache/*.zst; do
  # When nothing matches, the shell leaves the glob pattern unexpanded;
  # skip that literal string instead of handing it to zstd.
  [ -f "$file" ] || continue

  # NOTE(review): zstd appears to print the "Decompressed Size" line only in
  # verbose list mode (-lv), in the form
  #   Decompressed Size: 20.0 KiB (20480 B)
  # so the exact byte count is the number inside the parentheses, not the
  # human-readable third field. Confirm against the installed zstd version.
  uncompressed_size=$(zstd -lv "$file" 2>/dev/null |
    sed -n 's/^Decompressed Size:.*(\([0-9][0-9]*\) B).*/\1/p' |
    head -n 1)

  # An empty value would make the arithmetic expansion below a syntax error.
  if [ -z "$uncompressed_size" ]; then
    echo "Warning: could not determine decompressed size of $file" >&2
    continue
  fi

  total_uncompressed_size=$((total_uncompressed_size + uncompressed_size))
done

echo "Total uncompressed size: $total_uncompressed_size Bytes"