Schedule posts to Bluesky with Cloudflare workers. skyscheduler.work
cf tool bsky-tool cloudflare bluesky schedule bsky service social-media cloudflare-workers
at main 65 lines 2.9 kB view raw
1import isEmpty from 'just-is-empty'; 2import random from 'just-random'; 3import get from 'just-safe-get'; 4import { Post } from "../../classes/post"; 5import { Repost } from "../../classes/repost"; 6import { AllContext, Bindings, QueueTaskData, TaskType } from "../../types"; 7 8// picks a random queue to publish data to 9const getRandomQueue = (env: Bindings, listName: string): Queue|null => { 10 const queueListNames: string[] = get(env.QUEUE_SETTINGS, listName, []); 11 if (isEmpty(queueListNames)) 12 return null; 13 14 const queueName: string = random(queueListNames) || ""; 15 if (queueListNames.length > 1) 16 console.log(`Picked ${queueName} from ${listName}`); 17 return get(env, queueName, null); 18}; 19 20async function pushToQueue(queueConsumer: Queue|null, data: Post|Repost|null, taskType: TaskType, delay: number = -1) { 21 if (queueConsumer !== null) { 22 const options: QueueSendOptions = { 23 contentType: 'v8' 24 }; 25 if (delay > -1) 26 options.delaySeconds = delay; 27 28 await queueConsumer.send({type: taskType, data: data} as QueueTaskData, options); 29 } else { 30 console.warn(`could not push data to empty queue, was type ${taskType}`); 31 } 32}; 33 34const hasPostQueue = (env: Bindings) => !isEmpty(env.QUEUE_SETTINGS.post_queues) && env.IN_DEV == false; 35const hasRepostQueue = (env: Bindings) => !isEmpty(env.QUEUE_SETTINGS.repost_queues) && env.IN_DEV == false; 36export const isQueueEnabled = (env: Bindings) => env.QUEUE_SETTINGS.enabled && hasPostQueue(env); 37export const isRepostQueueEnabled = (env: Bindings) => env.QUEUE_SETTINGS.repostsEnabled && hasRepostQueue(env); 38export const shouldPostNowQueue = (env: Bindings) => env.QUEUE_SETTINGS.postNowEnabled && isQueueEnabled(env); 39export const shouldPostThreadQueue = (env: Bindings) => env.QUEUE_SETTINGS.threadEnabled && (hasPostQueue(env) || isQueueEnabled(env)); 40 41export async function enqueuePost(c: AllContext, data: Post, delay: number = -1) { 42 if (data.isThreadRoot) { 43 if (!shouldPostThreadQueue(c.env)) 44 return; 45 } else if (!isQueueEnabled(c.env)) 46 return; 47 48 // Pick a random consumer to handle this post 49 const queueConsumer: Queue|null = getRandomQueue(c.env, "post_queues"); 50 await pushToQueue(queueConsumer, data, TaskType.Post, delay); 51}; 52 53export async function enqueueRepost(c: AllContext, data: Repost, delay: number = -1) { 54 if (!isRepostQueueEnabled(c.env)) 55 return; 56 57 // Pick a random consumer to handle this repost 58 const queueConsumer: Queue|null = getRandomQueue(c.env, "repost_queues"); 59 await pushToQueue(queueConsumer, data, TaskType.Repost, delay); 60}; 61 62export async function enqueueEmptyWork(c: AllContext, queueType: TaskType, delay: number = -1) { 63 const queueConsumer: Queue|null = getRandomQueue(c.env, (queueType === TaskType.Post) ? "post_queues" : "repost_queues"); 64 await pushToQueue(queueConsumer, null, TaskType.Blast, delay); 65};