The backfill and firehose logic should save records into meaningful tables as they come in, instead of into a raw table that has to be straightened out after the fact. If it turns out down the line that synchronously shunting records into relational tables isn't fast enough, we can always decouple it with a raw table again later.
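For reference, a minimal sketch of the direct write path this adopts (table and column names come from the new migration below; the helper name and its loose typing are illustrative only, not part of the diff):

  import { db } from './db';

  // Hypothetical helper: write one lattice record straight into its relational table,
  // mirroring what the firehose and backfill code below now do per record type.
  async function indexLatticeRecord(uri: string, cid: string, creatorDid: string, record: object) {
    await db.insertInto('lattice')
      .values({
        uri,
        cid,
        creator_did: creatorDid,
        indexed_at: new Date().toISOString(),
        data: record,
      })
      .execute();
  }
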
biome.json  (+33)

+ {
+   "formatter": {
+     "indentStyle": "space",
+     "lineWidth": 100
+   },
+   "linter": {
+     "rules": {
+       "a11y": {
+         "useAriaPropsForRole": "off",
+         "useButtonType": "off",
+         "useSemanticElements": "off",
+         "noSvgWithoutTitle": "off"
+       },
+       "complexity": {
+         "noStaticOnlyClass": "off",
+         "noForEach": "off"
+       },
+       "suspicious": {
+         "noArrayIndexKey": "off",
+         "noPrototypeBuiltins": "off"
+       },
+       "style": {
+         "noNonNullAssertion": "off"
+       }
+     }
+   },
+   "javascript": {
+     "formatter": {
+       "quoteStyle": "single"
+     }
+   }
+ }

docker-compose.yaml  (+13)

migrations/20251029194100_create_firehose_event_table.ts  (-16)

- import { Kysely, sql } from 'kysely'
-
- export async function up(db: Kysely<any>): Promise<void> {
-   await db.schema
-     .createTable('firehose_event')
-     .addColumn('timestamp', 'timestamptz', (col) =>
-       col.notNull().defaultTo(sql`now()`)
-     )
-     .addColumn('event_type', 'text', (col) => col.notNull())
-     .addColumn('event_data', 'jsonb', (col) => col.notNull())
-     .execute()
- }
-
- export async function down(db: Kysely<any>): Promise<void> {
-   await db.schema.dropTable('firehose_event').execute()
- }

migrations/20251108164300_create_channels_invites_users_shards.ts  (+86)

+ import { Kysely, sql } from 'kysely'
+
+ export async function up(db: Kysely<any>): Promise<void> {
+   await db.schema
+     .createTable('account')
+     .addColumn('did', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('handle', 'text', (col) => col.notNull().unique())
+     .addColumn('pds_hostname', 'text', (col) =>
+       col.references('pds.hostname')
+     )
+     .addColumn('created_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .execute()
+
+   await db.schema
+     .createTable('lattice')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('shard')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull().unique())
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel_invite')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('channel', 'text', (col) =>
+       col.references('channel.cid').notNull()
+     )
+     .addColumn('creator_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('recipient_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+
+   await db.schema
+     .createTable('channel_membership')
+     .addColumn('uri', 'text', (col) => col.primaryKey().notNull())
+     .addColumn('cid', 'text', (col) => col.notNull())
+     .addColumn('channel', 'text', (col) =>
+       col.references('channel.cid').notNull()
+     )
+     .addColumn('recipient_did', 'text', (col) =>
+       col.references('account.did').notNull()
+     )
+     .addColumn('indexed_at', 'timestamptz', (col) => col.notNull().defaultTo(sql`now()`))
+     .addColumn('data', 'jsonb', (col) => col.notNull())
+     .execute()
+ }
+
+ export async function down(db: Kysely<any>): Promise<void> {
+   await db.schema.dropTable('channel_membership').execute()
+   await db.schema.dropTable('channel_invite').execute()
+   await db.schema.dropTable('channel').execute()
+   await db.schema.dropTable('shard').execute()
+   await db.schema.dropTable('lattice').execute()
+   await db.schema.dropTable('account').execute()
+ }

package.json  (+2 -1)

···
    "scripts": {
      "test": "echo \"Error: no test specified\" && exit 1",
      "dev": "tsx src/index.ts",
+     "lint": "npx biome lint ./src && npx biome format ./src",
+     "lint:fix": "npx biome lint --fix ./src && npx biome format --fix ./src",
      "db:migrate": "npx ts-node src/scripts/migrate.ts latest",
      "db:revert": "npx ts-node src/scripts/migrate.ts down"
    },

src/firehose.ts  (+180 -49)

Before:

  import { Firehose, CommitEvent, AccountEvent, IdentityEvent } from "@skyware/firehose";
  import WebSocket from "ws";
  import { db } from "./db";
- import { Insertable } from "kysely";
- import { FirehoseEventTable } from "./db";
-
- const saveEvent = async (type: 'commit' | 'identity' | 'account', data: any) => {
-   try {
-     await db.insertInto('firehose_event').values({
-       event_type: type,
-       event_data: data
-     }).execute();
-   } catch (error) {
-     console.error("\nFailed to save event to database:", error);
-   }
- };

  const main = () => {
-   console.log("Starting Firehose listener...");

-   const firehose = new Firehose({
-     ws: WebSocket,
-   });

-   firehose.on("commit", async (commit: CommitEvent) => {
-     const createOps = commit.ops.filter(op => op.action === 'create');
-     const relevantOps = [];

-     for (const op of createOps) {
-       const recordType = op.record['$type'];

-       if (recordType && (recordType.startsWith('com.atproto.') || recordType.startsWith('systems.gmstn.'))) {
-         relevantOps.push(op);
-       }
-     }

-     if (relevantOps.length > 0) {
-       await saveEvent('commit', commit);
-     }
-   });

-   firehose.on("identity", async (identity: IdentityEvent) => {
-     await saveEvent('identity', identity);
-   });

-   firehose.on("account", async (account: AccountEvent) => {
-     await saveEvent('account', account);
-   });

-   firehose.on("open", () => {
-     console.log("\nConnection opened");
-   });

-   firehose.on("close", (cursor) => {
-     console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
-     firehose.start();
-   });

-   firehose.on("error", ({ error, cursor }) => {
-     console.error(`\nAn error occurred at cursor ${cursor}:`, error);
-   });
-
-   firehose.start();

-   console.log("Listeners attached. Waiting for events...");
  };

  main();
After:

  import { Firehose, CommitEvent, AccountEvent, IdentityEvent } from "@skyware/firehose";
  import WebSocket from "ws";
  import { db } from "./db";
+ import { Insertable, sql } from "kysely";
+
+ interface AtpRecord {
+   $type: string;
+   createdAt: string;
+   [key: string]: any;
+ }
+
+ interface NewAccount {
+   did: string;
+   handle: string;
+   pds_hostname: string;
+   created_at: any;
+ }
+
+ interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+ interface NewChannelInvite {
+   uri: string;
+   cid: string;
+   creator_did: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ interface NewChannelMembership {
+   uri: string;
+   cid: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ async function insertRecords(table: string, records: any[], did: string) {
+   if (records.length === 0) return;
+   try {
+     await db.insertInto(table as any)
+       .values(records)
+       .execute();
+     console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
+   } catch (e: any) {
+     console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
+   }
+ }
+
+ //TODO: de-duplicate from pds-backfill file. - Lewis

  const main = () => {
+   console.log("Starting Firehose listener...");
+
+   const firehose = new Firehose({
+     ws: WebSocket,
+   });
+
+   firehose.on("commit", async (commit: CommitEvent) => {
+     const { repo: did, time: indexedAt } = commit
+
+     const newLattices: NewLattice[] = []
+     const newShards: NewShard[] = []
+     const newChannels: NewChannel[] = []
+     const newInvites: NewChannelInvite[] = []
+     const newMemberships: NewChannelMembership[] = []
+
+     const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+     const SHARD_LEXICON = 'systems.gmstn.development.shard'
+     const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+     const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+     const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'
+
+     const createOps = commit.ops.filter(op => op.action === 'create');
+
+     for (const op of createOps) {
+       const record = op.record as AtpRecord
+       const collection = record?.$type
+
+       if (!collection || !collection.startsWith('systems.gmstn.development.')) {
+         continue
+       }
+
+       const uri = op.uri
+       const cid = op.cid.toString()
+       const creatorDid = did
+
+       if (!record.createdAt) {
+         console.warn(`[${did}] Found matching record without 'createdAt', skipping. URI: ${uri}`);
+         continue;
+       }
+
+       const baseRecord = {
+         uri: uri,
+         cid: cid,
+         indexed_at: indexedAt,
+         data: record,
+       }
+
+       switch (collection) {
+         case LATTICE_LEXICON:
+           newLattices.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewLattice)
+           break
+         case SHARD_LEXICON:
+           newShards.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewShard)
+           break
+         case CHANNEL_LEXICON:
+           newChannels.push({
+             ...baseRecord,
+             creator_did: creatorDid
+           } as NewChannel)
+           break
+
+         case CHANNEL_INVITE_LEXICON: {
+           const recipientDid = record.recipient

+           const existingAccount = await db.selectFrom('account')
+             .select('did')
+             .where('did', '=', recipientDid)
+             .executeTakeFirst()

+           if (!existingAccount) {
+             try {
+               const newAccount: NewAccount = {
+                 did: recipientDid,
+                 handle: recipientDid,
+                 // We'll probably resolve this later, no problem :3
+                 pds_hostname: null,
+                 created_at: sql`now()`,
+               }
+               await db.insertInto('account')
+                 .values(newAccount)
+                 .onConflict(oc => oc.column('did').doNothing())
+                 .execute()
+               console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+             } catch (e) {
+               console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+               break
+             }
+           }

+           newInvites.push({
+             ...baseRecord,
+             creator_did: did,
+             channel: record.channel.cid,
+             recipient_did: recipientDid,
+           } as NewChannelInvite)
+           break
+         }

+         case CHANNEL_MEMBERSHIP_LEXICON:
+           newMemberships.push({
+             ...baseRecord,
+             channel: record.channel,
+             recipient_did: creatorDid,
+           } as NewChannelMembership)
+           break

+         default:
+           console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+       }
+     }

+     await insertRecords('lattice', newLattices, did)
+     await insertRecords('shard', newShards, did)
+     await insertRecords('channel', newChannels, did)
+     await insertRecords('channel_invite', newInvites, did)
+     await insertRecords('channel_membership', newMemberships, did)
+   });

+   firehose.on("open", () => {
+     console.log("\nConnection opened");
+   });

+   firehose.on("close", (cursor) => {
+     console.log(`\nConnection closed. Last cursor was: ${cursor}. Restarting.`);
+     firehose.start();
+   });

+   firehose.on("error", ({ error, cursor }) => {
+     console.error(`\nAn error occurred at cursor ${cursor}:`, error);
+   });

+   firehose.start();

+   console.log("Listeners attached. Waiting for events...");
  };

  main();

src/pds-backfill.ts  (+236 -79)

Before:

  import { db } from './db';

- export interface FirehoseEventTable {
-   timestamp: ColumnType<Date, string | Date, never>;
-   event_type: string;
-   event_data: Record<string, any>;
- }
- export type FirehoseEvent = Selectable<FirehoseEventTable>;
- export type NewFirehoseEvent = Insertable<FirehoseEventTable>;
-
  interface AtpRecord {
    $type: string;
    createdAt: string;
    [key: string]: any;
  }

- async function processSingleRepo(pdsHostname: string, did: string) {
-   const pdsBaseUrl = `https://` + pdsHostname;
-   const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl);
-   getRepoUrl.searchParams.set('did', did);

-   let car: any;
    try {
      const { CarReader } = await import('@ipld/car');

-     const response = await fetch(getRepoUrl.href);
      if (!response.ok) {
-       throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`);
      }
-     const carBytes = new Uint8Array(await response.arrayBuffer());
-     car = await CarReader.fromBytes(carBytes);
    } catch (e: any) {
-     console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`);
-     return;
    }

-   const recordsToInsert: NewFirehoseEvent[] = [];

    try {
      const cbor = await import('@ipld/dag-cbor');
-
      for await (const block of car.blocks()) {
-       const record = cbor.decode(block.bytes) as AtpRecord;

        if (
-         record &&
-         record.$type &&
-         typeof record.$type === 'string' &&
-         record.$type.startsWith('systems.gmstn.')
        ) {
          if (!record.createdAt || typeof record.createdAt !== 'string') {
-           console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`);
-           continue;
          }

-         recordsToInsert.push({
-           timestamp: record.createdAt,
-           event_type: record.$type,
-           event_data: record,
-         });
        }
      }
    } catch (e: any) {
-     console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`);
-     return;
    }

-   if (recordsToInsert.length > 0) {
      try {
-       await db.insertInto('firehose_event').values(recordsToInsert).execute();
-       console.log(`[${did}] Inserted ${recordsToInsert.length} 'systems.gmstn.*' records.`);
      } catch (e: any) {
-       console.error(`[${did}] Failed to insert records into DB: ${e.message}`);
      }
-   }
  }

  async function backfillPds(pdsHostname: string) {
···

    console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);

    const BATCH_SIZE = 10;
    for (let i = 0; i < dids.length; i += BATCH_SIZE) {
      const batch = dids.slice(i, i + BATCH_SIZE);
···
  }

  async function main() {
-   let pdsesToBackfill: { hostname: string }[] = [];

-   try {
-     pdsesToBackfill = await db
-       .selectFrom('pds')
-       .select('hostname')
-       .where('backfilled_at', 'is', null)
-       .orderBy(
-         (eb) => eb
-           .case()
-           .when('hostname', 'like', '%.bsky.network')
-           .then(1)
-           .else(0)
-           .end(),
-         'asc'
-       )
-       .orderBy('added_at', 'asc')
-       .execute();

-     if (pdsesToBackfill.length === 0) {
-       console.log('No PDSs to backfill. All caught up! Exiting.');
-       await db.destroy();
-       return;
-     }
-
-     console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
-
-   } catch (e: any) {
-     console.error('Failed to fetch PDS list from database:', e.message);
-     process.exit(1);
-   }
-
-   for (const pds of pdsesToBackfill) {
      try {
-       await backfillPds(pds.hostname);
-     } catch (e) {
-       console.error(`---`);
-       console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
-       console.error(`---`);
      }
-   }

-   console.log('All backfill jobs complete. Closing database connection.');
-   await db.destroy();
  }

  if (require.main === module) {
···
After:

+ import { sql, Kysely } from 'kysely';
  import { db } from './db';

  interface AtpRecord {
    $type: string;
    createdAt: string;
    [key: string]: any;
  }

+ interface NewAccount {
+   did: string;
+   handle: string;
+   pds_hostname: string;
+   created_at: any;
+ }
+
+ interface NewLattice { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewShard { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+ interface NewChannel { uri: string; cid: string; creator_did: string; indexed_at: string; data: AtpRecord }
+
+ interface NewChannelInvite {
+   uri: string;
+   cid: string;
+   creator_did: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }

+ interface NewChannelMembership {
+   uri: string;
+   cid: string;
+   indexed_at: string;
+   data: AtpRecord;
+   channel?: string;
+   recipient_did?: string;
+ }
+
+ const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
+
+ async function processSingleRepo(pdsHostname: string, did: string): Promise<void> {
+   const pdsBaseUrl = `https://${pdsHostname}`
+   const getRepoUrl = new URL(`/xrpc/com.atproto.sync.getRepo`, pdsBaseUrl)
+   getRepoUrl.searchParams.set('did', did)
+
+   let car: any
+   let carRoot: string
    try {
      const { CarReader } = await import('@ipld/car');

+     const response = await fetch(getRepoUrl.href)
      if (!response.ok) {
+       throw new Error(`Failed to getRepo for ${did}: ${response.status} ${response.statusText}`)
+     }
+     const carBytes = new Uint8Array(await response.arrayBuffer())
+     car = await CarReader.fromBytes(carBytes)
+     const roots = await car.getRoots()
+     if (roots.length === 0) {
+       throw new Error('CAR file has no root CID')
      }
+     carRoot = roots[0].toString()
    } catch (e: any) {
+     console.error(`[${did}] Failed to fetch or parse CAR: ${e.message}`)
+     return
    }

+   const newLattices: NewLattice[] = []
+   const newShards: NewShard[] = []
+   const newChannels: NewChannel[] = []
+   const newInvites: NewChannelInvite[] = []
+   const newMemberships: NewChannelMembership[] = []
+
+   const LATTICE_LEXICON = 'systems.gmstn.development.lattice'
+   const SHARD_LEXICON = 'systems.gmstn.development.shard'
+   const CHANNEL_LEXICON = 'systems.gmstn.development.channel'
+   const CHANNEL_INVITE_LEXICON = 'systems.gmstn.development.channel.invite'
+   const CHANNEL_MEMBERSHIP_LEXICON = 'systems.gmstn.development.channel.membership'

    try {
      const cbor = await import('@ipld/dag-cbor');
      for await (const block of car.blocks()) {
+       const record = cbor.decode(block.bytes) as AtpRecord
+       const cid = block.cid.toString()
+
+       const collection = record?.$type

        if (
+         collection &&
+         typeof collection === 'string' &&
+         collection.startsWith('systems.gmstn.development.')
        ) {
          if (!record.createdAt || typeof record.createdAt !== 'string') {
+           console.warn(`[${did}] Found matching record without valid 'createdAt', skipping.`)
+           continue
+         }
+
+         const uri = `at://${did}/${collection}/${cid}`
+
+         const baseRecord = {
+           uri: uri,
+           cid: cid,
+           indexed_at: record.createdAt,
+           data: record,
          }

+         switch (collection) {
+           case LATTICE_LEXICON:
+             newLattices.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewLattice)
+             break
+           case SHARD_LEXICON:
+             newShards.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewShard)
+             break
+           case CHANNEL_LEXICON:
+             newChannels.push({
+               ...baseRecord,
+               creator_did: did
+             } as NewChannel)
+             break
+           case CHANNEL_INVITE_LEXICON: {
+             const recipientDid = record.recipient
+
+             const existingAccount = await db.selectFrom('account')
+               .select('did')
+               .where('did', '=', recipientDid)
+               .executeTakeFirst()
+
+             if (!existingAccount) {
+               try {
+                 const newAccount: NewAccount = {
+                   did: recipientDid,
+                   handle: recipientDid,
+                   // We'll probably resolve this later, no problem :3
+                   pds_hostname: null,
+                   created_at: sql`now()`,
+                 }
+                 await db.insertInto('account')
+                   .values(newAccount)
+                   .onConflict(oc => oc.column('did').doNothing())
+                   .execute()
+                 console.log(`[${did}] Created new placeholder account entry for recipient ${recipientDid}.`)
+               } catch (e) {
+                 console.error(`[${did}] Failed to upsert recipient account ${recipientDid}:`, e)
+                 break
+               }
+             }
+
+             newInvites.push({
+               ...baseRecord,
+               creator_did: did,
+               channel: record.channel.cid,
+               recipient_did: recipientDid,
+             } as NewChannelInvite)
+             break
+           }
+           case CHANNEL_MEMBERSHIP_LEXICON:
+             newMemberships.push({
+               ...baseRecord,
+               channel: record.channel.cid,
+               recipient_did: did,
+             } as NewChannelMembership)
+             break
+           default:
+             console.warn(`[${did}] Unhandled 'systems.gmstn.development.*' lexicon: ${collection}`)
+         }
        }
      }
    } catch (e: any) {
+     console.error(`[${did}] Error parsing CAR blocks: ${e.message}. Skipping rest of repo.`)
+     return
+   }
+
+   if (newLattices.length > 0) {
+     await insertRecords('lattice', newLattices, did)
+   }
+   if (newShards.length > 0) {
+     await insertRecords('shard', newShards, did)
    }
+   if (newChannels.length > 0) {
+     await insertRecords('channel', newChannels, did)
+   }
+   if (newInvites.length > 0) {
+     await insertRecords('channel_invite', newInvites, did)
+   }
+   if (newMemberships.length > 0) {
+     await insertRecords('channel_membership', newMemberships, did)
+   }
+ }

+ async function insertRecords(table: string, records: any[], did: string) {
+   if (records.length === 0) return;
    try {
+     await db.insertInto(table as any)
+       .values(records)
+       .execute();
+     console.log(`[${did}] Inserted ${records.length} records into '${table}'.`);
    } catch (e: any) {
+     console.error(`[${did}] Failed to insert records into '${table}': ${e.message}`);
    }
  }

  async function backfillPds(pdsHostname: string) {
···

    console.log(`Fetched ${dids.length} repos. Cursor: ${cursor}`);

+   const newAccounts: NewAccount[] = dids.map(repo => ({
+     did: repo,
+     handle: repo,
+     pds_hostname: pdsHostname,
+     created_at: sql`now()`,
+   }));
+
+   if (newAccounts.length > 0) {
+     try {
+       await db.insertInto('account')
+         .values(newAccounts)
+         .onConflict((oc) => oc
+           .column('did')
+           .doUpdateSet({
+             pds_hostname: sql`excluded.pds_hostname`,
+           })
+         )
+         .execute();
+       console.log(`Successfully bulk upserted ${newAccounts.length} accounts.`);
+     } catch (e: any) {
+       console.error(`Failed to bulk upsert accounts: ${e.message}`);
+     }
+   }
+
    const BATCH_SIZE = 10;
    for (let i = 0; i < dids.length; i += BATCH_SIZE) {
      const batch = dids.slice(i, i + BATCH_SIZE);
···
  }

  async function main() {
+   const CONSTANT_DELAY_MS = 10000;

+   while (true) {
+     let pdsesToBackfill: { hostname: string }[] = [];

      try {
+       pdsesToBackfill = await db
+         .selectFrom('pds')
+         .select('hostname')
+         .where((eb) => eb.or([
+           eb('backfilled_at', 'is', null),
+           eb('backfilled_at', '<', sql`now() - interval '24 hours'`),
+         ]))
+         .orderBy(
+           (eb) => eb
+             .case()
+             .when('hostname', 'like', '%.bsky.network')
+             .then(1)
+             .else(0)
+             .end(),
+           'asc'
+         )
+         .orderBy('added_at', 'asc')
+         .execute();
+
+       if (pdsesToBackfill.length === 0) {
+         console.log('No PDSs currently needing backfill. Waiting and checking again...');
+       } else {
+         console.log(`Found ${pdsesToBackfill.length} PDS(s) to backfill. Starting job...`);
+
+         for (const pds of pdsesToBackfill) {
+           try {
+             await backfillPds(pds.hostname);
+           } catch (e) {
+             console.error(`---`);
+             console.error(`Job for ${pds.hostname} failed. Moving to next PDS.`);
+             console.error(`---`);
+           }
+         }
+       }
+     } catch (e: any) {
+       console.error('Fatal error during continuous backfill loop:', e.message);
      }

+     console.log(`Waiting for ${CONSTANT_DELAY_MS / 1000} seconds before next pass.`);
+     await delay(CONSTANT_DELAY_MS);
+   }
  }

  if (require.main === module) {