A tool for tailing a labeler's firehose, rehydrating the referenced records, and storing them for future analysis of moderation decisions.
import { AtpAgent } from "@atproto/api";
import { Database } from "duckdb";
import { ProfilesRepository } from "../database/profiles.repository.js";
import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
import { computeBlobHashes } from "../blobs/hasher.js";
import { LocalBlobStorage } from "../blobs/storage/local.js";
import { S3BlobStorage } from "../blobs/storage/s3.js";
import { BlobStorage } from "../blobs/processor.js";
import { pRateLimit } from "p-ratelimit";
import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
import { logger } from "../logger/index.js";
import { config } from "../config/index.js";

// Hydrates profile records for DIDs seen on the firehose and persists them,
// along with their avatar/banner blobs, for later analysis.
export class ProfileHydrationService {
  private agent: AtpAgent;
  private profilesRepo: ProfilesRepository;
  private profileBlobsRepo: ProfileBlobsRepository;
  private storage: BlobStorage | null = null;
  private limit: ReturnType<typeof pRateLimit>;

  constructor(db: Database) {
    this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
    this.profilesRepo = new ProfilesRepository(db);
    this.profileBlobsRepo = new ProfileBlobsRepository(db);

    // Blob storage is optional; pick S3 or local disk based on config.
    if (config.blobs.hydrateBlobs) {
      if (config.blobs.storage.type === "s3") {
        this.storage = new S3BlobStorage(
          config.blobs.storage.s3Bucket!,
          config.blobs.storage.s3Region!
        );
      } else {
        this.storage = new LocalBlobStorage(
          config.blobs.storage.localPath
        );
      }
    }

    // Throttle outbound calls: at most 3000 requests per 5-minute window,
    // with bounded concurrency and a cap on how long a call may be queued.
    this.limit = pRateLimit({
      interval: 300000,
      rate: 3000,
      concurrency: 48,
      maxDelay: 60000,
    });
  }

  // Authenticate against the configured PDS before any hydration calls.
  async initialize(): Promise<void> {
    try {
      await this.agent.login({
        identifier: config.bsky.handle,
        password: config.bsky.password,
      });
      logger.info("Profile hydration service authenticated");
    } catch (error) {
      logger.error({ error }, "Failed to authenticate profile hydration service");
      throw error;
    }
  }

  // Fetch the profile record and handle for a DID, persist them, and process
  // any avatar/banner blobs referenced by the record.
  async hydrateProfile(did: string): Promise<void> {
    try {
      const existingProfile = await this.profilesRepo.findByDid(did);
      // Re-hydrate rows that were stored without avatar/banner CIDs.
      const needsRehydration = existingProfile && (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);

      if (existingProfile && !needsRehydration) {
        logger.debug({ did }, "Profile already fully hydrated, skipping");
        return;
      }

      if (needsRehydration) {
        logger.debug({ did }, "Re-hydrating profile to fetch avatar/banner CIDs");
      }

      const profileResponse = await this.limit(() =>
        withRetry(
          async () => {
            return await this.agent.com.atproto.repo.getRecord({
              repo: did,
              collection: "app.bsky.actor.profile",
              rkey: "self",
            });
          },
          {
            maxAttempts: 3,
            initialDelay: 1000,
            maxDelay: 10000,
            backoffMultiplier: 2,
            retryableErrors: [
              isRateLimitError,
              isNetworkError,
              isServerError,
            ],
          }
        )
      );

      let displayName: string | undefined;
      let description: string | undefined;
      let avatarCid: string | undefined;
      let bannerCid: string | undefined;

      if (profileResponse.success && profileResponse.data.value) {
        const record = profileResponse.data.value as any;
        displayName = record.displayName;
        description = record.description;

        // Empty string marks "checked, no blob" so the row is not re-hydrated later.
        if (record.avatar?.ref) {
          avatarCid = record.avatar.ref.toString();
        } else {
          avatarCid = "";
        }

        if (record.banner?.ref) {
          bannerCid = record.banner.ref.toString();
        } else {
          bannerCid = "";
        }

        logger.debug({ did, avatarCid, bannerCid, hasAvatar: !!record.avatar, hasBanner: !!record.banner }, "Extracted CIDs from profile record");
      }

      // Separate lookup to resolve the current handle for the DID.
      const profileLookup = await this.limit(() =>
        withRetry(
          async () => {
            return await this.agent.getProfile({ actor: did });
          },
          {
            maxAttempts: 3,
            initialDelay: 1000,
            maxDelay: 10000,
            backoffMultiplier: 2,
            retryableErrors: [
              isRateLimitError,
              isNetworkError,
              isServerError,
            ],
          }
        )
      );

      let handle: string | undefined;
      if (profileLookup.success) {
        handle = profileLookup.data.handle;
      }

      await this.profilesRepo.insert({
        did,
        handle,
        display_name: displayName,
        description,
        avatar_cid: avatarCid,
        banner_cid: bannerCid,
      });

      // Blob failures are non-fatal; the profile row is already stored.
      if (avatarCid && avatarCid !== "") {
        try {
          await this.processProfileBlob(did, avatarCid, "avatar");
        } catch (error) {
          logger.warn({ error, did, avatarCid }, "Failed to process avatar blob");
        }
      }

      if (bannerCid && bannerCid !== "") {
        try {
          await this.processProfileBlob(did, bannerCid, "banner");
        } catch (error) {
          logger.warn({ error, did, bannerCid }, "Failed to process banner blob");
        }
      }

      logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
    } catch (error) {
      if (isRecordNotFoundError(error)) {
        logger.warn({ did }, "Profile record not found, skipping");
        return;
      }
      logger.error({ error, did }, "Failed to hydrate profile");
      throw error;
    }
  }

  // Resolve the DID document via the PLC directory and extract the PDS endpoint.
  private async resolvePds(did: string): Promise<string | null> {
    try {
      const didDocResponse = await fetch(`${config.plc.endpoint}/${did}`);
      if (!didDocResponse.ok) {
        logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document");
        return null;
      }

      const didDoc = await didDocResponse.json();
      const pdsService = didDoc.service?.find((s: any) =>
        s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
      );

      if (!pdsService?.serviceEndpoint) {
        logger.warn({ did }, "No PDS endpoint found in DID document");
        return null;
      }

      return pdsService.serviceEndpoint;
    } catch (error) {
      logger.error({ error, did }, "Failed to resolve PDS from DID");
      return null;
    }
  }

  // Download a blob from the owner's PDS, optionally store it, and record its hashes.
  private async processProfileBlob(
    did: string,
    cid: string,
    type: "avatar" | "banner"
  ): Promise<void> {
    const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);

    if (latestBlob && latestBlob.blob_cid === cid) {
      logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
      return;
    }

    const pdsEndpoint = await this.resolvePds(did);
    if (!pdsEndpoint) {
      logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint");
      return;
    }

    const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
    const blobResponse = await fetch(blobUrl);

    if (!blobResponse.ok) {
      logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS");
      return;
    }

    const blobData = Buffer.from(await blobResponse.arrayBuffer());

    let storagePath: string | undefined;
    if (this.storage && config.blobs.hydrateBlobs) {
      storagePath = await this.storage.store(cid, blobData, "image/jpeg");
    }

    const hashes = await computeBlobHashes(blobData, "image/jpeg");

    await this.profileBlobsRepo.insert({
      did,
      blob_type: type,
      blob_cid: cid,
      sha256: hashes.sha256,
      phash: hashes.phash,
      storage_path: storagePath,
      mimetype: "image/jpeg",
    });

    logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
  }
}
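A minimal sketch of how this service might be driven, assuming the repository's config, repositories, and retry helpers are already set up. The database file name, the entry-point module path, and the hard-coded DID list are illustrative assumptions; in the real pipeline the DIDs would come from subjects observed on the labeler's firehose.

import { Database } from "duckdb";
import { ProfileHydrationService } from "./services/profile-hydration.service.js"; // hypothetical path

async function main() {
  const db = new Database("moderation.duckdb"); // hypothetical database file
  const service = new ProfileHydrationService(db);
  await service.initialize();

  // Placeholder DIDs; normally these arrive from firehose events.
  const dids = ["did:plc:example123"];
  for (const did of dids) {
    try {
      await service.hydrateProfile(did);
    } catch (error) {
      // hydrateProfile swallows "record not found" but rethrows other errors,
      // so a caller that loops over many DIDs should catch and continue.
      console.error(`Failed to hydrate ${did}`, error);
    }
  }
}

main();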