A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
at main 1.9 kB view raw
1import { EventEmitter } from "events"; 2import { logger } from "../logger/index.js"; 3 4export interface HydrationTask { 5 type: "post" | "profile"; 6 identifier: string; 7} 8 9export class HydrationQueue extends EventEmitter { 10 private queue: HydrationTask[] = []; 11 private processing = false; 12 private processingTask: HydrationTask | null = null; 13 14 enqueue(task: HydrationTask): void { 15 const isDuplicate = this.queue.some( 16 (t) => t.type === task.type && t.identifier === task.identifier 17 ); 18 19 if (isDuplicate) { 20 logger.debug( 21 { type: task.type, identifier: task.identifier }, 22 "Skipping duplicate task" 23 ); 24 return; 25 } 26 27 if ( 28 this.processingTask?.type === task.type && 29 this.processingTask?.identifier === task.identifier 30 ) { 31 logger.debug( 32 { type: task.type, identifier: task.identifier }, 33 "Task already being processed" 34 ); 35 return; 36 } 37 38 this.queue.push(task); 39 logger.debug( 40 { type: task.type, identifier: task.identifier, queueSize: this.queue.length }, 41 "Task enqueued" 42 ); 43 44 if (!this.processing) { 45 this.processNext(); 46 } 47 } 48 49 private async processNext(): Promise<void> { 50 if (this.queue.length === 0) { 51 this.processing = false; 52 return; 53 } 54 55 this.processing = true; 56 this.processingTask = this.queue.shift()!; 57 58 logger.debug( 59 { 60 type: this.processingTask.type, 61 identifier: this.processingTask.identifier, 62 remaining: this.queue.length, 63 }, 64 "Processing task" 65 ); 66 67 this.emit("task", this.processingTask); 68 69 setTimeout(() => { 70 this.processingTask = null; 71 this.processNext(); 72 }, 100); 73 } 74 75 getQueueSize(): number { 76 return this.queue.length; 77 } 78 79 clear(): void { 80 this.queue = []; 81 this.processing = false; 82 this.processingTask = null; 83 logger.info("Hydration queue cleared"); 84 } 85}