Schedule posts to Bluesky with Cloudflare workers.
skyscheduler.work
cf
tool
bsky-tool
cloudflare
bluesky
schedule
bsky
service
social-media
cloudflare-workers
1import {
2 and, asc, desc, eq, inArray, isNotNull,
3 lte, ne, notInArray, or, sql
4} from "drizzle-orm";
5import { BatchItem } from "drizzle-orm/batch";
6import { DrizzleD1Database } from "drizzle-orm/d1";
7import isEmpty from "just-is-empty";
8import { validate as uuidValid } from 'uuid';
9import { Post } from "../../classes/post";
10import { Repost } from "../../classes/repost";
11import { posts, repostCounts, reposts } from "../../db/app.schema";
12import { violations } from "../../db/enforcement.schema";
13import { MAX_POSTED_LENGTH } from "../../limits";
14import {
15 AllContext,
16 BatchQuery,
17 GetAllPostedBatch,
18 PostRecordResponse
19} from "../../types";
20import { floorCurrentTime } from "../helpers";
21
22export const getAllPostsForCurrentTime = async (c: AllContext, removeThreads: boolean = false): Promise<Post[]> => {
23 // Get all scheduled posts for current time
24 const db: DrizzleD1Database = c.get("db");
25 if (!db) {
26 console.error("Could not get all posts for current time, db was null");
27 return [];
28 }
29 const currentTime: Date = floorCurrentTime();
30
31 const violationUsers = db.select({violators: violations.userId}).from(violations);
32 const postsToMake = db.$with('scheduledPosts').as(db.select().from(posts)
33 .where(
34 and(
35 and(
36 and(
37 eq(posts.posted, false),
38 ne(posts.postNow, true) // Ignore any posts that are marked for post now
39 ),
40 lte(posts.scheduledDate, currentTime)
41 ),
42 // ignore threads, we'll create this one later.
43 removeThreads ? eq(posts.threadOrder, -1) : lte(posts.threadOrder, 0)
44 )
45 ));
46 const results = await db.with(postsToMake).select().from(postsToMake)
47 .where(notInArray(postsToMake.userId, violationUsers)).orderBy(asc(postsToMake.createdAt)).all();
48 return results.map((item) => new Post(item));
49};
50
51export const getAllRepostsForGivenTime = async (c: AllContext, givenDate: Date): Promise<Repost[]> => {
52 // Get all scheduled posts for the given time
53 const db: DrizzleD1Database = c.get("db");
54 if (!db) {
55 console.error("could not get all reposts for given timeframe, db was null");
56 return [];
57 }
58 const query = db.select({uuid: reposts.uuid}).from(reposts)
59 .where(lte(reposts.scheduledDate, givenDate));
60 const violationsQuery = db.select({data: violations.userId}).from(violations);
61 const results = await db.select({uuid: posts.uuid, uri: posts.uri, cid: posts.cid, userId: posts.userId })
62 .from(posts)
63 .where(and(inArray(posts.uuid, query), notInArray(posts.userId, violationsQuery)))
64 .all();
65
66 return results.map((item) => new Repost(item));
67};
68
69export const getAllRepostsForCurrentTime = async (c: AllContext): Promise<Repost[]> => {
70 return await getAllRepostsForGivenTime(c, floorCurrentTime());
71};
72
73export const deleteAllRepostsBeforeCurrentTime = async (c: AllContext) => {
74 const db: DrizzleD1Database = c.get("db");
75 if (!db) {
76 console.error("unable to delete all reposts before current time, db was null");
77 return;
78 }
79 const currentTime = floorCurrentTime();
80 const deletedPosts = await db.delete(reposts).where(lte(reposts.scheduledDate, currentTime))
81 .returning({id: reposts.uuid, scheduleGuid: reposts.scheduleGuid});
82
83 // This is really stupid and I hate it, but someone has to update repost counts once posted
84 if (deletedPosts.length > 0) {
85 let batchedQueries:BatchItem<"sqlite">[] = [];
86 for (const deleted of deletedPosts) {
87 // Update counts
88 const newCount = db.$count(reposts, eq(reposts.uuid, deleted.id));
89 batchedQueries.push(db.update(repostCounts)
90 .set({count: newCount})
91 .where(eq(repostCounts.uuid, deleted.id)));
92
93 // check if the repost data needs to be killed
94 if (!isEmpty(deleted.scheduleGuid)) {
95 // do a search to find if there are any reposts with the same scheduleguid.
96 // if there are none, this schedule should get removed from the repostInfo array
97 const stillHasSchedule = await db.select().from(reposts)
98 .where(and(
99 eq(reposts.scheduleGuid, deleted.scheduleGuid!),
100 eq(reposts.uuid, deleted.id)))
101 .limit(1).all();
102
103 // if this is empty, then we need to update the repost info.
104 if (isEmpty(stillHasSchedule)) {
105 // get the existing repost info to filter out this old data
106 const existingRepostInfoArr = (await db.select({repostInfo: posts.repostInfo}).from(posts)
107 .where(eq(posts.uuid, deleted.id)).limit(1).all())[0];
108 // check to see if there is anything in the repostInfo array
109 if (!isEmpty(existingRepostInfoArr)) {
110 // create a new array with the deleted out object
111 const newRepostInfoArr = existingRepostInfoArr.repostInfo!.filter((obj) => {
112 return obj.guid !== deleted.scheduleGuid!;
113 });
114 // push the new repost info array
115 batchedQueries.push(db.update(posts).set({repostInfo: newRepostInfoArr}).where(eq(posts.uuid, deleted.id)));
116 }
117 }
118 }
119 }
120 if (batchedQueries.length > 0)
121 await db.batch(batchedQueries as BatchQuery);
122 }
123};
124
125export const bulkUpdatePostedData = async (c: AllContext, records: PostRecordResponse[], allPosted: boolean) => {
126 const db: DrizzleD1Database = c.get("db");
127 if (!db) {
128 console.error("unable to bulk update posted data, db was null");
129 return;
130 }
131 let dbOperations: BatchItem<"sqlite">[] = [];
132
133 for (let i = 0; i < records.length; ++i) {
134 const record = records[i];
135 // skip over invalid records
136 if (record.postID === null)
137 continue;
138
139 let wasPosted = (i == 0 && !allPosted) ? false : true;
140 dbOperations.push(db.update(posts).set(
141 {content: sql`substr(posts.content, 0, ${MAX_POSTED_LENGTH+1})`, posted: wasPosted,
142 uri: record.uri, cid: record.cid, embedContent: []})
143 .where(eq(posts.uuid, record.postID)));
144 }
145
146 if (dbOperations.length > 0)
147 await db.batch(dbOperations as BatchQuery);
148};
149
150export const setPostNowOffForPost = async (c: AllContext, id: string) => {
151 const db: DrizzleD1Database = c.get("db");
152 if (!uuidValid(id))
153 return false;
154
155 if (!db) {
156 console.warn(`cannot set off post now for post ${id}`);
157 return false;
158 }
159
160 const {success} = await db.update(posts).set({postNow: false}).where(eq(posts.uuid, id));
161 if (!success)
162 console.error(`Unable to set PostNow to off for post ${id}`);
163};
164
165export const updatePostForGivenUser = async (c: AllContext, userId: string, id: string, newData: Object) => {
166 const db: DrizzleD1Database = c.get("db");
167 if (isEmpty(userId) || !uuidValid(id))
168 return false;
169
170 if (!db) {
171 console.error(`unable to update post ${id} for user ${userId}, db was null`);
172 return false;
173 }
174
175 const {success} = await db.update(posts).set(newData).where(
176 and(eq(posts.uuid, id), eq(posts.userId, userId)));
177 return success;
178};
179
180export const getAllPostedPostsOfUser = async(c: AllContext, userId: string): Promise<GetAllPostedBatch[]> => {
181 const db: DrizzleD1Database = c.get("db");
182 if (isEmpty(userId))
183 return [];
184
185 if (!db) {
186 console.error(`unable to get all posted posts of user ${userId}, db was null`);
187 return [];
188 }
189
190 return await db.select({id: posts.uuid, uri: posts.uri})
191 .from(posts)
192 .where(and(eq(posts.userId, userId), eq(posts.posted, true)))
193 .all();
194};
195
196export const getAllPostedPosts = async (c: AllContext): Promise<GetAllPostedBatch[]> => {
197 const db: DrizzleD1Database = c.get("db");
198 if (!db) {
199 console.error("unable to get all posted posts, db was null");
200 return [];
201 }
202 return await db.select({id: posts.uuid, uri: posts.uri})
203 .from(posts)
204 .where(eq(posts.posted, true))
205 .all();
206};
207
208export const isPostAlreadyPosted = async (c: AllContext, postId: string): Promise<boolean> => {
209 const db: DrizzleD1Database = c.get("db");
210 if (!uuidValid(postId))
211 return true;
212
213 if (!db) {
214 console.error(`unable to get database to tell if ${postId} has been posted`);
215 return true;
216 }
217
218 const query = await db.select({posted: posts.posted}).from(posts).where(eq(posts.uuid, postId)).all();
219 if (isEmpty(query) || query[0].posted === null) {
220 // if the post does not exist, return true anyways
221 return true;
222 }
223 return query[0].posted;
224};
225
226export const getChildPostsOfThread = async (c: AllContext, rootId: string): Promise<Post[]|null> => {
227 const db: DrizzleD1Database = c.get("db");
228 if (!uuidValid(rootId))
229 return null;
230
231 if (!db) {
232 console.error(`unable to get child posts of root ${rootId}, db was null`);
233 return null;
234 }
235
236 const query = await db.select().from(posts)
237 .where(and(isNotNull(posts.parentPost), eq(posts.rootPost, rootId)))
238 .orderBy(asc(posts.threadOrder), desc(posts.createdAt)).all();
239 if (query.length > 0) {
240 return query.map((child) => new Post(child));
241 }
242 return null;
243};
244
245export const getPostThreadCount = async (db: DrizzleD1Database, userId: string, rootId: string): Promise<number> => {
246 if (!uuidValid(rootId))
247 return 0;
248
249 return await db.$count(posts, and(
250 eq(posts.rootPost, rootId),
251 eq(posts.userId, userId)));
252};
253
254// deletes multiple posted posts from a database. Posts must be already posted as this does
255// no R2 db queries to clean
256export const deletePosts = async (c: AllContext, postsToDelete: string[]): Promise<number> => {
257 // Don't do anything on empty arrays.
258 if (isEmpty(postsToDelete))
259 return 0;
260
261 const db: DrizzleD1Database = c.get("db");
262 if (!db) {
263 console.error(`could not delete posts ${postsToDelete}, db was null`);
264 return 0;
265 }
266 let deleteQueries: BatchItem<"sqlite">[] = [];
267 postsToDelete.forEach((itm) => {
268 // this will wipe out any posts and their children if they are marked for delete
269 deleteQueries.push(db.delete(posts).where(
270 and(
271 or(eq(posts.uuid, itm), eq(posts.rootPost, itm)),
272 eq(posts.posted, true))));
273 });
274
275 // Batching this should improve db times
276 if (deleteQueries.length > 0) {
277 const batchResponse = await db.batch(deleteQueries as BatchQuery);
278 // Return the number of items that have been deleted
279 return batchResponse.reduce((val, item) => val + item.success, 0);
280 }
281 return 0;
282};
283
284export const purgePostedPosts = async (c: AllContext): Promise<number> => {
285 const db: DrizzleD1Database = c.get("db");
286 if (!db) {
287 console.error("could not purge posted posts, got error");
288 return 0;
289 }
290 const dbQuery = await db.select({ data: posts.uuid }).from(posts)
291 .leftJoin(repostCounts, eq(posts.uuid, repostCounts.uuid))
292 .where(
293 and(
294 and(
295 eq(posts.posted, true), lte(posts.updatedAt, sql`datetime('now', '-7 days')`)
296 ),
297 // skip child posts objects, only get us root posts and non-threads
298 and(lte(posts.threadOrder, 0), lte(repostCounts.count, 0))
299 )
300 ).all();
301 const postsToDelete = dbQuery.map((item) => { return item.data });
302 if (isEmpty(postsToDelete))
303 return 0;
304
305 return await deletePosts(c, postsToDelete);
306};
307
308export const getPostByCID = async(db: DrizzleD1Database, userId: string, cid: string): Promise<Post|null> => {
309 const result = await db.select().from(posts)
310 .where(and(eq(posts.userId, userId), eq(posts.cid, cid)))
311 .limit(1).all();
312
313 if (!isEmpty(result))
314 return new Post(result[0]);
315 return null;
316};
317
318export const getRepostCountQuery = (db: DrizzleD1Database, postUUID: string, newValue: number = -1) => {
319 // if we're given any value underneath 0, we need to recount for the entire post
320 const newCount = (newValue < 0) ? db.$count(reposts, eq(reposts.uuid, postUUID)) : newValue;
321 return db.insert(repostCounts)
322 .values({uuid: postUUID, count: newCount})
323 .onConflictDoUpdate({target: repostCounts.uuid, set: {count: newCount}});
324};