A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

fix backoff not clearing correctly. add better logging to find out why someone is denied access (backoff, pds issue, missing record etc)

evan.jarrett.net 647c33e1 1f0705a2

verified
+12 -3
cmd/appview/serve.go
··· 158 158 middleware.SetGlobalAuthorizer(holdAuthorizer) 159 159 slog.Info("Hold authorizer initialized with database caching") 160 160 161 + // Clear all denial caches on startup for a clean slate (non-blocking) 162 + if remote, ok := holdAuthorizer.(*auth.RemoteHoldAuthorizer); ok { 163 + go func() { 164 + if err := remote.ClearAllDenials(); err != nil { 165 + slog.Warn("Failed to clear denial caches on startup", "error", err) 166 + } 167 + }() 168 + } 169 + 161 170 // Initialize Jetstream workers (background services before HTTP routes) 162 171 initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher) 163 172 ··· 303 312 // Run in background to avoid blocking OAuth callback if hold is offline 304 313 // Use background context - don't inherit request context which gets canceled on response 305 314 slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID) 306 - go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string) { 315 + go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) { 307 316 ctx := context.Background() 308 - storage.EnsureCrewMembership(ctx, client, refresher, holdDID) 309 - }(client, refresher, holdDID) 317 + storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer) 318 + }(client, refresher, holdDID, holdAuthorizer) 310 319 311 320 } 312 321
+9 -1
pkg/appview/middleware/registry.go
··· 196 196 globalAuthorizer = authorizer 197 197 } 198 198 199 + // GetGlobalAuthorizer returns the global authorizer instance 200 + // Used by components that need to clear denial cache (e.g., EnsureCrewMembership) 201 + func GetGlobalAuthorizer() auth.HoldAuthorizer { 202 + return globalAuthorizer 203 + } 204 + 199 205 func init() { 200 206 // Register the name resolution middleware 201 207 registrymw.Register("atproto-resolver", initATProtoResolver) ··· 298 304 if holdDID != "" && nr.refresher != nil { 299 305 slog.Debug("Auto-reconciling crew membership", "component", "registry/middleware", "did", did, "hold_did", holdDID) 300 306 client := atproto.NewClient(pdsEndpoint, did, "") 301 - storage.EnsureCrewMembership(ctx, client, nr.refresher, holdDID) 307 + storage.EnsureCrewMembership(ctx, client, nr.refresher, holdDID, nr.authorizer) 302 308 } 303 309 304 310 // Get service token for hold authentication (only if authenticated) ··· 345 351 "pullerDID", pullerDID, 346 352 "holdDID", holdDID, 347 353 "pullerPDSEndpoint", pullerPDSEndpoint, 354 + "denial_reason", "service_token_app_password_failed", 348 355 "error", err) 349 356 return "", err 350 357 } ··· 363 370 "pullerDID", pullerDID, 364 371 "holdDID", holdDID, 365 372 "pullerPDSEndpoint", pullerPDSEndpoint, 373 + "denial_reason", "service_token_oauth_failed", 366 374 "error", err) 367 375 return "", err 368 376 }
+12 -1
pkg/appview/storage/crew.go
··· 15 15 16 16 // EnsureCrewMembership attempts to register the user as a crew member on their default hold. 17 17 // The hold's requestCrew endpoint handles all authorization logic (checking allowAllCrew, existing membership, etc). 18 + // On success, clears any cached denial to ensure immediate access. 18 19 // This is best-effort and does not fail on errors. 19 - func EnsureCrewMembership(ctx context.Context, client *atproto.Client, refresher *oauth.Refresher, defaultHoldDID string) { 20 + func EnsureCrewMembership(ctx context.Context, client *atproto.Client, refresher *oauth.Refresher, defaultHoldDID string, authorizer auth.HoldAuthorizer) { 20 21 if defaultHoldDID == "" { 21 22 return 22 23 } ··· 55 56 } 56 57 57 58 slog.Info("successfully registered as crew member", "holdDID", holdDID, "userDID", client.DID()) 59 + 60 + // Clear any cached denial to ensure immediate access 61 + if authorizer != nil { 62 + if err := authorizer.ClearCrewDenial(ctx, holdDID, client.DID()); err != nil { 63 + slog.Warn("failed to clear denial cache after crew registration", 64 + "holdDID", holdDID, 65 + "userDID", client.DID(), 66 + "error", err) 67 + } 68 + } 58 69 } 59 70 60 71 // requestCrewMembership calls the hold's requestCrew endpoint
+1 -1
pkg/appview/storage/crew_test.go
··· 7 7 8 8 func TestEnsureCrewMembership_EmptyHoldDID(t *testing.T) { 9 9 // Test that empty hold DID returns early without error (best-effort function) 10 - EnsureCrewMembership(context.Background(), nil, nil, "") 10 + EnsureCrewMembership(context.Background(), nil, nil, "", nil) 11 11 // If we get here without panic, test passes 12 12 } 13 13
+28 -5
pkg/appview/storage/proxy_blob_store.go
··· 96 96 // checkWriteAccess validates that the user has write access to blobs in this hold 97 97 func (p *ProxyBlobStore) checkWriteAccess(ctx context.Context) error { 98 98 if p.ctx.Authorizer == nil { 99 - return nil // No authorization check if authorizer not configured 99 + slog.Debug("Write access check skipped - no authorizer configured", 100 + "component", "proxy_blob_store") 101 + return nil 100 102 } 101 103 102 - slog.Debug("Checking write access", "component", "proxy_blob_store", "user_did", p.ctx.DID, "hold_did", p.ctx.HoldDID) 104 + slog.Debug("Checking write access", 105 + "component", "proxy_blob_store", 106 + "user_did", p.ctx.DID, 107 + "hold_did", p.ctx.HoldDID) 108 + 103 109 allowed, err := p.ctx.Authorizer.CheckWriteAccess(ctx, p.ctx.HoldDID, p.ctx.DID) 104 110 if err != nil { 105 - slog.Error("Authorization check error", "component", "proxy_blob_store", "error", err) 111 + // Authorization check itself failed (network, PDS error, etc.) 112 + slog.Error("Write access authorization check failed", 113 + "component", "proxy_blob_store", 114 + "user_did", p.ctx.DID, 115 + "hold_did", p.ctx.HoldDID, 116 + "denial_reason", "authorization_check_error", 117 + "error", err) 106 118 return fmt.Errorf("authorization check failed: %w", err) 107 119 } 120 + 108 121 if !allowed { 109 - slog.Warn("Write access denied", "component", "proxy_blob_store", "user_did", p.ctx.DID, "hold_did", p.ctx.HoldDID) 122 + // Access explicitly denied (logged in detail by authorizer) 123 + slog.Warn("Write access denied", 124 + "component", "proxy_blob_store", 125 + "user_did", p.ctx.DID, 126 + "hold_did", p.ctx.HoldDID, 127 + "denial_reason", "access_denied_by_authorizer", 128 + "hint", "check DEBUG logs for specific denial reason (denial_reason field)") 110 129 return errcode.ErrorCodeDenied.WithMessage(fmt.Sprintf("write access denied to hold %s", p.ctx.HoldDID)) 111 130 } 112 - slog.Debug("Write access allowed", "component", "proxy_blob_store", "user_did", p.ctx.DID, "hold_did", p.ctx.HoldDID) 131 + 132 + slog.Debug("Write access allowed", 133 + "component", "proxy_blob_store", 134 + "user_did", p.ctx.DID, 135 + "hold_did", p.ctx.HoldDID) 113 136 return nil 114 137 } 115 138
+14 -2
pkg/auth/hold_authorizer.go
··· 25 25 26 26 // IsCrewMember checks if userDID is a crew member of holdDID 27 27 IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) 28 + 29 + // ClearCrewDenial removes any cached denial for a user/hold pair 30 + // Called when user successfully becomes a crew member to ensure immediate access 31 + // Returns nil if no denial cache exists or invalidation succeeds 32 + ClearCrewDenial(ctx context.Context, holdDID, userDID string) error 28 33 } 29 34 30 35 // CheckReadAccessWithCaptain implements the standard read authorization logic ··· 60 65 61 66 if userDID == "" { 62 67 // Anonymous writes not allowed 63 - slog.Debug("Write access denied: anonymous user") 68 + slog.Debug("Write access denied", 69 + "userDID", userDID, 70 + "denial_reason", "anonymous_user", 71 + "message", "anonymous writes not allowed") 64 72 return false 65 73 } 66 74 ··· 75 83 if isCrew { 76 84 slog.Debug("Write access allowed: user is crew member") 77 85 } else { 78 - slog.Debug("Write access denied: user is not owner or crew") 86 + slog.Debug("Write access denied", 87 + "userDID", userDID, 88 + "owner", captain.Owner, 89 + "denial_reason", "not_owner_or_crew", 90 + "message", "user is not owner or crew member") 79 91 } 80 92 return isCrew 81 93 }
+6
pkg/auth/hold_local.go
··· 99 99 100 100 return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil 101 101 } 102 + 103 + // ClearCrewDenial is a no-op for LocalHoldAuthorizer 104 + // Local authorizer queries PDS directly without caching 105 + func (a *LocalHoldAuthorizer) ClearCrewDenial(ctx context.Context, holdDID, userDID string) error { 106 + return nil 107 + }
+11
pkg/auth/hold_local_test.go
··· 386 386 t.Error("Expected read access for crew member on private hold") 387 387 } 388 388 } 389 + 390 + func TestLocalHoldAuthorizer_ClearCrewDenial(t *testing.T) { 391 + // LocalHoldAuthorizer.ClearCrewDenial should be a no-op (returns nil) 392 + // since local authorizer doesn't have caching 393 + authorizer := NewLocalHoldAuthorizer(sharedEmptyPDS) 394 + 395 + err := authorizer.ClearCrewDenial(context.Background(), "did:web:hold.example.com", "did:plc:user123") 396 + if err != nil { 397 + t.Errorf("LocalHoldAuthorizer.ClearCrewDenial should return nil, got %v", err) 398 + } 399 + }
+83 -4
pkg/auth/hold_remote.go
··· 117 117 // Cache miss or expired - query XRPC endpoint 118 118 record, err := a.fetchCaptainRecordFromXRPC(ctx, holdDID) 119 119 if err != nil { 120 - return nil, err 120 + slog.Error("Captain record fetch failed", 121 + "holdDID", holdDID, 122 + "denial_reason", "captain_record_fetch_failed", 123 + "error", err) 124 + return nil, fmt.Errorf("failed to get captain record for %s: %w", holdDID, err) 121 125 } 122 126 123 127 // Update cache ··· 293 297 // Check denial cache with backoff 294 298 if blocked, err := a.isBlockedByDenialBackoff(holdDID, userDID); err == nil && blocked { 295 299 // Still in backoff period - don't query again 296 - slog.Debug("Blocked by denial backoff cache", "holdDID", holdDID, "userDID", userDID) 300 + // Detailed logging already emitted by isBlockedByDenialBackoff 297 301 return false, nil 298 302 } 299 303 ··· 470 474 entry := val.(denialEntry) 471 475 // Check if still within first denial backoff period 472 476 if time.Since(entry.timestamp) < a.firstDenialBackoff { 477 + slog.Debug("Write access blocked by in-memory denial cache", 478 + "holdDID", holdDID, 479 + "userDID", userDID, 480 + "denial_reason", "first_denial_backoff", 481 + "backoff_type", "in_memory", 482 + "backoff_duration", a.firstDenialBackoff, 483 + "denied_at", entry.timestamp, 484 + "retry_after", entry.timestamp.Add(a.firstDenialBackoff)) 473 485 return true, nil // Still blocked by in-memory first denial 474 486 } 475 487 } ··· 494 506 495 507 // Check if still in backoff period 496 508 if time.Now().Before(nextRetryAt) { 509 + slog.Debug("Write access blocked by database denial cache", 510 + "holdDID", holdDID, 511 + "userDID", userDID, 512 + "denial_reason", "exponential_backoff", 513 + "backoff_type", "database", 514 + "next_retry_at", nextRetryAt, 515 + "retry_in", time.Until(nextRetryAt).Round(time.Second)) 497 516 return true, nil // Still blocked 498 517 } 499 518 ··· 522 541 // If not in memory and not in DB, this is the first denial 523 542 if !inMemory && !inDB { 524 543 // First denial: store only in memory with configurable backoff 525 - a.recentDenials.Store(key, denialEntry{timestamp: time.Now()}) 544 + now := time.Now() 545 + a.recentDenials.Store(key, denialEntry{timestamp: now}) 546 + slog.Info("Cached first crew denial (in-memory)", 547 + "holdDID", holdDID, 548 + "userDID", userDID, 549 + "denial_count", 1, 550 + "backoff_type", "in_memory", 551 + "backoff_duration", a.firstDenialBackoff, 552 + "retry_after", now.Add(a.firstDenialBackoff)) 526 553 return nil 527 554 } 528 555 ··· 543 570 ` 544 571 545 572 _, err = a.db.Exec(upsertQuery, holdDID, userDID, denialCount, nextRetry, now) 573 + if err != nil { 574 + return err 575 + } 546 576 547 577 // Remove from in-memory cache since we're now tracking in DB 548 578 a.recentDenials.Delete(key) 549 579 550 - return err 580 + slog.Info("Cached crew denial with exponential backoff", 581 + "holdDID", holdDID, 582 + "userDID", userDID, 583 + "denial_count", denialCount, 584 + "backoff_type", "database", 585 + "backoff_duration", backoff, 586 + "next_retry_at", nextRetry) 587 + 588 + return nil 551 589 } 552 590 553 591 // getBackoffDuration returns the backoff duration based on denial count ··· 563 601 564 602 return backoffs[idx] 565 603 } 604 + 605 + // ClearCrewDenial removes crew denial from both in-memory and database caches 606 + // This allows immediate access after a user becomes a crew member 607 + func (a *RemoteHoldAuthorizer) ClearCrewDenial(ctx context.Context, holdDID, userDID string) error { 608 + // Clear in-memory cache 609 + key := fmt.Sprintf("%s:%s", holdDID, userDID) 610 + a.recentDenials.Delete(key) 611 + 612 + // Clear database cache 613 + if a.db != nil { 614 + query := `DELETE FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?` 615 + _, err := a.db.ExecContext(ctx, query, holdDID, userDID) 616 + if err != nil { 617 + return fmt.Errorf("failed to clear denial cache: %w", err) 618 + } 619 + } 620 + 621 + slog.Debug("Cleared crew denial cache", "holdDID", holdDID, "userDID", userDID) 622 + return nil 623 + } 624 + 625 + // ClearAllDenials removes all crew denials from both in-memory and database caches 626 + // Called on startup to ensure a clean slate 627 + func (a *RemoteHoldAuthorizer) ClearAllDenials() error { 628 + // Clear all in-memory denials 629 + a.recentDenials.Range(func(key, value any) bool { 630 + a.recentDenials.Delete(key) 631 + return true 632 + }) 633 + 634 + // Clear all database denials 635 + if a.db != nil { 636 + _, err := a.db.Exec("DELETE FROM hold_crew_denials") 637 + if err != nil { 638 + return fmt.Errorf("failed to clear all denial caches: %w", err) 639 + } 640 + } 641 + 642 + slog.Info("Cleared all crew denial caches on startup") 643 + return nil 644 + }
+154
pkg/auth/hold_remote_test.go
··· 304 304 305 305 _ = server 306 306 } 307 + 308 + func TestClearCrewDenial_InMemory(t *testing.T) { 309 + testDB := setupTestDB(t) 310 + remote := NewRemoteHoldAuthorizerWithBackoffs( 311 + testDB, false, 312 + 10*time.Millisecond, // firstDenialBackoff 313 + 50*time.Millisecond, // cleanupInterval 314 + 50*time.Millisecond, // cleanupGracePeriod 315 + []time.Duration{10 * time.Millisecond, 20 * time.Millisecond}, 316 + ).(*RemoteHoldAuthorizer) 317 + defer close(remote.stopCleanup) 318 + 319 + holdDID := "did:web:hold01.atcr.io" 320 + userDID := "did:plc:user123" 321 + 322 + // Cache first denial (in-memory only) 323 + _ = remote.cacheDenial(holdDID, userDID) 324 + 325 + // Verify blocked 326 + blocked, _ := remote.isBlockedByDenialBackoff(holdDID, userDID) 327 + if !blocked { 328 + t.Error("Expected to be blocked by denial") 329 + } 330 + 331 + // Clear denial 332 + err := remote.ClearCrewDenial(context.Background(), holdDID, userDID) 333 + if err != nil { 334 + t.Fatalf("ClearCrewDenial failed: %v", err) 335 + } 336 + 337 + // Verify no longer blocked 338 + blocked, _ = remote.isBlockedByDenialBackoff(holdDID, userDID) 339 + if blocked { 340 + t.Error("Expected denial to be cleared") 341 + } 342 + } 343 + 344 + func TestClearCrewDenial_Database(t *testing.T) { 345 + testDB := setupTestDB(t) 346 + remote := NewRemoteHoldAuthorizerWithBackoffs( 347 + testDB, false, 348 + 10*time.Millisecond, // firstDenialBackoff 349 + 50*time.Millisecond, // cleanupInterval 350 + 50*time.Millisecond, // cleanupGracePeriod 351 + []time.Duration{10 * time.Millisecond, 20 * time.Millisecond}, 352 + ).(*RemoteHoldAuthorizer) 353 + defer close(remote.stopCleanup) 354 + 355 + holdDID := "did:web:hold01.atcr.io" 356 + userDID := "did:plc:user123" 357 + 358 + // Cache first denial (in-memory) 359 + _ = remote.cacheDenial(holdDID, userDID) 360 + 361 + // Wait for backoff, then trigger second denial (goes to DB) 362 + time.Sleep(15 * time.Millisecond) 363 + _ = remote.cacheDenial(holdDID, userDID) 364 + 365 + // Verify blocked by DB denial 366 + blocked, _ := remote.isBlockedByDenialBackoff(holdDID, userDID) 367 + if !blocked { 368 + t.Error("Expected to be blocked by DB denial") 369 + } 370 + 371 + // Clear denial 372 + err := remote.ClearCrewDenial(context.Background(), holdDID, userDID) 373 + if err != nil { 374 + t.Fatalf("ClearCrewDenial failed: %v", err) 375 + } 376 + 377 + // Verify no longer blocked 378 + blocked, _ = remote.isBlockedByDenialBackoff(holdDID, userDID) 379 + if blocked { 380 + t.Error("Expected denial to be cleared from DB") 381 + } 382 + } 383 + 384 + func TestDeniedUserBecomesCrewImmediateAccess(t *testing.T) { 385 + testDB := setupTestDB(t) 386 + remote := NewRemoteHoldAuthorizerWithBackoffs( 387 + testDB, false, 388 + 1*time.Hour, // Long backoff to ensure test would fail without fix 389 + 50*time.Millisecond, 390 + 50*time.Millisecond, 391 + []time.Duration{1 * time.Hour}, // Long DB backoff 392 + ).(*RemoteHoldAuthorizer) 393 + defer close(remote.stopCleanup) 394 + 395 + holdDID := "did:web:hold01.atcr.io" 396 + userDID := "did:plc:user123" 397 + 398 + // Simulate denial being cached (user not yet crew) 399 + _ = remote.cacheDenial(holdDID, userDID) 400 + 401 + // User is now blocked 402 + blocked, _ := remote.isBlockedByDenialBackoff(holdDID, userDID) 403 + if !blocked { 404 + t.Fatal("Expected user to be blocked initially") 405 + } 406 + 407 + // Simulate successful crew registration + cache clear 408 + // (This is what EnsureCrewMembership does after requestCrew succeeds) 409 + err := remote.ClearCrewDenial(context.Background(), holdDID, userDID) 410 + if err != nil { 411 + t.Fatalf("ClearCrewDenial failed: %v", err) 412 + } 413 + 414 + // User should no longer be blocked 415 + blocked, _ = remote.isBlockedByDenialBackoff(holdDID, userDID) 416 + if blocked { 417 + t.Error("User should have immediate access after crew registration") 418 + } 419 + } 420 + 421 + func TestClearAllDenials_OnStartup(t *testing.T) { 422 + testDB := setupTestDB(t) 423 + remote := NewRemoteHoldAuthorizerWithBackoffs( 424 + testDB, false, 425 + 1*time.Hour, // Long backoff 426 + 50*time.Millisecond, 427 + 50*time.Millisecond, 428 + []time.Duration{1 * time.Hour}, 429 + ).(*RemoteHoldAuthorizer) 430 + defer close(remote.stopCleanup) 431 + 432 + // Add multiple denials for different users/holds 433 + _ = remote.cacheDenial("did:web:hold01.atcr.io", "did:plc:user1") 434 + _ = remote.cacheDenial("did:web:hold01.atcr.io", "did:plc:user2") 435 + _ = remote.cacheDenial("did:web:hold02.atcr.io", "did:plc:user1") 436 + 437 + // Verify all are blocked 438 + blocked1, _ := remote.isBlockedByDenialBackoff("did:web:hold01.atcr.io", "did:plc:user1") 439 + blocked2, _ := remote.isBlockedByDenialBackoff("did:web:hold01.atcr.io", "did:plc:user2") 440 + blocked3, _ := remote.isBlockedByDenialBackoff("did:web:hold02.atcr.io", "did:plc:user1") 441 + 442 + if !blocked1 || !blocked2 || !blocked3 { 443 + t.Fatal("Expected all users to be blocked initially") 444 + } 445 + 446 + // Clear all denials (simulating startup) 447 + err := remote.ClearAllDenials() 448 + if err != nil { 449 + t.Fatalf("ClearAllDenials failed: %v", err) 450 + } 451 + 452 + // Verify none are blocked 453 + blocked1, _ = remote.isBlockedByDenialBackoff("did:web:hold01.atcr.io", "did:plc:user1") 454 + blocked2, _ = remote.isBlockedByDenialBackoff("did:web:hold01.atcr.io", "did:plc:user2") 455 + blocked3, _ = remote.isBlockedByDenialBackoff("did:web:hold02.atcr.io", "did:plc:user1") 456 + 457 + if blocked1 || blocked2 || blocked3 { 458 + t.Error("Expected all denials to be cleared after ClearAllDenials") 459 + } 460 + }