update

Changed files
+517 -17
+84
internal/api/handlers.go
···
 	return result
 }
 
+// Get repos for a specific PDS
+func (s *Server) handleGetPDSRepos(w http.ResponseWriter, r *http.Request) {
+	resp := newResponse(w)
+	vars := mux.Vars(r)
+	endpoint := "https://" + normalizeEndpoint(vars["endpoint"])
+
+	pds, err := s.db.GetPDSDetail(r.Context(), endpoint)
+	if err != nil {
+		resp.error("PDS not found", http.StatusNotFound)
+		return
+	}
+
+	// Parse query parameters
+	activeOnly := r.URL.Query().Get("active") == "true"
+	limit := getQueryInt(r, "limit", 100)
+	offset := getQueryInt(r, "offset", 0)
+
+	// Cap limit at 1000
+	if limit > 1000 {
+		limit = 1000
+	}
+
+	repos, err := s.db.GetPDSRepos(r.Context(), pds.ID, activeOnly, limit, offset)
+	if err != nil {
+		resp.error(err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Get total from latest scan (same as user_count)
+	totalRepos := 0
+	if pds.LatestScan != nil {
+		totalRepos = pds.LatestScan.UserCount
+	}
+
+	resp.json(map[string]interface{}{
+		"endpoint":    pds.Endpoint,
+		"total_repos": totalRepos,
+		"returned":    len(repos),
+		"limit":       limit,
+		"offset":      offset,
+		"repos":       repos,
+	})
+}
+
+// Find which PDS hosts a specific DID
+func (s *Server) handleGetDIDRepos(w http.ResponseWriter, r *http.Request) {
+	resp := newResponse(w)
+	vars := mux.Vars(r)
+	did := vars["did"]
+
+	repos, err := s.db.GetReposByDID(r.Context(), did)
+	if err != nil {
+		resp.error(err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	resp.json(map[string]interface{}{
+		"did":        did,
+		"pds_count":  len(repos),
+		"hosting_on": repos,
+	})
+}
+
+// Get repo stats for a specific PDS
+func (s *Server) handleGetPDSRepoStats(w http.ResponseWriter, r *http.Request) {
+	resp := newResponse(w)
+	vars := mux.Vars(r)
+	endpoint := "https://" + normalizeEndpoint(vars["endpoint"])
+
+	pds, err := s.db.GetPDSDetail(r.Context(), endpoint)
+	if err != nil {
+		resp.error("PDS not found", http.StatusNotFound)
+		return
+	}
+
+	stats, err := s.db.GetPDSRepoStats(r.Context(), pds.ID)
+	if err != nil {
+		resp.error(err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	resp.json(stats)
+}
+
 // ===== DID HANDLERS =====
 
 func (s *Server) handleGetDID(w http.ResponseWriter, r *http.Request) {
+6 -1
internal/api/server.go
···
 	api.HandleFunc("/endpoints", s.handleGetEndpoints).Methods("GET")
 	api.HandleFunc("/endpoints/stats", s.handleGetEndpointStats).Methods("GET")
 
-	// NEW: PDS-specific endpoints (virtual, created via JOINs)
+	// PDS-specific endpoints (virtual, created via JOINs)
 	api.HandleFunc("/pds", s.handleGetPDSList).Methods("GET")
 	api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET")
 	api.HandleFunc("/pds/countries", s.handleGetCountryLeaderboard).Methods("GET")
 	api.HandleFunc("/pds/versions", s.handleGetVersionStats).Methods("GET")
 	api.HandleFunc("/pds/duplicates", s.handleGetDuplicateEndpoints).Methods("GET")
 	api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET")
+
+	// PDS repos
+	api.HandleFunc("/pds/{endpoint}/repos", s.handleGetPDSRepos).Methods("GET")
+	api.HandleFunc("/pds/{endpoint}/repos/stats", s.handleGetPDSRepoStats).Methods("GET")
+	api.HandleFunc("/pds/repos/{did}", s.handleGetDIDRepos).Methods("GET")
 
 	// PLC Bundle routes
 	api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
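
The three new routes are read-only additions next to the existing /pds handlers. As a rough illustration (not part of the diff), they could be exercised with a small client like the one below; the localhost:8080 host, the /api mount prefix, the pds.example.com endpoint, and the DID are all placeholder assumptions, since the router's mount point is not shown in this hunk.

package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetch prints the status and body returned by one of the new routes.
func fetch(url string) {
	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s -> %s\n%s\n\n", url, resp.Status, body)
}

func main() {
	base := "http://localhost:8080/api" // assumed host and mount prefix

	// Active repos hosted on one PDS, first page of 50. The handler replies with
	// endpoint, total_repos, returned, limit, offset and the repos slice.
	fetch(base + "/pds/pds.example.com/repos?active=true&limit=50")

	// Aggregate repo stats for the same PDS.
	fetch(base + "/pds/pds.example.com/repos/stats")

	// All PDS hosts known for a single DID.
	fetch(base + "/pds/repos/did:plc:placeholder")
}
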
+10 -10
internal/pds/client.go
···
 
 // Repo represents a repository in the list
 type Repo struct {
-	DID  string `json:"did"`
-	Head string `json:"head,omitempty"`
-	Rev  string `json:"rev,omitempty"`
+	DID    string  `json:"did"`
+	Head   string  `json:"head,omitempty"`
+	Rev    string  `json:"rev,omitempty"`
+	Active *bool   `json:"active,omitempty"`
+	Status *string `json:"status,omitempty"`
 }
 
 // ListRepos fetches all repositories from a PDS with pagination
-func (c *Client) ListRepos(ctx context.Context, endpoint string) ([]string, error) {
-	var allDIDs []string
+func (c *Client) ListRepos(ctx context.Context, endpoint string) ([]Repo, error) {
+	var allRepos []Repo
 	var cursor *string
 
 	for {
···
 		}
 		resp.Body.Close()
 
-		// Collect DIDs
-		for _, repo := range result.Repos {
-			allDIDs = append(allDIDs, repo.DID)
-		}
+		// Collect repos
+		allRepos = append(allRepos, result.Repos...)
 
 		// Check if there are more pages
 		if result.Cursor == nil || *result.Cursor == "" {
···
 		cursor = result.Cursor
 	}
 
-	return allDIDs, nil
+	return allRepos, nil
 }
 
 // DescribeServer fetches com.atproto.server.describeServer
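
ListRepos now returns the full Repo records rather than bare DID strings, and Active/Status are pointers so a field omitted by the PDS can be told apart from an explicit false or empty value. A minimal sketch of how a caller could adapt, written as if it lived in the same pds package; summarizeRepos is a hypothetical helper, not part of this change.

// summarizeRepos shows how a caller can keep the old DID-list behaviour while
// also using the richer per-repo fields.
func summarizeRepos(repos []Repo) (dids []string, inactive int) {
	for _, repo := range repos {
		dids = append(dids, repo.DID)

		// A nil Active is treated as active, matching the scanner's handling below.
		if repo.Active != nil && !*repo.Active {
			inactive++
		}
	}
	return dids, inactive
}
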
+53 -4
internal/pds/scanner.go
···
 	if err != nil {
 		log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err)
 	} else if desc != nil && desc.DID != "" {
-		// NEW: Update server DID
 		s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID)
 	}
 
-	dids, err := s.client.ListRepos(ctx, ep.Endpoint)
+	// Fetch repos with full info
+	repoList, err := s.client.ListRepos(ctx, ep.Endpoint)
 	if err != nil {
 		log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err)
-		dids = []string{}
+		repoList = []Repo{}
+	}
+
+	// Convert to DIDs for backward compatibility
+	dids := make([]string, len(repoList))
+	for i, repo := range repoList {
+		dids[i] = repo.DID
 	}
 
-	// STEP 4: SAVE IMMEDIATELY
+	// STEP 4: SAVE scan result
 	s.saveScanResult(ctx, ep.ID, &ScanResult{
 		Status:       storage.EndpointStatusOnline,
 		ResponseTime: responseTime,
···
 		DIDs:    dids,
 		Version: version,
 	})
+
+	// Save repos in batches (only tracks changes)
+	if len(repoList) > 0 {
+		batchSize := 10000
+
+		log.Verbose("Processing %d repos for %s (tracking changes only)", len(repoList), ep.Endpoint)
+
+		for i := 0; i < len(repoList); i += batchSize {
+			end := i + batchSize
+			if end > len(repoList) {
+				end = len(repoList)
+			}
+
+			batch := repoList[i:end]
+			repoData := make([]storage.PDSRepoData, len(batch))
+
+			for j, repo := range batch {
+				active := true
+				if repo.Active != nil {
+					active = *repo.Active
+				}
+
+				status := ""
+				if repo.Status != nil {
+					status = *repo.Status
+				}
+
+				repoData[j] = storage.PDSRepoData{
+					DID:    repo.DID,
+					Head:   repo.Head,
+					Rev:    repo.Rev,
+					Active: active,
+					Status: status,
+				}
+			}
+
+			if err := s.db.UpsertPDSRepos(ctx, ep.ID, repoData); err != nil {
+				log.Error("Failed to save repo batch for endpoint %d: %v", ep.ID, err)
+			}
+		}
+
+		log.Verbose("✓ Processed %d repos for %s", len(repoList), ep.Endpoint)
+	}
 
 	// STEP 5: Fetch IP info if needed (async, with backoff)
 	go s.updateIPInfoIfNeeded(ctx, ip)
+7 -2
internal/storage/db.go
···
 	GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error)
 	GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error)
 	GetPDSStats(ctx context.Context) (*PDSStats, error)
+	GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error)
+	GetVersionStats(ctx context.Context) ([]*VersionStats, error)
 
 	// IP operations (IP as primary key)
 	UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error
···
 	AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error
 	GetTotalDIDCount(ctx context.Context) (int64, error)
 
-	GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error)
-	GetVersionStats(ctx context.Context) ([]*VersionStats, error)
+	// PDS Repo operations
+	UpsertPDSRepos(ctx context.Context, endpointID int64, repos []PDSRepoData) error
+	GetPDSRepos(ctx context.Context, endpointID int64, activeOnly bool, limit int, offset int) ([]*PDSRepo, error)
+	GetReposByDID(ctx context.Context, did string) ([]*PDSRepo, error)
+	GetPDSRepoStats(ctx context.Context, endpointID int64) (map[string]interface{}, error)
 }
+336
internal/storage/postgres.go
···
 
 	CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers);
 	CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at);
+
+	-- PDS Repositories table
+	CREATE TABLE IF NOT EXISTS pds_repos (
+		id BIGSERIAL PRIMARY KEY,
+		endpoint_id BIGINT NOT NULL,
+		did TEXT NOT NULL,
+		head TEXT,
+		rev TEXT,
+		active BOOLEAN DEFAULT true,
+		status TEXT,
+		first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+		last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+		updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+		FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE,
+		UNIQUE(endpoint_id, did)
+	);
+
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint ON pds_repos(endpoint_id);
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint_id_desc ON pds_repos(endpoint_id, id DESC);
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_did ON pds_repos(did);
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_active ON pds_repos(active);
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_status ON pds_repos(status);
+	CREATE INDEX IF NOT EXISTS idx_pds_repos_last_seen ON pds_repos(last_seen DESC);
 	`
 
 	_, err := p.db.Exec(schema)
···
 	}
 	return "0%"
 }
+
+func (p *PostgresDB) UpsertPDSRepos(ctx context.Context, endpointID int64, repos []PDSRepoData) error {
+	if len(repos) == 0 {
+		return nil
+	}
+
+	// Step 1: Load all existing repos for this endpoint into memory
+	query := `
+		SELECT did, head, rev, active, status
+		FROM pds_repos
+		WHERE endpoint_id = $1
+	`
+
+	rows, err := p.db.QueryContext(ctx, query, endpointID)
+	if err != nil {
+		return err
+	}
+
+	existingRepos := make(map[string]*PDSRepo)
+	for rows.Next() {
+		var repo PDSRepo
+		var head, rev, status sql.NullString
+
+		err := rows.Scan(&repo.DID, &head, &rev, &repo.Active, &status)
+		if err != nil {
+			rows.Close()
+			return err
+		}
+
+		if head.Valid {
+			repo.Head = head.String
+		}
+		if rev.Valid {
+			repo.Rev = rev.String
+		}
+		if status.Valid {
+			repo.Status = status.String
+		}
+
+		existingRepos[repo.DID] = &repo
+	}
+	rows.Close()
+
+	if err := rows.Err(); err != nil {
+		return err
+	}
+
+	// Step 2: Compare and collect changes
+	var newRepos []PDSRepoData
+	var changedRepos []PDSRepoData
+
+	for _, repo := range repos {
+		existing, exists := existingRepos[repo.DID]
+		if !exists {
+			// New repo
+			newRepos = append(newRepos, repo)
+		} else if existing.Head != repo.Head ||
+			existing.Rev != repo.Rev ||
+			existing.Active != repo.Active ||
+			existing.Status != repo.Status {
+			// Repo changed
+			changedRepos = append(changedRepos, repo)
+		}
+	}
+
+	// Log comparison results
+	log.Verbose("UpsertPDSRepos: endpoint_id=%d, total=%d, existing=%d, new=%d, changed=%d, unchanged=%d",
+		endpointID, len(repos), len(existingRepos), len(newRepos), len(changedRepos),
+		len(repos)-len(newRepos)-len(changedRepos))
+
+	// If nothing changed, return early
+	if len(newRepos) == 0 && len(changedRepos) == 0 {
+		log.Verbose("UpsertPDSRepos: endpoint_id=%d, no changes detected, skipping database operations", endpointID)
+		return nil
+	}
+
+	// Step 3: Execute batched operations
+	conn, err := p.pool.Acquire(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Release()
+
+	tx, err := conn.Begin(ctx)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback(ctx)
+
+	// Insert new repos
+	if len(newRepos) > 0 {
+		_, err := tx.Exec(ctx, `
+			CREATE TEMP TABLE temp_new_repos (
+				did TEXT,
+				head TEXT,
+				rev TEXT,
+				active BOOLEAN,
+				status TEXT
+			) ON COMMIT DROP
+		`)
+		if err != nil {
+			return err
+		}
+
+		_, err = tx.Conn().CopyFrom(
+			ctx,
+			pgx.Identifier{"temp_new_repos"},
+			[]string{"did", "head", "rev", "active", "status"},
+			pgx.CopyFromSlice(len(newRepos), func(i int) ([]interface{}, error) {
+				repo := newRepos[i]
+				return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
+			}),
+		)
+		if err != nil {
+			return err
+		}
+
+		result, err := tx.Exec(ctx, `
+			INSERT INTO pds_repos (endpoint_id, did, head, rev, active, status, first_seen, last_seen)
+			SELECT $1, did, head, rev, active, status, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
+			FROM temp_new_repos
+		`, endpointID)
+		if err != nil {
+			return err
+		}
+
+		log.Verbose("UpsertPDSRepos: endpoint_id=%d, inserted %d new repos", endpointID, result.RowsAffected())
+	}
+
+	// Update changed repos
+	if len(changedRepos) > 0 {
+		_, err := tx.Exec(ctx, `
+			CREATE TEMP TABLE temp_changed_repos (
+				did TEXT,
+				head TEXT,
+				rev TEXT,
+				active BOOLEAN,
+				status TEXT
+			) ON COMMIT DROP
+		`)
+		if err != nil {
+			return err
+		}
+
+		_, err = tx.Conn().CopyFrom(
+			ctx,
+			pgx.Identifier{"temp_changed_repos"},
+			[]string{"did", "head", "rev", "active", "status"},
+			pgx.CopyFromSlice(len(changedRepos), func(i int) ([]interface{}, error) {
+				repo := changedRepos[i]
+				return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
+			}),
+		)
+		if err != nil {
+			return err
+		}
+
+		result, err := tx.Exec(ctx, `
+			UPDATE pds_repos
+			SET head = t.head,
+				rev = t.rev,
+				active = t.active,
+				status = t.status,
+				last_seen = CURRENT_TIMESTAMP,
+				updated_at = CURRENT_TIMESTAMP
+			FROM temp_changed_repos t
+			WHERE pds_repos.endpoint_id = $1
+				AND pds_repos.did = t.did
+		`, endpointID)
+		if err != nil {
+			return err
+		}
+
+		log.Verbose("UpsertPDSRepos: endpoint_id=%d, updated %d changed repos", endpointID, result.RowsAffected())
+	}
+
+	if err := tx.Commit(ctx); err != nil {
+		return err
+	}
+
+	log.Verbose("UpsertPDSRepos: endpoint_id=%d, transaction committed successfully", endpointID)
+	return nil
+}
+
+func (p *PostgresDB) GetPDSRepos(ctx context.Context, endpointID int64, activeOnly bool, limit int, offset int) ([]*PDSRepo, error) {
+	query := `
+		SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
+		FROM pds_repos
+		WHERE endpoint_id = $1
+	`
+
+	args := []interface{}{endpointID}
+	argIdx := 2
+
+	if activeOnly {
+		query += " AND active = true"
+	}
+
+	// Order by id (primary key) - fastest
+	query += " ORDER BY id DESC"
+
+	if limit > 0 {
+		query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
+		args = append(args, limit, offset)
+	}
+
+	rows, err := p.db.QueryContext(ctx, query, args...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var repos []*PDSRepo
+	for rows.Next() {
+		var repo PDSRepo
+		var head, rev, status sql.NullString
+
+		err := rows.Scan(
+			&repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
+			&repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		if head.Valid {
+			repo.Head = head.String
+		}
+		if rev.Valid {
+			repo.Rev = rev.String
+		}
+		if status.Valid {
+			repo.Status = status.String
+		}
+
+		repos = append(repos, &repo)
+	}
+
+	return repos, rows.Err()
+}
+
+func (p *PostgresDB) GetReposByDID(ctx context.Context, did string) ([]*PDSRepo, error) {
+	query := `
+		SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
+		FROM pds_repos
+		WHERE did = $1
+		ORDER BY last_seen DESC
+	`
+
+	rows, err := p.db.QueryContext(ctx, query, did)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var repos []*PDSRepo
+	for rows.Next() {
+		var repo PDSRepo
+		var head, rev, status sql.NullString
+
+		err := rows.Scan(
+			&repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
+			&repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		if head.Valid {
+			repo.Head = head.String
+		}
+		if rev.Valid {
+			repo.Rev = rev.String
+		}
+		if status.Valid {
+			repo.Status = status.String
+		}
+
+		repos = append(repos, &repo)
+	}
+
+	return repos, rows.Err()
+}
+
+func (p *PostgresDB) GetPDSRepoStats(ctx context.Context, endpointID int64) (map[string]interface{}, error) {
+	query := `
+		SELECT
+			COUNT(*) as total_repos,
+			COUNT(*) FILTER (WHERE active = true) as active_repos,
+			COUNT(*) FILTER (WHERE active = false) as inactive_repos,
+			COUNT(*) FILTER (WHERE status IS NOT NULL AND status != '') as repos_with_status,
+			COUNT(*) FILTER (WHERE updated_at > CURRENT_TIMESTAMP - INTERVAL '1 hour') as recent_changes
+		FROM pds_repos
+		WHERE endpoint_id = $1
+	`
+
+	var totalRepos, activeRepos, inactiveRepos, reposWithStatus, recentChanges int64
+
+	err := p.db.QueryRowContext(ctx, query, endpointID).Scan(
+		&totalRepos, &activeRepos, &inactiveRepos, &reposWithStatus, &recentChanges,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return map[string]interface{}{
+		"total_repos":       totalRepos,
+		"active_repos":      activeRepos,
+		"inactive_repos":    inactiveRepos,
+		"repos_with_status": reposWithStatus,
+		"recent_changes":    recentChanges,
+	}, nil
+}
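
UpsertPDSRepos loads the endpoint's existing rows once, diffs them in memory, and only touches the database for repos that are new or whose head/rev/active/status changed. A minimal sketch of that behaviour, written as if it lived in the storage package and assuming a connected *PostgresDB plus an existing endpoints row; the DID, head, and rev values are placeholders.

// exampleUpsert is a hypothetical caller: the first call inserts the repo,
// the immediately repeated call detects no changes and performs no writes.
func exampleUpsert(ctx context.Context, db *PostgresDB, endpointID int64) error {
	repos := []PDSRepoData{
		{DID: "did:plc:placeholder", Head: "cid-placeholder", Rev: "rev-placeholder", Active: true},
	}

	if err := db.UpsertPDSRepos(ctx, endpointID, repos); err != nil {
		return err
	}
	return db.UpsertPDSRepos(ctx, endpointID, repos) // no-op: nothing changed
}
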
+21
internal/storage/types.go
···
 	FirstSeen time.Time `json:"first_seen"`
 	LastSeen  time.Time `json:"last_seen"`
 }
+
+type PDSRepo struct {
+	ID         int64     `json:"id"`
+	EndpointID int64     `json:"endpoint_id"`
+	DID        string    `json:"did"`
+	Head       string    `json:"head,omitempty"`
+	Rev        string    `json:"rev,omitempty"`
+	Active     bool      `json:"active"`
+	Status     string    `json:"status,omitempty"`
+	FirstSeen  time.Time `json:"first_seen"`
+	LastSeen   time.Time `json:"last_seen"`
+	UpdatedAt  time.Time `json:"updated_at"`
+}
+
+type PDSRepoData struct {
+	DID    string
+	Head   string
+	Rev    string
+	Active bool
+	Status string
+}