unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Model, Op, Sequelize } from 'sequelize'
2import { logger } from '../logger.js'
3import { postPetitionSigned } from '../activitypub/postPetitionSigned.js'
4import { postToJSONLD } from '../activitypub/postToJSONLD.js'
5import { LdSignature } from '../activitypub/rsa2017.js'
6import {
7 FederatedHost,
8 Post,
9 User,
10 PostHostView,
11 RemoteUserPostView,
12 sequelize,
13 Media,
14 Quotes,
15 PostTag
16} from '../../models/index.js'
17import { Job, Queue } from 'bullmq'
18import { Privacy } from '../../models/post.js'
19import { completeEnvironment } from '../backendOptions.js'
20
/**
 * Queue ('processRemoteView') that receives one job per remote host or remote
 * user a post is being delivered to, so the delivery can be recorded.
 *
 * Jobs get up to 3 attempts with exponential backoff (base delay 25000) and are
 * dropped from the queue both when they complete and when they finally fail.
 */
const processPostViewQueue = new Queue('processRemoteView', {
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      delay: 25000,
      type: 'exponential'
    },
    removeOnComplete: true,
    removeOnFail: true
  },
  connection: completeEnvironment.bullmqConnection
})
33
/**
 * Queue ('sendPostToInboxes') holding the actual outgoing deliveries: each job
 * carries the signed object, the sending user and one target inbox.
 *
 * Jobs get up to 3 attempts with a fixed backoff delay of 25000 and are dropped
 * from the queue both when they complete and when they finally fail.
 */
const sendPostQueue = new Queue('sendPostToInboxes', {
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      delay: 25000,
      type: 'fixed'
    },
    removeOnComplete: true,
    removeOnFail: true
  },
  connection: completeEnvironment.bullmqConnection
})
46
/**
 * Queue ('sendPostBsky') used to hand public posts over to the Bluesky sender.
 *
 * Jobs get up to 3 attempts with a fixed backoff delay of 25000 and are dropped
 * from the queue both when they complete and when they finally fail.
 */
const sendPostBskyQueue = new Queue('sendPostBsky', {
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      delay: 25000,
      type: 'fixed'
    },
    removeOnComplete: true,
    removeOnFail: true
  },
  connection: completeEnvironment.bullmqConnection
})
/**
 * Worker body that prepares the delivery of a local post to remote servers.
 *
 * What it does, in order:
 *  1. Loads the post whose primary key is the job id, and its author.
 *  2. If the post is public and both the author and the instance have Bluesky
 *     enabled, forwards the job data to the Bluesky queue.
 *  3. Stops if the author cannot be loaded or any ancestor of the post is a
 *     remote Bluesky post.
 *  4. Works out the targets: shared inboxes of hosts that have followers of the
 *     author (or are flagged as friend servers), personal inboxes of followers
 *     on hosts without a shared inbox, and the users mentioned in the post.
 *     Direct messages and local-only posts are not fanned out to followers.
 *  5. Records the pending views (queue + RemoteUserPostView rows).
 *  6. Serialises and signs the post, posts it directly to the mentioned users'
 *     inboxes, and enqueues one 'sendChunk' job per remaining inbox.
 *
 * @param job BullMQ job; `job.id` is looked up as the post id and `job.data`
 *            is passed through unchanged to the Bluesky queue.
 * @returns Resolves when everything has been enqueued. Failures while posting
 *          to a mentioned inbox or while enqueueing are logged, not thrown.
 */
async function prepareSendRemotePostWorker(job: Job): Promise<void> {
  const post = await Post.findByPk(job.id)
  if (!post) {
    return
  }

  const localUser = await User.scope('full').findByPk(post.userId)
  if (post.privacy === Privacy.Public && localUser?.enableBsky && completeEnvironment.enableBsky) {
    await sendPostBskyQueue.add('sendPostBsky', job.data)
  }

  const parents = await post.getAncestors({
    include: [
      {
        model: User,
        as: 'user'
      }
    ]
  })
  // we check if we need to send the post to fedi
  const isBskyPost = parents.some((elem) => elem.isRemoteBlueskyPost)
  if (!localUser || isBskyPost) {
    return
  }

  // Followers are fetched once and reused for both the host and the user lookups.
  const localUserFollowers = await localUser.getFollower()
  const followersServers = [...new Set(localUserFollowers.map((el: any) => el.federatedHostId))]
  const followerIds = localUserFollowers.map((usr: any) => usr.id)

  // servers with shared inbox
  let serversToSendThePost: FederatedHost[] = []
  // for servers with no shared inbox: hosts, each including the followers to reach individually
  let usersToSendThePost: FederatedHost[] = []

  // Direct messages only go to the mentioned users. Local-only posts must not be
  // fanned out to followers either: previously they skipped the shared inboxes
  // but were still delivered to followers on hosts without a shared inbox.
  const fanOutToFollowers = post.privacy !== Privacy.LocalOnly && post.privacy !== Privacy.DirectMessage
  if (fanOutToFollowers) {
    usersToSendThePost = await FederatedHost.findAll({
      where: {
        publicInbox: { [Op.eq]: null },
        blocked: false
      },
      include: [
        {
          required: true,
          model: User,
          attributes: ['remoteInbox', 'id'],
          where: {
            banned: false,
            id: {
              [Op.in]: followerIds
            }
          }
        }
      ]
    })
    serversToSendThePost = await FederatedHost.findAll({
      where: {
        publicInbox: { [Op.ne]: null },
        blocked: { [Op.ne]: true },
        [Op.or]: [
          {
            id: {
              [Op.in]: followersServers
            }
          },
          {
            friendServer: true
          }
        ]
      }
    })
  }

  // mentioned users
  // NOTE(review): mentioned users are still delivered to for LocalOnly posts, as before — confirm this is intended.
  const mentionedUsers = await post.getMentionPost()

  const userViews = [...usersToSendThePost.flatMap((host: any) => host.users), ...mentionedUsers].map(
    (elem: any) => {
      return {
        userId: elem.id,
        postId: post.id
      }
    }
  )

  // we store the fact that we have sent the post in a queue (one job per shared-inbox host)
  await processPostViewQueue.addBulk(
    serversToSendThePost.map((host: any) => {
      return {
        name: host.displayName + post.id,
        data: {
          postId: post.id,
          federatedHostId: host.id,
          userId: ''
        }
      }
    })
  )
  // same, one job per individually targeted user
  await processPostViewQueue.addBulk(
    userViews.map((userView: any) => {
      return {
        name: userView.userId + post.id,
        data: {
          postId: post.id,
          federatedHostId: '',
          userId: userView.userId
        }
      }
    })
  )

  await RemoteUserPostView.bulkCreate(userViews, {
    ignoreDuplicates: true
  })

  const objectToSend = await postToJSONLD(post.id)
  if (!objectToSend) {
    return
  }
  // Without a private key the object cannot be signed, so nothing is sent.
  if (!localUser.privateKey) {
    return
  }

  const ldSignature = new LdSignature()
  const bodySignature = await ldSignature.signRsaSignature2017(
    objectToSend,
    localUser.privateKey,
    `${completeEnvironment.frontendUrl}/fediverse/blog/${localUser.url.toLocaleLowerCase()}`,
    completeEnvironment.instanceUrl,
    new Date(post.createdAt)
  )
  const objectToSendComplete = { ...objectToSend, signature: bodySignature.signature }

  // Mentioned users are delivered to right away, one inbox after another.
  // Users without an inbox are skipped instead of producing a doomed request.
  const mentionedInboxes: string[] = mentionedUsers
    .map((elem: any) => elem.remoteInbox)
    .filter((inbox: any) => Boolean(inbox))
  for (const remoteInbox of mentionedInboxes) {
    try {
      await postPetitionSigned(objectToSendComplete, localUser, remoteInbox)
    } catch (error) {
      // best effort: a failing mention delivery must not stop the rest of the fan-out
      logger.debug(error)
    }
  }

  // Shared inboxes first, then the personal inboxes of followers on hosts
  // without one; empty entries are dropped and duplicates collapsed so each
  // inbox gets exactly one delivery job.
  const candidateInboxes: string[] = [
    ...serversToSendThePost.map((elem: any) => elem.publicInbox),
    ...usersToSendThePost.flatMap((server: any) => server.users.map((elem: any) => elem.remoteInbox))
  ]
  const inboxes = [...new Set<string>(candidateInboxes.filter((inbox) => Boolean(inbox)))]
  if (inboxes.length === 0) {
    return
  }

  logger.debug(`Preparing send post. ${inboxes.length} inboxes`)
  const enqueueResults = await Promise.allSettled(
    inboxes.map((inbox) =>
      sendPostQueue.add(
        'sendChunk',
        {
          objectToSend: objectToSendComplete,
          petitionBy: localUser.dataValues,
          inboxList: inbox
        },
        {
          priority: 1
        }
      )
    )
  )
  // allSettled never rejects, so surface enqueue failures explicitly.
  const failedEnqueues = enqueueResults.filter((result) => result.status === 'rejected')
  if (failedEnqueues.length > 0) {
    logger.debug(`Could not enqueue ${failedEnqueues.length} of ${inboxes.length} inbox deliveries`)
  }
}
239
240export { prepareSendRemotePostWorker }