A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

fix bug creating layer records on non-tagged pushes

evan.jarrett.net a448e825 487fc8a4

verified
Changed files
+630 -3
cmd
usage-report
pkg
appview
hold
oci
+624
cmd/usage-report/main.go
··· 1 + // usage-report queries a hold service and generates a storage usage report 2 + // grouped by user, with unique layers and totals. 3 + // 4 + // Usage: 5 + // 6 + // go run ./cmd/usage-report --hold https://hold01.atcr.io 7 + // go run ./cmd/usage-report --hold https://hold01.atcr.io --from-manifests 8 + package main 9 + 10 + import ( 11 + "encoding/json" 12 + "flag" 13 + "fmt" 14 + "io" 15 + "net/http" 16 + "net/url" 17 + "os" 18 + "sort" 19 + "strings" 20 + "time" 21 + ) 22 + 23 + // LayerRecord matches the io.atcr.hold.layer record structure 24 + type LayerRecord struct { 25 + Type string `json:"$type"` 26 + Digest string `json:"digest"` 27 + Size int64 `json:"size"` 28 + MediaType string `json:"mediaType"` 29 + Manifest string `json:"manifest"` 30 + UserDID string `json:"userDid"` 31 + CreatedAt string `json:"createdAt"` 32 + } 33 + 34 + // ManifestRecord matches the io.atcr.manifest record structure 35 + type ManifestRecord struct { 36 + Type string `json:"$type"` 37 + Repository string `json:"repository"` 38 + Digest string `json:"digest"` 39 + HoldDID string `json:"holdDid"` 40 + Config *struct { 41 + Digest string `json:"digest"` 42 + Size int64 `json:"size"` 43 + } `json:"config"` 44 + Layers []struct { 45 + Digest string `json:"digest"` 46 + Size int64 `json:"size"` 47 + MediaType string `json:"mediaType"` 48 + } `json:"layers"` 49 + Manifests []struct { 50 + Digest string `json:"digest"` 51 + Size int64 `json:"size"` 52 + } `json:"manifests"` 53 + CreatedAt string `json:"createdAt"` 54 + } 55 + 56 + // CrewRecord matches the io.atcr.hold.crew record structure 57 + type CrewRecord struct { 58 + Member string `json:"member"` 59 + Role string `json:"role"` 60 + Permissions []string `json:"permissions"` 61 + AddedAt string `json:"addedAt"` 62 + } 63 + 64 + // ListRecordsResponse is the response from com.atproto.repo.listRecords 65 + type ListRecordsResponse struct { 66 + Records []struct { 67 + URI string `json:"uri"` 68 + CID string `json:"cid"` 69 + Value json.RawMessage `json:"value"` 70 + } `json:"records"` 71 + Cursor string `json:"cursor,omitempty"` 72 + } 73 + 74 + // UserUsage tracks storage for a single user 75 + type UserUsage struct { 76 + DID string 77 + Handle string 78 + UniqueLayers map[string]int64 // digest -> size 79 + TotalSize int64 80 + LayerCount int 81 + Repositories map[string]bool // unique repos 82 + } 83 + 84 + var client = &http.Client{Timeout: 30 * time.Second} 85 + 86 + func main() { 87 + holdURL := flag.String("hold", "https://hold01.atcr.io", "Hold service URL") 88 + fromManifests := flag.Bool("from-manifests", false, "Calculate usage from user manifests instead of hold layer records (more accurate but slower)") 89 + flag.Parse() 90 + 91 + // Normalize URL 92 + baseURL := strings.TrimSuffix(*holdURL, "/") 93 + 94 + fmt.Printf("Querying %s...\n\n", baseURL) 95 + 96 + // First, get the hold's DID 97 + holdDID, err := getHoldDID(baseURL) 98 + if err != nil { 99 + fmt.Fprintf(os.Stderr, "Failed to get hold DID: %v\n", err) 100 + os.Exit(1) 101 + } 102 + fmt.Printf("Hold DID: %s\n\n", holdDID) 103 + 104 + var userUsage map[string]*UserUsage 105 + 106 + if *fromManifests { 107 + fmt.Println("=== Calculating from user manifests (bypasses layer record bug) ===") 108 + userUsage, err = calculateFromManifests(baseURL, holdDID) 109 + } else { 110 + fmt.Println("=== Calculating from hold layer records ===") 111 + fmt.Println("NOTE: May undercount app-password users due to layer record bug") 112 + fmt.Println(" Use --from-manifests for more accurate results") 113 + 114 + userUsage, err = calculateFromLayerRecords(baseURL, holdDID) 115 + } 116 + 117 + if err != nil { 118 + fmt.Fprintf(os.Stderr, "Failed to calculate usage: %v\n", err) 119 + os.Exit(1) 120 + } 121 + 122 + // Resolve DIDs to handles 123 + fmt.Println("\n\nResolving DIDs to handles...") 124 + for _, usage := range userUsage { 125 + handle, err := resolveDIDToHandle(usage.DID) 126 + if err != nil { 127 + usage.Handle = usage.DID 128 + } else { 129 + usage.Handle = handle 130 + } 131 + } 132 + 133 + // Convert to slice and sort by total size (descending) 134 + var sorted []*UserUsage 135 + for _, u := range userUsage { 136 + sorted = append(sorted, u) 137 + } 138 + sort.Slice(sorted, func(i, j int) bool { 139 + return sorted[i].TotalSize > sorted[j].TotalSize 140 + }) 141 + 142 + // Print report 143 + fmt.Println("\n========================================") 144 + fmt.Println("STORAGE USAGE REPORT") 145 + fmt.Println("========================================") 146 + 147 + var grandTotal int64 148 + var grandLayers int 149 + for _, u := range sorted { 150 + grandTotal += u.TotalSize 151 + grandLayers += u.LayerCount 152 + } 153 + 154 + fmt.Printf("\nTotal Users: %d\n", len(sorted)) 155 + fmt.Printf("Total Unique Layers: %d\n", grandLayers) 156 + fmt.Printf("Total Storage: %s\n\n", humanSize(grandTotal)) 157 + 158 + fmt.Println("BY USER (sorted by storage):") 159 + fmt.Println("----------------------------------------") 160 + for i, u := range sorted { 161 + fmt.Printf("%3d. %s\n", i+1, u.Handle) 162 + fmt.Printf(" DID: %s\n", u.DID) 163 + fmt.Printf(" Unique Layers: %d\n", u.LayerCount) 164 + fmt.Printf(" Total Size: %s\n", humanSize(u.TotalSize)) 165 + if len(u.Repositories) > 0 { 166 + var repos []string 167 + for r := range u.Repositories { 168 + repos = append(repos, r) 169 + } 170 + sort.Strings(repos) 171 + fmt.Printf(" Repositories: %s\n", strings.Join(repos, ", ")) 172 + } 173 + pct := float64(0) 174 + if grandTotal > 0 { 175 + pct = float64(u.TotalSize) / float64(grandTotal) * 100 176 + } 177 + fmt.Printf(" Share: %.1f%%\n\n", pct) 178 + } 179 + 180 + // Output CSV format for easy analysis 181 + fmt.Println("\n========================================") 182 + fmt.Println("CSV FORMAT") 183 + fmt.Println("========================================") 184 + fmt.Println("handle,did,unique_layers,total_bytes,total_human,repositories") 185 + for _, u := range sorted { 186 + var repos []string 187 + for r := range u.Repositories { 188 + repos = append(repos, r) 189 + } 190 + sort.Strings(repos) 191 + fmt.Printf("%s,%s,%d,%d,%s,\"%s\"\n", u.Handle, u.DID, u.LayerCount, u.TotalSize, humanSize(u.TotalSize), strings.Join(repos, ";")) 192 + } 193 + } 194 + 195 + // calculateFromLayerRecords uses the hold's layer records (original method) 196 + func calculateFromLayerRecords(baseURL, holdDID string) (map[string]*UserUsage, error) { 197 + layers, err := fetchAllLayerRecords(baseURL, holdDID) 198 + if err != nil { 199 + return nil, err 200 + } 201 + 202 + fmt.Printf("Fetched %d layer records\n", len(layers)) 203 + 204 + userUsage := make(map[string]*UserUsage) 205 + for _, layer := range layers { 206 + if layer.UserDID == "" { 207 + continue 208 + } 209 + 210 + usage, exists := userUsage[layer.UserDID] 211 + if !exists { 212 + usage = &UserUsage{ 213 + DID: layer.UserDID, 214 + UniqueLayers: make(map[string]int64), 215 + Repositories: make(map[string]bool), 216 + } 217 + userUsage[layer.UserDID] = usage 218 + } 219 + 220 + if _, seen := usage.UniqueLayers[layer.Digest]; !seen { 221 + usage.UniqueLayers[layer.Digest] = layer.Size 222 + usage.TotalSize += layer.Size 223 + usage.LayerCount++ 224 + } 225 + } 226 + 227 + return userUsage, nil 228 + } 229 + 230 + // calculateFromManifests queries crew members and fetches their manifests from their PDSes 231 + func calculateFromManifests(baseURL, holdDID string) (map[string]*UserUsage, error) { 232 + // Get all crew members 233 + crewDIDs, err := fetchCrewMembers(baseURL, holdDID) 234 + if err != nil { 235 + return nil, fmt.Errorf("failed to fetch crew: %w", err) 236 + } 237 + 238 + // Also get captain 239 + captainDID, err := fetchCaptain(baseURL, holdDID) 240 + if err == nil && captainDID != "" { 241 + // Add captain to list if not already there 242 + found := false 243 + for _, d := range crewDIDs { 244 + if d == captainDID { 245 + found = true 246 + break 247 + } 248 + } 249 + if !found { 250 + crewDIDs = append(crewDIDs, captainDID) 251 + } 252 + } 253 + 254 + fmt.Printf("Found %d users (crew + captain)\n", len(crewDIDs)) 255 + 256 + userUsage := make(map[string]*UserUsage) 257 + 258 + for _, did := range crewDIDs { 259 + fmt.Printf(" Checking manifests for %s...", did) 260 + 261 + // Resolve DID to PDS 262 + pdsEndpoint, err := resolveDIDToPDS(did) 263 + if err != nil { 264 + fmt.Printf(" (failed to resolve PDS: %v)\n", err) 265 + continue 266 + } 267 + 268 + // Fetch manifests that use this hold 269 + manifests, err := fetchUserManifestsForHold(pdsEndpoint, did, holdDID) 270 + if err != nil { 271 + fmt.Printf(" (failed to fetch manifests: %v)\n", err) 272 + continue 273 + } 274 + 275 + if len(manifests) == 0 { 276 + fmt.Printf(" 0 manifests\n") 277 + continue 278 + } 279 + 280 + // Calculate unique layers across all manifests 281 + usage := &UserUsage{ 282 + DID: did, 283 + UniqueLayers: make(map[string]int64), 284 + Repositories: make(map[string]bool), 285 + } 286 + 287 + for _, m := range manifests { 288 + usage.Repositories[m.Repository] = true 289 + 290 + // Add config blob 291 + if m.Config != nil { 292 + if _, seen := usage.UniqueLayers[m.Config.Digest]; !seen { 293 + usage.UniqueLayers[m.Config.Digest] = m.Config.Size 294 + usage.TotalSize += m.Config.Size 295 + usage.LayerCount++ 296 + } 297 + } 298 + 299 + // Add layers 300 + for _, layer := range m.Layers { 301 + if _, seen := usage.UniqueLayers[layer.Digest]; !seen { 302 + usage.UniqueLayers[layer.Digest] = layer.Size 303 + usage.TotalSize += layer.Size 304 + usage.LayerCount++ 305 + } 306 + } 307 + } 308 + 309 + fmt.Printf(" %d manifests, %d unique layers, %s\n", len(manifests), usage.LayerCount, humanSize(usage.TotalSize)) 310 + 311 + if usage.LayerCount > 0 { 312 + userUsage[did] = usage 313 + } 314 + } 315 + 316 + return userUsage, nil 317 + } 318 + 319 + // fetchCrewMembers gets all crew member DIDs from the hold 320 + func fetchCrewMembers(baseURL, holdDID string) ([]string, error) { 321 + var dids []string 322 + seen := make(map[string]bool) 323 + 324 + cursor := "" 325 + for { 326 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords", baseURL) 327 + params := url.Values{} 328 + params.Set("repo", holdDID) 329 + params.Set("collection", "io.atcr.hold.crew") 330 + params.Set("limit", "100") 331 + if cursor != "" { 332 + params.Set("cursor", cursor) 333 + } 334 + 335 + resp, err := client.Get(u + "?" + params.Encode()) 336 + if err != nil { 337 + return nil, err 338 + } 339 + 340 + var listResp ListRecordsResponse 341 + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { 342 + resp.Body.Close() 343 + return nil, err 344 + } 345 + resp.Body.Close() 346 + 347 + for _, rec := range listResp.Records { 348 + var crew CrewRecord 349 + if err := json.Unmarshal(rec.Value, &crew); err != nil { 350 + continue 351 + } 352 + if crew.Member != "" && !seen[crew.Member] { 353 + seen[crew.Member] = true 354 + dids = append(dids, crew.Member) 355 + } 356 + } 357 + 358 + if listResp.Cursor == "" || len(listResp.Records) < 100 { 359 + break 360 + } 361 + cursor = listResp.Cursor 362 + } 363 + 364 + return dids, nil 365 + } 366 + 367 + // fetchCaptain gets the captain DID from the hold 368 + func fetchCaptain(baseURL, holdDID string) (string, error) { 369 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=io.atcr.hold.captain&rkey=self", 370 + baseURL, url.QueryEscape(holdDID)) 371 + 372 + resp, err := client.Get(u) 373 + if err != nil { 374 + return "", err 375 + } 376 + defer resp.Body.Close() 377 + 378 + if resp.StatusCode != http.StatusOK { 379 + return "", fmt.Errorf("status %d", resp.StatusCode) 380 + } 381 + 382 + var result struct { 383 + Value struct { 384 + Owner string `json:"owner"` 385 + } `json:"value"` 386 + } 387 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 388 + return "", err 389 + } 390 + 391 + return result.Value.Owner, nil 392 + } 393 + 394 + // fetchUserManifestsForHold fetches all manifests from a user's PDS that use the specified hold 395 + func fetchUserManifestsForHold(pdsEndpoint, userDID, holdDID string) ([]ManifestRecord, error) { 396 + var manifests []ManifestRecord 397 + cursor := "" 398 + 399 + for { 400 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords", pdsEndpoint) 401 + params := url.Values{} 402 + params.Set("repo", userDID) 403 + params.Set("collection", "io.atcr.manifest") 404 + params.Set("limit", "100") 405 + if cursor != "" { 406 + params.Set("cursor", cursor) 407 + } 408 + 409 + resp, err := client.Get(u + "?" + params.Encode()) 410 + if err != nil { 411 + return nil, err 412 + } 413 + 414 + if resp.StatusCode != http.StatusOK { 415 + resp.Body.Close() 416 + return nil, fmt.Errorf("status %d", resp.StatusCode) 417 + } 418 + 419 + var listResp ListRecordsResponse 420 + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { 421 + resp.Body.Close() 422 + return nil, err 423 + } 424 + resp.Body.Close() 425 + 426 + for _, rec := range listResp.Records { 427 + var m ManifestRecord 428 + if err := json.Unmarshal(rec.Value, &m); err != nil { 429 + continue 430 + } 431 + // Only include manifests for this hold 432 + if m.HoldDID == holdDID { 433 + manifests = append(manifests, m) 434 + } 435 + } 436 + 437 + if listResp.Cursor == "" || len(listResp.Records) < 100 { 438 + break 439 + } 440 + cursor = listResp.Cursor 441 + } 442 + 443 + return manifests, nil 444 + } 445 + 446 + // getHoldDID fetches the hold's DID from /.well-known/atproto-did 447 + func getHoldDID(baseURL string) (string, error) { 448 + resp, err := http.Get(baseURL + "/.well-known/atproto-did") 449 + if err != nil { 450 + return "", err 451 + } 452 + defer resp.Body.Close() 453 + 454 + if resp.StatusCode != http.StatusOK { 455 + return "", fmt.Errorf("unexpected status: %d", resp.StatusCode) 456 + } 457 + 458 + body, err := io.ReadAll(resp.Body) 459 + if err != nil { 460 + return "", err 461 + } 462 + 463 + return strings.TrimSpace(string(body)), nil 464 + } 465 + 466 + // fetchAllLayerRecords fetches all layer records with pagination 467 + func fetchAllLayerRecords(baseURL, holdDID string) ([]LayerRecord, error) { 468 + var allLayers []LayerRecord 469 + cursor := "" 470 + limit := 100 471 + 472 + for { 473 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords", baseURL) 474 + params := url.Values{} 475 + params.Set("repo", holdDID) 476 + params.Set("collection", "io.atcr.hold.layer") 477 + params.Set("limit", fmt.Sprintf("%d", limit)) 478 + if cursor != "" { 479 + params.Set("cursor", cursor) 480 + } 481 + 482 + fullURL := u + "?" + params.Encode() 483 + fmt.Printf(" Fetching: %s\n", fullURL) 484 + 485 + resp, err := client.Get(fullURL) 486 + if err != nil { 487 + return nil, fmt.Errorf("request failed: %w", err) 488 + } 489 + 490 + if resp.StatusCode != http.StatusOK { 491 + body, _ := io.ReadAll(resp.Body) 492 + resp.Body.Close() 493 + return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) 494 + } 495 + 496 + var listResp ListRecordsResponse 497 + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { 498 + resp.Body.Close() 499 + return nil, fmt.Errorf("decode failed: %w", err) 500 + } 501 + resp.Body.Close() 502 + 503 + for _, rec := range listResp.Records { 504 + var layer LayerRecord 505 + if err := json.Unmarshal(rec.Value, &layer); err != nil { 506 + fmt.Fprintf(os.Stderr, "Warning: failed to parse layer record: %v\n", err) 507 + continue 508 + } 509 + allLayers = append(allLayers, layer) 510 + } 511 + 512 + fmt.Printf(" Got %d records (total: %d)\n", len(listResp.Records), len(allLayers)) 513 + 514 + if listResp.Cursor == "" || len(listResp.Records) < limit { 515 + break 516 + } 517 + cursor = listResp.Cursor 518 + } 519 + 520 + return allLayers, nil 521 + } 522 + 523 + // resolveDIDToHandle resolves a DID to a handle using the PLC directory or did:web 524 + func resolveDIDToHandle(did string) (string, error) { 525 + if strings.HasPrefix(did, "did:web:") { 526 + return strings.TrimPrefix(did, "did:web:"), nil 527 + } 528 + 529 + if strings.HasPrefix(did, "did:plc:") { 530 + plcURL := "https://plc.directory/" + did 531 + resp, err := client.Get(plcURL) 532 + if err != nil { 533 + return "", fmt.Errorf("PLC query failed: %w", err) 534 + } 535 + defer resp.Body.Close() 536 + 537 + if resp.StatusCode != http.StatusOK { 538 + return "", fmt.Errorf("PLC returned status %d", resp.StatusCode) 539 + } 540 + 541 + var plcDoc struct { 542 + AlsoKnownAs []string `json:"alsoKnownAs"` 543 + } 544 + if err := json.NewDecoder(resp.Body).Decode(&plcDoc); err != nil { 545 + return "", fmt.Errorf("failed to parse PLC response: %w", err) 546 + } 547 + 548 + for _, aka := range plcDoc.AlsoKnownAs { 549 + if strings.HasPrefix(aka, "at://") { 550 + return strings.TrimPrefix(aka, "at://"), nil 551 + } 552 + } 553 + 554 + return did, nil 555 + } 556 + 557 + return did, nil 558 + } 559 + 560 + // resolveDIDToPDS resolves a DID to its PDS endpoint 561 + func resolveDIDToPDS(did string) (string, error) { 562 + if strings.HasPrefix(did, "did:web:") { 563 + // did:web:example.com -> https://example.com 564 + domain := strings.TrimPrefix(did, "did:web:") 565 + return "https://" + domain, nil 566 + } 567 + 568 + if strings.HasPrefix(did, "did:plc:") { 569 + plcURL := "https://plc.directory/" + did 570 + resp, err := client.Get(plcURL) 571 + if err != nil { 572 + return "", fmt.Errorf("PLC query failed: %w", err) 573 + } 574 + defer resp.Body.Close() 575 + 576 + if resp.StatusCode != http.StatusOK { 577 + return "", fmt.Errorf("PLC returned status %d", resp.StatusCode) 578 + } 579 + 580 + var plcDoc struct { 581 + Service []struct { 582 + ID string `json:"id"` 583 + Type string `json:"type"` 584 + ServiceEndpoint string `json:"serviceEndpoint"` 585 + } `json:"service"` 586 + } 587 + if err := json.NewDecoder(resp.Body).Decode(&plcDoc); err != nil { 588 + return "", fmt.Errorf("failed to parse PLC response: %w", err) 589 + } 590 + 591 + for _, svc := range plcDoc.Service { 592 + if svc.Type == "AtprotoPersonalDataServer" { 593 + return svc.ServiceEndpoint, nil 594 + } 595 + } 596 + 597 + return "", fmt.Errorf("no PDS found in DID document") 598 + } 599 + 600 + return "", fmt.Errorf("unsupported DID method") 601 + } 602 + 603 + // humanSize converts bytes to human-readable format 604 + func humanSize(bytes int64) string { 605 + const ( 606 + KB = 1024 607 + MB = 1024 * KB 608 + GB = 1024 * MB 609 + TB = 1024 * GB 610 + ) 611 + 612 + switch { 613 + case bytes >= TB: 614 + return fmt.Sprintf("%.2f TB", float64(bytes)/TB) 615 + case bytes >= GB: 616 + return fmt.Sprintf("%.2f GB", float64(bytes)/GB) 617 + case bytes >= MB: 618 + return fmt.Sprintf("%.2f MB", float64(bytes)/MB) 619 + case bytes >= KB: 620 + return fmt.Sprintf("%.2f KB", float64(bytes)/KB) 621 + default: 622 + return fmt.Sprintf("%d B", bytes) 623 + } 624 + }
+3 -1
pkg/appview/storage/manifest_store.go
··· 224 224 225 225 // Notify hold about manifest push (for layer tracking, Bluesky posts, and stats) 226 226 // Do this asynchronously to avoid blocking the push 227 - if tag != "" && s.ctx.ServiceToken != "" && s.ctx.Handle != "" { 227 + // Note: We notify even for tagless pushes (e.g., buildx platform manifests) to create layer records 228 + // Bluesky posts are only created for tagged pushes (handled by hold service) 229 + if s.ctx.ServiceToken != "" && s.ctx.Handle != "" { 228 230 go func() { 229 231 defer func() { 230 232 if r := recover(); r != nil {
+3 -2
pkg/hold/oci/xrpc.go
··· 350 350 } 351 351 } 352 352 353 - // Create Bluesky post if enabled 354 - if postsEnabled { 353 + // Create Bluesky post if enabled and tag is present 354 + // Skip posts for tagless pushes (e.g., buildx platform manifests pushed by digest) 355 + if postsEnabled && req.Tag != "" { 355 356 // Resolve handle from DID (cached, 24-hour TTL) 356 357 _, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID) 357 358 if resolveErr != nil {