unoffical wafrn mirror wafrn.net
atproto social-network activitypub
at angular21 258 lines 7.2 kB view raw
1import { Job, MetricsTime, Worker } from "bullmq"; 2import { inboxWorker } from "./queueProcessors/inbox.js"; 3import { prepareSendRemotePostWorker } from "./queueProcessors/prepareSendRemotePost.js"; 4import { sendPostToInboxes } from "./queueProcessors/sendPostToInboxes.js"; 5import { getRemoteActorIdProcessor } from "./queueProcessors/getRemoteActorIdProcessor.js"; 6import { logger } from "./logger.js"; 7import { processRemotePostView } from "./queueProcessors/processRemotePostView.js"; 8import { processRemoteMedia } from "./queueProcessors/remoteMediaProcessor.js"; 9import { processFirehose } from "../atproto/workers/processFirehoseWorker.js"; 10import { sendPushNotification } from "./queueProcessors/sendPushNotification.js"; 11import { checkPushNotificationDelivery } from "./queueProcessors/checkPushNotificationDelivery.js"; 12import { generateUserKeyPair } from "./queueProcessors/generateUserKeyPair.js"; 13import { completeEnvironment } from "./backendOptions.js"; 14import { sendPostBsky } from "./queueProcessors/sendPostBsky.js"; 15import { processSinglePostJob } from "../atproto/workers/processSinglePostWorker.js"; 16 17logger.info("started worker"); 18const workerInbox = new Worker("inbox", (job: Job) => inboxWorker(job), { 19 connection: completeEnvironment.bullmqConnection, 20 metrics: { 21 maxDataPoints: MetricsTime.ONE_WEEK * 2, 22 }, 23 concurrency: completeEnvironment.workers.low, 24}); 25 26const workerPrepareSendPost = new Worker( 27 "prepareSendPost", 28 (job: Job) => prepareSendRemotePostWorker(job), 29 { 30 connection: completeEnvironment.bullmqConnection, 31 metrics: { 32 maxDataPoints: MetricsTime.ONE_WEEK * 2, 33 }, 34 concurrency: completeEnvironment.workers.high, 35 lockDuration: 60000, 36 } 37); 38 39const workerSendPostBsky = new Worker( 40 "sendPostBsky", 41 (job: Job) => sendPostBsky(job), 42 { 43 connection: completeEnvironment.bullmqConnection, 44 metrics: { 45 maxDataPoints: MetricsTime.ONE_WEEK * 2, 46 }, 47 concurrency: completeEnvironment.workers.high, 48 lockDuration: 60000, 49 } 50); 51 52const workerSendPostChunk = new Worker( 53 "sendPostToInboxes", 54 (job: Job) => sendPostToInboxes(job), 55 { 56 connection: completeEnvironment.bullmqConnection, 57 metrics: { 58 maxDataPoints: MetricsTime.ONE_WEEK * 2, 59 }, 60 concurrency: completeEnvironment.workers.high, 61 lockDuration: 120000, 62 } 63); 64 65const workerDeletePost = new Worker( 66 "deletePostQueue", 67 (job: Job) => sendPostToInboxes(job), 68 { 69 connection: completeEnvironment.bullmqConnection, 70 metrics: { 71 maxDataPoints: MetricsTime.ONE_WEEK * 2, 72 }, 73 concurrency: completeEnvironment.workers.high, 74 lockDuration: 120000, 75 } 76); 77 78const workerGetUser = new Worker( 79 "getRemoteActorId", 80 async (job: Job) => await getRemoteActorIdProcessor(job), 81 { 82 connection: completeEnvironment.bullmqConnection, 83 metrics: { 84 maxDataPoints: MetricsTime.ONE_WEEK * 2, 85 }, 86 concurrency: completeEnvironment.workers.high, 87 lockDuration: 120000, 88 } 89); 90 91const workerProcessRemotePostView = new Worker( 92 "processRemoteView", 93 async (job: Job) => await processRemotePostView(job), 94 { 95 connection: completeEnvironment.bullmqConnection, 96 metrics: { 97 maxDataPoints: MetricsTime.ONE_WEEK * 2, 98 }, 99 concurrency: completeEnvironment.workers.low, 100 lockDuration: 120000, 101 } 102); 103 104const workerProcessRemoteMediaData = new Worker( 105 "processRemoteMediaData", 106 async (job: Job) => await processRemoteMedia(job), 107 { 108 connection: completeEnvironment.bullmqConnection, 109 metrics: { 110 maxDataPoints: MetricsTime.ONE_WEEK * 2, 111 }, 112 concurrency: completeEnvironment.workers.low, 113 lockDuration: 120000, 114 } 115); 116 117const workerProcessFirehose = completeEnvironment.enableBsky 118 ? new Worker( 119 "firehoseQueue", 120 async (job: Job) => await processFirehose(job), 121 { 122 connection: completeEnvironment.bullmqConnection, 123 metrics: { 124 maxDataPoints: MetricsTime.ONE_WEEK * 2, 125 }, 126 concurrency: completeEnvironment.workers.medium, 127 // up to one minute 128 lockDuration: 60000, 129 } 130 ) 131 : null; 132 133const workerProcessSinglePost = completeEnvironment.enableBsky 134 ? new Worker( 135 "processSinglePost", 136 async (job: Job) => await processSinglePostJob(job), 137 { 138 connection: completeEnvironment.bullmqConnection, 139 metrics: { 140 maxDataPoints: MetricsTime.ONE_WEEK * 2, 141 }, 142 concurrency: 25, 143 // up to one minute 144 lockDuration: 60000, 145 } 146 ) 147 : null; 148 149const workerSendPushNotification = new Worker( 150 "sendPushNotification", 151 async (job: Job) => await sendPushNotification(job), 152 { 153 connection: completeEnvironment.bullmqConnection, 154 metrics: { 155 maxDataPoints: MetricsTime.ONE_WEEK * 2, 156 }, 157 concurrency: completeEnvironment.workers.medium, 158 } 159); 160 161const workerCheckPushNotificationDelivery = new Worker( 162 "checkPushNotificationDelivery", 163 async (job: Job) => await checkPushNotificationDelivery(job), 164 { 165 connection: completeEnvironment.bullmqConnection, 166 metrics: { 167 maxDataPoints: MetricsTime.ONE_WEEK * 2, 168 }, 169 concurrency: completeEnvironment.workers.medium, 170 } 171); 172 173const workerGenerateUserKeyPair = new Worker( 174 "generateUserKeyPair", 175 async (job: Job) => await generateUserKeyPair(job), 176 { 177 connection: completeEnvironment.bullmqConnection, 178 metrics: { 179 maxDataPoints: MetricsTime.ONE_WEEK * 2, 180 }, 181 concurrency: 1, // this one is VERY cpu intensive 182 } 183); 184 185const workers = [ 186 workerInbox, 187 workerDeletePost, 188 workerGetUser, 189 workerPrepareSendPost, 190 workerProcessRemotePostView, 191 workerSendPostChunk, 192 workerProcessRemotePostView, 193 workerProcessRemoteMediaData, 194 workerSendPushNotification, 195 workerCheckPushNotificationDelivery, 196 workerGenerateUserKeyPair, 197]; 198if (completeEnvironment.enableBsky) { 199 workers.push(workerProcessFirehose as Worker); 200 workers.push(workerProcessSinglePost as Worker); 201 workers.push(workerSendPostBsky as Worker); 202} 203 204workers.forEach((worker) => { 205 worker.on("error", (err) => { 206 logger.warn({ 207 message: `worker ${worker.name} had error`, 208 error: err, 209 }); 210 }); 211 worker.on("failed", (err) => { 212 logger.warn({ 213 message: `worker ${worker.name} failed`, 214 error: err, 215 }); 216 }); 217}); 218 219const workersToLogFail = [ 220 workerInbox, 221 workerDeletePost, 222 workerGetUser, 223 workerPrepareSendPost, 224 workerProcessRemotePostView, 225 workerSendPostChunk, 226 workerSendPushNotification, 227 workerGenerateUserKeyPair, 228]; 229if (completeEnvironment.enableBsky) { 230 workersToLogFail.push(workerProcessFirehose as Worker); 231 workersToLogFail.push(workerProcessSinglePost as Worker); 232 workersToLogFail.push(workerSendPostBsky as Worker); 233} 234 235workersToLogFail.forEach((worker) => 236 worker.on("failed", (err) => { 237 logger.warn({ 238 message: `worker ${worker.name} failed`, 239 error: err, 240 }); 241 }) 242); 243 244export { 245 workerInbox, 246 workerSendPostChunk, 247 workerPrepareSendPost, 248 workerGetUser, 249 workerDeletePost, 250 workerProcessRemotePostView, 251 workerProcessRemoteMediaData, 252 workerProcessFirehose, 253 workerProcessSinglePost, 254 workerSendPushNotification, 255 workerCheckPushNotificationDelivery, 256 workerGenerateUserKeyPair, 257 workerSendPostBsky, 258};