// returns the post id
import { getAtProtoSession } from "./getAtProtoSession.js";
import { QueryParams } from "@atproto/sync/dist/firehose/lexicons.js";
import {
  EmojiReaction,
  Media,
  Notification,
  Post,
  PostAncestor,
  PostMentionsUserRelation,
  PostReport,
  PostTag,
  QuestionPoll,
  Quotes,
  RemoteUserPostView,
  SilencedPost,
  User,
  UserBitesPostRelation,
  UserBookmarkedPosts,
  UserLikesPostRelations,
} from "../../models/index.js";
import { Model, Op } from "sequelize";
import {
  PostView,
  ThreadViewPost,
} from "@atproto/api/dist/client/types/app/bsky/feed/defs.js";
import { getAtprotoUser } from "./getAtprotoUser.js";
import { CreateOrUpdateOp } from "@skyware/firehose";
import { logger } from "../../utils/logger.js";
import { RichText } from "@atproto/api";
import showdown from "showdown";
import {
  bulkCreateNotifications,
  createNotification,
} from "../../utils/pushNotifications.js";
import { getAllLocalUserIds } from "../../utils/cacheGetters/getAllLocalUserIds.js";
import {
  InteractionControl,
  InteractionControlType,
  Privacy,
} from "../../models/post.js";
import { wait } from "../../utils/wait.js";
import { UpdatedAt } from "sequelize-typescript";
import { completeEnvironment } from "../../utils/backendOptions.js";
import { MediaAttributes } from "../../models/media.js";
import { getAdminAtprotoSession } from "../../utils/atproto/getAdminAtprotoSession.js";
import { getPostThreadRecursive } from "../../utils/activitypub/getPostThreadRecursive.js";
import { Queue, QueueEvents } from "bullmq";
import { getAdminUser } from "../../utils/getAdminAndDeletedUser.js";

const markdownConverter = new showdown.Converter({
  simplifiedAutoLink: true,
  literalMidWordUnderscores: true,
  strikethrough: true,
  simpleLineBreaks: true,
  openLinksInNewWindow: true,
  emoji: true,
});

// Resolved once at module load and awaited wherever the admin user is needed.
const adminUser = getAdminUser();

/** Host prefix of Tenor-hosted GIFs that arrive as "external" embeds. */
const TENOR_MEDIA_PREFIX = "https://media.tenor.com/";

/**
 * Checks whether a bsky post that claims a fediverse origin really has one.
 *
 * Two claims are recognised on the record:
 *  - `bridgyOriginalUrl`: accepted only if the author's PDS resolves to
 *    `atproto.brid.gy`.
 *  - `fediverseId`: accepted only if the remote host answers on
 *    `/api/environment` and its `api/v2/` view of the post reports the same
 *    `bskyCid` as this post.
 *
 * Any network or parsing failure is logged and treated as "not verified".
 *
 * @returns the verified fediverse URL, or `undefined` when nothing could be
 *   verified.
 */
async function verifyFediverseOrigin(
  post: PostView
): Promise<string | undefined> {
  if ("bridgyOriginalUrl" in post.record) {
    try {
      const res = await fetch(
        "https://slingshot.microcosm.blue/xrpc/com.bad-example.identity.resolveMiniDoc" +
          `?identifier=${post.author.did}`
      );
      if (res.ok) {
        const json = (await res.json()) as { pds?: string };
        if (json.pds?.replace(/^https?:\/\//, "") === "atproto.brid.gy") {
          // if user is on bridgy pds, verify it
          logger.info(
            { uri: post.uri, url: post.record.bridgyOriginalUrl },
            "fedi bridged post is bridgy fed"
          );
          return post.record.bridgyOriginalUrl as string;
        }
      }
    } catch (error) {
      // A failed lookup must not abort the import of the post itself.
      logger.debug({
        error,
        message: `Error in obtaining fedi post ${post.record.bridgyOriginalUrl}`,
      });
    }
    return undefined;
  }

  if ("fediverseId" in post.record) {
    // prob wafrn post, but lets verify it
    try {
      const waf = await fetch(
        `https://${
          new URL(post.record.fediverseId as string).hostname
        }/api/environment`
      );
      if (waf.ok) {
        const res = await fetch(
          (post.record.fediverseId as string).replace(
            "fediverse/",
            "api/v2/"
          ),
          {
            headers: {
              Accept: "application/json",
            },
          }
        );
        if (res.ok) {
          const json = (await res.json()) as { posts: { bskyCid: string }[] };
          if (json.posts[0].bskyCid === post.cid) {
            logger.info(
              { uri: post.uri, url: post.record.fediverseId },
              "fedi bridged post is wafrn"
            );
            return post.record.fediverseId as string;
          }
        }
      }
    } catch (error) {
      logger.debug({
        error,
        message: `Error in obtaining fedi post ${post.record.fediverseId}`,
      });
    }
  }
  return undefined;
}

/**
 * Re-points every row that references `existingPostId` to `remotePostId`.
 *
 * very expensive updates! but only happens when user
 * searches existing post that is alr on db
 */
async function moveRelationsToRemotePost(
  existingPostId: string,
  remotePostId: string
): Promise<void> {
  const byPostId = { where: { postId: existingPostId } };
  const toRemote = { postId: remotePostId };

  await EmojiReaction.update(toRemote, byPostId);
  await Notification.update(toRemote, byPostId);
  await PostReport.update(toRemote, byPostId);
  try {
    await PostAncestor.update(
      { postsId: remotePostId },
      { where: { postsId: existingPostId } }
    );
  } catch (error) {
    // Best effort: a failure here must not stop the remaining updates.
    logger.debug({
      message: `Error moving post ancestors to fedi post`,
      error,
    });
  }
  await QuestionPoll.update(toRemote, byPostId);
  await Quotes.update(
    { quoterPostId: remotePostId },
    { where: { quoterPostId: existingPostId } }
  );
  // Only move "quoted" rows when the remote post has none of its own yet.
  const remoteAlreadyQuoted = await Quotes.findOne({
    where: { quotedPostId: remotePostId },
  });
  if (!remoteAlreadyQuoted) {
    await Quotes.update(
      { quotedPostId: remotePostId },
      { where: { quotedPostId: existingPostId } }
    );
  }
  await RemoteUserPostView.update(toRemote, byPostId);
  await SilencedPost.update(toRemote, byPostId);
  await UserBitesPostRelation.update(toRemote, byPostId);
  await UserBookmarkedPosts.update(toRemote, byPostId);
  await UserLikesPostRelations.update(toRemote, byPostId);
  await Post.update(
    { parentId: remotePostId },
    { where: { parentId: existingPostId } }
  );
}

/**
 * Fetches the fediverse post behind `verifiedFedi`, stamps it with this bsky
 * post's cid/uri and, if a bsky-only copy from a non-local user already
 * exists, moves its relations over and deletes that copy.
 *
 * @returns the id of the fediverse post, or `undefined` if it could not be
 *   obtained (the error is logged and the caller falls back to a plain bsky
 *   import).
 */
async function adoptVerifiedFediPost(
  post: PostView,
  verifiedFedi: string
): Promise<string | undefined> {
  try {
    const remotePost = await getPostThreadRecursive(
      await getAdminUser(),
      verifiedFedi
    );
    if (!remotePost) {
      return undefined;
    }
    // NOTE(review): second pass with the resolved post id, kept from the
    // original flow — confirm what the 4th argument does in
    // getPostThreadRecursive.
    await getPostThreadRecursive(
      await getAdminUser(),
      verifiedFedi,
      undefined,
      remotePost.id
    );
    remotePost.bskyCid = post.cid;
    remotePost.bskyUri = post.uri;

    // if there's already a bsky post about
    // this that doesn't have any fedi urls, delete it
    // and prob update the things
    const existingPost = await Post.findOne({
      where: {
        bskyCid: post.cid,
        remotePostId: null,
      },
    });
    if (
      existingPost &&
      !(await getAllLocalUserIds()).includes(existingPost.userId)
    ) {
      await moveRelationsToRemotePost(existingPost.id, remotePost.id);
      await Post.destroy({
        where: {
          bskyCid: post.cid,
          remotePostId: null,
          userId: {
            [Op.notIn]: await getAllLocalUserIds(),
          },
        },
      });
    }
    await remotePost.save();
    return remotePost.id;
  } catch (error) {
    logger.debug({
      message: `Error in obtaining fedi post ${verifiedFedi}`,
      error,
    });
  }
  return undefined;
}

/**
 * Imports a single bsky post into the local database.
 *
 * @param post bsky post view to import.
 * @param parentId local id of the already-imported parent post, if any.
 * @param forceUpdate when true, skip the "already stored" short-circuit.
 * @returns the local post id; `undefined` when `post` is missing or bsky
 *   support is disabled.
 */
async function processSinglePost(
  post: PostView,
  parentId?: string,
  forceUpdate?: boolean
): Promise<string | undefined> {
  if (!post || !completeEnvironment.enableBsky) {
    return undefined;
  }
  if (!forceUpdate) {
    const existingPost = await Post.findOne({
      where: {
        bskyUri: post.uri,
      },
    });
    if (existingPost) {
      return existingPost.id;
    }
  }

  let postCreator: User | undefined;
  try {
    postCreator = await getAtprotoUser(
      post.author.did,
      (await adminUser) as User
    );
  } catch (error) {
    logger.debug({
      message: `Problem obtaining user from post`,
      post,
      parentId,
      forceUpdate,
      error,
    });
  }

  // Posts bridged from the fediverse are stored as the fediverse post.
  const verifiedFedi = await verifyFediverseOrigin(post);
  if (verifiedFedi) {
    const adoptedId = await adoptVerifiedFediPost(post, verifiedFedi);
    if (adoptedId) {
      return adoptedId;
    }
  }

  if (!postCreator) {
    // Author could not be resolved: store a deleted placeholder owned by the
    // configured "deleted user".
    const usr = await User.findOne({
      where: { url: completeEnvironment.deletedUser },
    });
    const invalidPost = await Post.create({
      userId: usr?.id,
      content: `Failed to get atproto post`,
      parentId: parentId,
      isDeleted: true,
      createdAt: new Date(0),
      updatedAt: new Date(0),
    });
    return invalidPost.id;
  }

  const medias = getPostMedias(post);
  let tags: string[] = [];
  let mentions: string[] = [];
  const record = post.record as any;
  let postText = record.text;
  let federatedWoot = false;
  if (record.fullText || record.bridgyOriginalText) {
    federatedWoot = true;
    tags = record.fullTags?.split("\n").filter((x: string) => !!x) ?? []; // also detect full tags
    postText = record.fullText ?? record.bridgyOriginalText;
  }
  if (record.facets && record.facets.length > 0 && !federatedWoot) {
    // lets get mentions
    const mentionedDids = record.facets
      .flatMap((elem: any) => elem.features)
      .map((elem: any) => elem.did)
      .filter((elem: any) => elem);
    if (mentionedDids && mentionedDids.length > 0) {
      const mentionedUsers = await User.findAll({
        where: {
          bskyDid: {
            [Op.in]: mentionedDids,
          },
        },
      });
      mentions = mentionedUsers.map((elem) => elem.id);
    }

    const rt = new RichText({
      text: postText,
      facets: record.facets,
    });
    // NOTE(review): values are interpolated into HTML unescaped; presumably
    // the content is sanitized before rendering — confirm.
    let text = "";
    for (const segment of rt.segments()) {
      if (segment.isLink()) {
        const href = segment.link?.uri;
        text += `<a href="${href}" target="_blank">${href}</a>`;
      } else if (segment.isMention()) {
        const href = `${completeEnvironment.frontendUrl}/blog/${segment.mention?.did}`;
        text += `<a href="${href}" target="_blank">${segment.text}</a>`;
      } else if (segment.isTag()) {
        const href = `${
          completeEnvironment.frontendUrl
        }/dashboard/search/${segment.text.substring(1)}`;
        text += `<a href="${href}" target="_blank">${segment.text}</a>`;
        tags.push(segment.text.substring(1));
      } else {
        text += segment.text;
      }
    }
    postText = text;
  }
  if (!federatedWoot) postText = postText.replaceAll("\n", "<br>");

  const labels = getPostLabels(post);
  let cw =
    labels.length > 0
      ? `Post is labeled as: ${labels.join(", ")}`
      : undefined;
  if (!cw && postCreator.NSFW) {
    cw =
      "This user has been marked as NSFW and the post has been labeled automatically as NSFW";
  }
  const newData = {
    userId: postCreator.id,
    bskyCid: post.cid,
    bskyUri: post.uri,
    content: postText,
    createdAt: new Date((post.record as any).createdAt),
    privacy: Privacy.Public,
    parentId: parentId,
    content_warning: cw,
    ...getPostInteractionLevels(post, parentId),
  };
  if (!parentId) {
    delete newData.parentId;
  }

  if ((await getAllLocalUserIds()).includes(newData.userId) && !forceUpdate) {
    // dirty as hell but this should stop the duplication
    await wait(1500);
  }
  const [postToProcess, created] = await Post.findOrCreate({
    where: { bskyUri: post.uri },
    defaults: newData,
  });
  // do not update existing posts. But what if local user creates a post through bsky? then we force updte i guess
  if (
    !(await getAllLocalUserIds()).includes(postToProcess.userId) ||
    created
  ) {
    if (!created) {
      postToProcess.set(newData);
      await postToProcess.save();
    }
    if (medias) {
      await Media.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await Media.bulkCreate(
        medias.map((media: any) => {
          return { ...media, postId: postToProcess.id };
        })
      );
    }
    if (parentId) {
      // Users of the nearest ancestors are notified like explicit mentions.
      const ancestors = await postToProcess.getAncestors({
        attributes: ["userId"],
        where: {
          hierarchyLevel: {
            [Op.gt]: postToProcess.hierarchyLevel - 5,
          },
        },
      });
      mentions = mentions.concat(ancestors.map((elem) => elem.userId));
    }
    mentions = [...new Set(mentions)];
    if (mentions.length > 0) {
      await Notification.destroy({
        where: {
          notificationType: "MENTION",
          postId: postToProcess.id,
        },
      });
      await PostMentionsUserRelation.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await bulkCreateNotifications(
        mentions.map((mnt) => ({
          notificationType: "MENTION",
          postId: postToProcess.id,
          notifiedUserId: mnt,
          userId: postToProcess.userId,
          createdAt: new Date(postToProcess.createdAt),
        })),
        {
          ignoreDuplicates: true,
          postContent: postText,
          userUrl: postCreator.url,
        }
      );
      await PostMentionsUserRelation.bulkCreate(
        mentions.map((mnt) => {
          return {
            userId: mnt,
            postId: postToProcess.id,
          };
        }),
        { ignoreDuplicates: true }
      );
    }
    if (tags.length > 0) {
      await PostTag.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await PostTag.bulkCreate(
        tags.map((tag) => {
          return {
            postId: postToProcess.id,
            tagName: tag,
          };
        })
      );
    }
    const quotedPostUri = getQuotedPostUri(post);
    if (quotedPostUri) {
      const quotedPostId = await getAtProtoThread(quotedPostUri);
      if (quotedPostId) {
        const quotedPost = await Post.findByPk(quotedPostId);
        if (quotedPost) {
          await createNotification(
            {
              notificationType: "QUOTE",
              notifiedUserId: quotedPost.userId,
              userId: postToProcess.userId,
              postId: postToProcess.id,
            },
            {
              postContent: postToProcess.content,
              userUrl: postCreator?.url,
            }
          );
          await Quotes.findOrCreate({
            where: {
              quoterPostId: postToProcess.id,
              quotedPostId: quotedPostId,
            },
          });
        }
      }
    }
  }

  return postToProcess.id;
}

/**
 * Builds the media row for an "external" embed (link card or GIF).
 * Tenor-hosted links are typed as GIFs, everything else as an HTML link.
 */
function externalEmbedToMedia(external: any): MediaAttributes {
  return {
    mediaType: external.uri.startsWith(TENOR_MEDIA_PREFIX)
      ? "image/gif"
      : "text/html",
    description: external.title,
    url: external.uri,
    mediaOrder: 0,
    external: true,
  };
}

/** Returns a blob ref's `$link` when present, otherwise its `toString()`. */
function blobRefToCid(ref: any): string {
  return ref["$link"] ? ref["$link"] : ref.toString();
}

/**
 * Extracts the media attachments (link cards, images, video) from a post's
 * embed. Every returned item is flagged `NSFW` when the post carries at
 * least one label.
 */
function getPostMedias(post: PostView) {
  let res: MediaAttributes[] = [];
  const labels = getPostLabels(post);
  const embed = (post.record as any).embed;
  const did = post.author.did;
  if (embed) {
    if (embed.external) {
      res = res.concat([externalEmbedToMedia(embed.external)]);
    }
    if (embed.images || embed.media) {
      // case with quote and gif / link preview
      if (embed.media?.external) {
        res = res.concat([externalEmbedToMedia(embed.media.external)]);
      } else {
        const thingToProcess = embed.images
          ? embed.images
          : embed.media.images;
        if (thingToProcess) {
          const toConcat = thingToProcess.map((media: any, index: any) => {
            const cid = blobRefToCid(media.image.ref);
            return {
              mediaType: media.image.mimeType,
              description: media.alt,
              height: media.aspectRatio?.height,
              width: media.aspectRatio?.width,
              url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(
                did
              )}`,
              mediaOrder: index,
              external: true,
            };
          });
          res = res.concat(toConcat);
        } else {
          logger.debug({
            message: `Bsky problem getting medias on post ${post.uri}`,
          });
        }
      }
    }
    if (embed.video) {
      const cid = blobRefToCid(embed.video.ref);
      res = res.concat([
        {
          mediaType: embed.video.mimeType,
          description: "",
          height: embed.aspectRatio?.height,
          width: embed.aspectRatio?.width,
          url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(did)}`,
          mediaOrder: 0,
          external: true,
        },
      ]);
    }
  }
  return res.map((m) => {
    return {
      ...m,
      NSFW: labels.length > 0,
    };
  });
}

// TODO improve this so we get better nsfw messages lol
/**
 * Collects the distinct label values on a post. A label with `neg` set
 * removes a previously seen value and never adds one.
 */
function getPostLabels(post: PostView): string[] {
  const labels = new Set<string>();
  for (const label of post.labels ?? []) {
    if (label.neg) {
      labels.delete(label.val);
    } else {
      labels.add(label.val);
    }
  }
  return Array.from(labels);
}

/**
 * Maps bsky reply/quote restrictions onto local interaction controls.
 *
 * Replies (posts with a `parentId`) use `SameAsOp` for both reply and quote.
 * For root posts the threadgate allow-list decides `replyControl`; an empty
 * list means nobody can reply. Likes and reblogs are always open.
 */
function getPostInteractionLevels(
  post: PostView,
  parentId: string | undefined
): {
  replyControl: InteractionControlType;
  likeControl: InteractionControlType;
  reblogControl: InteractionControlType;
  quoteControl: InteractionControlType;
} {
  let canQuote = InteractionControl.Anyone;
  let canReply: InteractionControlType = InteractionControl.Anyone;
  if (post.viewer && post.viewer.embeddingDisabled) {
    canQuote = InteractionControl.NoOne;
  }
  if (parentId) {
    canReply = InteractionControl.SameAsOp;
    canQuote = InteractionControl.SameAsOp;
  } else if (
    post.threadgate &&
    post.threadgate.record &&
    (post.threadgate.record as any).allow
  ) {
    const allowList = (post.threadgate.record as any).allow;
    if (allowList.length === 0) {
      canReply = InteractionControl.NoOne;
    } else {
      const mentiontypes: string[] = allowList
        .map((elem: any) => elem["$type"])
        .map((elem: string) => elem.split("app.bsky.feed.threadgate#")[1]);
      const allowsMentioned = mentiontypes.includes("mentionRule");
      const allowsFollowing = mentiontypes.includes("followingRule");
      const allowsFollowers = mentiontypes.includes("followerRule");
      if (allowsMentioned) {
        if (allowsFollowing) {
          canReply = allowsFollowers
            ? InteractionControl.FollowersFollowersAndMentioned
            : InteractionControl.FollowingAndMentioned;
        } else {
          canReply = allowsFollowers
            ? InteractionControl.FollowersAndMentioned
            : InteractionControl.MentionedUsersOnly;
        }
      } else {
        if (allowsFollowing) {
          canReply = allowsFollowers
            ? InteractionControl.FollowersAndFollowing
            : InteractionControl.Following;
        } else {
          canReply = allowsFollowers
            ? InteractionControl.Followers
            : InteractionControl.NoOne;
        }
      }
    }
  }
  // An unrestricted quote setting inherits any reply restriction.
  if (
    canQuote === InteractionControl.Anyone &&
    canReply !== InteractionControl.Anyone
  ) {
    canQuote = canReply;
  }
  return {
    quoteControl: canQuote,
    replyControl: canReply,
    likeControl: InteractionControl.Anyone,
    reblogControl: InteractionControl.Anyone,
  };
}

/**
 * Imports the thread around a bsky post: parents first (oldest to newest),
 * then the post itself; replies are started in the background.
 *
 * @param uri at-uri of the post.
 * @param forceUpdate skip the "already stored" lookup and force re-import of
 *   the requested post.
 * @param ignoreDescendents request the thread with depth 0 instead of 50.
 * @returns the local id of the requested post, or `undefined` when the
 *   thread could not be fetched.
 */
async function getAtProtoThread(
  uri: string,
  forceUpdate?: boolean,
  ignoreDescendents?: boolean
): Promise<string | undefined> {
  const postExisting = forceUpdate
    ? undefined
    : await Post.findOne({
        where: {
          bskyUri: uri,
        },
      });
  if (postExisting) {
    return postExisting.id;
  }

  // TODO optimize this a bit if post is not in reply to anything that we dont have
  const preThread = await getPostThreadSafe({
    uri: uri,
    depth: ignoreDescendents ? 0 : 50,
    parentHeight: 1000,
  });
  if (!preThread) {
    return undefined;
  }

  const thread: ThreadViewPost = preThread.data.thread as ThreadViewPost;
  //const tmpDids = getDidsFromThread(thread)
  //forcePopulateUsers(tmpDids, (await adminUser) as Model)
  let parentId: string | undefined = undefined;
  if (thread.parent) {
    parentId = (await processParents(
      thread.parent as ThreadViewPost
    )) as string;
  }
  const procesedPost = await processSinglePost(
    thread.post,
    parentId,
    forceUpdate
  );
  if (thread.replies && procesedPost) {
    for (const repliesThread of thread.replies) {
      // Deliberately not awaited: replies are imported in the background and
      // processReplies logs its own failures.
      void processReplies(repliesThread as ThreadViewPost, procesedPost);
    }
  }
  return procesedPost as string;
}

/**
 * Imports a reply and, recursively, its own replies under `parentId`.
 * Errors are logged and never propagate to the caller.
 */
async function processReplies(thread: ThreadViewPost, parentId: string) {
  if (thread && thread.post) {
    try {
      const post = await processSinglePost(thread.post, parentId);
      if (thread.replies && post) {
        for (const repliesThread of thread.replies) {
          // Nested replies are likewise started without awaiting them.
          void processReplies(repliesThread as ThreadViewPost, post);
        }
      }
    } catch (error) {
      logger.debug({
        message: `Error processing bluesky replies`,
        error: error,
        thread: thread,
        parentId,
      });
    }
  }
}

/**
 * Imports a chain of ancestors root-first so that each post is created with
 * its parent's local id.
 *
 * @returns the local id of `thread.post`.
 */
async function processParents(
  thread: ThreadViewPost
): Promise<string | undefined> {
  let parentId: string | undefined = undefined;
  if (thread.parent) {
    parentId = await processParents(thread.parent as ThreadViewPost);
  }
  return await processSinglePost(thread.post, parentId);
}

/**
 * Returns the at-uri of the post quoted by `post`, for both plain quotes and
 * quotes that also carry media; `undefined` when the post quotes nothing.
 */
function getQuotedPostUri(post: PostView): string | undefined {
  const embed = (post.record as any).embed;
  if (!embed) {
    return undefined;
  }
  if (embed["$type"] === "app.bsky.embed.record") {
    return embed.record.uri;
  }
  // case of post with pictures and quote
  if (embed["$type"] === "app.bsky.embed.recordWithMedia") {
    return embed.record.record.uri;
  }
  return undefined;
}

/**
 * Fetches a post thread with the admin session. Failures are logged and
 * reported as `undefined` instead of throwing.
 */
export async function getPostThreadSafe(options: any) {
  try {
    const agent = await getAdminAtprotoSession();
    return await agent.getPostThread(options);
  } catch (error) {
    logger.debug({
      message: `Error trying to get atproto thread`,
      options: options,
      error: error,
    });
  }
}

export { getAtProtoThread, getQuotedPostUri, processSinglePost };