A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #1 from skywatch-bsky/fix/blob-handling-bug

fix: blob deduplication logic using proper CID lookup

authored by

Scarnecchia and committed by
GitHub
2223c6e2 4006d353

+34 -2
+10 -2
src/blobs/processor.ts
··· 116 postUri: string, 117 ref: BlobReference 118 ): Promise<void> { 119 - const existing = await this.blobsRepo.findBySha256(ref.cid); 120 if (existing) { 121 logger.debug( 122 { postUri, cid: ref.cid }, 123 - "Blob already processed, skipping" 124 ); 125 return; 126 }
··· 116 postUri: string, 117 ref: BlobReference 118 ): Promise<void> { 119 + const existing = await this.blobsRepo.findByCid(ref.cid); 120 if (existing) { 121 + await this.blobsRepo.insert({ 122 + post_uri: postUri, 123 + blob_cid: ref.cid, 124 + sha256: existing.sha256, 125 + phash: existing.phash, 126 + storage_path: existing.storage_path, 127 + mimetype: existing.mimetype, 128 + }); 129 logger.debug( 130 { postUri, cid: ref.cid }, 131 + "Blob already processed, reusing hashes" 132 ); 133 return; 134 }
+17
src/database/blobs.repository.ts
··· 103 ); 104 }); 105 } 106 }
··· 103 ); 104 }); 105 } 106 + 107 + async findByCid(cid: string): Promise<Blob | null> { 108 + return new Promise((resolve, reject) => { 109 + this.db.all( 110 + `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`, 111 + cid, 112 + (err, rows: Blob[]) => { 113 + if (err) { 114 + logger.error({ err, cid }, "Failed to find blob by CID"); 115 + reject(err); 116 + return; 117 + } 118 + resolve(rows?.[0] || null); 119 + } 120 + ); 121 + }); 122 + } 123 }
+1
src/database/schema.ts
··· 70 CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val); 71 CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts); 72 CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did); 73 CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 74 CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 75 CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
··· 70 CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val); 71 CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts); 72 CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did); 73 + CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid); 74 CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 75 CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 76 CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
+6
tests/integration/database.test.ts
··· 195 const found = await blobsRepo.findByPhash("deadbeef"); 196 expect(found.length).toBeGreaterThan(0); 197 }); 198 }); 199 });
··· 195 const found = await blobsRepo.findByPhash("deadbeef"); 196 expect(found.length).toBeGreaterThan(0); 197 }); 198 + 199 + test("should find blob by CID", async () => { 200 + const found = await blobsRepo.findByCid("bafytest123"); 201 + expect(found).not.toBeNull(); 202 + expect(found?.sha256).toBe("abc123def456"); 203 + }); 204 }); 205 });