unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at testPDSNotExplode 240 lines 6.7 kB view raw
import { Model, Op, Sequelize } from 'sequelize'
import { logger } from '../logger.js'
import { postPetitionSigned } from '../activitypub/postPetitionSigned.js'
import { postToJSONLD } from '../activitypub/postToJSONLD.js'
import { LdSignature } from '../activitypub/rsa2017.js'
import {
  FederatedHost,
  Post,
  User,
  PostHostView,
  RemoteUserPostView,
  sequelize,
  Media,
  Quotes,
  PostTag
} from '../../models/index.js'
import { Job, Queue } from 'bullmq'
import { Privacy } from '../../models/post.js'
import { completeEnvironment } from '../backendOptions.js'

// Queue that receives one job per (post, host) or (post, user) pair this worker
// schedules for delivery.
const processPostViewQueue = new Queue('processRemoteView', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 25000
    },
    removeOnFail: true
  }
})

// Queue that receives one job per remote inbox the signed post must be sent to.
const sendPostQueue = new Queue('sendPostToInboxes', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'fixed',
      delay: 25000
    },
    removeOnFail: true
  }
})

// Queue that receives public posts from users who have Bluesky enabled.
const sendPostBskyQueue = new Queue('sendPostBsky', {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 3,
    backoff: {
      type: 'fixed',
      delay: 25000
    },
    removeOnFail: true
  }
})

/**
 * BullMQ processor that prepares a local post for delivery to remote servers.
 *
 * Steps, in order:
 * 1. Loads the post whose primary key equals `job.id`; returns if it does not exist.
 * 2. If the post is public and both the author and the instance have Bluesky
 *    enabled, forwards `job.data` to the `sendPostBsky` queue.
 * 3. Stops if the author cannot be loaded or any ancestor of the post is a
 *    remote Bluesky post.
 * 4. Works out the destination hosts (shared inbox) and users (personal inbox),
 *    records them through the `processRemoteView` queue and `RemoteUserPostView`.
 * 5. Signs the JSON-LD representation with the author's private key, posts it
 *    directly to every mentioned user's inbox, and enqueues one
 *    `sendPostToInboxes` job per follower/server inbox.
 *
 * @param job - BullMQ job; `job.id` is used as the post id and `job.data` is
 *   passed through untouched to the Bluesky queue.
 */
async function prepareSendRemotePostWorker(job: Job): Promise<void> {
  const post = await Post.findByPk(job.id)
  if (!post) {
    return
  }

  const localUser = await User.scope('full').findByPk(post.userId)
  if (post.privacy === Privacy.Public && localUser?.enableBsky && completeEnvironment.enableBsky) {
    await sendPostBskyQueue.add('sendPostBsky', job.data)
  }

  const parents = await post.getAncestors({
    include: [
      {
        model: User,
        as: 'user'
      }
    ]
  })
  // Nothing is sent over ActivityPub when the thread contains a remote Bluesky post.
  const isBskyPost = parents.some((elem) => elem.isRemoteBlueskyPost)
  if (!localUser || isBskyPost) {
    return
  }

  // Hosts with a shared inbox, and hosts without one (reached through the
  // personal inbox of each follower loaded in `users`).
  let serversToSendThePost: FederatedHost[] = []
  let usersToSendThePost: FederatedHost[] = []

  // Local-only posts and direct messages are not delivered to followers.
  // Previously the LocalOnly branch left `usersToSendThePost` populated, so those
  // posts were still pushed to followers on hosts without a shared inbox; the
  // DirectMessage branch already cleared both lists.
  const deliverToFollowers = post.privacy !== Privacy.LocalOnly && post.privacy !== Privacy.DirectMessage
  if (deliverToFollowers) {
    // Fetched once and reused for both the host list and the follower id list.
    const localUserFollowers = await localUser.getFollower()
    const followersServers = [...new Set(localUserFollowers.map((el: any) => el.federatedHostId))]
    const followerIds = localUserFollowers.map((usr: any) => usr.id)

    usersToSendThePost = await FederatedHost.findAll({
      where: {
        publicInbox: { [Op.eq]: null },
        blocked: false
      },
      include: [
        {
          required: true,
          model: User,
          attributes: ['remoteInbox', 'id'],
          where: {
            banned: false,
            id: {
              [Op.in]: followerIds
            }
          }
        }
      ]
    })

    serversToSendThePost = await FederatedHost.findAll({
      where: {
        publicInbox: { [Op.ne]: null },
        blocked: { [Op.ne]: true },
        [Op.or]: [
          {
            id: {
              [Op.in]: followersServers
            }
          },
          {
            friendServer: true
          }
        ]
      }
    })
  }

  // NOTE(review): mentioned users are still notified for every privacy level,
  // including LocalOnly, as before — confirm whether that is intended.
  const mentionedUsers = await post.getMentionPost()

  const toUserView = (usr: any) => ({ userId: usr.id, postId: post.id })
  const userViews = [
    ...usersToSendThePost.flatMap((host: any) => host.users).map(toUserView),
    ...mentionedUsers.map(toUserView)
  ]

  // Record, through the view queue, every host the post is about to be sent to...
  await processPostViewQueue.addBulk(
    serversToSendThePost.map((host: any) => {
      return {
        name: host.displayName + post.id,
        data: {
          postId: post.id,
          federatedHostId: host.id,
          userId: ''
        }
      }
    })
  )
  // ...and every individual user.
  await processPostViewQueue.addBulk(
    userViews.map((userView: any) => {
      return {
        name: userView.userId + post.id,
        data: {
          postId: post.id,
          federatedHostId: '',
          userId: userView.userId
        }
      }
    })
  )

  await RemoteUserPostView.bulkCreate(userViews, {
    ignoreDuplicates: true
  })

  const objectToSend = await postToJSONLD(post.id)
  if (!objectToSend) {
    return
  }
  // Without a private key the object cannot be signed, so nothing is sent.
  if (!localUser.privateKey) {
    return
  }

  const ldSignature = new LdSignature()
  const bodySignature = await ldSignature.signRsaSignature2017(
    objectToSend,
    localUser.privateKey,
    `${completeEnvironment.frontendUrl}/fediverse/blog/${localUser.url.toLocaleLowerCase()}`,
    completeEnvironment.instanceUrl,
    new Date(post.createdAt)
  )
  const objectToSendComplete = { ...objectToSend, signature: bodySignature.signature }

  // Mentioned users are contacted directly, one at a time. A failure for one
  // inbox is logged and does not stop the others. Users without a remote inbox
  // are skipped because there is nowhere to send the request.
  const mentionedInboxes = mentionedUsers
    .map((elem: any) => elem.remoteInbox)
    .filter((inbox: any) => Boolean(inbox))
  for (const remoteInbox of mentionedInboxes) {
    try {
      await postPetitionSigned(objectToSendComplete, localUser, remoteInbox)
    } catch (error) {
      logger.debug(error)
    }
  }

  if (serversToSendThePost.length > 0 || usersToSendThePost.length > 0) {
    const inboxes: string[] = [
      ...serversToSendThePost.map((elem: any) => elem.publicInbox),
      ...usersToSendThePost.flatMap((server: any) => server.users.map((elem: any) => elem.remoteInbox))
    ]
    logger.debug(`Preparing send post. ${inboxes.length} inboxes`)

    // One job per inbox; allSettled so a single failed enqueue does not abort the rest.
    const enqueueResults = await Promise.allSettled(
      inboxes.map((inbox) =>
        sendPostQueue.add(
          'sendChunk',
          {
            objectToSend: objectToSendComplete,
            petitionBy: localUser.dataValues,
            inboxList: inbox
          },
          {
            priority: 1
          }
        )
      )
    )
    const failedEnqueues = enqueueResults.filter((result) => result.status === 'rejected').length
    if (failedEnqueues > 0) {
      logger.debug(`Could not enqueue ${failedEnqueues} of ${inboxes.length} inboxes for post ${post.id}`)
    }
  }
}

export { prepareSendRemotePostWorker }