Schedule posts to Bluesky with Cloudflare workers.
skyscheduler.work
cf
tool
bsky-tool
cloudflare
bluesky
schedule
bsky
service
social-media
cloudflare-workers
1import unique from "just-unique";
2import { ScheduledContext } from "../../classes/context";
3import { Post } from "../../classes/post";
4import { Repost } from "../../classes/repost";
5import { Bindings, QueueTaskData, TaskType } from "../../types";
6import { AgentMap } from "../bsky/bskyAgents";
7import { userHasViolations } from "../db/violations";
8import { isPost } from "../helpers";
9import { handlePostTask, handleRepostTask } from "../scheduler";
10import { enqueueEmptyWork } from "./queuePublisher";
11
12type BufferBlast = {
13 type: TaskType,
14 time: number
15};
16
17export async function processQueue(batch: MessageBatch<QueueTaskData>, env: Bindings, ctx: ExecutionContext) {
18 // runtime overhead
19 const runtimeWrapper = new ScheduledContext(env, ctx);
20 const agency = new AgentMap(env.TASK_SETTINGS);
21
22 // Retry settings
23 const delay: number = env.QUEUE_SETTINGS.delay_val;
24 const maxRetries: number = env.QUEUE_SETTINGS.max_retries;
25 const bufferRetries: boolean = env.QUEUE_SETTINGS.pressure_retries || false;
26 let bufferBlasts: BufferBlast[] = [];
27
28 for (const message of batch.messages) {
29 let wasSuccess: boolean = false;
30 const taskType: TaskType = message.body.type;
31 if (taskType == TaskType.Post || taskType == TaskType.Repost) {
32 if (message.body.data == null) {
33 console.error(`got a task type of ${taskType} but the message body has no data. cannot be processed!`);
34 // maybe this was a bad send, so try it again later. Do not backblast as it was not an upstream failure.
35 message.retry();
36 continue;
37 }
38
39 // This probably doesn't need to be recreated anymore because we send the literal JS object now
40 // TODO: Check if we're already a class before new constructing
41 const postDataObj: Post|Repost = (isPost(message.body.data)) ? new Post(message.body.data) : new Repost(message.body.data);
42 const agent = await agency.getOrAddAgentFromObj(runtimeWrapper, postDataObj, taskType);
43 if (agent == null) {
44 const userId = postDataObj.getUser();
45 // if we could not get an agent for you, we should check to see if you have violations
46 // if you do, we stop processing you.
47 if (await userHasViolations(runtimeWrapper, userId)) {
48 console.log(`User ${userId} has violations, dropping them from the queue`);
49 message.ack();
50 continue;
51 } else {
52 console.warn(`Could not make an agent for ${userId}, got null.`);
53 }
54
55 } else {
56 if (taskType == TaskType.Post)
57 wasSuccess = await handlePostTask(runtimeWrapper, postDataObj as Post, agent);
58 else
59 wasSuccess = await handleRepostTask(runtimeWrapper, postDataObj as Repost, agent);
60 }
61 } else if (taskType == TaskType.Blast) {
62 console.log(`Got a blast message with ${batch.messages.length} messages in batch`);
63 wasSuccess = true;
64 } else {
65 console.error("Got a message queue task type that was invalid");
66 message.ack();
67 return;
68 }
69 // Handle queue acknowledgement on success/failure
70 if (!wasSuccess) {
71 const currentAttempts: number = message.attempts;
72 const delaySeconds = delay*(currentAttempts+1);
73 console.log(`attempting to retry message ${taskType} in ${delaySeconds}`);
74 message.retry({delaySeconds: delaySeconds});
75
76 // if the attempts are over the maximum amount of retries then do not backblast
77 if (currentAttempts > maxRetries)
78 continue;
79
80 // push a backblast so that this item will retry in the future.
81 // it basically just writes null in the buffer, which is silly but w/e
82 bufferBlasts.push({type: taskType, time: delaySeconds});
83 } else {
84 message.ack();
85 }
86 }
87 // If we have any retries, they'll only get delivered on next batch
88 // so we're going to back blast the buffer queue so that we can make sure the retries go.
89 if (bufferRetries && bufferBlasts.length > 0) {
90 bufferBlasts = unique(bufferBlasts);
91 console.log(`Attempting to backblast ${bufferBlasts.length} items`);
92 for (const blast of bufferBlasts) {
93 await enqueueEmptyWork(runtimeWrapper, blast.type, blast.time + 10);
94 }
95 }
96};