+36
-1
CLAUDE.md
+36
-1
CLAUDE.md
···
475
475
476
476
Read access:
477
477
- **Public hold** (`HOLD_PUBLIC=true`): Anonymous + all authenticated users
478
-
- **Private hold** (`HOLD_PUBLIC=false`): Requires authentication + crew membership with blob:read permission
478
+
- **Private hold** (`HOLD_PUBLIC=false`): Requires authentication + crew membership with blob:read OR blob:write permission
479
+
- **Note:** `blob:write` implicitly grants `blob:read` access (can't push without pulling)
479
480
480
481
Write access:
481
482
- Hold owner OR crew members with blob:write permission
482
483
- Verified via `io.atcr.hold.crew` records in hold's embedded PDS
484
+
485
+
**Permission Matrix:**
486
+
487
+
| User Type | Public Read | Private Read | Write | Crew Admin |
488
+
|-----------|-------------|--------------|-------|------------|
489
+
| Anonymous | Yes | No | No | No |
490
+
| Owner (captain) | Yes | Yes | Yes | Yes (implied) |
491
+
| Crew (blob:read only) | Yes | Yes | No | No |
492
+
| Crew (blob:write only) | Yes | Yes* | Yes | No |
493
+
| Crew (blob:read + blob:write) | Yes | Yes | Yes | No |
494
+
| Crew (crew:admin) | Yes | Yes | Yes | Yes |
495
+
| Authenticated non-crew | Yes | No | No | No |
496
+
497
+
*`blob:write` implicitly grants `blob:read` access
498
+
499
+
**Authorization Error Format:**
500
+
501
+
All authorization failures use consistent structured errors (`pkg/hold/pds/auth.go`):
502
+
```
503
+
access denied for [action]: [reason] (required: [permission(s)])
504
+
```
505
+
506
+
Examples:
507
+
- `access denied for blob:read: user is not a crew member (required: blob:read or blob:write)`
508
+
- `access denied for blob:write: crew member lacks permission (required: blob:write)`
509
+
- `access denied for crew:admin: user is not a crew member (required: crew:admin)`
510
+
511
+
**Shared Error Constants** (`pkg/hold/pds/auth.go`):
512
+
- `ErrMissingAuthHeader` - Missing Authorization header
513
+
- `ErrInvalidAuthFormat` - Invalid Authorization header format
514
+
- `ErrInvalidAuthScheme` - Invalid scheme (expected Bearer or DPoP)
515
+
- `ErrInvalidJWTFormat` - Malformed JWT
516
+
- `ErrMissingISSClaim` / `ErrMissingSubClaim` - Missing JWT claims
517
+
- `ErrTokenExpired` - Token has expired
483
518
484
519
**Embedded PDS Endpoints** (`pkg/hold/pds/xrpc.go`):
485
520
+399
docs/VALKEY_MIGRATION.md
+399
docs/VALKEY_MIGRATION.md
···
1
+
# Analysis: AppView SQL Database Usage
2
+
3
+
## Overview
4
+
5
+
The AppView uses SQLite with 19 tables. The key finding: **most data is a cache of ATProto records** that could theoretically be rebuilt from users' PDS instances.
6
+
7
+
## Data Categories
8
+
9
+
### 1. MUST PERSIST (Local State Only)
10
+
11
+
These tables contain data that **cannot be reconstructed** from external sources:
12
+
13
+
| Table | Purpose | Why It Must Persist |
14
+
|-------|---------|---------------------|
15
+
| `oauth_sessions` | OAuth tokens | Refresh tokens are stateful; losing them = users must re-auth |
16
+
| `ui_sessions` | Web browser sessions | Session continuity for logged-in users |
17
+
| `devices` | Approved devices + bcrypt secrets | User authorization decisions; secrets are one-way hashed |
18
+
| `pending_device_auth` | In-flight auth flows | Short-lived (10min) but critical during auth |
19
+
| `oauth_auth_requests` | OAuth flow state | Short-lived but required for auth completion |
20
+
| `repository_stats` | Pull/push counts | **Locally tracked metrics** - not stored in ATProto |
21
+
22
+
### 2. CACHED FROM PDS (Rebuildable)
23
+
24
+
These tables are essentially a **read-through cache** of ATProto data:
25
+
26
+
| Table | Source | ATProto Collection |
27
+
|-------|--------|-------------------|
28
+
| `users` | User's PDS profile | `app.bsky.actor.profile` + DID document |
29
+
| `manifests` | User's PDS | `io.atcr.manifest` records |
30
+
| `tags` | User's PDS | `io.atcr.tag` records |
31
+
| `layers` | Derived from manifests | Parsed from manifest content |
32
+
| `manifest_references` | Derived from manifest lists | Parsed from multi-arch manifests |
33
+
| `repository_annotations` | Manifest config blob | OCI annotations from config |
34
+
| `repo_pages` | User's PDS | `io.atcr.repo.page` records |
35
+
| `stars` | User's PDS | `io.atcr.sailor.star` records (synced via Jetstream) |
36
+
| `hold_captain_records` | Hold's embedded PDS | `io.atcr.hold.captain` records |
37
+
| `hold_crew_approvals` | Hold's embedded PDS | `io.atcr.hold.crew` records |
38
+
| `hold_crew_denials` | Local authorization cache | Could re-check on demand |
39
+
40
+
### 3. OPERATIONAL
41
+
42
+
| Table | Purpose |
43
+
|-------|---------|
44
+
| `schema_migrations` | Migration tracking |
45
+
| `firehose_cursor` | Jetstream position (can restart from 0) |
46
+
47
+
## Key Insights
48
+
49
+
### What's Actually Unique to AppView?
50
+
51
+
1. **Authentication state** - OAuth sessions, devices, UI sessions
52
+
2. **Engagement metrics** - Pull/push counts (locally tracked, not in ATProto)
53
+
54
+
### What Could Be Eliminated?
55
+
56
+
If ATCR fully embraced the ATProto model:
57
+
58
+
1. **`users`** - Query PDS on demand (with caching)
59
+
2. **`manifests`, `tags`, `layers`** - Query PDS on demand (with caching)
60
+
3. **`repository_annotations`** - Fetch manifest config on demand
61
+
4. **`repo_pages`** - Query PDS on demand
62
+
5. **`hold_*` tables** - Query hold's PDS on demand
63
+
64
+
### Trade-offs
65
+
66
+
**Current approach (heavy caching):**
67
+
- Fast queries for UI (search, browse, stats)
68
+
- Offline resilience (PDS down doesn't break UI)
69
+
- Complex sync logic (Jetstream consumer, backfill)
70
+
- State can diverge from source of truth
71
+
72
+
**Lighter approach (query on demand):**
73
+
- Always fresh data
74
+
- Simpler codebase (no sync)
75
+
- Slower queries (network round-trips)
76
+
- Depends on PDS availability
77
+
78
+
## Current Limitation: No Cache-Miss Queries
79
+
80
+
**Finding:** There's no "query PDS on cache miss" logic. Users/manifests only enter the DB via:
81
+
1. OAuth login (user authenticates)
82
+
2. Jetstream events (firehose activity)
83
+
84
+
**Problem:** If someone visits `atcr.io/alice/myapp` before alice is indexed → 404
85
+
86
+
**Where this happens:**
87
+
- `pkg/appview/handlers/repository.go:50-53`: If `db.GetUserByDID()` returns nil → 404
88
+
- No fallback to `atproto.Client.ListRecords()` or similar
89
+
90
+
**This matters for Valkey migration:** If cache is ephemeral and restarts clear it, you need cache-miss logic to repopulate on demand. Otherwise:
91
+
- Restart Valkey → all users/manifests gone
92
+
- Wait for Jetstream to re-index OR implement cache-miss queries
93
+
94
+
**Cache-miss implementation design:**
95
+
96
+
Existing code to reuse: `pkg/appview/jetstream/processor.go:43-97` (`EnsureUser`)
97
+
98
+
```go
99
+
// New: pkg/appview/cache/loader.go
100
+
101
+
type Loader struct {
102
+
cache Cache // Valkey interface
103
+
client *atproto.Client
104
+
}
105
+
106
+
// GetUser with cache-miss fallback
107
+
func (l *Loader) GetUser(ctx context.Context, did string) (*User, error) {
108
+
// 1. Try cache
109
+
if user := l.cache.GetUser(did); user != nil {
110
+
return user, nil
111
+
}
112
+
113
+
// 2. Cache miss - resolve identity (already queries network)
114
+
_, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
115
+
if err != nil {
116
+
return nil, err // User doesn't exist in network
117
+
}
118
+
119
+
// 3. Fetch profile for avatar
120
+
client := atproto.NewClient(pdsEndpoint, "", "")
121
+
profile, _ := client.GetProfileRecord(ctx, did)
122
+
avatarURL := ""
123
+
if profile != nil && profile.Avatar != nil {
124
+
avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
125
+
}
126
+
127
+
// 4. Cache and return
128
+
user := &User{DID: did, Handle: handle, PDSEndpoint: pdsEndpoint, Avatar: avatarURL}
129
+
l.cache.SetUser(user, 1*time.Hour)
130
+
return user, nil
131
+
}
132
+
133
+
// GetManifestsForRepo with cache-miss fallback
134
+
func (l *Loader) GetManifestsForRepo(ctx context.Context, did, repo string) ([]Manifest, error) {
135
+
cacheKey := fmt.Sprintf("manifests:%s:%s", did, repo)
136
+
137
+
// 1. Try cache
138
+
if cached := l.cache.Get(cacheKey); cached != nil {
139
+
return cached.([]Manifest), nil
140
+
}
141
+
142
+
// 2. Cache miss - get user's PDS endpoint
143
+
user, err := l.GetUser(ctx, did)
144
+
if err != nil {
145
+
return nil, err
146
+
}
147
+
148
+
// 3. Query PDS for manifests
149
+
client := atproto.NewClient(user.PDSEndpoint, "", "")
150
+
records, _, err := client.ListRecordsForRepo(ctx, did, atproto.ManifestCollection, 100, "")
151
+
if err != nil {
152
+
return nil, err
153
+
}
154
+
155
+
// 4. Filter by repository and parse
156
+
var manifests []Manifest
157
+
for _, rec := range records {
158
+
var m atproto.ManifestRecord
159
+
if err := json.Unmarshal(rec.Value, &m); err != nil {
160
+
continue
161
+
}
162
+
if m.Repository == repo {
163
+
manifests = append(manifests, convertManifest(m))
164
+
}
165
+
}
166
+
167
+
// 5. Cache and return
168
+
l.cache.Set(cacheKey, manifests, 10*time.Minute)
169
+
return manifests, nil
170
+
}
171
+
```
172
+
173
+
**Handler changes:**
174
+
```go
175
+
// Before (repository.go:45-53):
176
+
owner, err := db.GetUserByDID(h.DB, did)
177
+
if owner == nil {
178
+
RenderNotFound(w, r, h.Templates, h.RegistryURL)
179
+
return
180
+
}
181
+
182
+
// After:
183
+
owner, err := h.Loader.GetUser(r.Context(), did)
184
+
if err != nil {
185
+
RenderNotFound(w, r, h.Templates, h.RegistryURL)
186
+
return
187
+
}
188
+
```
189
+
190
+
**Performance considerations:**
191
+
- Cache hit: ~1ms (Valkey lookup)
192
+
- Cache miss: ~200-500ms (PDS round-trip)
193
+
- First request after restart: slower but correct
194
+
- Jetstream still useful for proactive warming
195
+
196
+
---
197
+
198
+
## Proposed Architecture: Valkey + ATProto
199
+
200
+
### Goal
201
+
Replace SQLite with Valkey (Redis-compatible) for ephemeral state, push remaining persistent data to ATProto.
202
+
203
+
### What goes to Valkey (ephemeral, TTL-based)
204
+
205
+
| Current Table | Valkey Key Pattern | TTL | Notes |
206
+
|---------------|-------------------|-----|-------|
207
+
| `oauth_sessions` | `oauth:{did}:{session_id}` | 90 days | Lost on restart = re-auth |
208
+
| `ui_sessions` | `ui:{session_id}` | Session duration | Lost on restart = re-login |
209
+
| `oauth_auth_requests` | `authreq:{state}` | 10 min | In-flight flows |
210
+
| `pending_device_auth` | `pending:{device_code}` | 10 min | In-flight flows |
211
+
| `firehose_cursor` | `cursor:jetstream` | None | Can restart from 0 |
212
+
| All PDS cache tables | `cache:{collection}:{did}:{rkey}` | 10-60 min | Query PDS on miss |
213
+
214
+
**Benefits:**
215
+
- Multi-instance ready (shared Valkey)
216
+
- No schema migrations
217
+
- Natural TTL expiry
218
+
- Simpler code (no SQL)
219
+
220
+
### What could become ATProto records
221
+
222
+
| Current Table | Proposed Collection | Where Stored | Open Questions |
223
+
|---------------|---------------------|--------------|----------------|
224
+
| `devices` | `io.atcr.sailor.device` | User's PDS | Privacy: IP, user-agent sensitive? |
225
+
| `repository_stats` | `io.atcr.repo.stats` | Hold's PDS or User's PDS | Who owns the stats? |
226
+
227
+
**Devices → Valkey:**
228
+
- Move current device table to Valkey
229
+
- Key: `device:{did}:{device_id}` → `{name, secret_hash, ip, user_agent, created_at, last_used}`
230
+
- TTL: Long (1 year?) or no expiry
231
+
- Device list: `devices:{did}` → Set of device IDs
232
+
- Secret validation works the same, just different backend
233
+
234
+
**Service auth exploration (future):**
235
+
The challenge with pure ATProto service auth is the AppView still needs the user's OAuth session to write manifests to their PDS. The current flow:
236
+
1. User authenticates via OAuth → AppView gets OAuth tokens
237
+
2. AppView issues registry JWT to credential helper
238
+
3. Credential helper presents JWT on each push/pull
239
+
4. AppView uses OAuth session to write to user's PDS
240
+
241
+
Service auth could work for the hold side (AppView → Hold), but not for the user's OAuth session.
242
+
243
+
**Repository stats → Hold's PDS:**
244
+
245
+
**Challenge discovered:** The hold's `getBlob` endpoint only receives `did` + `cid`, not the repository name.
246
+
247
+
Current flow (`proxy_blob_store.go:358-362`):
248
+
```go
249
+
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s",
250
+
p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation)
251
+
```
252
+
253
+
**Implementation options:**
254
+
255
+
**Option A: Add repository parameter to getBlob (recommended)**
256
+
```go
257
+
// Modified AppView call:
258
+
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s&repo=%s",
259
+
p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation, p.ctx.Repository)
260
+
```
261
+
262
+
```go
263
+
// Modified hold handler (xrpc.go:969):
264
+
func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
265
+
did := r.URL.Query().Get("did")
266
+
cidOrDigest := r.URL.Query().Get("cid")
267
+
repo := r.URL.Query().Get("repo") // NEW
268
+
269
+
// ... existing blob handling ...
270
+
271
+
// Increment stats if repo provided
272
+
if repo != "" {
273
+
go h.pds.IncrementPullCount(did, repo) // Async, non-blocking
274
+
}
275
+
}
276
+
```
277
+
278
+
**Stats record structure:**
279
+
```
280
+
Collection: io.atcr.hold.stats
281
+
Rkey: base64(did:repository) // Deterministic, unique
282
+
283
+
{
284
+
"$type": "io.atcr.hold.stats",
285
+
"did": "did:plc:alice123",
286
+
"repository": "myapp",
287
+
"pullCount": 1542,
288
+
"pushCount": 47,
289
+
"lastPull": "2025-01-15T...",
290
+
"lastPush": "2025-01-10T...",
291
+
"createdAt": "2025-01-01T..."
292
+
}
293
+
```
294
+
295
+
**Hold-side implementation:**
296
+
```go
297
+
// New file: pkg/hold/pds/stats.go
298
+
299
+
func (p *HoldPDS) IncrementPullCount(ctx context.Context, did, repo string) error {
300
+
rkey := statsRecordKey(did, repo)
301
+
302
+
// Get or create stats record
303
+
stats, err := p.GetStatsRecord(ctx, rkey)
304
+
if err != nil || stats == nil {
305
+
stats = &atproto.StatsRecord{
306
+
Type: atproto.StatsCollection,
307
+
DID: did,
308
+
Repository: repo,
309
+
PullCount: 0,
310
+
PushCount: 0,
311
+
CreatedAt: time.Now(),
312
+
}
313
+
}
314
+
315
+
// Increment and update
316
+
stats.PullCount++
317
+
stats.LastPull = time.Now()
318
+
319
+
_, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, stats)
320
+
return err
321
+
}
322
+
```
323
+
324
+
**Query endpoint (new XRPC):**
325
+
```
326
+
GET /xrpc/io.atcr.hold.getStats?did={userDID}&repo={repository}
327
+
→ Returns JSON: { pullCount, pushCount, lastPull, lastPush }
328
+
329
+
GET /xrpc/io.atcr.hold.listStats?did={userDID}
330
+
→ Returns all stats for a user across all repos on this hold
331
+
```
332
+
333
+
**AppView aggregation:**
334
+
```go
335
+
func (l *Loader) GetAggregatedStats(ctx context.Context, did, repo string) (*Stats, error) {
336
+
// 1. Get all holds that have served this repo
337
+
holdDIDs, _ := l.cache.GetHoldDIDsForRepo(did, repo)
338
+
339
+
// 2. Query each hold for stats
340
+
var total Stats
341
+
for _, holdDID := range holdDIDs {
342
+
holdURL := resolveHoldDID(holdDID)
343
+
stats, _ := queryHoldStats(ctx, holdURL, did, repo)
344
+
total.PullCount += stats.PullCount
345
+
total.PushCount += stats.PushCount
346
+
}
347
+
348
+
return &total, nil
349
+
}
350
+
```
351
+
352
+
**Files to modify:**
353
+
- `pkg/atproto/lexicon.go` - Add `StatsCollection` + `StatsRecord`
354
+
- `pkg/hold/pds/stats.go` - New file for stats operations
355
+
- `pkg/hold/pds/xrpc.go` - Add `repo` param to getBlob, add stats endpoints
356
+
- `pkg/appview/storage/proxy_blob_store.go` - Pass repository to getBlob
357
+
- `pkg/appview/cache/loader.go` - Aggregation logic
358
+
359
+
### Migration Path
360
+
361
+
**Phase 1: Add Valkey infrastructure**
362
+
- Add Valkey client to AppView
363
+
- Create store interfaces that abstract SQLite vs Valkey
364
+
- Dual-write OAuth sessions to both
365
+
366
+
**Phase 2: Migrate sessions to Valkey**
367
+
- OAuth sessions, UI sessions, auth requests, pending device auth
368
+
- Remove SQLite session tables
369
+
- Test: restart AppView, users get logged out (acceptable)
370
+
371
+
**Phase 3: Migrate devices to Valkey**
372
+
- Move device store to Valkey
373
+
- Same data structure, different backend
374
+
- Consider device expiry policy
375
+
376
+
**Phase 4: Implement hold-side stats**
377
+
- Add `io.atcr.hold.stats` collection to hold's embedded PDS
378
+
- Hold increments stats on blob access
379
+
- Add XRPC endpoint: `io.atcr.hold.getStats`
380
+
381
+
**Phase 5: AppView stats aggregation**
382
+
- Track holdDids per repo in Valkey cache
383
+
- Query holds for stats, aggregate
384
+
- Cache aggregated stats with TTL
385
+
386
+
**Phase 6: Remove SQLite (optional)**
387
+
- Keep SQLite as optional cache layer for UI queries
388
+
- Or: Query PDS on demand with Valkey caching
389
+
- Jetstream still useful for real-time updates
390
+
391
+
## Summary Table
392
+
393
+
| Category | Tables | % of Schema | Truly Persistent? |
394
+
|----------|--------|-------------|-------------------|
395
+
| Auth & Sessions + Metrics | 6 | 32% | Yes |
396
+
| PDS Cache | 11 | 58% | No (rebuildable) |
397
+
| Operational | 2 | 10% | No |
398
+
399
+
**~58% of the database is cached ATProto data that could be rebuilt from PDSes.**
+127
-104
pkg/appview/middleware/registry.go
+127
-104
pkg/appview/middleware/registry.go
···
29
29
// authMethodKey is the context key for storing auth method from JWT
30
30
const authMethodKey contextKey = "auth.method"
31
31
32
+
// pullerDIDKey is the context key for storing the authenticated user's DID from JWT
33
+
const pullerDIDKey contextKey = "puller.did"
34
+
32
35
// validationCacheEntry stores a validated service token with expiration
33
36
type validationCacheEntry struct {
34
37
serviceToken string
···
302
305
// Get service token for hold authentication (only if authenticated)
303
306
// Use validation cache to prevent concurrent requests from racing on OAuth/DPoP
304
307
// Route based on auth method from JWT token
308
+
// IMPORTANT: Use PULLER's DID/PDS for service token, not owner's!
309
+
// The puller (authenticated user) needs to authenticate to the hold service.
305
310
var serviceToken string
306
311
authMethod, _ := ctx.Value(authMethodKey).(string)
312
+
pullerDID, _ := ctx.Value(pullerDIDKey).(string)
313
+
var pullerPDSEndpoint string
307
314
308
315
// Only fetch service token if user is authenticated
309
316
// Unauthenticated requests (like /v2/ ping) should not trigger token fetching
310
-
if authMethod != "" {
311
-
// Create cache key: "did:holdDID"
312
-
cacheKey := fmt.Sprintf("%s:%s", did, holdDID)
317
+
if authMethod != "" && pullerDID != "" {
318
+
// Resolve puller's PDS endpoint for service token request
319
+
_, _, pullerPDSEndpoint, err = atproto.ResolveIdentity(ctx, pullerDID)
320
+
if err != nil {
321
+
slog.Warn("Failed to resolve puller's PDS, falling back to anonymous access",
322
+
"component", "registry/middleware",
323
+
"pullerDID", pullerDID,
324
+
"error", err)
325
+
// Continue without service token - hold will decide if anonymous access is allowed
326
+
} else {
327
+
// Create cache key: "pullerDID:holdDID"
328
+
cacheKey := fmt.Sprintf("%s:%s", pullerDID, holdDID)
313
329
314
-
// Fetch service token through validation cache
315
-
// This ensures only ONE request per DID:holdDID pair fetches the token
316
-
// Concurrent requests will wait for the first request to complete
317
-
var fetchErr error
318
-
serviceToken, fetchErr = nr.validationCache.getOrFetch(ctx, cacheKey, func() (string, error) {
319
-
if authMethod == token.AuthMethodAppPassword {
320
-
// App-password flow: use Bearer token authentication
321
-
slog.Debug("Using app-password flow for service token",
322
-
"component", "registry/middleware",
323
-
"did", did,
324
-
"cacheKey", cacheKey)
330
+
// Fetch service token through validation cache
331
+
// This ensures only ONE request per pullerDID:holdDID pair fetches the token
332
+
// Concurrent requests will wait for the first request to complete
333
+
var fetchErr error
334
+
serviceToken, fetchErr = nr.validationCache.getOrFetch(ctx, cacheKey, func() (string, error) {
335
+
if authMethod == token.AuthMethodAppPassword {
336
+
// App-password flow: use Bearer token authentication
337
+
slog.Debug("Using app-password flow for service token",
338
+
"component", "registry/middleware",
339
+
"pullerDID", pullerDID,
340
+
"cacheKey", cacheKey)
325
341
326
-
token, err := token.GetOrFetchServiceTokenWithAppPassword(ctx, did, holdDID, pdsEndpoint)
327
-
if err != nil {
328
-
slog.Error("Failed to get service token with app-password",
342
+
token, err := token.GetOrFetchServiceTokenWithAppPassword(ctx, pullerDID, holdDID, pullerPDSEndpoint)
343
+
if err != nil {
344
+
slog.Error("Failed to get service token with app-password",
345
+
"component", "registry/middleware",
346
+
"pullerDID", pullerDID,
347
+
"holdDID", holdDID,
348
+
"pullerPDSEndpoint", pullerPDSEndpoint,
349
+
"error", err)
350
+
return "", err
351
+
}
352
+
return token, nil
353
+
} else if nr.refresher != nil {
354
+
// OAuth flow: use DPoP authentication
355
+
slog.Debug("Using OAuth flow for service token",
329
356
"component", "registry/middleware",
330
-
"did", did,
331
-
"holdDID", holdDID,
332
-
"pdsEndpoint", pdsEndpoint,
333
-
"error", err)
334
-
return "", err
357
+
"pullerDID", pullerDID,
358
+
"cacheKey", cacheKey)
359
+
360
+
token, err := token.GetOrFetchServiceToken(ctx, nr.refresher, pullerDID, holdDID, pullerPDSEndpoint)
361
+
if err != nil {
362
+
slog.Error("Failed to get service token with OAuth",
363
+
"component", "registry/middleware",
364
+
"pullerDID", pullerDID,
365
+
"holdDID", holdDID,
366
+
"pullerPDSEndpoint", pullerPDSEndpoint,
367
+
"error", err)
368
+
return "", err
369
+
}
370
+
return token, nil
335
371
}
336
-
return token, nil
337
-
} else if nr.refresher != nil {
338
-
// OAuth flow: use DPoP authentication
339
-
slog.Debug("Using OAuth flow for service token",
340
-
"component", "registry/middleware",
341
-
"did", did,
342
-
"cacheKey", cacheKey)
372
+
return "", fmt.Errorf("no authentication method available")
373
+
})
343
374
344
-
token, err := token.GetOrFetchServiceToken(ctx, nr.refresher, did, holdDID, pdsEndpoint)
345
-
if err != nil {
346
-
slog.Error("Failed to get service token with OAuth",
347
-
"component", "registry/middleware",
348
-
"did", did,
349
-
"holdDID", holdDID,
350
-
"pdsEndpoint", pdsEndpoint,
351
-
"error", err)
352
-
return "", err
375
+
// Handle errors from cached fetch
376
+
if fetchErr != nil {
377
+
errMsg := fetchErr.Error()
378
+
379
+
// Check for app-password specific errors
380
+
if authMethod == token.AuthMethodAppPassword {
381
+
if strings.Contains(errMsg, "expired or invalid") || strings.Contains(errMsg, "no app-password") {
382
+
return nil, nr.authErrorMessage("App-password authentication failed. Please re-authenticate with: docker login")
383
+
}
353
384
}
354
-
return token, nil
355
-
}
356
-
return "", fmt.Errorf("no authentication method available")
357
-
})
358
385
359
-
// Handle errors from cached fetch
360
-
if fetchErr != nil {
361
-
errMsg := fetchErr.Error()
362
-
363
-
// Check for app-password specific errors
364
-
if authMethod == token.AuthMethodAppPassword {
365
-
if strings.Contains(errMsg, "expired or invalid") || strings.Contains(errMsg, "no app-password") {
366
-
return nil, nr.authErrorMessage("App-password authentication failed. Please re-authenticate with: docker login")
386
+
// Check for OAuth specific errors
387
+
if strings.Contains(errMsg, "OAuth session") || strings.Contains(errMsg, "OAuth validation") {
388
+
return nil, nr.authErrorMessage("OAuth session expired or invalidated by PDS. Your session has been cleared")
367
389
}
368
-
}
369
390
370
-
// Check for OAuth specific errors
371
-
if strings.Contains(errMsg, "OAuth session") || strings.Contains(errMsg, "OAuth validation") {
372
-
return nil, nr.authErrorMessage("OAuth session expired or invalidated by PDS. Your session has been cleared")
391
+
// Generic service token error
392
+
return nil, nr.authErrorMessage(fmt.Sprintf("Failed to obtain storage credentials: %v", fetchErr))
373
393
}
374
-
375
-
// Generic service token error
376
-
return nil, nr.authErrorMessage(fmt.Sprintf("Failed to obtain storage credentials: %v", fetchErr))
377
394
}
378
395
} else {
379
396
slog.Debug("Skipping service token fetch for unauthenticated request",
380
397
"component", "registry/middleware",
381
-
"did", did)
398
+
"ownerDID", did)
382
399
}
383
400
384
401
// Create a new reference with identity/image format
···
396
413
return nil, err
397
414
}
398
415
399
-
// Get access token for PDS operations
400
-
// Use auth method from JWT to determine client type:
401
-
// - OAuth users: use session provider (DPoP-enabled)
402
-
// - App-password users: use Basic Auth token cache
416
+
// Create ATProto client for manifest/tag operations
417
+
// Pulls: ATProto records are public, no auth needed
418
+
// Pushes: Need auth, but puller must be owner anyway
403
419
var atprotoClient *atproto.Client
404
420
405
-
if authMethod == token.AuthMethodOAuth && nr.refresher != nil {
406
-
// OAuth flow: use session provider for locked OAuth sessions
407
-
// This prevents DPoP nonce race conditions during concurrent layer uploads
408
-
slog.Debug("Creating ATProto client with OAuth session provider",
409
-
"component", "registry/middleware",
410
-
"did", did,
411
-
"authMethod", authMethod)
412
-
atprotoClient = atproto.NewClientWithSessionProvider(pdsEndpoint, did, nr.refresher)
413
-
} else {
414
-
// App-password flow (or fallback): use Basic Auth token cache
415
-
accessToken, ok := auth.GetGlobalTokenCache().Get(did)
416
-
if !ok {
417
-
slog.Debug("No cached access token found for app-password auth",
418
-
"component", "registry/middleware",
419
-
"did", did,
420
-
"authMethod", authMethod)
421
-
accessToken = "" // Will fail on manifest push, but let it try
421
+
if pullerDID == did {
422
+
// Puller is owner - may need auth for pushes
423
+
if authMethod == token.AuthMethodOAuth && nr.refresher != nil {
424
+
atprotoClient = atproto.NewClientWithSessionProvider(pdsEndpoint, did, nr.refresher)
425
+
} else if authMethod == token.AuthMethodAppPassword {
426
+
accessToken, _ := auth.GetGlobalTokenCache().Get(did)
427
+
atprotoClient = atproto.NewClient(pdsEndpoint, did, accessToken)
422
428
} else {
423
-
slog.Debug("Creating ATProto client with app-password",
424
-
"component", "registry/middleware",
425
-
"did", did,
426
-
"authMethod", authMethod,
427
-
"token_length", len(accessToken))
429
+
atprotoClient = atproto.NewClient(pdsEndpoint, did, "")
428
430
}
429
-
atprotoClient = atproto.NewClient(pdsEndpoint, did, accessToken)
431
+
} else {
432
+
// Puller != owner - reads only, no auth needed
433
+
atprotoClient = atproto.NewClient(pdsEndpoint, did, "")
430
434
}
431
435
432
436
// IMPORTANT: Use only the image name (not identity/image) for ATProto storage
···
449
453
// 3. The refresher already caches sessions efficiently (in-memory + DB)
450
454
// 4. Caching the repository with a stale ATProtoClient causes refresh token errors
451
455
registryCtx := &storage.RegistryContext{
452
-
DID: did,
453
-
Handle: handle,
454
-
HoldDID: holdDID,
455
-
PDSEndpoint: pdsEndpoint,
456
-
Repository: repositoryName,
457
-
ServiceToken: serviceToken, // Cached service token from middleware validation
458
-
ATProtoClient: atprotoClient,
459
-
AuthMethod: authMethod, // Auth method from JWT token
460
-
Database: nr.database,
461
-
Authorizer: nr.authorizer,
462
-
Refresher: nr.refresher,
463
-
ReadmeFetcher: nr.readmeFetcher,
456
+
DID: did,
457
+
Handle: handle,
458
+
HoldDID: holdDID,
459
+
PDSEndpoint: pdsEndpoint,
460
+
Repository: repositoryName,
461
+
ServiceToken: serviceToken, // Cached service token from puller's PDS
462
+
ATProtoClient: atprotoClient,
463
+
AuthMethod: authMethod, // Auth method from JWT token
464
+
PullerDID: pullerDID, // Authenticated user making the request
465
+
PullerPDSEndpoint: pullerPDSEndpoint, // Puller's PDS for service token refresh
466
+
Database: nr.database,
467
+
Authorizer: nr.authorizer,
468
+
Refresher: nr.refresher,
469
+
ReadmeFetcher: nr.readmeFetcher,
464
470
}
465
471
466
472
return storage.NewRoutingRepository(repo, registryCtx), nil
···
533
539
return false
534
540
}
535
541
536
-
// ExtractAuthMethod is an HTTP middleware that extracts the auth method from the JWT Authorization header
537
-
// and stores it in the request context for later use by the registry middleware
542
+
// ExtractAuthMethod is an HTTP middleware that extracts the auth method and puller DID from the JWT Authorization header
543
+
// and stores them in the request context for later use by the registry middleware.
544
+
// Also stores the HTTP method for routing decisions (GET/HEAD = pull, PUT/POST = push).
538
545
func ExtractAuthMethod(next http.Handler) http.Handler {
539
546
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
547
+
ctx := r.Context()
548
+
549
+
// Store HTTP method in context for routing decisions
550
+
// This is used by routing_repository.go to distinguish pull (GET/HEAD) from push (PUT/POST)
551
+
ctx = context.WithValue(ctx, "http.request.method", r.Method)
552
+
540
553
// Extract Authorization header
541
554
authHeader := r.Header.Get("Authorization")
542
555
if authHeader != "" {
···
549
562
authMethod := token.ExtractAuthMethod(tokenString)
550
563
if authMethod != "" {
551
564
// Store in context for registry middleware
552
-
ctx := context.WithValue(r.Context(), authMethodKey, authMethod)
553
-
r = r.WithContext(ctx)
554
-
slog.Debug("Extracted auth method from JWT",
555
-
"component", "registry/middleware",
556
-
"authMethod", authMethod)
565
+
ctx = context.WithValue(ctx, authMethodKey, authMethod)
566
+
}
567
+
568
+
// Extract puller DID (Subject) from JWT
569
+
// This is the authenticated user's DID, used for service token requests
570
+
pullerDID := token.ExtractSubject(tokenString)
571
+
if pullerDID != "" {
572
+
ctx = context.WithValue(ctx, pullerDIDKey, pullerDID)
557
573
}
574
+
575
+
slog.Debug("Extracted auth info from JWT",
576
+
"component", "registry/middleware",
577
+
"authMethod", authMethod,
578
+
"pullerDID", pullerDID,
579
+
"httpMethod", r.Method)
558
580
}
559
581
}
560
582
583
+
r = r.WithContext(ctx)
561
584
next.ServeHTTP(w, r)
562
585
})
563
586
}
+12
-8
pkg/appview/storage/context.go
+12
-8
pkg/appview/storage/context.go
···
18
18
// This includes both per-request data (DID, hold) and shared services
19
19
type RegistryContext struct {
20
20
// Per-request identity and routing information
21
-
DID string // User's DID (e.g., "did:plc:abc123")
22
-
Handle string // User's handle (e.g., "alice.bsky.social")
23
-
HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io")
24
-
PDSEndpoint string // User's PDS endpoint URL
25
-
Repository string // Image repository name (e.g., "debian")
26
-
ServiceToken string // Service token for hold authentication (cached by middleware)
27
-
ATProtoClient *atproto.Client // Authenticated ATProto client for this user
28
-
AuthMethod string // Auth method used ("oauth" or "app_password")
21
+
// Owner = the user whose repository is being accessed
22
+
// Puller = the authenticated user making the request (from JWT Subject)
23
+
DID string // Owner's DID - whose repo is being accessed (e.g., "did:plc:abc123")
24
+
Handle string // Owner's handle (e.g., "alice.bsky.social")
25
+
HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io")
26
+
PDSEndpoint string // Owner's PDS endpoint URL
27
+
Repository string // Image repository name (e.g., "debian")
28
+
ServiceToken string // Service token for hold authentication (from puller's PDS)
29
+
ATProtoClient *atproto.Client // Authenticated ATProto client for the owner
30
+
AuthMethod string // Auth method used ("oauth" or "app_password")
31
+
PullerDID string // Puller's DID - who is making the request (from JWT Subject)
32
+
PullerPDSEndpoint string // Puller's PDS endpoint URL
29
33
30
34
// Shared services (same for all requests)
31
35
Database DatabaseMetrics // Metrics tracking database
+11
-5
pkg/appview/storage/manifest_store.go
+11
-5
pkg/appview/storage/manifest_store.go
···
8
8
"fmt"
9
9
"io"
10
10
"log/slog"
11
-
"maps"
12
11
"net/http"
13
12
"strings"
14
13
"sync"
···
180
179
if err != nil {
181
180
// Log error but don't fail the push - labels are optional
182
181
slog.Warn("Failed to extract config labels", "error", err)
183
-
} else {
182
+
} else if len(labels) > 0 {
184
183
// Initialize annotations map if needed
185
184
if manifestRecord.Annotations == nil {
186
185
manifestRecord.Annotations = make(map[string]string)
187
186
}
188
187
189
-
// Copy labels to annotations (Dockerfile LABELs → manifest annotations)
190
-
maps.Copy(manifestRecord.Annotations, labels)
188
+
// Copy labels to annotations as fallback
189
+
// Only set label values for keys NOT already in manifest annotations
190
+
// This ensures explicit annotations take precedence over Dockerfile LABELs
191
+
// (which may be inherited from base images)
192
+
for key, value := range labels {
193
+
if _, exists := manifestRecord.Annotations[key]; !exists {
194
+
manifestRecord.Annotations[key] = value
195
+
}
196
+
}
191
197
192
-
slog.Debug("Extracted labels from config blob", "count", len(labels))
198
+
slog.Debug("Merged labels from config blob", "labelsCount", len(labels), "annotationsCount", len(manifestRecord.Annotations))
193
199
}
194
200
}
195
201
+2
-2
pkg/appview/storage/routing_repository.go
+2
-2
pkg/appview/storage/routing_repository.go
···
64
64
return blobStore
65
65
}
66
66
67
-
// Determine if this is a pull (GET) or push (PUT/POST/HEAD/etc) operation
67
+
// Determine if this is a pull (GET/HEAD) or push (PUT/POST/etc) operation
68
68
// Pull operations use the historical hold DID from the database (blobs are where they were pushed)
69
69
// Push operations use the discovery-based hold DID from user's profile/default
70
70
// This allows users to change their default hold and have new pushes go there
71
71
isPull := false
72
72
if method, ok := ctx.Value("http.request.method").(string); ok {
73
-
isPull = method == "GET"
73
+
isPull = method == "GET" || method == "HEAD"
74
74
}
75
75
76
76
holdDID := r.Ctx.HoldDID // Default to discovery-based DID
+19
-17
pkg/appview/storage/routing_repository_test.go
+19
-17
pkg/appview/storage/routing_repository_test.go
···
109
109
assert.NotNil(t, repo.manifestStore)
110
110
}
111
111
112
-
// TestRoutingRepository_Blobs_PullUsesDatabase tests that GET (pull) uses database hold DID
112
+
// TestRoutingRepository_Blobs_PullUsesDatabase tests that GET and HEAD (pull) use database hold DID
113
113
func TestRoutingRepository_Blobs_PullUsesDatabase(t *testing.T) {
114
114
dbHoldDID := "did:web:database.hold.io"
115
115
discoveryHoldDID := "did:web:discovery.hold.io"
116
116
117
-
ctx := &RegistryContext{
118
-
DID: "did:plc:test123",
119
-
Repository: "myapp",
120
-
HoldDID: discoveryHoldDID, // Discovery-based hold (should be overridden for pull)
121
-
ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:test123", ""),
122
-
Database: &mockDatabase{holdDID: dbHoldDID},
123
-
}
124
-
125
-
repo := NewRoutingRepository(nil, ctx)
117
+
// Test both GET and HEAD as pull operations
118
+
for _, method := range []string{"GET", "HEAD"} {
119
+
// Reset context for each test
120
+
ctx := &RegistryContext{
121
+
DID: "did:plc:test123",
122
+
Repository: "myapp-" + method, // Unique repo to avoid caching
123
+
HoldDID: discoveryHoldDID,
124
+
ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:test123", ""),
125
+
Database: &mockDatabase{holdDID: dbHoldDID},
126
+
}
127
+
repo := NewRoutingRepository(nil, ctx)
126
128
127
-
// Create context with GET method (pull operation)
128
-
pullCtx := context.WithValue(context.Background(), "http.request.method", "GET")
129
-
blobStore := repo.Blobs(pullCtx)
129
+
pullCtx := context.WithValue(context.Background(), "http.request.method", method)
130
+
blobStore := repo.Blobs(pullCtx)
130
131
131
-
assert.NotNil(t, blobStore)
132
-
// Verify the hold DID was updated to use the database value for pull
133
-
assert.Equal(t, dbHoldDID, repo.Ctx.HoldDID, "pull (GET) should use database hold DID")
132
+
assert.NotNil(t, blobStore)
133
+
// Verify the hold DID was updated to use the database value for pull
134
+
assert.Equal(t, dbHoldDID, repo.Ctx.HoldDID, "pull (%s) should use database hold DID", method)
135
+
}
134
136
}
135
137
136
138
// TestRoutingRepository_Blobs_PushUsesDiscovery tests that push operations use discovery hold DID
···
144
146
}{
145
147
{"PUT", "PUT"},
146
148
{"POST", "POST"},
147
-
{"HEAD", "HEAD"},
149
+
// HEAD is now treated as pull (like GET) - see TestRoutingRepository_Blobs_Pull
148
150
{"PATCH", "PATCH"},
149
151
{"DELETE", "DELETE"},
150
152
}
+19
pkg/auth/token/claims.go
+19
pkg/auth/token/claims.go
···
56
56
57
57
return claims.AuthMethod
58
58
}
59
+
60
+
// ExtractSubject parses a JWT token string and extracts the Subject claim (the user's DID)
61
+
// Returns the subject or empty string if not found or token is invalid
62
+
// This does NOT validate the token - it only parses it to extract the claim
63
+
func ExtractSubject(tokenString string) string {
64
+
// Parse token without validation (we only need the claims, validation is done by distribution library)
65
+
parser := jwt.NewParser(jwt.WithoutClaimsValidation())
66
+
token, _, err := parser.ParseUnverified(tokenString, &Claims{})
67
+
if err != nil {
68
+
return "" // Invalid token format
69
+
}
70
+
71
+
claims, ok := token.Claims.(*Claims)
72
+
if !ok {
73
+
return "" // Wrong claims type
74
+
}
75
+
76
+
return claims.Subject
77
+
}
+70
-27
pkg/hold/pds/auth.go
+70
-27
pkg/hold/pds/auth.go
···
4
4
"context"
5
5
"encoding/base64"
6
6
"encoding/json"
7
+
"errors"
7
8
"fmt"
8
9
"io"
9
10
"log/slog"
···
18
19
"github.com/golang-jwt/jwt/v5"
19
20
)
20
21
22
+
// Authentication errors
23
+
var (
24
+
ErrMissingAuthHeader = errors.New("missing Authorization header")
25
+
ErrInvalidAuthFormat = errors.New("invalid Authorization header format")
26
+
ErrInvalidAuthScheme = errors.New("invalid authorization scheme: expected 'Bearer' or 'DPoP'")
27
+
ErrMissingToken = errors.New("missing token")
28
+
ErrMissingDPoPHeader = errors.New("missing DPoP header")
29
+
)
30
+
31
+
// JWT validation errors
32
+
var (
33
+
ErrInvalidJWTFormat = errors.New("invalid JWT format: expected header.payload.signature")
34
+
ErrMissingISSClaim = errors.New("missing 'iss' claim in token")
35
+
ErrMissingSubClaim = errors.New("missing 'sub' claim in token")
36
+
ErrTokenExpired = errors.New("token has expired")
37
+
)
38
+
39
+
// AuthError provides structured authorization error information
40
+
type AuthError struct {
41
+
Action string // The action being attempted: "blob:read", "blob:write", "crew:admin"
42
+
Reason string // Why access was denied
43
+
Required []string // What permission(s) would grant access
44
+
}
45
+
46
+
func (e *AuthError) Error() string {
47
+
return fmt.Sprintf("access denied for %s: %s (required: %s)",
48
+
e.Action, e.Reason, strings.Join(e.Required, " or "))
49
+
}
50
+
51
+
// NewAuthError creates a new AuthError
52
+
func NewAuthError(action, reason string, required ...string) *AuthError {
53
+
return &AuthError{
54
+
Action: action,
55
+
Reason: reason,
56
+
Required: required,
57
+
}
58
+
}
59
+
21
60
// HTTPClient interface allows injecting a custom HTTP client for testing
22
61
type HTTPClient interface {
23
62
Do(*http.Request) (*http.Response, error)
···
44
83
// Extract Authorization header
45
84
authHeader := r.Header.Get("Authorization")
46
85
if authHeader == "" {
47
-
return nil, fmt.Errorf("missing Authorization header")
86
+
return nil, ErrMissingAuthHeader
48
87
}
49
88
50
89
// Check for DPoP authorization scheme
51
90
parts := strings.SplitN(authHeader, " ", 2)
52
91
if len(parts) != 2 {
53
-
return nil, fmt.Errorf("invalid Authorization header format")
92
+
return nil, ErrInvalidAuthFormat
54
93
}
55
94
56
95
if parts[0] != "DPoP" {
···
59
98
60
99
accessToken := parts[1]
61
100
if accessToken == "" {
62
-
return nil, fmt.Errorf("missing access token")
101
+
return nil, ErrMissingToken
63
102
}
64
103
65
104
// Extract DPoP header
66
105
dpopProof := r.Header.Get("DPoP")
67
106
if dpopProof == "" {
68
-
return nil, fmt.Errorf("missing DPoP header")
107
+
return nil, ErrMissingDPoPHeader
69
108
}
70
109
71
110
// TODO: We could verify the DPoP proof locally (signature, HTM, HTU, etc.)
···
109
148
// JWT format: header.payload.signature
110
149
parts := strings.Split(token, ".")
111
150
if len(parts) != 3 {
112
-
return "", "", fmt.Errorf("invalid JWT format")
151
+
return "", "", ErrInvalidJWTFormat
113
152
}
114
153
115
154
// Decode payload (base64url)
···
129
168
}
130
169
131
170
if claims.Sub == "" {
132
-
return "", "", fmt.Errorf("missing sub claim (DID)")
171
+
return "", "", ErrMissingSubClaim
133
172
}
134
173
135
174
if claims.Iss == "" {
136
-
return "", "", fmt.Errorf("missing iss claim (PDS)")
175
+
return "", "", ErrMissingISSClaim
137
176
}
138
177
139
178
return claims.Sub, claims.Iss, nil
···
216
255
return nil, fmt.Errorf("DPoP authentication failed: %w", err)
217
256
}
218
257
} else {
219
-
return nil, fmt.Errorf("missing or invalid Authorization header (expected Bearer or DPoP)")
258
+
return nil, ErrInvalidAuthScheme
220
259
}
221
260
222
261
// Get captain record to check owner
···
243
282
return user, nil
244
283
}
245
284
// User is crew but doesn't have admin permission
246
-
return nil, fmt.Errorf("crew member lacks required 'crew:admin' permission")
285
+
return nil, NewAuthError("crew:admin", "crew member lacks permission", "crew:admin")
247
286
}
248
287
}
249
288
250
289
// User is neither owner nor authorized crew
251
-
return nil, fmt.Errorf("user is not authorized (must be hold owner or crew admin)")
290
+
return nil, NewAuthError("crew:admin", "user is not a crew member", "crew:admin")
252
291
}
253
292
254
293
// ValidateBlobWriteAccess validates that the request has valid authentication
···
276
315
return nil, fmt.Errorf("DPoP authentication failed: %w", err)
277
316
}
278
317
} else {
279
-
return nil, fmt.Errorf("missing or invalid Authorization header (expected Bearer or DPoP)")
318
+
return nil, ErrInvalidAuthScheme
280
319
}
281
320
282
321
// Get captain record to check owner and public settings
···
303
342
return user, nil
304
343
}
305
344
// User is crew but doesn't have write permission
306
-
return nil, fmt.Errorf("crew member lacks required 'blob:write' permission")
345
+
return nil, NewAuthError("blob:write", "crew member lacks permission", "blob:write")
307
346
}
308
347
}
309
348
310
349
// User is neither owner nor authorized crew
311
-
return nil, fmt.Errorf("user is not authorized for blob write (must be hold owner or crew with blob:write permission)")
350
+
return nil, NewAuthError("blob:write", "user is not a crew member", "blob:write")
312
351
}
313
352
314
353
// ValidateBlobReadAccess validates that the request has read access to blobs
315
354
// If captain.public = true: No auth required (returns nil user to indicate public access)
316
-
// If captain.public = false: Requires valid DPoP + OAuth and (captain OR crew with blob:read permission).
355
+
// If captain.public = false: Requires valid DPoP + OAuth and (captain OR crew with blob:read or blob:write permission).
356
+
// Note: blob:write implicitly grants blob:read access.
317
357
// The httpClient parameter is optional and defaults to http.DefaultClient if nil.
318
358
func ValidateBlobReadAccess(r *http.Request, pds *HoldPDS, httpClient HTTPClient) (*ValidatedUser, error) {
319
359
// Get captain record to check public setting
···
344
384
return nil, fmt.Errorf("DPoP authentication failed: %w", err)
345
385
}
346
386
} else {
347
-
return nil, fmt.Errorf("missing or invalid Authorization header (expected Bearer or DPoP)")
387
+
return nil, ErrInvalidAuthScheme
348
388
}
349
389
350
390
// Check if user is the owner (always has read access)
···
352
392
return user, nil
353
393
}
354
394
355
-
// Check if user is crew with blob:read permission
395
+
// Check if user is crew with blob:read or blob:write permission
396
+
// Note: blob:write implicitly grants blob:read access
356
397
crew, err := pds.ListCrewMembers(r.Context())
357
398
if err != nil {
358
399
return nil, fmt.Errorf("failed to check crew membership: %w", err)
···
360
401
361
402
for _, member := range crew {
362
403
if member.Record.Member == user.DID {
363
-
// Check if this crew member has blob:read permission
364
-
if slices.Contains(member.Record.Permissions, "blob:read") {
404
+
// Check if this crew member has blob:read or blob:write permission
405
+
// blob:write implicitly grants read access (can't push without pulling)
406
+
if slices.Contains(member.Record.Permissions, "blob:read") ||
407
+
slices.Contains(member.Record.Permissions, "blob:write") {
365
408
return user, nil
366
409
}
367
-
// User is crew but doesn't have read permission
368
-
return nil, fmt.Errorf("crew member lacks required 'blob:read' permission")
410
+
// User is crew but doesn't have read or write permission
411
+
return nil, NewAuthError("blob:read", "crew member lacks permission", "blob:read", "blob:write")
369
412
}
370
413
}
371
414
372
415
// User is neither owner nor authorized crew
373
-
return nil, fmt.Errorf("user is not authorized for blob read (must be hold owner or crew with blob:read permission)")
416
+
return nil, NewAuthError("blob:read", "user is not a crew member", "blob:read", "blob:write")
374
417
}
375
418
376
419
// ServiceTokenClaims represents the claims in a service token JWT
···
385
428
// Extract Authorization header
386
429
authHeader := r.Header.Get("Authorization")
387
430
if authHeader == "" {
388
-
return nil, fmt.Errorf("missing Authorization header")
431
+
return nil, ErrMissingAuthHeader
389
432
}
390
433
391
434
// Check for Bearer authorization scheme
392
435
parts := strings.SplitN(authHeader, " ", 2)
393
436
if len(parts) != 2 {
394
-
return nil, fmt.Errorf("invalid Authorization header format")
437
+
return nil, ErrInvalidAuthFormat
395
438
}
396
439
397
440
if parts[0] != "Bearer" {
···
400
443
401
444
tokenString := parts[1]
402
445
if tokenString == "" {
403
-
return nil, fmt.Errorf("missing token")
446
+
return nil, ErrMissingToken
404
447
}
405
448
406
449
slog.Debug("Validating service token", "holdDID", holdDID)
···
409
452
// Split token: header.payload.signature
410
453
tokenParts := strings.Split(tokenString, ".")
411
454
if len(tokenParts) != 3 {
412
-
return nil, fmt.Errorf("invalid JWT format")
455
+
return nil, ErrInvalidJWTFormat
413
456
}
414
457
415
458
// Decode payload (second part) to extract claims
···
427
470
// Get issuer (user DID)
428
471
issuerDID := claims.Issuer
429
472
if issuerDID == "" {
430
-
return nil, fmt.Errorf("missing iss claim")
473
+
return nil, ErrMissingISSClaim
431
474
}
432
475
433
476
// Verify audience matches this hold service
···
445
488
return nil, fmt.Errorf("failed to get expiration: %w", err)
446
489
}
447
490
if exp != nil && time.Now().After(exp.Time) {
448
-
return nil, fmt.Errorf("token has expired")
491
+
return nil, ErrTokenExpired
449
492
}
450
493
451
494
// Verify JWT signature using ATProto's secp256k1 crypto
+110
pkg/hold/pds/auth_test.go
+110
pkg/hold/pds/auth_test.go
···
771
771
}
772
772
}
773
773
774
+
// TestValidateBlobReadAccess_BlobWriteImpliesRead tests that blob:write grants read access
775
+
func TestValidateBlobReadAccess_BlobWriteImpliesRead(t *testing.T) {
776
+
ownerDID := "did:plc:owner123"
777
+
778
+
pds, ctx := setupTestPDSWithBootstrap(t, ownerDID, false, false)
779
+
780
+
// Verify captain record has public=false (private hold)
781
+
_, captain, err := pds.GetCaptainRecord(ctx)
782
+
if err != nil {
783
+
t.Fatalf("Failed to get captain record: %v", err)
784
+
}
785
+
786
+
if captain.Public {
787
+
t.Error("Expected public=false for captain record")
788
+
}
789
+
790
+
// Add crew member with ONLY blob:write permission (no blob:read)
791
+
writerDID := "did:plc:writer123"
792
+
_, err = pds.AddCrewMember(ctx, writerDID, "writer", []string{"blob:write"})
793
+
if err != nil {
794
+
t.Fatalf("Failed to add crew writer: %v", err)
795
+
}
796
+
797
+
mockClient := &mockPDSClient{}
798
+
799
+
// Test writer (has only blob:write permission) can read
800
+
t.Run("crew with blob:write can read", func(t *testing.T) {
801
+
dpopHelper, err := NewDPoPTestHelper(writerDID, "https://test-pds.example.com")
802
+
if err != nil {
803
+
t.Fatalf("Failed to create DPoP helper: %v", err)
804
+
}
805
+
806
+
req := httptest.NewRequest(http.MethodGet, "/test", nil)
807
+
if err := dpopHelper.AddDPoPToRequest(req); err != nil {
808
+
t.Fatalf("Failed to add DPoP to request: %v", err)
809
+
}
810
+
811
+
// This should SUCCEED because blob:write implies blob:read
812
+
user, err := ValidateBlobReadAccess(req, pds, mockClient)
813
+
if err != nil {
814
+
t.Errorf("Expected blob:write to grant read access, got error: %v", err)
815
+
}
816
+
817
+
if user == nil {
818
+
t.Error("Expected user to be returned for valid read access")
819
+
} else if user.DID != writerDID {
820
+
t.Errorf("Expected user DID %s, got %s", writerDID, user.DID)
821
+
}
822
+
})
823
+
824
+
// Also verify that crew with only blob:read still works
825
+
t.Run("crew with blob:read can read", func(t *testing.T) {
826
+
readerDID := "did:plc:reader123"
827
+
_, err = pds.AddCrewMember(ctx, readerDID, "reader", []string{"blob:read"})
828
+
if err != nil {
829
+
t.Fatalf("Failed to add crew reader: %v", err)
830
+
}
831
+
832
+
dpopHelper, err := NewDPoPTestHelper(readerDID, "https://test-pds.example.com")
833
+
if err != nil {
834
+
t.Fatalf("Failed to create DPoP helper: %v", err)
835
+
}
836
+
837
+
req := httptest.NewRequest(http.MethodGet, "/test", nil)
838
+
if err := dpopHelper.AddDPoPToRequest(req); err != nil {
839
+
t.Fatalf("Failed to add DPoP to request: %v", err)
840
+
}
841
+
842
+
user, err := ValidateBlobReadAccess(req, pds, mockClient)
843
+
if err != nil {
844
+
t.Errorf("Expected blob:read to grant read access, got error: %v", err)
845
+
}
846
+
847
+
if user == nil {
848
+
t.Error("Expected user to be returned for valid read access")
849
+
} else if user.DID != readerDID {
850
+
t.Errorf("Expected user DID %s, got %s", readerDID, user.DID)
851
+
}
852
+
})
853
+
854
+
// Verify crew with neither permission cannot read
855
+
t.Run("crew without read or write cannot read", func(t *testing.T) {
856
+
noPermDID := "did:plc:noperm123"
857
+
_, err = pds.AddCrewMember(ctx, noPermDID, "noperm", []string{"crew:admin"})
858
+
if err != nil {
859
+
t.Fatalf("Failed to add crew member: %v", err)
860
+
}
861
+
862
+
dpopHelper, err := NewDPoPTestHelper(noPermDID, "https://test-pds.example.com")
863
+
if err != nil {
864
+
t.Fatalf("Failed to create DPoP helper: %v", err)
865
+
}
866
+
867
+
req := httptest.NewRequest(http.MethodGet, "/test", nil)
868
+
if err := dpopHelper.AddDPoPToRequest(req); err != nil {
869
+
t.Fatalf("Failed to add DPoP to request: %v", err)
870
+
}
871
+
872
+
_, err = ValidateBlobReadAccess(req, pds, mockClient)
873
+
if err == nil {
874
+
t.Error("Expected error for crew without read or write permission")
875
+
}
876
+
877
+
// Verify error message format
878
+
if !strings.Contains(err.Error(), "access denied for blob:read") {
879
+
t.Errorf("Expected structured error message, got: %v", err)
880
+
}
881
+
})
882
+
}
883
+
774
884
// TestValidateOwnerOrCrewAdmin tests admin permission checking
775
885
func TestValidateOwnerOrCrewAdmin(t *testing.T) {
776
886
ownerDID := "did:plc:owner123"