A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: use agent.com.atproto.sync.getBlob directly for profile images

- Remove blob processor dependency for profile images
- Fetch blobs directly using AtpAgent.com.atproto.sync.getBlob()
- Compute hashes and store in blobs table directly
- Simpler, cleaner implementation without profile:// URI hack

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+45 -29
+45 -29
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 - import { BlobProcessor } from "../blobs/processor.js"; 4 + import { BlobsRepository } from "../database/blobs.repository.js"; 5 + import { computeBlobHashes } from "../blobs/hasher.js"; 5 6 import { pRateLimit } from "p-ratelimit"; 6 7 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 7 8 import { logger } from "../logger/index.js"; ··· 10 11 export class ProfileHydrationService { 11 12 private agent: AtpAgent; 12 13 private profilesRepo: ProfilesRepository; 13 - private blobProcessor: BlobProcessor; 14 + private blobsRepo: BlobsRepository; 14 15 private limit: ReturnType<typeof pRateLimit>; 15 16 16 17 constructor(db: Database) { 17 18 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 18 19 this.profilesRepo = new ProfilesRepository(db); 19 - this.blobProcessor = new BlobProcessor(db, this.agent); 20 + this.blobsRepo = new BlobsRepository(db); 20 21 this.limit = pRateLimit({ 21 22 interval: 300000, 22 23 rate: 3000, ··· 136 137 137 138 if (avatarCid && avatarCid !== "") { 138 139 try { 139 - await this.blobProcessor.processBlobs(`profile://${did}/avatar`, [ 140 - { 141 - images: [ 142 - { 143 - image: { 144 - ref: { $link: avatarCid }, 145 - mimeType: "image/jpeg", 146 - }, 147 - }, 148 - ], 149 - }, 150 - ]); 140 + await this.processProfileBlob(did, avatarCid, "avatar"); 151 141 } catch (error) { 152 - logger.warn({ error, did }, "Failed to process avatar blob"); 142 + logger.warn({ error, did, avatarCid }, "Failed to process avatar blob"); 153 143 } 154 144 } 155 145 156 146 if (bannerCid && bannerCid !== "") { 157 147 try { 158 - await this.blobProcessor.processBlobs(`profile://${did}/banner`, [ 159 - { 160 - images: [ 161 - { 162 - image: { 163 - ref: { $link: bannerCid }, 164 - mimeType: "image/jpeg", 165 - }, 166 - }, 167 - ], 168 - }, 169 - ]); 148 + await this.processProfileBlob(did, bannerCid, "banner"); 170 149 } catch (error) { 171 - logger.warn({ error, did }, "Failed to process banner blob"); 150 + logger.warn({ error, did, bannerCid }, "Failed to process banner blob"); 172 151 } 173 152 } 174 153 ··· 177 156 logger.error({ error, did }, "Failed to hydrate profile"); 178 157 throw error; 179 158 } 159 + } 160 + 161 + private async processProfileBlob( 162 + did: string, 163 + cid: string, 164 + type: "avatar" | "banner" 165 + ): Promise<void> { 166 + const postUri = `profile://${did}/${type}`; 167 + const existing = await this.blobsRepo.findByPostUri(postUri); 168 + 169 + if (existing.length > 0 && existing.some(b => b.blob_cid === cid)) { 170 + logger.debug({ did, cid, type }, "Blob already processed, skipping"); 171 + return; 172 + } 173 + 174 + const blobResponse = await this.agent.com.atproto.sync.getBlob({ 175 + did, 176 + cid, 177 + }); 178 + 179 + if (!blobResponse.success) { 180 + logger.warn({ did, cid, type }, "Failed to fetch blob from PDS"); 181 + return; 182 + } 183 + 184 + const blobData = Buffer.from(await blobResponse.data.arrayBuffer()); 185 + const hashes = await computeBlobHashes(blobData, "image/jpeg"); 186 + 187 + await this.blobsRepo.insert({ 188 + post_uri: postUri, 189 + blob_cid: cid, 190 + sha256: hashes.sha256, 191 + phash: hashes.phash, 192 + mimetype: "image/jpeg", 193 + }); 194 + 195 + logger.info({ did, cid, type, sha256: hashes.sha256 }, "Profile blob processed successfully"); 180 196 } 181 197 }