A decentralized music tracking and discovery platform built on AT Protocol 🎵
listenbrainz
spotify
atproto
lastfm
musicbrainz
scrobbling
1import { ctx } from "./context.ts";
2import schema from "./schema/mod.ts";
3import _ from "@es-toolkit/es-toolkit/compat";
4import { broadcastEvent } from "./main.ts";
5import type { InsertEvent } from "./schema/event.ts";
6import logger from "./logger.ts";
7
// Batch length at which addToBatch triggers a flush immediately instead of
// waiting for the timer.
const BATCH_SIZE = 100;
// Delay in milliseconds before a batch smaller than BATCH_SIZE is flushed.
// addToBatch restarts this timer every time a new event is queued.
const BATCH_TIMEOUT_MS = 100;

// Events waiting to be inserted. flushBatch empties it and puts the events
// back at the front when an insert fails.
let eventBatch: InsertEvent[] = [];
// Handle of the pending flush timer, or null when no timer is scheduled.
let batchTimer: number | null = null;
// Promise of the flush currently in progress, or null when idle. flushBatch
// awaits it instead of starting a second, overlapping flush.
let flushPromise: Promise<void> | null = null;
14
/**
 * Writes every queued event to the database and broadcasts the rows that were
 * actually inserted.
 *
 * Only one flush runs at a time. A call made while a flush is in flight waits
 * for that flush and returns; the in-flight flush keeps draining until the
 * queue is empty, so events queued in the meantime are written by it instead
 * of being left behind.
 *
 * If an insert fails, the error is logged, the affected events are put back at
 * the front of the queue and draining stops. This function does not schedule a
 * retry itself; the events are attempted again by the next flush.
 *
 * @returns A promise that resolves once the flush has finished. Insert and
 * broadcast failures are logged, not propagated as rejections.
 */
export async function flushBatch(): Promise<void> {
  if (flushPromise) {
    await flushPromise;
    return;
  }

  if (eventBatch.length === 0) return;

  const inFlight = (async () => {
    try {
      // Keep draining: events pushed while the insert was awaited belong to
      // this flush, because concurrent callers only wait on it.
      while (eventBatch.length > 0) {
        const toInsert = eventBatch;
        eventBatch = [];

        try {
          logger.info`馃攧 Flushing batch of ${toInsert.length} events...`;

          const results = await ctx.db
            .insert(schema.events)
            .values(toInsert)
            .onConflictDoNothing()
            .returning()
            .execute();

          for (const result of results) {
            // The rows are already committed at this point, so a failing
            // broadcast must neither re-queue the batch nor skip the
            // remaining rows.
            try {
              broadcastEvent(result);
            } catch (error) {
              logger.error`Failed to broadcast event: ${error}`;
            }
          }

          logger.info`馃摑 Batch inserted ${results.length} events`;
        } catch (error) {
          logger.error`Failed to insert batch: ${error}`;
          // Re-add failed events to the front of the batch for retry
          eventBatch = [...toInsert, ...eventBatch];
          // Stop here so a persistent failure cannot spin in a tight loop.
          return;
        }
      }
    } finally {
      // Cleared synchronously on completion so no caller can join a flush
      // that has already finished.
      flushPromise = null;
    }
  })();

  flushPromise = inFlight;

  try {
    await inFlight;
  } finally {
    // If the body above finished without ever suspending (for example the
    // query builder threw synchronously), its own cleanup ran before the
    // assignment to flushPromise. Clear it here so later flushes still run.
    if (flushPromise === inFlight) {
      flushPromise = null;
    }
  }
}
53
/**
 * Queues one event for insertion and decides when the queue gets flushed.
 *
 * Any pending flush timer is cancelled first. A queue that has reached
 * BATCH_SIZE is flushed right away; otherwise a new timer is started that
 * flushes after BATCH_TIMEOUT_MS. Flush rejections are logged, never thrown.
 *
 * @param event The event to append to the pending batch.
 */
export function addToBatch(event: InsertEvent) {
  const reportFlushFailure = (err: unknown) => logger.error`Flush error: ${err}`;

  eventBatch.push(event);

  // Every new event supersedes whatever timer was waiting.
  if (batchTimer !== null) {
    clearTimeout(batchTimer);
    batchTimer = null;
  }

  if (eventBatch.length < BATCH_SIZE) {
    // Not full yet: flush later unless another event restarts the timer.
    batchTimer = setTimeout(() => {
      batchTimer = null;
      flushBatch().catch(reportFlushFailure);
    }, BATCH_TIMEOUT_MS);
    return;
  }

  // Full batch: flush immediately.
  flushBatch().catch(reportFlushFailure);
}