unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 849 lines 24 kB view raw
// returns the post id
import { getAtProtoSession } from "./getAtProtoSession.js";
import { QueryParams } from "@atproto/sync/dist/firehose/lexicons.js";
import {
  EmojiReaction,
  Media,
  Notification,
  Post,
  PostAncestor,
  PostMentionsUserRelation,
  PostReport,
  PostTag,
  QuestionPoll,
  Quotes,
  RemoteUserPostView,
  SilencedPost,
  User,
  UserBitesPostRelation,
  UserBookmarkedPosts,
  UserLikesPostRelations,
} from "../../models/index.js";
import { Model, Op } from "sequelize";
import {
  PostView,
  ThreadViewPost,
} from "@atproto/api/dist/client/types/app/bsky/feed/defs.js";
import { getAtprotoUser } from "./getAtprotoUser.js";
import { CreateOrUpdateOp } from "@skyware/firehose";
import { logger } from "../../utils/logger.js";
import { RichText } from "@atproto/api";
import showdown from "showdown";
import {
  bulkCreateNotifications,
  createNotification,
} from "../../utils/pushNotifications.js";
import { getAllLocalUserIds } from "../../utils/cacheGetters/getAllLocalUserIds.js";
import {
  InteractionControl,
  InteractionControlType,
  Privacy,
} from "../../models/post.js";
import { wait } from "../../utils/wait.js";
import { UpdatedAt } from "sequelize-typescript";
import { completeEnvironment } from "../../utils/backendOptions.js";
import { MediaAttributes } from "../../models/media.js";
import { getAdminAtprotoSession } from "../../utils/atproto/getAdminAtprotoSession.js";
import { getPostThreadRecursive } from "../../utils/activitypub/getPostThreadRecursive.js";
import { Queue, QueueEvents } from "bullmq";
import { getAdminUser } from "../../utils/getAdminAndDeletedUser.js";

const markdownConverter = new showdown.Converter({
  simplifiedAutoLink: true,
  literalMidWordUnderscores: true,
  strikethrough: true,
  simpleLineBreaks: true,
  openLinksInNewWindow: true,
  emoji: true,
});

const adminUser = getAdminUser();

/** URL prefix of external embeds that are stored as "image/gif" instead of "text/html". */
const TENOR_MEDIA_PREFIX = "https://media.tenor.com/";

/**
 * Checks whether a Bluesky post claims to be a bridged fediverse post and, if
 * so, verifies that claim against the remote side.
 *
 * - `bridgyOriginalUrl`: accepted only when the author's PDS (looked up via the
 *   slingshot `resolveMiniDoc` endpoint) is `atproto.brid.gy`.
 * - `fediverseId`: accepted only when the host answers `/api/environment` and
 *   its `api/v2/` post endpoint reports the same `bskyCid` as this post.
 *
 * @param post - the Bluesky post view to inspect
 * @returns the verified fediverse URL, or `undefined` when there is no claim or
 *   the claim could not be verified (network/parse errors are logged, not thrown)
 */
async function resolveVerifiedFediUrl(
  post: PostView
): Promise<string | undefined> {
  const record = post.record as Record<string, unknown>;

  if ("bridgyOriginalUrl" in record) {
    try {
      const res = await fetch(
        "https://slingshot.microcosm.blue/xrpc/com.bad-example.identity.resolveMiniDoc" +
          `?identifier=${post.author.did}`
      );
      if (res.ok) {
        const json = (await res.json()) as { pds?: string };
        if (json.pds?.replace(/^https?:\/\//, "") === "atproto.brid.gy") {
          // if user is on bridgy pds, verify it
          logger.info(
            { uri: post.uri, url: record.bridgyOriginalUrl },
            "fedi bridged post is bridgy fed"
          );
          return record.bridgyOriginalUrl as string;
        }
      }
    } catch (error) {
      // A failed lookup must not abort the import; fall back to a plain bsky post.
      logger.debug({
        error,
        message: `Error verifying bridgy fed post ${post.uri}`,
      });
    }
    return undefined;
  }

  if ("fediverseId" in record) {
    // prob wafrn post, but lets verify it
    const fediverseId = record.fediverseId as string;
    try {
      const waf = await fetch(
        `https://${new URL(fediverseId).hostname}/api/environment`
      );
      if (waf.ok) {
        const res = await fetch(
          fediverseId.replace("fediverse/", "api/v2/"),
          {
            headers: {
              Accept: "application/json",
            },
          }
        );
        if (res.ok) {
          const json = (await res.json()) as {
            posts?: { bskyCid?: string }[];
          };
          if (json.posts?.[0]?.bskyCid === post.cid) {
            logger.info(
              { uri: post.uri, url: fediverseId },
              "fedi bridged post is wafrn"
            );
            return fediverseId;
          }
        }
      }
    } catch (error) {
      logger.debug({
        error,
        message: `Error in obtaining fedi post ${fediverseId}`,
      });
    }
  }

  return undefined;
}

/**
 * Re-points every row that references `fromPostId` so it references
 * `toPostId` instead (reactions, notifications, reports, ancestors, polls,
 * quotes, views, silences, bites, bookmarks, likes and child posts).
 *
 * The updates run sequentially, one table at a time.
 *
 * @param fromPostId - id of the post that is about to be removed
 * @param toPostId - id of the post that takes over the references
 */
async function repointPostReferences(
  fromPostId: string,
  toPostId: string
): Promise<void> {
  await EmojiReaction.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await Notification.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await PostReport.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  try {
    await PostAncestor.update(
      { postsId: toPostId },
      { where: { postsId: fromPostId } }
    );
  } catch (error) {
    // Best effort: a failure here must not stop the rest of the merge.
    logger.debug({
      message: `Error updating post ancestors from ${fromPostId} to ${toPostId}`,
      error,
    });
  }
  await QuestionPoll.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await Quotes.update(
    { quoterPostId: toPostId },
    { where: { quoterPostId: fromPostId } }
  );
  // Only move "quoted" rows when the target has none yet.
  const alreadyQuoted = await Quotes.findOne({
    where: { quotedPostId: toPostId },
  });
  if (!alreadyQuoted) {
    await Quotes.update(
      { quotedPostId: toPostId },
      { where: { quotedPostId: fromPostId } }
    );
  }
  await RemoteUserPostView.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await SilencedPost.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await UserBitesPostRelation.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await UserBookmarkedPosts.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await UserLikesPostRelations.update(
    { postId: toPostId },
    { where: { postId: fromPostId } }
  );
  await Post.update(
    { parentId: toPostId },
    { where: { parentId: fromPostId } }
  );
}

/**
 * Imports a single Bluesky post view into the local database.
 *
 * Flow, in order:
 * 1. Unless `forceUpdate` is set, an existing post with the same `bskyUri` is
 *    returned as-is.
 * 2. If the record claims to be a bridged fediverse post and that claim is
 *    verified, the fediverse post is fetched instead, tagged with the bsky
 *    cid/uri, and any previously stored bsky-only copy from a non-local user is
 *    merged into it and deleted.
 * 3. If the author could not be resolved, a deleted placeholder post is stored.
 * 4. Otherwise the post is created (or updated when it does not belong to a
 *    local user), together with its medias, mentions, tags and quote relation.
 *
 * @param post - the Bluesky post view to import
 * @param parentId - local id of the parent post, when this post is a reply
 * @param forceUpdate - skip the "already stored" shortcut and reprocess the post
 * @returns the local post id, or `undefined` when `post` is missing or Bluesky
 *   support is disabled in the environment
 */
async function processSinglePost(
  post: PostView,
  parentId?: string,
  forceUpdate?: boolean
): Promise<string | undefined> {
  if (!post || !completeEnvironment.enableBsky) {
    return undefined;
  }
  if (!forceUpdate) {
    const existingPost = await Post.findOne({
      where: {
        bskyUri: post.uri,
      },
    });
    if (existingPost) {
      return existingPost.id;
    }
  }

  let postCreator: User | undefined;
  try {
    postCreator = await getAtprotoUser(
      post.author.did,
      (await adminUser) as User
    );
  } catch (error) {
    logger.debug({
      message: `Problem obtaining user from post`,
      post,
      parentId,
      forceUpdate,
      error,
    });
  }

  const verifiedFedi = await resolveVerifiedFediUrl(post);
  if (verifiedFedi) {
    try {
      const remotePost = await getPostThreadRecursive(
        await getAdminUser(),
        verifiedFedi
      );
      if (remotePost) {
        await getPostThreadRecursive(
          await getAdminUser(),
          verifiedFedi,
          undefined,
          remotePost.id
        );
        remotePost.bskyCid = post.cid;
        remotePost.bskyUri = post.uri;
        // if there's already a bsky post about
        // this that doesn't have any fedi urls, delete it
        // and prob update the things
        const existingPost = await Post.findOne({
          where: {
            bskyCid: post.cid,
            remotePostId: null,
          },
        });
        if (
          existingPost &&
          !(await getAllLocalUserIds()).includes(existingPost.userId)
        ) {
          // very expensive updates! but only happens when user
          // searches existing post that is alr on db
          await repointPostReferences(existingPost.id, remotePost.id);

          await Post.destroy({
            where: {
              bskyCid: post.cid,
              remotePostId: null,
              userId: {
                [Op.notIn]: await getAllLocalUserIds(),
              },
            },
          });
        }
        await remotePost.save();
        return remotePost.id;
      }
    } catch (error) {
      logger.debug({
        message: `Error in obtaining fedi post ${verifiedFedi}`,
        error,
      });
    }
  }

  if (!postCreator) {
    // Author unknown: store a deleted placeholder owned by the "deleted user".
    const usr = await User.findOne({
      where: { url: completeEnvironment.deletedUser },
    });

    const invalidPost = await Post.create({
      userId: usr?.id,
      content: `Failed to get atproto post`,
      parentId: parentId,
      isDeleted: true,
      createdAt: new Date(0),
      updatedAt: new Date(0),
    });
    return invalidPost.id;
  }

  // Stable, narrowed alias so the type survives inside callbacks.
  const creator: User = postCreator;
  const medias = getPostMedias(post);
  let tags: string[] = [];
  let mentions: string[] = [];
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped lexicon record payload
  const record = post.record as any;
  let postText = record.text ?? "";
  let federatedWoot = false;
  if (record.fullText || record.bridgyOriginalText) {
    federatedWoot = true;
    tags = record.fullTags?.split("\n").filter((x: string) => !!x) ?? []; // also detect full tags
    postText = record.fullText ?? record.bridgyOriginalText;
  }
  if (record.facets && record.facets.length > 0 && !federatedWoot) {
    // lets get mentions
    const mentionedDids = record.facets
      .flatMap((elem: any) => elem.features)
      .map((elem: any) => elem.did)
      .filter((elem: any) => elem);
    if (mentionedDids && mentionedDids.length > 0) {
      const mentionedUsers = await User.findAll({
        where: {
          bskyDid: {
            [Op.in]: mentionedDids,
          },
        },
      });
      mentions = mentionedUsers.map((elem) => elem.id);
    }

    const rt = new RichText({
      text: postText,
      facets: record.facets,
    });
    let text = "";

    // NOTE(review): segment text and hrefs are inserted into HTML unescaped;
    // confirm that content is sanitized before it is rendered.
    for (const segment of rt.segments()) {
      if (segment.isLink()) {
        const href = segment.link?.uri;
        text += `<a href="${href}" target="_blank">${href}</a>`;
      } else if (segment.isMention()) {
        const href = `${completeEnvironment.frontendUrl}/blog/${segment.mention?.did}`;
        text += `<a href="${href}" target="_blank">${segment.text}</a>`;
      } else if (segment.isTag()) {
        const href = `${
          completeEnvironment.frontendUrl
        }/dashboard/search/${segment.text.substring(1)}`;
        text += `<a href="${href}" target="_blank">${segment.text}</a>`;
        tags.push(segment.text.substring(1));
      } else {
        text += segment.text;
      }
    }
    postText = text;
  }
  if (!federatedWoot) postText = postText.replaceAll("\n", "<br>");

  const labels = getPostLabels(post);
  let cw =
    labels.length > 0
      ? `Post is labeled as: ${labels.join(", ")}`
      : undefined;
  if (!cw && creator.NSFW) {
    cw =
      "This user has been marked as NSFW and the post has been labeled automatically as NSFW";
  }
  const newData = {
    userId: creator.id,
    bskyCid: post.cid,
    bskyUri: post.uri,
    content: postText,
    createdAt: new Date(record.createdAt),
    privacy: Privacy.Public,
    parentId: parentId,
    content_warning: cw,
    ...getPostInteractionLevels(post, parentId),
  };
  if (!parentId) {
    delete newData.parentId;
  }

  if ((await getAllLocalUserIds()).includes(newData.userId) && !forceUpdate) {
    // dirty as hell but this should stop the duplication
    await wait(1500);
  }
  const [postToProcess, created] = await Post.findOrCreate({
    where: { bskyUri: post.uri },
    defaults: newData,
  });
  // do not update existing posts. But what if local user creates a post through bsky? then we force update i guess
  if (
    !(await getAllLocalUserIds()).includes(postToProcess.userId) ||
    created
  ) {
    if (!created) {
      postToProcess.set(newData);
      await postToProcess.save();
    }
    if (medias) {
      await Media.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await Media.bulkCreate(
        medias.map((media: any) => {
          return { ...media, postId: postToProcess.id };
        })
      );
    }
    if (parentId) {
      // Authors of the nearest ancestors are notified as if they were mentioned.
      const ancestors = await postToProcess.getAncestors({
        attributes: ["userId"],
        where: {
          hierarchyLevel: {
            [Op.gt]: postToProcess.hierarchyLevel - 5,
          },
        },
      });
      mentions = mentions.concat(ancestors.map((elem) => elem.userId));
    }
    mentions = [...new Set(mentions)];
    if (mentions.length > 0) {
      await Notification.destroy({
        where: {
          notificationType: "MENTION",
          postId: postToProcess.id,
        },
      });
      await PostMentionsUserRelation.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await bulkCreateNotifications(
        mentions.map((mnt) => ({
          notificationType: "MENTION",
          postId: postToProcess.id,
          notifiedUserId: mnt,
          userId: postToProcess.userId,
          createdAt: new Date(postToProcess.createdAt),
        })),
        {
          ignoreDuplicates: true,
          postContent: postText,
          userUrl: creator.url,
        }
      );
      await PostMentionsUserRelation.bulkCreate(
        mentions.map((mnt) => {
          return {
            userId: mnt,
            postId: postToProcess.id,
          };
        }),
        { ignoreDuplicates: true }
      );
    }
    if (tags.length > 0) {
      await PostTag.destroy({
        where: {
          postId: postToProcess.id,
        },
      });
      await PostTag.bulkCreate(
        tags.map((tag) => {
          return {
            postId: postToProcess.id,
            tagName: tag,
          };
        })
      );
    }
    const quotedPostUri = getQuotedPostUri(post);
    if (quotedPostUri) {
      const quotedPostId = await getAtProtoThread(quotedPostUri);
      if (quotedPostId) {
        const quotedPost = await Post.findByPk(quotedPostId);
        if (quotedPost) {
          await createNotification(
            {
              notificationType: "QUOTE",
              notifiedUserId: quotedPost.userId,
              userId: postToProcess.userId,
              postId: postToProcess.id,
            },
            {
              postContent: postToProcess.content,
              userUrl: creator.url,
            }
          );
          await Quotes.findOrCreate({
            where: {
              quoterPostId: postToProcess.id,
              quotedPostId: quotedPostId,
            },
          });
        }
      }
    }
  }

  return postToProcess.id;
}

/**
 * Converts an external (link card / GIF) embed into a media row.
 * Tenor-hosted URLs are stored as "image/gif", everything else as "text/html".
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped lexicon embed payload
function externalEmbedToMedia(external: any): MediaAttributes {
  return {
    mediaType: external.uri.startsWith(TENOR_MEDIA_PREFIX)
      ? "image/gif"
      : "text/html",
    description: external.title,
    url: external.uri,
    mediaOrder: 0,
    external: true,
  };
}

/**
 * Extracts the media attachments (external link/GIF, images, video) from the
 * embed of a post record. Handles both plain embeds and the `media` part of a
 * record-with-media embed.
 *
 * Image and video URLs are stored as a `?cid=…&did=…` query string built from
 * the blob reference and the author's DID.
 *
 * @param post - the Bluesky post view
 * @returns one entry per attachment; every entry is flagged `NSFW` when the
 *   post carries at least one label
 */
function getPostMedias(post: PostView) {
  let res: MediaAttributes[] = [];
  const labels = getPostLabels(post);
  const embed = (post.record as any).embed;
  if (embed) {
    const nestedMedia = embed.media;
    if (embed.external) {
      res = res.concat([externalEmbedToMedia(embed.external)]);
    }
    if (embed.images || nestedMedia) {
      // case with quote and gif / link preview
      if (nestedMedia?.external) {
        res = res.concat([externalEmbedToMedia(nestedMedia.external)]);
      } else {
        const thingToProcess = embed.images ?? nestedMedia?.images;
        if (thingToProcess) {
          const did = post.author.did;
          const toConcat = thingToProcess.map((media: any, index: any) => {
            const cid = media.image.ref["$link"]
              ? media.image.ref["$link"]
              : media.image.ref.toString();
            return {
              mediaType: media.image.mimeType,
              description: media.alt,
              height: media.aspectRatio?.height,
              width: media.aspectRatio?.width,
              url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(
                did
              )}`,
              mediaOrder: index,
              external: true,
            };
          });
          res = res.concat(toConcat);
        } else if (!nestedMedia?.video) {
          // A nested video is handled below, so it is not a problem case.
          logger.debug({
            message: `Bsky problem getting medias on post ${post.uri}`,
          });
        }
      }
    }
    // Video can sit directly on the embed or inside the media of a quote post.
    const videoEmbed = embed.video
      ? embed
      : nestedMedia?.video
      ? nestedMedia
      : undefined;
    if (videoEmbed) {
      const video = videoEmbed.video;
      const cid = video.ref["$link"]
        ? video.ref["$link"]
        : video.ref.toString();
      const did = post.author.did;
      res = res.concat([
        {
          mediaType: video.mimeType,
          description: "",
          height: videoEmbed.aspectRatio?.height,
          width: videoEmbed.aspectRatio?.width,
          url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(did)}`,
          mediaOrder: 0,
          external: true,
        },
      ]);
    }
  }
  return res.map((m) => {
    return {
      ...m,
      NSFW: labels.length > 0,
    };
  });
}

// TODO improve this so we get better nsfw messages lol
/**
 * Collects the distinct label values that apply to a post.
 *
 * Labels are processed in list order: a label with `neg` set removes its value
 * from the result (and is a no-op when the value was never added), any other
 * label adds its value.
 *
 * @param post - the Bluesky post view
 * @returns the active label values, in first-seen order
 */
function getPostLabels(post: PostView) {
  const labels = new Set<string>();
  for (const label of post.labels ?? []) {
    if (label.neg) {
      labels.delete(label.val);
    } else {
      labels.add(label.val);
    }
  }
  return Array.from(labels);
}

/**
 * Maps Bluesky reply/quote restrictions onto the local interaction controls.
 *
 * - Replies (`parentId` set) inherit both reply and quote control from the
 *   original post.
 * - Root posts derive reply control from the threadgate `allow` list: an empty
 *   list means nobody, otherwise the combination of mention / following /
 *   follower rules selects the control. Any other rule combination resolves to
 *   nobody.
 * - Quote control is "nobody" when `viewer.embeddingDisabled` is set, and falls
 *   back to the reply control when quoting would otherwise be open but replying
 *   is restricted.
 * - Likes and reblogs are always open to anyone.
 *
 * @param post - the Bluesky post view
 * @param parentId - local id of the parent post, when the post is a reply
 */
function getPostInteractionLevels(
  post: PostView,
  parentId: string | undefined
): {
  replyControl: InteractionControlType;
  likeControl: InteractionControlType;
  reblogControl: InteractionControlType;
  quoteControl: InteractionControlType;
} {
  let canQuote = InteractionControl.Anyone;
  let canReply: InteractionControlType = InteractionControl.Anyone;
  if (post.viewer && post.viewer.embeddingDisabled) {
    canQuote = InteractionControl.NoOne;
  }
  const allowList = (post.threadgate?.record as any)?.allow;
  if (parentId) {
    canReply = InteractionControl.SameAsOp;
    canQuote = InteractionControl.SameAsOp;
  } else if (allowList) {
    if (allowList.length === 0) {
      canReply = InteractionControl.NoOne;
    } else {
      // Rule names without the lexicon prefix; rules lacking a $type are ignored.
      const ruleTypes = new Set<string>(
        allowList
          .map((elem: any) => elem?.["$type"])
          .filter((elem: unknown) => typeof elem === "string")
          .map((elem: string) => elem.split("app.bsky.feed.threadgate#")[1])
      );
      const mentioned = ruleTypes.has("mentionRule");
      const following = ruleTypes.has("followingRule");
      const followers = ruleTypes.has("followerRule");
      if (mentioned) {
        if (following) {
          canReply = followers
            ? InteractionControl.FollowersFollowersAndMentioned
            : InteractionControl.FollowingAndMentioned;
        } else {
          canReply = followers
            ? InteractionControl.FollowersAndMentioned
            : InteractionControl.MentionedUsersOnly;
        }
      } else if (following) {
        canReply = followers
          ? InteractionControl.FollowersAndFollowing
          : InteractionControl.Following;
      } else {
        canReply = followers
          ? InteractionControl.Followers
          : InteractionControl.NoOne;
      }
    }
  }

  if (
    canQuote === InteractionControl.Anyone &&
    canReply !== InteractionControl.Anyone
  ) {
    canQuote = canReply;
  }

  return {
    quoteControl: canQuote,
    replyControl: canReply,
    likeControl: InteractionControl.Anyone,
    reblogControl: InteractionControl.Anyone,
  };
}

/**
 * Fetches a Bluesky thread by post URI and imports it.
 *
 * Parents are imported first (oldest ancestor first) so the requested post can
 * be linked to its parent. Replies are then imported in the background: their
 * processing is started but not awaited, so this function returns as soon as
 * the requested post itself is stored.
 *
 * @param uri - at:// URI of the post
 * @param forceUpdate - reprocess the post even if it is already stored
 * @param ignoreDescendents - request the thread with depth 0 (no replies)
 * @returns the local id of the requested post, or `undefined` when the thread
 *   could not be fetched or the post could not be processed
 */
async function getAtProtoThread(
  uri: string,
  forceUpdate?: boolean,
  ignoreDescendents?: boolean
): Promise<string | undefined> {
  const postExisting = forceUpdate
    ? undefined
    : await Post.findOne({
        where: {
          bskyUri: uri,
        },
      });
  if (postExisting) {
    return postExisting.id;
  }

  // TODO optimize this a bit if post is not in reply to anything that we dont have
  const preThread = await getPostThreadSafe({
    uri: uri,
    depth: ignoreDescendents ? 0 : 50,
    parentHeight: 1000,
  });
  if (!preThread) {
    return undefined;
  }

  const thread: ThreadViewPost = preThread.data.thread as ThreadViewPost;
  //const tmpDids = getDidsFromThread(thread)
  //forcePopulateUsers(tmpDids, (await adminUser) as Model<any, any>)
  let parentId: string | undefined = undefined;
  if (thread.parent) {
    parentId = await processParents(thread.parent as ThreadViewPost);
  }
  const procesedPost = await processSinglePost(
    thread.post,
    parentId,
    forceUpdate
  );
  if (thread.replies && procesedPost) {
    for (const repliesThread of thread.replies) {
      // Intentionally not awaited; processReplies logs its own errors.
      void processReplies(repliesThread as ThreadViewPost, procesedPost);
    }
  }
  return procesedPost;
}

/**
 * Imports a reply and, recursively, its own replies.
 *
 * Nested replies are started without being awaited. Errors raised while
 * importing this reply are logged at debug level and never propagated.
 *
 * @param thread - thread node of the reply
 * @param parentId - local id of the post being replied to
 */
async function processReplies(thread: ThreadViewPost, parentId: string) {
  if (thread && thread.post) {
    try {
      const post = await processSinglePost(thread.post, parentId);
      if (thread.replies && post) {
        for (const repliesThread of thread.replies) {
          // Intentionally not awaited; the nested call logs its own errors.
          void processReplies(repliesThread as ThreadViewPost, post);
        }
      }
    } catch (error) {
      logger.debug({
        message: `Error processing bluesky replies`,
        error: error,
        thread: thread,
        parentId,
      });
    }
  }
}

/**
 * Imports the ancestor chain of a thread node, oldest ancestor first, so each
 * post is stored with the local id of its parent.
 *
 * @param thread - thread node whose post (and ancestors) should be imported
 * @returns the local id of `thread.post`, or `undefined` if it was not stored
 */
async function processParents(
  thread: ThreadViewPost
): Promise<string | undefined> {
  let parentId: string | undefined = undefined;
  if (thread.parent) {
    parentId = await processParents(thread.parent as ThreadViewPost);
  }
  return await processSinglePost(thread.post, parentId);
}

/**
 * Returns the URI of the post quoted by `post`, if any.
 *
 * Supports plain record embeds and record-with-media embeds (quote plus
 * pictures); any other embed type yields `undefined`.
 */
function getQuotedPostUri(post: PostView): string | undefined {
  const embed = (post.record as any).embed;
  if (!embed) {
    return undefined;
  }
  if (embed["$type"] === "app.bsky.embed.record") {
    return embed.record?.uri;
  }
  // case of post with pictures and quote
  if (embed["$type"] === "app.bsky.embed.recordWithMedia") {
    return embed.record?.record?.uri;
  }
  return undefined;
}

/**
 * Fetches a post thread with the admin atproto session.
 *
 * @param options - parameters forwarded unchanged to `agent.getPostThread`
 * @returns the thread response, or `undefined` when obtaining the session or
 *   the request fails (the error is logged at debug level)
 */
export async function getPostThreadSafe(options: any) {
  try {
    const agent = await getAdminAtprotoSession();
    return await agent.getPostThread(options);
  } catch (error) {
    logger.debug({
      message: `Error trying to get atproto thread`,
      options: options,
      error: error,
    });
    return undefined;
  }
}

export { getAtProtoThread, getQuotedPostUri, processSinglePost };