mirror of https://git.lenooby09.tech/LeNooby09/social-app.git
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at schema-errors 1092 lines 31 kB view raw
import {
  AppBskyActorDefs,
  BskyAgent,
  ChatBskyConvoDefs,
  ChatBskyConvoGetLog,
  ChatBskyConvoSendMessage,
} from '@atproto/api'
import {XRPCError} from '@atproto/xrpc'
import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure'

import {networkRetry} from '#/lib/async/retry'
import {logger} from '#/logger'
import {isNative} from '#/platform/detection'
import {
  ACTIVE_POLL_INTERVAL,
  BACKGROUND_POLL_INTERVAL,
  INACTIVE_TIMEOUT,
  NETWORK_FAILURE_STATUSES,
} from '#/state/messages/convo/const'
import {
  ConvoDispatch,
  ConvoDispatchEvent,
  ConvoError,
  ConvoErrorCode,
  ConvoEvent,
  ConvoItem,
  ConvoItemError,
  ConvoParams,
  ConvoState,
  ConvoStatus,
} from '#/state/messages/convo/types'
import {MessagesEventBus} from '#/state/messages/events/agent'
import {MessagesEventBusError} from '#/state/messages/events/types'
import {DM_SERVICE_HEADERS} from '#/state/queries/messages/const'

/**
 * Shape resolved by `Convo.fetchConvo`: the conversation view plus its members
 * split into the current user (`sender`) and everyone else (`recipients`).
 */
type FetchConvoResult = {
  convo: ChatBskyConvoDefs.ConvoView
  sender: AppBskyActorDefs.ProfileViewBasic | undefined
  recipients: AppBskyActorDefs.ProfileViewBasic[]
}

/**
 * Returns `true` when `item` is one of the message-like list items
 * (`message`, `deleted-message` or `pending-message`), and `false` for any
 * other item or a falsy value.
 *
 * NOTE(review): the declared predicate narrows to `type: 'message'` even
 * though the runtime check accepts three item types. `getItems` below reads
 * `item.message` after this guard. Widening the predicate would change the
 * exported signature, so it is left as-is.
 */
export function isConvoItemMessage(
  item: ConvoItem,
): item is ConvoItem & {type: 'message'} {
  if (!item) return false
  return (
    item.type === 'message' ||
    item.type === 'deleted-message' ||
    item.type === 'pending-message'
  )
}

/**
 * State container for a single chat conversation.
 *
 * The instance moves between `ConvoStatus` values via `dispatch`, keeps the
 * message collections (history, firehose/new, pending, deleted) and exposes a
 * `subscribe`/`getSnapshot` pair: every `commit()` drops the cached snapshot
 * and notifies subscribers, and `getSnapshot()` lazily rebuilds it.
 */
export class Convo {
  /** Short random id, only used to tell instances apart in debug logs. */
  private id: string

  private agent: BskyAgent
  private events: MessagesEventBus
  private senderUserDid: string

  private status: ConvoStatus = ConvoStatus.Uninitialized
  private error: ConvoError | undefined
  /**
   * History cursor. `undefined` = nothing fetched yet, `null` = all history
   * has been fetched (see `fetchMessageHistory`).
   */
  private oldestRev: string | undefined | null = undefined
  private isFetchingHistory = false
  /** Highest `rev` seen from the firehose so far. */
  private latestRev: string | undefined = undefined

  /** Messages loaded through `fetchMessageHistory`. */
  private pastMessages: Map<
    string,
    ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
  > = new Map()
  /** Messages received from the firehose or returned by a send call. */
  private newMessages: Map<
    string,
    ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
  > = new Map()
  /** Outgoing messages that have not been acknowledged yet, keyed by temp id. */
  private pendingMessages: Map<
    string,
    {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
  > = new Map()
  /** Ids of messages hidden locally by `deleteMessage`. */
  private deletedMessages: Set<string> = new Set()

  private isProcessingPendingMessages = false

  private lastActiveTimestamp: number | undefined

  private emitter = new EventEmitter<{event: [ConvoEvent]}>()

  convoId: string
  convo: ChatBskyConvoDefs.ConvoView | undefined
  sender: AppBskyActorDefs.ProfileViewBasic | undefined
  recipients: AppBskyActorDefs.ProfileViewBasic[] | undefined = undefined
  /** Cached result of `generateSnapshot`; cleared by `commit`. */
  snapshot: ConvoState | undefined

  /**
   * @param params Conversation id, agent and event bus to operate on. The
   * sender DID is read from `params.agent.session`.
   */
  constructor(params: ConvoParams) {
    this.id = nanoid(3)
    this.convoId = params.convoId
    this.agent = params.agent
    this.events = params.events
    this.senderUserDid = params.agent.session?.did!

    // Bound so these methods can be handed out as plain callbacks.
    this.subscribe = this.subscribe.bind(this)
    this.getSnapshot = this.getSnapshot.bind(this)
    this.sendMessage = this.sendMessage.bind(this)
    this.deleteMessage = this.deleteMessage.bind(this)
    this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
    this.ingestFirehose = this.ingestFirehose.bind(this)
    this.onFirehoseConnect = this.onFirehoseConnect.bind(this)
    this.onFirehoseError = this.onFirehoseError.bind(this)
  }

  /** Invalidates the cached snapshot and notifies every subscriber. */
  private commit() {
    this.snapshot = undefined
    this.subscribers.forEach(subscriber => subscriber())
  }

  private subscribers: (() => void)[] = []

  /**
   * Registers a change listener. The first subscriber triggers `init()`;
   * removing the last one triggers `suspend()`.
   *
   * @returns A function that removes the listener.
   */
  subscribe(subscriber: () => void) {
    if (this.subscribers.length === 0) this.init()

    this.subscribers.push(subscriber)

    return () => {
      this.subscribers = this.subscribers.filter(s => s !== subscriber)
      if (this.subscribers.length === 0) this.suspend()
    }
  }

  /** Returns the cached snapshot, building it first if `commit` cleared it. */
  getSnapshot(): ConvoState {
    if (!this.snapshot) this.snapshot = this.generateSnapshot()
    // logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo)
    return this.snapshot
  }

  /**
   * Builds the public state object for the current status. Only the "active"
   * statuses (Disabled, Suspended, Backgrounded, Ready) expose items and the
   * message actions; every other status returns an empty shell.
   */
  private generateSnapshot(): ConvoState {
    switch (this.status) {
      case ConvoStatus.Initializing: {
        return {
          status: ConvoStatus.Initializing,
          items: [],
          convo: undefined,
          error: undefined,
          sender: undefined,
          recipients: undefined,
          isFetchingHistory: this.isFetchingHistory,
          deleteMessage: undefined,
          sendMessage: undefined,
          fetchMessageHistory: undefined,
        }
      }
      case ConvoStatus.Disabled:
      case ConvoStatus.Suspended:
      case ConvoStatus.Backgrounded:
      case ConvoStatus.Ready: {
        return {
          status: this.status,
          items: this.getItems(),
          convo: this.convo!,
          error: undefined,
          sender: this.sender!,
          recipients: this.recipients!,
          isFetchingHistory: this.isFetchingHistory,
          deleteMessage: this.deleteMessage,
          sendMessage: this.sendMessage,
          fetchMessageHistory: this.fetchMessageHistory,
        }
      }
      case ConvoStatus.Error: {
        return {
          status: ConvoStatus.Error,
          items: [],
          convo: undefined,
          error: this.error!,
          sender: undefined,
          recipients: undefined,
          isFetchingHistory: false,
          deleteMessage: undefined,
          sendMessage: undefined,
          fetchMessageHistory: undefined,
        }
      }
      default: {
        return {
          status: ConvoStatus.Uninitialized,
          items: [],
          convo: undefined,
          error: undefined,
          sender: undefined,
          recipients: undefined,
          isFetchingHistory: false,
          deleteMessage: undefined,
          sendMessage: undefined,
          fetchMessageHistory: undefined,
        }
      }
    }
  }

  /**
   * Applies a state-machine event. Events that are not listed for the current
   * status are ignored. Regardless of whether a transition happened, the call
   * is logged, the last-active timestamp is refreshed and subscribers are
   * notified.
   */
  dispatch(action: ConvoDispatch) {
    const prevStatus = this.status

    switch (this.status) {
      case ConvoStatus.Uninitialized: {
        switch (action.event) {
          case ConvoDispatchEvent.Init: {
            this.status = ConvoStatus.Initializing
            this.setup()
            this.setupFirehose()
            this.requestPollInterval(ACTIVE_POLL_INTERVAL)
            break
          }
        }
        break
      }
      case ConvoStatus.Initializing: {
        switch (action.event) {
          case ConvoDispatchEvent.Ready: {
            this.status = ConvoStatus.Ready
            this.fetchMessageHistory()
            break
          }
          case ConvoDispatchEvent.Background: {
            this.status = ConvoStatus.Backgrounded
            this.fetchMessageHistory()
            this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
            break
          }
          case ConvoDispatchEvent.Suspend: {
            this.status = ConvoStatus.Suspended
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Error: {
            this.status = ConvoStatus.Error
            this.error = action.payload
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Disable: {
            this.status = ConvoStatus.Disabled
            this.fetchMessageHistory() // finish init
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
        }
        break
      }
      case ConvoStatus.Ready: {
        switch (action.event) {
          case ConvoDispatchEvent.Resume: {
            this.refreshConvo()
            this.requestPollInterval(ACTIVE_POLL_INTERVAL)
            break
          }
          case ConvoDispatchEvent.Background: {
            this.status = ConvoStatus.Backgrounded
            this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
            break
          }
          case ConvoDispatchEvent.Suspend: {
            this.status = ConvoStatus.Suspended
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Error: {
            this.status = ConvoStatus.Error
            this.error = action.payload
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Disable: {
            this.status = ConvoStatus.Disabled
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
        }
        break
      }
      case ConvoStatus.Backgrounded: {
        switch (action.event) {
          case ConvoDispatchEvent.Resume: {
            if (this.wasChatInactive()) {
              // Too long in the background: start over from scratch.
              this.reset()
            } else {
              if (this.convo) {
                this.status = ConvoStatus.Ready
                this.refreshConvo()
                this.maybeRecoverFromNetworkError()
              } else {
                // Backgrounded before setup finished, so run it again.
                this.status = ConvoStatus.Initializing
                this.setup()
              }
              this.requestPollInterval(ACTIVE_POLL_INTERVAL)
            }
            break
          }
          case ConvoDispatchEvent.Suspend: {
            this.status = ConvoStatus.Suspended
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Error: {
            this.status = ConvoStatus.Error
            this.error = action.payload
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
          case ConvoDispatchEvent.Disable: {
            this.status = ConvoStatus.Disabled
            this.cleanupFirehoseConnection?.()
            this.withdrawRequestedPollInterval()
            break
          }
        }
        break
      }
      case ConvoStatus.Suspended: {
        switch (action.event) {
          case ConvoDispatchEvent.Init: {
            this.reset()
            break
          }
          case ConvoDispatchEvent.Resume: {
            this.reset()
            break
          }
          case ConvoDispatchEvent.Error: {
            this.status = ConvoStatus.Error
            this.error = action.payload
            break
          }
          case ConvoDispatchEvent.Disable: {
            this.status = ConvoStatus.Disabled
            break
          }
        }
        break
      }
      case ConvoStatus.Error: {
        switch (action.event) {
          case ConvoDispatchEvent.Init: {
            this.reset()
            break
          }
          case ConvoDispatchEvent.Resume: {
            this.reset()
            break
          }
          case ConvoDispatchEvent.Suspend: {
            this.status = ConvoStatus.Suspended
            break
          }
          case ConvoDispatchEvent.Error: {
            this.status = ConvoStatus.Error
            this.error = action.payload
            break
          }
          case ConvoDispatchEvent.Disable: {
            this.status = ConvoStatus.Disabled
            break
          }
        }
        break
      }
      case ConvoStatus.Disabled: {
        // can't do anything
        break
      }
      default:
        break
    }

    logger.debug(
      `Convo: dispatch '${action.event}'`,
      {
        id: this.id,
        prev: prevStatus,
        next: this.status,
      },
      logger.DebugContext.convo,
    )

    this.updateLastActiveTimestamp()
    this.commit()
  }

  /**
   * Drops all conversation data, message collections and error markers,
   * returns to `Uninitialized` and immediately dispatches `Init` again.
   */
  private reset() {
    this.convo = undefined
    this.sender = undefined
    this.recipients = undefined
    this.snapshot = undefined

    this.status = ConvoStatus.Uninitialized
    this.error = undefined
    this.oldestRev = undefined
    this.latestRev = undefined

    this.pastMessages = new Map()
    this.newMessages = new Map()
    this.pendingMessages = new Map()
    this.deletedMessages = new Set()

    this.pendingMessageFailure = null
    this.fetchMessageHistoryError = undefined
    this.firehoseError = undefined

    this.dispatch({event: ConvoDispatchEvent.Init})
  }

  /**
   * Retries whatever is currently in a failed state: the firehose connection
   * if it has an error (otherwise the pending messages), and, independently,
   * the message-history fetch.
   */
  maybeRecoverFromNetworkError() {
    if (this.firehoseError) {
      this.firehoseError.retry()
      this.firehoseError = undefined
      this.commit()
    } else {
      this.batchRetryPendingMessages()
    }

    if (this.fetchMessageHistoryError) {
      this.fetchMessageHistoryError.retry()
      this.fetchMessageHistoryError = undefined
      this.commit()
    }
  }

  /**
   * Loads the conversation and validates it, then dispatches `Ready` (or
   * `Disable` when the sender has `chatDisabled` set). Any failure is turned
   * into an `Error` dispatch whose `retry` resets the instance.
   */
  private async setup() {
    try {
      const {convo, sender, recipients} = await this.fetchConvo()

      this.convo = convo
      this.sender = sender
      this.recipients = recipients

      /*
       * Some validation prior to `Ready` status
       */
      if (!this.convo) {
        throw new Error('Convo: could not find convo')
      }
      if (!this.sender) {
        throw new Error('Convo: could not find sender in convo')
      }
      if (!this.recipients) {
        throw new Error('Convo: could not find recipients in convo')
      }

      const userIsDisabled = this.sender.chatDisabled as boolean

      if (userIsDisabled) {
        this.dispatch({event: ConvoDispatchEvent.Disable})
      } else {
        this.dispatch({event: ConvoDispatchEvent.Ready})
      }
    } catch (e: any) {
      logger.error(e, {context: 'Convo: setup failed'})

      this.dispatch({
        event: ConvoDispatchEvent.Error,
        payload: {
          exception: e,
          code: ConvoErrorCode.InitFailed,
          retry: () => {
            this.reset()
          },
        },
      })
      this.commit()
    }
  }

  /** Dispatches `Init`. */
  init() {
    this.dispatch({event: ConvoDispatchEvent.Init})
  }

  /** Dispatches `Resume`. */
  resume() {
    this.dispatch({event: ConvoDispatchEvent.Resume})
  }

  /** Dispatches `Background`. */
  background() {
    this.dispatch({event: ConvoDispatchEvent.Background})
  }

  /** Dispatches `Suspend`. */
  suspend() {
    this.dispatch({event: ConvoDispatchEvent.Suspend})
  }

  /**
   * Called on any state transition, like when the chat is backgrounded. This
   * value is then checked on background -> foreground transitions.
   */
  private updateLastActiveTimestamp() {
    this.lastActiveTimestamp = Date.now()
  }

  /**
   * `true` when no activity was ever recorded or the last recorded activity
   * is more than `INACTIVE_TIMEOUT` ago.
   */
  private wasChatInactive() {
    if (!this.lastActiveTimestamp) return true
    return Date.now() - this.lastActiveTimestamp > INACTIVE_TIMEOUT
  }

  /** Release function for the poll interval currently requested, if any. */
  private requestedPollInterval: (() => void) | undefined

  /** Replaces any previously requested poll interval with `interval`. */
  private requestPollInterval(interval: number) {
    this.withdrawRequestedPollInterval()
    this.requestedPollInterval = this.events.requestPollInterval(interval)
  }

  /**
   * Releases the currently requested poll interval, if there is one. The
   * stored handle is cleared first so a release function is never invoked
   * more than once.
   */
  private withdrawRequestedPollInterval() {
    const withdraw = this.requestedPollInterval
    this.requestedPollInterval = undefined
    withdraw?.()
  }

  /** In-flight `fetchConvo` request, shared by concurrent callers. */
  private pendingFetchConvo: Promise<FetchConvoResult> | undefined

  /**
   * Fetches the conversation (with up to 2 network retries) and splits its
   * members into `sender` and `recipients` by comparing against the current
   * user's DID. Calls made while a request is in flight share that request.
   *
   * @returns The conversation view, the sender (if found) and the recipients.
   * @throws Whatever the underlying request rejects with.
   */
  async fetchConvo() {
    if (this.pendingFetchConvo) return this.pendingFetchConvo

    this.pendingFetchConvo = (async (): Promise<FetchConvoResult> => {
      try {
        const response = await networkRetry(2, () => {
          return this.agent.api.chat.bsky.convo.getConvo(
            {
              convoId: this.convoId,
            },
            {headers: DM_SERVICE_HEADERS},
          )
        })

        const convo = response.data.convo

        return {
          convo,
          sender: convo.members.find(m => m.did === this.senderUserDid),
          recipients: convo.members.filter(m => m.did !== this.senderUserDid),
        }
      } finally {
        // Allow the next call to issue a fresh request.
        this.pendingFetchConvo = undefined
      }
    })()

    return this.pendingFetchConvo
  }

  /**
   * Re-fetches the conversation and keeps the previous values for anything
   * that comes back empty. Failures are logged and otherwise ignored. This
   * method does not commit; the refreshed values become visible in the next
   * snapshot built after a later `commit()`.
   */
  async refreshConvo() {
    try {
      const {convo, sender, recipients} = await this.fetchConvo()
      // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE')
      this.convo = convo || this.convo
      this.sender = sender || this.sender
      this.recipients = recipients || this.recipients
    } catch (e: any) {
      logger.error(e, {context: `Convo: failed to refresh convo`})
    }
  }

  /** Set while a failed history fetch is waiting for a manual retry. */
  private fetchMessageHistoryError:
    | {
        retry: () => void
      }
    | undefined

  /**
   * Fetches the next page of older messages (30 per page on native, 60
   * otherwise) and stores them in `pastMessages`. Does nothing when all
   * history is loaded, a fetch is already running, or a previous failure is
   * still awaiting retry. On failure a retry handle is stored instead of
   * throwing.
   */
  async fetchMessageHistory() {
    logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)

    /*
     * If oldestRev is null, we've fetched all history.
     */
    if (this.oldestRev === null) return

    /*
     * Don't fetch again if a fetch is already in progress
     */
    if (this.isFetchingHistory) return

    /*
     * If we've rendered a retry state for history fetching, exit. Upon retry,
     * this will be removed and we'll try again.
     */
    if (this.fetchMessageHistoryError) return

    try {
      this.isFetchingHistory = true
      this.commit()

      const nextCursor = this.oldestRev // for TS
      const response = await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.getMessages(
          {
            cursor: nextCursor,
            convoId: this.convoId,
            limit: isNative ? 30 : 60,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })
      const {cursor, messages} = response.data

      this.oldestRev = cursor ?? null

      for (const message of messages) {
        if (
          ChatBskyConvoDefs.isMessageView(message) ||
          ChatBskyConvoDefs.isDeletedMessageView(message)
        ) {
          /*
           * If this message is already in new messages, it was added by the
           * firehose ingestion, and we can safely overwrite it. This trusts
           * the server on ordering, and keeps it in sync.
           */
          if (this.newMessages.has(message.id)) {
            this.newMessages.delete(message.id)
          }
          this.pastMessages.set(message.id, message)
        }
      }
    } catch (e: any) {
      // Log the actual exception, like every other failure path in this class.
      logger.error(e, {context: 'Convo: failed to fetch message history'})

      this.fetchMessageHistoryError = {
        retry: () => {
          this.fetchMessageHistory()
        },
      }
    } finally {
      this.isFetchingHistory = false
      this.commit()
    }
  }

  /** Removes the current event-bus listener, if one is registered. */
  private cleanupFirehoseConnection: (() => void) | undefined

  /**
   * (Re)registers this conversation's listener on the event bus and routes
   * `connect`, `error` and `logs` events to the matching handler.
   */
  private setupFirehose() {
    // remove old listeners, if exist
    this.cleanupFirehoseConnection?.()

    // reconnect
    this.cleanupFirehoseConnection = this.events.on(
      event => {
        switch (event.type) {
          case 'connect': {
            this.onFirehoseConnect()
            break
          }
          case 'error': {
            this.onFirehoseError(event.error)
            break
          }
          case 'logs': {
            this.ingestFirehose(event.logs)
            break
          }
        }
      },
      /*
       * This is VERY important — we only want events for this convo.
       */
      {convoId: this.convoId},
    )
  }

  private firehoseError: MessagesEventBusError | undefined

  /** Clears any firehose error and retries messages that failed to send. */
  onFirehoseConnect() {
    this.firehoseError = undefined
    this.batchRetryPendingMessages()
    this.commit()
  }

  /** Records the firehose error so `getItems` can render a retry item. */
  onFirehoseError(error?: MessagesEventBusError) {
    this.firehoseError = error
    this.commit()
  }

  /**
   * Applies a batch of log events. Events without a string `rev`, or with a
   * `rev` that is not newer than `latestRev`, are skipped. Created messages
   * are (re)inserted into `newMessages`; delete events remove a message that
   * is currently held in state. Commits once if anything changed.
   */
  ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
    let needsCommit = false

    for (const ev of events) {
      /*
       * If there's a rev, we should handle it. If there's not a rev, we don't
       * know what it is.
       */
      if (typeof ev.rev === 'string') {
        const isUninitialized = !this.latestRev
        const isNewEvent = this.latestRev && ev.rev > this.latestRev

        /*
         * We received an event prior to fetching any history, so we can safely
         * use this as the initial history cursor
         */
        if (this.oldestRev === undefined && isUninitialized) {
          this.oldestRev = ev.rev
        }

        /*
         * We only care about new events
         */
        if (isNewEvent || isUninitialized) {
          /*
           * Update rev regardless of if it's a ev type we care about or not
           */
          this.latestRev = ev.rev

          if (
            ChatBskyConvoDefs.isLogCreateMessage(ev) &&
            ChatBskyConvoDefs.isMessageView(ev.message)
          ) {
            /**
             * If this message is already in new messages, it was added by our
             * sending logic, and is based on client-ordering. When we receive
             * the "committed" event from the log, we should replace this
             * reference and re-insert in order to respect the order we
             * received from the log.
             */
            if (this.newMessages.has(ev.message.id)) {
              this.newMessages.delete(ev.message.id)
            }
            this.newMessages.set(ev.message.id, ev.message)
            needsCommit = true
          } else if (
            ChatBskyConvoDefs.isLogDeleteMessage(ev) &&
            ChatBskyConvoDefs.isDeletedMessageView(ev.message)
          ) {
            /*
             * Update if we have this in state. If we don't, don't worry about it.
             */
            if (
              this.pastMessages.has(ev.message.id) ||
              this.newMessages.has(ev.message.id)
            ) {
              this.pastMessages.delete(ev.message.id)
              this.newMessages.delete(ev.message.id)
              this.deletedMessages.delete(ev.message.id)
              needsCommit = true
            }
          }
        }
      }
    }

    if (needsCommit) {
      this.commit()
    }
  }

  /**
   * `null` when sending is healthy; otherwise whether the last send failure
   * can be retried (`recoverable`) or not (`unrecoverable`).
   */
  private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null

  /**
   * Queues `message` for sending under a temporary id and starts the send
   * queue if it is idle. Messages with blank text and no embed are ignored.
   */
  sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
    // Ignore empty messages for now since they have no other purpose atm
    if (!message.text.trim() && !message.embed) return

    logger.debug('Convo: send message', {}, logger.DebugContext.convo)

    const tempId = nanoid()

    this.pendingMessageFailure = null
    this.pendingMessages.set(tempId, {
      id: tempId,
      message,
    })
    this.commit()

    if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) {
      this.processPendingMessages()
    }
  }

  /**
   * Sends the oldest pending message, moves it into `newMessages` under its
   * real id, then continues with the next one until the queue is empty. A
   * failure stops the queue and is classified by `handleSendMessageFailure`.
   */
  async processPendingMessages() {
    logger.debug(
      `Convo: processing messages (${this.pendingMessages.size} remaining)`,
      {},
      logger.DebugContext.convo,
    )

    const pendingMessage = Array.from(this.pendingMessages.values()).shift()

    /*
     * If there are no pending messages, we're done.
     */
    if (!pendingMessage) {
      this.isProcessingPendingMessages = false
      return
    }

    try {
      this.isProcessingPendingMessages = true

      const {id, message} = pendingMessage

      const response = await this.agent.api.chat.bsky.convo.sendMessage(
        {
          convoId: this.convoId,
          message,
        },
        {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
      )
      const res = response.data

      // remove from queue
      this.pendingMessages.delete(id)

      /*
       * Insert into `newMessages` as soon as we have a real ID. That way, when
       * we get an event log back, we can replace in situ.
       */
      this.newMessages.set(res.id, {
        ...res,
        $type: 'chat.bsky.convo.defs#messageView',
      })
      // render new message state, prior to firehose
      this.commit()

      // continue queue processing
      await this.processPendingMessages()
    } catch (e: any) {
      logger.error(e, {context: `Convo: failed to send message`})
      this.handleSendMessageFailure(e)
      this.isProcessingPendingMessages = false
    }
  }

  /**
   * Classifies a send failure. `XRPCError`s whose status is listed in
   * `NETWORK_FAILURE_STATUSES` are `recoverable`; everything else is
   * `unrecoverable`. Some known server messages trigger extra handling
   * (block-state invalidation event, `Disable` dispatch). Always commits.
   */
  private handleSendMessageFailure(e: any) {
    if (e instanceof XRPCError) {
      if (NETWORK_FAILURE_STATUSES.includes(e.status)) {
        this.pendingMessageFailure = 'recoverable'
      } else {
        this.pendingMessageFailure = 'unrecoverable'

        switch (e.message) {
          case 'block between recipient and sender':
            this.emitter.emit('event', {
              type: 'invalidate-block-state',
              accountDids: [
                this.sender!.did,
                ...this.recipients!.map(r => r.did),
              ],
            })
            break
          case 'Account is disabled':
            this.dispatch({event: ConvoDispatchEvent.Disable})
            break
          case 'Convo not found':
          case 'Account does not exist':
          case 'recipient does not exist':
          case 'recipient requires incoming messages to come from someone they follow':
          case 'recipient has disabled incoming messages':
            break
          default:
            logger.warn(
              `Convo handleSendMessageFailure could not handle error`,
              {
                status: e.status,
                message: e.message,
              },
            )
            break
        }
      }
    } else {
      this.pendingMessageFailure = 'unrecoverable'
      logger.error(e, {
        context: `Convo handleSendMessageFailure received unknown error`,
      })
    }

    this.commit()
  }

  /**
   * Re-sends every pending message in a single batch request. Only runs when
   * a previous send failed and there is something left to send. On success
   * the returned messages are moved into `newMessages`; on failure the error
   * is classified again by `handleSendMessageFailure`.
   */
  async batchRetryPendingMessages() {
    if (this.pendingMessageFailure === null) return

    const messageArray = Array.from(this.pendingMessages.values())
    if (messageArray.length === 0) return

    this.pendingMessageFailure = null
    this.commit()

    logger.debug(
      `Convo: batch retrying ${this.pendingMessages.size} pending messages`,
      {},
      logger.DebugContext.convo,
    )

    try {
      const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch(
        {
          items: messageArray.map(({message}) => ({
            convoId: this.convoId,
            message,
          })),
        },
        {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
      )
      const {items} = data

      /*
       * Insert into `newMessages` as soon as we have a real ID. That way, when
       * we get an event log back, we can replace in situ.
       */
      for (const item of items) {
        this.newMessages.set(item.id, {
          ...item,
          $type: 'chat.bsky.convo.defs#messageView',
        })
      }

      for (const pendingMessage of messageArray) {
        this.pendingMessages.delete(pendingMessage.id)
      }

      this.commit()

      // Report how many were sent; `pendingMessages` was just emptied of them.
      logger.debug(
        `Convo: sent ${messageArray.length} pending messages`,
        {},
        logger.DebugContext.convo,
      )
    } catch (e: any) {
      logger.error(e, {context: `Convo: failed to batch retry messages`})
      this.handleSendMessageFailure(e)
    }
  }

  /**
   * Hides `messageId` locally right away, then asks the server to delete it
   * for the current user (with up to 2 network retries). If the request
   * fails the message is un-hidden and the error is re-thrown.
   *
   * @throws The error from the delete request.
   */
  async deleteMessage(messageId: string) {
    logger.debug('Convo: delete message', {}, logger.DebugContext.convo)

    this.deletedMessages.add(messageId)
    this.commit()

    try {
      await networkRetry(2, () => {
        return this.agent.api.chat.bsky.convo.deleteMessageForSelf(
          {
            convoId: this.convoId,
            messageId,
          },
          {encoding: 'application/json', headers: DM_SERVICE_HEADERS},
        )
      })
    } catch (e: any) {
      logger.error(e, {context: `Convo: failed to delete message`})
      this.deletedMessages.delete(messageId)
      this.commit()
      throw e
    }
  }

  /**
   * Subscribes to `ConvoEvent`s emitted by this instance.
   *
   * @returns A function that removes the handler.
   */
  on(handler: (event: ConvoEvent) => void) {
    this.emitter.on('event', handler)

    return () => {
      this.emitter.off('event', handler)
    }
  }

  /**
   * Builds the list items for rendering, in this order: history messages
   * (prepended one by one), an optional history-error item at the front,
   * firehose/new messages, pending messages, and an optional firehose-error
   * item at the end. Locally deleted messages are filtered out and each
   * message item gets a `nextMessage` pointing at the following item's
   * message when that item is a message or deleted-message view.
   *
   * Items in reverse order, since FlatList inverts
   */
  getItems(): ConvoItem[] {
    const items: ConvoItem[] = []

    this.pastMessages.forEach(m => {
      if (ChatBskyConvoDefs.isMessageView(m)) {
        items.unshift({
          type: 'message',
          key: m.id,
          message: m,
          nextMessage: null,
        })
      } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
        items.unshift({
          type: 'deleted-message',
          key: m.id,
          message: m,
          nextMessage: null,
        })
      }
    })

    if (this.fetchMessageHistoryError) {
      items.unshift({
        type: 'error',
        code: ConvoItemError.HistoryFailed,
        key: ConvoItemError.HistoryFailed,
        retry: () => {
          this.maybeRecoverFromNetworkError()
        },
      })
    }

    this.newMessages.forEach(m => {
      if (ChatBskyConvoDefs.isMessageView(m)) {
        items.push({
          type: 'message',
          key: m.id,
          message: m,
          nextMessage: null,
        })
      } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
        items.push({
          type: 'deleted-message',
          key: m.id,
          message: m,
          nextMessage: null,
        })
      }
    })

    this.pendingMessages.forEach(m => {
      items.push({
        type: 'pending-message',
        key: m.id,
        message: {
          ...m.message,
          embed: undefined,
          $type: 'chat.bsky.convo.defs#messageView',
          id: nanoid(),
          rev: '__fake__',
          sentAt: new Date().toISOString(),
          /*
           * `getItems` is only run in "active" status states, where
           * `this.sender` is defined
           */
          sender: this.sender!,
        },
        nextMessage: null,
        failed: this.pendingMessageFailure !== null,
        retry:
          this.pendingMessageFailure === 'recoverable'
            ? () => {
                this.maybeRecoverFromNetworkError()
              }
            : undefined,
      })
    })

    if (this.firehoseError) {
      items.push({
        type: 'error',
        code: ConvoItemError.FirehoseFailed,
        key: ConvoItemError.FirehoseFailed,
        retry: () => {
          this.firehoseError?.retry()
        },
      })
    }

    return items
      .filter(item => {
        if (isConvoItemMessage(item)) {
          return !this.deletedMessages.has(item.message.id)
        }
        return true
      })
      .map((item, i, arr) => {
        if (isConvoItemMessage(item)) {
          let nextMessage = null

          if (
            ChatBskyConvoDefs.isMessageView(item.message) ||
            ChatBskyConvoDefs.isDeletedMessageView(item.message)
          ) {
            // `isConvoItemMessage` returns false for the out-of-range slot
            // after the last item, so no separate bounds check is needed.
            const next = arr[i + 1]

            if (
              isConvoItemMessage(next) &&
              (ChatBskyConvoDefs.isMessageView(next.message) ||
                ChatBskyConvoDefs.isDeletedMessageView(next.message))
            ) {
              nextMessage = next.message
            }
          }

          return {
            ...item,
            nextMessage,
          }
        }

        return item
      })
  }
}