unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1// returns the post id
2import { getAtProtoSession } from "./getAtProtoSession.js";
3import { QueryParams } from "@atproto/sync/dist/firehose/lexicons.js";
4import {
5 EmojiReaction,
6 Media,
7 Notification,
8 Post,
9 PostAncestor,
10 PostMentionsUserRelation,
11 PostReport,
12 PostTag,
13 QuestionPoll,
14 Quotes,
15 RemoteUserPostView,
16 SilencedPost,
17 User,
18 UserBitesPostRelation,
19 UserBookmarkedPosts,
20 UserLikesPostRelations,
21} from "../../models/index.js";
22import { Model, Op } from "sequelize";
23import {
24 PostView,
25 ThreadViewPost,
26} from "@atproto/api/dist/client/types/app/bsky/feed/defs.js";
27import { getAtprotoUser } from "./getAtprotoUser.js";
28import { CreateOrUpdateOp } from "@skyware/firehose";
29import { logger } from "../../utils/logger.js";
30import { RichText } from "@atproto/api";
31import showdown from "showdown";
32import {
33 bulkCreateNotifications,
34 createNotification,
35} from "../../utils/pushNotifications.js";
36import { getAllLocalUserIds } from "../../utils/cacheGetters/getAllLocalUserIds.js";
37import {
38 InteractionControl,
39 InteractionControlType,
40 Privacy,
41} from "../../models/post.js";
42import { wait } from "../../utils/wait.js";
43import { UpdatedAt } from "sequelize-typescript";
44import { completeEnvironment } from "../../utils/backendOptions.js";
45import { MediaAttributes } from "../../models/media.js";
46import { getAdminAtprotoSession } from "../../utils/atproto/getAdminAtprotoSession.js";
47import { getPostThreadRecursive } from "../../utils/activitypub/getPostThreadRecursive.js";
48import { Queue, QueueEvents } from "bullmq";
49import { getAdminUser } from "../../utils/getAdminAndDeletedUser.js";
50
51const markdownConverter = new showdown.Converter({
52 simplifiedAutoLink: true,
53 literalMidWordUnderscores: true,
54 strikethrough: true,
55 simpleLineBreaks: true,
56 openLinksInNewWindow: true,
57 emoji: true,
58});
59
60const adminUser = getAdminUser();
61
62async function processSinglePost(
63 post: PostView,
64 parentId?: string,
65 forceUpdate?: boolean
66): Promise<string | undefined> {
67 if (!post || !completeEnvironment.enableBsky) {
68 return undefined;
69 }
70 if (!forceUpdate) {
71 const existingPost = await Post.findOne({
72 where: {
73 bskyUri: post.uri,
74 },
75 });
76 if (existingPost) {
77 return existingPost.id;
78 }
79 }
80 let postCreator: User | undefined;
81 try {
82 postCreator = await getAtprotoUser(
83 post.author.did,
84 (await adminUser) as User
85 );
86 } catch (error) {
87 logger.debug({
88 message: `Problem obtaining user from post`,
89 post,
90 parentId,
91 forceUpdate,
92 error,
93 });
94 }
95 let verifiedFedi: string | undefined;
96 if ("fediverseId" in post.record || "bridgyOriginalUrl" in post.record) {
97 if ("bridgyOriginalUrl" in post.record) {
98 const res = await fetch(
99 "https://slingshot.microcosm.blue/xrpc/com.bad-example.identity.resolveMiniDoc" +
100 `?identifier=${post.author.did}`
101 );
102 if (res.ok) {
103 const json = (await res.json()) as { pds: string };
104 if (json.pds.replace(/^https?:\/\//, "") === "atproto.brid.gy") {
105 // if user is on bridgy pds, verify it
106 verifiedFedi = post.record.bridgyOriginalUrl as string;
107 logger.info(
108 { uri: post.uri, url: post.record.bridgyOriginalUrl },
109 "fedi bridged post is bridgy fed"
110 );
111 }
112 }
113 } else {
114 // prob wafrn post, but lets verify it
115 try {
116 const waf = await fetch(
117 `https://${new URL(post.record.fediverseId as string).hostname
118 }/api/environment`
119 );
120 if (waf.ok) {
121 const res = await fetch(
122 (post.record.fediverseId as string).replace(
123 "fediverse/",
124 "api/v2/"
125 ),
126 {
127 headers: {
128 Accept: "application/json",
129 },
130 }
131 );
132 if (res.ok) {
133 const json = (await res.json()) as { posts: { bskyCid: string }[] };
134 if (json.posts[0].bskyCid === post.cid) {
135 verifiedFedi = post.record.fediverseId as string;
136 logger.info(
137 { uri: post.uri, url: post.record.fediverseId },
138 "fedi bridged post is wafrn"
139 );
140 }
141 }
142 }
143 } catch (error) {
144 logger.debug({
145 error,
146 message: `Error in obtaining fedi post ${post.record.fediverseId}`,
147 });
148 }
149 }
150 }
151 if (verifiedFedi) {
152 try {
153 const remotePost = await getPostThreadRecursive(
154 await getAdminUser(),
155 verifiedFedi
156 );
157 if (remotePost) {
158 await getPostThreadRecursive(
159 await getAdminUser(),
160 verifiedFedi,
161 undefined,
162 remotePost.id
163 );
164 remotePost.bskyCid = post.cid;
165 remotePost.bskyUri = post.uri;
166 // if there's already a bsky post about
167 // this that doesn't have any fedi urls, delete it
168 // and prob update the things
169 let existingPost = await Post.findOne({
170 where: {
171 bskyCid: post.cid,
172 remotePostId: null,
173 },
174 });
175 if (
176 existingPost &&
177 !(await getAllLocalUserIds()).includes(existingPost.userId)
178 ) {
179 // very expensive updates! but only happens when user
180 // searches existing post that is alr on db
181 await EmojiReaction.update(
182 {
183 postId: remotePost.id,
184 },
185 {
186 where: {
187 postId: existingPost.id,
188 },
189 }
190 );
191 await Notification.update(
192 {
193 postId: remotePost.id,
194 },
195 {
196 where: {
197 postId: existingPost.id,
198 },
199 }
200 );
201 await PostReport.update(
202 {
203 postId: remotePost.id,
204 },
205 {
206 where: {
207 postId: existingPost.id,
208 },
209 }
210 );
211 try {
212 await PostAncestor.update(
213 {
214 postsId: remotePost.id,
215 },
216 {
217 where: {
218 postsId: existingPost.id,
219 },
220 }
221 );
222 } catch { }
223 await QuestionPoll.update(
224 {
225 postId: remotePost.id,
226 },
227 {
228 where: {
229 postId: existingPost.id,
230 },
231 }
232 );
233 await Quotes.update(
234 {
235 quoterPostId: remotePost.id,
236 },
237 {
238 where: {
239 quoterPostId: existingPost.id,
240 },
241 }
242 );
243 if (
244 !(await Quotes.findOne({
245 where: {
246 quotedPostId: remotePost.id,
247 },
248 }))
249 ) {
250 await Quotes.update(
251 {
252 quotedPostId: remotePost.id,
253 },
254 {
255 where: {
256 quotedPostId: existingPost.id,
257 },
258 }
259 );
260 }
261 await RemoteUserPostView.update(
262 {
263 postId: remotePost.id,
264 },
265 {
266 where: {
267 postId: existingPost.id,
268 },
269 }
270 );
271 await SilencedPost.update(
272 {
273 postId: remotePost.id,
274 },
275 {
276 where: {
277 postId: existingPost.id,
278 },
279 }
280 );
281 await SilencedPost.update(
282 {
283 postId: remotePost.id,
284 },
285 {
286 where: {
287 postId: existingPost.id,
288 },
289 }
290 );
291 await UserBitesPostRelation.update(
292 {
293 postId: remotePost.id,
294 },
295 {
296 where: {
297 postId: existingPost.id,
298 },
299 }
300 );
301 await UserBookmarkedPosts.update(
302 {
303 postId: remotePost.id,
304 },
305 {
306 where: {
307 postId: existingPost.id,
308 },
309 }
310 );
311 await UserLikesPostRelations.update(
312 {
313 postId: remotePost.id,
314 },
315 {
316 where: {
317 postId: existingPost.id,
318 },
319 }
320 );
321 await Post.update(
322 {
323 parentId: remotePost.id,
324 },
325 {
326 where: {
327 parentId: existingPost.id,
328 },
329 }
330 );
331
332 await Post.destroy({
333 where: {
334 bskyCid: post.cid,
335 remotePostId: null,
336 userId: {
337 [Op.notIn]: await getAllLocalUserIds(),
338 },
339 },
340 });
341 }
342 await remotePost.save();
343 return remotePost.id;
344 }
345 } catch (error) {
346 logger.debug({
347 message: `Error in obtaining fedi post ${verifiedFedi}`,
348 error,
349 });
350 }
351 }
352 if (!postCreator || !post) {
353 const usr = postCreator
354 ? postCreator
355 : await User.findOne({ where: { url: completeEnvironment.deletedUser } });
356
357 const invalidPost = await Post.create({
358 userId: usr?.id,
359 content: `Failed to get atproto post`,
360 parentId: parentId,
361 isDeleted: true,
362 createdAt: new Date(0),
363 updatedAt: new Date(0),
364 });
365 return invalidPost.id;
366 }
367 if (postCreator) {
368 const medias = getPostMedias(post);
369 let tags: string[] = [];
370 let mentions: string[] = [];
371 let record = post.record as any;
372 let postText = record.text;
373 let federatedWoot = false;
374 if (record.fullText || record.bridgyOriginalText) {
375 federatedWoot = true;
376 tags = record.fullTags?.split("\n").filter((x: string) => !!x) ?? []; // also detect full tags
377 postText = record.fullText ?? record.bridgyOriginalText;
378 }
379 if (record.facets && record.facets.length > 0 && !federatedWoot) {
380 // lets get mentions
381 const mentionedDids = record.facets
382 .flatMap((elem: any) => elem.features)
383 .map((elem: any) => elem.did)
384 .filter((elem: any) => elem);
385 if (mentionedDids && mentionedDids.length > 0) {
386 const mentionedUsers = await User.findAll({
387 where: {
388 bskyDid: {
389 [Op.in]: mentionedDids,
390 },
391 },
392 });
393 mentions = mentionedUsers.map((elem) => elem.id);
394 }
395
396 const rt = new RichText({
397 text: postText,
398 facets: record.facets,
399 });
400 let text = "";
401
402 for (const segment of rt.segments()) {
403 if (segment.isLink()) {
404 const href = segment.link?.uri;
405 text += `<a href="${href}" target="_blank">${href}</a>`;
406 } else if (segment.isMention()) {
407 const href = `${completeEnvironment.frontendUrl}/blog/${segment.mention?.did}`;
408 text += `<a href="${href}" target="_blank">${segment.text}</a>`;
409 } else if (segment.isTag()) {
410 const href = `${completeEnvironment.frontendUrl
411 }/dashboard/search/${segment.text.substring(1)}`;
412 text += `<a href="${href}" target="_blank">${segment.text}</a>`;
413 tags.push(segment.text.substring(1));
414 } else {
415 text += segment.text;
416 }
417 }
418 postText = text;
419 }
420 if (!federatedWoot) postText = postText.replaceAll("\n", "<br>");
421
422 const labels = getPostLabels(post);
423 let cw =
424 labels.length > 0
425 ? `Post is labeled as: ${labels.join(", ")}`
426 : undefined;
427 if (!cw && postCreator.NSFW) {
428 cw =
429 "This user has been marked as NSFW and the post has been labeled automatically as NSFW";
430 }
431 const newData = {
432 userId: postCreator.id,
433 bskyCid: post.cid,
434 bskyUri: post.uri,
435 content: postText,
436 createdAt: new Date((post.record as any).createdAt),
437 privacy: Privacy.Public,
438 parentId: parentId,
439 content_warning: cw,
440 ...getPostInteractionLevels(post, parentId),
441 };
442 if (!parentId) {
443 delete newData.parentId;
444 }
445
446 if ((await getAllLocalUserIds()).includes(newData.userId) && !forceUpdate) {
447 // dirty as hell but this should stop the duplication
448 await wait(1500);
449 }
450 let [postToProcess, created] = await Post.findOrCreate({
451 where: { bskyUri: post.uri },
452 defaults: newData,
453 });
454 // do not update existing posts. But what if local user creates a post through bsky? then we force updte i guess
455 if (
456 !(await getAllLocalUserIds()).includes(postToProcess.userId) ||
457 created
458 ) {
459 if (!created) {
460 postToProcess.set(newData);
461 await postToProcess.save();
462 }
463 if (medias) {
464 await Media.destroy({
465 where: {
466 postId: postToProcess.id,
467 },
468 });
469 await Media.bulkCreate(
470 medias.map((media: any) => {
471 return { ...media, postId: postToProcess.id };
472 })
473 );
474 }
475 if (parentId) {
476 const ancestors = await postToProcess.getAncestors({
477 attributes: ["userId"],
478 where: {
479 hierarchyLevel: {
480 [Op.gt]: postToProcess.hierarchyLevel - 5,
481 },
482 },
483 });
484 mentions = mentions.concat(ancestors.map((elem) => elem.userId));
485 }
486 mentions = [...new Set(mentions)];
487 if (mentions.length > 0) {
488 await Notification.destroy({
489 where: {
490 notificationType: "MENTION",
491 postId: postToProcess.id,
492 },
493 });
494 await PostMentionsUserRelation.destroy({
495 where: {
496 postId: postToProcess.id,
497 },
498 });
499 await bulkCreateNotifications(
500 mentions.map((mnt) => ({
501 notificationType: "MENTION",
502 postId: postToProcess.id,
503 notifiedUserId: mnt,
504 userId: postToProcess.userId,
505 createdAt: new Date(postToProcess.createdAt),
506 })),
507 {
508 ignoreDuplicates: true,
509 postContent: postText,
510 userUrl: postCreator.url,
511 }
512 );
513 await PostMentionsUserRelation.bulkCreate(
514 mentions.map((mnt) => {
515 return {
516 userId: mnt,
517 postId: postToProcess.id,
518 };
519 }),
520 { ignoreDuplicates: true }
521 );
522 }
523 if (tags.length > 0) {
524 await PostTag.destroy({
525 where: {
526 postId: postToProcess.id,
527 },
528 });
529 await PostTag.bulkCreate(
530 tags.map((tag) => {
531 return {
532 postId: postToProcess.id,
533 tagName: tag,
534 };
535 })
536 );
537 }
538 const quotedPostUri = getQuotedPostUri(post);
539 if (quotedPostUri) {
540 const quotedPostId = await getAtProtoThread(quotedPostUri);
541 if (quotedPostId) {
542 const quotedPost = await Post.findByPk(quotedPostId);
543 if (quotedPost) {
544 await createNotification(
545 {
546 notificationType: "QUOTE",
547 notifiedUserId: quotedPost.userId,
548 userId: postToProcess.userId,
549 postId: postToProcess.id,
550 },
551 {
552 postContent: postToProcess.content,
553 userUrl: postCreator?.url,
554 }
555 );
556 await Quotes.findOrCreate({
557 where: {
558 quoterPostId: postToProcess.id,
559 quotedPostId: quotedPostId,
560 },
561 });
562 }
563 }
564 }
565 }
566
567 return postToProcess.id;
568 }
569}
570
571function getPostMedias(post: PostView) {
572 let res: MediaAttributes[] = [];
573 const labels = getPostLabels(post);
574 const embed = (post.record as any).embed;
575 if (embed) {
576 if (embed.external) {
577 res = res.concat([
578 {
579 mediaType: !embed.external.uri.startsWith("https://media.ternor.com/")
580 ? "text/html"
581 : "image/gif",
582 description: embed.external.title,
583 url: embed.external.uri,
584 mediaOrder: 0,
585 external: true,
586 },
587 ]);
588 }
589 if (embed.images || embed.media) {
590 // case with quote and gif / link preview
591 if (embed.media?.external) {
592 res = res.concat([
593 {
594 mediaType: !embed.media.external.uri.startsWith(
595 "https://media.ternor.com/"
596 )
597 ? "text/html"
598 : "image/gif",
599 description: embed.media.external.title,
600 url: embed.media.external.uri,
601 mediaOrder: 0,
602 external: true,
603 },
604 ]);
605 } else {
606 const thingToProcess = embed.images ? embed.images : embed.media.images;
607 if (thingToProcess) {
608 const toConcat = thingToProcess.map((media: any, index: any) => {
609 const cid = media.image.ref["$link"]
610 ? media.image.ref["$link"]
611 : media.image.ref.toString();
612 const did = post.author.did;
613 return {
614 mediaType: media.image.mimeType,
615 description: media.alt,
616 height: media.aspectRatio?.height,
617 width: media.aspectRatio?.width,
618 url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(
619 did
620 )}`,
621 mediaOrder: index,
622 external: true,
623 };
624 });
625 res = res.concat(toConcat);
626 } else {
627 logger.debug({
628 message: `Bsky problem getting medias on post ${post.uri}`,
629 });
630 }
631 }
632 }
633 if (embed.video) {
634 const video = embed.video;
635 const cid = video.ref["$link"]
636 ? video.ref["$link"]
637 : video.ref.toString();
638 const did = post.author.did;
639 res = res.concat([
640 {
641 mediaType: embed.video.mimeType,
642 description: "",
643 height: embed.aspectRatio?.height,
644 width: embed.aspectRatio?.width,
645 url: `?cid=${encodeURIComponent(cid)}&did=${encodeURIComponent(did)}`,
646 mediaOrder: 0,
647 external: true,
648 },
649 ]);
650 }
651 }
652 return res.map((m) => {
653 return {
654 ...m,
655 NSFW: labels.length > 0,
656 };
657 });
658}
659
660// TODO improve this so we get better nsfw messages lol
661function getPostLabels(post: PostView) {
662 let labels = new Set<string>();
663 if (post.labels) {
664 for (const label of post.labels) {
665 if (label.neg && labels.has(label.val)) {
666 labels.delete(label.val);
667 } else {
668 labels.add(label.val);
669 }
670 }
671 }
672 return Array.from(labels);
673}
674
675function getPostInteractionLevels(
676 post: PostView,
677 parentId: string | undefined
678): {
679 replyControl: InteractionControlType;
680 likeControl: InteractionControlType;
681 reblogControl: InteractionControlType;
682 quoteControl: InteractionControlType;
683} {
684 let canQuote = InteractionControl.Anyone;
685 let canReply: InteractionControlType = InteractionControl.Anyone;
686 if (post.viewer && post.viewer.embeddingDisabled) {
687 canQuote = InteractionControl.NoOne;
688 }
689 if (parentId) {
690 canReply = InteractionControl.SameAsOp;
691 canQuote = InteractionControl.SameAsOp;
692 } else if (
693 post.threadgate &&
694 post.threadgate.record &&
695 (post.threadgate.record as any).allow
696 ) {
697 const allowList = (post.threadgate.record as any).allow;
698 if (allowList.length == 0) {
699 canReply = InteractionControl.NoOne;
700 } else {
701 const mentiontypes: string[] = allowList
702 .map((elem: any) => elem["$type"])
703 .map((elem: string) => elem.split("app.bsky.feed.threadgate#")[1]);
704 if (mentiontypes.includes("mentionRule")) {
705 if (mentiontypes.includes("followingRule")) {
706 canReply = mentiontypes.includes("followerRule")
707 ? InteractionControl.FollowersFollowersAndMentioned
708 : InteractionControl.FollowingAndMentioned;
709 } else {
710 canReply = mentiontypes.includes("followerRule")
711 ? InteractionControl.FollowersAndMentioned
712 : InteractionControl.MentionedUsersOnly;
713 }
714 } else {
715 if (mentiontypes.includes("followingRule")) {
716 canReply = mentiontypes.includes("followerRule")
717 ? InteractionControl.FollowersAndFollowing
718 : InteractionControl.Following;
719 } else {
720 canReply = mentiontypes.includes("followerRule")
721 ? InteractionControl.Followers
722 : InteractionControl.NoOne;
723 }
724 }
725 }
726 }
727
728 if (
729 canQuote === InteractionControl.Anyone &&
730 canReply != InteractionControl.Anyone
731 ) {
732 canQuote = canReply;
733 }
734
735 return {
736 quoteControl: canQuote,
737 replyControl: canReply,
738 likeControl: InteractionControl.Anyone,
739 reblogControl: InteractionControl.Anyone,
740 };
741}
742
743async function getAtProtoThread(
744 uri: string,
745 forceUpdate?: boolean,
746 ignoreDescendents?: boolean
747): Promise<string | undefined> {
748 const postExisting = forceUpdate
749 ? undefined
750 : await Post.findOne({
751 where: {
752 bskyUri: uri,
753 },
754 });
755 if (postExisting) {
756 return postExisting.id;
757 }
758
759 // TODO optimize this a bit if post is not in reply to anything that we dont have
760 const preThread = await getPostThreadSafe({
761 uri: uri,
762 depth: ignoreDescendents ? 0 : 50,
763 parentHeight: 1000,
764 });
765 if (preThread) {
766 const thread: ThreadViewPost = preThread.data.thread as ThreadViewPost;
767 //const tmpDids = getDidsFromThread(thread)
768 //forcePopulateUsers(tmpDids, (await adminUser) as Model<any, any>)
769 let parentId: string | undefined = undefined;
770 if (thread.parent) {
771 parentId = (await processParents(
772 thread.parent as ThreadViewPost
773 )) as string;
774 }
775 const procesedPost = await processSinglePost(
776 thread.post,
777 parentId,
778 forceUpdate
779 );
780 if (thread.replies && procesedPost) {
781 for await (const repliesThread of thread.replies) {
782 processReplies(repliesThread as ThreadViewPost, procesedPost);
783 }
784 }
785 return procesedPost as string;
786 } else {
787 }
788}
789
790async function processReplies(thread: ThreadViewPost, parentId: string) {
791 if (thread && thread.post) {
792 try {
793 const post = await processSinglePost(thread.post, parentId);
794 if (thread.replies && post) {
795 for await (const repliesThread of thread.replies) {
796 processReplies(repliesThread as ThreadViewPost, post);
797 }
798 }
799 } catch (error) {
800 logger.debug({
801 message: `Error processing bluesky replies`,
802 error: error,
803 thread: thread,
804 parentId,
805 });
806 }
807 }
808}
809
810async function processParents(
811 thread: ThreadViewPost
812): Promise<string | undefined> {
813 let parentId: string | undefined = undefined;
814 if (thread.parent) {
815 parentId = await processParents(thread.parent as ThreadViewPost);
816 }
817 return await processSinglePost(thread.post, parentId);
818}
819
820function getQuotedPostUri(post: PostView): string | undefined {
821 let res: string | undefined = undefined;
822 const embed = (post.record as any).embed;
823 if (embed && ["app.bsky.embed.record"].includes(embed["$type"])) {
824 res = embed.record.uri;
825 }
826 // case of post with pictures and quote
827 else if (
828 embed &&
829 ["app.bsky.embed.recordWithMedia"].includes(embed["$type"])
830 ) {
831 res = embed.record.record.uri;
832 }
833 return res;
834}
835
836export async function getPostThreadSafe(options: any) {
837 try {
838 const agent = await getAdminAtprotoSession();
839 return await agent.getPostThread(options);
840 } catch (error) {
841 logger.debug({
842 message: `Error trying to get atproto thread`,
843 options: options,
844 error: error,
845 });
846 }
847}
848
849export { getAtProtoThread, getQuotedPostUri, processSinglePost };