A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

fix sql migration bug. add better error logs for auth failures. fix showing incorrect pull commands with helm charts

evan.jarrett.net e6bd4c12 7dcef54d

verified
Changed files
+121 -22
pkg
appview
auth
+2 -1
pkg/appview/db/models.go
··· 154 Tag 155 Platforms []PlatformInfo 156 IsMultiArch bool 157 - HasAttestations bool // true if manifest list contains attestation references 158 } 159 160 // ManifestWithMetadata extends Manifest with tags and platform information
··· 154 Tag 155 Platforms []PlatformInfo 156 IsMultiArch bool 157 + HasAttestations bool // true if manifest list contains attestation references 158 + ArtifactType string // container-image, helm-chart, unknown 159 } 160 161 // ManifestWithMetadata extends Manifest with tags and platform information
+6 -4
pkg/appview/db/queries.go
··· 653 t.digest, 654 t.created_at, 655 m.media_type, 656 COALESCE(mr.platform_os, '') as platform_os, 657 COALESCE(mr.platform_architecture, '') as platform_architecture, 658 COALESCE(mr.platform_variant, '') as platform_variant, ··· 676 677 for rows.Next() { 678 var t Tag 679 - var mediaType, platformOS, platformArch, platformVariant, platformOSVersion string 680 var isAttestation bool 681 682 if err := rows.Scan(&t.ID, &t.DID, &t.Repository, &t.Tag, &t.Digest, &t.CreatedAt, 683 - &mediaType, &platformOS, &platformArch, &platformVariant, &platformOSVersion, &isAttestation); err != nil { 684 return nil, err 685 } 686 ··· 688 tagKey := t.Tag 689 if _, exists := tagMap[tagKey]; !exists { 690 tagMap[tagKey] = &TagWithPlatforms{ 691 - Tag: t, 692 - Platforms: []PlatformInfo{}, 693 } 694 tagOrder = append(tagOrder, tagKey) 695 }
··· 653 t.digest, 654 t.created_at, 655 m.media_type, 656 + m.artifact_type, 657 COALESCE(mr.platform_os, '') as platform_os, 658 COALESCE(mr.platform_architecture, '') as platform_architecture, 659 COALESCE(mr.platform_variant, '') as platform_variant, ··· 677 678 for rows.Next() { 679 var t Tag 680 + var mediaType, artifactType, platformOS, platformArch, platformVariant, platformOSVersion string 681 var isAttestation bool 682 683 if err := rows.Scan(&t.ID, &t.DID, &t.Repository, &t.Tag, &t.Digest, &t.CreatedAt, 684 + &mediaType, &artifactType, &platformOS, &platformArch, &platformVariant, &platformOSVersion, &isAttestation); err != nil { 685 return nil, err 686 } 687 ··· 689 tagKey := t.Tag 690 if _, exists := tagMap[tagKey]; !exists { 691 tagMap[tagKey] = &TagWithPlatforms{ 692 + Tag: t, 693 + Platforms: []PlatformInfo{}, 694 + ArtifactType: artifactType, 695 } 696 tagOrder = append(tagOrder, tagKey) 697 }
+62 -6
pkg/appview/db/schema.go
··· 37 return nil, err 38 } 39 40 - // Create schema from embedded SQL file 41 - if _, err := db.Exec(schemaSQL); err != nil { 42 - return nil, err 43 } 44 45 // Run migrations unless skipped 46 if !skipMigrations { 47 - if err := runMigrations(db); err != nil { 48 return nil, err 49 } 50 } ··· 52 return db, nil 53 } 54 55 // Migration represents a database migration 56 type Migration struct { 57 Version int ··· 61 } 62 63 // runMigrations applies any pending database migrations 64 - func runMigrations(db *sql.DB) error { 65 // Load migrations from files 66 migrations, err := loadMigrations() 67 if err != nil { ··· 86 continue 87 } 88 89 - // Apply migration in a transaction 90 slog.Info("Applying migration", "version", m.Version, "name", m.Name, "description", m.Description) 91 92 tx, err := db.Begin()
··· 37 return nil, err 38 } 39 40 + // Check if this is an existing database with migrations applied 41 + isExisting, err := hasAppliedMigrations(db) 42 + if err != nil { 43 + return nil, fmt.Errorf("failed to check database state: %w", err) 44 + } 45 + 46 + if isExisting { 47 + // Existing database: skip schema.sql, only run pending migrations 48 + slog.Debug("Existing database detected, skipping schema.sql") 49 + } else { 50 + // Fresh database: apply schema.sql 51 + slog.Info("Fresh database detected, applying schema") 52 + if err := applySchema(db); err != nil { 53 + return nil, err 54 + } 55 } 56 57 // Run migrations unless skipped 58 + // For fresh databases, migrations are recorded but not executed (schema.sql is already complete) 59 if !skipMigrations { 60 + if err := runMigrations(db, !isExisting); err != nil { 61 return nil, err 62 } 63 } ··· 65 return db, nil 66 } 67 68 + // hasAppliedMigrations checks if this is an existing database with migrations applied 69 + func hasAppliedMigrations(db *sql.DB) (bool, error) { 70 + // Check if schema_migrations table exists 71 + var count int 72 + err := db.QueryRow(` 73 + SELECT COUNT(*) FROM sqlite_master 74 + WHERE type='table' AND name='schema_migrations' 75 + `).Scan(&count) 76 + if err != nil { 77 + return false, err 78 + } 79 + if count == 0 { 80 + return false, nil // No migrations table = fresh DB 81 + } 82 + 83 + // Table exists, check if it has entries 84 + err = db.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&count) 85 + if err != nil { 86 + return false, err 87 + } 88 + return count > 0, nil 89 + } 90 + 91 + // applySchema executes schema.sql for fresh databases 92 + func applySchema(db *sql.DB) error { 93 + for _, stmt := range splitSQLStatements(schemaSQL) { 94 + if _, err := db.Exec(stmt); err != nil { 95 + return fmt.Errorf("failed to apply schema: %w", err) 96 + } 97 + } 98 + return nil 99 + } 100 + 101 // Migration represents a database migration 102 type Migration struct { 103 Version int ··· 107 } 108 109 // runMigrations applies any pending database migrations 110 + // If freshDB is true, migrations are recorded but not executed (schema.sql already includes their changes) 111 + func runMigrations(db *sql.DB, freshDB bool) error { 112 // Load migrations from files 113 migrations, err := loadMigrations() 114 if err != nil { ··· 133 continue 134 } 135 136 + if freshDB { 137 + // Fresh database: schema.sql already has everything, just record the migration 138 + slog.Debug("Recording migration as applied (fresh DB)", "version", m.Version, "name", m.Name) 139 + if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", m.Version); err != nil { 140 + return fmt.Errorf("failed to record migration %d: %w", m.Version, err) 141 + } 142 + continue 143 + } 144 + 145 + // Existing database: apply migration in a transaction 146 slog.Info("Applying migration", "version", m.Version, "name", m.Name, "description", m.Description) 147 148 tx, err := db.Begin()
+6 -3
pkg/appview/handlers/repository.go
··· 231 } 232 } 233 234 - // Determine dominant artifact type from manifests 235 artifactType := "container-image" 236 - if len(manifests) > 0 { 237 - // Use the most recent manifest's artifact type 238 artifactType = manifests[0].ArtifactType 239 } 240
··· 231 } 232 } 233 234 + // Determine artifact type for header section from first tag 235 + // This is used for the "Pull this image/chart" header command 236 artifactType := "container-image" 237 + if len(tagsWithPlatforms) > 0 { 238 + artifactType = tagsWithPlatforms[0].ArtifactType 239 + } else if len(manifests) > 0 { 240 + // Fallback to manifests if no tags 241 artifactType = manifests[0].ArtifactType 242 } 243
+6
pkg/appview/storage/manifest_store.go
··· 124 return "", fmt.Errorf("failed to create manifest record: %w", err) 125 } 126 127 // Set the blob reference, hold DID, and hold endpoint 128 manifestRecord.ManifestBlob = blobRef 129 manifestRecord.HoldDID = s.ctx.HoldDID // Primary reference (DID)
··· 124 return "", fmt.Errorf("failed to create manifest record: %w", err) 125 } 126 127 + // OCI spec allows omitting mediaType from the manifest body (inferred from Content-Type header) 128 + // Helm charts typically omit it, so use the media type from the request if body is empty 129 + if manifestRecord.MediaType == "" && mediaType != "" { 130 + manifestRecord.MediaType = mediaType 131 + } 132 + 133 // Set the blob reference, hold DID, and hold endpoint 134 manifestRecord.ManifestBlob = blobRef 135 manifestRecord.HoldDID = s.ctx.HoldDID // Primary reference (DID)
+1 -1
pkg/appview/templates/pages/repository.html
··· 183 {{ end }} 184 </div> 185 </div> 186 - {{ if eq $.ArtifactType "helm-chart" }} 187 {{ template "docker-command" (print "helm pull oci://" $.RegistryURL "/" $.Owner.Handle "/" $.Repository.Name " --version " .Tag.Tag) }} 188 {{ else }} 189 {{ template "docker-command" (print "docker pull " $.RegistryURL "/" $.Owner.Handle "/" $.Repository.Name ":" .Tag.Tag) }}
··· 183 {{ end }} 184 </div> 185 </div> 186 + {{ if eq .ArtifactType "helm-chart" }} 187 {{ template "docker-command" (print "helm pull oci://" $.RegistryURL "/" $.Owner.Handle "/" $.Repository.Name " --version " .Tag.Tag) }} 188 {{ else }} 189 {{ template "docker-command" (print "docker pull " $.RegistryURL "/" $.Owner.Handle "/" $.Repository.Name ":" .Tag.Tag) }}
+23 -5
pkg/auth/session.go
··· 9 "crypto/sha256" 10 "encoding/hex" 11 "encoding/json" 12 "fmt" 13 "io" 14 "log/slog" ··· 17 "time" 18 19 "atcr.io/pkg/atproto" 20 ) 21 22 // CachedSession represents a cached session ··· 99 // Resolve identifier to PDS endpoint 100 _, _, pds, err := atproto.ResolveIdentity(ctx, identifier) 101 if err != nil { 102 - return "", "", "", err 103 } 104 105 // Create session 106 sessionResp, err := v.createSession(ctx, pds, identifier, password) 107 if err != nil { 108 - return "", "", "", fmt.Errorf("authentication failed: %w", err) 109 } 110 111 // Cache the session (ATProto sessions typically last 2 hours) ··· 146 resp, err := v.httpClient.Do(req) 147 if err != nil { 148 slog.Debug("Session creation HTTP request failed", "error", err) 149 - return nil, fmt.Errorf("failed to create session: %w", err) 150 } 151 defer resp.Body.Close() 152 ··· 155 if resp.StatusCode == http.StatusUnauthorized { 156 bodyBytes, _ := io.ReadAll(resp.Body) 157 slog.Debug("Session creation unauthorized", "response", string(bodyBytes)) 158 - return nil, fmt.Errorf("invalid credentials") 159 } 160 161 if resp.StatusCode != http.StatusOK { 162 bodyBytes, _ := io.ReadAll(resp.Body) 163 slog.Debug("Session creation failed", "status", resp.StatusCode, "response", string(bodyBytes)) 164 - return nil, fmt.Errorf("create session failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 165 } 166 167 var sessionResp SessionResponse
··· 9 "crypto/sha256" 10 "encoding/hex" 11 "encoding/json" 12 + "errors" 13 "fmt" 14 "io" 15 "log/slog" ··· 18 "time" 19 20 "atcr.io/pkg/atproto" 21 + ) 22 + 23 + // Sentinel errors for authentication failures 24 + var ( 25 + // ErrIdentityResolution indicates handle/DID resolution failed 26 + ErrIdentityResolution = errors.New("identity resolution failed") 27 + // ErrInvalidCredentials indicates PDS returned 401 (bad password/app-password) 28 + ErrInvalidCredentials = errors.New("invalid credentials") 29 + // ErrPDSUnavailable indicates PDS is unreachable or returned a server error 30 + ErrPDSUnavailable = errors.New("PDS unavailable") 31 ) 32 33 // CachedSession represents a cached session ··· 110 // Resolve identifier to PDS endpoint 111 _, _, pds, err := atproto.ResolveIdentity(ctx, identifier) 112 if err != nil { 113 + return "", "", "", fmt.Errorf("%w: %v", ErrIdentityResolution, err) 114 } 115 116 // Create session 117 sessionResp, err := v.createSession(ctx, pds, identifier, password) 118 if err != nil { 119 + // Pass through typed errors from createSession 120 + return "", "", "", err 121 } 122 123 // Cache the session (ATProto sessions typically last 2 hours) ··· 158 resp, err := v.httpClient.Do(req) 159 if err != nil { 160 slog.Debug("Session creation HTTP request failed", "error", err) 161 + return nil, fmt.Errorf("%w: %v", ErrPDSUnavailable, err) 162 } 163 defer resp.Body.Close() 164 ··· 167 if resp.StatusCode == http.StatusUnauthorized { 168 bodyBytes, _ := io.ReadAll(resp.Body) 169 slog.Debug("Session creation unauthorized", "response", string(bodyBytes)) 170 + return nil, ErrInvalidCredentials 171 + } 172 + 173 + if resp.StatusCode >= 500 { 174 + bodyBytes, _ := io.ReadAll(resp.Body) 175 + slog.Debug("PDS server error", "status", resp.StatusCode, "response", string(bodyBytes)) 176 + return nil, fmt.Errorf("%w: server returned %d", ErrPDSUnavailable, resp.StatusCode) 177 } 178 179 if resp.StatusCode != http.StatusOK { 180 bodyBytes, _ := io.ReadAll(resp.Body) 181 slog.Debug("Session creation failed", "status", resp.StatusCode, "response", string(bodyBytes)) 182 + return nil, fmt.Errorf("%w: unexpected status %d: %s", ErrPDSUnavailable, resp.StatusCode, string(bodyBytes)) 183 } 184 185 var sessionResp SessionResponse
+15 -2
pkg/auth/token/handler.go
··· 3 import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" ··· 194 slog.Debug("Trying app password authentication", "username", username) 195 did, handle, accessToken, err = h.validator.CreateSessionAndGetToken(r.Context(), username, password) 196 if err != nil { 197 - slog.Debug("App password validation failed", "error", err, "username", username) 198 - sendAuthError(w, r, "authentication failed") 199 return 200 } 201
··· 3 import ( 4 "context" 5 "encoding/json" 6 + "errors" 7 "fmt" 8 "log/slog" 9 "net/http" ··· 195 slog.Debug("Trying app password authentication", "username", username) 196 did, handle, accessToken, err = h.validator.CreateSessionAndGetToken(r.Context(), username, password) 197 if err != nil { 198 + // Log at WARN level with specific error type 199 + if errors.Is(err, auth.ErrIdentityResolution) { 200 + slog.Warn("Identity resolution failed", "error", err, "username", username) 201 + sendAuthError(w, r, "authentication failed: could not resolve handle") 202 + } else if errors.Is(err, auth.ErrInvalidCredentials) { 203 + slog.Warn("Invalid credentials", "username", username) 204 + sendAuthError(w, r, "authentication failed: invalid credentials") 205 + } else if errors.Is(err, auth.ErrPDSUnavailable) { 206 + slog.Warn("PDS unavailable", "error", err, "username", username) 207 + sendAuthError(w, r, "authentication failed: PDS unavailable") 208 + } else { 209 + slog.Warn("Authentication failed", "error", err, "username", username) 210 + sendAuthError(w, r, "authentication failed") 211 + } 212 return 213 } 214