A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.

fix: use proper AT Protocol getBlob endpoint

Replace CDN URL fetching with com.atproto.sync.getBlob XRPC endpoint.
Works for all blob types (posts, avatars, banners) via PDS.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+11 -52
src
blobs
+11 -52
src/blobs/processor.ts
··· 112 return { did, type: 'post' }; 113 } 114 115 - private getBlobUrls(did: string, cid: string, type: 'post' | 'avatar' | 'banner'): { thumbnail: string; fullsize: string } { 116 - if (type === 'avatar') { 117 - return { 118 - thumbnail: `https://cdn.bsky.app/img/avatar/plain/${did}/${cid}@jpeg`, 119 - fullsize: `https://cdn.bsky.app/img/avatar/plain/${did}/${cid}@jpeg`, 120 - }; 121 - } else if (type === 'banner') { 122 - return { 123 - thumbnail: `https://cdn.bsky.app/img/banner/plain/${did}/${cid}@jpeg`, 124 - fullsize: `https://cdn.bsky.app/img/banner/plain/${did}/${cid}@jpeg`, 125 - }; 126 - } else { 127 - return { 128 - thumbnail: `https://cdn.bsky.app/img/feed_thumbnail/plain/${did}/${cid}@jpeg`, 129 - fullsize: `https://cdn.bsky.app/img/feed_fullsize/plain/${did}/${cid}@jpeg`, 130 - }; 131 - } 132 - } 133 - 134 private async processBlob( 135 postUri: string, 136 ref: BlobReference ··· 145 } 146 147 const { did, type } = this.parseBlobUri(postUri); 148 - const urls = this.getBlobUrls(did, ref.cid, type); 149 150 try { 151 - const response = await fetch(urls.thumbnail, { method: "HEAD" }); 152 153 if (!response.ok) { 154 logger.warn( 155 - { postUri, cid: ref.cid, status: response.status }, 156 - "Failed to fetch blob metadata" 157 ); 158 return; 159 } 160 161 - let blobData: Buffer | null = null; 162 - let storagePath: string | undefined; 163 164 if (this.storage && config.blobs.hydrateBlobs) { 165 - const fullResponse = await fetch(urls.fullsize); 166 - 167 - if (fullResponse.ok) { 168 - blobData = Buffer.from( 169 - await fullResponse.arrayBuffer() 170 - ); 171 - storagePath = await this.storage.store( 172 - ref.cid, 173 - blobData, 174 - ref.mimeType 175 - ); 176 - } 177 - } else { 178 - const thumbResponse = await fetch(urls.thumbnail); 179 - 180 - if (thumbResponse.ok) { 181 - blobData = Buffer.from( 182 - await thumbResponse.arrayBuffer() 183 - ); 184 - } 185 - } 186 - 187 - if (!blobData) { 188 - logger.warn( 189 - { postUri, cid: ref.cid }, 190 - "Could not fetch blob data" 191 ); 192 - return; 193 } 194 195 const hashes = await computeBlobHashes(
··· 112 return { did, type: 'post' }; 113 } 114 115 private async processBlob( 116 postUri: string, 117 ref: BlobReference ··· 126 } 127 128 const { did, type } = this.parseBlobUri(postUri); 129 + const pds = `https://${config.bsky.pds}`; 130 + const blobUrl = `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${ref.cid}`; 131 132 try { 133 + const response = await fetch(blobUrl); 134 135 if (!response.ok) { 136 logger.warn( 137 + { postUri, cid: ref.cid, status: response.status, did }, 138 + "Failed to fetch blob" 139 ); 140 return; 141 } 142 143 + const blobData = Buffer.from(await response.arrayBuffer()); 144 145 + let storagePath: string | undefined; 146 if (this.storage && config.blobs.hydrateBlobs) { 147 + storagePath = await this.storage.store( 148 + ref.cid, 149 + blobData, 150 + ref.mimeType 151 ); 152 } 153 154 const hashes = await computeBlobHashes(