unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Expo } from 'expo-server-sdk'
2import { Follows, Notification, PushNotificationToken, User, UserOptions } from '../../models/index.js'
3import { logger } from '../logger.js'
4import {
5 getNotificationBody,
6 getNotificationTitle,
7 handleDeliveryError,
8 type NotificationBody,
9 type NotificationContext
10} from '../pushNotifications.js'
11import { Job, Queue } from 'bullmq'
12import { Op } from 'sequelize'
13import { getMutedPosts } from '../cacheGetters/getMutedPosts.js'
14import { sendWebPushNotifications } from '../webpush.js'
15import getBlockedIds from '../cacheGetters/getBlockedIds.js'
16import { completeEnvironment } from '../backendOptions.js'
17
18const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', {
19 connection: completeEnvironment.bullmqConnection,
20 defaultJobOptions: {
21 removeOnComplete: true,
22 attempts: 3,
23 backoff: {
24 type: 'exponential',
25 delay: 1000
26 }
27 }
28})
29
30const websocketQueue = new Queue('updateNotificationsSocket', {
31 connection: completeEnvironment.bullmqConnection,
32 defaultJobOptions: {
33 removeOnComplete: true,
34 attempts: 3,
35 backoff: {
36 type: 'exponential',
37 delay: 1000
38 }
39 }
40})
41
42const expoClient = new Expo()
43
44type PushNotificationPayload = {
45 notifications: NotificationBody[]
46 context?: NotificationContext
47}
48
49export async function sendPushNotification(job: Job<PushNotificationPayload>) {
50 const { notifications, context } = job.data
51 let notificationsToSend: NotificationBody[] = []
52 for await (const notification of notifications) {
53 const mutedPosts = new Set(
54 (await getMutedPosts(notification.notifiedUserId, false)).concat(
55 await getMutedPosts(notification.notifiedUserId, true)
56 )
57 )
58 if (!mutedPosts.has(notification.postId ? notification.postId : '')) {
59 const blockedUsers = await getBlockedIds(notification.notifiedUserId) // do not push notification if muted user
60 if (notification.userId == notification.notifiedUserId || blockedUsers.includes(notification.userId)) {
61 // this is from a blocked user or same user. do not notify
62 continue
63 }
64 // TODO this part of code is repeated. take it to a function another day
65 const options = await UserOptions.findAll({
66 where: {
67 userId: notification.notifiedUserId,
68 optionName: {
69 [Op.in]: [
70 'wafrn.notificationsFrom',
71 'wafrn.notifyMentions',
72 'wafrn.notifyReactions',
73 'wafrn.notifyQuotes',
74 'wafrn.notifyFollows',
75 'wafrn.notifyRewoots',
76 'wafrn.notifyBites'
77 ]
78 }
79 }
80 })
81 const optionNotificationsFrom = options.find((elem) => elem.optionName == 'wafrn.notificationsFrom')
82 const optionNotifyQuotes = options.find((elem) => elem.optionName == 'wafrn.notifyQuotes')
83 const optionNotifyMentions = options.find((elem) => elem.optionName == 'wafrn.notifyMentions')
84 const optionNotifyReactions = options.find((elem) => elem.optionName == 'wafrn.notifyReactions')
85 const optionNotifyFollows = options.find((elem) => elem.optionName == 'wafrn.notifyFollows')
86 const optionNotifyRewoots = options.find((elem) => elem.optionName == 'wafrn.notifyRewoots')
87 const optionNotifyBites = options.find((elem) => elem.optionName == 'wafrn.notifyBites')
88
89 const notificationTypes = []
90 if (!optionNotifyQuotes || optionNotifyQuotes.optionValue != 'false') {
91 notificationTypes.push('QUOTE')
92 }
93 if (!optionNotifyMentions || optionNotifyMentions.optionValue != 'false') {
94 notificationTypes.push('MENTION')
95 }
96 if (!optionNotifyReactions || optionNotifyReactions.optionValue != 'false') {
97 notificationTypes.push('EMOJIREACT')
98 notificationTypes.push('LIKE')
99 }
100 if (!optionNotifyFollows || optionNotifyFollows.optionValue != 'false') {
101 notificationTypes.push('FOLLOW')
102 }
103 if (!optionNotifyRewoots || optionNotifyRewoots.optionValue != 'false') {
104 notificationTypes.push('REWOOT')
105 }
106 if (!optionNotifyBites || optionNotifyBites.optionValue != 'false') {
107 notificationTypes.push('POSTBITE')
108 notificationTypes.push('USERBITE')
109 }
110 if (notificationTypes.includes(notification.notificationType)) {
111 if (optionNotificationsFrom && optionNotificationsFrom.optionValue != '1') {
112 let validUsers: string[] = []
113 switch (optionNotificationsFrom.optionValue) {
114 case '2': // followers
115 validUsers = (
116 await Follows.findAll({
117 where: {
118 accepted: true,
119 followedId: notification.notifiedUserId
120 }
121 })
122 ).map((elem) => elem.followerId)
123 case '3': // followees
124 validUsers = (
125 await Follows.findAll({
126 where: {
127 accepted: true,
128 followerId: notification.notifiedUserId
129 }
130 })
131 ).map((elem) => elem.followedId)
132 case '4': // mutuals
133 const followerIds = (
134 await Follows.findAll({
135 where: {
136 accepted: true,
137 followedId: notification.notifiedUserId
138 }
139 })
140 ).map((elem) => elem.followerId)
141 validUsers = (
142 await Follows.findAll({
143 where: {
144 accepted: true,
145 followerId: notification.notifiedUserId,
146 followedId: {
147 [Op.in]: followerIds
148 }
149 }
150 })
151 ).map((elem) => elem.followedId)
152 if (validUsers.includes(notification.userId)) {
153 notificationsToSend.push(notification)
154 }
155 continue
156 }
157 } else {
158 notificationsToSend.push(notification)
159 continue
160 }
161 }
162 }
163 }
164
165 const users = await User.findAll({
166 attributes: ['id', 'url'],
167 where: {
168 id: {
169 [Op.in]: notificationsToSend.map((n) => n.notifiedUserId)
170 }
171 }
172 })
173
174 const userUrlMap = Object.fromEntries(users.map((u) => [u.id, u.url]))
175 for (const notif of notificationsToSend) {
176 const url = userUrlMap[notif.notifiedUserId]
177 if (url) {
178 notif.notifiedUserUrl = url
179 }
180 }
181
182 if (notificationsToSend.length > 0) {
183 await sendWebPushNotifications(notificationsToSend, context)
184 await sendExpoNotifications(notificationsToSend, context)
185 await sendWsNotifications(notificationsToSend, context)
186 }
187}
188
189export async function sendExpoNotifications(notifications: NotificationBody[], context?: NotificationContext) {
190 const userIds = notifications.map((elem) => elem.notifiedUserId)
191 const tokenRows = await PushNotificationToken.findAll({
192 where: {
193 userId: {
194 [Op.in]: userIds
195 }
196 }
197 })
198
199 if (tokenRows.length === 0) {
200 return
201 }
202 const payloads = notifications.map((notification) => {
203 const tokens = tokenRows.filter((row) => row.userId === notification.notifiedUserId).map((row) => row.token)
204
205 // send the same notification to all the devices of each notified user
206 return {
207 to: tokens,
208 sound: 'default',
209 title: getNotificationTitle(notification, context),
210 body: getNotificationBody(notification, context),
211 data: { notification, context }
212 }
213 })
214
215 // this will chunk the payloads into chunks of 1000 (max) and compress notifications with similar content
216 const okTickets = []
217 const filteredPayloads: {
218 to: any[]
219 sound: string
220 title: string
221 body: string
222 data: {
223 notification: NotificationBody
224 context: NotificationContext | undefined
225 }
226 }[] = []
227 for await (const payload of payloads) {
228 const mutedPosts = (await getMutedPosts(payload.data.notification.notifiedUserId, false)).concat(
229 await getMutedPosts(payload.data.notification.notifiedUserId, true)
230 )
231 if (!mutedPosts.includes(payload.data.notification.postId as string)) {
232 filteredPayloads.push(payload)
233 }
234 }
235 const chunks = expoClient.chunkPushNotifications(filteredPayloads)
236 for (const chunk of chunks) {
237 const responses = await expoClient.sendPushNotificationsAsync(chunk)
238 for (const response of responses) {
239 if (response.status === 'ok') {
240 okTickets.push(response.id)
241 } else {
242 await handleDeliveryError(response)
243 }
244 }
245 }
246
247 await scheduleNotificationCheck(okTickets)
248}
249
250// schedule a job to check the delivery of the notifications after 30 minutes of being sent
251// this guarantees that the notification was delivered to the messaging services even in cases of high load
252function scheduleNotificationCheck(ticketIds: string[]) {
253 const delay = 1000 * 60 * 30 // 30 minutes
254 return deliveryCheckQueue.add('checkPushNotificationDelivery', { ticketIds }, { delay })
255}
256
257async function sendWsNotifications(notifications: NotificationBody[], context?: NotificationContext) {
258 await websocketQueue.addBulk(
259 notifications.map((elem) => {
260 // we just tell the user to update the notifications
261 return {
262 name: 'updateNotificationsSocket',
263 data: {
264 userId: elem.notifiedUserId,
265 type: elem.notificationType,
266 from: elem.userId,
267 postId: elem.postId ? elem.postId : ''
268 }
269 }
270 })
271 )
272}