Schedule posts to Bluesky with Cloudflare Workers.
skyscheduler.work
cf
tool
bsky-tool
cloudflare
bluesky
schedule
bsky
service
social-media
cloudflare-workers
1import { addHours, isAfter, isEqual } from "date-fns";
2import { and, asc, desc, eq, getTableColumns, gt, gte, ne, sql } from "drizzle-orm";
3import { BatchItem } from "drizzle-orm/batch";
4import { DrizzleD1Database } from "drizzle-orm/d1";
5import has from "just-has";
6import isEmpty from "just-is-empty";
7import { v4 as uuidv4, validate as uuidValid } from 'uuid';
8import { Post } from "../classes/post";
9import { RepostInfo } from "../classes/repost";
10import { mediaFiles, posts, repostCounts, reposts } from "../db/app.schema";
11import { accounts, users } from "../db/auth.schema";
12import { MAX_POSTS_PER_THREAD, MAX_REPOST_POSTS, MAX_REPOST_RULES_PER_POST } from "../limits";
13import { APP_NAME } from "../siteinfo";
14import {
15 AccountStatus, AllContext, BatchQuery,
16 CreateObjectResponse, CreatePostQueryResponse,
17 DeleteResponse, EmbedDataType, PostLabel
18} from "../types";
19import { PostSchema } from "../validation/postSchema";
20import { RepostSchema } from "../validation/repostSchema";
21import {
22 getChildPostsOfThread, getPostByCID,
23 getPostThreadCount, getRepostCountQuery, updatePostForGivenUser
24} from "./db/data";
25import {
26 getViolationsForUser, removeViolation,
27 removeViolations, userHasViolationsDB
28} from "./db/violations";
29import { floorGivenTime } from "./helpers";
30import { deleteEmbedsFromR2 } from "./r2Query";
31
/**
 * Loads every post owned by the current user together with its repost count.
 *
 * Rows are ordered by scheduled date (descending), then thread order
 * (ascending), then creation date (descending).
 *
 * @param c request context that supplies the "userId" and "db" values
 * @returns the user's posts wrapped as Post objects, or null when the context
 * has no user/database, the user has no posts, or the lookup throws
 */
export const getPostsForUser = async (c: AllContext): Promise<Post[]|null> => {
  try {
    const userId = c.get("userId");
    const db: DrizzleD1Database = c.get("db");
    if (!userId || !db)
      return null;

    // every post column plus the joined repost counter
    const selectedColumns = {
      ...getTableColumns(posts),
      repostCount: repostCounts.count
    };

    const rows = await db.select(selectedColumns)
      .from(posts).where(eq(posts.userId, userId))
      .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
      .orderBy(desc(posts.scheduledDate), asc(posts.threadOrder), desc(posts.createdAt)).all();

    return isEmpty(rows) ? null : rows.map((row) => new Post(row));
  } catch(err) {
    console.error(`Failed to get posts for user, session could not be fetched ${err}`);
    return null;
  }
};
55
/**
 * Applies account/profile changes for the current user in a single DB batch.
 *
 * A "password" entry is written to the accounts table; all remaining entries
 * are written to the users table. When the Bluesky app password, username or
 * PDS changes, any InvalidAccount/Deactivated violations on the user are
 * cleared first.
 *
 * The caller's `newData` object is left untouched.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param newData map of fields to update
 * @returns true when the update ran (or there was nothing to write), false
 * when there is no database/user in the context or any step throws
 */
export const updateUserData = async (c: AllContext, newData: any): Promise<boolean> => {
  const userId = c.get("userId");
  const db: DrizzleD1Database = c.get("db");
  try {
    if (!db) {
      console.error("Unable to update user data, no database object");
      return false;
    }
    if (!userId)
      return false;

    const queriesToExecute: BatchItem<"sqlite">[] = [];

    // Work on a shallow copy so the password can be split off without
    // mutating the object the caller handed us.
    const userFields = { ...newData };

    if (has(newData, "password")) {
      // the password lives on the accounts table, not the users table
      queriesToExecute.push(db.update(accounts)
        .set({password: newData.password})
        .where(eq(accounts.userId, userId)));
      delete userFields.password;
    }

    // If we have new data about the username, pds, or app password
    if (has(newData, "bskyAppPass") || has(newData, "username") || has(newData, "pds")) {
      // check if the user has violations
      if (await userHasViolationsDB(db, userId)) {
        // they do, so clear them out
        await removeViolations(c, userId, [AccountStatus.InvalidAccount, AccountStatus.Deactivated]);
      }
    }

    if (!isEmpty(userFields)) {
      queriesToExecute.push(db.update(users).set(userFields)
        .where(eq(users.id, userId)));
    }

    if (queriesToExecute.length > 0)
      await db.batch(queriesToExecute as BatchQuery);
    return true;
  } catch(err) {
    // include the underlying error so failures can actually be diagnosed
    console.error(`Failed to update new user data for user ${userId}: ${err}`);
  }
  return false;
};
102
/**
 * Deletes a post owned by the current user and repairs the surrounding thread.
 *
 * - If the post has not been posted yet, its embeds are removed from R2 and a
 *   MediaTooBig violation on the user (if any) is cleared.
 * - If the post has a parent, its direct children are re-parented to that
 *   parent and the thread order of later posts is shifted down by one.
 * - If the post is a thread root, every child post is deleted as well (their
 *   embeds are removed in the background via waitUntil).
 * - If the post is the last child of a thread, the root's threadOrder is
 *   reset to -1.
 *
 * @param c request context that supplies "userId", "db" and the execution context
 * @param id uuid of the post to delete
 * @returns `success` reflects the outcome of the DB batch; `isRepost` mirrors
 * the deleted post; `needsRefresh` is set when a thread root was deleted
 */
export const deletePost = async (c: AllContext, id: string): Promise<DeleteResponse> => {
  const userId = c.get("userId");
  const returnObj: DeleteResponse = {success: false, isRepost: false};
  if (!userId) {
    return returnObj;
  }

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to delete post ${id}, db was null`);
    return returnObj;
  }

  // returns null when the post does not exist on this account
  const postObj = await getPostById(c, id);
  if (postObj === null) {
    return returnObj;
  }

  const queriesToExecute: BatchItem<"sqlite">[] = [];
  // If the post has not been posted, that means we still have files for it, so
  // delete the files from R2
  if (!postObj.posted) {
    await deleteEmbedsFromR2(c, postObj.embeds);
    if (await userHasViolationsDB(db, userId)) {
      // Remove the media too big violation if it's been given
      await removeViolation(c, userId, AccountStatus.MediaTooBig);
    }
  }
  returnObj.isRepost = postObj.isRepost || false;

  // If the parent post is set, then attempt to find and update the post chain
  const parentPost = postObj.parentPost;
  if (parentPost !== undefined) {
    // set anyone who had this as their parent to this post chain
    queriesToExecute.push(db.update(posts).set({parentPost: parentPost, threadOrder: postObj.threadOrder})
      .where(and(eq(posts.parentPost, postObj.postid), eq(posts.rootPost, postObj.rootPost!))));

    // Update the post order past here
    queriesToExecute.push(db.update(posts).set({threadOrder: sql`threadOrder - 1`})
      .where(
        and(and(eq(posts.rootPost, postObj.rootPost!), ne(posts.threadOrder, -1)), gt(posts.threadOrder, postObj.threadOrder)
      )));
  }

  // We'll need to delete all of the child embeds then, a costly, annoying experience.
  if (postObj.isThreadRoot) {
    const childPosts = await getChildPostsOfThread(c, postObj.postid);
    if (childPosts !== null) {
      for (const childPost of childPosts) {
        // file cleanup is best-effort, let it finish after the response
        c.executionCtx.waitUntil(deleteEmbedsFromR2(c, childPost.embeds));
        queriesToExecute.push(db.delete(posts).where(eq(posts.uuid, childPost.postid)));
      }
    } else {
      console.warn(`could not get child posts of thread ${postObj.postid} during delete`);
    }
  } else if (postObj.isChildPost) {
    // this is not a thread root, so we should figure out how many children are left.
    const childPostCount = (await getPostThreadCount(db, postObj.user, postObj.rootPost!)) - 1;
    if (childPostCount <= 0) {
      queriesToExecute.push(db.update(posts).set({threadOrder: -1}).where(eq(posts.uuid, postObj.rootPost!)));
    }
  }

  // delete post
  queriesToExecute.push(db.delete(posts).where(eq(posts.uuid, id)));

  // waitUntil() returns void, so awaiting it never waited for the batch and
  // success was reported unconditionally. Await the batch itself so the
  // response reflects what actually happened in the database.
  try {
    const batchResponse = await db.batch(queriesToExecute as BatchQuery);
    returnObj.success = batchResponse.every((el) => el.success);
  } catch(err) {
    console.error(`failed to delete post ${id}: ${err}`);
    return returnObj;
  }

  returnObj.needsRefresh = postObj.isThreadRoot;
  return returnObj;
};
171
/**
 * Validates and stores a new post for the current user, either as a
 * standalone scheduled post or as a reply inserted into an existing thread.
 *
 * Standalone posts get their own repost schedule (one `reposts` row per
 * requested repost plus a `repostCounts` row). Threaded posts inherit the
 * root post's scheduled date, are inserted directly after the given parent
 * and push every later post in the thread one position down.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param body raw request payload, validated against PostSchema
 * @returns `ok` with the new `postId` when every batched statement succeeded,
 * otherwise `ok: false` with a message describing the failure
 */
export const createPost = async (c: AllContext, body: any): Promise<CreatePostQueryResponse> => {
  const db: DrizzleD1Database = c.get("db");
  const userId = c.get("userId");
  if (!userId)
    return { ok: false, msg: "Your user session has expired, please login again"};

  if (!db) {
    console.error("unable to create post, db became null");
    return { ok: false, msg: "An application error has occurred please refresh" };
  }

  const validation = PostSchema.safeParse(body);
  if (!validation.success) {
    return { ok: false, msg: validation.error.toString() };
  }

  const { content, scheduledDate, embeds, label, makePostNow, repostData, rootPost, parentPost } = validation.data;
  const scheduleDate = floorGivenTime((makePostNow) ? new Date() : new Date(scheduledDate));

  // Ensure scheduled date is in the future
  //
  // Do not do this check if you are doing a threaded post
  // or you have marked that you are posting right now.
  if (!isAfter(scheduleDate, new Date()) &&
    (!makePostNow && (isEmpty(rootPost) && isEmpty(parentPost)))) {
    return { ok: false, msg: "Scheduled date must be in the future" };
  }

  // Check if account is in violation
  const violationData = await getViolationsForUser(db, userId);
  if (violationData != null) {
    if (violationData.tosViolation) {
      return {ok: false, msg: `This account is unable to use ${APP_NAME} services at this time`};
    } else if (violationData.userPassInvalid) {
      return {ok: false, msg: "The BSky account credentials is invalid, please update these in the settings"};
    }
  }

  // Check to see if this post already exists for thread
  let rootPostID: string|undefined = undefined;
  let parentPostID: string|undefined = undefined;
  let rootPostData: Post|null = null;
  let parentPostOrder: number = 0;
  if (uuidValid(rootPost)) {
    // returns null if the post doesn't appear on this account
    rootPostData = await getPostById(c, rootPost!);
    if (rootPostData === null) {
      return { ok: false, msg: "The given root post cannot be found on your account"};
    }
    if (rootPostData.posted) {
      return { ok: false, msg: "You cannot make threads off already posted posts"};
    }
    if (rootPostData.isChildPost) {
      return { ok: false, msg: "Subthreads of threads are not allowed." };
    }
    if (rootPostData.isRepost) {
      return {ok: false, msg: "Threads cannot be made of repost actions"};
    }
    rootPostID = rootPostData.rootPost || rootPostData.postid;

    if (rootPost !== parentPost) {
      // This isn't a direct reply to the root, so look up the actual parent
      if (!uuidValid(parentPost)) {
        return { ok: false, msg: "The given parent post is invalid"};
      }
      const parentPostData = await getPostById(c, parentPost!);
      if (parentPostData === null) {
        return { ok: false, msg: "The given parent post cannot be found on your account"};
      }
      // The parent must live in the thread we are inserting into; otherwise its
      // threadOrder belongs to a different thread and the order/parent updates
      // below would corrupt this one.
      if (parentPostData.rootPost !== rootPostID) {
        return { ok: false, msg: "The given parent post is not part of this thread"};
      }
      parentPostID = parentPost!;
      parentPostOrder = parentPostData.threadOrder + 1;
    } else {
      parentPostID = rootPostData.postid;
      parentPostOrder = 1; // Root will always be 0, so if this is root, go 1 up.
    }
  }

  const isThreadedPost: boolean = (rootPostID !== undefined && parentPostID !== undefined);
  if (isThreadedPost) {
    const threadCount: number = await getPostThreadCount(db, userId, rootPostID!);
    if (threadCount >= MAX_POSTS_PER_THREAD) {
      return { ok: false, msg: `this thread has hit the limit of ${MAX_POSTS_PER_THREAD} posts per thread`};
    }
  }

  // Create repost metadata (threaded posts never carry their own repost schedule)
  const scheduleGUID = (!isThreadedPost) ? uuidv4() : undefined;
  const repostInfo = (!isThreadedPost) ?
    new RepostInfo(scheduleGUID!, scheduleDate, false, repostData) : undefined;

  // Create the posts
  const postUUID = uuidv4();
  const dbOperations: BatchItem<"sqlite">[] = [];

  // if we're threaded, insert our post directly after the given parent
  if (isThreadedPost) {
    // Whoever replied to the parent now replies to our new post
    dbOperations.push(db.update(posts).set({parentPost: postUUID })
      .where(and(eq(posts.parentPost, parentPostID!), eq(posts.rootPost, rootPostID!))));

    // update all posts past this one to also update their order (we will take their id)
    dbOperations.push(db.update(posts).set({threadOrder: sql`threadOrder + 1`})
      .where(
        and(and(eq(posts.rootPost, rootPostID!), ne(posts.threadOrder, -1)), gte(posts.threadOrder, parentPostOrder)
      )));

    // Update the root post so that it has the correct flags set on it as well.
    if (rootPostData!.isThreadRoot === false) {
      dbOperations.push(db.update(posts).set({threadOrder: 0, rootPost: rootPostData!.postid})
        .where(eq(posts.uuid, rootPostData!.postid)));
    }
  } else {
    // a standalone post is its own root
    rootPostID = postUUID;
  }

  // Add the post to the DB
  dbOperations.push(db.insert(posts).values({
    content,
    uuid: postUUID,
    postNow: makePostNow,
    scheduledDate: (!isThreadedPost) ? scheduleDate : new Date(rootPostData!.scheduledDate!),
    rootPost: rootPostID,
    parentPost: parentPostID,
    repostInfo: (!isThreadedPost) ? [repostInfo!] : [],
    threadOrder: (!isThreadedPost) ? undefined : parentPostOrder,
    embedContent: embeds,
    contentLabel: label || PostLabel.None,
    userId: userId
  }));

  if (!isEmpty(embeds)) {
    // Loop through all data within an embed blob so we can mark it as posted
    for (const embed of embeds!) {
      if (embed.type === EmbedDataType.Image || embed.type === EmbedDataType.Video) {
        dbOperations.push(
          db.update(mediaFiles).set({hasPost: true}).where(eq(mediaFiles.fileName, embed.content)));
      }
    }
  }

  // Add repost data to the table
  if (repostData && !isThreadedPost) {
    for (let i = 1; i <= repostData.times; ++i) {
      dbOperations.push(db.insert(reposts).values({
        uuid: postUUID,
        scheduleGuid: scheduleGUID,
        scheduledDate: addHours(scheduleDate, i*repostData.hours)
      }));
    }
    // Push the repost counts in
    dbOperations.push(db.insert(repostCounts)
      .values({uuid: postUUID, count: repostData.times}));
  }

  // Batch the query
  const batchResponse = await db.batch(dbOperations as BatchQuery);
  const success = batchResponse.every((el) => el.success);
  return { ok: success, postNow: makePostNow, postId: postUUID, msg: success ? "success" : "fail" };
};
333
/**
 * Schedules reposts of an existing Bluesky post for the current user.
 *
 * When a post with the same CID already exists on the account, a new repost
 * rule is appended to it (unless an identical rule is already present);
 * otherwise a placeholder post flagged `isRepost` is created to hold the
 * rule. One `reposts` row is queued for the initial time plus one per
 * requested repetition, and the repost counter is updated.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param body raw request payload, validated against RepostSchema
 * @returns `ok` with the affected `postId` when every batched statement
 * succeeded, otherwise `ok: false` with a message describing the failure
 */
export const createRepost = async (c: AllContext, body: any): Promise<CreateObjectResponse> => {
  const db: DrizzleD1Database = c.get("db");

  const userId = c.get("userId");
  if (!userId)
    return { ok: false, msg: "Your user session has expired, please login again"};

  if (!db) {
    console.error("unable to create repost db became null");
    return {ok: false, msg: "Invalid server operation occurred, please refresh"};
  }

  const validation = RepostSchema.safeParse(body);
  if (!validation.success) {
    return { ok: false, msg: validation.error.toString() };
  }
  const { url, uri, cid, content, scheduledDate, repostData } = validation.data;
  const scheduleDate = floorGivenTime(new Date(scheduledDate));
  const timeNow = new Date();

  // Ensure scheduled date is in the future
  if (!isAfter(scheduleDate, timeNow)) {
    return { ok: false, msg: "Scheduled date must be in the future" };
  }

  // Check if account is in violation
  const violationData = await getViolationsForUser(db, userId);
  if (violationData != null) {
    if (violationData.tosViolation) {
      return {ok: false, msg: `This account is unable to use ${APP_NAME} services at this time`};
    } else if (violationData.userPassInvalid) {
      return {ok: false, msg: "The BSky account credentials is invalid, please update these in the settings"};
    }
  }
  let postUUID;
  const dbOperations: BatchItem<"sqlite">[] = [];
  const scheduleGUID = uuidv4();
  const repostInfo: RepostInfo = new RepostInfo(scheduleGUID, scheduleDate, true, repostData);

  // Check to see if the post already exists
  // (check also against the userId here as well to avoid cross account data collisions)
  const existingPost = await getPostByCID(db, userId, cid);
  if (existingPost !== null) {
    postUUID = existingPost.postid;
    const existingPostDate = existingPost.scheduledDate!;
    // Ensure the date asked for is on or after the post's schedule date.
    // Compare the floored scheduleDate (not the raw request value) so the
    // equality check uses the same value that is actually stored.
    if (!isAfter(scheduleDate, existingPostDate) && !isEqual(scheduleDate, existingPostDate)) {
      return { ok: false, msg: "Scheduled date must be after the initial post's date" };
    }
    // Make sure this isn't a thread post.
    // We could probably work around this but I don't think it's worth the effort.
    if (existingPost.isChildPost) {
      return {ok: false, msg: "Repost posts cannot be created from child thread posts"};
    }

    // Add repost info object to existing array
    const newRepostInfo: RepostInfo[] = isEmpty(existingPost.repostInfo) ? [] : existingPost.repostInfo!;
    if (newRepostInfo.length >= MAX_REPOST_RULES_PER_POST) {
      return {ok: false, msg: `Num of reposts rules for this post has exceeded the limit of ${MAX_REPOST_RULES_PER_POST} rules`};
    }

    const repostInfoTimeStr = repostInfo.time.toISOString();
    // Check to see if we have an exact repost match.
    // If we do, do not update the repostInfo, as repost table will drop the duplicates for us anyways.
    const isNewInfoNotDuped = (el: any) => {
      if (el.time == repostInfoTimeStr) {
        if (el.count == repostInfo.count) {
          return el.hours != repostInfo.hours;
        }
      }
      return true;
    };
    if (newRepostInfo.every(isNewInfoNotDuped)) {
      newRepostInfo.push(repostInfo);

      // push record update to add to json array
      dbOperations.push(db.update(posts).set({repostInfo: newRepostInfo}).where(and(
        eq(posts.userId, userId), eq(posts.cid, cid))));
    }
  } else {
    // Limit of post reposts on the user's account.
    const accountCurrentReposts = await db.$count(posts, and(eq(posts.userId, userId), eq(posts.isRepost, true)));
    if (MAX_REPOST_POSTS > 0 && accountCurrentReposts >= MAX_REPOST_POSTS) {
      return {ok: false, msg:
        `You've cannot create any more repost posts at this time. Using: (${accountCurrentReposts}/${MAX_REPOST_POSTS}) repost posts`};
    }

    // Create the post base for this repost
    postUUID = uuidv4();
    dbOperations.push(db.insert(posts).values({
      content: !isEmpty(content) ? content! : `Repost of ${url}`,
      uuid: postUUID,
      cid: cid,
      uri: uri,
      posted: true,
      isRepost: true,
      repostInfo: [repostInfo],
      scheduledDate: scheduleDate,
      userId: userId
    }));
  }

  // Push initial repost
  let totalRepostCount = 1;
  dbOperations.push(db.insert(reposts).values({
    uuid: postUUID,
    scheduleGuid: scheduleGUID,
    scheduledDate: scheduleDate
  }).onConflictDoNothing());

  // Push other repost times if we have them
  if (repostData) {
    for (let i = 1; i <= repostData.times; ++i) {
      dbOperations.push(db.insert(reposts).values({
        uuid: postUUID,
        scheduleGuid: scheduleGUID,
        scheduledDate: addHours(scheduleDate, i*repostData.hours)
      }).onConflictDoNothing());
    }
    totalRepostCount += repostData.times;
  }
  // Update repost counts
  if (existingPost !== null) {
    // update existing content posts (but only for reposts, no one else)
    if (existingPost.isRepost && !isEmpty(content)) {
      dbOperations.push(db.update(posts).set({content: content!}).where(eq(posts.uuid, postUUID)));
    }

    // Because there could be conflicts that drop, run a count on the entire list and use the value from that
    // we also don't know if the repost count table has repost values for this item, so we should
    // attempt to always insert and update if it already exists
    totalRepostCount = -1;
  }

  // pushing any value under zero causes a full recount
  dbOperations.push(getRepostCountQuery(db, postUUID, totalRepostCount));

  const batchResponse = await db.batch(dbOperations as BatchQuery);
  const success = batchResponse.every((el) => el.success);
  return { ok: success, msg: success ? "success" : "fail", postId: postUUID };
};
475
/**
 * Updates a post on behalf of the user stored in the request context.
 *
 * @param c request context that supplies the "userId" value
 * @param id identifier of the post to update
 * @param newData fields to write, forwarded unchanged to updatePostForGivenUser
 * @returns whatever updatePostForGivenUser reports for this user/post
 */
export const updatePostForUser = async (c: AllContext, id: string, newData: Object): Promise<boolean> => {
  const currentUser = c.get("userId");
  const wasUpdated = await updatePostForGivenUser(c, currentUser, id, newData);
  return wasUpdated;
};
480
/**
 * Looks up a single post by uuid, restricted to the current user's posts.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param id uuid of the post to fetch
 * @returns the matching Post, or null when there is no user, the id is not a
 * valid uuid, the database is unavailable, or no row matches
 */
export const getPostById = async(c: AllContext, id: string): Promise<Post|null> => {
  const userId = c.get("userId");
  if (!(userId && uuidValid(id)))
    return null;

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to get post ${id}, db was null`);
    return null;
  }

  // match on both uuid and owner so other accounts' posts are never returned
  const ownedPostFilter = and(eq(posts.uuid, id), eq(posts.userId, userId));
  const rows = await db.select().from(posts)
    .where(ownedPostFilter)
    .limit(1).all();

  return isEmpty(rows) ? null : new Post(rows[0]);
};
500
/**
 * Same lookup as getPostById, but the row is joined with its repost count
 * (like getPostsForUser). Used for post editing.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param id uuid of the post to fetch
 * @returns the matching Post including `repostCount`, or null when there is
 * no user, the id is not a valid uuid, the database is unavailable, or no
 * row matches
 */
export const getPostByIdWithReposts = async(c: AllContext, id: string): Promise<Post|null> => {
  const userId = c.get("userId");
  if (!(userId && uuidValid(id)))
    return null;

  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to get post ${id} with reposts, db was null`);
    return null;
  }

  // every post column plus the joined repost counter
  const selectedColumns = {
    ...getTableColumns(posts),
    repostCount: repostCounts.count,
  };
  const rows = await db.select(selectedColumns).from(posts)
    .where(and(eq(posts.uuid, id), eq(posts.userId, userId)))
    .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
    .limit(1).all();

  return isEmpty(rows) ? null : new Post(rows[0]);
};
525
/**
 * Removes one repost rule (schedule) from a post owned by the current user.
 *
 * The rule is dropped from the post's `repostInfo` JSON, the queued `reposts`
 * rows for that schedule are deleted, and the repost counter is updated.
 *
 * @param c request context that supplies the "userId" and "db" values
 * @param id uuid of the post that owns the rule
 * @param scheduleId guid of the repost rule to remove
 * @returns true when the rule's rows were deleted and the follow-up batch
 * succeeded; false for invalid ids, a missing database/post, a rule that is
 * not attached to the post, or when nothing was deleted
 */
export const deleteRepostRule = async(c: AllContext, id: string, scheduleId: string): Promise<boolean> => {
  const db: DrizzleD1Database = c.get("db");
  if (!db) {
    console.error(`unable to delete schedule id ${scheduleId} from post ${id}, db was null`);
    return false;
  }
  if (!uuidValid(id) || !uuidValid(scheduleId)) {
    return false;
  }

  // Get the post to make sure it's valid (and owned by this user) and update post json
  const currentPost = await getPostByIdWithReposts(c, id);
  if (currentPost == null || currentPost.repostInfo == null) {
    return false;
  }

  // The rule must be attached to this post. A rule that is absent from the
  // post's own repostInfo cannot be trusted, even if rows in the reposts table
  // happen to carry that guid, so refuse before touching the table.
  if (!currentPost.repostInfo.some((itm) => itm.guid === scheduleId)) {
    return false;
  }

  // remove the schedule from the current json object set
  const newRepostInfo: RepostInfo[] = currentPost.repostInfo.filter((itm) => {
    return itm.guid !== scheduleId;
  });

  const queriesToExecute: BatchItem<"sqlite">[] = [];
  // modify the current repost info
  queriesToExecute.push(db.update(posts).set({repostInfo: newRepostInfo}).where(and(
    eq(posts.userId, currentPost.user), eq(posts.uuid, currentPost.postid))));

  // Delete batch schedule items
  // we don't bundle this one because we want to get a count to make the operation below it, better
  //
  // Scope the delete to the post we just verified; matching on the schedule
  // guid alone would also remove rows belonging to posts this user does not own.
  const deletedItems = await db.delete(reposts)
    .where(and(eq(reposts.uuid, currentPost.postid), eq(reposts.scheduleGuid, scheduleId)))
    .returning({date: reposts.scheduledDate});

  // did we delete anything at all?
  if (deletedItems.length <= 0) {
    // we did not, that's really strange.
    console.warn(`When trying to delete reposts for ${currentPost.postid}, schedule id ${scheduleId} had empty items`);
    return false;
  }

  // Force update the repost count :)
  // createRepost passes a negative value to request a full recount; do the
  // same when no stored count is available instead of sending NaN.
  const previousCount = currentPost.repostCount;
  const remainingCount = (typeof previousCount === "number") ?
    previousCount - deletedItems.length : -1;
  queriesToExecute.push(getRepostCountQuery(db, id, remainingCount));

  // Batch push up everything
  const batchResponse = await db.batch(queriesToExecute as BatchQuery);
  return batchResponse.every((el) => el.success);
};