unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at cache-folder-container 460 lines 16 kB view raw
import { Job, Queue } from "bullmq";
import {
  Blocks,
  EmojiReaction,
  FederatedHost,
  Follows,
  Mutes,
  Post,
  PostMentionsUserRelation,
  User,
  UserLikesPostRelations,
  UserOptions,
  sequelize,
} from "../../models/index.js";
import { completeEnvironment } from "../backendOptions.js";
import { getUserIdFromRemoteId } from "../cacheGetters/getUserIdFromRemoteId.js";
import { getPetitionSigned } from "../activitypub/getPetitionSigned.js";
import { processUserEmojis } from "../activitypub/processUserEmojis.js";
import { fediverseTag } from "../../interfaces/fediverse/tags.js";
import { logger } from "../logger.js";
import { redisCache } from "../redis.js";
import { Op } from "sequelize";
import { getDeletedUser } from "../cacheGetters/getDeletedUser.js";
import processExternalCustomCss from "../processExternalCustomCss.js";
import { unlink, writeFile } from "fs/promises";
import { existsSync } from "fs";
import { getDidDoc } from "../atproto/getDidDoc.js";
import { getAtprotoUser } from "../../atproto/utils/getAtprotoUser.js";
import getUserAgent from "../getUserAgent.js";

/**
 * Queue that receives "mergeUsers" jobs (a primary user id plus the id of the
 * user to merge into it). Jobs are configured with 6 attempts and exponential
 * backoff based on a 25000 ms delay; completed jobs are removed, failed jobs
 * are kept.
 */
const mergeUsersQueue = new Queue("mergeUsers", {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    attempts: 6,
    backoff: {
      type: "exponential",
      delay: 25000,
    },
    removeOnFail: false,
  },
});

// This function will return userid after processing it.
/**
 * Job processor that resolves a remote ActivityPub actor URL to a local user.
 *
 * Reads `actorUrl`, `forceUpdate` and `userId` from `job.data`. When the actor
 * is not cached (the cache getter returned `""`) or a forced refresh is
 * requested for a parseable URL, the actor document is fetched with a signed
 * request on behalf of `userId`, the federated host row is created/updated,
 * and the local user row is created or refreshed. Duplicated rows that share
 * the same handle or remote id are renamed, and the relations of an
 * overwritten row are re-pointed to the surviving user.
 *
 * After the user row is settled the processor also (best effort, failures are
 * only logged): stores or removes the remote custom CSS file, queues a merge
 * with a matching Bluesky account, replaces the stored profile attachments and
 * processes the custom emojis found in the actor tags.
 *
 * @param job - Job whose `data` carries `actorUrl`, `forceUpdate` and `userId`.
 * @returns The resolved user id; the value of `getDeletedUser()` when the URL
 * is invalid, the host is blocked or no user could be stored; or, for bridgy
 * actors linked to a Bluesky account, the result of the Bluesky user lookup.
 */
async function getRemoteActorIdProcessor(job: Job) {
  const actorUrl: string = job.data.actorUrl;
  const forceUpdate: boolean = job.data.forceUpdate;
  let res: string | User | undefined | null = await getUserIdFromRemoteId(
    actorUrl
  );
  let url: URL | undefined = undefined;
  try {
    url = new URL(actorUrl);
  } catch (error) {
    // An unparseable actor id can never be fetched: resolve it to the deleted user.
    res = await getDeletedUser();
    url = undefined;
    logger.debug({
      message: `Invalid url ${actorUrl}`,
      url: actorUrl,
      stack: new Error().stack,
    });
  }
  if (res === "" || (forceUpdate && url != undefined)) {
    let federatedHost = await FederatedHost.findOne({
      where: sequelize.where(
        sequelize.fn("lower", sequelize.col("displayName")),
        url?.host ? url.host.toLowerCase() : ""
      ),
    });
    const hostBanned = federatedHost?.blocked;
    if (hostBanned) {
      res = await getDeletedUser();
    } else {
      const user = (await User.findByPk(job.data.userId)) as User;
      const userPetition = await getPetitionSigned(user, actorUrl);
      if (userPetition) {
        if (!federatedHost && url) {
          const federatedHostToCreate = {
            displayName: url.host.toLocaleLowerCase(),
            publicInbox: userPetition.endpoints?.sharedInbox
              ? userPetition.endpoints?.sharedInbox
              : "",
          };
          federatedHost = (
            await FederatedHost.findOrCreate({ where: federatedHostToCreate })
          )[0];
        }
        if (!url || !federatedHost) {
          logger.warn({
            message: "Url is not valid wtf",
            trace: new Error().stack,
          });
          return await getDeletedUser();
        }
        const remoteMentionUrl =
          typeof userPetition.url === "string" ? userPetition.url : "";

        // Follower / following counters come from the `totalItems` of each collection.
        let followers = 0;
        let followed = 0;
        if (userPetition.followers) {
          const followersPetition = await getPetitionSigned(
            user,
            userPetition.followers
          );
          if (followersPetition && followersPetition.totalItems) {
            followers = followersPetition.totalItems;
          }
        }
        if (userPetition.following) {
          const followingPetition = await getPetitionSigned(
            user,
            userPetition.following
          );
          if (followingPetition && followingPetition.totalItems) {
            followed = followingPetition.totalItems;
          }
        }

        const userData = {
          hideFollows: false,
          hideProfileNotLoggedIn: false,
          url: `@${userPetition.preferredUsername}@${url.host}`,
          name: userPetition.name
            ? userPetition.name
            : userPetition.preferredUsername,
          email: null,
          description: userPetition.summary ? userPetition.summary : "",
          avatar: userPetition.icon?.url
            ? userPetition.icon.url
            : `${completeEnvironment.mediaUrl}/uploads/default.webp`,
          headerImage: userPetition?.image?.url
            ? userPetition.image.url.toString()
            : ``,
          password: "NOT_A_WAFRN_USER_NOT_REAL_PASSWORD",
          publicKey: userPetition.publicKey?.publicKeyPem,
          remoteInbox: userPetition.inbox,
          remoteId: actorUrl,
          activated: true,
          federatedHostId: federatedHost.id,
          remoteMentionUrl: remoteMentionUrl,
          followersCollectionUrl: userPetition.followers,
          followingCollectionUrl: userPetition.following,
          // Loose comparison kept on purpose: `type` is compared as the original code did.
          isBot: userPetition.type != "Person",
          followerCount: followers,
          followingCount: followed,
          createdAt: userPetition.published
            ? new Date(userPetition.published)
            : new Date(),
          updatedAt: new Date(),
          NSFW: false,
          birthDate: new Date(),
          userMigratedTo: userPetition.movedTo || "",
          displayUrl: Array.isArray(userPetition.url)
            ? userPetition.url[0]
            : userPetition.url,
          manuallyAcceptsFollows:
            userPetition.manuallyApprovesFollowers ?? false,
        };
        federatedHost.publicInbox =
          userPetition.endpoints?.sharedInbox || null;
        await federatedHost.save();

        let userRes: User | null | undefined;
        // Every row that already claims this handle (case-insensitive) or this remote id.
        const existingUsers = await User.findAll({
          where: {
            [Op.or]: [
              sequelize.where(
                sequelize.fn("lower", sequelize.col("url")),
                userData.url.toLowerCase()
              ),
              {
                remoteId: userData.remoteId,
              },
            ],
          },
        });
        if (res) {
          if (res !== (await getDeletedUser())) {
            userRes = await User.findByPk(res as string);
            if (existingUsers.length > 1) {
              logger.debug({
                message: `Multiple fedi users found for ${userData.url} (${userData.remoteId}): ${existingUsers.length}`,
              });
              for (const duplicatedUser of existingUsers.slice(1)) {
                // The row being refreshed must keep its identifiers: it is saved
                // below from a different model instance that would not rewrite
                // unchanged columns.
                if (duplicatedUser.id === userRes?.id) {
                  continue;
                }
                duplicatedUser.url =
                  duplicatedUser.url + "_DUPLICATED_" + new Date().getTime();
                duplicatedUser.remoteId =
                  duplicatedUser.remoteId +
                  "_DUPLICATED_" +
                  new Date().getTime();
                // Persist the rename; without it the loop has no effect at all.
                await duplicatedUser.save();
              }
            }
            const existingUser = existingUsers[0];
            if (existingUser && userRes?.id !== existingUser.id) {
              // Remember the id the cache was keyed with before it gets mangled.
              const previousRemoteId = existingUser.remoteId;
              existingUser.activated = false;
              existingUser.remoteId = `${
                existingUser.remoteId
              }_OVERWRITTEN_ON${new Date().getTime()}`;
              existingUser.url = `${
                existingUser.url
              }_OVERWRITTEN_ON${new Date().getTime()}`;
              await existingUser.save();
              if (userRes) {
                // Re-point every relation of the overwritten row to the surviving user.
                const updates = [
                  Follows.update(
                    { followerId: userRes.id },
                    { where: { followerId: existingUser.id } }
                  ),
                  Follows.update(
                    { followedId: userRes.id },
                    { where: { followedId: existingUser.id } }
                  ),
                  Post.update(
                    { userId: userRes.id },
                    { where: { userId: existingUser.id } }
                  ),
                  UserLikesPostRelations.update(
                    { userId: userRes.id },
                    { where: { userId: existingUser.id } }
                  ),
                  EmojiReaction.update(
                    { userId: userRes.id },
                    { where: { userId: existingUser.id } }
                  ),
                  Blocks.update(
                    { blockedId: userRes.id },
                    { where: { blockedId: existingUser.id } }
                  ),
                  Blocks.update(
                    { blockerId: userRes.id },
                    { where: { blockerId: existingUser.id } }
                  ),
                  Mutes.update(
                    { muterId: userRes.id },
                    { where: { muterId: existingUser.id } }
                  ),
                  Mutes.update(
                    { mutedId: userRes.id },
                    { where: { mutedId: existingUser.id } }
                  ),
                  PostMentionsUserRelation.update(
                    { userId: userRes.id },
                    { where: { userId: existingUser.id } }
                  ),
                ];
                await Promise.all(updates);
              }
              // Invalidate the entry of the id the row had before the rename,
              // lowercased like the other invalidation in this function.
              if (previousRemoteId) {
                await redisCache.del(
                  "userRemoteId:" + previousRemoteId.toLocaleLowerCase()
                );
              }
            }
            if (userRes) {
              userRes.set(userData);
              await userRes.save();
            } else {
              // The cached id points to a row that no longer exists.
              await redisCache.del(
                "userRemoteId:" + actorUrl.toLocaleLowerCase()
              );
            }
          }
        } else {
          if (existingUsers && existingUsers[0]) {
            existingUsers[0].set(userData);
            await existingUsers[0].save();
            // Keep working with the refreshed row so its id is returned and the
            // CSS / attachment / emoji steps below run for it as well.
            userRes = existingUsers[0];
          } else {
            userRes = await User.create(userData);
          }
        }
        if (
          userRes &&
          userRes.id &&
          userRes.url != completeEnvironment.deletedUser &&
          userPetition
        ) {
          try {
            if (userPetition._wafrn_customCSS) {
              let customCSS: string | undefined = undefined;
              logger.info(
                { id: userPetition.id },
                "found custom css for this user"
              );
              if (URL.canParse(userPetition._wafrn_customCSS)) {
                // NOTE(review): the target URL is chosen by the remote actor
                // document; confirm this worker cannot reach internal services.
                const cssRes = await fetch(userPetition._wafrn_customCSS, {
                  headers: {
                    "User-Agent": getUserAgent("ActivityPubWorker"),
                  },
                });
                if (cssRes.ok) customCSS = await cssRes.text();
              } else {
                customCSS = userPetition._wafrn_customCSS;
              }
              if (customCSS) {
                const css = await processExternalCustomCss(
                  userRes.id,
                  customCSS
                );
                await writeFile(`uploads/themes/${userRes.id}.css`, css);
              }
            } else if (existsSync(`uploads/themes/${userRes.id}.css`)) {
              await unlink(`uploads/themes/${userRes.id}.css`);
            }
          } catch (e) {
            logger.warn(e);
          }

          try {
            if (userPetition.alsoKnownAs) {
              const atUri = (userPetition.alsoKnownAs as string[]).find(
                (x) => x.startsWith("did:") || x.startsWith("at://")
              );
              // 0 = no merge, 1 = DID document links back to this actor,
              // 2 = actor id belongs to a brid.gy bridge.
              let mergeAcc = 0;
              if (atUri) {
                const atDoc = await getDidDoc(atUri);
                if (
                  atDoc &&
                  (atDoc.alsoKnownAs?.includes(userPetition.id) ||
                    atDoc.alsoKnownAs?.includes(
                      userPetition.id.replace("/fediverse/blog", "/blog")
                    ))
                ) {
                  // make it merged (wafrn user)
                  mergeAcc = 1;
                } else if (atDoc && userPetition.id.includes("brid.gy/")) {
                  // check if bridgy fed
                  // we can't bridge bridged from web users so hard code to bsky.brid.gy
                  mergeAcc = 2;
                }
                if (mergeAcc > 0) {
                  const oldUser = await User.findOne({
                    where: {
                      bskyDid: atUri.replace(/^at:\/\//, ""),
                    },
                  });
                  if (oldUser) {
                    logger.info({ oldUser, userRes }, "merging accs");
                    // put this in a queue so it wont lag entire instance
                    await mergeUsersQueue.add("mergeUsers", {
                      primaryUserId: mergeAcc === 2 ? oldUser.id : userRes.id,
                      userToMergeId: mergeAcc === 1 ? oldUser.id : userRes.id,
                    });
                  }

                  // if bridgy user, to prevent more issues, return the existing bsky user instead
                  // NOTE(review): this returns a User row (or null when no
                  // Bluesky user exists) instead of an id - confirm callers
                  // handle both.
                  if (mergeAcc === 2) return oldUser;
                }
              }
            }
          } catch (e) {
            logger.warn(
              {
                error: e,
                userPetition,
              },
              "cannot merge user"
            );
          }
        }
        if (
          userRes &&
          userRes.id &&
          userRes.url != completeEnvironment.deletedUser &&
          userPetition &&
          userPetition.attachment &&
          userPetition.attachment.length
        ) {
          // Replace the stored profile fields with the ones from the actor document.
          await UserOptions.destroy({
            where: {
              userId: userRes.id,
              optionName: {
                [Op.like]: "fediverse.public.attachment",
              },
            },
          });
          const properties = userPetition.attachment.filter(
            (elem: any) => elem.type === "PropertyValue"
          );
          await UserOptions.create({
            userId: userRes.id,
            optionName: `fediverse.public.attachment`,
            optionValue: JSON.stringify(properties),
            public: true,
          });
        }
        res = userRes?.id ? userRes.id : await getDeletedUser();
        try {
          if (userRes) {
            // `tag` may be a single object or an array; normalise before filtering.
            const tags = userPetition?.tag
              ? Array.isArray(userPetition.tag)
                ? userPetition.tag
                : [userPetition.tag]
              : [];
            const emojis = [
              ...new Set(
                tags.filter((elem: fediverseTag) => elem.type === "Emoji")
              ),
            ];
            await processUserEmojis(userRes, emojis);
          }
        } catch (error) {
          logger.info({
            message: `Error processing emojis from user ${userRes?.url}`,
            error: error,
            tags: userPetition?.tag,
            userPetition: userPetition,
          });
        }
      }
    }
  }
  return res;
}

export { getRemoteActorIdProcessor };