pkg/appview/config.go  (+1, -2)
pkg/appview/db/schema.go  (+2, -5)

 	var statements []string

 	// Split on semicolons
-	parts := strings.Split(query, ";")
-
-	for _, part := range parts {
+	for part := range strings.SplitSeq(query, ";") {
 		// Trim whitespace
 		stmt := strings.TrimSpace(part)
···
 	}

 	// Skip comment-only statements
-	lines := strings.Split(stmt, "\n")
 	hasCode := false
-	for _, line := range lines {
+	for line := range strings.SplitSeq(stmt, "\n") {
 		trimmed := strings.TrimSpace(line)
 		if trimmed != "" && !strings.HasPrefix(trimmed, "--") {
 			hasCode = true
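
This hunk (and the licenses.go hunk further down) swaps strings.Split for strings.SplitSeq, added in Go 1.24, which yields the pieces lazily instead of allocating a []string. A minimal stand-alone sketch of the pattern, using a made-up query string:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// SplitSeq returns an iterator, so no intermediate slice is built;
	// the loop body sees each piece exactly as strings.Split would produce it.
	query := "CREATE TABLE t (id INT); -- comment only\n; CREATE INDEX i ON t(id);"
	for part := range strings.SplitSeq(query, ";") {
		stmt := strings.TrimSpace(part)
		if stmt == "" {
			continue
		}
		fmt.Printf("candidate statement: %q\n", stmt)
	}
}
```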

pkg/appview/handlers/opengraph.go  (+1, -1)

pkg/appview/handlers/repository.go  (+11, -14)

···
 			continue
 		}

-		wg.Add(1)
-		go func(idx int) {
-			defer wg.Done()
-
-			endpoint := manifests[idx].HoldEndpoint
+		wg.Go(func() {
+			endpoint := manifests[i].HoldEndpoint

 			// Try to get cached status first (instant)
 			if cached := h.HealthChecker.GetCachedStatus(endpoint); cached != nil {
 				mu.Lock()
-				manifests[idx].Reachable = cached.Reachable
-				manifests[idx].Pending = false
+				manifests[i].Reachable = cached.Reachable
+				manifests[i].Pending = false
 				mu.Unlock()
 				return
 			}
···
 			mu.Lock()
 			if ctx.Err() == context.DeadlineExceeded {
 				// Timeout - mark as pending for HTMX polling
-				manifests[idx].Reachable = false
-				manifests[idx].Pending = true
+				manifests[i].Reachable = false
+				manifests[i].Pending = true
 			} else if err != nil {
 				// Error - mark as unreachable
-				manifests[idx].Reachable = false
-				manifests[idx].Pending = false
+				manifests[i].Reachable = false
+				manifests[i].Pending = false
 			} else {
 				// Success
-				manifests[idx].Reachable = reachable
-				manifests[idx].Pending = false
+				manifests[i].Reachable = reachable
+				manifests[i].Pending = false
 			}
 			mu.Unlock()
-		}(i)
+		})
 	}

 	// Wait for all checks to complete or timeout
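
The recurring wg.Add(1) / go func / defer wg.Done() scaffolding in this PR collapses into sync.WaitGroup.Go, added in Go 1.25, which handles the Add/Done bookkeeping itself. A reduced sketch of the pattern (the manifest struct and checkEndpoint helper here are invented for illustration, not the handler's real types):

```go
package main

import (
	"fmt"
	"sync"
)

type manifest struct {
	HoldEndpoint string
	Reachable    bool
}

// checkEndpoint stands in for the real health check.
func checkEndpoint(endpoint string) bool { return endpoint != "" }

func main() {
	manifests := []manifest{
		{HoldEndpoint: "https://hold-a.example"},
		{HoldEndpoint: "https://hold-b.example"},
	}

	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := range manifests {
		// wg.Go increments the counter, starts the goroutine, and marks it
		// done on return, so there is no Add/Done pair to keep balanced.
		wg.Go(func() {
			ok := checkEndpoint(manifests[i].HoldEndpoint)
			mu.Lock()
			manifests[i].Reachable = ok
			mu.Unlock()
		})
	}
	wg.Wait()
	fmt.Printf("%+v\n", manifests)
}
```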

pkg/appview/holdhealth/worker.go  (+7, -14)

···

 // Start begins the background worker
 func (w *Worker) Start(ctx context.Context) {
-	w.wg.Add(1)
-	go func() {
-		defer w.wg.Done()
-
+	w.wg.Go(func() {
 		slog.Info("Hold health worker starting background health checks")

 		// Wait for services to be ready (Docker startup race condition)
···
 				w.checker.Cleanup()
 			}
 		}
-	}()
+	})
 }

 // Stop gracefully stops the worker
···
 	var statsMu sync.Mutex

 	for _, endpoint := range uniqueEndpoints {
-		wg.Add(1)
-
-		go func(ep string) {
-			defer wg.Done()
-
+		wg.Go(func() {
 			// Acquire semaphore
 			sem <- struct{}{}
 			defer func() { <-sem }()

 			// Check health
-			isReachable, err := w.checker.CheckHealth(ctx, ep)
+			isReachable, err := w.checker.CheckHealth(ctx, endpoint)

 			// Update cache
-			w.checker.SetStatus(ep, isReachable, err)
+			w.checker.SetStatus(endpoint, isReachable, err)

 			// Update stats
 			statsMu.Lock()
···
 				reachable++
 			} else {
 				unreachable++
-				slog.Warn("Hold health worker hold unreachable", "endpoint", ep, "error", err)
+				slog.Warn("Hold health worker hold unreachable", "endpoint", endpoint, "error", err)
 			}
 			statsMu.Unlock()
-		}(endpoint)
+		})
 	}

 	// Wait for all checks to complete
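
The worker keeps its concurrency cap; the buffered channel used as a semaphore simply moves inside the wg.Go closure. A stand-alone sketch of bounding wg.Go goroutines this way (the endpoint list and limit are placeholders):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	endpoints := []string{"hold-a", "hold-b", "hold-c", "hold-d", "hold-e"}
	const maxConcurrent = 2

	var wg sync.WaitGroup
	sem := make(chan struct{}, maxConcurrent) // counting semaphore

	for _, endpoint := range endpoints {
		wg.Go(func() {
			sem <- struct{}{}        // take a slot; blocks while maxConcurrent checks run
			defer func() { <-sem }() // free the slot when this check finishes

			time.Sleep(10 * time.Millisecond) // stand-in for the real health check
			fmt.Println("checked", endpoint)
		})
	}
	wg.Wait()
}
```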

pkg/appview/licenses/licenses.go  (+1, -3)

···
 	licensesStr = strings.ReplaceAll(licensesStr, " OR ", ",")
 	licensesStr = strings.ReplaceAll(licensesStr, ";", ",")

-	parts := strings.Split(licensesStr, ",")
-
 	var result []LicenseInfo
 	seen := make(map[string]bool) // Deduplicate

-	for _, part := range parts {
+	for part := range strings.SplitSeq(licensesStr, ",") {
 		part = strings.TrimSpace(part)
 		if part == "" {
 			continue

pkg/appview/middleware/auth_test.go  (+5, -8)

···
 	var wg sync.WaitGroup
 	var mu sync.Mutex // Protect results map

-	for i := range 10 {
-		wg.Add(1)
-		go func(index int, sessionID string) {
-			defer wg.Done()
-
+	for i := range results {
+		wg.Go(func() {
 			req := httptest.NewRequest("GET", "/test", nil)
 			req.AddCookie(&http.Cookie{
 				Name:  "atcr_session",
-				Value: sessionID,
+				Value: sessionIDs[i],
 			})
 			w := httptest.NewRecorder()

 			wrappedHandler.ServeHTTP(w, req)

 			mu.Lock()
-			results[index] = w.Code
+			results[i] = w.Code
 			mu.Unlock()
-		}(i, sessionIDs[i])
+		})
 	}

 	wg.Wait()

pkg/appview/storage/profile_test.go  (+2, -4)

···
 	// Make 5 concurrent GetProfile calls
 	var wg sync.WaitGroup
 	for range 5 {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
+		wg.Go(func() {
 			_, err := GetProfile(context.Background(), client)
 			if err != nil {
 				t.Errorf("GetProfile() error = %v", err)
 			}
-		}()
+		})
 	}

 	wg.Wait()

pkg/appview/storage/routing_repository.go  (+42, -45)

···
 import (
 	"context"
 	"log/slog"
+	"sync"

 	"github.com/distribution/distribution/v3"
 )
···
 // RoutingRepository routes manifests to ATProto and blobs to external hold service
 // The registry (AppView) is stateless and NEVER stores blobs locally
 // NOTE: A fresh instance is created per-request (see middleware/registry.go)
-// so no mutex is needed - each request has its own instance
 type RoutingRepository struct {
 	distribution.Repository
-	Ctx           *RegistryContext // All context and services (exported for token updates)
-	manifestStore *ManifestStore   // Manifest store instance (lazy-initialized)
-	blobStore     *ProxyBlobStore  // Blob store instance (lazy-initialized)
+	Ctx               *RegistryContext // All context and services (exported for token updates)
+	manifestStore     *ManifestStore   // Manifest store instance (lazy-initialized)
+	manifestStoreOnce sync.Once        // Ensures thread-safe lazy initialization
+	blobStore         *ProxyBlobStore  // Blob store instance (lazy-initialized)
+	blobStoreOnce     sync.Once        // Ensures thread-safe lazy initialization
 }

 // NewRoutingRepository creates a new routing repository
···
 // Manifests returns the ATProto-backed manifest service
 func (r *RoutingRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
-	// Lazy-initialize manifest store (no mutex needed - one instance per request)
-	if r.manifestStore == nil {
+	r.manifestStoreOnce.Do(func() {
 		// Ensure blob store is created first (needed for label extraction during push)
 		blobStore := r.Blobs(ctx)
 		r.manifestStore = NewManifestStore(r.Ctx, blobStore)
-	}
+	})
 	return r.manifestStore, nil
 }

 // Blobs returns a proxy blob store that routes to external hold service
 // The registry (AppView) NEVER stores blobs locally - all blobs go through hold service
 func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore {
-	// Return cached blob store if available (no mutex needed - one instance per request)
-	if r.blobStore != nil {
-		slog.Debug("Returning cached blob store", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository)
-		return r.blobStore
-	}
-
-	// Determine if this is a pull (GET/HEAD) or push (PUT/POST/etc) operation
-	// Pull operations use the historical hold DID from the database (blobs are where they were pushed)
-	// Push operations use the discovery-based hold DID from user's profile/default
-	// This allows users to change their default hold and have new pushes go there
-	isPull := false
-	if method, ok := ctx.Value(HTTPRequestMethod).(string); ok {
-		isPull = method == "GET" || method == "HEAD"
-	}
-
-	holdDID := r.Ctx.HoldDID // Default to discovery-based DID
-	holdSource := "discovery"
-
-	// Only query database for pull operations
-	if isPull && r.Ctx.Database != nil {
-		// Query database for the latest manifest's hold DID
-		if dbHoldDID, err := r.Ctx.Database.GetLatestHoldDIDForRepo(r.Ctx.DID, r.Ctx.Repository); err == nil && dbHoldDID != "" {
-			// Use hold DID from database (pull case - use historical reference)
-			holdDID = dbHoldDID
-			holdSource = "database"
-			slog.Debug("Using hold from database manifest (pull)", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", dbHoldDID)
-		} else if err != nil {
-			// Log error but don't fail - fall back to discovery-based DID
-			slog.Warn("Failed to query database for hold DID", "component", "storage/blobs", "error", err)
-		}
-		// If dbHoldDID is empty (no manifests yet), fall through to use discovery-based DID
-	}
-
-	if holdDID == "" {
-		// This should never happen if middleware is configured correctly
-		panic("hold DID not set in RegistryContext - ensure default_hold_did is configured in middleware")
-	}
-
-	slog.Debug("Using hold DID for blobs", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID, "source", holdSource)
-
-	// Update context with the correct hold DID (may be from database or discovered)
-	r.Ctx.HoldDID = holdDID
-
-	// Create and cache proxy blob store
-	r.blobStore = NewProxyBlobStore(r.Ctx)
+	r.blobStoreOnce.Do(func() {
+		// Determine if this is a pull (GET/HEAD) or push (PUT/POST/etc) operation
+		// Pull operations use the historical hold DID from the database (blobs are where they were pushed)
+		// Push operations use the discovery-based hold DID from user's profile/default
+		// This allows users to change their default hold and have new pushes go there
+		isPull := false
+		if method, ok := ctx.Value(HTTPRequestMethod).(string); ok {
+			isPull = method == "GET" || method == "HEAD"
+		}
+
+		holdDID := r.Ctx.HoldDID // Default to discovery-based DID
+		holdSource := "discovery"
+
+		// Only query database for pull operations
+		if isPull && r.Ctx.Database != nil {
+			// Query database for the latest manifest's hold DID
+			if dbHoldDID, err := r.Ctx.Database.GetLatestHoldDIDForRepo(r.Ctx.DID, r.Ctx.Repository); err == nil && dbHoldDID != "" {
+				// Use hold DID from database (pull case - use historical reference)
+				holdDID = dbHoldDID
+				holdSource = "database"
+				slog.Debug("Using hold from database manifest (pull)", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", dbHoldDID)
+			} else if err != nil {
+				// Log error but don't fail - fall back to discovery-based DID
+				slog.Warn("Failed to query database for hold DID", "component", "storage/blobs", "error", err)
+			}
+			// If dbHoldDID is empty (no manifests yet), fall through to use discovery-based DID
+		}
+
+		if holdDID == "" {
+			// This should never happen if middleware is configured correctly
+			panic("hold DID not set in RegistryContext - ensure default_hold_did is configured in middleware")
+		}
+
+		slog.Debug("Using hold DID for blobs", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID, "source", holdSource)
+
+		// Update context with the correct hold DID (may be from database or discovered)
+		r.Ctx.HoldDID = holdDID
+
+		// Create and cache proxy blob store
+		r.blobStore = NewProxyBlobStore(r.Ctx)
+	})
 	return r.blobStore
 }
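
The struct previously relied on the "fresh instance per request" comment to justify lock-free lazy init; with that assumption dropped (the tests below exercise Manifests and Blobs concurrently), sync.Once gives the same initialize-on-first-use behavior with a memory-visibility guarantee. A reduced sketch of the idiom, with placeholder types rather than the real store:

```go
package main

import (
	"fmt"
	"sync"
)

type blobStore struct{ id int }

type repo struct {
	store     *blobStore
	storeOnce sync.Once
}

// Blobs builds the store exactly once, even under concurrent callers;
// everyone after the first sees the fully constructed value.
func (r *repo) Blobs() *blobStore {
	r.storeOnce.Do(func() {
		r.store = &blobStore{id: 42}
	})
	return r.store
}

func main() {
	r := &repo{}
	var wg sync.WaitGroup
	for range 10 {
		wg.Go(func() { _ = r.Blobs() })
	}
	wg.Wait()
	fmt.Println(r.Blobs().id)
}
```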

pkg/appview/storage/routing_repository_test.go  (+6, -10)

···
 	// Concurrent access to Manifests()
 	for i := 0; i < numGoroutines; i++ {
-		wg.Add(1)
-		go func(index int) {
-			defer wg.Done()
+		wg.Go(func() {
 			store, err := repo.Manifests(context.Background())
 			require.NoError(t, err)
-			manifestStores[index] = store
-		}(i)
+			manifestStores[i] = store
+		})
 	}

 	wg.Wait()
···
 	// Concurrent access to Blobs()
 	for i := 0; i < numGoroutines; i++ {
-		wg.Add(1)
-		go func(index int) {
-			defer wg.Done()
-			blobStores[index] = repo.Blobs(context.Background())
-		}(i)
+		wg.Go(func() {
+			blobStores[i] = repo.Blobs(context.Background())
+		})
 	}

 	wg.Wait()

pkg/atproto/directory_test.go  (+5, -9)

···
 	t.Run("concurrent access is thread-safe", func(t *testing.T) {
 		const numGoroutines = 100
 		var wg sync.WaitGroup
-		wg.Add(numGoroutines)

 		// Channel to collect all directory instances
 		instances := make(chan any, numGoroutines)

 		// Launch many goroutines concurrently accessing GetDirectory
 		for range numGoroutines {
-			go func() {
-				defer wg.Done()
+			wg.Go(func() {
 				dir := GetDirectory()
 				instances <- dir
-			}()
+			})
 		}

 		// Wait for all goroutines to complete
···
 	const numGoroutines = 50
 	var wg sync.WaitGroup
-	wg.Add(numGoroutines)

 	instances := make([]any, numGoroutines)
 	var mu sync.Mutex

 	// Simulate many goroutines trying to get the directory simultaneously
 	for i := 0; i < numGoroutines; i++ {
-		go func(idx int) {
-			defer wg.Done()
+		wg.Go(func() {
 			dir := GetDirectory()
 			mu.Lock()
-			instances[idx] = dir
+			instances[i] = dir
 			mu.Unlock()
-		}(i)
+		})
 	}

 	wg.Wait()

pkg/auth/token/issuer_test.go  (+5, -7)

···
 	// Issue tokens concurrently
 	const numGoroutines = 10
 	var wg sync.WaitGroup
-	wg.Add(numGoroutines)

 	tokens := make([]string, numGoroutines)
 	errors := make([]error, numGoroutines)

 	for i := 0; i < numGoroutines; i++ {
-		go func(idx int) {
-			defer wg.Done()
-			subject := "did:plc:user" + string(rune('0'+idx))
+		wg.Go(func() {
+			subject := "did:plc:user" + string(rune('0'+i))
 			token, err := issuer.Issue(subject, nil, AuthMethodOAuth)
-			tokens[idx] = token
-			errors[idx] = err
-		}(i)
+			tokens[i] = token
+			errors[i] = err
+		})
 	}

 	wg.Wait()
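
These test rewrites also stop passing the loop index through go func(idx int){...}(i) and capture i directly. That is safe because Go 1.22 scopes the loop variable per iteration, and each goroutine then writes a distinct slice element, so tokens and errors need no mutex. A tiny illustration:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make([]int, 5)
	var wg sync.WaitGroup

	for i := range results {
		// Since Go 1.22 every iteration gets a fresh i, so this closure sees
		// the value for its own iteration (pre-1.22 it would have shared one i).
		wg.Go(func() {
			results[i] = i * i // each goroutine writes its own element
		})
	}
	wg.Wait()
	fmt.Println(results) // [0 1 4 9 16]
}
```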

pkg/hold/oci/helpers_test.go  (-90)

···
 )

 // Tests for helper functions
-
-func TestBlobPath_SHA256(t *testing.T) {
-	tests := []struct {
-		name     string
-		digest   string
-		expected string
-	}{
-		{
-			name:     "standard sha256 digest",
-			digest:   "sha256:abc123def456",
-			expected: "/docker/registry/v2/blobs/sha256/ab/abc123def456/data",
-		},
-		{
-			name:     "short hash (less than 2 chars)",
-			digest:   "sha256:a",
-			expected: "/docker/registry/v2/blobs/sha256/a/data",
-		},
-		{
-			name:     "exactly 2 char hash",
-			digest:   "sha256:ab",
-			expected: "/docker/registry/v2/blobs/sha256/ab/ab/data",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			result := blobPath(tt.digest)
-			if result != tt.expected {
-				t.Errorf("Expected %s, got %s", tt.expected, result)
-			}
-		})
-	}
-}
-
-func TestBlobPath_TempUpload(t *testing.T) {
-	tests := []struct {
-		name     string
-		digest   string
-		expected string
-	}{
-		{
-			name:     "temp upload path",
-			digest:   "uploads/temp-uuid-123",
-			expected: "/docker/registry/v2/uploads/temp-uuid-123/data",
-		},
-		{
-			name:     "temp upload with different uuid",
-			digest:   "uploads/temp-abc-def-456",
-			expected: "/docker/registry/v2/uploads/temp-abc-def-456/data",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			result := blobPath(tt.digest)
-			if result != tt.expected {
-				t.Errorf("Expected %s, got %s", tt.expected, result)
-			}
-		})
-	}
-}
-
-func TestBlobPath_MalformedDigest(t *testing.T) {
-	tests := []struct {
-		name     string
-		digest   string
-		expected string
-	}{
-		{
-			name:     "no colon in digest",
-			digest:   "malformed-digest",
-			expected: "/docker/registry/v2/blobs/malformed-digest/data",
-		},
-		{
-			name:     "empty digest",
-			digest:   "",
-			expected: "/docker/registry/v2/blobs//data",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			result := blobPath(tt.digest)
-			if result != tt.expected {
-				t.Errorf("Expected %s, got %s", tt.expected, result)
-			}
-		})
-	}
-}
-
 func TestNormalizeETag(t *testing.T) {
 	tests := []struct {
 		name     string

pkg/hold/oci/multipart.go  (+15, -43)

···
 	"time"

 	"atcr.io/pkg/atproto"
-	"github.com/aws/aws-sdk-go/service/s3"
+	"atcr.io/pkg/s3"
+	awss3 "github.com/aws/aws-sdk-go/service/s3"
 	"github.com/google/uuid"
 )
···
 	if h.s3Service.Client == nil {
 		return "", S3Native, fmt.Errorf("S3 not configured")
 	}
-	path := blobPath(digest)
+	path := s3.BlobPath(digest)
 	s3Key := strings.TrimPrefix(path, "/")
 	if h.s3Service.PathPrefix != "" {
 		s3Key = h.s3Service.PathPrefix + "/" + s3Key
 	}

-	result, err := h.s3Service.Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
+	result, err := h.s3Service.Client.CreateMultipartUploadWithContext(ctx, &awss3.CreateMultipartUploadInput{
 		Bucket: &h.s3Service.Bucket,
 		Key:    &s3Key,
 	})
···
 		return nil, fmt.Errorf("S3 not configured")
 	}

-	path := blobPath(session.Digest)
+	path := s3.BlobPath(session.Digest)
 	s3Key := strings.TrimPrefix(path, "/")
 	if h.s3Service.PathPrefix != "" {
 		s3Key = h.s3Service.PathPrefix + "/" + s3Key
 	}
 	pnum := int64(partNumber)
-	req, _ := h.s3Service.Client.UploadPartRequest(&s3.UploadPartInput{
+	req, _ := h.s3Service.Client.UploadPartRequest(&awss3.UploadPartInput{
 		Bucket:   &h.s3Service.Bucket,
 		Key:      &s3Key,
 		UploadId: &session.S3UploadID,
···
 	// Convert to S3 CompletedPart format
 	// IMPORTANT: S3 requires ETags to be quoted in the CompleteMultipartUpload XML
-	s3Parts := make([]*s3.CompletedPart, len(parts))
+	s3Parts := make([]*awss3.CompletedPart, len(parts))
 	for i, p := range parts {
 		etag := normalizeETag(p.ETag)
 		pnum := int64(p.PartNumber)
-		s3Parts[i] = &s3.CompletedPart{
+		s3Parts[i] = &awss3.CompletedPart{
 			PartNumber: &pnum,
 			ETag:       &etag,
 		}
 	}
-	sourcePath := blobPath(session.Digest)
+	sourcePath := s3.BlobPath(session.Digest)
 	s3Key := strings.TrimPrefix(sourcePath, "/")
 	if h.s3Service.PathPrefix != "" {
 		s3Key = h.s3Service.PathPrefix + "/" + s3Key
 	}

-	_, err = h.s3Service.Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
+	_, err = h.s3Service.Client.CompleteMultipartUploadWithContext(ctx, &awss3.CompleteMultipartUploadInput{
 		Bucket:   &h.s3Service.Bucket,
 		Key:      &s3Key,
 		UploadId: &session.S3UploadID,
-		MultipartUpload: &s3.CompletedMultipartUpload{
+		MultipartUpload: &awss3.CompletedMultipartUpload{
 			Parts: s3Parts,
 		},
 	})
···
 		"parts", len(s3Parts))

 	// Verify the blob exists at temp location before moving
-	destPath := blobPath(finalDigest)
+	destPath := s3.BlobPath(finalDigest)
 	slog.Debug("About to move blob",
 		"source", sourcePath,
 		"dest", destPath)
···
 	}

 	// Write assembled blob to final digest location (not temp)
-	path := blobPath(finalDigest)
+	path := s3.BlobPath(finalDigest)
 	writer, err := h.driver.Writer(ctx, path, false)
 	if err != nil {
 		return fmt.Errorf("failed to create writer: %w", err)
···
 	if h.s3Service.Client == nil {
 		return fmt.Errorf("S3 not configured")
 	}
-	path := blobPath(session.Digest)
+	path := s3.BlobPath(session.Digest)
 	s3Key := strings.TrimPrefix(path, "/")
 	if h.s3Service.PathPrefix != "" {
 		s3Key = h.s3Service.PathPrefix + "/" + s3Key
 	}

-	_, err := h.s3Service.Client.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
+	_, err := h.s3Service.Client.AbortMultipartUploadWithContext(ctx, &awss3.AbortMultipartUploadInput{
 		Bucket:   &h.s3Service.Bucket,
 		Key:      &s3Key,
 		UploadId: &session.S3UploadID,
···
 	// Add quotes
 	return fmt.Sprintf("\"%s\"", etag)
 }
-
-// blobPath converts a digest (e.g., "sha256:abc123...") or temp path to a storage path
-// Distribution stores blobs as: /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data
-// where xx is the first 2 characters of the hash for directory sharding
-// NOTE: Path must start with / for filesystem driver
-// This is used for OCI container layers (content-addressed, globally deduplicated)
-func blobPath(digest string) string {
-	// Handle temp paths (start with uploads/temp-)
-	if strings.HasPrefix(digest, "uploads/temp-") {
-		return fmt.Sprintf("/docker/registry/v2/%s/data", digest)
-	}
-
-	// Split digest into algorithm and hash
-	parts := strings.SplitN(digest, ":", 2)
-	if len(parts) != 2 {
-		// Fallback for malformed digest
-		return fmt.Sprintf("/docker/registry/v2/blobs/%s/data", digest)
-	}
-
-	algorithm := parts[0]
-	hash := parts[1]
-
-	// Use first 2 characters for sharding
-	if len(hash) < 2 {
-		return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/data", algorithm, hash)
-	}
-
-	return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", algorithm, hash[:2], hash)
-}

pkg/hold/pds/xrpc.go  (+2, -2)

···
 	hostname := h.pds.PublicURL
 	hostname = strings.TrimPrefix(hostname, "http://")
 	hostname = strings.TrimPrefix(hostname, "https://")
-	hostname = strings.Split(hostname, "/")[0] // Remove path
-	hostname = strings.Split(hostname, ":")[0] // Remove port
+	hostname, _, _ = strings.Cut(hostname, "/") // Remove path
+	hostname, _, _ = strings.Cut(hostname, ":") // Remove port

 	response := map[string]any{
 		"did": h.pds.DID(),