update

Changed files
+225
internal
+22
config.sample.yaml
··· 1 + database: 2 + type: "postgres" # or "sqlite" 3 + path: "postgres://atscanner:YOUR_PASSWORD@localhost:5432/atscanner?sslmode=disable" 4 + # For SQLite: path: "atscan.db" 5 + 6 + plc: 7 + directory_url: "https://plc.directory" 8 + scan_interval: "5s" 9 + bundle_dir: "./plc_bundles" 10 + use_cache: true 11 + index_dids: true 12 + 13 + pds: 14 + scan_interval: "30m" 15 + timeout: "30s" 16 + workers: 20 17 + recheck_interval: "1.5h" 18 + scan_retention: 20 19 + 20 + api: 21 + host: "0.0.0.0" 22 + port: 8080
+108
internal/api/handlers.go
··· 450 450 resp.json(stats) 451 451 } 452 452 453 + // ===== GLOBAL DID HANDLER ===== 454 + 455 + // extractHandle safely extracts the handle from a PLC operation 456 + func extractHandle(op *plc.PLCOperation) string { 457 + if op == nil || op.Operation == nil { 458 + return "" 459 + } 460 + 461 + // Get "alsoKnownAs" 462 + aka, ok := op.Operation["alsoKnownAs"].([]interface{}) 463 + if !ok { 464 + return "" 465 + } 466 + 467 + // Find the handle (e.g., "at://handle.bsky.social") 468 + for _, item := range aka { 469 + if handle, ok := item.(string); ok { 470 + if strings.HasPrefix(handle, "at://") { 471 + return strings.TrimPrefix(handle, "at://") 472 + } 473 + } 474 + } 475 + return "" 476 + } 477 + 478 + // extractPDS safely extracts the PDS endpoint from a PLC operation 479 + func extractPDS(op *plc.PLCOperation) string { 480 + if op == nil || op.Operation == nil { 481 + return "" 482 + } 483 + 484 + // Get "services" 485 + services, ok := op.Operation["services"].(map[string]interface{}) 486 + if !ok { 487 + return "" 488 + } 489 + 490 + // Get "atproto_pds" 491 + pdsService, ok := services["atproto_pds"].(map[string]interface{}) 492 + if !ok { 493 + return "" 494 + } 495 + 496 + // Get "endpoint" 497 + if endpoint, ok := pdsService["endpoint"].(string); ok { 498 + return endpoint 499 + } 500 + 501 + return "" 502 + } 503 + 504 + // handleGetGlobalDID provides a consolidated view of a DID 505 + func (s *Server) handleGetGlobalDID(w http.ResponseWriter, r *http.Request) { 506 + resp := newResponse(w) 507 + vars := mux.Vars(r) 508 + did := vars["did"] 509 + ctx := r.Context() 510 + 511 + // --- 1. Get Combined DID Info (from dids and pds_repos) --- 512 + didInfo, err := s.db.GetGlobalDIDInfo(ctx, did) 513 + if err != nil { 514 + if err == sql.ErrNoRows { 515 + // Check if DID indexing is disabled (from config) 516 + if !s.plcIndexDIDs { 517 + resp.error("DID not found. Note: DID indexing is disabled in configuration.", http.StatusNotFound) 518 + } else { 519 + resp.error("DID not found in PLC index.", http.StatusNotFound) 520 + } 521 + } else { 522 + resp.error(err.Error(), http.StatusInternalServerError) 523 + } 524 + return 525 + } 526 + 527 + // --- 2. Get Latest PLC Operation (from plc_bundles) --- 528 + var latestOperation *plc.PLCOperation 529 + if len(didInfo.BundleNumbers) > 0 { 530 + lastBundleNum := didInfo.BundleNumbers[len(didInfo.BundleNumbers)-1] 531 + ops, err := s.bundleManager.LoadBundleOperations(ctx, lastBundleNum) 532 + if err != nil { 533 + log.Error("Failed to load bundle %d for DID %s: %v", lastBundleNum, did, err) 534 + } else { 535 + // Find latest operation for this DID (in reverse) 536 + for i := len(ops) - 1; i >= 0; i-- { 537 + if ops[i].DID == did { 538 + latestOperation = &ops[i] 539 + break 540 + } 541 + } 542 + } 543 + } 544 + 545 + // --- 3. Extract Handle and PDS from latest operation --- 546 + currentHandle := extractHandle(latestOperation) 547 + currentPDS := extractPDS(latestOperation) 548 + 549 + // --- 4. Combine and Respond --- 550 + resp.json(map[string]interface{}{ 551 + "did": didInfo.DID, 552 + "handle": currentHandle, // NEW 553 + "current_pds": currentPDS, // NEW 554 + "plc_index_created_at": didInfo.CreatedAt, 555 + "plc_bundle_history": didInfo.BundleNumbers, 556 + "pds_hosting_on": didInfo.HostingOn, // This is the historical list from pds_repos 557 + "latest_plc_operation": latestOperation, 558 + }) 559 + } 560 + 453 561 // ===== DID HANDLERS ===== 454 562 455 563 func (s *Server) handleGetDID(w http.ResponseWriter, r *http.Request) {
+5
internal/api/server.go
··· 21 21 plcClient *plc.Client 22 22 plcBundleDir string 23 23 bundleManager *plc.BundleManager 24 + plcIndexDIDs bool 24 25 } 25 26 26 27 func NewServer(db storage.Database, apiCfg config.APIConfig, plcCfg config.PLCConfig) *Server { ··· 32 33 plcClient: plc.NewClient(plcCfg.DirectoryURL), 33 34 plcBundleDir: plcCfg.BundleDir, 34 35 bundleManager: bundleManager, 36 + plcIndexDIDs: plcCfg.IndexDIDs, 35 37 } 36 38 37 39 s.setupRoutes() ··· 72 74 api.HandleFunc("/pds/{endpoint}/repos", s.handleGetPDSRepos).Methods("GET") 73 75 api.HandleFunc("/pds/{endpoint}/repos/stats", s.handleGetPDSRepoStats).Methods("GET") 74 76 api.HandleFunc("/pds/repos/{did}", s.handleGetDIDRepos).Methods("GET") 77 + 78 + // Global DID route 79 + api.HandleFunc("/did/{did}", s.handleGetGlobalDID).Methods("GET") 75 80 76 81 // PLC Bundle routes 77 82 api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
+1
internal/storage/db.go
··· 78 78 // DID operations 79 79 UpsertDID(ctx context.Context, did string, bundleNum int) error 80 80 GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) 81 + GetGlobalDIDInfo(ctx context.Context, did string) (*GlobalDIDInfo, error) 81 82 AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error 82 83 GetTotalDIDCount(ctx context.Context) (int64, error) 83 84
+83
internal/storage/postgres.go
··· 1674 1674 return &record, nil 1675 1675 } 1676 1676 1677 + // GetGlobalDIDInfo retrieves consolidated DID info from 'dids' and 'pds_repos' 1678 + func (p *PostgresDB) GetGlobalDIDInfo(ctx context.Context, did string) (*GlobalDIDInfo, error) { 1679 + // This query now includes a CTE to find primary endpoints and filters 1680 + // the 'hosting_on' aggregation to only include repos from those endpoints. 1681 + query := ` 1682 + WITH primary_endpoints AS ( 1683 + -- First, get the ID of every "primary" PDS. 1684 + -- A primary PDS is the one with the earliest 'discovered_at' timestamp 1685 + -- for a given 'server_did'. 1686 + SELECT DISTINCT ON (COALESCE(server_did, id::text)) 1687 + id 1688 + FROM endpoints 1689 + WHERE endpoint_type = 'pds' 1690 + ORDER BY COALESCE(server_did, id::text), discovered_at ASC 1691 + ) 1692 + SELECT 1693 + d.did, 1694 + d.bundle_numbers, 1695 + d.created_at, 1696 + COALESCE( 1697 + jsonb_agg( 1698 + jsonb_build_object( 1699 + 'id', pr.id, 1700 + 'endpoint_id', pr.endpoint_id, 1701 + 'endpoint', e.endpoint, 1702 + 'did', pr.did, 1703 + 'head', pr.head, 1704 + 'rev', pr.rev, 1705 + 'active', pr.active, 1706 + 'status', pr.status, 1707 + 'first_seen', pr.first_seen AT TIME ZONE 'UTC', 1708 + 'last_seen', pr.last_seen AT TIME ZONE 'UTC', 1709 + 'updated_at', pr.updated_at AT TIME ZONE 'UTC' 1710 + ) 1711 + ORDER BY pr.last_seen DESC 1712 + ) FILTER ( 1713 + -- This filter clause ensures we only aggregate repos 1714 + -- where the endpoint_id is in our list of primary endpoints. 1715 + WHERE pr.id IS NOT NULL AND pe.id IS NOT NULL 1716 + ), 1717 + '[]'::jsonb 1718 + ) AS hosting_on 1719 + FROM 1720 + dids d 1721 + LEFT JOIN 1722 + pds_repos pr ON d.did = pr.did 1723 + LEFT JOIN 1724 + endpoints e ON pr.endpoint_id = e.id 1725 + LEFT JOIN 1726 + -- We join the primary_endpoints CTE. 'pe.id' will be NON-NULL 1727 + -- only if the repo's endpoint_id (pr.endpoint_id) is a primary one. 1728 + primary_endpoints pe ON pr.endpoint_id = pe.id 1729 + WHERE 1730 + d.did = $1 1731 + GROUP BY 1732 + d.did, d.bundle_numbers, d.created_at 1733 + ` 1734 + 1735 + var info GlobalDIDInfo 1736 + var bundleNumbersJSON []byte 1737 + var hostingOnJSON []byte 1738 + 1739 + err := p.db.QueryRowContext(ctx, query, did).Scan( 1740 + &info.DID, 1741 + &bundleNumbersJSON, 1742 + &info.CreatedAt, 1743 + &hostingOnJSON, 1744 + ) 1745 + if err != nil { 1746 + return nil, err // This will correctly be sql.ErrNoRows if not in 'dids' 1747 + } 1748 + 1749 + if err := json.Unmarshal(bundleNumbersJSON, &info.BundleNumbers); err != nil { 1750 + return nil, fmt.Errorf("failed to unmarshal bundle_numbers: %w", err) 1751 + } 1752 + 1753 + if err := json.Unmarshal(hostingOnJSON, &info.HostingOn); err != nil { 1754 + return nil, fmt.Errorf("failed to unmarshal hosting_on: %w", err) 1755 + } 1756 + 1757 + return &info, nil 1758 + } 1759 + 1677 1760 func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error { 1678 1761 if len(dids) == 0 { 1679 1762 return nil
+6
internal/storage/types.go
··· 256 256 type PDSRepo struct { 257 257 ID int64 `json:"id"` 258 258 EndpointID int64 `json:"endpoint_id"` 259 + Endpoint string `json:"endpoint,omitempty"` 259 260 DID string `json:"did"` 260 261 Head string `json:"head,omitempty"` 261 262 Rev string `json:"rev,omitempty"` ··· 273 274 Active bool 274 275 Status string 275 276 } 277 + 278 + type GlobalDIDInfo struct { 279 + DIDRecord 280 + HostingOn []*PDSRepo `json:"hosting_on"` 281 + }