unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at cache-folder-container 160 lines 6.2 kB view raw
1import { Job, Queue } from 'bullmq' 2import { Media, Post, PostTag, Quotes, User, Notification } from '../../models/index.js' 3import { completeEnvironment } from '../backendOptions.js' 4import { InteractionControl, Privacy } from '../../models/post.js' 5import { getAtProtoSession } from '../../atproto/utils/getAtProtoSession.js' 6import { postToAtproto } from '../../atproto/utils/postToAtproto.js' 7import { wait } from '../wait.js' 8import { logger } from '../logger.js' 9import { AtUri } from '@atproto/api' 10 11async function sendPostBsky(job: Job) { 12 const post = await Post.findByPk(job.data.postId) 13 if (post && !post.bskyUri) { 14 const parent = post.parentId ? await Post.findByPk(post.parentId) : undefined 15 const parentPoster = parent ? await User.findByPk(parent.userId) : undefined 16 const localUser = await User.findByPk(post.userId) 17 try { 18 if ( 19 post.privacy === Privacy.Public && 20 localUser?.enableBsky && 21 localUser?.bskyDid && 22 completeEnvironment.enableBsky 23 ) { 24 if (!parent || parent.bskyUri) { 25 // ok the user has bluesky time to send the post 26 let isReblog = false 27 if (post.content == '' && post.content_warning == '' && post.parentId) { 28 const mediaCount = await Media.count({ 29 where: { 30 postId: post.id 31 } 32 }) 33 const quotesCount = await Quotes.count({ 34 where: { 35 quoterPostId: post.id 36 } 37 }) 38 const tagsCount = await PostTag.count({ 39 where: { 40 postId: post.id 41 } 42 }) 43 if (mediaCount + quotesCount + tagsCount === 0) { 44 isReblog = true 45 if (parent?.bskyUri) { 46 let agent = await getAtProtoSession(localUser) 47 const { uri } = await agent.repost(parent.bskyUri, parent.bskyCid as string) 48 post.bskyUri = uri 49 await post.save() 50 } 51 } 52 } 53 if (!isReblog) { 54 let agent = await getAtProtoSession(localUser) 55 const atProtoObject = await postToAtproto(post, agent) 56 const bskyPost = await agent.post(atProtoObject) 57 const {rkey} = new AtUri(bskyPost.uri) 58 if(bskyPost && agent.session && post.quoteControl != InteractionControl.Anyone) { 59 await agent.com.atproto.repo.createRecord({ 60 repo: agent.session.did, 61 collection: 'app.bsky.feed.postgate', 62 rkey, 63 record: { 64 $type: 'app.bsky.feed.postgate', 65 post: bskyPost.uri, 66 embeddingRules: [ 67 {'$type': 'app.bsky.feed.postgate#disableRule'} 68 ], 69 createdAt: new Date().toISOString() 70 } 71 }) 72 } 73 if(post.hierarchyLevel === 1 && post.replyControl != InteractionControl.Anyone && agent.session) { 74 const gates: string[] = [] 75 if([InteractionControl.FollowersFollowingAndMentioned, InteractionControl.MentionedUsersOnly, InteractionControl.FollowersAndMentioned, InteractionControl.FollowingAndMentioned].includes(post.replyControl)) { 76 gates.push('app.bsky.feed.threadgate#mentionRule') 77 } 78 if([InteractionControl.Followers, InteractionControl.FollowersAndFollowing, InteractionControl.FollowersAndMentioned, InteractionControl.FollowersFollowingAndMentioned].includes(post.replyControl)) { 79 gates.push('app.bsky.feed.threadgate#followerRule') 80 } 81 82 if([InteractionControl.Following, InteractionControl.FollowingAndMentioned, InteractionControl.FollowersAndFollowing, InteractionControl.FollowersFollowingAndMentioned].includes(post.replyControl)) { 83 gates.push('app.bsky.feed.threadgate#followingRule') 84 } 85 if(post.quoteControl !== InteractionControl.Anyone){ 86 } 87 await agent.com.atproto.repo.createRecord({ 88 repo: agent.session.did, 89 collection: 'app.bsky.feed.threadgate', 90 rkey, 91 record: { 92 $type: 'app.bsky.feed.threadgate', 93 post: bskyPost.uri, 94 allow: gates.map(elem => { return { 95 '$type': elem 96 }}), 97 createdAt: new Date().toISOString() 98 } 99 }) 100 101 102 } 103 await wait(750) 104 const duplicatedPost = await Post.findOne({ 105 where: { 106 bskyCid: bskyPost.cid 107 } 108 }) 109 if (duplicatedPost) { 110 logger.debug({ 111 message: `Bluesky duplicated post in database already. Cleaning up` 112 }) 113 await Notification.destroy({ 114 where: { 115 postId: duplicatedPost.id 116 } 117 }) 118 try { 119 await duplicatedPost.destroy() 120 } catch (err) { 121 duplicatedPost.isDeleted = true 122 await duplicatedPost.save() 123 } 124 } 125 post.bskyUri = bskyPost.uri 126 post.bskyCid = bskyPost.cid 127 if (post.parentId) { 128 post.replyControl = 100 129 } 130 await post.save() 131 } 132 } 133 } 134 await post.save() 135 } catch (error) { 136 logger.debug({ message: `Error sending post to bluesky`, error }) 137 } 138 } 139 if (post) { 140 const prepareSendPostQueue = new Queue('prepareSendPost', { 141 connection: completeEnvironment.bullmqConnection, 142 defaultJobOptions: { 143 removeOnComplete: true, 144 attempts: 3, 145 backoff: { 146 type: 'exponential', 147 delay: 5000 148 }, 149 removeOnFail: true 150 } 151 }) 152 // we send the post to fedi once we get the bsky data 153 await prepareSendPostQueue.add('prepareSendPost', job.data, { 154 jobId: post.id, 155 delay: 5000 156 }) 157 } 158} 159 160export { sendPostBsky }