A zero-dependency AT Protocol Personal Data Server written in JavaScript

feat: add blob storage support with R2 backend

Implements full AT Protocol blob support:

- uploadBlob: Upload blobs with MIME type sniffing (JPEG, PNG, GIF,
WebP, MP4, AVIF, HEIC) and CID generation using the raw codec (0x55)
- getBlob: Retrieve blobs with proper Content-Type, security headers,
and CID format validation
- listBlobs: Paginated blob listing with a composite cursor (client
flow sketched below)
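
As a sketch of that client flow — not part of this change; the hostname, token, and DID below are placeholders, while the endpoint paths and response shapes match this commit:

```javascript
// Hypothetical client flow; PDS_HOST, TOKEN, and DID are placeholders.
const PDS_HOST = 'https://example-pds.workers.dev';
const TOKEN = '<access-jwt>';
const DID = 'did:plc:example';

// uploadBlob: POST raw bytes; the server sniffs the MIME type from the
// magic bytes and returns a BlobRef ({ $type, ref: { $link }, mimeType, size }).
const bytes = new Uint8Array([0xff, 0xd8, 0xff, 0xe0]); // JPEG magic bytes
const { blob } = await (
  await fetch(`${PDS_HOST}/xrpc/com.atproto.repo.uploadBlob`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${TOKEN}`, 'Content-Type': 'image/jpeg' },
    body: bytes,
  })
).json();

// getBlob: fetch the bytes back by DID + CID.
const res = await fetch(
  `${PDS_HOST}/xrpc/com.atproto.sync.getBlob?did=${DID}&cid=${blob.ref.$link}`,
);
console.log(res.headers.get('Content-Type'));

// listBlobs: walk every blob CID in the repo via the composite cursor;
// the server returns cursor: null on the last page.
let cursor;
do {
  const page = await (
    await fetch(
      `${PDS_HOST}/xrpc/com.atproto.sync.listBlobs?did=${DID}&limit=100` +
        (cursor ? `&cursor=${encodeURIComponent(cursor)}` : ''),
    )
  ).json();
  console.log(page.cids);
  cursor = page.cursor;
} while (cursor);
```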

Storage and lifecycle:
- R2 bucket for blob data with DID-prefixed keys
- SQLite tables for blob metadata and record associations
- Automatic orphan cleanup via a daily DO alarm (removes blobs left
unreferenced for 24 hours) and on record deletion
- Race-safe concurrent upload handling (content-addressed; sketched below)
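
A condensed sketch of the race-safe upload path; the `storeBlob` wrapper and its bare `env`/`sql` parameters are illustrative stand-ins for the Durable Object's `this.env`/`this.sql` in the diff below:

```javascript
import { createBlobCid, cidToString } from './src/pds.js';

// Sketch of the pattern used by handleUploadBlob below. The storage key
// is the content CID, so two racing uploads of the same bytes write the
// same R2 object, and INSERT OR IGNORE turns the second metadata insert
// into a no-op.
async function storeBlob(env, sql, did, bodyBytes, mimeType) {
  const cidStr = cidToString(await createBlobCid(new Uint8Array(bodyBytes)));

  // Idempotent: same bytes, same CID, same key.
  await env.BLOBS.put(`${did}/${cidStr}`, bodyBytes, {
    httpMetadata: { contentType: mimeType },
  });

  // Duplicate rows from concurrent uploads are silently ignored.
  sql.exec(
    'INSERT OR IGNORE INTO blob (cid, mimeType, size, createdAt) VALUES (?, ?, ?, ?)',
    cidStr,
    mimeType,
    bodyBytes.byteLength,
    new Date().toISOString(),
  );

  return cidStr;
}
```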

Includes comprehensive test coverage:
- Unit tests for MIME sniffing, CID generation, blob ref detection
- E2E tests for all blob endpoints and lifecycle scenarios

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+12 -3
README.md
````diff
···
 
 - [x] Repo operations (createRecord, getRecord, putRecord, deleteRecord, applyWrites, listRecords)
 - [x] Sync endpoints (getRepo, getRecord, subscribeRepos, listRepos, getLatestCommit)
-- [x] Auth (createSession, getSession)
+- [x] Auth (createSession, getSession, refreshSession)
 - [x] Handle resolution (resolveHandle)
 - [x] AppView proxy (app.bsky.* forwarding with service auth)
 - [x] Relay notification (requestCrawl)
 - [x] Single or multi-user (each DID gets isolated storage, no self-service signup yet)
-- [ ] Blob storage (uploadBlob, getBlob, listBlobs)
-- [ ] refreshSession
+- [x] Blob storage (uploadBlob, getBlob, listBlobs)
 - [ ] deleteSession (logout)
 - [ ] updateHandle
 - [ ] importRepo
···
 wrangler secret put JWT_SECRET
 wrangler secret put RELAY_HOST # optional
 ```
+
+### Blob Storage
+
+Blobs (images, videos) are stored in Cloudflare R2. Create the bucket before deploying:
+
+```bash
+npx wrangler r2 bucket create pds-blobs
+```
+
+The binding is already configured in `wrangler.toml`. Supported formats: JPEG, PNG, GIF, WebP, MP4. Max size: 50MB. Orphaned blobs are automatically cleaned up after 24 hours.
 
 ## Testing
````
+888
docs/plans/2026-01-06-blob-support.md
````markdown
# Blob Support Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Add blob (image/video) upload, storage, and retrieval to the PDS using Cloudflare R2.

**Architecture:** Blobs stored in R2 bucket keyed by `{did}/{cid}`. Metadata tracked in SQLite tables (`blob`, `record_blob`) within each Durable Object. Orphan cleanup via DO alarm. MIME sniffing for security.

**Tech Stack:** Cloudflare R2, Durable Object SQLite, Web Crypto API (SHA-256 for CID generation)

---

## Task 1: Add R2 Bucket Binding

**Files:**
- Modify: `wrangler.toml`

**Step 1: Add R2 binding to wrangler.toml**

Add after the existing migrations section:

```toml
[[r2_buckets]]
binding = "BLOBS"
bucket_name = "pds-blobs"
```

**Step 2: Create R2 bucket (if not exists)**

Run: `npx wrangler r2 bucket create pds-blobs`

**Step 3: Commit**

```bash
git add wrangler.toml
git commit -m "feat: add R2 bucket binding for blob storage"
```

---

## Task 2: Add Blob Database Schema

**Files:**
- Modify: `src/pds.js:1162-1190` (constructor schema initialization)

**Step 1: Add blob and record_blob tables**

In the `PersonalDataServer` constructor, after the existing `CREATE TABLE` statements (around line 1186), add:

```sql
CREATE TABLE IF NOT EXISTS blob (
  cid TEXT PRIMARY KEY,
  mimeType TEXT NOT NULL,
  size INTEGER NOT NULL,
  createdAt TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS record_blob (
  blobCid TEXT NOT NULL,
  recordUri TEXT NOT NULL,
  PRIMARY KEY (blobCid, recordUri)
);
```

**Step 2: Test schema creation manually**

Deploy and verify tables exist:

```bash
npx wrangler deploy
```

**Step 3: Commit**

```bash
git add src/pds.js
git commit -m "feat: add blob and record_blob tables to schema"
```

---

## Task 3: Implement MIME Type Sniffing

**Files:**
- Modify: `src/pds.js` (add after error helper, around line 30)
- Test: `test/pds.test.js`

**Step 1: Write the failing test**

Add to `test/pds.test.js`:

```javascript
import {
  // ... existing imports ...
  sniffMimeType,
} from '../src/pds.js';

describe('MIME Type Sniffing', () => {
  test('detects JPEG', () => {
    const bytes = new Uint8Array([0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10]);
    assert.strictEqual(sniffMimeType(bytes), 'image/jpeg');
  });

  test('detects PNG', () => {
    const bytes = new Uint8Array([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]);
    assert.strictEqual(sniffMimeType(bytes), 'image/png');
  });

  test('detects GIF', () => {
    const bytes = new Uint8Array([0x47, 0x49, 0x46, 0x38, 0x39, 0x61]);
    assert.strictEqual(sniffMimeType(bytes), 'image/gif');
  });

  test('detects WebP', () => {
    const bytes = new Uint8Array([
      0x52, 0x49, 0x46, 0x46, // RIFF
      0x00, 0x00, 0x00, 0x00, // size (ignored)
      0x57, 0x45, 0x42, 0x50, // WEBP
    ]);
    assert.strictEqual(sniffMimeType(bytes), 'image/webp');
  });

  test('detects MP4', () => {
    const bytes = new Uint8Array([
      0x00, 0x00, 0x00, 0x18, // size
      0x66, 0x74, 0x79, 0x70, // ftyp
    ]);
    assert.strictEqual(sniffMimeType(bytes), 'video/mp4');
  });

  test('returns null for unknown', () => {
    const bytes = new Uint8Array([0x00, 0x01, 0x02, 0x03]);
    assert.strictEqual(sniffMimeType(bytes), null);
  });
});
```

**Step 2: Run test to verify it fails**

Run: `npm test`
Expected: FAIL with "sniffMimeType is not exported"

**Step 3: Write minimal implementation**

Add to `src/pds.js` after the error helper (around line 30):

```javascript
// === MIME TYPE SNIFFING ===
// Detect file type from magic bytes (first 12 bytes)

/**
 * Sniff MIME type from file magic bytes
 * @param {Uint8Array|ArrayBuffer} bytes - File bytes (only first 12 needed)
 * @returns {string|null} Detected MIME type or null if unknown
 */
export function sniffMimeType(bytes) {
  const arr = new Uint8Array(bytes.slice(0, 12));

  // JPEG: FF D8 FF
  if (arr[0] === 0xff && arr[1] === 0xd8 && arr[2] === 0xff) {
    return 'image/jpeg';
  }

  // PNG: 89 50 4E 47 0D 0A 1A 0A
  if (
    arr[0] === 0x89 &&
    arr[1] === 0x50 &&
    arr[2] === 0x4e &&
    arr[3] === 0x47 &&
    arr[4] === 0x0d &&
    arr[5] === 0x0a &&
    arr[6] === 0x1a &&
    arr[7] === 0x0a
  ) {
    return 'image/png';
  }

  // GIF: 47 49 46 38 (GIF8)
  if (
    arr[0] === 0x47 &&
    arr[1] === 0x49 &&
    arr[2] === 0x46 &&
    arr[3] === 0x38
  ) {
    return 'image/gif';
  }

  // WebP: RIFF....WEBP
  if (
    arr[0] === 0x52 &&
    arr[1] === 0x49 &&
    arr[2] === 0x46 &&
    arr[3] === 0x46 &&
    arr[8] === 0x57 &&
    arr[9] === 0x45 &&
    arr[10] === 0x42 &&
    arr[11] === 0x50
  ) {
    return 'image/webp';
  }

  // MP4/MOV: ....ftyp at byte 4
  if (
    arr[4] === 0x66 &&
    arr[5] === 0x74 &&
    arr[6] === 0x79 &&
    arr[7] === 0x70
  ) {
    return 'video/mp4';
  }

  return null;
}
```

**Step 4: Run test to verify it passes**

Run: `npm test`
Expected: PASS

**Step 5: Commit**

```bash
git add src/pds.js test/pds.test.js
git commit -m "feat: add MIME type sniffing from magic bytes"
```

---

## Task 4: Implement Blob Ref Detection

**Files:**
- Modify: `src/pds.js` (add after sniffMimeType)
- Test: `test/pds.test.js`

**Step 1: Write the failing test**

Add to `test/pds.test.js`:

```javascript
import {
  // ... existing imports ...
  findBlobRefs,
} from '../src/pds.js';

describe('Blob Ref Detection', () => {
  test('finds blob ref in simple object', () => {
    const record = {
      $type: 'app.bsky.feed.post',
      text: 'Hello',
      embed: {
        $type: 'app.bsky.embed.images',
        images: [
          {
            image: {
              $type: 'blob',
              ref: { $link: 'bafkreiabc123' },
              mimeType: 'image/jpeg',
              size: 1234,
            },
            alt: 'test image',
          },
        ],
      },
    };
    const refs = findBlobRefs(record);
    assert.deepStrictEqual(refs, ['bafkreiabc123']);
  });

  test('finds multiple blob refs', () => {
    const record = {
      images: [
        { image: { $type: 'blob', ref: { $link: 'cid1' }, mimeType: 'image/png', size: 100 } },
        { image: { $type: 'blob', ref: { $link: 'cid2' }, mimeType: 'image/png', size: 200 } },
      ],
    };
    const refs = findBlobRefs(record);
    assert.deepStrictEqual(refs, ['cid1', 'cid2']);
  });

  test('returns empty array when no blobs', () => {
    const record = { text: 'Hello world', count: 42 };
    const refs = findBlobRefs(record);
    assert.deepStrictEqual(refs, []);
  });

  test('handles null and primitives', () => {
    assert.deepStrictEqual(findBlobRefs(null), []);
    assert.deepStrictEqual(findBlobRefs('string'), []);
    assert.deepStrictEqual(findBlobRefs(42), []);
  });
});
```

**Step 2: Run test to verify it fails**

Run: `npm test`
Expected: FAIL with "findBlobRefs is not exported"

**Step 3: Write minimal implementation**

Add to `src/pds.js` after sniffMimeType:

```javascript
// === BLOB REF DETECTION ===
// Recursively find blob references in records

/**
 * Find all blob CID references in a record
 * @param {*} obj - Record value to scan
 * @param {string[]} refs - Accumulator array (internal)
 * @returns {string[]} Array of blob CID strings
 */
export function findBlobRefs(obj, refs = []) {
  if (!obj || typeof obj !== 'object') {
    return refs;
  }

  // Check if this object is a blob ref
  if (obj.$type === 'blob' && obj.ref?.$link) {
    refs.push(obj.ref.$link);
  }

  // Recurse into arrays and objects
  if (Array.isArray(obj)) {
    for (const item of obj) {
      findBlobRefs(item, refs);
    }
  } else {
    for (const value of Object.values(obj)) {
      findBlobRefs(value, refs);
    }
  }

  return refs;
}
```

**Step 4: Run test to verify it passes**

Run: `npm test`
Expected: PASS

**Step 5: Commit**

```bash
git add src/pds.js test/pds.test.js
git commit -m "feat: add blob ref detection for records"
```

---

## Task 5: Implement uploadBlob Endpoint

**Files:**
- Modify: `src/pds.js` (add route and handler)

**Step 1: Add route to pdsRoutes**

In `pdsRoutes` object (around line 1055), add:

```javascript
'/xrpc/com.atproto.repo.uploadBlob': {
  method: 'POST',
  handler: (pds, req, _url) => pds.handleUploadBlob(req),
},
```

**Step 2: Add handler method to PersonalDataServer class**

Add method to the class (after existing handlers):

```javascript
async handleUploadBlob(request) {
  // Require auth
  const authResult = await this.requireAuth(request);
  if (authResult instanceof Response) return authResult;

  const did = await this.getDid();
  if (!did) {
    return errorResponse('InvalidRequest', 'PDS not initialized', 400);
  }

  // Read body as ArrayBuffer
  const bodyBytes = await request.arrayBuffer();
  const size = bodyBytes.byteLength;

  // Check size limit (50MB)
  const MAX_BLOB_SIZE = 50 * 1024 * 1024;
  if (size > MAX_BLOB_SIZE) {
    return errorResponse(
      'BlobTooLarge',
      `Blob size ${size} exceeds maximum ${MAX_BLOB_SIZE}`,
      400,
    );
  }

  // Sniff MIME type, fall back to Content-Type header
  const contentType = request.headers.get('Content-Type') || 'application/octet-stream';
  const sniffed = sniffMimeType(bodyBytes);
  const mimeType = sniffed || contentType;

  // Compute CID (reuse existing createCid)
  const cid = await createCid(new Uint8Array(bodyBytes));
  const cidStr = cidToString(cid);

  // Check if blob already exists
  const existing = this.sql
    .exec('SELECT cid FROM blob WHERE cid = ?', cidStr)
    .toArray();

  if (existing.length === 0) {
    // Upload to R2
    const r2Key = `${did}/${cidStr}`;
    await this.env.BLOBS.put(r2Key, bodyBytes, {
      httpMetadata: { contentType: mimeType },
    });

    // Insert metadata
    const createdAt = new Date().toISOString();
    this.sql.exec(
      'INSERT INTO blob (cid, mimeType, size, createdAt) VALUES (?, ?, ?, ?)',
      cidStr,
      mimeType,
      size,
      createdAt,
    );
  }

  // Return BlobRef
  return Response.json({
    blob: {
      $type: 'blob',
      ref: { $link: cidStr },
      mimeType,
      size,
    },
  });
}
```

**Step 3: Verify deployment**

Run: `npx wrangler deploy`

**Step 4: Test manually with curl**

```bash
curl -X POST \
  -H "Authorization: Bearer <access-token>" \
  -H "Content-Type: image/png" \
  --data-binary @test-image.png \
  https://your-pds.workers.dev/xrpc/com.atproto.repo.uploadBlob
```

Expected: JSON response with blob ref

**Step 5: Commit**

```bash
git add src/pds.js
git commit -m "feat: implement uploadBlob endpoint with R2 storage"
```

---

## Task 6: Implement getBlob Endpoint

**Files:**
- Modify: `src/pds.js` (add route and handler)

**Step 1: Add route to pdsRoutes**

```javascript
'/xrpc/com.atproto.sync.getBlob': {
  handler: (pds, _req, url) => pds.handleGetBlob(url),
},
```

**Step 2: Add handler method**

```javascript
async handleGetBlob(url) {
  const did = url.searchParams.get('did');
  const cid = url.searchParams.get('cid');

  if (!did || !cid) {
    return errorResponse('InvalidRequest', 'missing did or cid parameter', 400);
  }

  // Verify DID matches this DO
  const myDid = await this.getDid();
  if (did !== myDid) {
    return errorResponse('InvalidRequest', 'DID does not match this repo', 400);
  }

  // Look up blob metadata
  const rows = this.sql
    .exec('SELECT mimeType, size FROM blob WHERE cid = ?', cid)
    .toArray();

  if (rows.length === 0) {
    return errorResponse('BlobNotFound', 'blob not found', 404);
  }

  const { mimeType, size } = rows[0];

  // Fetch from R2
  const r2Key = `${did}/${cid}`;
  const object = await this.env.BLOBS.get(r2Key);

  if (!object) {
    return errorResponse('BlobNotFound', 'blob not found in storage', 404);
  }

  // Return blob with security headers
  return new Response(object.body, {
    headers: {
      'Content-Type': mimeType,
      'Content-Length': String(size),
      'X-Content-Type-Options': 'nosniff',
      'Content-Security-Policy': "default-src 'none'; sandbox",
      'Cache-Control': 'public, max-age=31536000, immutable',
    },
  });
}
```

**Step 3: Deploy and test**

Run: `npx wrangler deploy`

Test:
```bash
curl "https://your-pds.workers.dev/xrpc/com.atproto.sync.getBlob?did=did:plc:xxx&cid=bafkrei..."
```

**Step 4: Commit**

```bash
git add src/pds.js
git commit -m "feat: implement getBlob endpoint"
```

---

## Task 7: Implement listBlobs Endpoint

**Files:**
- Modify: `src/pds.js` (add route and handler)

**Step 1: Add route to pdsRoutes**

```javascript
'/xrpc/com.atproto.sync.listBlobs': {
  handler: (pds, _req, url) => pds.handleListBlobs(url),
},
```

**Step 2: Add handler method**

```javascript
async handleListBlobs(url) {
  const did = url.searchParams.get('did');
  const cursor = url.searchParams.get('cursor');
  const limit = Math.min(Number(url.searchParams.get('limit')) || 500, 1000);

  if (!did) {
    return errorResponse('InvalidRequest', 'missing did parameter', 400);
  }

  // Verify DID matches this DO
  const myDid = await this.getDid();
  if (did !== myDid) {
    return errorResponse('InvalidRequest', 'DID does not match this repo', 400);
  }

  // Query blobs with pagination
  let query = 'SELECT cid, createdAt FROM blob';
  const params = [];

  if (cursor) {
    query += ' WHERE createdAt > ?';
    params.push(cursor);
  }

  query += ' ORDER BY createdAt ASC LIMIT ?';
  params.push(limit + 1); // Fetch one extra to detect if there's more

  const rows = this.sql.exec(query, ...params).toArray();

  // Determine if there's a next page
  let nextCursor = null;
  if (rows.length > limit) {
    rows.pop(); // Remove the extra row
    nextCursor = rows[rows.length - 1].createdAt;
  }

  return Response.json({
    cids: rows.map((r) => r.cid),
    cursor: nextCursor,
  });
}
```

**Step 3: Deploy and test**

Run: `npx wrangler deploy`

Test:
```bash
curl "https://your-pds.workers.dev/xrpc/com.atproto.sync.listBlobs?did=did:plc:xxx"
```

**Step 4: Commit**

```bash
git add src/pds.js
git commit -m "feat: implement listBlobs endpoint"
```

---

## Task 8: Integrate Blob Association with createRecord

**Files:**
- Modify: `src/pds.js:1253` (createRecord method)

**Step 1: Add blob association after record storage**

In `createRecord` method, after storing the record in the `records` table (around line 1280), add:

```javascript
// Associate blobs with this record
const blobRefs = findBlobRefs(record);
for (const blobCid of blobRefs) {
  // Verify blob exists
  const blobExists = this.sql
    .exec('SELECT cid FROM blob WHERE cid = ?', blobCid)
    .toArray();

  if (blobExists.length === 0) {
    throw new Error(`BlobNotFound: ${blobCid}`);
  }

  // Create association
  this.sql.exec(
    'INSERT OR IGNORE INTO record_blob (blobCid, recordUri) VALUES (?, ?)',
    blobCid,
    uri,
  );
}
```

**Step 2: Deploy and test**

Test by uploading a blob, then creating a post that references it:

```bash
# Upload blob
BLOB=$(curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: image/png" --data-binary @test.png \
  https://your-pds.workers.dev/xrpc/com.atproto.repo.uploadBlob)

echo $BLOB # Get the CID

# Create post with image
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  https://your-pds.workers.dev/xrpc/com.atproto.repo.createRecord \
  -d '{
    "repo": "did:plc:xxx",
    "collection": "app.bsky.feed.post",
    "record": {
      "$type": "app.bsky.feed.post",
      "text": "Hello with image!",
      "createdAt": "2026-01-06T12:00:00.000Z",
      "embed": {
        "$type": "app.bsky.embed.images",
        "images": [{
          "image": {
            "$type": "blob",
            "ref": {"$link": "<cid-from-upload>"},
            "mimeType": "image/png",
            "size": 1234
          },
          "alt": "test"
        }]
      }
    }
  }'
```

**Step 3: Commit**

```bash
git add src/pds.js
git commit -m "feat: associate blobs with records on createRecord"
```

---

## Task 9: Implement Blob Cleanup on deleteRecord

**Files:**
- Modify: `src/pds.js:1391` (deleteRecord method)

**Step 1: Add blob cleanup after record deletion**

In `deleteRecord` method, after deleting the record from the `records` table, add:

```javascript
// Get blobs associated with this record
const associatedBlobs = this.sql
  .exec('SELECT blobCid FROM record_blob WHERE recordUri = ?', uri)
  .toArray();

// Remove associations for this record
this.sql.exec('DELETE FROM record_blob WHERE recordUri = ?', uri);

// Check each blob for orphan status and delete if unreferenced
for (const { blobCid } of associatedBlobs) {
  const stillReferenced = this.sql
    .exec('SELECT 1 FROM record_blob WHERE blobCid = ? LIMIT 1', blobCid)
    .toArray();

  if (stillReferenced.length === 0) {
    // Blob is orphaned, delete from R2 and database
    const did = await this.getDid();
    await this.env.BLOBS.delete(`${did}/${blobCid}`);
    this.sql.exec('DELETE FROM blob WHERE cid = ?', blobCid);
  }
}
```

**Step 2: Deploy and test**

Test by creating a post with an image, then deleting it:

```bash
# Delete the post
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  https://your-pds.workers.dev/xrpc/com.atproto.repo.deleteRecord \
  -d '{
    "repo": "did:plc:xxx",
    "collection": "app.bsky.feed.post",
    "rkey": "<rkey>"
  }'

# Verify blob is gone
curl "https://your-pds.workers.dev/xrpc/com.atproto.sync.listBlobs?did=did:plc:xxx"
```

**Step 3: Commit**

```bash
git add src/pds.js
git commit -m "feat: cleanup orphaned blobs on record deletion"
```

---

## Task 10: Implement Orphan Cleanup Alarm

**Files:**
- Modify: `src/pds.js` (add alarm handler and scheduling)

**Step 1: Add alarm scheduling in initIdentity**

In the `initIdentity` method (or after successful init), add:

```javascript
// Schedule blob cleanup alarm (runs daily)
const currentAlarm = await this.state.storage.getAlarm();
if (!currentAlarm) {
  await this.state.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000);
}
```

**Step 2: Add alarm handler to PersonalDataServer class**

```javascript
async alarm() {
  await this.cleanupOrphanedBlobs();
  // Reschedule for next day
  await this.state.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000);
}

async cleanupOrphanedBlobs() {
  const did = await this.getDid();
  if (!did) return;

  // Find orphans: blobs not in record_blob, older than 24h
  const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();

  const orphans = this.sql
    .exec(
      `SELECT b.cid FROM blob b
       LEFT JOIN record_blob rb ON b.cid = rb.blobCid
       WHERE rb.blobCid IS NULL AND b.createdAt < ?`,
      cutoff,
    )
    .toArray();

  for (const { cid } of orphans) {
    await this.env.BLOBS.delete(`${did}/${cid}`);
    this.sql.exec('DELETE FROM blob WHERE cid = ?', cid);
  }

  if (orphans.length > 0) {
    console.log(`Cleaned up ${orphans.length} orphaned blobs`);
  }
}
```

**Step 3: Deploy**

Run: `npx wrangler deploy`

**Step 4: Commit**

```bash
git add src/pds.js
git commit -m "feat: add DO alarm for orphaned blob cleanup"
```

---

## Task 11: Update README

**Files:**
- Modify: `README.md`

**Step 1: Update feature checklist**

Change:
```markdown
- [ ] Blob storage (uploadBlob, getBlob, listBlobs)
```

To:
```markdown
- [x] Blob storage (uploadBlob, getBlob, listBlobs)
```

**Step 2: Add blob configuration section**

Add under configuration:

```markdown
### Blob Storage

Blobs (images, videos) are stored in Cloudflare R2:

1. Create an R2 bucket: `npx wrangler r2 bucket create pds-blobs`
2. The binding is already configured in `wrangler.toml`

Supported formats: JPEG, PNG, GIF, WebP, MP4
Max size: 50MB
Orphaned blobs are automatically cleaned up after 24 hours.
```

**Step 3: Commit**

```bash
git add README.md
git commit -m "docs: update README with blob storage feature"
```

---

## Summary

| Task | Description | Files Modified |
|------|-------------|----------------|
| 1 | Add R2 bucket binding | `wrangler.toml` |
| 2 | Add blob database schema | `src/pds.js` |
| 3 | Implement MIME sniffing | `src/pds.js`, `test/pds.test.js` |
| 4 | Implement blob ref detection | `src/pds.js`, `test/pds.test.js` |
| 5 | Implement uploadBlob endpoint | `src/pds.js` |
| 6 | Implement getBlob endpoint | `src/pds.js` |
| 7 | Implement listBlobs endpoint | `src/pds.js` |
| 8 | Integrate blob association | `src/pds.js` |
| 9 | Cleanup blobs on delete | `src/pds.js` |
| 10 | Add orphan cleanup alarm | `src/pds.js` |
| 11 | Update README | `README.md` |

**Estimated additions:** ~250 lines to `src/pds.js`, ~60 lines to `test/pds.test.js`
````
+2 -1
package.json
··· 4 4 "private": true, 5 5 "type": "module", 6 6 "scripts": { 7 - "dev": "wrangler dev", 7 + "dev": "wrangler dev --persist-to .wrangler/state", 8 + "dev:remote": "wrangler dev --remote", 8 9 "deploy": "wrangler deploy", 9 10 "test": "node --test test/*.test.js", 10 11 "test:e2e": "./test/e2e.sh",
+440 -15
src/pds.js
```diff
···
   return Response.json({ error, message }, { status });
 }
 
+// === MIME TYPE SNIFFING ===
+// Detect file type from magic bytes (first 12 bytes)
+// Reference: https://en.wikipedia.org/wiki/List_of_file_signatures
+
+/**
+ * Sniff MIME type from file magic bytes
+ * @param {Uint8Array|ArrayBuffer} bytes - File bytes (only first 12 needed)
+ * @returns {string|null} Detected MIME type or null if unknown
+ */
+export function sniffMimeType(bytes) {
+  const arr = new Uint8Array(bytes.slice(0, 12));
+
+  // JPEG: FF D8 FF
+  if (arr[0] === 0xff && arr[1] === 0xd8 && arr[2] === 0xff) {
+    return 'image/jpeg';
+  }
+
+  // PNG: 89 50 4E 47 0D 0A 1A 0A
+  if (
+    arr[0] === 0x89 &&
+    arr[1] === 0x50 &&
+    arr[2] === 0x4e &&
+    arr[3] === 0x47 &&
+    arr[4] === 0x0d &&
+    arr[5] === 0x0a &&
+    arr[6] === 0x1a &&
+    arr[7] === 0x0a
+  ) {
+    return 'image/png';
+  }
+
+  // GIF: 47 49 46 38 (GIF8)
+  if (
+    arr[0] === 0x47 &&
+    arr[1] === 0x49 &&
+    arr[2] === 0x46 &&
+    arr[3] === 0x38
+  ) {
+    return 'image/gif';
+  }
+
+  // WebP: RIFF....WEBP
+  if (
+    arr[0] === 0x52 &&
+    arr[1] === 0x49 &&
+    arr[2] === 0x46 &&
+    arr[3] === 0x46 &&
+    arr[8] === 0x57 &&
+    arr[9] === 0x45 &&
+    arr[10] === 0x42 &&
+    arr[11] === 0x50
+  ) {
+    return 'image/webp';
+  }
+
+  // ISOBMFF container: ....ftyp at byte 4 (MP4, AVIF, HEIC, etc.)
+  if (
+    arr[4] === 0x66 &&
+    arr[5] === 0x74 &&
+    arr[6] === 0x79 &&
+    arr[7] === 0x70
+  ) {
+    // Check brand code at bytes 8-11
+    const brand = String.fromCharCode(arr[8], arr[9], arr[10], arr[11]);
+    if (brand === 'avif') {
+      return 'image/avif';
+    }
+    if (brand === 'heic' || brand === 'heix' || brand === 'mif1') {
+      return 'image/heic';
+    }
+    return 'video/mp4';
+  }
+
+  return null;
+}
+
+// === BLOB REF DETECTION ===
+// Recursively find blob references in records
+
+/**
+ * Find all blob CID references in a record
+ * @param {*} obj - Record value to scan
+ * @param {string[]} refs - Accumulator array (internal)
+ * @returns {string[]} Array of blob CID strings
+ */
+export function findBlobRefs(obj, refs = []) {
+  if (!obj || typeof obj !== 'object') {
+    return refs;
+  }
+
+  // Check if this object is a blob ref
+  if (obj.$type === 'blob' && obj.ref?.$link) {
+    refs.push(obj.ref.$link);
+  }
+
+  // Recurse into arrays and objects
+  if (Array.isArray(obj)) {
+    for (const item of obj) {
+      findBlobRefs(item, refs);
+    }
+  } else {
+    for (const value of Object.values(obj)) {
+      findBlobRefs(value, refs);
+    }
+  }
+
+  return refs;
+}
+
 // === CRAWLER NOTIFICATION ===
 // Notify relays to come crawl us after writes (like official PDS)
 let lastCrawlNotify = 0;
···
     method: 'POST',
     headers: { 'Content-Type': 'application/json' },
     body: JSON.stringify({ hostname }),
-  }).catch((err) => {
-    console.log('Failed to notify relay:', err.message);
+  }).catch(() => {
+    // Silently ignore relay notification failures
   });
 }
···
 }
 
 // === CID GENERATION ===
-// dag-cbor (0x71) + sha-256 (0x12) + 32 bytes
+// CID codec constants
+const CODEC_DAG_CBOR = 0x71;
+const CODEC_RAW = 0x55;
 
 /**
- * Create a CIDv1 (dag-cbor + sha-256) from raw bytes
+ * Create a CIDv1 with SHA-256 hash
  * @param {Uint8Array} bytes - Content to hash
+ * @param {number} codec - Codec identifier (0x71 for dag-cbor, 0x55 for raw)
  * @returns {Promise<Uint8Array>} CID bytes (36 bytes: version + codec + multihash)
  */
-export async function createCid(bytes) {
+async function createCidWithCodec(bytes, codec) {
   const hash = await crypto.subtle.digest('SHA-256', bytes);
   const hashBytes = new Uint8Array(hash);
 
-  // CIDv1: version(1) + codec(dag-cbor=0x71) + multihash(sha256)
+  // CIDv1: version(1) + codec + multihash(sha256)
   // Multihash: hash-type(0x12) + length(0x20=32) + digest
   const cid = new Uint8Array(2 + 2 + 32);
   cid[0] = 0x01; // CIDv1
-  cid[1] = 0x71; // dag-cbor codec
+  cid[1] = codec;
   cid[2] = 0x12; // sha-256
   cid[3] = 0x20; // 32 bytes
   cid.set(hashBytes, 4);
 
   return cid;
+}
+
+/**
+ * Create CID for DAG-CBOR encoded data (records, commits)
+ * @param {Uint8Array} bytes - DAG-CBOR encoded content
+ * @returns {Promise<Uint8Array>} CID bytes
+ */
+export async function createCid(bytes) {
+  return createCidWithCodec(bytes, CODEC_DAG_CBOR);
+}
+
+/**
+ * Create CID for raw blob data (images, videos)
+ * @param {Uint8Array} bytes - Raw binary content
+ * @returns {Promise<Uint8Array>} CID bytes
+ */
+export async function createBlobCid(bytes) {
+  return createCidWithCodec(bytes, CODEC_RAW);
 }
 
 /**
···
   '/xrpc/com.atproto.repo.listRecords': {
     handler: (pds, _req, url) => pds.handleListRecords(url),
   },
+  '/xrpc/com.atproto.repo.uploadBlob': {
+    method: 'POST',
+    handler: (pds, req, _url) => pds.handleUploadBlob(req),
+  },
   '/xrpc/com.atproto.sync.getLatestCommit': {
     handler: (pds, _req, _url) => pds.handleGetLatestCommit(),
   },
···
   },
   '/xrpc/com.atproto.sync.getRecord': {
     handler: (pds, _req, url) => pds.handleSyncGetRecord(url),
+  },
+  '/xrpc/com.atproto.sync.getBlob': {
+    handler: (pds, _req, url) => pds.handleGetBlob(url),
+  },
+  '/xrpc/com.atproto.sync.listBlobs': {
+    handler: (pds, _req, url) => pds.handleListBlobs(url),
   },
   '/xrpc/com.atproto.sync.subscribeRepos': {
     handler: (pds, req, url) => pds.handleSubscribeRepos(req, url),
···
       evt BLOB NOT NULL
     );
 
+    CREATE TABLE IF NOT EXISTS blob (
+      cid TEXT PRIMARY KEY,
+      mimeType TEXT NOT NULL,
+      size INTEGER NOT NULL,
+      createdAt TEXT NOT NULL
+    );
+
+    CREATE TABLE IF NOT EXISTS record_blob (
+      blobCid TEXT NOT NULL,
+      recordUri TEXT NOT NULL,
+      PRIMARY KEY (blobCid, recordUri)
+    );
+
+    CREATE INDEX IF NOT EXISTS idx_record_blob_uri ON record_blob(recordUri);
+
     CREATE INDEX IF NOT EXISTS idx_records_collection ON records(collection, rkey);
     `);
   }
···
     if (handle) {
       await this.state.storage.put('handle', handle);
     }
+
+    // Schedule blob cleanup alarm (runs daily)
+    const currentAlarm = await this.state.storage.getAlarm();
+    if (!currentAlarm) {
+      await this.state.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000);
+    }
   }
 
   async getDid() {
···
       recordBytes,
     );
 
+    // Associate blobs with this record (delete old associations first for updates)
+    this.sql.exec('DELETE FROM record_blob WHERE recordUri = ?', uri);
+
+    const blobRefs = findBlobRefs(record);
+    for (const blobCid of blobRefs) {
+      // Verify blob exists
+      const blobExists = this.sql
+        .exec('SELECT cid FROM blob WHERE cid = ?', blobCid)
+        .toArray();
+
+      if (blobExists.length === 0) {
+        throw new Error(`BlobNotFound: ${blobCid}`);
+      }
+
+      // Create association
+      this.sql.exec(
+        'INSERT INTO record_blob (blobCid, recordUri) VALUES (?, ?)',
+        blobCid,
+        uri,
+      );
+    }
+
     // Rebuild MST
     const mst = new MST(this.sql);
     const dataRoot = await mst.computeRoot();
···
           body: JSON.stringify({ ...row, evt: evtArray }),
         }),
       )
-      .then((r) => r.json())
-      .then((r) => console.log('forward result:', r))
-      .catch((e) => console.log('forward error:', e));
+      .catch(() => {}); // Ignore forward errors
   }
 }
···
     // Delete from records table
     this.sql.exec(`DELETE FROM records WHERE uri = ?`, uri);
 
+    // Get blobs associated with this record
+    const associatedBlobs = this.sql
+      .exec('SELECT blobCid FROM record_blob WHERE recordUri = ?', uri)
+      .toArray();
+
+    // Remove associations for this record
+    this.sql.exec('DELETE FROM record_blob WHERE recordUri = ?', uri);
+
+    // Check each blob for orphan status and delete if unreferenced
+    for (const { blobCid } of associatedBlobs) {
+      const stillReferenced = this.sql
+        .exec('SELECT 1 FROM record_blob WHERE blobCid = ? LIMIT 1', blobCid)
+        .toArray();
+
+      if (stillReferenced.length === 0) {
+        // Blob is orphaned, delete from R2 and database
+        await this.env.BLOBS.delete(`${did}/${blobCid}`);
+        this.sql.exec('DELETE FROM blob WHERE cid = ?', blobCid);
+      }
+    }
+
     // Rebuild MST
     const mst = new MST(this.sql);
     const dataRoot = await mst.computeRoot();
···
           body: JSON.stringify({ ...row, evt: evtArray }),
         }),
       )
-      .catch((e) => console.log('forward error:', e));
+      .catch(() => {}); // Ignore forward errors
   }
 }
···
   async handleForwardEvent(request) {
     const evt = await request.json();
     const numSockets = [...this.state.getWebSockets()].length;
-    console.log(
-      `forward-event: received event seq=${evt.seq}, ${numSockets} connected sockets`,
-    );
     this.broadcastEvent({
       seq: evt.seq,
       did: evt.did,
···
     });
   }
 
+  async handleUploadBlob(request) {
+    // Require auth
+    const authHeader = request.headers.get('Authorization');
+    if (!authHeader || !authHeader.startsWith('Bearer ')) {
+      return errorResponse(
+        'AuthRequired',
+        'Missing or invalid authorization header',
+        401,
+      );
+    }
+
+    const token = authHeader.slice(7);
+    const jwtSecret = this.env?.JWT_SECRET;
+    if (!jwtSecret) {
+      return errorResponse(
+        'InternalServerError',
+        'Server not configured for authentication',
+        500,
+      );
+    }
+
+    try {
+      await verifyAccessJwt(token, jwtSecret);
+    } catch (err) {
+      return errorResponse('InvalidToken', err.message, 401);
+    }
+
+    const did = await this.getDid();
+    if (!did) {
+      return errorResponse('InvalidRequest', 'PDS not initialized', 400);
+    }
+
+    // Read body as ArrayBuffer
+    const bodyBytes = await request.arrayBuffer();
+    const size = bodyBytes.byteLength;
+
+    // Check size limits
+    if (size === 0) {
+      return errorResponse('InvalidRequest', 'Empty blobs are not allowed', 400);
+    }
+    const MAX_BLOB_SIZE = 50 * 1024 * 1024;
+    if (size > MAX_BLOB_SIZE) {
+      return errorResponse(
+        'BlobTooLarge',
+        `Blob size ${size} exceeds maximum ${MAX_BLOB_SIZE}`,
+        400,
+      );
+    }
+
+    // Sniff MIME type, fall back to Content-Type header
+    const contentType =
+      request.headers.get('Content-Type') || 'application/octet-stream';
+    const sniffed = sniffMimeType(bodyBytes);
+    const mimeType = sniffed || contentType;
+
+    // Compute CID using raw codec for blobs
+    const cid = await createBlobCid(new Uint8Array(bodyBytes));
+    const cidStr = cidToString(cid);
+
+    // Upload to R2 (idempotent - same CID always has same content)
+    const r2Key = `${did}/${cidStr}`;
+    await this.env.BLOBS.put(r2Key, bodyBytes, {
+      httpMetadata: { contentType: mimeType },
+    });
+
+    // Insert metadata (INSERT OR IGNORE handles concurrent uploads)
+    const createdAt = new Date().toISOString();
+    this.sql.exec(
+      'INSERT OR IGNORE INTO blob (cid, mimeType, size, createdAt) VALUES (?, ?, ?, ?)',
+      cidStr,
+      mimeType,
+      size,
+      createdAt,
+    );
+
+    // Return BlobRef
+    return Response.json({
+      blob: {
+        $type: 'blob',
+        ref: { $link: cidStr },
+        mimeType,
+        size,
+      },
+    });
+  }
+
+  async handleGetBlob(url) {
+    const did = url.searchParams.get('did');
+    const cid = url.searchParams.get('cid');
+
+    if (!did || !cid) {
+      return errorResponse('InvalidRequest', 'missing did or cid parameter', 400);
+    }
+
+    // Validate CID format (CIDv1 base32lower: starts with 'b', 59 chars total)
+    if (!/^b[a-z2-7]{58}$/.test(cid)) {
+      return errorResponse('InvalidRequest', 'invalid CID format', 400);
+    }
+
+    // Verify DID matches this DO
+    const myDid = await this.getDid();
+    if (did !== myDid) {
+      return errorResponse('InvalidRequest', 'DID does not match this repo', 400);
+    }
+
+    // Look up blob metadata
+    const rows = this.sql
+      .exec('SELECT mimeType, size FROM blob WHERE cid = ?', cid)
+      .toArray();
+
+    if (rows.length === 0) {
+      return errorResponse('BlobNotFound', 'blob not found', 404);
+    }
+
+    const { mimeType, size } = rows[0];
+
+    // Fetch from R2
+    const r2Key = `${did}/${cid}`;
+    const object = await this.env.BLOBS.get(r2Key);
+
+    if (!object) {
+      return errorResponse('BlobNotFound', 'blob not found in storage', 404);
+    }
+
+    // Return blob with security headers
+    return new Response(object.body, {
+      headers: {
+        'Content-Type': mimeType,
+        'Content-Length': String(size),
+        'X-Content-Type-Options': 'nosniff',
+        'Content-Security-Policy': "default-src 'none'; sandbox",
+        'Cache-Control': 'public, max-age=31536000, immutable',
+      },
+    });
+  }
+
+  async handleListBlobs(url) {
+    const did = url.searchParams.get('did');
+    const cursor = url.searchParams.get('cursor');
+    const limit = Math.min(Number(url.searchParams.get('limit')) || 500, 1000);
+
+    if (!did) {
+      return errorResponse('InvalidRequest', 'missing did parameter', 400);
+    }
+
+    // Verify DID matches this DO
+    const myDid = await this.getDid();
+    if (did !== myDid) {
+      return errorResponse('InvalidRequest', 'DID does not match this repo', 400);
+    }
+
+    // Query blobs with pagination (cursor is createdAt::cid for uniqueness)
+    let query = 'SELECT cid, createdAt FROM blob';
+    const params = [];
+
+    if (cursor) {
+      const [cursorTime, cursorCid] = cursor.split('::');
+      query += ' WHERE (createdAt > ? OR (createdAt = ? AND cid > ?))';
+      params.push(cursorTime, cursorTime, cursorCid);
+    }
+
+    query += ' ORDER BY createdAt ASC, cid ASC LIMIT ?';
+    params.push(limit + 1); // Fetch one extra to detect if there's more
+
+    const rows = this.sql.exec(query, ...params).toArray();
+
+    // Determine if there's a next page
+    let nextCursor = null;
+    if (rows.length > limit) {
+      rows.pop(); // Remove the extra row
+      const last = rows[rows.length - 1];
+      nextCursor = `${last.createdAt}::${last.cid}`;
+    }
+
+    return Response.json({
+      cids: rows.map((r) => r.cid),
+      cursor: nextCursor,
+    });
+  }
+
   handleSubscribeRepos(request, url) {
     const upgradeHeader = request.headers.get('Upgrade');
     if (upgradeHeader !== 'websocket') {
···
 
     return errorResponse('NotFound', 'not found', 404);
   }
+
+  async alarm() {
+    await this.cleanupOrphanedBlobs();
+    // Reschedule for next day
+    await this.state.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000);
+  }
+
+  async cleanupOrphanedBlobs() {
+    const did = await this.getDid();
+    if (!did) return;
+
+    // Find orphans: blobs not in record_blob, older than 24h
+    const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
+
+    const orphans = this.sql
+      .exec(
+        `SELECT b.cid FROM blob b
+         LEFT JOIN record_blob rb ON b.cid = rb.blobCid
+         WHERE rb.blobCid IS NULL AND b.createdAt < ?`,
+        cutoff,
+      )
+      .toArray();
+
+    for (const { cid } of orphans) {
+      await this.env.BLOBS.delete(`${did}/${cid}`);
+      this.sql.exec('DELETE FROM blob WHERE cid = ?', cid);
+    }
+  }
 }
 
 const corsHeaders = {
···
       ),
     };
   }
+}
+
+async function handleAuthenticatedBlobUpload(request, env) {
+  const auth = await requireAuth(request, env);
+  if (auth.error) return auth.error;
+
+  // Route to the user's DO based on their DID from the token
+  const id = env.PDS.idFromName(auth.did);
+  const pds = env.PDS.get(id);
+  return pds.fetch(request);
 }
 
 async function handleAuthenticatedRepoWrite(request, env) {
···
     url.pathname === '/xrpc/com.atproto.sync.getLatestCommit' ||
     url.pathname === '/xrpc/com.atproto.sync.getRepoStatus' ||
     url.pathname === '/xrpc/com.atproto.sync.getRepo' ||
-    url.pathname === '/xrpc/com.atproto.sync.getRecord'
+    url.pathname === '/xrpc/com.atproto.sync.getRecord' ||
+    url.pathname === '/xrpc/com.atproto.sync.getBlob' ||
+    url.pathname === '/xrpc/com.atproto.sync.listBlobs'
   ) {
     const did = url.searchParams.get('did');
     if (!did) {
···
     const id = env.PDS.idFromName(did);
     const pds = env.PDS.get(id);
     return pds.fetch(request);
+  }
+
+  // Blob upload endpoint (binary body, uses DID from token)
+  if (url.pathname === '/xrpc/com.atproto.repo.uploadBlob') {
+    return handleAuthenticatedBlobUpload(request, env);
   }
 
   // Authenticated repo write endpoints
```
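
A quick sanity check of the helpers this diff exports — a usage sketch only, assuming the repo's ESM setup from `package.json`, with the import path adjusted to wherever the script lives:

```javascript
// Usage sketch for the new exports; run with Node in an ESM context.
import {
  sniffMimeType,
  findBlobRefs,
  createBlobCid,
  cidToString,
} from './src/pds.js';

// Magic-byte sniffing: FF D8 FF marks a JPEG.
console.log(sniffMimeType(new Uint8Array([0xff, 0xd8, 0xff, 0xe0]))); // image/jpeg

// Blob refs are collected from anywhere in a record tree.
const record = {
  embed: {
    images: [
      {
        image: {
          $type: 'blob',
          ref: { $link: 'cid1' },
          mimeType: 'image/png',
          size: 1,
        },
      },
    ],
  },
};
console.log(findBlobRefs(record)); // ['cid1']

// Blob CIDs use the raw codec (0x55), so the same bytes yield a different
// CID than records, which use dag-cbor (0x71).
console.log(cidToString(await createBlobCid(new Uint8Array([1, 2, 3]))));
```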
+84 -3
test/e2e.sh
```diff
···
 set -e
 
 BASE="http://localhost:8787"
-DID="did:plc:c6vxslynzebnlk5kw2orx37o"
+# Generate unique test DID (or use env var for consistency)
+DID="${TEST_DID:-did:plc:test$(openssl rand -hex 8)}"
 
 # Helper for colored output
 pass() { echo "✓ $1"; }
···
 }
 trap cleanup EXIT
 
-# Start wrangler dev
+# Start wrangler dev with local R2 persistence
 echo "Starting wrangler dev..."
-npx wrangler dev --port 8787 >/dev/null 2>&1 &
+npx wrangler dev --port 8787 --persist-to .wrangler/state >/dev/null 2>&1 &
 WRANGLER_PID=$!
 
 # Wait for server to be ready
···
 # Non-existent record returns 404
 STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/xrpc/com.atproto.repo.getRecord?repo=$DID&collection=app.bsky.feed.post&rkey=nonexistent")
 [ "$STATUS" = "400" ] || [ "$STATUS" = "404" ] && pass "Non-existent record error" || fail "Non-existent record should error"
+
+# Blob tests
+echo
+echo "Testing blob endpoints..."
+
+# Create a minimal valid PNG (1x1 transparent pixel)
+# PNG signature + IHDR + IDAT + IEND
+PNG_FILE=$(mktemp)
+printf '\x89PNG\r\n\x1a\n' > "$PNG_FILE"
+printf '\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89' >> "$PNG_FILE"
+printf '\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01\r\n-\xb4' >> "$PNG_FILE"
+printf '\x00\x00\x00\x00IEND\xaeB`\x82' >> "$PNG_FILE"
+
+# uploadBlob requires auth
+STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/xrpc/com.atproto.repo.uploadBlob" \
+  -H "Content-Type: image/png" \
+  --data-binary @"$PNG_FILE")
+[ "$STATUS" = "401" ] && pass "uploadBlob rejects without auth" || fail "uploadBlob should require auth"
+
+# uploadBlob works with auth
+BLOB_RESULT=$(curl -sf -X POST "$BASE/xrpc/com.atproto.repo.uploadBlob" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: image/png" \
+  --data-binary @"$PNG_FILE")
+BLOB_CID=$(echo "$BLOB_RESULT" | jq -r '.blob.ref."$link"')
+BLOB_MIME=$(echo "$BLOB_RESULT" | jq -r '.blob.mimeType')
+[ "$BLOB_CID" != "null" ] && [ -n "$BLOB_CID" ] && pass "uploadBlob returns CID" || fail "uploadBlob"
+[ "$BLOB_MIME" = "image/png" ] && pass "uploadBlob detects PNG mime type" || fail "uploadBlob mime detection"
+
+# listBlobs shows the uploaded blob
+curl -sf "$BASE/xrpc/com.atproto.sync.listBlobs?did=$DID" |
+  jq -e ".cids | index(\"$BLOB_CID\")" >/dev/null && pass "listBlobs includes uploaded blob" || fail "listBlobs"
+
+# getBlob retrieves the blob
+BLOB_SIZE=$(curl -sf "$BASE/xrpc/com.atproto.sync.getBlob?did=$DID&cid=$BLOB_CID" | wc -c)
+[ "$BLOB_SIZE" -gt 0 ] && pass "getBlob retrieves blob data" || fail "getBlob"
+
+# getBlob returns correct headers
+BLOB_HEADERS=$(curl -sI "$BASE/xrpc/com.atproto.sync.getBlob?did=$DID&cid=$BLOB_CID")
+echo "$BLOB_HEADERS" | grep -qi "content-type: image/png" && pass "getBlob Content-Type header" || fail "getBlob Content-Type"
+echo "$BLOB_HEADERS" | grep -qi "x-content-type-options: nosniff" && pass "getBlob security headers" || fail "getBlob security headers"
+
+# getBlob rejects wrong DID
+STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/xrpc/com.atproto.sync.getBlob?did=did:plc:wrongdid&cid=$BLOB_CID")
+[ "$STATUS" = "400" ] && pass "getBlob rejects wrong DID" || fail "getBlob should reject wrong DID"
+
+# getBlob returns 400 for invalid CID format
+STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/xrpc/com.atproto.sync.getBlob?did=$DID&cid=invalid")
+[ "$STATUS" = "400" ] && pass "getBlob rejects invalid CID format" || fail "getBlob should reject invalid CID"
+
+# getBlob returns 404 for non-existent blob (valid format CID - 59 chars)
+STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/xrpc/com.atproto.sync.getBlob?did=$DID&cid=bafkreiaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+[ "$STATUS" = "404" ] && pass "getBlob 404 for missing blob" || fail "getBlob should 404"
+
+# Create a record with blob reference
+BLOB_POST=$(curl -sf -X POST "$BASE/xrpc/com.atproto.repo.createRecord" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: application/json" \
+  -d "{\"repo\":\"$DID\",\"collection\":\"app.bsky.feed.post\",\"record\":{\"text\":\"post with image\",\"createdAt\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\",\"embed\":{\"\$type\":\"app.bsky.embed.images\",\"images\":[{\"image\":{\"\$type\":\"blob\",\"ref\":{\"\$link\":\"$BLOB_CID\"},\"mimeType\":\"image/png\",\"size\":$(wc -c < "$PNG_FILE")},\"alt\":\"test\"}]}}}")
+BLOB_POST_URI=$(echo "$BLOB_POST" | jq -r '.uri')
+BLOB_POST_RKEY=$(echo "$BLOB_POST_URI" | sed 's|.*/||')
+[ "$BLOB_POST_URI" != "null" ] && [ -n "$BLOB_POST_URI" ] && pass "createRecord with blob ref" || fail "createRecord with blob"
+
+# Blob still exists after record creation
+curl -sf "$BASE/xrpc/com.atproto.sync.listBlobs?did=$DID" |
+  jq -e ".cids | index(\"$BLOB_CID\")" >/dev/null && pass "blob persists after record creation" || fail "blob should persist"
+
+# Delete the record with blob
+curl -sf -X POST "$BASE/xrpc/com.atproto.repo.deleteRecord" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: application/json" \
+  -d "{\"repo\":\"$DID\",\"collection\":\"app.bsky.feed.post\",\"rkey\":\"$BLOB_POST_RKEY\"}" >/dev/null &&
+  pass "deleteRecord with blob" || fail "deleteRecord with blob"
+
+# Blob should be cleaned up (orphaned)
+BLOB_COUNT=$(curl -sf "$BASE/xrpc/com.atproto.sync.listBlobs?did=$DID" | jq '.cids | length')
+[ "$BLOB_COUNT" = "0" ] && pass "orphaned blob cleaned up on delete" || fail "blob should be cleaned up"
+
+# Clean up temp file
+rm -f "$PNG_FILE"
 
 # Cleanup: delete the test record
 curl -sf -X POST "$BASE/xrpc/com.atproto.repo.deleteRecord" \
```
+145 -1
test/pds.test.js
```diff
···
   cidToString,
   createAccessJwt,
   createCid,
+  createBlobCid,
   createRefreshJwt,
   createTid,
+  findBlobRefs,
   generateKeyPair,
   getKeyDepth,
   hexToBytes,
   importPrivateKey,
   sign,
+  sniffMimeType,
   varint,
   verifyAccessJwt,
   verifyRefreshJwt,
···
 });
 
 describe('CID Generation', () => {
-  test('creates CIDv1 with dag-cbor codec', async () => {
+  test('createCid uses dag-cbor codec', async () => {
     const data = cborEncode({ test: 'data' });
     const cid = await createCid(data);
 
···
     assert.strictEqual(cid[1], 0x71); // dag-cbor
     assert.strictEqual(cid[2], 0x12); // sha-256
     assert.strictEqual(cid[3], 0x20); // 32 bytes
+  });
+
+  test('createBlobCid uses raw codec', async () => {
+    const data = new Uint8Array([0xff, 0xd8, 0xff, 0xe0]); // JPEG magic bytes
+    const cid = await createBlobCid(data);
+
+    assert.strictEqual(cid.length, 36);
+    assert.strictEqual(cid[0], 0x01); // CIDv1
+    assert.strictEqual(cid[1], 0x55); // raw codec
+    assert.strictEqual(cid[2], 0x12); // sha-256
+    assert.strictEqual(cid[3], 0x20); // 32 bytes
+  });
+
+  test('same bytes produce different CIDs with different codecs', async () => {
+    const data = new Uint8Array([1, 2, 3, 4]);
+    const dagCborCid = cidToString(await createCid(data));
+    const rawCid = cidToString(await createBlobCid(data));
+
+    assert.notStrictEqual(dagCborCid, rawCid);
   });
 
   test('cidToString returns base32lower with b prefix', async () => {
···
     );
   });
 });
+
+describe('MIME Type Sniffing', () => {
+  test('detects JPEG', () => {
+    const bytes = new Uint8Array([0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/jpeg');
+  });
+
+  test('detects PNG', () => {
+    const bytes = new Uint8Array([
+      0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a,
+    ]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/png');
+  });
+
+  test('detects GIF', () => {
+    const bytes = new Uint8Array([0x47, 0x49, 0x46, 0x38, 0x39, 0x61]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/gif');
+  });
+
+  test('detects WebP', () => {
+    const bytes = new Uint8Array([
+      0x52, 0x49, 0x46, 0x46, // RIFF
+      0x00, 0x00, 0x00, 0x00, // size (ignored)
+      0x57, 0x45, 0x42, 0x50, // WEBP
+    ]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/webp');
+  });
+
+  test('detects MP4', () => {
+    const bytes = new Uint8Array([
+      0x00, 0x00, 0x00, 0x18, // size
+      0x66, 0x74, 0x79, 0x70, // ftyp
+      0x69, 0x73, 0x6f, 0x6d, // isom brand
+    ]);
+    assert.strictEqual(sniffMimeType(bytes), 'video/mp4');
+  });
+
+  test('detects AVIF', () => {
+    const bytes = new Uint8Array([
+      0x00, 0x00, 0x00, 0x1c, // size
+      0x66, 0x74, 0x79, 0x70, // ftyp
+      0x61, 0x76, 0x69, 0x66, // avif brand
+    ]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/avif');
+  });
+
+  test('detects HEIC', () => {
+    const bytes = new Uint8Array([
+      0x00, 0x00, 0x00, 0x18, // size
+      0x66, 0x74, 0x79, 0x70, // ftyp
+      0x68, 0x65, 0x69, 0x63, // heic brand
+    ]);
+    assert.strictEqual(sniffMimeType(bytes), 'image/heic');
+  });
+
+  test('returns null for unknown', () => {
+    const bytes = new Uint8Array([0x00, 0x01, 0x02, 0x03]);
+    assert.strictEqual(sniffMimeType(bytes), null);
+  });
+});
+
+describe('Blob Ref Detection', () => {
+  test('finds blob ref in simple object', () => {
+    const record = {
+      $type: 'app.bsky.feed.post',
+      text: 'Hello',
+      embed: {
+        $type: 'app.bsky.embed.images',
+        images: [
+          {
+            image: {
+              $type: 'blob',
+              ref: { $link: 'bafkreiabc123' },
+              mimeType: 'image/jpeg',
+              size: 1234,
+            },
+            alt: 'test image',
+          },
+        ],
+      },
+    };
+    const refs = findBlobRefs(record);
+    assert.deepStrictEqual(refs, ['bafkreiabc123']);
+  });
+
+  test('finds multiple blob refs', () => {
+    const record = {
+      images: [
+        {
+          image: {
+            $type: 'blob',
+            ref: { $link: 'cid1' },
+            mimeType: 'image/png',
+            size: 100,
+          },
+        },
+        {
+          image: {
+            $type: 'blob',
+            ref: { $link: 'cid2' },
+            mimeType: 'image/png',
+            size: 200,
+          },
+        },
+      ],
+    };
+    const refs = findBlobRefs(record);
+    assert.deepStrictEqual(refs, ['cid1', 'cid2']);
+  });
+
+  test('returns empty array when no blobs', () => {
+    const record = { text: 'Hello world', count: 42 };
+    const refs = findBlobRefs(record);
+    assert.deepStrictEqual(refs, []);
+  });
+
+  test('handles null and primitives', () => {
+    assert.deepStrictEqual(findBlobRefs(null), []);
+    assert.deepStrictEqual(findBlobRefs('string'), []);
+    assert.deepStrictEqual(findBlobRefs(42), []);
+  });
+});
```
+5 -1
wrangler.toml
```diff
···
 
 [[migrations]]
 tag = "v1"
-new_sqlite_classes = ["PersonalDataServer"]
+new_sqlite_classes = [ "PersonalDataServer" ]
+
+[[r2_buckets]]
+binding = "BLOBS"
+bucket_name = "pds-blobs"
```