elasticsearch-based configurable generic appview for prototyping ideas
at main 6.2 kB view raw
1import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts"; 2import { type IndexerEvent } from "./indexer.ts"; 3import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts"; 4 5interface ClientDelta { 6 documents: Record<string, { cid: string; doc: Record<string, unknown> }>; 7 queries: Record<string, { ecid: string; result: string[] }>; 8} 9 10function extractSortValues(doc: Record<string, any>, sortClause: any[]): unknown[] { 11 const values: unknown[] = []; 12 for (const sortField of sortClause) { 13 const key = Object.keys(sortField)[0]; 14 const value = getSafeField(doc, key); 15 values.push(value); 16 } 17 return values; 18} 19 20function compareItems(aSorts: unknown[], bSorts: unknown[], sortClause: any[]): number { 21 for (let i = 0; i < sortClause.length; i++) { 22 const direction = Object.values(sortClause[i])[0]; 23 const valA = aSorts[i]; 24 const valB = bSorts[i]; 25 26 if (valA === valB) continue; 27 28 if (valA === undefined || valA === null) return 1; 29 if (valB === undefined || valB === null) return -1; 30 31 let comparison = 0; 32 if (valA < valB) { 33 comparison = -1; 34 } else if (valA > valB) { 35 comparison = 1; 36 } 37 38 if (comparison !== 0) { 39 return direction === 'desc' ? -comparison : comparison; 40 } 41 } 42 return 0; 43} 44 45function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number { 46 let low = 0; 47 let high = sortedArray.length; 48 49 while (low < high) { 50 const mid = Math.floor((low + high) / 2); 51 const comparison = compareItems(newItem.sortValues, sortedArray[mid].sortValues, sortClause); 52 if (comparison < 0) { 53 high = mid; 54 } else { 55 low = mid + 1; 56 } 57 } 58 return low; 59} 60 61export async function processEventForSync(event: IndexerEvent) { 62 const requiredDocuments = new Set<string>(); 63 64 const changedQueries: { 65 queryId: string; 66 newEcid: string; 67 newRichResult: SubscriptionResult[]; 68 newResultUris: string[]; 69 sub: Subscription; 70 }[] = []; 71 72 for (const [queryId, sub] of subscriptions.entries()) { 73 let newRichResult: SubscriptionResult[] | undefined = undefined; 74 const sortClause = (sub.esQuery as any).sort; 75 76 if (event.type === 'index') { 77 const queryClause = (sub.esQuery as any).query ?? {}; 78 const doesMatch = matchDocAgainstQuery(event.data, queryClause); 79 const currentIndex = sub.result.findIndex(item => item.uri === event.uri); 80 const wasInResult = currentIndex !== -1; 81 82 if (doesMatch && !wasInResult) { 83 const newItem = { 84 uri: event.uri, 85 sortValues: extractSortValues(event.data, sortClause), 86 }; 87 const insertIndex = findInsertIndex(sub.result, newItem, sortClause); 88 89 newRichResult = [...sub.result]; 90 newRichResult.splice(insertIndex, 0, newItem); 91 requiredDocuments.add(event.uri); 92 93 } else if (!doesMatch && wasInResult) { 94 newRichResult = sub.result.filter(item => item.uri !== event.uri); 95 96 } else if (doesMatch && wasInResult) { 97 const newItem = { 98 uri: event.uri, 99 sortValues: extractSortValues(event.data, sortClause), 100 }; 101 if (compareItems(newItem.sortValues, sub.result[currentIndex].sortValues, sortClause) !== 0) { 102 const tempResult = sub.result.filter(item => item.uri !== event.uri); 103 const insertIndex = findInsertIndex(tempResult, newItem, sortClause); 104 105 newRichResult = [...tempResult]; 106 newRichResult.splice(insertIndex, 0, newItem); 107 } 108 requiredDocuments.add(event.uri); 109 } 110 } else if (event.type === 'delete') { 111 if (sub.result.some(item => item.uri === event.uri)) { 112 newRichResult = sub.result.filter(item => item.uri !== event.uri); 113 } 114 } 115 116 if (newRichResult) { 117 const newResultUris = newRichResult.map(item => item.uri); 118 const newEcid = await computeCid(newResultUris); 119 120 if (newEcid !== sub.ecid) { 121 changedQueries.push({ 122 queryId, 123 newEcid, 124 newRichResult, 125 newResultUris, 126 sub 127 }); 128 } 129 } 130 } 131 132 for (const { queryId, newEcid, newRichResult } of changedQueries) { 133 const sub = subscriptions.get(queryId); 134 if (sub) { 135 sub.ecid = newEcid; 136 sub.result = newRichResult; 137 } 138 } 139 140 const documentsPayload: Record<string, { cid: string; doc: Record<string, unknown> }> = {}; 141 if (event.type === 'index') { 142 for (const uri of requiredDocuments) { 143 documentsPayload[uri] = { 144 cid: event.data["$metadata.cid"] as string, 145 doc: event.data as Record<string, unknown>, 146 }; 147 } 148 } 149 150 if (changedQueries.length > 0 || Object.keys(documentsPayload).length > 0) { 151 const affectedClients = new Set<WebSocket>(); 152 changedQueries.forEach(cq => cq.sub.clients.forEach(c => affectedClients.add(c))); 153 154 if (Object.keys(documentsPayload).length > 0 && event.type === 'index') { 155 for (const [, sub] of subscriptions.entries()) { 156 if (sub.result.some(item => item.uri === event.uri)) { 157 sub.clients.forEach(c => affectedClients.add(c)); 158 } 159 } 160 } 161 162 for (const client of affectedClients) { 163 const delta: ClientDelta = { 164 documents: documentsPayload, 165 queries: {}, 166 }; 167 168 for (const { queryId, newEcid, newResultUris, sub } of changedQueries) { 169 if (sub.clients.has(client)) { 170 delta.queries[queryId] = { ecid: newEcid, result: newResultUris }; 171 } 172 } 173 174 if (Object.keys(delta.queries).length > 0 || Object.keys(documentsPayload).length > 0) { 175 if (client.readyState === WebSocket.OPEN) { 176 try { 177 client.send(JSON.stringify({ type: 'query-delta', ...delta })); 178 } catch (err) { 179 console.error("Failed to send delta to client:", err); 180 } 181 } 182 } 183 } 184 } 185}