Schedule posts to Bluesky with Cloudflare workers. skyscheduler.work
cf tool bsky-tool cloudflare bluesky schedule bsky service social-media cloudflare-workers
at main 96 lines 4.2 kB view raw
1import unique from "just-unique"; 2import { ScheduledContext } from "../../classes/context"; 3import { Post } from "../../classes/post"; 4import { Repost } from "../../classes/repost"; 5import { Bindings, QueueTaskData, TaskType } from "../../types"; 6import { AgentMap } from "../bsky/bskyAgents"; 7import { userHasViolations } from "../db/violations"; 8import { isPost } from "../helpers"; 9import { handlePostTask, handleRepostTask } from "../scheduler"; 10import { enqueueEmptyWork } from "./queuePublisher"; 11 12type BufferBlast = { 13 type: TaskType, 14 time: number 15}; 16 17export async function processQueue(batch: MessageBatch<QueueTaskData>, env: Bindings, ctx: ExecutionContext) { 18 // runtime overhead 19 const runtimeWrapper = new ScheduledContext(env, ctx); 20 const agency = new AgentMap(env.TASK_SETTINGS); 21 22 // Retry settings 23 const delay: number = env.QUEUE_SETTINGS.delay_val; 24 const maxRetries: number = env.QUEUE_SETTINGS.max_retries; 25 const bufferRetries: boolean = env.QUEUE_SETTINGS.pressure_retries || false; 26 let bufferBlasts: BufferBlast[] = []; 27 28 for (const message of batch.messages) { 29 let wasSuccess: boolean = false; 30 const taskType: TaskType = message.body.type; 31 if (taskType == TaskType.Post || taskType == TaskType.Repost) { 32 if (message.body.data == null) { 33 console.error(`got a task type of ${taskType} but the message body has no data. cannot be processed!`); 34 // maybe this was a bad send, so try it again later. Do not backblast as it was not an upstream failure. 35 message.retry(); 36 continue; 37 } 38 39 // This probably doesn't need to be recreated anymore because we send the literal JS object now 40 // TODO: Check if we're already a class before new constructing 41 const postDataObj: Post|Repost = (isPost(message.body.data)) ? new Post(message.body.data) : new Repost(message.body.data); 42 const agent = await agency.getOrAddAgentFromObj(runtimeWrapper, postDataObj, taskType); 43 if (agent == null) { 44 const userId = postDataObj.getUser(); 45 // if we could not get an agent for you, we should check to see if you have violations 46 // if you do, we stop processing you. 47 if (await userHasViolations(runtimeWrapper, userId)) { 48 console.log(`User ${userId} has violations, dropping them from the queue`); 49 message.ack(); 50 continue; 51 } else { 52 console.warn(`Could not make an agent for ${userId}, got null.`); 53 } 54 55 } else { 56 if (taskType == TaskType.Post) 57 wasSuccess = await handlePostTask(runtimeWrapper, postDataObj as Post, agent); 58 else 59 wasSuccess = await handleRepostTask(runtimeWrapper, postDataObj as Repost, agent); 60 } 61 } else if (taskType == TaskType.Blast) { 62 console.log(`Got a blast message with ${batch.messages.length} messages in batch`); 63 wasSuccess = true; 64 } else { 65 console.error("Got a message queue task type that was invalid"); 66 message.ack(); 67 return; 68 } 69 // Handle queue acknowledgement on success/failure 70 if (!wasSuccess) { 71 const currentAttempts: number = message.attempts; 72 const delaySeconds = delay*(currentAttempts+1); 73 console.log(`attempting to retry message ${taskType} in ${delaySeconds}`); 74 message.retry({delaySeconds: delaySeconds}); 75 76 // if the attempts are over the maximum amount of retries then do not backblast 77 if (currentAttempts > maxRetries) 78 continue; 79 80 // push a backblast so that this item will retry in the future. 81 // it basically just writes null in the buffer, which is silly but w/e 82 bufferBlasts.push({type: taskType, time: delaySeconds}); 83 } else { 84 message.ack(); 85 } 86 } 87 // If we have any retries, they'll only get delivered on next batch 88 // so we're going to back blast the buffer queue so that we can make sure the retries go. 89 if (bufferRetries && bufferBlasts.length > 0) { 90 bufferBlasts = unique(bufferBlasts); 91 console.log(`Attempting to backblast ${bufferBlasts.length} items`); 92 for (const blast of bufferBlasts) { 93 await enqueueEmptyWork(runtimeWrapper, blast.type, blast.time + 10); 94 } 95 } 96};