+15
-2
cmd/hold/main.go
+15
-2
cmd/hold/main.go
···
13
13
"atcr.io/pkg/hold"
14
14
"atcr.io/pkg/hold/oci"
15
15
"atcr.io/pkg/hold/pds"
16
+
"atcr.io/pkg/hold/quota"
16
17
"atcr.io/pkg/logging"
17
18
"atcr.io/pkg/s3"
18
19
···
98
99
os.Exit(1)
99
100
}
100
101
102
+
// Initialize quota manager from quotas.yaml
103
+
quotaMgr, err := quota.NewManager("./quotas.yaml")
104
+
if err != nil {
105
+
slog.Error("Failed to load quota config", "error", err)
106
+
os.Exit(1)
107
+
}
108
+
if quotaMgr.IsEnabled() {
109
+
slog.Info("Quota enforcement enabled", "berths", quotaMgr.BerthCount(), "defaultBerth", quotaMgr.GetDefaultBerth())
110
+
} else {
111
+
slog.Info("Quota enforcement disabled (no quotas.yaml found)")
112
+
}
113
+
101
114
// Create blob store adapter and XRPC handlers
102
115
var ociHandler *oci.XRPCHandler
103
116
if holdPDS != nil {
···
116
129
}
117
130
118
131
// Create PDS XRPC handler (ATProto endpoints)
119
-
xrpcHandler = pds.NewXRPCHandler(holdPDS, *s3Service, driver, broadcaster, nil)
132
+
xrpcHandler = pds.NewXRPCHandler(holdPDS, *s3Service, driver, broadcaster, nil, quotaMgr)
120
133
121
134
// Create OCI XRPC handler (multipart upload endpoints)
122
-
ociHandler = oci.NewXRPCHandler(holdPDS, *s3Service, driver, cfg.Server.DisablePresignedURLs, cfg.Registration.EnableBlueskyPosts, nil)
135
+
ociHandler = oci.NewXRPCHandler(holdPDS, *s3Service, driver, cfg.Server.DisablePresignedURLs, cfg.Registration.EnableBlueskyPosts, nil, quotaMgr)
123
136
}
124
137
125
138
// Setup HTTP routes with chi router
+1
deploy/docker-compose.prod.yml
+1
deploy/docker-compose.prod.yml
+35
deploy/quotas.yaml
+35
deploy/quotas.yaml
···
1
+
# ATCR Hold Service Quota Configuration
2
+
# Copy this file to quotas.yaml to enable quota enforcement.
3
+
# If quotas.yaml doesn't exist, quotas are disabled (unlimited for all users).
4
+
5
+
# Berths define quota tiers using nautical crew ranks.
6
+
# Each berth has a quota limit specified in human-readable format.
7
+
# Supported units: B, KB, MB, GB, TB, PB (case-insensitive)
8
+
berths:
9
+
# Entry-level crew - suitable for new or casual users
10
+
deckhand:
11
+
quota: 5GB
12
+
13
+
# Mid-level crew - for regular contributors
14
+
bosun:
15
+
quota: 50GB
16
+
17
+
# Senior crew - for power users or trusted contributors
18
+
quartermaster:
19
+
quota: 100GB
20
+
21
+
# You can add custom berths with any name:
22
+
# unlimited_crew:
23
+
# quota: 1TB
24
+
25
+
defaults:
26
+
# Default berth assigned to new crew members who don't have an explicit berth.
27
+
# This berth must exist in the berths section above.
28
+
new_crew_berth: deckhand
29
+
30
+
# Notes:
31
+
# - The hold captain (owner) always has unlimited quota regardless of berths.
32
+
# - Crew members can be assigned a specific berth in their crew record.
33
+
# - If a crew member's berth doesn't exist in config, they fall back to the default.
34
+
# - Quota is calculated per-user by summing unique blob sizes (deduplicated).
35
+
# - Quota is checked when pushing manifests (after blobs are already uploaded).
+41
-6
docs/QUOTAS.md
+41
-6
docs/QUOTAS.md
···
507
507
- Email/webhook notifications
508
508
- Grace period before hard enforcement
509
509
510
-
### 3. Tiered Quotas
510
+
### 3. Berth-Based Quotas (Implemented)
511
+
512
+
ATCR uses nautical-themed "berths" for quota tiers, configured via `quotas.yaml`:
513
+
514
+
```yaml
515
+
# quotas.yaml
516
+
berths:
517
+
deckhand: # Entry-level crew
518
+
quota: 5GB
519
+
bosun: # Mid-level crew
520
+
quota: 50GB
521
+
quartermaster: # High-level crew
522
+
quota: 100GB
523
+
524
+
defaults:
525
+
new_crew_berth: deckhand # Default berth for new crew members
526
+
```
527
+
528
+
| Berth | Limit | Description |
529
+
|-------|-------|-------------|
530
+
| deckhand | 5 GB | Entry-level crew member |
531
+
| bosun | 50 GB | Mid-level crew member |
532
+
| quartermaster | 100 GB | Senior crew member |
533
+
| owner (captain) | Unlimited | Hold owner always has unlimited |
511
534
512
-
| Tier | Limit |
513
-
|------|-------|
514
-
| Free | 10 GB |
515
-
| Pro | 100 GB |
516
-
| Enterprise | Unlimited |
535
+
**Berth Resolution:**
536
+
1. If user is captain (owner) → unlimited
537
+
2. If crew member has explicit berth → use that berth's limit
538
+
3. If crew member has no berth → use `defaults.new_crew_berth`
539
+
4. If default berth not found → unlimited
540
+
541
+
**Crew Record Example:**
542
+
```json
543
+
{
544
+
"$type": "io.atcr.hold.crew",
545
+
"member": "did:plc:alice123",
546
+
"role": "writer",
547
+
"permissions": ["blob:write"],
548
+
"berth": "bosun",
549
+
"addedAt": "2026-01-04T12:00:00Z"
550
+
}
551
+
```
517
552
518
553
### 4. Rate Limiting
519
554
+5
lexicons/io/atcr/hold/crew.json
+5
lexicons/io/atcr/hold/crew.json
···
29
29
"maxLength": 64
30
30
}
31
31
},
32
+
"berth": {
33
+
"type": "string",
34
+
"description": "Optional berth (nautical rank) for quota limits (e.g., 'deckhand', 'bosun', 'quartermaster'). If empty, uses defaults.new_crew_berth from quotas.yaml.",
35
+
"maxLength": 32
36
+
},
32
37
"addedAt": {
33
38
"type": "string",
34
39
"format": "datetime",
+30
-6
pkg/appview/handlers/storage.go
+30
-6
pkg/appview/handlers/storage.go
···
25
25
UserDID string `json:"userDid"`
26
26
UniqueBlobs int `json:"uniqueBlobs"`
27
27
TotalSize int64 `json:"totalSize"`
28
+
Limit *int64 `json:"limit,omitempty"` // nil = unlimited
29
+
Berth string `json:"berth,omitempty"` // e.g., "deckhand", "bosun", "owner"
28
30
}
29
31
30
32
func (h *StorageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
···
87
89
}
88
90
89
91
func (h *StorageHandler) renderStats(w http.ResponseWriter, stats QuotaStats) {
92
+
// Calculate usage percentage if limit exists
93
+
var usagePercent int
94
+
var hasLimit bool
95
+
var humanLimit string
96
+
97
+
if stats.Limit != nil && *stats.Limit > 0 {
98
+
hasLimit = true
99
+
humanLimit = humanizeBytes(*stats.Limit)
100
+
usagePercent = int(float64(stats.TotalSize) / float64(*stats.Limit) * 100)
101
+
if usagePercent > 100 {
102
+
usagePercent = 100
103
+
}
104
+
}
105
+
90
106
data := struct {
91
-
UniqueBlobs int
92
-
TotalSize int64
93
-
HumanSize string
107
+
UniqueBlobs int
108
+
TotalSize int64
109
+
HumanSize string
110
+
HasLimit bool
111
+
HumanLimit string
112
+
UsagePercent int
113
+
Berth string
94
114
}{
95
-
UniqueBlobs: stats.UniqueBlobs,
96
-
TotalSize: stats.TotalSize,
97
-
HumanSize: humanizeBytes(stats.TotalSize),
115
+
UniqueBlobs: stats.UniqueBlobs,
116
+
TotalSize: stats.TotalSize,
117
+
HumanSize: humanizeBytes(stats.TotalSize),
118
+
HasLimit: hasLimit,
119
+
HumanLimit: humanLimit,
120
+
UsagePercent: usagePercent,
121
+
Berth: stats.Berth,
98
122
}
99
123
100
124
w.Header().Set("Content-Type", "text/html")
+65
pkg/appview/templates/pages/settings.html
+65
pkg/appview/templates/pages/settings.html
···
259
259
to { transform: rotate(360deg); }
260
260
}
261
261
262
+
/* Quota Progress Bar */
263
+
.storage-section .quota-progress {
264
+
display: flex;
265
+
align-items: center;
266
+
gap: 0.75rem;
267
+
padding: 0.75rem 0;
268
+
}
269
+
.storage-section .progress-bar {
270
+
flex: 1;
271
+
height: 8px;
272
+
background: var(--border);
273
+
border-radius: 4px;
274
+
overflow: hidden;
275
+
}
276
+
.storage-section .progress-fill {
277
+
height: 100%;
278
+
border-radius: 4px;
279
+
transition: width 0.3s ease;
280
+
}
281
+
.storage-section .progress-ok {
282
+
background: #22c55e;
283
+
}
284
+
.storage-section .progress-warning {
285
+
background: #eab308;
286
+
}
287
+
.storage-section .progress-danger {
288
+
background: #ef4444;
289
+
}
290
+
.storage-section .progress-text {
291
+
font-size: 0.85rem;
292
+
color: var(--fg-muted);
293
+
white-space: nowrap;
294
+
}
295
+
296
+
/* Berth Badge */
297
+
.storage-section .berth-badge {
298
+
text-transform: capitalize;
299
+
padding: 0.125rem 0.5rem;
300
+
border-radius: 4px;
301
+
font-size: 0.85rem;
302
+
background: var(--accent-bg, #e0f2fe);
303
+
color: var(--accent, #0369a1);
304
+
}
305
+
.storage-section .berth-owner {
306
+
background: #fef3c7;
307
+
color: #92400e;
308
+
}
309
+
.storage-section .berth-quartermaster {
310
+
background: #dcfce7;
311
+
color: #166534;
312
+
}
313
+
.storage-section .berth-bosun {
314
+
background: #e0e7ff;
315
+
color: #3730a3;
316
+
}
317
+
.storage-section .unlimited-badge {
318
+
font-size: 0.75rem;
319
+
padding: 0.125rem 0.375rem;
320
+
background: #22c55e;
321
+
color: #fff;
322
+
border-radius: 3px;
323
+
margin-left: 0.25rem;
324
+
font-weight: 500;
325
+
}
326
+
262
327
/* Devices Section Styles */
263
328
.devices-section .setup-instructions {
264
329
margin: 1rem 0;
+24
-4
pkg/appview/templates/partials/storage_stats.html
+24
-4
pkg/appview/templates/partials/storage_stats.html
···
1
1
{{ define "storage_stats" }}
2
2
<div class="storage-stats">
3
+
{{ if .Berth }}
3
4
<div class="stat-row">
4
-
<span class="stat-label">Unique Blobs:</span>
5
-
<span class="stat-value">{{ .UniqueBlobs }}</span>
5
+
<span class="stat-label">Berth:</span>
6
+
<span class="stat-value berth-badge berth-{{ .Berth }}">{{ .Berth }}</span>
6
7
</div>
8
+
{{ end }}
7
9
<div class="stat-row">
8
-
<span class="stat-label">Total Storage:</span>
9
-
<span class="stat-value">{{ .HumanSize }}</span>
10
+
<span class="stat-label">Storage:</span>
11
+
<span class="stat-value">
12
+
{{ if .HasLimit }}
13
+
{{ .HumanSize }} / {{ .HumanLimit }}
14
+
{{ else }}
15
+
{{ .HumanSize }} <span class="unlimited-badge">Unlimited</span>
16
+
{{ end }}
17
+
</span>
18
+
</div>
19
+
{{ if .HasLimit }}
20
+
<div class="quota-progress">
21
+
<div class="progress-bar">
22
+
<div class="progress-fill {{ if ge .UsagePercent 95 }}progress-danger{{ else if ge .UsagePercent 80 }}progress-warning{{ else }}progress-ok{{ end }}" style="width: {{ .UsagePercent }}%"></div>
23
+
</div>
24
+
<span class="progress-text">{{ .UsagePercent }}% used</span>
25
+
</div>
26
+
{{ end }}
27
+
<div class="stat-row">
28
+
<span class="stat-label">Unique Blobs:</span>
29
+
<span class="stat-value">{{ .UniqueBlobs }}</span>
10
30
</div>
11
31
</div>
12
32
{{ end }}
+43
-1
pkg/atproto/cbor_gen.go
+43
-1
pkg/atproto/cbor_gen.go
···
25
25
}
26
26
27
27
cw := cbg.NewCborWriter(w)
28
+
fieldCount := 6
28
29
29
-
if _, err := cw.Write([]byte{165}); err != nil {
30
+
if t.Berth == "" {
31
+
fieldCount--
32
+
}
33
+
34
+
if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil {
30
35
return err
31
36
}
32
37
···
74
79
}
75
80
if _, err := cw.WriteString(string(t.Type)); err != nil {
76
81
return err
82
+
}
83
+
84
+
// t.Berth (string) (string)
85
+
if t.Berth != "" {
86
+
87
+
if len("berth") > 8192 {
88
+
return xerrors.Errorf("Value in field \"berth\" was too long")
89
+
}
90
+
91
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("berth"))); err != nil {
92
+
return err
93
+
}
94
+
if _, err := cw.WriteString(string("berth")); err != nil {
95
+
return err
96
+
}
97
+
98
+
if len(t.Berth) > 8192 {
99
+
return xerrors.Errorf("Value in field t.Berth was too long")
100
+
}
101
+
102
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Berth))); err != nil {
103
+
return err
104
+
}
105
+
if _, err := cw.WriteString(string(t.Berth)); err != nil {
106
+
return err
107
+
}
77
108
}
78
109
79
110
// t.Member (string) (string)
···
219
250
}
220
251
221
252
t.Type = string(sval)
253
+
}
254
+
// t.Berth (string) (string)
255
+
case "berth":
256
+
257
+
{
258
+
sval, err := cbg.ReadStringWithMax(cr, 8192)
259
+
if err != nil {
260
+
return err
261
+
}
262
+
263
+
t.Berth = string(sval)
222
264
}
223
265
// t.Member (string) (string)
224
266
case "member":
+2
-1
pkg/atproto/lexicon.go
+2
-1
pkg/atproto/lexicon.go
···
594
594
Member string `json:"member" cborgen:"member"`
595
595
Role string `json:"role" cborgen:"role"`
596
596
Permissions []string `json:"permissions" cborgen:"permissions"`
597
-
AddedAt string `json:"addedAt" cborgen:"addedAt"` // RFC3339 timestamp
597
+
Berth string `json:"berth,omitempty" cborgen:"berth,omitempty"` // Optional berth for quota limits (nautical rank)
598
+
AddedAt string `json:"addedAt" cborgen:"addedAt"` // RFC3339 timestamp
598
599
}
599
600
600
601
// LayerRecord represents metadata about a container layer stored in the hold
+2
-7
pkg/auth/oauth/client.go
+2
-7
pkg/auth/oauth/client.go
···
47
47
return nil, fmt.Errorf("failed to configure confidential client: %w", err)
48
48
}
49
49
50
-
// Log clock information for debugging timestamp issues
51
-
now := time.Now()
52
50
slog.Info("Configured confidential OAuth client",
53
51
"key_id", keyID,
54
52
"key_path", keyPath,
55
-
"system_time_unix", now.Unix(),
56
-
"system_time_rfc3339", now.Format(time.RFC3339),
57
-
"timezone", now.Location().String())
53
+
)
58
54
} else {
59
55
config = oauth.NewLocalhostConfig(redirectURI, scopes)
60
56
···
78
74
func GetDefaultScopes(did string) []string {
79
75
return []string{
80
76
"atproto",
81
-
// Permission-set (for future PDS support)
77
+
// Permission-set
82
78
// See lexicons/io/atcr/authFullApp.json for definition
83
-
// Uses "include:" prefix per ATProto permission spec
84
79
"include:io.atcr.authFullApp",
85
80
// com.atproto scopes must be separate (permission-sets are namespace-limited)
86
81
"rpc:com.atproto.repo.getRecord?aud=*",
+22
-1
pkg/hold/oci/xrpc.go
+22
-1
pkg/hold/oci/xrpc.go
···
9
9
10
10
"atcr.io/pkg/atproto"
11
11
"atcr.io/pkg/hold/pds"
12
+
"atcr.io/pkg/hold/quota"
12
13
"atcr.io/pkg/s3"
13
14
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
14
15
"github.com/go-chi/chi/v5"
···
23
24
pds *pds.HoldPDS
24
25
httpClient pds.HTTPClient
25
26
enableBlueskyPosts bool
27
+
quotaMgr *quota.Manager // Quota manager for berth-based limits
26
28
}
27
29
28
30
// NewXRPCHandler creates a new OCI XRPC handler
29
-
func NewXRPCHandler(holdPDS *pds.HoldPDS, s3Service s3.S3Service, driver storagedriver.StorageDriver, disablePresignedURLs bool, enableBlueskyPosts bool, httpClient pds.HTTPClient) *XRPCHandler {
31
+
func NewXRPCHandler(holdPDS *pds.HoldPDS, s3Service s3.S3Service, driver storagedriver.StorageDriver, disablePresignedURLs bool, enableBlueskyPosts bool, httpClient pds.HTTPClient, quotaMgr *quota.Manager) *XRPCHandler {
30
32
return &XRPCHandler{
31
33
driver: driver,
32
34
disablePresignedURLs: disablePresignedURLs,
···
35
37
pds: holdPDS,
36
38
httpClient: httpClient,
37
39
enableBlueskyPosts: enableBlueskyPosts,
40
+
quotaMgr: quotaMgr,
38
41
}
39
42
}
40
43
···
276
279
277
280
// Only create layer records and Bluesky posts for pushes
278
281
if operation == "push" {
282
+
// Soft limit check: block if ALREADY over quota
283
+
// (blobs already uploaded to S3 by this point, no sense rejecting)
284
+
stats, err := h.pds.GetQuotaForUserWithBerth(ctx, req.UserDID, h.quotaMgr)
285
+
if err == nil && stats.Limit != nil && stats.TotalSize > *stats.Limit {
286
+
slog.Warn("Quota exceeded for push",
287
+
"userDid", req.UserDID,
288
+
"currentUsage", stats.TotalSize,
289
+
"limit", *stats.Limit,
290
+
"repository", req.Repository,
291
+
"tag", req.Tag,
292
+
)
293
+
RespondError(w, http.StatusForbidden, fmt.Sprintf(
294
+
"quota exceeded: current=%d bytes, limit=%d bytes. Delete images to free space.",
295
+
stats.TotalSize, *stats.Limit,
296
+
))
297
+
return
298
+
}
299
+
279
300
// Check if manifest posts are enabled
280
301
// Read from captain record (which is synced with HOLD_BLUESKY_POSTS_ENABLED env var)
281
302
postsEnabled := false
+1
-1
pkg/hold/oci/xrpc_test.go
+1
-1
pkg/hold/oci/xrpc_test.go
···
127
127
128
128
// Create OCI handler with buffered mode (no S3)
129
129
mockS3 := s3.S3Service{}
130
-
handler := NewXRPCHandler(holdPDS, mockS3, driver, true, false, mockClient)
130
+
handler := NewXRPCHandler(holdPDS, mockS3, driver, true, false, mockClient, nil)
131
131
132
132
return handler, ctx
133
133
}
+52
pkg/hold/pds/layer.go
+52
pkg/hold/pds/layer.go
···
5
5
"fmt"
6
6
7
7
"atcr.io/pkg/atproto"
8
+
"atcr.io/pkg/hold/quota"
8
9
lexutil "github.com/bluesky-social/indigo/lex/util"
9
10
"github.com/bluesky-social/indigo/repo"
10
11
)
···
65
66
UserDID string `json:"userDid"`
66
67
UniqueBlobs int `json:"uniqueBlobs"`
67
68
TotalSize int64 `json:"totalSize"`
69
+
Limit *int64 `json:"limit,omitempty"` // nil = unlimited
70
+
Berth string `json:"berth,omitempty"` // nautical rank for quota tier
68
71
}
69
72
70
73
// GetQuotaForUser calculates storage quota for a specific user
···
160
163
TotalSize: totalSize,
161
164
}, nil
162
165
}
166
+
167
+
// GetQuotaForUserWithBerth calculates quota with berth-aware limits
168
+
// It returns the base quota stats plus the berth limit and berth name.
169
+
// Captain (owner) always has unlimited quota.
170
+
func (p *HoldPDS) GetQuotaForUserWithBerth(ctx context.Context, userDID string, quotaMgr *quota.Manager) (*QuotaStats, error) {
171
+
// Get base stats
172
+
stats, err := p.GetQuotaForUser(ctx, userDID)
173
+
if err != nil {
174
+
return nil, err
175
+
}
176
+
177
+
// If quota manager is nil or disabled, return unlimited
178
+
if quotaMgr == nil || !quotaMgr.IsEnabled() {
179
+
return stats, nil
180
+
}
181
+
182
+
// Check if user is captain (owner) - always unlimited
183
+
_, captain, err := p.GetCaptainRecord(ctx)
184
+
if err == nil && captain.Owner == userDID {
185
+
stats.Berth = "owner"
186
+
// Limit remains nil (unlimited)
187
+
return stats, nil
188
+
}
189
+
190
+
// Get crew record to find berth
191
+
crewBerth := p.getCrewBerth(ctx, userDID)
192
+
193
+
// Resolve limit from quota manager
194
+
stats.Limit = quotaMgr.GetBerthLimit(crewBerth)
195
+
stats.Berth = quotaMgr.GetBerthName(crewBerth)
196
+
197
+
return stats, nil
198
+
}
199
+
200
+
// getCrewBerth returns the berth for a crew member, or empty string if not found
201
+
func (p *HoldPDS) getCrewBerth(ctx context.Context, userDID string) string {
202
+
crewMembers, err := p.ListCrewMembers(ctx)
203
+
if err != nil {
204
+
return ""
205
+
}
206
+
207
+
for _, member := range crewMembers {
208
+
if member.Record.Member == userDID {
209
+
return member.Record.Berth
210
+
}
211
+
}
212
+
213
+
return ""
214
+
}
+436
pkg/hold/pds/layer_test.go
+436
pkg/hold/pds/layer_test.go
···
1
1
package pds
2
2
3
3
import (
4
+
"os"
5
+
"path/filepath"
4
6
"testing"
5
7
6
8
"atcr.io/pkg/atproto"
9
+
"atcr.io/pkg/hold/quota"
7
10
)
8
11
9
12
func TestCreateLayerRecord(t *testing.T) {
···
281
284
})
282
285
}
283
286
}
287
+
288
+
// setupTestPDSWithIndex creates a PDS with file-based database (enables RecordsIndex)
289
+
// and bootstraps it with the given owner. Required for quota tests.
290
+
func setupTestPDSWithIndex(t *testing.T, ownerDID string) (*HoldPDS, func()) {
291
+
t.Helper()
292
+
293
+
ctx := sharedCtx
294
+
tmpDir := t.TempDir()
295
+
296
+
// Use file-based database to enable RecordsIndex
297
+
dbPath := filepath.Join(tmpDir, "pds.db")
298
+
keyPath := filepath.Join(tmpDir, "signing-key")
299
+
300
+
// Copy shared signing key
301
+
if err := os.WriteFile(keyPath, sharedTestKey, 0600); err != nil {
302
+
t.Fatalf("Failed to copy shared signing key: %v", err)
303
+
}
304
+
305
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
306
+
if err != nil {
307
+
t.Fatalf("Failed to create test PDS: %v", err)
308
+
}
309
+
310
+
// Bootstrap with owner
311
+
if err := pds.Bootstrap(ctx, nil, ownerDID, true, false, ""); err != nil {
312
+
t.Fatalf("Failed to bootstrap PDS: %v", err)
313
+
}
314
+
315
+
// Wire up records indexing
316
+
indexingHandler := pds.CreateRecordsIndexEventHandler(nil)
317
+
pds.RepomgrRef().SetEventHandler(indexingHandler, true)
318
+
319
+
// Backfill index from MST
320
+
if err := pds.BackfillRecordsIndex(ctx); err != nil {
321
+
t.Fatalf("Failed to backfill records index: %v", err)
322
+
}
323
+
324
+
cleanup := func() {
325
+
pds.Close()
326
+
}
327
+
328
+
return pds, cleanup
329
+
}
330
+
331
+
// addCrewMemberWithBerth adds a crew member with a specific berth (nautical rank)
332
+
func addCrewMemberWithBerth(t *testing.T, pds *HoldPDS, memberDID, role string, permissions []string, berth string) {
333
+
t.Helper()
334
+
335
+
crewRecord := &atproto.CrewRecord{
336
+
Type: atproto.CrewCollection,
337
+
Member: memberDID,
338
+
Role: role,
339
+
Permissions: permissions,
340
+
Berth: berth,
341
+
AddedAt: "2026-01-04T12:00:00Z",
342
+
}
343
+
344
+
_, _, err := pds.repomgr.CreateRecord(sharedCtx, pds.uid, atproto.CrewCollection, crewRecord)
345
+
if err != nil {
346
+
t.Fatalf("Failed to add crew member with berth: %v", err)
347
+
}
348
+
}
349
+
350
+
func TestGetQuotaForUserWithBerth_OwnerUnlimited(t *testing.T) {
351
+
ownerDID := "did:plc:owner123"
352
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
353
+
defer cleanup()
354
+
355
+
ctx := sharedCtx
356
+
357
+
// Create quota manager with config
358
+
tmpDir := t.TempDir()
359
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
360
+
configContent := `
361
+
berths:
362
+
deckhand:
363
+
quota: 5GB
364
+
bosun:
365
+
quota: 50GB
366
+
367
+
defaults:
368
+
new_crew_berth: deckhand
369
+
`
370
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
371
+
t.Fatalf("Failed to write quota config: %v", err)
372
+
}
373
+
374
+
quotaMgr, err := quota.NewManager(configPath)
375
+
if err != nil {
376
+
t.Fatalf("Failed to create quota manager: %v", err)
377
+
}
378
+
379
+
// Create layer records for owner
380
+
for i := 0; i < 3; i++ {
381
+
record := atproto.NewLayerRecord(
382
+
"sha256:owner"+string(rune('a'+i)),
383
+
1024*1024*100, // 100MB each
384
+
"application/vnd.oci.image.layer.v1.tar+gzip",
385
+
ownerDID,
386
+
"at://"+ownerDID+"/io.atcr.manifest/test123",
387
+
)
388
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
389
+
t.Fatalf("Failed to create layer record: %v", err)
390
+
}
391
+
}
392
+
393
+
// Get quota for owner
394
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, ownerDID, quotaMgr)
395
+
if err != nil {
396
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
397
+
}
398
+
399
+
// Owner should have unlimited quota (nil limit)
400
+
if stats.Limit != nil {
401
+
t.Errorf("Expected nil limit for owner, got %d", *stats.Limit)
402
+
}
403
+
404
+
// Berth should be "owner"
405
+
if stats.Berth != "owner" {
406
+
t.Errorf("Expected berth 'owner', got %q", stats.Berth)
407
+
}
408
+
409
+
// Should have 3 unique blobs
410
+
if stats.UniqueBlobs != 3 {
411
+
t.Errorf("Expected 3 unique blobs, got %d", stats.UniqueBlobs)
412
+
}
413
+
414
+
// Total size should be 300MB
415
+
expectedSize := int64(3 * 100 * 1024 * 1024)
416
+
if stats.TotalSize != expectedSize {
417
+
t.Errorf("Expected total size %d, got %d", expectedSize, stats.TotalSize)
418
+
}
419
+
420
+
t.Logf("Owner quota stats: %+v", stats)
421
+
}
422
+
423
+
func TestGetQuotaForUserWithBerth_CrewWithDefaultBerth(t *testing.T) {
424
+
ownerDID := "did:plc:owner456"
425
+
crewDID := "did:plc:crew123"
426
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
427
+
defer cleanup()
428
+
429
+
ctx := sharedCtx
430
+
431
+
// Create quota manager
432
+
tmpDir := t.TempDir()
433
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
434
+
configContent := `
435
+
berths:
436
+
deckhand:
437
+
quota: 5GB
438
+
bosun:
439
+
quota: 50GB
440
+
441
+
defaults:
442
+
new_crew_berth: deckhand
443
+
`
444
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
445
+
t.Fatalf("Failed to write quota config: %v", err)
446
+
}
447
+
448
+
quotaMgr, err := quota.NewManager(configPath)
449
+
if err != nil {
450
+
t.Fatalf("Failed to create quota manager: %v", err)
451
+
}
452
+
453
+
// Add crew member with no berth (should use default)
454
+
addCrewMemberWithBerth(t, pds, crewDID, "writer", []string{"blob:write"}, "")
455
+
456
+
// Create layer records for crew member
457
+
for i := 0; i < 2; i++ {
458
+
record := atproto.NewLayerRecord(
459
+
"sha256:crew"+string(rune('a'+i)),
460
+
1024*1024*50, // 50MB each
461
+
"application/vnd.oci.image.layer.v1.tar+gzip",
462
+
crewDID,
463
+
"at://"+crewDID+"/io.atcr.manifest/test456",
464
+
)
465
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
466
+
t.Fatalf("Failed to create layer record: %v", err)
467
+
}
468
+
}
469
+
470
+
// Get quota for crew member
471
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, crewDID, quotaMgr)
472
+
if err != nil {
473
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
474
+
}
475
+
476
+
// Should have 5GB limit (deckhand berth)
477
+
expectedLimit := int64(5 * 1024 * 1024 * 1024)
478
+
if stats.Limit == nil {
479
+
t.Fatal("Expected non-nil limit for crew member")
480
+
}
481
+
if *stats.Limit != expectedLimit {
482
+
t.Errorf("Expected limit %d, got %d", expectedLimit, *stats.Limit)
483
+
}
484
+
485
+
// Berth should be "deckhand"
486
+
if stats.Berth != "deckhand" {
487
+
t.Errorf("Expected berth 'deckhand', got %q", stats.Berth)
488
+
}
489
+
490
+
// Should have 2 unique blobs
491
+
if stats.UniqueBlobs != 2 {
492
+
t.Errorf("Expected 2 unique blobs, got %d", stats.UniqueBlobs)
493
+
}
494
+
495
+
t.Logf("Crew (deckhand berth) quota stats: %+v", stats)
496
+
}
497
+
498
+
func TestGetQuotaForUserWithBerth_CrewWithExplicitBerth(t *testing.T) {
499
+
ownerDID := "did:plc:owner789"
500
+
crewDID := "did:plc:bosuncrew456"
501
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
502
+
defer cleanup()
503
+
504
+
ctx := sharedCtx
505
+
506
+
// Create quota manager
507
+
tmpDir := t.TempDir()
508
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
509
+
configContent := `
510
+
berths:
511
+
deckhand:
512
+
quota: 5GB
513
+
bosun:
514
+
quota: 50GB
515
+
516
+
defaults:
517
+
new_crew_berth: deckhand
518
+
`
519
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
520
+
t.Fatalf("Failed to write quota config: %v", err)
521
+
}
522
+
523
+
quotaMgr, err := quota.NewManager(configPath)
524
+
if err != nil {
525
+
t.Fatalf("Failed to create quota manager: %v", err)
526
+
}
527
+
528
+
// Add crew member with explicit "bosun" berth
529
+
addCrewMemberWithBerth(t, pds, crewDID, "writer", []string{"blob:write"}, "bosun")
530
+
531
+
// Create layer records for crew member
532
+
record := atproto.NewLayerRecord(
533
+
"sha256:bosunlayer1",
534
+
1024*1024*1024, // 1GB
535
+
"application/vnd.oci.image.layer.v1.tar+gzip",
536
+
crewDID,
537
+
"at://"+crewDID+"/io.atcr.manifest/test789",
538
+
)
539
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
540
+
t.Fatalf("Failed to create layer record: %v", err)
541
+
}
542
+
543
+
// Get quota for crew member
544
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, crewDID, quotaMgr)
545
+
if err != nil {
546
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
547
+
}
548
+
549
+
// Should have 50GB limit (bosun berth)
550
+
expectedLimit := int64(50 * 1024 * 1024 * 1024)
551
+
if stats.Limit == nil {
552
+
t.Fatal("Expected non-nil limit for crew member")
553
+
}
554
+
if *stats.Limit != expectedLimit {
555
+
t.Errorf("Expected limit %d, got %d", expectedLimit, *stats.Limit)
556
+
}
557
+
558
+
// Berth should be "bosun"
559
+
if stats.Berth != "bosun" {
560
+
t.Errorf("Expected berth 'bosun', got %q", stats.Berth)
561
+
}
562
+
563
+
t.Logf("Crew (bosun berth) quota stats: %+v", stats)
564
+
}
565
+
566
+
func TestGetQuotaForUserWithBerth_NoQuotaManager(t *testing.T) {
567
+
ownerDID := "did:plc:ownerabc"
568
+
crewDID := "did:plc:crewabc"
569
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
570
+
defer cleanup()
571
+
572
+
ctx := sharedCtx
573
+
574
+
// Add crew member
575
+
addCrewMemberWithBerth(t, pds, crewDID, "writer", []string{"blob:write"}, "deckhand")
576
+
577
+
// Create layer record
578
+
record := atproto.NewLayerRecord(
579
+
"sha256:noquotalayer1",
580
+
1024*1024*100,
581
+
"application/vnd.oci.image.layer.v1.tar+gzip",
582
+
crewDID,
583
+
"at://"+crewDID+"/io.atcr.manifest/testabc",
584
+
)
585
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
586
+
t.Fatalf("Failed to create layer record: %v", err)
587
+
}
588
+
589
+
// Get quota with nil quota manager (no enforcement)
590
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, crewDID, nil)
591
+
if err != nil {
592
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
593
+
}
594
+
595
+
// Should have nil limit (unlimited)
596
+
if stats.Limit != nil {
597
+
t.Errorf("Expected nil limit when quota manager is nil, got %d", *stats.Limit)
598
+
}
599
+
600
+
// Berth should be empty
601
+
if stats.Berth != "" {
602
+
t.Errorf("Expected empty berth, got %q", stats.Berth)
603
+
}
604
+
605
+
t.Logf("No quota manager stats: %+v", stats)
606
+
}
607
+
608
+
func TestGetQuotaForUserWithBerth_DisabledQuotas(t *testing.T) {
609
+
ownerDID := "did:plc:ownerdef"
610
+
crewDID := "did:plc:crewdef"
611
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
612
+
defer cleanup()
613
+
614
+
ctx := sharedCtx
615
+
616
+
// Create quota manager with nonexistent config (disabled)
617
+
quotaMgr, err := quota.NewManager("/nonexistent/quotas.yaml")
618
+
if err != nil {
619
+
t.Fatalf("Failed to create quota manager: %v", err)
620
+
}
621
+
622
+
if quotaMgr.IsEnabled() {
623
+
t.Fatal("Expected quotas to be disabled")
624
+
}
625
+
626
+
// Add crew member
627
+
addCrewMemberWithBerth(t, pds, crewDID, "writer", []string{"blob:write"}, "bosun")
628
+
629
+
// Create layer record
630
+
record := atproto.NewLayerRecord(
631
+
"sha256:disabledlayer1",
632
+
1024*1024*100,
633
+
"application/vnd.oci.image.layer.v1.tar+gzip",
634
+
crewDID,
635
+
"at://"+crewDID+"/io.atcr.manifest/testdef",
636
+
)
637
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
638
+
t.Fatalf("Failed to create layer record: %v", err)
639
+
}
640
+
641
+
// Get quota with disabled quota manager
642
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, crewDID, quotaMgr)
643
+
if err != nil {
644
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
645
+
}
646
+
647
+
// Should have nil limit (unlimited when quotas disabled)
648
+
if stats.Limit != nil {
649
+
t.Errorf("Expected nil limit when quotas disabled, got %d", *stats.Limit)
650
+
}
651
+
652
+
t.Logf("Disabled quotas stats: %+v", stats)
653
+
}
654
+
655
+
func TestGetQuotaForUserWithBerth_DeduplicatesBlobs(t *testing.T) {
656
+
ownerDID := "did:plc:ownerghi"
657
+
crewDID := "did:plc:crewghi"
658
+
pds, cleanup := setupTestPDSWithIndex(t, ownerDID)
659
+
defer cleanup()
660
+
661
+
ctx := sharedCtx
662
+
663
+
// Create quota manager
664
+
tmpDir := t.TempDir()
665
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
666
+
configContent := `
667
+
berths:
668
+
deckhand:
669
+
quota: 5GB
670
+
671
+
defaults:
672
+
new_crew_berth: deckhand
673
+
`
674
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
675
+
t.Fatalf("Failed to write quota config: %v", err)
676
+
}
677
+
678
+
quotaMgr, err := quota.NewManager(configPath)
679
+
if err != nil {
680
+
t.Fatalf("Failed to create quota manager: %v", err)
681
+
}
682
+
683
+
// Add crew member
684
+
addCrewMemberWithBerth(t, pds, crewDID, "writer", []string{"blob:write"}, "")
685
+
686
+
// Create multiple layer records with same digest (should be deduplicated)
687
+
digest := "sha256:duplicatelayer"
688
+
for i := 0; i < 5; i++ {
689
+
record := atproto.NewLayerRecord(
690
+
digest,
691
+
1024*1024*100, // 100MB
692
+
"application/vnd.oci.image.layer.v1.tar+gzip",
693
+
crewDID,
694
+
"at://"+crewDID+"/io.atcr.manifest/manifest"+string(rune('a'+i)),
695
+
)
696
+
if _, _, err := pds.CreateLayerRecord(ctx, record); err != nil {
697
+
t.Fatalf("Failed to create layer record %d: %v", i, err)
698
+
}
699
+
}
700
+
701
+
// Get quota
702
+
stats, err := pds.GetQuotaForUserWithBerth(ctx, crewDID, quotaMgr)
703
+
if err != nil {
704
+
t.Fatalf("GetQuotaForUserWithBerth failed: %v", err)
705
+
}
706
+
707
+
// Should have 1 unique blob (deduplicated)
708
+
if stats.UniqueBlobs != 1 {
709
+
t.Errorf("Expected 1 unique blob (deduplicated), got %d", stats.UniqueBlobs)
710
+
}
711
+
712
+
// Total size should be 100MB (not 500MB)
713
+
expectedSize := int64(100 * 1024 * 1024)
714
+
if stats.TotalSize != expectedSize {
715
+
t.Errorf("Expected total size %d, got %d", expectedSize, stats.TotalSize)
716
+
}
717
+
718
+
t.Logf("Deduplicated quota stats: %+v", stats)
719
+
}
+2
-2
pkg/hold/pds/status_test.go
+2
-2
pkg/hold/pds/status_test.go
···
55
55
}
56
56
57
57
// Create handler for XRPC endpoints
58
-
handler := NewXRPCHandler(holdPDS, s3.S3Service{}, nil, nil, &mockPDSClient{})
58
+
handler := NewXRPCHandler(holdPDS, s3.S3Service{}, nil, nil, &mockPDSClient{}, nil)
59
59
60
60
// Helper function to list posts via XRPC
61
61
listPosts := func() ([]map[string]any, error) {
···
283
283
}
284
284
285
285
// Create shared handler
286
-
sharedHandler = NewXRPCHandler(sharedPDS, s3.S3Service{}, nil, nil, &mockPDSClient{})
286
+
sharedHandler = NewXRPCHandler(sharedPDS, s3.S3Service{}, nil, nil, &mockPDSClient{}, nil)
287
287
288
288
// Run tests
289
289
code := m.Run()
+8
-4
pkg/hold/pds/xrpc.go
+8
-4
pkg/hold/pds/xrpc.go
···
7
7
"fmt"
8
8
9
9
"atcr.io/pkg/atproto"
10
+
"atcr.io/pkg/hold/quota"
10
11
"atcr.io/pkg/s3"
11
12
"github.com/bluesky-social/indigo/api/bsky"
12
13
lexutil "github.com/bluesky-social/indigo/lex/util"
···
46
47
s3Service s3.S3Service
47
48
storageDriver driver.StorageDriver
48
49
broadcaster *EventBroadcaster
49
-
httpClient HTTPClient // For testing - allows injecting mock HTTP client
50
+
httpClient HTTPClient // For testing - allows injecting mock HTTP client
51
+
quotaMgr *quota.Manager // Quota manager for tier-based limits
50
52
}
51
53
52
54
// PartInfo represents a completed part in a multipart upload
···
64
66
}
65
67
66
68
// NewXRPCHandler creates a new XRPC handler
67
-
func NewXRPCHandler(pds *HoldPDS, s3Service s3.S3Service, storageDriver driver.StorageDriver, broadcaster *EventBroadcaster, httpClient HTTPClient) *XRPCHandler {
69
+
func NewXRPCHandler(pds *HoldPDS, s3Service s3.S3Service, storageDriver driver.StorageDriver, broadcaster *EventBroadcaster, httpClient HTTPClient, quotaMgr *quota.Manager) *XRPCHandler {
68
70
return &XRPCHandler{
69
71
pds: pds,
70
72
s3Service: s3Service,
71
73
storageDriver: storageDriver,
72
74
broadcaster: broadcaster,
73
75
httpClient: httpClient,
76
+
quotaMgr: quotaMgr,
74
77
}
75
78
}
76
79
···
1520
1523
// HandleGetQuota returns storage quota information for a user
1521
1524
// This calculates the total unique blob storage used by a specific user
1522
1525
// by iterating layer records and deduplicating by digest.
1526
+
// Also returns tier-aware quota limits if quotas.yaml is configured.
1523
1527
func (h *XRPCHandler) HandleGetQuota(w http.ResponseWriter, r *http.Request) {
1524
1528
userDID := r.URL.Query().Get("userDid")
1525
1529
if userDID == "" {
···
1533
1537
return
1534
1538
}
1535
1539
1536
-
// Get quota stats
1537
-
stats, err := h.pds.GetQuotaForUser(r.Context(), userDID)
1540
+
// Get quota stats with berth-aware limits
1541
+
stats, err := h.pds.GetQuotaForUserWithBerth(r.Context(), userDID, h.quotaMgr)
1538
1542
if err != nil {
1539
1543
slog.Error("Failed to get quota", "userDid", userDID, "error", err)
1540
1544
http.Error(w, fmt.Sprintf("failed to get quota: %v", err), http.StatusInternalServerError)
+6
-6
pkg/hold/pds/xrpc_test.go
+6
-6
pkg/hold/pds/xrpc_test.go
···
76
76
mockS3 := s3.S3Service{}
77
77
78
78
// Create XRPC handler with mock HTTP client
79
-
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
79
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient, nil)
80
80
81
81
return handler, ctx
82
82
}
···
143
143
mockS3 := s3.S3Service{}
144
144
145
145
// Create XRPC handler with mock HTTP client
146
-
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
146
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient, nil)
147
147
148
148
return handler, ctx
149
149
}
···
753
753
pds, ctx := setupTestPDS(t) // Don't bootstrap - no records created yet
754
754
mockClient := &mockPDSClient{}
755
755
mockS3 := s3.S3Service{}
756
-
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
756
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient, nil)
757
757
758
758
// Initialize repo manually (setupTestPDS doesn't call Bootstrap, so no crew members)
759
759
err := pds.repomgr.InitNewActor(ctx, pds.uid, "", pds.did, "", "", "")
···
1231
1231
pds, ctx := setupTestPDS(t) // Don't bootstrap
1232
1232
mockClient := &mockPDSClient{}
1233
1233
mockS3 := s3.S3Service{}
1234
-
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
1234
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient, nil)
1235
1235
1236
1236
// setupTestPDS creates the PDS/database but doesn't initialize the repo
1237
1237
// Check if implementation returns repos before initialization
···
1317
1317
pds, ctx := setupTestPDS(t) // Don't bootstrap
1318
1318
mockClient := &mockPDSClient{}
1319
1319
mockS3 := s3.S3Service{}
1320
-
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
1320
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient, nil)
1321
1321
holdDID := "did:web:hold.example.com"
1322
1322
1323
1323
// Initialize repo but don't add any records
···
2014
2014
mockClient := &mockPDSClient{}
2015
2015
2016
2016
// Create XRPC handler with mock s3 service and real filesystem driver
2017
-
handler := NewXRPCHandler(pds, mockS3Svc.toS3Service(), driver, nil, mockClient)
2017
+
handler := NewXRPCHandler(pds, mockS3Svc.toS3Service(), driver, nil, mockClient, nil)
2018
2018
2019
2019
return handler, mockS3Svc, ctx
2020
2020
}
+197
pkg/hold/quota/config.go
+197
pkg/hold/quota/config.go
···
1
+
package quota
2
+
3
+
import (
4
+
"fmt"
5
+
"os"
6
+
"regexp"
7
+
"strconv"
8
+
"strings"
9
+
10
+
"go.yaml.in/yaml/v4"
11
+
)
12
+
13
+
// Config represents the quotas.yaml configuration
14
+
type Config struct {
15
+
Berths map[string]BerthConfig `yaml:"berths"`
16
+
Defaults DefaultsConfig `yaml:"defaults"`
17
+
}
18
+
19
+
// BerthConfig represents a single berth's configuration
20
+
type BerthConfig struct {
21
+
Quota string `yaml:"quota"` // Human-readable size: "5GB", "50GB", etc.
22
+
}
23
+
24
+
// DefaultsConfig represents default settings
25
+
type DefaultsConfig struct {
26
+
NewCrewBerth string `yaml:"new_crew_berth"`
27
+
}
28
+
29
+
// Manager manages quota configuration and berth resolution
30
+
type Manager struct {
31
+
config *Config
32
+
berths map[string]int64 // resolved berth name -> bytes
33
+
}
34
+
35
+
// NewManager creates a quota manager, loading config from file if present
36
+
func NewManager(configPath string) (*Manager, error) {
37
+
m := &Manager{
38
+
berths: make(map[string]int64),
39
+
}
40
+
41
+
// Try to load config file
42
+
data, err := os.ReadFile(configPath)
43
+
if err != nil {
44
+
if os.IsNotExist(err) {
45
+
// No config file = no quotas enforced
46
+
return m, nil
47
+
}
48
+
return nil, fmt.Errorf("failed to read quota config: %w", err)
49
+
}
50
+
51
+
var cfg Config
52
+
if err := yaml.Unmarshal(data, &cfg); err != nil {
53
+
return nil, fmt.Errorf("failed to parse quota config: %w", err)
54
+
}
55
+
56
+
m.config = &cfg
57
+
58
+
// Parse and resolve all berths
59
+
for name, berth := range cfg.Berths {
60
+
bytes, err := ParseHumanBytes(berth.Quota)
61
+
if err != nil {
62
+
return nil, fmt.Errorf("invalid quota for berth %q: %w", name, err)
63
+
}
64
+
m.berths[name] = bytes
65
+
}
66
+
67
+
return m, nil
68
+
}
69
+
70
+
// IsEnabled returns true if quotas are being enforced
71
+
func (m *Manager) IsEnabled() bool {
72
+
return m.config != nil
73
+
}
74
+
75
+
// GetBerthLimit resolves the quota limit for a berth key
76
+
// Returns nil for unlimited (captain, no config, or berth not found with no default)
77
+
//
78
+
// Resolution order:
79
+
// 1. If quotas disabled → nil (unlimited)
80
+
// 2. If berthKey provided and found → return that berth's limit
81
+
// 3. If berthKey not found or empty → use defaults.new_crew_berth
82
+
// 4. If default berth not found → nil (unlimited)
83
+
func (m *Manager) GetBerthLimit(berthKey string) *int64 {
84
+
if !m.IsEnabled() {
85
+
return nil
86
+
}
87
+
88
+
// Try the provided berth key first
89
+
if berthKey != "" {
90
+
if limit, ok := m.berths[berthKey]; ok {
91
+
return &limit
92
+
}
93
+
}
94
+
95
+
// Fall back to default berth
96
+
if m.config.Defaults.NewCrewBerth != "" {
97
+
if limit, ok := m.berths[m.config.Defaults.NewCrewBerth]; ok {
98
+
return &limit
99
+
}
100
+
}
101
+
102
+
// No valid berth found - unlimited
103
+
return nil
104
+
}
105
+
106
+
// GetBerthName resolves the berth name for a berth key
107
+
// Returns the actual berth name being used (after fallback resolution)
108
+
func (m *Manager) GetBerthName(berthKey string) string {
109
+
if !m.IsEnabled() {
110
+
return ""
111
+
}
112
+
113
+
// Try the provided berth key first
114
+
if berthKey != "" {
115
+
if _, ok := m.berths[berthKey]; ok {
116
+
return berthKey
117
+
}
118
+
}
119
+
120
+
// Fall back to default berth
121
+
if m.config.Defaults.NewCrewBerth != "" {
122
+
if _, ok := m.berths[m.config.Defaults.NewCrewBerth]; ok {
123
+
return m.config.Defaults.NewCrewBerth
124
+
}
125
+
}
126
+
127
+
return ""
128
+
}
129
+
130
+
// GetDefaultBerth returns the default berth name for new crew members
131
+
func (m *Manager) GetDefaultBerth() string {
132
+
if m.config == nil {
133
+
return ""
134
+
}
135
+
return m.config.Defaults.NewCrewBerth
136
+
}
137
+
138
+
// BerthCount returns the number of configured berths
139
+
func (m *Manager) BerthCount() int {
140
+
return len(m.berths)
141
+
}
142
+
143
+
// ParseHumanBytes parses human-readable byte sizes like "5GB", "100MB", "1.5TB"
144
+
func ParseHumanBytes(s string) (int64, error) {
145
+
s = strings.TrimSpace(strings.ToUpper(s))
146
+
if s == "" {
147
+
return 0, fmt.Errorf("empty size string")
148
+
}
149
+
150
+
// Match number (with optional decimal) followed by optional unit
151
+
re := regexp.MustCompile(`^(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)?$`)
152
+
matches := re.FindStringSubmatch(s)
153
+
if matches == nil {
154
+
return 0, fmt.Errorf("invalid size format: %s", s)
155
+
}
156
+
157
+
value, err := strconv.ParseFloat(matches[1], 64)
158
+
if err != nil {
159
+
return 0, fmt.Errorf("invalid number: %w", err)
160
+
}
161
+
162
+
unit := matches[2]
163
+
if unit == "" {
164
+
unit = "B"
165
+
}
166
+
167
+
multipliers := map[string]float64{
168
+
"B": 1,
169
+
"KB": 1024,
170
+
"MB": 1024 * 1024,
171
+
"GB": 1024 * 1024 * 1024,
172
+
"TB": 1024 * 1024 * 1024 * 1024,
173
+
"PB": 1024 * 1024 * 1024 * 1024 * 1024,
174
+
}
175
+
176
+
mult, ok := multipliers[unit]
177
+
if !ok {
178
+
return 0, fmt.Errorf("unknown unit: %s", unit)
179
+
}
180
+
181
+
return int64(value * mult), nil
182
+
}
183
+
184
+
// FormatHumanBytes formats bytes as a human-readable string
185
+
func FormatHumanBytes(bytes int64) string {
186
+
const unit = 1024
187
+
if bytes < unit {
188
+
return fmt.Sprintf("%d B", bytes)
189
+
}
190
+
div, exp := int64(unit), 0
191
+
for n := bytes / unit; n >= unit; n /= unit {
192
+
div *= unit
193
+
exp++
194
+
}
195
+
units := []string{"KB", "MB", "GB", "TB", "PB"}
196
+
return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), units[exp])
197
+
}
+275
pkg/hold/quota/config_test.go
+275
pkg/hold/quota/config_test.go
···
1
+
package quota
2
+
3
+
import (
4
+
"os"
5
+
"path/filepath"
6
+
"testing"
7
+
)
8
+
9
+
func TestParseHumanBytes(t *testing.T) {
10
+
tests := []struct {
11
+
input string
12
+
expected int64
13
+
wantErr bool
14
+
}{
15
+
// Basic units
16
+
{"1024", 1024, false},
17
+
{"1024B", 1024, false},
18
+
{"1KB", 1024, false},
19
+
{"1MB", 1024 * 1024, false},
20
+
{"1GB", 1024 * 1024 * 1024, false},
21
+
{"1TB", 1024 * 1024 * 1024 * 1024, false},
22
+
{"1PB", 1024 * 1024 * 1024 * 1024 * 1024, false},
23
+
24
+
// Common sizes
25
+
{"5GB", 5 * 1024 * 1024 * 1024, false},
26
+
{"50GB", 50 * 1024 * 1024 * 1024, false},
27
+
{"100GB", 100 * 1024 * 1024 * 1024, false},
28
+
{"500MB", 500 * 1024 * 1024, false},
29
+
30
+
// Case insensitive
31
+
{"5gb", 5 * 1024 * 1024 * 1024, false},
32
+
{"5Gb", 5 * 1024 * 1024 * 1024, false},
33
+
34
+
// With whitespace
35
+
{" 5GB ", 5 * 1024 * 1024 * 1024, false},
36
+
37
+
// Decimals
38
+
{"1.5GB", int64(1.5 * 1024 * 1024 * 1024), false},
39
+
{"2.5TB", int64(2.5 * 1024 * 1024 * 1024 * 1024), false},
40
+
41
+
// Errors
42
+
{"", 0, true},
43
+
{"invalid", 0, true},
44
+
{"GB", 0, true},
45
+
{"-5GB", 0, true},
46
+
}
47
+
48
+
for _, tt := range tests {
49
+
t.Run(tt.input, func(t *testing.T) {
50
+
result, err := ParseHumanBytes(tt.input)
51
+
if tt.wantErr {
52
+
if err == nil {
53
+
t.Errorf("expected error for input %q", tt.input)
54
+
}
55
+
return
56
+
}
57
+
if err != nil {
58
+
t.Errorf("unexpected error: %v", err)
59
+
return
60
+
}
61
+
if result != tt.expected {
62
+
t.Errorf("got %d, want %d", result, tt.expected)
63
+
}
64
+
})
65
+
}
66
+
}
67
+
68
+
func TestFormatHumanBytes(t *testing.T) {
69
+
tests := []struct {
70
+
bytes int64
71
+
expected string
72
+
}{
73
+
{0, "0 B"},
74
+
{512, "512 B"},
75
+
{1024, "1.0 KB"},
76
+
{1024 * 1024, "1.0 MB"},
77
+
{1024 * 1024 * 1024, "1.0 GB"},
78
+
{5 * 1024 * 1024 * 1024, "5.0 GB"},
79
+
{1024 * 1024 * 1024 * 1024, "1.0 TB"},
80
+
}
81
+
82
+
for _, tt := range tests {
83
+
t.Run(tt.expected, func(t *testing.T) {
84
+
result := FormatHumanBytes(tt.bytes)
85
+
if result != tt.expected {
86
+
t.Errorf("got %q, want %q", result, tt.expected)
87
+
}
88
+
})
89
+
}
90
+
}
91
+
92
+
func TestNewManager_NoConfigFile(t *testing.T) {
93
+
m, err := NewManager("/nonexistent/quotas.yaml")
94
+
if err != nil {
95
+
t.Fatalf("expected no error for missing file, got: %v", err)
96
+
}
97
+
if m.IsEnabled() {
98
+
t.Error("expected quotas to be disabled when file missing")
99
+
}
100
+
if m.GetBerthLimit("anything") != nil {
101
+
t.Error("expected nil limit when quotas disabled")
102
+
}
103
+
}
104
+
105
+
func TestNewManager_ValidConfig(t *testing.T) {
106
+
tmpDir := t.TempDir()
107
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
108
+
109
+
configContent := `
110
+
berths:
111
+
deckhand:
112
+
quota: 5GB
113
+
bosun:
114
+
quota: 50GB
115
+
quartermaster:
116
+
quota: 100GB
117
+
118
+
defaults:
119
+
new_crew_berth: deckhand
120
+
`
121
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
122
+
t.Fatalf("failed to write config: %v", err)
123
+
}
124
+
125
+
m, err := NewManager(configPath)
126
+
if err != nil {
127
+
t.Fatalf("failed to load config: %v", err)
128
+
}
129
+
130
+
if !m.IsEnabled() {
131
+
t.Error("expected quotas to be enabled")
132
+
}
133
+
134
+
if m.BerthCount() != 3 {
135
+
t.Errorf("expected 3 berths, got %d", m.BerthCount())
136
+
}
137
+
138
+
// Test default berth (empty string)
139
+
limit := m.GetBerthLimit("")
140
+
if limit == nil {
141
+
t.Fatal("expected non-nil limit for default berth")
142
+
}
143
+
if *limit != 5*1024*1024*1024 {
144
+
t.Errorf("expected 5GB limit for default, got %d", *limit)
145
+
}
146
+
147
+
// Test explicit berth
148
+
limit = m.GetBerthLimit("bosun")
149
+
if limit == nil {
150
+
t.Fatal("expected non-nil limit for bosun")
151
+
}
152
+
if *limit != 50*1024*1024*1024 {
153
+
t.Errorf("expected 50GB limit for bosun, got %d", *limit)
154
+
}
155
+
156
+
// Test berth name resolution
157
+
if m.GetBerthName("") != "deckhand" {
158
+
t.Errorf("expected berth name 'deckhand' for empty key, got %q", m.GetBerthName(""))
159
+
}
160
+
if m.GetBerthName("bosun") != "bosun" {
161
+
t.Errorf("expected berth name 'bosun', got %q", m.GetBerthName("bosun"))
162
+
}
163
+
}
164
+
165
+
func TestNewManager_FallbackToDefault(t *testing.T) {
166
+
tmpDir := t.TempDir()
167
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
168
+
169
+
configContent := `
170
+
berths:
171
+
deckhand:
172
+
quota: 5GB
173
+
quartermaster:
174
+
quota: 50GB
175
+
176
+
defaults:
177
+
new_crew_berth: deckhand
178
+
`
179
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
180
+
t.Fatalf("failed to write config: %v", err)
181
+
}
182
+
183
+
m, err := NewManager(configPath)
184
+
if err != nil {
185
+
t.Fatalf("failed to load config: %v", err)
186
+
}
187
+
188
+
// Unknown berth should fall back to default
189
+
limit := m.GetBerthLimit("unknown_berth")
190
+
if limit == nil {
191
+
t.Fatal("expected fallback to default berth")
192
+
}
193
+
if *limit != 5*1024*1024*1024 {
194
+
t.Errorf("expected 5GB limit from default fallback, got %d", *limit)
195
+
}
196
+
197
+
// Berth name should also fall back
198
+
if m.GetBerthName("unknown_berth") != "deckhand" {
199
+
t.Errorf("expected berth name 'deckhand' for unknown berth, got %q", m.GetBerthName("unknown_berth"))
200
+
}
201
+
}
202
+
203
+
func TestNewManager_InvalidConfig(t *testing.T) {
204
+
tmpDir := t.TempDir()
205
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
206
+
207
+
// Invalid YAML
208
+
if err := os.WriteFile(configPath, []byte("invalid: [yaml"), 0644); err != nil {
209
+
t.Fatalf("failed to write config: %v", err)
210
+
}
211
+
212
+
_, err := NewManager(configPath)
213
+
if err == nil {
214
+
t.Error("expected error for invalid YAML")
215
+
}
216
+
}
217
+
218
+
func TestNewManager_InvalidQuotaSize(t *testing.T) {
219
+
tmpDir := t.TempDir()
220
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
221
+
222
+
configContent := `
223
+
berths:
224
+
deckhand:
225
+
quota: invalid_size
226
+
227
+
defaults:
228
+
new_crew_berth: deckhand
229
+
`
230
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
231
+
t.Fatalf("failed to write config: %v", err)
232
+
}
233
+
234
+
_, err := NewManager(configPath)
235
+
if err == nil {
236
+
t.Error("expected error for invalid quota size")
237
+
}
238
+
}
239
+
240
+
func TestNewManager_NoDefaultBerth(t *testing.T) {
241
+
tmpDir := t.TempDir()
242
+
configPath := filepath.Join(tmpDir, "quotas.yaml")
243
+
244
+
configContent := `
245
+
berths:
246
+
quartermaster:
247
+
quota: 50GB
248
+
249
+
defaults:
250
+
new_crew_berth: nonexistent
251
+
`
252
+
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
253
+
t.Fatalf("failed to write config: %v", err)
254
+
}
255
+
256
+
m, err := NewManager(configPath)
257
+
if err != nil {
258
+
t.Fatalf("failed to load config: %v", err)
259
+
}
260
+
261
+
// Empty berth key with nonexistent default should return nil (unlimited)
262
+
limit := m.GetBerthLimit("")
263
+
if limit != nil {
264
+
t.Error("expected nil limit when default berth doesn't exist")
265
+
}
266
+
267
+
// Explicit berth should still work
268
+
limit = m.GetBerthLimit("quartermaster")
269
+
if limit == nil {
270
+
t.Fatal("expected non-nil limit for quartermaster berth")
271
+
}
272
+
if *limit != 50*1024*1024*1024 {
273
+
t.Errorf("expected 50GB limit for quartermaster, got %d", *limit)
274
+
}
275
+
}
+35
quotas.yaml.example
+35
quotas.yaml.example
···
1
+
# ATCR Hold Service Quota Configuration
2
+
# Copy this file to quotas.yaml to enable quota enforcement.
3
+
# If quotas.yaml doesn't exist, quotas are disabled (unlimited for all users).
4
+
5
+
# Berths define quota tiers using nautical crew ranks.
6
+
# Each berth has a quota limit specified in human-readable format.
7
+
# Supported units: B, KB, MB, GB, TB, PB (case-insensitive)
8
+
berths:
9
+
# Entry-level crew - suitable for new or casual users
10
+
deckhand:
11
+
quota: 5GB
12
+
13
+
# Mid-level crew - for regular contributors
14
+
bosun:
15
+
quota: 50GB
16
+
17
+
# Senior crew - for power users or trusted contributors
18
+
quartermaster:
19
+
quota: 100GB
20
+
21
+
# You can add custom berths with any name:
22
+
# unlimited_crew:
23
+
# quota: 1TB
24
+
25
+
defaults:
26
+
# Default berth assigned to new crew members who don't have an explicit berth.
27
+
# This berth must exist in the berths section above.
28
+
new_crew_berth: deckhand
29
+
30
+
# Notes:
31
+
# - The hold captain (owner) always has unlimited quota regardless of berths.
32
+
# - Crew members can be assigned a specific berth in their crew record.
33
+
# - If a crew member's berth doesn't exist in config, they fall back to the default.
34
+
# - Quota is calculated per-user by summing unique blob sizes (deduplicated).
35
+
# - Quota is checked when pushing manifests (after blobs are already uploaded).