unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at cache-folder-container 316 lines 9.0 kB view raw
1import { Model, Op, Sequelize } from "sequelize"; 2import { logger } from "../logger.js"; 3import { postPetitionSigned } from "../activitypub/postPetitionSigned.js"; 4import { 5 getPostUrlForQuote, 6 postToJSONLD, 7} from "../activitypub/postToJSONLD.js"; 8import { LdSignature } from "../activitypub/rsa2017.js"; 9import { 10 FederatedHost, 11 Post, 12 User, 13 PostHostView, 14 RemoteUserPostView, 15 sequelize, 16 Media, 17 Quotes, 18 PostTag, 19} from "../../models/index.js"; 20import { Job, Queue } from "bullmq"; 21import { Privacy } from "../../models/post.js"; 22import { completeEnvironment } from "../backendOptions.js"; 23import { activityPubObject } from "../../interfaces/fediverse/activityPubObject.js"; 24import { getPetitionSigned } from "../activitypub/getPetitionSigned.js"; 25import { include } from "underscore"; 26import { wait } from "../wait.js"; 27 28const processPostViewQueue = new Queue("processRemoteView", { 29 connection: completeEnvironment.bullmqConnection, 30 defaultJobOptions: { 31 removeOnComplete: true, 32 attempts: 3, 33 backoff: { 34 type: "exponential", 35 delay: 25000, 36 }, 37 removeOnFail: true, 38 }, 39}); 40 41const sendPostQueue = new Queue("sendPostToInboxes", { 42 connection: completeEnvironment.bullmqConnection, 43 defaultJobOptions: { 44 removeOnComplete: true, 45 attempts: 3, 46 backoff: { 47 type: "fixed", 48 delay: 25000, 49 }, 50 removeOnFail: true, 51 }, 52}); 53 54async function prepareSendRemotePostWorker(job: Job) { 55 let highPriorityInboxes: string[] = []; 56 //async function sendRemotePost(localUser: any, post: any) { 57 const post = await Post.findByPk(job.id); 58 if (!post) { 59 return; 60 } 61 62 const localUser = await User.scope("full").findByPk(post.userId); 63 const parents = await post.getAncestors({ 64 include: [ 65 { 66 model: User, 67 as: "user", 68 }, 69 ], 70 }); 71 // we check if we need to send the post to fedi 72 const sendPostToFedi = parents.every((elem) => elem.postShouldGoFedi); 73 if (localUser && sendPostToFedi) { 74 // we get quote authorizations 75 const quotes = ( 76 await Quotes.findAll({ 77 include: [ 78 { 79 model: Post, 80 as: "quotedPost", 81 include: [{ model: User, as: "user" }], 82 }, 83 ], 84 where: { 85 quoterPostId: post.id, 86 }, 87 }) 88 ).filter( 89 (quote: any) => !!quote.dataValues.quotedPost.dataValues.user.remoteId 90 ); 91 // TODO change in the future if fedi decides to do more than one quote 92 if (quotes && quotes.length == 1) { 93 const quote: any = quotes[0]; 94 const objectToSend: activityPubObject = { 95 "@context": [ 96 "https://www.w3.org/ns/activitystreams", 97 `${completeEnvironment.frontendUrl}/contexts/litepub-0.1.jsonld`, 98 ], 99 actor: `${ 100 completeEnvironment.frontendUrl 101 }/fediverse/blog/${localUser.url.toLowerCase()}`, 102 id: `${completeEnvironment.frontendUrl}/fediverse/quote_request/${post.id}`, 103 type: "QuoteRequest", 104 object: await getPostUrlForQuote(quote.dataValues.quotedPost), 105 instrument: await postToJSONLD(post.id), 106 }; 107 await RemoteUserPostView.findOrCreate({ 108 where: { 109 postId: post.id, 110 userId: quote.dataValues.quotedPost.dataValues.user.id, 111 }, 112 }); 113 114 highPriorityInboxes.push( 115 quote.dataValues.quotedPost.dataValues.user.remoteInbox 116 ); 117 } 118 // servers with shared inbox 119 let serversToSendThePost: FederatedHost[] = []; 120 const localUserFollowers = await localUser.getFollower(); 121 const followersServers = [ 122 ...new Set(localUserFollowers.map((el: any) => el.federatedHostId)), 123 ]; 124 // for servers with no shared inbox 125 let usersToSendThePost = await FederatedHost.findAll({ 126 where: { 127 publicInbox: { [Op.eq]: null }, 128 blocked: false, 129 }, 130 include: [ 131 { 132 required: true, 133 model: User, 134 attributes: ["remoteInbox", "id"], 135 where: { 136 banned: false, 137 id: { 138 [Op.in]: ( 139 await localUser.getFollower() 140 ).map((usr: any) => usr.id), 141 }, 142 }, 143 }, 144 ], 145 }); 146 // mentioned users 147 const mentionedUsers = await post.getMentionPost(); 148 switch (post.privacy) { 149 case Privacy.LocalOnly: { 150 break; 151 } 152 case Privacy.DirectMessage: { 153 serversToSendThePost = []; 154 usersToSendThePost = []; 155 break; 156 } 157 default: { 158 serversToSendThePost = await FederatedHost.findAll({ 159 where: { 160 publicInbox: { [Op.notIn]: [''], [Op.ne]: null }, 161 blocked: { [Op.ne]: true }, 162 [Op.or]: [ 163 { 164 id: { 165 [Op.in]: followersServers, 166 }, 167 }, 168 { 169 friendServer: true, 170 }, 171 ], 172 }, 173 }); 174 } 175 } 176 177 let userViews = usersToSendThePost 178 .flatMap((usr: any) => usr.users) 179 .map((elem: any) => { 180 return { 181 userId: elem.id, 182 postId: post.id, 183 }; 184 }) 185 .concat( 186 mentionedUsers.map((elem: any) => { 187 return { 188 userId: elem.id, 189 postId: post.id, 190 }; 191 }) 192 ); 193 194 // we store the fact that we have sent the post in a queue 195 await processPostViewQueue.addBulk( 196 serversToSendThePost.map((host: any) => { 197 return { 198 name: host.displayName + post.id, 199 data: { 200 postId: post.id, 201 federatedHostId: host.id, 202 userId: "", 203 }, 204 }; 205 }) 206 ); 207 // we store the fact that we have sent the post in a queue 208 await processPostViewQueue.addBulk( 209 userViews.map((userView: any) => { 210 return { 211 name: userView.userId + post.id, 212 data: { 213 postId: post.id, 214 federatedHostId: "", 215 userId: userView.userId, 216 }, 217 }; 218 }) 219 ); 220 221 await RemoteUserPostView.bulkCreate(userViews, { 222 ignoreDuplicates: true, 223 }); 224 225 const objectToSend = await postToJSONLD(post.id); 226 if (!objectToSend) { 227 return; 228 } 229 try { 230 const ldSignature = new LdSignature(); 231 if (localUser.privateKey) { 232 const bodySignature = await ldSignature.signRsaSignature2017( 233 objectToSend, 234 localUser.privateKey, 235 `${ 236 completeEnvironment.frontendUrl 237 }/fediverse/blog/${localUser.url.toLocaleLowerCase()}`, 238 completeEnvironment.instanceUrl, 239 new Date(post.createdAt) 240 ); 241 242 const objectToSendComplete = { 243 ...objectToSend, 244 signature: bodySignature.signature, 245 }; 246 if (mentionedUsers?.length > 0) { 247 const mentionedInboxes = mentionedUsers.map( 248 (elem: any) => elem.remoteInbox 249 ); 250 for await (const mentionedUser of mentionedUsers.filter( 251 (elem) => elem.remoteId 252 )) { 253 await RemoteUserPostView.findOrCreate({ 254 where: { 255 postId: post.id, 256 userId: mentionedUser.id, 257 }, 258 }); 259 } 260 for await (const remoteInbox of mentionedInboxes) { 261 highPriorityInboxes.push(remoteInbox); 262 } 263 } 264 if (post.isReblog) { 265 const parent = await Post.findByPk(post.parentId, { 266 include: [{ model: User, as: "user" }], 267 }); 268 if (parent && parent.user?.remoteInbox) { 269 highPriorityInboxes.push(parent.user.remoteInbox); 270 await RemoteUserPostView.findOrCreate({ 271 where: { 272 postId: post.id, 273 userId: parent.user.id, 274 }, 275 }); 276 } 277 } 278 279 if ( 280 serversToSendThePost.length > 0 || 281 usersToSendThePost.length > 0 || 282 highPriorityInboxes.length > 0 283 ) { 284 let inboxes: string[] = []; 285 inboxes = inboxes.concat( 286 serversToSendThePost.map((elem: any) => elem.publicInbox) 287 ); 288 usersToSendThePost?.forEach((server: any) => { 289 inboxes = inboxes.concat( 290 server.users.map((elem: any) => elem.remoteInbox) 291 ); 292 }); 293 inboxes = [...highPriorityInboxes, ...inboxes] 294 await sendPostQueue.addBulk(inboxes.map(elem => { 295 return { 296 name: 'sendChunk', 297 data: { 298 objectToSend: objectToSendComplete, 299 petitionBy: localUser.dataValues, 300 inboxList: elem, 301 } 302 303 } 304 })) 305 } 306 } 307 } catch (error) { 308 logger.info({ 309 message: `Error signing fedi post`, 310 error, 311 }); 312 } 313 } 314} 315 316export { prepareSendRemotePostWorker };