Remove the filesystem driver and the buffered (proxied) upload path from holds; going forward, S3-compatible storage is the only supported backend. Also adds extra mocks and tests around uploading.
···193193# HOLD_ADMIN_ENABLED=false
194194195195# ==============================================================================
196196-# STORAGE - S3 CONFIGURATION
196196+# STORAGE - S3 CONFIGURATION (REQUIRED)
197197# ==============================================================================
198198199199-# Storage driver type
200200-# Options: s3, filesystem
201201-# Default: s3
202202-STORAGE_DRIVER=s3
199199+# S3 is the only supported storage backend. Presigned URLs enable direct
200200+# client-to-S3 transfers, reducing hold bandwidth by ~99%.
203201204204-# S3 Access Credentials
202202+# S3 Access Credentials (REQUIRED)
205203AWS_ACCESS_KEY_ID=your_access_key
206204AWS_SECRET_ACCESS_KEY=your_secret_key
207205···221219# - Minio: http://minio:9000
222220# Leave empty for AWS S3
223221# S3_ENDPOINT=https://gateway.storjshare.io
224224-225225-# ==============================================================================
226226-# STORAGE - FILESYSTEM CONFIGURATION
227227-# ==============================================================================
228228-229229-# Root directory for filesystem storage (when STORAGE_DRIVER=filesystem)
230230-# Default: /var/lib/atcr/hold
231231-# STORAGE_ROOT_DIR=/var/lib/atcr/hold
232222233223# ==============================================================================
234224# LOGGING (Shared by AppView and Hold)
+6-16
.env.hold.example
···1717HOLD_PUBLIC_URL=http://127.0.0.1:8080
18181919# ==============================================================================
2020-# Storage Configuration
2020+# S3 Storage Configuration (REQUIRED)
2121# ==============================================================================
22222323-# Storage driver type (s3, filesystem)
2424-# Default: s3
2525-#
2626-# S3 Presigned URLs:
2727-# When using S3 storage, presigned URLs are automatically enabled for direct
2828-# client ↔ S3 transfers. This eliminates the hold service as a bandwidth
2929-# bottleneck, reducing hold bandwidth by ~99% for push/pull operations.
3030-# Falls back to proxy mode automatically for non-S3 drivers.
3131-STORAGE_DRIVER=filesystem
2323+# S3 is the only supported storage backend. Presigned URLs are used for direct
2424+# client ↔ S3 transfers, eliminating the hold service as a bandwidth bottleneck
2525+# and reducing hold bandwidth by ~99% for push/pull operations.
32263333-# S3 Access Credentials
2727+# S3 Access Credentials (REQUIRED)
3428AWS_ACCESS_KEY_ID=your_access_key
3529AWS_SECRET_ACCESS_KEY=your_secret_key
3630···4034# Default: us-east-1
4135AWS_REGION=us-east-1
42364343-# S3 Bucket Name
3737+# S3 Bucket Name (REQUIRED)
4438S3_BUCKET=atcr-blobs
45394640# S3 Endpoint (for S3-compatible services like Storj, Minio, UpCloud)
···5044# - Minio: http://minio:9000
5145# Leave empty for AWS S3
5246# S3_ENDPOINT=https://gateway.storjshare.io
5353-5454-# For filesystem driver:
5555-# STORAGE_DRIVER=filesystem
5656-# STORAGE_ROOT_DIR=/var/lib/atcr/hold
57475848# ==============================================================================
5949# Server Configuration
+26-22
CLAUDE.md
···5757# ./bin/atcr-appview serve config/config.yml
58585959# Run hold service (configure via env vars - see .env.hold.example)
6060+# For local development, use Minio as S3-compatible storage:
6161+# docker run -p 9000:9000 minio/minio server /data
6062export HOLD_PUBLIC_URL=http://127.0.0.1:8080
6161-export STORAGE_DRIVER=filesystem
6262-export STORAGE_ROOT_DIR=/tmp/atcr-hold
6363+export AWS_ACCESS_KEY_ID=minioadmin
6464+export AWS_SECRET_ACCESS_KEY=minioadmin
6565+export S3_BUCKET=test
6666+export S3_ENDPOINT=http://localhost:9000
6367export HOLD_OWNER=did:plc:your-did-here
6468./bin/atcr-hold
6569# Hold starts immediately with embedded PDS
···92962. **Hold Service** (`cmd/hold`) - Optional BYOS component
9397 - Lightweight HTTP server for presigned URLs
9498 - Embedded PDS with captain + crew records
9595- - Supports S3, Storj, Minio, filesystem, etc.
9999+ - Supports S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.)
96100 - Authorization based on captain record (public, allowAllCrew)
97101 - Self-describing via DID resolution
98102 - Configured entirely via environment variables
···1221265. Blob PUT → ProxyBlobStore calls hold's XRPC multipart upload endpoints:
123127 a. POST /xrpc/io.atcr.hold.initiateUpload (gets uploadID)
124128 b. POST /xrpc/io.atcr.hold.getPartUploadUrl (gets presigned URL for each part)
125125- c. PUT to S3 presigned URL (or PUT /xrpc/io.atcr.hold.uploadPart for buffered mode)
129129+ c. PUT to S3 presigned URL (client uploads directly to S3)
126130 d. POST /xrpc/io.atcr.hold.completeUpload (finalizes upload)
1271316. Manifest PUT → alice's PDS as io.atcr.manifest record (includes holdDid + holdEndpoint)
128132 → Manifest also uploaded to PDS blob storage (ATProto CID format)
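Compacting steps 5a–5d into a single-part client sketch (illustrative only: the `xrpcPost` helper is hypothetical, and the JSON field names — `digest`, `uploadId`, `partNumber`, `url`, `parts`, `etag` — are assumptions loosely based on the mocks added in this PR; the real client is `ProxyBlobStore` in `pkg/appview/storage/proxy_blob_store.go`):

```go
package holdupload

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

// xrpcPost is a hypothetical helper: authenticated JSON POST, JSON reply.
func xrpcPost(ctx context.Context, url, token string, in, out any) error {
	body, err := json.Marshal(in)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("xrpc call failed: %s", resp.Status)
	}
	if out == nil {
		return nil
	}
	return json.NewDecoder(resp.Body).Decode(out)
}

// uploadBlob pushes one small blob as a single part, following steps a-d.
func uploadBlob(ctx context.Context, holdURL, token, digest string, blob []byte) error {
	// a. initiate the multipart upload session
	var init struct {
		UploadID string `json:"uploadId"`
	}
	if err := xrpcPost(ctx, holdURL+"/xrpc/io.atcr.hold.initiateUpload", token,
		map[string]any{"digest": digest}, &init); err != nil {
		return err
	}
	// b. ask the hold for a presigned URL for part 1
	var part struct {
		URL string `json:"url"`
	}
	if err := xrpcPost(ctx, holdURL+"/xrpc/io.atcr.hold.getPartUploadUrl", token,
		map[string]any{"uploadId": init.UploadID, "partNumber": 1}, &part); err != nil {
		return err
	}
	// c. PUT the bytes straight to S3 via the presigned URL (no hold bandwidth)
	req, err := http.NewRequestWithContext(ctx, "PUT", part.URL, bytes.NewReader(blob))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("part upload failed: %s", resp.Status)
	}
	etag := resp.Header.Get("ETag")
	// d. finalize, handing the collected part ETags back to the hold
	return xrpcPost(ctx, holdURL+"/xrpc/io.atcr.hold.completeUpload", token,
		map[string]any{
			"uploadId": init.UploadID,
			"digest":   digest,
			"parts":    []map[string]any{{"partNumber": 1, "etag": etag}},
		}, nil)
}
```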
···419423- Resolves hold DID → HTTP URL for XRPC requests (did:web resolution)
420424- Gets service tokens from user's PDS (`com.atproto.server.getServiceAuth`)
421425- Calls hold XRPC endpoints with service token authentication:
422422- - Multipart upload: initiateUpload, getPartUploadUrl, uploadPart, completeUpload, abortUpload
426426+ - Multipart upload: initiateUpload, getPartUploadUrl, completeUpload, abortUpload
423427 - Blob read: com.atproto.sync.getBlob (returns presigned download URL)
424428- Implements full `distribution.BlobStore` interface
425425-- Supports both presigned URL mode (S3 direct) and buffered mode (proxy via hold)
429429+- Uses presigned URLs for direct client-to-S3 transfers
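The did:web resolution mentioned in the first bullet above can be sketched as follows, assuming a bare-host did:web and that the hold's HTTPS endpoint is the first `service` entry in its DID document (the repository's actual resolver may differ):

```go
package didweb

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// didDocument is the subset of a DID document needed to find the service URL.
type didDocument struct {
	Service []struct {
		ID              string `json:"id"`
		Type            string `json:"type"`
		ServiceEndpoint string `json:"serviceEndpoint"`
	} `json:"service"`
}

// ResolveHoldURL turns e.g. "did:web:hold01.atcr.io" into the hold's base URL
// by fetching https://<host>/.well-known/did.json per the did:web method.
func ResolveHoldURL(did string) (string, error) {
	host := strings.TrimPrefix(did, "did:web:")
	resp, err := http.Get("https://" + host + "/.well-known/did.json")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	var doc didDocument
	if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
		return "", err
	}
	if len(doc.Service) == 0 {
		return "", fmt.Errorf("no service endpoint in DID document for %s", did)
	}
	return doc.Service[0].ServiceEndpoint, nil
}
```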
426430427431#### AppView Web UI (`pkg/appview/`)
428432···468472**Architecture:**
469473- **Embedded PDS**: Each hold has a full ATProto PDS for storing captain + crew records
470474- **DID**: Hold identified by did:web (e.g., `did:web:hold01.atcr.io`)
471471-- **Storage**: Reuses distribution's storage driver factory (S3, Storj, Minio, Azure, GCS, filesystem)
475475+- **Storage**: Requires S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.)
472476- **Authorization**: Based on captain + crew records in embedded PDS
473477- **Blob operations**: Generates presigned URLs (15min expiry) or proxies uploads/downloads via XRPC
474478···546550All require blob:write permission via service token authentication:
547551- `POST /xrpc/io.atcr.hold.initiateUpload` - Start multipart upload session
548552- `POST /xrpc/io.atcr.hold.getPartUploadUrl` - Get presigned URL for uploading a part
549549-- `PUT /xrpc/io.atcr.hold.uploadPart` - Direct buffered part upload (alternative to presigned URLs)
550553- `POST /xrpc/io.atcr.hold.completeUpload` - Finalize multipart upload and move to final location
551554- `POST /xrpc/io.atcr.hold.abortUpload` - Cancel multipart upload and cleanup temp data
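The service-token step behind these endpoints (`com.atproto.server.getServiceAuth`, mentioned in the ProxyBlobStore notes above) looks roughly like this; a hedged sketch assuming the standard `aud`/`lxm` query parameters and `token` response field, not a copy of the AppView's client:

```go
package serviceauth

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// GetServiceToken mints a short-lived token on the user's PDS, scoped to the
// hold (aud) and, optionally, to a single XRPC method (lxm).
func GetServiceToken(ctx context.Context, pdsURL, accessJWT, holdDID, method string) (string, error) {
	q := url.Values{}
	q.Set("aud", holdDID)
	if method != "" {
		q.Set("lxm", method) // e.g. "io.atcr.hold.initiateUpload"
	}
	endpoint := pdsURL + "/xrpc/com.atproto.server.getServiceAuth?" + q.Encode()
	req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Authorization", "Bearer "+accessJWT)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("getServiceAuth failed: %s", resp.Status)
	}
	var out struct {
		Token string `json:"token"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	return out.Token, nil
}
```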
552555···558561559562**Configuration:** Environment variables (see `.env.hold.example`)
560563- `HOLD_PUBLIC_URL` - Public URL of hold service (required, used for did:web generation)
561561-- `STORAGE_DRIVER` - Storage driver type (s3, filesystem)
562562-- `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` - S3 credentials
563563-- `S3_BUCKET`, `S3_ENDPOINT` - S3 configuration
564564+- `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` - S3 credentials (required)
565565+- `S3_BUCKET` - S3 bucket name (required)
566566+- `S3_ENDPOINT` - S3 endpoint URL (for non-AWS providers like Storj, Minio, UpCloud)
564567- `HOLD_PUBLIC` - Allow public reads (default: false)
565568- `HOLD_OWNER` - DID for captain record creation (optional)
566569- `HOLD_ALLOW_ALL_CREW` - Allow any authenticated user to register as crew (default: false)
567567-- `HOLD_DATABASE_PATH` - Path for embedded PDS database (required)
568568-- `HOLD_DATABASE_KEY_PATH` - Path for PDS signing keys (optional, generated if missing)
570570+- `HOLD_DATABASE_DIR` - Directory for embedded PDS database (required)
571571+- `HOLD_KEY_PATH` - Path for PDS signing keys (optional, generated if missing)
569572570573**Deployment:** Can run on Fly.io, Railway, Docker, Kubernetes, etc.
571574···681684682685See `.env.hold.example` for all available options. Key environment variables:
683686- `HOLD_PUBLIC_URL` - Public URL of hold service (REQUIRED)
684684-- `STORAGE_DRIVER` - Storage backend (s3, filesystem)
685685-- `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` - S3 credentials
686686-- `S3_BUCKET`, `S3_ENDPOINT` - S3 configuration
687687+- `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` - S3 credentials (REQUIRED)
688688+- `S3_BUCKET` - S3 bucket name (REQUIRED)
689689+- `S3_ENDPOINT` - S3 endpoint URL (for non-AWS providers)
687690- `HOLD_PUBLIC` - Allow public reads (default: false)
688691- `HOLD_OWNER` - DID for captain record creation (optional)
689692- `HOLD_ALLOW_ALL_CREW` - Allow any authenticated user to register as crew (default: false)
···7497525. AppView automatically queries hold's PDS and routes blobs to user's storage
7507536. No AppView changes needed - fully decentralized
751754752752-**Supporting a new storage backend**:
753753-1. Ensure driver is registered in `cmd/hold/main.go` imports
754754-2. Distribution supports: S3, Azure, GCS, Swift, filesystem, OSS
755755-3. For custom drivers: implement `storagedriver.StorageDriver` interface
756756-4. Add case to `buildStorageConfig()` in `cmd/hold/main.go`
757757-5. Update `.env.example` with new driver's env vars
755755+**Using S3-compatible storage**:
756756+ATCR requires S3-compatible storage. Supported providers:
757757+- AWS S3 - Set `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `S3_BUCKET`
758758+- Storj - Set `S3_ENDPOINT=https://gateway.storjshare.io`
759759+- Minio - Set `S3_ENDPOINT=http://localhost:9000`
760760+- UpCloud - Set `S3_ENDPOINT=https://[bucket-id].upcloudobjects.com`
761761+- Azure/GCS - Use their S3-compatible API endpoints
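As a point of reference, switching between these providers is just an endpoint override on the S3 client. The sketch below uses aws-sdk-go-v2 and the env vars named above; it is illustrative only, not the hold's actual driver wiring:

```go
package s3client

import (
	"context"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// New builds an S3 client from the same env vars the hold documents.
func New(ctx context.Context) (*s3.Client, error) {
	cfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(getenv("AWS_REGION", "us-east-1")),
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), "")),
	)
	if err != nil {
		return nil, err
	}
	return s3.NewFromConfig(cfg, func(o *s3.Options) {
		if ep := os.Getenv("S3_ENDPOINT"); ep != "" {
			o.BaseEndpoint = aws.String(ep) // Storj, Minio, UpCloud, ...
			o.UsePathStyle = true           // often required by S3-compatible services such as Minio
		}
	}), nil
}

func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
```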
758762759763**Working with the database**:
760764- **Base schema** defined in `pkg/appview/db/schema.sql` - source of truth for fresh installations
···55//
66// go run ./cmd/usage-report --hold https://hold01.atcr.io
77// go run ./cmd/usage-report --hold https://hold01.atcr.io --from-manifests
88+// go run ./cmd/usage-report --hold https://hold01.atcr.io --list-blobs
89package main
9101011import (
···83848485var client = &http.Client{Timeout: 30 * time.Second}
85868787+// BlobInfo represents a single blob with its metadata
8888+type BlobInfo struct {
8989+ Digest string
9090+ Size int64
9191+ MediaType string
9292+ UserDID string
9393+ Handle string
9494+}
9595+8696func main() {
8797 holdURL := flag.String("hold", "https://hold01.atcr.io", "Hold service URL")
8898 fromManifests := flag.Bool("from-manifests", false, "Calculate usage from user manifests instead of hold layer records (more accurate but slower)")
9999+ listBlobs := flag.Bool("list-blobs", false, "List all individual blobs sorted by size (largest first)")
89100 flag.Parse()
9010191102 // Normalize URL
···100111 os.Exit(1)
101112 }
102113 fmt.Printf("Hold DID: %s\n\n", holdDID)
114114+115115+ // If --list-blobs flag is set, run blob listing mode
116116+ if *listBlobs {
117117+ listAllBlobs(baseURL, holdDID)
118118+ return
119119+ }
103120104121 var userUsage map[string]*UserUsage
105122···189206 }
190207 sort.Strings(repos)
191208 fmt.Printf("%s,%s,%d,%d,%s,\"%s\"\n", u.Handle, u.DID, u.LayerCount, u.TotalSize, humanSize(u.TotalSize), strings.Join(repos, ";"))
209209+ }
210210+}
211211+212212+// listAllBlobs fetches all blobs and lists them sorted by size (largest first)
213213+func listAllBlobs(baseURL, holdDID string) {
214214+ fmt.Println("=== Fetching all blob records ===")
215215+216216+ layers, err := fetchAllLayerRecords(baseURL, holdDID)
217217+ if err != nil {
218218+ fmt.Fprintf(os.Stderr, "Failed to fetch layer records: %v\n", err)
219219+ os.Exit(1)
220220+ }
221221+222222+ fmt.Printf("Fetched %d layer records\n", len(layers))
223223+224224+ // Deduplicate by digest, keeping track of first seen user
225225+ blobMap := make(map[string]*BlobInfo)
226226+ for _, layer := range layers {
227227+ if existing, exists := blobMap[layer.Digest]; exists {
228228+ // If we have a record with a user DID and existing doesn't, prefer this one
229229+ if existing.UserDID == "" && layer.UserDID != "" {
230230+ existing.UserDID = layer.UserDID
231231+ }
232232+ continue
233233+ }
234234+ blobMap[layer.Digest] = &BlobInfo{
235235+ Digest: layer.Digest,
236236+ Size: layer.Size,
237237+ MediaType: layer.MediaType,
238238+ UserDID: layer.UserDID,
239239+ }
240240+ }
241241+242242+ // Convert to slice
243243+ var blobs []*BlobInfo
244244+ for _, b := range blobMap {
245245+ blobs = append(blobs, b)
246246+ }
247247+248248+ // Sort by size (largest first)
249249+ sort.Slice(blobs, func(i, j int) bool {
250250+ return blobs[i].Size > blobs[j].Size
251251+ })
252252+253253+ fmt.Printf("Found %d unique blobs\n\n", len(blobs))
254254+255255+ // Resolve DIDs to handles (batch for efficiency)
256256+ fmt.Println("Resolving DIDs to handles...")
257257+ didToHandle := make(map[string]string)
258258+ for _, b := range blobs {
259259+ if b.UserDID == "" {
260260+ continue
261261+ }
262262+ if _, exists := didToHandle[b.UserDID]; !exists {
263263+ handle, err := resolveDIDToHandle(b.UserDID)
264264+ if err != nil {
265265+ didToHandle[b.UserDID] = b.UserDID
266266+ } else {
267267+ didToHandle[b.UserDID] = handle
268268+ }
269269+ }
270270+ b.Handle = didToHandle[b.UserDID]
271271+ }
272272+273273+ // Calculate total
274274+ var totalSize int64
275275+ for _, b := range blobs {
276276+ totalSize += b.Size
277277+ }
278278+279279+ // Print report
280280+ fmt.Println("\n========================================")
281281+ fmt.Println("BLOB SIZE REPORT (sorted largest to smallest)")
282282+ fmt.Println("========================================")
283283+ fmt.Printf("\nTotal Unique Blobs: %d\n", len(blobs))
284284+ fmt.Printf("Total Storage: %s\n\n", humanSize(totalSize))
285285+286286+ fmt.Println("BLOBS:")
287287+ fmt.Println("----------------------------------------")
288288+ for i, b := range blobs {
289289+ pct := float64(0)
290290+ if totalSize > 0 {
291291+ pct = float64(b.Size) / float64(totalSize) * 100
292292+ }
293293+ owner := b.Handle
294294+ if owner == "" {
295295+ owner = "(unknown)"
296296+ }
297297+ fmt.Printf("%4d. %s\n", i+1, humanSize(b.Size))
298298+ fmt.Printf(" Digest: %s\n", b.Digest)
299299+ fmt.Printf(" Owner: %s\n", owner)
300300+ if b.MediaType != "" {
301301+ fmt.Printf(" Type: %s\n", b.MediaType)
302302+ }
303303+ fmt.Printf(" Share: %.2f%%\n\n", pct)
304304+ }
305305+306306+ // Output CSV format
307307+ fmt.Println("\n========================================")
308308+ fmt.Println("CSV FORMAT")
309309+ fmt.Println("========================================")
310310+ fmt.Println("rank,size_bytes,size_human,digest,owner,media_type,share_pct")
311311+ for i, b := range blobs {
312312+ pct := float64(0)
313313+ if totalSize > 0 {
314314+ pct = float64(b.Size) / float64(totalSize) * 100
315315+ }
316316+ owner := b.Handle
317317+ if owner == "" {
318318+ owner = ""
319319+ }
320320+ fmt.Printf("%d,%d,%s,%s,%s,%s,%.2f\n", i+1, b.Size, humanSize(b.Size), b.Digest, owner, b.MediaType, pct)
192321 }
193322}
194323
+3-17
deploy/.env.prod.template
···101101HOLD_BLUESKY_POSTS_ENABLED=true
102102103103# ==============================================================================
104104-# S3/UpCloud Object Storage Configuration
104104+# S3/UpCloud Object Storage Configuration (REQUIRED)
105105# ==============================================================================
106106107107-# Storage driver type
108108-# Options: s3, filesystem
109109-# Default: s3
110110-STORAGE_DRIVER=s3
107107+# S3 is the only supported storage backend. Presigned URLs are used for direct
108108+# client ↔ S3 transfers, eliminating the hold service as a bandwidth bottleneck.
111109112110# S3 Access Credentials
113111# Get these from UpCloud Object Storage console
···185183# ATProto relay endpoint for backfill sync API
186184# Default: https://relay1.us-east.bsky.network
187185ATCR_RELAY_ENDPOINT=https://relay1.us-east.bsky.network
188188-189189-# ==============================================================================
190190-# Optional: Filesystem Storage (alternative to S3)
191191-# ==============================================================================
192192-193193-# If using filesystem storage instead of S3:
194194-# 1. Uncomment these lines
195195-# 2. Comment out all S3 variables above
196196-# 3. Set STORAGE_DRIVER=filesystem
197197-198198-# STORAGE_DRIVER=filesystem
199199-# STORAGE_ROOT_DIR=/var/lib/atcr/hold
200186201187# ==============================================================================
202188# CHECKLIST
+3-3
deploy/README.md
···418418docker logs atcr-hold | grep -i presigned
419419```
420420421421-**Check S3 driver:**
421421+**Check S3 configuration:**
422422```bash
423423-docker exec atcr-hold env | grep STORAGE_DRIVER
424424-# Should be: s3 (not filesystem)
423423+docker exec atcr-hold env | grep S3_BUCKET
424424+# Should show your S3 bucket name
425425```
426426427427**Verify direct S3 access:**
···244244245245### Development/Testing
246246247247-Local Docker Compose setup:
247247+Local Docker Compose setup with Minio for S3-compatible storage:
248248249249```bash
250250+# Start Minio (S3-compatible storage)
251251+docker run -p 9000:9000 -p 9001:9001 minio/minio server /data --console-address ":9001"
252252+250253# AppView config
251254ATCR_HTTP_ADDR=:5000
252255ATCR_DEFAULT_HOLD_DID=did:web:atcr-hold:8080
253256ATCR_LOG_LEVEL=debug
254257255258# Hold config (linked hold service)
256256-STORAGE_DRIVER=filesystem
257257-STORAGE_ROOT_DIR=/tmp/atcr-hold
259259+AWS_ACCESS_KEY_ID=minioadmin
260260+AWS_SECRET_ACCESS_KEY=minioadmin
261261+S3_BUCKET=test
262262+S3_ENDPOINT=http://minio:9000
258263HOLD_PUBLIC=true
259264HOLD_ALLOW_ALL_CREW=true
260265```
+19-40
docs/hold.md
···4455## Overview
6677-**Hold Service** is the storage backend component of ATCR. It enables BYOS (Bring Your Own Storage) - users can store their own container image layers in their own S3, Storj, Minio, or filesystem storage. Each hold runs as a full ATProto user with an embedded PDS, exposing both standard ATProto sync endpoints and custom XRPC endpoints for OCI multipart blob uploads.
77+**Hold Service** is the storage backend component of ATCR. It enables BYOS (Bring Your Own Storage) - users can store their own container image layers in their own S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.). Each hold runs as a full ATProto user with an embedded PDS, exposing both standard ATProto sync endpoints and custom XRPC endpoints for OCI multipart blob uploads.
8899### What Hold Service Does
10101111Hold Service is the storage layer that:
12121313-- **Bring Your Own Storage (BYOS)** - Store your own container image layers in your own S3, Storj, Minio, or filesystem
1313+- **Bring Your Own Storage (BYOS)** - Store your own container image layers in your own S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.)
1414- **Embedded ATProto PDS** - Each hold is a full ATProto user with its own DID, repository, and identity
1515- **Custom XRPC Endpoints** - OCI-compatible multipart upload endpoints (`io.atcr.hold.*`) for blob operations
1616- **Presigned URL Generation** - Creates time-limited S3 URLs for direct client-to-storage transfers (~99% bandwidth reduction)
1717- **Crew Management** - Controls access via captain and crew records stored in the hold's embedded PDS
1818- **Standard ATProto Sync** - Exposes com.atproto.sync.* endpoints for repository synchronization and firehose
1919-- **Multi-Backend Support** - Works with S3, Storj, Minio, filesystem, Azure, GCS via distribution's driver system
1919+- **S3 Storage** - Works with any S3-compatible storage (AWS S3, Storj, Minio, UpCloud, Azure, GCS via S3 gateway)
2020- **Bluesky Integration** - Optional: Posts container image push notifications from the hold's identity to Bluesky
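The presigned-URL mechanism in the list above (time-limited URLs, 15-minute expiry per this document) can be illustrated with aws-sdk-go-v2; this is a sketch of the general technique, not necessarily the hold's actual storage code:

```go
package presign

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// PutURL returns a 15-minute presigned PUT URL for a blob key, so the client
// can upload directly to S3 and the hold never proxies the bytes.
func PutURL(ctx context.Context, client *s3.Client, bucket, key string) (string, error) {
	presigner := s3.NewPresignClient(client)
	req, err := presigner.PresignPutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key), // e.g. "blobs/sha256:..."
	}, s3.WithPresignExpires(15*time.Minute))
	if err != nil {
		return "", err
	}
	return req.URL, nil
}
```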
21212222### The ATCR Ecosystem
···5050- Maintain data sovereignty (keep blobs in specific geographic regions)
51515252**Prerequisites:**
5353-- S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.) OR filesystem storage
5353+- S3-compatible storage (AWS S3, Storj, Minio, UpCloud, etc.)
5454- (Optional) Domain name with SSL/TLS certificates for production
5555- ATProto DID for hold owner (get from: `https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=yourhandle.bsky.social`)
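The DID lookup in the last prerequisite is a single unauthenticated XRPC call; a minimal Go sketch of the same request the linked URL performs:

```go
package identity

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// ResolveHandle returns the DID for an ATProto handle via
// com.atproto.identity.resolveHandle (the endpoint linked above).
func ResolveHandle(handle string) (string, error) {
	endpoint := "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=" + url.QueryEscape(handle)
	resp, err := http.Get(endpoint)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("resolveHandle failed: %s", resp.Status)
	}
	var out struct {
		DID string `json:"did"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	return out.DID, nil
}
```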
5656···8787# Required: Your ATProto DID (for captain record)
8888HOLD_OWNER=did:plc:your-did-here
89899090-# Required: Storage driver type
9191-STORAGE_DRIVER=s3
9292-9393-# Required for S3: Credentials and bucket
9090+# Required: S3 credentials and bucket
9491AWS_ACCESS_KEY_ID=your-access-key
9592AWS_SECRET_ACCESS_KEY=your-secret-key
9693S3_BUCKET=your-bucket-name
···132129 - `true`: Public registry (anyone can pull, authenticated users can push if crew)
133130 - `false`: Private registry (authentication required for both push and pull)
134131135135-### Storage Configuration
136136-137137-#### `STORAGE_DRIVER`
138138-- **Default:** `s3`
139139-- **Options:** `s3`, `filesystem`
140140-- **Description:** Storage backend type. S3 enables presigned URLs for direct client-to-storage transfers (~99% bandwidth reduction). Filesystem stores blobs locally (development/testing).
132132+### S3 Storage Configuration
141133142142-#### S3 Storage (when `STORAGE_DRIVER=s3`)
134134+S3 is the only supported storage backend. Presigned URLs enable direct client-to-storage transfers (~99% bandwidth reduction).
143135144136##### `AWS_ACCESS_KEY_ID` ⚠️ REQUIRED for S3
145137- **Description:** S3 access key ID for authentication
···167159- **UpCloud:** `https://[bucket-id].upcloudobjects.com`
168160- **Minio:** `http://minio:9000`
169161- **Note:** Leave empty for AWS S3
170170-171171-#### Filesystem Storage (when `STORAGE_DRIVER=filesystem`)
172172-173173-##### `STORAGE_ROOT_DIR`
174174-- **Default:** `/var/lib/atcr/hold`
175175-- **Description:** Directory path where blobs will be stored on local filesystem
176176-- **Use case:** Development, testing, or single-server deployments
177177-- **Note:** Presigned URLs are not available with filesystem driver (hold proxies all blob transfers)
178162179163### Embedded PDS Configuration
180164···227211- **Default:** `false`
228212- **Description:** Enable test mode (skips some validations). Do not use in production.
229213230230-#### `DISABLE_PRESIGNED_URLS`
231231-- **Default:** `false`
232232-- **Description:** Force proxy mode even with S3 configured (for testing). Disables presigned URL generation and routes all blob transfers through the hold service.
233233-- **Use case:** Testing, debugging, or environments where presigned URLs don't work
234234-235214## XRPC Endpoints
236215237216Hold Service exposes two types of XRPC endpoints:
···250229### OCI Multipart Upload Endpoints (Custom)
251230- `POST /xrpc/io.atcr.hold.initiateUpload` - Start multipart upload session
252231- `POST /xrpc/io.atcr.hold.getPartUploadUrl` - Get presigned URL for uploading a part
253253-- `PUT /xrpc/io.atcr.hold.uploadPart` - Direct buffered part upload (alternative to presigned URLs)
254232- `POST /xrpc/io.atcr.hold.completeUpload` - Finalize multipart upload
255233- `POST /xrpc/io.atcr.hold.abortUpload` - Cancel multipart upload
256234- `POST /xrpc/io.atcr.hold.notifyManifest` - Notify hold of manifest upload (creates layer records, Bluesky posts)
···301279HOLD_ALLOW_ALL_CREW=false # Only you can push
302280HOLD_DATABASE_DIR=/var/lib/atcr-hold
303281304304-# S3 storage
305305-STORAGE_DRIVER=s3
282282+# S3 storage (using Storj)
306283AWS_ACCESS_KEY_ID=your-key
307284AWS_SECRET_ACCESS_KEY=your-secret
308285S3_BUCKET=alice-container-registry
309309-S3_ENDPOINT=https://gateway.storjshare.io # Using Storj
286286+S3_ENDPOINT=https://gateway.storjshare.io
310287```
311288312289### Shared Hold (Team/Organization)
···322299HOLD_DATABASE_DIR=/var/lib/atcr-hold
323300324301# S3 storage
325325-STORAGE_DRIVER=s3
326302AWS_ACCESS_KEY_ID=your-key
327303AWS_SECRET_ACCESS_KEY=your-secret
328304S3_BUCKET=acme-registry-blobs
···343319HOLD_DATABASE_DIR=/var/lib/atcr-hold
344320345321# S3 storage
346346-STORAGE_DRIVER=s3
347322AWS_ACCESS_KEY_ID=your-key
348323AWS_SECRET_ACCESS_KEY=your-secret
349324S3_BUCKET=community-registry-blobs
350325```
351326352352-### Development/Testing
327327+### Development/Testing with Minio
353328354354-Local filesystem storage for testing:
329329+For local development, use Minio as an S3-compatible storage:
355330356331```bash
332332+# Start Minio
333333+docker run -p 9000:9000 -p 9001:9001 minio/minio server /data --console-address ":9001"
334334+357335# Hold config
358336HOLD_PUBLIC_URL=http://127.0.0.1:8080
359337HOLD_OWNER=did:plc:your-test-did
···361339HOLD_ALLOW_ALL_CREW=true
362340HOLD_DATABASE_DIR=/tmp/atcr-hold
363341364364-# Filesystem storage
365365-STORAGE_DRIVER=filesystem
366366-STORAGE_ROOT_DIR=/tmp/atcr-hold-blobs
342342+# Minio S3 storage
343343+AWS_ACCESS_KEY_ID=minioadmin
344344+AWS_SECRET_ACCESS_KEY=minioadmin
345345+S3_BUCKET=test
346346+S3_ENDPOINT=http://localhost:9000
367347```
368348369349## Production Deployment
···383363384364- [ ] Set `HOLD_PUBLIC_URL` to your public HTTPS URL
385365- [ ] Set `HOLD_OWNER` to your ATProto DID
386386-- [ ] Configure S3 storage (`STORAGE_DRIVER=s3`)
387366- [ ] Set `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `S3_BUCKET`, `S3_ENDPOINT`
388367- [ ] Set `HOLD_DATABASE_DIR` to persistent directory
389368- [ ] Configure `HOLD_PUBLIC` and `HOLD_ALLOW_ALL_CREW` for desired access model
+4-17
pkg/appview/storage/proxy_blob_store.go
···463463 return result.UploadID, nil
464464}
465465466466-// PartUploadInfo contains structured information for uploading a part
466466+// PartUploadInfo contains the presigned URL for uploading a part
467467type PartUploadInfo struct {
468468- URL string `json:"url"`
469469- Method string `json:"method,omitempty"`
470470- Headers map[string]string `json:"headers,omitempty"`
468468+ URL string `json:"url"` // Presigned URL to PUT the part to
471469}
472470473471// getPartUploadInfo gets structured upload info for uploading a specific part via XRPC
···653651 return fmt.Errorf("failed to get part upload info: %w", err)
654652 }
655653656656- // Determine HTTP method (default to PUT)
657657- method := uploadInfo.Method
658658- if method == "" {
659659- method = "PUT"
660660- }
661661-662662- // Upload part (either to S3 presigned URL or back to XRPC with headers)
663663- req, err := http.NewRequestWithContext(ctx, method, uploadInfo.URL, bytes.NewReader(w.buffer.Bytes()))
654654+ // Upload part to S3 presigned URL
655655+ req, err := http.NewRequestWithContext(ctx, "PUT", uploadInfo.URL, bytes.NewReader(w.buffer.Bytes()))
664656 if err != nil {
665657 return err
666658 }
667659 req.Header.Set("Content-Type", "application/octet-stream")
668668-669669- // Apply any additional headers from the response (for buffered mode)
670670- for key, value := range uploadInfo.Headers {
671671- req.Header.Set(key, value)
672672- }
673660674661 resp, err := w.store.httpClient.Do(req)
675662 if err != nil {
+1376
pkg/appview/storage/proxy_blob_store_test.go
···11package storage
2233import (
44+ "bytes"
45 "context"
56 "encoding/base64"
67 "encoding/json"
78 "fmt"
99+ "io"
810 "net/http"
911 "net/http/httptest"
1212+ "strconv"
1013 "strings"
1414+ "sync"
1115 "testing"
1216 "time"
13171418 "atcr.io/pkg/atproto"
1519 "atcr.io/pkg/auth"
2020+ "github.com/distribution/distribution/v3"
1621 "github.com/opencontainers/go-digest"
1722)
1823···525530 // Verify S3 received NO Authorization header
526531 if s3ReceivedAuthHeader != "" {
527532 t.Errorf("S3 should not receive Authorization header for presigned URLs, got: %s", s3ReceivedAuthHeader)
533533+ }
534534+}
535535+536536+// ============================================================================
537537+// ProxyBlobWriter Tests
538538+// ============================================================================
539539+540540+// mockHoldServer is a test helper that mocks the hold service XRPC endpoints
541541+type mockHoldServer struct {
542542+ *httptest.Server
543543+544544+ // Track calls
545545+ mu sync.Mutex
546546+ InitiateCalls []mockInitiateCall
547547+ PartURLCalls []mockPartURLCall
548548+ CompleteCalls []mockCompleteCall
549549+ AbortCalls []mockAbortCall
550550+551551+ // Error injection
552552+ InitiateError error
553553+ PartURLError error
554554+ CompleteError error
555555+ AbortError error
556556+557557+ // Response customization
558558+ UploadID string
559559+}
560560+561561+type mockInitiateCall struct {
562562+ Digest string
563563+}
564564+565565+type mockPartURLCall struct {
566566+ UploadID string
567567+ PartNumber int
568568+}
569569+570570+type mockCompleteCall struct {
571571+ UploadID string
572572+ Digest string
573573+ Parts []map[string]any
574574+}
575575+576576+type mockAbortCall struct {
577577+ UploadID string
578578+}
579579+580580+// mockS3Server mocks S3 presigned URL uploads
581581+type mockS3Server struct {
582582+ *httptest.Server
583583+584584+ // Track uploads
585585+ mu sync.Mutex
586586+ Parts map[int][]byte
587587+588588+ // Error injection
589589+ UploadError error
590590+591591+ // Response customization
592592+ ETagInHeader bool // true = ETag in header, false = in JSON body
593593+}
594594+595595+// newMockHoldServer creates a mock hold service
596596+func newMockHoldServer(t *testing.T, s3URL string) *mockHoldServer {
597597+ m := &mockHoldServer{
598598+ UploadID: "test-upload-id-" + fmt.Sprintf("%d", time.Now().UnixNano()),
599599+ }
600600+601601+ m.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
602602+ m.mu.Lock()
603603+ defer m.mu.Unlock()
604604+605605+ w.Header().Set("Content-Type", "application/json")
606606+607607+ switch {
608608+ case strings.Contains(r.URL.Path, atproto.HoldInitiateUpload):
609609+ if m.InitiateError != nil {
610610+ w.WriteHeader(http.StatusInternalServerError)
611611+ fmt.Fprintf(w, `{"error":"%s"}`, m.InitiateError.Error())
612612+ return
613613+ }
614614+615615+ var body map[string]any
616616+ json.NewDecoder(r.Body).Decode(&body)
617617+ m.InitiateCalls = append(m.InitiateCalls, mockInitiateCall{
618618+ Digest: body["digest"].(string),
619619+ })
620620+621621+ w.WriteHeader(http.StatusOK)
622622+ json.NewEncoder(w).Encode(map[string]string{
623623+ "uploadId": m.UploadID,
624624+ })
625625+626626+ case strings.Contains(r.URL.Path, atproto.HoldGetPartUploadURL):
627627+ if m.PartURLError != nil {
628628+ w.WriteHeader(http.StatusInternalServerError)
629629+ fmt.Fprintf(w, `{"error":"%s"}`, m.PartURLError.Error())
630630+ return
631631+ }
632632+633633+ var body map[string]any
634634+ json.NewDecoder(r.Body).Decode(&body)
635635+ m.PartURLCalls = append(m.PartURLCalls, mockPartURLCall{
636636+ UploadID: body["uploadId"].(string),
637637+ PartNumber: int(body["partNumber"].(float64)),
638638+ })
639639+640640+ // Return presigned URL pointing to mock S3
641641+ partNum := int(body["partNumber"].(float64))
642642+ w.WriteHeader(http.StatusOK)
643643+ json.NewEncoder(w).Encode(map[string]string{
644644+ "url": fmt.Sprintf("%s/upload?partNumber=%d", s3URL, partNum),
645645+ })
646646+647647+ case strings.Contains(r.URL.Path, atproto.HoldCompleteUpload):
648648+ if m.CompleteError != nil {
649649+ w.WriteHeader(http.StatusInternalServerError)
650650+ fmt.Fprintf(w, `{"error":"%s"}`, m.CompleteError.Error())
651651+ return
652652+ }
653653+654654+ var body map[string]any
655655+ json.NewDecoder(r.Body).Decode(&body)
656656+ parts, _ := body["parts"].([]any)
657657+ partsArr := make([]map[string]any, len(parts))
658658+ for i, p := range parts {
659659+ partsArr[i] = p.(map[string]any)
660660+ }
661661+ m.CompleteCalls = append(m.CompleteCalls, mockCompleteCall{
662662+ UploadID: body["uploadId"].(string),
663663+ Digest: body["digest"].(string),
664664+ Parts: partsArr,
665665+ })
666666+667667+ w.WriteHeader(http.StatusOK)
668668+ json.NewEncoder(w).Encode(map[string]any{})
669669+670670+ case strings.Contains(r.URL.Path, atproto.HoldAbortUpload):
671671+ if m.AbortError != nil {
672672+ w.WriteHeader(http.StatusInternalServerError)
673673+ fmt.Fprintf(w, `{"error":"%s"}`, m.AbortError.Error())
674674+ return
675675+ }
676676+677677+ var body map[string]any
678678+ json.NewDecoder(r.Body).Decode(&body)
679679+ m.AbortCalls = append(m.AbortCalls, mockAbortCall{
680680+ UploadID: body["uploadId"].(string),
681681+ })
682682+683683+ w.WriteHeader(http.StatusOK)
684684+ json.NewEncoder(w).Encode(map[string]any{})
685685+686686+ default:
687687+ t.Errorf("Unexpected hold endpoint: %s", r.URL.Path)
688688+ w.WriteHeader(http.StatusNotFound)
689689+ }
690690+ }))
691691+692692+ return m
693693+}
694694+695695+// newMockS3Server creates a mock S3 server for presigned URL uploads
696696+func newMockS3Server(t *testing.T, etagInHeader bool) *mockS3Server {
697697+ m := &mockS3Server{
698698+ Parts: make(map[int][]byte),
699699+ ETagInHeader: etagInHeader,
700700+ }
701701+702702+ m.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
703703+ m.mu.Lock()
704704+ defer m.mu.Unlock()
705705+706706+ if m.UploadError != nil {
707707+ w.WriteHeader(http.StatusInternalServerError)
708708+ w.Write([]byte(m.UploadError.Error()))
709709+ return
710710+ }
711711+712712+ // Parse part number from URL
713713+ partNum, _ := strconv.Atoi(r.URL.Query().Get("partNumber"))
714714+715715+ // Read body
716716+ body, _ := io.ReadAll(r.Body)
717717+ m.Parts[partNum] = body
718718+719719+ // Generate ETag
720720+ etag := fmt.Sprintf(`"etag-part-%d"`, partNum)
721721+722722+ if m.ETagInHeader {
723723+ w.Header().Set("ETag", etag)
724724+ w.WriteHeader(http.StatusOK)
725725+ } else {
726726+ w.Header().Set("Content-Type", "application/json")
727727+ w.WriteHeader(http.StatusOK)
728728+ json.NewEncoder(w).Encode(map[string]string{
729729+ "etag": etag,
730730+ })
731731+ }
732732+ }))
733733+734734+ return m
735735+}
736736+737737+// createTestProxyBlobStore creates a ProxyBlobStore for testing with mock servers
738738+func createTestProxyBlobStore(t *testing.T, holdURL string) *ProxyBlobStore {
739739+ ctx := &RegistryContext{
740740+ DID: "did:plc:testuser",
741741+ HoldDID: "did:web:hold.example.com",
742742+ PDSEndpoint: "https://pds.example.com",
743743+ Repository: "test-repo",
744744+ ServiceToken: "test-service-token",
745745+ }
746746+ store := NewProxyBlobStore(ctx)
747747+ store.holdURL = holdURL
748748+ return store
749749+}
750750+751751+// generateTestData creates n bytes of predictable test data
752752+func generateTestData(n int) []byte {
753753+ data := make([]byte, n)
754754+ for i := 0; i < n; i++ {
755755+ data[i] = byte(i % 256)
756756+ }
757757+ return data
758758+}
759759+760760+// TestCreate_Success tests that Create() successfully initiates multipart upload
761761+func TestCreate_Success(t *testing.T) {
762762+ s3Server := newMockS3Server(t, true)
763763+ defer s3Server.Close()
764764+765765+ holdServer := newMockHoldServer(t, s3Server.URL)
766766+ defer holdServer.Close()
767767+768768+ store := createTestProxyBlobStore(t, holdServer.URL)
769769+770770+ writer, err := store.Create(context.Background())
771771+ if err != nil {
772772+ t.Fatalf("Create() failed: %v", err)
773773+ }
774774+775775+ // Verify writer is returned
776776+ if writer == nil {
777777+ t.Fatal("Expected non-nil writer")
778778+ }
779779+780780+ // Verify initiate was called
781781+ holdServer.mu.Lock()
782782+ if len(holdServer.InitiateCalls) != 1 {
783783+ t.Errorf("Expected 1 initiate call, got %d", len(holdServer.InitiateCalls))
784784+ }
785785+ holdServer.mu.Unlock()
786786+787787+ // Verify writer ID
788788+ if writer.ID() == "" {
789789+ t.Error("Expected non-empty writer ID")
790790+ }
791791+792792+ // Verify writer is stored in global uploads
793793+ globalUploadsMu.RLock()
794794+ _, exists := globalUploads[writer.ID()]
795795+ globalUploadsMu.RUnlock()
796796+ if !exists {
797797+ t.Error("Writer should be stored in globalUploads")
798798+ }
799799+800800+ // Cleanup
801801+ writer.Cancel(context.Background())
802802+}
803803+804804+// TestCreate_HoldError tests that Create() returns error when hold service fails
805805+func TestCreate_HoldError(t *testing.T) {
806806+ s3Server := newMockS3Server(t, true)
807807+ defer s3Server.Close()
808808+809809+ holdServer := newMockHoldServer(t, s3Server.URL)
810810+ holdServer.InitiateError = fmt.Errorf("hold service unavailable")
811811+ defer holdServer.Close()
812812+813813+ store := createTestProxyBlobStore(t, holdServer.URL)
814814+815815+ writer, err := store.Create(context.Background())
816816+ if err == nil {
817817+ t.Fatal("Expected error from Create()")
818818+ }
819819+ if writer != nil {
820820+ t.Error("Expected nil writer when error occurs")
821821+ }
822822+823823+ if !strings.Contains(err.Error(), "hold service unavailable") {
824824+ t.Errorf("Expected hold error message, got: %v", err)
825825+ }
826826+}
827827+828828+// TestWrite_BasicBuffer tests that small writes are buffered
829829+func TestWrite_BasicBuffer(t *testing.T) {
830830+ s3Server := newMockS3Server(t, true)
831831+ defer s3Server.Close()
832832+833833+ holdServer := newMockHoldServer(t, s3Server.URL)
834834+ defer holdServer.Close()
835835+836836+ store := createTestProxyBlobStore(t, holdServer.URL)
837837+838838+ writer, err := store.Create(context.Background())
839839+ if err != nil {
840840+ t.Fatalf("Create() failed: %v", err)
841841+ }
842842+ defer writer.Cancel(context.Background())
843843+844844+ // Write small data (1MB - well under 10MB threshold)
845845+ data := generateTestData(1 * 1024 * 1024)
846846+ n, err := writer.Write(data)
847847+ if err != nil {
848848+ t.Fatalf("Write() failed: %v", err)
849849+ }
850850+851851+ if n != len(data) {
852852+ t.Errorf("Expected to write %d bytes, wrote %d", len(data), n)
853853+ }
854854+855855+ // Verify size is tracked
856856+ if writer.Size() != int64(len(data)) {
857857+ t.Errorf("Expected size %d, got %d", len(data), writer.Size())
858858+ }
859859+860860+ // Verify NO flush occurred (no part uploads to S3)
861861+ s3Server.mu.Lock()
862862+ partCount := len(s3Server.Parts)
863863+ s3Server.mu.Unlock()
864864+865865+ if partCount != 0 {
866866+ t.Errorf("Expected 0 parts uploaded (data should be buffered), got %d", partCount)
867867+ }
868868+}
869869+870870+// TestWrite_TriggerFlush tests that writing 10MB triggers flush
871871+func TestWrite_TriggerFlush(t *testing.T) {
872872+ s3Server := newMockS3Server(t, true)
873873+ defer s3Server.Close()
874874+875875+ holdServer := newMockHoldServer(t, s3Server.URL)
876876+ defer holdServer.Close()
877877+878878+ store := createTestProxyBlobStore(t, holdServer.URL)
879879+880880+ writer, err := store.Create(context.Background())
881881+ if err != nil {
882882+ t.Fatalf("Create() failed: %v", err)
883883+ }
884884+ defer writer.Cancel(context.Background())
885885+886886+ // Write exactly 10MB (the threshold)
887887+ data := generateTestData(10 * 1024 * 1024)
888888+ _, err = writer.Write(data)
889889+ if err != nil {
890890+ t.Fatalf("Write() failed: %v", err)
891891+ }
892892+893893+ // Verify flush occurred (1 part uploaded)
894894+ s3Server.mu.Lock()
895895+ partCount := len(s3Server.Parts)
896896+ uploadedData := s3Server.Parts[1]
897897+ s3Server.mu.Unlock()
898898+899899+ if partCount != 1 {
900900+ t.Errorf("Expected 1 part uploaded after 10MB write, got %d", partCount)
901901+ }
902902+903903+ if len(uploadedData) != 10*1024*1024 {
904904+ t.Errorf("Expected uploaded part to be 10MB, got %d", len(uploadedData))
905905+ }
906906+}
907907+908908+// TestWrite_MultipleFlushes tests that writing 25MB triggers 2 flushes
909909+func TestWrite_MultipleFlushes(t *testing.T) {
910910+ s3Server := newMockS3Server(t, true)
911911+ defer s3Server.Close()
912912+913913+ holdServer := newMockHoldServer(t, s3Server.URL)
914914+ defer holdServer.Close()
915915+916916+ store := createTestProxyBlobStore(t, holdServer.URL)
917917+918918+ writer, err := store.Create(context.Background())
919919+ if err != nil {
920920+ t.Fatalf("Create() failed: %v", err)
921921+ }
922922+ defer writer.Cancel(context.Background())
923923+924924+ // Write 25MB in chunks (simulating Docker layer upload)
925925+ totalSize := 25 * 1024 * 1024
926926+ chunkSize := 64 * 1024 // 64KB chunks
927927+ data := generateTestData(chunkSize)
928928+929929+ for written := 0; written < totalSize; written += chunkSize {
930930+ _, err = writer.Write(data)
931931+ if err != nil {
932932+ t.Fatalf("Write() failed at byte %d: %v", written, err)
933933+ }
934934+ }
935935+936936+ // Verify 2 flushes occurred (10MB + 10MB), 5MB remains in buffer
937937+ s3Server.mu.Lock()
938938+ partCount := len(s3Server.Parts)
939939+ s3Server.mu.Unlock()
940940+941941+ if partCount != 2 {
942942+ t.Errorf("Expected 2 parts uploaded after 25MB write, got %d", partCount)
943943+ }
944944+945945+ // Verify size tracking
946946+ if writer.Size() != int64(totalSize) {
947947+ t.Errorf("Expected size %d, got %d", totalSize, writer.Size())
948948+ }
949949+950950+ // Verify part URL calls
951951+ holdServer.mu.Lock()
952952+ partURLCount := len(holdServer.PartURLCalls)
953953+ holdServer.mu.Unlock()
954954+955955+ if partURLCount != 2 {
956956+ t.Errorf("Expected 2 part URL calls, got %d", partURLCount)
957957+ }
958958+}
959959+960960+// TestWrite_ClosedWriter tests that Write() fails on closed writer
961961+func TestWrite_ClosedWriter(t *testing.T) {
962962+ s3Server := newMockS3Server(t, true)
963963+ defer s3Server.Close()
964964+965965+ holdServer := newMockHoldServer(t, s3Server.URL)
966966+ defer holdServer.Close()
967967+968968+ store := createTestProxyBlobStore(t, holdServer.URL)
969969+970970+ writer, err := store.Create(context.Background())
971971+ if err != nil {
972972+ t.Fatalf("Create() failed: %v", err)
973973+ }
974974+975975+ // Close the writer
976976+ writer.Cancel(context.Background())
977977+978978+ // Try to write
979979+ data := generateTestData(1024)
980980+ _, err = writer.Write(data)
981981+ if err == nil {
982982+ t.Fatal("Expected error writing to closed writer")
983983+ }
984984+985985+ if !strings.Contains(err.Error(), "closed") {
986986+ t.Errorf("Expected 'closed' error, got: %v", err)
987987+ }
988988+}
989989+990990+// TestFlushPart_Success tests successful part upload with ETag in header
991991+func TestFlushPart_Success(t *testing.T) {
992992+ s3Server := newMockS3Server(t, true) // ETag in header
993993+ defer s3Server.Close()
994994+995995+ holdServer := newMockHoldServer(t, s3Server.URL)
996996+ defer holdServer.Close()
997997+998998+ store := createTestProxyBlobStore(t, holdServer.URL)
999999+10001000+ writer, err := store.Create(context.Background())
10011001+ if err != nil {
10021002+ t.Fatalf("Create() failed: %v", err)
10031003+ }
10041004+ defer writer.Cancel(context.Background())
10051005+10061006+ // Write enough to trigger flush
10071007+ data := generateTestData(10 * 1024 * 1024)
10081008+ _, err = writer.Write(data)
10091009+ if err != nil {
10101010+ t.Fatalf("Write() failed: %v", err)
10111011+ }
10121012+10131013+ // Get the internal writer to check parts
10141014+ pbw := writer.(*ProxyBlobWriter)
10151015+ if len(pbw.parts) != 1 {
10161016+ t.Errorf("Expected 1 part recorded, got %d", len(pbw.parts))
10171017+ }
10181018+10191019+ if pbw.parts[0].PartNumber != 1 {
10201020+ t.Errorf("Expected part number 1, got %d", pbw.parts[0].PartNumber)
10211021+ }
10221022+10231023+ if pbw.parts[0].ETag != `"etag-part-1"` {
10241024+ t.Errorf("Expected ETag '\"etag-part-1\"', got %s", pbw.parts[0].ETag)
10251025+ }
10261026+}
10271027+10281028+// TestFlushPart_ETagInJSON tests successful part upload with ETag in JSON body
10291029+func TestFlushPart_ETagInJSON(t *testing.T) {
10301030+ s3Server := newMockS3Server(t, false) // ETag in JSON body
10311031+ defer s3Server.Close()
10321032+10331033+ holdServer := newMockHoldServer(t, s3Server.URL)
10341034+ defer holdServer.Close()
10351035+10361036+ store := createTestProxyBlobStore(t, holdServer.URL)
10371037+10381038+ writer, err := store.Create(context.Background())
10391039+ if err != nil {
10401040+ t.Fatalf("Create() failed: %v", err)
10411041+ }
10421042+ defer writer.Cancel(context.Background())
10431043+10441044+ // Write enough to trigger flush
10451045+ data := generateTestData(10 * 1024 * 1024)
10461046+ _, err = writer.Write(data)
10471047+ if err != nil {
10481048+ t.Fatalf("Write() failed: %v", err)
10491049+ }
10501050+10511051+ // Verify part was recorded with ETag from JSON
10521052+ pbw := writer.(*ProxyBlobWriter)
10531053+ if len(pbw.parts) != 1 {
10541054+ t.Errorf("Expected 1 part recorded, got %d", len(pbw.parts))
10551055+ }
10561056+10571057+ if pbw.parts[0].ETag != `"etag-part-1"` {
10581058+ t.Errorf("Expected ETag '\"etag-part-1\"', got %s", pbw.parts[0].ETag)
10591059+ }
10601060+}
10611061+10621062+// TestFlushPart_HoldError tests that flushPart returns error when hold fails
10631063+func TestFlushPart_HoldError(t *testing.T) {
10641064+ s3Server := newMockS3Server(t, true)
10651065+ defer s3Server.Close()
10661066+10671067+ holdServer := newMockHoldServer(t, s3Server.URL)
10681068+ holdServer.PartURLError = fmt.Errorf("hold service error")
10691069+ defer holdServer.Close()
10701070+10711071+ store := createTestProxyBlobStore(t, holdServer.URL)
10721072+10731073+ writer, err := store.Create(context.Background())
10741074+ if err != nil {
10751075+ t.Fatalf("Create() failed: %v", err)
10761076+ }
10771077+ defer writer.Cancel(context.Background())
10781078+10791079+ // Write enough to trigger flush
10801080+ data := generateTestData(10 * 1024 * 1024)
10811081+ _, err = writer.Write(data)
10821082+10831083+ if err == nil {
10841084+ t.Fatal("Expected error when hold service fails")
10851085+ }
10861086+10871087+ if !strings.Contains(err.Error(), "failed to get part upload info") {
10881088+ t.Errorf("Expected part upload info error, got: %v", err)
10891089+ }
10901090+}
10911091+10921092+// TestFlushPart_S3Error tests that flushPart returns error when S3 fails
10931093+func TestFlushPart_S3Error(t *testing.T) {
10941094+ s3Server := newMockS3Server(t, true)
10951095+ s3Server.UploadError = fmt.Errorf("S3 unavailable")
10961096+ defer s3Server.Close()
10971097+10981098+ holdServer := newMockHoldServer(t, s3Server.URL)
10991099+ defer holdServer.Close()
11001100+11011101+ store := createTestProxyBlobStore(t, holdServer.URL)
11021102+11031103+ writer, err := store.Create(context.Background())
11041104+ if err != nil {
11051105+ t.Fatalf("Create() failed: %v", err)
11061106+ }
11071107+ defer writer.Cancel(context.Background())
11081108+11091109+ // Write enough to trigger flush
11101110+ data := generateTestData(10 * 1024 * 1024)
11111111+ _, err = writer.Write(data)
11121112+11131113+ if err == nil {
11141114+ t.Fatal("Expected error when S3 fails")
11151115+ }
11161116+11171117+ if !strings.Contains(err.Error(), "part upload failed") {
11181118+ t.Errorf("Expected part upload failed error, got: %v", err)
11191119+ }
11201120+}
11211121+11221122+// TestFlushPart_NoETag tests that flushPart returns error when no ETag is returned
11231123+func TestFlushPart_NoETag(t *testing.T) {
11241124+ // Create a custom S3 server that returns no ETag
11251125+ s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
11261126+ // Return 200 OK but no ETag anywhere
11271127+ w.WriteHeader(http.StatusOK)
11281128+ w.Write([]byte(`{}`))
11291129+ }))
11301130+ defer s3Server.Close()
11311131+11321132+ holdServer := newMockHoldServer(t, s3Server.URL)
11331133+ defer holdServer.Close()
11341134+11351135+ store := createTestProxyBlobStore(t, holdServer.URL)
11361136+11371137+ writer, err := store.Create(context.Background())
11381138+ if err != nil {
11391139+ t.Fatalf("Create() failed: %v", err)
11401140+ }
11411141+ defer writer.Cancel(context.Background())
11421142+11431143+ // Write enough to trigger flush
11441144+ data := generateTestData(10 * 1024 * 1024)
11451145+ _, err = writer.Write(data)
11461146+11471147+ if err == nil {
11481148+ t.Fatal("Expected error when no ETag is returned")
11491149+ }
11501150+11511151+ if !strings.Contains(err.Error(), "no ETag") {
11521152+ t.Errorf("Expected no ETag error, got: %v", err)
11531153+ }
11541154+}
11551155+11561156+// TestReadFrom_SmallFile tests ReadFrom with data under flush threshold
11571157+func TestReadFrom_SmallFile(t *testing.T) {
11581158+ s3Server := newMockS3Server(t, true)
11591159+ defer s3Server.Close()
11601160+11611161+ holdServer := newMockHoldServer(t, s3Server.URL)
11621162+ defer holdServer.Close()
11631163+11641164+ store := createTestProxyBlobStore(t, holdServer.URL)
11651165+11661166+ writer, err := store.Create(context.Background())
11671167+ if err != nil {
11681168+ t.Fatalf("Create() failed: %v", err)
11691169+ }
11701170+ defer writer.Cancel(context.Background())
11711171+11721172+ // Stream 1MB through ReadFrom
11731173+ data := generateTestData(1 * 1024 * 1024)
11741174+ reader := bytes.NewReader(data)
11751175+11761176+ n, err := writer.ReadFrom(reader)
11771177+ if err != nil {
11781178+ t.Fatalf("ReadFrom() failed: %v", err)
11791179+ }
11801180+11811181+ if n != int64(len(data)) {
11821182+ t.Errorf("Expected to read %d bytes, got %d", len(data), n)
11831183+ }
11841184+11851185+ // Verify no flush occurred
11861186+ s3Server.mu.Lock()
11871187+ partCount := len(s3Server.Parts)
11881188+ s3Server.mu.Unlock()
11891189+11901190+ if partCount != 0 {
11911191+ t.Errorf("Expected 0 parts (data buffered), got %d", partCount)
11921192+ }
11931193+}
11941194+11951195+// TestReadFrom_LargeFile tests ReadFrom with multiple flushes
11961196+func TestReadFrom_LargeFile(t *testing.T) {
11971197+ s3Server := newMockS3Server(t, true)
11981198+ defer s3Server.Close()
11991199+12001200+ holdServer := newMockHoldServer(t, s3Server.URL)
12011201+ defer holdServer.Close()
12021202+12031203+ store := createTestProxyBlobStore(t, holdServer.URL)
12041204+12051205+ writer, err := store.Create(context.Background())
12061206+ if err != nil {
12071207+ t.Fatalf("Create() failed: %v", err)
12081208+ }
12091209+ defer writer.Cancel(context.Background())
12101210+12111211+ // Stream 25MB through ReadFrom
12121212+ data := generateTestData(25 * 1024 * 1024)
12131213+ reader := bytes.NewReader(data)
12141214+12151215+ n, err := writer.ReadFrom(reader)
12161216+ if err != nil {
12171217+ t.Fatalf("ReadFrom() failed: %v", err)
12181218+ }
12191219+12201220+ if n != int64(len(data)) {
12211221+ t.Errorf("Expected to read %d bytes, got %d", len(data), n)
12221222+ }
12231223+12241224+ // Verify 2 flushes occurred
12251225+ s3Server.mu.Lock()
12261226+ partCount := len(s3Server.Parts)
12271227+ s3Server.mu.Unlock()
12281228+12291229+ if partCount != 2 {
12301230+ t.Errorf("Expected 2 parts (2x 10MB), got %d", partCount)
12311231+ }
12321232+}
12331233+12341234+// TestReadFrom_ClosedWriter tests that ReadFrom fails on closed writer
12351235+func TestReadFrom_ClosedWriter(t *testing.T) {
12361236+ s3Server := newMockS3Server(t, true)
12371237+ defer s3Server.Close()
12381238+12391239+ holdServer := newMockHoldServer(t, s3Server.URL)
12401240+ defer holdServer.Close()
12411241+12421242+ store := createTestProxyBlobStore(t, holdServer.URL)
12431243+12441244+ writer, err := store.Create(context.Background())
12451245+ if err != nil {
12461246+ t.Fatalf("Create() failed: %v", err)
12471247+ }
12481248+12491249+ // Close the writer
12501250+ writer.Cancel(context.Background())
12511251+12521252+ // Try to ReadFrom
12531253+ data := generateTestData(1024)
12541254+ reader := bytes.NewReader(data)
12551255+12561256+ _, err = writer.ReadFrom(reader)
12571257+ if err == nil {
12581258+ t.Fatal("Expected error reading to closed writer")
12591259+ }
12601260+12611261+ if !strings.Contains(err.Error(), "closed") {
12621262+ t.Errorf("Expected 'closed' error, got: %v", err)
12631263+ }
12641264+}
12651265+12661266+// TestCommit_Success tests successful commit
12671267+func TestCommit_Success(t *testing.T) {
12681268+ s3Server := newMockS3Server(t, true)
12691269+ defer s3Server.Close()
12701270+12711271+ holdServer := newMockHoldServer(t, s3Server.URL)
12721272+ defer holdServer.Close()
12731273+12741274+ store := createTestProxyBlobStore(t, holdServer.URL)
12751275+12761276+ writer, err := store.Create(context.Background())
12771277+ if err != nil {
12781278+ t.Fatalf("Create() failed: %v", err)
12791279+ }
12801280+12811281+ // Write some data
12821282+ data := generateTestData(5 * 1024 * 1024)
12831283+ _, err = writer.Write(data)
12841284+ if err != nil {
12851285+ t.Fatalf("Write() failed: %v", err)
12861286+ }
12871287+12881288+ // Commit
12891289+ dgst := digest.FromBytes(data)
12901290+ desc, err := writer.Commit(context.Background(), distribution.Descriptor{
12911291+ Digest: dgst,
12921292+ Size: int64(len(data)),
12931293+ MediaType: "application/octet-stream",
12941294+ })
12951295+ if err != nil {
12961296+ t.Fatalf("Commit() failed: %v", err)
12971297+ }
12981298+12991299+ // Verify descriptor
13001300+ if desc.Digest != dgst {
13011301+ t.Errorf("Expected digest %s, got %s", dgst, desc.Digest)
13021302+ }
13031303+13041304+ if desc.Size != int64(len(data)) {
13051305+ t.Errorf("Expected size %d, got %d", len(data), desc.Size)
13061306+ }
13071307+13081308+ // Verify complete was called
13091309+ holdServer.mu.Lock()
13101310+ completeCount := len(holdServer.CompleteCalls)
13111311+ holdServer.mu.Unlock()
13121312+13131313+ if completeCount != 1 {
13141314+ t.Errorf("Expected 1 complete call, got %d", completeCount)
13151315+ }
13161316+13171317+ // Verify writer removed from global uploads
13181318+ globalUploadsMu.RLock()
13191319+ _, exists := globalUploads[writer.ID()]
13201320+ globalUploadsMu.RUnlock()
13211321+ if exists {
13221322+ t.Error("Writer should be removed from globalUploads after commit")
13231323+ }
13241324+}
13251325+13261326+// TestCommit_WithRemainingBuffer tests commit with data in buffer
13271327+func TestCommit_WithRemainingBuffer(t *testing.T) {
13281328+ s3Server := newMockS3Server(t, true)
13291329+ defer s3Server.Close()
13301330+13311331+ holdServer := newMockHoldServer(t, s3Server.URL)
13321332+ defer holdServer.Close()
13331333+13341334+ store := createTestProxyBlobStore(t, holdServer.URL)
13351335+13361336+ writer, err := store.Create(context.Background())
13371337+ if err != nil {
13381338+ t.Fatalf("Create() failed: %v", err)
13391339+ }
13401340+13411341+ // Write 15MB in chunks (simulating realistic upload)
13421342+ // This ensures: 1 flush at 10MB threshold + 5MB remaining in buffer
13431343+ totalSize := 15 * 1024 * 1024
13441344+ chunkSize := 64 * 1024 // 64KB chunks
13451345+ chunk := generateTestData(chunkSize)
13461346+13471347+ allData := make([]byte, 0, totalSize)
13481348+ for written := 0; written < totalSize; written += chunkSize {
13491349+ _, err = writer.Write(chunk)
13501350+ if err != nil {
13511351+ t.Fatalf("Write() failed: %v", err)
13521352+ }
13531353+ allData = append(allData, chunk...)
13541354+ }
13551355+13561356+ // At this point, 1 part should be uploaded (10MB), 5MB in buffer
13571357+ s3Server.mu.Lock()
13581358+ partsBeforeCommit := len(s3Server.Parts)
13591359+ s3Server.mu.Unlock()
13601360+13611361+ if partsBeforeCommit != 1 {
13621362+ t.Errorf("Expected 1 part before commit, got %d", partsBeforeCommit)
13631363+ }
13641364+13651365+ // Commit
13661366+ dgst := digest.FromBytes(allData)
13671367+ _, err = writer.Commit(context.Background(), distribution.Descriptor{
13681368+ Digest: dgst,
13691369+ Size: int64(totalSize),
13701370+ MediaType: "application/octet-stream",
13711371+ })
13721372+ if err != nil {
13731373+ t.Fatalf("Commit() failed: %v", err)
13741374+ }
13751375+13761376+ // Verify final flush happened (now 2 parts)
13771377+ s3Server.mu.Lock()
13781378+ partsAfterCommit := len(s3Server.Parts)
13791379+ s3Server.mu.Unlock()
13801380+13811381+ if partsAfterCommit != 2 {
13821382+ t.Errorf("Expected 2 parts after commit, got %d", partsAfterCommit)
13831383+ }
13841384+13851385+ // Verify complete was called with 2 parts
13861386+ holdServer.mu.Lock()
13871387+ if len(holdServer.CompleteCalls) != 1 {
13881388+ t.Fatalf("Expected 1 complete call, got %d", len(holdServer.CompleteCalls))
13891389+ }
13901390+ completedParts := len(holdServer.CompleteCalls[0].Parts)
13911391+ holdServer.mu.Unlock()
13921392+13931393+ if completedParts != 2 {
13941394+ t.Errorf("Expected complete to have 2 parts, got %d", completedParts)
13951395+ }
13961396+}
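For readers skimming the hunk, here is a minimal sketch of the buffering behaviour this test exercises, assuming a ~10MB part-size threshold; the names (bufferedWriter, flush, partSizeThreshold) are illustrative, not the actual proxy blob writer implementation:

    // Each Write appends to an in-memory buffer; whenever the buffer reaches
    // the part-size threshold, a full part is uploaded (e.g. via a presigned
    // part URL) and the remainder stays buffered until Commit flushes it.
    const partSizeThreshold = 10 * 1024 * 1024 // assumed 10MB threshold

    type bufferedWriter struct {
        buf   []byte
        parts int
        flush func(part []byte) error // stand-in for the presigned part PUT
    }

    func (w *bufferedWriter) Write(p []byte) (int, error) {
        w.buf = append(w.buf, p...)
        for len(w.buf) >= partSizeThreshold {
            if err := w.flush(w.buf[:partSizeThreshold]); err != nil {
                return 0, err
            }
            w.parts++
            w.buf = w.buf[partSizeThreshold:]
        }
        return len(p), nil
    }

With 15MB written, this leaves exactly one uploaded part and 5MB still in the buffer, which is what the assertions before and after the Commit() call above check.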
13971397+13981398+// TestCommit_FlushError tests that commit handles flush error
13991399+func TestCommit_FlushError(t *testing.T) {
14001400+ s3Server := newMockS3Server(t, true)
14011401+ defer s3Server.Close()
14021402+14031403+ holdServer := newMockHoldServer(t, s3Server.URL)
14041404+ defer holdServer.Close()
14051405+14061406+ store := createTestProxyBlobStore(t, holdServer.URL)
14071407+14081408+ writer, err := store.Create(context.Background())
14091409+ if err != nil {
14101410+ t.Fatalf("Create() failed: %v", err)
14111411+ }
14121412+14131413+ // Write some data
14141414+ data := generateTestData(5 * 1024 * 1024)
14151415+ _, err = writer.Write(data)
14161416+ if err != nil {
14171417+ t.Fatalf("Write() failed: %v", err)
14181418+ }
14191419+14201420+ // Inject error for final flush
14211421+ holdServer.mu.Lock()
14221422+ holdServer.PartURLError = fmt.Errorf("flush error")
14231423+ holdServer.mu.Unlock()
14241424+14251425+ // Commit should fail
14261426+ dgst := digest.FromBytes(data)
14271427+ _, err = writer.Commit(context.Background(), distribution.Descriptor{
14281428+ Digest: dgst,
14291429+ Size: int64(len(data)),
14301430+ })
14311431+14321432+ if err == nil {
14331433+ t.Fatal("Expected error from Commit() when flush fails")
14341434+ }
14351435+14361436+ if !strings.Contains(err.Error(), "failed to flush final part") {
14371437+ t.Errorf("Expected flush error, got: %v", err)
14381438+ }
14391439+14401440+ // Verify abort was called
14411441+ holdServer.mu.Lock()
14421442+ abortCount := len(holdServer.AbortCalls)
14431443+ holdServer.mu.Unlock()
14441444+14451445+ if abortCount != 1 {
14461446+ t.Errorf("Expected 1 abort call after flush error, got %d", abortCount)
14471447+ }
14481448+}
14491449+14501450+// TestCommit_CompleteError tests that commit handles complete error
14511451+func TestCommit_CompleteError(t *testing.T) {
14521452+ s3Server := newMockS3Server(t, true)
14531453+ defer s3Server.Close()
14541454+14551455+ holdServer := newMockHoldServer(t, s3Server.URL)
14561456+ holdServer.CompleteError = fmt.Errorf("complete failed")
14571457+ defer holdServer.Close()
14581458+14591459+ store := createTestProxyBlobStore(t, holdServer.URL)
14601460+14611461+ writer, err := store.Create(context.Background())
14621462+ if err != nil {
14631463+ t.Fatalf("Create() failed: %v", err)
14641464+ }
14651465+14661466+ // Write some data
14671467+ data := generateTestData(1 * 1024 * 1024)
14681468+ _, err = writer.Write(data)
14691469+ if err != nil {
14701470+ t.Fatalf("Write() failed: %v", err)
14711471+ }
14721472+14731473+ // Commit should fail
14741474+ dgst := digest.FromBytes(data)
14751475+ _, err = writer.Commit(context.Background(), distribution.Descriptor{
14761476+ Digest: dgst,
14771477+ Size: int64(len(data)),
14781478+ })
14791479+14801480+ if err == nil {
14811481+ t.Fatal("Expected error from Commit() when complete fails")
14821482+ }
14831483+14841484+ if !strings.Contains(err.Error(), "failed to complete multipart upload") {
14851485+ t.Errorf("Expected complete error, got: %v", err)
14861486+ }
14871487+}
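The two failure tests above pin down the order of operations inside Commit(): flush whatever is left in the buffer as a final part, then ask the hold to complete the multipart upload; a flush failure triggers a best-effort abort, while a complete failure is only surfaced. A rough sketch continuing the bufferedWriter illustration, with complete and abort assumed to be callbacks onto the hold's complete/abort endpoints:

    // Assumes two more callbacks on the sketch type above:
    //   complete func() error // hold's complete-multipart endpoint
    //   abort    func() error // hold's abort endpoint
    func (w *bufferedWriter) commit() error {
        if len(w.buf) > 0 {
            if err := w.flush(w.buf); err != nil {
                w.abort() // best-effort cleanup; the flush-error test asserts one abort call
                return fmt.Errorf("failed to flush final part: %w", err)
            }
            w.parts++
            w.buf = nil
        }
        if err := w.complete(); err != nil {
            return fmt.Errorf("failed to complete multipart upload: %w", err)
        }
        return nil
    }

The error strings here mirror what TestCommit_FlushError and TestCommit_CompleteError assert on; everything else about the real method is assumed.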
14881488+14891489+// TestCommit_ClosedWriter tests that Commit() fails on closed writer
14901490+func TestCommit_ClosedWriter(t *testing.T) {
14911491+ s3Server := newMockS3Server(t, true)
14921492+ defer s3Server.Close()
14931493+14941494+ holdServer := newMockHoldServer(t, s3Server.URL)
14951495+ defer holdServer.Close()
14961496+14971497+ store := createTestProxyBlobStore(t, holdServer.URL)
14981498+14991499+ writer, err := store.Create(context.Background())
15001500+ if err != nil {
15011501+ t.Fatalf("Create() failed: %v", err)
15021502+ }
15031503+15041504+	// Cancel the writer (marks it as closed)
15051505+ writer.Cancel(context.Background())
15061506+15071507+ // Try to commit
15081508+ dgst := digest.FromString("test")
15091509+ _, err = writer.Commit(context.Background(), distribution.Descriptor{
15101510+ Digest: dgst,
15111511+ Size: 4,
15121512+ })
15131513+15141514+ if err == nil {
15151515+ t.Fatal("Expected error committing closed writer")
15161516+ }
15171517+15181518+ if !strings.Contains(err.Error(), "closed") {
15191519+ t.Errorf("Expected 'closed' error, got: %v", err)
15201520+ }
15211521+}
15221522+15231523+// TestCancel_Success tests successful cancel
15241524+func TestCancel_Success(t *testing.T) {
15251525+ s3Server := newMockS3Server(t, true)
15261526+ defer s3Server.Close()
15271527+15281528+ holdServer := newMockHoldServer(t, s3Server.URL)
15291529+ defer holdServer.Close()
15301530+15311531+ store := createTestProxyBlobStore(t, holdServer.URL)
15321532+15331533+ writer, err := store.Create(context.Background())
15341534+ if err != nil {
15351535+ t.Fatalf("Create() failed: %v", err)
15361536+ }
15371537+15381538+ writerID := writer.ID()
15391539+15401540+ // Cancel
15411541+ err = writer.Cancel(context.Background())
15421542+ if err != nil {
15431543+ t.Fatalf("Cancel() failed: %v", err)
15441544+ }
15451545+15461546+ // Verify abort was called
15471547+ holdServer.mu.Lock()
15481548+ abortCount := len(holdServer.AbortCalls)
15491549+ holdServer.mu.Unlock()
15501550+15511551+ if abortCount != 1 {
15521552+ t.Errorf("Expected 1 abort call, got %d", abortCount)
15531553+ }
15541554+15551555+ // Verify removed from global uploads
15561556+ globalUploadsMu.RLock()
15571557+ _, exists := globalUploads[writerID]
15581558+ globalUploadsMu.RUnlock()
15591559+15601560+ if exists {
15611561+ t.Error("Writer should be removed from globalUploads after cancel")
15621562+ }
15631563+}
15641564+15651565+// TestCancel_AbortError tests that Cancel() still succeeds when abort fails
15661566+func TestCancel_AbortError(t *testing.T) {
15671567+ s3Server := newMockS3Server(t, true)
15681568+ defer s3Server.Close()
15691569+15701570+ holdServer := newMockHoldServer(t, s3Server.URL)
15711571+ holdServer.AbortError = fmt.Errorf("abort failed")
15721572+ defer holdServer.Close()
15731573+15741574+ store := createTestProxyBlobStore(t, holdServer.URL)
15751575+15761576+ writer, err := store.Create(context.Background())
15771577+ if err != nil {
15781578+ t.Fatalf("Create() failed: %v", err)
15791579+ }
15801580+15811581+ writerID := writer.ID()
15821582+15831583+ // Cancel should still return nil (graceful)
15841584+ err = writer.Cancel(context.Background())
15851585+ if err != nil {
15861586+ t.Errorf("Cancel() should return nil even when abort fails, got: %v", err)
15871587+ }
15881588+15891589+ // Verify still removed from global uploads
15901590+ globalUploadsMu.RLock()
15911591+ _, exists := globalUploads[writerID]
15921592+ globalUploadsMu.RUnlock()
15931593+15941594+ if exists {
15951595+ t.Error("Writer should be removed from globalUploads even when abort fails")
15961596+ }
15971597+}
15981598+15991599+// TestCancel_AlreadyClosed tests that Cancel() is idempotent
16001600+func TestCancel_AlreadyClosed(t *testing.T) {
16011601+ s3Server := newMockS3Server(t, true)
16021602+ defer s3Server.Close()
16031603+16041604+ holdServer := newMockHoldServer(t, s3Server.URL)
16051605+ defer holdServer.Close()
16061606+16071607+ store := createTestProxyBlobStore(t, holdServer.URL)
16081608+16091609+ writer, err := store.Create(context.Background())
16101610+ if err != nil {
16111611+ t.Fatalf("Create() failed: %v", err)
16121612+ }
16131613+16141614+ // Cancel twice
16151615+ err1 := writer.Cancel(context.Background())
16161616+ err2 := writer.Cancel(context.Background())
16171617+16181618+ if err1 != nil || err2 != nil {
16191619+ t.Errorf("Cancel() should be idempotent, got err1=%v, err2=%v", err1, err2)
16201620+ }
16211621+}
16221622+16231623+// TestResume_Success tests successful resume
16241624+func TestResume_Success(t *testing.T) {
16251625+ s3Server := newMockS3Server(t, true)
16261626+ defer s3Server.Close()
16271627+16281628+ holdServer := newMockHoldServer(t, s3Server.URL)
16291629+ defer holdServer.Close()
16301630+16311631+ store := createTestProxyBlobStore(t, holdServer.URL)
16321632+16331633+ writer, err := store.Create(context.Background())
16341634+ if err != nil {
16351635+ t.Fatalf("Create() failed: %v", err)
16361636+ }
16371637+ defer writer.Cancel(context.Background())
16381638+16391639+ writerID := writer.ID()
16401640+16411641+ // Write some data
16421642+ data := generateTestData(1024)
16431643+ _, err = writer.Write(data)
16441644+ if err != nil {
16451645+ t.Fatalf("Write() failed: %v", err)
16461646+ }
16471647+16481648+ // Resume should return the same writer
16491649+ resumedWriter, err := store.Resume(context.Background(), writerID)
16501650+ if err != nil {
16511651+ t.Fatalf("Resume() failed: %v", err)
16521652+ }
16531653+16541654+ // Should be the same writer instance
16551655+ if resumedWriter.ID() != writerID {
16561656+ t.Errorf("Expected same writer ID, got %s", resumedWriter.ID())
16571657+ }
16581658+16591659+ // Size should be preserved
16601660+ if resumedWriter.Size() != int64(len(data)) {
16611661+ t.Errorf("Expected size %d, got %d", len(data), resumedWriter.Size())
16621662+ }
16631663+}
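A rough sketch of the resume path this test relies on, assuming uploads are tracked in the process-global registry that the other tests also inspect (the proxyBlobWriter type name is assumed; the real map may hold a different concrete type):

    // Resume hands back the live writer registered under the upload ID rather
    // than reconstructing any state; unknown IDs map to ErrBlobUploadUnknown.
    func resume(id string) (*proxyBlobWriter, error) {
        globalUploadsMu.RLock()
        defer globalUploadsMu.RUnlock()
        if w, ok := globalUploads[id]; ok {
            return w, nil
        }
        return nil, distribution.ErrBlobUploadUnknown
    }

This is why the resumed writer reports the same ID() and Size(), and why TestResume_NotFound below expects distribution.ErrBlobUploadUnknown.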
16641664+16651665+// TestResume_NotFound tests that Resume() returns error for unknown ID
16661666+func TestResume_NotFound(t *testing.T) {
16671667+ s3Server := newMockS3Server(t, true)
16681668+ defer s3Server.Close()
16691669+16701670+ holdServer := newMockHoldServer(t, s3Server.URL)
16711671+ defer holdServer.Close()
16721672+16731673+ store := createTestProxyBlobStore(t, holdServer.URL)
16741674+16751675+ // Try to resume non-existent upload
16761676+ _, err := store.Resume(context.Background(), "non-existent-upload-id")
16771677+ if err == nil {
16781678+ t.Fatal("Expected error for non-existent upload")
16791679+ }
16801680+16811681+ if err != distribution.ErrBlobUploadUnknown {
16821682+ t.Errorf("Expected ErrBlobUploadUnknown, got: %v", err)
16831683+ }
16841684+}
16851685+16861686+// TestFullUploadFlow_25MB tests the complete upload flow with a 25MB file
16871687+func TestFullUploadFlow_25MB(t *testing.T) {
16881688+ s3Server := newMockS3Server(t, true)
16891689+ defer s3Server.Close()
16901690+16911691+ holdServer := newMockHoldServer(t, s3Server.URL)
16921692+ defer holdServer.Close()
16931693+16941694+ store := createTestProxyBlobStore(t, holdServer.URL)
16951695+16961696+ // Create writer
16971697+ writer, err := store.Create(context.Background())
16981698+ if err != nil {
16991699+ t.Fatalf("Create() failed: %v", err)
17001700+ }
17011701+17021702+	// Write 25MB in chunks (simulating a Docker layer upload)
17031703+ totalSize := 25 * 1024 * 1024
17041704+ chunkSize := 64 * 1024 // 64KB chunks (realistic for Docker)
17051705+ allData := generateTestData(totalSize)
17061706+17071707+ for i := 0; i < totalSize; i += chunkSize {
17081708+ end := i + chunkSize
17091709+ if end > totalSize {
17101710+ end = totalSize
17111711+ }
17121712+ _, err = writer.Write(allData[i:end])
17131713+ if err != nil {
17141714+ t.Fatalf("Write() failed at byte %d: %v", i, err)
17151715+ }
17161716+ }
17171717+17181718+ // Verify 2 parts uploaded during write (10MB + 10MB)
17191719+ s3Server.mu.Lock()
17201720+ partsBeforeCommit := len(s3Server.Parts)
17211721+ s3Server.mu.Unlock()
17221722+ if partsBeforeCommit != 2 {
17231723+ t.Errorf("Expected 2 parts before commit, got %d", partsBeforeCommit)
17241724+ }
17251725+17261726+ // Commit
17271727+ dgst := digest.FromBytes(allData)
17281728+ desc, err := writer.Commit(context.Background(), distribution.Descriptor{
17291729+ Digest: dgst,
17301730+ Size: int64(totalSize),
17311731+ MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
17321732+ })
17331733+ if err != nil {
17341734+ t.Fatalf("Commit() failed: %v", err)
17351735+ }
17361736+17371737+ // Verify 3 total parts (10MB + 10MB + 5MB final)
17381738+ s3Server.mu.Lock()
17391739+ partsAfterCommit := len(s3Server.Parts)
17401740+ s3Server.mu.Unlock()
17411741+ if partsAfterCommit != 3 {
17421742+ t.Errorf("Expected 3 parts after commit, got %d", partsAfterCommit)
17431743+ }
17441744+17451745+ // Verify complete was called with correct parts
17461746+ holdServer.mu.Lock()
17471747+ if len(holdServer.CompleteCalls) != 1 {
17481748+ t.Fatalf("Expected 1 complete call, got %d", len(holdServer.CompleteCalls))
17491749+ }
17501750+ completeCall := holdServer.CompleteCalls[0]
17511751+ holdServer.mu.Unlock()
17521752+17531753+ if len(completeCall.Parts) != 3 {
17541754+ t.Errorf("Expected 3 parts in complete call, got %d", len(completeCall.Parts))
17551755+ }
17561756+17571757+ if completeCall.Digest != dgst.String() {
17581758+ t.Errorf("Expected digest %s, got %s", dgst.String(), completeCall.Digest)
17591759+ }
17601760+17611761+ // Verify descriptor
17621762+ if desc.Size != int64(totalSize) {
17631763+ t.Errorf("Expected descriptor size %d, got %d", totalSize, desc.Size)
17641764+ }
17651765+17661766+ // Verify data integrity - check each part has correct size
17671767+ s3Server.mu.Lock()
17681768+ part1Size := len(s3Server.Parts[1])
17691769+ part2Size := len(s3Server.Parts[2])
17701770+ part3Size := len(s3Server.Parts[3])
17711771+ s3Server.mu.Unlock()
17721772+17731773+ expectedPart1 := 10 * 1024 * 1024
17741774+ expectedPart2 := 10 * 1024 * 1024
17751775+ expectedPart3 := 5 * 1024 * 1024
17761776+17771777+ if part1Size != expectedPart1 {
17781778+ t.Errorf("Part 1 expected %d bytes, got %d", expectedPart1, part1Size)
17791779+ }
17801780+ if part2Size != expectedPart2 {
17811781+ t.Errorf("Part 2 expected %d bytes, got %d", expectedPart2, part2Size)
17821782+ }
17831783+ if part3Size != expectedPart3 {
17841784+ t.Errorf("Part 3 expected %d bytes, got %d", expectedPart3, part3Size)
17851785+ }
17861786+}
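The per-part size assertions above work because the mock S3 server records every part PUT under its part number. The actual mock lives in the test helpers and is not shown in this hunk; a sketch of roughly what its handler needs to do, assuming the usual S3 partNumber query parameter:

    type mockS3Server struct {
        mu    sync.Mutex
        Parts map[int][]byte // part number -> uploaded bytes
    }

    func (m *mockS3Server) handlePutPart(w http.ResponseWriter, r *http.Request) {
        partNumber, err := strconv.Atoi(r.URL.Query().Get("partNumber"))
        if err != nil {
            http.Error(w, "bad partNumber", http.StatusBadRequest)
            return
        }
        body, err := io.ReadAll(r.Body)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        m.mu.Lock()
        m.Parts[partNumber] = body
        m.mu.Unlock()
        // real S3 returns a quoted ETag per part; the mock only needs something stable
        w.Header().Set("ETag", fmt.Sprintf("\"etag-%d\"", partNumber))
        w.WriteHeader(http.StatusOK)
    }

With 25MB written in 64KB chunks, this records Parts[1] and Parts[2] at 10MB each during Write, plus Parts[3] at 5MB flushed by Commit, matching the three size checks above.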
17871787+17881788+// TestProxyBlobWriter_ID tests the ID() method
17891789+func TestProxyBlobWriter_ID(t *testing.T) {
17901790+ s3Server := newMockS3Server(t, true)
17911791+ defer s3Server.Close()
17921792+17931793+ holdServer := newMockHoldServer(t, s3Server.URL)
17941794+ defer holdServer.Close()
17951795+17961796+ store := createTestProxyBlobStore(t, holdServer.URL)
17971797+17981798+ writer, err := store.Create(context.Background())
17991799+ if err != nil {
18001800+ t.Fatalf("Create() failed: %v", err)
18011801+ }
18021802+ defer writer.Cancel(context.Background())
18031803+18041804+ id := writer.ID()
18051805+ if id == "" {
18061806+ t.Error("Expected non-empty ID")
18071807+ }
18081808+18091809+ if !strings.HasPrefix(id, "upload-") {
18101810+ t.Errorf("Expected ID to start with 'upload-', got %s", id)
18111811+ }
18121812+}
18131813+18141814+// TestProxyBlobWriter_StartedAt tests the StartedAt() method
18151815+func TestProxyBlobWriter_StartedAt(t *testing.T) {
18161816+ s3Server := newMockS3Server(t, true)
18171817+ defer s3Server.Close()
18181818+18191819+ holdServer := newMockHoldServer(t, s3Server.URL)
18201820+ defer holdServer.Close()
18211821+18221822+ store := createTestProxyBlobStore(t, holdServer.URL)
18231823+18241824+ before := time.Now()
18251825+ writer, err := store.Create(context.Background())
18261826+ if err != nil {
18271827+ t.Fatalf("Create() failed: %v", err)
18281828+ }
18291829+ after := time.Now()
18301830+ defer writer.Cancel(context.Background())
18311831+18321832+ startedAt := writer.StartedAt()
18331833+ if startedAt.Before(before) || startedAt.After(after) {
18341834+ t.Errorf("StartedAt() should be between %v and %v, got %v", before, after, startedAt)
18351835+ }
18361836+}
18371837+18381838+// TestProxyBlobWriter_Size tests the Size() method
18391839+func TestProxyBlobWriter_Size(t *testing.T) {
18401840+ s3Server := newMockS3Server(t, true)
18411841+ defer s3Server.Close()
18421842+18431843+ holdServer := newMockHoldServer(t, s3Server.URL)
18441844+ defer holdServer.Close()
18451845+18461846+ store := createTestProxyBlobStore(t, holdServer.URL)
18471847+18481848+ writer, err := store.Create(context.Background())
18491849+ if err != nil {
18501850+ t.Fatalf("Create() failed: %v", err)
18511851+ }
18521852+ defer writer.Cancel(context.Background())
18531853+18541854+ // Initial size should be 0
18551855+ if writer.Size() != 0 {
18561856+ t.Errorf("Expected initial size 0, got %d", writer.Size())
18571857+ }
18581858+18591859+ // Write 1KB
18601860+ data := generateTestData(1024)
18611861+ writer.Write(data)
18621862+18631863+ if writer.Size() != 1024 {
18641864+ t.Errorf("Expected size 1024 after write, got %d", writer.Size())
18651865+ }
18661866+18671867+ // Write another 2KB
18681868+ data2 := generateTestData(2048)
18691869+ writer.Write(data2)
18701870+18711871+ if writer.Size() != 3072 {
18721872+ t.Errorf("Expected size 3072 after second write, got %d", writer.Size())
18731873+ }
18741874+}
18751875+18761876+// TestProxyBlobWriter_Close tests the Close() method
18771877+func TestProxyBlobWriter_Close(t *testing.T) {
18781878+ s3Server := newMockS3Server(t, true)
18791879+ defer s3Server.Close()
18801880+18811881+ holdServer := newMockHoldServer(t, s3Server.URL)
18821882+ defer holdServer.Close()
18831883+18841884+ store := createTestProxyBlobStore(t, holdServer.URL)
18851885+18861886+ writer, err := store.Create(context.Background())
18871887+ if err != nil {
18881888+ t.Fatalf("Create() failed: %v", err)
18891889+ }
18901890+ defer writer.Cancel(context.Background())
18911891+18921892+ // Close should not error
18931893+ err = writer.Close()
18941894+ if err != nil {
18951895+ t.Errorf("Close() should not return error, got: %v", err)
18961896+ }
18971897+18981898+ // Close should NOT mark writer as closed (allows resume)
18991899+ // Write should still work after Close
19001900+ data := generateTestData(1024)
19011901+ _, err = writer.Write(data)
19021902+ if err != nil {
19031903+ t.Errorf("Write() should work after Close(), got: %v", err)
5281904 }
5291905}
5301906
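Taken together, these tests lean on the new mock hold server's error-injection hooks (PartURLError, CompleteError, AbortError) and its call recording (CompleteCalls, AbortCalls). The mock itself is defined in the test helpers rather than in this hunk; a hedged sketch of the shape the tests imply, with field layout and handler details assumed:

    type completeCall struct {
        Digest string
        Parts  []string // recorded part identifiers; exact element type assumed
    }

    type mockHoldServer struct {
        mu            sync.Mutex
        PartURLError  error          // injected into the presigned-part-URL endpoint
        CompleteError error          // injected into the complete-multipart endpoint
        AbortError    error          // injected into the abort endpoint
        CompleteCalls []completeCall // one entry per complete request received
        AbortCalls    []string       // upload IDs that were aborted
    }

    func (m *mockHoldServer) handleComplete(w http.ResponseWriter, r *http.Request) {
        var call completeCall
        // decoding of the request body into call omitted for brevity
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.CompleteError != nil {
            http.Error(w, m.CompleteError.Error(), http.StatusInternalServerError)
            return
        }
        m.CompleteCalls = append(m.CompleteCalls, call)
        w.WriteHeader(http.StatusCreated)
    }

Each handler checks its injected error before recording the call, which is what lets TestCommit_FlushError, TestCommit_CompleteError, and TestCancel_AbortError drive the failure paths without a real hold or real S3 behind the presigned URLs.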
+24-41
pkg/hold/config.go
···103103 // TestMode uses localhost for OAuth redirects while storing real URL in hold record (from env: TEST_MODE)
104104 TestMode bool `yaml:"test_mode"`
105105106106- // DisablePresignedURLs forces proxy mode even with S3 configured (for testing) (from env: DISABLE_PRESIGNED_URLS)
107107- DisablePresignedURLs bool `yaml:"disable_presigned_urls"`
108108-109106 // RelayEndpoint is the ATProto relay URL to request crawl from on startup (from env: HOLD_RELAY_ENDPOINT)
110107 // If empty, no crawl request is made. Default: https://bsky.network
111108 RelayEndpoint string `yaml:"relay_endpoint"`
···153150 }
154151 cfg.Server.Public = os.Getenv("HOLD_PUBLIC") == "true"
155152 cfg.Server.TestMode = os.Getenv("TEST_MODE") == "true"
156156- cfg.Server.DisablePresignedURLs = os.Getenv("DISABLE_PRESIGNED_URLS") == "true"
157153 cfg.Server.RelayEndpoint = os.Getenv("HOLD_RELAY_ENDPOINT")
158154 cfg.Server.ReadTimeout = 5 * time.Minute // Increased for large blob uploads
159155 cfg.Server.WriteTimeout = 5 * time.Minute // Increased for large blob uploads
···173169 cfg.Database.KeyPath = filepath.Join(cfg.Database.Path, "signing.key")
174170 }
175171176176- // Storage configuration - build from env vars based on storage type
177177- storageType := getEnvOrDefault("STORAGE_DRIVER", "s3")
172172+ // Storage configuration - S3 is required (filesystem support removed)
178173 var err error
179179- cfg.Storage, err = buildStorageConfig(storageType)
174174+ cfg.Storage, err = buildStorageConfig()
180175 if err != nil {
181176 return nil, fmt.Errorf("failed to build storage config: %w", err)
182177 }
···189184 cfg.Registration.Region = meta.Region
190185 slog.Info("Detected cloud metadata", "region", meta.Region)
191186 } else {
192192- // Fall back to S3 region
193193- if storageType == "s3" {
194194- cfg.Registration.Region = getEnvOrDefault("AWS_REGION", "us-east-1")
195195- slog.Info("Using S3 region", "region", cfg.Registration.Region)
196196- }
187187+ // Fall back to S3 region (S3 is always used)
188188+ cfg.Registration.Region = getEnvOrDefault("AWS_REGION", "us-east-1")
189189+ slog.Info("Using S3 region", "region", cfg.Registration.Region)
197190 }
198191199192 return cfg, nil
200193}
201194202202-// buildStorageConfig creates storage configuration based on driver type
203203-func buildStorageConfig(driver string) (StorageConfig, error) {
195195+// buildStorageConfig creates S3 storage configuration from environment variables
196196+// S3 is the only supported storage backend
197197+func buildStorageConfig() (StorageConfig, error) {
204198 params := make(map[string]any)
205199206206- switch driver {
207207- case "s3":
208208- // S3/Storj/Minio configuration from standard AWS env vars
209209- accessKey := os.Getenv("AWS_ACCESS_KEY_ID")
210210- secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
211211- region := getEnvOrDefault("AWS_REGION", "us-east-1")
212212- bucket := os.Getenv("S3_BUCKET")
213213- endpoint := os.Getenv("S3_ENDPOINT") // For Storj/Minio
200200+ // S3/Storj/Minio configuration from standard AWS env vars
201201+ accessKey := os.Getenv("AWS_ACCESS_KEY_ID")
202202+ secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
203203+ region := getEnvOrDefault("AWS_REGION", "us-east-1")
204204+ bucket := os.Getenv("S3_BUCKET")
205205+ endpoint := os.Getenv("S3_ENDPOINT") // For Storj/Minio
214206215215- if bucket == "" {
216216- return StorageConfig{}, fmt.Errorf("S3_BUCKET is required for S3 storage")
217217- }
218218-219219- params["accesskey"] = accessKey
220220- params["secretkey"] = secretKey
221221- params["region"] = region
222222- params["bucket"] = bucket
223223- if endpoint != "" {
224224- params["regionendpoint"] = endpoint
225225- }
226226-227227- case "filesystem":
228228- // Filesystem configuration
229229- rootDir := getEnvOrDefault("STORAGE_ROOT_DIR", "/var/lib/atcr/hold")
230230- params["rootdirectory"] = rootDir
207207+ if bucket == "" {
208208+ return StorageConfig{}, fmt.Errorf("S3_BUCKET is required - S3 is the only supported storage backend")
209209+ }
231210232232- default:
233233- return StorageConfig{}, fmt.Errorf("unsupported storage driver: %s", driver)
211211+ params["accesskey"] = accessKey
212212+ params["secretkey"] = secretKey
213213+ params["region"] = region
214214+ params["bucket"] = bucket
215215+ if endpoint != "" {
216216+ params["regionendpoint"] = endpoint
234217 }
235218236219 // Build distribution Storage config
237220 storageCfg := configuration.Storage{}
238238- storageCfg[driver] = configuration.Parameters(params)
221221+ storageCfg["s3"] = configuration.Parameters(params)
239222240223 return StorageConfig{Storage: storageCfg}, nil
241224}
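With the filesystem branch gone, the loader only needs the standard S3 env vars and fails fast when the bucket is missing. A minimal sketch of how this could be exercised from a test inside pkg/hold (the test itself is hypothetical and not part of this change; it assumes StorageConfig exposes the distribution Storage map as shown in the hunk):

    func TestBuildStorageConfig_S3Only(t *testing.T) {
        t.Setenv("AWS_ACCESS_KEY_ID", "test-key")
        t.Setenv("AWS_SECRET_ACCESS_KEY", "test-secret")
        t.Setenv("S3_BUCKET", "")

        // Without a bucket, config building must fail (S3 is mandatory now).
        if _, err := buildStorageConfig(); err == nil {
            t.Fatal("expected error when S3_BUCKET is unset")
        }

        t.Setenv("S3_BUCKET", "atcr-blobs")
        cfg, err := buildStorageConfig()
        if err != nil {
            t.Fatalf("buildStorageConfig() failed: %v", err)
        }
        if _, ok := cfg.Storage["s3"]; !ok {
            t.Error("expected s3 storage parameters in the config")
        }
    }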