unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 257 lines 9.2 kB view raw
1import { Expo } from 'expo-server-sdk' 2import { Follows, Notification, PushNotificationToken, UserOptions } from '../../models/index.js' 3import { logger } from '../logger.js' 4import { 5 getNotificationBody, 6 getNotificationTitle, 7 handleDeliveryError, 8 type NotificationBody, 9 type NotificationContext 10} from '../pushNotifications.js' 11import { Job, Queue } from 'bullmq' 12import { Op } from 'sequelize' 13import { getMutedPosts } from '../cacheGetters/getMutedPosts.js' 14import { sendWebPushNotifications } from '../webpush.js' 15import getBlockedIds from '../cacheGetters/getBlockedIds.js' 16import { completeEnvironment } from '../backendOptions.js' 17 18const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', { 19 connection: completeEnvironment.bullmqConnection, 20 defaultJobOptions: { 21 removeOnComplete: true, 22 attempts: 3, 23 backoff: { 24 type: 'exponential', 25 delay: 1000 26 } 27 } 28}) 29 30const websocketQueue = new Queue('updateNotificationsSocket', { 31 connection: completeEnvironment.bullmqConnection, 32 defaultJobOptions: { 33 removeOnComplete: true, 34 attempts: 3, 35 backoff: { 36 type: 'exponential', 37 delay: 1000 38 } 39 } 40}) 41 42const expoClient = new Expo() 43 44type PushNotificationPayload = { 45 notifications: NotificationBody[] 46 context?: NotificationContext 47} 48 49export async function sendPushNotification(job: Job<PushNotificationPayload>) { 50 const { notifications, context } = job.data 51 let notificationsToSend: NotificationBody[] = [] 52 for await (const notification of notifications) { 53 const mutedPosts = new Set( 54 (await getMutedPosts(notification.notifiedUserId, false)).concat( 55 await getMutedPosts(notification.notifiedUserId, true) 56 ) 57 ) 58 if (!mutedPosts.has(notification.postId ? notification.postId : '')) { 59 const blockedUsers = await getBlockedIds(notification.notifiedUserId) // do not push notification if muted user 60 if (notification.userId == notification.notifiedUserId || blockedUsers.includes(notification.userId)) { 61 // this is from a blocked user or same user. do not notify 62 continue 63 } 64 // TODO this part of code is repeated. take it to a function another day 65 const options = await UserOptions.findAll({ 66 where: { 67 userId: notification.notifiedUserId, 68 optionName: { 69 [Op.in]: [ 70 'wafrn.notificationsFrom', 71 'wafrn.notifyMentions', 72 'wafrn.notifyReactions', 73 'wafrn.notifyQuotes', 74 'wafrn.notifyFollows', 75 'wafrn.notifyRewoots', 76 'wafrn.notifyBites' 77 78 ] 79 } 80 } 81 }) 82 const optionNotificationsFrom = options.find((elem) => elem.optionName == 'wafrn.notificationsFrom') 83 const optionNotifyQuotes = options.find((elem) => elem.optionName == 'wafrn.notifyQuotes') 84 const optionNotifyMentions = options.find((elem) => elem.optionName == 'wafrn.notifyMentions') 85 const optionNotifyReactions = options.find((elem) => elem.optionName == 'wafrn.notifyReactions') 86 const optionNotifyFollows = options.find((elem) => elem.optionName == 'wafrn.notifyFollows') 87 const optionNotifyRewoots = options.find((elem) => elem.optionName == 'wafrn.notifyRewoots') 88 const optionNotifyBites = options.find((elem) => elem.optionName == 'wafrn.notifyBites') 89 90 const notificationTypes = [] 91 if (!optionNotifyQuotes || optionNotifyQuotes.optionValue != 'false') { 92 notificationTypes.push('QUOTE') 93 } 94 if (!optionNotifyMentions || optionNotifyMentions.optionValue != 'false') { 95 notificationTypes.push('MENTION') 96 } 97 if (!optionNotifyReactions || optionNotifyReactions.optionValue != 'false') { 98 notificationTypes.push('EMOJIREACT') 99 notificationTypes.push('LIKE') 100 101 } 102 if (!optionNotifyFollows || optionNotifyFollows.optionValue != 'false') { 103 notificationTypes.push('FOLLOW') 104 } 105 if (!optionNotifyRewoots || optionNotifyRewoots.optionValue != 'false') { 106 notificationTypes.push('REWOOT') 107 } 108 if(!optionNotifyBites || optionNotifyBites.optionValue != 'false') { 109 notificationTypes.push('POSTBITE') 110 notificationTypes.push('USERBITE') 111 } 112 if (notificationTypes.includes(notification.notificationType)) { 113 if (optionNotificationsFrom && optionNotificationsFrom.optionValue != '1') { 114 let validUsers: string[] = [] 115 switch (optionNotificationsFrom.optionValue) { 116 case '2': // followers 117 validUsers = ( 118 await Follows.findAll({ 119 where: { 120 accepted: true, 121 followedId: notification.notifiedUserId 122 } 123 }) 124 ).map((elem) => elem.followerId) 125 case '3': // followees 126 validUsers = ( 127 await Follows.findAll({ 128 where: { 129 accepted: true, 130 followerId: notification.notifiedUserId 131 } 132 }) 133 ).map((elem) => elem.followedId) 134 case '4': // mutuals 135 const followerIds = ( 136 await Follows.findAll({ 137 where: { 138 accepted: true, 139 followedId: notification.notifiedUserId 140 } 141 }) 142 ).map((elem) => elem.followerId) 143 validUsers = ( 144 await Follows.findAll({ 145 where: { 146 accepted: true, 147 followerId: notification.notifiedUserId, 148 followedId: { 149 [Op.in]: followerIds 150 } 151 } 152 }) 153 ).map((elem) => elem.followedId) 154 if (validUsers.includes(notification.userId)) { 155 notificationsToSend.push(notification) 156 } 157 continue 158 } 159 } else { 160 notificationsToSend.push(notification) 161 continue 162 } 163 } 164 } 165 } 166 167 if (notificationsToSend.length > 0) { 168 await sendWebPushNotifications(notificationsToSend, context) 169 await sendExpoNotifications(notificationsToSend, context) 170 await sendWsNotifications(notificationsToSend, context) 171 } 172} 173 174export async function sendExpoNotifications(notifications: NotificationBody[], context?: NotificationContext) { 175 const userIds = notifications.map((elem) => elem.notifiedUserId) 176 const tokenRows = await PushNotificationToken.findAll({ 177 where: { 178 userId: { 179 [Op.in]: userIds 180 } 181 } 182 }) 183 184 if (tokenRows.length === 0) { 185 return 186 } 187 const payloads = notifications.map((notification) => { 188 const tokens = tokenRows.filter((row) => row.userId === notification.notifiedUserId).map((row) => row.token) 189 190 // send the same notification to all the devices of each notified user 191 return { 192 to: tokens, 193 sound: 'default', 194 title: getNotificationTitle(notification, context), 195 body: getNotificationBody(notification, context), 196 data: { notification, context } 197 } 198 }) 199 200 // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content 201 const okTickets = [] 202 const filteredPayloads: { 203 to: any[] 204 sound: string 205 title: string 206 body: string 207 data: { 208 notification: NotificationBody 209 context: NotificationContext | undefined 210 } 211 }[] = [] 212 for await (const payload of payloads) { 213 const mutedPosts = (await getMutedPosts(payload.data.notification.notifiedUserId, false)).concat( 214 await getMutedPosts(payload.data.notification.notifiedUserId, true) 215 ) 216 if (!mutedPosts.includes(payload.data.notification.postId as string)) { 217 filteredPayloads.push(payload) 218 } 219 } 220 const chunks = expoClient.chunkPushNotifications(filteredPayloads) 221 for (const chunk of chunks) { 222 const responses = await expoClient.sendPushNotificationsAsync(chunk) 223 for (const response of responses) { 224 if (response.status === 'ok') { 225 okTickets.push(response.id) 226 } else { 227 await handleDeliveryError(response) 228 } 229 } 230 } 231 232 await scheduleNotificationCheck(okTickets) 233} 234 235// schedule a job to check the delivery of the notifications after 30 minutes of being sent 236// this guarantees that the notification was delivered to the messaging services even in cases of high load 237function scheduleNotificationCheck(ticketIds: string[]) { 238 const delay = 1000 * 60 * 30 // 30 minutes 239 return deliveryCheckQueue.add('checkPushNotificationDelivery', { ticketIds }, { delay }) 240} 241 242async function sendWsNotifications(notifications: NotificationBody[], context?: NotificationContext) { 243 await websocketQueue.addBulk( 244 notifications.map((elem) => { 245 // we just tell the user to update the notifications 246 return { 247 name: 'updateNotificationsSocket', 248 data: { 249 userId: elem.notifiedUserId, 250 type: elem.notificationType, 251 from: elem.userId, 252 postId: elem.postId ? elem.postId : '' 253 } 254 } 255 }) 256 ) 257}