// Package storage implements a container registry that uses the AT Protocol
// for manifest storage and S3 for blob storage.
1package storage
2
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"sync"
	"time"

	"atcr.io/pkg/atproto"
	"github.com/distribution/distribution/v3"
	"github.com/distribution/distribution/v3/registry/api/errcode"
	"github.com/opencontainers/go-digest"
)
19
const (
	// maxChunkSize is the maximum number of bytes buffered before a part is
	// flushed to the hold service. Note: S3's documented minimum multipart
	// part size is 5 MiB; 10 MiB comfortably exceeds it.
	maxChunkSize = 10 * 1024 * 1024 // 10MB
)
25
// Global upload tracking (shared across all ProxyBlobStore instances).
// This is necessary because distribution creates new repository/blob store
// instances per request, so in-flight uploads must be findable across them.
var (
	// globalUploads maps a writer ID to its in-progress upload state.
	globalUploads = make(map[string]*ProxyBlobWriter)
	// globalUploadsMu guards globalUploads.
	globalUploadsMu sync.RWMutex
)
32
// ProxyBlobStore proxies blob requests to an external storage service.
// Reads are served via presigned URLs obtained from the hold service over
// XRPC; writes go through the hold's multipart upload endpoints.
type ProxyBlobStore struct {
	ctx        *RegistryContext // All request-scoped context and services
	holdURL    string           // Resolved HTTP URL for XRPC requests to the hold service
	httpClient *http.Client     // Shared client for XRPC and presigned-URL requests
}
39
40// NewProxyBlobStore creates a new proxy blob store
41func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore {
42 // Resolve DID to URL once at construction time
43 holdURL := atproto.ResolveHoldURL(ctx.HoldDID)
44
45 slog.Debug("NewProxyBlobStore created", "component", "proxy_blob_store", "hold_did", ctx.HoldDID, "hold_url", holdURL, "user_did", ctx.DID, "repo", ctx.Repository)
46
47 return &ProxyBlobStore{
48 ctx: ctx,
49 holdURL: holdURL,
50 httpClient: &http.Client{
51 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads
52 Transport: &http.Transport{
53 DisableKeepAlives: false, // Re-enable keep-alive
54 MaxIdleConns: 100,
55 MaxIdleConnsPerHost: 100,
56 MaxConnsPerHost: 0, // unlimited
57 IdleConnTimeout: 90 * time.Second,
58 },
59 },
60 }
61}
62
63// doAuthenticatedRequest performs an HTTP request with service token authentication
64// Uses the service token from middleware to authenticate requests to the hold service
65func (p *ProxyBlobStore) doAuthenticatedRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
66 // Use service token that middleware already validated and cached
67 // Middleware fails fast with HTTP 401 if OAuth session is invalid
68 if p.ctx.ServiceToken == "" {
69 // Should never happen - middleware validates OAuth before handlers run
70 slog.Error("No service token in context", "component", "proxy_blob_store", "did", p.ctx.DID)
71 return nil, fmt.Errorf("no service token available (middleware should have validated)")
72 }
73
74 // Add Bearer token to Authorization header
75 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.ctx.ServiceToken))
76
77 return p.httpClient.Do(req)
78}
79
80// checkReadAccess validates that the user has read access to blobs in this hold
81func (p *ProxyBlobStore) checkReadAccess(ctx context.Context) error {
82 if p.ctx.Authorizer == nil {
83 return nil // No authorization check if authorizer not configured
84 }
85 allowed, err := p.ctx.Authorizer.CheckReadAccess(ctx, p.ctx.HoldDID, p.ctx.DID)
86 if err != nil {
87 return fmt.Errorf("authorization check failed: %w", err)
88 }
89 if !allowed {
90 // Return 403 Forbidden instead of masquerading as missing blob
91 return errcode.ErrorCodeDenied.WithMessage("read access denied")
92 }
93 return nil
94}
95
96// checkWriteAccess validates that the user has write access to blobs in this hold
97func (p *ProxyBlobStore) checkWriteAccess(ctx context.Context) error {
98 if p.ctx.Authorizer == nil {
99 slog.Debug("Write access check skipped - no authorizer configured",
100 "component", "proxy_blob_store")
101 return nil
102 }
103
104 slog.Debug("Checking write access",
105 "component", "proxy_blob_store",
106 "user_did", p.ctx.DID,
107 "hold_did", p.ctx.HoldDID)
108
109 allowed, err := p.ctx.Authorizer.CheckWriteAccess(ctx, p.ctx.HoldDID, p.ctx.DID)
110 if err != nil {
111 // Authorization check itself failed (network, PDS error, etc.)
112 slog.Error("Write access authorization check failed",
113 "component", "proxy_blob_store",
114 "user_did", p.ctx.DID,
115 "hold_did", p.ctx.HoldDID,
116 "denial_reason", "authorization_check_error",
117 "error", err)
118 return fmt.Errorf("authorization check failed: %w", err)
119 }
120
121 if !allowed {
122 // Access explicitly denied (logged in detail by authorizer)
123 slog.Warn("Write access denied",
124 "component", "proxy_blob_store",
125 "user_did", p.ctx.DID,
126 "hold_did", p.ctx.HoldDID,
127 "denial_reason", "access_denied_by_authorizer",
128 "hint", "check DEBUG logs for specific denial reason (denial_reason field)")
129 return errcode.ErrorCodeDenied.WithMessage(fmt.Sprintf("write access denied to hold %s", p.ctx.HoldDID))
130 }
131
132 slog.Debug("Write access allowed",
133 "component", "proxy_blob_store",
134 "user_did", p.ctx.DID,
135 "hold_did", p.ctx.HoldDID)
136 return nil
137}
138
139// Stat returns the descriptor for a blob
140func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
141 // Check read access
142 if err := p.checkReadAccess(ctx); err != nil {
143 return distribution.Descriptor{}, err
144 }
145
146 method := "HEAD"
147
148 url, err := p.getPresignedURL(ctx, method, dgst)
149 if err != nil {
150 return distribution.Descriptor{}, distribution.ErrBlobUnknown
151 }
152
153 // Make HEAD request to presigned URL
154 req, err := http.NewRequestWithContext(ctx, method, url, nil)
155 if err != nil {
156 return distribution.Descriptor{}, distribution.ErrBlobUnknown
157 }
158
159 // Go directly to the presigned URL, no need to authenticate
160 resp, err := p.httpClient.Do(req)
161 if err != nil {
162 return distribution.Descriptor{}, distribution.ErrBlobUnknown
163 }
164 defer resp.Body.Close()
165
166 if resp.StatusCode != http.StatusOK {
167 return distribution.Descriptor{}, distribution.ErrBlobUnknown
168 }
169
170 // Return a minimal descriptor with size from Content-Length if available
171 size := int64(0)
172 if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
173 fmt.Sscanf(contentLength, "%d", &size)
174 }
175
176 return distribution.Descriptor{
177 Digest: dgst,
178 Size: size,
179 MediaType: "application/octet-stream",
180 }, nil
181}
182
183// Get retrieves a blob
184func (p *ProxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
185 // Check read access
186 if err := p.checkReadAccess(ctx); err != nil {
187 return nil, err
188 }
189
190 method := "GET"
191
192 url, err := p.getPresignedURL(ctx, method, dgst)
193 if err != nil {
194 return nil, err
195 }
196
197 // Download the blob from presigned URL
198 req, err := http.NewRequestWithContext(ctx, method, url, nil)
199 if err != nil {
200 return nil, err
201 }
202
203 // Go directly to the presigned URL, no need to authenticate
204 resp, err := p.httpClient.Do(req)
205 if err != nil {
206 return nil, err
207 }
208
209 defer resp.Body.Close()
210
211 if resp.StatusCode != http.StatusOK {
212 return nil, distribution.ErrBlobUnknown
213 }
214
215 return io.ReadAll(resp.Body)
216}
217
218// Open returns a reader for a blob
219func (p *ProxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
220 // Check read access
221 if err := p.checkReadAccess(ctx); err != nil {
222 return nil, err
223 }
224
225 method := "GET"
226
227 url, err := p.getPresignedURL(ctx, method, dgst)
228 if err != nil {
229 return nil, err
230 }
231
232 // Download the blob from presigned URL
233 req, err := http.NewRequestWithContext(ctx, method, url, nil)
234 if err != nil {
235 return nil, err
236 }
237
238 // Go directly to the presigned URL, no need to authenticate
239 resp, err := p.httpClient.Do(req)
240 if err != nil {
241 return nil, err
242 }
243
244 if resp.StatusCode != http.StatusOK {
245 resp.Body.Close()
246 return nil, distribution.ErrBlobUnknown
247 }
248
249 // Wrap in a ReadSeekCloser
250 return &readSeekCloser{
251 ReadCloser: resp.Body,
252 }, nil
253}
254
255// Put stores a blob using the multipart upload flow
256// This ensures all uploads go through the same XRPC path
257func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error) {
258 // Check write access (fast-fail before starting multipart upload)
259 if err := p.checkWriteAccess(ctx); err != nil {
260 return distribution.Descriptor{}, err
261 }
262
263 // Calculate digest
264 dgst := digest.FromBytes(content)
265
266 // Use Create() flow for all uploads (goes through multipart XRPC endpoints)
267 writer, err := p.Create(ctx)
268 if err != nil {
269 slog.Error("Failed to create writer", "component", "proxy_blob_store/Put", "error", err)
270 return distribution.Descriptor{}, err
271 }
272
273 // Write the content
274 if _, err := writer.Write(content); err != nil {
275 writer.Cancel(ctx)
276 slog.Error("Failed to write content", "component", "proxy_blob_store/Put", "error", err)
277 return distribution.Descriptor{}, err
278 }
279
280 // Commit with the calculated digest
281 desc, err := writer.Commit(ctx, distribution.Descriptor{
282 Digest: dgst,
283 Size: int64(len(content)),
284 MediaType: mediaType,
285 })
286 if err != nil {
287 slog.Error("Failed to commit", "component", "proxy_blob_store/Put", "error", err)
288 return distribution.Descriptor{}, err
289 }
290
291 slog.Debug("Upload successful", "component", "proxy_blob_store/Put", "digest", dgst, "size", len(content))
292 return desc, nil
293}
294
295// Delete removes a blob
296func (p *ProxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
297 // Not implemented - storage service would need a delete endpoint
298 return fmt.Errorf("delete not supported for proxy blob store")
299}
300
301// ServeBlob serves a blob via HTTP redirect or proxied response
302func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
303 // Check read access
304 if err := p.checkReadAccess(ctx); err != nil {
305 return err
306 }
307
308 url, err := p.getPresignedURL(ctx, r.Method, dgst)
309 if err != nil {
310 return err
311 }
312
313 // Redirect to presigned URL
314 http.Redirect(w, r, url, http.StatusTemporaryRedirect)
315 return nil
316}
317
318// Create returns a blob writer for uploading using multipart upload
319func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
320 // Check write access
321 if err := p.checkWriteAccess(ctx); err != nil {
322 return nil, err
323 }
324
325 // Parse options
326 var opts distribution.CreateOptions
327 for _, option := range options {
328 if err := option.Apply(&opts); err != nil {
329 return nil, err
330 }
331 }
332
333 // Generate unique writer ID
334 writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano())
335
336 // Use temp digest for upload location (will be moved to final digest on commit)
337 tempDigest := fmt.Sprintf("uploads/temp-%s", writerID)
338
339 // Start multipart upload via hold service
340 uploadID, err := p.startMultipartUpload(ctx, tempDigest)
341 if err != nil {
342 return nil, err
343 }
344
345 writer := &ProxyBlobWriter{
346 store: p,
347 options: opts,
348 uploadID: uploadID,
349 parts: make([]CompletedPart, 0),
350 partNumber: 1,
351 buffer: bytes.NewBuffer(make([]byte, 0, maxChunkSize)),
352 id: writerID,
353 startedAt: time.Now(),
354 }
355
356 // Store in global uploads map for resume support
357 globalUploadsMu.Lock()
358 globalUploads[writer.id] = writer
359 globalUploadsMu.Unlock()
360
361 return writer, nil
362}
363
364// Resume returns a blob writer for resuming an upload
365func (p *ProxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
366 // Retrieve upload from global map
367 globalUploadsMu.RLock()
368 writer, ok := globalUploads[id]
369 globalUploadsMu.RUnlock()
370
371 if !ok {
372 return nil, distribution.ErrBlobUploadUnknown
373 }
374
375 // Just return the writer - parts are buffered and flushed on demand
376 return writer, nil
377}
378
379// getPresignedURL returns the XRPC endpoint URL for blob operations
380func (p *ProxyBlobStore) getPresignedURL(ctx context.Context, operation string, dgst digest.Digest) (string, error) {
381 // Use XRPC endpoint: /xrpc/com.atproto.sync.getBlob?did={userDID}&cid={digest}
382 // The 'did' parameter is the USER's DID (whose blob we're fetching), not the hold service DID
383 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix)
384 xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s",
385 p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation)
386
387 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil)
388 if err != nil {
389 return "", fmt.Errorf("failed to create request: %w", err)
390 }
391
392 resp, err := p.doAuthenticatedRequest(ctx, req)
393 if err != nil {
394 // Don't wrap errcode errors - return them directly
395 if _, ok := err.(errcode.Error); ok {
396 return "", err
397 }
398 return "", fmt.Errorf("failed to get presigned URL: %w", err)
399 }
400 defer resp.Body.Close()
401
402 if resp.StatusCode != http.StatusOK {
403 bodyBytes, _ := io.ReadAll(resp.Body)
404 return "", fmt.Errorf("hold service returned error: status %d, body: %s", resp.StatusCode, string(bodyBytes))
405 }
406
407 // Parse JSON response to get presigned HEAD URL
408 var result struct {
409 URL string `json:"url"`
410 }
411 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
412 return "", fmt.Errorf("failed to parse hold service response: %w", err)
413 }
414
415 if result.URL == "" {
416 return "", fmt.Errorf("hold service returned empty URL")
417 }
418
419 slog.Debug("Got presigned HEAD URL from hold service", "component", "proxy_blob_store", "url", result.URL)
420 return result.URL, nil
421}
422
423// startMultipartUpload initiates a multipart upload via XRPC initiateUpload endpoint
424func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) {
425 reqBody := map[string]any{
426 "digest": digest,
427 }
428
429 body, err := json.Marshal(reqBody)
430 if err != nil {
431 return "", err
432 }
433
434 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldInitiateUpload)
435 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
436 if err != nil {
437 return "", err
438 }
439 req.Header.Set("Content-Type", "application/json")
440
441 // Use authenticated request (OAuth with DPoP)
442 resp, err := p.doAuthenticatedRequest(ctx, req)
443 if err != nil {
444 return "", err
445 }
446 defer resp.Body.Close()
447
448 if resp.StatusCode != http.StatusOK {
449 bodyBytes, _ := io.ReadAll(resp.Body)
450 return "", fmt.Errorf("start multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
451 }
452
453 var result struct {
454 UploadID string `json:"uploadId"`
455 }
456 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
457 return "", err
458 }
459
460 return result.UploadID, nil
461}
462
// PartUploadInfo contains structured information for uploading a part.
// URL is where the part bytes go; Method defaults to PUT when empty;
// Headers carries extra request headers (used by the buffered mode).
type PartUploadInfo struct {
	URL     string            `json:"url"`
	Method  string            `json:"method,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
}
469
470// getPartUploadInfo gets structured upload info for uploading a specific part via XRPC
471func (p *ProxyBlobStore) getPartUploadInfo(ctx context.Context, digest, uploadID string, partNumber int) (*PartUploadInfo, error) {
472 reqBody := map[string]any{
473 "uploadId": uploadID,
474 "partNumber": partNumber,
475 }
476
477 body, err := json.Marshal(reqBody)
478 if err != nil {
479 return nil, err
480 }
481
482 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldGetPartUploadURL)
483 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
484 if err != nil {
485 return nil, err
486 }
487 req.Header.Set("Content-Type", "application/json")
488
489 // Use authenticated request (OAuth with DPoP)
490 resp, err := p.doAuthenticatedRequest(ctx, req)
491 if err != nil {
492 return nil, err
493 }
494 defer resp.Body.Close()
495
496 if resp.StatusCode != http.StatusOK {
497 bodyBytes, _ := io.ReadAll(resp.Body)
498 return nil, fmt.Errorf("get part URL failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
499 }
500
501 var uploadInfo PartUploadInfo
502 if err := json.NewDecoder(resp.Body).Decode(&uploadInfo); err != nil {
503 return nil, err
504 }
505
506 return &uploadInfo, nil
507}
508
509// completeMultipartUpload completes a multipart upload via XRPC completeUpload endpoint
510// The XRPC complete action handles the move from temp to final location internally
511func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error {
512 // Convert parts to XRPC format
513 xrpcParts := make([]map[string]any, len(parts))
514 for i, part := range parts {
515 xrpcParts[i] = map[string]any{
516 "part_number": part.PartNumber,
517 "etag": part.ETag,
518 }
519 }
520
521 reqBody := map[string]any{
522 "uploadId": uploadID,
523 "digest": digest,
524 "parts": xrpcParts,
525 }
526
527 body, err := json.Marshal(reqBody)
528 if err != nil {
529 return err
530 }
531
532 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldCompleteUpload)
533 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
534 if err != nil {
535 return err
536 }
537 req.Header.Set("Content-Type", "application/json")
538
539 // Use authenticated request (OAuth with DPoP)
540 resp, err := p.doAuthenticatedRequest(ctx, req)
541 if err != nil {
542 return err
543 }
544 defer resp.Body.Close()
545
546 if resp.StatusCode != http.StatusOK {
547 bodyBytes, _ := io.ReadAll(resp.Body)
548 return fmt.Errorf("complete multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
549 }
550
551 return nil
552}
553
554// abortMultipartUpload aborts a multipart upload via XRPC abortUpload endpoint
555func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, digest, uploadID string) error {
556 reqBody := map[string]any{
557 "uploadId": uploadID,
558 }
559
560 body, err := json.Marshal(reqBody)
561 if err != nil {
562 return err
563 }
564
565 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldAbortUpload)
566 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
567 if err != nil {
568 return err
569 }
570 req.Header.Set("Content-Type", "application/json")
571
572 // Use authenticated request (OAuth with DPoP)
573 resp, err := p.doAuthenticatedRequest(ctx, req)
574 if err != nil {
575 return err
576 }
577 defer resp.Body.Close()
578
579 if resp.StatusCode != http.StatusOK {
580 bodyBytes, _ := io.ReadAll(resp.Body)
581 return fmt.Errorf("abort multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
582 }
583
584 return nil
585}
586
// CompletedPart represents an uploaded part with its ETag, as required by
// the multipart completion call.
type CompletedPart struct {
	PartNumber int    `json:"part_number"`
	ETag       string `json:"etag"`
}
592
// ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using
// the hold service's multipart upload flow. Data is accumulated in buffer and
// flushed as numbered parts once maxChunkSize is reached.
//
// NOTE(review): fields are mutated without synchronization — this assumes the
// registry serializes operations on a single upload; confirm before any
// concurrent use.
type ProxyBlobWriter struct {
	store      *ProxyBlobStore            // Parent store (HTTP client, hold URL, auth)
	options    distribution.CreateOptions // Options captured at Create time
	uploadID   string                     // S3 multipart upload ID
	parts      []CompletedPart            // Track uploaded parts with ETags
	partNumber int                        // Current part number (starts at 1)
	buffer     *bytes.Buffer              // Buffer for the current part
	size       int64                      // Total bytes written
	closed     bool                       // Set on Commit/Cancel; rejects further writes
	id         string                     // Distribution's upload ID (for state/resume)
	startedAt  time.Time                  // When the upload began
}
606
// ID returns the upload ID used by distribution to address this upload
// (and to resume it via ProxyBlobStore.Resume).
func (w *ProxyBlobWriter) ID() string {
	return w.id
}
611
// StartedAt returns when the upload started (set in Create).
func (w *ProxyBlobWriter) StartedAt() time.Time {
	return w.startedAt
}
616
// Write writes data to the upload.
// Data is buffered in memory and flushed as a multipart part once the
// buffer reaches maxChunkSize (10MB).
func (w *ProxyBlobWriter) Write(p []byte) (int, error) {
	if w.closed {
		return 0, fmt.Errorf("writer closed")
	}

	// bytes.Buffer writes do not fail in practice, but err is still propagated.
	n, err := w.buffer.Write(p)
	w.size += int64(n)

	// Flush if the buffer reaches the part-size limit.
	if w.buffer.Len() >= maxChunkSize {
		if err := w.flushPart(); err != nil {
			return n, err
		}
	}

	return n, err
}
636
// flushPart uploads the current buffer contents as one multipart part.
//
// Flow: ask the hold service for per-part upload instructions (URL, method,
// extra headers), send the buffered bytes there, record the resulting ETag
// for the final complete call, then reset the buffer and advance the part
// number. A no-op when the buffer is empty.
//
// A fresh background context with a 5-minute timeout is used because Write
// has no context parameter to thread through.
func (w *ProxyBlobWriter) flushPart() error {
	if w.buffer.Len() == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Get structured upload info for this part; the temp digest keys the
	// in-progress upload on the hold side until Commit moves it.
	tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
	uploadInfo, err := w.store.getPartUploadInfo(ctx, tempDigest, w.uploadID, w.partNumber)
	if err != nil {
		return fmt.Errorf("failed to get part upload info: %w", err)
	}

	// Determine HTTP method (default to PUT)
	method := uploadInfo.Method
	if method == "" {
		method = "PUT"
	}

	// Upload the part (either to an S3 presigned URL or back to XRPC with headers)
	req, err := http.NewRequestWithContext(ctx, method, uploadInfo.URL, bytes.NewReader(w.buffer.Bytes()))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/octet-stream")

	// Apply any additional headers from the response (for buffered mode)
	for key, value := range uploadInfo.Headers {
		req.Header.Set(key, value)
	}

	resp, err := w.store.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		bodyBytes, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("part upload failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
	}

	// Store the ETag for completion. S3 returns it as a response header; in
	// buffered mode it may instead arrive in a JSON response body.
	etag := resp.Header.Get("ETag")
	if etag == "" {
		// Try to parse a JSON response body (buffered mode fallback).
		var result struct {
			ETag string `json:"etag"`
		}
		if err := json.NewDecoder(resp.Body).Decode(&result); err == nil && result.ETag != "" {
			etag = result.ETag
		} else {
			return fmt.Errorf("no ETag in response")
		}
	}

	w.parts = append(w.parts, CompletedPart{
		PartNumber: w.partNumber,
		ETag:       etag,
	})

	slog.Debug("Part uploaded successfully", "component", "proxy_blob_store/flushPart", "part_number", w.partNumber, "etag", etag)

	// Reset the buffer and advance the part number for the next chunk.
	w.buffer.Reset()
	w.partNumber++

	return nil
}
710
711// ReadFrom reads from a reader
712func (w *ProxyBlobWriter) ReadFrom(r io.Reader) (int64, error) {
713 if w.closed {
714 return 0, fmt.Errorf("writer closed")
715 }
716
717 // Read in chunks and flush when needed
718 buf := make([]byte, 32*1024) // 32KB read buffer
719 var total int64
720
721 for {
722 nr, err := r.Read(buf)
723 if nr > 0 {
724 nw, werr := w.Write(buf[:nr])
725 total += int64(nw)
726 if werr != nil {
727 return total, werr
728 }
729 }
730 if err == io.EOF {
731 break
732 }
733 if err != nil {
734 return total, err
735 }
736 }
737
738 return total, nil
739}
740
// Size returns the total number of bytes written so far (buffered + flushed).
func (w *ProxyBlobWriter) Size() int64 {
	return w.size
}
745
// Commit finalizes the upload by completing the multipart upload; the hold
// service's complete action moves the blob from its temp location to the
// final digest-addressed location.
//
// The writer is closed and deregistered up front so the upload cannot be
// resumed once commit has begun. NOTE(review): if nothing was ever written,
// w.parts is empty and completion is still attempted — confirm the hold
// service accepts zero-part completions.
func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
	if w.closed {
		return distribution.Descriptor{}, fmt.Errorf("writer closed")
	}
	w.closed = true

	// Remove from the global uploads map so the upload can no longer resume.
	globalUploadsMu.Lock()
	delete(globalUploads, w.id)
	globalUploadsMu.Unlock()

	// Flush any remaining buffered data as the final part.
	if w.buffer.Len() > 0 {
		slog.Debug("Flushing final buffer", "component", "proxy_blob_store/Commit", "bytes", w.buffer.Len())
		if err := w.flushPart(); err != nil {
			// Best-effort abort of the multipart upload; its error is
			// deliberately ignored because the flush error is what matters.
			tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
			w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID)
			return distribution.Descriptor{}, fmt.Errorf("failed to flush final part: %w", err)
		}
	}

	// Complete the multipart upload - the XRPC complete action handles the move.
	// Send the real digest (not tempDigest) so hold can move temp → final location.
	slog.Info("Completing multipart upload", "component", "proxy_blob_store/Commit", "upload_id", w.uploadID, "parts", len(w.parts), "digest", desc.Digest)
	if err := w.store.completeMultipartUpload(ctx, desc.Digest.String(), w.uploadID, w.parts); err != nil {
		return distribution.Descriptor{}, fmt.Errorf("failed to complete multipart upload: %w", err)
	}

	slog.Info("Upload completed successfully", "component", "proxy_blob_store/Commit", "digest", desc.Digest, "size", w.size, "parts", len(w.parts))

	// Return the caller's digest/media type with the actual byte count written.
	return distribution.Descriptor{
		Digest:    desc.Digest,
		Size:      w.size,
		MediaType: desc.MediaType,
	}, nil
}
784
785// Cancel cancels the upload by aborting the multipart upload
786func (w *ProxyBlobWriter) Cancel(ctx context.Context) error {
787 w.closed = true
788
789 slog.Debug("Cancelling upload", "component", "proxy_blob_store/Cancel", "id", w.id)
790
791 // Remove from global uploads map
792 globalUploadsMu.Lock()
793 delete(globalUploads, w.id)
794 globalUploadsMu.Unlock()
795
796 // Abort multipart upload
797 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
798 if err := w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID); err != nil {
799 slog.Warn("Failed to abort multipart upload", "component", "proxy_blob_store/Cancel", "error", err)
800 // Continue anyway - we want to mark upload as cancelled
801 }
802
803 slog.Debug("Upload cancelled", "component", "proxy_blob_store/Cancel", "id", w.id)
804 return nil
805}
806
// Close closes the writer.
// Parts are flushed on demand, so this is intentionally a no-op: w.closed is
// NOT set here, which keeps the writer resumable for the next PATCH request.
func (w *ProxyBlobWriter) Close() error {
	return nil
}
813
814// readSeekCloser wraps an io.ReadCloser to implement ReadSeekCloser
815type readSeekCloser struct {
816 io.ReadCloser
817}
818
819func (r *readSeekCloser) Seek(offset int64, whence int) (int64, error) {
820 // Not implemented - would need buffering or re-downloading
821 return 0, fmt.Errorf("seek not supported")
822}