unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at development 166 lines 4.4 kB view raw
1import { Job, Queue } from 'bullmq' 2import { Follows, Post, User, UserLikesPostRelations } from '../../models/index.js' 3import { getPostThreadRecursive } from '../activitypub/getPostThreadRecursive.js' 4import { logger } from '../logger.js' 5import { Op } from 'sequelize' 6import { completeEnvironment } from '../backendOptions.js' 7import { removeUser } from '../activitypub/removeUser.js' 8 9 10// this thing is compute intensive 11async function mergeUser(job: Job) { 12 const { 13 primaryUserId, 14 userToMergeId 15 }: { 16 primaryUserId: string, 17 userToMergeId: string 18 } = job.data 19 20 if (primaryUserId === userToMergeId) return 21 22 logger.info(job.data, 'working on merging 2 users') 23 24 // first we get the users 25 const primaryUser = await User.scope('full').findByPk(primaryUserId) 26 const userToMerge = await User.scope('full').findByPk(userToMergeId) 27 28 // if user is local we dont merge lol 29 if (!primaryUser || !userToMerge || primaryUser.email || userToMerge.email) return 30 // then we start the merge 31 // we start by force refetching all the posts from usertomerge 32 let postsFromUserToMerge = await Post.findAll({ 33 where: { 34 userId: userToMergeId, 35 [Op.or]: [ 36 { 37 remotePostId: { 38 [Op.eq]: null 39 } 40 }, 41 { 42 bskyUri: { 43 [Op.eq]: null 44 } 45 } 46 ] 47 } 48 }) 49 50 const mergePostQueue = new Queue('mergePosts', { 51 connection: completeEnvironment.bullmqConnection, 52 defaultJobOptions: { 53 removeOnComplete: true, 54 attempts: 3, 55 backoff: { 56 type: 'exponential', 57 delay: 1000 58 }, 59 removeOnFail: true 60 } 61 }) 62 63 mergePostQueue.addBulk( 64 postsFromUserToMerge.map(post => { 65 return { 66 name: `mergePost-${post.id}`, 67 data: { postId: post.id} } 68 }) 69 70 ) 71 72 // now for the remainings we just migrate all of them to the new user 73 await Post.update({ 74 userId: primaryUserId 75 }, { 76 where: { 77 userId: userToMergeId 78 } 79 }) 80 81 // then we migrate the user info and stuff 82 let notToUpdate = await Follows.findAll({ 83 where: { 84 followerId: primaryUserId 85 } 86 }) 87 await Follows.update({ 88 followerId: primaryUserId 89 }, { 90 where: { 91 followerId: userToMergeId, 92 followedId: { 93 [Op.notIn]: notToUpdate.map(elem => elem.followedId) 94 } 95 } 96 }) 97 98 notToUpdate = await Follows.findAll({ 99 where: { 100 followedId: primaryUserId 101 } 102 }) 103 await Follows.update({ 104 followedId: primaryUserId 105 }, { 106 where: { 107 followedId: userToMergeId, 108 followerId: { 109 [Op.notIn]: notToUpdate.map(elem => elem.followerId) 110 } 111 } 112 }) 113 notToUpdate = await Follows.findAll({ 114 where: { 115 followerId: primaryUserId 116 } 117 }) 118 await Follows.update({ 119 followerId: primaryUserId 120 }, { 121 where: { 122 followerId: userToMergeId, 123 followedId: { 124 [Op.notIn]: notToUpdate.map(elem => elem.followedId) 125 } 126 } 127 }) 128 129 const newRemoteId = userToMerge.remoteId || primaryUser.remoteId 130 const did = userToMerge.bskyDid || primaryUser.bskyDid 131 // now we update the user to merge to decouple the remote post and mark it as deleted 132 // we will not delete it so if somethings wrong admins can still recover info 133 if (userToMerge.bskyDid) { 134 primaryUser.bskyDid = userToMerge.bskyDid 135 } else if (userToMerge.remoteId) { 136 primaryUser.remoteId = userToMerge.remoteId 137 primaryUser.remoteInbox = userToMerge.remoteInbox 138 primaryUser.remoteMentionUrl = userToMerge.remoteMentionUrl 139 primaryUser.publicKey = userToMerge.publicKey 140 primaryUser.followersCollectionUrl = userToMerge.followersCollectionUrl 141 primaryUser.followingCollectionUrl = userToMerge.followingCollectionUrl 142 primaryUser.isBskyPrimary = true 143 } 144 145 primaryUser.remoteId = newRemoteId 146 primaryUser.bskyDid = did 147 148 primaryUser.alternateUrl = userToMerge.url 149 150 userToMerge.bskyDid = null 151 userToMerge.remoteId = null 152 userToMerge.remoteInbox = null 153 userToMerge.remoteMentionUrl = null 154 userToMerge.publicKey = null 155 userToMerge.followersCollectionUrl = null 156 userToMerge.followingCollectionUrl = null 157 userToMerge.save() 158 primaryUser.save() 159 await removeUser(userToMerge.id) 160 161 logger.info({ 162 message: `Merged users ${primaryUser.url} (primary) and ${userToMerge?.url}` } 163 ) 164} 165 166export { mergeUser }