unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Model, Op, Sequelize } from "sequelize";
2import { logger } from "../logger.js";
3import { postPetitionSigned } from "../activitypub/postPetitionSigned.js";
4import {
5 getPostUrlForQuote,
6 postToJSONLD,
7} from "../activitypub/postToJSONLD.js";
8import { LdSignature } from "../activitypub/rsa2017.js";
9import {
10 FederatedHost,
11 Post,
12 User,
13 PostHostView,
14 RemoteUserPostView,
15 sequelize,
16 Media,
17 Quotes,
18 PostTag,
19} from "../../models/index.js";
20import { Job, Queue } from "bullmq";
21import { Privacy } from "../../models/post.js";
22import { completeEnvironment } from "../backendOptions.js";
23import { activityPubObject } from "../../interfaces/fediverse/activityPubObject.js";
24import { getPetitionSigned } from "../activitypub/getPetitionSigned.js";
25import { include } from "underscore";
26import { wait } from "../wait.js";
27
28const processPostViewQueue = new Queue("processRemoteView", {
29 connection: completeEnvironment.bullmqConnection,
30 defaultJobOptions: {
31 removeOnComplete: true,
32 attempts: 3,
33 backoff: {
34 type: "exponential",
35 delay: 25000,
36 },
37 removeOnFail: true,
38 },
39});
40
41const sendPostQueue = new Queue("sendPostToInboxes", {
42 connection: completeEnvironment.bullmqConnection,
43 defaultJobOptions: {
44 removeOnComplete: true,
45 attempts: 3,
46 backoff: {
47 type: "fixed",
48 delay: 25000,
49 },
50 removeOnFail: true,
51 },
52});
53
54async function prepareSendRemotePostWorker(job: Job) {
55 let highPriorityInboxes: string[] = [];
56 //async function sendRemotePost(localUser: any, post: any) {
57 const post = await Post.findByPk(job.id);
58 if (!post) {
59 return;
60 }
61
62 const localUser = await User.scope("full").findByPk(post.userId);
63 const parents = await post.getAncestors({
64 include: [
65 {
66 model: User,
67 as: "user",
68 },
69 ],
70 });
71 // we check if we need to send the post to fedi
72 const sendPostToFedi = parents.every((elem) => elem.postShouldGoFedi);
73 if (localUser && sendPostToFedi) {
74 // we get quote authorizations
75 const quotes = (
76 await Quotes.findAll({
77 include: [
78 {
79 model: Post,
80 as: "quotedPost",
81 include: [{ model: User, as: "user" }],
82 },
83 ],
84 where: {
85 quoterPostId: post.id,
86 },
87 })
88 ).filter(
89 (quote: any) => !!quote.dataValues.quotedPost.dataValues.user.remoteId
90 );
91 // TODO change in the future if fedi decides to do more than one quote
92 if (quotes && quotes.length == 1) {
93 const quote: any = quotes[0];
94 const objectToSend: activityPubObject = {
95 "@context": [
96 "https://www.w3.org/ns/activitystreams",
97 `${completeEnvironment.frontendUrl}/contexts/litepub-0.1.jsonld`,
98 ],
99 actor: `${
100 completeEnvironment.frontendUrl
101 }/fediverse/blog/${localUser.url.toLowerCase()}`,
102 id: `${completeEnvironment.frontendUrl}/fediverse/quote_request/${post.id}`,
103 type: "QuoteRequest",
104 object: await getPostUrlForQuote(quote.dataValues.quotedPost),
105 instrument: await postToJSONLD(post.id),
106 };
107 await RemoteUserPostView.findOrCreate({
108 where: {
109 postId: post.id,
110 userId: quote.dataValues.quotedPost.dataValues.user.id,
111 },
112 });
113
114 highPriorityInboxes.push(
115 quote.dataValues.quotedPost.dataValues.user.remoteInbox
116 );
117 }
118 // servers with shared inbox
119 let serversToSendThePost: FederatedHost[] = [];
120 const localUserFollowers = await localUser.getFollower();
121 const followersServers = [
122 ...new Set(localUserFollowers.map((el: any) => el.federatedHostId)),
123 ];
124 // for servers with no shared inbox
125 let usersToSendThePost = await FederatedHost.findAll({
126 where: {
127 publicInbox: { [Op.eq]: null },
128 blocked: false,
129 },
130 include: [
131 {
132 required: true,
133 model: User,
134 attributes: ["remoteInbox", "id"],
135 where: {
136 banned: false,
137 id: {
138 [Op.in]: (
139 await localUser.getFollower()
140 ).map((usr: any) => usr.id),
141 },
142 },
143 },
144 ],
145 });
146 // mentioned users
147 const mentionedUsers = await post.getMentionPost();
148 switch (post.privacy) {
149 case Privacy.LocalOnly: {
150 break;
151 }
152 case Privacy.DirectMessage: {
153 serversToSendThePost = [];
154 usersToSendThePost = [];
155 break;
156 }
157 default: {
158 serversToSendThePost = await FederatedHost.findAll({
159 where: {
160 publicInbox: { [Op.notIn]: [''], [Op.ne]: null },
161 blocked: { [Op.ne]: true },
162 [Op.or]: [
163 {
164 id: {
165 [Op.in]: followersServers,
166 },
167 },
168 {
169 friendServer: true,
170 },
171 ],
172 },
173 });
174 }
175 }
176
177 let userViews = usersToSendThePost
178 .flatMap((usr: any) => usr.users)
179 .map((elem: any) => {
180 return {
181 userId: elem.id,
182 postId: post.id,
183 };
184 })
185 .concat(
186 mentionedUsers.map((elem: any) => {
187 return {
188 userId: elem.id,
189 postId: post.id,
190 };
191 })
192 );
193
194 // we store the fact that we have sent the post in a queue
195 await processPostViewQueue.addBulk(
196 serversToSendThePost.map((host: any) => {
197 return {
198 name: host.displayName + post.id,
199 data: {
200 postId: post.id,
201 federatedHostId: host.id,
202 userId: "",
203 },
204 };
205 })
206 );
207 // we store the fact that we have sent the post in a queue
208 await processPostViewQueue.addBulk(
209 userViews.map((userView: any) => {
210 return {
211 name: userView.userId + post.id,
212 data: {
213 postId: post.id,
214 federatedHostId: "",
215 userId: userView.userId,
216 },
217 };
218 })
219 );
220
221 await RemoteUserPostView.bulkCreate(userViews, {
222 ignoreDuplicates: true,
223 });
224
225 const objectToSend = await postToJSONLD(post.id);
226 if (!objectToSend) {
227 return;
228 }
229 try {
230 const ldSignature = new LdSignature();
231 if (localUser.privateKey) {
232 const bodySignature = await ldSignature.signRsaSignature2017(
233 objectToSend,
234 localUser.privateKey,
235 `${
236 completeEnvironment.frontendUrl
237 }/fediverse/blog/${localUser.url.toLocaleLowerCase()}`,
238 completeEnvironment.instanceUrl,
239 new Date(post.createdAt)
240 );
241
242 const objectToSendComplete = {
243 ...objectToSend,
244 signature: bodySignature.signature,
245 };
246 if (mentionedUsers?.length > 0) {
247 const mentionedInboxes = mentionedUsers.map(
248 (elem: any) => elem.remoteInbox
249 );
250 for await (const mentionedUser of mentionedUsers.filter(
251 (elem) => elem.remoteId
252 )) {
253 await RemoteUserPostView.findOrCreate({
254 where: {
255 postId: post.id,
256 userId: mentionedUser.id,
257 },
258 });
259 }
260 for await (const remoteInbox of mentionedInboxes) {
261 highPriorityInboxes.push(remoteInbox);
262 }
263 }
264 if (post.isReblog) {
265 const parent = await Post.findByPk(post.parentId, {
266 include: [{ model: User, as: "user" }],
267 });
268 if (parent && parent.user?.remoteInbox) {
269 highPriorityInboxes.push(parent.user.remoteInbox);
270 await RemoteUserPostView.findOrCreate({
271 where: {
272 postId: post.id,
273 userId: parent.user.id,
274 },
275 });
276 }
277 }
278
279 if (
280 serversToSendThePost.length > 0 ||
281 usersToSendThePost.length > 0 ||
282 highPriorityInboxes.length > 0
283 ) {
284 let inboxes: string[] = [];
285 inboxes = inboxes.concat(
286 serversToSendThePost.map((elem: any) => elem.publicInbox)
287 );
288 usersToSendThePost?.forEach((server: any) => {
289 inboxes = inboxes.concat(
290 server.users.map((elem: any) => elem.remoteInbox)
291 );
292 });
293 inboxes = [...highPriorityInboxes, ...inboxes]
294 await sendPostQueue.addBulk(inboxes.map(elem => {
295 return {
296 name: 'sendChunk',
297 data: {
298 objectToSend: objectToSendComplete,
299 petitionBy: localUser.dataValues,
300 inboxList: elem,
301 }
302
303 }
304 }))
305 }
306 }
307 } catch (error) {
308 logger.info({
309 message: `Error signing fedi post`,
310 error,
311 });
312 }
313 }
314}
315
316export { prepareSendRemotePostWorker };