A tool for tailing a labeler's firehose, rehydrating the records it emits, and storing them for future analysis of moderation decisions.

fix: blob deduplication logic using proper CID lookup

The blob processor was using `findBySha256(ref.cid)` which incorrectly
passed a CID to a method expecting a SHA256 hash. This caused the
deduplication check to never find matches, resulting in blobs being
unnecessarily reprocessed regardless of the hydrate_blobs setting.

Changes:
- Add `findByCid` method to BlobsRepository for proper CID lookup
- Update processor to use CID-based deduplication
- When a blob already exists, reuse its hashes but still insert the post+blob relationship
- Add index on blob_cid column for query performance
- Add test coverage for new findByCid method

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+34 -2
src
tests
integration
+10 -2
src/blobs/processor.ts
··· 104 104 postUri: string, 105 105 ref: BlobReference 106 106 ): Promise<void> { 107 - const existing = await this.blobsRepo.findBySha256(ref.cid); 107 + const existing = await this.blobsRepo.findByCid(ref.cid); 108 108 if (existing) { 109 + await this.blobsRepo.insert({ 110 + post_uri: postUri, 111 + blob_cid: ref.cid, 112 + sha256: existing.sha256, 113 + phash: existing.phash, 114 + storage_path: existing.storage_path, 115 + mimetype: existing.mimetype, 116 + }); 109 117 logger.debug( 110 118 { postUri, cid: ref.cid }, 111 - "Blob already processed, skipping" 119 + "Blob already processed, reusing hashes" 112 120 ); 113 121 return; 114 122 }
+17
src/database/blobs.repository.ts
··· 103 103 ); 104 104 }); 105 105 } 106 + 107 + async findByCid(cid: string): Promise<Blob | null> { 108 + return new Promise((resolve, reject) => { 109 + this.db.all( 110 + `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`, 111 + cid, 112 + (err, rows: Blob[]) => { 113 + if (err) { 114 + logger.error({ err, cid }, "Failed to find blob by CID"); 115 + reject(err); 116 + return; 117 + } 118 + resolve(rows?.[0] || null); 119 + } 120 + ); 121 + }); 122 + } 106 123 }
+1
src/database/schema.ts
··· 54 54 CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val); 55 55 CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts); 56 56 CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did); 57 + CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid); 57 58 CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 58 59 CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 59 60 `;
+6
tests/integration/database.test.ts
··· 175 175 const found = await blobsRepo.findByPhash("deadbeef"); 176 176 expect(found.length).toBeGreaterThan(0); 177 177 }); 178 + 179 + test("should find blob by CID", async () => { 180 + const found = await blobsRepo.findByCid("bafytest123"); 181 + expect(found).not.toBeNull(); 182 + expect(found?.sha256).toBe("abc123def456"); 183 + }); 178 184 }); 179 185 });