unofficial wafrn mirror wafrn.net
atproto social-network activitypub
at testPDSNotExplode 214 lines 6.4 kB view raw
import { Job, MetricsTime, Worker } from 'bullmq'
import { inboxWorker } from './queueProcessors/inbox.js'
import { prepareSendRemotePostWorker } from './queueProcessors/prepareSendRemotePost.js'
import { sendPostToInboxes } from './queueProcessors/sendPostToInboxes.js'
import { getRemoteActorIdProcessor } from './queueProcessors/getRemoteActorIdProcessor.js'
import { logger } from './logger.js'
import { processRemotePostView } from './queueProcessors/processRemotePostView.js'
import { processRemoteMedia } from './queueProcessors/remoteMediaProcessor.js'
import { processFirehose } from '../atproto/workers/processFirehoseWorker.js'
import { sendPushNotification } from './queueProcessors/sendPushNotification.js'
import { checkPushNotificationDelivery } from './queueProcessors/checkPushNotificationDelivery.js'
import { generateUserKeyPair } from './queueProcessors/generateUserKeyPair.js'
import { completeEnvironment } from './backendOptions.js'
import { sendPostBsky } from './queueProcessors/sendPostBsky.js'

/*
 * BullMQ worker bootstrap.
 *
 * Importing this module creates one Worker per queue, wires logging for the
 * 'error' and 'failed' events of each worker and exports the worker instances.
 * Connection settings and concurrency levels are read from completeEnvironment.
 */

logger.info('started worker')

// Number of metrics data points every worker keeps (two weeks' worth).
const metricsMaxDataPoints = MetricsTime.ONE_WEEK * 2

// Processes jobs of the 'inbox' queue.
const workerInbox = new Worker('inbox', (job: Job) => inboxWorker(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.low
})

// Processes jobs of the 'prepareSendPost' queue.
const workerPrepareSendPost = new Worker('prepareSendPost', (job: Job) => prepareSendRemotePostWorker(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 60000
})

// Processes jobs of the 'sendPostBsky' queue. Created regardless of enableBsky.
const workerSendPostBsky = new Worker('sendPostBsky', (job: Job) => sendPostBsky(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 60000
})

// Processes jobs of the 'sendPostToInboxes' queue.
const workerSendPostChunk = new Worker('sendPostToInboxes', (job: Job) => sendPostToInboxes(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 120000
})

// Processes jobs of the 'deletePostQueue' queue with the same processor as 'sendPostToInboxes'.
const workerDeletePost = new Worker('deletePostQueue', (job: Job) => sendPostToInboxes(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 120000
})

// Processes jobs of the 'getRemoteActorId' queue.
const workerGetUser = new Worker('getRemoteActorId', async (job: Job) => await getRemoteActorIdProcessor(job), {
  connection: completeEnvironment.bullmqConnection,
  metrics: {
    maxDataPoints: metricsMaxDataPoints
  },
  concurrency: completeEnvironment.workers.high,
  lockDuration: 120000
})

// Processes jobs of the 'processRemoteView' queue.
const workerProcessRemotePostView = new Worker(
  'processRemoteView',
  async (job: Job) => await processRemotePostView(job),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: {
      maxDataPoints: metricsMaxDataPoints
    },
    concurrency: completeEnvironment.workers.low,
    lockDuration: 120000
  }
)

// Processes jobs of the 'processRemoteMediaData' queue.
const workerProcessRemoteMediaData = new Worker(
  'processRemoteMediaData',
  async (job: Job) => await processRemoteMedia(job),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: {
      maxDataPoints: metricsMaxDataPoints
    },
    concurrency: completeEnvironment.workers.low,
    lockDuration: 120000
  }
)

// Processes jobs of the 'firehoseQueue' queue; null when enableBsky is off.
const workerProcessFirehose = completeEnvironment.enableBsky
  ? new Worker('firehoseQueue', async (job: Job) => await processFirehose(job), {
      connection: completeEnvironment.bullmqConnection,
      metrics: {
        maxDataPoints: metricsMaxDataPoints
      },
      concurrency: completeEnvironment.workers.medium,
      // up to one minute
      lockDuration: 60000
    })
  : null

// Processes jobs of the 'sendPushNotification' queue.
const workerSendPushNotification = new Worker(
  'sendPushNotification',
  async (job: Job) => await sendPushNotification(job),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: {
      maxDataPoints: metricsMaxDataPoints
    },
    concurrency: completeEnvironment.workers.medium
  }
)

// Processes jobs of the 'checkPushNotificationDelivery' queue.
const workerCheckPushNotificationDelivery = new Worker(
  'checkPushNotificationDelivery',
  async (job: Job) => await checkPushNotificationDelivery(job),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: {
      maxDataPoints: metricsMaxDataPoints
    },
    concurrency: completeEnvironment.workers.medium
  }
)

// Processes jobs of the 'generateUserKeyPair' queue.
const workerGenerateUserKeyPair = new Worker(
  'generateUserKeyPair',
  async (job: Job) => await generateUserKeyPair(job),
  {
    connection: completeEnvironment.bullmqConnection,
    metrics: {
      maxDataPoints: metricsMaxDataPoints
    },
    concurrency: 1 // this one is VERY cpu intensive
  }
)

// Every worker that exists in this process, listed exactly once so each one
// gets a single 'error' and a single 'failed' listener.
// workerSendPostBsky is always constructed above, so it is always listed:
// an 'error' event emitted without a listener would otherwise be thrown.
const workers: Worker[] = [
  workerInbox,
  workerDeletePost,
  workerGetUser,
  workerPrepareSendPost,
  workerProcessRemotePostView,
  workerSendPostChunk,
  workerProcessRemoteMediaData,
  workerSendPushNotification,
  workerCheckPushNotificationDelivery,
  workerGenerateUserKeyPair,
  workerSendPostBsky
]
// The firehose worker only exists when bsky support is enabled.
if (workerProcessFirehose) {
  workers.push(workerProcessFirehose)
}

workers.forEach((worker) => {
  worker.on('error', (err) => {
    logger.warn({
      message: `worker ${worker.name} had error`,
      error: err
    })
  })
  // BullMQ calls 'failed' listeners with (job, error, prev): the failure
  // reason is the second argument, the first one is the job (may be undefined).
  worker.on('failed', (job, err) => {
    logger.warn({
      message: `worker ${worker.name} failed`,
      jobId: job?.id,
      error: err
    })
  })
})

export {
  workerInbox,
  workerSendPostChunk,
  workerPrepareSendPost,
  workerGetUser,
  workerDeletePost,
  workerProcessRemotePostView,
  workerProcessRemoteMediaData,
  workerProcessFirehose,
  workerSendPushNotification,
  workerCheckPushNotificationDelivery,
  workerGenerateUserKeyPair,
  workerSendPostBsky
}