unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Queue } from "bullmq";
2import { completeEnvironment } from "../backendOptions.js";
3import { UserBitesPostRelation } from "../../models/userBitesPostRelation.js";
4import { User } from "../../models/user.js";
5import { Post, Privacy } from "../../models/post.js";
6import { activityPubObject } from "../../interfaces/fediverse/activityPubObject.js";
7import sequelize from "sequelize/lib/sequelize";
8import { Op } from "sequelize";
9import { FederatedHost } from "../../models/federatedHost.js";
10import { logger } from "../logger.js";
11import { postPetitionSigned } from "./postPetitionSigned.js";
12
/**
 * Builds a `Bite` activity for a bite on a post and enqueues its delivery.
 *
 * The activity is queued once per destination inbox on the
 * "sendPostToInboxes" queue. Destinations are:
 *  - the shared inbox of every non-blocked federated host that either hosts a
 *    follower of the biting user or is flagged as a friend server, and
 *  - the personal remote inbox of the bitten post's author, if it has one.
 *
 * Nothing is queued when the biting user, the post or the post's author
 * cannot be loaded, or when there is no inbox to deliver to.
 *
 * @param biteRelation - Relation row linking the biting user (`userId`) to the
 *   bitten post (`postId`).
 */
async function bitePostRemote(
  biteRelation: UserBitesPostRelation
): Promise<void> {
  const user = await User.findOne({
    where: {
      id: biteRelation.userId,
    },
  });

  const post = await Post.findOne({
    where: {
      id: biteRelation.postId,
    },
    include: [
      {
        model: User,
        as: "user",
      },
    ],
  });

  if (!user || !post) return;

  // The author is loaded through the include above, so it is reused for
  // addressing and delivery instead of querying the same row a second time.
  const postOwner = post.user;
  if (!postOwner) return;

  const biterActor = `${
    completeEnvironment.frontendUrl
  }/fediverse/blog/${user.url.toLowerCase()}`;
  const stringMyFollowers = `${biterActor}/followers`;
  const ownerOfBittenPost =
    postOwner.remoteId ||
    `${completeEnvironment.frontendUrl}/fediverse/blog/${postOwner.url}`;
  // Remote posts are referenced by their remote id, local ones by our own URL.
  const bittenPostUri =
    post.remotePostId ||
    `${completeEnvironment.frontendUrl}/fediverse/post/${post.id}`;

  // Coerce to a plain number before comparing against the Privacy values.
  const privacy = Number(post.privacy);
  let recipients: string[];
  if (privacy === Privacy.DirectMessage) {
    recipients = [ownerOfBittenPost];
  } else if (privacy === Privacy.Public) {
    recipients = [
      "https://www.w3.org/ns/activitystreams#Public",
      stringMyFollowers,
    ];
  } else {
    recipients = [stringMyFollowers];
  }

  const biteObject: activityPubObject = {
    "@context": [
      "https://www.w3.org/ns/activitystreams",
      { Bite: "https://ns.mia.jetzt/as#Bite" },
    ],
    actor: biterActor,
    id: `${completeEnvironment.frontendUrl}/fediverse/bites/${biteRelation.userId}/${biteRelation.postId}`,
    target: bittenPostUri,
    type: "Bite",
    object: bittenPostUri,
    to: recipients,
  };

  // servers with shared inbox
  // NOTE(review): the user id is interpolated straight into raw SQL. It comes
  // from a relation row here (presumably a database-generated id — confirm);
  // a bound replacement would be safer if that ever changes.
  const serversToSendThePost = await FederatedHost.findAll({
    where: {
      publicInbox: { [Op.ne]: null },
      blocked: { [Op.ne]: true },

      [Op.or]: [
        sequelize.literal(
          `"id" in (SELECT "federatedHostId" from "users" where "users"."id" IN (SELECT "followerId" from "follows" where "followedId" = '${biteRelation.userId}') and "federatedHostId" is not NULL)`
        ),
        {
          friendServer: true,
        },
      ],
    },
  });

  // The author's own inbox is added to guarantee that the remote user gets
  // the bite. Empty values are dropped and duplicates collapsed so the same
  // inbox is never queued twice.
  const inboxes = Array.from(
    new Set(
      [
        ...serversToSendThePost.map((host) => host.publicInbox),
        postOwner.remoteInbox,
      ].filter((inbox): inbox is string => Boolean(inbox))
    )
  );
  if (inboxes.length === 0) return;

  const sendPostQueue = new Queue("sendPostToInboxes", {
    connection: completeEnvironment.bullmqConnection,
    defaultJobOptions: {
      removeOnComplete: true,
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000,
      },
      removeOnFail: true,
    },
  });

  try {
    for (const inbox of inboxes) {
      await sendPostQueue.add(
        "sendChunk",
        {
          objectToSend: biteObject,
          petitionBy: user.dataValues,
          inboxList: inbox,
        },
        {
          // NOTE(review): 2097152 is 2^21, which appears to be BullMQ's
          // largest priority value (lowest urgency) — confirm.
          priority: 2097152,
          delay: 500,
        }
      );
    }
  } finally {
    // This Queue instance is created per call, so release it once the jobs
    // are enqueued instead of leaking it on every bite.
    await sendPostQueue.close();
  }
}
124
/**
 * Builds a `Bite` activity for a bite on a user and enqueues its delivery.
 *
 * The activity is addressed to the bitten user's `remoteId` and queued once
 * per destination inbox on the "sendPostToInboxes" queue. Destinations are:
 *  - the shared inbox of every non-blocked federated host that either hosts a
 *    follower of the biter or is flagged as a friend server, and
 *  - the bitten user's personal remote inbox, if it has one.
 *
 * Nothing is queued when there is no inbox to deliver to.
 *
 * @param biter - User performing the bite; the deliveries are made on its behalf.
 * @param bittenUser - User being bitten. Its `remoteId` is used as target,
 *   object and recipient of the activity.
 */
async function biteUserRemote(biter: User, bittenUser: User): Promise<void> {
  const biteObject: activityPubObject = {
    "@context": [
      "https://www.w3.org/ns/activitystreams",
      { Bite: "https://ns.mia.jetzt/as#Bite" },
    ],
    id: `${completeEnvironment.frontendUrl}/fediverse/bites/${biter.id}/${bittenUser.id}`,
    type: "Bite",
    actor: `${
      completeEnvironment.frontendUrl
    }/fediverse/blog/${biter.url.toLowerCase()}`,
    target: bittenUser.remoteId,
    object: bittenUser.remoteId,
    to: [bittenUser.remoteId],
  };

  // servers with shared inbox
  // NOTE(review): the biter id is interpolated straight into raw SQL. It is
  // read from a User instance here (presumably a database-generated id —
  // confirm); a bound replacement would be safer if that ever changes.
  const serversToSendThePost = await FederatedHost.findAll({
    where: {
      publicInbox: { [Op.ne]: null },
      blocked: { [Op.ne]: true },

      [Op.or]: [
        sequelize.literal(
          `"id" in (SELECT "federatedHostId" from "users" where "users"."id" IN (SELECT "followerId" from "follows" where "followedId" = '${biter.id}') and "federatedHostId" is not NULL)`
        ),
        {
          friendServer: true,
        },
      ],
    },
  });

  // To guarantee that the remote user gets the bite, their own inbox is added
  // as well. The row is re-read by primary key; a missing row simply
  // contributes no inbox instead of raising a TypeError.
  const bittenUserRow = await User.findByPk(bittenUser.id);

  // Empty values are dropped and duplicates collapsed so the same inbox is
  // never queued twice.
  const inboxes = Array.from(
    new Set(
      [
        ...serversToSendThePost.map((host) => host.publicInbox),
        bittenUserRow?.remoteInbox,
      ].filter((inbox): inbox is string => Boolean(inbox))
    )
  );
  if (inboxes.length === 0) return;

  const sendPostQueue = new Queue("sendPostToInboxes", {
    connection: completeEnvironment.bullmqConnection,
    defaultJobOptions: {
      removeOnComplete: true,
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000,
      },
      removeOnFail: true,
    },
  });

  try {
    for (const inbox of inboxes) {
      await sendPostQueue.add(
        "sendChunk",
        {
          objectToSend: biteObject,
          petitionBy: biter.dataValues,
          inboxList: inbox,
        },
        {
          // NOTE(review): 2097152 is 2^21, which appears to be BullMQ's
          // largest priority value (lowest urgency) — confirm.
          priority: 2097152,
          delay: 500,
        }
      );
    }
  } finally {
    // This Queue instance is created per call, so release it once the jobs
    // are enqueued instead of leaking it on every bite.
    await sendPostQueue.close();
  }
}
201
202export { bitePostRemote, biteUserRemote };