A decentralized music tracking and discovery platform built on AT Protocol 馃幍
listenbrainz spotify atproto lastfm musicbrainz scrobbling
at main 70 lines 1.8 kB view raw
1import { ctx } from "./context.ts"; 2import schema from "./schema/mod.ts"; 3import _ from "@es-toolkit/es-toolkit/compat"; 4import { broadcastEvent } from "./main.ts"; 5import type { InsertEvent } from "./schema/event.ts"; 6import logger from "./logger.ts"; 7 8const BATCH_SIZE = 100; 9const BATCH_TIMEOUT_MS = 100; 10 11let eventBatch: InsertEvent[] = []; 12let batchTimer: number | null = null; 13let flushPromise: Promise<void> | null = null; 14 15export async function flushBatch() { 16 if (flushPromise) { 17 await flushPromise; 18 return; 19 } 20 21 if (eventBatch.length === 0) return; 22 23 flushPromise = (async () => { 24 const toInsert = [...eventBatch]; 25 eventBatch = []; 26 27 try { 28 logger.info`馃攧 Flushing batch of ${toInsert.length} events...`; 29 30 const results = await ctx.db 31 .insert(schema.events) 32 .values(toInsert) 33 .onConflictDoNothing() 34 .returning() 35 .execute(); 36 37 for (const result of results) { 38 broadcastEvent(result); 39 } 40 41 logger.info`馃摑 Batch inserted ${results.length} events`; 42 } catch (error) { 43 logger.error`Failed to insert batch: ${error}`; 44 // Re-add failed events to the front of the batch for retry 45 eventBatch = [...toInsert, ...eventBatch]; 46 } finally { 47 flushPromise = null; 48 } 49 })(); 50 51 await flushPromise; 52} 53 54export function addToBatch(event: InsertEvent) { 55 eventBatch.push(event); 56 57 if (batchTimer !== null) { 58 clearTimeout(batchTimer); 59 batchTimer = null; 60 } 61 62 if (eventBatch.length >= BATCH_SIZE) { 63 flushBatch().catch((err) => logger.error`Flush error: ${err}`); 64 } else { 65 batchTimer = setTimeout(() => { 66 batchTimer = null; 67 flushBatch().catch((err) => logger.error`Flush error: ${err}`); 68 }, BATCH_TIMEOUT_MS); 69 } 70}