unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 136 lines 4.0 kB view raw
import { Job, Queue, QueueEvents } from "bullmq";
import { sequelize, User } from "../../models/index.js";

import { logger } from "../logger.js";
import { getUserIdFromRemoteId } from "../cacheGetters/getUserIdFromRemoteId.js";
import { getDeletedUser } from "../cacheGetters/getDeletedUser.js";
import { forcePopulateUsers } from "../../atproto/utils/getAtprotoUser.js";
import { redisCache } from "../redis.js";
import { completeEnvironment } from "../backendOptions.js";

// Queue that receives the "getRemoteActorId" jobs added by getRemoteActor.
// Finished and failed jobs are removed; each job gets 2 attempts with
// exponential backoff starting at 1000 ms.
const queue = new Queue("getRemoteActorId", {
  connection: completeEnvironment.bullmqConnection,
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
    attempts: 2,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  },
});
// Event listener on the same queue name; passed to job.waitUntilFinished so
// getRemoteActor can block until a job it enqueued has completed.
const queueEvents = new QueueEvents("getRemoteActorId", {
  connection: completeEnvironment.bullmqConnection,
});

/**
 * Resolves an actor URL to a `User` row.
 *
 * Resolution order:
 * 1. URLs under `<frontendUrl>/fediverse/blog/` are looked up directly by
 *    their `url` column (case-insensitive). The result of that lookup is
 *    returned as-is, so it may be `null`.
 * 2. When `completeEnvironment.enableBsky` is set, `at://` URIs are populated
 *    through `forcePopulateUsers` and then looked up by `bskyDid`; the deleted
 *    user is returned when no row matches.
 * 3. Anything else is resolved through `getUserIdFromRemoteId`; when that
 *    yields an empty string a "getRemoteActorId" job is enqueued and awaited.
 *    Missing, banned, or blocked-host users are replaced by the deleted user.
 *
 * For case 3, a refresh job is additionally enqueued when the row was last
 * updated more than 24 hours ago or `forceUpdate` is set, and the
 * `"key:" + remoteId` entry is removed from `redisCache`.
 *
 * @param actorUrl - Actor URL or `at://` URI to resolve. A falsy value yields
 *   the deleted user.
 * @param user - User whose id is attached to the queued jobs. When `null` the
 *   function logs at debug level and returns `null`.
 * @param forceUpdate - Forwarded to the fetch job and forces a refresh job.
 * @returns The resolved user, the deleted user as a fallback, or `null` as
 *   described above.
 */
async function getRemoteActor(
  actorUrl: string,
  user: User | null,
  forceUpdate = false
): Promise<User | null> {
  if (!user) {
    logger.debug({
      message: `caled getremoteactor with null`,
    });
    return null;
  }
  if (!actorUrl) {
    return await getDeletedUser();
  }

  let remoteUser: User | null | undefined;
  try {
    // Calling a string method here doubles as a runtime check that actorUrl
    // really is a string: anything else throws and ends up in the catch below.
    const lowerActorUrl = actorUrl.toLowerCase();

    // Both sides are lower-cased once and the prefix is removed by length, so
    // a URL that only differs in case from frontendUrl still resolves instead
    // of producing `undefined` from a case-sensitive split.
    const localBlogPrefix = (
      completeEnvironment.frontendUrl + "/fediverse/blog/"
    ).toLowerCase();
    if (lowerActorUrl.startsWith(localBlogPrefix)) {
      const urlToSearch = lowerActorUrl.slice(localBlogPrefix.length);
      // Not awaited: a rejected lookup propagates to the caller rather than
      // being swallowed by the catch below.
      return User.findOne({
        where: sequelize.where(
          sequelize.fn("lower", sequelize.col("url")),
          urlToSearch
        ),
      });
    }

    if (completeEnvironment.enableBsky && lowerActorUrl.startsWith("at://")) {
      // Bluesky user. This should only happen through an import
      const did = actorUrl.slice("at://".length);
      const adminUser = (await User.findOne({
        where: {
          url: completeEnvironment.adminUser,
        },
      })) as User;
      await forcePopulateUsers([did], adminUser);
      // The lookup has to be awaited before applying the fallback: a pending
      // Promise is always truthy, so the fallback would never be reached.
      const bskyUser = await User.findOne({
        where: {
          bskyDid: did,
        },
      });
      return bskyUser ?? (await getDeletedUser());
    }

    let userId = await getUserIdFromRemoteId(actorUrl);
    if (userId === "") {
      const job = await queue.add("getRemoteActorId", {
        actorUrl: actorUrl,
        userId: user.id,
        forceUpdate: forceUpdate,
      });
      // A failed job is only logged; `result` is then undefined.
      const result = await job.waitUntilFinished(queueEvents).catch((error) => {
        logger.debug({
          message: `Error while geting user`,
          user: actorUrl,
          by: user.url,
          error: error,
        });
      });
      // The job result is used either as an object carrying `id` or as the
      // id itself.
      userId = result && result.id ? result.id : result;
    }
    // NOTE(review): loose equality kept on purpose — the queue result is
    // untyped, so the original coercing comparison is preserved.
    userId = userId == "" ? "00000000-0000-0000-0000-000000000000" : userId;
    remoteUser = await User.findByPk(userId);
    if (
      !remoteUser ||
      remoteUser.banned ||
      (await remoteUser.getFederatedHost())?.blocked
    ) {
      remoteUser = await getDeletedUser();
    }
  } catch (error) {
    // Best effort: any failure above falls through to the deleted-user
    // fallback at the end of the function.
    logger.trace({
      message: `Error fetching user ${actorUrl}`,
      error: error,
    });
  }

  // update user if last update was more than 24 hours ago
  if (remoteUser && remoteUser.url !== completeEnvironment.deletedUser) {
    const lastUpdate = new Date(remoteUser.updatedAt);
    const isStale = Date.now() - lastUpdate.getTime() > 24 * 3600 * 1000;
    if (isStale || forceUpdate) {
      await queue.add(
        "getRemoteActorId",
        { actorUrl: actorUrl, userId: user.id, forceUpdate: true },
        {
          // Job id derived from the URL with ":" and "/" replaced by "_".
          jobId: actorUrl.replaceAll(":", "_").replaceAll("/", "_"),
        }
      );
    }
  }
  if (remoteUser) {
    await redisCache.del("key:" + remoteUser.remoteId);
  }
  return remoteUser ? remoteUser : await getDeletedUser();
}

export { getRemoteActor };