unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Job, Queue } from 'bullmq'
2import {
3 Blocks,
4 EmojiReaction,
5 FederatedHost,
6 Follows,
7 Mutes,
8 Post,
9 PostMentionsUserRelation,
10 User,
11 UserLikesPostRelations,
12 UserOptions,
13 sequelize
14} from '../../models/index.js'
15import { completeEnvironment } from '../backendOptions.js'
16import { getUserIdFromRemoteId } from '../cacheGetters/getUserIdFromRemoteId.js'
17import { getPetitionSigned } from '../activitypub/getPetitionSigned.js'
18import { processUserEmojis } from '../activitypub/processUserEmojis.js'
19import { fediverseTag } from '../../interfaces/fediverse/tags.js'
20import { logger } from '../logger.js'
21import { redisCache } from '../redis.js'
22import { Op } from 'sequelize'
23import { getDeletedUser } from '../cacheGetters/getDeletedUser.js'
24import processExternalCustomCss from '../processExternalCustomCss.js'
25import { unlink, writeFile } from 'fs/promises'
26import { existsSync } from 'fs'
27import { getDidDoc } from '../atproto/getDidDoc.js'
28import getUserAgent from '../getUserAgent.js'
29
/**
 * BullMQ queue on which account-merge jobs are enqueued, so the merge itself
 * runs outside of the actor-processing job.
 *
 * Default job options: up to 6 attempts with exponential backoff (25000 ms
 * base delay); completed jobs are removed, failed jobs are kept.
 */
const mergeUsersQueue = new Queue('mergeUsers', {
  defaultJobOptions: {
    attempts: 6,
    backoff: { delay: 25000, type: 'exponential' },
    removeOnComplete: true,
    removeOnFail: false
  },
  connection: completeEnvironment.bullmqConnection
})
42
/**
 * Fetches a remote collection (followers / following) with a signed request
 * and returns its `totalItems`, or 0 when the fetch yields nothing usable.
 */
async function fetchCollectionTotal(user: User, collectionUrl: string): Promise<number> {
  const collection = await getPetitionSigned(user, collectionUrl)
  return collection?.totalItems ? collection.totalItems : 0
}

/**
 * Re-points every relation row (follows, posts, likes, emoji reactions, blocks,
 * mutes, mentions) that references `fromUserId` so it references `toUserId`.
 * All updates are started together and awaited as a group.
 */
async function reassignUserRelations(fromUserId: User['id'], toUserId: User['id']): Promise<void> {
  await Promise.all([
    Follows.update({ followerId: toUserId }, { where: { followerId: fromUserId } }),
    Follows.update({ followedId: toUserId }, { where: { followedId: fromUserId } }),
    Post.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    UserLikesPostRelations.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    EmojiReaction.update({ userId: toUserId }, { where: { userId: fromUserId } }),
    Blocks.update({ blockedId: toUserId }, { where: { blockedId: fromUserId } }),
    Blocks.update({ blockerId: toUserId }, { where: { blockerId: fromUserId } }),
    Mutes.update({ muterId: toUserId }, { where: { muterId: fromUserId } }),
    Mutes.update({ mutedId: toUserId }, { where: { mutedId: fromUserId } }),
    PostMentionsUserRelation.update({ userId: toUserId }, { where: { userId: fromUserId } })
  ])
}

/**
 * Queue processor that resolves a remote actor URL to a local user, creating
 * or refreshing the local record from the remote actor document when needed.
 *
 * Reads from `job.data`:
 *  - `actorUrl`: remote actor id (URL) to resolve.
 *  - `forceUpdate`: when true, re-fetch the actor even if it is already known.
 *  - `userId`: local user whose identity is used to sign the outgoing requests.
 *
 * @returns The local user id for the actor. The value of `getDeletedUser()` is
 * returned when the URL is invalid, the host is blocked, or no usable user
 * record results. When the actor is detected as a bridgy-bridged account, the
 * result of the `bskyDid` lookup (a `User` or `null`) is returned instead.
 */
async function getRemoteActorIdProcessor(job: Job) {
  const actorUrl: string = job.data.actorUrl
  const forceUpdate: boolean = job.data.forceUpdate
  let res: string | User | undefined | null = await getUserIdFromRemoteId(actorUrl)
  let url: URL | undefined = undefined
  try {
    url = new URL(actorUrl)
  } catch (error) {
    // An unparsable actor url can never be fetched: resolve it to the deleted user.
    res = await getDeletedUser()
    url = undefined
    logger.debug({
      message: `Invalid url ${actorUrl}`,
      url: actorUrl,
      stack: new Error().stack
    })
  }

  // Only hit the network when the actor is unknown ('' from the lookup) or a refresh was requested.
  const needsFetch = res === '' || (forceUpdate && url !== undefined)
  if (!needsFetch) {
    return res
  }

  let federatedHost = await FederatedHost.findOne({
    where: sequelize.where(sequelize.fn('lower', sequelize.col('displayName')), url?.host ? url.host.toLowerCase() : '')
  })
  if (federatedHost?.blocked) {
    return await getDeletedUser()
  }

  // NOTE(review): findByPk may resolve to null; the cast mirrors the original code — confirm
  // getPetitionSigned tolerates a missing signer.
  const user = (await User.findByPk(job.data.userId)) as User
  const userPetition = await getPetitionSigned(user, actorUrl)
  if (!userPetition) {
    return res
  }

  if (!federatedHost && url) {
    const federatedHostToCreate = {
      displayName: url.host.toLocaleLowerCase(),
      publicInbox: userPetition.endpoints?.sharedInbox || ''
    }
    federatedHost = (await FederatedHost.findOrCreate({ where: federatedHostToCreate }))[0]
  }
  if (!url || !federatedHost) {
    logger.warn({
      message: 'Url is not valid wtf',
      trace: new Error().stack
    })
    return await getDeletedUser()
  }

  const remoteMentionUrl = typeof userPetition.url === 'string' ? userPetition.url : ''
  const followers = userPetition.followers ? await fetchCollectionTotal(user, userPetition.followers) : 0
  const followed = userPetition.following ? await fetchCollectionTotal(user, userPetition.following) : 0

  const userData = {
    hideFollows: false,
    hideProfileNotLoggedIn: false,
    url: `@${userPetition.preferredUsername}@${url.host}`,
    name: userPetition.name || userPetition.preferredUsername,
    email: null,
    description: userPetition.summary || '',
    avatar: userPetition.icon?.url ? userPetition.icon.url : `${completeEnvironment.mediaUrl}/uploads/default.webp`,
    headerImage: userPetition?.image?.url ? userPetition.image.url.toString() : '',
    password: 'NOT_A_WAFRN_USER_NOT_REAL_PASSWORD',
    publicKey: userPetition.publicKey?.publicKeyPem,
    remoteInbox: userPetition.inbox,
    remoteId: actorUrl,
    activated: true,
    federatedHostId: federatedHost.id,
    remoteMentionUrl: remoteMentionUrl,
    followersCollectionUrl: userPetition.followers,
    followingCollectionUrl: userPetition.following,
    // Loose comparison kept on purpose: it also treats a single-element array type
    // (['Person']) as a person, which a strict comparison would not.
    isBot: userPetition.type != 'Person',
    followerCount: followers,
    followingCount: followed,
    createdAt: userPetition.published ? new Date(userPetition.published) : new Date(),
    updatedAt: new Date(),
    NSFW: false,
    birthDate: new Date(),
    userMigratedTo: userPetition.movedTo || '',
    displayUrl: Array.isArray(userPetition.url) ? userPetition.url[0] : userPetition.url,
    manuallyAcceptsFollows: userPetition.manuallyApprovesFollowers ?? false
  }

  federatedHost.publicInbox = userPetition.endpoints?.sharedInbox || null
  await federatedHost.save()

  // Every local row that already claims this handle (case-insensitive) or this remote id.
  const existingUsers = await User.findAll({
    where: {
      [Op.or]: [
        sequelize.where(sequelize.fn('lower', sequelize.col('url')), userData.url.toLowerCase()),
        {
          remoteId: userData.remoteId
        }
      ]
    }
  })

  let userRes: User | null | undefined
  if (res) {
    if (res !== (await getDeletedUser())) {
      userRes = await User.findByPk(res as string)
      if (existingUsers.length > 1) {
        logger.debug({
          message: `Multiple fedi users found for ${userData.url} (${userData.remoteId}): ${existingUsers.length}`
        })
        for (const duplicatedUser of existingUsers.slice(1)) {
          // Never rename the row we are about to refresh, and leave everything alone when
          // there is no canonical row to keep.
          if (!userRes || duplicatedUser.id === userRes.id) {
            continue
          }
          const duplicatedSuffix = '_DUPLICATED_' + new Date().getTime()
          duplicatedUser.url = duplicatedUser.url + duplicatedSuffix
          duplicatedUser.remoteId = duplicatedUser.remoteId + duplicatedSuffix
          // Persist the rename; without saving it the mutation above has no effect.
          await duplicatedUser.save()
        }
      }
      const existingUser = existingUsers[0]
      if (existingUser && userRes?.id !== existingUser.id) {
        // Remember the real remote id before it is mangled so the right cache key is invalidated.
        const previousRemoteId = existingUser.remoteId
        const overwrittenSuffix = `_OVERWRITTEN_ON${new Date().getTime()}`
        existingUser.activated = false
        existingUser.remoteId = `${existingUser.remoteId}${overwrittenSuffix}`
        existingUser.url = `${existingUser.url}${overwrittenSuffix}`
        await existingUser.save()
        if (userRes) {
          await reassignUserRelations(existingUser.id, userRes.id)
        }
        if (previousRemoteId) {
          // Lowercased to match the key shape used for actorUrl below.
          await redisCache.del('userRemoteId:' + `${previousRemoteId}`.toLocaleLowerCase())
        }
      }
      if (userRes) {
        userRes.set(userData)
        await userRes.save()
      } else {
        // The cached id points at a row that no longer exists: drop the stale cache entry.
        await redisCache.del('userRemoteId:' + actorUrl.toLocaleLowerCase())
      }
    }
  } else {
    const existingUser = existingUsers[0]
    if (existingUser) {
      existingUser.set(userData)
      await existingUser.save()
      // Keep working with the refreshed row so its id is returned and its CSS,
      // attachments and emojis are processed below.
      userRes = existingUser
    } else {
      userRes = await User.create(userData)
    }
  }

  if (userRes && userRes.id && userRes.url != completeEnvironment.deletedUser) {
    const themePath = `uploads/themes/${userRes.id}.css`
    try {
      if (userPetition._wafrn_customCSS) {
        let customCSS: string | undefined = undefined
        logger.info({ id: userPetition.id }, 'found custom css for this user')
        if (URL.canParse(userPetition._wafrn_customCSS)) {
          // NOTE(review): this fetches a URL supplied by the remote actor document.
          const cssRes = await fetch(userPetition._wafrn_customCSS, {
            headers: {
              'User-Agent': getUserAgent('ActivityPubWorker')
            }
          })
          if (cssRes.ok) customCSS = await cssRes.text()
        } else {
          // Not a URL: the field carries the stylesheet inline.
          customCSS = userPetition._wafrn_customCSS
        }
        if (customCSS) {
          const css = await processExternalCustomCss(userRes.id, customCSS)
          await writeFile(themePath, css)
        }
      } else if (existsSync(themePath)) {
        // The actor no longer advertises custom CSS: remove the stored theme.
        await unlink(themePath)
      }
    } catch (e) {
      logger.warn(e)
    }

    try {
      if (userPetition.alsoKnownAs) {
        // alsoKnownAs may be a single value or a list; normalise like `tag` below.
        const aliases: unknown[] = Array.isArray(userPetition.alsoKnownAs)
          ? userPetition.alsoKnownAs
          : [userPetition.alsoKnownAs]
        const atUri = aliases.find(
          (alias): alias is string =>
            typeof alias === 'string' && (alias.startsWith('did:') || alias.startsWith('at://'))
        )
        // 0 = no merge, 1 = DID document links back to this actor, 2 = bridgy-bridged actor.
        let mergeAcc = 0
        if (atUri) {
          const atDoc = await getDidDoc(atUri)
          if (
            atDoc &&
            (atDoc.alsoKnownAs?.includes(userPetition.id) ||
              atDoc.alsoKnownAs?.includes(userPetition.id.replace('/fediverse/blog', '/blog')))
          ) {
            // make it merged (wafrn user)
            mergeAcc = 1
          } else if (atDoc && userPetition.id.includes('brid.gy/')) {
            // check if bridgy fed
            // we can't bridge bridged from web users so hard code to bsky.brid.gy
            mergeAcc = 2
          }
          if (mergeAcc > 0) {
            const oldUser = await User.findOne({
              where: {
                bskyDid: atUri.replace(/^at:\/\//, '')
              }
            })
            if (oldUser) {
              logger.info({ oldUser, userRes }, 'merging accs')
              // put this in a queue so it wont lag entire instance
              await mergeUsersQueue.add('mergeUsers', {
                primaryUserId: mergeAcc === 2 ? oldUser.id : userRes.id,
                userToMergeId: mergeAcc === 1 ? oldUser.id : userRes.id
              })
            }

            // if bridgy user, to prevent more issues, return the existing bsky user instead
            if (mergeAcc === 2) return oldUser
          }
        }
      }
    } catch (e) {
      logger.warn(
        {
          error: e,
          userPetition
        },
        'cannot merge user'
      )
    }

    if (userPetition.attachment && userPetition.attachment.length) {
      // Replace the stored profile fields with the PropertyValue attachments from the actor.
      await UserOptions.destroy({
        where: {
          userId: userRes.id,
          optionName: {
            [Op.like]: 'fediverse.public.attachment'
          }
        }
      })
      const properties = userPetition.attachment.filter((elem: any) => elem.type === 'PropertyValue')
      await UserOptions.create({
        userId: userRes.id,
        optionName: `fediverse.public.attachment`,
        optionValue: JSON.stringify(properties),
        public: true
      })
    }
  }

  res = userRes?.id ? userRes.id : await getDeletedUser()

  try {
    if (userRes) {
      const tags = userPetition?.tag ? (Array.isArray(userPetition.tag) ? userPetition.tag : [userPetition.tag]) : []
      const emojis = [...new Set(tags.filter((elem: fediverseTag) => elem.type === 'Emoji'))]
      await processUserEmojis(userRes, emojis)
    }
  } catch (error) {
    // Emoji processing is best effort: log and still return the resolved id.
    logger.info({
      message: `Error processing emojis from user ${userRes?.url}`,
      error: error,
      tags: userPetition?.tag,
      userPetition: userPetition
    })
  }

  return res
}

export { getRemoteActorIdProcessor }