Schedule posts to Bluesky with Cloudflare workers. skyscheduler.work
cf tool bsky-tool cloudflare bluesky schedule bsky service social-media cloudflare-workers
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

ah yes a big reorganization again

woohoo let's go boys.

Also handles messaging on violations, off for right now #97

+205 -177
+4 -12
src/auth/index.ts
··· 5 5 import { drizzle, DrizzleD1Database } from "drizzle-orm/d1"; 6 6 import { schema } from "../db"; 7 7 import { BSKY_MAX_USERNAME_LENGTH, BSKY_MIN_USERNAME_LENGTH } from "../limits"; 8 - import { APP_NAME, SITE_URL } from "../siteinfo"; 8 + import { APP_NAME } from "../siteinfo"; 9 9 import { Bindings } from "../types"; 10 - import { lookupBskyHandle } from "../utils/bskyApi"; 11 - import { createDMWithUser } from "../utils/bskyMsg"; 12 - 13 - function createPasswordResetMessage(url: string, token: string) { 14 - return `Your ${APP_NAME} password reset url is: 15 - ${SITE_URL}/reset-password/${token} 16 - 17 - This URL will expire in about an hour. 18 - 19 - If you did not request a password reset, please ignore this message.`; 20 - } 10 + import { lookupBskyHandle } from "../utils/bsky/bskyApi"; 11 + import { createDMWithUser } from "../utils/bsky/bskyMessage"; 12 + import { createPasswordResetMessage } from "../utils/messages/accountReset"; 21 13 22 14 // Single auth configuration that handles both CLI and runtime scenarios 23 15 function createAuth(env?: Bindings, cf?: IncomingRequestCfProperties) {
+3 -2
src/classes/bskyLogin.ts
··· 4 4 pds: string; 5 5 username: string; 6 6 password: string; 7 - valid: boolean; 8 7 constructor(data: any) { 9 8 if (isEmpty(data)) { 10 9 this.password = this.username = this.pds = ""; ··· 13 12 this.username = data.user; 14 13 this.password = data.pass; 15 14 } 16 - this.valid = !isEmpty(data.user) && !isEmpty(data.pass); 15 + } 16 + get valid(): boolean { 17 + return !isEmpty(this.username) && !isEmpty(this.password); 17 18 } 18 19 };
+2 -2
src/endpoints/account.tsx
··· 8 8 import { rateLimit } from "../middleware/rateLimit"; 9 9 import { verifyTurnstile } from "../middleware/turnstile"; 10 10 import { Bindings, LooseObj } from "../types"; 11 - import { lookupBskyHandle, lookupBskyPDS } from "../utils/bskyApi"; 12 - import { checkIfCanDMUser } from "../utils/bskyMsg"; 11 + import { lookupBskyHandle, lookupBskyPDS } from "../utils/bsky/bskyApi"; 12 + import { checkIfCanDMUser } from "../utils/bsky/bskyMessage"; 13 13 import { getAllMediaOfUser } from "../utils/db/file"; 14 14 import { doesUserExist, getUserEmailForHandle, getUsernameForUser } from "../utils/db/userinfo"; 15 15 import { userHasBan } from "../utils/db/violations";
+1 -1
src/index.tsx
··· 21 21 import { SITE_URL } from "./siteinfo"; 22 22 import { Bindings, QueueTaskData } from "./types"; 23 23 import { makeConstScript } from "./utils/constScriptGen"; 24 - import { processQueue } from "./utils/queueHandler"; 24 + import { processQueue } from "./utils/queues/queueHandler"; 25 25 import { cleanUpPostsTask, schedulePostTask } from "./utils/scheduler"; 26 26 import { setupAccounts } from "./utils/setup"; 27 27
+87
src/utils/bsky/bskyAgents.ts
··· 1 + // this file is used to handle atpagents and their reuse during cron/queues 2 + // this is done because logging into PDSes across tasks can be extremely 3 + // expensive time wise. 4 + import AtpAgent from "@atproto/api"; 5 + import { Post } from "../../classes/post"; 6 + import { Repost } from "../../classes/repost"; 7 + import { 8 + AccountStatus, AgentConfigSettings, 9 + AllContext, TaskType 10 + } from "../../types"; 11 + import { getBskyUserPassForId } from "../db/userinfo"; 12 + import { createViolationForUser } from "../db/violations"; 13 + import { resetAppPasswordMessage } from "../messages/resetAppPassword"; 14 + import { loginToBsky } from "./bskyLogin"; 15 + import { createDMWithUser } from "./bskyMessage"; 16 + 17 + type AgentLoginResponse = { 18 + agent: AtpAgent|null; 19 + violation: boolean; 20 + wasNewViolation?: boolean; 21 + violationType?: AccountStatus; 22 + } 23 + 24 + export class AgentMap { 25 + #forPosts: boolean; 26 + #forReposts: boolean; 27 + #map: Map<string, AtpAgent|null>; 28 + constructor(config: AgentConfigSettings) { 29 + this.#forPosts = config.use_posts; 30 + this.#forReposts = config.use_reposts; 31 + this.#map = new Map(); 32 + } 33 + async getOrAddAgent(c: AllContext, userId: string, type: TaskType): Promise<AtpAgent|null> { 34 + const usesAgent: boolean = this.usesAgentForType(type); 35 + let mappedAgent = (usesAgent) ? this.#map.get(userId) : null; 36 + if (mappedAgent === undefined) { 37 + const {agent, violation} = await AgentMap.getAgentDirect(c, userId, false); 38 + mappedAgent = agent; 39 + if (usesAgent) { 40 + // only add this agent if it's not null 41 + // but if we have a violation, we should absolutely add it in. 42 + if (agent !== null || violation) { 43 + this.#map.set(userId, agent); 44 + } 45 + } 46 + } 47 + return mappedAgent; 48 + }; 49 + async getOrAddAgentFromObj(c: AllContext, data: Post|Repost, type: TaskType): Promise<AtpAgent|null> { 50 + const userId: string = (type === TaskType.Post) ? (data as Post).user : (data as Repost).userId; 51 + return await this.getOrAddAgent(c, userId, type); 52 + }; 53 + static async getAgentDirect(c: AllContext, userId: string, messageOnViolation: boolean): Promise<AgentLoginResponse> { 54 + const loginCreds = await getBskyUserPassForId(c, userId); 55 + if (loginCreds.valid === false) { 56 + console.error(`credentials for user ${userId} were invalid`); 57 + return {agent: null, violation: false}; 58 + } 59 + const {username, password, pds} = loginCreds; 60 + // Login to bsky 61 + const agent = new AtpAgent({ service: new URL(pds) }); 62 + 63 + const loginResponse: AccountStatus = await loginToBsky(agent, username, password); 64 + if (loginResponse != AccountStatus.Ok) { 65 + const addViolation: boolean = await createViolationForUser(c, userId, loginResponse); 66 + if (addViolation) { 67 + console.error(`Unable to login for ${userId} with violation ${loginResponse}`); 68 + if (messageOnViolation && loginResponse == AccountStatus.InvalidAccount) { 69 + await createDMWithUser(c.env, username, resetAppPasswordMessage()); 70 + } 71 + } else { 72 + console.error(`Unable to login ${userId}, no new violation made, got ${loginResponse}`); 73 + } 74 + return {agent: null, violation: true, wasNewViolation: addViolation, violationType: loginResponse}; 75 + } 76 + return {agent: agent, violation: false}; 77 + }; 78 + usesAgentForType(type: TaskType) { 79 + switch(type) { 80 + case TaskType.Post: 81 + return this.#forPosts; 82 + case TaskType.Repost: 83 + return this.#forReposts; 84 + } 85 + return false; 86 + } 87 + };
+49
src/utils/bsky/bskyLogin.ts
··· 1 + // handles general login for agents. 2 + // most cases should not be interfacing directly with this function, but instead 3 + // use bskyAgents or bskyMsg to talk to the network 4 + import AtpAgent from "@atproto/api"; 5 + import { AccountStatus, LooseObj } from "../../types"; 6 + 7 + export const loginToBsky = async (agent: AtpAgent, user: string, pass: string) => { 8 + try { 9 + const loginResponse = await agent.login({ 10 + identifier: user, 11 + password: pass, 12 + allowTakendown: true 13 + }); 14 + if (!loginResponse.success) { 15 + if (loginResponse.data.active == false) { 16 + switch (loginResponse.data.status) { 17 + case "deactivated": 18 + return AccountStatus.Deactivated; 19 + case "suspended": 20 + return AccountStatus.Suspended; 21 + case "takendown": 22 + return AccountStatus.TakenDown; 23 + } 24 + return AccountStatus.InvalidAccount; 25 + } 26 + return AccountStatus.PlatformOutage; 27 + } 28 + return AccountStatus.Ok; 29 + } catch (err) { 30 + // Apparently login can rethrow as an XRPCError and completely eat the original throw. 31 + // so errors don't get handled gracefully. 32 + const errWrap: LooseObj = err as LooseObj; 33 + const errorName = errWrap.constructor.name; 34 + if (errorName === "XRPCError") { 35 + const errCode = errWrap.status; 36 + if (errCode == 401) { 37 + // app password is bad 38 + return AccountStatus.InvalidAccount; 39 + } else if (errCode >= 500) { 40 + return AccountStatus.PlatformOutage; 41 + } 42 + } else if (errorName === "XRPCNotSupported") { 43 + // handle is bad 44 + return AccountStatus.InvalidAccount; 45 + } 46 + console.error(`encountered exception on login for user ${user}, err ${err}`); 47 + } 48 + return AccountStatus.UnhandledError; 49 + };
-117
src/utils/bskyAgents.ts
··· 1 - // this file is used to handle atpagents and their reuse during cron/queues 2 - // this is done because logging into PDSes across tasks can be extremely 3 - // expensive time wise. 4 - // 5 - // Also just handles general login. 6 - import AtpAgent from "@atproto/api"; 7 - import { Post } from "../classes/post"; 8 - import { Repost } from "../classes/repost"; 9 - import { 10 - AccountStatus, AgentConfigSettings, 11 - AllContext, LooseObj, TaskType 12 - } from "../types"; 13 - import { getBskyUserPassForId } from "./db/userinfo"; 14 - import { createViolationForUser } from "./db/violations"; 15 - 16 - export const makeAgentForUser = async (c: AllContext, userId: string) => { 17 - const loginCreds = await getBskyUserPassForId(c, userId); 18 - if (loginCreds.valid === false) { 19 - console.error(`credentials for user ${userId} were invalid`); 20 - return null; 21 - } 22 - const {username, password, pds} = loginCreds; 23 - // Login to bsky 24 - const agent = new AtpAgent({ service: new URL(pds) }); 25 - 26 - const loginResponse: AccountStatus = await loginToBsky(agent, username, password); 27 - if (loginResponse != AccountStatus.Ok) { 28 - const addViolation: boolean = await createViolationForUser(c, userId, loginResponse); 29 - if (addViolation) 30 - console.error(`Unable to login for ${userId} with violation ${loginResponse}`); 31 - else 32 - console.error(`Unable to login ${userId}, no violation made, got ${loginResponse}`); 33 - return null; 34 - } 35 - return agent; 36 - }; 37 - 38 - export const loginToBsky = async (agent: AtpAgent, user: string, pass: string) => { 39 - try { 40 - const loginResponse = await agent.login({ 41 - identifier: user, 42 - password: pass, 43 - allowTakendown: true 44 - }); 45 - if (!loginResponse.success) { 46 - if (loginResponse.data.active == false) { 47 - switch (loginResponse.data.status) { 48 - case "deactivated": 49 - return AccountStatus.Deactivated; 50 - case "suspended": 51 - return AccountStatus.Suspended; 52 - case "takendown": 53 - return AccountStatus.TakenDown; 54 - } 55 - return AccountStatus.InvalidAccount; 56 - } 57 - return AccountStatus.PlatformOutage; 58 - } 59 - return AccountStatus.Ok; 60 - } catch (err) { 61 - // Apparently login can rethrow as an XRPCError and completely eat the original throw. 62 - // so errors don't get handled gracefully. 63 - const errWrap: LooseObj = err as LooseObj; 64 - const errorName = errWrap.constructor.name; 65 - if (errorName === "XRPCError") { 66 - const errCode = errWrap.status; 67 - if (errCode == 401) { 68 - // app password is bad 69 - return AccountStatus.InvalidAccount; 70 - } else if (errCode >= 500) { 71 - return AccountStatus.PlatformOutage; 72 - } 73 - } else if (errorName === "XRPCNotSupported") { 74 - // handle is bad 75 - return AccountStatus.InvalidAccount; 76 - } 77 - console.error(`encountered exception on login for user ${user}, err ${err}`); 78 - } 79 - return AccountStatus.UnhandledError; 80 - }; 81 - 82 - export class AgentMap { 83 - #forPosts: boolean; 84 - #forReposts: boolean; 85 - #map: Map<string, AtpAgent>; 86 - constructor(config: AgentConfigSettings) { 87 - this.#forPosts = config.use_posts; 88 - this.#forReposts = config.use_reposts; 89 - this.#map = new Map(); 90 - } 91 - async getOrAddAgent(c: AllContext, userId: string, type: TaskType): Promise<AtpAgent|null> { 92 - const usesAgent: boolean = this.usesAgentForType(type); 93 - let agent = (usesAgent) ? this.#map.get(userId) || null : null; 94 - if (agent === null) { 95 - agent = await AgentMap.getAgentDirect(c, userId); 96 - if (usesAgent && agent !== null) 97 - this.#map.set(userId, agent); 98 - } 99 - return agent; 100 - }; 101 - async getOrAddAgentFromObj(c: AllContext, data: Post|Repost, type: TaskType): Promise<AtpAgent|null> { 102 - const userId: string = (type === TaskType.Post) ? (data as Post).user : (data as Repost).userId; 103 - return await this.getOrAddAgent(c, userId, type); 104 - }; 105 - static async getAgentDirect(c: AllContext, userId: string) { 106 - return await makeAgentForUser(c, userId); 107 - }; 108 - usesAgentForType(type: TaskType) { 109 - switch(type) { 110 - case TaskType.Post: 111 - return this.#forPosts; 112 - case TaskType.Repost: 113 - return this.#forReposts; 114 - } 115 - return false; 116 - } 117 - };
+13 -22
src/utils/bskyApi.ts src/utils/bsky/bskyApi.ts
··· 4 4 import has from 'just-has'; 5 5 import isEmpty from "just-is-empty"; 6 6 import truncate from "just-truncate"; 7 - import { Post } from "../classes/post"; 8 - import { Repost } from "../classes/repost"; 9 - import { BSKY_IMG_SIZE_LIMIT, MAX_ALT_TEXT, MAX_EMBEDS_PER_POST } from '../limits'; 7 + import { Post } from "../../classes/post"; 8 + import { Repost } from "../../classes/repost"; 9 + import { BSKY_IMG_SIZE_LIMIT, MAX_ALT_TEXT, MAX_EMBEDS_PER_POST } from '../../limits'; 10 10 import { 11 11 AccountStatus, 12 12 AllContext, 13 13 BskyEmbedWrapper, BskyRecordWrapper, EmbedData, EmbedDataType, 14 14 LooseObj, PostLabel, 15 15 PostRecordResponse, PostStatus 16 - } from '../types'; 17 - import { atpRecordURI } from '../validation/regexCases'; 18 - import { makeAgentForUser } from './bskyAgents'; 16 + } from '../../types'; 17 + import { atpRecordURI } from '../../validation/regexCases'; 19 18 import { 20 19 bulkUpdatePostedData, getChildPostsOfThread, 21 20 isPostAlreadyPosted, setPostNowOffForPost 22 - } from './db/data'; 23 - import { getUsernameForUserId } from './db/userinfo'; 24 - import { createViolationForUser } from './db/violations'; 25 - import { deleteEmbedsFromR2 } from './r2Query'; 21 + } from '../db/data'; 22 + import { getUsernameForUserId } from '../db/userinfo'; 23 + import { createViolationForUser } from '../db/violations'; 24 + import { deleteEmbedsFromR2 } from '../r2Query'; 26 25 27 26 export const doesHandleExist = async (user: string) => { 28 27 try { ··· 106 105 } 107 106 108 107 export const makeRepost = async (c: AllContext, content: Repost, usingAgent: AtpAgent) => { 109 - let bWasSuccess = true; 110 - const agent: AtpAgent|null = (usingAgent === null) ? await makeAgentForUser(c, content.userId) : usingAgent; 111 - if (agent === null) { 112 - console.warn(`could not make agent for repost ${content.postid}`); 113 - return false; 114 - } 115 - 116 108 try { 117 - await agent.deleteRepost(content.uri); 109 + await usingAgent.deleteRepost(content.uri); 118 110 } catch { 119 111 // This probably should not be a warning, and should silently fail. 120 112 // the only thing that actually matters is the object below. ··· 122 114 } 123 115 124 116 try { 125 - await agent.repost(content.uri, content.cid); 117 + await usingAgent.repost(content.uri, content.cid); 118 + return true; 126 119 } catch(err) { 127 120 console.error(`Failed to repost ${content.uri}, got error ${err}`); 128 - bWasSuccess = false; 121 + return false; 129 122 } 130 - 131 - return bWasSuccess; 132 123 }; 133 124 134 125 const makePostRaw = async (c: AllContext, content: Post, agent: AtpAgent): Promise<PostStatus|null> => {
+2 -2
src/utils/bskyMsg.ts src/utils/bsky/bskyMessage.ts
··· 1 1 import { AtpAgent, RichText } from '@atproto/api'; 2 - import { AccountStatus, Bindings } from '../types'; 3 - import { loginToBsky } from './bskyAgents'; 2 + import { AccountStatus, Bindings } from '../../types'; 3 + import { loginToBsky } from './bskyLogin'; 4 4 5 5 const chatHeaders = {headers: { 6 6 "atproto-proxy": "did:web:api.bsky.chat#bsky_chat"
+2 -2
src/utils/bskyPrune.ts src/utils/bsky/bskyPrune.ts
··· 1 1 import isEmpty from 'just-is-empty'; 2 2 import split from 'just-split'; 3 - import { AllContext } from '../types'; 3 + import { AllContext } from '../../types'; 4 4 import { getPostRecords } from './bskyApi'; 5 - import { getAllPostedPosts, getAllPostedPostsOfUser } from './db/data'; 5 + import { getAllPostedPosts, getAllPostedPostsOfUser } from '../db/data'; 6 6 7 7 // This looks for a bunch of posts that are posted and determines if the posts 8 8 // are still on the network or not. If they are not, then this prunes the posts from
+1 -1
src/utils/db/violations.ts
··· 4 4 import isEmpty from "just-is-empty"; 5 5 import { bannedUsers, violations } from "../../db/enforcement.schema"; 6 6 import { AccountStatus, AllContext, LooseObj, Violation } from "../../types"; 7 - import { lookupBskyHandle } from "../bskyApi"; 7 + import { lookupBskyHandle } from "../bsky/bskyApi"; 8 8 import { getUsernameForUserId } from "./userinfo"; 9 9 10 10 const createBanForUser = async(db: DrizzleD1Database, userName: string, reason: string) => {
+10
src/utils/messages/accountReset.ts
··· 1 + import { APP_NAME, SITE_URL } from "../../siteinfo"; 2 + 3 + export function createPasswordResetMessage(url: string, token: string) { 4 + return `Your ${APP_NAME} password reset url is: 5 + ${SITE_URL}/reset-password/${token} 6 + 7 + This URL will expire in about an hour. 8 + 9 + If you did not request a password reset, please ignore this message.`; 10 + };
+15
src/utils/messages/resetAppPassword.ts
··· 1 + import { APP_NAME } from "../../siteinfo"; 2 + 3 + export function resetAppPasswordMessage() { 4 + return `Hey there! This is an automated message to let you know 5 + that your ${APP_NAME} account is no longer automatically posting. 6 + 7 + This occurs when your BSky App Password either expires or gets deleted. 8 + If you did not mean to do this, log back onto ${APP_NAME} and provide 9 + an updated BSky app password in the "Account Settings" section of the 10 + dashboard. 11 + 12 + This message can only be seen by you. If you no longer want to use 13 + ${APP_NAME} or don't need to for awhile, no worries! This bot will 14 + only send you this message once (unless you password expires again).`; 15 + };
+8 -8
src/utils/queueHandler.ts src/utils/queues/queueHandler.ts
··· 1 1 import unique from "just-unique"; 2 - import { ScheduledContext } from "../classes/context"; 3 - import { Post } from "../classes/post"; 4 - import { Repost } from "../classes/repost"; 5 - import { Bindings, QueueTaskData, TaskType } from "../types"; 6 - import { AgentMap } from "./bskyAgents"; 7 - import { userHasViolations } from "./db/violations"; 8 - import { isPost } from "./helpers"; 2 + import { ScheduledContext } from "../../classes/context"; 3 + import { Post } from "../../classes/post"; 4 + import { Repost } from "../../classes/repost"; 5 + import { Bindings, QueueTaskData, TaskType } from "../../types"; 6 + import { AgentMap } from "../bsky/bskyAgents"; 7 + import { userHasViolations } from "../db/violations"; 8 + import { isPost } from "../helpers"; 9 + import { handlePostTask, handleRepostTask } from "../scheduler"; 9 10 import { enqueueEmptyWork } from "./queuePublisher"; 10 - import { handlePostTask, handleRepostTask } from "./scheduler"; 11 11 12 12 type BufferBlast = { 13 13 type: TaskType,
+3 -3
src/utils/queuePublisher.ts src/utils/queues/queuePublisher.ts
··· 1 1 import isEmpty from 'just-is-empty'; 2 2 import random from 'just-random'; 3 3 import get from 'just-safe-get'; 4 - import { Post } from "../classes/post"; 5 - import { Repost } from "../classes/repost"; 6 - import { AllContext, Bindings, QueueTaskData, TaskType } from "../types"; 4 + import { Post } from "../../classes/post"; 5 + import { Repost } from "../../classes/repost"; 6 + import { AllContext, Bindings, QueueTaskData, TaskType } from "../../types"; 7 7 8 8 // picks a random queue to publish data to 9 9 const getRandomQueue = (env: Bindings, listName: string): Queue|null => {
+5 -5
src/utils/scheduler.ts
··· 3 3 import { Post } from "../classes/post"; 4 4 import { Repost } from "../classes/repost"; 5 5 import { AllContext, TaskType } from '../types'; 6 - import { AgentMap } from './bskyAgents'; 7 - import { makePost, makeRepost } from './bskyApi'; 8 - import { pruneBskyPosts } from './bskyPrune'; 6 + import { AgentMap } from './bsky/bskyAgents'; 7 + import { makePost, makeRepost } from './bsky/bskyApi'; 8 + import { pruneBskyPosts } from './bsky/bskyPrune'; 9 9 import { 10 10 deleteAllRepostsBeforeCurrentTime, deletePosts, getAllPostsForCurrentTime, 11 11 getAllRepostsForCurrentTime, purgePostedPosts, ··· 15 15 import { 16 16 enqueuePost, enqueueRepost, isQueueEnabled, 17 17 isRepostQueueEnabled, shouldPostNowQueue, shouldPostThreadQueue 18 - } from './queuePublisher'; 18 + } from './queues/queuePublisher'; 19 19 import { deleteFromR2 } from './r2Query'; 20 20 21 21 export const handlePostTask = async(runtime: AllContext, postData: Post, agent: AtpAgent|null) => { ··· 43 43 postStatus = false; 44 44 } 45 45 } else { 46 - const agent = await AgentMap.getAgentDirect(c, postData.user); 46 + const {agent} = await AgentMap.getAgentDirect(c, postData.user, false); 47 47 if (agent === null) { 48 48 console.error(`unable to get agent for user ${postData.user} to post now`); 49 49 postStatus = false;