forked from
jollywhoppers.com/witchsky.app
Bluesky app fork with some witchin' additions 💫
1import {
2 type AtpAgent,
3 type ChatBskyActorDefs,
4 ChatBskyConvoDefs,
5 type ChatBskyConvoGetLog,
6 type ChatBskyConvoSendMessage,
7} from '@atproto/api'
8import {XRPCError} from '@atproto/xrpc'
9import EventEmitter from 'eventemitter3'
10import {nanoid} from 'nanoid/non-secure'
11
12import {networkRetry} from '#/lib/async/retry'
13import {DM_SERVICE_HEADERS} from '#/lib/constants'
14import {
15 isErrorMaybeAppPasswordPermissions,
16 isNetworkError,
17} from '#/lib/strings/errors'
18import {Logger} from '#/logger'
19import {
20 ACTIVE_POLL_INTERVAL,
21 BACKGROUND_POLL_INTERVAL,
22 INACTIVE_TIMEOUT,
23 NETWORK_FAILURE_STATUSES,
24} from '#/state/messages/convo/const'
25import {
26 type ConvoDispatch,
27 ConvoDispatchEvent,
28 type ConvoError,
29 ConvoErrorCode,
30 type ConvoEvent,
31 type ConvoItem,
32 ConvoItemError,
33 type ConvoParams,
34 type ConvoState,
35 ConvoStatus,
36} from '#/state/messages/convo/types'
37import {type MessagesEventBus} from '#/state/messages/events/agent'
38import {type MessagesEventBusError} from '#/state/messages/events/types'
39import {IS_NATIVE} from '#/env'
40
41const logger = Logger.create(Logger.Context.ConversationAgent)
42
43export function isConvoItemMessage(
44 item: ConvoItem,
45): item is ConvoItem & {type: 'message'} {
46 if (!item) return false
47 return (
48 item.type === 'message' ||
49 item.type === 'deleted-message' ||
50 item.type === 'pending-message'
51 )
52}
53
54export class Convo {
55 private id: string
56
57 private agent: AtpAgent
58 private events: MessagesEventBus
59 private senderUserDid: string
60
61 private status: ConvoStatus = ConvoStatus.Uninitialized
62 private error: ConvoError | undefined
63 private oldestRev: string | undefined | null = undefined
64 private isFetchingHistory = false
65 private latestRev: string | undefined = undefined
66
67 private pastMessages: Map<
68 string,
69 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
70 > = new Map()
71 private newMessages: Map<
72 string,
73 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
74 > = new Map()
75 private pendingMessages: Map<
76 string,
77 {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
78 > = new Map()
79 private deletedMessages: Set<string> = new Set()
80
81 private isProcessingPendingMessages = false
82
83 private lastActiveTimestamp: number | undefined
84
85 private emitter = new EventEmitter<{event: [ConvoEvent]}>()
86
87 convoId: string
88 convo: ChatBskyConvoDefs.ConvoView | undefined
89 sender: ChatBskyActorDefs.ProfileViewBasic | undefined
90 recipients: ChatBskyActorDefs.ProfileViewBasic[] | undefined
91 snapshot: ConvoState | undefined
92
93 constructor(params: ConvoParams) {
94 this.id = nanoid(3)
95 this.convoId = params.convoId
96 this.agent = params.agent
97 this.events = params.events
98 this.senderUserDid = params.agent.assertDid
99
100 if (params.placeholderData) {
101 this.setupPlaceholderData(params.placeholderData)
102 }
103
104 this.subscribe = this.subscribe.bind(this)
105 this.getSnapshot = this.getSnapshot.bind(this)
106 this.sendMessage = this.sendMessage.bind(this)
107 this.deleteMessage = this.deleteMessage.bind(this)
108 this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
109 this.ingestFirehose = this.ingestFirehose.bind(this)
110 this.onFirehoseConnect = this.onFirehoseConnect.bind(this)
111 this.onFirehoseError = this.onFirehoseError.bind(this)
112 this.markConvoAccepted = this.markConvoAccepted.bind(this)
113 this.addReaction = this.addReaction.bind(this)
114 this.removeReaction = this.removeReaction.bind(this)
115 }
116
117 private commit() {
118 this.snapshot = undefined
119 this.subscribers.forEach(subscriber => subscriber())
120 }
121
122 private subscribers: (() => void)[] = []
123
124 subscribe(subscriber: () => void) {
125 if (this.subscribers.length === 0) this.init()
126
127 this.subscribers.push(subscriber)
128
129 return () => {
130 this.subscribers = this.subscribers.filter(s => s !== subscriber)
131 if (this.subscribers.length === 0) this.suspend()
132 }
133 }
134
135 getSnapshot(): ConvoState {
136 if (!this.snapshot) this.snapshot = this.generateSnapshot()
137 // logger.debug('snapshotted', {})
138 return this.snapshot
139 }
140
141 private generateSnapshot(): ConvoState {
142 switch (this.status) {
143 case ConvoStatus.Initializing: {
144 return {
145 status: ConvoStatus.Initializing,
146 items: [],
147 convo: this.convo,
148 error: undefined,
149 sender: this.sender,
150 recipients: this.recipients,
151 isFetchingHistory: this.isFetchingHistory,
152 deleteMessage: undefined,
153 sendMessage: undefined,
154 fetchMessageHistory: undefined,
155 markConvoAccepted: undefined,
156 addReaction: undefined,
157 removeReaction: undefined,
158 }
159 }
160 case ConvoStatus.Disabled:
161 case ConvoStatus.Suspended:
162 case ConvoStatus.Backgrounded:
163 case ConvoStatus.Ready: {
164 return {
165 status: this.status,
166 items: this.getItems(),
167 convo: this.convo!,
168 error: undefined,
169 sender: this.sender!,
170 recipients: this.recipients!,
171 isFetchingHistory: this.isFetchingHistory,
172 deleteMessage: this.deleteMessage,
173 sendMessage: this.sendMessage,
174 fetchMessageHistory: this.fetchMessageHistory,
175 markConvoAccepted: this.markConvoAccepted,
176 addReaction: this.addReaction,
177 removeReaction: this.removeReaction,
178 }
179 }
180 case ConvoStatus.Error: {
181 return {
182 status: ConvoStatus.Error,
183 items: [],
184 convo: undefined,
185 error: this.error!,
186 sender: undefined,
187 recipients: undefined,
188 isFetchingHistory: false,
189 deleteMessage: undefined,
190 sendMessage: undefined,
191 fetchMessageHistory: undefined,
192 markConvoAccepted: undefined,
193 addReaction: undefined,
194 removeReaction: undefined,
195 }
196 }
197 default: {
198 return {
199 status: ConvoStatus.Uninitialized,
200 items: [],
201 convo: this.convo,
202 error: undefined,
203 sender: this.sender,
204 recipients: this.recipients,
205 isFetchingHistory: false,
206 deleteMessage: undefined,
207 sendMessage: undefined,
208 fetchMessageHistory: undefined,
209 markConvoAccepted: undefined,
210 addReaction: undefined,
211 removeReaction: undefined,
212 }
213 }
214 }
215 }
216
217 dispatch(action: ConvoDispatch) {
218 const prevStatus = this.status
219
220 switch (this.status) {
221 case ConvoStatus.Uninitialized: {
222 switch (action.event) {
223 case ConvoDispatchEvent.Init: {
224 this.status = ConvoStatus.Initializing
225 this.setup()
226 this.setupFirehose()
227 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
228 break
229 }
230 }
231 break
232 }
233 case ConvoStatus.Initializing: {
234 switch (action.event) {
235 case ConvoDispatchEvent.Ready: {
236 this.status = ConvoStatus.Ready
237 this.fetchMessageHistory()
238 break
239 }
240 case ConvoDispatchEvent.Background: {
241 this.status = ConvoStatus.Backgrounded
242 this.fetchMessageHistory()
243 this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
244 break
245 }
246 case ConvoDispatchEvent.Suspend: {
247 this.status = ConvoStatus.Suspended
248 this.cleanupFirehoseConnection?.()
249 this.withdrawRequestedPollInterval()
250 break
251 }
252 case ConvoDispatchEvent.Error: {
253 this.status = ConvoStatus.Error
254 this.error = action.payload
255 this.cleanupFirehoseConnection?.()
256 this.withdrawRequestedPollInterval()
257 break
258 }
259 case ConvoDispatchEvent.Disable: {
260 this.status = ConvoStatus.Disabled
261 this.fetchMessageHistory() // finish init
262 this.cleanupFirehoseConnection?.()
263 this.withdrawRequestedPollInterval()
264 break
265 }
266 }
267 break
268 }
269 case ConvoStatus.Ready: {
270 switch (action.event) {
271 case ConvoDispatchEvent.Resume: {
272 this.refreshConvo()
273 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
274 break
275 }
276 case ConvoDispatchEvent.Background: {
277 this.status = ConvoStatus.Backgrounded
278 this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
279 break
280 }
281 case ConvoDispatchEvent.Suspend: {
282 this.status = ConvoStatus.Suspended
283 this.cleanupFirehoseConnection?.()
284 this.withdrawRequestedPollInterval()
285 break
286 }
287 case ConvoDispatchEvent.Error: {
288 this.status = ConvoStatus.Error
289 this.error = action.payload
290 this.cleanupFirehoseConnection?.()
291 this.withdrawRequestedPollInterval()
292 break
293 }
294 case ConvoDispatchEvent.Disable: {
295 this.status = ConvoStatus.Disabled
296 this.cleanupFirehoseConnection?.()
297 this.withdrawRequestedPollInterval()
298 break
299 }
300 }
301 break
302 }
303 case ConvoStatus.Backgrounded: {
304 switch (action.event) {
305 case ConvoDispatchEvent.Resume: {
306 if (this.wasChatInactive()) {
307 this.reset()
308 } else {
309 if (this.convo) {
310 this.status = ConvoStatus.Ready
311 this.refreshConvo()
312 this.maybeRecoverFromNetworkError()
313 } else {
314 this.status = ConvoStatus.Initializing
315 this.setup()
316 }
317 this.requestPollInterval(ACTIVE_POLL_INTERVAL)
318 }
319 break
320 }
321 case ConvoDispatchEvent.Suspend: {
322 this.status = ConvoStatus.Suspended
323 this.cleanupFirehoseConnection?.()
324 this.withdrawRequestedPollInterval()
325 break
326 }
327 case ConvoDispatchEvent.Error: {
328 this.status = ConvoStatus.Error
329 this.error = action.payload
330 this.cleanupFirehoseConnection?.()
331 this.withdrawRequestedPollInterval()
332 break
333 }
334 case ConvoDispatchEvent.Disable: {
335 this.status = ConvoStatus.Disabled
336 this.cleanupFirehoseConnection?.()
337 this.withdrawRequestedPollInterval()
338 break
339 }
340 }
341 break
342 }
343 case ConvoStatus.Suspended: {
344 switch (action.event) {
345 case ConvoDispatchEvent.Init: {
346 this.reset()
347 break
348 }
349 case ConvoDispatchEvent.Resume: {
350 this.reset()
351 break
352 }
353 case ConvoDispatchEvent.Error: {
354 this.status = ConvoStatus.Error
355 this.error = action.payload
356 break
357 }
358 case ConvoDispatchEvent.Disable: {
359 this.status = ConvoStatus.Disabled
360 break
361 }
362 }
363 break
364 }
365 case ConvoStatus.Error: {
366 switch (action.event) {
367 case ConvoDispatchEvent.Init: {
368 this.reset()
369 break
370 }
371 case ConvoDispatchEvent.Resume: {
372 this.reset()
373 break
374 }
375 case ConvoDispatchEvent.Suspend: {
376 this.status = ConvoStatus.Suspended
377 break
378 }
379 case ConvoDispatchEvent.Error: {
380 this.status = ConvoStatus.Error
381 this.error = action.payload
382 break
383 }
384 case ConvoDispatchEvent.Disable: {
385 this.status = ConvoStatus.Disabled
386 break
387 }
388 }
389 break
390 }
391 case ConvoStatus.Disabled: {
392 // can't do anything
393 break
394 }
395 default:
396 break
397 }
398
399 logger.debug(`dispatch '${action.event}'`, {
400 id: this.id,
401 prev: prevStatus,
402 next: this.status,
403 })
404
405 this.updateLastActiveTimestamp()
406 this.commit()
407 }
408
409 private reset() {
410 this.convo = undefined
411 this.sender = undefined
412 this.recipients = undefined
413 this.snapshot = undefined
414
415 this.status = ConvoStatus.Uninitialized
416 this.error = undefined
417 this.oldestRev = undefined
418 this.latestRev = undefined
419
420 this.pastMessages = new Map()
421 this.newMessages = new Map()
422 this.pendingMessages = new Map()
423 this.deletedMessages = new Set()
424
425 this.pendingMessageFailure = null
426 this.fetchMessageHistoryError = undefined
427 this.firehoseError = undefined
428
429 this.dispatch({event: ConvoDispatchEvent.Init})
430 }
431
432 maybeRecoverFromNetworkError() {
433 if (this.firehoseError) {
434 this.firehoseError.retry()
435 this.firehoseError = undefined
436 this.commit()
437 } else {
438 this.batchRetryPendingMessages()
439 }
440
441 if (this.fetchMessageHistoryError) {
442 this.fetchMessageHistoryError.retry()
443 this.fetchMessageHistoryError = undefined
444 this.commit()
445 }
446 }
447
448 /**
449 * Initialises the convo with placeholder data, if provided. We still refetch it before rendering the convo,
450 * but this allows us to render the convo header immediately.
451 */
452 private setupPlaceholderData(
453 data: NonNullable<ConvoParams['placeholderData']>,
454 ) {
455 this.convo = data.convo
456 this.sender = data.convo.members.find(m => m.did === this.senderUserDid)
457 this.recipients = data.convo.members.filter(
458 m => m.did !== this.senderUserDid,
459 )
460 }
461
462 private async setup() {
463 try {
464 const {convo, sender, recipients} = await this.fetchConvo()
465
466 this.convo = convo
467 this.sender = sender
468 this.recipients = recipients
469
470 /*
471 * Some validation prior to `Ready` status
472 */
473 if (!this.convo) {
474 throw new Error('could not find convo')
475 }
476 if (!this.sender) {
477 throw new Error('could not find sender in convo')
478 }
479 if (!this.recipients) {
480 throw new Error('could not find recipients in convo')
481 }
482
483 const userIsDisabled = Boolean(this.sender.chatDisabled)
484
485 if (userIsDisabled) {
486 this.dispatch({event: ConvoDispatchEvent.Disable})
487 } else {
488 this.dispatch({event: ConvoDispatchEvent.Ready})
489 }
490 } catch (e: any) {
491 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
492 logger.error('setup failed', {
493 safeMessage: e.message,
494 })
495 }
496
497 this.dispatch({
498 event: ConvoDispatchEvent.Error,
499 payload: {
500 exception: e,
501 code: ConvoErrorCode.InitFailed,
502 retry: () => {
503 this.reset()
504 },
505 },
506 })
507 this.commit()
508 }
509 }
510
511 init() {
512 this.dispatch({event: ConvoDispatchEvent.Init})
513 }
514
515 resume() {
516 this.dispatch({event: ConvoDispatchEvent.Resume})
517 }
518
519 background() {
520 this.dispatch({event: ConvoDispatchEvent.Background})
521 }
522
523 suspend() {
524 this.dispatch({event: ConvoDispatchEvent.Suspend})
525 }
526
527 /**
528 * Called on any state transition, like when the chat is backgrounded. This
529 * value is then checked on background -> foreground transitions.
530 */
531 private updateLastActiveTimestamp() {
532 this.lastActiveTimestamp = Date.now()
533 }
534 private wasChatInactive() {
535 if (!this.lastActiveTimestamp) return true
536 return Date.now() - this.lastActiveTimestamp > INACTIVE_TIMEOUT
537 }
538
539 private requestedPollInterval: (() => void) | undefined
540 private requestPollInterval(interval: number) {
541 this.withdrawRequestedPollInterval()
542 this.requestedPollInterval = this.events.requestPollInterval(interval)
543 }
544 private withdrawRequestedPollInterval() {
545 if (this.requestedPollInterval) {
546 this.requestedPollInterval()
547 }
548 }
549
550 private pendingFetchConvo:
551 | Promise<{
552 convo: ChatBskyConvoDefs.ConvoView
553 sender: ChatBskyActorDefs.ProfileViewBasic | undefined
554 recipients: ChatBskyActorDefs.ProfileViewBasic[]
555 }>
556 | undefined
557 async fetchConvo() {
558 if (this.pendingFetchConvo) return this.pendingFetchConvo
559
560 this.pendingFetchConvo = new Promise<{
561 convo: ChatBskyConvoDefs.ConvoView
562 sender: ChatBskyActorDefs.ProfileViewBasic | undefined
563 recipients: ChatBskyActorDefs.ProfileViewBasic[]
564 }>(async (resolve, reject) => {
565 try {
566 const response = await networkRetry(2, () => {
567 return this.agent.api.chat.bsky.convo.getConvo(
568 {
569 convoId: this.convoId,
570 },
571 {headers: DM_SERVICE_HEADERS},
572 )
573 })
574
575 const convo = response.data.convo
576
577 resolve({
578 convo,
579 sender: convo.members.find(m => m.did === this.senderUserDid),
580 recipients: convo.members.filter(m => m.did !== this.senderUserDid),
581 })
582 } catch (e) {
583 reject(e)
584 } finally {
585 this.pendingFetchConvo = undefined
586 }
587 })
588
589 return this.pendingFetchConvo
590 }
591
592 async refreshConvo() {
593 try {
594 const {convo, sender, recipients} = await this.fetchConvo()
595 // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE')
596 this.convo = convo || this.convo
597 this.sender = sender || this.sender
598 this.recipients = recipients || this.recipients
599 } catch (e: any) {
600 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
601 logger.error(`failed to refresh convo`, {
602 safeMessage: e.message,
603 })
604 }
605 }
606 }
607
608 private fetchMessageHistoryError:
609 | {
610 retry: () => void
611 }
612 | undefined
613 async fetchMessageHistory() {
614 logger.debug('fetch message history', {})
615
616 /*
617 * If oldestRev is null, we've fetched all history.
618 */
619 if (this.oldestRev === null) return
620
621 /*
622 * Don't fetch again if a fetch is already in progress
623 */
624 if (this.isFetchingHistory) return
625
626 /*
627 * If we've rendered a retry state for history fetching, exit. Upon retry,
628 * this will be removed and we'll try again.
629 */
630 if (this.fetchMessageHistoryError) return
631
632 try {
633 this.isFetchingHistory = true
634 this.commit()
635
636 const nextCursor = this.oldestRev // for TS
637 const response = await networkRetry(2, () => {
638 return this.agent.api.chat.bsky.convo.getMessages(
639 {
640 cursor: nextCursor,
641 convoId: this.convoId,
642 limit: IS_NATIVE ? 30 : 60,
643 },
644 {headers: DM_SERVICE_HEADERS},
645 )
646 })
647 const {cursor, messages} = response.data
648
649 this.oldestRev = cursor ?? null
650
651 for (const message of messages) {
652 if (
653 ChatBskyConvoDefs.isMessageView(message) ||
654 ChatBskyConvoDefs.isDeletedMessageView(message)
655 ) {
656 /*
657 * If this message is already in new messages, it was added by the
658 * firehose ingestion, and we can safely overwrite it. This trusts
659 * the server on ordering, and keeps it in sync.
660 */
661 if (this.newMessages.has(message.id)) {
662 this.newMessages.delete(message.id)
663 }
664 this.pastMessages.set(message.id, message)
665 }
666 }
667 } catch (e: any) {
668 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
669 logger.error('failed to fetch message history', {
670 safeMessage: e.message,
671 })
672 }
673
674 this.fetchMessageHistoryError = {
675 retry: () => {
676 this.fetchMessageHistory()
677 },
678 }
679 } finally {
680 this.isFetchingHistory = false
681 this.commit()
682 }
683 }
684
685 private cleanupFirehoseConnection: (() => void) | undefined
686 private setupFirehose() {
687 // remove old listeners, if exist
688 this.cleanupFirehoseConnection?.()
689
690 // reconnect
691 this.cleanupFirehoseConnection = this.events.on(
692 event => {
693 switch (event.type) {
694 case 'connect': {
695 this.onFirehoseConnect()
696 break
697 }
698 case 'error': {
699 this.onFirehoseError(event.error)
700 break
701 }
702 case 'logs': {
703 this.ingestFirehose(event.logs)
704 break
705 }
706 }
707 },
708 /*
709 * This is VERY important — we only want events for this convo.
710 */
711 {convoId: this.convoId},
712 )
713 }
714
715 private firehoseError: MessagesEventBusError | undefined
716
717 onFirehoseConnect() {
718 this.firehoseError = undefined
719 this.batchRetryPendingMessages()
720 this.commit()
721 }
722
723 onFirehoseError(error?: MessagesEventBusError) {
724 this.firehoseError = error
725 this.commit()
726 }
727
728 ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
729 let needsCommit = false
730
731 for (const ev of events) {
732 /*
733 * If there's a rev, we should handle it. If there's not a rev, we don't
734 * know what it is.
735 */
736 if ('rev' in ev && typeof ev.rev === 'string') {
737 const isUninitialized = !this.latestRev
738 const isNewEvent = this.latestRev && ev.rev > this.latestRev
739
740 /*
741 * We received an event prior to fetching any history, so we can safely
742 * use this as the initial history cursor
743 */
744 if (this.oldestRev === undefined && isUninitialized) {
745 this.oldestRev = ev.rev
746 }
747
748 /*
749 * We only care about new events
750 */
751 if (isNewEvent || isUninitialized) {
752 /*
753 * Update rev regardless of if it's a ev type we care about or not
754 */
755 this.latestRev = ev.rev
756
757 if (
758 ChatBskyConvoDefs.isLogCreateMessage(ev) &&
759 ChatBskyConvoDefs.isMessageView(ev.message)
760 ) {
761 /**
762 * If this message is already in new messages, it was added by our
763 * sending logic, and is based on client-ordering. When we receive
764 * the "commited" event from the log, we should replace this
765 * reference and re-insert in order to respect the order we receied
766 * from the log.
767 */
768 if (this.newMessages.has(ev.message.id)) {
769 this.newMessages.delete(ev.message.id)
770 }
771 this.newMessages.set(ev.message.id, ev.message)
772 needsCommit = true
773 } else if (
774 ChatBskyConvoDefs.isLogDeleteMessage(ev) &&
775 ChatBskyConvoDefs.isDeletedMessageView(ev.message)
776 ) {
777 /*
778 * Update if we have this in state. If we don't, don't worry about it.
779 */
780 if (
781 this.pastMessages.has(ev.message.id) ||
782 this.newMessages.has(ev.message.id)
783 ) {
784 this.pastMessages.delete(ev.message.id)
785 this.newMessages.delete(ev.message.id)
786 this.deletedMessages.delete(ev.message.id)
787 needsCommit = true
788 }
789 } else if (
790 (ChatBskyConvoDefs.isLogAddReaction(ev) ||
791 ChatBskyConvoDefs.isLogRemoveReaction(ev)) &&
792 ChatBskyConvoDefs.isMessageView(ev.message)
793 ) {
794 /*
795 * Update if we have this in state - replace message wholesale. If we don't, don't worry about it.
796 */
797 if (this.pastMessages.has(ev.message.id)) {
798 this.pastMessages.set(ev.message.id, ev.message)
799 needsCommit = true
800 }
801 if (this.newMessages.has(ev.message.id)) {
802 this.newMessages.set(ev.message.id, ev.message)
803 needsCommit = true
804 }
805 }
806 }
807 }
808 }
809
810 if (needsCommit) {
811 this.commit()
812 }
813 }
814
815 private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null
816
817 sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
818 // Ignore empty messages for now since they have no other purpose atm
819 if (!message.text.trim() && !message.embed) return
820
821 logger.debug('send message', {})
822
823 const tempId = nanoid()
824
825 this.pendingMessageFailure = null
826 this.pendingMessages.set(tempId, {
827 id: tempId,
828 message,
829 })
830 if (this.convo?.status === 'request') {
831 this.convo = {
832 ...this.convo,
833 status: 'accepted',
834 }
835 }
836 this.commit()
837
838 if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) {
839 this.processPendingMessages()
840 }
841 }
842
843 markConvoAccepted() {
844 if (this.convo) {
845 this.convo = {
846 ...this.convo,
847 status: 'accepted',
848 }
849 }
850 this.commit()
851 }
852
853 async processPendingMessages() {
854 logger.debug(
855 `processing messages (${this.pendingMessages.size} remaining)`,
856 {},
857 )
858
859 const pendingMessage = Array.from(this.pendingMessages.values()).shift()
860
861 /*
862 * If there are no pending messages, we're done.
863 */
864 if (!pendingMessage) {
865 this.isProcessingPendingMessages = false
866 return
867 }
868
869 try {
870 this.isProcessingPendingMessages = true
871
872 const {id, message} = pendingMessage
873
874 const response = await this.agent.api.chat.bsky.convo.sendMessage(
875 {
876 convoId: this.convoId,
877 message,
878 },
879 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
880 )
881 const res = response.data
882
883 // remove from queue
884 this.pendingMessages.delete(id)
885
886 /*
887 * Insert into `newMessages` as soon as we have a real ID. That way, when
888 * we get an event log back, we can replace in situ.
889 */
890 this.newMessages.set(res.id, {
891 ...res,
892 $type: 'chat.bsky.convo.defs#messageView',
893 })
894 // render new message state, prior to firehose
895 this.commit()
896
897 // continue queue processing
898 await this.processPendingMessages()
899 } catch (e: any) {
900 this.handleSendMessageFailure(e)
901 this.isProcessingPendingMessages = false
902 }
903 }
904
905 private handleSendMessageFailure(e: any) {
906 if (e instanceof XRPCError) {
907 if (NETWORK_FAILURE_STATUSES.includes(e.status)) {
908 this.pendingMessageFailure = 'recoverable'
909 } else {
910 this.pendingMessageFailure = 'unrecoverable'
911
912 switch (e.message) {
913 case 'block between recipient and sender':
914 this.emitter.emit('event', {
915 type: 'invalidate-block-state',
916 accountDids: [
917 this.sender!.did,
918 ...this.recipients!.map(r => r.did),
919 ],
920 })
921 break
922 case 'Account is disabled':
923 this.dispatch({event: ConvoDispatchEvent.Disable})
924 break
925 case 'Convo not found':
926 case 'Account does not exist':
927 case 'recipient does not exist':
928 case 'recipient requires incoming messages to come from someone they follow':
929 case 'recipient has disabled incoming messages':
930 break
931 default:
932 if (!isNetworkError(e)) {
933 logger.warn(`handleSendMessageFailure could not handle error`, {
934 status: e.status,
935 message: e.message,
936 })
937 }
938 break
939 }
940 }
941 } else {
942 this.pendingMessageFailure = 'unrecoverable'
943
944 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
945 logger.error(`handleSendMessageFailure received unknown error`, {
946 safeMessage: e.message,
947 })
948 }
949 }
950
951 this.commit()
952 }
953
954 async batchRetryPendingMessages() {
955 if (this.pendingMessageFailure === null) return
956
957 const messageArray = Array.from(this.pendingMessages.values())
958 if (messageArray.length === 0) return
959
960 this.pendingMessageFailure = null
961 this.commit()
962
963 logger.debug(
964 `batch retrying ${this.pendingMessages.size} pending messages`,
965 {},
966 )
967
968 try {
969 const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch(
970 {
971 items: messageArray.map(({message}) => ({
972 convoId: this.convoId,
973 message,
974 })),
975 },
976 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
977 )
978 const {items} = data
979
980 /*
981 * Insert into `newMessages` as soon as we have a real ID. That way, when
982 * we get an event log back, we can replace in situ.
983 */
984 for (const item of items) {
985 this.newMessages.set(item.id, {
986 ...item,
987 $type: 'chat.bsky.convo.defs#messageView',
988 })
989 }
990
991 for (const pendingMessage of messageArray) {
992 this.pendingMessages.delete(pendingMessage.id)
993 }
994
995 this.commit()
996
997 logger.debug(`sent ${this.pendingMessages.size} pending messages`, {})
998 } catch (e: any) {
999 this.handleSendMessageFailure(e)
1000 }
1001 }
1002
1003 async deleteMessage(messageId: string) {
1004 logger.debug('delete message', {})
1005
1006 this.deletedMessages.add(messageId)
1007 this.commit()
1008
1009 try {
1010 await networkRetry(2, () => {
1011 return this.agent.api.chat.bsky.convo.deleteMessageForSelf(
1012 {
1013 convoId: this.convoId,
1014 messageId,
1015 },
1016 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
1017 )
1018 })
1019 } catch (e: any) {
1020 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
1021 logger.error(`failed to delete message`, {
1022 safeMessage: e.message,
1023 })
1024 }
1025 this.deletedMessages.delete(messageId)
1026 this.commit()
1027 throw e
1028 }
1029 }
1030
1031 on(handler: (event: ConvoEvent) => void) {
1032 this.emitter.on('event', handler)
1033
1034 return () => {
1035 this.emitter.off('event', handler)
1036 }
1037 }
1038
1039 /*
1040 * Items in reverse order, since FlatList inverts
1041 */
1042 getItems(): ConvoItem[] {
1043 const items: ConvoItem[] = []
1044
1045 this.pastMessages.forEach(m => {
1046 if (ChatBskyConvoDefs.isMessageView(m)) {
1047 items.unshift({
1048 type: 'message',
1049 key: m.id,
1050 message: m,
1051 nextMessage: null,
1052 prevMessage: null,
1053 })
1054 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
1055 items.unshift({
1056 type: 'deleted-message',
1057 key: m.id,
1058 message: m,
1059 nextMessage: null,
1060 prevMessage: null,
1061 })
1062 }
1063 })
1064
1065 if (this.fetchMessageHistoryError) {
1066 items.unshift({
1067 type: 'error',
1068 code: ConvoItemError.HistoryFailed,
1069 key: ConvoItemError.HistoryFailed,
1070 retry: () => {
1071 this.maybeRecoverFromNetworkError()
1072 },
1073 })
1074 }
1075
1076 this.newMessages.forEach(m => {
1077 if (ChatBskyConvoDefs.isMessageView(m)) {
1078 items.push({
1079 type: 'message',
1080 key: m.id,
1081 message: m,
1082 nextMessage: null,
1083 prevMessage: null,
1084 })
1085 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
1086 items.push({
1087 type: 'deleted-message',
1088 key: m.id,
1089 message: m,
1090 nextMessage: null,
1091 prevMessage: null,
1092 })
1093 }
1094 })
1095
1096 this.pendingMessages.forEach(m => {
1097 items.push({
1098 type: 'pending-message',
1099 key: m.id,
1100 message: {
1101 ...m.message,
1102 embed: undefined,
1103 $type: 'chat.bsky.convo.defs#messageView',
1104 id: nanoid(),
1105 rev: '__fake__',
1106 sentAt: new Date().toISOString(),
1107 /*
1108 * `getItems` is only run in "active" status states, where
1109 * `this.sender` is defined
1110 */
1111 sender: {
1112 $type: 'chat.bsky.convo.defs#messageViewSender',
1113 did: this.sender!.did,
1114 },
1115 },
1116 nextMessage: null,
1117 prevMessage: null,
1118 failed: this.pendingMessageFailure !== null,
1119 retry:
1120 this.pendingMessageFailure === 'recoverable'
1121 ? () => {
1122 this.maybeRecoverFromNetworkError()
1123 }
1124 : undefined,
1125 })
1126 })
1127
1128 if (this.firehoseError) {
1129 items.push({
1130 type: 'error',
1131 code: ConvoItemError.FirehoseFailed,
1132 key: ConvoItemError.FirehoseFailed,
1133 retry: () => {
1134 this.firehoseError?.retry()
1135 },
1136 })
1137 }
1138
1139 return items
1140 .filter(item => {
1141 if (isConvoItemMessage(item)) {
1142 return !this.deletedMessages.has(item.message.id)
1143 }
1144 return true
1145 })
1146 .map((item, i, arr) => {
1147 let nextMessage = null
1148 let prevMessage = null
1149 const isMessage = isConvoItemMessage(item)
1150
1151 if (isMessage) {
1152 if (
1153 ChatBskyConvoDefs.isMessageView(item.message) ||
1154 ChatBskyConvoDefs.isDeletedMessageView(item.message)
1155 ) {
1156 const next = arr[i + 1]
1157
1158 if (
1159 isConvoItemMessage(next) &&
1160 (ChatBskyConvoDefs.isMessageView(next.message) ||
1161 ChatBskyConvoDefs.isDeletedMessageView(next.message))
1162 ) {
1163 nextMessage = next.message
1164 }
1165
1166 const prev = arr[i - 1]
1167
1168 if (
1169 isConvoItemMessage(prev) &&
1170 (ChatBskyConvoDefs.isMessageView(prev.message) ||
1171 ChatBskyConvoDefs.isDeletedMessageView(prev.message))
1172 ) {
1173 prevMessage = prev.message
1174 }
1175 }
1176
1177 return {
1178 ...item,
1179 nextMessage,
1180 prevMessage,
1181 }
1182 }
1183
1184 return item
1185 })
1186 }
1187
1188 /**
1189 * Add an emoji reaction to a message
1190 *
1191 * @param messageId - the id of the message to add the reaction to
1192 * @param emoji - must be one grapheme
1193 */
1194 async addReaction(messageId: string, emoji: string) {
1195 const optimisticReaction = {
1196 value: emoji,
1197 sender: {did: this.senderUserDid},
1198 createdAt: new Date().toISOString(),
1199 }
1200 let restore: null | (() => void) = null
1201 if (this.pastMessages.has(messageId)) {
1202 const prevMessage = this.pastMessages.get(messageId)
1203 if (
1204 ChatBskyConvoDefs.isMessageView(prevMessage) &&
1205 // skip optimistic update if reaction already exists
1206 !prevMessage.reactions?.find(
1207 reaction =>
1208 reaction.sender.did === this.senderUserDid &&
1209 reaction.value === emoji,
1210 )
1211 ) {
1212 if (prevMessage.reactions) {
1213 if (
1214 prevMessage.reactions.filter(
1215 reaction => reaction.sender.did === this.senderUserDid,
1216 ).length >= 5
1217 ) {
1218 throw new Error('Maximum reactions reached')
1219 }
1220 }
1221 this.pastMessages.set(messageId, {
1222 ...prevMessage,
1223 reactions: [...(prevMessage.reactions ?? []), optimisticReaction],
1224 })
1225 this.commit()
1226 restore = () => {
1227 this.pastMessages.set(messageId, prevMessage)
1228 this.commit()
1229 }
1230 }
1231 } else if (this.newMessages.has(messageId)) {
1232 const prevMessage = this.newMessages.get(messageId)
1233 if (
1234 ChatBskyConvoDefs.isMessageView(prevMessage) &&
1235 !prevMessage.reactions?.find(reaction => reaction.value === emoji)
1236 ) {
1237 if (prevMessage.reactions && prevMessage.reactions.length >= 5)
1238 throw new Error('Maximum reactions reached')
1239 this.newMessages.set(messageId, {
1240 ...prevMessage,
1241 reactions: [...(prevMessage.reactions ?? []), optimisticReaction],
1242 })
1243 this.commit()
1244 restore = () => {
1245 this.newMessages.set(messageId, prevMessage)
1246 this.commit()
1247 }
1248 }
1249 }
1250
1251 try {
1252 logger.debug(`Adding reaction ${emoji} to message ${messageId}`)
1253 const {data} = await this.agent.chat.bsky.convo.addReaction(
1254 {messageId, value: emoji, convoId: this.convoId},
1255 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
1256 )
1257 if (ChatBskyConvoDefs.isMessageView(data.message)) {
1258 if (this.pastMessages.has(messageId)) {
1259 this.pastMessages.set(messageId, data.message)
1260 this.commit()
1261 } else if (this.newMessages.has(messageId)) {
1262 this.newMessages.set(messageId, data.message)
1263 this.commit()
1264 }
1265 }
1266 } catch (error) {
1267 if (restore) restore()
1268 throw error
1269 }
1270 }
1271
1272 /*
1273 * Remove a reaction from a message.
1274 *
1275 * @param messageId - The ID of the message to remove the reaction from.
1276 * @param emoji - The emoji to remove.
1277 */
1278 async removeReaction(messageId: string, emoji: string) {
1279 let restore: null | (() => void) = null
1280 if (this.pastMessages.has(messageId)) {
1281 const prevMessage = this.pastMessages.get(messageId)
1282 if (ChatBskyConvoDefs.isMessageView(prevMessage)) {
1283 this.pastMessages.set(messageId, {
1284 ...prevMessage,
1285 reactions: prevMessage.reactions?.filter(
1286 reaction =>
1287 reaction.value !== emoji ||
1288 reaction.sender.did !== this.senderUserDid,
1289 ),
1290 })
1291 this.commit()
1292 restore = () => {
1293 this.pastMessages.set(messageId, prevMessage)
1294 this.commit()
1295 }
1296 }
1297 } else if (this.newMessages.has(messageId)) {
1298 const prevMessage = this.newMessages.get(messageId)
1299 if (ChatBskyConvoDefs.isMessageView(prevMessage)) {
1300 this.newMessages.set(messageId, {
1301 ...prevMessage,
1302 reactions: prevMessage.reactions?.filter(
1303 reaction =>
1304 reaction.value !== emoji ||
1305 reaction.sender.did !== this.senderUserDid,
1306 ),
1307 })
1308 this.commit()
1309 restore = () => {
1310 this.newMessages.set(messageId, prevMessage)
1311 this.commit()
1312 }
1313 }
1314 }
1315
1316 try {
1317 logger.debug(`Removing reaction ${emoji} from message ${messageId}`)
1318 await this.agent.chat.bsky.convo.removeReaction(
1319 {messageId, value: emoji, convoId: this.convoId},
1320 {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
1321 )
1322 } catch (error) {
1323 if (restore) restore()
1324 throw error
1325 }
1326 }
1327}