+150
-18
package-lock.json
+150
-18
package-lock.json
···
12
12
"@atproto/api": "^0.13.4",
13
13
"@atproto/common": "^0.4.1",
14
14
"@atproto/identity": "^0.4.0",
15
-
"@atproto/lexicon": "^0.4.1",
15
+
"@atproto/lexicon": "0.4.1-rc.0",
16
16
"@atproto/oauth-client-node": "^0.1.0",
17
-
"@atproto/repo": "^0.4.3",
17
+
"@atproto/repo": "0.4.2-rc.0",
18
+
"@atproto/sync": "^0.1.0",
18
19
"@atproto/syntax": "^0.3.0",
19
20
"@atproto/xrpc-server": "^0.6.3",
20
21
"better-sqlite3": "^11.1.2",
···
75
76
"undici": "^6.14.1"
76
77
}
77
78
},
79
+
"node_modules/@atproto-labs/fetch-node/node_modules/ipaddr.js": {
80
+
"version": "2.2.0",
81
+
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",
82
+
"integrity": "sha512-Ag3wB2o37wslZS19hZqorUnrnzSkpOVy+IiiDEiTqNubEYpYuHWIf6K4psgN2ZWKExS4xhVCrRVfb/wfW8fWJA==",
83
+
"engines": {
84
+
"node": ">= 10"
85
+
}
86
+
},
78
87
"node_modules/@atproto-labs/handle-resolver": {
79
88
"version": "0.1.2",
80
89
"resolved": "https://registry.npmjs.org/@atproto-labs/handle-resolver/-/handle-resolver-0.1.2.tgz",
···
139
148
"tlds": "^1.234.0"
140
149
}
141
150
},
151
+
"node_modules/@atproto/api/node_modules/@atproto/lexicon": {
152
+
"version": "0.4.1",
153
+
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1.tgz",
154
+
"integrity": "sha512-bzyr+/VHXLQWbumViX5L7h1NKQObfs8Z+XZJl43OUK8nYFUI4e/sW1IZKRNfw7Wvi5YVNK+J+yP3DWIBZhkCYA==",
155
+
"dependencies": {
156
+
"@atproto/common-web": "^0.3.0",
157
+
"@atproto/syntax": "^0.3.0",
158
+
"iso-datestring-validator": "^2.2.2",
159
+
"multiformats": "^9.9.0",
160
+
"zod": "^3.23.8"
161
+
}
162
+
},
142
163
"node_modules/@atproto/common": {
143
164
"version": "0.4.1",
144
165
"resolved": "https://registry.npmjs.org/@atproto/common/-/common-0.4.1.tgz",
···
284
305
"lex": "dist/index.js"
285
306
}
286
307
},
287
-
"node_modules/@atproto/lexicon": {
308
+
"node_modules/@atproto/lex-cli/node_modules/@atproto/lexicon": {
288
309
"version": "0.4.1",
289
310
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1.tgz",
290
311
"integrity": "sha512-bzyr+/VHXLQWbumViX5L7h1NKQObfs8Z+XZJl43OUK8nYFUI4e/sW1IZKRNfw7Wvi5YVNK+J+yP3DWIBZhkCYA==",
312
+
"dev": true,
313
+
"dependencies": {
314
+
"@atproto/common-web": "^0.3.0",
315
+
"@atproto/syntax": "^0.3.0",
316
+
"iso-datestring-validator": "^2.2.2",
317
+
"multiformats": "^9.9.0",
318
+
"zod": "^3.23.8"
319
+
}
320
+
},
321
+
"node_modules/@atproto/lexicon": {
322
+
"version": "0.4.1-rc.0",
323
+
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1-rc.0.tgz",
324
+
"integrity": "sha512-CSYO8MWbxTXTLQMEJ1mTXD2pDxIXO2oCK/FVw9T/BeXLMcvwmeVgKAaytd1AGFkapX8IMAAtjBB3cnaltuHwbg==",
291
325
"dependencies": {
292
326
"@atproto/common-web": "^0.3.0",
293
327
"@atproto/syntax": "^0.3.0",
···
341
375
}
342
376
},
343
377
"node_modules/@atproto/repo": {
344
-
"version": "0.4.3",
345
-
"resolved": "https://registry.npmjs.org/@atproto/repo/-/repo-0.4.3.tgz",
346
-
"integrity": "sha512-9w4TlyxExLfFL9BysvXq4vhIsYsnmGi0uelmxJjlRXBICgLpqjqQVqhS8OBqE25ZTYvgXUi7nNqpo2llCHhOvQ==",
378
+
"version": "0.4.2-rc.0",
379
+
"resolved": "https://registry.npmjs.org/@atproto/repo/-/repo-0.4.2-rc.0.tgz",
380
+
"integrity": "sha512-y8zXAR23r6qlsTmbzXaBEHYjvlgeNlAKj9eJ6V17JtT+4FVdW246alhsgSsglJ2Uv/e24RC1r90yNJNRxqDzXw==",
381
+
"dependencies": {
382
+
"@atproto/common": "^0.4.1",
383
+
"@atproto/common-web": "^0.3.0",
384
+
"@atproto/crypto": "^0.4.0",
385
+
"@atproto/lexicon": "^0.4.1-rc.0",
386
+
"@ipld/car": "^3.2.3",
387
+
"@ipld/dag-cbor": "^7.0.0",
388
+
"multiformats": "^9.9.0",
389
+
"uint8arrays": "3.0.0",
390
+
"zod": "^3.23.8"
391
+
}
392
+
},
393
+
"node_modules/@atproto/sync": {
394
+
"version": "0.1.0",
395
+
"resolved": "https://registry.npmjs.org/@atproto/sync/-/sync-0.1.0.tgz",
396
+
"integrity": "sha512-2O1UPaeZfL0agitE9rp2mjYVezvZsao3DgJwWCSid1S0N7Y2pOdc7/fSLH/OHn96QhG7g0FGpWzTEcdekRuT0g==",
397
+
"dependencies": {
398
+
"@atproto/common": "^0.4.1",
399
+
"@atproto/identity": "^0.4.1",
400
+
"@atproto/lexicon": "^0.4.1",
401
+
"@atproto/repo": "^0.5.0",
402
+
"@atproto/syntax": "^0.3.0",
403
+
"@atproto/xrpc-server": "^0.6.3",
404
+
"multiformats": "^9.9.0",
405
+
"p-queue": "^6.6.2"
406
+
}
407
+
},
408
+
"node_modules/@atproto/sync/node_modules/@atproto/lexicon": {
409
+
"version": "0.4.1",
410
+
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1.tgz",
411
+
"integrity": "sha512-bzyr+/VHXLQWbumViX5L7h1NKQObfs8Z+XZJl43OUK8nYFUI4e/sW1IZKRNfw7Wvi5YVNK+J+yP3DWIBZhkCYA==",
412
+
"dependencies": {
413
+
"@atproto/common-web": "^0.3.0",
414
+
"@atproto/syntax": "^0.3.0",
415
+
"iso-datestring-validator": "^2.2.2",
416
+
"multiformats": "^9.9.0",
417
+
"zod": "^3.23.8"
418
+
}
419
+
},
420
+
"node_modules/@atproto/sync/node_modules/@atproto/repo": {
421
+
"version": "0.5.0",
422
+
"resolved": "https://registry.npmjs.org/@atproto/repo/-/repo-0.5.0.tgz",
423
+
"integrity": "sha512-kZbj4wW5eFrDjkSTS9z+6bT4OTr5K4GrqWukWbfdBJtZPXsRDm75AV0C9ItoHDTdbBXn65TK6kqaJTrf89osCg==",
347
424
"dependencies": {
348
425
"@atproto/common": "^0.4.1",
349
426
"@atproto/common-web": "^0.3.0",
···
386
463
"rate-limiter-flexible": "^2.4.1",
387
464
"uint8arrays": "3.0.0",
388
465
"ws": "^8.12.0",
466
+
"zod": "^3.23.8"
467
+
}
468
+
},
469
+
"node_modules/@atproto/xrpc-server/node_modules/@atproto/lexicon": {
470
+
"version": "0.4.1",
471
+
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1.tgz",
472
+
"integrity": "sha512-bzyr+/VHXLQWbumViX5L7h1NKQObfs8Z+XZJl43OUK8nYFUI4e/sW1IZKRNfw7Wvi5YVNK+J+yP3DWIBZhkCYA==",
473
+
"dependencies": {
474
+
"@atproto/common-web": "^0.3.0",
475
+
"@atproto/syntax": "^0.3.0",
476
+
"iso-datestring-validator": "^2.2.2",
477
+
"multiformats": "^9.9.0",
478
+
"zod": "^3.23.8"
479
+
}
480
+
},
481
+
"node_modules/@atproto/xrpc/node_modules/@atproto/lexicon": {
482
+
"version": "0.4.1",
483
+
"resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.4.1.tgz",
484
+
"integrity": "sha512-bzyr+/VHXLQWbumViX5L7h1NKQObfs8Z+XZJl43OUK8nYFUI4e/sW1IZKRNfw7Wvi5YVNK+J+yP3DWIBZhkCYA==",
485
+
"dependencies": {
486
+
"@atproto/common-web": "^0.3.0",
487
+
"@atproto/syntax": "^0.3.0",
488
+
"iso-datestring-validator": "^2.2.2",
489
+
"multiformats": "^9.9.0",
389
490
"zod": "^3.23.8"
390
491
}
391
492
},
···
2279
2380
"node": ">=6"
2280
2381
}
2281
2382
},
2383
+
"node_modules/eventemitter3": {
2384
+
"version": "4.0.7",
2385
+
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
2386
+
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
2387
+
},
2282
2388
"node_modules/events": {
2283
2389
"version": "3.3.0",
2284
2390
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
···
2811
2917
"integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew=="
2812
2918
},
2813
2919
"node_modules/ipaddr.js": {
2814
-
"version": "2.2.0",
2815
-
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",
2816
-
"integrity": "sha512-Ag3wB2o37wslZS19hZqorUnrnzSkpOVy+IiiDEiTqNubEYpYuHWIf6K4psgN2ZWKExS4xhVCrRVfb/wfW8fWJA==",
2920
+
"version": "1.9.1",
2921
+
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz",
2922
+
"integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==",
2817
2923
"engines": {
2818
-
"node": ">= 10"
2924
+
"node": ">= 0.10"
2819
2925
}
2820
2926
},
2821
2927
"node_modules/iron-session": {
···
3288
3394
"url": "https://github.com/sponsors/sindresorhus"
3289
3395
}
3290
3396
},
3397
+
"node_modules/p-finally": {
3398
+
"version": "1.0.0",
3399
+
"resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz",
3400
+
"integrity": "sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==",
3401
+
"engines": {
3402
+
"node": ">=4"
3403
+
}
3404
+
},
3405
+
"node_modules/p-queue": {
3406
+
"version": "6.6.2",
3407
+
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz",
3408
+
"integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==",
3409
+
"dependencies": {
3410
+
"eventemitter3": "^4.0.4",
3411
+
"p-timeout": "^3.2.0"
3412
+
},
3413
+
"engines": {
3414
+
"node": ">=8"
3415
+
},
3416
+
"funding": {
3417
+
"url": "https://github.com/sponsors/sindresorhus"
3418
+
}
3419
+
},
3420
+
"node_modules/p-timeout": {
3421
+
"version": "3.2.0",
3422
+
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz",
3423
+
"integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==",
3424
+
"dependencies": {
3425
+
"p-finally": "^1.0.0"
3426
+
},
3427
+
"engines": {
3428
+
"node": ">=8"
3429
+
}
3430
+
},
3291
3431
"node_modules/package-json-from-dist": {
3292
3432
"version": "1.0.0",
3293
3433
"resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz",
···
3537
3677
"forwarded": "0.2.0",
3538
3678
"ipaddr.js": "1.9.1"
3539
3679
},
3540
-
"engines": {
3541
-
"node": ">= 0.10"
3542
-
}
3543
-
},
3544
-
"node_modules/proxy-addr/node_modules/ipaddr.js": {
3545
-
"version": "1.9.1",
3546
-
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz",
3547
-
"integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==",
3548
3680
"engines": {
3549
3681
"node": ">= 0.10"
3550
3682
}
+3
-2
package.json
+3
-2
package.json
···
17
17
"@atproto/common": "^0.4.1",
18
18
"@atproto/api": "^0.13.4",
19
19
"@atproto/identity": "^0.4.0",
20
-
"@atproto/lexicon": "^0.4.1",
20
+
"@atproto/lexicon": "0.4.1-rc.0",
21
21
"@atproto/oauth-client-node": "^0.1.0",
22
-
"@atproto/repo": "^0.4.3",
22
+
"@atproto/repo": "0.4.2-rc.0",
23
+
"@atproto/sync": "^0.1.0",
23
24
"@atproto/syntax": "^0.3.0",
24
25
"@atproto/xrpc-server": "^0.6.3",
25
26
"better-sqlite3": "^11.1.2",
-194
src/firehose/firehose.ts
-194
src/firehose/firehose.ts
···
1
-
import type { RepoRecord } from '@atproto/lexicon'
2
-
import { cborToLexRecord, readCar } from '@atproto/repo'
3
-
import { AtUri } from '@atproto/syntax'
4
-
import { Subscription } from '@atproto/xrpc-server'
5
-
import type { CID } from 'multiformats/cid'
6
-
import {
7
-
type Account,
8
-
type Commit,
9
-
type Identity,
10
-
type RepoEvent,
11
-
isAccount,
12
-
isCommit,
13
-
isIdentity,
14
-
isValidRepoEvent,
15
-
} from './lexicons'
16
-
17
-
type Opts = {
18
-
service?: string
19
-
getCursor?: () => Promise<number | undefined>
20
-
setCursor?: (cursor: number) => Promise<void>
21
-
subscriptionReconnectDelay?: number
22
-
filterCollections?: string[]
23
-
excludeIdentity?: boolean
24
-
excludeAccount?: boolean
25
-
excludeCommit?: boolean
26
-
}
27
-
28
-
export class Firehose {
29
-
public sub: Subscription<RepoEvent>
30
-
private abortController: AbortController
31
-
32
-
constructor(public opts: Opts) {
33
-
this.abortController = new AbortController()
34
-
this.sub = new Subscription({
35
-
service: opts.service ?? 'https://bsky.network',
36
-
method: 'com.atproto.sync.subscribeRepos',
37
-
signal: this.abortController.signal,
38
-
getParams: async () => {
39
-
if (!opts.getCursor) return undefined
40
-
const cursor = await opts.getCursor()
41
-
return { cursor }
42
-
},
43
-
validate: (value: unknown) => {
44
-
try {
45
-
return isValidRepoEvent(value)
46
-
} catch (err) {
47
-
console.error('repo subscription skipped invalid message', err)
48
-
}
49
-
},
50
-
})
51
-
}
52
-
53
-
async *run(): AsyncGenerator<Event> {
54
-
try {
55
-
for await (const evt of this.sub) {
56
-
try {
57
-
if (isCommit(evt) && !this.opts.excludeCommit) {
58
-
const parsed = await parseCommit(evt)
59
-
for (const write of parsed) {
60
-
if (
61
-
!this.opts.filterCollections ||
62
-
this.opts.filterCollections.includes(write.uri.collection)
63
-
) {
64
-
yield write
65
-
}
66
-
}
67
-
} else if (isAccount(evt) && !this.opts.excludeAccount) {
68
-
const parsed = parseAccount(evt)
69
-
if (parsed) {
70
-
yield parsed
71
-
}
72
-
} else if (isIdentity(evt) && !this.opts.excludeIdentity) {
73
-
yield parseIdentity(evt)
74
-
}
75
-
} catch (err) {
76
-
console.error('repo subscription could not handle message', err)
77
-
}
78
-
if (this.opts.setCursor && typeof evt.seq === 'number') {
79
-
await this.opts.setCursor(evt.seq)
80
-
}
81
-
}
82
-
} catch (err) {
83
-
console.error('repo subscription errored', err)
84
-
setTimeout(() => this.run(), this.opts.subscriptionReconnectDelay ?? 3000)
85
-
}
86
-
}
87
-
88
-
destroy() {
89
-
this.abortController.abort()
90
-
}
91
-
}
92
-
93
-
export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => {
94
-
const car = await readCar(evt.blocks)
95
-
96
-
const evts: CommitEvt[] = []
97
-
98
-
for (const op of evt.ops) {
99
-
const uri = new AtUri(`at://${evt.repo}/${op.path}`)
100
-
101
-
const meta: CommitMeta = {
102
-
uri,
103
-
author: uri.host,
104
-
collection: uri.collection,
105
-
rkey: uri.rkey,
106
-
}
107
-
108
-
if (op.action === 'create' || op.action === 'update') {
109
-
if (!op.cid) continue
110
-
const recordBytes = car.blocks.get(op.cid)
111
-
if (!recordBytes) continue
112
-
const record = cborToLexRecord(recordBytes)
113
-
evts.push({
114
-
...meta,
115
-
event: op.action as 'create' | 'update',
116
-
cid: op.cid,
117
-
record,
118
-
})
119
-
}
120
-
121
-
if (op.action === 'delete') {
122
-
evts.push({
123
-
...meta,
124
-
event: 'delete',
125
-
})
126
-
}
127
-
}
128
-
129
-
return evts
130
-
}
131
-
132
-
export const parseIdentity = (evt: Identity): IdentityEvt => {
133
-
return {
134
-
event: 'identity',
135
-
did: evt.did,
136
-
handle: evt.handle,
137
-
}
138
-
}
139
-
140
-
export const parseAccount = (evt: Account): AccountEvt | undefined => {
141
-
if (evt.status && !isValidStatus(evt.status)) return
142
-
return {
143
-
event: 'account',
144
-
did: evt.did,
145
-
active: evt.active,
146
-
status: evt.status as AccountStatus,
147
-
}
148
-
}
149
-
150
-
const isValidStatus = (str: string): str is AccountStatus => {
151
-
return ['takendown', 'suspended', 'deleted', 'deactivated'].includes(str)
152
-
}
153
-
154
-
type Event = CommitEvt | IdentityEvt | AccountEvt
155
-
156
-
type CommitMeta = {
157
-
uri: AtUri
158
-
author: string
159
-
collection: string
160
-
rkey: string
161
-
}
162
-
163
-
type CommitEvt = Create | Update | Delete
164
-
165
-
type Create = CommitMeta & {
166
-
event: 'create'
167
-
record: RepoRecord
168
-
cid: CID
169
-
}
170
-
171
-
type Update = CommitMeta & {
172
-
event: 'update'
173
-
record: RepoRecord
174
-
cid: CID
175
-
}
176
-
177
-
type Delete = CommitMeta & {
178
-
event: 'delete'
179
-
}
180
-
181
-
type IdentityEvt = {
182
-
event: 'identity'
183
-
did: string
184
-
handle?: string
185
-
}
186
-
187
-
type AccountEvt = {
188
-
event: 'account'
189
-
did: string
190
-
active: boolean
191
-
status?: AccountStatus
192
-
}
193
-
194
-
type AccountStatus = 'takendown' | 'suspended' | 'deleted' | 'deactivated'
+19
-18
src/firehose/ingester.ts
src/ingester.ts
+19
-18
src/firehose/ingester.ts
src/ingester.ts
···
1
+
import pino from 'pino'
2
+
import { IdResolver } from '@atproto/identity'
3
+
import { Firehose } from '@atproto/sync'
1
4
import type { Database } from '#/db'
2
-
import { Firehose } from '#/firehose/firehose'
3
5
import * as Status from '#/lexicon/types/com/example/status'
4
6
5
-
export class Ingester {
6
-
firehose: Firehose | undefined
7
-
constructor(public db: Database) {}
8
-
9
-
async start() {
10
-
const firehose = new Firehose({})
11
-
12
-
for await (const evt of firehose.run()) {
7
+
export function createIngester(db: Database, idResolver: IdResolver) {
8
+
const logger = pino({ name: 'firehose ingestion' })
9
+
return new Firehose({
10
+
idResolver,
11
+
handleEvent: async (evt) => {
13
12
// Watch for write events
14
13
if (evt.event === 'create' || evt.event === 'update') {
15
14
const record = evt.record
···
21
20
Status.validateRecord(record).success
22
21
) {
23
22
// Store the status in our SQLite
24
-
await this.db
23
+
await db
25
24
.insertInto('status')
26
25
.values({
27
26
uri: evt.uri.toString(),
28
-
authorDid: evt.author,
27
+
authorDid: evt.did,
29
28
status: record.status,
30
29
createdAt: record.createdAt,
31
30
indexedAt: new Date().toISOString(),
···
43
42
evt.collection === 'com.example.status'
44
43
) {
45
44
// Remove the status from our SQLite
46
-
await this.db.deleteFrom('status').where({ uri: evt.uri.toString() })
45
+
await db.deleteFrom('status').where({ uri: evt.uri.toString() })
47
46
}
48
-
}
49
-
}
50
-
51
-
destroy() {
52
-
this.firehose?.destroy()
53
-
}
47
+
},
48
+
onError: (err) => {
49
+
logger.error({ err }, 'error on firehose ingestion')
50
+
},
51
+
filterCollections: ['com.example.status'],
52
+
excludeIdentity: true,
53
+
excludeAccount: true,
54
+
})
54
55
}
-355
src/firehose/lexicons.ts
-355
src/firehose/lexicons.ts
···
1
-
import type { IncomingMessage } from 'node:http'
2
-
3
-
import { type LexiconDoc, Lexicons } from '@atproto/lexicon'
4
-
import type { ErrorFrame, HandlerAuth } from '@atproto/xrpc-server'
5
-
import type { CID } from 'multiformats/cid'
6
-
7
-
// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
8
-
9
-
export function isObj(v: unknown): v is Record<string, unknown> {
10
-
return typeof v === 'object' && v !== null
11
-
}
12
-
13
-
export function hasProp<K extends PropertyKey>(data: object, prop: K): data is Record<K, unknown> {
14
-
return prop in data
15
-
}
16
-
17
-
export interface QueryParams {
18
-
/** The last known event seq number to backfill from. */
19
-
cursor?: number
20
-
}
21
-
22
-
export type RepoEvent =
23
-
| Commit
24
-
| Identity
25
-
| Account
26
-
| Handle
27
-
| Migrate
28
-
| Tombstone
29
-
| Info
30
-
| { $type: string; [k: string]: unknown }
31
-
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
32
-
export type HandlerOutput = HandlerError | RepoEvent
33
-
export type HandlerReqCtx<HA extends HandlerAuth = never> = {
34
-
auth: HA
35
-
params: QueryParams
36
-
req: IncomingMessage
37
-
signal: AbortSignal
38
-
}
39
-
export type Handler<HA extends HandlerAuth = never> = (ctx: HandlerReqCtx<HA>) => AsyncIterable<HandlerOutput>
40
-
41
-
/** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */
42
-
export interface Commit {
43
-
/** The stream sequence number of this message. */
44
-
seq: number
45
-
/** DEPRECATED -- unused */
46
-
rebase: boolean
47
-
/** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */
48
-
tooBig: boolean
49
-
/** The repo this event comes from. */
50
-
repo: string
51
-
/** Repo commit object CID. */
52
-
commit: CID
53
-
/** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */
54
-
prev?: CID | null
55
-
/** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */
56
-
rev: string
57
-
/** The rev of the last emitted commit from this repo (if any). */
58
-
since: string | null
59
-
/** CAR file containing relevant blocks, as a diff since the previous repo state. */
60
-
blocks: Uint8Array
61
-
ops: RepoOp[]
62
-
blobs: CID[]
63
-
/** Timestamp of when this message was originally broadcast. */
64
-
time: string
65
-
[k: string]: unknown
66
-
}
67
-
68
-
export function isCommit(v: unknown): v is Commit {
69
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#commit'
70
-
}
71
-
72
-
/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
73
-
export interface Identity {
74
-
seq: number
75
-
did: string
76
-
time: string
77
-
/** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */
78
-
handle?: string
79
-
[k: string]: unknown
80
-
}
81
-
82
-
export function isIdentity(v: unknown): v is Identity {
83
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#identity'
84
-
}
85
-
86
-
/** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */
87
-
export interface Account {
88
-
seq: number
89
-
did: string
90
-
time: string
91
-
/** Indicates that the account has a repository which can be fetched from the host that emitted this event. */
92
-
active: boolean
93
-
/** If active=false, this optional field indicates a reason for why the account is not active. */
94
-
status?: 'takendown' | 'suspended' | 'deleted' | 'deactivated' | (string & {})
95
-
[k: string]: unknown
96
-
}
97
-
98
-
export function isAccount(v: unknown): v is Account {
99
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#account'
100
-
}
101
-
102
-
/** DEPRECATED -- Use #identity event instead */
103
-
export interface Handle {
104
-
seq: number
105
-
did: string
106
-
handle: string
107
-
time: string
108
-
[k: string]: unknown
109
-
}
110
-
111
-
export function isHandle(v: unknown): v is Handle {
112
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#handle'
113
-
}
114
-
115
-
/** DEPRECATED -- Use #account event instead */
116
-
export interface Migrate {
117
-
seq: number
118
-
did: string
119
-
migrateTo: string | null
120
-
time: string
121
-
[k: string]: unknown
122
-
}
123
-
124
-
export function isMigrate(v: unknown): v is Migrate {
125
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#migrate'
126
-
}
127
-
128
-
/** DEPRECATED -- Use #account event instead */
129
-
export interface Tombstone {
130
-
seq: number
131
-
did: string
132
-
time: string
133
-
[k: string]: unknown
134
-
}
135
-
136
-
export function isTombstone(v: unknown): v is Tombstone {
137
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#tombstone'
138
-
}
139
-
140
-
export interface Info {
141
-
name: 'OutdatedCursor' | (string & {})
142
-
message?: string
143
-
[k: string]: unknown
144
-
}
145
-
146
-
export function isInfo(v: unknown): v is Info {
147
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#info'
148
-
}
149
-
150
-
/** A repo operation, ie a mutation of a single record. */
151
-
export interface RepoOp {
152
-
action: 'create' | 'update' | 'delete' | (string & {})
153
-
path: string
154
-
/** For creates and updates, the new record CID. For deletions, null. */
155
-
cid: CID | null
156
-
[k: string]: unknown
157
-
}
158
-
159
-
export function isRepoOp(v: unknown): v is RepoOp {
160
-
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#repoOp'
161
-
}
162
-
163
-
export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
164
-
lexicon: 1,
165
-
id: 'com.atproto.sync.subscribeRepos',
166
-
defs: {
167
-
main: {
168
-
type: 'subscription',
169
-
description: 'Subscribe to repo updates',
170
-
parameters: {
171
-
type: 'params',
172
-
properties: {
173
-
cursor: {
174
-
type: 'integer',
175
-
description: 'The last known event to backfill from.',
176
-
},
177
-
},
178
-
},
179
-
message: {
180
-
schema: {
181
-
type: 'union',
182
-
refs: [
183
-
'lex:com.atproto.sync.subscribeRepos#commit',
184
-
'lex:com.atproto.sync.subscribeRepos#handle',
185
-
'lex:com.atproto.sync.subscribeRepos#migrate',
186
-
'lex:com.atproto.sync.subscribeRepos#tombstone',
187
-
'lex:com.atproto.sync.subscribeRepos#info',
188
-
],
189
-
},
190
-
},
191
-
errors: [
192
-
{
193
-
name: 'FutureCursor',
194
-
},
195
-
{
196
-
name: 'ConsumerTooSlow',
197
-
},
198
-
],
199
-
},
200
-
commit: {
201
-
type: 'object',
202
-
required: ['seq', 'rebase', 'tooBig', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'blobs', 'time'],
203
-
nullable: ['prev', 'since'],
204
-
properties: {
205
-
seq: {
206
-
type: 'integer',
207
-
},
208
-
rebase: {
209
-
type: 'boolean',
210
-
},
211
-
tooBig: {
212
-
type: 'boolean',
213
-
},
214
-
repo: {
215
-
type: 'string',
216
-
format: 'did',
217
-
},
218
-
commit: {
219
-
type: 'cid-link',
220
-
},
221
-
prev: {
222
-
type: 'cid-link',
223
-
},
224
-
rev: {
225
-
type: 'string',
226
-
description: 'The rev of the emitted commit',
227
-
},
228
-
since: {
229
-
type: 'string',
230
-
description: 'The rev of the last emitted commit from this repo',
231
-
},
232
-
blocks: {
233
-
type: 'bytes',
234
-
description: 'CAR file containing relevant blocks',
235
-
maxLength: 1000000,
236
-
},
237
-
ops: {
238
-
type: 'array',
239
-
items: {
240
-
type: 'ref',
241
-
ref: 'lex:com.atproto.sync.subscribeRepos#repoOp',
242
-
},
243
-
maxLength: 200,
244
-
},
245
-
blobs: {
246
-
type: 'array',
247
-
items: {
248
-
type: 'cid-link',
249
-
},
250
-
},
251
-
time: {
252
-
type: 'string',
253
-
format: 'datetime',
254
-
},
255
-
},
256
-
},
257
-
handle: {
258
-
type: 'object',
259
-
required: ['seq', 'did', 'handle', 'time'],
260
-
properties: {
261
-
seq: {
262
-
type: 'integer',
263
-
},
264
-
did: {
265
-
type: 'string',
266
-
format: 'did',
267
-
},
268
-
handle: {
269
-
type: 'string',
270
-
format: 'handle',
271
-
},
272
-
time: {
273
-
type: 'string',
274
-
format: 'datetime',
275
-
},
276
-
},
277
-
},
278
-
migrate: {
279
-
type: 'object',
280
-
required: ['seq', 'did', 'migrateTo', 'time'],
281
-
nullable: ['migrateTo'],
282
-
properties: {
283
-
seq: {
284
-
type: 'integer',
285
-
},
286
-
did: {
287
-
type: 'string',
288
-
format: 'did',
289
-
},
290
-
migrateTo: {
291
-
type: 'string',
292
-
},
293
-
time: {
294
-
type: 'string',
295
-
format: 'datetime',
296
-
},
297
-
},
298
-
},
299
-
tombstone: {
300
-
type: 'object',
301
-
required: ['seq', 'did', 'time'],
302
-
properties: {
303
-
seq: {
304
-
type: 'integer',
305
-
},
306
-
did: {
307
-
type: 'string',
308
-
format: 'did',
309
-
},
310
-
time: {
311
-
type: 'string',
312
-
format: 'datetime',
313
-
},
314
-
},
315
-
},
316
-
info: {
317
-
type: 'object',
318
-
required: ['name'],
319
-
properties: {
320
-
name: {
321
-
type: 'string',
322
-
knownValues: ['OutdatedCursor'],
323
-
},
324
-
message: {
325
-
type: 'string',
326
-
},
327
-
},
328
-
},
329
-
repoOp: {
330
-
type: 'object',
331
-
description:
332
-
"A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null.",
333
-
required: ['action', 'path', 'cid'],
334
-
nullable: ['cid'],
335
-
properties: {
336
-
action: {
337
-
type: 'string',
338
-
knownValues: ['create', 'update', 'delete'],
339
-
},
340
-
path: {
341
-
type: 'string',
342
-
},
343
-
cid: {
344
-
type: 'cid-link',
345
-
},
346
-
},
347
-
},
348
-
},
349
-
}
350
-
351
-
const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos])
352
-
353
-
export const isValidRepoEvent = (evt: unknown) => {
354
-
return lexicons.assertValidXrpcMessage<RepoEvent>('com.atproto.sync.subscribeRepos', evt)
355
-
}
+9
-6
src/firehose/resolver.ts
src/id-resolver.ts
+9
-6
src/firehose/resolver.ts
src/id-resolver.ts
···
3
3
const HOUR = 60e3 * 60
4
4
const DAY = HOUR * 24
5
5
6
-
export interface Resolver {
6
+
7
+
export function createIdResolver() {
8
+
return new IdResolver({
9
+
didCache: new MemoryCache(HOUR, DAY),
10
+
})
11
+
}
12
+
13
+
export interface BidirectionalResolver {
7
14
resolveDidToHandle(did: string): Promise<string>
8
15
resolveDidsToHandles(dids: string[]): Promise<Record<string, string>>
9
16
}
10
17
11
-
export function createResolver() {
12
-
const resolver = new IdResolver({
13
-
didCache: new MemoryCache(HOUR, DAY),
14
-
})
15
-
18
+
export function createBidirectionalResolver(resolver: IdResolver) {
16
19
return {
17
20
async resolveDidToHandle(did: string): Promise<string> {
18
21
const didDoc = await resolver.did.resolveAtprotoData(did)
+10
-7
src/index.ts
+10
-7
src/index.ts
···
3
3
import express, { type Express } from 'express'
4
4
import { pino } from 'pino'
5
5
import type { OAuthClient } from '@atproto/oauth-client-node'
6
+
import { Firehose } from '@atproto/sync'
6
7
7
8
import { createDb, migrateToLatest } from '#/db'
8
9
import { env } from '#/lib/env'
9
-
import { Ingester } from '#/firehose/ingester'
10
+
import { createIngester } from '#/ingester'
10
11
import { createRouter } from '#/routes'
11
12
import { createClient } from '#/auth/client'
12
-
import { createResolver, Resolver } from '#/firehose/resolver'
13
+
import { createBidirectionalResolver, createIdResolver, BidirectionalResolver } from '#/id-resolver'
13
14
import type { Database } from '#/db'
15
+
import { IdResolver, MemoryCache } from '@atproto/identity'
14
16
15
17
// Application state passed to the router and elsewhere
16
18
export type AppContext = {
17
19
db: Database
18
-
ingester: Ingester
20
+
ingester: Firehose
19
21
logger: pino.Logger
20
22
oauthClient: OAuthClient
21
-
resolver: Resolver
23
+
resolver: BidirectionalResolver
22
24
}
23
25
24
26
export class Server {
···
38
40
39
41
// Create the atproto utilities
40
42
const oauthClient = await createClient(db)
41
-
const ingester = new Ingester(db)
42
-
const resolver = createResolver()
43
+
const baseIdResolver = createIdResolver()
44
+
const ingester = createIngester(db, baseIdResolver)
45
+
const resolver = createBidirectionalResolver(baseIdResolver)
43
46
const ctx = {
44
47
db,
45
48
ingester,
···
72
75
73
76
async close() {
74
77
this.ctx.logger.info('sigint received, shutting down')
75
-
this.ctx.ingester.destroy()
78
+
await this.ctx.ingester.destroy()
76
79
return new Promise<void>((resolve) => {
77
80
this.server.close(() => {
78
81
this.ctx.logger.info('server closed')
+3
-1
src/routes.ts
+3
-1
src/routes.ts
···
4
4
import { OAuthResolverError } from '@atproto/oauth-client-node'
5
5
import { isValidHandle } from '@atproto/syntax'
6
6
import { TID } from '@atproto/common'
7
+
import { Agent } from '@atproto/api'
7
8
import express from 'express'
8
9
import { getIronSession } from 'iron-session'
9
10
import type { AppContext } from '#/index'
···
43
44
})
44
45
if (!session.did) return null
45
46
try {
46
-
return await ctx.oauthClient.restore(session.did)
47
+
const oauthSession = await ctx.oauthClient.restore(session.did)
48
+
return oauthSession ? new Agent(oauthSession) : null
47
49
} catch (err) {
48
50
ctx.logger.warn({ err }, 'oauth restore failed')
49
51
await session.destroy()