unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Job, Queue } from 'bullmq'
2import { Follows, Post, User, UserLikesPostRelations } from '../../models/index.js'
3import { getPostThreadRecursive } from '../activitypub/getPostThreadRecursive.js'
4import { logger } from '../logger.js'
5import { Op } from 'sequelize'
6import { completeEnvironment } from '../backendOptions.js'
7import { removeUser } from '../activitypub/removeUser.js'
8
9
10// this thing is compute intensive
11async function mergeUser(job: Job) {
12 const {
13 primaryUserId,
14 userToMergeId
15 }: {
16 primaryUserId: string,
17 userToMergeId: string
18 } = job.data
19
20 if (primaryUserId === userToMergeId) return
21
22 logger.info(job.data, 'working on merging 2 users')
23
24 // first we get the users
25 const primaryUser = await User.scope('full').findByPk(primaryUserId)
26 const userToMerge = await User.scope('full').findByPk(userToMergeId)
27
28 // if user is local we dont merge lol
29 if (!primaryUser || !userToMerge || primaryUser.email || userToMerge.email) return
30 // then we start the merge
31 // we start by force refetching all the posts from usertomerge
32 let postsFromUserToMerge = await Post.findAll({
33 where: {
34 userId: userToMergeId,
35 [Op.or]: [
36 {
37 remotePostId: {
38 [Op.eq]: null
39 }
40 },
41 {
42 bskyUri: {
43 [Op.eq]: null
44 }
45 }
46 ]
47 }
48 })
49
50 const mergePostQueue = new Queue('mergePosts', {
51 connection: completeEnvironment.bullmqConnection,
52 defaultJobOptions: {
53 removeOnComplete: true,
54 attempts: 3,
55 backoff: {
56 type: 'exponential',
57 delay: 1000
58 },
59 removeOnFail: true
60 }
61 })
62
63 mergePostQueue.addBulk(
64 postsFromUserToMerge.map(post => {
65 return {
66 name: `mergePost-${post.id}`,
67 data: { postId: post.id} }
68 })
69
70 )
71
72 // now for the remainings we just migrate all of them to the new user
73 await Post.update({
74 userId: primaryUserId
75 }, {
76 where: {
77 userId: userToMergeId
78 }
79 })
80
81 // then we migrate the user info and stuff
82 let notToUpdate = await Follows.findAll({
83 where: {
84 followerId: primaryUserId
85 }
86 })
87 await Follows.update({
88 followerId: primaryUserId
89 }, {
90 where: {
91 followerId: userToMergeId,
92 followedId: {
93 [Op.notIn]: notToUpdate.map(elem => elem.followedId)
94 }
95 }
96 })
97
98 notToUpdate = await Follows.findAll({
99 where: {
100 followedId: primaryUserId
101 }
102 })
103 await Follows.update({
104 followedId: primaryUserId
105 }, {
106 where: {
107 followedId: userToMergeId,
108 followerId: {
109 [Op.notIn]: notToUpdate.map(elem => elem.followerId)
110 }
111 }
112 })
113 notToUpdate = await Follows.findAll({
114 where: {
115 followerId: primaryUserId
116 }
117 })
118 await Follows.update({
119 followerId: primaryUserId
120 }, {
121 where: {
122 followerId: userToMergeId,
123 followedId: {
124 [Op.notIn]: notToUpdate.map(elem => elem.followedId)
125 }
126 }
127 })
128
129 const newRemoteId = userToMerge.remoteId || primaryUser.remoteId
130 const did = userToMerge.bskyDid || primaryUser.bskyDid
131 // now we update the user to merge to decouple the remote post and mark it as deleted
132 // we will not delete it so if somethings wrong admins can still recover info
133 if (userToMerge.bskyDid) {
134 primaryUser.bskyDid = userToMerge.bskyDid
135 } else if (userToMerge.remoteId) {
136 primaryUser.remoteId = userToMerge.remoteId
137 primaryUser.remoteInbox = userToMerge.remoteInbox
138 primaryUser.remoteMentionUrl = userToMerge.remoteMentionUrl
139 primaryUser.publicKey = userToMerge.publicKey
140 primaryUser.followersCollectionUrl = userToMerge.followersCollectionUrl
141 primaryUser.followingCollectionUrl = userToMerge.followingCollectionUrl
142 primaryUser.isBskyPrimary = true
143 }
144
145 primaryUser.remoteId = newRemoteId
146 primaryUser.bskyDid = did
147
148 primaryUser.alternateUrl = userToMerge.url
149
150 userToMerge.bskyDid = null
151 userToMerge.remoteId = null
152 userToMerge.remoteInbox = null
153 userToMerge.remoteMentionUrl = null
154 userToMerge.publicKey = null
155 userToMerge.followersCollectionUrl = null
156 userToMerge.followingCollectionUrl = null
157 userToMerge.save()
158 primaryUser.save()
159 await removeUser(userToMerge.id)
160
161 logger.info({
162 message: `Merged users ${primaryUser.url} (primary) and ${userToMerge?.url}` }
163 )
164}
165
166export { mergeUser }