unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Job, Queue } from "bullmq";
2import {
3 Blocks,
4 EmojiReaction,
5 FederatedHost,
6 Follows,
7 Mutes,
8 Post,
9 PostMentionsUserRelation,
10 User,
11 UserLikesPostRelations,
12 UserOptions,
13 sequelize,
14} from "../../models/index.js";
15import { completeEnvironment } from "../backendOptions.js";
16import { getUserIdFromRemoteId } from "../cacheGetters/getUserIdFromRemoteId.js";
17import { getPetitionSigned } from "../activitypub/getPetitionSigned.js";
18import { processUserEmojis } from "../activitypub/processUserEmojis.js";
19import { fediverseTag } from "../../interfaces/fediverse/tags.js";
20import { logger } from "../logger.js";
21import { redisCache } from "../redis.js";
22import { Op } from "sequelize";
23import { getDeletedUser } from "../cacheGetters/getDeletedUser.js";
24import processExternalCustomCss from "../processExternalCustomCss.js";
25import { unlink, writeFile } from "fs/promises";
26import { existsSync } from "fs";
27import { getDidDoc } from "../atproto/getDidDoc.js";
28import { getAtprotoUser } from "../../atproto/utils/getAtprotoUser.js";
29import getUserAgent from "../getUserAgent.js";
30
31const mergeUsersQueue = new Queue("mergeUsers", {
32 connection: completeEnvironment.bullmqConnection,
33 defaultJobOptions: {
34 removeOnComplete: true,
35 attempts: 6,
36 backoff: {
37 type: "exponential",
38 delay: 25000,
39 },
40 removeOnFail: false,
41 },
42});
43
44// This function will return userid after processing it.
45async function getRemoteActorIdProcessor(job: Job) {
46 const actorUrl: string = job.data.actorUrl;
47 const forceUpdate: boolean = job.data.forceUpdate;
48 let res: string | User | undefined | null = await getUserIdFromRemoteId(
49 actorUrl
50 );
51 let url = undefined;
52 try {
53 url = new URL(actorUrl);
54 } catch (error) {
55 res = await getDeletedUser();
56 url = undefined;
57 logger.debug({
58 message: `Invalid url ${actorUrl}`,
59 url: actorUrl,
60 stack: new Error().stack,
61 });
62 }
63 if (res === "" || (forceUpdate && url != undefined)) {
64 let federatedHost = await FederatedHost.findOne({
65 where: sequelize.where(
66 sequelize.fn("lower", sequelize.col("displayName")),
67 url?.host ? url.host.toLowerCase() : ""
68 ),
69 });
70 const hostBanned = federatedHost?.blocked;
71 if (hostBanned) {
72 res = await getDeletedUser();
73 } else {
74 const user = (await User.findByPk(job.data.userId)) as User;
75 const userPetition = await getPetitionSigned(user, actorUrl);
76 if (userPetition) {
77 if (!federatedHost && url) {
78 const federatedHostToCreate = {
79 displayName: url.host.toLocaleLowerCase(),
80 publicInbox: userPetition.endpoints?.sharedInbox
81 ? userPetition.endpoints?.sharedInbox
82 : "",
83 };
84 federatedHost = (
85 await FederatedHost.findOrCreate({ where: federatedHostToCreate })
86 )[0];
87 }
88 if (!url || !federatedHost) {
89 logger.warn({
90 message: "Url is not valid wtf",
91 trace: new Error().stack,
92 });
93 return await getDeletedUser();
94 }
95 const remoteMentionUrl =
96 typeof userPetition.url === "string" ? userPetition.url : "";
97 let followers = 0;
98 let followed = 0;
99 if (userPetition.followers) {
100 const followersPetition = await getPetitionSigned(
101 user,
102 userPetition.followers
103 );
104 if (followersPetition && followersPetition.totalItems) {
105 followers = followersPetition.totalItems;
106 }
107 }
108 if (userPetition.following) {
109 const followingPetition = await getPetitionSigned(
110 user,
111 userPetition.following
112 );
113 if (followingPetition && followingPetition.totalItems) {
114 followed = followingPetition.totalItems;
115 }
116 }
117 const userData = {
118 hideFollows: false,
119 hideProfileNotLoggedIn: false,
120 url: `@${userPetition.preferredUsername}@${url?.host}`,
121 name: userPetition.name
122 ? userPetition.name
123 : userPetition.preferredUsername,
124 email: null,
125 description: userPetition.summary ? userPetition.summary : "",
126 avatar: userPetition.icon?.url
127 ? userPetition.icon.url
128 : `${completeEnvironment.mediaUrl}/uploads/default.webp`,
129 headerImage: userPetition?.image?.url
130 ? userPetition.image.url.toString()
131 : ``,
132 password: "NOT_A_WAFRN_USER_NOT_REAL_PASSWORD",
133 publicKey: userPetition.publicKey?.publicKeyPem,
134 remoteInbox: userPetition.inbox,
135 remoteId: actorUrl,
136 activated: true,
137 federatedHostId: federatedHost.id,
138 remoteMentionUrl: remoteMentionUrl,
139 followersCollectionUrl: userPetition.followers,
140 followingCollectionUrl: userPetition.following,
141 isBot: userPetition.type != "Person",
142 followerCount: followers,
143 followingCount: followed,
144 createdAt: userPetition.published
145 ? new Date(userPetition.published)
146 : new Date(),
147 updatedAt: new Date(),
148 NSFW: false,
149 birthDate: new Date(),
150 userMigratedTo: userPetition.movedTo || "",
151 displayUrl: Array.isArray(userPetition.url)
152 ? userPetition.url[0]
153 : userPetition.url,
154 manuallyAcceptsFollows: userPetition.manuallyApprovesFollowers ?? false
155 };
156 federatedHost.publicInbox = userPetition.endpoints?.sharedInbox || null;
157 await federatedHost.save();
158 let userRes;
159 const existingUsers = await User.findAll({
160 where: {
161 [Op.or]: [
162 sequelize.where(
163 sequelize.fn("lower", sequelize.col("url")),
164 userData.url.toLowerCase()
165 ),
166 {
167 remoteId: userData.remoteId,
168 },
169 ],
170 },
171 });
172 if (res) {
173 if (res !== (await getDeletedUser())) {
174 userRes = await User.findByPk(res as string);
175 if (existingUsers.length > 1) {
176 logger.debug({
177 message: `Multiple fedi users found for ${userData.url} (${userData.remoteId}): ${existingUsers.length}`,
178 });
179 for await (const userWithDuplicatedData of existingUsers.slice(
180 1
181 )) {
182 userWithDuplicatedData.url =
183 userWithDuplicatedData.url +
184 "_DUPLICATED_" +
185 new Date().getTime();
186 userWithDuplicatedData.remoteId =
187 userWithDuplicatedData.remoteId +
188 "_DUPLICATED_" +
189 new Date().getTime();
190 }
191 }
192 if (
193 existingUsers &&
194 existingUsers.length > 0 &&
195 existingUsers[0] &&
196 userRes?.id !== existingUsers[0]?.id
197 ) {
198 const existingUser = existingUsers[0];
199 existingUser.activated = false;
200 existingUser.remoteId = `${existingUser.remoteId
201 }_OVERWRITTEN_ON${new Date().getTime()}`;
202 existingUser.url = `${existingUser.url
203 }_OVERWRITTEN_ON${new Date().getTime()}`;
204 await existingUser.save();
205 if (userRes) {
206 const updates = [
207 Follows.update(
208 {
209 followerId: userRes.id,
210 },
211 {
212 where: {
213 followerId: existingUser.id,
214 },
215 }
216 ),
217 Follows.update(
218 {
219 followedId: userRes.id,
220 },
221 {
222 where: {
223 followedId: existingUser.id,
224 },
225 }
226 ),
227 Post.update(
228 {
229 userId: userRes.id,
230 },
231 {
232 where: {
233 userId: existingUser.id,
234 },
235 }
236 ),
237 UserLikesPostRelations.update(
238 {
239 userId: userRes.id,
240 },
241 {
242 where: {
243 userId: existingUser.id,
244 },
245 }
246 ),
247 EmojiReaction.update(
248 {
249 userId: userRes.id,
250 },
251 {
252 where: {
253 userId: existingUser.id,
254 },
255 }
256 ),
257 Blocks.update(
258 {
259 blockedId: userRes.id,
260 },
261 {
262 where: {
263 blockedId: existingUser.id,
264 },
265 }
266 ),
267 Blocks.update(
268 {
269 blockerId: userRes.id,
270 },
271 {
272 where: {
273 blockerId: existingUser.id,
274 },
275 }
276 ),
277 Mutes.update(
278 {
279 muterId: userRes.id,
280 },
281 {
282 where: {
283 muterId: existingUser.id,
284 },
285 }
286 ),
287 Mutes.update(
288 {
289 mutedId: userRes.id,
290 },
291 {
292 where: {
293 mutedId: existingUser.id,
294 },
295 }
296 ),
297 PostMentionsUserRelation.update(
298 {
299 userId: userRes.id,
300 },
301 {
302 where: {
303 userId: existingUser.id,
304 },
305 }
306 ),
307 ];
308 await Promise.all(updates);
309 }
310 await redisCache.del("userRemoteId:" + existingUser.remoteId);
311 }
312 if (userRes) {
313 userRes.set(userData);
314 await userRes.save();
315 } else {
316 redisCache.del("userRemoteId:" + actorUrl.toLocaleLowerCase());
317 }
318 }
319 } else {
320 if (existingUsers && existingUsers[0]) {
321 existingUsers[0].set(userData);
322 await existingUsers[0].save();
323 } else {
324 userRes = await User.create(userData);
325 }
326 }
327 if (
328 userRes &&
329 userRes.id &&
330 userRes.url != completeEnvironment.deletedUser &&
331 userPetition
332 ) {
333 try {
334 if (userPetition._wafrn_customCSS) {
335 let customCSS: string | undefined = undefined
336 logger.info({ id: userPetition.id }, "found custom css for this user");
337 if (URL.canParse(userPetition._wafrn_customCSS)) {
338 const cssRes = await fetch(userPetition._wafrn_customCSS, {
339 headers: {
340 "User-Agent": getUserAgent('ActivityPubWorker')
341 }
342 })
343 if (cssRes.ok)
344 customCSS = await cssRes.text()
345 } else {
346 customCSS = userPetition._wafrn_customCSS
347 }
348 if (customCSS) {
349 const css = await processExternalCustomCss(userRes.id, customCSS)
350 await writeFile(`uploads/themes/${userRes.id}.css`, css)
351 }
352 } else if (existsSync(`uploads/themes/${userRes.id}.css`)) {
353 await unlink(`uploads/themes/${userRes.id}.css`)
354 }
355 } catch (e) {
356 logger.warn(e)
357 }
358
359 try {
360 if (userPetition.alsoKnownAs) {
361 const atUri = (userPetition.alsoKnownAs as string[]).find(x => x.startsWith('did:') || x.startsWith('at://'))
362 let mergeAcc = 0
363 if (atUri) {
364 const atDoc = await getDidDoc(atUri)
365 if (atDoc && (
366 atDoc.alsoKnownAs?.includes(userPetition.id) ||
367 atDoc.alsoKnownAs?.includes(userPetition.id.replace('/fediverse/blog', '/blog'))
368 )) {
369 // make it merged (wafrn user)
370 mergeAcc = 1
371 } else if (atDoc && (
372 userPetition.id.includes('brid.gy/')
373 )) {
374 // check if bridgy fed
375 // we can't bridge bridged from web users so hard code to bsky.brid.gy
376 mergeAcc = 2
377 }
378 if (mergeAcc > 0) {
379 const oldUser = await User.findOne({
380 where: {
381 bskyDid: atUri.replace(/^at:\/\//, '')
382 }
383 })
384 if (oldUser) {
385 logger.info({ oldUser, userRes }, 'merging accs')
386 // put this in a queue so it wont lag entire instance
387 await mergeUsersQueue.add("mergeUsers", {
388 primaryUserId: mergeAcc === 2 ? oldUser.id : userRes.id,
389 userToMergeId: mergeAcc === 1 ? oldUser.id : userRes.id
390 });
391 }
392
393 // if bridgy user, to prevent more issues, return the existing bsky user instead
394 if (mergeAcc === 2) return oldUser
395 }
396 }
397 }
398 } catch (e) {
399 logger.warn({
400 error: e,
401 userPetition
402 }, 'cannot merge user')
403 }
404 }
405 if (
406 userRes &&
407 userRes.id &&
408 userRes.url != completeEnvironment.deletedUser &&
409 userPetition &&
410 userPetition.attachment &&
411 userPetition.attachment.length
412 ) {
413 await UserOptions.destroy({
414 where: {
415 userId: userRes.id,
416 optionName: {
417 [Op.like]: "fediverse.public.attachment",
418 },
419 },
420 });
421 const properties = userPetition.attachment.filter(
422 (elem: any) => elem.type === "PropertyValue"
423 );
424 await UserOptions.create({
425 userId: userRes.id,
426 optionName: `fediverse.public.attachment`,
427 optionValue: JSON.stringify(properties),
428 public: true,
429 });
430 }
431 res = userRes?.id ? userRes.id : await getDeletedUser();
432 try {
433 if (userRes) {
434 const tags = userPetition?.tag
435 ? Array.isArray(userPetition.tag)
436 ? userPetition.tag
437 : [userPetition.tag]
438 : [];
439 const emojis = [
440 ...new Set(
441 tags.filter((elem: fediverseTag) => elem.type === "Emoji")
442 ),
443 ];
444 await processUserEmojis(userRes, emojis);
445 }
446 } catch (error) {
447 logger.info({
448 message: `Error processing emojis from user ${userRes?.url}`,
449 error: error,
450 tags: userPetition?.tag,
451 userPetition: userPetition,
452 });
453 }
454 }
455 }
456 }
457 return res;
458}
459
460export { getRemoteActorIdProcessor };