Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/

feat: add PDS data repair script (#208)

Rebuilds PDS records from local DB data. The old code wrote records
with wrong field names (skillName, companyName, startDate, endDate)
which the PDS silently dropped, resulting in incomplete records. The
local DB has the correct data from write-through indexing.

Compares each PDS record against local DB and only rewrites records
that are missing data. Supports --dry-run.

authored by

Guido X Jansen and committed by
GitHub
8f4b113c 14df9223

+370
+370
scripts/repair-pds-records.ts
··· 1 + /** 2 + * PDS Data Repair Script 3 + * 4 + * Rebuilds PDS records from local DB data. The old code wrote records with 5 + * wrong field names (skillName, companyName, startDate, endDate) which the 6 + * PDS silently dropped, resulting in incomplete records. The local DB has 7 + * the correct data from write-through indexing. 8 + * 9 + * This script reads each user's skills, positions, and education from the 10 + * local DB and overwrites the PDS records with complete, lexicon-conformant data. 11 + * 12 + * Usage: 13 + * DATABASE_URL=... tsx scripts/repair-pds-records.ts [--dry-run] 14 + * 15 + * Delete this script after successful production run. 16 + */ 17 + import pg from 'pg'; 18 + import { drizzle } from 'drizzle-orm/node-postgres'; 19 + import { eq } from 'drizzle-orm'; 20 + import Redis from 'ioredis'; 21 + import { Agent } from '@atproto/api'; 22 + import { createOAuthClient } from '../src/oauth/client.js'; 23 + import { profiles } from '../src/db/schema/profiles.js'; 24 + import { skills } from '../src/db/schema/skills.js'; 25 + import { positions } from '../src/db/schema/positions.js'; 26 + import { education } from '../src/db/schema/education.js'; 27 + import { skillPositionLinks } from '../src/db/schema/skill-position-links.js'; 28 + import { loadConfig } from '../src/config.js'; 29 + import { logger } from '../src/logger.js'; 30 + 31 + const dryRun = process.argv.includes('--dry-run'); 32 + const log = logger.child({ script: 'repair-pds-records', dryRun }); 33 + log.info(dryRun ? 'DRY RUN' : 'LIVE RUN'); 34 + 35 + const config = loadConfig(); 36 + const pool = new pg.Pool({ connectionString: config.DATABASE_URL }); 37 + const db = drizzle(pool); 38 + const valkey = new Redis(config.VALKEY_URL ?? 'redis://localhost:6379'); 39 + const oauthClient = await createOAuthClient(config, db, valkey); 40 + 41 + // --- Build lexicon-conformant PDS records from local DB rows --- 42 + 43 + interface SkillRecord { 44 + $type: 'id.sifa.profile.skill'; 45 + name: string; 46 + category?: string; 47 + createdAt: string; 48 + } 49 + 50 + interface PositionRecord { 51 + $type: 'id.sifa.profile.position'; 52 + company: string; 53 + companyDid?: string; 54 + title: string; 55 + description?: string; 56 + employmentType?: string; 57 + workplaceType?: string; 58 + location?: { country: string; region?: string; city?: string; countryCode?: string }; 59 + startedAt: string; 60 + endedAt?: string; 61 + isPrimary?: boolean; 62 + skills?: Array<{ uri: string; cid: string }>; 63 + createdAt: string; 64 + } 65 + 66 + interface EducationRecord { 67 + $type: 'id.sifa.profile.education'; 68 + institution: string; 69 + institutionDid?: string; 70 + degree?: string; 71 + fieldOfStudy?: string; 72 + description?: string; 73 + startedAt?: string; 74 + endedAt?: string; 75 + createdAt: string; 76 + } 77 + 78 + function buildSkillRecord(row: typeof skills.$inferSelect): SkillRecord { 79 + const rec: SkillRecord = { 80 + $type: 'id.sifa.profile.skill', 81 + name: row.name, 82 + createdAt: row.createdAt.toISOString(), 83 + }; 84 + if (row.category) rec.category = row.category; 85 + return rec; 86 + } 87 + 88 + function buildPositionRecord( 89 + row: typeof positions.$inferSelect, 90 + skillRkeys: string[], 91 + ): PositionRecord { 92 + const rec: PositionRecord = { 93 + $type: 'id.sifa.profile.position', 94 + company: row.company, 95 + title: row.title, 96 + startedAt: row.startedAt, 97 + createdAt: row.createdAt.toISOString(), 98 + }; 99 + if (row.companyDid) rec.companyDid = row.companyDid; 100 + if (row.description) rec.description = row.description; 101 + if (row.employmentType) rec.employmentType = row.employmentType; 102 + if (row.workplaceType) rec.workplaceType = row.workplaceType; 103 + if (row.locationCountry) { 104 + rec.location = { country: row.locationCountry }; 105 + if (row.locationRegion) rec.location.region = row.locationRegion; 106 + if (row.locationCity) rec.location.city = row.locationCity; 107 + if (row.countryCode) rec.location.countryCode = row.countryCode; 108 + } 109 + if (row.endedAt) rec.endedAt = row.endedAt; 110 + if (row.isPrimary) rec.isPrimary = true; 111 + if (skillRkeys.length > 0) { 112 + rec.skills = skillRkeys.map((sk) => ({ 113 + uri: `at://${row.did}/id.sifa.profile.skill/${sk}`, 114 + cid: '', // CID will be resolved by PDS 115 + })); 116 + } 117 + return rec; 118 + } 119 + 120 + function buildEducationRecord(row: typeof education.$inferSelect): EducationRecord { 121 + const rec: EducationRecord = { 122 + $type: 'id.sifa.profile.education', 123 + institution: row.institution, 124 + createdAt: row.createdAt.toISOString(), 125 + }; 126 + if (row.institutionDid) rec.institutionDid = row.institutionDid; 127 + if (row.degree) rec.degree = row.degree; 128 + if (row.fieldOfStudy) rec.fieldOfStudy = row.fieldOfStudy; 129 + if (row.description) rec.description = row.description; 130 + if (row.startedAt) rec.startedAt = row.startedAt; 131 + if (row.endedAt) rec.endedAt = row.endedAt; 132 + return rec; 133 + } 134 + 135 + // --- Comparison: check if PDS record matches local DB --- 136 + 137 + function recordsMatch( 138 + pdsValue: Record<string, unknown>, 139 + dbRecord: Record<string, unknown>, 140 + ): boolean { 141 + // Compare all fields from the DB record against PDS 142 + for (const [key, val] of Object.entries(dbRecord)) { 143 + if (key === '$type') continue; 144 + const pdsVal = pdsValue[key]; 145 + if (val === undefined || val === null) continue; 146 + if (typeof val === 'object') { 147 + // Deep compare for location and skills 148 + if (JSON.stringify(val) !== JSON.stringify(pdsVal)) return false; 149 + } else if (pdsVal !== val) { 150 + return false; 151 + } 152 + } 153 + return true; 154 + } 155 + 156 + // --- Main --- 157 + 158 + const allProfiles = await db.select({ did: profiles.did, handle: profiles.handle }).from(profiles); 159 + log.info({ count: allProfiles.length }, 'Found profiles to repair'); 160 + 161 + const results = { repaired: 0, alreadyCorrect: 0, sessionExpired: 0, errors: 0, usersProcessed: 0 }; 162 + 163 + for (const { did, handle } of allProfiles) { 164 + try { 165 + const session = await oauthClient.restore(did); 166 + const agent = new Agent(session); 167 + let userRepairs = 0; 168 + 169 + // --- Skills --- 170 + const dbSkills = await db.select().from(skills).where(eq(skills.did, did)); 171 + if (dbSkills.length > 0) { 172 + const pdsRes = await agent.com.atproto.repo.listRecords({ 173 + repo: did, 174 + collection: 'id.sifa.profile.skill', 175 + limit: 200, 176 + }); 177 + const pdsMap = new Map( 178 + pdsRes.data.records.map((r) => [ 179 + r.uri.split('/').pop()!, 180 + r.value as Record<string, unknown>, 181 + ]), 182 + ); 183 + 184 + for (const row of dbSkills) { 185 + const dbRecord = buildSkillRecord(row) as unknown as Record<string, unknown>; 186 + const pdsValue = pdsMap.get(row.rkey); 187 + 188 + if (!pdsValue || !recordsMatch(pdsValue, dbRecord)) { 189 + log.info( 190 + { 191 + did, 192 + handle, 193 + collection: 'skill', 194 + rkey: row.rkey, 195 + name: row.name, 196 + hadPds: !!pdsValue, 197 + }, 198 + dryRun ? 'WOULD repair' : 'Repairing', 199 + ); 200 + if (!dryRun) { 201 + const writes: Array<Record<string, unknown>> = []; 202 + if (pdsValue) { 203 + writes.push({ 204 + $type: 'com.atproto.repo.applyWrites#delete', 205 + collection: 'id.sifa.profile.skill', 206 + rkey: row.rkey, 207 + }); 208 + } 209 + writes.push({ 210 + $type: 'com.atproto.repo.applyWrites#create', 211 + collection: 'id.sifa.profile.skill', 212 + rkey: row.rkey, 213 + value: dbRecord, 214 + }); 215 + await agent.com.atproto.repo.applyWrites({ repo: did, writes }); 216 + } 217 + userRepairs++; 218 + } 219 + } 220 + } 221 + 222 + // --- Positions --- 223 + const dbPositions = await db.select().from(positions).where(eq(positions.did, did)); 224 + if (dbPositions.length > 0) { 225 + const pdsRes = await agent.com.atproto.repo.listRecords({ 226 + repo: did, 227 + collection: 'id.sifa.profile.position', 228 + limit: 200, 229 + }); 230 + const pdsMap = new Map( 231 + pdsRes.data.records.map((r) => [ 232 + r.uri.split('/').pop()!, 233 + r.value as Record<string, unknown>, 234 + ]), 235 + ); 236 + 237 + // Fetch skill-position links for this user 238 + const allLinks = await db 239 + .select() 240 + .from(skillPositionLinks) 241 + .where(eq(skillPositionLinks.did, did)); 242 + const linksByPosition = new Map<string, string[]>(); 243 + for (const link of allLinks) { 244 + const arr = linksByPosition.get(link.positionRkey) ?? []; 245 + arr.push(link.skillRkey); 246 + linksByPosition.set(link.positionRkey, arr); 247 + } 248 + 249 + for (const row of dbPositions) { 250 + const skillRkeys = linksByPosition.get(row.rkey) ?? []; 251 + const dbRecord = buildPositionRecord(row, skillRkeys) as unknown as Record<string, unknown>; 252 + const pdsValue = pdsMap.get(row.rkey); 253 + 254 + if (!pdsValue || !recordsMatch(pdsValue, dbRecord)) { 255 + log.info( 256 + { 257 + did, 258 + handle, 259 + collection: 'position', 260 + rkey: row.rkey, 261 + company: row.company, 262 + hadPds: !!pdsValue, 263 + }, 264 + dryRun ? 'WOULD repair' : 'Repairing', 265 + ); 266 + if (!dryRun) { 267 + const writes: Array<Record<string, unknown>> = []; 268 + if (pdsValue) { 269 + writes.push({ 270 + $type: 'com.atproto.repo.applyWrites#delete', 271 + collection: 'id.sifa.profile.position', 272 + rkey: row.rkey, 273 + }); 274 + } 275 + writes.push({ 276 + $type: 'com.atproto.repo.applyWrites#create', 277 + collection: 'id.sifa.profile.position', 278 + rkey: row.rkey, 279 + value: dbRecord, 280 + }); 281 + await agent.com.atproto.repo.applyWrites({ repo: did, writes }); 282 + } 283 + userRepairs++; 284 + } 285 + } 286 + } 287 + 288 + // --- Education --- 289 + const dbEducation = await db.select().from(education).where(eq(education.did, did)); 290 + if (dbEducation.length > 0) { 291 + const pdsRes = await agent.com.atproto.repo.listRecords({ 292 + repo: did, 293 + collection: 'id.sifa.profile.education', 294 + limit: 200, 295 + }); 296 + const pdsMap = new Map( 297 + pdsRes.data.records.map((r) => [ 298 + r.uri.split('/').pop()!, 299 + r.value as Record<string, unknown>, 300 + ]), 301 + ); 302 + 303 + for (const row of dbEducation) { 304 + const dbRecord = buildEducationRecord(row) as unknown as Record<string, unknown>; 305 + const pdsValue = pdsMap.get(row.rkey); 306 + 307 + if (!pdsValue || !recordsMatch(pdsValue, dbRecord)) { 308 + log.info( 309 + { 310 + did, 311 + handle, 312 + collection: 'education', 313 + rkey: row.rkey, 314 + institution: row.institution, 315 + hadPds: !!pdsValue, 316 + }, 317 + dryRun ? 'WOULD repair' : 'Repairing', 318 + ); 319 + if (!dryRun) { 320 + const writes: Array<Record<string, unknown>> = []; 321 + if (pdsValue) { 322 + writes.push({ 323 + $type: 'com.atproto.repo.applyWrites#delete', 324 + collection: 'id.sifa.profile.education', 325 + rkey: row.rkey, 326 + }); 327 + } 328 + writes.push({ 329 + $type: 'com.atproto.repo.applyWrites#create', 330 + collection: 'id.sifa.profile.education', 331 + rkey: row.rkey, 332 + value: dbRecord, 333 + }); 334 + await agent.com.atproto.repo.applyWrites({ repo: did, writes }); 335 + } 336 + userRepairs++; 337 + } 338 + } 339 + } 340 + 341 + if (userRepairs > 0) { 342 + results.repaired += userRepairs; 343 + log.info({ did, handle, repairs: userRepairs }, 'User repaired'); 344 + } else { 345 + results.alreadyCorrect++; 346 + } 347 + results.usersProcessed++; 348 + 349 + // Rate limit: 100ms between users 350 + await new Promise((r) => setTimeout(r, 100)); 351 + } catch (err: unknown) { 352 + const msg = err instanceof Error ? err.message : String(err); 353 + if ( 354 + msg.includes('invalid_grant') || 355 + msg.includes('expired') || 356 + msg.includes('token_revoked') || 357 + msg.includes('deleted by another process') || 358 + msg.includes('XRPC Not Found') 359 + ) { 360 + results.sessionExpired++; 361 + } else { 362 + log.error({ did, handle, err: msg }, 'Failed to repair'); 363 + results.errors++; 364 + } 365 + } 366 + } 367 + 368 + log.info(results, 'PDS repair complete'); 369 + valkey.disconnect(); 370 + await pool.end();