import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts"; import { type IndexerEvent } from "./indexer.ts"; import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts"; interface ClientDelta { documents: Record }>; queries: Record; } function extractSortValues(doc: Record, sortClause: any[]): unknown[] { const values: unknown[] = []; for (const sortField of sortClause) { const key = Object.keys(sortField)[0]; const value = getSafeField(doc, key); values.push(value); } return values; } function compareItems(aSorts: unknown[], bSorts: unknown[], sortClause: any[]): number { for (let i = 0; i < sortClause.length; i++) { const direction = Object.values(sortClause[i])[0]; const valA = aSorts[i]; const valB = bSorts[i]; if (valA === valB) continue; if (valA === undefined || valA === null) return 1; if (valB === undefined || valB === null) return -1; let comparison = 0; if (valA < valB) { comparison = -1; } else if (valA > valB) { comparison = 1; } if (comparison !== 0) { return direction === 'desc' ? -comparison : comparison; } } return 0; } function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number { let low = 0; let high = sortedArray.length; while (low < high) { const mid = Math.floor((low + high) / 2); const comparison = compareItems(newItem.sortValues, sortedArray[mid].sortValues, sortClause); if (comparison < 0) { high = mid; } else { low = mid + 1; } } return low; } export async function processEventForSync(event: IndexerEvent) { const requiredDocuments = new Set(); const changedQueries: { queryId: string; newEcid: string; newRichResult: SubscriptionResult[]; newResultUris: string[]; sub: Subscription; }[] = []; for (const [queryId, sub] of subscriptions.entries()) { let newRichResult: SubscriptionResult[] | undefined = undefined; const sortClause = (sub.esQuery as any).sort; if (event.type === 'index') { const queryClause = (sub.esQuery as any).query ?? {}; const doesMatch = matchDocAgainstQuery(event.data, queryClause); const currentIndex = sub.result.findIndex(item => item.uri === event.uri); const wasInResult = currentIndex !== -1; if (doesMatch && !wasInResult) { const newItem = { uri: event.uri, sortValues: extractSortValues(event.data, sortClause), }; const insertIndex = findInsertIndex(sub.result, newItem, sortClause); newRichResult = [...sub.result]; newRichResult.splice(insertIndex, 0, newItem); requiredDocuments.add(event.uri); } else if (!doesMatch && wasInResult) { newRichResult = sub.result.filter(item => item.uri !== event.uri); } else if (doesMatch && wasInResult) { const newItem = { uri: event.uri, sortValues: extractSortValues(event.data, sortClause), }; if (compareItems(newItem.sortValues, sub.result[currentIndex].sortValues, sortClause) !== 0) { const tempResult = sub.result.filter(item => item.uri !== event.uri); const insertIndex = findInsertIndex(tempResult, newItem, sortClause); newRichResult = [...tempResult]; newRichResult.splice(insertIndex, 0, newItem); } requiredDocuments.add(event.uri); } } else if (event.type === 'delete') { if (sub.result.some(item => item.uri === event.uri)) { newRichResult = sub.result.filter(item => item.uri !== event.uri); } } if (newRichResult) { const newResultUris = newRichResult.map(item => item.uri); const newEcid = await computeCid(newResultUris); if (newEcid !== sub.ecid) { changedQueries.push({ queryId, newEcid, newRichResult, newResultUris, sub }); } } } for (const { queryId, newEcid, newRichResult } of changedQueries) { const sub = subscriptions.get(queryId); if (sub) { sub.ecid = newEcid; sub.result = newRichResult; } } const documentsPayload: Record }> = {}; if (event.type === 'index') { for (const uri of requiredDocuments) { documentsPayload[uri] = { cid: event.data["$metadata.cid"] as string, doc: event.data as Record, }; } } if (changedQueries.length > 0 || Object.keys(documentsPayload).length > 0) { const affectedClients = new Set(); changedQueries.forEach(cq => cq.sub.clients.forEach(c => affectedClients.add(c))); if (Object.keys(documentsPayload).length > 0 && event.type === 'index') { for (const [, sub] of subscriptions.entries()) { if (sub.result.some(item => item.uri === event.uri)) { sub.clients.forEach(c => affectedClients.add(c)); } } } for (const client of affectedClients) { const delta: ClientDelta = { documents: documentsPayload, queries: {}, }; for (const { queryId, newEcid, newResultUris, sub } of changedQueries) { if (sub.clients.has(client)) { delta.queries[queryId] = { ecid: newEcid, result: newResultUris }; } } if (Object.keys(delta.queries).length > 0 || Object.keys(documentsPayload).length > 0) { if (client.readyState === WebSocket.OPEN) { try { client.send(JSON.stringify({ type: 'query-delta', ...delta })); } catch (err) { console.error("Failed to send delta to client:", err); } } } } } }