mirror of https://git.lenooby09.tech/LeNooby09/social-app.git
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at utm-source 399 lines 11 kB view raw
import {BskyAgent, ChatBskyConvoGetLog} from '@atproto/api'
import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure'

import {networkRetry} from '#/lib/async/retry'
import {logger} from '#/logger'
import {
  BACKGROUND_POLL_INTERVAL,
  DEFAULT_POLL_INTERVAL,
} from '#/state/messages/events/const'
import {
  MessagesEventBusDispatch,
  MessagesEventBusDispatchEvent,
  MessagesEventBusErrorCode,
  MessagesEventBusEvent,
  MessagesEventBusParams,
  MessagesEventBusStatus,
} from '#/state/messages/events/types'
import {DM_SERVICE_HEADERS} from '#/state/queries/messages/const'

const LOGGER_CONTEXT = 'MessagesEventBus'

/**
 * Polls the chat service for conversation log entries and fans them out to
 * subscribers.
 *
 * On construction the bus calls `listConvos` (limit 1) to seed `latestRev`,
 * then polls `getLog` with `latestRev` as the cursor. Lifecycle changes
 * (`background`, `suspend`, `resume`, poll-interval requests, failures) are
 * funnelled through the private `dispatch` state machine.
 */
export class MessagesEventBus {
  /** Short random id, included in dispatch debug logs. */
  private readonly id: string

  private readonly agent: BskyAgent
  private readonly emitter = new EventEmitter<{
    event: [MessagesEventBusEvent]
  }>()

  private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
  /** Highest rev observed so far; sent as the `getLog` cursor. */
  private latestRev: string | undefined = undefined
  private pollInterval = DEFAULT_POLL_INTERVAL
  /** Outstanding interval requests, keyed by a per-request id. */
  private readonly requestedPollIntervals: Map<string, number> = new Map()

  constructor(params: MessagesEventBusParams) {
    this.id = nanoid(3)
    this.agent = params.agent

    this.init()
  }

  /**
   * Registers a requested poll interval (passed to `setInterval`, so in
   * milliseconds). While the bus is `Ready`, the effective interval is the
   * smallest of `DEFAULT_POLL_INTERVAL` and every outstanding request.
   *
   * @param interval requested interval
   * @returns a function that withdraws this request
   */
  requestPollInterval(interval: number) {
    const id = nanoid()
    this.requestedPollIntervals.set(id, interval)
    this.dispatch({
      event: MessagesEventBusDispatchEvent.UpdatePoll,
    })
    return () => {
      this.requestedPollIntervals.delete(id)
      this.dispatch({
        event: MessagesEventBusDispatchEvent.UpdatePoll,
      })
    }
  }

  /** Returns the highest rev observed so far, if any. */
  getLatestRev() {
    return this.latestRev
  }

  /**
   * Subscribes `handler` to bus events.
   *
   * When `options.convoId` is set, `logs` events are narrowed to the entries
   * whose `convoId` matches, and are not delivered at all when nothing
   * matches. Every other event type is passed through untouched.
   *
   * @param handler callback invoked with each (possibly narrowed) event
   * @param options optional filter; omit to receive every event
   * @returns a function that removes the subscription
   */
  on(
    handler: (event: MessagesEventBusEvent) => void,
    options: {
      convoId?: string
    } = {},
  ) {
    const handle = (event: MessagesEventBusEvent) => {
      if (event.type === 'logs' && options.convoId) {
        const filteredLogs = event.logs.filter(
          log =>
            typeof log.convoId === 'string' &&
            log.convoId === options.convoId,
        )

        if (filteredLogs.length > 0) {
          handler({
            ...event,
            logs: filteredLogs,
          })
        }
      } else {
        handler(event)
      }
    }

    this.emitter.on('event', handle)

    return () => {
      this.emitter.off('event', handle)
    }
  }

  /** Dispatches `Background`; polling continues at the background interval. */
  background() {
    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
  }

  /** Dispatches `Suspend`; the poll timer is stopped while suspended. */
  suspend() {
    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
  }

  /** Dispatches `Resume`; from the `Error` state this re-runs `init`. */
  resume() {
    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
  }

  /**
   * State machine. Applies `action` to the current status, starting or
   * stopping the poll timer and emitting `connect` / `error` events as a side
   * effect. Actions that are not listed for the current status are ignored.
   */
  private dispatch(action: MessagesEventBusDispatch) {
    const prevStatus = this.status

    switch (this.status) {
      case MessagesEventBusStatus.Initializing: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Ready: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            // No timer to stop here: polling has not been started yet.
            this.status = MessagesEventBusStatus.Error
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Ready: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Backgrounded: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Suspended: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Error: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.UpdatePoll:
          case MessagesEventBusDispatchEvent.Resume: {
            // basically reset
            this.status = MessagesEventBusStatus.Initializing
            this.latestRev = undefined
            this.init()
            break
          }
        }
        break
      }
      default:
        break
    }

    logger.debug(
      `${LOGGER_CONTEXT}: dispatch '${action.event}'`,
      {
        id: this.id,
        prev: prevStatus,
        next: this.status,
      },
      logger.DebugContext.convo,
    )
  }

  /**
   * Seeds `latestRev` from `listConvos` (limit 1), then dispatches `Ready`.
   * Any failure inside the `try` is logged and dispatched as an `Error`
   * carrying `InitFailed` and a `retry` callback that dispatches `Resume`.
   */
  private async init() {
    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)

    try {
      const response = await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.listConvos(
          {
            limit: 1,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })
      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')

      const {convos} = response.data

      for (const convo of convos) {
        this.trackRev(convo.rev)
      }

      this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
    } catch (e: any) {
      logger.error(e, {
        context: `${LOGGER_CONTEXT}: init failed`,
      })

      this.dispatch({
        event: MessagesEventBusDispatchEvent.Error,
        payload: {
          exception: e,
          code: MessagesEventBusErrorCode.InitFailed,
          retry: () => {
            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
          },
        },
      })
    }
  }

  /**
   * Folds `rev` into `latestRev`.
   *
   * Revs are compared with the string `>` operator. When no baseline exists
   * yet, `rev` is adopted as the baseline and is NOT reported as new.
   *
   * NOTE(review): as a consequence, if `init` found no convos, the first log
   * entry seen by `poll` only seeds the baseline and is never emitted —
   * confirm this is intended.
   *
   * @returns `true` when `rev` is strictly greater than the previous baseline
   */
  private trackRev(rev: string): boolean {
    if (!this.latestRev) {
      this.latestRev = rev
      return false
    }
    if (rev > this.latestRev) {
      this.latestRev = rev
      return true
    }
    return false
  }

  /*
   * Polling
   */

  /** True while a `poll` request is in flight; prevents overlapping polls. */
  private isPolling = false
  /** Handle of the active poll timer, or `undefined` when none is running. */
  private pollIntervalRef: ReturnType<typeof setInterval> | undefined

  /** Picks the poll interval that matches the current status. */
  private getPollInterval() {
    switch (this.status) {
      case MessagesEventBusStatus.Ready: {
        const requested = Array.from(this.requestedPollIntervals.values())
        // Requests can only shorten the interval, never lengthen it.
        const lowest = Math.min(DEFAULT_POLL_INTERVAL, ...requested)
        return lowest
      }
      case MessagesEventBusStatus.Backgrounded: {
        return BACKGROUND_POLL_INTERVAL
      }
      default:
        return DEFAULT_POLL_INTERVAL
    }
  }

  /** Recomputes the interval and restarts the poll timer. */
  private resetPoll() {
    this.pollInterval = this.getPollInterval()
    this.stopPoll()
    this.startPoll()
  }

  /** Polls once immediately (unless one is in flight), then on a timer. */
  private startPoll() {
    if (!this.isPolling) this.poll()

    this.pollIntervalRef = setInterval(() => {
      if (this.isPolling) return
      this.poll()
    }, this.pollInterval)
  }

  /**
   * Stops the poll timer, if any, and forgets its handle. A request that is
   * already in flight is not cancelled.
   */
  private stopPoll() {
    if (this.pollIntervalRef !== undefined) {
      clearInterval(this.pollIntervalRef)
      this.pollIntervalRef = undefined
    }
  }

  /**
   * Fetches log entries after `latestRev` and emits the new ones as a single
   * `logs` event. A failed request is logged and dispatched as an `Error`
   * carrying `PollFailed` and a `retry` callback that dispatches `Resume`.
   */
  private async poll() {
    if (this.isPolling) return

    this.isPolling = true

    // logger.debug(
    //   `${LOGGER_CONTEXT}: poll`,
    //   {
    //     requestedPollIntervals: Array.from(
    //       this.requestedPollIntervals.values(),
    //     ),
    //   },
    //   logger.DebugContext.convo,
    // )

    try {
      const response = await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.getLog(
          {
            cursor: this.latestRev,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })

      // throw new Error('UNCOMMENT TO TEST POLL FAILURE')

      const {logs: events} = response.data

      const batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []

      for (const ev of events) {
        /*
         * If there's a rev, we should handle it. If there's not a rev, we don't
         * know what it is.
         */
        if (typeof ev.rev === 'string') {
          /*
           * We only care about new events. The rev is advanced regardless of
           * whether it's an ev type we care about or not.
           */
          if (this.trackRev(ev.rev)) {
            batch.push(ev)
          }
        }
      }

      if (batch.length > 0) {
        try {
          this.emitter.emit('event', {type: 'logs', logs: batch})
        } catch (e: any) {
          // A throwing subscriber must not be reported as a poll failure.
          logger.error(e, {
            context: `${LOGGER_CONTEXT}: process latest events`,
          })
        }
      }
    } catch (e: any) {
      logger.error(e, {context: `${LOGGER_CONTEXT}: poll events failed`})

      this.dispatch({
        event: MessagesEventBusDispatchEvent.Error,
        payload: {
          exception: e,
          code: MessagesEventBusErrorCode.PollFailed,
          retry: () => {
            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
          },
        },
      })
    } finally {
      this.isPolling = false
    }
  }
}