A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codeberg-source 822 lines 25 kB view raw
1package storage 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "sync" 12 "time" 13 14 "atcr.io/pkg/atproto" 15 "github.com/distribution/distribution/v3" 16 "github.com/distribution/distribution/v3/registry/api/errcode" 17 "github.com/opencontainers/go-digest" 18) 19 20const ( 21 // maxChunkSize is the maximum buffer size before flushing to hold service 22 // Matches S3's minimum multipart upload size 23 maxChunkSize = 10 * 1024 * 1024 // 10MB 24) 25 26// Global upload tracking (shared across all ProxyBlobStore instances) 27// This is necessary because distribution creates new repository/blob store instances per request 28var ( 29 globalUploads = make(map[string]*ProxyBlobWriter) 30 globalUploadsMu sync.RWMutex 31) 32 33// ProxyBlobStore proxies blob requests to an external storage service 34type ProxyBlobStore struct { 35 ctx *RegistryContext // All context and services 36 holdURL string // Resolved HTTP URL for XRPC requests 37 httpClient *http.Client 38} 39 40// NewProxyBlobStore creates a new proxy blob store 41func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore { 42 // Resolve DID to URL once at construction time 43 holdURL := atproto.ResolveHoldURL(ctx.HoldDID) 44 45 slog.Debug("NewProxyBlobStore created", "component", "proxy_blob_store", "hold_did", ctx.HoldDID, "hold_url", holdURL, "user_did", ctx.DID, "repo", ctx.Repository) 46 47 return &ProxyBlobStore{ 48 ctx: ctx, 49 holdURL: holdURL, 50 httpClient: &http.Client{ 51 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads 52 Transport: &http.Transport{ 53 DisableKeepAlives: false, // Re-enable keep-alive 54 MaxIdleConns: 100, 55 MaxIdleConnsPerHost: 100, 56 MaxConnsPerHost: 0, // unlimited 57 IdleConnTimeout: 90 * time.Second, 58 }, 59 }, 60 } 61} 62 63// doAuthenticatedRequest performs an HTTP request with service token authentication 64// Uses the service token from middleware to authenticate requests to the hold service 65func (p *ProxyBlobStore) doAuthenticatedRequest(ctx context.Context, req *http.Request) (*http.Response, error) { 66 // Use service token that middleware already validated and cached 67 // Middleware fails fast with HTTP 401 if OAuth session is invalid 68 if p.ctx.ServiceToken == "" { 69 // Should never happen - middleware validates OAuth before handlers run 70 slog.Error("No service token in context", "component", "proxy_blob_store", "did", p.ctx.DID) 71 return nil, fmt.Errorf("no service token available (middleware should have validated)") 72 } 73 74 // Add Bearer token to Authorization header 75 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.ctx.ServiceToken)) 76 77 return p.httpClient.Do(req) 78} 79 80// checkReadAccess validates that the user has read access to blobs in this hold 81func (p *ProxyBlobStore) checkReadAccess(ctx context.Context) error { 82 if p.ctx.Authorizer == nil { 83 return nil // No authorization check if authorizer not configured 84 } 85 allowed, err := p.ctx.Authorizer.CheckReadAccess(ctx, p.ctx.HoldDID, p.ctx.DID) 86 if err != nil { 87 return fmt.Errorf("authorization check failed: %w", err) 88 } 89 if !allowed { 90 // Return 403 Forbidden instead of masquerading as missing blob 91 return errcode.ErrorCodeDenied.WithMessage("read access denied") 92 } 93 return nil 94} 95 96// checkWriteAccess validates that the user has write access to blobs in this hold 97func (p *ProxyBlobStore) checkWriteAccess(ctx context.Context) error { 98 if p.ctx.Authorizer == nil { 99 slog.Debug("Write access check skipped - no authorizer configured", 100 "component", "proxy_blob_store") 101 return nil 102 } 103 104 slog.Debug("Checking write access", 105 "component", "proxy_blob_store", 106 "user_did", p.ctx.DID, 107 "hold_did", p.ctx.HoldDID) 108 109 allowed, err := p.ctx.Authorizer.CheckWriteAccess(ctx, p.ctx.HoldDID, p.ctx.DID) 110 if err != nil { 111 // Authorization check itself failed (network, PDS error, etc.) 112 slog.Error("Write access authorization check failed", 113 "component", "proxy_blob_store", 114 "user_did", p.ctx.DID, 115 "hold_did", p.ctx.HoldDID, 116 "denial_reason", "authorization_check_error", 117 "error", err) 118 return fmt.Errorf("authorization check failed: %w", err) 119 } 120 121 if !allowed { 122 // Access explicitly denied (logged in detail by authorizer) 123 slog.Warn("Write access denied", 124 "component", "proxy_blob_store", 125 "user_did", p.ctx.DID, 126 "hold_did", p.ctx.HoldDID, 127 "denial_reason", "access_denied_by_authorizer", 128 "hint", "check DEBUG logs for specific denial reason (denial_reason field)") 129 return errcode.ErrorCodeDenied.WithMessage(fmt.Sprintf("write access denied to hold %s", p.ctx.HoldDID)) 130 } 131 132 slog.Debug("Write access allowed", 133 "component", "proxy_blob_store", 134 "user_did", p.ctx.DID, 135 "hold_did", p.ctx.HoldDID) 136 return nil 137} 138 139// Stat returns the descriptor for a blob 140func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { 141 // Check read access 142 if err := p.checkReadAccess(ctx); err != nil { 143 return distribution.Descriptor{}, err 144 } 145 146 method := "HEAD" 147 148 url, err := p.getPresignedURL(ctx, method, dgst) 149 if err != nil { 150 return distribution.Descriptor{}, distribution.ErrBlobUnknown 151 } 152 153 // Make HEAD request to presigned URL 154 req, err := http.NewRequestWithContext(ctx, method, url, nil) 155 if err != nil { 156 return distribution.Descriptor{}, distribution.ErrBlobUnknown 157 } 158 159 // Go directly to the presigned URL, no need to authenticate 160 resp, err := p.httpClient.Do(req) 161 if err != nil { 162 return distribution.Descriptor{}, distribution.ErrBlobUnknown 163 } 164 defer resp.Body.Close() 165 166 if resp.StatusCode != http.StatusOK { 167 return distribution.Descriptor{}, distribution.ErrBlobUnknown 168 } 169 170 // Return a minimal descriptor with size from Content-Length if available 171 size := int64(0) 172 if contentLength := resp.Header.Get("Content-Length"); contentLength != "" { 173 fmt.Sscanf(contentLength, "%d", &size) 174 } 175 176 return distribution.Descriptor{ 177 Digest: dgst, 178 Size: size, 179 MediaType: "application/octet-stream", 180 }, nil 181} 182 183// Get retrieves a blob 184func (p *ProxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { 185 // Check read access 186 if err := p.checkReadAccess(ctx); err != nil { 187 return nil, err 188 } 189 190 method := "GET" 191 192 url, err := p.getPresignedURL(ctx, method, dgst) 193 if err != nil { 194 return nil, err 195 } 196 197 // Download the blob from presigned URL 198 req, err := http.NewRequestWithContext(ctx, method, url, nil) 199 if err != nil { 200 return nil, err 201 } 202 203 // Go directly to the presigned URL, no need to authenticate 204 resp, err := p.httpClient.Do(req) 205 if err != nil { 206 return nil, err 207 } 208 209 defer resp.Body.Close() 210 211 if resp.StatusCode != http.StatusOK { 212 return nil, distribution.ErrBlobUnknown 213 } 214 215 return io.ReadAll(resp.Body) 216} 217 218// Open returns a reader for a blob 219func (p *ProxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) { 220 // Check read access 221 if err := p.checkReadAccess(ctx); err != nil { 222 return nil, err 223 } 224 225 method := "GET" 226 227 url, err := p.getPresignedURL(ctx, method, dgst) 228 if err != nil { 229 return nil, err 230 } 231 232 // Download the blob from presigned URL 233 req, err := http.NewRequestWithContext(ctx, method, url, nil) 234 if err != nil { 235 return nil, err 236 } 237 238 // Go directly to the presigned URL, no need to authenticate 239 resp, err := p.httpClient.Do(req) 240 if err != nil { 241 return nil, err 242 } 243 244 if resp.StatusCode != http.StatusOK { 245 resp.Body.Close() 246 return nil, distribution.ErrBlobUnknown 247 } 248 249 // Wrap in a ReadSeekCloser 250 return &readSeekCloser{ 251 ReadCloser: resp.Body, 252 }, nil 253} 254 255// Put stores a blob using the multipart upload flow 256// This ensures all uploads go through the same XRPC path 257func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error) { 258 // Check write access (fast-fail before starting multipart upload) 259 if err := p.checkWriteAccess(ctx); err != nil { 260 return distribution.Descriptor{}, err 261 } 262 263 // Calculate digest 264 dgst := digest.FromBytes(content) 265 266 // Use Create() flow for all uploads (goes through multipart XRPC endpoints) 267 writer, err := p.Create(ctx) 268 if err != nil { 269 slog.Error("Failed to create writer", "component", "proxy_blob_store/Put", "error", err) 270 return distribution.Descriptor{}, err 271 } 272 273 // Write the content 274 if _, err := writer.Write(content); err != nil { 275 writer.Cancel(ctx) 276 slog.Error("Failed to write content", "component", "proxy_blob_store/Put", "error", err) 277 return distribution.Descriptor{}, err 278 } 279 280 // Commit with the calculated digest 281 desc, err := writer.Commit(ctx, distribution.Descriptor{ 282 Digest: dgst, 283 Size: int64(len(content)), 284 MediaType: mediaType, 285 }) 286 if err != nil { 287 slog.Error("Failed to commit", "component", "proxy_blob_store/Put", "error", err) 288 return distribution.Descriptor{}, err 289 } 290 291 slog.Debug("Upload successful", "component", "proxy_blob_store/Put", "digest", dgst, "size", len(content)) 292 return desc, nil 293} 294 295// Delete removes a blob 296func (p *ProxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { 297 // Not implemented - storage service would need a delete endpoint 298 return fmt.Errorf("delete not supported for proxy blob store") 299} 300 301// ServeBlob serves a blob via HTTP redirect or proxied response 302func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { 303 // Check read access 304 if err := p.checkReadAccess(ctx); err != nil { 305 return err 306 } 307 308 url, err := p.getPresignedURL(ctx, r.Method, dgst) 309 if err != nil { 310 return err 311 } 312 313 // Redirect to presigned URL 314 http.Redirect(w, r, url, http.StatusTemporaryRedirect) 315 return nil 316} 317 318// Create returns a blob writer for uploading using multipart upload 319func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { 320 // Check write access 321 if err := p.checkWriteAccess(ctx); err != nil { 322 return nil, err 323 } 324 325 // Parse options 326 var opts distribution.CreateOptions 327 for _, option := range options { 328 if err := option.Apply(&opts); err != nil { 329 return nil, err 330 } 331 } 332 333 // Generate unique writer ID 334 writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano()) 335 336 // Use temp digest for upload location (will be moved to final digest on commit) 337 tempDigest := fmt.Sprintf("uploads/temp-%s", writerID) 338 339 // Start multipart upload via hold service 340 uploadID, err := p.startMultipartUpload(ctx, tempDigest) 341 if err != nil { 342 return nil, err 343 } 344 345 writer := &ProxyBlobWriter{ 346 store: p, 347 options: opts, 348 uploadID: uploadID, 349 parts: make([]CompletedPart, 0), 350 partNumber: 1, 351 buffer: bytes.NewBuffer(make([]byte, 0, maxChunkSize)), 352 id: writerID, 353 startedAt: time.Now(), 354 } 355 356 // Store in global uploads map for resume support 357 globalUploadsMu.Lock() 358 globalUploads[writer.id] = writer 359 globalUploadsMu.Unlock() 360 361 return writer, nil 362} 363 364// Resume returns a blob writer for resuming an upload 365func (p *ProxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { 366 // Retrieve upload from global map 367 globalUploadsMu.RLock() 368 writer, ok := globalUploads[id] 369 globalUploadsMu.RUnlock() 370 371 if !ok { 372 return nil, distribution.ErrBlobUploadUnknown 373 } 374 375 // Just return the writer - parts are buffered and flushed on demand 376 return writer, nil 377} 378 379// getPresignedURL returns the XRPC endpoint URL for blob operations 380func (p *ProxyBlobStore) getPresignedURL(ctx context.Context, operation string, dgst digest.Digest) (string, error) { 381 // Use XRPC endpoint: /xrpc/com.atproto.sync.getBlob?did={userDID}&cid={digest} 382 // The 'did' parameter is the USER's DID (whose blob we're fetching), not the hold service DID 383 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 384 xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s", 385 p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation) 386 387 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 388 if err != nil { 389 return "", fmt.Errorf("failed to create request: %w", err) 390 } 391 392 resp, err := p.doAuthenticatedRequest(ctx, req) 393 if err != nil { 394 // Don't wrap errcode errors - return them directly 395 if _, ok := err.(errcode.Error); ok { 396 return "", err 397 } 398 return "", fmt.Errorf("failed to get presigned URL: %w", err) 399 } 400 defer resp.Body.Close() 401 402 if resp.StatusCode != http.StatusOK { 403 bodyBytes, _ := io.ReadAll(resp.Body) 404 return "", fmt.Errorf("hold service returned error: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 405 } 406 407 // Parse JSON response to get presigned HEAD URL 408 var result struct { 409 URL string `json:"url"` 410 } 411 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 412 return "", fmt.Errorf("failed to parse hold service response: %w", err) 413 } 414 415 if result.URL == "" { 416 return "", fmt.Errorf("hold service returned empty URL") 417 } 418 419 slog.Debug("Got presigned HEAD URL from hold service", "component", "proxy_blob_store", "url", result.URL) 420 return result.URL, nil 421} 422 423// startMultipartUpload initiates a multipart upload via XRPC initiateUpload endpoint 424func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) { 425 reqBody := map[string]any{ 426 "digest": digest, 427 } 428 429 body, err := json.Marshal(reqBody) 430 if err != nil { 431 return "", err 432 } 433 434 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldInitiateUpload) 435 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 436 if err != nil { 437 return "", err 438 } 439 req.Header.Set("Content-Type", "application/json") 440 441 // Use authenticated request (OAuth with DPoP) 442 resp, err := p.doAuthenticatedRequest(ctx, req) 443 if err != nil { 444 return "", err 445 } 446 defer resp.Body.Close() 447 448 if resp.StatusCode != http.StatusOK { 449 bodyBytes, _ := io.ReadAll(resp.Body) 450 return "", fmt.Errorf("start multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 451 } 452 453 var result struct { 454 UploadID string `json:"uploadId"` 455 } 456 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 457 return "", err 458 } 459 460 return result.UploadID, nil 461} 462 463// PartUploadInfo contains structured information for uploading a part 464type PartUploadInfo struct { 465 URL string `json:"url"` 466 Method string `json:"method,omitempty"` 467 Headers map[string]string `json:"headers,omitempty"` 468} 469 470// getPartUploadInfo gets structured upload info for uploading a specific part via XRPC 471func (p *ProxyBlobStore) getPartUploadInfo(ctx context.Context, digest, uploadID string, partNumber int) (*PartUploadInfo, error) { 472 reqBody := map[string]any{ 473 "uploadId": uploadID, 474 "partNumber": partNumber, 475 } 476 477 body, err := json.Marshal(reqBody) 478 if err != nil { 479 return nil, err 480 } 481 482 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldGetPartUploadURL) 483 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 484 if err != nil { 485 return nil, err 486 } 487 req.Header.Set("Content-Type", "application/json") 488 489 // Use authenticated request (OAuth with DPoP) 490 resp, err := p.doAuthenticatedRequest(ctx, req) 491 if err != nil { 492 return nil, err 493 } 494 defer resp.Body.Close() 495 496 if resp.StatusCode != http.StatusOK { 497 bodyBytes, _ := io.ReadAll(resp.Body) 498 return nil, fmt.Errorf("get part URL failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 499 } 500 501 var uploadInfo PartUploadInfo 502 if err := json.NewDecoder(resp.Body).Decode(&uploadInfo); err != nil { 503 return nil, err 504 } 505 506 return &uploadInfo, nil 507} 508 509// completeMultipartUpload completes a multipart upload via XRPC completeUpload endpoint 510// The XRPC complete action handles the move from temp to final location internally 511func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error { 512 // Convert parts to XRPC format 513 xrpcParts := make([]map[string]any, len(parts)) 514 for i, part := range parts { 515 xrpcParts[i] = map[string]any{ 516 "part_number": part.PartNumber, 517 "etag": part.ETag, 518 } 519 } 520 521 reqBody := map[string]any{ 522 "uploadId": uploadID, 523 "digest": digest, 524 "parts": xrpcParts, 525 } 526 527 body, err := json.Marshal(reqBody) 528 if err != nil { 529 return err 530 } 531 532 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldCompleteUpload) 533 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 534 if err != nil { 535 return err 536 } 537 req.Header.Set("Content-Type", "application/json") 538 539 // Use authenticated request (OAuth with DPoP) 540 resp, err := p.doAuthenticatedRequest(ctx, req) 541 if err != nil { 542 return err 543 } 544 defer resp.Body.Close() 545 546 if resp.StatusCode != http.StatusOK { 547 bodyBytes, _ := io.ReadAll(resp.Body) 548 return fmt.Errorf("complete multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 549 } 550 551 return nil 552} 553 554// abortMultipartUpload aborts a multipart upload via XRPC abortUpload endpoint 555func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, digest, uploadID string) error { 556 reqBody := map[string]any{ 557 "uploadId": uploadID, 558 } 559 560 body, err := json.Marshal(reqBody) 561 if err != nil { 562 return err 563 } 564 565 url := fmt.Sprintf("%s%s", p.holdURL, atproto.HoldAbortUpload) 566 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 567 if err != nil { 568 return err 569 } 570 req.Header.Set("Content-Type", "application/json") 571 572 // Use authenticated request (OAuth with DPoP) 573 resp, err := p.doAuthenticatedRequest(ctx, req) 574 if err != nil { 575 return err 576 } 577 defer resp.Body.Close() 578 579 if resp.StatusCode != http.StatusOK { 580 bodyBytes, _ := io.ReadAll(resp.Body) 581 return fmt.Errorf("abort multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 582 } 583 584 return nil 585} 586 587// CompletedPart represents an uploaded part with its ETag 588type CompletedPart struct { 589 PartNumber int `json:"part_number"` 590 ETag string `json:"etag"` 591} 592 593// ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using multipart upload 594type ProxyBlobWriter struct { 595 store *ProxyBlobStore 596 options distribution.CreateOptions 597 uploadID string // S3 multipart upload ID 598 parts []CompletedPart // Track uploaded parts with ETags 599 partNumber int // Current part number (starts at 1) 600 buffer *bytes.Buffer // Buffer for current part 601 size int64 // Total bytes written 602 closed bool 603 id string // Distribution's upload ID (for state) 604 startedAt time.Time 605} 606 607// ID returns the upload ID 608func (w *ProxyBlobWriter) ID() string { 609 return w.id 610} 611 612// StartedAt returns when the upload started 613func (w *ProxyBlobWriter) StartedAt() time.Time { 614 return w.startedAt 615} 616 617// Write writes data to the upload 618// Buffers data and flushes when buffer reaches 5MB 619func (w *ProxyBlobWriter) Write(p []byte) (int, error) { 620 if w.closed { 621 return 0, fmt.Errorf("writer closed") 622 } 623 624 n, err := w.buffer.Write(p) 625 w.size += int64(n) 626 627 // Flush if buffer reaches limit (S3 part size) 628 if w.buffer.Len() >= maxChunkSize { 629 if err := w.flushPart(); err != nil { 630 return n, err 631 } 632 } 633 634 return n, err 635} 636 637// flushPart uploads the current buffer as a part 638func (w *ProxyBlobWriter) flushPart() error { 639 if w.buffer.Len() == 0 { 640 return nil 641 } 642 643 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 644 defer cancel() 645 646 // Get structured upload info for this part 647 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id) 648 uploadInfo, err := w.store.getPartUploadInfo(ctx, tempDigest, w.uploadID, w.partNumber) 649 if err != nil { 650 return fmt.Errorf("failed to get part upload info: %w", err) 651 } 652 653 // Determine HTTP method (default to PUT) 654 method := uploadInfo.Method 655 if method == "" { 656 method = "PUT" 657 } 658 659 // Upload part (either to S3 presigned URL or back to XRPC with headers) 660 req, err := http.NewRequestWithContext(ctx, method, uploadInfo.URL, bytes.NewReader(w.buffer.Bytes())) 661 if err != nil { 662 return err 663 } 664 req.Header.Set("Content-Type", "application/octet-stream") 665 666 // Apply any additional headers from the response (for buffered mode) 667 for key, value := range uploadInfo.Headers { 668 req.Header.Set(key, value) 669 } 670 671 resp, err := w.store.httpClient.Do(req) 672 if err != nil { 673 return err 674 } 675 defer resp.Body.Close() 676 677 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 678 bodyBytes, _ := io.ReadAll(resp.Body) 679 return fmt.Errorf("part upload failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 680 } 681 682 // Store ETag for completion 683 // For buffered mode, ETag might be in JSON response body 684 etag := resp.Header.Get("ETag") 685 if etag == "" { 686 // Try to parse JSON response for buffered mode 687 var result struct { 688 ETag string `json:"etag"` 689 } 690 if err := json.NewDecoder(resp.Body).Decode(&result); err == nil && result.ETag != "" { 691 etag = result.ETag 692 } else { 693 return fmt.Errorf("no ETag in response") 694 } 695 } 696 697 w.parts = append(w.parts, CompletedPart{ 698 PartNumber: w.partNumber, 699 ETag: etag, 700 }) 701 702 slog.Debug("Part uploaded successfully", "component", "proxy_blob_store/flushPart", "part_number", w.partNumber, "etag", etag) 703 704 // Reset buffer and increment part number 705 w.buffer.Reset() 706 w.partNumber++ 707 708 return nil 709} 710 711// ReadFrom reads from a reader 712func (w *ProxyBlobWriter) ReadFrom(r io.Reader) (int64, error) { 713 if w.closed { 714 return 0, fmt.Errorf("writer closed") 715 } 716 717 // Read in chunks and flush when needed 718 buf := make([]byte, 32*1024) // 32KB read buffer 719 var total int64 720 721 for { 722 nr, err := r.Read(buf) 723 if nr > 0 { 724 nw, werr := w.Write(buf[:nr]) 725 total += int64(nw) 726 if werr != nil { 727 return total, werr 728 } 729 } 730 if err == io.EOF { 731 break 732 } 733 if err != nil { 734 return total, err 735 } 736 } 737 738 return total, nil 739} 740 741// Size returns the current size 742func (w *ProxyBlobWriter) Size() int64 { 743 return w.size 744} 745 746// Commit finalizes the upload by completing multipart upload and moving to final location 747func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { 748 if w.closed { 749 return distribution.Descriptor{}, fmt.Errorf("writer closed") 750 } 751 w.closed = true 752 753 // Remove from global uploads map 754 globalUploadsMu.Lock() 755 delete(globalUploads, w.id) 756 globalUploadsMu.Unlock() 757 758 // Flush any remaining buffered data 759 if w.buffer.Len() > 0 { 760 slog.Debug("Flushing final buffer", "component", "proxy_blob_store/Commit", "bytes", w.buffer.Len()) 761 if err := w.flushPart(); err != nil { 762 // Try to abort multipart on error 763 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id) 764 w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID) 765 return distribution.Descriptor{}, fmt.Errorf("failed to flush final part: %w", err) 766 } 767 } 768 769 // Complete multipart upload - XRPC complete action handles move internally 770 // Send the real digest (not tempDigest) so hold can move temp → final location 771 slog.Info("Completing multipart upload", "component", "proxy_blob_store/Commit", "upload_id", w.uploadID, "parts", len(w.parts), "digest", desc.Digest) 772 if err := w.store.completeMultipartUpload(ctx, desc.Digest.String(), w.uploadID, w.parts); err != nil { 773 return distribution.Descriptor{}, fmt.Errorf("failed to complete multipart upload: %w", err) 774 } 775 776 slog.Info("Upload completed successfully", "component", "proxy_blob_store/Commit", "digest", desc.Digest, "size", w.size, "parts", len(w.parts)) 777 778 return distribution.Descriptor{ 779 Digest: desc.Digest, 780 Size: w.size, 781 MediaType: desc.MediaType, 782 }, nil 783} 784 785// Cancel cancels the upload by aborting the multipart upload 786func (w *ProxyBlobWriter) Cancel(ctx context.Context) error { 787 w.closed = true 788 789 slog.Debug("Cancelling upload", "component", "proxy_blob_store/Cancel", "id", w.id) 790 791 // Remove from global uploads map 792 globalUploadsMu.Lock() 793 delete(globalUploads, w.id) 794 globalUploadsMu.Unlock() 795 796 // Abort multipart upload 797 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id) 798 if err := w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID); err != nil { 799 slog.Warn("Failed to abort multipart upload", "component", "proxy_blob_store/Cancel", "error", err) 800 // Continue anyway - we want to mark upload as cancelled 801 } 802 803 slog.Debug("Upload cancelled", "component", "proxy_blob_store/Cancel", "id", w.id) 804 return nil 805} 806 807// Close closes the writer 808// Parts are flushed on demand, so this is a no-op 809func (w *ProxyBlobWriter) Close() error { 810 // Don't set w.closed = true - allow resuming for next PATCH 811 return nil 812} 813 814// readSeekCloser wraps an io.ReadCloser to implement ReadSeekCloser 815type readSeekCloser struct { 816 io.ReadCloser 817} 818 819func (r *readSeekCloser) Seek(offset int64, whence int) (int64, error) { 820 // Not implemented - would need buffering or re-downloading 821 return 0, fmt.Errorf("seek not supported") 822}