A tool for tailing a labeler's firehose, rehydrating records, and storing them for future analysis of moderation decisions.
1import { AtpAgent } from "@atproto/api";
2import { Database } from "duckdb";
3import { ProfilesRepository } from "../database/profiles.repository.js";
4import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
5import { computeBlobHashes } from "../blobs/hasher.js";
6import { LocalBlobStorage } from "../blobs/storage/local.js";
7import { S3BlobStorage } from "../blobs/storage/s3.js";
8import { BlobStorage } from "../blobs/processor.js";
9import { pRateLimit } from "p-ratelimit";
10import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
11import { logger } from "../logger/index.js";
12import { config } from "../config/index.js";
13
/** Blob reference extracted from a profile record's `avatar`/`banner` field. */
interface ProfileBlobRef {
  /** CID of the blob, as a string. */
  cid: string;
  /** MIME type declared in the record, if present. */
  mimeType: string | undefined;
}

/** The subset of an `app.bsky.actor.profile` record that this service persists. */
interface ProfileRecordFields {
  displayName?: string;
  description?: string;
  avatar?: ProfileBlobRef;
  banner?: ProfileBlobRef;
}

/**
 * Hydrates account profiles for DIDs.
 *
 * For each DID it:
 * - fetches the `app.bsky.actor.profile` record and the account handle through
 *   an authenticated agent;
 * - persists them via {@link ProfilesRepository};
 * - downloads and hashes the avatar/banner blobs from the account's PDS.
 *
 * Blob bytes are written to blob storage only when `config.blobs.hydrateBlobs`
 * is enabled.
 */
export class ProfileHydrationService {
  /** Upper bound (ms) on each direct `fetch`: DID document lookup and blob download. */
  private static readonly fetchTimeoutMs = 30000;

  private readonly agent: AtpAgent;
  private readonly profilesRepo: ProfilesRepository;
  private readonly profileBlobsRepo: ProfileBlobsRepository;
  private readonly storage: BlobStorage | null = null;
  private readonly limit: ReturnType<typeof pRateLimit>;

  constructor(db: Database) {
    this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
    this.profilesRepo = new ProfilesRepository(db);
    this.profileBlobsRepo = new ProfileBlobsRepository(db);

    // Blob storage is only constructed when blob hydration is enabled.
    if (config.blobs.hydrateBlobs) {
      if (config.blobs.storage.type === "s3") {
        this.storage = new S3BlobStorage(
          config.blobs.storage.s3Bucket!,
          config.blobs.storage.s3Region!
        );
      } else {
        this.storage = new LocalBlobStorage(config.blobs.storage.localPath);
      }
    }

    // Limits the agent calls to 3000 per 5-minute window, with at most 48 in
    // flight. Per the p-ratelimit docs, maxDelay bounds how long a call may
    // wait in the queue before it is rejected.
    this.limit = pRateLimit({
      interval: 300000,
      rate: 3000,
      concurrency: 48,
      maxDelay: 60000,
    });
  }

  /**
   * Logs the agent in with the configured handle/password.
   *
   * @throws the login error, after logging it.
   */
  async initialize(): Promise<void> {
    try {
      await this.agent.login({
        identifier: config.bsky.handle,
        password: config.bsky.password,
      });
      logger.info("Profile hydration service authenticated");
    } catch (error) {
      logger.error({ error }, "Failed to authenticate profile hydration service");
      throw error;
    }
  }

  /**
   * Fetches and stores the profile for `did`, then processes its avatar and
   * banner blobs.
   *
   * The DID is skipped when a stored profile already has non-null avatar and
   * banner CIDs. A missing profile record (per `isRecordNotFoundError`) is
   * logged and skipped. Blob failures are logged and do not fail the
   * hydration.
   *
   * @throws any other error from the record/profile lookups or the insert.
   */
  async hydrateProfile(did: string): Promise<void> {
    try {
      const existingProfile = await this.profilesRepo.findByDid(did);
      const needsRehydration =
        existingProfile &&
        (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);

      if (existingProfile && !needsRehydration) {
        logger.debug({ did }, "Profile already fully hydrated, skipping");
        return;
      }

      if (needsRehydration) {
        logger.debug({ did }, "Re-hydrating profile to fetch avatar/banner CIDs");
      }

      const profileResponse = await this.callWithRetry(() =>
        this.agent.com.atproto.repo.getRecord({
          repo: did,
          collection: "app.bsky.actor.profile",
          rkey: "self",
        })
      );

      // These stay undefined when the response carries no record value.
      let fields: ProfileRecordFields | undefined;
      let avatarCid: string | undefined;
      let bannerCid: string | undefined;

      if (profileResponse.success && profileResponse.data.value) {
        fields = ProfileHydrationService.extractProfileFields(profileResponse.data.value);
        // "" (rather than null) marks "record checked, no avatar/banner", so
        // the `=== null` re-hydration check above does not select this
        // profile again. This assumes the repository stores "" as-is.
        avatarCid = fields.avatar?.cid ?? "";
        bannerCid = fields.banner?.cid ?? "";

        logger.debug(
          {
            did,
            avatarCid,
            bannerCid,
            hasAvatar: fields.avatar !== undefined,
            hasBanner: fields.banner !== undefined,
          },
          "Extracted CIDs from profile record"
        );
      }

      const profileLookup = await this.callWithRetry(() =>
        this.agent.getProfile({ actor: did })
      );
      const handle = profileLookup.success ? profileLookup.data.handle : undefined;

      await this.profilesRepo.insert({
        did,
        handle,
        display_name: fields?.displayName,
        description: fields?.description,
        avatar_cid: avatarCid,
        banner_cid: bannerCid,
      });

      if (avatarCid) {
        try {
          await this.processProfileBlob(did, avatarCid, "avatar", fields?.avatar?.mimeType);
        } catch (error) {
          logger.warn({ error, did, avatarCid }, "Failed to process avatar blob");
        }
      }

      if (bannerCid) {
        try {
          await this.processProfileBlob(did, bannerCid, "banner", fields?.banner?.mimeType);
        } catch (error) {
          logger.warn({ error, did, bannerCid }, "Failed to process banner blob");
        }
      }

      logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
    } catch (error) {
      if (isRecordNotFoundError(error)) {
        logger.warn({ did }, "Profile record not found, skipping");
        return;
      }
      logger.error({ error, did }, "Failed to hydrate profile");
      throw error;
    }
  }

  /**
   * Runs `fn` through the shared rate limiter with the retry policy used for
   * all agent calls. The policy is up to 3 attempts with exponential backoff
   * (1s initial delay, 10s cap), retrying on rate-limit, network and server
   * errors.
   */
  private callWithRetry<T>(fn: () => Promise<T>): Promise<T> {
    return this.limit(() =>
      withRetry(fn, {
        maxAttempts: 3,
        initialDelay: 1000,
        maxDelay: 10000,
        backoffMultiplier: 2,
        retryableErrors: [isRateLimitError, isNetworkError, isServerError],
      })
    );
  }

  /**
   * Reads the fields this service persists from a raw profile record value.
   * Fields of the wrong type are dropped rather than stored.
   */
  private static extractProfileFields(value: unknown): ProfileRecordFields {
    if (typeof value !== "object" || value === null) {
      return {};
    }
    const raw = value as Record<string, unknown>;
    return {
      displayName: typeof raw.displayName === "string" ? raw.displayName : undefined,
      description: typeof raw.description === "string" ? raw.description : undefined,
      avatar: ProfileHydrationService.extractBlobRef(raw.avatar),
      banner: ProfileHydrationService.extractBlobRef(raw.banner),
    };
  }

  /**
   * Extracts the CID and declared MIME type from a blob field.
   *
   * Both the current `{ ref, mimeType }` shape and the legacy
   * `{ cid, mimeType }` shape are accepted. Returns undefined when no CID
   * can be found.
   */
  private static extractBlobRef(value: unknown): ProfileBlobRef | undefined {
    if (typeof value !== "object" || value === null) {
      return undefined;
    }
    const blob = value as Record<string, unknown>;
    const mimeType = typeof blob.mimeType === "string" ? blob.mimeType : undefined;
    const cid =
      ProfileHydrationService.cidFromRef(blob.ref) ??
      (typeof blob.cid === "string" ? blob.cid : undefined);
    return cid ? { cid, mimeType } : undefined;
  }

  /**
   * Converts a blob `ref` to a CID string.
   *
   * A `{ $link }` JSON link yields the link string. Anything else truthy
   * (e.g. a CID object) is stringified.
   */
  private static cidFromRef(ref: unknown): string | undefined {
    if (!ref) {
      return undefined;
    }
    if (typeof ref === "object") {
      const link = (ref as { $link?: unknown }).$link;
      if (typeof link === "string") {
        return link;
      }
    }
    return String(ref);
  }

  /**
   * Returns the URL of the DID document for `did`.
   *
   * `did:web` maps to `https://<host>/.well-known/did.json`. atproto only
   * supports hostname-level did:web, with a port percent-encoded as `%3A`.
   * Every other DID is looked up at `<plcEndpoint>/<did>`.
   *
   * @returns null for a did:web that has path segments or is malformed.
   */
  private static didDocumentUrl(did: string, plcEndpoint: string): string | null {
    const webPrefix = "did:web:";
    if (did.startsWith(webPrefix)) {
      const encodedHost = did.slice(webPrefix.length);
      if (encodedHost === "" || encodedHost.includes(":")) {
        return null;
      }
      let host: string;
      try {
        host = decodeURIComponent(encodedHost);
      } catch {
        return null;
      }
      if (host.includes("/")) {
        return null;
      }
      return `https://${host}/.well-known/did.json`;
    }
    return `${plcEndpoint.replace(/\/+$/, "")}/${did}`;
  }

  /**
   * Finds the PDS endpoint in a parsed DID document.
   *
   * Matches the first service with type `AtprotoPersonalDataServer` whose
   * id ends in `#atproto_pds` (relative or fully qualified) and which has a
   * non-empty string `serviceEndpoint`.
   */
  private static findPdsEndpoint(didDoc: unknown): string | null {
    if (typeof didDoc !== "object" || didDoc === null) {
      return null;
    }
    const services = (didDoc as { service?: unknown }).service;
    if (!Array.isArray(services)) {
      return null;
    }
    for (const service of services) {
      if (typeof service !== "object" || service === null) {
        continue;
      }
      const { id, type, serviceEndpoint } = service as Record<string, unknown>;
      if (
        typeof id === "string" &&
        id.endsWith("#atproto_pds") &&
        type === "AtprotoPersonalDataServer" &&
        typeof serviceEndpoint === "string" &&
        serviceEndpoint !== ""
      ) {
        return serviceEndpoint;
      }
    }
    return null;
  }

  /**
   * Builds the `com.atproto.sync.getBlob` URL. A trailing slash on the
   * endpoint is tolerated, and the query values are URL-encoded.
   *
   * @throws TypeError if `pdsEndpoint` is not a valid absolute URL.
   */
  private static blobUrl(pdsEndpoint: string, did: string, cid: string): string {
    const url = new URL(`${pdsEndpoint.replace(/\/+$/, "")}/xrpc/com.atproto.sync.getBlob`);
    url.searchParams.set("did", did);
    url.searchParams.set("cid", cid);
    return url.toString();
  }

  /**
   * Resolves the PDS endpoint for `did` from its DID document.
   *
   * @returns the endpoint, or null (after logging) when the document cannot
   * be fetched or parsed, has no PDS service, or the request times out.
   */
  private async resolvePds(did: string): Promise<string | null> {
    const docUrl = ProfileHydrationService.didDocumentUrl(did, config.plc.endpoint);
    if (!docUrl) {
      logger.warn({ did }, "Unsupported DID, cannot locate DID document");
      return null;
    }

    try {
      const didDocResponse = await fetch(docUrl, {
        signal: AbortSignal.timeout(ProfileHydrationService.fetchTimeoutMs),
      });
      if (!didDocResponse.ok) {
        logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document");
        return null;
      }

      const didDoc: unknown = await didDocResponse.json();
      const pdsEndpoint = ProfileHydrationService.findPdsEndpoint(didDoc);
      if (!pdsEndpoint) {
        logger.warn({ did }, "No PDS endpoint found in DID document");
        return null;
      }

      return pdsEndpoint;
    } catch (error) {
      logger.error({ error, did }, "Failed to resolve PDS from DID");
      return null;
    }
  }

  /**
   * Downloads one profile blob from the account's PDS, hashes it, optionally
   * stores it, and records it in the profile-blobs repository.
   *
   * Skips the download when the latest recorded blob of this type already
   * has the same CID. Returns after logging when the PDS cannot be resolved
   * or the download fails.
   *
   * @param mimeType MIME type declared in the profile record. Defaults to
   *   "image/jpeg" when the record does not declare one.
   */
  private async processProfileBlob(
    did: string,
    cid: string,
    type: "avatar" | "banner",
    mimeType = "image/jpeg"
  ): Promise<void> {
    const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);

    if (latestBlob && latestBlob.blob_cid === cid) {
      logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
      return;
    }

    const pdsEndpoint = await this.resolvePds(did);
    if (!pdsEndpoint) {
      logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint");
      return;
    }

    const blobResponse = await fetch(ProfileHydrationService.blobUrl(pdsEndpoint, did, cid), {
      signal: AbortSignal.timeout(ProfileHydrationService.fetchTimeoutMs),
    });

    if (!blobResponse.ok) {
      logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS");
      return;
    }

    const blobData = Buffer.from(await blobResponse.arrayBuffer());

    let storagePath: string | undefined;
    if (this.storage && config.blobs.hydrateBlobs) {
      storagePath = await this.storage.store(cid, blobData, mimeType);
    }

    const hashes = await computeBlobHashes(blobData, mimeType);

    await this.profileBlobsRepo.insert({
      did,
      blob_type: type,
      blob_cid: cid,
      sha256: hashes.sha256,
      phash: hashes.phash,
      storage_path: storagePath,
      mimetype: mimeType,
    });

    logger.info({ did, cid, type, mimeType, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
  }
}