Schedule posts to Bluesky with Cloudflare workers. skyscheduler.work
cf tool bsky-tool cloudflare bluesky schedule bsky service social-media cloudflare-workers
at main 324 lines 12 kB view raw
import {
  and, asc, desc, eq, inArray, isNotNull,
  lte, ne, notInArray, or, sql
} from "drizzle-orm";
import { BatchItem } from "drizzle-orm/batch";
import { DrizzleD1Database } from "drizzle-orm/d1";
import isEmpty from "just-is-empty";
import { validate as uuidValid } from 'uuid';
import { Post } from "../../classes/post";
import { Repost } from "../../classes/repost";
import { posts, repostCounts, reposts } from "../../db/app.schema";
import { violations } from "../../db/enforcement.schema";
import { MAX_POSTED_LENGTH } from "../../limits";
import {
  AllContext,
  BatchQuery,
  GetAllPostedBatch,
  PostRecordResponse
} from "../../types";
import { floorCurrentTime } from "../helpers";

/**
 * Fetches every unposted, non-"post now" post whose scheduled date is at or
 * before the value returned by `floorCurrentTime()`, skipping posts owned by
 * users listed in the violations table. Results are ordered by creation time,
 * oldest first.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param removeThreads - When true only rows with `threadOrder === -1` are
 *   returned; otherwise rows with `threadOrder <= 0` are returned.
 *   NOTE(review): presumably -1 marks a standalone post and 0 a thread root;
 *   confirm against the schema.
 * @returns The matching posts, or an empty array when no database is available.
 */
export const getAllPostsForCurrentTime = async (c: AllContext, removeThreads: boolean = false): Promise<Post[]> => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("Could not get all posts for current time, db was null");
    return [];
  }
  const currentTime: Date = floorCurrentTime();

  // Subquery of user ids that currently have a violation on record.
  const violationUsers = db.select({violators: violations.userId}).from(violations);

  // Thread children are excluded here; the caller builds them separately.
  const threadFilter = removeThreads
    ? eq(posts.threadOrder, -1)
    : lte(posts.threadOrder, 0);

  const postsToMake = db.$with('scheduledPosts').as(
    db.select().from(posts).where(
      and(
        eq(posts.posted, false),
        // Ignore any posts that are marked for post now
        ne(posts.postNow, true),
        lte(posts.scheduledDate, currentTime),
        threadFilter
      )
    )
  );

  const results = await db.with(postsToMake).select().from(postsToMake)
    .where(notInArray(postsToMake.userId, violationUsers))
    .orderBy(asc(postsToMake.createdAt))
    .all();
  return results.map((item) => new Post(item));
};

/**
 * Fetches the posts that have at least one repost scheduled at or before
 * `givenDate`, skipping posts owned by users listed in the violations table.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param givenDate - Upper bound (inclusive) for the repost's scheduled date.
 * @returns One `Repost` per matching post row, or an empty array when no
 *   database is available.
 */
export const getAllRepostsForGivenTime = async (c: AllContext, givenDate: Date): Promise<Repost[]> => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("could not get all reposts for given timeframe, db was null");
    return [];
  }
  // Post uuids that have a repost due by the given date.
  const dueRepostIds = db.select({uuid: reposts.uuid}).from(reposts)
    .where(lte(reposts.scheduledDate, givenDate));
  const violationsQuery = db.select({data: violations.userId}).from(violations);

  const results = await db.select({uuid: posts.uuid, uri: posts.uri, cid: posts.cid, userId: posts.userId })
    .from(posts)
    .where(and(inArray(posts.uuid, dueRepostIds), notInArray(posts.userId, violationsQuery)))
    .all();

  return results.map((item) => new Repost(item));
};

/**
 * Convenience wrapper around {@link getAllRepostsForGivenTime} that uses the
 * value returned by `floorCurrentTime()` as the cut-off.
 */
export const getAllRepostsForCurrentTime = async (c: AllContext): Promise<Repost[]> => {
  return await getAllRepostsForGivenTime(c, floorCurrentTime());
};

/**
 * Deletes every repost row scheduled at or before the value returned by
 * `floorCurrentTime()`, then brings the dependent data back in sync:
 *
 *  - the `repostCounts` row of each affected post is recounted, and
 *  - any schedule guid that no longer has a remaining repost row for that post
 *    is removed from the post's `repostInfo` array.
 *
 * All follow-up writes are sent as a single batch. Work is grouped per post so
 * that a post losing several schedules in one run gets exactly one
 * `repostInfo` update. Building one update per deleted row from the same
 * pre-batch read would let the last update silently restore the guids removed
 * by earlier ones.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 */
export const deleteAllRepostsBeforeCurrentTime = async (c: AllContext) => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("unable to delete all reposts before current time, db was null");
    return;
  }
  const currentTime = floorCurrentTime();
  const deletedPosts = await db.delete(reposts).where(lte(reposts.scheduledDate, currentTime))
    .returning({id: reposts.uuid, scheduleGuid: reposts.scheduleGuid});

  if (deletedPosts.length === 0)
    return;

  // Group the deleted rows by post: post uuid -> schedule guids seen on deleted rows.
  const guidsByPost = new Map<string, Set<string>>();
  for (const deleted of deletedPosts) {
    let guids = guidsByPost.get(deleted.id);
    if (guids === undefined) {
      guids = new Set<string>();
      guidsByPost.set(deleted.id, guids);
    }
    if (!isEmpty(deleted.scheduleGuid))
      guids.add(deleted.scheduleGuid!);
  }

  const batchedQueries: BatchItem<"sqlite">[] = [];
  for (const [postId, guids] of guidsByPost) {
    // Recount the remaining reposts for this post (evaluated by the database
    // when the batch runs).
    const newCount = db.$count(reposts, eq(reposts.uuid, postId));
    batchedQueries.push(db.update(repostCounts)
      .set({count: newCount})
      .where(eq(repostCounts.uuid, postId)));

    // Work out which schedules have no repost rows left for this post.
    const finishedGuids: string[] = [];
    for (const guid of guids) {
      const stillHasSchedule = await db.select().from(reposts)
        .where(and(
          eq(reposts.scheduleGuid, guid),
          eq(reposts.uuid, postId)))
        .limit(1).all();
      if (isEmpty(stillHasSchedule))
        finishedGuids.push(guid);
    }
    if (finishedGuids.length === 0)
      continue;

    // Load the current repost info once and strip every finished schedule.
    const existingRow = (await db.select({repostInfo: posts.repostInfo}).from(posts)
      .where(eq(posts.uuid, postId)).limit(1).all())[0];
    // The post may be gone, or may have no repost info stored at all.
    if (isEmpty(existingRow) || !Array.isArray(existingRow.repostInfo))
      continue;

    const newRepostInfoArr = existingRow.repostInfo.filter((obj) => {
      return !finishedGuids.some((guid) => obj.guid === guid);
    });
    batchedQueries.push(db.update(posts).set({repostInfo: newRepostInfoArr}).where(eq(posts.uuid, postId)));
  }

  if (batchedQueries.length > 0)
    await db.batch(batchedQueries as BatchQuery);
};

/**
 * Writes the outcome of a posting run back to the database in one batch.
 *
 * For each record with a non-null `postID` the post's content is truncated,
 * its `uri`/`cid` are stored, its `embedContent` is cleared and its `posted`
 * flag is set.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param records - Posting results; entries whose `postID` is null are skipped.
 * @param allPosted - When false, the first record (index 0) is written with
 *   `posted = false`; every other record is written with `posted = true`.
 */
export const bulkUpdatePostedData = async (c: AllContext, records: PostRecordResponse[], allPosted: boolean) => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("unable to bulk update posted data, db was null");
    return;
  }
  const dbOperations: BatchItem<"sqlite">[] = [];

  records.forEach((record, i) => {
    // skip over invalid records
    if (record.postID === null)
      return;

    // Only the first record is left unposted, and only when not everything went out.
    const wasPosted = allPosted || i !== 0;
    dbOperations.push(db.update(posts).set({
        // SQLite's substr() is 1-indexed, so a start of 0 with a length of
        // MAX_POSTED_LENGTH+1 keeps the first MAX_POSTED_LENGTH characters.
        content: sql`substr(posts.content, 0, ${MAX_POSTED_LENGTH+1})`,
        posted: wasPosted,
        uri: record.uri,
        cid: record.cid,
        embedContent: []
      })
      .where(eq(posts.uuid, record.postID)));
  });

  if (dbOperations.length > 0)
    await db.batch(dbOperations as BatchQuery);
};

/**
 * Clears the `postNow` flag on a single post.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param id - UUID of the post.
 * @returns `false` when `id` is not a valid UUID or no database is available;
 *   otherwise the `success` flag reported by the update.
 */
export const setPostNowOffForPost = async (c: AllContext, id: string) => {
  const db: DrizzleD1Database = c.get("db");
  if (!uuidValid(id))
    return false;

  if (!db) {
    console.warn(`cannot set off post now for post ${id}`);
    return false;
  }

  const {success} = await db.update(posts).set({postNow: false}).where(eq(posts.uuid, id));
  if (!success)
    console.error(`Unable to set PostNow to off for post ${id}`);
  // Report the outcome on every path, like updatePostForGivenUser does.
  return success;
};

/**
 * Applies `newData` to a post, but only if the post belongs to `userId`.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param userId - Owner the post must belong to.
 * @param id - UUID of the post.
 * @param newData - Column values handed straight to the update's `set()`.
 * @returns `false` when `userId` is empty, `id` is not a valid UUID or no
 *   database is available; otherwise the `success` flag of the update.
 */
export const updatePostForGivenUser = async (c: AllContext, userId: string, id: string, newData: Object) => {
  const db: DrizzleD1Database = c.get("db");
  if (isEmpty(userId) || !uuidValid(id))
    return false;

  if (!db) {
    console.error(`unable to update post ${id} for user ${userId}, db was null`);
    return false;
  }

  const ownedPost = and(eq(posts.uuid, id), eq(posts.userId, userId));
  const {success} = await db.update(posts).set(newData).where(ownedPost);
  return success;
};

/**
 * Lists the uuid and uri of every post of `userId` that is flagged as posted.
 *
 * @returns The matching rows, or an empty array when `userId` is empty or no
 *   database is available.
 */
export const getAllPostedPostsOfUser = async(c: AllContext, userId: string): Promise<GetAllPostedBatch[]> => {
  const db: DrizzleD1Database = c.get("db");
  if (isEmpty(userId))
    return [];

  if (!db) {
    console.error(`unable to get all posted posts of user ${userId}, db was null`);
    return [];
  }

  return await db.select({id: posts.uuid, uri: posts.uri})
    .from(posts)
    .where(and(eq(posts.userId, userId), eq(posts.posted, true)))
    .all();
};

/**
 * Lists the uuid and uri of every post flagged as posted, across all users.
 *
 * @returns The matching rows, or an empty array when no database is available.
 */
export const getAllPostedPosts = async (c: AllContext): Promise<GetAllPostedBatch[]> => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("unable to get all posted posts, db was null");
    return [];
  }
  return await db.select({id: posts.uuid, uri: posts.uri})
    .from(posts)
    .where(eq(posts.posted, true))
    .all();
};

/**
 * Tells whether a post has already been posted.
 *
 * Every situation in which the answer cannot be determined (invalid UUID,
 * no database, post not found, null `posted` column) is reported as `true`.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param postId - UUID of the post.
 */
export const isPostAlreadyPosted = async (c: AllContext, postId: string): Promise<boolean> => {
  const db: DrizzleD1Database = c.get("db");
  if (!uuidValid(postId))
    return true;

  if (!db) {
    console.error(`unable to get database to tell if ${postId} has been posted`);
    return true;
  }

  const rows = await db.select({posted: posts.posted}).from(posts).where(eq(posts.uuid, postId)).all();
  if (isEmpty(rows) || rows[0].posted === null) {
    // if the post does not exist, return true anyways
    return true;
  }
  return rows[0].posted;
};

/**
 * Fetches the child posts of a thread: rows whose `rootPost` equals `rootId`
 * and whose `parentPost` is not null, ordered by `threadOrder` ascending and
 * then `createdAt` descending.
 *
 * @returns The children, or `null` when `rootId` is not a valid UUID, no
 *   database is available, or the thread has no children.
 */
export const getChildPostsOfThread = async (c: AllContext, rootId: string): Promise<Post[]|null> => {
  const db: DrizzleD1Database = c.get("db");
  if (!uuidValid(rootId))
    return null;

  if (!db) {
    console.error(`unable to get child posts of root ${rootId}, db was null`);
    return null;
  }

  const children = await db.select().from(posts)
    .where(and(isNotNull(posts.parentPost), eq(posts.rootPost, rootId)))
    .orderBy(asc(posts.threadOrder), desc(posts.createdAt)).all();
  if (children.length === 0)
    return null;
  return children.map((child) => new Post(child));
};

/**
 * Counts the posts of `userId` whose `rootPost` equals `rootId`.
 *
 * @returns The count, or 0 when `rootId` is not a valid UUID.
 */
export const getPostThreadCount = async (db: DrizzleD1Database, userId: string, rootId: string): Promise<number> => {
  if (!uuidValid(rootId))
    return 0;

  return await db.$count(posts, and(
    eq(posts.rootPost, rootId),
    eq(posts.userId, userId)));
};

/**
 * Deletes multiple posted posts from the database. Posts must already be
 * posted, as this does no R2 queries to clean up stored objects.
 *
 * One delete statement is issued per id; each removes the post itself and any
 * row whose `rootPost` is that id, restricted to rows flagged as posted.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @param postsToDelete - UUIDs of the posts (or thread roots) to delete.
 * @returns The number of delete statements that reported success. This is a
 *   count of statements, not of rows removed. Returns 0 for an empty input or
 *   when no database is available.
 */
export const deletePosts = async (c: AllContext, postsToDelete: string[]): Promise<number> => {
  // Don't do anything on empty arrays.
  if (isEmpty(postsToDelete))
    return 0;

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`could not delete posts ${postsToDelete}, db was null`);
    return 0;
  }

  // this will wipe out any posts and their children if they are marked for delete
  const deleteQueries: BatchItem<"sqlite">[] = postsToDelete.map((itm) =>
    db.delete(posts).where(
      and(
        or(eq(posts.uuid, itm), eq(posts.rootPost, itm)),
        eq(posts.posted, true))));

  if (deleteQueries.length === 0)
    return 0;

  // Batching this should improve db times
  const batchResponse = await db.batch(deleteQueries as BatchQuery);
  // Tally the statements that succeeded.
  return batchResponse.reduce((val, item) => val + (item.success ? 1 : 0), 0);
};

/**
 * Deletes posted root posts and standalone posts that were last updated at
 * least seven days ago and whose repost count is zero or less.
 *
 * NOTE(review): `repostCounts` is LEFT JOINed, so a post without a
 * `repostCounts` row yields a NULL count and `NULL <= 0` is not true in SQL.
 * Such posts are never selected here. Confirm every post is guaranteed a
 * `repostCounts` row, or add an explicit IS NULL branch.
 *
 * @param c - Request context; the database handle is read from `c.get("db")`.
 * @returns Whatever {@link deletePosts} returns for the selected ids, or 0
 *   when nothing qualifies or no database is available.
 */
export const purgePostedPosts = async (c: AllContext): Promise<number> => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error("could not purge posted posts, db was null");
    return 0;
  }
  const dbQuery = await db.select({ data: posts.uuid }).from(posts)
    .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
    .where(
      and(
        eq(posts.posted, true),
        lte(posts.updatedAt, sql`datetime('now', '-7 days')`),
        // skip child posts objects, only get us root posts and non-threads
        lte(posts.threadOrder, 0),
        lte(repostCounts.count, 0)
      )
    ).all();
  const postsToDelete = dbQuery.map((item) => item.data);
  if (isEmpty(postsToDelete))
    return 0;

  return await deletePosts(c, postsToDelete);
};

/**
 * Looks up a single post of `userId` by its `cid`.
 *
 * @returns The post, or `null` when no row matches.
 */
export const getPostByCID = async(db: DrizzleD1Database, userId: string, cid: string): Promise<Post|null> => {
  const result = await db.select().from(posts)
    .where(and(eq(posts.userId, userId), eq(posts.cid, cid)))
    .limit(1).all();

  if (isEmpty(result))
    return null;
  return new Post(result[0]);
};

/**
 * Builds (without executing) an upsert for the `repostCounts` row of a post.
 *
 * @param db - Database handle used to build the query.
 * @param postUUID - UUID of the post whose count is being written.
 * @param newValue - Count to store. Any value below 0 (the default is -1)
 *   means "recount": the number of `reposts` rows for the post is used instead.
 * @returns The insert-or-update query, ready to be awaited or batched.
 */
export const getRepostCountQuery = (db: DrizzleD1Database, postUUID: string, newValue: number = -1) => {
  // if we're given any value underneath 0, we need to recount for the entire post
  const newCount = (newValue < 0) ? db.$count(reposts, eq(reposts.uuid, postUUID)) : newValue;
  return db.insert(repostCounts)
    .values({uuid: postUUID, count: newCount})
    .onConflictDoUpdate({target: repostCounts.uuid, set: {count: newCount}});
};