unoffical wafrn mirror wafrn.net
atproto social-network activitypub

move push notifications to queues

+180 -103
+17 -101
packages/backend/utils/pushNotifications.ts
··· 1 - import { ExpoPushErrorTicket } from "expo-server-sdk" 2 - import { Expo } from "expo-server-sdk" 1 + import { Expo, type ExpoPushErrorTicket } from "expo-server-sdk" 3 2 import { logger } from "./logger.js" 4 3 import { Notification, PushNotificationToken } from "../db.js" 4 + import { Queue } from "bullmq" 5 + import { environment } from "../environment.js" 6 + 7 + const sendPushNotificationQueue = new Queue('sendPushNotification', { 8 + connection: environment.bullmqConnection, 9 + defaultJobOptions: { 10 + removeOnComplete: true, 11 + attempts: 3, 12 + backoff: { 13 + type: 'exponential', 14 + delay: 1000 15 + } 16 + } 17 + }) 5 18 6 19 export type NotificationBody = { 7 20 notifiedUserId: string ··· 30 43 export async function createNotification(notification: NotificationBody, context?: NotificationContext) { 31 44 await Promise.all([ 32 45 Notification.create(notification), 33 - sendNotification(notification, context) 46 + sendPushNotificationQueue.add('sendPushNotification', { notification, context }) 34 47 ]) 35 48 } 36 49 37 50 // Error codes reference: https://docs.expo.io/push-notifications/sending-notifications/#individual-errors 38 - async function handleDeliveryError(response: ExpoPushErrorTicket) { 51 + export async function handleDeliveryError(response: ExpoPushErrorTicket) { 39 52 logger.error(response) 40 53 const error = response.details?.error 41 54 ··· 47 60 } 48 61 } 49 62 } 50 - 51 - const verbMap = { 52 - LIKE: 'liked', 53 - REWOOT: 'rewooted', 54 - MENTION: 'replied to', 55 - QUOTE: 'quoted', 56 - EMOJIREACT: 'reacted to', 57 - } 58 - 59 - function getNotificationTitle(notification: NotificationBody, context?: NotificationContext) { 60 - if (notification.notificationType === 'FOLLOW') { 61 - return 'New user followed you' 62 - } 63 - 64 - if (notification.notificationType === 'EMOJIREACT' && context?.emoji) { 65 - return `${context?.userUrl || 'someone'} reacted with ${context.emoji} to your post` 66 - } 67 - 68 - return `${context?.userUrl || 'someone'} ${verbMap[notification.notificationType]} your post` 69 - } 70 - 71 - function getNotificationBody(notification: NotificationBody, context?: NotificationContext) { 72 - if (notification.notificationType === 'FOLLOW') { 73 - return context?.userUrl ? `@${context?.userUrl.replace(/^@/, '')}` : '' 74 - } 75 - 76 - return `${context?.postContent}` 77 - } 78 - 79 - async function sendNotification(notification: NotificationBody, context?: NotificationContext) { 80 - const userId = notification.notifiedUserId 81 - const tokenRows = await PushNotificationToken.findAll({ 82 - where: { 83 - userId 84 - } 85 - }) 86 - 87 - if (tokenRows.length === 0) { 88 - return 89 - } 90 - 91 - const payloads = tokenRows.map((row) => ({ 92 - to: row.token, 93 - sound: 'default', 94 - title: getNotificationTitle(notification, context), 95 - body: getNotificationBody(notification, context), 96 - data: notification 97 - })) 98 - 99 - // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content 100 - const chunks = expoClient.chunkPushNotifications(payloads) 101 - const okTickets = [] 102 - 103 - // TODO: handle in a queue with retry logic and exponential backoff 104 - for (const chunk of chunks) { 105 - try { 106 - const responses = await expoClient.sendPushNotificationsAsync(chunk) 107 - for (const response of responses) { 108 - if (response.status === 'ok') { 109 - okTickets.push(response.id) 110 - } else { 111 - await handleDeliveryError(response) 112 - } 113 - } 114 - } catch (error) { 115 - logger.error(error) 116 - // TODO: retry sending the notification after some time 117 - } 118 - } 119 - 120 - scheduleNotificationCheck(okTickets) 121 - } 122 - 123 - function scheduleNotificationCheck(ticketIds: string[]) { 124 - // TODO: enqueue a task in the queue to check that the okTickets are actually ok and were delivered to the device 125 - } 126 - 127 - export async function checkNotificationDelivery(ticketIds: string[]) { 128 - let receiptIdChunks = expoClient.chunkPushNotificationReceiptIds(ticketIds); 129 - for (const chunk of receiptIdChunks) { 130 - try { 131 - const receipts = await expoClient.getPushNotificationReceiptsAsync(chunk) 132 - 133 - // The receipts specify whether Apple or Google successfully received the 134 - // notification and information about an error, if one occurred. 135 - for (const receiptId in receipts) { 136 - const receipt = receipts[receiptId] 137 - if (receipt.status === 'error') { 138 - await handleDeliveryError(receipt) 139 - } 140 - } 141 - } catch (error) { 142 - // TODO: retry checking the delivery of the notification after some time 143 - logger.error(error) 144 - } 145 - } 146 - }
+28
packages/backend/utils/queueProcessors/checkPushNotificationDelivery.ts
··· 1 + import { Job } from "bullmq"; 2 + import { logger } from "../logger.js"; 3 + import { handleDeliveryError } from "../pushNotifications.js"; 4 + import { Expo } from "expo-server-sdk"; 5 + 6 + const expoClient = new Expo() 7 + 8 + export async function checkPushNotificationDelivery(job: Job<{ ticketIds: string[] }>) { 9 + const { ticketIds } = job.data 10 + let receiptIdChunks = expoClient.chunkPushNotificationReceiptIds(ticketIds); 11 + for (const chunk of receiptIdChunks) { 12 + try { 13 + const receipts = await expoClient.getPushNotificationReceiptsAsync(chunk) 14 + 15 + // The receipts specify whether Apple or Google successfully received the 16 + // notification and information about an error, if one occurred. 17 + for (const receiptId in receipts) { 18 + const receipt = receipts[receiptId] 19 + if (receipt.status === 'error') { 20 + await handleDeliveryError(receipt) 21 + } 22 + } 23 + } catch (error) { 24 + // TODO: retry checking the delivery of the notification after some time 25 + logger.error(error) 26 + } 27 + } 28 + }
+103
packages/backend/utils/queueProcessors/sendPushNotification.ts
··· 1 + import { Expo } from "expo-server-sdk" 2 + import { PushNotificationToken } from "../../db.js" 3 + import { logger } from "../logger.js" 4 + import { handleDeliveryError, type NotificationBody, type NotificationContext } from "../pushNotifications.js" 5 + import { Job, Queue } from "bullmq" 6 + import { environment } from "../../environment.js" 7 + 8 + const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', { 9 + connection: environment.bullmqConnection, 10 + defaultJobOptions: { 11 + removeOnComplete: true, 12 + attempts: 3, 13 + backoff: { 14 + type: 'exponential', 15 + delay: 1000 16 + } 17 + } 18 + }) 19 + 20 + const verbMap = { 21 + LIKE: 'liked', 22 + REWOOT: 'rewooted', 23 + MENTION: 'replied to', 24 + QUOTE: 'quoted', 25 + EMOJIREACT: 'reacted to', 26 + } 27 + 28 + const expoClient = new Expo() 29 + 30 + function getNotificationTitle(notification: NotificationBody, context?: NotificationContext) { 31 + if (notification.notificationType === 'FOLLOW') { 32 + return 'New user followed you' 33 + } 34 + 35 + if (notification.notificationType === 'EMOJIREACT' && context?.emoji) { 36 + return `${context?.userUrl || 'someone'} reacted with ${context.emoji} to your post` 37 + } 38 + 39 + return `${context?.userUrl || 'someone'} ${verbMap[notification.notificationType]} your post` 40 + } 41 + 42 + function getNotificationBody(notification: NotificationBody, context?: NotificationContext) { 43 + if (notification.notificationType === 'FOLLOW') { 44 + return context?.userUrl ? `@${context?.userUrl.replace(/^@/, '')}` : '' 45 + } 46 + 47 + return `${context?.postContent}` 48 + } 49 + 50 + type PushNotificationPayload = { 51 + notification: NotificationBody 52 + context?: NotificationContext 53 + } 54 + 55 + export async function sendPushNotification(job: Job<PushNotificationPayload>) { 56 + const { notification, context } = job.data 57 + const userId = notification.notifiedUserId 58 + const tokenRows = await PushNotificationToken.findAll({ 59 + where: { 60 + userId 61 + } 62 + }) 63 + 64 + if (tokenRows.length === 0) { 65 + return 66 + } 67 + 68 + const payloads = tokenRows.map((row) => ({ 69 + to: row.token, 70 + sound: 'default', 71 + title: getNotificationTitle(notification, context), 72 + body: getNotificationBody(notification, context), 73 + data: notification 74 + })) 75 + 76 + // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content 77 + const chunks = expoClient.chunkPushNotifications(payloads) 78 + const okTickets = [] 79 + 80 + // TODO: handle in a queue with retry logic and exponential backoff 81 + for (const chunk of chunks) { 82 + try { 83 + const responses = await expoClient.sendPushNotificationsAsync(chunk) 84 + for (const response of responses) { 85 + if (response.status === 'ok') { 86 + okTickets.push(response.id) 87 + } else { 88 + await handleDeliveryError(response) 89 + } 90 + } 91 + } catch (error) { 92 + logger.error(error) 93 + // TODO: retry sending the notification after some time 94 + } 95 + } 96 + 97 + await scheduleNotificationCheck(okTickets) 98 + } 99 + 100 + function scheduleNotificationCheck(ticketIds: string[]) { 101 + const delay = 1000 * 60 * 30 // 30 minutes 102 + return deliveryCheckQueue.add('checkPushNotificationDelivery', { ticketIds}, { delay }) 103 + }
+32 -2
packages/backend/utils/workers.ts
··· 8 8 import { processRemotePostView } from './queueProcessors/processRemotePostView.js' 9 9 import { processRemoteMedia } from './queueProcessors/remoteMediaProcessor.js' 10 10 import { processFirehose } from '../atproto/workers/processFirehoseWorker.js' 11 + import { sendPushNotification } from './queueProcessors/sendPushNotification.js' 12 + import { checkPushNotificationDelivery } from './queueProcessors/checkPushNotificationDelivery.js' 11 13 12 14 logger.info('starting workers') 13 15 const workerInbox = new Worker('inbox', (job: Job) => inboxWorker(job), { ··· 92 94 }) 93 95 : null 94 96 97 + const workerSendPushNotification = new Worker( 98 + 'sendPushNotification', 99 + async (job: Job) => await sendPushNotification(job), 100 + { 101 + connection: environment.bullmqConnection, 102 + metrics: { 103 + maxDataPoints: MetricsTime.ONE_WEEK * 2 104 + }, 105 + concurrency: environment.workers.medium, 106 + } 107 + ) 108 + 109 + const workerCheckPushNotificationDelivery = new Worker( 110 + 'checkPushNotificationDelivery', 111 + async (job: Job) => await checkPushNotificationDelivery(job), 112 + { 113 + connection: environment.bullmqConnection, 114 + metrics: { 115 + maxDataPoints: MetricsTime.ONE_WEEK * 2 116 + }, 117 + concurrency: environment.workers.medium, 118 + } 119 + ) 120 + 95 121 const workers = [ 96 122 workerInbox, 97 123 workerDeletePost, ··· 100 126 workerProcessRemotePostView, 101 127 workerSendPostChunk, 102 128 workerProcessRemotePostView, 103 - workerProcessRemoteMediaData 129 + workerProcessRemoteMediaData, 130 + workerSendPushNotification, 131 + workerCheckPushNotificationDelivery 104 132 ] 105 133 if (environment.enableBsky) { 106 134 workers.push(workerProcessFirehose as Worker) ··· 148 176 workerDeletePost, 149 177 workerProcessRemotePostView, 150 178 workerProcessRemoteMediaData, 151 - workerProcessFirehose 179 + workerProcessFirehose, 180 + workerSendPushNotification, 181 + workerCheckPushNotificationDelivery 152 182 }