mirror of https://git.lenooby09.tech/LeNooby09/social-app.git
1import {
2 AppBskyActorDefs,
3 BskyAgent,
4 ChatBskyConvoDefs,
5 ChatBskyConvoGetLog,
6 ChatBskyConvoSendMessage,
7} from '@atproto/api'
8import {XRPCError} from '@atproto/xrpc'
9import EventEmitter from 'eventemitter3'
10import {nanoid} from 'nanoid/non-secure'
11
12import {networkRetry} from '#/lib/async/retry'
13import {logger} from '#/logger'
14import {isNative} from '#/platform/detection'
15import {
16 ACTIVE_POLL_INTERVAL,
17 BACKGROUND_POLL_INTERVAL,
18 INACTIVE_TIMEOUT,
19 NETWORK_FAILURE_STATUSES,
20} from '#/state/messages/convo/const'
21import {
22 ConvoDispatch,
23 ConvoDispatchEvent,
24 ConvoError,
25 ConvoErrorCode,
26 ConvoEvent,
27 ConvoItem,
28 ConvoItemError,
29 ConvoParams,
30 ConvoState,
31 ConvoStatus,
32} from '#/state/messages/convo/types'
33import {MessagesEventBus} from '#/state/messages/events/agent'
34import {MessagesEventBusError} from '#/state/messages/events/types'
35import {DM_SERVICE_HEADERS} from '#/state/queries/messages/const'
36
37export function isConvoItemMessage(
38 item: ConvoItem,
39): item is ConvoItem & {type: 'message'} {
40 if (!item) return false
41 return (
42 item.type === 'message' ||
43 item.type === 'deleted-message' ||
44 item.type === 'pending-message'
45 )
46}
47
48export class Convo {
49 private id: string
50
51 private agent: BskyAgent
52 private events: MessagesEventBus
53 private senderUserDid: string
54
55 private status: ConvoStatus = ConvoStatus.Uninitialized
56 private error: ConvoError | undefined
57 private oldestRev: string | undefined | null = undefined
58 private isFetchingHistory = false
59 private latestRev: string | undefined = undefined
60
61 private pastMessages: Map<
62 string,
63 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
64 > = new Map()
65 private newMessages: Map<
66 string,
67 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
68 > = new Map()
69 private pendingMessages: Map<
70 string,
71 {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
72 > = new Map()
73 private deletedMessages: Set<string> = new Set()
74
75 private isProcessingPendingMessages = false
76
77 private lastActiveTimestamp: number | undefined
78
79 private emitter = new EventEmitter<{event: [ConvoEvent]}>()
80
81 convoId: string
82 convo: ChatBskyConvoDefs.ConvoView | undefined
83 sender: AppBskyActorDefs.ProfileViewBasic | undefined
84 recipients: AppBskyActorDefs.ProfileViewBasic[] | undefined = undefined
85 snapshot: ConvoState | undefined
86
87 constructor(params: ConvoParams) {
88 this.id = nanoid(3)
89 this.convoId = params.convoId
90 this.agent = params.agent
91 this.events = params.events
92 this.senderUserDid = params.agent.session?.did!
93
94 this.subscribe = this.subscribe.bind(this)
95 this.getSnapshot = this.getSnapshot.bind(this)
96 this.sendMessage = this.sendMessage.bind(this)
97 this.deleteMessage = this.deleteMessage.bind(this)
98 this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
99 this.ingestFirehose = this.ingestFirehose.bind(this)
100 this.onFirehoseConnect = this.onFirehoseConnect.bind(this)
101 this.onFirehoseError = this.onFirehoseError.bind(this)
102 }
103
104 private commit() {
105 this.snapshot = undefined
106 this.subscribers.forEach(subscriber => subscriber())
107 }
108
109 private subscribers: (() => void)[] = []
110
111 subscribe(subscriber: () => void) {
112 if (this.subscribers.length === 0) this.init()
113
114 this.subscribers.push(subscriber)
115
116 return () => {
117 this.subscribers = this.subscribers.filter(s => s !== subscriber)
118 if (this.subscribers.length === 0) this.suspend()
119 }
120 }
121
122 getSnapshot(): ConvoState {
123 if (!this.snapshot) this.snapshot = this.generateSnapshot()
124 // logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo)
125 return this.snapshot
126 }
127
128 private generateSnapshot(): ConvoState {
129 switch (this.status) {
130 case ConvoStatus.Initializing: {
131 return {
132 status: ConvoStatus.Initializing,
133 items: [],
134 convo: undefined,
135 error: undefined,
136 sender: undefined,
137 recipients: undefined,
138 isFetchingHistory: this.isFetchingHistory,
139 deleteMessage: undefined,
140 sendMessage: undefined,
141 fetchMessageHistory: undefined,
142 }
143 }
144 case ConvoStatus.Disabled:
145 case ConvoStatus.Suspended:
146 case ConvoStatus.Backgrounded:
147 case ConvoStatus.Ready: {
148 return {
149 status: this.status,
150 items: this.getItems(),
151 convo: this.convo!,
152 error: undefined,
153 sender: this.sender!,
154 recipients: this.recipients!,
155 isFetchingHistory: this.isFetchingHistory,
156 deleteMessage: this.deleteMessage,
157 sendMessage: this.sendMessage,
158 fetchMessageHistory: this.fetchMessageHistory,
159 }
160 }
161 case ConvoStatus.Error: {
162 return {
163 status: ConvoStatus.Error,
164 items: [],
165 convo: undefined,
166 error: this.error!,
167 sender: undefined,
168 recipients: undefined,
169 isFetchingHistory: false,
170 deleteMessage: undefined,
171 sendMessage: undefined,
172 fetchMessageHistory: undefined,
173 }
174 }
175 default: {
176 return {
177 status: ConvoStatus.Uninitialized,
178 items: [],
179 convo: undefined,
180 error: undefined,
181 sender: undefined,
182 recipients: undefined,
183 isFetchingHistory: false,
184 deleteMessage: undefined,
185 sendMessage: undefined,
186 fetchMessageHistory: undefined,
187 }
188 }
189 }
190 }
191
192 dispatch(action: ConvoDispatch) {
193 const prevStatus = this.status
194
195 switch (this.status) {
196 case ConvoStatus.Uninitialized: {
197 switch (action.event) {
198 case ConvoDispatchEvent.Init: {
199 this.status = ConvoStatus.Initializing
200 this.setup()
201 this.setupFirehose()
202 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
203 break
204 }
205 }
206 break
207 }
208 case ConvoStatus.Initializing: {
209 switch (action.event) {
210 case ConvoDispatchEvent.Ready: {
211 this.status = ConvoStatus.Ready
212 this.fetchMessageHistory()
213 break
214 }
215 case ConvoDispatchEvent.Background: {
216 this.status = ConvoStatus.Backgrounded
217 this.fetchMessageHistory()
218 this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
219 break
220 }
221 case ConvoDispatchEvent.Suspend: {
222 this.status = ConvoStatus.Suspended
223 this.cleanupFirehoseConnection?.()
224 this.withdrawRequestedPollInterval()
225 break
226 }
227 case ConvoDispatchEvent.Error: {
228 this.status = ConvoStatus.Error
229 this.error = action.payload
230 this.cleanupFirehoseConnection?.()
231 this.withdrawRequestedPollInterval()
232 break
233 }
234 case ConvoDispatchEvent.Disable: {
235 this.status = ConvoStatus.Disabled
236 this.fetchMessageHistory() // finish init
237 this.cleanupFirehoseConnection?.()
238 this.withdrawRequestedPollInterval()
239 break
240 }
241 }
242 break
243 }
244 case ConvoStatus.Ready: {
245 switch (action.event) {
246 case ConvoDispatchEvent.Resume: {
247 this.refreshConvo()
248 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
249 break
250 }
251 case ConvoDispatchEvent.Background: {
252 this.status = ConvoStatus.Backgrounded
253 this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
254 break
255 }
256 case ConvoDispatchEvent.Suspend: {
257 this.status = ConvoStatus.Suspended
258 this.cleanupFirehoseConnection?.()
259 this.withdrawRequestedPollInterval()
260 break
261 }
262 case ConvoDispatchEvent.Error: {
263 this.status = ConvoStatus.Error
264 this.error = action.payload
265 this.cleanupFirehoseConnection?.()
266 this.withdrawRequestedPollInterval()
267 break
268 }
269 case ConvoDispatchEvent.Disable: {
270 this.status = ConvoStatus.Disabled
271 this.cleanupFirehoseConnection?.()
272 this.withdrawRequestedPollInterval()
273 break
274 }
275 }
276 break
277 }
278 case ConvoStatus.Backgrounded: {
279 switch (action.event) {
280 case ConvoDispatchEvent.Resume: {
281 if (this.wasChatInactive()) {
282 this.reset()
283 } else {
284 if (this.convo) {
285 this.status = ConvoStatus.Ready
286 this.refreshConvo()
287 this.maybeRecoverFromNetworkError()
288 } else {
289 this.status = ConvoStatus.Initializing
290 this.setup()
291 }
292 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
293 }
294 break
295 }
296 case ConvoDispatchEvent.Suspend: {
297 this.status = ConvoStatus.Suspended
298 this.cleanupFirehoseConnection?.()
299 this.withdrawRequestedPollInterval()
300 break
301 }
302 case ConvoDispatchEvent.Error: {
303 this.status = ConvoStatus.Error
304 this.error = action.payload
305 this.cleanupFirehoseConnection?.()
306 this.withdrawRequestedPollInterval()
307 break
308 }
309 case ConvoDispatchEvent.Disable: {
310 this.status = ConvoStatus.Disabled
311 this.cleanupFirehoseConnection?.()
312 this.withdrawRequestedPollInterval()
313 break
314 }
315 }
316 break
317 }
318 case ConvoStatus.Suspended: {
319 switch (action.event) {
320 case ConvoDispatchEvent.Init: {
321 this.reset()
322 break
323 }
324 case ConvoDispatchEvent.Resume: {
325 this.reset()
326 break
327 }
328 case ConvoDispatchEvent.Error: {
329 this.status = ConvoStatus.Error
330 this.error = action.payload
331 break
332 }
333 case ConvoDispatchEvent.Disable: {
334 this.status = ConvoStatus.Disabled
335 break
336 }
337 }
338 break
339 }
340 case ConvoStatus.Error: {
341 switch (action.event) {
342 case ConvoDispatchEvent.Init: {
343 this.reset()
344 break
345 }
346 case ConvoDispatchEvent.Resume: {
347 this.reset()
348 break
349 }
350 case ConvoDispatchEvent.Suspend: {
351 this.status = ConvoStatus.Suspended
352 break
353 }
354 case ConvoDispatchEvent.Error: {
355 this.status = ConvoStatus.Error
356 this.error = action.payload
357 break
358 }
359 case ConvoDispatchEvent.Disable: {
360 this.status = ConvoStatus.Disabled
361 break
362 }
363 }
364 break
365 }
366 case ConvoStatus.Disabled: {
367 // can't do anything
368 break
369 }
370 default:
371 break
372 }
373
374 logger.debug(
375 `Convo: dispatch '${action.event}'`,
376 {
377 id: this.id,
378 prev: prevStatus,
379 next: this.status,
380 },
381 logger.DebugContext.convo,
382 )
383
384 this.updateLastActiveTimestamp()
385 this.commit()
386 }
387
388 private reset() {
389 this.convo = undefined
390 this.sender = undefined
391 this.recipients = undefined
392 this.snapshot = undefined
393
394 this.status = ConvoStatus.Uninitialized
395 this.error = undefined
396 this.oldestRev = undefined
397 this.latestRev = undefined
398
399 this.pastMessages = new Map()
400 this.newMessages = new Map()
401 this.pendingMessages = new Map()
402 this.deletedMessages = new Set()
403
404 this.pendingMessageFailure = null
405 this.fetchMessageHistoryError = undefined
406 this.firehoseError = undefined
407
408 this.dispatch({event: ConvoDispatchEvent.Init})
409 }
410
411 maybeRecoverFromNetworkError() {
412 if (this.firehoseError) {
413 this.firehoseError.retry()
414 this.firehoseError = undefined
415 this.commit()
416 } else {
417 this.batchRetryPendingMessages()
418 }
419
420 if (this.fetchMessageHistoryError) {
421 this.fetchMessageHistoryError.retry()
422 this.fetchMessageHistoryError = undefined
423 this.commit()
424 }
425 }
426
427 private async setup() {
428 try {
429 const {convo, sender, recipients} = await this.fetchConvo()
430
431 this.convo = convo
432 this.sender = sender
433 this.recipients = recipients
434
435 /*
436 * Some validation prior to `Ready` status
437 */
438 if (!this.convo) {
439 throw new Error('Convo: could not find convo')
440 }
441 if (!this.sender) {
442 throw new Error('Convo: could not find sender in convo')
443 }
444 if (!this.recipients) {
445 throw new Error('Convo: could not find recipients in convo')
446 }
447
448 const userIsDisabled = this.sender.chatDisabled as boolean
449
450 if (userIsDisabled) {
451 this.dispatch({event: ConvoDispatchEvent.Disable})
452 } else {
453 this.dispatch({event: ConvoDispatchEvent.Ready})
454 }
455 } catch (e: any) {
456 logger.error(e, {context: 'Convo: setup failed'})
457
458 this.dispatch({
459 event: ConvoDispatchEvent.Error,
460 payload: {
461 exception: e,
462 code: ConvoErrorCode.InitFailed,
463 retry: () => {
464 this.reset()
465 },
466 },
467 })
468 this.commit()
469 }
470 }
471
472 init() {
473 this.dispatch({event: ConvoDispatchEvent.Init})
474 }
475
476 resume() {
477 this.dispatch({event: ConvoDispatchEvent.Resume})
478 }
479
480 background() {
481 this.dispatch({event: ConvoDispatchEvent.Background})
482 }
483
484 suspend() {
485 this.dispatch({event: ConvoDispatchEvent.Suspend})
486 }
487
488 /**
489 * Called on any state transition, like when the chat is backgrounded. This
490 * value is then checked on background -> foreground transitions.
491 */
492 private updateLastActiveTimestamp() {
493 this.lastActiveTimestamp = Date.now()
494 }
495 private wasChatInactive() {
496 if (!this.lastActiveTimestamp) return true
497 return Date.now() - this.lastActiveTimestamp > INACTIVE_TIMEOUT
498 }
499
500 private requestedPollInterval: (() => void) | undefined
501 private requestPollInterval(interval: number) {
502 this.withdrawRequestedPollInterval()
503 this.requestedPollInterval = this.events.requestPollInterval(interval)
504 }
505 private withdrawRequestedPollInterval() {
506 if (this.requestedPollInterval) {
507 this.requestedPollInterval()
508 }
509 }
510
511 private pendingFetchConvo:
512 | Promise<{
513 convo: ChatBskyConvoDefs.ConvoView
514 sender: AppBskyActorDefs.ProfileViewBasic | undefined
515 recipients: AppBskyActorDefs.ProfileViewBasic[]
516 }>
517 | undefined
518 async fetchConvo() {
519 if (this.pendingFetchConvo) return this.pendingFetchConvo
520
521 this.pendingFetchConvo = new Promise<{
522 convo: ChatBskyConvoDefs.ConvoView
523 sender: AppBskyActorDefs.ProfileViewBasic | undefined
524 recipients: AppBskyActorDefs.ProfileViewBasic[]
525 }>(async (resolve, reject) => {
526 try {
527 const response = await networkRetry(2, () => {
528 return this.agent.api.chat.bsky.convo.getConvo(
529 {
530 convoId: this.convoId,
531 },
532 {headers: DM_SERVICE_HEADERS},
533 )
534 })
535
536 const convo = response.data.convo
537
538 resolve({
539 convo,
540 sender: convo.members.find(m => m.did === this.senderUserDid),
541 recipients: convo.members.filter(m => m.did !== this.senderUserDid),
542 })
543 } catch (e) {
544 reject(e)
545 } finally {
546 this.pendingFetchConvo = undefined
547 }
548 })
549
550 return this.pendingFetchConvo
551 }
552
553 async refreshConvo() {
554 try {
555 const {convo, sender, recipients} = await this.fetchConvo()
556 // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE')
557 this.convo = convo || this.convo
558 this.sender = sender || this.sender
559 this.recipients = recipients || this.recipients
560 } catch (e: any) {
561 logger.error(e, {context: `Convo: failed to refresh convo`})
562 }
563 }
564
565 private fetchMessageHistoryError:
566 | {
567 retry: () => void
568 }
569 | undefined
570 async fetchMessageHistory() {
571 logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
572
573 /*
574 * If oldestRev is null, we've fetched all history.
575 */
576 if (this.oldestRev === null) return
577
578 /*
579 * Don't fetch again if a fetch is already in progress
580 */
581 if (this.isFetchingHistory) return
582
583 /*
584 * If we've rendered a retry state for history fetching, exit. Upon retry,
585 * this will be removed and we'll try again.
586 */
587 if (this.fetchMessageHistoryError) return
588
589 try {
590 this.isFetchingHistory = true
591 this.commit()
592
593 const nextCursor = this.oldestRev // for TS
594 const response = await networkRetry(2, () => {
595 return this.agent.api.chat.bsky.convo.getMessages(
596 {
597 cursor: nextCursor,
598 convoId: this.convoId,
599 limit: isNative ? 30 : 60,
600 },
601 {headers: DM_SERVICE_HEADERS},
602 )
603 })
604 const {cursor, messages} = response.data
605
606 this.oldestRev = cursor ?? null
607
608 for (const message of messages) {
609 if (
610 ChatBskyConvoDefs.isMessageView(message) ||
611 ChatBskyConvoDefs.isDeletedMessageView(message)
612 ) {
613 /*
614 * If this message is already in new messages, it was added by the
615 * firehose ingestion, and we can safely overwrite it. This trusts
616 * the server on ordering, and keeps it in sync.
617 */
618 if (this.newMessages.has(message.id)) {
619 this.newMessages.delete(message.id)
620 }
621 this.pastMessages.set(message.id, message)
622 }
623 }
624 } catch (e: any) {
625 logger.error('Convo: failed to fetch message history')
626
627 this.fetchMessageHistoryError = {
628 retry: () => {
629 this.fetchMessageHistory()
630 },
631 }
632 } finally {
633 this.isFetchingHistory = false
634 this.commit()
635 }
636 }
637
638 private cleanupFirehoseConnection: (() => void) | undefined
639 private setupFirehose() {
640 // remove old listeners, if exist
641 this.cleanupFirehoseConnection?.()
642
643 // reconnect
644 this.cleanupFirehoseConnection = this.events.on(
645 event => {
646 switch (event.type) {
647 case 'connect': {
648 this.onFirehoseConnect()
649 break
650 }
651 case 'error': {
652 this.onFirehoseError(event.error)
653 break
654 }
655 case 'logs': {
656 this.ingestFirehose(event.logs)
657 break
658 }
659 }
660 },
661 /*
662 * This is VERY important — we only want events for this convo.
663 */
664 {convoId: this.convoId},
665 )
666 }
667
668 private firehoseError: MessagesEventBusError | undefined
669
670 onFirehoseConnect() {
671 this.firehoseError = undefined
672 this.batchRetryPendingMessages()
673 this.commit()
674 }
675
676 onFirehoseError(error?: MessagesEventBusError) {
677 this.firehoseError = error
678 this.commit()
679 }
680
681 ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
682 let needsCommit = false
683
684 for (const ev of events) {
685 /*
686 * If there's a rev, we should handle it. If there's not a rev, we don't
687 * know what it is.
688 */
689 if (typeof ev.rev === 'string') {
690 const isUninitialized = !this.latestRev
691 const isNewEvent = this.latestRev && ev.rev > this.latestRev
692
693 /*
694 * We received an event prior to fetching any history, so we can safely
695 * use this as the initial history cursor
696 */
697 if (this.oldestRev === undefined && isUninitialized) {
698 this.oldestRev = ev.rev
699 }
700
701 /*
702 * We only care about new events
703 */
704 if (isNewEvent || isUninitialized) {
705 /*
706 * Update rev regardless of if it's a ev type we care about or not
707 */
708 this.latestRev = ev.rev
709
710 if (
711 ChatBskyConvoDefs.isLogCreateMessage(ev) &&
712 ChatBskyConvoDefs.isMessageView(ev.message)
713 ) {
714 /**
715 * If this message is already in new messages, it was added by our
716 * sending logic, and is based on client-ordering. When we receive
717 * the "commited" event from the log, we should replace this
718 * reference and re-insert in order to respect the order we receied
719 * from the log.
720 */
721 if (this.newMessages.has(ev.message.id)) {
722 this.newMessages.delete(ev.message.id)
723 }
724 this.newMessages.set(ev.message.id, ev.message)
725 needsCommit = true
726 } else if (
727 ChatBskyConvoDefs.isLogDeleteMessage(ev) &&
728 ChatBskyConvoDefs.isDeletedMessageView(ev.message)
729 ) {
730 /*
731 * Update if we have this in state. If we don't, don't worry about it.
732 */
733 if (
734 this.pastMessages.has(ev.message.id) ||
735 this.newMessages.has(ev.message.id)
736 ) {
737 this.pastMessages.delete(ev.message.id)
738 this.newMessages.delete(ev.message.id)
739 this.deletedMessages.delete(ev.message.id)
740 needsCommit = true
741 }
742 }
743 }
744 }
745 }
746
747 if (needsCommit) {
748 this.commit()
749 }
750 }
751
752 private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null
753
754 sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
755 // Ignore empty messages for now since they have no other purpose atm
756 if (!message.text.trim() && !message.embed) return
757
758 logger.debug('Convo: send message', {}, logger.DebugContext.convo)
759
760 const tempId = nanoid()
761
762 this.pendingMessageFailure = null
763 this.pendingMessages.set(tempId, {
764 id: tempId,
765 message,
766 })
767 this.commit()
768
769 if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) {
770 this.processPendingMessages()
771 }
772 }
773
774 async processPendingMessages() {
775 logger.debug(
776 `Convo: processing messages (${this.pendingMessages.size} remaining)`,
777 {},
778 logger.DebugContext.convo,
779 )
780
781 const pendingMessage = Array.from(this.pendingMessages.values()).shift()
782
783 /*
784 * If there are no pending messages, we're done.
785 */
786 if (!pendingMessage) {
787 this.isProcessingPendingMessages = false
788 return
789 }
790
791 try {
792 this.isProcessingPendingMessages = true
793
794 const {id, message} = pendingMessage
795
796 const response = await this.agent.api.chat.bsky.convo.sendMessage(
797 {
798 convoId: this.convoId,
799 message,
800 },
801 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
802 )
803 const res = response.data
804
805 // remove from queue
806 this.pendingMessages.delete(id)
807
808 /*
809 * Insert into `newMessages` as soon as we have a real ID. That way, when
810 * we get an event log back, we can replace in situ.
811 */
812 this.newMessages.set(res.id, {
813 ...res,
814 $type: 'chat.bsky.convo.defs#messageView',
815 })
816 // render new message state, prior to firehose
817 this.commit()
818
819 // continue queue processing
820 await this.processPendingMessages()
821 } catch (e: any) {
822 logger.error(e, {context: `Convo: failed to send message`})
823 this.handleSendMessageFailure(e)
824 this.isProcessingPendingMessages = false
825 }
826 }
827
828 private handleSendMessageFailure(e: any) {
829 if (e instanceof XRPCError) {
830 if (NETWORK_FAILURE_STATUSES.includes(e.status)) {
831 this.pendingMessageFailure = 'recoverable'
832 } else {
833 this.pendingMessageFailure = 'unrecoverable'
834
835 switch (e.message) {
836 case 'block between recipient and sender':
837 this.emitter.emit('event', {
838 type: 'invalidate-block-state',
839 accountDids: [
840 this.sender!.did,
841 ...this.recipients!.map(r => r.did),
842 ],
843 })
844 break
845 case 'Account is disabled':
846 this.dispatch({event: ConvoDispatchEvent.Disable})
847 break
848 case 'Convo not found':
849 case 'Account does not exist':
850 case 'recipient does not exist':
851 case 'recipient requires incoming messages to come from someone they follow':
852 case 'recipient has disabled incoming messages':
853 break
854 default:
855 logger.warn(
856 `Convo handleSendMessageFailure could not handle error`,
857 {
858 status: e.status,
859 message: e.message,
860 },
861 )
862 break
863 }
864 }
865 } else {
866 this.pendingMessageFailure = 'unrecoverable'
867 logger.error(e, {
868 context: `Convo handleSendMessageFailure received unknown error`,
869 })
870 }
871
872 this.commit()
873 }
874
875 async batchRetryPendingMessages() {
876 if (this.pendingMessageFailure === null) return
877
878 const messageArray = Array.from(this.pendingMessages.values())
879 if (messageArray.length === 0) return
880
881 this.pendingMessageFailure = null
882 this.commit()
883
884 logger.debug(
885 `Convo: batch retrying ${this.pendingMessages.size} pending messages`,
886 {},
887 logger.DebugContext.convo,
888 )
889
890 try {
891 const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch(
892 {
893 items: messageArray.map(({message}) => ({
894 convoId: this.convoId,
895 message,
896 })),
897 },
898 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
899 )
900 const {items} = data
901
902 /*
903 * Insert into `newMessages` as soon as we have a real ID. That way, when
904 * we get an event log back, we can replace in situ.
905 */
906 for (const item of items) {
907 this.newMessages.set(item.id, {
908 ...item,
909 $type: 'chat.bsky.convo.defs#messageView',
910 })
911 }
912
913 for (const pendingMessage of messageArray) {
914 this.pendingMessages.delete(pendingMessage.id)
915 }
916
917 this.commit()
918
919 logger.debug(
920 `Convo: sent ${this.pendingMessages.size} pending messages`,
921 {},
922 logger.DebugContext.convo,
923 )
924 } catch (e: any) {
925 logger.error(e, {context: `Convo: failed to batch retry messages`})
926 this.handleSendMessageFailure(e)
927 }
928 }
929
930 async deleteMessage(messageId: string) {
931 logger.debug('Convo: delete message', {}, logger.DebugContext.convo)
932
933 this.deletedMessages.add(messageId)
934 this.commit()
935
936 try {
937 await networkRetry(2, () => {
938 return this.agent.api.chat.bsky.convo.deleteMessageForSelf(
939 {
940 convoId: this.convoId,
941 messageId,
942 },
943 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
944 )
945 })
946 } catch (e: any) {
947 logger.error(e, {context: `Convo: failed to delete message`})
948 this.deletedMessages.delete(messageId)
949 this.commit()
950 throw e
951 }
952 }
953
954 on(handler: (event: ConvoEvent) => void) {
955 this.emitter.on('event', handler)
956
957 return () => {
958 this.emitter.off('event', handler)
959 }
960 }
961
962 /*
963 * Items in reverse order, since FlatList inverts
964 */
965 getItems(): ConvoItem[] {
966 const items: ConvoItem[] = []
967
968 this.pastMessages.forEach(m => {
969 if (ChatBskyConvoDefs.isMessageView(m)) {
970 items.unshift({
971 type: 'message',
972 key: m.id,
973 message: m,
974 nextMessage: null,
975 })
976 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
977 items.unshift({
978 type: 'deleted-message',
979 key: m.id,
980 message: m,
981 nextMessage: null,
982 })
983 }
984 })
985
986 if (this.fetchMessageHistoryError) {
987 items.unshift({
988 type: 'error',
989 code: ConvoItemError.HistoryFailed,
990 key: ConvoItemError.HistoryFailed,
991 retry: () => {
992 this.maybeRecoverFromNetworkError()
993 },
994 })
995 }
996
997 this.newMessages.forEach(m => {
998 if (ChatBskyConvoDefs.isMessageView(m)) {
999 items.push({
1000 type: 'message',
1001 key: m.id,
1002 message: m,
1003 nextMessage: null,
1004 })
1005 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
1006 items.push({
1007 type: 'deleted-message',
1008 key: m.id,
1009 message: m,
1010 nextMessage: null,
1011 })
1012 }
1013 })
1014
1015 this.pendingMessages.forEach(m => {
1016 items.push({
1017 type: 'pending-message',
1018 key: m.id,
1019 message: {
1020 ...m.message,
1021 embed: undefined,
1022 $type: 'chat.bsky.convo.defs#messageView',
1023 id: nanoid(),
1024 rev: '__fake__',
1025 sentAt: new Date().toISOString(),
1026 /*
1027 * `getItems` is only run in "active" status states, where
1028 * `this.sender` is defined
1029 */
1030 sender: this.sender!,
1031 },
1032 nextMessage: null,
1033 failed: this.pendingMessageFailure !== null,
1034 retry:
1035 this.pendingMessageFailure === 'recoverable'
1036 ? () => {
1037 this.maybeRecoverFromNetworkError()
1038 }
1039 : undefined,
1040 })
1041 })
1042
1043 if (this.firehoseError) {
1044 items.push({
1045 type: 'error',
1046 code: ConvoItemError.FirehoseFailed,
1047 key: ConvoItemError.FirehoseFailed,
1048 retry: () => {
1049 this.firehoseError?.retry()
1050 },
1051 })
1052 }
1053
1054 return items
1055 .filter(item => {
1056 if (isConvoItemMessage(item)) {
1057 return !this.deletedMessages.has(item.message.id)
1058 }
1059 return true
1060 })
1061 .map((item, i, arr) => {
1062 let nextMessage = null
1063 const isMessage = isConvoItemMessage(item)
1064
1065 if (isMessage) {
1066 if (
1067 isMessage &&
1068 (ChatBskyConvoDefs.isMessageView(item.message) ||
1069 ChatBskyConvoDefs.isDeletedMessageView(item.message))
1070 ) {
1071 const next = arr[i + 1]
1072
1073 if (
1074 isConvoItemMessage(next) &&
1075 next &&
1076 (ChatBskyConvoDefs.isMessageView(next.message) ||
1077 ChatBskyConvoDefs.isDeletedMessageView(next.message))
1078 ) {
1079 nextMessage = next.message
1080 }
1081 }
1082
1083 return {
1084 ...item,
1085 nextMessage,
1086 }
1087 }
1088
1089 return item
1090 })
1091 }
1092}