Schedule posts to Bluesky with Cloudflare workers. skyscheduler.work
cf tool bsky-tool cloudflare bluesky schedule bsky service social-media cloudflare-workers
at main 568 lines 22 kB view raw
import { addHours, isAfter, isEqual } from "date-fns";
import { and, asc, desc, eq, getTableColumns, gt, gte, ne, sql } from "drizzle-orm";
import { BatchItem } from "drizzle-orm/batch";
import { DrizzleD1Database } from "drizzle-orm/d1";
import has from "just-has";
import isEmpty from "just-is-empty";
import { v4 as uuidv4, validate as uuidValid } from 'uuid';
import { Post } from "../classes/post";
import { RepostInfo } from "../classes/repost";
import { mediaFiles, posts, repostCounts, reposts } from "../db/app.schema";
import { accounts, users } from "../db/auth.schema";
import { MAX_POSTS_PER_THREAD, MAX_REPOST_POSTS, MAX_REPOST_RULES_PER_POST } from "../limits";
import { APP_NAME } from "../siteinfo";
import {
  AccountStatus, AllContext, BatchQuery,
  CreateObjectResponse, CreatePostQueryResponse,
  DeleteResponse, EmbedDataType, PostLabel
} from "../types";
import { PostSchema } from "../validation/postSchema";
import { RepostSchema } from "../validation/repostSchema";
import {
  getChildPostsOfThread, getPostByCID,
  getPostThreadCount, getRepostCountQuery, updatePostForGivenUser
} from "./db/data";
import {
  getViolationsForUser, removeViolation,
  removeViolations, userHasViolationsDB
} from "./db/violations";
import { floorGivenTime } from "./helpers";
import { deleteEmbedsFromR2 } from "./r2Query";

/**
 * Fetches every post owned by the current user, joined with its repost count.
 *
 * Rows are ordered by scheduled date (newest first), then thread order
 * (ascending), then creation date (newest first).
 *
 * @param c request context providing `userId` and `db`
 * @returns the user's posts, or `null` when there is no user/db, the user has
 *          no posts, or the query threw (the error is logged)
 */
export const getPostsForUser = async (c: AllContext): Promise<Post[]|null> => {
  try {
    const userId = c.get("userId");
    const db: DrizzleD1Database = c.get("db");
    if (userId && db) {
      const results = await db.select({
          ...getTableColumns(posts),
          repostCount: repostCounts.count
        })
        .from(posts).where(eq(posts.userId, userId))
        .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
        .orderBy(desc(posts.scheduledDate), asc(posts.threadOrder), desc(posts.createdAt)).all();

      if (isEmpty(results))
        return null;

      return results.map((itm) => new Post(itm));
    }
  }
  catch(err) {
    console.error(`Failed to get posts for user, session could not be fetched ${err}`);
  }
  return null;
};

/**
 * Applies a partial update to the current user's profile and/or account.
 *
 * A `password` key is written to the `accounts` table; every remaining key is
 * written to the `users` table. Both writes go out in a single batch.
 *
 * NOTE: `newData` is mutated - its `password` key is deleted so it is not
 * also written to the `users` table.
 *
 * @param c request context providing `userId` and `db`
 * @param newData fields to update
 * @returns `true` when the update ran (or there was nothing to write),
 *          `false` when there is no user/db or an operation threw
 */
export const updateUserData = async (c: AllContext, newData: any): Promise<boolean> => {
  const userId = c.get("userId");
  const db: DrizzleD1Database = c.get("db");
  try {
    if (!db) {
      console.error("Unable to update user data, no database object");
      return false;
    }
    if (userId) {
      const queriesToExecute: BatchItem<"sqlite">[] = [];

      if (has(newData, "password")) {
        // cache out the new hash
        const newPassword = newData.password;
        // remove it from the original object
        delete newData.password;

        // add the query to the db batch object
        queriesToExecute.push(db.update(accounts)
          .set({password: newPassword})
          .where(eq(accounts.userId, userId)));
      }

      // If we have new data about the username, pds, or app password
      if (has(newData, "bskyAppPass") || has(newData, "username") || has(newData, "pds")) {
        // check if the user has violations
        if (await userHasViolationsDB(db, userId)) {
          // they do, so clear out the ones that new credentials may resolve
          await removeViolations(c, userId, [AccountStatus.InvalidAccount, AccountStatus.Deactivated]);
        }
      }

      if (!isEmpty(newData)) {
        queriesToExecute.push(db.update(users).set(newData)
          .where(eq(users.id, userId)));
      }

      if (queriesToExecute.length > 0)
        await db.batch(queriesToExecute as BatchQuery);
      return true;
    }
  } catch(err) {
    // Include the caught error; without it the failure cannot be diagnosed.
    console.error(`Failed to update new user data for user ${userId}: ${err}`);
  }
  return false;
};

/**
 * Deletes one of the current user's posts and repairs the thread it was in.
 *
 * - Unposted posts have their embeds removed from R2 first, and a
 *   `MediaTooBig` violation is cleared if the user has violations.
 * - Posts that pointed at the deleted post as their parent are re-pointed at
 *   the deleted post's parent, and later thread orders are shifted down by one.
 * - Deleting a thread root also deletes every child post (and its embeds).
 * - Deleting the last child of a thread resets the root's `threadOrder` to -1.
 *
 * @param c request context providing `userId`, `db` and `executionCtx`
 * @param id uuid of the post to delete
 * @returns `success` is true only when the database batch completed
 *          successfully; `isRepost` / `needsRefresh` describe the deleted post
 */
export const deletePost = async (c: AllContext, id: string): Promise<DeleteResponse> => {
  const userId = c.get("userId");
  const returnObj: DeleteResponse = {success: false, isRepost: false};
  if (!userId) {
    return returnObj;
  }

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to delete post ${id}, db was null`);
    return returnObj;
  }

  // getPostById only returns posts owned by the current user
  const postObj = await getPostById(c, id);
  if (postObj !== null) {
    const queriesToExecute: BatchItem<"sqlite">[] = [];
    // If the post has not been posted, that means we still have files for it, so
    // delete the files from R2
    if (!postObj.posted) {
      await deleteEmbedsFromR2(c, postObj.embeds);
      if (await userHasViolationsDB(db, userId)) {
        // Remove the media too big violation if it's been given
        await removeViolation(c, userId, AccountStatus.MediaTooBig);
      }
    }
    returnObj.isRepost = postObj.isRepost || false;

    // If the parent post is set, then attempt to find and update the post chain
    const parentPost = postObj.parentPost;
    if (parentPost !== undefined) {
      // set anyone who had this as their parent to this post chain
      queriesToExecute.push(db.update(posts).set({parentPost: parentPost, threadOrder: postObj.threadOrder})
        .where(and(eq(posts.parentPost, postObj.postid), eq(posts.rootPost, postObj.rootPost!))));

      // Update the post order past here
      queriesToExecute.push(db.update(posts).set({threadOrder: sql`threadOrder - 1`})
        .where(
          and(and(eq(posts.rootPost, postObj.rootPost!), ne(posts.threadOrder, -1)), gt(posts.threadOrder, postObj.threadOrder)
        )));
    }

    // We'll need to delete all of the child embeds then, a costly, annoying experience.
    if (postObj.isThreadRoot) {
      const childPosts = await getChildPostsOfThread(c, postObj.postid);
      if (childPosts !== null) {
        for (const childPost of childPosts) {
          // R2 cleanup is best-effort and is allowed to finish after the response
          c.executionCtx.waitUntil(deleteEmbedsFromR2(c, childPost.embeds));
          queriesToExecute.push(db.delete(posts).where(eq(posts.uuid, childPost.postid)));
        }
      } else {
        console.warn(`could not get child posts of thread ${postObj.postid} during delete`);
      }
    } else if (postObj.isChildPost) {
      // this is not a thread root, so we should figure out how many children are left.
      const childPostCount = (await getPostThreadCount(db, postObj.user, postObj.rootPost!)) - 1;
      if (childPostCount <= 0) {
        // no children remain, so the root goes back to being a standalone post
        queriesToExecute.push(db.update(posts).set({threadOrder: -1}).where(eq(posts.uuid, postObj.rootPost!)));
      }
    }

    // delete post
    queriesToExecute.push(db.delete(posts).where(eq(posts.uuid, id)));

    // Await the batch itself. `waitUntil` returns void, so awaiting it reported
    // success before (and regardless of whether) the delete actually ran, and a
    // client acting on `needsRefresh` could re-read rows that were not gone yet.
    try {
      const batchResponse = await db.batch(queriesToExecute as BatchQuery);
      returnObj.success = batchResponse.every((el) => el.success);
    } catch(err) {
      console.error(`failed to delete post ${id}: ${err}`);
      return returnObj;
    }
    returnObj.needsRefresh = postObj.isThreadRoot;
  }
  return returnObj;
};

/**
 * Validates and stores a new scheduled post (standalone or inside a thread).
 *
 * For a threaded post (`rootPost` is a valid uuid) the new post is inserted
 * directly after `parentPost`: posts that pointed at that parent are
 * re-pointed at the new post and later thread orders are shifted up by one.
 * Threaded posts inherit the root's scheduled date and carry no repost rules.
 *
 * @param c request context providing `userId` and `db`
 * @param body untrusted request payload, validated against `PostSchema`
 * @returns `ok` plus a message; on success also the new `postId` and the
 *          `postNow` flag
 */
export const createPost = async (c: AllContext, body: any): Promise<CreatePostQueryResponse> => {
  const db: DrizzleD1Database = c.get("db");
  const userId = c.get("userId");
  if (!userId)
    return { ok: false, msg: "Your user session has expired, please login again"};

  if (!db) {
    console.error("unable to create post, db became null");
    return { ok: false, msg: "An application error has occurred please refresh" };
  }

  const validation = PostSchema.safeParse(body);
  if (!validation.success) {
    return { ok: false, msg: validation.error.toString() };
  }

  const { content, scheduledDate, embeds, label, makePostNow, repostData, rootPost, parentPost } = validation.data;
  const scheduleDate = floorGivenTime((makePostNow) ? new Date() : new Date(scheduledDate));

  // Ensure scheduled date is in the future
  //
  // Do not do this check if you are doing a threaded post
  // or you have marked that you are posting right now.
  if (!isAfter(scheduleDate, new Date()) &&
    (!makePostNow && (isEmpty(rootPost) && isEmpty(parentPost)))) {
    return { ok: false, msg: "Scheduled date must be in the future" };
  }

  // Check if account is in violation
  const violationData = await getViolationsForUser(db, userId);
  if (violationData != null) {
    if (violationData.tosViolation) {
      return {ok: false, msg: `This account is unable to use ${APP_NAME} services at this time`};
    } else if (violationData.userPassInvalid) {
      return {ok: false, msg: "The BSky account credentials is invalid, please update these in the settings"};
    }
  }

  // Check to see if this post already exists for thread
  let rootPostID:string|undefined = undefined;
  let parentPostID:string|undefined = undefined;
  let rootPostData: Post|null = null;
  let parentPostOrder: number = 0;
  if (uuidValid(rootPost)) {
    // returns null if the post doesn't appear on this account
    rootPostData = await getPostById(c, rootPost!);
    if (rootPostData !== null) {
      if (rootPostData.posted) {
        return { ok: false, msg: "You cannot make threads off already posted posts"};
      }
      if (rootPostData.isChildPost) {
        return { ok: false, msg: "Subthreads of threads are not allowed." };
      }
      if (rootPostData.isRepost) {
        return {ok: false, msg: "Threads cannot be made of repost actions"};
      }
      rootPostID = rootPostData.rootPost || rootPostData.postid;
      // If this isn't a direct reply, check directly underneath it
      if (rootPost !== parentPost) {
        if (uuidValid(parentPost)) {
          const parentPostData = await getPostById(c, parentPost!);
          if (parentPostData !== null) {
            // NOTE(review): the parent is only checked to belong to this user,
            // not to this thread (its rootPost is never compared to rootPostID).
            // Confirm whether a parent from a different thread should be rejected.
            parentPostID = parentPost!;
            parentPostOrder = parentPostData.threadOrder + 1;
          } else {
            return { ok: false, msg: "The given parent post cannot be found on your account"};
          }
        } else {
          return { ok: false, msg: "The given parent post is invalid"};
        }
      } else {
        parentPostID = rootPostData.postid;
        parentPostOrder = 1; // Root will always be 0, so if this is root, go 1 up.
      }
    } else {
      return { ok: false, msg: "The given root post cannot be found on your account"};
    }
  }

  const isThreadedPost: boolean = (rootPostID !== undefined && parentPostID !== undefined);
  if (isThreadedPost) {
    const threadCount: number = await getPostThreadCount(db, userId, rootPostID!);
    if (threadCount >= MAX_POSTS_PER_THREAD) {
      return { ok: false, msg: `this thread has hit the limit of ${MAX_POSTS_PER_THREAD} posts per thread`};
    }
  }

  // Create repost metadata (threaded posts never carry repost rules)
  const scheduleGUID = (!isThreadedPost) ? uuidv4() : undefined;
  const repostInfo = (!isThreadedPost) ?
    new RepostInfo(scheduleGUID!, scheduleDate, false, repostData) : undefined;

  // Create the posts
  const postUUID = uuidv4();
  const dbOperations: BatchItem<"sqlite">[] = [];

  // if we're threaded, insert our post directly after the given parent
  if (isThreadedPost) {
    // Posts that hung off the parent now hang off our new post
    dbOperations.push(db.update(posts).set({parentPost: postUUID })
      .where(and(eq(posts.parentPost, parentPostID!), eq(posts.rootPost, rootPostID!))));

    // update all posts past this one to also update their order (we will take their id)
    dbOperations.push(db.update(posts).set({threadOrder: sql`threadOrder + 1`})
      .where(
        and(and(eq(posts.rootPost, rootPostID!), ne(posts.threadOrder, -1)), gte(posts.threadOrder, parentPostOrder)
      )));

    // Update the root post so that it has the correct flags set on it as well.
    if (rootPostData!.isThreadRoot === false) {
      dbOperations.push(db.update(posts).set({threadOrder: 0, rootPost: rootPostData!.postid})
        .where(eq(posts.uuid, rootPostData!.postid)));
    }
  } else {
    rootPostID = postUUID;
  }

  // Add the post to the DB
  dbOperations.push(db.insert(posts).values({
    content,
    uuid: postUUID,
    postNow: makePostNow,
    scheduledDate: (!isThreadedPost) ? scheduleDate : new Date(rootPostData!.scheduledDate!),
    rootPost: rootPostID,
    parentPost: parentPostID,
    repostInfo: (!isThreadedPost) ? [repostInfo!] : [],
    threadOrder: (!isThreadedPost) ? undefined : parentPostOrder,
    embedContent: embeds,
    contentLabel: label || PostLabel.None,
    userId: userId
  }));

  if (!isEmpty(embeds)) {
    // Flag every uploaded image/video referenced by this post as attached to a post
    for (const embed of embeds!) {
      if (embed.type === EmbedDataType.Image || embed.type === EmbedDataType.Video) {
        dbOperations.push(
          db.update(mediaFiles).set({hasPost: true}).where(eq(mediaFiles.fileName, embed.content)));
      }
    }
  }

  // Add repost data to the table
  if (repostData && !isThreadedPost) {
    for (let i = 1; i <= repostData.times; ++i) {
      dbOperations.push(db.insert(reposts).values({
        uuid: postUUID,
        scheduleGuid: scheduleGUID,
        scheduledDate: addHours(scheduleDate, i*repostData.hours)
      }));
    }
    // Push the repost counts in
    dbOperations.push(db.insert(repostCounts)
      .values({uuid: postUUID, count: repostData.times}));
  }

  // Batch the query
  const batchResponse = await db.batch(dbOperations as BatchQuery);
  const success = batchResponse.every((el) => el.success);
  return { ok: success, postNow: makePostNow, postId: postUUID, msg: success ? "success" : "fail" };
};

/**
 * Schedules a repost (optionally repeated) of a post identified by `cid`.
 *
 * When the user already has a post row with that cid, the new rule is appended
 * to its `repostInfo` (unless an identical rule exists) and the repost count
 * is fully recounted. Otherwise a placeholder post row marked
 * `posted` + `isRepost` is created to hang the repost schedule from.
 *
 * @param c request context providing `userId` and `db`
 * @param body untrusted request payload, validated against `RepostSchema`
 * @returns `ok` plus a message, and the uuid of the post the reposts belong to
 */
export const createRepost = async (c: AllContext, body: any): Promise<CreateObjectResponse> => {
  const db: DrizzleD1Database = c.get("db");

  const userId = c.get("userId");
  if (!userId)
    return { ok: false, msg: "Your user session has expired, please login again"};

  if (!db) {
    console.error("unable to create repost db became null");
    return {ok: false, msg: "Invalid server operation occurred, please refresh"};
  }

  const validation = RepostSchema.safeParse(body);
  if (!validation.success) {
    return { ok: false, msg: validation.error.toString() };
  }
  const { url, uri, cid, content, scheduledDate, repostData } = validation.data;
  const scheduleDate = floorGivenTime(new Date(scheduledDate));
  const timeNow = new Date();

  // Ensure scheduled date is in the future
  if (!isAfter(scheduleDate, timeNow)) {
    return { ok: false, msg: "Scheduled date must be in the future" };
  }

  // Check if account is in violation
  const violationData = await getViolationsForUser(db, userId);
  if (violationData != null) {
    if (violationData.tosViolation) {
      return {ok: false, msg: `This account is unable to use ${APP_NAME} services at this time`};
    } else if (violationData.userPassInvalid) {
      return {ok: false, msg: "The BSky account credentials is invalid, please update these in the settings"};
    }
  }
  let postUUID;
  const dbOperations: BatchItem<"sqlite">[] = [];
  const scheduleGUID = uuidv4();
  const repostInfo: RepostInfo = new RepostInfo(scheduleGUID, scheduleDate, true, repostData);

  // Check to see if the post already exists
  // (check also against the userId here as well to avoid cross account data collisions)
  const existingPost = await getPostByCID(db, userId, cid);
  if (existingPost !== null) {
    postUUID = existingPost.postid;
    const existingPostDate = existingPost.scheduledDate!;
    // Ensure the date asked for is the same as or after the post's schedule date.
    // Both checks use the floored `scheduleDate`; comparing the raw input here
    // wrongly rejected requests that land on the post's date once floored.
    if (!isAfter(scheduleDate, existingPostDate) && !isEqual(scheduleDate, existingPostDate)) {
      return { ok: false, msg: "Scheduled date must be after the initial post's date" };
    }
    // Make sure this isn't a thread post.
    // We could probably work around this but I don't think it's worth the effort.
    if (existingPost.isChildPost) {
      return {ok: false, msg: "Repost posts cannot be created from child thread posts"};
    }

    // Add repost info object to existing array
    const newRepostInfo: RepostInfo[] = isEmpty(existingPost.repostInfo) ? [] : existingPost.repostInfo!;
    if (newRepostInfo.length >= MAX_REPOST_RULES_PER_POST) {
      return {ok: false, msg: `Num of reposts rules for this post has exceeded the limit of ${MAX_REPOST_RULES_PER_POST} rules`};
    }

    const repostInfoTimeMs = repostInfo.time.getTime();
    // Check to see if we have an exact repost match.
    // If we do, do not update the repostInfo, as repost table will drop the duplicates for us anyways.
    const isNewInfoNotDuped = (el: any) => {
      // Compare instants rather than `el.time == isoString`, which can never
      // match when the stored time is a Date.
      // NOTE(review): assumes stored `time` is a Date or a Date-parsable string - confirm against Post/RepostInfo.
      if (new Date(el.time).getTime() === repostInfoTimeMs) {
        // loose equality kept on purpose: stored rule values are untyped here
        if (el.count == repostInfo.count) {
          return el.hours != repostInfo.hours;
        }
      }
      return true;
    };
    if (newRepostInfo.every(isNewInfoNotDuped)) {
      newRepostInfo.push(repostInfo);

      // push record update to add to json array
      dbOperations.push(db.update(posts).set({repostInfo: newRepostInfo}).where(and(
        eq(posts.userId, userId), eq(posts.cid, cid))));
    }
  } else {
    // Limit of post reposts on the user's account.
    const accountCurrentReposts = await db.$count(posts, and(eq(posts.userId, userId), eq(posts.isRepost, true)));
    if (MAX_REPOST_POSTS > 0 && accountCurrentReposts >= MAX_REPOST_POSTS) {
      return {ok: false, msg:
        `You've cannot create any more repost posts at this time. Using: (${accountCurrentReposts}/${MAX_REPOST_POSTS}) repost posts`};
    }

    // Create the post base for this repost
    postUUID = uuidv4();
    dbOperations.push(db.insert(posts).values({
      content: !isEmpty(content) ? content! : `Repost of ${url}`,
      uuid: postUUID,
      cid: cid,
      uri: uri,
      posted: true,
      isRepost: true,
      repostInfo: [repostInfo],
      scheduledDate: scheduleDate,
      userId: userId
    }));
  }

  // Push initial repost
  let totalRepostCount = 1;
  dbOperations.push(db.insert(reposts).values({
    uuid: postUUID,
    scheduleGuid: scheduleGUID,
    scheduledDate: scheduleDate
  }).onConflictDoNothing());

  // Push other repost times if we have them
  if (repostData) {
    for (let i = 1; i <= repostData.times; ++i) {
      dbOperations.push(db.insert(reposts).values({
        uuid: postUUID,
        scheduleGuid: scheduleGUID,
        scheduledDate: addHours(scheduleDate, i*repostData.hours)
      }).onConflictDoNothing());
    }
    totalRepostCount += repostData.times;
  }
  // Update repost counts
  if (existingPost !== null) {
    // update existing content posts (but only for reposts, no one else)
    if (existingPost.isRepost && !isEmpty(content)) {
      dbOperations.push(db.update(posts).set({content: content!}).where(eq(posts.uuid, postUUID)));
    }

    // Because there could be conflicts that drop, run a count on the entire list and use the value from that
    // we also don't know if the repost count table has repost values for this item, so we should
    // attempt to always insert and update if it already exists
    totalRepostCount = -1;
  }

  // pushing any value under zero causes a full recount
  dbOperations.push(getRepostCountQuery(db, postUUID, totalRepostCount));

  const batchResponse = await db.batch(dbOperations as BatchQuery);
  const success = batchResponse.every((el) => el.success);
  return { ok: success, msg: success ? "success" : "fail", postId: postUUID };
};

/**
 * Updates a post on behalf of the current user by delegating to
 * `updatePostForGivenUser` with the context's `userId`.
 *
 * @param c request context providing `userId`
 * @param id uuid of the post to update
 * @param newData fields to update
 * @returns whatever `updatePostForGivenUser` resolves to
 */
export const updatePostForUser = async (c: AllContext, id: string, newData: Object): Promise<boolean> => {
  const userId = c.get("userId");
  return await updatePostForGivenUser(c, userId, id, newData);
};

/**
 * Looks up a single post by uuid, restricted to the current user.
 *
 * @param c request context providing `userId` and `db`
 * @param id uuid of the post
 * @returns the post, or `null` when there is no user/db, `id` is not a valid
 *          uuid, or the user owns no post with that uuid
 */
export const getPostById = async(c: AllContext, id: string): Promise<Post|null> => {
  const userId = c.get("userId");
  if (!userId || !uuidValid(id))
    return null;

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to get post ${id}, db was null`);
    return null;
  }

  const result = await db.select().from(posts)
    .where(and(eq(posts.uuid, id), eq(posts.userId, userId)))
    .limit(1).all();

  if (!isEmpty(result))
    return new Post(result[0]);
  return null;
};

/**
 * Same lookup as `getPostById`, but also joins the post's repost count.
 * Used for post editing; acts very similar to `getPostsForUser`.
 *
 * @param c request context providing `userId` and `db`
 * @param id uuid of the post
 * @returns the post with `repostCount`, or `null` when there is no user/db,
 *          `id` is not a valid uuid, or the user owns no post with that uuid
 */
export const getPostByIdWithReposts = async(c: AllContext, id: string): Promise<Post|null> => {
  const userId = c.get("userId");
  if (!userId || !uuidValid(id))
    return null;

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to get post ${id} with reposts, db was null`);
    return null;
  }

  const result = await db.select({
    ...getTableColumns(posts),
    repostCount: repostCounts.count,
  }).from(posts)
    .where(and(eq(posts.uuid, id), eq(posts.userId, userId)))
    .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
    .limit(1).all();

  if (!isEmpty(result))
    return new Post(result[0]);
  return null;
};

/**
 * Removes one repost rule (schedule) from one of the current user's posts:
 * drops its scheduled repost rows, removes the rule from the post's
 * `repostInfo`, and updates the stored repost count.
 *
 * @param c request context providing `userId` and `db`
 * @param id uuid of the post that owns the rule
 * @param scheduleId guid of the repost rule to remove
 * @returns `true` when rows were deleted and the follow-up batch succeeded,
 *          otherwise `false`
 */
export const deleteRepostRule = async(c: AllContext, id: string, scheduleId: string) => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to delete schedule id ${scheduleId} from post ${id}, db was null`);
    return false;
  }
  if (!uuidValid(id) || !uuidValid(scheduleId)) {
    return false;
  }

  // Get the post to make sure it's valid (and owned by this user) and update post json
  const currentPost = await getPostByIdWithReposts(c, id);
  if (currentPost != null && currentPost.repostInfo !== undefined) {
    // remove the schedule from the current json object set
    const newRepostInfo: RepostInfo[] = currentPost.repostInfo!.filter((itm) => {
      return itm.guid !== scheduleId;
    });

    const queriesToExecute: BatchItem<"sqlite">[] = [];
    // modify the current repost info
    queriesToExecute.push(db.update(posts).set({repostInfo: newRepostInfo}).where(and(
      eq(posts.userId, currentPost.user), eq(posts.uuid, currentPost.postid))));

    // Delete batch schedule items
    // we don't bundle this one because we want to get a count to make the operation below it, better
    //
    // The delete is scoped to the post we just verified ownership of. Filtering
    // on scheduleGuid alone would remove a schedule attached to any other post.
    const deletedItems = await db.delete(reposts)
      .where(and(eq(reposts.scheduleGuid, scheduleId), eq(reposts.uuid, currentPost.postid)))
      .returning({date: reposts.scheduledDate});

    // did we delete anything at all?
    if (deletedItems.length <= 0) {
      // we did not, that's really strange.
      console.warn(`When trying to delete reposts for ${currentPost.postid}, schedule id ${scheduleId} had empty items`);
      return false;
    }

    // Force update the repost count :)
    // With no stored count (no joined repostCounts row) pass a negative value,
    // which makes the count query do a full recount instead of receiving NaN.
    const previousCount = currentPost.repostCount;
    const remainingCount = (previousCount == null) ? -1 : previousCount - deletedItems.length;
    queriesToExecute.push(getRepostCountQuery(db, id, remainingCount));

    // Batch push up everything
    const batchResponse = await db.batch(queriesToExecute as BatchQuery);
    return batchResponse.every((el) => el.success);
  }
  return false;
};