unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at testPDSNotExplode 249 lines 8.9 kB view raw
import { Expo } from 'expo-server-sdk'
import { Follows, Notification, PushNotificationToken, UserOptions } from '../../models/index.js'
import { logger } from '../logger.js'
import {
  getNotificationBody,
  getNotificationTitle,
  handleDeliveryError,
  type NotificationBody,
  type NotificationContext
} from '../pushNotifications.js'
import { Job, Queue } from 'bullmq'
import { Op } from 'sequelize'
import { getMutedPosts } from '../cacheGetters/getMutedPosts.js'
import { sendWebPushNotifications } from '../webpush.js'
import getBlockedIds from '../cacheGetters/getBlockedIds.js'
import { completeEnvironment } from '../backendOptions.js'

// Queue holding the delayed jobs that verify Expo push delivery (see scheduleNotificationCheck).
const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
})

// Queue used to tell connected clients to refresh their notifications (see sendWsNotifications).
const websocketQueue = new Queue('updateNotificationsSocket', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
})

const expoClient = new Expo()

type PushNotificationPayload = {
  notifications: NotificationBody[]
  context?: NotificationContext
}

// Shape of the messages built for the Expo client; kept identical to the annotation previously inlined.
type ExpoPayload = {
  to: any[]
  sound: string
  title: string
  body: string
  data: {
    notification: NotificationBody
    context: NotificationContext | undefined
  }
}

/**
 * Ids of the users whose follow of `userId` has been accepted (the followers of `userId`).
 */
async function getAcceptedFollowerIds(userId: NotificationBody['notifiedUserId']): Promise<string[]> {
  const follows = await Follows.findAll({
    where: {
      accepted: true,
      followedId: userId
    }
  })
  return follows.map((elem) => elem.followerId)
}

/**
 * Ids of the users that `userId` follows with an accepted follow (the followees of `userId`).
 */
async function getAcceptedFolloweeIds(userId: NotificationBody['notifiedUserId']): Promise<string[]> {
  const follows = await Follows.findAll({
    where: {
      accepted: true,
      followerId: userId
    }
  })
  return follows.map((elem) => elem.followedId)
}

/**
 * Ids of the users that both follow `userId` and are followed by `userId` (accepted follows only).
 */
async function getMutualIds(userId: NotificationBody['notifiedUserId']): Promise<string[]> {
  const followerIds = await getAcceptedFollowerIds(userId)
  const follows = await Follows.findAll({
    where: {
      accepted: true,
      followerId: userId,
      followedId: {
        [Op.in]: followerIds
      }
    }
  })
  return follows.map((elem) => elem.followedId)
}

/**
 * Decides whether a single notification must be pushed to its recipient.
 *
 * A notification is dropped when:
 * - its post is muted by the recipient,
 * - it comes from the recipient themselves or from a user in the recipient's blocked list,
 * - the recipient disabled that notification type (option value 'false'),
 * - the recipient restricted `wafrn.notificationsFrom` and the sender is outside that audience.
 */
async function shouldNotify(notification: NotificationBody): Promise<boolean> {
  const mutedPosts = new Set(
    (await getMutedPosts(notification.notifiedUserId, false)).concat(
      await getMutedPosts(notification.notifiedUserId, true)
    )
  )
  if (mutedPosts.has(notification.postId ? notification.postId : '')) {
    return false
  }

  const blockedUsers = await getBlockedIds(notification.notifiedUserId)
  if (notification.userId == notification.notifiedUserId || blockedUsers.includes(notification.userId)) {
    // this is from a blocked user or same user. do not notify
    return false
  }

  // TODO this part of code is repeated. take it to a function another day
  const options = await UserOptions.findAll({
    where: {
      userId: notification.notifiedUserId,
      optionName: {
        [Op.in]: [
          'wafrn.notificationsFrom',
          'wafrn.notifyMentions',
          'wafrn.notifyReactions',
          'wafrn.notifyQuotes',
          'wafrn.notifyFollows',
          'wafrn.notifyRewoots'
        ]
      }
    }
  })

  // a notification type is enabled unless the user explicitly stored 'false' for it
  const isEnabled = (optionName: string): boolean => {
    const option = options.find((elem) => elem.optionName == optionName)
    return !option || option.optionValue != 'false'
  }

  const notificationTypes: string[] = []
  if (isEnabled('wafrn.notifyQuotes')) {
    notificationTypes.push('QUOTE')
  }
  if (isEnabled('wafrn.notifyMentions')) {
    notificationTypes.push('MENTION')
  }
  if (isEnabled('wafrn.notifyReactions')) {
    notificationTypes.push('EMOJIREACT')
    notificationTypes.push('LIKE')
  }
  if (isEnabled('wafrn.notifyFollows')) {
    notificationTypes.push('FOLLOW')
  }
  if (isEnabled('wafrn.notifyRewoots')) {
    notificationTypes.push('REWOOT')
  }
  if (!notificationTypes.includes(notification.notificationType)) {
    return false
  }

  const optionNotificationsFrom = options.find((elem) => elem.optionName == 'wafrn.notificationsFrom')
  if (!optionNotificationsFrom || optionNotificationsFrom.optionValue == '1') {
    // no stored option, or value '1': the sender is not filtered
    return true
  }

  // Each audience needs its own `break`: without it '2' and '3' fell through to the
  // mutuals case and every restricted setting behaved as "mutuals only".
  let validUsers: string[] = []
  switch (optionNotificationsFrom.optionValue) {
    case '2': // followers
      validUsers = await getAcceptedFollowerIds(notification.notifiedUserId)
      break
    case '3': // followees
      validUsers = await getAcceptedFolloweeIds(notification.notifiedUserId)
      break
    case '4': // mutuals
      validUsers = await getMutualIds(notification.notifiedUserId)
      break
    default:
      // any other stored value matches no audience, so nobody is allowed
      break
  }
  return validUsers.includes(notification.userId)
}

/**
 * Job handler: filters the notifications carried by the job according to each
 * recipient's mutes, blocks and notification options, then delivers the remaining
 * ones through web push, Expo push and the websocket queue, in that order.
 *
 * @param job job whose data holds the notifications and an optional shared context
 */
export async function sendPushNotification(job: Job<PushNotificationPayload>) {
  const { notifications, context } = job.data
  const notificationsToSend: NotificationBody[] = []
  for (const notification of notifications) {
    if (await shouldNotify(notification)) {
      notificationsToSend.push(notification)
    }
  }

  if (notificationsToSend.length > 0) {
    await sendWebPushNotifications(notificationsToSend, context)
    await sendExpoNotifications(notificationsToSend, context)
    await sendWsNotifications(notificationsToSend, context)
  }
}

/**
 * Sends the notifications through Expo to every registered device token of each
 * notified user, skipping notifications about posts the recipient has muted.
 * Tickets accepted by Expo are scheduled for a later delivery check; rejected ones
 * are passed to `handleDeliveryError`.
 *
 * @param notifications notifications to deliver
 * @param context optional context forwarded to the title/body builders and to the payload data
 */
export async function sendExpoNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  const userIds = notifications.map((elem) => elem.notifiedUserId)
  const tokenRows = await PushNotificationToken.findAll({
    where: {
      userId: {
        [Op.in]: userIds
      }
    }
  })

  if (tokenRows.length === 0) {
    return
  }

  const payloads = notifications.map((notification) => {
    const tokens = tokenRows.filter((row) => row.userId === notification.notifiedUserId).map((row) => row.token)

    // send the same notification to all the devices of each notified user
    return {
      to: tokens,
      sound: 'default',
      title: getNotificationTitle(notification, context),
      body: getNotificationBody(notification, context),
      data: { notification, context }
    }
  })

  const okTickets: string[] = []
  const filteredPayloads: ExpoPayload[] = []
  for (const payload of payloads) {
    const mutedPosts = (await getMutedPosts(payload.data.notification.notifiedUserId, false)).concat(
      await getMutedPosts(payload.data.notification.notifiedUserId, true)
    )
    if (!mutedPosts.includes(payload.data.notification.postId as string)) {
      filteredPayloads.push(payload)
    }
  }

  // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content
  const chunks = expoClient.chunkPushNotifications(filteredPayloads)
  for (const chunk of chunks) {
    const responses = await expoClient.sendPushNotificationsAsync(chunk)
    for (const response of responses) {
      if (response.status === 'ok') {
        okTickets.push(response.id)
      } else {
        await handleDeliveryError(response)
      }
    }
  }

  await scheduleNotificationCheck(okTickets)
}

// schedule a job to check the delivery of the notifications after 30 minutes of being sent
// this guarantees that the notification was delivered to the messaging services even in cases of high load
function scheduleNotificationCheck(ticketIds: string[]) {
  const delay = 1000 * 60 * 30 // 30 minutes
  return deliveryCheckQueue.add('checkPushNotificationDelivery', { ticketIds }, { delay })
}

/**
 * Enqueues one 'updateNotificationsSocket' job per notification.
 * `context` is accepted for signature parity with the other senders but is not used.
 */
async function sendWsNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  await websocketQueue.addBulk(
    notifications.map((elem) => {
      // we just tell the user to update the notifications
      return {
        name: 'updateNotificationsSocket',
        data: {
          userId: elem.notifiedUserId,
          type: elem.notificationType,
          from: elem.userId,
          postId: elem.postId ? elem.postId : ''
        }
      }
    })
  )
}