unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at development 408 lines 15 kB view raw
import { Job, Queue } from 'bullmq'
import {
  Blocks,
  EmojiReaction,
  FederatedHost,
  Follows,
  Mutes,
  Post,
  PostMentionsUserRelation,
  User,
  UserLikesPostRelations,
  UserOptions,
  sequelize
} from '../../models/index.js'
import { completeEnvironment } from '../backendOptions.js'
import { getUserIdFromRemoteId } from '../cacheGetters/getUserIdFromRemoteId.js'
import { getPetitionSigned } from '../activitypub/getPetitionSigned.js'
import { processUserEmojis } from '../activitypub/processUserEmojis.js'
import { fediverseTag } from '../../interfaces/fediverse/tags.js'
import { logger } from '../logger.js'
import { redisCache } from '../redis.js'
import { Op } from 'sequelize'
import { getDeletedUser } from '../cacheGetters/getDeletedUser.js'
import processExternalCustomCss from '../processExternalCustomCss.js'
import { unlink, writeFile } from 'fs/promises'
import { existsSync } from 'fs'
import { getDidDoc } from '../atproto/getDidDoc.js'
import getUserAgent from '../getUserAgent.js'

// Queue that receives account-merge jobs; this module only enqueues them.
const mergeUsersQueue = new Queue('mergeUsers', {
  defaultJobOptions: {
    // Up to 6 attempts, exponential backoff starting from a 25000 ms delay.
    attempts: 6,
    backoff: { delay: 25000, type: 'exponential' },
    // Completed jobs are removed; failed jobs are kept.
    removeOnFail: false,
    removeOnComplete: true
  },
  connection: completeEnvironment.bullmqConnection
})

// This function will return userid after processing it.
/**
 * Fetches a remote collection with a signed request and returns its
 * `totalItems`, or 0 when the collection or the count is missing.
 */
async function fetchCollectionSize(localUser: User, collectionUrl: string): Promise<number> {
  const collection = await getPetitionSigned(localUser, collectionUrl)
  return collection?.totalItems ? collection.totalItems : 0
}

/**
 * Re-points every follow, post, like, emoji reaction, block, mute and mention
 * row that references `fromUserId` so that it references `toUserId` instead.
 */
async function reassignUserReferences(fromUserId: string, toUserId: string): Promise<void> {
  await Promise.all([
    Follows.update({ followerId: toUserId }, { where: { followerId: fromUserId } }),
    Follows.update({ followedId: toUserId }, { where: { followedId: fromUserId } }),
    Post.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    UserLikesPostRelations.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    EmojiReaction.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    Blocks.update({ blockedId: toUserId }, { where: { blockedId: fromUserId } }),
    Blocks.update({ blockerId: toUserId }, { where: { blockerId: fromUserId } }),
    Mutes.update({ muterId: toUserId }, { where: { muterId: fromUserId } }),
    Mutes.update({ mutedId: toUserId }, { where: { mutedId: fromUserId } }),
    PostMentionsUserRelation.update({ userId: toUserId }, { where: { userId: fromUserId } })
  ])
}

/**
 * Mirrors the actor's `_wafrn_customCSS` into `uploads/themes/<userId>.css`.
 * The value may be inline CSS or a URL to download it from. When the actor
 * no longer advertises custom CSS an existing theme file is removed.
 * Best effort: any failure is logged and swallowed.
 */
async function syncRemoteCustomCss(userId: string, actorId: unknown, cssSource: string | undefined): Promise<void> {
  const themePath = `uploads/themes/${userId}.css`
  try {
    if (!cssSource) {
      if (existsSync(themePath)) {
        await unlink(themePath)
      }
      return
    }
    logger.info({ id: actorId }, 'found custom css for this user')
    let customCSS: string | undefined = undefined
    if (URL.canParse(cssSource)) {
      const cssRes = await fetch(cssSource, {
        headers: {
          'User-Agent': getUserAgent('ActivityPubWorker')
        }
      })
      if (cssRes.ok) customCSS = await cssRes.text()
    } else {
      customCSS = cssSource
    }
    if (customCSS) {
      const css = await processExternalCustomCss(userId, customCSS)
      await writeFile(themePath, css)
    }
  } catch (e) {
    logger.warn(e)
  }
}

/**
 * BullMQ processor that resolves a remote ActivityPub actor to a local user.
 *
 * Reads `actorUrl`, `forceUpdate` and `userId` (the local user whose key signs
 * the outgoing requests) from `job.data`. When the actor is unknown locally
 * (`getUserIdFromRemoteId` returned `''`) or `forceUpdate` is set, the actor
 * document is fetched and the local `User` row is created or refreshed,
 * together with its federated host, custom CSS, profile attachments and emojis.
 *
 * @returns the local user id; the deleted-user id when the URL is invalid, the
 *   host is blocked or nothing could be stored; or, for a bridgy-bridged actor
 *   that maps to an existing bsky user, that `User` instance.
 */
async function getRemoteActorIdProcessor(job: Job) {
  const actorUrl: string = job.data.actorUrl
  const forceUpdate: boolean = job.data.forceUpdate
  let res: string | User | undefined | null = await getUserIdFromRemoteId(actorUrl)

  let url: URL | undefined
  try {
    url = new URL(actorUrl)
  } catch {
    res = await getDeletedUser()
    logger.debug({
      message: `Invalid url ${actorUrl}`,
      url: actorUrl,
      stack: new Error().stack
    })
  }

  // Nothing to fetch: the actor is already known and no refresh was requested.
  const needsFetch = res === '' || (forceUpdate && url !== undefined)
  if (!needsFetch) {
    return res
  }

  let federatedHost = await FederatedHost.findOne({
    where: sequelize.where(
      sequelize.fn('lower', sequelize.col('displayName')),
      url?.host ? url.host.toLowerCase() : ''
    )
  })
  if (federatedHost?.blocked) {
    return await getDeletedUser()
  }

  const user = (await User.findByPk(job.data.userId)) as User
  const userPetition = await getPetitionSigned(user, actorUrl)
  if (!userPetition) {
    return res
  }

  if (!federatedHost && url) {
    const federatedHostToCreate = {
      displayName: url.host.toLocaleLowerCase(),
      publicInbox: userPetition.endpoints?.sharedInbox ? userPetition.endpoints?.sharedInbox : ''
    }
    federatedHost = (await FederatedHost.findOrCreate({ where: federatedHostToCreate }))[0]
  }
  if (!url || !federatedHost) {
    logger.warn({
      message: 'Url is not valid wtf',
      trace: new Error().stack
    })
    return await getDeletedUser()
  }

  const remoteMentionUrl = typeof userPetition.url === 'string' ? userPetition.url : ''
  const followers = userPetition.followers ? await fetchCollectionSize(user, userPetition.followers) : 0
  const followed = userPetition.following ? await fetchCollectionSize(user, userPetition.following) : 0

  const userData = {
    hideFollows: false,
    hideProfileNotLoggedIn: false,
    url: `@${userPetition.preferredUsername}@${url.host}`,
    name: userPetition.name ? userPetition.name : userPetition.preferredUsername,
    email: null,
    description: userPetition.summary ? userPetition.summary : '',
    avatar: userPetition.icon?.url ? userPetition.icon.url : `${completeEnvironment.mediaUrl}/uploads/default.webp`,
    headerImage: userPetition?.image?.url ? userPetition.image.url.toString() : ``,
    password: 'NOT_A_WAFRN_USER_NOT_REAL_PASSWORD',
    publicKey: userPetition.publicKey?.publicKeyPem,
    remoteInbox: userPetition.inbox,
    remoteId: actorUrl,
    activated: true,
    federatedHostId: federatedHost.id,
    remoteMentionUrl: remoteMentionUrl,
    followersCollectionUrl: userPetition.followers,
    followingCollectionUrl: userPetition.following,
    isBot: userPetition.type != 'Person',
    followerCount: followers,
    followingCount: followed,
    createdAt: userPetition.published ? new Date(userPetition.published) : new Date(),
    updatedAt: new Date(),
    NSFW: false,
    birthDate: new Date(),
    userMigratedTo: userPetition.movedTo || '',
    displayUrl: Array.isArray(userPetition.url) ? userPetition.url[0] : userPetition.url,
    manuallyAcceptsFollows: userPetition.manuallyApprovesFollowers ?? false
  }

  federatedHost.publicInbox = userPetition.endpoints?.sharedInbox || null
  await federatedHost.save()

  // Rows that already claim this handle (case-insensitive) or this remote id.
  const existingUsers = await User.findAll({
    where: {
      [Op.or]: [
        sequelize.where(sequelize.fn('lower', sequelize.col('url')), userData.url.toLowerCase()),
        {
          remoteId: userData.remoteId
        }
      ]
    }
  })

  let userRes: User | null | undefined
  if (!res) {
    const matchedUser = existingUsers[0]
    if (matchedUser) {
      matchedUser.set(userData)
      await matchedUser.save()
      // Keep the refreshed row as the result; otherwise the job would report
      // the deleted user for an account it has just updated.
      userRes = matchedUser
    } else {
      userRes = await User.create(userData)
    }
  } else if (res !== (await getDeletedUser())) {
    userRes = await User.findByPk(res as string)
    if (existingUsers.length > 1) {
      logger.debug({
        message: `Multiple fedi users found for ${userData.url} (${userData.remoteId}): ${existingUsers.length}`
      })
      // NOTE(review): these renames are only applied in memory and never saved,
      // so they currently have no effect. Confirm whether they should be
      // persisted (skipping the row that equals userRes) or removed.
      for (const duplicate of existingUsers.slice(1)) {
        duplicate.url = duplicate.url + '_DUPLICATED_' + new Date().getTime()
        duplicate.remoteId = duplicate.remoteId + '_DUPLICATED_' + new Date().getTime()
      }
    }

    const conflictingUser = existingUsers[0]
    if (conflictingUser && userRes?.id !== conflictingUser.id) {
      // Remember the id before it is mangled so the right cache key is dropped.
      const previousRemoteId = conflictingUser.remoteId
      conflictingUser.activated = false
      conflictingUser.remoteId = `${previousRemoteId}_OVERWRITTEN_ON${new Date().getTime()}`
      conflictingUser.url = `${conflictingUser.url}_OVERWRITTEN_ON${new Date().getTime()}`
      await conflictingUser.save()
      if (userRes) {
        await reassignUserReferences(conflictingUser.id, userRes.id)
      }
      await redisCache.del('userRemoteId:' + previousRemoteId)
    }

    if (userRes) {
      userRes.set(userData)
      await userRes.save()
    } else {
      // The cached id points at a row that no longer exists.
      await redisCache.del('userRemoteId:' + actorUrl.toLocaleLowerCase())
    }
  }

  const savedUser = userRes
  if (savedUser && savedUser.id && savedUser.url != completeEnvironment.deletedUser) {
    await syncRemoteCustomCss(savedUser.id, userPetition.id, userPetition._wafrn_customCSS)

    try {
      if (userPetition.alsoKnownAs) {
        // alsoKnownAs may be a single value or an array; accept both.
        const aliases: unknown[] = Array.isArray(userPetition.alsoKnownAs)
          ? userPetition.alsoKnownAs
          : [userPetition.alsoKnownAs]
        const atUri = aliases.find(
          (alias): alias is string =>
            typeof alias === 'string' && (alias.startsWith('did:') || alias.startsWith('at://'))
        )
        if (atUri) {
          let mergeMode: 'none' | 'linked' | 'bridgy' = 'none'
          const atDoc = await getDidDoc(atUri)
          if (
            atDoc &&
            (atDoc.alsoKnownAs?.includes(userPetition.id) ||
              atDoc.alsoKnownAs?.includes(userPetition.id.replace('/fediverse/blog', '/blog')))
          ) {
            // make it merged (wafrn user)
            mergeMode = 'linked'
          } else if (atDoc && userPetition.id.includes('brid.gy/')) {
            // check if bridgy fed
            // we can't bridge bridged from web users so hard code to bsky.brid.gy
            mergeMode = 'bridgy'
          }
          if (mergeMode !== 'none') {
            const oldUser = await User.findOne({
              where: {
                bskyDid: atUri.replace(/^at:\/\//, '')
              }
            })
            if (oldUser) {
              logger.info({ oldUser, userRes: savedUser }, 'merging accs')
              // put this in a queue so it wont lag entire instance
              const bskyUserIsPrimary = mergeMode === 'bridgy'
              await mergeUsersQueue.add('mergeUsers', {
                primaryUserId: bskyUserIsPrimary ? oldUser.id : savedUser.id,
                userToMergeId: bskyUserIsPrimary ? savedUser.id : oldUser.id
              })
              // if bridgy user, to prevent more issues, return the existing bsky user instead
              if (bskyUserIsPrimary) return oldUser
            }
          }
        }
      }
    } catch (e) {
      logger.warn(
        {
          error: e,
          userPetition
        },
        'cannot merge user'
      )
    }

    if (userPetition.attachment && userPetition.attachment.length) {
      // Replace the stored profile fields with the current PropertyValue entries.
      await UserOptions.destroy({
        where: {
          userId: savedUser.id,
          optionName: {
            [Op.like]: 'fediverse.public.attachment'
          }
        }
      })
      const properties = userPetition.attachment.filter(
        (elem: { type?: unknown }) => elem.type === 'PropertyValue'
      )
      await UserOptions.create({
        userId: savedUser.id,
        optionName: `fediverse.public.attachment`,
        optionValue: JSON.stringify(properties),
        public: true
      })
    }
  }

  res = savedUser?.id ? savedUser.id : await getDeletedUser()

  try {
    if (savedUser) {
      const tags = userPetition?.tag ? (Array.isArray(userPetition.tag) ? userPetition.tag : [userPetition.tag]) : []
      const emojis = [...new Set(tags.filter((elem: fediverseTag) => elem.type === 'Emoji'))]
      await processUserEmojis(savedUser, emojis)
    }
  } catch (error) {
    logger.info({
      message: `Error processing emojis from user ${savedUser?.url}`,
      error: error,
      tags: userPetition?.tag,
      userPetition: userPetition
    })
  }

  return res
}

export { getRemoteActorIdProcessor }