unofficial wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import { Job, MetricsTime, Worker } from 'bullmq'
2import { inboxWorker } from './queueProcessors/inbox.js'
3import { prepareSendRemotePostWorker } from './queueProcessors/prepareSendRemotePost.js'
4import { sendPostToInboxes } from './queueProcessors/sendPostToInboxes.js'
5import { getRemoteActorIdProcessor } from './queueProcessors/getRemoteActorIdProcessor.js'
6import { logger } from './logger.js'
7import { processRemotePostView } from './queueProcessors/processRemotePostView.js'
8import { processRemoteMedia } from './queueProcessors/remoteMediaProcessor.js'
9import { processFirehose } from '../atproto/workers/processFirehoseWorker.js'
10import { sendPushNotification } from './queueProcessors/sendPushNotification.js'
11import { checkPushNotificationDelivery } from './queueProcessors/checkPushNotificationDelivery.js'
12import { generateUserKeyPair } from './queueProcessors/generateUserKeyPair.js'
13import { completeEnvironment } from './backendOptions.js'
14import { sendPostBsky } from './queueProcessors/sendPostBsky.js'
15
// Log start-up before any queue worker is constructed.
logger.info('started worker')

// Settings for the 'inbox' worker: shared connection, metrics retention and
// the "low" concurrency value from the environment configuration.
const inboxQueueOptions = {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.low
}

/** Worker for the 'inbox' queue; each job is handed to inboxWorker. */
const workerInbox = new Worker('inbox', (job: Job) => inboxWorker(job), inboxQueueOptions)
24
/**
 * Worker for the 'prepareSendPost' queue. Every job is forwarded to
 * prepareSendRemotePostWorker.
 */
const workerPrepareSendPost = new Worker(
  'prepareSendPost',
  (queuedJob: Job) => prepareSendRemotePostWorker(queuedJob),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    concurrency: completeEnvironment.workers.high,
    lockDuration: 60_000
  }
)
33
// Processor for the 'sendPostBsky' queue: passes the job straight to sendPostBsky.
const runSendPostBskyJob = (job: Job) => sendPostBsky(job)

/** Worker for the 'sendPostBsky' queue, using the "high" concurrency setting. */
const workerSendPostBsky = new Worker('sendPostBsky', runSendPostBskyJob, {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 60_000
})
42
// Settings for the 'sendPostToInboxes' worker.
const sendPostChunkOptions = {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 120_000
}

/** Worker for the 'sendPostToInboxes' queue; jobs are processed by sendPostToInboxes. */
const workerSendPostChunk = new Worker(
  'sendPostToInboxes',
  (job: Job) => sendPostToInboxes(job),
  sendPostChunkOptions
)
51
/**
 * Worker for the 'deletePostQueue' queue. It uses the same processor function
 * (sendPostToInboxes) as the 'sendPostToInboxes' worker.
 */
const workerDeletePost = new Worker(
  'deletePostQueue',
  (deleteJob: Job) => sendPostToInboxes(deleteJob),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    concurrency: completeEnvironment.workers.high,
    lockDuration: 120_000
  }
)
60
// Processor for the 'getRemoteActorId' queue: awaits getRemoteActorIdProcessor
// and returns its result.
const runGetRemoteActorIdJob = async (job: Job) => {
  return await getRemoteActorIdProcessor(job)
}

/** Worker for the 'getRemoteActorId' queue, using the "high" concurrency setting. */
const workerGetUser = new Worker('getRemoteActorId', runGetRemoteActorIdJob, {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 120_000
})
69
// Settings for the 'processRemoteView' worker ("low" concurrency).
const remotePostViewOptions = {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.low,
  lockDuration: 120_000
}

/** Worker for the 'processRemoteView' queue; jobs are processed by processRemotePostView. */
const workerProcessRemotePostView = new Worker(
  'processRemoteView',
  async (viewJob: Job) => await processRemotePostView(viewJob),
  remotePostViewOptions
)
82
// Processor for the 'processRemoteMediaData' queue: awaits processRemoteMedia
// and returns its result.
const runRemoteMediaJob = async (job: Job) => {
  return await processRemoteMedia(job)
}

/** Worker for the 'processRemoteMediaData' queue, using the "low" concurrency setting. */
const workerProcessRemoteMediaData = new Worker('processRemoteMediaData', runRemoteMediaJob, {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
  concurrency: completeEnvironment.workers.low,
  lockDuration: 120_000
})
95
/**
 * Builds the worker for the 'firehoseQueue' queue, or returns null when
 * completeEnvironment.enableBsky is falsy.
 */
function createFirehoseWorker(): Worker | null {
  if (!completeEnvironment.enableBsky) {
    return null
  }
  return new Worker('firehoseQueue', async (job: Job) => await processFirehose(job), {
    connection: completeEnvironment.bullmqConnection,
    metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
    concurrency: completeEnvironment.workers.medium,
    // up to one minute
    lockDuration: 60_000
  })
}

// null when Bluesky support is disabled; callers must handle that case.
const workerProcessFirehose = createFirehoseWorker()
107
// Settings for the 'sendPushNotification' worker ("medium" concurrency).
const sendPushNotificationOptions = {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  concurrency: completeEnvironment.workers.medium
}

/** Worker for the 'sendPushNotification' queue; jobs are processed by sendPushNotification. */
const workerSendPushNotification = new Worker(
  'sendPushNotification',
  async (pushJob: Job) => await sendPushNotification(pushJob),
  sendPushNotificationOptions
)
119
// Processor for the 'checkPushNotificationDelivery' queue: awaits
// checkPushNotificationDelivery and returns its result.
const runCheckPushDeliveryJob = async (job: Job) => {
  return await checkPushNotificationDelivery(job)
}

/** Worker for the 'checkPushNotificationDelivery' queue ("medium" concurrency). */
const workerCheckPushNotificationDelivery = new Worker(
  'checkPushNotificationDelivery',
  runCheckPushDeliveryJob,
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: { maxDataPoints: 2 * MetricsTime.ONE_WEEK },
    concurrency: completeEnvironment.workers.medium
  }
)
131
// Settings for the 'generateUserKeyPair' worker.
const generateUserKeyPairOptions = {
  connection: completeEnvironment.bullmqConnection,
  metrics: { maxDataPoints: MetricsTime.ONE_WEEK * 2 },
  // this one is VERY cpu intensive, so only one job runs at a time
  concurrency: 1
}

/** Worker for the 'generateUserKeyPair' queue; jobs are processed by generateUserKeyPair. */
const workerGenerateUserKeyPair = new Worker(
  'generateUserKeyPair',
  async (keyJob: Job) => await generateUserKeyPair(keyJob),
  generateUserKeyPairOptions
)
143
/**
 * Every worker created in this module, each listed exactly once so that no
 * listener is attached twice to the same instance.
 *
 * workerSendPostBsky is always included because it is constructed
 * unconditionally: an EventEmitter without an 'error' listener throws when
 * 'error' is emitted, so the worker needs one even when Bluesky is disabled.
 */
const workers = [
  workerInbox,
  workerDeletePost,
  workerGetUser,
  workerPrepareSendPost,
  workerProcessRemotePostView,
  workerSendPostChunk,
  workerProcessRemoteMediaData,
  workerSendPushNotification,
  workerCheckPushNotificationDelivery,
  workerGenerateUserKeyPair,
  workerSendPostBsky
]

// The firehose worker only exists when Bluesky support is enabled (it is null
// otherwise), so narrow it instead of casting.
if (workerProcessFirehose) {
  workers.push(workerProcessFirehose)
}

// Attach logging listeners once per worker. A single 'failed' listener per
// worker is enough; registering it a second time only duplicated log lines.
for (const worker of workers) {
  worker.on('error', (err) => {
    logger.warn({
      message: `worker ${worker.name} had error`,
      error: err
    })
  })
  // BullMQ emits 'failed' as (job, error, prev): the first argument is the
  // job (possibly undefined), the second is the actual error.
  worker.on('failed', (job, err) => {
    logger.warn({
      message: `worker ${worker.name} failed`,
      jobId: job?.id,
      error: err
    })
  })
}
200
// Public worker handles, listed alphabetically.
// workerProcessFirehose is null when completeEnvironment.enableBsky is falsy.
export {
  workerCheckPushNotificationDelivery,
  workerDeletePost,
  workerGenerateUserKeyPair,
  workerGetUser,
  workerInbox,
  workerPrepareSendPost,
  workerProcessFirehose,
  workerProcessRemoteMediaData,
  workerProcessRemotePostView,
  workerSendPostBsky,
  workerSendPostChunk,
  workerSendPushNotification
}