+7
-3
packages/appview/src/ingestors/jetstream.ts
+7
-3
packages/appview/src/ingestors/jetstream.ts
···
3
import WebSocket from 'ws'
4
5
import type { Database } from '#/db'
6
7
export async function createJetstreamIngester(db: Database) {
8
const logger = pino({ name: 'jetstream ingestion' })
···
19
let lastCursorWrite = 0
20
21
return new Jetstream<XyzStatusphereStatus.Record>({
22
logger,
23
cursor: cursor?.seq || undefined,
24
setCursor: async (seq) => {
···
55
)
56
if (!validatedRecord.success) return
57
58
-
// Store the status in our SQLite
59
await db
60
.insertInto('status')
61
.values({
···
73
)
74
.execute()
75
} else if (evt.commit.operation === 'delete') {
76
-
// Remove the status from our SQLite
77
await db.deleteFrom('status').where('uri', '=', uri).execute()
78
}
79
},
···
85
}
86
87
export class Jetstream<T> {
88
private logger: pino.Logger
89
private handleEvent: (evt: JetstreamEvent<T>) => Promise<void>
90
private onError: (err: unknown) => void
···
95
private wantedCollections: string[]
96
97
constructor({
98
logger,
99
cursor,
100
setCursor,
···
102
onError,
103
wantedCollections,
104
}: {
105
logger: pino.Logger
106
cursor?: number
107
setCursor?: (seq: number) => Promise<void>
···
109
onError: (err: any) => void
110
wantedCollections: string[]
111
}) {
112
this.logger = logger
113
this.cursor = cursor
114
this.setCursor = setCursor
···
123
if (this.cursor !== undefined) {
124
params.append('cursor', this.cursor.toString())
125
}
126
-
return `wss://jetstream.mozzius.dev/subscribe?${params.toString()}`
127
}
128
129
start() {
···
3
import WebSocket from 'ws'
4
5
import type { Database } from '#/db'
6
+
import { env } from '#/lib/env'
7
8
export async function createJetstreamIngester(db: Database) {
9
const logger = pino({ name: 'jetstream ingestion' })
···
20
let lastCursorWrite = 0
21
22
return new Jetstream<XyzStatusphereStatus.Record>({
23
+
instanceUrl: env.JETSTREAM_INSTANCE,
24
logger,
25
cursor: cursor?.seq || undefined,
26
setCursor: async (seq) => {
···
57
)
58
if (!validatedRecord.success) return
59
60
await db
61
.insertInto('status')
62
.values({
···
74
)
75
.execute()
76
} else if (evt.commit.operation === 'delete') {
77
await db.deleteFrom('status').where('uri', '=', uri).execute()
78
}
79
},
···
85
}
86
87
export class Jetstream<T> {
88
+
private instanceUrl: string
89
private logger: pino.Logger
90
private handleEvent: (evt: JetstreamEvent<T>) => Promise<void>
91
private onError: (err: unknown) => void
···
96
private wantedCollections: string[]
97
98
constructor({
99
+
instanceUrl,
100
logger,
101
cursor,
102
setCursor,
···
104
onError,
105
wantedCollections,
106
}: {
107
+
instanceUrl: string
108
logger: pino.Logger
109
cursor?: number
110
setCursor?: (seq: number) => Promise<void>
···
112
onError: (err: any) => void
113
wantedCollections: string[]
114
}) {
115
+
this.instanceUrl = instanceUrl
116
this.logger = logger
117
this.cursor = cursor
118
this.setCursor = setCursor
···
127
if (this.cursor !== undefined) {
128
params.append('cursor', this.cursor.toString())
129
}
130
+
return `${this.instanceUrl}/subscribe?${params.toString()}`
131
}
132
133
start() {
+1
packages/appview/src/lib/env.ts
+1
packages/appview/src/lib/env.ts
-11
packages/client/vite.config.ts
-11
packages/client/vite.config.ts
···
21
'^/(xrpc|oauth|client-metadata\.json)/.*': {
22
target: 'http://localhost:3001',
23
changeOrigin: true,
24
-
configure: (proxy, _options) => {
25
-
proxy.on('error', (err, _req, _res) => {
26
-
console.log('PROXY ERROR', err);
27
-
});
28
-
proxy.on('proxyReq', (proxyReq, req, _res) => {
29
-
console.log('PROXY REQUEST', req.method, req.url);
30
-
});
31
-
proxy.on('proxyRes', (proxyRes, req, _res) => {
32
-
console.log('PROXY RESPONSE', req.method, req.url, proxyRes.statusCode);
33
-
});
34
-
},
35
},
36
},
37
},