Switch to a relational model for gmstn records #3

merged
opened by lewis.moe targeting main

The backfill and firehose logic should save each record into its proper relational table as it arrives, rather than dumping everything into a raw table to be straightened out after the fact. If it turns out down the line that synchronously shunting records into relational tables isn't fast enough, we can always decouple it behind a raw table again.
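Roughly the shape of the idea, as a sketch against the tables this PR's migration adds. The `routeRecord` helper and its trimmed-down row shape are illustrative only; the real handlers below also fill in channel/recipient columns for invites and memberships, and create placeholder accounts for invite recipients.

// Sketch only: route an incoming gmstn record straight into its relational table,
// instead of parking it in a raw event table. Table names come from the migration
// in this PR; everything else here is illustrative.
import { Kysely } from 'kysely'

const TABLE_FOR_LEXICON: Record<string, string> = {
  'systems.gmstn.development.lattice': 'lattice',
  'systems.gmstn.development.shard': 'shard',
  'systems.gmstn.development.channel': 'channel',
}

async function routeRecord(
  db: Kysely<any>,
  uri: string,
  cid: string,
  creatorDid: string,
  record: { $type: string; [key: string]: unknown },
) {
  const table = TABLE_FOR_LEXICON[record.$type]
  if (!table) return // not one of the "simple" gmstn collections; handled elsewhere
  await db
    .insertInto(table as any)
    .values({ uri, cid, creator_did: creatorDid, data: record })
    .execute()
}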

+33
biome.json
+ {
+   "formatter": {
+     "indentStyle": "space",
+     "lineWidth": 100
+   },
+   "linter": {
+     "rules": {
+       "a11y": {
+         "useAriaPropsForRole": "off",
+         "useButtonType": "off",
+         "useSemanticElements": "off",
+         "noSvgWithoutTitle": "off"
+       },
+       "complexity": {
+         "noStaticOnlyClass": "off",
+         "noForEach": "off"
+       },
+       "suspicious": {
+         "noArrayIndexKey": "off",
+         "noPrototypeBuiltins": "off"
+       },
+       "style": {
+         "noNonNullAssertion": "off"
+       }
+     }
+   },
+   "javascript": {
+     "formatter": {
+       "quoteStyle": "single"
+     }
+   }
+ }
+13
docker-compose.yaml
+ services:
+   prism-db:
+     image: postgres:latest
+     environment:
+       - POSTGRES_USER=prism
+       - POSTGRES_PASSWORD=prism
+       - POSTGRES_DB=prism
+     ports:
+       - 5432:5432
+     healthcheck:
+       test: 'exit 0'
-16
migrations/20251029194100_create_firehose_event_table.ts
- import { Kysely, sql } from 'kysely'
-
- export async function up(db: Kysely<any>): Promise<void> {
-   await db.schema
-     .createTable('firehose_event')
-     .addColumn('timestamp', 'timestamptz', (col) =>
-       col.notNull().defaultTo(sql`now()`)
-     )
-     .addColumn('event_type', 'text', (col) => col.notNull())
-     .addColumn('event_data', 'jsonb', (col) => col.notNull())
-     .execute()
- }
-
- export async function down(db: Kysely<any>): Promise<void> {
-   await db.schema.dropTable('firehose_event').execute()
- }
+86
migrations/20251108164300_create_channels_invites_users_shards.ts
+ import { Kysely, sql } from 'kysely'
+
+ export async function up(db: Kysely<any>): Promise<void> {
+   await db.schema
+     .createTable('account')
+     .addColumn('did', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('handle', 'text', (col) => col.notNull().unique())
+     .addColumn('pds_hostname', 'text', (col) =>
+       col.references('pds.hostname')
+     )
+     .addColumn('created_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .execute()
+
+   await db.schema
+     .createTable('lattice')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('shard')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull().unique())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel_invite')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('channel', 'text', (col) =>
+       col.references('channel.cid').notNull()
+     )
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('recipient_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel_membership')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('channel', 'text', (col) =>
+       col.references('channel.cid').notNull()
+     )
+     .addColumn('recipient_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+ }
+
+ export async function down(db: Kysely<any>): Promise<void> {
+   await db.schema.dropTable('channel_membership').execute()
+   await db.schema.dropTable('channel_invite').execute()
+   await db.schema.dropTable('channel').execute()
+   await db.schema.dropTable('shard').execute()
+   await db.schema.dropTable('lattice').execute()
+   await db.schema.dropTable('account').execute()
+ }
+2 -1
package.json
···
    "scripts": {
      "test": "echo \"Error: no test specified\" && exit 1",
      "dev": "tsx src/index.ts",
-     "lint": "eslint src/",
+     "lint": "npx biome lint ./src && npx biome format ./src",
+     "lint:fix": "npx biome lint --fix ./src && npx biome format --fix ./src",
      "db:migrate": "npx ts-node src/scripts/migrate.ts latest",
      "db:revert": "npx ts-node src/scripts/migrate.ts down"
    },
+180 -49
src/firehose.ts
  import { Firehose, CommitEvent, AccountEvent, IdentityEvent } from "@skyware/firehose";
  import WebSocket from "ws";
  import { db } from "./db";
- import { Insertable } from "kysely";
- import { FirehoseEventTable } from "./db";
-
- const saveEvent = async (type: 'commit' | 'identity' | 'account', data: any) => {
-     try {
-         await db.insertInto('firehose_event').values({
-             event_type: type,
-             event_data: data
-         }).execute();
-     } catch (error) {
-         console.error("\nFailed to save event to database:", error);
-     }
- };
+ import { Insertable, sql } from "kysely";
+
+ interface AtpRecord {
+   $type: string;
+   createdAt: string;
+   [key: string]: any;
+ }
+
+ interface NewAccount {
+   did: string;
+   handle: string;
+   pds_hostname: string;
+   created_at: any;
+ }
+
+ interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+ interface NewChannelInvite {
+   uri: string;
+   cid: string;
+   creator_did: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ interface NewChannelMembership {
+   uri: string;
+   cid: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ async function insertRecords(table: string, records: any[], did: string) {
+   if (records.length === 0) return;
+   try {
+     await db.insertInto(table as any)
+       .values(records)
+       .execute();
+     console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
+   } catch (e: any) {
+     console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
+   }
+ }
+
+ //TODO: de-duplicate from pds-backfill file. - Lewis

  const main = () => {
-     console.log("Starting Firehose listener...");
-
-     const firehose = new Firehose({
-         ws: WebSocket,
-     });
-
-     firehose.on("commit", async (commit: CommitEvent) => {
-         const createOps = commit.ops.filter(op => op.action === 'create');
-         const relevantOps = [];
-
-         for (const op of createOps) {
-             const recordType = op.record['$type'];
-
-             if (recordType && (recordType.startsWith('com.atproto.') || recordType.startsWith('systems.gmstn.'))) {
-                 relevantOps.push(op);
-             }
-         }
-
-         if (relevantOps.length > 0) {
-             await saveEvent('commit', commit);
-         }
-     });
-
-     firehose.on("identity", async (identity: IdentityEvent) => {
-         await saveEvent('identity', identity);
-     });
-
-     firehose.on("account", async (account: AccountEvent) => {
-         await saveEvent('account', account);
-     });
-
-     firehose.on("open", () => {
-         console.log("\nConnection opened");
-     });
-
-     firehose.on("close", (cursor) => {
-         console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
-         firehose.start();
-     });
-
-     firehose.on("error", ({ error, cursor }) => {
-         console.error(`\nAn error occurred at cursor ${cursor}:`, error);
-     });
-
-     firehose.start();
-
-     console.log("Listeners attached. Waiting for events...");
+   console.log("Starting Firehose listener...");
+
+   const firehose = new Firehose({
+     ws: WebSocket,
+   });
+
+   firehose.on("commit", async (commit: CommitEvent) => {
+     const { repo: did, time: indexedAt } = commit
+
+     const newLattices: NewLattice[] = []
+     const newShards: NewShard[] = []
+     const newChannels: NewChannel[] = []
+     const newInvites: NewChannelInvite[] = []
+     const newMemberships: NewChannelMembership[] = []
+
+     const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+     const SHARD_LEXICON = 'systems.gmstn.development.shard'
+     const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+     const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+     const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
+
+     const createOps = commit.ops.filter(op => op.action === 'create');
+
+     for (const op of createOps) {
+       const record = op.record as AtpRecord
+       const collection = record?.$type
+
+       if (!collection || !collection.startsWith('systems.gmstn.development.')) {
+         continue
+       }
+
+       const uri = op.uri
+       const cid = op.cid.toString()
+       const creatorDid = did
+
+       if (!record.createdAt) {
+         console.warn(`[${did}] Found matching record without 'createdAt', skipping. URI: ${uri}`);
+         continue;
+       }
+
+       const baseRecord = {
+         uri: uri,
+         cid: cid,
+         indexed_at: indexedAt,
+         data: record,
+       }
+
+       switch (collection) {
+         case LATTICE_LEXICON:
+           newLattices.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewLattice)
+           break
+         case SHARD_LEXICON:
+           newShards.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewShard)
+           break
+         case CHANNEL_LEXICON:
+           newChannels.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewChannel)
+           break
+
+         case CHANNEL_INVITE_LEXICON: {
+           const recipientDid = record.recipient
+
+           const existingAccount = await db.selectFrom('account')
+             .select('did')
+             .where('did', '=', recipientDid)
+             .executeTakeFirst()
+
+           if (!existingAccount) {
+             try {
+               const newAccount: NewAccount = {
+                 did: recipientDid,
+                 handle: recipientDid,
+                 // We'll probably resolve this later, no problem :3
+                 pds_hostname: null,
+                 created_at: sql`now()`,
+               }
+               await db.insertInto('account')
+                 .values(newAccount)
+                 .onConflict(oc => oc.column('did').doNothing())
+                 .execute()
+               console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+             } catch (e) {
+               console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+               break
+             }
+           }
+
+           newInvites.push({
+             ...baseRecord,
+             creator_did: did,
+             channel: record.channel.cid,
+             recipient_did: recipientDid,
+           } as NewChannelInvite)
+           break
+         }
+
+         case CHANNEL_MEMBERSHIP_LEXICON:
+           newMemberships.push({
+             ...baseRecord,
+             channel: record.channel,
+             recipient_did: creatorDid,
+           } as NewChannelMembership)
+           break
+
+         default:
+           console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+       }
+     }
+
+     await insertRecords('lattice', newLattices, did)
+     await insertRecords('shard', newShards, did)
+     await insertRecords('channel', newChannels, did)
+     await insertRecords('channel_invite', newInvites, did)
+     await insertRecords('channel_membership', newMemberships, did)
+   });
+
+   firehose.on("open", () => {
+     console.log("\nConnection opened");
+   });
+
+   firehose.on("close", (cursor) => {
+     console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
+     firehose.start();
+   });
+
+   firehose.on("error", ({ error, cursor }) => {
+     console.error(`\nAn error occurred at cursor ${cursor}:`, error);
+   });
+
+   firehose.start();
+
+   console.log("Listeners attached. Waiting for events...");
  };

  main();
+236 -79
src/pds-backfill.ts
+ import { sql, Kysely } from 'kysely';
  import { db } from './db';

- export interface FirehoseEventTable {
-   timestamp: ColumnType<Date, string | Date, never>;
-   event_type: string;
-   event_data: Record<string, any>;
- }
- export type FirehoseEvent = Selectable<FirehoseEventTable>;
- export type NewFirehoseEvent = Insertable<FirehoseEventTable>;
-
  interface AtpRecord {
    $type: string;
    createdAt: string;
    [key: string]: any;
  }

- async function processSingleRepo(pdsHostname: string, did: string) {
-   const pdsBaseUrl = `https://` + pdsHostname;
-   const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl);
-   getRepoUrl.searchParams.set('did', did);
-
-   let car: any;
+ interface NewAccount {
+   did: string;
+   handle: string;
+   pds_hostname: string;
+   created_at: any;
+ }
+
+ interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+ interface NewChannelInvite {
+   uri: string;
+   cid: string;
+   creator_did: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ interface NewChannelMembership {
+   uri: string;
+   cid: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
+
+ async function processSingleRepo(pdsHostname: string, did: string): Promise<void> {
+   const pdsBaseUrl = `https://${pdsHostname}`
+   const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl)
+   getRepoUrl.searchParams.set('did', did)
+
+   let car: any
+   let carRoot: string
    try {
      const { CarReader } = await import('@ipld/car');

-     const response = await fetch(getRepoUrl.href);
+     const response = await fetch(getRepoUrl.href)
      if (!response.ok) {
-       throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`);
+       throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`)
+     }
+     const carBytes = new Uint8Array(await response.arrayBuffer())
+     car = await CarReader.fromBytes(carBytes)
+     const roots = await car.getRoots()
+     if (roots.length === 0) {
+       throw new Error('CAR file has no root CID')
      }
-     const carBytes = new Uint8Array(await response.arrayBuffer());
-     car = await CarReader.fromBytes(carBytes);
+     carRoot = roots[0].toString()
    } catch (e: any) {
-     console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`);
-     return;
+     console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`)
+     return
    }

-   const recordsToInsert: NewFirehoseEvent[] = [];
+   const newLattices: NewLattice[] = []
+   const newShards: NewShard[] = []
+   const newChannels: NewChannel[] = []
+   const newInvites: NewChannelInvite[] = []
+   const newMemberships: NewChannelMembership[] = []
+
+   const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+   const SHARD_LEXICON = 'systems.gmstn.development.shard'
+   const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+   const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+   const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'

    try {
      const cbor = await import('@ipld/dag-cbor');
-
      for await (const block of car.blocks()) {
-       const record = cbor.decode(block.bytes) as AtpRecord;
+       const record = cbor.decode(block.bytes) as AtpRecord
+       const cid = block.cid.toString()
+
+       const collection = record?.$type

        if (
-         record &&
-         record.$type &&
-         typeof record.$type === 'string' &&
-         record.$type.startsWith('systems.gmstn.')
+         collection &&
+         typeof collection === 'string' &&
+         collection.startsWith('systems.gmstn.development.')
        ) {
          if (!record.createdAt || typeof record.createdAt !== 'string') {
-           console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`);
-           continue;
+           console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`)
+           continue
+         }
+
+         const uri = `at://${did}/${collection}/${cid}`
+
+         const baseRecord = {
+           uri: uri,
+           cid: cid,
+           indexed_at: record.createdAt,
+           data: record,
          }

-         recordsToInsert.push({
-           timestamp: record.createdAt,
-           event_type: record.$type,
-           event_data: record,
-         });
+         switch (collection) {
+           case LATTICE_LEXICON:
+             newLattices.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewLattice)
+             break
+           case SHARD_LEXICON:
+             newShards.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewShard)
+             break
+           case CHANNEL_LEXICON:
+             newChannels.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewChannel)
+             break
+           case CHANNEL_INVITE_LEXICON: {
+             const recipientDid = record.recipient
+
+             const existingAccount = await db.selectFrom('account')
+               .select('did')
+               .where('did', '=', recipientDid)
+               .executeTakeFirst()
+
+             if (!existingAccount) {
+               try {
+                 const newAccount: NewAccount = {
+                   did: recipientDid,
+                   handle: recipientDid,
+                   // We'll probably resolve this later, no problem :3
+                   pds_hostname: null,
+                   created_at: sql`now()`,
+                 }
+                 await db.insertInto('account')
+                   .values(newAccount)
+                   .onConflict(oc => oc.column('did').doNothing())
+                   .execute()
+                 console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+               } catch (e) {
+                 console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+                 break
+               }
+             }
+
+             newInvites.push({
+               ...baseRecord,
+               creator_did: did,
+               channel: record.channel.cid,
+               recipient_did: recipientDid,
+             } as NewChannelInvite)
+             break
+           }
+           case CHANNEL_MEMBERSHIP_LEXICON:
+             newMemberships.push({
+               ...baseRecord,
+               channel: record.channel.cid,
+               recipient_did: did,
+             } as NewChannelMembership)
+             break
+           default:
+             console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+         }
        }
      }
    } catch (e: any) {
-     console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`);
-     return;
+     console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`)
+     return
+   }
+
+   if (newLattices.length > 0) {
+     await insertRecords('lattice', newLattices, did)
    }
+   if (newShards.length > 0) {
+     await insertRecords('shard', newShards, did)
+   }
+   if (newChannels.length > 0) {
+     await insertRecords('channel', newChannels, did)
+   }
+   if (newInvites.length > 0) {
+     await insertRecords('channel_invite', newInvites, did)
+   }
+   if (newMemberships.length > 0) {
+     await insertRecords('channel_membership', newMemberships, did)
+   }
+ }

- if (recordsToInsert.length > 0) {
+ async function insertRecords(table: string, records: any[], did: string) {
+   if (records.length === 0) return;
    try {
-     await db.insertInto('firehose_event').values(recordsToInsert).execute();
-     console.log(`[${did}] Inserted ${recordsToInsert.length} 'systems.gmstn.*' records.`);
+     await db.insertInto(table as any)
+       .values(records)
+       .execute();
+     console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
    } catch (e: any) {
-     console.error(`[${did}] Failed to insert records into DB: ${e.message}`);
+     console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
    }
- }
  }

  async function backfillPds(pdsHostname: string) {
···
    console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);

+   const newAccounts: NewAccount[] = dids.map(repo => ({
+     did: repo,
+     handle: repo,
+     pds_hostname: pdsHostname,
+     created_at: sql`now()`,
+   }));
+
+   if (newAccounts.length > 0) {
+     try {
+       await db.insertInto('account')
+         .values(newAccounts)
+         .onConflict((oc) => oc
+           .column('did')
+           .doUpdateSet({
+             pds_hostname: sql`excluded.pds_hostname`,
+           })
+         )
+         .execute();
+       console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
+     } catch (e: any) {
+       console.error(`Failed to bulk upsert accounts: ${e.message}`);
+     }
+   }
+
    const BATCH_SIZE = 10;
    for (let i = 0; i < dids.length; i += BATCH_SIZE) {
      const batch = dids.slice(i, i + BATCH_SIZE);
···
  }

  async function main() {
-   let pdsesToBackfill: { hostname: string }[] = [];
-
-   try {
-     pdsesToBackfill = await db
-       .selectFrom('pds')
-       .select('hostname')
-       .where('backfilled_at', 'is', null)
-       .orderBy(
-         (eb) => eb
-           .case()
-           .when('hostname', 'like', '%.bsky.network')
-           .then(1)
-           .else(0)
-           .end(),
-         'asc'
-       )
-       .orderBy('added_at', 'asc')
-       .execute();
-
-     if (pdsesToBackfill.length === 0) {
-       console.log('No PDSs to backfill. All caught up! Exiting.');
-       await db.destroy();
-       return;
-     }
-
-     console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
-
-   } catch (e: any) {
-     console.error('Failed to fetch PDS list from database:', e.message);
-     process.exit(1);
-   }
-
-   for (const pds of pdsesToBackfill) {
-     try {
-       await backfillPds(pds.hostname);
-     } catch (e) {
-       console.error(`---`);
-       console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
-       console.error(`---`);
-     }
-   }
-
-   console.log('All backfill jobs complete. Closing database connection.');
-   await db.destroy();
+   const CONSTANT_DELAY_MS = 10000;
+
+   while (true) {
+     let pdsesToBackfill: { hostname: string }[] = [];
+
+     try {
+       pdsesToBackfill = await db
+         .selectFrom('pds')
+         .select('hostname')
+         .where((eb) => eb.or([
+           eb('backfilled_at', 'is', null),
+           eb('backfilled_at', '<', sql`now() - interval '24 hours'`),
+         ]))
+         .orderBy(
+           (eb) => eb
+             .case()
+             .when('hostname', 'like', '%.bsky.network')
+             .then(1)
+             .else(0)
+             .end(),
+           'asc'
+         )
+         .orderBy('added_at', 'asc')
+         .execute();
+
+       if (pdsesToBackfill.length === 0) {
+         console.log('No PDSs currently needing backfill. Waiting and checking again...');
+       } else {
+         console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
+
+         for (const pds of pdsesToBackfill) {
+           try {
+             await backfillPds(pds.hostname);
+           } catch (e) {
+             console.error(`---`);
+             console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
+             console.error(`---`);
+           }
+         }
+       }
+     } catch (e: any) {
+       console.error('Fatal error during continuous backfill loop:', e.message);
+     }
+
+     console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
+     await delay(CONSTANT_DELAY_MS);
+   }
  }

  if (require.main === module) {