A tool for tailing a labeler's firehose, rehydrating records, and storing them for future analysis of moderation decisions.
1import { EventEmitter } from "events";
2import { logger } from "../logger/index.js";
3
/**
 * A single unit of work for the hydration queue.
 *
 * Two tasks are treated as the same task when both `type` and `identifier`
 * are equal.
 */
export interface HydrationTask {
  /**
   * Identifies the record to hydrate. The exact format is not constrained
   * here; it is only compared for equality.
   */
  identifier: string;
  /** Kind of record the identifier refers to. */
  type: "post" | "profile";
}
8
/**
 * FIFO queue that hands hydration tasks to listeners one at a time.
 *
 * Each task is announced by emitting a `"task"` event with the task as its
 * only argument. After announcing a task the queue waits a fixed interval
 * before announcing the next one. The queue does NOT wait for listeners to
 * finish any asynchronous work: a task counts as "in flight" only for the
 * duration of that interval.
 *
 * Tasks that are already waiting, or that are currently in flight, are
 * rejected as duplicates (matched on `type` + `identifier`).
 */
export class HydrationQueue extends EventEmitter {
  /** Spacing, in milliseconds, between consecutive `"task"` emissions. */
  private static readonly TASK_INTERVAL_MS = 100;

  /** Tasks waiting to be announced, oldest first. */
  private queue: HydrationTask[] = [];
  /** True while a drain chain is active (a task is in flight or about to be). */
  private processing = false;
  /** The task most recently announced whose interval has not yet elapsed. */
  private processingTask: HydrationTask | null = null;
  /**
   * Handle of the timer that will announce the next task, if one is pending.
   * Tracked so that `clear()` can cancel it and so that two drain chains can
   * never run side by side.
   */
  private timer: ReturnType<typeof setTimeout> | null = null;

  /** Two tasks are the same when both their type and identifier match. */
  private static sameTask(a: HydrationTask, b: HydrationTask): boolean {
    return a.type === b.type && a.identifier === b.identifier;
  }

  /**
   * Adds a task to the back of the queue and starts draining if idle.
   *
   * The task is silently dropped (with a debug log) when an equal task is
   * already waiting or is currently in flight.
   *
   * @param task - The task to schedule.
   */
  enqueue(task: HydrationTask): void {
    const logContext = { type: task.type, identifier: task.identifier };

    if (this.queue.some((queued) => HydrationQueue.sameTask(queued, task))) {
      logger.debug(logContext, "Skipping duplicate task");
      return;
    }

    if (
      this.processingTask !== null &&
      HydrationQueue.sameTask(this.processingTask, task)
    ) {
      logger.debug(logContext, "Task already being processed");
      return;
    }

    this.queue.push(task);
    logger.debug(
      { ...logContext, queueSize: this.queue.length },
      "Task enqueued"
    );

    if (!this.processing) {
      this.processNext();
    }
  }

  /**
   * Announces the oldest waiting task and schedules the following one.
   * Marks the queue idle when nothing is waiting.
   */
  private processNext(): void {
    const next = this.queue.shift();
    if (next === undefined) {
      this.processing = false;
      return;
    }

    this.processing = true;
    this.processingTask = next;

    logger.debug(
      {
        type: next.type,
        identifier: next.identifier,
        remaining: this.queue.length,
      },
      "Processing task"
    );

    // emit() invokes listeners synchronously and rethrows whatever they
    // throw. A failing listener must not prevent the follow-up timer from
    // being scheduled, otherwise `processing` stays true forever and the
    // queue never drains again.
    try {
      this.emit("task", next);
    } catch (err: unknown) {
      logger.error(
        { err, type: next.type, identifier: next.identifier },
        "Task listener threw"
      );
    }

    this.scheduleNext();
  }

  /**
   * Arms the timer that ends the current task's in-flight window and moves
   * on to the next task. Any previously armed timer is cancelled first so
   * there is never more than one outstanding.
   */
  private scheduleNext(): void {
    this.cancelTimer();
    this.timer = setTimeout(() => {
      this.timer = null;
      this.processingTask = null;
      this.processNext();
    }, HydrationQueue.TASK_INTERVAL_MS);
  }

  /** Cancels the pending drain timer, if any. */
  private cancelTimer(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }

  /**
   * @returns The number of tasks still waiting. The in-flight task, if any,
   * is not counted.
   */
  getQueueSize(): number {
    return this.queue.length;
  }

  /**
   * Drops every waiting task, forgets the in-flight task and returns the
   * queue to the idle state. Registered listeners are left in place.
   */
  clear(): void {
    // Without cancelling, the stale timer would fire after the reset, wipe
    // the in-flight marker of whatever is being processed by then, and start
    // a second drain chain alongside the one begun by a later enqueue().
    this.cancelTimer();
    this.queue = [];
    this.processing = false;
    this.processingTask = null;
    logger.info("Hydration queue cleared");
  }
}