unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 154 lines 4.9 kB view raw
1import { Job, Queue } from "bullmq"; 2import { 3 Media, 4 Post, 5 PostTag, 6 Quotes, 7 User, 8 Notification, 9} from "../../models/index.js"; 10import { completeEnvironment } from "../backendOptions.js"; 11import { Privacy } from "../../models/post.js"; 12import { getAtProtoSession } from "../../atproto/utils/getAtProtoSession.js"; 13import { postToAtproto } from "../../atproto/utils/postToAtproto.js"; 14import { wait } from "../wait.js"; 15import { logger } from "../logger.js"; 16 17async function sendBiteBsky(biterId: string, postId?: string, userId?: string) { 18 const post = postId ? await Post.findByPk(postId) : undefined; 19 const user = userId ? await User.findByPk(userId) : undefined; 20 const localUser = await User.findByPk(biterId); 21 22 if ((post && !post?.bskyUri) || (user && !user.bskyDid)) return; // make sure there is actually a bsky user 23 if (!localUser || !localUser.enableBsky) return; // make sure there's actually a bsky account to put the record on 24 25 const subjectUri = post ? post.bskyUri : user ? "at://" + user.bskyDid : ""; 26 27 const bskyBite = { 28 $type: "net.wafrn.feed.bite", 29 subject: subjectUri, 30 createdAt: new Date(Date.now()).toISOString(), 31 }; 32 33 let agent = await getAtProtoSession(localUser); 34 35 await agent.com.atproto.repo.createRecord({ 36 repo: agent.session?.did ?? localUser.bskyDid ?? "", 37 collection: "net.wafrn.feed.bite", 38 record: bskyBite, 39 }); 40} 41 42async function sendPostBsky(job: Job) { 43 const post = await Post.findByPk(job.data.postId); 44 if (post && !post.bskyUri) { 45 const parent = post.parentId 46 ? await Post.findByPk(post.parentId) 47 : undefined; 48 const parentPoster = parent 49 ? await User.findByPk(parent.userId) 50 : undefined; 51 const localUser = await User.findByPk(post.userId); 52 try { 53 if ( 54 post.privacy === Privacy.Public && 55 localUser?.enableBsky && 56 completeEnvironment.enableBsky 57 ) { 58 if (!parent || parent.bskyUri) { 59 // ok the user has bluesky time to send the post 60 let isReblog = false; 61 if ( 62 post.content == "" && 63 post.content_warning == "" && 64 post.parentId 65 ) { 66 const mediaCount = await Media.count({ 67 where: { 68 postId: post.id, 69 }, 70 }); 71 const quotesCount = await Quotes.count({ 72 where: { 73 quoterPostId: post.id, 74 }, 75 }); 76 const tagsCount = await PostTag.count({ 77 where: { 78 postId: post.id, 79 }, 80 }); 81 if (mediaCount + quotesCount + tagsCount === 0) { 82 isReblog = true; 83 if (parent?.bskyUri) { 84 let agent = await getAtProtoSession(localUser); 85 const { uri } = await agent.repost( 86 parent.bskyUri, 87 parent.bskyCid as string 88 ); 89 post.bskyUri = uri; 90 await post.save(); 91 } 92 } 93 } 94 if (!isReblog) { 95 let agent = await getAtProtoSession(localUser); 96 const bskyPost = await agent.post(await postToAtproto(post, agent)); 97 await wait(750); 98 const duplicatedPost = await Post.findOne({ 99 where: { 100 bskyCid: bskyPost.cid, 101 }, 102 }); 103 if (duplicatedPost) { 104 logger.debug({ 105 message: `Bluesky duplicated post in database already. Cleaning up`, 106 }); 107 await Notification.destroy({ 108 where: { 109 postId: duplicatedPost.id, 110 }, 111 }); 112 try { 113 await duplicatedPost.destroy(); 114 } catch (err) { 115 duplicatedPost.isDeleted = true; 116 await duplicatedPost.save(); 117 } 118 } 119 post.bskyUri = bskyPost.uri; 120 post.bskyCid = bskyPost.cid; 121 if (post.parentId) { 122 post.replyControl = 100; 123 } 124 await post.save(); 125 } 126 } 127 } 128 await post.save(); 129 } catch (error) { 130 logger.debug({ message: `Error sending post to bluesky`, error }); 131 } 132 } 133 if (post) { 134 const prepareSendPostQueue = new Queue("prepareSendPost", { 135 connection: completeEnvironment.bullmqConnection, 136 defaultJobOptions: { 137 removeOnComplete: true, 138 attempts: 3, 139 backoff: { 140 type: "exponential", 141 delay: 5000, 142 }, 143 removeOnFail: true, 144 }, 145 }); 146 // we send the post to fedi once we get the bsky data 147 await prepareSendPostQueue.add("prepareSendPost", job.data, { 148 jobId: post.id, 149 delay: 5000, 150 }); 151 } 152} 153 154export { sendPostBsky, sendBiteBsky };