Bluesky app fork with some witchin' additions 💫
at main 1327 lines 38 kB view raw
1import { 2 type AtpAgent, 3 type ChatBskyActorDefs, 4 ChatBskyConvoDefs, 5 type ChatBskyConvoGetLog, 6 type ChatBskyConvoSendMessage, 7} from '@atproto/api' 8import {XRPCError} from '@atproto/xrpc' 9import EventEmitter from 'eventemitter3' 10import {nanoid} from 'nanoid/non-secure' 11 12import {networkRetry} from '#/lib/async/retry' 13import {DM_SERVICE_HEADERS} from '#/lib/constants' 14import { 15 isErrorMaybeAppPasswordPermissions, 16 isNetworkError, 17} from '#/lib/strings/errors' 18import {Logger} from '#/logger' 19import { 20 ACTIVE_POLL_INTERVAL, 21 BACKGROUND_POLL_INTERVAL, 22 INACTIVE_TIMEOUT, 23 NETWORK_FAILURE_STATUSES, 24} from '#/state/messages/convo/const' 25import { 26 type ConvoDispatch, 27 ConvoDispatchEvent, 28 type ConvoError, 29 ConvoErrorCode, 30 type ConvoEvent, 31 type ConvoItem, 32 ConvoItemError, 33 type ConvoParams, 34 type ConvoState, 35 ConvoStatus, 36} from '#/state/messages/convo/types' 37import {type MessagesEventBus} from '#/state/messages/events/agent' 38import {type MessagesEventBusError} from '#/state/messages/events/types' 39import {IS_NATIVE} from '#/env' 40 41const logger = Logger.create(Logger.Context.ConversationAgent) 42 43export function isConvoItemMessage( 44 item: ConvoItem, 45): item is ConvoItem & {type: 'message'} { 46 if (!item) return false 47 return ( 48 item.type === 'message' || 49 item.type === 'deleted-message' || 50 item.type === 'pending-message' 51 ) 52} 53 54export class Convo { 55 private id: string 56 57 private agent: AtpAgent 58 private events: MessagesEventBus 59 private senderUserDid: string 60 61 private status: ConvoStatus = ConvoStatus.Uninitialized 62 private error: ConvoError | undefined 63 private oldestRev: string | undefined | null = undefined 64 private isFetchingHistory = false 65 private latestRev: string | undefined = undefined 66 67 private pastMessages: Map< 68 string, 69 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView 70 > = new Map() 71 private newMessages: Map< 72 string, 73 ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView 74 > = new Map() 75 private pendingMessages: Map< 76 string, 77 {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} 78 > = new Map() 79 private deletedMessages: Set<string> = new Set() 80 81 private isProcessingPendingMessages = false 82 83 private lastActiveTimestamp: number | undefined 84 85 private emitter = new EventEmitter<{event: [ConvoEvent]}>() 86 87 convoId: string 88 convo: ChatBskyConvoDefs.ConvoView | undefined 89 sender: ChatBskyActorDefs.ProfileViewBasic | undefined 90 recipients: ChatBskyActorDefs.ProfileViewBasic[] | undefined 91 snapshot: ConvoState | undefined 92 93 constructor(params: ConvoParams) { 94 this.id = nanoid(3) 95 this.convoId = params.convoId 96 this.agent = params.agent 97 this.events = params.events 98 this.senderUserDid = params.agent.assertDid 99 100 if (params.placeholderData) { 101 this.setupPlaceholderData(params.placeholderData) 102 } 103 104 this.subscribe = this.subscribe.bind(this) 105 this.getSnapshot = this.getSnapshot.bind(this) 106 this.sendMessage = this.sendMessage.bind(this) 107 this.deleteMessage = this.deleteMessage.bind(this) 108 this.fetchMessageHistory = this.fetchMessageHistory.bind(this) 109 this.ingestFirehose = this.ingestFirehose.bind(this) 110 this.onFirehoseConnect = this.onFirehoseConnect.bind(this) 111 this.onFirehoseError = this.onFirehoseError.bind(this) 112 this.markConvoAccepted = this.markConvoAccepted.bind(this) 113 this.addReaction = this.addReaction.bind(this) 114 this.removeReaction = this.removeReaction.bind(this) 115 } 116 117 private commit() { 118 this.snapshot = undefined 119 this.subscribers.forEach(subscriber => subscriber()) 120 } 121 122 private subscribers: (() => void)[] = [] 123 124 subscribe(subscriber: () => void) { 125 if (this.subscribers.length === 0) this.init() 126 127 this.subscribers.push(subscriber) 128 129 return () => { 130 this.subscribers = this.subscribers.filter(s => s !== subscriber) 131 if (this.subscribers.length === 0) this.suspend() 132 } 133 } 134 135 getSnapshot(): ConvoState { 136 if (!this.snapshot) this.snapshot = this.generateSnapshot() 137 // logger.debug('snapshotted', {}) 138 return this.snapshot 139 } 140 141 private generateSnapshot(): ConvoState { 142 switch (this.status) { 143 case ConvoStatus.Initializing: { 144 return { 145 status: ConvoStatus.Initializing, 146 items: [], 147 convo: this.convo, 148 error: undefined, 149 sender: this.sender, 150 recipients: this.recipients, 151 isFetchingHistory: this.isFetchingHistory, 152 deleteMessage: undefined, 153 sendMessage: undefined, 154 fetchMessageHistory: undefined, 155 markConvoAccepted: undefined, 156 addReaction: undefined, 157 removeReaction: undefined, 158 } 159 } 160 case ConvoStatus.Disabled: 161 case ConvoStatus.Suspended: 162 case ConvoStatus.Backgrounded: 163 case ConvoStatus.Ready: { 164 return { 165 status: this.status, 166 items: this.getItems(), 167 convo: this.convo!, 168 error: undefined, 169 sender: this.sender!, 170 recipients: this.recipients!, 171 isFetchingHistory: this.isFetchingHistory, 172 deleteMessage: this.deleteMessage, 173 sendMessage: this.sendMessage, 174 fetchMessageHistory: this.fetchMessageHistory, 175 markConvoAccepted: this.markConvoAccepted, 176 addReaction: this.addReaction, 177 removeReaction: this.removeReaction, 178 } 179 } 180 case ConvoStatus.Error: { 181 return { 182 status: ConvoStatus.Error, 183 items: [], 184 convo: undefined, 185 error: this.error!, 186 sender: undefined, 187 recipients: undefined, 188 isFetchingHistory: false, 189 deleteMessage: undefined, 190 sendMessage: undefined, 191 fetchMessageHistory: undefined, 192 markConvoAccepted: undefined, 193 addReaction: undefined, 194 removeReaction: undefined, 195 } 196 } 197 default: { 198 return { 199 status: ConvoStatus.Uninitialized, 200 items: [], 201 convo: this.convo, 202 error: undefined, 203 sender: this.sender, 204 recipients: this.recipients, 205 isFetchingHistory: false, 206 deleteMessage: undefined, 207 sendMessage: undefined, 208 fetchMessageHistory: undefined, 209 markConvoAccepted: undefined, 210 addReaction: undefined, 211 removeReaction: undefined, 212 } 213 } 214 } 215 } 216 217 dispatch(action: ConvoDispatch) { 218 const prevStatus = this.status 219 220 switch (this.status) { 221 case ConvoStatus.Uninitialized: { 222 switch (action.event) { 223 case ConvoDispatchEvent.Init: { 224 this.status = ConvoStatus.Initializing 225 this.setup() 226 this.setupFirehose() 227 this.requestPollInterval(ACTIVE_POLL_INTERVAL) 228 break 229 } 230 } 231 break 232 } 233 case ConvoStatus.Initializing: { 234 switch (action.event) { 235 case ConvoDispatchEvent.Ready: { 236 this.status = ConvoStatus.Ready 237 this.fetchMessageHistory() 238 break 239 } 240 case ConvoDispatchEvent.Background: { 241 this.status = ConvoStatus.Backgrounded 242 this.fetchMessageHistory() 243 this.requestPollInterval(BACKGROUND_POLL_INTERVAL) 244 break 245 } 246 case ConvoDispatchEvent.Suspend: { 247 this.status = ConvoStatus.Suspended 248 this.cleanupFirehoseConnection?.() 249 this.withdrawRequestedPollInterval() 250 break 251 } 252 case ConvoDispatchEvent.Error: { 253 this.status = ConvoStatus.Error 254 this.error = action.payload 255 this.cleanupFirehoseConnection?.() 256 this.withdrawRequestedPollInterval() 257 break 258 } 259 case ConvoDispatchEvent.Disable: { 260 this.status = ConvoStatus.Disabled 261 this.fetchMessageHistory() // finish init 262 this.cleanupFirehoseConnection?.() 263 this.withdrawRequestedPollInterval() 264 break 265 } 266 } 267 break 268 } 269 case ConvoStatus.Ready: { 270 switch (action.event) { 271 case ConvoDispatchEvent.Resume: { 272 this.refreshConvo() 273 this.requestPollInterval(ACTIVE_POLL_INTERVAL) 274 break 275 } 276 case ConvoDispatchEvent.Background: { 277 this.status = ConvoStatus.Backgrounded 278 this.requestPollInterval(BACKGROUND_POLL_INTERVAL) 279 break 280 } 281 case ConvoDispatchEvent.Suspend: { 282 this.status = ConvoStatus.Suspended 283 this.cleanupFirehoseConnection?.() 284 this.withdrawRequestedPollInterval() 285 break 286 } 287 case ConvoDispatchEvent.Error: { 288 this.status = ConvoStatus.Error 289 this.error = action.payload 290 this.cleanupFirehoseConnection?.() 291 this.withdrawRequestedPollInterval() 292 break 293 } 294 case ConvoDispatchEvent.Disable: { 295 this.status = ConvoStatus.Disabled 296 this.cleanupFirehoseConnection?.() 297 this.withdrawRequestedPollInterval() 298 break 299 } 300 } 301 break 302 } 303 case ConvoStatus.Backgrounded: { 304 switch (action.event) { 305 case ConvoDispatchEvent.Resume: { 306 if (this.wasChatInactive()) { 307 this.reset() 308 } else { 309 if (this.convo) { 310 this.status = ConvoStatus.Ready 311 this.refreshConvo() 312 this.maybeRecoverFromNetworkError() 313 } else { 314 this.status = ConvoStatus.Initializing 315 this.setup() 316 } 317 this.requestPollInterval(ACTIVE_POLL_INTERVAL) 318 } 319 break 320 } 321 case ConvoDispatchEvent.Suspend: { 322 this.status = ConvoStatus.Suspended 323 this.cleanupFirehoseConnection?.() 324 this.withdrawRequestedPollInterval() 325 break 326 } 327 case ConvoDispatchEvent.Error: { 328 this.status = ConvoStatus.Error 329 this.error = action.payload 330 this.cleanupFirehoseConnection?.() 331 this.withdrawRequestedPollInterval() 332 break 333 } 334 case ConvoDispatchEvent.Disable: { 335 this.status = ConvoStatus.Disabled 336 this.cleanupFirehoseConnection?.() 337 this.withdrawRequestedPollInterval() 338 break 339 } 340 } 341 break 342 } 343 case ConvoStatus.Suspended: { 344 switch (action.event) { 345 case ConvoDispatchEvent.Init: { 346 this.reset() 347 break 348 } 349 case ConvoDispatchEvent.Resume: { 350 this.reset() 351 break 352 } 353 case ConvoDispatchEvent.Error: { 354 this.status = ConvoStatus.Error 355 this.error = action.payload 356 break 357 } 358 case ConvoDispatchEvent.Disable: { 359 this.status = ConvoStatus.Disabled 360 break 361 } 362 } 363 break 364 } 365 case ConvoStatus.Error: { 366 switch (action.event) { 367 case ConvoDispatchEvent.Init: { 368 this.reset() 369 break 370 } 371 case ConvoDispatchEvent.Resume: { 372 this.reset() 373 break 374 } 375 case ConvoDispatchEvent.Suspend: { 376 this.status = ConvoStatus.Suspended 377 break 378 } 379 case ConvoDispatchEvent.Error: { 380 this.status = ConvoStatus.Error 381 this.error = action.payload 382 break 383 } 384 case ConvoDispatchEvent.Disable: { 385 this.status = ConvoStatus.Disabled 386 break 387 } 388 } 389 break 390 } 391 case ConvoStatus.Disabled: { 392 // can't do anything 393 break 394 } 395 default: 396 break 397 } 398 399 logger.debug(`dispatch '${action.event}'`, { 400 id: this.id, 401 prev: prevStatus, 402 next: this.status, 403 }) 404 405 this.updateLastActiveTimestamp() 406 this.commit() 407 } 408 409 private reset() { 410 this.convo = undefined 411 this.sender = undefined 412 this.recipients = undefined 413 this.snapshot = undefined 414 415 this.status = ConvoStatus.Uninitialized 416 this.error = undefined 417 this.oldestRev = undefined 418 this.latestRev = undefined 419 420 this.pastMessages = new Map() 421 this.newMessages = new Map() 422 this.pendingMessages = new Map() 423 this.deletedMessages = new Set() 424 425 this.pendingMessageFailure = null 426 this.fetchMessageHistoryError = undefined 427 this.firehoseError = undefined 428 429 this.dispatch({event: ConvoDispatchEvent.Init}) 430 } 431 432 maybeRecoverFromNetworkError() { 433 if (this.firehoseError) { 434 this.firehoseError.retry() 435 this.firehoseError = undefined 436 this.commit() 437 } else { 438 this.batchRetryPendingMessages() 439 } 440 441 if (this.fetchMessageHistoryError) { 442 this.fetchMessageHistoryError.retry() 443 this.fetchMessageHistoryError = undefined 444 this.commit() 445 } 446 } 447 448 /** 449 * Initialises the convo with placeholder data, if provided. We still refetch it before rendering the convo, 450 * but this allows us to render the convo header immediately. 451 */ 452 private setupPlaceholderData( 453 data: NonNullable<ConvoParams['placeholderData']>, 454 ) { 455 this.convo = data.convo 456 this.sender = data.convo.members.find(m => m.did === this.senderUserDid) 457 this.recipients = data.convo.members.filter( 458 m => m.did !== this.senderUserDid, 459 ) 460 } 461 462 private async setup() { 463 try { 464 const {convo, sender, recipients} = await this.fetchConvo() 465 466 this.convo = convo 467 this.sender = sender 468 this.recipients = recipients 469 470 /* 471 * Some validation prior to `Ready` status 472 */ 473 if (!this.convo) { 474 throw new Error('could not find convo') 475 } 476 if (!this.sender) { 477 throw new Error('could not find sender in convo') 478 } 479 if (!this.recipients) { 480 throw new Error('could not find recipients in convo') 481 } 482 483 const userIsDisabled = Boolean(this.sender.chatDisabled) 484 485 if (userIsDisabled) { 486 this.dispatch({event: ConvoDispatchEvent.Disable}) 487 } else { 488 this.dispatch({event: ConvoDispatchEvent.Ready}) 489 } 490 } catch (e: any) { 491 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) { 492 logger.error('setup failed', { 493 safeMessage: e.message, 494 }) 495 } 496 497 this.dispatch({ 498 event: ConvoDispatchEvent.Error, 499 payload: { 500 exception: e, 501 code: ConvoErrorCode.InitFailed, 502 retry: () => { 503 this.reset() 504 }, 505 }, 506 }) 507 this.commit() 508 } 509 } 510 511 init() { 512 this.dispatch({event: ConvoDispatchEvent.Init}) 513 } 514 515 resume() { 516 this.dispatch({event: ConvoDispatchEvent.Resume}) 517 } 518 519 background() { 520 this.dispatch({event: ConvoDispatchEvent.Background}) 521 } 522 523 suspend() { 524 this.dispatch({event: ConvoDispatchEvent.Suspend}) 525 } 526 527 /** 528 * Called on any state transition, like when the chat is backgrounded. This 529 * value is then checked on background -> foreground transitions. 530 */ 531 private updateLastActiveTimestamp() { 532 this.lastActiveTimestamp = Date.now() 533 } 534 private wasChatInactive() { 535 if (!this.lastActiveTimestamp) return true 536 return Date.now() - this.lastActiveTimestamp > INACTIVE_TIMEOUT 537 } 538 539 private requestedPollInterval: (() => void) | undefined 540 private requestPollInterval(interval: number) { 541 this.withdrawRequestedPollInterval() 542 this.requestedPollInterval = this.events.requestPollInterval(interval) 543 } 544 private withdrawRequestedPollInterval() { 545 if (this.requestedPollInterval) { 546 this.requestedPollInterval() 547 } 548 } 549 550 private pendingFetchConvo: 551 | Promise<{ 552 convo: ChatBskyConvoDefs.ConvoView 553 sender: ChatBskyActorDefs.ProfileViewBasic | undefined 554 recipients: ChatBskyActorDefs.ProfileViewBasic[] 555 }> 556 | undefined 557 async fetchConvo() { 558 if (this.pendingFetchConvo) return this.pendingFetchConvo 559 560 this.pendingFetchConvo = new Promise<{ 561 convo: ChatBskyConvoDefs.ConvoView 562 sender: ChatBskyActorDefs.ProfileViewBasic | undefined 563 recipients: ChatBskyActorDefs.ProfileViewBasic[] 564 }>(async (resolve, reject) => { 565 try { 566 const response = await networkRetry(2, () => { 567 return this.agent.api.chat.bsky.convo.getConvo( 568 { 569 convoId: this.convoId, 570 }, 571 {headers: DM_SERVICE_HEADERS}, 572 ) 573 }) 574 575 const convo = response.data.convo 576 577 resolve({ 578 convo, 579 sender: convo.members.find(m => m.did === this.senderUserDid), 580 recipients: convo.members.filter(m => m.did !== this.senderUserDid), 581 }) 582 } catch (e) { 583 reject(e) 584 } finally { 585 this.pendingFetchConvo = undefined 586 } 587 }) 588 589 return this.pendingFetchConvo 590 } 591 592 async refreshConvo() { 593 try { 594 const {convo, sender, recipients} = await this.fetchConvo() 595 // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE') 596 this.convo = convo || this.convo 597 this.sender = sender || this.sender 598 this.recipients = recipients || this.recipients 599 } catch (e: any) { 600 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) { 601 logger.error(`failed to refresh convo`, { 602 safeMessage: e.message, 603 }) 604 } 605 } 606 } 607 608 private fetchMessageHistoryError: 609 | { 610 retry: () => void 611 } 612 | undefined 613 async fetchMessageHistory() { 614 logger.debug('fetch message history', {}) 615 616 /* 617 * If oldestRev is null, we've fetched all history. 618 */ 619 if (this.oldestRev === null) return 620 621 /* 622 * Don't fetch again if a fetch is already in progress 623 */ 624 if (this.isFetchingHistory) return 625 626 /* 627 * If we've rendered a retry state for history fetching, exit. Upon retry, 628 * this will be removed and we'll try again. 629 */ 630 if (this.fetchMessageHistoryError) return 631 632 try { 633 this.isFetchingHistory = true 634 this.commit() 635 636 const nextCursor = this.oldestRev // for TS 637 const response = await networkRetry(2, () => { 638 return this.agent.api.chat.bsky.convo.getMessages( 639 { 640 cursor: nextCursor, 641 convoId: this.convoId, 642 limit: IS_NATIVE ? 30 : 60, 643 }, 644 {headers: DM_SERVICE_HEADERS}, 645 ) 646 }) 647 const {cursor, messages} = response.data 648 649 this.oldestRev = cursor ?? null 650 651 for (const message of messages) { 652 if ( 653 ChatBskyConvoDefs.isMessageView(message) || 654 ChatBskyConvoDefs.isDeletedMessageView(message) 655 ) { 656 /* 657 * If this message is already in new messages, it was added by the 658 * firehose ingestion, and we can safely overwrite it. This trusts 659 * the server on ordering, and keeps it in sync. 660 */ 661 if (this.newMessages.has(message.id)) { 662 this.newMessages.delete(message.id) 663 } 664 this.pastMessages.set(message.id, message) 665 } 666 } 667 } catch (e: any) { 668 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) { 669 logger.error('failed to fetch message history', { 670 safeMessage: e.message, 671 }) 672 } 673 674 this.fetchMessageHistoryError = { 675 retry: () => { 676 this.fetchMessageHistory() 677 }, 678 } 679 } finally { 680 this.isFetchingHistory = false 681 this.commit() 682 } 683 } 684 685 private cleanupFirehoseConnection: (() => void) | undefined 686 private setupFirehose() { 687 // remove old listeners, if exist 688 this.cleanupFirehoseConnection?.() 689 690 // reconnect 691 this.cleanupFirehoseConnection = this.events.on( 692 event => { 693 switch (event.type) { 694 case 'connect': { 695 this.onFirehoseConnect() 696 break 697 } 698 case 'error': { 699 this.onFirehoseError(event.error) 700 break 701 } 702 case 'logs': { 703 this.ingestFirehose(event.logs) 704 break 705 } 706 } 707 }, 708 /* 709 * This is VERY important — we only want events for this convo. 710 */ 711 {convoId: this.convoId}, 712 ) 713 } 714 715 private firehoseError: MessagesEventBusError | undefined 716 717 onFirehoseConnect() { 718 this.firehoseError = undefined 719 this.batchRetryPendingMessages() 720 this.commit() 721 } 722 723 onFirehoseError(error?: MessagesEventBusError) { 724 this.firehoseError = error 725 this.commit() 726 } 727 728 ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) { 729 let needsCommit = false 730 731 for (const ev of events) { 732 /* 733 * If there's a rev, we should handle it. If there's not a rev, we don't 734 * know what it is. 735 */ 736 if ('rev' in ev && typeof ev.rev === 'string') { 737 const isUninitialized = !this.latestRev 738 const isNewEvent = this.latestRev && ev.rev > this.latestRev 739 740 /* 741 * We received an event prior to fetching any history, so we can safely 742 * use this as the initial history cursor 743 */ 744 if (this.oldestRev === undefined && isUninitialized) { 745 this.oldestRev = ev.rev 746 } 747 748 /* 749 * We only care about new events 750 */ 751 if (isNewEvent || isUninitialized) { 752 /* 753 * Update rev regardless of if it's a ev type we care about or not 754 */ 755 this.latestRev = ev.rev 756 757 if ( 758 ChatBskyConvoDefs.isLogCreateMessage(ev) && 759 ChatBskyConvoDefs.isMessageView(ev.message) 760 ) { 761 /** 762 * If this message is already in new messages, it was added by our 763 * sending logic, and is based on client-ordering. When we receive 764 * the "commited" event from the log, we should replace this 765 * reference and re-insert in order to respect the order we receied 766 * from the log. 767 */ 768 if (this.newMessages.has(ev.message.id)) { 769 this.newMessages.delete(ev.message.id) 770 } 771 this.newMessages.set(ev.message.id, ev.message) 772 needsCommit = true 773 } else if ( 774 ChatBskyConvoDefs.isLogDeleteMessage(ev) && 775 ChatBskyConvoDefs.isDeletedMessageView(ev.message) 776 ) { 777 /* 778 * Update if we have this in state. If we don't, don't worry about it. 779 */ 780 if ( 781 this.pastMessages.has(ev.message.id) || 782 this.newMessages.has(ev.message.id) 783 ) { 784 this.pastMessages.delete(ev.message.id) 785 this.newMessages.delete(ev.message.id) 786 this.deletedMessages.delete(ev.message.id) 787 needsCommit = true 788 } 789 } else if ( 790 (ChatBskyConvoDefs.isLogAddReaction(ev) || 791 ChatBskyConvoDefs.isLogRemoveReaction(ev)) && 792 ChatBskyConvoDefs.isMessageView(ev.message) 793 ) { 794 /* 795 * Update if we have this in state - replace message wholesale. If we don't, don't worry about it. 796 */ 797 if (this.pastMessages.has(ev.message.id)) { 798 this.pastMessages.set(ev.message.id, ev.message) 799 needsCommit = true 800 } 801 if (this.newMessages.has(ev.message.id)) { 802 this.newMessages.set(ev.message.id, ev.message) 803 needsCommit = true 804 } 805 } 806 } 807 } 808 } 809 810 if (needsCommit) { 811 this.commit() 812 } 813 } 814 815 private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null 816 817 sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { 818 // Ignore empty messages for now since they have no other purpose atm 819 if (!message.text.trim() && !message.embed) return 820 821 logger.debug('send message', {}) 822 823 const tempId = nanoid() 824 825 this.pendingMessageFailure = null 826 this.pendingMessages.set(tempId, { 827 id: tempId, 828 message, 829 }) 830 if (this.convo?.status === 'request') { 831 this.convo = { 832 ...this.convo, 833 status: 'accepted', 834 } 835 } 836 this.commit() 837 838 if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) { 839 this.processPendingMessages() 840 } 841 } 842 843 markConvoAccepted() { 844 if (this.convo) { 845 this.convo = { 846 ...this.convo, 847 status: 'accepted', 848 } 849 } 850 this.commit() 851 } 852 853 async processPendingMessages() { 854 logger.debug( 855 `processing messages (${this.pendingMessages.size} remaining)`, 856 {}, 857 ) 858 859 const pendingMessage = Array.from(this.pendingMessages.values()).shift() 860 861 /* 862 * If there are no pending messages, we're done. 863 */ 864 if (!pendingMessage) { 865 this.isProcessingPendingMessages = false 866 return 867 } 868 869 try { 870 this.isProcessingPendingMessages = true 871 872 const {id, message} = pendingMessage 873 874 const response = await this.agent.api.chat.bsky.convo.sendMessage( 875 { 876 convoId: this.convoId, 877 message, 878 }, 879 {encoding: 'application/json', headers: DM_SERVICE_HEADERS}, 880 ) 881 const res = response.data 882 883 // remove from queue 884 this.pendingMessages.delete(id) 885 886 /* 887 * Insert into `newMessages` as soon as we have a real ID. That way, when 888 * we get an event log back, we can replace in situ. 889 */ 890 this.newMessages.set(res.id, { 891 ...res, 892 $type: 'chat.bsky.convo.defs#messageView', 893 }) 894 // render new message state, prior to firehose 895 this.commit() 896 897 // continue queue processing 898 await this.processPendingMessages() 899 } catch (e: any) { 900 this.handleSendMessageFailure(e) 901 this.isProcessingPendingMessages = false 902 } 903 } 904 905 private handleSendMessageFailure(e: any) { 906 if (e instanceof XRPCError) { 907 if (NETWORK_FAILURE_STATUSES.includes(e.status)) { 908 this.pendingMessageFailure = 'recoverable' 909 } else { 910 this.pendingMessageFailure = 'unrecoverable' 911 912 switch (e.message) { 913 case 'block between recipient and sender': 914 this.emitter.emit('event', { 915 type: 'invalidate-block-state', 916 accountDids: [ 917 this.sender!.did, 918 ...this.recipients!.map(r => r.did), 919 ], 920 }) 921 break 922 case 'Account is disabled': 923 this.dispatch({event: ConvoDispatchEvent.Disable}) 924 break 925 case 'Convo not found': 926 case 'Account does not exist': 927 case 'recipient does not exist': 928 case 'recipient requires incoming messages to come from someone they follow': 929 case 'recipient has disabled incoming messages': 930 break 931 default: 932 if (!isNetworkError(e)) { 933 logger.warn(`handleSendMessageFailure could not handle error`, { 934 status: e.status, 935 message: e.message, 936 }) 937 } 938 break 939 } 940 } 941 } else { 942 this.pendingMessageFailure = 'unrecoverable' 943 944 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) { 945 logger.error(`handleSendMessageFailure received unknown error`, { 946 safeMessage: e.message, 947 }) 948 } 949 } 950 951 this.commit() 952 } 953 954 async batchRetryPendingMessages() { 955 if (this.pendingMessageFailure === null) return 956 957 const messageArray = Array.from(this.pendingMessages.values()) 958 if (messageArray.length === 0) return 959 960 this.pendingMessageFailure = null 961 this.commit() 962 963 logger.debug( 964 `batch retrying ${this.pendingMessages.size} pending messages`, 965 {}, 966 ) 967 968 try { 969 const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( 970 { 971 items: messageArray.map(({message}) => ({ 972 convoId: this.convoId, 973 message, 974 })), 975 }, 976 {encoding: 'application/json', headers: DM_SERVICE_HEADERS}, 977 ) 978 const {items} = data 979 980 /* 981 * Insert into `newMessages` as soon as we have a real ID. That way, when 982 * we get an event log back, we can replace in situ. 983 */ 984 for (const item of items) { 985 this.newMessages.set(item.id, { 986 ...item, 987 $type: 'chat.bsky.convo.defs#messageView', 988 }) 989 } 990 991 for (const pendingMessage of messageArray) { 992 this.pendingMessages.delete(pendingMessage.id) 993 } 994 995 this.commit() 996 997 logger.debug(`sent ${this.pendingMessages.size} pending messages`, {}) 998 } catch (e: any) { 999 this.handleSendMessageFailure(e) 1000 } 1001 } 1002 1003 async deleteMessage(messageId: string) { 1004 logger.debug('delete message', {}) 1005 1006 this.deletedMessages.add(messageId) 1007 this.commit() 1008 1009 try { 1010 await networkRetry(2, () => { 1011 return this.agent.api.chat.bsky.convo.deleteMessageForSelf( 1012 { 1013 convoId: this.convoId, 1014 messageId, 1015 }, 1016 {encoding: 'application/json', headers: DM_SERVICE_HEADERS}, 1017 ) 1018 }) 1019 } catch (e: any) { 1020 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) { 1021 logger.error(`failed to delete message`, { 1022 safeMessage: e.message, 1023 }) 1024 } 1025 this.deletedMessages.delete(messageId) 1026 this.commit() 1027 throw e 1028 } 1029 } 1030 1031 on(handler: (event: ConvoEvent) => void) { 1032 this.emitter.on('event', handler) 1033 1034 return () => { 1035 this.emitter.off('event', handler) 1036 } 1037 } 1038 1039 /* 1040 * Items in reverse order, since FlatList inverts 1041 */ 1042 getItems(): ConvoItem[] { 1043 const items: ConvoItem[] = [] 1044 1045 this.pastMessages.forEach(m => { 1046 if (ChatBskyConvoDefs.isMessageView(m)) { 1047 items.unshift({ 1048 type: 'message', 1049 key: m.id, 1050 message: m, 1051 nextMessage: null, 1052 prevMessage: null, 1053 }) 1054 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { 1055 items.unshift({ 1056 type: 'deleted-message', 1057 key: m.id, 1058 message: m, 1059 nextMessage: null, 1060 prevMessage: null, 1061 }) 1062 } 1063 }) 1064 1065 if (this.fetchMessageHistoryError) { 1066 items.unshift({ 1067 type: 'error', 1068 code: ConvoItemError.HistoryFailed, 1069 key: ConvoItemError.HistoryFailed, 1070 retry: () => { 1071 this.maybeRecoverFromNetworkError() 1072 }, 1073 }) 1074 } 1075 1076 this.newMessages.forEach(m => { 1077 if (ChatBskyConvoDefs.isMessageView(m)) { 1078 items.push({ 1079 type: 'message', 1080 key: m.id, 1081 message: m, 1082 nextMessage: null, 1083 prevMessage: null, 1084 }) 1085 } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { 1086 items.push({ 1087 type: 'deleted-message', 1088 key: m.id, 1089 message: m, 1090 nextMessage: null, 1091 prevMessage: null, 1092 }) 1093 } 1094 }) 1095 1096 this.pendingMessages.forEach(m => { 1097 items.push({ 1098 type: 'pending-message', 1099 key: m.id, 1100 message: { 1101 ...m.message, 1102 embed: undefined, 1103 $type: 'chat.bsky.convo.defs#messageView', 1104 id: nanoid(), 1105 rev: '__fake__', 1106 sentAt: new Date().toISOString(), 1107 /* 1108 * `getItems` is only run in "active" status states, where 1109 * `this.sender` is defined 1110 */ 1111 sender: { 1112 $type: 'chat.bsky.convo.defs#messageViewSender', 1113 did: this.sender!.did, 1114 }, 1115 }, 1116 nextMessage: null, 1117 prevMessage: null, 1118 failed: this.pendingMessageFailure !== null, 1119 retry: 1120 this.pendingMessageFailure === 'recoverable' 1121 ? () => { 1122 this.maybeRecoverFromNetworkError() 1123 } 1124 : undefined, 1125 }) 1126 }) 1127 1128 if (this.firehoseError) { 1129 items.push({ 1130 type: 'error', 1131 code: ConvoItemError.FirehoseFailed, 1132 key: ConvoItemError.FirehoseFailed, 1133 retry: () => { 1134 this.firehoseError?.retry() 1135 }, 1136 }) 1137 } 1138 1139 return items 1140 .filter(item => { 1141 if (isConvoItemMessage(item)) { 1142 return !this.deletedMessages.has(item.message.id) 1143 } 1144 return true 1145 }) 1146 .map((item, i, arr) => { 1147 let nextMessage = null 1148 let prevMessage = null 1149 const isMessage = isConvoItemMessage(item) 1150 1151 if (isMessage) { 1152 if ( 1153 ChatBskyConvoDefs.isMessageView(item.message) || 1154 ChatBskyConvoDefs.isDeletedMessageView(item.message) 1155 ) { 1156 const next = arr[i + 1] 1157 1158 if ( 1159 isConvoItemMessage(next) && 1160 (ChatBskyConvoDefs.isMessageView(next.message) || 1161 ChatBskyConvoDefs.isDeletedMessageView(next.message)) 1162 ) { 1163 nextMessage = next.message 1164 } 1165 1166 const prev = arr[i - 1] 1167 1168 if ( 1169 isConvoItemMessage(prev) && 1170 (ChatBskyConvoDefs.isMessageView(prev.message) || 1171 ChatBskyConvoDefs.isDeletedMessageView(prev.message)) 1172 ) { 1173 prevMessage = prev.message 1174 } 1175 } 1176 1177 return { 1178 ...item, 1179 nextMessage, 1180 prevMessage, 1181 } 1182 } 1183 1184 return item 1185 }) 1186 } 1187 1188 /** 1189 * Add an emoji reaction to a message 1190 * 1191 * @param messageId - the id of the message to add the reaction to 1192 * @param emoji - must be one grapheme 1193 */ 1194 async addReaction(messageId: string, emoji: string) { 1195 const optimisticReaction = { 1196 value: emoji, 1197 sender: {did: this.senderUserDid}, 1198 createdAt: new Date().toISOString(), 1199 } 1200 let restore: null | (() => void) = null 1201 if (this.pastMessages.has(messageId)) { 1202 const prevMessage = this.pastMessages.get(messageId) 1203 if ( 1204 ChatBskyConvoDefs.isMessageView(prevMessage) && 1205 // skip optimistic update if reaction already exists 1206 !prevMessage.reactions?.find( 1207 reaction => 1208 reaction.sender.did === this.senderUserDid && 1209 reaction.value === emoji, 1210 ) 1211 ) { 1212 if (prevMessage.reactions) { 1213 if ( 1214 prevMessage.reactions.filter( 1215 reaction => reaction.sender.did === this.senderUserDid, 1216 ).length >= 5 1217 ) { 1218 throw new Error('Maximum reactions reached') 1219 } 1220 } 1221 this.pastMessages.set(messageId, { 1222 ...prevMessage, 1223 reactions: [...(prevMessage.reactions ?? []), optimisticReaction], 1224 }) 1225 this.commit() 1226 restore = () => { 1227 this.pastMessages.set(messageId, prevMessage) 1228 this.commit() 1229 } 1230 } 1231 } else if (this.newMessages.has(messageId)) { 1232 const prevMessage = this.newMessages.get(messageId) 1233 if ( 1234 ChatBskyConvoDefs.isMessageView(prevMessage) && 1235 !prevMessage.reactions?.find(reaction => reaction.value === emoji) 1236 ) { 1237 if (prevMessage.reactions && prevMessage.reactions.length >= 5) 1238 throw new Error('Maximum reactions reached') 1239 this.newMessages.set(messageId, { 1240 ...prevMessage, 1241 reactions: [...(prevMessage.reactions ?? []), optimisticReaction], 1242 }) 1243 this.commit() 1244 restore = () => { 1245 this.newMessages.set(messageId, prevMessage) 1246 this.commit() 1247 } 1248 } 1249 } 1250 1251 try { 1252 logger.debug(`Adding reaction ${emoji} to message ${messageId}`) 1253 const {data} = await this.agent.chat.bsky.convo.addReaction( 1254 {messageId, value: emoji, convoId: this.convoId}, 1255 {encoding: 'application/json', headers: DM_SERVICE_HEADERS}, 1256 ) 1257 if (ChatBskyConvoDefs.isMessageView(data.message)) { 1258 if (this.pastMessages.has(messageId)) { 1259 this.pastMessages.set(messageId, data.message) 1260 this.commit() 1261 } else if (this.newMessages.has(messageId)) { 1262 this.newMessages.set(messageId, data.message) 1263 this.commit() 1264 } 1265 } 1266 } catch (error) { 1267 if (restore) restore() 1268 throw error 1269 } 1270 } 1271 1272 /* 1273 * Remove a reaction from a message. 1274 * 1275 * @param messageId - The ID of the message to remove the reaction from. 1276 * @param emoji - The emoji to remove. 1277 */ 1278 async removeReaction(messageId: string, emoji: string) { 1279 let restore: null | (() => void) = null 1280 if (this.pastMessages.has(messageId)) { 1281 const prevMessage = this.pastMessages.get(messageId) 1282 if (ChatBskyConvoDefs.isMessageView(prevMessage)) { 1283 this.pastMessages.set(messageId, { 1284 ...prevMessage, 1285 reactions: prevMessage.reactions?.filter( 1286 reaction => 1287 reaction.value !== emoji || 1288 reaction.sender.did !== this.senderUserDid, 1289 ), 1290 }) 1291 this.commit() 1292 restore = () => { 1293 this.pastMessages.set(messageId, prevMessage) 1294 this.commit() 1295 } 1296 } 1297 } else if (this.newMessages.has(messageId)) { 1298 const prevMessage = this.newMessages.get(messageId) 1299 if (ChatBskyConvoDefs.isMessageView(prevMessage)) { 1300 this.newMessages.set(messageId, { 1301 ...prevMessage, 1302 reactions: prevMessage.reactions?.filter( 1303 reaction => 1304 reaction.value !== emoji || 1305 reaction.sender.did !== this.senderUserDid, 1306 ), 1307 }) 1308 this.commit() 1309 restore = () => { 1310 this.newMessages.set(messageId, prevMessage) 1311 this.commit() 1312 } 1313 } 1314 } 1315 1316 try { 1317 logger.debug(`Removing reaction ${emoji} from message ${messageId}`) 1318 await this.agent.chat.bsky.convo.removeReaction( 1319 {messageId, value: emoji, convoId: this.convoId}, 1320 {encoding: 'application/json', headers: DM_SERVICE_HEADERS}, 1321 ) 1322 } catch (error) { 1323 if (restore) restore() 1324 throw error 1325 } 1326 } 1327}