// Mirror of https://git.lenooby09.tech/LeNooby09/social-app.git
1import {BskyAgent, ChatBskyConvoGetLog} from '@atproto/api'
2import EventEmitter from 'eventemitter3'
3import {nanoid} from 'nanoid/non-secure'
4
5import {networkRetry} from '#/lib/async/retry'
6import {logger} from '#/logger'
7import {
8 BACKGROUND_POLL_INTERVAL,
9 DEFAULT_POLL_INTERVAL,
10} from '#/state/messages/events/const'
11import {
12 MessagesEventBusDispatch,
13 MessagesEventBusDispatchEvent,
14 MessagesEventBusErrorCode,
15 MessagesEventBusEvent,
16 MessagesEventBusParams,
17 MessagesEventBusStatus,
18} from '#/state/messages/events/types'
19import {DM_SERVICE_HEADERS} from '#/state/queries/messages/const'
20
/**
 * Label placed at the start of every debug message and error context string
 * written by the message event bus.
 */
const LOGGER_CONTEXT = 'MessagesEventBus'
22
/**
 * Polls the chat service's conversation log and fans new log entries out to
 * subscribers registered through `on()`.
 *
 * The bus is a small state machine driven by `dispatch()`. Its status decides
 * whether polling runs and at which interval:
 *
 * - `Initializing`: `init()` is fetching the newest conversation revision.
 * - `Ready`: polling at the lowest of the default and all requested intervals.
 * - `Backgrounded`: polling at `BACKGROUND_POLL_INTERVAL`.
 * - `Suspended`: polling stopped.
 * - `Error`: polling stopped until a `Resume` or `UpdatePoll` re-initializes.
 */
export class MessagesEventBus {
  /** Short random id included in dispatch debug logs. */
  private readonly id: string

  private readonly agent: BskyAgent
  private readonly emitter = new EventEmitter<{
    event: [MessagesEventBusEvent]
  }>()

  private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
  /** Newest log revision seen so far; sent as the `cursor` when polling. */
  private latestRev: string | undefined = undefined
  private pollInterval = DEFAULT_POLL_INTERVAL
  /** Intervals requested via `requestPollInterval()`, keyed by request id. */
  private readonly requestedPollIntervals = new Map<string, number>()

  constructor(params: MessagesEventBusParams) {
    this.id = nanoid(3)
    this.agent = params.agent

    // Fire-and-forget: init() handles its own failures and reports them
    // through dispatch().
    void this.init()
  }

  /**
   * Registers a desired poll interval and re-evaluates the active interval.
   *
   * While the bus is `Ready`, the smallest of `DEFAULT_POLL_INTERVAL` and all
   * currently registered intervals is used. Note that the `UpdatePoll`
   * dispatch issued here also re-initializes the bus when it is in the
   * `Error` state.
   *
   * @param interval Requested poll interval, passed to `setInterval`.
   * @returns A function that withdraws this request and re-evaluates the
   *   active interval again.
   */
  requestPollInterval(interval: number) {
    const id = nanoid()
    this.requestedPollIntervals.set(id, interval)
    this.dispatch({
      event: MessagesEventBusDispatchEvent.UpdatePoll,
    })
    return () => {
      this.requestedPollIntervals.delete(id)
      this.dispatch({
        event: MessagesEventBusDispatchEvent.UpdatePoll,
      })
    }
  }

  /** Returns the newest log revision seen so far, if any. */
  getLatestRev() {
    return this.latestRev
  }

  /**
   * Subscribes to bus events.
   *
   * When `options.convoId` is set, `logs` events are narrowed to the entries
   * whose `convoId` matches, and the handler is skipped entirely if nothing
   * matches. All other event types are always delivered unchanged.
   *
   * @param handler Called with each delivered event.
   * @param options Optional filter; omit to receive every event.
   * @returns A function that removes the subscription.
   */
  on(
    handler: (event: MessagesEventBusEvent) => void,
    options: {
      convoId?: string
    } = {},
  ) {
    const handle = (event: MessagesEventBusEvent) => {
      // Read the filter at delivery time, as the options object is owned by
      // the caller.
      const convoId = options.convoId

      if (event.type !== 'logs' || !convoId) {
        handler(event)
        return
      }

      const filteredLogs = event.logs.filter(log => log.convoId === convoId)

      if (filteredLogs.length > 0) {
        handler({
          ...event,
          logs: filteredLogs,
        })
      }
    }

    this.emitter.on('event', handle)

    return () => {
      this.emitter.off('event', handle)
    }
  }

  /** Requests the `Backgrounded` state (slower polling). */
  background() {
    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
  }

  /** Requests the `Suspended` state (polling stopped). */
  suspend() {
    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
  }

  /** Requests a return to `Ready`, or a re-init when in `Error`. */
  resume() {
    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
  }

  /**
   * Applies `action` to the state machine.
   *
   * Events that have no case for the current status are ignored: the status
   * is left untouched, but the dispatch is still written to the debug log.
   */
  private dispatch(action: MessagesEventBusDispatch) {
    const prevStatus = this.status

    switch (this.status) {
      case MessagesEventBusStatus.Initializing: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Ready: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Ready: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Backgrounded: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Suspended: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Error: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.UpdatePoll:
          case MessagesEventBusDispatchEvent.Resume: {
            // basically reset
            this.status = MessagesEventBusStatus.Initializing
            this.latestRev = undefined
            void this.init()
            break
          }
        }
        break
      }
      default:
        break
    }

    logger.debug(
      `${LOGGER_CONTEXT}: dispatch '${action.event}'`,
      {
        id: this.id,
        prev: prevStatus,
        next: this.status,
      },
      logger.DebugContext.convo,
    )
  }

  /**
   * Fetches the most recent conversation to seed `latestRev`, then dispatches
   * `Ready`. On failure, logs the error and dispatches `Error` with a `retry`
   * callback that dispatches `Resume`.
   */
  private async init(): Promise<void> {
    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)

    try {
      const response = await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.listConvos(
          {
            limit: 1,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })
      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')

      for (const convo of response.data.convos) {
        this.advanceRev(convo.rev)
      }

      this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
    } catch (e: any) {
      logger.error(e, {
        context: `${LOGGER_CONTEXT}: init failed`,
      })

      this.dispatch({
        event: MessagesEventBusDispatchEvent.Error,
        payload: {
          exception: e,
          code: MessagesEventBusErrorCode.InitFailed,
          retry: () => {
            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
          },
        },
      })
    }
  }

  /**
   * Moves `latestRev` forward if `rev` is newer.
   *
   * When no revision has been recorded yet, `rev` only seeds the baseline and
   * is NOT reported as new.
   *
   * @returns `true` only when `rev` is strictly greater than a previously
   *   recorded revision.
   */
  private advanceRev(rev: string): boolean {
    if (!this.latestRev) {
      this.latestRev = rev
      return false
    }
    if (rev > this.latestRev) {
      this.latestRev = rev
      return true
    }
    return false
  }

  /*
   * Polling
   */

  /** True while a `poll()` request is in flight; prevents overlapping polls. */
  private isPolling = false
  /** Handle of the active poll timer, or `undefined` when none is scheduled. */
  private pollIntervalRef: ReturnType<typeof setInterval> | undefined

  /** Picks the poll interval that applies to the current status. */
  private getPollInterval() {
    switch (this.status) {
      case MessagesEventBusStatus.Ready: {
        const requested = Array.from(this.requestedPollIntervals.values())
        const lowest = Math.min(DEFAULT_POLL_INTERVAL, ...requested)
        return lowest
      }
      case MessagesEventBusStatus.Backgrounded: {
        return BACKGROUND_POLL_INTERVAL
      }
      default:
        return DEFAULT_POLL_INTERVAL
    }
  }

  /** Recomputes the interval and restarts the poll timer with it. */
  private resetPoll() {
    this.pollInterval = this.getPollInterval()
    this.stopPoll()
    this.startPoll()
  }

  /** Polls immediately (unless a poll is in flight) and schedules the timer. */
  private startPoll() {
    if (!this.isPolling) void this.poll()

    this.pollIntervalRef = setInterval(() => {
      if (this.isPolling) return
      void this.poll()
    }, this.pollInterval)
  }

  /** Cancels the poll timer, if any, and forgets its handle. */
  private stopPoll() {
    if (this.pollIntervalRef) {
      clearInterval(this.pollIntervalRef)
      this.pollIntervalRef = undefined
    }
  }

  /**
   * Fetches log entries after `latestRev` and emits the new ones as a single
   * `logs` event. On request failure, logs the error and dispatches `Error`
   * with a `retry` callback that dispatches `Resume`.
   */
  private async poll(): Promise<void> {
    if (this.isPolling) return

    this.isPolling = true

    // logger.debug(
    //   `${LOGGER_CONTEXT}: poll`,
    //   {
    //     requestedPollIntervals: Array.from(
    //       this.requestedPollIntervals.values(),
    //     ),
    //   },
    //   logger.DebugContext.convo,
    // )

    try {
      const response = await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.getLog(
          {
            cursor: this.latestRev,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })

      // throw new Error('UNCOMMENT TO TEST POLL FAILURE')

      const {logs: events} = response.data

      const batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []

      for (const ev of events) {
        /*
         * If there's a rev, we should handle it. If there's not a rev, we don't
         * know what it is.
         */
        if (typeof ev.rev === 'string') {
          /*
           * We only care about new events. The rev is advanced regardless of
           * whether it's an ev type we care about or not.
           */
          if (this.advanceRev(ev.rev)) {
            batch.push(ev)
          }
        }
      }

      if (batch.length > 0) {
        // A throwing subscriber must not be treated as a poll failure.
        try {
          this.emitter.emit('event', {type: 'logs', logs: batch})
        } catch (e: any) {
          logger.error(e, {
            context: `${LOGGER_CONTEXT}: process latest events`,
          })
        }
      }
    } catch (e: any) {
      logger.error(e, {context: `${LOGGER_CONTEXT}: poll events failed`})

      this.dispatch({
        event: MessagesEventBusDispatchEvent.Error,
        payload: {
          exception: e,
          code: MessagesEventBusErrorCode.PollFailed,
          retry: () => {
            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
          },
        },
      })
    } finally {
      this.isPolling = false
    }
  }
}