unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Job, MetricsTime, Worker } from "bullmq";
2import { inboxWorker } from "./queueProcessors/inbox.js";
3import { prepareSendRemotePostWorker } from "./queueProcessors/prepareSendRemotePost.js";
4import { sendPostToInboxes } from "./queueProcessors/sendPostToInboxes.js";
5import { getRemoteActorIdProcessor } from "./queueProcessors/getRemoteActorIdProcessor.js";
6import { logger } from "./logger.js";
7import { processRemotePostView } from "./queueProcessors/processRemotePostView.js";
8import { processRemoteMedia } from "./queueProcessors/remoteMediaProcessor.js";
9import { processFirehose } from "../atproto/workers/processFirehoseWorker.js";
10import { sendPushNotification } from "./queueProcessors/sendPushNotification.js";
11import { checkPushNotificationDelivery } from "./queueProcessors/checkPushNotificationDelivery.js";
12import { generateUserKeyPair } from "./queueProcessors/generateUserKeyPair.js";
13import { completeEnvironment } from "./backendOptions.js";
14import { sendPostBsky } from "./queueProcessors/sendPostBsky.js";
15import { processSinglePostJob } from "../atproto/workers/processSinglePostWorker.js";
16
// Emitted once when this module is evaluated, before any worker is created.
logger.info("started worker");

/**
 * Worker for the "inbox" queue. Each job is forwarded to `inboxWorker`.
 * Uses the low concurrency tier from the environment configuration and
 * retains up to `MetricsTime.ONE_WEEK * 2` metric data points.
 */
const workerInbox = new Worker(
  "inbox",
  (inboxJob: Job) => {
    return inboxWorker(inboxJob);
  },
  {
    concurrency: completeEnvironment.workers.low,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
25
/**
 * Worker for the "prepareSendPost" queue. Each job is forwarded to
 * `prepareSendRemotePostWorker`. Runs at the high concurrency tier with a
 * lock duration of 60000.
 */
const workerPrepareSendPost = new Worker(
  "prepareSendPost",
  (prepareJob: Job) => {
    return prepareSendRemotePostWorker(prepareJob);
  },
  {
    lockDuration: 60000,
    concurrency: completeEnvironment.workers.high,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
38
/**
 * Worker for the "sendPostBsky" queue. Each job is forwarded to
 * `sendPostBsky`. Created unconditionally, at the high concurrency tier,
 * with a lock duration of 60000.
 */
const workerSendPostBsky = new Worker(
  "sendPostBsky",
  (bskyJob: Job) => {
    return sendPostBsky(bskyJob);
  },
  {
    lockDuration: 60000,
    concurrency: completeEnvironment.workers.high,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
51
/**
 * Worker for the "sendPostToInboxes" queue. Each job is forwarded to
 * `sendPostToInboxes`. Runs at the high concurrency tier with a lock
 * duration of 120000.
 */
const workerSendPostChunk = new Worker(
  "sendPostToInboxes",
  (chunkJob: Job) => {
    return sendPostToInboxes(chunkJob);
  },
  {
    lockDuration: 120000,
    concurrency: completeEnvironment.workers.high,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
64
/**
 * Worker for the "deletePostQueue" queue. Jobs are handled by the same
 * `sendPostToInboxes` processor that serves the "sendPostToInboxes" queue.
 * Runs at the high concurrency tier with a lock duration of 120000.
 */
const workerDeletePost = new Worker(
  "deletePostQueue",
  (deleteJob: Job) => {
    return sendPostToInboxes(deleteJob);
  },
  {
    lockDuration: 120000,
    concurrency: completeEnvironment.workers.high,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
77
/**
 * Worker for the "getRemoteActorId" queue. Each job is awaited through
 * `getRemoteActorIdProcessor`. Runs at the high concurrency tier with a
 * lock duration of 120000.
 */
const workerGetUser = new Worker(
  "getRemoteActorId",
  async (actorJob: Job) => {
    return await getRemoteActorIdProcessor(actorJob);
  },
  {
    lockDuration: 120000,
    concurrency: completeEnvironment.workers.high,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
90
/**
 * Worker for the "processRemoteView" queue. Each job is awaited through
 * `processRemotePostView`. Runs at the low concurrency tier with a lock
 * duration of 120000.
 */
const workerProcessRemotePostView = new Worker(
  "processRemoteView",
  async (viewJob: Job) => {
    return await processRemotePostView(viewJob);
  },
  {
    lockDuration: 120000,
    concurrency: completeEnvironment.workers.low,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
103
/**
 * Worker for the "processRemoteMediaData" queue. Each job is awaited
 * through `processRemoteMedia`. Runs at the low concurrency tier with a
 * lock duration of 120000.
 */
const workerProcessRemoteMediaData = new Worker(
  "processRemoteMediaData",
  async (mediaJob: Job) => {
    return await processRemoteMedia(mediaJob);
  },
  {
    lockDuration: 120000,
    concurrency: completeEnvironment.workers.low,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
116
/**
 * Worker for the "firehoseQueue" queue, handing each job to
 * `processFirehose`. Only instantiated when
 * `completeEnvironment.enableBsky` is truthy; otherwise this binding is
 * `null`. Runs at the medium concurrency tier.
 */
const workerProcessFirehose = !completeEnvironment.enableBsky
  ? null
  : new Worker(
      "firehoseQueue",
      async (firehoseJob: Job) => {
        return await processFirehose(firehoseJob);
      },
      {
        // up to one minute
        lockDuration: 60000,
        concurrency: completeEnvironment.workers.medium,
        metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
        connection: completeEnvironment.bullmqConnection,
      }
    );
132
/**
 * Worker for the "processSinglePost" queue, handing each job to
 * `processSinglePostJob`. Only instantiated when
 * `completeEnvironment.enableBsky` is truthy; otherwise this binding is
 * `null`. Concurrency is fixed at 25 rather than taken from a tier.
 */
const workerProcessSinglePost = !completeEnvironment.enableBsky
  ? null
  : new Worker(
      "processSinglePost",
      async (singlePostJob: Job) => {
        return await processSinglePostJob(singlePostJob);
      },
      {
        // up to one minute
        lockDuration: 60000,
        concurrency: 25,
        metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
        connection: completeEnvironment.bullmqConnection,
      }
    );
148
/**
 * Worker for the "sendPushNotification" queue. Each job is awaited through
 * `sendPushNotification`. Runs at the medium concurrency tier; no explicit
 * lock duration is set.
 */
const workerSendPushNotification = new Worker(
  "sendPushNotification",
  async (pushJob: Job) => {
    return await sendPushNotification(pushJob);
  },
  {
    concurrency: completeEnvironment.workers.medium,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
160
/**
 * Worker for the "checkPushNotificationDelivery" queue. Each job is awaited
 * through `checkPushNotificationDelivery`. Runs at the medium concurrency
 * tier; no explicit lock duration is set.
 */
const workerCheckPushNotificationDelivery = new Worker(
  "checkPushNotificationDelivery",
  async (deliveryJob: Job) => {
    return await checkPushNotificationDelivery(deliveryJob);
  },
  {
    concurrency: completeEnvironment.workers.medium,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
172
/**
 * Worker for the "generateUserKeyPair" queue. Each job is awaited through
 * `generateUserKeyPair`. Jobs are processed one at a time.
 */
const workerGenerateUserKeyPair = new Worker(
  "generateUserKeyPair",
  async (keyPairJob: Job) => {
    return await generateUserKeyPair(keyPairJob);
  },
  {
    // this one is VERY cpu intensive
    concurrency: 1,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    connection: completeEnvironment.bullmqConnection,
  }
);
184
/**
 * Every worker instantiated by this module, each listed exactly once so
 * that listeners are attached exactly once per worker.
 *
 * `workerSendPostBsky` is always constructed, so it is always included:
 * a worker without an "error" listener would surface emitted errors as
 * unhandled EventEmitter errors.
 */
const workers: Worker[] = [
  workerInbox,
  workerDeletePost,
  workerGetUser,
  workerPrepareSendPost,
  workerProcessRemotePostView,
  workerSendPostChunk,
  workerProcessRemoteMediaData,
  workerSendPushNotification,
  workerCheckPushNotificationDelivery,
  workerGenerateUserKeyPair,
  workerSendPostBsky,
];

// These two are `null` unless `completeEnvironment.enableBsky` is set, so
// narrow with a runtime check instead of asserting the type.
for (const optionalWorker of [workerProcessFirehose, workerProcessSinglePost]) {
  if (optionalWorker !== null) {
    workers.push(optionalWorker);
  }
}

// Attach logging listeners. "failed" is registered a single time per worker
// so each failed job produces one log entry.
for (const worker of workers) {
  worker.on("error", (err) => {
    logger.warn({
      message: `worker ${worker.name} had error`,
      error: err,
    });
  });
  // BullMQ calls "failed" listeners with (job, error, prev): the error is
  // the SECOND argument, and `job` may be undefined.
  worker.on("failed", (job, err) => {
    logger.warn({
      message: `worker ${worker.name} failed`,
      jobId: job?.id,
      error: err,
    });
  });
}
243
// Public worker handles, listed alphabetically. `workerProcessFirehose` and
// `workerProcessSinglePost` are `null` when `completeEnvironment.enableBsky`
// is falsy.
export {
  workerCheckPushNotificationDelivery,
  workerDeletePost,
  workerGenerateUserKeyPair,
  workerGetUser,
  workerInbox,
  workerPrepareSendPost,
  workerProcessFirehose,
  workerProcessRemoteMediaData,
  workerProcessRemotePostView,
  workerProcessSinglePost,
  workerSendPostBsky,
  workerSendPostChunk,
  workerSendPushNotification,
};