unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at development 272 lines 9.6 kB view raw
import { Expo } from 'expo-server-sdk'
import { Follows, Notification, PushNotificationToken, User, UserOptions } from '../../models/index.js'
import { logger } from '../logger.js'
import {
  getNotificationBody,
  getNotificationTitle,
  handleDeliveryError,
  type NotificationBody,
  type NotificationContext
} from '../pushNotifications.js'
import { Job, Queue } from 'bullmq'
import { Op } from 'sequelize'
import { getMutedPosts } from '../cacheGetters/getMutedPosts.js'
import { sendWebPushNotifications } from '../webpush.js'
import getBlockedIds from '../cacheGetters/getBlockedIds.js'
import { completeEnvironment } from '../backendOptions.js'

// Queue that receives the delayed "check delivery" jobs created by scheduleNotificationCheck.
const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
})

// Queue that receives one "refresh your notifications" job per notification (see sendWsNotifications).
const websocketQueue = new Queue('updateNotificationsSocket', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
})

const expoClient = new Expo()

type PushNotificationPayload = {
  notifications: NotificationBody[]
  context?: NotificationContext
}

/**
 * Job handler: filters the notifications carried by the job according to the notified
 * user's mutes, blocks and notification preferences, and forwards the survivors to the
 * web push, Expo and websocket channels.
 *
 * A notification is dropped when:
 * - its post is in either muted-post list of the notified user,
 * - it comes from the notified user themselves or from a user in their blocked list,
 * - its type is disabled through one of the `wafrn.notify*` options, or
 * - `wafrn.notificationsFrom` restricts senders and the sender is not in the allowed group.
 *
 * @param job Job whose data holds the notifications and an optional shared context.
 */
export async function sendPushNotification(job: Job<PushNotificationPayload>) {
  const { notifications, context } = job.data
  const notificationsToSend: NotificationBody[] = []

  for (const notification of notifications) {
    const mutedPosts = new Set(
      (await getMutedPosts(notification.notifiedUserId, false)).concat(
        await getMutedPosts(notification.notifiedUserId, true)
      )
    )
    if (mutedPosts.has(notification.postId ? notification.postId : '')) {
      // the post is muted by the notified user. do not notify
      continue
    }

    const blockedUsers = await getBlockedIds(notification.notifiedUserId)
    if (notification.userId == notification.notifiedUserId || blockedUsers.includes(notification.userId)) {
      // this is from a blocked user or same user. do not notify
      continue
    }

    // TODO this part of code is repeated. take it to a function another day
    const options = await UserOptions.findAll({
      where: {
        userId: notification.notifiedUserId,
        optionName: {
          [Op.in]: [
            'wafrn.notificationsFrom',
            'wafrn.notifyMentions',
            'wafrn.notifyReactions',
            'wafrn.notifyQuotes',
            'wafrn.notifyFollows',
            'wafrn.notifyRewoots',
            'wafrn.notifyBites'
          ]
        }
      }
    })

    // A notification category is enabled unless its option exists with the literal value 'false'.
    const isEnabled = (optionName: string) =>
      options.find((elem) => elem.optionName === optionName)?.optionValue !== 'false'

    const notificationTypes: string[] = []
    if (isEnabled('wafrn.notifyQuotes')) {
      notificationTypes.push('QUOTE')
    }
    if (isEnabled('wafrn.notifyMentions')) {
      notificationTypes.push('MENTION')
    }
    if (isEnabled('wafrn.notifyReactions')) {
      notificationTypes.push('EMOJIREACT', 'LIKE')
    }
    if (isEnabled('wafrn.notifyFollows')) {
      notificationTypes.push('FOLLOW')
    }
    if (isEnabled('wafrn.notifyRewoots')) {
      notificationTypes.push('REWOOT')
    }
    if (isEnabled('wafrn.notifyBites')) {
      notificationTypes.push('POSTBITE', 'USERBITE')
    }
    if (!notificationTypes.includes(notification.notificationType)) {
      continue
    }

    const optionNotificationsFrom = options.find((elem) => elem.optionName === 'wafrn.notificationsFrom')
    if (!optionNotificationsFrom || optionNotificationsFrom.optionValue === '1') {
      // no sender restriction configured
      notificationsToSend.push(notification)
      continue
    }

    // Each case ends with `break`: previously '2' and '3' fell through into '4', so every
    // restricted setting was silently treated as "mutuals only".
    let validUsers: string[] = []
    switch (optionNotificationsFrom.optionValue) {
      case '2': // followers
        validUsers = (
          await Follows.findAll({
            where: {
              accepted: true,
              followedId: notification.notifiedUserId
            }
          })
        ).map((elem) => elem.followerId)
        break
      case '3': // followees
        validUsers = (
          await Follows.findAll({
            where: {
              accepted: true,
              followerId: notification.notifiedUserId
            }
          })
        ).map((elem) => elem.followedId)
        break
      case '4': {
        // mutuals: accounts the notified user follows among the accounts that follow them
        const followerIds = (
          await Follows.findAll({
            where: {
              accepted: true,
              followedId: notification.notifiedUserId
            }
          })
        ).map((elem) => elem.followerId)
        validUsers = (
          await Follows.findAll({
            where: {
              accepted: true,
              followerId: notification.notifiedUserId,
              followedId: {
                [Op.in]: followerIds
              }
            }
          })
        ).map((elem) => elem.followedId)
        break
      }
      default:
        // unrecognised value: nobody is allowed, same as before the fall-through fix
        break
    }
    if (validUsers.includes(notification.userId)) {
      notificationsToSend.push(notification)
    }
  }

  const users = await User.findAll({
    attributes: ['id', 'url'],
    where: {
      id: {
        [Op.in]: notificationsToSend.map((n) => n.notifiedUserId)
      }
    }
  })

  // attach the notified user's url to each notification when the user row was found
  const userUrlMap = Object.fromEntries(users.map((u) => [u.id, u.url]))
  for (const notif of notificationsToSend) {
    const url = userUrlMap[notif.notifiedUserId]
    if (url) {
      notif.notifiedUserUrl = url
    }
  }

  if (notificationsToSend.length > 0) {
    await sendWebPushNotifications(notificationsToSend, context)
    await sendExpoNotifications(notificationsToSend, context)
    await sendWsNotifications(notificationsToSend, context)
  }
}

/**
 * Sends the notifications through Expo to every registered push token of each notified
 * user, then schedules a delayed delivery check for the tickets Expo accepted.
 *
 * Returns early, without sending or scheduling anything, when none of the notified users
 * has a stored push token. Notifications whose post is muted by the notified user are
 * filtered out here as well, since this function is exported and may be called directly.
 *
 * @param notifications Notifications to deliver.
 * @param context Optional context forwarded to the title/body builders and to the payload data.
 */
export async function sendExpoNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  const userIds = notifications.map((elem) => elem.notifiedUserId)
  const tokenRows = await PushNotificationToken.findAll({
    where: {
      userId: {
        [Op.in]: userIds
      }
    }
  })

  if (tokenRows.length === 0) {
    return
  }

  const payloads = notifications.map((notification) => {
    const tokens = tokenRows.filter((row) => row.userId === notification.notifiedUserId).map((row) => row.token)

    // send the same notification to all the devices of each notified user
    return {
      to: tokens,
      sound: 'default',
      title: getNotificationTitle(notification, context),
      body: getNotificationBody(notification, context),
      data: { notification, context }
    }
  })

  const okTickets: string[] = []
  const filteredPayloads: {
    to: any[]
    sound: string
    title: string
    body: string
    data: {
      notification: NotificationBody
      context: NotificationContext | undefined
    }
  }[] = []
  for (const payload of payloads) {
    const mutedPosts = (await getMutedPosts(payload.data.notification.notifiedUserId, false)).concat(
      await getMutedPosts(payload.data.notification.notifiedUserId, true)
    )
    if (!mutedPosts.includes(payload.data.notification.postId as string)) {
      filteredPayloads.push(payload)
    }
  }

  // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content
  const chunks = expoClient.chunkPushNotifications(filteredPayloads)
  for (const chunk of chunks) {
    const responses = await expoClient.sendPushNotificationsAsync(chunk)
    for (const response of responses) {
      if (response.status === 'ok') {
        okTickets.push(response.id)
      } else {
        await handleDeliveryError(response)
      }
    }
  }

  await scheduleNotificationCheck(okTickets)
}

// schedule a job to check the delivery of the notifications after 30 minutes of being sent
// this guarantees that the notification was delivered to the messaging services even in cases of high load
function scheduleNotificationCheck(ticketIds: string[]) {
  const delay = 1000 * 60 * 30 // 30 minutes
  return deliveryCheckQueue.add('checkPushNotificationDelivery', { ticketIds }, { delay })
}

/**
 * Enqueues one `updateNotificationsSocket` job per notification so the notified user is
 * told to refresh their notifications. `context` is accepted for signature parity with
 * the other senders and is not used.
 */
async function sendWsNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  await websocketQueue.addBulk(
    notifications.map((elem) => {
      // we just tell the user to update the notifications
      return {
        name: 'updateNotificationsSocket',
        data: {
          userId: elem.notifiedUserId,
          type: elem.notificationType,
          from: elem.userId,
          postId: elem.postId ? elem.postId : ''
        }
      }
    })
  )
}