Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/

fix(api): write-through to local DB after PDS writes (#81)

* fix(api): write-through to local DB after PDS writes

After a successful PDS write, immediately index the record in the local
database so GET requests reflect the change without waiting for the
Jetstream firehose to deliver the event. This eliminates the race
condition where a page refresh showed stale data (skills, positions,
education, etc. appearing to vanish right after being added, or
reappearing right after being removed).

Extract DB write logic from Jetstream indexers into a shared
record-indexer service. Both write routes and Jetstream handlers now
call the same idempotent upsert functions -- when Jetstream eventually
delivers the duplicate event, the ON CONFLICT clause makes it
effectively a no-op: the same values are re-applied and only
`indexedAt` is refreshed.

Follows the write-through pattern used by Cloudflare's Statusphere
(the canonical ATProto AppView example) for read-your-own-writes
consistency on custom lexicons.

Closes #79

* style: format new files with prettier

* fix(tests): use unique canonical skill slug to avoid cross-suite FK conflict

The record-indexer test and skill-normalization-db test both created a
canonical skill with slug 'javascript'. When running together, cleanup
order caused FK violations. Use a unique slug 'writethrough-testlang'
to isolate the test data.

Authored by Guido X Jansen and committed by GitHub
c208ef0a b7943f71

+927 -474
+5 -37
src/jetstream/indexers/certification.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { certifications } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.certification'; 7 6 8 7 export function createCertificationIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db 17 - .delete(certifications) 18 - .where(and(eq(certifications.did, did), eq(certifications.rkey, rkey))); 19 - logger.info({ did, rkey }, 'Deleted certification'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 20 16 return; 21 17 } 22 18 23 19 if (!record) return; 24 - 25 - await db 26 - .insert(certifications) 27 - .values({ 28 - did, 29 - rkey, 30 - name: sanitize(record.name as string), 31 - authority: sanitizeOptional(record.authority as string | undefined) ?? null, 32 - credentialId: (record.credentialId as string) ?? null, 33 - credentialUrl: (record.credentialUrl as string) ?? null, 34 - issuedAt: (record.issuedAt as string) ?? null, 35 - expiresAt: (record.expiresAt as string) ?? null, 36 - createdAt: new Date(record.createdAt as string), 37 - indexedAt: new Date(), 38 - }) 39 - .onConflictDoUpdate({ 40 - target: [certifications.did, certifications.rkey], 41 - set: { 42 - name: sanitize(record.name as string), 43 - authority: sanitizeOptional(record.authority as string | undefined) ?? null, 44 - credentialId: (record.credentialId as string) ?? null, 45 - credentialUrl: (record.credentialUrl as string) ?? null, 46 - issuedAt: (record.issuedAt as string) ?? null, 47 - expiresAt: (record.expiresAt as string) ?? 
null, 48 - indexedAt: new Date(), 49 - }, 50 - }); 51 - 52 - logger.info({ did, rkey, operation }, 'Indexed certification'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 53 21 }; 54 22 }
+5 -29
src/jetstream/indexers/course.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { courses } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.course'; 7 6 8 7 export function createCourseIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db.delete(courses).where(and(eq(courses.did, did), eq(courses.rkey, rkey))); 17 - logger.info({ did, rkey }, 'Deleted course'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 18 16 return; 19 17 } 20 18 21 19 if (!record) return; 22 - 23 - await db 24 - .insert(courses) 25 - .values({ 26 - did, 27 - rkey, 28 - name: sanitize(record.name as string), 29 - number: sanitizeOptional(record.number as string | undefined) ?? null, 30 - institution: sanitizeOptional(record.institution as string | undefined) ?? null, 31 - createdAt: new Date(record.createdAt as string), 32 - indexedAt: new Date(), 33 - }) 34 - .onConflictDoUpdate({ 35 - target: [courses.did, courses.rkey], 36 - set: { 37 - name: sanitize(record.name as string), 38 - number: sanitizeOptional(record.number as string | undefined) ?? null, 39 - institution: sanitizeOptional(record.institution as string | undefined) ?? null, 40 - indexedAt: new Date(), 41 - }, 42 - }); 43 - 44 - logger.info({ did, rkey, operation }, 'Indexed course'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 45 21 }; 46 22 }
+3 -37
src/jetstream/indexers/education.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { education } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexEducation, deleteEducation } from '../../services/record-indexer.js'; 7 4 8 5 export function createEducationIndexer(db: Database) { 9 6 return async (event: JetstreamEvent) => { ··· 13 10 const { operation, rkey, record } = commit; 14 11 15 12 if (operation === 'delete') { 16 - await db.delete(education).where(and(eq(education.did, did), eq(education.rkey, rkey))); 17 - logger.info({ did, rkey }, 'Deleted education'); 13 + await deleteEducation(db, did, rkey); 18 14 return; 19 15 } 20 16 21 17 if (!record) return; 22 - 23 - await db 24 - .insert(education) 25 - .values({ 26 - did, 27 - rkey, 28 - institution: sanitize(record.institution as string), 29 - institutionDid: (record.institutionDid as string) ?? null, 30 - degree: sanitizeOptional(record.degree as string | undefined) ?? null, 31 - fieldOfStudy: sanitizeOptional(record.fieldOfStudy as string | undefined) ?? null, 32 - description: sanitizeOptional(record.description as string | undefined) ?? null, 33 - startDate: (record.startDate as string) ?? null, 34 - endDate: (record.endDate as string) ?? null, 35 - createdAt: new Date(record.createdAt as string), 36 - indexedAt: new Date(), 37 - }) 38 - .onConflictDoUpdate({ 39 - target: [education.did, education.rkey], 40 - set: { 41 - institution: sanitize(record.institution as string), 42 - institutionDid: (record.institutionDid as string) ?? null, 43 - degree: sanitizeOptional(record.degree as string | undefined) ?? null, 44 - fieldOfStudy: sanitizeOptional(record.fieldOfStudy as string | undefined) ?? null, 45 - description: sanitizeOptional(record.description as string | undefined) ?? 
null, 46 - startDate: (record.startDate as string) ?? null, 47 - endDate: (record.endDate as string) ?? null, 48 - indexedAt: new Date(), 49 - }, 50 - }); 51 - 52 - logger.info({ did, rkey, operation }, 'Indexed education'); 18 + await indexEducation(db, did, rkey, record); 53 19 }; 54 20 }
+5 -35
src/jetstream/indexers/external-account.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { externalAccounts } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.externalAccount'; 7 6 8 7 export function createExternalAccountIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db 17 - .delete(externalAccounts) 18 - .where(and(eq(externalAccounts.did, did), eq(externalAccounts.rkey, rkey))); 19 - logger.info({ did, rkey }, 'Deleted external account'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 20 16 return; 21 17 } 22 18 23 19 if (!record) return; 24 - 25 - await db 26 - .insert(externalAccounts) 27 - .values({ 28 - did, 29 - rkey, 30 - platform: sanitize(record.platform as string), 31 - url: sanitize(record.url as string), 32 - label: sanitizeOptional(record.label as string | undefined) ?? null, 33 - feedUrl: sanitizeOptional(record.feedUrl as string | undefined) ?? null, 34 - isPrimary: (record.isPrimary as boolean) ?? false, 35 - createdAt: new Date(record.createdAt as string), 36 - indexedAt: new Date(), 37 - }) 38 - .onConflictDoUpdate({ 39 - target: [externalAccounts.did, externalAccounts.rkey], 40 - set: { 41 - platform: sanitize(record.platform as string), 42 - url: sanitize(record.url as string), 43 - label: sanitizeOptional(record.label as string | undefined) ?? null, 44 - feedUrl: sanitizeOptional(record.feedUrl as string | undefined) ?? null, 45 - isPrimary: (record.isPrimary as boolean) ?? 
false, 46 - indexedAt: new Date(), 47 - }, 48 - }); 49 - 50 - logger.info({ did, rkey, operation }, 'Indexed external account'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 51 21 }; 52 22 }
+5 -31
src/jetstream/indexers/honor.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { honors } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.honor'; 7 6 8 7 export function createHonorIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db.delete(honors).where(and(eq(honors.did, did), eq(honors.rkey, rkey))); 17 - logger.info({ did, rkey }, 'Deleted honor'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 18 16 return; 19 17 } 20 18 21 19 if (!record) return; 22 - 23 - await db 24 - .insert(honors) 25 - .values({ 26 - did, 27 - rkey, 28 - title: sanitize(record.title as string), 29 - issuer: sanitizeOptional(record.issuer as string | undefined) ?? null, 30 - description: sanitizeOptional(record.description as string | undefined) ?? null, 31 - awardedAt: (record.awardedAt as string) ?? null, 32 - createdAt: new Date(record.createdAt as string), 33 - indexedAt: new Date(), 34 - }) 35 - .onConflictDoUpdate({ 36 - target: [honors.did, honors.rkey], 37 - set: { 38 - title: sanitize(record.title as string), 39 - issuer: sanitizeOptional(record.issuer as string | undefined) ?? null, 40 - description: sanitizeOptional(record.description as string | undefined) ?? null, 41 - awardedAt: (record.awardedAt as string) ?? null, 42 - indexedAt: new Date(), 43 - }, 44 - }); 45 - 46 - logger.info({ did, rkey, operation }, 'Indexed honor'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 47 21 }; 48 22 }
+5 -27
src/jetstream/indexers/language.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { languages } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.language'; 7 6 8 7 export function createLanguageIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db.delete(languages).where(and(eq(languages.did, did), eq(languages.rkey, rkey))); 17 - logger.info({ did, rkey }, 'Deleted language'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 18 16 return; 19 17 } 20 18 21 19 if (!record) return; 22 - 23 - await db 24 - .insert(languages) 25 - .values({ 26 - did, 27 - rkey, 28 - name: sanitize(record.name as string), 29 - proficiency: (record.proficiency as string) ?? null, 30 - createdAt: new Date(record.createdAt as string), 31 - indexedAt: new Date(), 32 - }) 33 - .onConflictDoUpdate({ 34 - target: [languages.did, languages.rkey], 35 - set: { 36 - name: sanitize(record.name as string), 37 - proficiency: (record.proficiency as string) ?? null, 38 - indexedAt: new Date(), 39 - }, 40 - }); 41 - 42 - logger.info({ did, rkey, operation }, 'Indexed language'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 43 21 }; 44 22 }
+3 -98
src/jetstream/indexers/position.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { positions, skillPositionLinks } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 7 - 8 - interface StrongRef { 9 - uri: string; 10 - cid: string; 11 - } 12 - 13 - /** Extract rkey from an AT Protocol URI: at://did/collection/rkey */ 14 - function parseRkeyFromUri(uri: string): string | null { 15 - const parts = uri.split('/'); 16 - return parts.length >= 5 ? (parts[4] ?? null) : null; 17 - } 18 - 19 - interface RecordLocation { 20 - country?: string; 21 - region?: string; 22 - city?: string; 23 - countryCode?: string; 24 - } 3 + import { indexPosition, deletePosition } from '../../services/record-indexer.js'; 25 4 26 5 export function createPositionIndexer(db: Database) { 27 6 return async (event: JetstreamEvent) => { ··· 31 10 const { operation, rkey, record } = commit; 32 11 33 12 if (operation === 'delete') { 34 - await db 35 - .delete(skillPositionLinks) 36 - .where(and(eq(skillPositionLinks.did, did), eq(skillPositionLinks.positionRkey, rkey))); 37 - await db.delete(positions).where(and(eq(positions.did, did), eq(positions.rkey, rkey))); 38 - logger.info({ did, rkey }, 'Deleted position'); 13 + await deletePosition(db, did, rkey); 39 14 return; 40 15 } 41 16 42 17 if (!record) return; 43 - 44 - const location = record.location as RecordLocation | undefined; 45 - 46 - await db 47 - .insert(positions) 48 - .values({ 49 - did, 50 - rkey, 51 - companyName: sanitize(record.companyName as string), 52 - companyDid: (record.companyDid as string) ?? null, 53 - title: sanitize(record.title as string), 54 - description: sanitizeOptional(record.description as string | undefined) ?? null, 55 - employmentType: (record.employmentType as string) ?? null, 56 - workplaceType: (record.workplaceType as string) ?? 
null, 57 - locationCountry: sanitizeOptional(location?.country) ?? null, 58 - locationRegion: sanitizeOptional(location?.region) ?? null, 59 - locationCity: sanitizeOptional(location?.city) ?? null, 60 - countryCode: sanitizeOptional(location?.countryCode) ?? null, 61 - startDate: record.startDate as string, 62 - endDate: (record.endDate as string) ?? null, 63 - current: (record.current as boolean) ?? false, 64 - createdAt: new Date(record.createdAt as string), 65 - indexedAt: new Date(), 66 - }) 67 - .onConflictDoUpdate({ 68 - target: [positions.did, positions.rkey], 69 - set: { 70 - companyName: sanitize(record.companyName as string), 71 - companyDid: (record.companyDid as string) ?? null, 72 - title: sanitize(record.title as string), 73 - description: sanitizeOptional(record.description as string | undefined) ?? null, 74 - employmentType: (record.employmentType as string) ?? null, 75 - workplaceType: (record.workplaceType as string) ?? null, 76 - locationCountry: sanitizeOptional(location?.country) ?? null, 77 - locationRegion: sanitizeOptional(location?.region) ?? null, 78 - locationCity: sanitizeOptional(location?.city) ?? null, 79 - countryCode: sanitizeOptional(location?.countryCode) ?? null, 80 - startDate: record.startDate as string, 81 - endDate: (record.endDate as string) ?? null, 82 - current: (record.current as boolean) ?? 
false, 83 - indexedAt: new Date(), 84 - }, 85 - }); 86 - 87 - // Sync skill-position links: delete-and-replace strategy 88 - await db 89 - .delete(skillPositionLinks) 90 - .where(and(eq(skillPositionLinks.did, did), eq(skillPositionLinks.positionRkey, rkey))); 91 - 92 - const skillRefs = record.skills as StrongRef[] | undefined; 93 - if (skillRefs && Array.isArray(skillRefs) && skillRefs.length > 0) { 94 - const linkValues = skillRefs 95 - .map((ref) => { 96 - const skillRkey = parseRkeyFromUri(ref.uri); 97 - if (!skillRkey) { 98 - logger.warn( 99 - { did, rkey, uri: ref.uri }, 100 - 'Could not parse skill rkey from strongRef URI', 101 - ); 102 - return null; 103 - } 104 - return { did, positionRkey: rkey, skillRkey }; 105 - }) 106 - .filter((v): v is NonNullable<typeof v> => v !== null); 107 - 108 - if (linkValues.length > 0) { 109 - await db.insert(skillPositionLinks).values(linkValues).onConflictDoNothing(); 110 - } 111 - } 112 - 113 - logger.info({ did, rkey, operation, skillLinks: skillRefs?.length ?? 0 }, 'Indexed position'); 18 + await indexPosition(db, did, rkey, record); 114 19 }; 115 20 }
+5 -33
src/jetstream/indexers/project.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { projects } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.project'; 7 6 8 7 export function createProjectIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db.delete(projects).where(and(eq(projects.did, did), eq(projects.rkey, rkey))); 17 - logger.info({ did, rkey }, 'Deleted project'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 18 16 return; 19 17 } 20 18 21 19 if (!record) return; 22 - 23 - await db 24 - .insert(projects) 25 - .values({ 26 - did, 27 - rkey, 28 - name: sanitize(record.name as string), 29 - description: sanitizeOptional(record.description as string | undefined) ?? null, 30 - url: (record.url as string) ?? null, 31 - startedAt: (record.startedAt as string) ?? null, 32 - endedAt: (record.endedAt as string) ?? null, 33 - createdAt: new Date(record.createdAt as string), 34 - indexedAt: new Date(), 35 - }) 36 - .onConflictDoUpdate({ 37 - target: [projects.did, projects.rkey], 38 - set: { 39 - name: sanitize(record.name as string), 40 - description: sanitizeOptional(record.description as string | undefined) ?? null, 41 - url: (record.url as string) ?? null, 42 - startedAt: (record.startedAt as string) ?? null, 43 - endedAt: (record.endedAt as string) ?? null, 44 - indexedAt: new Date(), 45 - }, 46 - }); 47 - 48 - logger.info({ did, rkey, operation }, 'Indexed project'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 49 21 }; 50 22 }
+5 -35
src/jetstream/indexers/publication.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { publications } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.publication'; 7 6 8 7 export function createPublicationIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db 17 - .delete(publications) 18 - .where(and(eq(publications.did, did), eq(publications.rkey, rkey))); 19 - logger.info({ did, rkey }, 'Deleted publication'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 20 16 return; 21 17 } 22 18 23 19 if (!record) return; 24 - 25 - await db 26 - .insert(publications) 27 - .values({ 28 - did, 29 - rkey, 30 - title: sanitize(record.title as string), 31 - publisher: sanitizeOptional(record.publisher as string | undefined) ?? null, 32 - url: (record.url as string) ?? null, 33 - description: sanitizeOptional(record.description as string | undefined) ?? null, 34 - publishedAt: (record.publishedAt as string) ?? null, 35 - createdAt: new Date(record.createdAt as string), 36 - indexedAt: new Date(), 37 - }) 38 - .onConflictDoUpdate({ 39 - target: [publications.did, publications.rkey], 40 - set: { 41 - title: sanitize(record.title as string), 42 - publisher: sanitizeOptional(record.publisher as string | undefined) ?? null, 43 - url: (record.url as string) ?? null, 44 - description: sanitizeOptional(record.description as string | undefined) ?? null, 45 - publishedAt: (record.publishedAt as string) ?? 
null, 46 - indexedAt: new Date(), 47 - }, 48 - }); 49 - 50 - logger.info({ did, rkey, operation }, 'Indexed publication'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 51 21 }; 52 22 }
+3 -75
src/jetstream/indexers/skill.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { skills, canonicalSkills } from '../../db/schema/index.js'; 3 - import { and, eq, sql } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 7 - import { resolveSkill } from '../../services/skill-normalization.js'; 3 + import { indexSkill, deleteSkill } from '../../services/record-indexer.js'; 8 4 9 5 export function createSkillIndexer(db: Database) { 10 6 return async (event: JetstreamEvent) => { ··· 14 10 const { operation, rkey, record } = commit; 15 11 16 12 if (operation === 'delete') { 17 - // Look up canonical_skill_id before deleting so we can decrement user_count 18 - const existing = await db 19 - .select({ canonicalSkillId: skills.canonicalSkillId }) 20 - .from(skills) 21 - .where(and(eq(skills.did, did), eq(skills.rkey, rkey))) 22 - .limit(1); 23 - 24 - await db.delete(skills).where(and(eq(skills.did, did), eq(skills.rkey, rkey))); 25 - 26 - if (existing[0]?.canonicalSkillId) { 27 - await db 28 - .update(canonicalSkills) 29 - .set({ userCount: sql`GREATEST(${canonicalSkills.userCount} - 1, 0)` }) 30 - .where(eq(canonicalSkills.id, existing[0].canonicalSkillId)); 31 - } 32 - 33 - logger.info({ did, rkey }, 'Deleted skill'); 13 + await deleteSkill(db, did, rkey); 34 14 return; 35 15 } 36 16 37 17 if (!record) return; 38 - 39 - const skillName = sanitize(record.skillName as string); 40 - const category = sanitizeOptional(record.category as string | undefined) ?? null; 41 - 42 - // Run normalization pipeline 43 - const canonical = await resolveSkill(db, skillName); 44 - const canonicalSkillId = canonical?.id ?? 
null; 45 - 46 - // Check if this is an update (existing record may already be linked to a different canonical) 47 - const existing = await db 48 - .select({ canonicalSkillId: skills.canonicalSkillId }) 49 - .from(skills) 50 - .where(and(eq(skills.did, did), eq(skills.rkey, rkey))) 51 - .limit(1); 52 - 53 - const previousCanonicalId = existing[0]?.canonicalSkillId ?? null; 54 - 55 - await db 56 - .insert(skills) 57 - .values({ 58 - did, 59 - rkey, 60 - skillName, 61 - category, 62 - canonicalSkillId, 63 - createdAt: new Date(record.createdAt as string), 64 - indexedAt: new Date(), 65 - }) 66 - .onConflictDoUpdate({ 67 - target: [skills.did, skills.rkey], 68 - set: { 69 - skillName, 70 - category, 71 - canonicalSkillId, 72 - indexedAt: new Date(), 73 - }, 74 - }); 75 - 76 - // Update user_count: decrement old canonical (if changed), increment new 77 - if (previousCanonicalId && previousCanonicalId !== canonicalSkillId) { 78 - await db 79 - .update(canonicalSkills) 80 - .set({ userCount: sql`GREATEST(${canonicalSkills.userCount} - 1, 0)` }) 81 - .where(eq(canonicalSkills.id, previousCanonicalId)); 82 - } 83 - if (canonicalSkillId && canonicalSkillId !== previousCanonicalId) { 84 - await db 85 - .update(canonicalSkills) 86 - .set({ userCount: sql`${canonicalSkills.userCount} + 1` }) 87 - .where(eq(canonicalSkills.id, canonicalSkillId)); 88 - } 89 - 90 - logger.info({ did, rkey, operation, canonicalSkillId }, 'Indexed skill'); 18 + await indexSkill(db, did, rkey, record); 91 19 }; 92 20 }
+5 -37
src/jetstream/indexers/volunteering.ts
··· 1 1 import type { Database } from '../../db/index.js'; 2 - import { volunteering } from '../../db/schema/index.js'; 3 - import { and, eq } from 'drizzle-orm'; 4 2 import type { JetstreamEvent } from '../types.js'; 5 - import { logger } from '../../logger.js'; 6 - import { sanitize, sanitizeOptional } from '../../lib/sanitize.js'; 3 + import { indexRecord, deleteRecord } from '../../services/record-indexer.js'; 4 + 5 + const COLLECTION = 'id.sifa.profile.volunteering'; 7 6 8 7 export function createVolunteeringIndexer(db: Database) { 9 8 return async (event: JetstreamEvent) => { ··· 13 12 const { operation, rkey, record } = commit; 14 13 15 14 if (operation === 'delete') { 16 - await db 17 - .delete(volunteering) 18 - .where(and(eq(volunteering.did, did), eq(volunteering.rkey, rkey))); 19 - logger.info({ did, rkey }, 'Deleted volunteering'); 15 + await deleteRecord(db, COLLECTION, did, rkey); 20 16 return; 21 17 } 22 18 23 19 if (!record) return; 24 - 25 - await db 26 - .insert(volunteering) 27 - .values({ 28 - did, 29 - rkey, 30 - organization: sanitize(record.organization as string), 31 - role: sanitizeOptional(record.role as string | undefined) ?? null, 32 - cause: sanitizeOptional(record.cause as string | undefined) ?? null, 33 - description: sanitizeOptional(record.description as string | undefined) ?? null, 34 - startedAt: (record.startedAt as string) ?? null, 35 - endedAt: (record.endedAt as string) ?? null, 36 - createdAt: new Date(record.createdAt as string), 37 - indexedAt: new Date(), 38 - }) 39 - .onConflictDoUpdate({ 40 - target: [volunteering.did, volunteering.rkey], 41 - set: { 42 - organization: sanitize(record.organization as string), 43 - role: sanitizeOptional(record.role as string | undefined) ?? null, 44 - cause: sanitizeOptional(record.cause as string | undefined) ?? null, 45 - description: sanitizeOptional(record.description as string | undefined) ?? null, 46 - startedAt: (record.startedAt as string) ?? 
null, 47 - endedAt: (record.endedAt as string) ?? null, 48 - indexedAt: new Date(), 49 - }, 50 - }); 51 - 52 - logger.info({ did, rkey, operation }, 'Indexed volunteering'); 20 + await indexRecord(db, COLLECTION, did, rkey, record); 53 21 }; 54 22 }
+8
src/routes/external-accounts.ts
··· 15 15 import { createAuthMiddleware, getAuthContext } from '../middleware/auth.js'; 16 16 import { discoverFeedUrl, fetchFeedItems } from '../services/feed-discovery.js'; 17 17 import { checkAndStoreVerification, isVerifiablePlatform } from '../services/verification.js'; 18 + import { indexRecord, deleteRecord } from '../services/record-indexer.js'; 18 19 19 20 const FEED_CACHE_TTL = 1800; // 30 minutes 20 21 ··· 56 57 await writeToUserPds(session, did, [ 57 58 buildApplyWritesOp('create', 'id.sifa.profile.externalAccount', rkey, record), 58 59 ]); 60 + 61 + await indexRecord(db, 'id.sifa.profile.externalAccount', did, rkey, record); 59 62 60 63 // Trigger verification in the background 61 64 const [profile] = await db.select().from(profiles).where(eq(profiles.did, did)).limit(1); ··· 111 114 buildApplyWritesOp('update', 'id.sifa.profile.externalAccount', rkey, record), 112 115 ]); 113 116 117 + await indexRecord(db, 'id.sifa.profile.externalAccount', did, rkey, record); 118 + 114 119 // Re-verify in the background 115 120 const [profile] = await db.select().from(profiles).where(eq(profiles.did, did)).limit(1); 116 121 if (profile) { ··· 141 146 ]); 142 147 } catch (err) { 143 148 if (isPdsRecordNotFound(err)) { 149 + await deleteRecord(db, 'id.sifa.profile.externalAccount', did, rkey); 144 150 return reply.status(200).send({ ok: true }); 145 151 } 146 152 return handlePdsError(err, reply); 147 153 } 154 + 155 + await deleteRecord(db, 'id.sifa.profile.externalAccount', did, rkey); 148 156 149 157 return reply.status(200).send({ ok: true }); 150 158 },
+38
src/routes/profile-write.ts
··· 35 35 import { sanitize, sanitizeOptional } from '../lib/sanitize.js'; 36 36 import { wipeSifaData } from '../services/profile-wipe.js'; 37 37 import { 38 + indexSkill, 39 + deleteSkill, 40 + indexPosition, 41 + deletePosition, 42 + indexEducation, 43 + deleteEducation, 44 + indexRecord, 45 + deleteRecord, 46 + } from '../services/record-indexer.js'; 47 + import { 38 48 sessions as sessionsTable, 39 49 oauthSessions as oauthSessionsTable, 40 50 } from '../db/schema/index.js'; ··· 94 104 buildApplyWritesOp('create', 'id.sifa.profile.position', rkey, record), 95 105 ]); 96 106 107 + // Write-through: index in local DB immediately 108 + await indexPosition(db, did, rkey, record); 109 + 97 110 return reply.status(201).send({ rkey }); 98 111 }); 99 112 ··· 118 131 buildApplyWritesOp('update', 'id.sifa.profile.position', rkey, record), 119 132 ]); 120 133 134 + // Write-through: update local DB immediately 135 + await indexPosition(db, did, rkey, record); 136 + 121 137 return reply.status(200).send({ ok: true }); 122 138 }, 123 139 ); ··· 136 152 ]); 137 153 } catch (err) { 138 154 if (isPdsRecordNotFound(err)) { 155 + await deletePosition(db, did, rkey); 139 156 return reply.status(200).send({ ok: true }); 140 157 } 141 158 return handlePdsError(err, reply); 142 159 } 143 160 161 + // Write-through: delete from local DB immediately 162 + await deletePosition(db, did, rkey); 163 + 144 164 return reply.status(200).send({ ok: true }); 145 165 }, 146 166 ); ··· 162 182 await writeToUserPds(session, did, [ 163 183 buildApplyWritesOp('create', 'id.sifa.profile.education', rkey, record), 164 184 ]); 185 + 186 + await indexEducation(db, did, rkey, record); 165 187 166 188 return reply.status(201).send({ rkey }); 167 189 }); ··· 186 208 await writeToUserPds(session, did, [ 187 209 buildApplyWritesOp('update', 'id.sifa.profile.education', rkey, record), 188 210 ]); 211 + 212 + await indexEducation(db, did, rkey, record); 189 213 190 214 return reply.status(200).send({ ok: 
true }); 191 215 }, ··· 205 229 ]); 206 230 } catch (err) { 207 231 if (isPdsRecordNotFound(err)) { 232 + await deleteEducation(db, did, rkey); 208 233 return reply.status(200).send({ ok: true }); 209 234 } 210 235 return handlePdsError(err, reply); 211 236 } 237 + 238 + await deleteEducation(db, did, rkey); 212 239 213 240 return reply.status(200).send({ ok: true }); 214 241 }, ··· 231 258 await writeToUserPds(session, did, [ 232 259 buildApplyWritesOp('create', 'id.sifa.profile.skill', rkey, record), 233 260 ]); 261 + 262 + await indexSkill(db, did, rkey, record); 234 263 235 264 return reply.status(201).send({ rkey }); 236 265 }); ··· 256 285 buildApplyWritesOp('update', 'id.sifa.profile.skill', rkey, record), 257 286 ]); 258 287 288 + await indexSkill(db, did, rkey, record); 289 + 259 290 return reply.status(200).send({ ok: true }); 260 291 }, 261 292 ); ··· 274 305 ]); 275 306 } catch (err) { 276 307 if (isPdsRecordNotFound(err)) { 308 + await deleteSkill(db, did, rkey); 277 309 return reply.status(200).send({ ok: true }); 278 310 } 279 311 return handlePdsError(err, reply); 280 312 } 313 + 314 + await deleteSkill(db, did, rkey); 281 315 282 316 return reply.status(200).send({ ok: true }); 283 317 }, ··· 304 338 const data = parsed.data as Record<string, unknown>; 305 339 const record: Record<string, unknown> = { createdAt: new Date().toISOString(), ...data }; 306 340 await writeToUserPds(session, did, [buildApplyWritesOp('create', collection, rkey, record)]); 341 + await indexRecord(db, collection, did, rkey, record); 307 342 return reply.status(201).send({ rkey }); 308 343 }, 309 344 ); ··· 327 362 const data = parsed.data as Record<string, unknown>; 328 363 const record: Record<string, unknown> = { createdAt: new Date().toISOString(), ...data }; 329 364 await writeToUserPds(session, did, [buildApplyWritesOp('update', collection, rkey, record)]); 365 + await indexRecord(db, collection, did, rkey, record); 330 366 return reply.status(200).send({ ok: true }); 
331 367 }, 332 368 ); ··· 346 382 await writeToUserPds(session, did, [buildApplyWritesOp('delete', collection, rkey)]); 347 383 } catch (err) { 348 384 if (isPdsRecordNotFound(err)) { 385 + await deleteRecord(db, collection, did, rkey); 349 386 return reply.status(200).send({ ok: true }); 350 387 } 351 388 return handlePdsError(err, reply); 352 389 } 390 + await deleteRecord(db, collection, did, rkey); 353 391 return reply.status(200).send({ ok: true }); 354 392 }, 355 393 );
+396
src/services/record-indexer.ts
import type { Database } from '../db/index.js';
import {
  skills,
  positions,
  education,
  certifications,
  projects,
  volunteering,
  publications,
  courses,
  honors,
  languages,
  externalAccounts,
  canonicalSkills,
  skillPositionLinks,
} from '../db/schema/index.js';
import { and, eq, sql } from 'drizzle-orm';
import { sanitize, sanitizeOptional } from '../lib/sanitize.js';
import { resolveSkill } from './skill-normalization.js';
import { logger } from '../logger.js';

// Shared record-indexing helpers. Every index* function is an upsert keyed on
// (did, rkey) and every delete* function tolerates a missing row, so the same
// record may be indexed or deleted more than once without changing the result.

/**
 * Converts a record's `createdAt` into a `Date`.
 *
 * Records are not guaranteed to carry a well-formed timestamp. An Invalid Date
 * would make the insert fail, so a missing or unparseable value falls back to
 * the current time.
 */
function parseCreatedAt(value: unknown): Date {
  if (typeof value === 'string' || typeof value === 'number') {
    const parsed = new Date(value);
    if (!Number.isNaN(parsed.getTime())) {
      return parsed;
    }
  }
  return new Date();
}

// --- Skill ---

/**
 * Upserts a skill row and keeps `canonicalSkills.userCount` in step with it.
 *
 * The counter is adjusted only when the row's canonical skill actually changes:
 * the previous canonical skill (if any) is decremented and the new one (if any)
 * is incremented.
 *
 * @param db - Database handle.
 * @param did - Owner of the record.
 * @param rkey - Record key; together with `did` it identifies the row.
 * @param record - Raw record; `skillName`, `category` and `createdAt` are read.
 */
export async function indexSkill(
  db: Database,
  did: string,
  rkey: string,
  record: Record<string, unknown>,
): Promise<void> {
  const skillName = sanitize(record.skillName as string);
  const category = sanitizeOptional(record.category as string | undefined) ?? null;

  const canonical = await resolveSkill(db, skillName);
  const canonicalSkillId = canonical?.id ?? null;

  // Decide "was there a row, and which canonical skill did it point at?" under
  // a lock. Reading first and upserting afterwards without one lets two
  // concurrent calls for the same record both observe "no row" and both bump
  // the counter.
  const previousCanonicalId = await db.transaction(async (tx) => {
    const row = {
      did,
      rkey,
      skillName,
      category,
      canonicalSkillId,
      createdAt: parseCreatedAt(record.createdAt),
      indexedAt: new Date(),
    };

    // A plain insert succeeds for exactly one caller; everyone else conflicts.
    const created = await tx
      .insert(skills)
      .values(row)
      .onConflictDoNothing({ target: [skills.did, skills.rkey] })
      .returning({ rkey: skills.rkey });
    if (created.length > 0) {
      return null;
    }

    // Row already exists: lock it so the value read here is the one replaced.
    const existing = await tx
      .select({ canonicalSkillId: skills.canonicalSkillId })
      .from(skills)
      .where(and(eq(skills.did, did), eq(skills.rkey, rkey)))
      .limit(1)
      .for('update');

    // Upsert rather than update so the row is re-created if it was deleted
    // between the conflict above and the lock. createdAt is left untouched on
    // conflict.
    await tx
      .insert(skills)
      .values(row)
      .onConflictDoUpdate({
        target: [skills.did, skills.rkey],
        set: {
          skillName,
          category,
          canonicalSkillId,
          indexedAt: row.indexedAt,
        },
      });

    return existing[0]?.canonicalSkillId ?? null;
  });

  // Counter updates are relative (+1 / -1), so they are safe to apply after the
  // transaction; keeping them outside avoids holding two canonical rows locked
  // at once.
  if (previousCanonicalId !== canonicalSkillId) {
    if (previousCanonicalId) {
      await db
        .update(canonicalSkills)
        .set({ userCount: sql`GREATEST(${canonicalSkills.userCount} - 1, 0)` })
        .where(eq(canonicalSkills.id, previousCanonicalId));
    }
    if (canonicalSkillId) {
      await db
        .update(canonicalSkills)
        .set({ userCount: sql`${canonicalSkills.userCount} + 1` })
        .where(eq(canonicalSkills.id, canonicalSkillId));
    }
  }

  logger.info({ did, rkey, canonicalSkillId }, 'Indexed skill (write-through)');
}

/**
 * Deletes a skill row and decrements the user count of the canonical skill it
 * pointed at. Deleting a row that does not exist is a no-op.
 */
export async function deleteSkill(db: Database, did: string, rkey: string): Promise<void> {
  // DELETE ... RETURNING hands the removed row to exactly one caller, so a
  // repeated or concurrent delete cannot decrement the counter twice.
  const removed = await db
    .delete(skills)
    .where(and(eq(skills.did, did), eq(skills.rkey, rkey)))
    .returning({ canonicalSkillId: skills.canonicalSkillId });

  const canonicalSkillId = removed[0]?.canonicalSkillId;
  if (canonicalSkillId) {
    await db
      .update(canonicalSkills)
      .set({ userCount: sql`GREATEST(${canonicalSkills.userCount} - 1, 0)` })
      .where(eq(canonicalSkills.id, canonicalSkillId));
  }

  logger.info({ did, rkey }, 'Deleted skill (write-through)');
}

// --- Position ---

/**
 * Extracts the rkey from an AT URI of the form `at://<did>/<collection>/<rkey>`.
 * Returns null when the value is not a string or has too few path segments.
 */
function parseRkeyFromUri(uri: unknown): string | null {
  if (typeof uri !== 'string') {
    return null;
  }
  const parts = uri.split('/');
  return parts.length >= 5 ? (parts[4] ?? null) : null;
}

interface StrongRef {
  uri: string;
  cid: string;
}

interface RecordLocation {
  country?: string;
  region?: string;
  city?: string;
  countryCode?: string;
}

/**
 * Upserts a position row and replaces its skill links with the ones listed in
 * `record.skills`.
 *
 * The upsert and the link replacement run in one transaction, so readers never
 * see the position with its links removed and a failure part-way through does
 * not drop existing links.
 *
 * @param db - Database handle.
 * @param did - Owner of the record.
 * @param rkey - Record key of the position.
 * @param record - Raw position record.
 */
export async function indexPosition(
  db: Database,
  did: string,
  rkey: string,
  record: Record<string, unknown>,
): Promise<void> {
  const location = record.location as RecordLocation | undefined;

  // Built once and shared by the insert values and the conflict update set.
  const fields = {
    companyName: sanitize(record.companyName as string),
    companyDid: (record.companyDid as string) ?? null,
    title: sanitize(record.title as string),
    description: sanitizeOptional(record.description as string | undefined) ?? null,
    employmentType: (record.employmentType as string) ?? null,
    workplaceType: (record.workplaceType as string) ?? null,
    locationCountry: sanitizeOptional(location?.country) ?? null,
    locationRegion: sanitizeOptional(location?.region) ?? null,
    locationCity: sanitizeOptional(location?.city) ?? null,
    countryCode: sanitizeOptional(location?.countryCode) ?? null,
    startDate: record.startDate as string,
    endDate: (record.endDate as string) ?? null,
    current: (record.current as boolean) ?? false,
  };

  // Anything that is not an array counts as "no skills"; entries whose URI
  // cannot be parsed are logged and skipped instead of aborting the index.
  const skillRefs: unknown[] = Array.isArray(record.skills) ? record.skills : [];
  const linkValues = skillRefs.flatMap((ref) => {
    const uri = (ref as Partial<StrongRef> | null | undefined)?.uri;
    const skillRkey = parseRkeyFromUri(uri);
    if (!skillRkey) {
      logger.warn({ did, rkey, uri }, 'Could not parse skill rkey from strongRef URI');
      return [];
    }
    return [{ did, positionRkey: rkey, skillRkey }];
  });

  await db.transaction(async (tx) => {
    const indexedAt = new Date();

    await tx
      .insert(positions)
      .values({
        did,
        rkey,
        ...fields,
        createdAt: parseCreatedAt(record.createdAt),
        indexedAt,
      })
      .onConflictDoUpdate({
        target: [positions.did, positions.rkey],
        set: { ...fields, indexedAt },
      });

    // Sync skill-position links: delete-and-replace
    await tx
      .delete(skillPositionLinks)
      .where(and(eq(skillPositionLinks.did, did), eq(skillPositionLinks.positionRkey, rkey)));

    if (linkValues.length > 0) {
      await tx.insert(skillPositionLinks).values(linkValues).onConflictDoNothing();
    }
  });

  logger.info({ did, rkey, skillLinks: skillRefs.length }, 'Indexed position (write-through)');
}

/**
 * Deletes a position and its skill links. Links are removed first, then the
 * position row. Deleting a position that does not exist is a no-op.
 */
export async function deletePosition(db: Database, did: string, rkey: string): Promise<void> {
  await db
    .delete(skillPositionLinks)
    .where(and(eq(skillPositionLinks.did, did), eq(skillPositionLinks.positionRkey, rkey)));
  await db.delete(positions).where(and(eq(positions.did, did), eq(positions.rkey, rkey)));
  logger.info({ did, rkey }, 'Deleted position (write-through)');
}

// --- Education ---

/**
 * Upserts an education row keyed on (did, rkey). `createdAt` is written only
 * when the row is first inserted.
 */
export async function indexEducation(
  db: Database,
  did: string,
  rkey: string,
  record: Record<string, unknown>,
): Promise<void> {
  // Built once and shared by the insert values and the conflict update set.
  const fields = {
    institution: sanitize(record.institution as string),
    institutionDid: (record.institutionDid as string) ?? null,
    degree: sanitizeOptional(record.degree as string | undefined) ?? null,
    fieldOfStudy: sanitizeOptional(record.fieldOfStudy as string | undefined) ?? null,
    description: sanitizeOptional(record.description as string | undefined) ?? null,
    startDate: (record.startDate as string) ?? null,
    endDate: (record.endDate as string) ?? null,
  };
  const indexedAt = new Date();

  await db
    .insert(education)
    .values({
      did,
      rkey,
      ...fields,
      createdAt: parseCreatedAt(record.createdAt),
      indexedAt,
    })
    .onConflictDoUpdate({
      target: [education.did, education.rkey],
      set: { ...fields, indexedAt },
    });

  logger.info({ did, rkey }, 'Indexed education (write-through)');
}

/** Deletes an education row. Deleting a row that does not exist is a no-op. */
export async function deleteEducation(db: Database, did: string, rkey: string): Promise<void> {
  await db.delete(education).where(and(eq(education.did, did), eq(education.rkey, rkey)));
  logger.info({ did, rkey }, 'Deleted education (write-through)');
}

// --- Generic record indexer for remaining collections ---

/** Target table and column mapping for one generically indexed collection. */
interface CollectionIndexer {
  table:
    | typeof certifications
    | typeof projects
    | typeof volunteering
    | typeof publications
    | typeof courses
    | typeof honors
    | typeof languages
    | typeof externalAccounts;
  /** Maps a raw record to the collection-specific column values. */
  fields: (record: Record<string, unknown>) => Record<string, unknown>;
}

const COLLECTION_INDEXERS: Record<string, CollectionIndexer> = {
  'id.sifa.profile.certification': {
    table: certifications,
    fields: (r) => ({
      name: sanitize(r.name as string),
      authority: sanitizeOptional(r.authority as string | undefined) ?? null,
      credentialId: (r.credentialId as string) ?? null,
      credentialUrl: (r.credentialUrl as string) ?? null,
      issuedAt: (r.issuedAt as string) ?? null,
      expiresAt: (r.expiresAt as string) ?? null,
    }),
  },
  'id.sifa.profile.project': {
    table: projects,
    fields: (r) => ({
      name: sanitize(r.name as string),
      description: sanitizeOptional(r.description as string | undefined) ?? null,
      url: (r.url as string) ?? null,
      startedAt: (r.startedAt as string) ?? null,
      endedAt: (r.endedAt as string) ?? null,
    }),
  },
  'id.sifa.profile.volunteering': {
    table: volunteering,
    fields: (r) => ({
      organization: sanitize(r.organization as string),
      role: sanitizeOptional(r.role as string | undefined) ?? null,
      cause: sanitizeOptional(r.cause as string | undefined) ?? null,
      description: sanitizeOptional(r.description as string | undefined) ?? null,
      startedAt: (r.startedAt as string) ?? null,
      endedAt: (r.endedAt as string) ?? null,
    }),
  },
  'id.sifa.profile.publication': {
    table: publications,
    fields: (r) => ({
      title: sanitize(r.title as string),
      publisher: sanitizeOptional(r.publisher as string | undefined) ?? null,
      url: (r.url as string) ?? null,
      description: sanitizeOptional(r.description as string | undefined) ?? null,
      publishedAt: (r.publishedAt as string) ?? null,
    }),
  },
  'id.sifa.profile.course': {
    table: courses,
    fields: (r) => ({
      name: sanitize(r.name as string),
      number: sanitizeOptional(r.number as string | undefined) ?? null,
      institution: sanitizeOptional(r.institution as string | undefined) ?? null,
    }),
  },
  'id.sifa.profile.honor': {
    table: honors,
    fields: (r) => ({
      title: sanitize(r.title as string),
      issuer: sanitizeOptional(r.issuer as string | undefined) ?? null,
      description: sanitizeOptional(r.description as string | undefined) ?? null,
      awardedAt: (r.awardedAt as string) ?? null,
    }),
  },
  'id.sifa.profile.language': {
    table: languages,
    fields: (r) => ({
      name: sanitize(r.name as string),
      proficiency: (r.proficiency as string) ?? null,
    }),
  },
  'id.sifa.profile.externalAccount': {
    table: externalAccounts,
    fields: (r) => ({
      platform: sanitize(r.platform as string),
      url: sanitize(r.url as string),
      label: sanitizeOptional(r.label as string | undefined) ?? null,
      feedUrl: sanitizeOptional(r.feedUrl as string | undefined) ?? null,
      isPrimary: (r.isPrimary as boolean) ?? false,
    }),
  },
};

/**
 * Looks up the indexer config for a collection, considering own keys only.
 * A bare `COLLECTION_INDEXERS[collection]` would also resolve inherited names
 * such as `constructor`, which would slip past the "no config" guard.
 */
function getCollectionIndexer(collection: string): CollectionIndexer | undefined {
  return Object.prototype.hasOwnProperty.call(COLLECTION_INDEXERS, collection)
    ? COLLECTION_INDEXERS[collection]
    : undefined;
}

/**
 * Upserts a record of one of the generically handled collections.
 *
 * Collections without an indexer config are logged and ignored. `createdAt` is
 * written only when the row is first inserted.
 *
 * @param db - Database handle.
 * @param collection - Collection NSID, e.g. `id.sifa.profile.certification`.
 * @param did - Owner of the record.
 * @param rkey - Record key.
 * @param record - Raw record.
 */
export async function indexRecord(
  db: Database,
  collection: string,
  did: string,
  rkey: string,
  record: Record<string, unknown>,
): Promise<void> {
  const config = getCollectionIndexer(collection);
  if (!config) {
    logger.warn({ collection }, 'No indexer config for collection');
    return;
  }

  const fields = config.fields(record);
  const table = config.table as typeof certifications; // all have same (did, rkey) conflict target shape
  const indexedAt = new Date();

  await db
    .insert(table)
    .values({
      did,
      rkey,
      ...fields,
      createdAt: parseCreatedAt(record.createdAt),
      indexedAt,
    } as typeof table.$inferInsert)
    .onConflictDoUpdate({
      target: [table.did, table.rkey],
      set: {
        ...fields,
        indexedAt,
      } as Partial<typeof table.$inferInsert>,
    });

  logger.info({ did, rkey, collection }, 'Indexed record (write-through)');
}

/**
 * Deletes a record of one of the generically handled collections.
 * Collections without an indexer config are logged and ignored; deleting a row
 * that does not exist is a no-op.
 */
export async function deleteRecord(
  db: Database,
  collection: string,
  did: string,
  rkey: string,
): Promise<void> {
  const config = getCollectionIndexer(collection);
  if (!config) {
    logger.warn({ collection }, 'No indexer config for collection');
    return;
  }

  const table = config.table as typeof certifications;
  await db.delete(table).where(and(eq(table.did, did), eq(table.rkey, rkey)));
  logger.info({ did, rkey, collection }, 'Deleted record (write-through)');
}
+436
tests/services/record-indexer.test.ts
import { describe, it, expect, beforeAll, afterAll, afterEach } from 'vitest';
import { createDb } from '../../src/db/index.js';
import {
  skills,
  positions,
  education,
  certifications,
  projects,
  volunteering,
  publications,
  courses,
  honors,
  languages,
  canonicalSkills,
  profiles,
  skillPositionLinks,
} from '../../src/db/schema/index.js';
import { eq, and, sql } from 'drizzle-orm';

// Service under test.
import {
  indexSkill,
  deleteSkill,
  indexPosition,
  deletePosition,
  indexEducation,
  deleteEducation,
  indexRecord,
  deleteRecord,
} from '../../src/services/record-indexer.js';

// Integration tests: they run against a real Postgres database (DATABASE_URL,
// or the local default below) and scope every row they create to `testDid`.
describe('Record indexer service', () => {
  const db = createDb(process.env.DATABASE_URL ?? 'postgresql://sifa:sifa@localhost:5432/sifa');
  const testDid = 'did:plc:write-through-test';
  const createdAt = '2026-03-16T00:00:00Z';

  beforeAll(async () => {
    await db
      .insert(profiles)
      .values({
        did: testDid,
        handle: 'write-through-test.bsky.social',
        createdAt: new Date(),
      })
      .onConflictDoNothing();

    // Unique slug so this suite does not collide with canonical skills created
    // by other suites sharing the same database.
    await db
      .insert(canonicalSkills)
      .values({
        canonicalName: 'WriteThrough TestLang',
        slug: 'writethrough-testlang',
        category: 'technical',
        aliases: ['wt-test', 'writethrough-testlang'],
        userCount: 0,
      })
      .onConflictDoNothing();
  });

  afterAll(async () => {
    // Links and skills go first, before the canonical skill and profile rows.
    await db.delete(skillPositionLinks).where(eq(skillPositionLinks.did, testDid));
    await db.delete(skills).where(eq(skills.did, testDid));
    await db.delete(positions).where(eq(positions.did, testDid));
    await db.delete(education).where(eq(education.did, testDid));
    await db.delete(certifications).where(eq(certifications.did, testDid));
    await db.delete(projects).where(eq(projects.did, testDid));
    await db.delete(volunteering).where(eq(volunteering.did, testDid));
    await db.delete(publications).where(eq(publications.did, testDid));
    await db.delete(courses).where(eq(courses.did, testDid));
    await db.delete(honors).where(eq(honors.did, testDid));
    await db.delete(languages).where(eq(languages.did, testDid));
    await db.execute(
      sql`DELETE FROM canonical_skills WHERE slug = 'writethrough-testlang' AND canonical_name = 'WriteThrough TestLang'`,
    );
    await db.execute(sql`DELETE FROM profiles WHERE did = ${testDid}`);
    await db.$client.end();
  });

  // --- Skill indexing ---

  describe('indexSkill', () => {
    afterEach(async () => {
      await db.delete(skills).where(eq(skills.did, testDid));
    });

    it('creates a skill record in local DB', async () => {
      await indexSkill(db, testDid, '3wt-skill-1', {
        skillName: 'TypeScript',
        category: 'technical',
        createdAt,
      });

      const rows = await db
        .select()
        .from(skills)
        .where(and(eq(skills.did, testDid), eq(skills.rkey, '3wt-skill-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].skillName).toBe('TypeScript');
      expect(rows[0].category).toBe('technical');
    });

    it('resolves canonical skill when alias matches', async () => {
      await indexSkill(db, testDid, '3wt-skill-2', {
        skillName: 'WT-Test',
        category: 'technical',
        createdAt,
      });

      const rows = await db
        .select()
        .from(skills)
        .where(and(eq(skills.did, testDid), eq(skills.rkey, '3wt-skill-2')));
      expect(rows).toHaveLength(1);
      expect(rows[0].canonicalSkillId).not.toBeNull();
    });

    it('is idempotent -- second call updates, does not duplicate', async () => {
      const record = { skillName: 'React', category: 'technical', createdAt };
      await indexSkill(db, testDid, '3wt-skill-3', record);
      await indexSkill(db, testDid, '3wt-skill-3', record);

      const rows = await db
        .select()
        .from(skills)
        .where(and(eq(skills.did, testDid), eq(skills.rkey, '3wt-skill-3')));
      expect(rows).toHaveLength(1);
    });
  });

  describe('deleteSkill', () => {
    it('removes a skill record from local DB', async () => {
      await indexSkill(db, testDid, '3wt-skill-del', {
        skillName: 'Python',
        createdAt,
      });

      await deleteSkill(db, testDid, '3wt-skill-del');

      const rows = await db
        .select()
        .from(skills)
        .where(and(eq(skills.did, testDid), eq(skills.rkey, '3wt-skill-del')));
      expect(rows).toHaveLength(0);
    });

    it('is idempotent -- deleting non-existent record does not throw', async () => {
      await expect(deleteSkill(db, testDid, '3wt-nonexistent')).resolves.not.toThrow();
    });
  });

  // --- Position indexing ---

  describe('indexPosition', () => {
    afterEach(async () => {
      await db.delete(skillPositionLinks).where(eq(skillPositionLinks.did, testDid));
      await db.delete(positions).where(eq(positions.did, testDid));
      // The link test also creates a skill row; do not leave it for afterAll.
      await db.delete(skills).where(eq(skills.did, testDid));
    });

    it('creates a position record in local DB', async () => {
      await indexPosition(db, testDid, '3wt-pos-1', {
        companyName: 'Acme Corp',
        title: 'Senior Engineer',
        startDate: '2024-01',
        current: true,
        createdAt,
      });

      const rows = await db
        .select()
        .from(positions)
        .where(and(eq(positions.did, testDid), eq(positions.rkey, '3wt-pos-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].companyName).toBe('Acme Corp');
      expect(rows[0].title).toBe('Senior Engineer');
      expect(rows[0].current).toBe(true);
    });

    it('syncs skill-position links', async () => {
      // First create the skill
      await indexSkill(db, testDid, '3wt-link-skill', {
        skillName: 'Go',
        createdAt,
      });

      await indexPosition(db, testDid, '3wt-pos-link', {
        companyName: 'GoLand',
        title: 'Go Dev',
        startDate: '2024-01',
        current: false,
        skills: [{ uri: `at://${testDid}/id.sifa.profile.skill/3wt-link-skill`, cid: 'bafyfake' }],
        createdAt,
      });

      const links = await db
        .select()
        .from(skillPositionLinks)
        .where(
          and(
            eq(skillPositionLinks.did, testDid),
            eq(skillPositionLinks.positionRkey, '3wt-pos-link'),
          ),
        );
      expect(links).toHaveLength(1);
      expect(links[0].skillRkey).toBe('3wt-link-skill');
    });

    it('is idempotent', async () => {
      const record = {
        companyName: 'Dupe Corp',
        title: 'Eng',
        startDate: '2024-01',
        current: false,
        createdAt,
      };
      await indexPosition(db, testDid, '3wt-pos-idem', record);
      await indexPosition(db, testDid, '3wt-pos-idem', record);

      const rows = await db
        .select()
        .from(positions)
        .where(and(eq(positions.did, testDid), eq(positions.rkey, '3wt-pos-idem')));
      expect(rows).toHaveLength(1);
    });
  });

  describe('deletePosition', () => {
    afterEach(async () => {
      await db.delete(skillPositionLinks).where(eq(skillPositionLinks.did, testDid));
      await db.delete(positions).where(eq(positions.did, testDid));
      await db.delete(skills).where(eq(skills.did, testDid));
    });

    it('removes position and its skill links', async () => {
      const linksForPosition = and(
        eq(skillPositionLinks.did, testDid),
        eq(skillPositionLinks.positionRkey, '3wt-pos-del'),
      );

      await indexSkill(db, testDid, '3wt-pos-del-skill', {
        skillName: 'Rust',
        createdAt,
      });
      await indexPosition(db, testDid, '3wt-pos-del', {
        companyName: 'Del Corp',
        title: 'Eng',
        startDate: '2024-01',
        current: false,
        skills: [
          { uri: `at://${testDid}/id.sifa.profile.skill/3wt-pos-del-skill`, cid: 'bafyfake' },
        ],
        createdAt,
      });

      // Guard against a vacuous pass: the link must exist before the delete.
      const linksBefore = await db.select().from(skillPositionLinks).where(linksForPosition);
      expect(linksBefore).toHaveLength(1);

      await deletePosition(db, testDid, '3wt-pos-del');

      const rows = await db
        .select()
        .from(positions)
        .where(and(eq(positions.did, testDid), eq(positions.rkey, '3wt-pos-del')));
      expect(rows).toHaveLength(0);

      const linksAfter = await db.select().from(skillPositionLinks).where(linksForPosition);
      expect(linksAfter).toHaveLength(0);
    });

    it('is idempotent -- deleting non-existent record does not throw', async () => {
      await expect(deletePosition(db, testDid, '3wt-nonexistent')).resolves.not.toThrow();
    });
  });

  // --- Education indexing ---

  describe('indexEducation', () => {
    afterEach(async () => {
      await db.delete(education).where(eq(education.did, testDid));
    });

    it('creates an education record in local DB', async () => {
      await indexEducation(db, testDid, '3wt-edu-1', {
        institution: 'MIT',
        degree: 'MSc',
        fieldOfStudy: 'Computer Science',
        startDate: '2020-09',
        endDate: '2022-06',
        createdAt,
      });

      const rows = await db
        .select()
        .from(education)
        .where(and(eq(education.did, testDid), eq(education.rkey, '3wt-edu-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].institution).toBe('MIT');
      expect(rows[0].degree).toBe('MSc');
    });
  });

  describe('deleteEducation', () => {
    it('removes an education record', async () => {
      await indexEducation(db, testDid, '3wt-edu-del', {
        institution: 'Oxford',
        createdAt,
      });
      await deleteEducation(db, testDid, '3wt-edu-del');

      const rows = await db
        .select()
        .from(education)
        .where(and(eq(education.did, testDid), eq(education.rkey, '3wt-edu-del')));
      expect(rows).toHaveLength(0);
    });
  });

  // --- Generic indexRecord for remaining collections ---

  describe('indexRecord (generic)', () => {
    it('creates a certification record', async () => {
      await indexRecord(db, 'id.sifa.profile.certification', testDid, '3wt-cert-1', {
        name: 'AWS Solutions Architect',
        authority: 'Amazon',
        createdAt,
      });

      const rows = await db
        .select()
        .from(certifications)
        .where(and(eq(certifications.did, testDid), eq(certifications.rkey, '3wt-cert-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].name).toBe('AWS Solutions Architect');
    });

    it('creates a project record', async () => {
      await indexRecord(db, 'id.sifa.profile.project', testDid, '3wt-proj-1', {
        name: 'Open Source Tool',
        url: 'https://example.com',
        createdAt,
      });

      const rows = await db
        .select()
        .from(projects)
        .where(and(eq(projects.did, testDid), eq(projects.rkey, '3wt-proj-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].name).toBe('Open Source Tool');
    });

    it('creates a course record', async () => {
      await indexRecord(db, 'id.sifa.profile.course', testDid, '3wt-course-1', {
        name: 'Distributed Systems',
        institution: 'MIT OCW',
        createdAt,
      });

      const rows = await db
        .select()
        .from(courses)
        .where(and(eq(courses.did, testDid), eq(courses.rkey, '3wt-course-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].name).toBe('Distributed Systems');
    });

    it('creates a volunteering record', async () => {
      await indexRecord(db, 'id.sifa.profile.volunteering', testDid, '3wt-vol-1', {
        organization: 'Code for All',
        role: 'Mentor',
        createdAt,
      });

      const rows = await db
        .select()
        .from(volunteering)
        .where(and(eq(volunteering.did, testDid), eq(volunteering.rkey, '3wt-vol-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].organization).toBe('Code for All');
    });

    it('creates a publication record', async () => {
      await indexRecord(db, 'id.sifa.profile.publication', testDid, '3wt-pub-1', {
        title: 'My Paper',
        publisher: 'IEEE',
        createdAt,
      });

      const rows = await db
        .select()
        .from(publications)
        .where(and(eq(publications.did, testDid), eq(publications.rkey, '3wt-pub-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].title).toBe('My Paper');
    });

    it('creates an honor record', async () => {
      await indexRecord(db, 'id.sifa.profile.honor', testDid, '3wt-honor-1', {
        title: 'Best Paper Award',
        issuer: 'ACM',
        createdAt,
      });

      const rows = await db
        .select()
        .from(honors)
        .where(and(eq(honors.did, testDid), eq(honors.rkey, '3wt-honor-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].title).toBe('Best Paper Award');
    });

    it('creates a language record', async () => {
      await indexRecord(db, 'id.sifa.profile.language', testDid, '3wt-lang-1', {
        name: 'Dutch',
        proficiency: 'native',
        createdAt,
      });

      const rows = await db
        .select()
        .from(languages)
        .where(and(eq(languages.did, testDid), eq(languages.rkey, '3wt-lang-1')));
      expect(rows).toHaveLength(1);
      expect(rows[0].name).toBe('Dutch');
    });

    it('is idempotent for generic records', async () => {
      const record = { name: 'Idempotent Cert', createdAt };
      await indexRecord(db, 'id.sifa.profile.certification', testDid, '3wt-cert-idem', record);
      await indexRecord(db, 'id.sifa.profile.certification', testDid, '3wt-cert-idem', record);

      const rows = await db
        .select()
        .from(certifications)
        .where(and(eq(certifications.did, testDid), eq(certifications.rkey, '3wt-cert-idem')));
      expect(rows).toHaveLength(1);
    });

    it('ignores collections without an indexer config', async () => {
      await expect(
        indexRecord(db, 'id.sifa.profile.unknown', testDid, '3wt-unknown-1', {
          name: 'Ignored',
          createdAt,
        }),
      ).resolves.toBeUndefined();
    });
  });

  describe('deleteRecord (generic)', () => {
    it('deletes a certification record', async () => {
      await indexRecord(db, 'id.sifa.profile.certification', testDid, '3wt-cert-del', {
        name: 'To Delete',
        createdAt,
      });
      await deleteRecord(db, 'id.sifa.profile.certification', testDid, '3wt-cert-del');

      const rows = await db
        .select()
        .from(certifications)
        .where(and(eq(certifications.did, testDid), eq(certifications.rkey, '3wt-cert-del')));
      expect(rows).toHaveLength(0);
    });

    it('is idempotent -- deleting non-existent record does not throw', async () => {
      await expect(
        deleteRecord(db, 'id.sifa.profile.certification', testDid, '3wt-nonexistent'),
      ).resolves.not.toThrow();
    });

    it('ignores collections without an indexer config', async () => {
      await expect(
        deleteRecord(db, 'id.sifa.profile.unknown', testDid, '3wt-unknown-1'),
      ).resolves.toBeUndefined();
    });
  });
});