A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

begin s3 garbage collection implementation, more envvar cleanup

evan.jarrett.net 9e600649 64cdb669

verified
-11
.env.appview.example
··· 21 21 # Production: Set to your public URL (e.g., https://atcr.io) 22 22 # ATCR_BASE_URL=http://127.0.0.1:5000 23 23 24 - # Service name (used for JWT service/issuer fields) 25 - # Default: Derived from base URL hostname, or "atcr.io" 26 - # ATCR_SERVICE_NAME=atcr.io 27 - 28 24 # ============================================================================== 29 25 # Storage Configuration 30 26 # ============================================================================== ··· 48 44 # Path to JWT signing certificate (auto-generated if missing) 49 45 # Default: /var/lib/atcr/auth/private-key.crt 50 46 # ATCR_AUTH_CERT_PATH=/var/lib/atcr/auth/private-key.crt 51 - 52 - # JWT token expiration in seconds (default: 300 = 5 minutes) 53 - # ATCR_TOKEN_EXPIRATION=300 54 47 55 48 # Path to OAuth client P-256 signing key (auto-generated on first run) 56 49 # Used for confidential OAuth client authentication (production only) ··· 130 123 # ATProto relay endpoint for backfill sync API 131 124 # Default: https://relay1.us-east.bsky.network 132 125 # ATCR_RELAY_ENDPOINT=https://relay1.us-east.bsky.network 133 - 134 - # Backfill interval (default: 1h) 135 - # Examples: 30m, 1h, 2h, 24h 136 - # ATCR_BACKFILL_INTERVAL=1h
-13
.env.example
··· 45 45 # Production: Set to your public URL (e.g., https://atcr.io) 46 46 # ATCR_BASE_URL=https://atcr.io 47 47 48 - # Service name for JWT issuer/service fields 49 - # Default: Derived from ATCR_BASE_URL hostname, or "atcr.io" 50 - # ATCR_SERVICE_NAME=atcr.io 51 - 52 48 # ============================================================================== 53 49 # APPVIEW - STORAGE CONFIGURATION (REQUIRED) 54 50 # ============================================================================== ··· 72 68 # Default: /var/lib/atcr/auth/private-key.crt 73 69 # ATCR_AUTH_CERT_PATH=/var/lib/atcr/auth/private-key.crt 74 70 75 - # JWT token expiration in seconds 76 - # Default: 300 (5 minutes) 77 - # ATCR_TOKEN_EXPIRATION=300 78 - 79 71 # Path to OAuth client P-256 signing key (auto-generated for production) 80 72 # Used for confidential OAuth client authentication 81 73 # Localhost deployments always use public OAuth clients (no key needed) ··· 109 101 # ATProto relay endpoint for backfill sync API 110 102 # Default: https://relay1.us-east.bsky.network 111 103 # ATCR_RELAY_ENDPOINT=https://relay1.us-east.bsky.network 112 - 113 - # Backfill sync interval 114 - # Default: 1h 115 - # Examples: 30m, 1h, 2h, 24h 116 - # ATCR_BACKFILL_INTERVAL=1h 117 104 118 105 # ============================================================================== 119 106 # APPVIEW - HEALTH CHECKS
+12
.env.hold.example
··· 151 151 # Basic auth credentials (optional) 152 152 # ATCR_LOG_SHIPPER_USERNAME= 153 153 # ATCR_LOG_SHIPPER_PASSWORD= 154 + 155 + # ============================================================================== 156 + # Garbage Collection 157 + # ============================================================================== 158 + 159 + # Enable garbage collection for orphaned blobs (default: true) 160 + # GC runs on startup and then nightly (every 24 hours) 161 + GC_ENABLED=true 162 + 163 + # Dry-run mode: log what would be deleted without actually deleting (default: true) 164 + # Set to false after validating the GC logs show correct behavior 165 + GC_DRY_RUN=true
+1 -2
CLAUDE.md
··· 230 230 - **Issued by:** AppView after OAuth login 231 231 - **Stored in:** Docker credential helper (`~/.atcr/credential-helper-token.json`) 232 232 - **Used for:** Docker client → AppView authentication 233 - - **Lifetime:** 15 minutes (configurable via `ATCR_TOKEN_EXPIRATION`) 233 + - **Lifetime:** 5 minutes 234 234 - **Format:** JWT with DID claim 235 235 236 236 **3. Service Tokens** ··· 666 666 667 667 **Authentication:** 668 668 - `ATCR_AUTH_KEY_PATH` - JWT signing key path (default: `/var/lib/atcr/auth/private-key.pem`) 669 - - `ATCR_TOKEN_EXPIRATION` - JWT expiration in seconds (default: 300) 670 669 671 670 **UI:** 672 671 - `ATCR_UI_DATABASE_PATH` - SQLite database path (default: `/var/lib/atcr/ui.db`)
+2 -2
cmd/appview/serve.go
··· 565 565 } 566 566 }() 567 567 568 - // Start periodic backfill scheduler 569 - interval := jetstreamCfg.BackfillInterval 568 + // Start periodic backfill scheduler (hardcoded 1h interval) 569 + interval := 1 * time.Hour 570 570 571 571 go func() { 572 572 ticker := time.NewTicker(interval)
+20
cmd/hold/main.go
··· 12 12 13 13 "atcr.io/pkg/hold" 14 14 "atcr.io/pkg/hold/admin" 15 + "atcr.io/pkg/hold/gc" 15 16 "atcr.io/pkg/hold/oci" 16 17 "atcr.io/pkg/hold/pds" 17 18 "atcr.io/pkg/hold/quota" ··· 122 123 123 124 // Create blob store adapter and XRPC handlers 124 125 var ociHandler *oci.XRPCHandler 126 + var garbageCollector *gc.GarbageCollector 125 127 if holdPDS != nil { 126 128 // Create storage driver from config 127 129 ctx := context.Background() ··· 142 144 143 145 // Create OCI XRPC handler (multipart upload endpoints) 144 146 ociHandler = oci.NewXRPCHandler(holdPDS, *s3Service, driver, cfg.Server.DisablePresignedURLs, cfg.Registration.EnableBlueskyPosts, nil, quotaMgr) 147 + 148 + // Initialize garbage collector 149 + gcConfig := gc.LoadConfigFromEnv() 150 + garbageCollector = gc.NewGarbageCollector(holdPDS, driver, gcConfig) 151 + slog.Info("Garbage collector initialized", 152 + "enabled", gcConfig.Enabled, 153 + "dryRun", gcConfig.DryRun) 145 154 } 146 155 147 156 // Setup HTTP routes with chi router ··· 238 247 } 239 248 } 240 249 250 + // Start garbage collector (runs on startup + nightly) 251 + if garbageCollector != nil { 252 + garbageCollector.Start(context.Background()) 253 + } 254 + 241 255 // Wait for signal or server error 242 256 select { 243 257 case err := <-serverErr: ··· 255 269 } else { 256 270 slog.Info("Status post set to offline") 257 271 } 272 + } 273 + 274 + // Stop garbage collector 275 + if garbageCollector != nil { 276 + garbageCollector.Stop() 277 + slog.Info("Garbage collector stopped") 258 278 } 259 279 260 280 // Close broadcaster database connection
-16
deploy/.env.prod.template
··· 142 142 # Uncomment to override if you want to use a different hold service as the default 143 143 # ATCR_DEFAULT_HOLD_DID=did:web:some-other-hold.example.com 144 144 145 - # JWT token expiration in seconds 146 - # Default: 300 (5 minutes) 147 - ATCR_TOKEN_EXPIRATION=300 148 - 149 145 # OAuth client display name (shown in authorization screens) 150 146 # Default: AT Container Registry 151 147 # ATCR_CLIENT_NAME=AT Container Registry ··· 178 174 # Default: https://relay1.us-east.bsky.network 179 175 ATCR_RELAY_ENDPOINT=https://relay1.us-east.bsky.network 180 176 181 - # Backfill interval 182 - # Examples: 30m, 1h, 2h, 24h 183 - # Default: 1h 184 - ATCR_BACKFILL_INTERVAL=1h 185 - 186 177 # ============================================================================== 187 178 # Optional: Filesystem Storage (alternative to S3) 188 179 # ============================================================================== ··· 194 185 195 186 # STORAGE_DRIVER=filesystem 196 187 # STORAGE_ROOT_DIR=/var/lib/atcr/hold 197 - 198 - # ============================================================================== 199 - # Advanced Configuration 200 - # ============================================================================== 201 - 202 - # Override service name (defaults to APPVIEW_DOMAIN) 203 - # ATCR_SERVICE_NAME=atcr.io 204 188 205 189 # ============================================================================== 206 190 # CHECKLIST
-3
deploy/docker-compose.prod.yml
··· 48 48 # Server configuration 49 49 ATCR_HTTP_ADDR: :5000 50 50 ATCR_BASE_URL: https://${APPVIEW_DOMAIN:-atcr.io} 51 - ATCR_SERVICE_NAME: ${APPVIEW_DOMAIN:-atcr.io} 52 51 53 52 # Storage configuration (derived from HOLD_DOMAIN) 54 53 ATCR_DEFAULT_HOLD_DID: ${ATCR_DEFAULT_HOLD_DID:-did:web:${HOLD_DOMAIN:-hold01.atcr.io}} ··· 56 55 # Authentication 57 56 ATCR_AUTH_KEY_PATH: /var/lib/atcr/auth/private-key.pem 58 57 ATCR_AUTH_CERT_PATH: /var/lib/atcr/auth/private-key.crt 59 - ATCR_TOKEN_EXPIRATION: ${ATCR_TOKEN_EXPIRATION:-300} 60 58 61 59 # UI configuration 62 60 ATCR_UI_DATABASE_PATH: /var/lib/atcr/ui.db ··· 69 67 JETSTREAM_URL: ${JETSTREAM_URL:-wss://jetstream2.us-west.bsky.network/subscribe} 70 68 ATCR_BACKFILL_ENABLED: ${ATCR_BACKFILL_ENABLED:-true} 71 69 ATCR_RELAY_ENDPOINT: ${ATCR_RELAY_ENDPOINT:-https://relay1.us-east.bsky.network} 72 - ATCR_BACKFILL_INTERVAL: ${ATCR_BACKFILL_INTERVAL:-1h} 73 70 volumes: 74 71 # Persistent data: auth keys, UI database, OAuth tokens, Jetstream cache 75 72 - atcr-appview-data:/var/lib/atcr
-15
docs/appview.md
··· 110 110 - **Production:** Set to your public URL (e.g., `https://atcr.example.com`) 111 111 - **Example:** `https://atcr.io`, `http://127.0.0.1:5000` 112 112 113 - #### `ATCR_SERVICE_NAME` 114 - - **Default:** Derived from `ATCR_BASE_URL` hostname, or `atcr.io` 115 - - **Description:** Service name used for JWT `service` and `issuer` fields. Controls token scope. 116 - - **Example:** `atcr.io`, `registry.example.com` 117 - 118 113 ### Storage Configuration 119 114 120 115 #### `ATCR_DEFAULT_HOLD_DID` ⚠️ REQUIRED ··· 137 132 - **Default:** `/var/lib/atcr/auth/private-key.crt` 138 133 - **Description:** Path to JWT signing certificate. Auto-generated if missing. 139 134 - **Note:** Paired with `ATCR_AUTH_KEY_PATH` 140 - 141 - #### `ATCR_TOKEN_EXPIRATION` 142 - - **Default:** `300` (5 minutes) 143 - - **Description:** JWT token expiration in seconds. Registry JWTs are short-lived for security. 144 - - **Recommendation:** Keep between 300-900 seconds (5-15 minutes) 145 135 146 136 ### Web UI Configuration 147 137 ··· 199 189 - **Default:** `https://relay1.us-east.bsky.network` 200 190 - **Description:** ATProto relay endpoint for backfill sync API 201 191 - **Note:** Used when `ATCR_BACKFILL_ENABLED=true` 202 - 203 - #### `ATCR_BACKFILL_INTERVAL` 204 - - **Default:** `1h` 205 - - **Description:** How often to run backfill sync 206 - - **Format:** Duration string (e.g., `30m`, `1h`, `2h`, `24h`) 207 192 208 193 ### Legacy Configuration 209 194
+6 -20
pkg/appview/config.go
··· 102 102 // BackfillEnabled controls whether backfill is enabled (from env: ATCR_BACKFILL_ENABLED, default: true) 103 103 BackfillEnabled bool `yaml:"backfill_enabled"` 104 104 105 - // BackfillInterval is the backfill interval (from env: ATCR_BACKFILL_INTERVAL, default: 1h) 106 - BackfillInterval time.Duration `yaml:"backfill_interval"` 107 - 108 105 // RelayEndpoint is the relay endpoint for sync API (from env: ATCR_RELAY_ENDPOINT, default: https://relay1.us-east.bsky.network) 109 106 RelayEndpoint string `yaml:"relay_endpoint"` 110 107 } ··· 117 114 // CertPath is the JWT certificate path (from env: ATCR_AUTH_CERT_PATH, default: "/var/lib/atcr/auth/private-key.crt") 118 115 CertPath string `yaml:"cert_path"` 119 116 120 - // TokenExpiration is the JWT expiration duration (from env: ATCR_TOKEN_EXPIRATION, default: 300s) 117 + // TokenExpiration is the JWT expiration duration (5 minutes) 121 118 TokenExpiration time.Duration `yaml:"token_expiration"` 122 119 123 120 // ServiceName is the service name used for JWT issuer and service fields 124 - // Derived from ATCR_SERVICE_NAME env var or extracted from base URL (e.g., "atcr.io") 121 + // Derived from base URL hostname (e.g., "atcr.io") 125 122 ServiceName string `yaml:"service_name"` 126 123 } 127 124 ··· 176 173 // Jetstream configuration 177 174 cfg.Jetstream.URL = getEnvOrDefault("JETSTREAM_URL", "wss://jetstream2.us-west.bsky.network/subscribe") 178 175 cfg.Jetstream.BackfillEnabled = os.Getenv("ATCR_BACKFILL_ENABLED") != "false" 179 - cfg.Jetstream.BackfillInterval = getDurationOrDefault("ATCR_BACKFILL_INTERVAL", 1*time.Hour) 180 176 cfg.Jetstream.RelayEndpoint = getEnvOrDefault("ATCR_RELAY_ENDPOINT", "https://relay1.us-east.bsky.network") 181 177 182 178 // Auth configuration 183 179 cfg.Auth.KeyPath = getEnvOrDefault("ATCR_AUTH_KEY_PATH", "/var/lib/atcr/auth/private-key.pem") 184 180 cfg.Auth.CertPath = getEnvOrDefault("ATCR_AUTH_CERT_PATH", "/var/lib/atcr/auth/private-key.crt") 185 181 186 - // Parse token expiration (default: 300 seconds = 5 minutes) 187 - expirationStr := getEnvOrDefault("ATCR_TOKEN_EXPIRATION", "300") 188 - expirationSecs, err := strconv.Atoi(expirationStr) 189 - if err != nil { 190 - return nil, fmt.Errorf("invalid ATCR_TOKEN_EXPIRATION: %w", err) 191 - } 192 - cfg.Auth.TokenExpiration = time.Duration(expirationSecs) * time.Second 182 + // Token expiration: 5 minutes (not configurable) 183 + cfg.Auth.TokenExpiration = 5 * time.Minute 193 184 194 185 // Derive service name from base URL or env var (used for JWT issuer and service) 195 186 cfg.Auth.ServiceName = getServiceName(cfg.Server.BaseURL) ··· 336 327 } 337 328 } 338 329 339 - // getServiceName extracts service name from base URL or uses env var 330 + // getServiceName extracts service name from base URL hostname 340 331 func getServiceName(baseURL string) string { 341 - // Check env var first 342 - if serviceName := os.Getenv("ATCR_SERVICE_NAME"); serviceName != "" { 343 - return serviceName 344 - } 345 - 346 - // Try to extract from base URL 332 + // Extract from base URL 347 333 parsed, err := url.Parse(baseURL) 348 334 if err == nil && parsed.Hostname() != "" { 349 335 hostname := parsed.Hostname()
+3 -24
pkg/appview/config_test.go
··· 8 8 9 9 func Test_getServiceName(t *testing.T) { 10 10 tests := []struct { 11 - name string 12 - baseURL string 13 - envService string 14 - setEnv bool 15 - want string 11 + name string 12 + baseURL string 13 + want string 16 14 }{ 17 15 { 18 - name: "env var set", 19 - baseURL: "http://127.0.0.1:5000", 20 - envService: "custom.registry.io", 21 - setEnv: true, 22 - want: "custom.registry.io", 23 - }, 24 - { 25 16 name: "localhost - use default", 26 17 baseURL: "http://localhost:5000", 27 - setEnv: false, 28 18 want: "atcr.io", 29 19 }, 30 20 { 31 21 name: "127.0.0.1 - use default", 32 22 baseURL: "http://127.0.0.1:5000", 33 - setEnv: false, 34 23 want: "atcr.io", 35 24 }, 36 25 { 37 26 name: "custom domain", 38 27 baseURL: "https://registry.example.com", 39 - setEnv: false, 40 28 want: "registry.example.com", 41 29 }, 42 30 { 43 31 name: "domain with port", 44 32 baseURL: "https://registry.example.com:443", 45 - setEnv: false, 46 33 want: "registry.example.com", 47 34 }, 48 35 { 49 36 name: "invalid URL - use default", 50 37 baseURL: "://invalid", 51 - setEnv: false, 52 38 want: "atcr.io", 53 39 }, 54 40 } 55 41 56 42 for _, tt := range tests { 57 43 t.Run(tt.name, func(t *testing.T) { 58 - if tt.setEnv { 59 - t.Setenv("ATCR_SERVICE_NAME", tt.envService) 60 - } else { 61 - os.Unsetenv("ATCR_SERVICE_NAME") 62 - } 63 - 64 44 got := getServiceName(tt.baseURL) 65 45 if got != tt.want { 66 46 t.Errorf("getServiceName() = %v, want %v", got, tt.want) ··· 214 194 215 195 // Clear other env vars to use defaults 216 196 os.Unsetenv("ATCR_BASE_URL") 217 - os.Unsetenv("ATCR_SERVICE_NAME") 218 197 219 198 got, err := LoadConfigFromEnv() 220 199 if (err != nil) != tt.wantError {
+37
pkg/hold/gc/config.go
··· 1 + // Package gc implements garbage collection for the hold service. 2 + // It periodically cleans up orphaned blobs from S3 storage based on 3 + // layer records in the hold's embedded PDS. 4 + package gc 5 + 6 + import ( 7 + "os" 8 + "time" 9 + ) 10 + 11 + // Hardcoded defaults - keep configuration simple 12 + const ( 13 + // gcInterval is how often GC runs (nightly) 14 + gcInterval = 24 * time.Hour 15 + 16 + // gcGracePeriod is how old a layer record must be before it's considered for GC. 17 + // Records created in the last 7 days are skipped (GDPR/CCPA compliant). 18 + gcGracePeriod = 7 * 24 * time.Hour 19 + ) 20 + 21 + // Config holds GC configuration, loaded from environment variables 22 + type Config struct { 23 + // Enabled controls whether GC is active (GC_ENABLED, default: true) 24 + Enabled bool 25 + 26 + // DryRun logs what would be deleted without actually deleting (GC_DRY_RUN, default: true) 27 + // Remove after initial validation 28 + DryRun bool 29 + } 30 + 31 + // LoadConfigFromEnv loads GC configuration from environment variables 32 + func LoadConfigFromEnv() Config { 33 + return Config{ 34 + Enabled: os.Getenv("GC_ENABLED") != "false", // Default true 35 + DryRun: os.Getenv("GC_DRY_RUN") != "false", // Default true 36 + } 37 + }
+446
pkg/hold/gc/gc.go
··· 1 + package gc 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "regexp" 12 + "strings" 13 + "sync" 14 + "time" 15 + 16 + "atcr.io/pkg/atproto" 17 + "atcr.io/pkg/hold/pds" 18 + "github.com/bluesky-social/indigo/atproto/syntax" 19 + storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" 20 + ) 21 + 22 + // GarbageCollector handles cleanup of orphaned blobs from storage 23 + type GarbageCollector struct { 24 + pds *pds.HoldPDS 25 + driver storagedriver.StorageDriver 26 + cfg Config 27 + logger *slog.Logger 28 + 29 + // stopCh signals the background goroutine to stop 30 + stopCh chan struct{} 31 + // wg tracks the background goroutine 32 + wg sync.WaitGroup 33 + } 34 + 35 + // GCResult contains statistics from a GC run 36 + type GCResult struct { 37 + BlobsDeleted int64 `json:"blobs_deleted"` 38 + BytesReclaimed int64 `json:"bytes_reclaimed"` 39 + RecordsDeleted int64 `json:"records_deleted"` 40 + OrphanedRecords int64 `json:"orphaned_records"` 41 + OrphanedBlobs int64 `json:"orphaned_blobs"` 42 + ReferencedBlobs int64 `json:"referenced_blobs"` 43 + Duration time.Duration `json:"duration"` 44 + } 45 + 46 + // NewGarbageCollector creates a new GC instance 47 + func NewGarbageCollector(holdPDS *pds.HoldPDS, driver storagedriver.StorageDriver, cfg Config) *GarbageCollector { 48 + return &GarbageCollector{ 49 + pds: holdPDS, 50 + driver: driver, 51 + cfg: cfg, 52 + logger: slog.Default().With("component", "gc"), 53 + stopCh: make(chan struct{}), 54 + } 55 + } 56 + 57 + // Start begins the GC background process 58 + // It runs GC immediately on startup, then periodically according to gcInterval 59 + func (gc *GarbageCollector) Start(ctx context.Context) { 60 + if !gc.cfg.Enabled { 61 + gc.logger.Info("GC disabled") 62 + return 63 + } 64 + 65 + // Run on startup 66 + gc.logger.Info("Running GC on startup", "dryRun", gc.cfg.DryRun) 67 + result, err := gc.Run(ctx) 68 + if err != nil { 69 + gc.logger.Error("Startup GC failed", "error", err) 70 + } else { 71 + gc.logResult(result) 72 + } 73 + 74 + // Start background ticker for nightly runs 75 + gc.wg.Add(1) 76 + go func() { 77 + defer gc.wg.Done() 78 + 79 + ticker := time.NewTicker(gcInterval) 80 + defer ticker.Stop() 81 + 82 + for { 83 + select { 84 + case <-gc.stopCh: 85 + gc.logger.Info("GC background process stopped") 86 + return 87 + case <-ctx.Done(): 88 + gc.logger.Info("GC context cancelled") 89 + return 90 + case <-ticker.C: 91 + gc.logger.Info("Running nightly GC", "dryRun", gc.cfg.DryRun) 92 + result, err := gc.Run(ctx) 93 + if err != nil { 94 + gc.logger.Error("Nightly GC failed", "error", err) 95 + } else { 96 + gc.logResult(result) 97 + } 98 + } 99 + } 100 + }() 101 + 102 + gc.logger.Info("GC background process started", "interval", gcInterval) 103 + } 104 + 105 + // Stop gracefully stops the GC background process 106 + func (gc *GarbageCollector) Stop() { 107 + close(gc.stopCh) 108 + gc.wg.Wait() 109 + } 110 + 111 + // Run executes a single GC cycle 112 + func (gc *GarbageCollector) Run(ctx context.Context) (*GCResult, error) { 113 + start := time.Now() 114 + result := &GCResult{} 115 + 116 + gc.logger.Info("Starting GC run", "dryRun", gc.cfg.DryRun) 117 + 118 + // Phase 1: Build referenced set from layer records 119 + referenced, orphanedRecords, err := gc.buildReferencedSet(ctx, result) 120 + if err != nil { 121 + return nil, fmt.Errorf("phase 1 (build referenced set) failed: %w", err) 122 + } 123 + 124 + gc.logger.Info("Phase 1 complete", 125 + "referenced", len(referenced), 126 + "orphanedRecords", len(orphanedRecords)) 127 + 128 + // Phase 2: Delete orphaned layer records 129 + if err := gc.deleteOrphanedRecords(ctx, orphanedRecords, result); err != nil { 130 + gc.logger.Error("Phase 2 (delete orphaned records) failed", "error", err) 131 + // Continue to phase 3 - we can still clean up blobs 132 + } 133 + 134 + // Phase 3: Walk storage and delete unreferenced blobs 135 + if err := gc.deleteOrphanedBlobs(ctx, referenced, result); err != nil { 136 + return nil, fmt.Errorf("phase 3 (delete orphaned blobs) failed: %w", err) 137 + } 138 + 139 + result.Duration = time.Since(start) 140 + result.ReferencedBlobs = int64(len(referenced)) 141 + 142 + return result, nil 143 + } 144 + 145 + // buildReferencedSet iterates layer records and builds a set of referenced digests 146 + // Returns: referenced digest set, list of orphaned record rkeys, error 147 + func (gc *GarbageCollector) buildReferencedSet(ctx context.Context, result *GCResult) (map[string]bool, []string, error) { 148 + referenced := make(map[string]bool) 149 + var orphanedRecords []string 150 + 151 + recordsIndex := gc.pds.RecordsIndex() 152 + if recordsIndex == nil { 153 + return nil, nil, fmt.Errorf("records index not available") 154 + } 155 + 156 + cursor := "" 157 + batchSize := 1000 158 + totalRecords := 0 159 + 160 + for { 161 + records, nextCursor, err := recordsIndex.ListRecords(atproto.LayerCollection, batchSize, cursor, true) 162 + if err != nil { 163 + return nil, nil, fmt.Errorf("failed to list layer records: %w", err) 164 + } 165 + 166 + for _, rec := range records { 167 + totalRecords++ 168 + 169 + // Decode the layer record 170 + layer, err := gc.decodeLayerRecord(ctx, rec) 171 + if err != nil { 172 + gc.logger.Warn("Failed to decode layer record", "rkey", rec.Rkey, "error", err) 173 + continue 174 + } 175 + 176 + // Grace period: skip records from last 7 days 177 + recordTime := tidToTime(rec.Rkey) 178 + if time.Since(recordTime) < gcGracePeriod { 179 + // Recent record - assume referenced, skip checking 180 + referenced[layer.Digest] = true 181 + continue 182 + } 183 + 184 + // Cross-check: does the manifest still exist? 185 + if gc.manifestExists(ctx, layer.Manifest) { 186 + referenced[layer.Digest] = true 187 + } else { 188 + result.OrphanedRecords++ 189 + orphanedRecords = append(orphanedRecords, rec.Rkey) 190 + gc.logger.Debug("Found orphaned layer record", 191 + "rkey", rec.Rkey, 192 + "digest", layer.Digest, 193 + "manifest", layer.Manifest) 194 + } 195 + } 196 + 197 + if nextCursor == "" { 198 + break 199 + } 200 + cursor = nextCursor 201 + 202 + // Progress logging 203 + if totalRecords%10000 == 0 { 204 + gc.logger.Info("Phase 1 progress", "processed", totalRecords) 205 + } 206 + } 207 + 208 + gc.logger.Info("Scanned layer records", "total", totalRecords) 209 + return referenced, orphanedRecords, nil 210 + } 211 + 212 + // deleteOrphanedRecords removes layer records whose manifests no longer exist 213 + func (gc *GarbageCollector) deleteOrphanedRecords(ctx context.Context, orphanedRkeys []string, result *GCResult) error { 214 + for _, rkey := range orphanedRkeys { 215 + if gc.cfg.DryRun { 216 + gc.logger.Info("DRY-RUN: Would delete layer record", "rkey", rkey) 217 + } else { 218 + if err := gc.pds.DeleteLayerRecord(ctx, rkey); err != nil { 219 + gc.logger.Error("Failed to delete layer record", "rkey", rkey, "error", err) 220 + continue 221 + } 222 + result.RecordsDeleted++ 223 + gc.logger.Debug("Deleted orphaned layer record", "rkey", rkey) 224 + } 225 + } 226 + 227 + gc.logger.Info("Phase 2 complete", 228 + "orphaned", len(orphanedRkeys), 229 + "deleted", result.RecordsDeleted, 230 + "dryRun", gc.cfg.DryRun) 231 + 232 + return nil 233 + } 234 + 235 + // deleteOrphanedBlobs walks storage and deletes blobs not in the referenced set 236 + func (gc *GarbageCollector) deleteOrphanedBlobs(ctx context.Context, referenced map[string]bool, result *GCResult) error { 237 + blobsPath := "/docker/registry/v2/blobs" 238 + 239 + err := gc.driver.Walk(ctx, blobsPath, func(fi storagedriver.FileInfo) error { 240 + if fi.IsDir() { 241 + return nil 242 + } 243 + 244 + // Only process data files 245 + if !strings.HasSuffix(fi.Path(), "/data") { 246 + return nil 247 + } 248 + 249 + // Extract digest from path 250 + digest := extractDigestFromPath(fi.Path()) 251 + if digest == "" { 252 + return nil 253 + } 254 + 255 + // Check if referenced by any layer record 256 + if referenced[digest] { 257 + return nil 258 + } 259 + 260 + result.OrphanedBlobs++ 261 + 262 + if gc.cfg.DryRun { 263 + gc.logger.Info("DRY-RUN: Would delete blob", 264 + "digest", digest, 265 + "size", fi.Size()) 266 + } else { 267 + if err := gc.driver.Delete(ctx, fi.Path()); err != nil { 268 + gc.logger.Error("Failed to delete blob", "path", fi.Path(), "error", err) 269 + return nil // Continue with other blobs 270 + } 271 + result.BlobsDeleted++ 272 + result.BytesReclaimed += fi.Size() 273 + gc.logger.Debug("Deleted orphaned blob", 274 + "digest", digest, 275 + "size", fi.Size()) 276 + } 277 + 278 + return nil 279 + }) 280 + 281 + if err != nil { 282 + return fmt.Errorf("walk storage failed: %w", err) 283 + } 284 + 285 + gc.logger.Info("Phase 3 complete", 286 + "orphanedBlobs", result.OrphanedBlobs, 287 + "deleted", result.BlobsDeleted, 288 + "reclaimed", result.BytesReclaimed, 289 + "dryRun", gc.cfg.DryRun) 290 + 291 + return nil 292 + } 293 + 294 + // decodeLayerRecord reads and decodes a layer record from the PDS 295 + func (gc *GarbageCollector) decodeLayerRecord(ctx context.Context, rec pds.Record) (*atproto.LayerRecord, error) { 296 + // Get the record from the repo 297 + recordPath := rec.Collection + "/" + rec.Rkey 298 + _, recBytes, err := gc.pds.GetRecordBytes(ctx, recordPath) 299 + if err != nil { 300 + return nil, fmt.Errorf("get record bytes: %w", err) 301 + } 302 + 303 + // Decode the layer record 304 + var layer atproto.LayerRecord 305 + if err := layer.UnmarshalCBOR(bytes.NewReader(*recBytes)); err != nil { 306 + return nil, fmt.Errorf("unmarshal CBOR: %w", err) 307 + } 308 + 309 + return &layer, nil 310 + } 311 + 312 + // manifestExists checks if a manifest still exists at the given AT-URI 313 + func (gc *GarbageCollector) manifestExists(ctx context.Context, manifestURI string) bool { 314 + // Parse AT-URI: at://did:plc:xxx/io.atcr.manifest/abc123 315 + parts := parseATURI(manifestURI) 316 + if parts == nil { 317 + gc.logger.Debug("Could not parse manifest URI", "uri", manifestURI) 318 + return false // Can't parse, assume orphaned 319 + } 320 + 321 + // Check if the manifest record still exists via XRPC 322 + exists, err := gc.checkManifestViaXRPC(ctx, parts.DID, parts.Collection, parts.Rkey) 323 + if err != nil { 324 + // Network error - assume manifest exists (safe default) 325 + gc.logger.Warn("Failed to check manifest existence, assuming exists", 326 + "uri", manifestURI, 327 + "error", err) 328 + return true 329 + } 330 + 331 + return exists 332 + } 333 + 334 + // atURIParts contains parsed components of an AT-URI 335 + type atURIParts struct { 336 + DID string 337 + Collection string 338 + Rkey string 339 + } 340 + 341 + // parseATURI parses an AT-URI into its components 342 + // Format: at://did:plc:xxx/collection/rkey 343 + func parseATURI(uri string) *atURIParts { 344 + if !strings.HasPrefix(uri, "at://") { 345 + return nil 346 + } 347 + 348 + // Remove at:// prefix 349 + path := strings.TrimPrefix(uri, "at://") 350 + 351 + // Split by / 352 + parts := strings.SplitN(path, "/", 3) 353 + if len(parts) != 3 { 354 + return nil 355 + } 356 + 357 + return &atURIParts{ 358 + DID: parts[0], 359 + Collection: parts[1], 360 + Rkey: parts[2], 361 + } 362 + } 363 + 364 + // checkManifestViaXRPC checks if a manifest record exists by querying the user's PDS 365 + func (gc *GarbageCollector) checkManifestViaXRPC(ctx context.Context, did, collection, rkey string) (bool, error) { 366 + // Resolve DID to PDS endpoint 367 + pdsEndpoint, err := atproto.ResolveDIDToPDS(ctx, did) 368 + if err != nil { 369 + return false, fmt.Errorf("resolve PDS: %w", err) 370 + } 371 + 372 + // Build XRPC URL 373 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 374 + pdsEndpoint, did, collection, rkey) 375 + 376 + // Make request with timeout 377 + client := &http.Client{Timeout: 10 * time.Second} 378 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 379 + if err != nil { 380 + return false, fmt.Errorf("create request: %w", err) 381 + } 382 + 383 + resp, err := client.Do(req) 384 + if err != nil { 385 + return false, fmt.Errorf("http request: %w", err) 386 + } 387 + defer resp.Body.Close() 388 + 389 + // Consume body to allow connection reuse 390 + _, _ = io.Copy(io.Discard, resp.Body) 391 + 392 + switch resp.StatusCode { 393 + case http.StatusOK: 394 + return true, nil 395 + case http.StatusNotFound, http.StatusBadRequest: 396 + // Record doesn't exist 397 + return false, nil 398 + default: 399 + // Read error body for debugging 400 + body, _ := io.ReadAll(resp.Body) 401 + return false, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) 402 + } 403 + } 404 + 405 + // tidToTime extracts the timestamp from a TID (Timestamp ID) 406 + // TIDs are 13-character base32 encoded timestamps with counter 407 + func tidToTime(tid string) time.Time { 408 + // TIDs are base32-sortable timestamps 409 + // Use indigo's syntax package for proper parsing 410 + t, err := syntax.ParseTID(tid) 411 + if err != nil { 412 + // Return zero time - will be older than grace period 413 + return time.Time{} 414 + } 415 + return t.Time() 416 + } 417 + 418 + // extractDigestFromPath extracts a digest from a storage path 419 + // Path format: /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data 420 + // Returns: {algorithm}:{hash} 421 + func extractDigestFromPath(path string) string { 422 + // Match pattern: /blobs/{alg}/{xx}/{hash}/data 423 + re := regexp.MustCompile(`/blobs/([^/]+)/[^/]+/([^/]+)/data$`) 424 + matches := re.FindStringSubmatch(path) 425 + if len(matches) != 3 { 426 + return "" 427 + } 428 + return matches[1] + ":" + matches[2] 429 + } 430 + 431 + // logResult logs the GC result in a structured format 432 + func (gc *GarbageCollector) logResult(result *GCResult) { 433 + gc.logger.Info("GC run complete", 434 + "duration", result.Duration, 435 + "referencedBlobs", result.ReferencedBlobs, 436 + "orphanedRecords", result.OrphanedRecords, 437 + "recordsDeleted", result.RecordsDeleted, 438 + "orphanedBlobs", result.OrphanedBlobs, 439 + "blobsDeleted", result.BlobsDeleted, 440 + "bytesReclaimed", result.BytesReclaimed, 441 + "dryRun", gc.cfg.DryRun) 442 + 443 + // Also log as JSON for easier parsing 444 + resultJSON, _ := json.Marshal(result) 445 + gc.logger.Debug("GC result JSON", "result", string(resultJSON)) 446 + }
+228
pkg/hold/gc/gc_test.go
··· 1 + package gc 2 + 3 + import ( 4 + "testing" 5 + "time" 6 + ) 7 + 8 + func TestExtractDigestFromPath(t *testing.T) { 9 + tests := []struct { 10 + name string 11 + path string 12 + expected string 13 + }{ 14 + { 15 + name: "valid sha256 path", 16 + path: "/docker/registry/v2/blobs/sha256/ab/abc123def456/data", 17 + expected: "sha256:abc123def456", 18 + }, 19 + { 20 + name: "valid sha256 path with full hash", 21 + path: "/docker/registry/v2/blobs/sha256/e3/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/data", 22 + expected: "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 23 + }, 24 + { 25 + name: "invalid path - no data suffix", 26 + path: "/docker/registry/v2/blobs/sha256/ab/abc123def456", 27 + expected: "", 28 + }, 29 + { 30 + name: "invalid path - wrong structure", 31 + path: "/some/other/path/data", 32 + expected: "", 33 + }, 34 + { 35 + name: "empty path", 36 + path: "", 37 + expected: "", 38 + }, 39 + { 40 + name: "uploads temp path (should not match)", 41 + path: "/docker/registry/v2/uploads/temp-uuid/data", 42 + expected: "", 43 + }, 44 + } 45 + 46 + for _, tt := range tests { 47 + t.Run(tt.name, func(t *testing.T) { 48 + result := extractDigestFromPath(tt.path) 49 + if result != tt.expected { 50 + t.Errorf("extractDigestFromPath(%q) = %q, want %q", tt.path, result, tt.expected) 51 + } 52 + }) 53 + } 54 + } 55 + 56 + func TestParseATURI(t *testing.T) { 57 + tests := []struct { 58 + name string 59 + uri string 60 + expectNil bool 61 + did string 62 + collection string 63 + rkey string 64 + }{ 65 + { 66 + name: "valid AT-URI", 67 + uri: "at://did:plc:abc123/io.atcr.manifest/xyz789", 68 + expectNil: false, 69 + did: "did:plc:abc123", 70 + collection: "io.atcr.manifest", 71 + rkey: "xyz789", 72 + }, 73 + { 74 + name: "valid AT-URI with did:web", 75 + uri: "at://did:web:example.com/io.atcr.manifest/manifest123", 76 + expectNil: false, 77 + did: "did:web:example.com", 78 + collection: "io.atcr.manifest", 79 + rkey: "manifest123", 80 + }, 81 + { 82 + name: "invalid - no at:// prefix", 83 + uri: "did:plc:abc123/io.atcr.manifest/xyz789", 84 + expectNil: true, 85 + }, 86 + { 87 + name: "invalid - missing rkey", 88 + uri: "at://did:plc:abc123/io.atcr.manifest", 89 + expectNil: true, 90 + }, 91 + { 92 + name: "invalid - empty string", 93 + uri: "", 94 + expectNil: true, 95 + }, 96 + { 97 + name: "invalid - http URL", 98 + uri: "https://example.com/xrpc/com.atproto.repo.getRecord", 99 + expectNil: true, 100 + }, 101 + } 102 + 103 + for _, tt := range tests { 104 + t.Run(tt.name, func(t *testing.T) { 105 + result := parseATURI(tt.uri) 106 + if tt.expectNil { 107 + if result != nil { 108 + t.Errorf("parseATURI(%q) = %+v, want nil", tt.uri, result) 109 + } 110 + return 111 + } 112 + 113 + if result == nil { 114 + t.Errorf("parseATURI(%q) = nil, want non-nil", tt.uri) 115 + return 116 + } 117 + 118 + if result.DID != tt.did { 119 + t.Errorf("parseATURI(%q).DID = %q, want %q", tt.uri, result.DID, tt.did) 120 + } 121 + if result.Collection != tt.collection { 122 + t.Errorf("parseATURI(%q).Collection = %q, want %q", tt.uri, result.Collection, tt.collection) 123 + } 124 + if result.Rkey != tt.rkey { 125 + t.Errorf("parseATURI(%q).Rkey = %q, want %q", tt.uri, result.Rkey, tt.rkey) 126 + } 127 + }) 128 + } 129 + } 130 + 131 + func TestTidToTime(t *testing.T) { 132 + // Test with known TID format 133 + // TIDs are base32-encoded timestamps with counter 134 + tests := []struct { 135 + name string 136 + tid string 137 + expectZero bool 138 + minAge time.Duration // Minimum expected age (roughly) 139 + }{ 140 + { 141 + name: "valid TID from 2024", 142 + tid: "3l7nqy25tks2c", // A real TID from around 2024 143 + expectZero: false, 144 + }, 145 + { 146 + name: "invalid TID - too short", 147 + tid: "abc", 148 + expectZero: true, 149 + }, 150 + { 151 + name: "invalid TID - empty", 152 + tid: "", 153 + expectZero: true, 154 + }, 155 + { 156 + name: "invalid TID - not base32", 157 + tid: "!!!!!!!!!!!!!!", 158 + expectZero: true, 159 + }, 160 + } 161 + 162 + for _, tt := range tests { 163 + t.Run(tt.name, func(t *testing.T) { 164 + result := tidToTime(tt.tid) 165 + if tt.expectZero { 166 + if !result.IsZero() { 167 + t.Errorf("tidToTime(%q) = %v, want zero time", tt.tid, result) 168 + } 169 + return 170 + } 171 + 172 + if result.IsZero() { 173 + t.Errorf("tidToTime(%q) = zero time, want non-zero", tt.tid) 174 + } 175 + }) 176 + } 177 + } 178 + 179 + func TestLoadConfigFromEnv(t *testing.T) { 180 + // Test default values 181 + t.Run("default values", func(t *testing.T) { 182 + // Clear any existing env vars 183 + t.Setenv("GC_ENABLED", "") 184 + t.Setenv("GC_DRY_RUN", "") 185 + 186 + cfg := LoadConfigFromEnv() 187 + 188 + // Default: enabled 189 + if !cfg.Enabled { 190 + t.Error("expected Enabled to be true by default") 191 + } 192 + 193 + // Default: dry run enabled 194 + if !cfg.DryRun { 195 + t.Error("expected DryRun to be true by default") 196 + } 197 + }) 198 + 199 + t.Run("disabled via env", func(t *testing.T) { 200 + t.Setenv("GC_ENABLED", "false") 201 + t.Setenv("GC_DRY_RUN", "false") 202 + 203 + cfg := LoadConfigFromEnv() 204 + 205 + if cfg.Enabled { 206 + t.Error("expected Enabled to be false when GC_ENABLED=false") 207 + } 208 + 209 + if cfg.DryRun { 210 + t.Error("expected DryRun to be false when GC_DRY_RUN=false") 211 + } 212 + }) 213 + 214 + t.Run("enabled via env", func(t *testing.T) { 215 + t.Setenv("GC_ENABLED", "true") 216 + t.Setenv("GC_DRY_RUN", "true") 217 + 218 + cfg := LoadConfigFromEnv() 219 + 220 + if !cfg.Enabled { 221 + t.Error("expected Enabled to be true when GC_ENABLED=true") 222 + } 223 + 224 + if !cfg.DryRun { 225 + t.Error("expected DryRun to be true when GC_DRY_RUN=true") 226 + } 227 + }) 228 + }
+19
pkg/hold/pds/layer.go
··· 49 49 return nil, fmt.Errorf("GetLayerRecord not yet implemented - use via XRPC listRecords instead") 50 50 } 51 51 52 + // DeleteLayerRecord deletes a layer record by rkey 53 + // This deletes from both the repo (MST) and the records index 54 + func (p *HoldPDS) DeleteLayerRecord(ctx context.Context, rkey string) error { 55 + // Delete from repo (MST) 56 + if err := p.repomgr.DeleteRecord(ctx, p.uid, atproto.LayerCollection, rkey); err != nil { 57 + return fmt.Errorf("failed to delete from repo: %w", err) 58 + } 59 + 60 + // Delete from index 61 + if p.recordsIndex != nil { 62 + if err := p.recordsIndex.DeleteRecord(atproto.LayerCollection, rkey); err != nil { 63 + // Log but don't fail - index will resync on backfill 64 + fmt.Printf("Warning: failed to delete from records index: %v\n", err) 65 + } 66 + } 67 + 68 + return nil 69 + } 70 + 52 71 // ListLayerRecords lists layer records with pagination 53 72 // Returns records, next cursor (empty if no more), and error 54 73 // Note: This is a simplified implementation. For production, consider adding filters
+30
pkg/hold/pds/server.go
··· 152 152 return p.uid 153 153 } 154 154 155 + // GetRecordBytes retrieves raw CBOR bytes for a record 156 + // recordPath format: "collection/rkey" 157 + func (p *HoldPDS) GetRecordBytes(ctx context.Context, recordPath string) (cid.Cid, *[]byte, error) { 158 + session, err := p.carstore.ReadOnlySession(p.uid) 159 + if err != nil { 160 + return cid.Undef, nil, fmt.Errorf("failed to create session: %w", err) 161 + } 162 + 163 + head, err := p.carstore.GetUserRepoHead(ctx, p.uid) 164 + if err != nil { 165 + return cid.Undef, nil, fmt.Errorf("failed to get repo head: %w", err) 166 + } 167 + 168 + if !head.Defined() { 169 + return cid.Undef, nil, fmt.Errorf("repo is empty") 170 + } 171 + 172 + repoHandle, err := repo.OpenRepo(ctx, session, head) 173 + if err != nil { 174 + return cid.Undef, nil, fmt.Errorf("failed to open repo: %w", err) 175 + } 176 + 177 + recordCID, recBytes, err := repoHandle.GetRecordBytes(ctx, recordPath) 178 + if err != nil { 179 + return cid.Undef, nil, fmt.Errorf("failed to get record: %w", err) 180 + } 181 + 182 + return recordCID, recBytes, nil 183 + } 184 + 155 185 // Bootstrap initializes the hold with the captain record, owner as first crew member, and profile 156 186 func (p *HoldPDS) Bootstrap(ctx context.Context, storageDriver driver.StorageDriver, ownerDID string, public bool, allowAllCrew bool, avatarURL, region string) error { 157 187 if ownerDID == "" {