unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 202 lines 6.0 kB view raw
1import { Queue } from "bullmq"; 2import { completeEnvironment } from "../backendOptions.js"; 3import { UserBitesPostRelation } from "../../models/userBitesPostRelation.js"; 4import { User } from "../../models/user.js"; 5import { Post, Privacy } from "../../models/post.js"; 6import { activityPubObject } from "../../interfaces/fediverse/activityPubObject.js"; 7import sequelize from "sequelize/lib/sequelize"; 8import { Op } from "sequelize"; 9import { FederatedHost } from "../../models/federatedHost.js"; 10import { logger } from "../logger.js"; 11import { postPetitionSigned } from "./postPetitionSigned.js"; 12 13async function bitePostRemote(biteRelation: UserBitesPostRelation) { 14 const user = await User.findOne({ 15 where: { 16 id: biteRelation.userId, 17 }, 18 }); 19 20 const post = await Post.findOne({ 21 where: { 22 id: biteRelation.postId, 23 }, 24 include: [ 25 { 26 model: User, 27 as: "user", 28 }, 29 ], 30 }); 31 32 if (!user || !post) return; 33 34 const stringMyFollowers = `${ 35 completeEnvironment.frontendUrl 36 }/fediverse/blog/${user.url.toLowerCase()}/followers`; 37 const ownerOfBittenPost = 38 post.user.remoteId || 39 `${completeEnvironment.frontendUrl}/fediverse/blog/${post.user.url}`; 40 const biteObject: activityPubObject = { 41 "@context": [ 42 "https://www.w3.org/ns/activitystreams", 43 { Bite: "https://ns.mia.jetzt/as#Bite" }, 44 ], 45 actor: `${ 46 completeEnvironment.frontendUrl 47 }/fediverse/blog/${user.url.toLowerCase()}`, 48 id: `${completeEnvironment.frontendUrl}/fediverse/bites/${biteRelation.userId}/${biteRelation.postId}`, 49 target: 50 post.remotePostId || 51 `${completeEnvironment.frontendUrl}/fediverse/post/${post.id}`, 52 type: "Bite", 53 object: 54 post.remotePostId || 55 `${completeEnvironment.frontendUrl}/fediverse/post/${post.id}`, 56 to: 57 post.privacy / 1 === Privacy.DirectMessage 58 ? [ownerOfBittenPost] 59 : post.privacy / 1 === Privacy.Public 60 ? ["https://www.w3.org/ns/activitystreams#Public", stringMyFollowers] 61 : [stringMyFollowers], 62 }; 63 64 // servers with shared inbox 65 let serversToSendThePost = await FederatedHost.findAll({ 66 where: { 67 publicInbox: { [Op.ne]: null }, 68 blocked: { [Op.ne]: true }, 69 70 [Op.or]: [ 71 sequelize.literal( 72 `"id" in (SELECT "federatedHostId" from "users" where "users"."id" IN (SELECT "followerId" from "follows" where "followedId" = '${biteRelation.userId}') and "federatedHostId" is not NULL)` 73 ), 74 { 75 friendServer: true, 76 }, 77 ], 78 }, 79 }); 80 81 // To guarantee that remote user gets stuff 82 const usersToSendThePost = [await User.findByPk(post.userId)]; 83 await Promise.all([serversToSendThePost, usersToSendThePost]); 84 if (serversToSendThePost?.length > 0 || usersToSendThePost?.length > 0) { 85 let inboxes: string[] = []; 86 inboxes = inboxes.concat( 87 serversToSendThePost.map((elem: any) => elem.publicInbox) 88 ); 89 inboxes = inboxes.concat( 90 usersToSendThePost.map((elem: any) => 91 elem.remoteInbox ? elem.remoteInbox : "" 92 ) 93 ); 94 95 const sendPostQueue = new Queue("sendPostToInboxes", { 96 connection: completeEnvironment.bullmqConnection, 97 defaultJobOptions: { 98 removeOnComplete: true, 99 attempts: 3, 100 backoff: { 101 type: "exponential", 102 delay: 1000, 103 }, 104 removeOnFail: true, 105 }, 106 }); 107 108 for await (const inboxChunk of inboxes.filter((elem) => !!elem)) { 109 await sendPostQueue.add( 110 "sendChunk", 111 { 112 objectToSend: biteObject, 113 petitionBy: user.dataValues, 114 inboxList: inboxChunk, 115 }, 116 { 117 priority: 2097152, 118 delay: 500, 119 } 120 ); 121 } 122 } 123} 124 125async function biteUserRemote(biter: User, bittenUser: User) { 126 const biteObject: activityPubObject = { 127 "@context": [ 128 "https://www.w3.org/ns/activitystreams", 129 { Bite: "https://ns.mia.jetzt/as#Bite" }, 130 ], 131 id: `${completeEnvironment.frontendUrl}/fediverse/bites/${biter.id}/${bittenUser.id}`, 132 type: "Bite", 133 actor: `${ 134 completeEnvironment.frontendUrl 135 }/fediverse/blog/${biter.url.toLowerCase()}`, 136 target: bittenUser.remoteId, 137 object: bittenUser.remoteId, 138 to: [bittenUser.remoteId], 139 }; 140 141 // servers with shared inbox 142 let serversToSendThePost = await FederatedHost.findAll({ 143 where: { 144 publicInbox: { [Op.ne]: null }, 145 blocked: { [Op.ne]: true }, 146 147 [Op.or]: [ 148 sequelize.literal( 149 `"id" in (SELECT "federatedHostId" from "users" where "users"."id" IN (SELECT "followerId" from "follows" where "followedId" = '${biter.id}') and "federatedHostId" is not NULL)` 150 ), 151 { 152 friendServer: true, 153 }, 154 ], 155 }, 156 }); 157 158 // To guarantee that remote user gets stuff 159 const usersToSendThePost = [await User.findByPk(bittenUser.id)]; 160 await Promise.all([serversToSendThePost, usersToSendThePost]); 161 if (serversToSendThePost?.length > 0 || usersToSendThePost?.length > 0) { 162 let inboxes: string[] = []; 163 inboxes = inboxes.concat( 164 serversToSendThePost.map((elem: any) => elem.publicInbox) 165 ); 166 inboxes = inboxes.concat( 167 usersToSendThePost.map((elem: any) => 168 elem.remoteInbox ? elem.remoteInbox : "" 169 ) 170 ); 171 172 const sendPostQueue = new Queue("sendPostToInboxes", { 173 connection: completeEnvironment.bullmqConnection, 174 defaultJobOptions: { 175 removeOnComplete: true, 176 attempts: 3, 177 backoff: { 178 type: "exponential", 179 delay: 1000, 180 }, 181 removeOnFail: true, 182 }, 183 }); 184 185 for await (const inboxChunk of inboxes.filter((elem) => !!elem)) { 186 await sendPostQueue.add( 187 "sendChunk", 188 { 189 objectToSend: biteObject, 190 petitionBy: biter.dataValues, 191 inboxList: inboxChunk, 192 }, 193 { 194 priority: 2097152, 195 delay: 500, 196 } 197 ); 198 } 199 } 200} 201 202export { bitePostRemote, biteUserRemote };