unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at testPDSNotExplode 490 lines 18 kB view raw
1import { Op } from 'sequelize' 2import { 3 Blocks, 4 Emoji, 5 FederatedHost, 6 Media, 7 Post, 8 PostMentionsUserRelation, 9 ServerBlock, 10 PostTag, 11 User, 12 sequelize, 13 Ask, 14 Notification 15} from '../../models/index.js' 16import { completeEnvironment } from '../backendOptions.js' 17import { logger } from '../logger.js' 18import { getRemoteActor } from './getRemoteActor.js' 19import { getPetitionSigned } from './getPetitionSigned.js' 20import { fediverseTag } from '../../interfaces/fediverse/tags.js' 21import { loadPoll } from './loadPollFromPost.js' 22import { getApObjectPrivacy } from './getPrivacy.js' 23import dompurify from 'isomorphic-dompurify' 24import { Queue } from 'bullmq' 25import { bulkCreateNotifications } from '../pushNotifications.js' 26import { getDeletedUser } from '../cacheGetters/getDeletedUser.js' 27import { Privacy } from '../../models/post.js' 28import { getAtProtoThread } from '../../atproto/utils/getAtProtoThread.js' 29import * as cheerio from 'cheerio' 30 31const updateMediaDataQueue = new Queue('processRemoteMediaData', { 32 connection: completeEnvironment.bullmqConnection, 33 defaultJobOptions: { 34 removeOnComplete: true, 35 attempts: 3, 36 backoff: { 37 type: 'exponential', 38 delay: 1000 39 }, 40 removeOnFail: true 41 } 42}) 43 44async function getPostThreadRecursive( 45 user: any, 46 remotePostId: string | null, 47 remotePostObject?: any, 48 localPostToForceUpdate?: string, 49 options?: any 50) { 51 if (remotePostId === null) return 52 53 const deletedUser = getDeletedUser() 54 try { 55 remotePostId.startsWith(`${completeEnvironment.frontendUrl}/fediverse/post/`) 56 } catch (error) { 57 logger.info({ 58 message: 'Error with url on post', 59 object: remotePostId, 60 stack: new Error().stack 61 }) 62 return 63 } 64 if (remotePostId.startsWith(`${completeEnvironment.frontendUrl}/fediverse/post/`)) { 65 // we are looking at a local post 66 const partToRemove = `${completeEnvironment.frontendUrl}/fediverse/post/` 67 const postId = remotePostId.substring(partToRemove.length) 68 return await Post.findOne({ 69 where: { 70 id: postId 71 } 72 }) 73 } 74 if (completeEnvironment.enableBsky && remotePostId.startsWith('at://')) { 75 // Bluesky post. Likely coming from an import 76 const postInDatabase = await Post.findOne({ 77 where: { 78 bskyUri: remotePostId 79 } 80 }) 81 if (postInDatabase) { 82 return postInDatabase 83 } else if (!remotePostObject) { 84 const postId = await getAtProtoThread(remotePostId) 85 return await Post.findByPk(postId) 86 } 87 } 88 const postInDatabase = await Post.findOne({ 89 where: { 90 remotePostId: remotePostId 91 } 92 }) 93 if (postInDatabase && !localPostToForceUpdate) { 94 if (postInDatabase.remotePostId) { 95 const parentPostPetition = await getPetitionSigned(user, postInDatabase.remotePostId) 96 if (parentPostPetition) { 97 await loadPoll(parentPostPetition, postInDatabase, user) 98 } 99 } 100 return postInDatabase 101 } else { 102 try { 103 const postPetition = remotePostObject ? remotePostObject : await getPetitionSigned(user, remotePostId) 104 if (postPetition && !localPostToForceUpdate) { 105 const remotePostInDatabase = await Post.findOne({ 106 where: { 107 remotePostId: postPetition.id 108 } 109 }) 110 if (remotePostInDatabase) { 111 if (remotePostInDatabase.remotePostId) { 112 const parentPostPetition = await getPetitionSigned(user, remotePostInDatabase.remotePostId) 113 if (parentPostPetition) { 114 await loadPoll(parentPostPetition, remotePostInDatabase, user) 115 } 116 } 117 return remotePostInDatabase 118 } 119 } 120 // peertube: what the fuck 121 let actorUrl = postPetition.attributedTo 122 if (Array.isArray(actorUrl)) { 123 actorUrl = actorUrl[0].id 124 } 125 const remoteUser = await getRemoteActor(actorUrl, user) 126 if (remoteUser) { 127 const remoteHost = (await FederatedHost.findByPk(remoteUser.federatedHostId as string)) as FederatedHost 128 const remoteUserServerBaned = remoteHost?.blocked ? remoteHost.blocked : false 129 // HACK: some implementations (GTS IM LOOKING AT YOU) may send a single element instead of an array 130 // I should had used a funciton instead of this dirty thing, BUT you see, its late. Im eepy 131 // also this code is CRITICAL. A failure here is a big problem. So this hack it is 132 postPetition.tag = !Array.isArray(postPetition.tag) 133 ? [postPetition.tag].filter((elem) => elem) 134 : postPetition.tag 135 const medias: any[] = [] 136 const fediTags: fediverseTag[] = [ 137 ...new Set<fediverseTag>( 138 postPetition.tag 139 ?.filter((elem: fediverseTag) => 140 [ 141 postPetition.tag.some((tag: fediverseTag) => tag.type == 'WafrnHashtag') ? 'WafrnHashtag' : 'Hashtag' 142 ].includes(elem.type) 143 ) 144 .map((elem: fediverseTag) => { 145 return { href: elem.href, type: elem.type, name: elem.name } 146 }) 147 ) 148 ] 149 let fediMentions: fediverseTag[] = postPetition.tag?.filter((elem: fediverseTag) => elem.type === 'Mention') 150 if (fediMentions == undefined) { 151 fediMentions = postPetition.to.map((elem: string) => { 152 return { href: elem } 153 }) 154 } 155 const fediEmojis: any[] = postPetition.tag?.filter((elem: fediverseTag) => elem.type === 'Emoji') 156 157 const privacy = getApObjectPrivacy(postPetition, remoteUser) 158 159 let postTextContent = `${postPetition.content ? postPetition.content : ''}` // Fix for bridgy giving this as undefined 160 if (postPetition.type == 'Video') { 161 // peertube federation. We just add a link to the video, federating this is HELL 162 postTextContent = postTextContent + ` <a href="${postPetition.id}" target="_blank">${postPetition.id}</a>` 163 } 164 if (postPetition.tag && postPetition.tag.some((tag: fediverseTag) => tag.type === 'WafrnHashtag')) { 165 // Ok we have wafrn hashtags with us, we are probably talking with another wafrn! Crazy, I know 166 const dom = cheerio.load(postTextContent) 167 const tags = dom('a.hashtag').html('') 168 postTextContent = dom.html() 169 } 170 if ( 171 postPetition.attachment && 172 postPetition.attachment.length > 0 && 173 (!remoteUser.banned || options?.allowMediaFromBanned) 174 ) { 175 for await (const remoteFile of postPetition.attachment) { 176 if (remoteFile.type !== 'Link') { 177 const wafrnMedia = await Media.create({ 178 url: remoteFile.url, 179 NSFW: postPetition?.sensitive, 180 userId: remoteUserServerBaned || remoteUser.banned ? (await deletedUser)?.id : remoteUser.id, 181 description: remoteFile.name, 182 ipUpload: 'IMAGE_FROM_OTHER_FEDIVERSE_INSTANCE', 183 mediaOrder: postPetition.attachment.indexOf(remoteFile), // could be non consecutive but its ok 184 external: true, 185 mediaType: remoteFile.mediaType ? remoteFile.mediaType : '', 186 blurhash: remoteFile.blurhash ? remoteFile.blurhash : null, 187 height: remoteFile.height ? remoteFile.height : null, 188 width: remoteFile.width ? remoteFile.width : null 189 }) 190 if (!wafrnMedia.mediaType || (wafrnMedia.mediaType?.startsWith('image') && !wafrnMedia.width)) { 191 await updateMediaDataQueue.add(`updateMedia:${wafrnMedia.id}`, { 192 mediaId: wafrnMedia.id 193 }) 194 } 195 medias.push(wafrnMedia) 196 } else { 197 postTextContent = '' + postTextContent + `<a href="${remoteFile.href}" >${remoteFile.href}</a>` 198 } 199 } 200 } 201 const lemmyName = postPetition.name ? postPetition.name : '' 202 postTextContent = postTextContent ? postTextContent : `<p>${lemmyName}</p>` 203 let createdAt = new Date(postPetition.published) 204 if (createdAt.getTime() > new Date().getTime()) { 205 createdAt = new Date() 206 } 207 const postToCreate: any = { 208 content: '' + postTextContent, 209 content_warning: postPetition.summary 210 ? postPetition.summary 211 : remoteUser.NSFW 212 ? 'User is marked as NSFW by this instance staff. Possible NSFW without tagging' 213 : '', 214 createdAt: createdAt, 215 updatedAt: createdAt, 216 userId: remoteUserServerBaned || remoteUser.banned ? (await deletedUser)?.id : remoteUser.id, 217 remotePostId: postPetition.id, 218 privacy: privacy 219 } 220 221 if (postPetition.name) { 222 postToCreate.title = postPetition.name 223 } 224 225 const mentionedUsersIds: string[] = [] 226 const quotes: any[] = [] 227 try { 228 if (!remoteUser.banned && !remoteUserServerBaned) { 229 for await (const mention of fediMentions) { 230 let mentionedUser 231 if (mention.href?.indexOf(completeEnvironment.frontendUrl) !== -1) { 232 const username = mention.href?.substring( 233 `${completeEnvironment.frontendUrl}/fediverse/blog/`.length 234 ) as string 235 mentionedUser = await User.findOne({ 236 where: sequelize.where(sequelize.fn('lower', sequelize.col('url')), username.toLowerCase()) 237 }) 238 } else { 239 mentionedUser = await getRemoteActor(mention.href, user) 240 } 241 if ( 242 mentionedUser?.id && 243 mentionedUser.id != (await deletedUser)?.id && 244 !mentionedUsersIds.includes(mentionedUser.id) 245 ) { 246 mentionedUsersIds.push(mentionedUser.id) 247 } 248 } 249 } 250 } catch (error) { 251 logger.info('problem processing mentions') 252 logger.info(error) 253 } 254 255 if (postPetition.inReplyTo && postPetition.id !== postPetition.inReplyTo) { 256 const parent = await getPostThreadRecursive( 257 user, 258 postPetition.inReplyTo.id ? postPetition.inReplyTo.id : postPetition.inReplyTo 259 ) 260 postToCreate.parentId = parent?.id 261 } 262 263 const existingPost = localPostToForceUpdate ? await Post.findByPk(localPostToForceUpdate) : undefined 264 265 if (existingPost) { 266 existingPost.set(postToCreate) 267 await existingPost.save() 268 await loadPoll(postPetition, existingPost, user) 269 } 270 271 const newPost = existingPost ? existingPost : await Post.create(postToCreate) 272 try { 273 if (!remoteUser.banned && !remoteUserServerBaned && fediEmojis) { 274 processEmojis(newPost, fediEmojis) 275 } 276 } catch (error) { 277 logger.debug('Problem processing emojis') 278 } 279 newPost.setMedias(medias) 280 try { 281 if (postPetition.quote || postPetition.quoteUrl) { 282 const urlQuote = postPetition.quoteUrl || postPetition.quote 283 const postToQuote = await getPostThreadRecursive(user, urlQuote) 284 if (postToQuote && postToQuote.privacy != Privacy.DirectMessage) { 285 quotes.push(postToQuote) 286 } 287 if (!postToQuote) { 288 postToCreate.content = postToCreate.content + `<p>RE: ${urlQuote}</p>` 289 } 290 const postsToQuotePromise: any[] = [] 291 postPetition.tag 292 ?.filter((elem: fediverseTag) => elem.type === 'Link') 293 .forEach((quote: fediverseTag) => { 294 postsToQuotePromise.push(getPostThreadRecursive(user, quote.href as string)) 295 postToCreate.content = postToCreate.content.replace(quote.name, '') 296 }) 297 const quotesToAdd = await Promise.allSettled(postsToQuotePromise) 298 const quotesThatWillGetAdded = quotesToAdd.filter( 299 (elem) => elem.status === 'fulfilled' && elem.value && elem.value.privacy !== 10 300 ) 301 quotesThatWillGetAdded.forEach((quot) => { 302 if (quot.status === 'fulfilled' && !quotes.map((q) => q.id).includes(quot.value.id)) { 303 quotes.push(quot.value) 304 } 305 }) 306 } 307 } catch (error) { 308 logger.info('Error processing quotes') 309 logger.debug(error) 310 } 311 newPost.setQuoted(quotes) 312 313 await newPost.save() 314 315 await bulkCreateNotifications( 316 quotes.map((quote) => ({ 317 notificationType: 'QUOTE', 318 notifiedUserId: quote.userId, 319 userId: newPost.userId, 320 postId: newPost.id, 321 createdAt: new Date(newPost.createdAt) 322 })), 323 { 324 postContent: newPost.content, 325 userUrl: remoteUser.url 326 } 327 ) 328 try { 329 if (!remoteUser.banned && !remoteUserServerBaned) { 330 await addTagsToPost(newPost, fediTags) 331 } 332 } catch (error) { 333 logger.info('problem processing tags') 334 } 335 if (mentionedUsersIds.length != 0) { 336 await processMentions(newPost, mentionedUsersIds) 337 } 338 await loadPoll(remotePostObject, newPost, user) 339 const postCleanContent = dompurify.sanitize(newPost.content, { ALLOWED_TAGS: [] }).trim() 340 const mentions = await newPost.getMentionPost() 341 if (postCleanContent.startsWith('!ask') && mentions.length === 1) { 342 let askContent = postCleanContent.split(`!ask @${mentions[0].url}`)[1] 343 if (askContent.startsWith('@' + completeEnvironment.instanceUrl)) { 344 askContent = askContent.split('@' + completeEnvironment.instanceUrl)[1] 345 } 346 await Ask.create({ 347 question: askContent, 348 userAsker: newPost.userId, 349 userAsked: mentions[0].id, 350 answered: false, 351 apObject: JSON.stringify(postPetition) 352 }) 353 } 354 return newPost 355 } 356 } catch (error) { 357 logger.trace({ 358 message: 'error getting remote post', 359 url: remotePostId, 360 user: user.url, 361 problem: error 362 }) 363 return null 364 } 365 } 366} 367 368async function addTagsToPost(post: any, originalTags: fediverseTag[]) { 369 let tags = [...originalTags] 370 const res = await post.setPostTags([]) 371 if (tags.some((elem) => elem.name == 'WafrnHashtag')) { 372 tags = tags.filter((elem) => elem.name == 'WafrnHashtag') 373 } 374 return await PostTag.bulkCreate( 375 tags 376 .filter((elem) => elem && post && elem.name && post.id) 377 .map((elem) => { 378 return { 379 tagName: elem?.name?.replace('#', ''), 380 postId: post.id 381 } 382 }) 383 ) 384} 385 386async function processMentions(post: any, userIds: string[]) { 387 await post.setMentionPost([]) 388 await Notification.destroy({ 389 where: { 390 notificationType: 'MENTION', 391 postId: post.id 392 } 393 }) 394 const blocks = await Blocks.findAll({ 395 where: { 396 blockerId: { 397 [Op.in]: userIds 398 }, 399 blockedId: post.userId 400 } 401 }) 402 const remoteUser = await User.findByPk(post.userId, { attributes: ['url', 'federatedHostId'] }) 403 const userServerBlocks = await ServerBlock.findAll({ 404 where: { 405 userBlockerId: { 406 [Op.in]: userIds 407 }, 408 blockedServerId: remoteUser?.federatedHostId || '' 409 } 410 }) 411 const blockerIds: string[] = blocks 412 .map((block: any) => block.blockerId) 413 .concat(userServerBlocks.map((elem: any) => elem.userBlockerId)) 414 415 await bulkCreateNotifications( 416 userIds.map((mentionedUserId) => ({ 417 notificationType: 'MENTION', 418 notifiedUserId: mentionedUserId, 419 userId: post.userId, 420 postId: post.id, 421 createdAt: new Date(post.createdAt) 422 })), 423 { 424 postContent: post.content, 425 userUrl: remoteUser?.url 426 } 427 ) 428 429 return await PostMentionsUserRelation.bulkCreate( 430 userIds 431 .filter((elem) => !blockerIds.includes(elem)) 432 .map((elem) => { 433 return { 434 postId: post.id, 435 userId: elem 436 } 437 }), 438 { 439 ignoreDuplicates: true 440 } 441 ) 442} 443 444async function processEmojis(post: any, fediEmojis: any[]) { 445 let emojis: any[] = [] 446 let res: any 447 const emojiIds: string[] = Array.from(new Set(fediEmojis.map((emoji: any) => emoji.id))) 448 const foundEmojis = await Emoji.findAll({ 449 where: { 450 id: { 451 [Op.in]: emojiIds 452 } 453 } 454 }) 455 foundEmojis.forEach((emoji: any) => { 456 const newData = fediEmojis.find((foundEmoji: any) => foundEmoji.id === emoji.id) 457 if (newData && newData.icon?.url) { 458 emoji.set({ 459 url: newData.icon.url 460 }) 461 emoji.save() 462 } else { 463 logger.debug('issue with emoji') 464 logger.debug(emoji) 465 logger.debug(newData) 466 } 467 }) 468 emojis = emojis.concat(foundEmojis) 469 const notFoundEmojis = fediEmojis.filter((elem: any) => !foundEmojis.find((found: any) => found.id === elem.id)) 470 if (fediEmojis && notFoundEmojis && notFoundEmojis.length > 0) { 471 try { 472 const newEmojis = notFoundEmojis.map((newEmoji: any) => { 473 return { 474 id: newEmoji.id ? newEmoji.id : newEmoji.name + newEmoji.icon?.url, 475 name: newEmoji.name, 476 external: true, 477 url: newEmoji.icon?.url 478 } 479 }) 480 emojis = emojis.concat(await Emoji.bulkCreate(newEmojis, { ignoreDuplicates: true })) 481 } catch (error) { 482 logger.debug('Error with emojis') 483 logger.debug(error) 484 } 485 } 486 487 return await post.setEmojis(emojis) 488} 489 490export { getPostThreadRecursive }