+19
packages/appview/src/db.ts
+19
packages/appview/src/db.ts
···
13
13
status: Status
14
14
auth_session: AuthSession
15
15
auth_state: AuthState
16
+
cursor: Cursor
16
17
}
17
18
18
19
export type Status = {
···
33
34
state: AuthStateJson
34
35
}
35
36
37
+
export type Cursor = {
38
+
id: number
39
+
seq: number
40
+
}
41
+
36
42
type AuthStateJson = string
37
43
38
44
type AuthSessionJson = string
···
44
50
const migrationProvider: MigrationProvider = {
45
51
async getMigrations() {
46
52
return migrations
53
+
},
54
+
}
55
+
56
+
migrations['002'] = {
57
+
async up(db: Kysely<unknown>) {
58
+
await db.schema
59
+
.createTable('cursor')
60
+
.addColumn('id', 'integer', (col) => col.primaryKey())
61
+
.addColumn('seq', 'integer', (col) => col.notNull())
62
+
.execute()
63
+
},
64
+
async down(db: Kysely<unknown>) {
65
+
await db.schema.dropTable('cursor').execute()
47
66
},
48
67
}
49
68
+1
-1
packages/appview/src/index.ts
+1
-1
packages/appview/src/index.ts
···
47
47
// Create the atproto utilities
48
48
const oauthClient = await createClient(db)
49
49
const baseIdResolver = createIdResolver()
50
-
const ingester = createIngester(db, baseIdResolver)
50
+
const ingester = await createIngester(db, baseIdResolver)
51
51
const resolver = createBidirectionalResolver(baseIdResolver)
52
52
const ctx = {
53
53
db,
+31
-2
packages/appview/src/ingester.ts
+31
-2
packages/appview/src/ingester.ts
···
1
1
import { IdResolver } from '@atproto/identity'
2
-
import { Firehose, type Event } from '@atproto/sync'
2
+
import { Firehose, MemoryRunner, type Event } from '@atproto/sync'
3
3
import { XyzStatusphereStatus } from '@statusphere/lexicon'
4
4
import pino from 'pino'
5
5
6
6
import type { Database } from '#/db'
7
7
8
-
export function createIngester(db: Database, idResolver: IdResolver) {
8
+
export async function createIngester(db: Database, idResolver: IdResolver) {
9
9
const logger = pino({ name: 'firehose ingestion' })
10
+
11
+
const cursor = await db
12
+
.selectFrom('cursor')
13
+
.where('id', '=', 1)
14
+
.select('seq')
15
+
.executeTakeFirst()
16
+
17
+
logger.info(`start cursor: ${cursor?.seq}`)
18
+
19
+
// For throttling cursor writes
20
+
let lastCursorWrite = 0
21
+
22
+
const runner = new MemoryRunner({
23
+
startCursor: cursor?.seq || undefined,
24
+
setCursor: async (seq) => {
25
+
const now = Date.now()
26
+
27
+
if (now - lastCursorWrite >= 10000) {
28
+
lastCursorWrite = now
29
+
await db
30
+
.updateTable('cursor')
31
+
.set({ seq })
32
+
.where('id', '=', 1)
33
+
.execute()
34
+
}
35
+
},
36
+
})
37
+
10
38
return new Firehose({
11
39
idResolver,
40
+
runner,
12
41
handleEvent: async (evt: Event) => {
13
42
// Watch for write events
14
43
if (evt.event === 'create' || evt.event === 'update') {