Schedule posts to Bluesky with Cloudflare workers.
skyscheduler.work
cf
tool
bsky-tool
cloudflare
bluesky
schedule
bsky
service
social-media
cloudflare-workers
1import isEmpty from 'just-is-empty';
2import random from 'just-random';
3import get from 'just-safe-get';
4import { Post } from "../../classes/post";
5import { Repost } from "../../classes/repost";
6import { AllContext, Bindings, QueueTaskData, TaskType } from "../../types";
7
8// picks a random queue to publish data to
9const getRandomQueue = (env: Bindings, listName: string): Queue|null => {
10 const queueListNames: string[] = get(env.QUEUE_SETTINGS, listName, []);
11 if (isEmpty(queueListNames))
12 return null;
13
14 const queueName: string = random(queueListNames) || "";
15 if (queueListNames.length > 1)
16 console.log(`Picked ${queueName} from ${listName}`);
17 return get(env, queueName, null);
18};
19
20async function pushToQueue(queueConsumer: Queue|null, data: Post|Repost|null, taskType: TaskType, delay: number = -1) {
21 if (queueConsumer !== null) {
22 const options: QueueSendOptions = {
23 contentType: 'v8'
24 };
25 if (delay > -1)
26 options.delaySeconds = delay;
27
28 await queueConsumer.send({type: taskType, data: data} as QueueTaskData, options);
29 } else {
30 console.warn(`could not push data to empty queue, was type ${taskType}`);
31 }
32};
33
34const hasPostQueue = (env: Bindings) => !isEmpty(env.QUEUE_SETTINGS.post_queues) && env.IN_DEV == false;
35const hasRepostQueue = (env: Bindings) => !isEmpty(env.QUEUE_SETTINGS.repost_queues) && env.IN_DEV == false;
36export const isQueueEnabled = (env: Bindings) => env.QUEUE_SETTINGS.enabled && hasPostQueue(env);
37export const isRepostQueueEnabled = (env: Bindings) => env.QUEUE_SETTINGS.repostsEnabled && hasRepostQueue(env);
38export const shouldPostNowQueue = (env: Bindings) => env.QUEUE_SETTINGS.postNowEnabled && isQueueEnabled(env);
39export const shouldPostThreadQueue = (env: Bindings) => env.QUEUE_SETTINGS.threadEnabled && (hasPostQueue(env) || isQueueEnabled(env));
40
41export async function enqueuePost(c: AllContext, data: Post, delay: number = -1) {
42 if (data.isThreadRoot) {
43 if (!shouldPostThreadQueue(c.env))
44 return;
45 } else if (!isQueueEnabled(c.env))
46 return;
47
48 // Pick a random consumer to handle this post
49 const queueConsumer: Queue|null = getRandomQueue(c.env, "post_queues");
50 await pushToQueue(queueConsumer, data, TaskType.Post, delay);
51};
52
53export async function enqueueRepost(c: AllContext, data: Repost, delay: number = -1) {
54 if (!isRepostQueueEnabled(c.env))
55 return;
56
57 // Pick a random consumer to handle this repost
58 const queueConsumer: Queue|null = getRandomQueue(c.env, "repost_queues");
59 await pushToQueue(queueConsumer, data, TaskType.Repost, delay);
60};
61
62export async function enqueueEmptyWork(c: AllContext, queueType: TaskType, delay: number = -1) {
63 const queueConsumer: Queue|null = getRandomQueue(c.env, (queueType === TaskType.Post) ? "post_queues" : "repost_queues");
64 await pushToQueue(queueConsumer, null, TaskType.Blast, delay);
65};