unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Expo } from 'expo-server-sdk'
2import { Follows, Notification, PushNotificationToken, UserOptions } from '../../models/index.js'
3import { logger } from '../logger.js'
4import {
5 getNotificationBody,
6 getNotificationTitle,
7 handleDeliveryError,
8 type NotificationBody,
9 type NotificationContext
10} from '../pushNotifications.js'
11import { Job, Queue } from 'bullmq'
12import { Op } from 'sequelize'
13import { getMutedPosts } from '../cacheGetters/getMutedPosts.js'
14import { sendWebPushNotifications } from '../webpush.js'
15import getBlockedIds from '../cacheGetters/getBlockedIds.js'
16import { completeEnvironment } from '../backendOptions.js'
17
/**
 * BullMQ queue holding the `checkPushNotificationDelivery` jobs enqueued by
 * `scheduleNotificationCheck`. The worker that consumes it is not defined in
 * this module.
 *
 * Default job policy: finished jobs are removed, and a failing job is attempted
 * up to 3 times with exponential backoff based on a 1000 ms delay.
 */
const deliveryCheckQueue = new Queue('checkPushNotificationDelivery', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  }
})
29
/**
 * BullMQ queue holding the `updateNotificationsSocket` jobs enqueued by
 * `sendWsNotifications`. The worker that consumes it is not defined in this
 * module.
 *
 * Default job policy: finished jobs are removed, and a failing job is attempted
 * up to 3 times with exponential backoff based on a 1000 ms delay.
 */
const websocketQueue = new Queue('updateNotificationsSocket', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  }
})
41
/**
 * Expo SDK client, created with the library defaults. `sendExpoNotifications`
 * uses it to chunk and send the push messages.
 */
const expoClient = new Expo()
43
/** Data carried by the jobs that `sendPushNotification` processes. */
type PushNotificationPayload = {
  /** Candidate notifications; they are filtered before anything is sent. */
  notifications: NotificationBody[]
  /** Optional context forwarded to the title/body builders and to the senders. */
  context?: NotificationContext
}
48
/**
 * Notification types that can be pushed, mapped to the user option that turns
 * them off. A type that is missing from this map is never pushed.
 */
const pushOptionByNotificationType = new Map<string, string>([
  ['QUOTE', 'wafrn.notifyQuotes'],
  ['MENTION', 'wafrn.notifyMentions'],
  ['EMOJIREACT', 'wafrn.notifyReactions'],
  ['LIKE', 'wafrn.notifyReactions'],
  ['FOLLOW', 'wafrn.notifyFollows'],
  ['REWOOT', 'wafrn.notifyRewoots']
])

/** Every user option that has to be loaded to decide whether to push. */
const pushRelatedOptionNames = ['wafrn.notificationsFrom', ...new Set(pushOptionByNotificationType.values())]

/**
 * Job handler that filters the notifications of a job and delivers the
 * remaining ones through web push, Expo push and the websocket queue.
 *
 * A notification is dropped when any of the following holds:
 * - the actor and the notified user are the same user;
 * - its post is in the notified user's muted posts;
 * - the actor is in the list returned by `getBlockedIds` for the notified user;
 * - its type is not a pushable type, or the user option for that type is the
 *   string `'false'`;
 * - the `wafrn.notificationsFrom` option is set to something other than `'1'`
 *   and the actor is not part of the selected audience.
 *
 * The three delivery channels are awaited one after another, so a rejection in
 * one of them rejects this function and skips the following channels.
 *
 * @param job - BullMQ job whose data holds the notifications and an optional context
 */
export async function sendPushNotification(job: Job<PushNotificationPayload>) {
  const { notifications, context } = job.data
  const notificationsToSend: NotificationBody[] = []

  for (const notification of notifications) {
    const { notifiedUserId, userId } = notification

    // users are never notified about their own actions; checked first because it needs no lookup
    if (userId === notifiedUserId) {
      continue
    }

    // both variants of the muted-post lookup (second argument false and true) are honoured
    const mutedPosts = new Set((await getMutedPosts(notifiedUserId, false)).concat(await getMutedPosts(notifiedUserId, true)))
    if (mutedPosts.has(notification.postId ? notification.postId : '')) {
      continue
    }

    // do not push notifications that come from a blocked user
    const blockedUsers = await getBlockedIds(notifiedUserId)
    if (blockedUsers.includes(userId)) {
      continue
    }

    // TODO this part of code is repeated. take it to a function another day
    const options = await UserOptions.findAll({
      where: {
        userId: notifiedUserId,
        optionName: {
          [Op.in]: pushRelatedOptionNames
        }
      }
    })

    // a per-type option only disables pushes when it is explicitly stored as 'false'
    const typeOptionName = pushOptionByNotificationType.get(notification.notificationType)
    if (!typeOptionName) {
      continue
    }
    const typeOption = options.find((elem) => elem.optionName === typeOptionName)
    if (typeOption && typeOption.optionValue === 'false') {
      continue
    }

    // no audience option, or '1', means there is no restriction on who can notify
    const audienceOption = options.find((elem) => elem.optionName === 'wafrn.notificationsFrom')
    if (!audienceOption || audienceOption.optionValue === '1') {
      notificationsToSend.push(notification)
      continue
    }

    const allowedSenderIds = await getAllowedSenderIds(notifiedUserId, audienceOption.optionValue)
    if (allowedSenderIds.includes(userId)) {
      notificationsToSend.push(notification)
    }
  }

  if (notificationsToSend.length > 0) {
    await sendWebPushNotifications(notificationsToSend, context)
    await sendExpoNotifications(notificationsToSend, context)
    await sendWsNotifications(notificationsToSend, context)
  }
}

/**
 * Resolves the users that are allowed to trigger push notifications for
 * `notifiedUserId` under a restricted `wafrn.notificationsFrom` setting.
 * Only accepted follows are taken into account.
 *
 * Every case returns on its own: the previous inline `switch` had no `break`,
 * so '2' and '3' fell through and were evaluated as '4' (mutuals).
 *
 * @param notifiedUserId - user that would receive the notification
 * @param audience - raw option value: '2' followers, '3' followees, '4' mutuals
 * @returns ids of the allowed users; an empty list for any other value
 */
async function getAllowedSenderIds(
  notifiedUserId: NotificationBody['notifiedUserId'],
  audience: string
): Promise<string[]> {
  switch (audience) {
    case '2': {
      // followers: users that follow the notified user
      const followerRows = await Follows.findAll({
        where: {
          accepted: true,
          followedId: notifiedUserId
        }
      })
      return followerRows.map((elem) => elem.followerId)
    }
    case '3': {
      // followees: users that the notified user follows
      const followeeRows = await Follows.findAll({
        where: {
          accepted: true,
          followerId: notifiedUserId
        }
      })
      return followeeRows.map((elem) => elem.followedId)
    }
    case '4': {
      // mutuals: followees of the notified user that also follow them back
      const followerIds = (
        await Follows.findAll({
          where: {
            accepted: true,
            followedId: notifiedUserId
          }
        })
      ).map((elem) => elem.followerId)
      const mutualRows = await Follows.findAll({
        where: {
          accepted: true,
          followerId: notifiedUserId,
          followedId: {
            [Op.in]: followerIds
          }
        }
      })
      return mutualRows.map((elem) => elem.followedId)
    }
    default:
      // unknown setting: nobody is allowed, which matches the previous behaviour
      return []
  }
}
165
/**
 * Sends the given notifications as Expo push messages to every device token
 * registered for each notified user.
 *
 * Notifications are skipped when the notified user has no registered token or
 * when the notification's post is in the user's muted posts. Tickets that Expo
 * reports as `ok` are handed to `scheduleNotificationCheck`; every other
 * ticket is passed to `handleDeliveryError`.
 *
 * @param notifications - notifications to deliver
 * @param context - optional context used to build the title and body, also
 *   included in the message data
 */
export async function sendExpoNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  const userIds = notifications.map((elem) => elem.notifiedUserId)
  const tokenRows = await PushNotificationToken.findAll({
    where: {
      userId: {
        [Op.in]: userIds
      }
    }
  })

  if (tokenRows.length === 0) {
    return
  }

  // one message per notification, addressed to all the devices of the notified user
  const payloads = notifications.map((notification) => ({
    to: tokenRows.filter((row) => row.userId === notification.notifiedUserId).map((row) => row.token),
    sound: 'default',
    title: getNotificationTitle(notification, context),
    body: getNotificationBody(notification, context),
    data: { notification, context }
  }))

  const deliverablePayloads: typeof payloads = []
  for (const payload of payloads) {
    // nothing to deliver when the user has no registered device
    if (payload.to.length === 0) {
      continue
    }
    const { notifiedUserId, postId } = payload.data.notification
    const mutedPosts = (await getMutedPosts(notifiedUserId, false)).concat(await getMutedPosts(notifiedUserId, true))
    const isMuted = postId ? mutedPosts.includes(postId) : false
    if (!isMuted) {
      deliverablePayloads.push(payload)
    }
  }

  // the Expo client splits the messages into chunks that fit its request limits
  const okTickets: string[] = []
  const chunks = expoClient.chunkPushNotifications(deliverablePayloads)
  for (const chunk of chunks) {
    const responses = await expoClient.sendPushNotificationsAsync(chunk)
    for (const response of responses) {
      if (response.status === 'ok') {
        okTickets.push(response.id)
      } else {
        await handleDeliveryError(response)
      }
    }
  }

  // only enqueue a delivery check when there is at least one ticket to check
  if (okTickets.length > 0) {
    await scheduleNotificationCheck(okTickets)
  }
}
226
/**
 * Enqueues a `checkPushNotificationDelivery` job on `deliveryCheckQueue` that
 * becomes available 30 minutes after being added. The wait is meant to give
 * the messaging services time to process the notifications, even under high
 * load, before their delivery is checked.
 *
 * @param ticketIds - ids of the Expo tickets whose delivery has to be checked
 * @returns the promise returned by the queue's `add` call
 */
function scheduleNotificationCheck(ticketIds: string[]) {
  const thirtyMinutesInMs = 1000 * 60 * 30
  const jobData = { ticketIds }
  return deliveryCheckQueue.add('checkPushNotificationDelivery', jobData, { delay: thirtyMinutesInMs })
}
233
/**
 * Enqueues one `updateNotificationsSocket` job per notification on
 * `websocketQueue`. The job only tells the notified user to refresh their
 * notifications: it carries the notified user, the notification type, the
 * acting user and the post id (an empty string when there is no post).
 *
 * @param notifications - notifications that were selected for delivery
 * @param context - accepted for symmetry with the other senders; not used here
 */
async function sendWsNotifications(notifications: NotificationBody[], context?: NotificationContext) {
  const socketJobs = notifications.map(({ notifiedUserId, notificationType, userId, postId }) => ({
    name: 'updateNotificationsSocket',
    data: {
      userId: notifiedUserId,
      type: notificationType,
      from: userId,
      postId: postId ? postId : ''
    }
  }))
  await websocketQueue.addBulk(socketJobs)
}