A tool for tailing a labeler's firehose, rehydrating, and storing records for future analysis of moderation decisions.

Merge branch 'main' of github.com:skywatch-bsky/skywatch-tail

+3
.env.example
···
  PDS=bsky.social
  WSS_URL=wss://your-labeler-service.com/xrpc/com.atproto.label.subscribeLabels

+ # PLC Directory (for DID resolution)
+ PLC_ENDPOINT=https://plc.wtf
+
  # Blob & Image Handling
  HYDRATE_BLOBS=false # Set to true to download images/videos
  BLOB_STORAGE_TYPE=local # 'local' or 's3'
+164
docs/profile-blob-hydration.md
···
+ # Profile Blob Hydration - Implementation Notes
+
+ ## Overview
+
+ This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles.
+
+ ## Key Discoveries
+
+ ### 1. CID Deserialization in @atproto/api
+
+ The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects.
+
+ **Raw JSON from API:**
+ ```json
+ {
+   "avatar": {
+     "$type": "blob",
+     "ref": {
+       "$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve"
+     },
+     "mimeType": "image/jpeg",
+     "size": 101770
+   }
+ }
+ ```
+
+ **What you get in TypeScript:**
+ ```typescript
+ record.avatar.ref // CID object with { code, version, hash, ... }
+ ```
+
+ **Solution:**
+ ```typescript
+ const cid = record.avatar.ref.toString(); // "bafkrei..."
+ ```
+
+ ### 2. PDS Endpoint Resolution
+
+ Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS.
+
+ **Process:**
+ 1. Query PLC directory for DID document: `https://plc.wtf/${did}`
+ 2. Find service with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"`
+ 3. Extract `serviceEndpoint` URL
+ 4. Use that endpoint for `com.atproto.sync.getBlob`
+
+ **Example:**
+ ```typescript
+ const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json());
+ const pdsService = didDoc.service?.find(s =>
+   s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
+ );
+ const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network"
+ ```
+
+ ### 3. Correct Blob Fetching
+
+ **Don't use CDN paths** - they don't work reliably for all blobs and require authentication context.
+
+ **Use the AT Protocol API:**
+ ```typescript
+ const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
+ const response = await fetch(blobUrl);
+ const blobData = Buffer.from(await response.arrayBuffer());
+ ```
+
+ ### 4. Database Schema Design
+
+ **Separate tables for different blob types:**
+
+ - `blobs` table: Post images with FK to `posts(uri)`
+ - `profile_blobs` table: Avatars/banners with FK to `profiles(did)`
+
+ This allows proper relational queries and analysis.
+
+ **Profile blobs schema:**
+ ```sql
+ CREATE TABLE profile_blobs (
+   did TEXT NOT NULL,
+   blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
+   blob_cid TEXT NOT NULL,
+   sha256 TEXT NOT NULL,
+   phash TEXT,
+   storage_path TEXT,
+   mimetype TEXT,
+   captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+   PRIMARY KEY (did, blob_type, captured_at),
+   FOREIGN KEY (did) REFERENCES profiles(did)
+ );
+ ```
+
+ ### 5. Change Tracking
+
+ Including `captured_at` in the primary key allows tracking when users change their avatars/banners.
+
+ **Query latest state:**
+ ```sql
+ SELECT * FROM profile_blobs
+ WHERE did = ? AND blob_type = ?
+ ORDER BY captured_at DESC
+ LIMIT 1
+ ```
+
+ **Only insert if changed:**
+ ```typescript
+ const latest = await findLatestByDidAndType(did, type);
+ if (latest && latest.blob_cid === cid) {
+   return; // No change, skip
+ }
+ // Insert new row with current timestamp
+ ```
+
+ ### 6. Sentinel Values for Missing Data
+
+ Use empty string (`""`) to distinguish "we checked, user has no avatar" from NULL "we haven't checked yet".
+
+ ```typescript
+ if (record.avatar?.ref) {
+   avatarCid = record.avatar.ref.toString();
+ } else {
+   avatarCid = ""; // Explicitly checked, not present
+ }
+ ```
+
+ This prevents infinite re-hydration loops for profiles without avatars.
+
+ ### 7. Profile Re-hydration Logic
+
+ ```typescript
+ const existingProfile = await findByDid(did);
+ const needsRehydration = existingProfile &&
+   (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);
+
+ if (existingProfile && !needsRehydration) {
+   return; // Skip
+ }
+ ```
+
+ ## Configuration
+
+ - `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`)
+   - Can be changed to `https://plc.directory` or custom instance
+   - plc.wtf is faster but unofficial
+
+ ## Common Errors
+
+ ### "RepoNotFound"
+ - **Cause:** Querying wrong PDS endpoint
+ - **Solution:** Resolve correct PDS from DID document
+
+ ### Foreign Key Constraint Violation
+ - **Cause:** Trying to insert profile blobs into `blobs` table
+ - **Solution:** Use separate `profile_blobs` table
+
+ ### Missing CIDs Despite API Returning Them
+ - **Cause:** Trying to access `ref.$link` when ref is a CID object
+ - **Solution:** Call `.toString()` on the CID object
+
+ ## Related Files
+
+ - `src/hydration/profiles.service.ts` - Main hydration logic
+ - `src/database/profile-blobs.repository.ts` - Profile blob persistence
+ - `src/database/schema.ts` - Table definitions
+ - `src/config/index.ts` - PLC endpoint configuration
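The resolution and fetch steps described in the implementation notes can be sketched as two pure helpers, which makes them testable without a network call. This is a minimal sketch and not the code in `profiles.service.ts`: `findPdsEndpoint`, `buildGetBlobUrl`, and the `DidDocument` type are hypothetical names, and the sample DID document is made up. Unlike the snippets in the notes, it percent-encodes the query parameters and tolerates a trailing slash on the endpoint; both are assumptions about what a defensive version might want.

```typescript
// Hypothetical minimal shape of a DID document; only the fields used here.
interface DidService {
  id: string;
  type: string;
  serviceEndpoint?: string;
}

interface DidDocument {
  id: string;
  service?: DidService[];
}

// Returns the PDS endpoint listed in a DID document, or null if there is none.
function findPdsEndpoint(doc: DidDocument): string | null {
  const pds = doc.service?.find(
    (s) => s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
  );
  return pds?.serviceEndpoint ?? null;
}

// Builds the com.atproto.sync.getBlob URL for a blob hosted on the given PDS.
function buildGetBlobUrl(pdsEndpoint: string, did: string, cid: string): string {
  const base = pdsEndpoint.replace(/\/+$/, ""); // tolerate a trailing slash
  const query = `did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`;
  return `${base}/xrpc/com.atproto.sync.getBlob?${query}`;
}

// Made-up document, for illustration only.
const sampleDoc: DidDocument = {
  id: "did:plc:example123",
  service: [
    {
      id: "#atproto_pds",
      type: "AtprotoPersonalDataServer",
      serviceEndpoint: "https://pds.example.com",
    },
  ],
};

const sampleEndpoint = findPdsEndpoint(sampleDoc);
if (sampleEndpoint !== null) {
  console.log(buildGetBlobUrl(sampleEndpoint, sampleDoc.id, "bafkreiexamplecid"));
}
```

Keeping the lookup separate from the `fetch` call also gives a natural place to return `null` when a DID document lists no PDS, which matches the "cannot fetch blob without PDS endpoint" branch in the service.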
+35 -43
src/blobs/processor.ts
···
      }
    }

+   private parseBlobUri(uri: string): { did: string; type: 'post' | 'avatar' | 'banner' } {
+     if (uri.startsWith("profile://")) {
+       const match = uri.match(/^profile:\/\/([^/]+)\/(avatar|banner)$/);
+       if (match) {
+         return { did: match[1], type: match[2] as 'avatar' | 'banner' };
+       }
+     }
+
+     const [, did] = uri.replace("at://", "").split("/");
+     return { did, type: 'post' };
+   }
+
    private async processBlob(
      postUri: string,
      ref: BlobReference
    ): Promise<void> {
-     const existing = await this.blobsRepo.findBySha256(ref.cid);
+     const existing = await this.blobsRepo.findByCid(ref.cid);
      if (existing) {
+       await this.blobsRepo.insert({
+         post_uri: postUri,
+         blob_cid: ref.cid,
+         sha256: existing.sha256,
+         phash: existing.phash,
+         storage_path: existing.storage_path,
+         mimetype: existing.mimetype,
+       });
        logger.debug(
          { postUri, cid: ref.cid },
-         "Blob already processed, skipping"
+         "Blob already processed, reusing hashes"
        );
        return;
      }

-     const [, did] = postUri.replace("at://", "").split("/");
+     const { did, type } = this.parseBlobUri(postUri);
+     const pds = `https://${config.bsky.pds}`;
+     const blobUrl = `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${ref.cid}`;

      try {
-       const response = await fetch(
-         `https://cdn.bsky.app/img/feed_thumbnail/plain/${did}/${ref.cid}@jpeg`,
-         { method: "HEAD" }
-       );
+       const response = await fetch(blobUrl);

        if (!response.ok) {
          logger.warn(
-           { postUri, cid: ref.cid, status: response.status },
-           "Failed to fetch blob metadata"
+           { postUri, cid: ref.cid, status: response.status, did },
+           "Failed to fetch blob"
          );
          return;
        }

-       let blobData: Buffer | null = null;
-       let storagePath: string | undefined;
+       const blobData = Buffer.from(await response.arrayBuffer());

+       let storagePath: string | undefined;
        if (this.storage && config.blobs.hydrateBlobs) {
-         const fullResponse = await fetch(
-           `https://cdn.bsky.app/img/feed_fullsize/plain/${did}/${ref.cid}@jpeg`
+         storagePath = await this.storage.store(
+           ref.cid,
+           blobData,
+           ref.mimeType
          );
-
-         if (fullResponse.ok) {
-           blobData = Buffer.from(
-             await fullResponse.arrayBuffer()
-           );
-           storagePath = await this.storage.store(
-             ref.cid,
-             blobData,
-             ref.mimeType
-           );
-         }
-       } else {
-         const thumbResponse = await fetch(
-           `https://cdn.bsky.app/img/feed_thumbnail/plain/${did}/${ref.cid}@jpeg`
-         );
-
-         if (thumbResponse.ok) {
-           blobData = Buffer.from(
-             await thumbResponse.arrayBuffer()
-           );
-         }
-       }
-
-       if (!blobData) {
-         logger.warn(
-           { postUri, cid: ref.cid },
-           "Could not fetch blob data"
-         );
-         return;
        }

        const hashes = await computeBlobHashes(
···
        });

        logger.info(
-         { postUri, cid: ref.cid, sha256: hashes.sha256 },
+         { postUri, cid: ref.cid, sha256: hashes.sha256, type },
          "Blob processed successfully"
        );
      } catch (error) {
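For review purposes, the URI parsing introduced in `parseBlobUri` can be exercised on its own. The following is a standalone sketch under stated assumptions, not the method from the diff: it assumes post URIs have the usual AT URI shape `at://<did>/<collection>/<rkey>`, in which the DID is the first segment after the scheme, and it throws on anything else, which is a choice made here for illustration. Note that the `[, did]` destructuring in the diff skips the first segment, so it may be worth checking which segment it yields for the URIs this code actually receives.

```typescript
type BlobOwner = { did: string; type: "post" | "avatar" | "banner" };

// Parses either a synthetic profile://<did>/<avatar|banner> URI or an
// AT URI of the form at://<did>/<collection>/<rkey>.
function parseBlobUriSketch(uri: string): BlobOwner {
  const profile = uri.match(/^profile:\/\/([^/]+)\/(avatar|banner)$/);
  if (profile) {
    return { did: profile[1], type: profile[2] as "avatar" | "banner" };
  }

  if (!uri.startsWith("at://")) {
    // Assumption: unknown schemes are rejected rather than guessed at.
    throw new Error(`Unsupported blob URI: ${uri}`);
  }

  // After stripping the scheme, the authority (DID) is the first segment.
  const [did] = uri.slice("at://".length).split("/");
  return { did, type: "post" };
}

console.log(parseBlobUriSketch("at://did:plc:abc/app.bsky.feed.post/3k2a"));
console.log(parseBlobUriSketch("profile://did:plc:abc/banner"));
```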
+6
src/config/index.ts
···
      password: z.string().min(1, "BSKY_PASSWORD is required"),
      pds: z.string().default("bsky.social"),
    }),
+   plc: z.object({
+     endpoint: z.string().url().default("https://plc.wtf"),
+   }),
    labeler: z.object({
      wssUrl: z.string().url("WSS_URL must be a valid URL"),
    }),
···
      handle: process.env.BSKY_HANDLE,
      password: process.env.BSKY_PASSWORD,
      pds: process.env.PDS,
+   },
+   plc: {
+     endpoint: process.env.PLC_ENDPOINT,
    },
    labeler: {
      wssUrl: process.env.WSS_URL,
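To illustrate what the new `plc.endpoint` setting is expected to do (default when unset, reject non-URLs), here is a dependency-free stand-in for the zod schema. It is only a sketch: `resolvePlcEndpoint` is a hypothetical helper, the env object is passed in rather than read from `process.env`, and stripping trailing slashes is an extra assumption so that `${endpoint}/${did}` does not end up with a double slash.

```typescript
const DEFAULT_PLC_ENDPOINT = "https://plc.wtf";

// Reads PLC_ENDPOINT from an env-like object, falls back to the default when
// it is unset, checks that it parses as a URL, and strips trailing slashes.
function resolvePlcEndpoint(env: Record<string, string | undefined>): string {
  const raw = env["PLC_ENDPOINT"] ?? DEFAULT_PLC_ENDPOINT;
  try {
    new URL(raw); // throws a TypeError on input that is not a valid URL
  } catch {
    throw new Error(`PLC_ENDPOINT must be a valid URL, got "${raw}"`);
  }
  return raw.replace(/\/+$/, "");
}

console.log(resolvePlcEndpoint({}));
console.log(resolvePlcEndpoint({ PLC_ENDPOINT: "https://plc.directory/" }));
```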
+17
src/database/blobs.repository.ts
···
        );
      });
    }
+
+   async findByCid(cid: string): Promise<Blob | null> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
+         cid,
+         (err, rows: Blob[]) => {
+           if (err) {
+             logger.error({ err, cid }, "Failed to find blob by CID");
+             reject(err);
+             return;
+           }
+           resolve(rows?.[0] || null);
+         }
+       );
+     });
+   }
  }
+123
src/database/profile-blobs.repository.ts
···
+ import { Database } from "duckdb";
+ import { logger } from "../logger/index.js";
+
+ export interface ProfileBlob {
+   did: string;
+   blob_type: "avatar" | "banner";
+   blob_cid: string;
+   sha256: string;
+   phash?: string;
+   storage_path?: string;
+   mimetype?: string;
+   captured_at?: Date;
+ }
+
+ export class ProfileBlobsRepository {
+   constructor(private db: Database) {}
+
+   async insert(blob: ProfileBlob): Promise<void> {
+     return new Promise((resolve, reject) => {
+       this.db.prepare(
+         `
+         INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at)
+         VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP))
+         `,
+         (err, stmt) => {
+           if (err) {
+             logger.error({ err }, "Failed to prepare profile blob insert statement");
+             reject(err);
+             return;
+           }
+
+           stmt.run(
+             blob.did,
+             blob.blob_type,
+             blob.blob_cid,
+             blob.sha256,
+             blob.phash || null,
+             blob.storage_path || null,
+             blob.mimetype || null,
+             blob.captured_at || null,
+             (err) => {
+               if (err) {
+                 logger.error({ err, blob }, "Failed to insert profile blob");
+                 reject(err);
+                 return;
+               }
+               resolve();
+             }
+           );
+         }
+       );
+     });
+   }
+
+   async findByDid(did: string): Promise<ProfileBlob[]> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`,
+         did,
+         (err, rows: ProfileBlob[]) => {
+           if (err) {
+             logger.error({ err, did }, "Failed to find profile blobs by DID");
+             reject(err);
+             return;
+           }
+           resolve(rows || []);
+         }
+       );
+     });
+   }
+
+   async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`,
+         did,
+         blobType,
+         (err, rows: ProfileBlob[]) => {
+           if (err) {
+             logger.error({ err, did, blobType }, "Failed to find latest profile blob");
+             reject(err);
+             return;
+           }
+           resolve(rows?.[0] || null);
+         }
+       );
+     });
+   }
+
+   async findBySha256(sha256: string): Promise<ProfileBlob | null> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`,
+         sha256,
+         (err, rows: ProfileBlob[]) => {
+           if (err) {
+             logger.error({ err, sha256 }, "Failed to find profile blob by SHA256");
+             reject(err);
+             return;
+           }
+           resolve(rows?.[0] || null);
+         }
+       );
+     });
+   }
+
+   async findByPhash(phash: string): Promise<ProfileBlob[]> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM profile_blobs WHERE phash = $1`,
+         phash,
+         (err, rows: ProfileBlob[]) => {
+           if (err) {
+             logger.error({ err, phash }, "Failed to find profile blobs by pHash");
+             reject(err);
+             return;
+           }
+           resolve(rows || []);
+         }
+       );
+     });
+   }
+ }
+9 -3
src/database/profiles.repository.ts
···
    handle?: string;
    display_name?: string;
    description?: string;
+   avatar_cid?: string;
+   banner_cid?: string;
  }

  export class ProfilesRepository {
···
      return new Promise((resolve, reject) => {
        this.db.prepare(
          `
-         INSERT INTO profiles (did, handle, display_name, description)
-         VALUES ($1, $2, $3, $4)
+         INSERT INTO profiles (did, handle, display_name, description, avatar_cid, banner_cid)
+         VALUES ($1, $2, $3, $4, $5, $6)
          ON CONFLICT (did) DO UPDATE SET
            handle = EXCLUDED.handle,
            display_name = EXCLUDED.display_name,
-           description = EXCLUDED.description
+           description = EXCLUDED.description,
+           avatar_cid = EXCLUDED.avatar_cid,
+           banner_cid = EXCLUDED.banner_cid
          `,
          (err, stmt) => {
            if (err) {
···
              profile.handle || null,
              profile.display_name || null,
              profile.description || null,
+             profile.avatar_cid || null,
+             profile.banner_cid || null,
              (err) => {
                if (err) {
                  logger.error({ err, profile }, "Failed to insert profile");
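One thing to check when persisting the empty-string sentinel described in the implementation notes: in JavaScript `"" || null` evaluates to `null`, so binding `avatar_cid` and `banner_cid` with `||` would turn the "checked, nothing there" marker back into "not checked yet". The nullish coalescing operator `??` only replaces `undefined` and `null`. The sketch below shows the difference; `toParamWithOr` and `toParamWithNullish` are hypothetical helpers written for this illustration, not functions from the repository.

```typescript
// Mirrors the `value || null` binding style used in the insert statement.
function toParamWithOr(value: string | undefined): string | null {
  return value || null; // "" is falsy, so the sentinel collapses to null
}

// Alternative binding that keeps the empty-string sentinel intact.
function toParamWithNullish(value: string | undefined): string | null {
  return value ?? null; // only undefined and null become SQL NULL
}

console.log(JSON.stringify(toParamWithOr("")));             // null
console.log(JSON.stringify(toParamWithNullish("")));        // ""
console.log(JSON.stringify(toParamWithNullish(undefined))); // null
```

If the sentinel is meant to reach the database, the two CID parameters are the ones where the choice of operator matters; for `handle`, `display_name`, and `description` the existing behavior may well be intentional.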
+74 -3
src/database/schema.ts
···
    did TEXT PRIMARY KEY,
    handle TEXT,
    display_name TEXT,
-   description TEXT
+   description TEXT,
+   avatar_cid TEXT,
+   banner_cid TEXT
  );

  -- Blobs table: stores information about image blobs found in posts
···
    FOREIGN KEY (post_uri) REFERENCES posts(uri)
  );

+ -- Profile blobs table: stores avatar and banner blobs for profiles
+ CREATE TABLE IF NOT EXISTS profile_blobs (
+   did TEXT NOT NULL,
+   blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
+   blob_cid TEXT NOT NULL,
+   sha256 TEXT NOT NULL,
+   phash TEXT,
+   storage_path TEXT,
+   mimetype TEXT,
+   captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+   PRIMARY KEY (did, blob_type, captured_at),
+   FOREIGN KEY (did) REFERENCES profiles(did)
+ );
+
  -- Indexes for performance
  CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri);
  CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
  CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts);
  CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did);
+ CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid);
  CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
  CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
+ CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
+ CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash);
  `;

+ async function migrateProfilesTable(): Promise<void> {
+   const db = getDatabase();
+
+   return new Promise((resolve, reject) => {
+     db.all(
+       "SELECT column_name FROM information_schema.columns WHERE table_name = 'profiles'",
+       (err, rows: any[]) => {
+         if (err) {
+           logger.error({ err }, "Failed to check profiles table columns");
+           reject(err);
+           return;
+         }
+
+         const columnNames = rows.map((row) => row.column_name);
+         const hasAvatarCid = columnNames.includes("avatar_cid");
+         const hasBannerCid = columnNames.includes("banner_cid");
+
+         if (!hasAvatarCid || !hasBannerCid) {
+           logger.info("Migrating profiles table to add avatar_cid and banner_cid columns");
+
+           const migrations: string[] = [];
+           if (!hasAvatarCid) {
+             migrations.push("ALTER TABLE profiles ADD COLUMN avatar_cid TEXT");
+           }
+           if (!hasBannerCid) {
+             migrations.push("ALTER TABLE profiles ADD COLUMN banner_cid TEXT");
+           }
+
+           db.exec(migrations.join("; "), (err) => {
+             if (err) {
+               logger.error({ err }, "Failed to migrate profiles table");
+               reject(err);
+               return;
+             }
+             logger.info("Profiles table migration completed");
+             resolve();
+           });
+         } else {
+           logger.debug("Profiles table already has avatar_cid and banner_cid columns");
+           resolve();
+         }
+       }
+     );
+   });
+ }
+
  export async function initializeSchema(): Promise<void> {
    const db = getDatabase();

    return new Promise((resolve, reject) => {
-     db.exec(SCHEMA_SQL, (err) => {
+     db.exec(SCHEMA_SQL, async (err) => {
        if (err) {
          logger.error({ err }, "Failed to initialize schema");
          reject(err);
          return;
        }
        logger.info("Database schema initialized");
-       resolve();
+
+       try {
+         await migrateProfilesTable();
+         resolve();
+       } catch (migrationErr) {
+         reject(migrationErr);
+       }
      });
    });
  }
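The column check in `migrateProfilesTable` boils down to "compare existing columns against required ones and emit an `ALTER TABLE` for each gap". A minimal sketch of that planning step as a pure function is shown here, assuming trusted, hard-coded table and column names (nothing is escaped). `planColumnMigrations` is a hypothetical helper and is not part of `schema.ts`.

```typescript
// Given the columns a table currently has and the columns it should have
// (name -> SQL type), returns the ALTER TABLE statements for the missing ones.
// Identifiers are interpolated as-is, so only pass trusted constants.
function planColumnMigrations(
  table: string,
  existingColumns: string[],
  requiredColumns: Record<string, string>
): string[] {
  const present = new Set(existingColumns);
  return Object.entries(requiredColumns)
    .filter(([column]) => !present.has(column))
    .map(([column, sqlType]) => `ALTER TABLE ${table} ADD COLUMN ${column} ${sqlType}`);
}

const plannedStatements = planColumnMigrations(
  "profiles",
  ["did", "handle", "display_name", "description", "avatar_cid"],
  { avatar_cid: "TEXT", banner_cid: "TEXT" }
);
console.log(plannedStatements.join("; "));
```

Separating the planning from the `db.exec` call makes the "nothing to do" case an empty array, which is easy to unit-test without a database.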
+138 -3
src/hydration/profiles.service.ts
···
  import { AtpAgent } from "@atproto/api";
  import { Database } from "duckdb";
  import { ProfilesRepository } from "../database/profiles.repository.js";
+ import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
+ import { computeBlobHashes } from "../blobs/hasher.js";
+ import { LocalBlobStorage } from "../blobs/storage/local.js";
+ import { S3BlobStorage } from "../blobs/storage/s3.js";
+ import { BlobStorage } from "../blobs/processor.js";
  import { pRateLimit } from "p-ratelimit";
  import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
  import { logger } from "../logger/index.js";
···
  export class ProfileHydrationService {
    private agent: AtpAgent;
    private profilesRepo: ProfilesRepository;
+   private profileBlobsRepo: ProfileBlobsRepository;
+   private storage: BlobStorage | null = null;
    private limit: ReturnType<typeof pRateLimit>;

    constructor(db: Database) {
      this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
      this.profilesRepo = new ProfilesRepository(db);
+     this.profileBlobsRepo = new ProfileBlobsRepository(db);
+
+     if (config.blobs.hydrateBlobs) {
+       if (config.blobs.storage.type === "s3") {
+         this.storage = new S3BlobStorage(
+           config.blobs.storage.s3Bucket!,
+           config.blobs.storage.s3Region!
+         );
+       } else {
+         this.storage = new LocalBlobStorage(
+           config.blobs.storage.localPath
+         );
+       }
+     }
+
      this.limit = pRateLimit({
        interval: 300000,
        rate: 3000,
···
    async hydrateProfile(did: string): Promise<void> {
      try {
        const existingProfile = await this.profilesRepo.findByDid(did);
-       if (existingProfile) {
-         logger.debug({ did }, "Profile already hydrated, skipping");
+       const needsRehydration = existingProfile && (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);
+
+       if (existingProfile && !needsRehydration) {
+         logger.debug({ did }, "Profile already fully hydrated, skipping");
          return;
+       }
+
+       if (needsRehydration) {
+         logger.debug({ did }, "Re-hydrating profile to fetch avatar/banner CIDs");
        }

        const profileResponse = await this.limit(() =>
···

        let displayName: string | undefined;
        let description: string | undefined;
+       let avatarCid: string | undefined;
+       let bannerCid: string | undefined;

        if (profileResponse.success && profileResponse.data.value) {
          const record = profileResponse.data.value as any;
          displayName = record.displayName;
          description = record.description;
+
+         if (record.avatar?.ref) {
+           avatarCid = record.avatar.ref.toString();
+         } else {
+           avatarCid = "";
+         }
+
+         if (record.banner?.ref) {
+           bannerCid = record.banner.ref.toString();
+         } else {
+           bannerCid = "";
+         }
+
+         logger.debug({ did, avatarCid, bannerCid, hasAvatar: !!record.avatar, hasBanner: !!record.banner }, "Extracted CIDs from profile record");
        }

        const profileLookup = await this.limit(() =>
···
          handle,
          display_name: displayName,
          description,
+         avatar_cid: avatarCid,
+         banner_cid: bannerCid,
        });

-       logger.info({ did, handle }, "Profile hydrated successfully");
+       if (avatarCid && avatarCid !== "") {
+         try {
+           await this.processProfileBlob(did, avatarCid, "avatar");
+         } catch (error) {
+           logger.warn({ error, did, avatarCid }, "Failed to process avatar blob");
+         }
+       }
+
+       if (bannerCid && bannerCid !== "") {
+         try {
+           await this.processProfileBlob(did, bannerCid, "banner");
+         } catch (error) {
+           logger.warn({ error, did, bannerCid }, "Failed to process banner blob");
+         }
+       }
+
+       logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
      } catch (error) {
        if (isRecordNotFoundError(error)) {
          logger.warn({ did }, "Profile record not found, skipping");
···
        logger.error({ error, did }, "Failed to hydrate profile");
        throw error;
      }
+   }
+
+   private async resolvePds(did: string): Promise<string | null> {
+     try {
+       const didDocResponse = await fetch(`${config.plc.endpoint}/${did}`);
+       if (!didDocResponse.ok) {
+         logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document");
+         return null;
+       }
+
+       const didDoc = await didDocResponse.json();
+       const pdsService = didDoc.service?.find((s: any) =>
+         s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
+       );
+
+       if (!pdsService?.serviceEndpoint) {
+         logger.warn({ did }, "No PDS endpoint found in DID document");
+         return null;
+       }
+
+       return pdsService.serviceEndpoint;
+     } catch (error) {
+       logger.error({ error, did }, "Failed to resolve PDS from DID");
+       return null;
+     }
+   }
+
+   private async processProfileBlob(
+     did: string,
+     cid: string,
+     type: "avatar" | "banner"
+   ): Promise<void> {
+     const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);
+
+     if (latestBlob && latestBlob.blob_cid === cid) {
+       logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
+       return;
+     }
+
+     const pdsEndpoint = await this.resolvePds(did);
+     if (!pdsEndpoint) {
+       logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint");
+       return;
+     }
+
+     const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
+     const blobResponse = await fetch(blobUrl);
+
+     if (!blobResponse.ok) {
+       logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS");
+       return;
+     }
+
+     const blobData = Buffer.from(await blobResponse.arrayBuffer());
+
+     let storagePath: string | undefined;
+     if (this.storage && config.blobs.hydrateBlobs) {
+       storagePath = await this.storage.store(cid, blobData, "image/jpeg");
+     }
+
+     const hashes = await computeBlobHashes(blobData, "image/jpeg");
+
+     await this.profileBlobsRepo.insert({
+       did,
+       blob_type: type,
+       blob_cid: cid,
+       sha256: hashes.sha256,
+       phash: hashes.phash,
+       storage_path: storagePath,
+       mimetype: "image/jpeg",
+     });
+
+     logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
    }
  }
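The skip/re-hydrate decision at the top of `hydrateProfile` has three outcomes, which is easier to see as a small pure function. This is a sketch only: `decideHydration`, `StoredProfile`, and `HydrationDecision` are hypothetical names, and it assumes that an unchecked CID comes back from the database as `null` while a checked-but-absent one is stored as the empty-string sentinel.

```typescript
// Hypothetical row shape: null means "never checked", "" means "checked, none".
interface StoredProfile {
  did: string;
  avatar_cid: string | null;
  banner_cid: string | null;
}

type HydrationDecision = "hydrate" | "rehydrate" | "skip";

// Decides what to do for a DID based on what is already stored.
function decideHydration(existing: StoredProfile | null): HydrationDecision {
  if (existing === null) {
    return "hydrate"; // never seen this profile
  }
  const unchecked = existing.avatar_cid === null || existing.banner_cid === null;
  return unchecked ? "rehydrate" : "skip";
}

console.log(decideHydration(null));
console.log(decideHydration({ did: "did:plc:a", avatar_cid: null, banner_cid: "" }));
console.log(decideHydration({ did: "did:plc:a", avatar_cid: "", banner_cid: "bafybanner" }));
```

The third call returns `"skip"` only because the empty string is distinct from `null`; if the sentinel were stored as `NULL`, the same profile would be re-hydrated on every pass.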
+27 -1
tests/integration/database.test.ts
···
        did TEXT PRIMARY KEY,
        handle TEXT,
        display_name TEXT,
-       description TEXT
+       description TEXT,
+       avatar_cid TEXT,
+       banner_cid TEXT
      );

      CREATE TABLE IF NOT EXISTS blobs (
···
        expect(found).not.toBeNull();
        expect(found?.did).toBe("did:plc:testuser");
      });
+
+     test("should insert and retrieve profile with avatar and banner", async () => {
+       const profile = {
+         did: "did:plc:testuser2",
+         handle: "testuser2.bsky.social",
+         display_name: "Test User 2",
+         description: "A test user with avatar",
+         avatar_cid: "bafyavatartest",
+         banner_cid: "bafybannertest",
+       };
+
+       await profilesRepo.insert(profile);
+       const found = await profilesRepo.findByDid(profile.did);
+
+       expect(found).not.toBeNull();
+       expect(found?.avatar_cid).toBe("bafyavatartest");
+       expect(found?.banner_cid).toBe("bafybannertest");
+     });
    });

    describe("BlobsRepository", () => {
···
      test("should find blobs by pHash", async () => {
        const found = await blobsRepo.findByPhash("deadbeef");
        expect(found.length).toBeGreaterThan(0);
+     });
+
+     test("should find blob by CID", async () => {
+       const found = await blobsRepo.findByCid("bafytest123");
+       expect(found).not.toBeNull();
+       expect(found?.sha256).toBe("abc123def456");
      });
    });
  });