Elasticsearch-based, configurable, generic AppView for prototyping ideas.
1import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts";
2import { type IndexerEvent } from "./indexer.ts";
3import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts";
4
/**
 * Payload of a `query-delta` message pushed to a subscribed client.
 * It is spread into the outgoing message next to the `type` field.
 */
interface ClientDelta {
  /** Document bodies keyed by document URI, each paired with the document's cid. */
  documents: {
    [uri: string]: {
      cid: string;
      doc: Record<string, unknown>;
    };
  };
  /** Changed queries keyed by query id: the new ecid and the ordered list of result URIs. */
  queries: {
    [queryId: string]: {
      ecid: string;
      result: string[];
    };
  };
}
9
/**
 * Reads from `doc` the value of every field named in a sort clause, in clause order.
 *
 * Each clause entry may be either an object whose first key is the field name
 * (`{ createdAt: "desc" }`) or a bare field-name string (`"createdAt"`).
 * A missing clause (`null`/`undefined`) yields an empty tuple instead of throwing,
 * so queries without a `sort` can still be processed.
 *
 * @param doc - The document to read sort values from.
 * @param sortClause - The sort clause of the query, if any.
 * @returns One value per clause entry (as returned by `getSafeField`), or
 *          `undefined` for an entry that names no field.
 */
function extractSortValues(
  doc: Record<string, any>,
  sortClause: readonly unknown[] | null | undefined,
): unknown[] {
  const values: unknown[] = [];
  for (const entry of sortClause ?? []) {
    let field: string | undefined;
    if (typeof entry === 'string') {
      // Short form: the entry itself is the field name.
      field = entry;
    } else if (typeof entry === 'object' && entry !== null) {
      // Object form: the first key is the field name.
      field = Object.keys(entry)[0];
    }
    values.push(field === undefined ? undefined : getSafeField(doc, field));
  }
  return values;
}
19
/**
 * Compares two sort-value tuples position by position according to a sort clause.
 *
 * - Missing values (`null` or `undefined`) sort after present values regardless
 *   of direction; two missing values are treated as equal at that position.
 * - The direction of an entry is taken from the first value of the entry object,
 *   either directly (`{ field: "desc" }`) or from its `order` property
 *   (`{ field: { order: "desc" } }`). Anything other than `"desc"` is ascending.
 * - Values that are neither `<` nor `>` each other (e.g. mixed types) are treated
 *   as equal at that position and the next position decides.
 * - A missing clause (`null`/`undefined`) compares everything as equal.
 *
 * @param aSorts - Sort values of the first item, aligned with `sortClause`.
 * @param bSorts - Sort values of the second item, aligned with `sortClause`.
 * @param sortClause - The sort clause of the query, if any.
 * @returns A negative number if `a` sorts first, positive if `b` sorts first, 0 if tied.
 */
function compareItems(
  aSorts: unknown[],
  bSorts: unknown[],
  sortClause: readonly unknown[] | null | undefined,
): number {
  const clause = sortClause ?? [];

  for (let i = 0; i < clause.length; i++) {
    const valA = aSorts[i];
    const valB = bSorts[i];

    const missingA = valA === undefined || valA === null;
    const missingB = valB === undefined || valB === null;
    if (missingA || missingB) {
      // Both missing is a tie; otherwise the missing side goes last.
      if (missingA && missingB) continue;
      return missingA ? 1 : -1;
    }

    if (valA === valB) continue;

    // Sort values are arbitrary JSON scalars; rely on JavaScript's relational
    // semantics exactly as a plain `<` / `>` on the raw values would.
    const left = valA as number | string;
    const right = valB as number | string;
    let comparison = 0;
    if (left < right) {
      comparison = -1;
    } else if (left > right) {
      comparison = 1;
    }

    if (comparison !== 0) {
      const entry = clause[i];
      const spec =
        typeof entry === 'object' && entry !== null ? Object.values(entry)[0] : undefined;
      const order =
        typeof spec === 'object' && spec !== null ? (spec as { order?: unknown }).order : spec;
      return order === 'desc' ? -comparison : comparison;
    }
  }
  return 0;
}
44
/**
 * Binary-searches an already sorted result list for the position at which
 * `newItem` should be inserted to keep the list ordered by `sortClause`.
 *
 * Items that compare equal to `newItem` stay in front of it, i.e. the returned
 * index is just past the last element that does not sort after the new item.
 *
 * @param sortedArray - Result list ordered according to `sortClause`.
 * @param newItem - The item to place.
 * @param sortClause - The sort clause handed through to `compareItems`.
 * @returns An index in the range `0..sortedArray.length`.
 */
function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number {
  let lo = 0;
  let hi = sortedArray.length;

  while (hi > lo) {
    const pivot = lo + Math.floor((hi - lo) / 2);
    const sortsBeforePivot =
      compareItems(newItem.sortValues, sortedArray[pivot].sortValues, sortClause) < 0;

    if (sortsBeforePivot) {
      hi = pivot;
      continue;
    }
    lo = pivot + 1;
  }

  return lo;
}
60
/**
 * Applies one indexer event to every live subscription and pushes the resulting
 * `query-delta` messages to the affected WebSocket clients.
 *
 * - `index` event: the document is matched against each subscription's query and
 *   is inserted into, repositioned within, or removed from that subscription's
 *   cached result list.
 * - `delete` event: the URI is removed from every cached result list containing it.
 *
 * Cached subscription state is written back only after all subscriptions have
 * been evaluated. A query is announced to clients only when its ecid (computed
 * from the ordered result URIs) changed, but the cached list is refreshed
 * whenever it was recomputed so that the stored sort values never go stale.
 *
 * The document body is sent only to clients that have at least one subscription
 * whose result currently contains the document.
 *
 * NOTE(review): state is read before and written after `await computeCid(...)`;
 * if callers can run two events concurrently, updates may interleave. Confirm
 * that the caller processes events one at a time.
 *
 * @param event - The indexer event to apply.
 * @returns A promise that resolves after all deltas were sent (sockets that are
 *          not open are skipped; send failures are logged, not thrown).
 */
export async function processEventForSync(event: IndexerEvent): Promise<void> {
  // URIs whose document body has to accompany the delta.
  const requiredDocuments = new Set<string>();

  // Every subscription whose result list was recomputed for this event.
  const recomputedQueries: {
    queryId: string;
    newEcid: string;
    newRichResult: SubscriptionResult[];
    newResultUris: string[];
    sub: Subscription;
    ecidChanged: boolean;
  }[] = [];

  for (const [queryId, sub] of subscriptions.entries()) {
    let newRichResult: SubscriptionResult[] | undefined = undefined;

    // A query may carry no sort at all, or (presumably, as Elasticsearch accepts it)
    // a single entry instead of a list; normalise so the helpers always get an array.
    const rawSort = (sub.esQuery as any).sort;
    const sortClause: any[] = Array.isArray(rawSort) ? rawSort : rawSort == null ? [] : [rawSort];

    if (event.type === 'index') {
      const queryClause = (sub.esQuery as any).query ?? {};
      const doesMatch = matchDocAgainstQuery(event.data, queryClause);
      const currentIndex = sub.result.findIndex(item => item.uri === event.uri);
      const wasInResult = currentIndex !== -1;

      if (doesMatch) {
        const newItem = {
          uri: event.uri,
          sortValues: extractSortValues(event.data, sortClause),
        };

        if (!wasInResult) {
          // Newly matching document: insert at its sorted position.
          const insertIndex = findInsertIndex(sub.result, newItem, sortClause);
          newRichResult = [...sub.result];
          newRichResult.splice(insertIndex, 0, newItem);
        } else if (compareItems(newItem.sortValues, sub.result[currentIndex].sortValues, sortClause) !== 0) {
          // Still matching but its sort values changed: take it out and re-insert.
          const tempResult = sub.result.filter(item => item.uri !== event.uri);
          const insertIndex = findInsertIndex(tempResult, newItem, sortClause);
          newRichResult = [...tempResult];
          newRichResult.splice(insertIndex, 0, newItem);
        }

        // The document content is new or updated for this subscription either way.
        requiredDocuments.add(event.uri);
      } else if (wasInResult) {
        // No longer matching: drop it from the result.
        newRichResult = sub.result.filter(item => item.uri !== event.uri);
      }
    } else if (event.type === 'delete') {
      if (sub.result.some(item => item.uri === event.uri)) {
        newRichResult = sub.result.filter(item => item.uri !== event.uri);
      }
    }

    if (newRichResult) {
      const newResultUris = newRichResult.map(item => item.uri);
      const newEcid = await computeCid(newResultUris);

      recomputedQueries.push({
        queryId,
        newEcid,
        newRichResult,
        newResultUris,
        sub,
        ecidChanged: newEcid !== sub.ecid,
      });
    }
  }

  // Write back every recomputed list, including those whose URI order (and thus
  // ecid) is unchanged: their items may carry updated sort values that later
  // insertions depend on.
  for (const { queryId, newEcid, newRichResult } of recomputedQueries) {
    const sub = subscriptions.get(queryId);
    if (sub) {
      sub.ecid = newEcid;
      sub.result = newRichResult;
    }
  }

  // Only queries whose ecid changed are announced to clients.
  const changedQueries = recomputedQueries.filter(entry => entry.ecidChanged);

  const documentsPayload: Record<string, { cid: string; doc: Record<string, unknown> }> = {};
  if (event.type === 'index') {
    for (const uri of requiredDocuments) {
      documentsPayload[uri] = {
        cid: event.data["$metadata.cid"] as string,
        doc: event.data as Record<string, unknown>,
      };
    }
  }
  const hasDocuments = Object.keys(documentsPayload).length > 0;

  if (changedQueries.length === 0 && !hasDocuments) {
    return;
  }

  const affectedClients = new Set<WebSocket>();
  changedQueries.forEach(cq => cq.sub.clients.forEach(c => affectedClients.add(c)));

  // Clients entitled to the document body: those with a subscription whose
  // (already updated) result contains the document.
  const documentClients = new Set<WebSocket>();
  if (hasDocuments && event.type === 'index') {
    for (const [, sub] of subscriptions.entries()) {
      if (sub.result.some(item => item.uri === event.uri)) {
        sub.clients.forEach(c => {
          documentClients.add(c);
          affectedClients.add(c);
        });
      }
    }
  }

  for (const client of affectedClients) {
    const delta: ClientDelta = {
      // Do not hand the document to clients whose queries do not contain it
      // (e.g. a client that is only notified because the document left its query).
      documents: documentClients.has(client) ? documentsPayload : {},
      queries: {},
    };

    for (const { queryId, newEcid, newResultUris, sub } of changedQueries) {
      if (sub.clients.has(client)) {
        delta.queries[queryId] = { ecid: newEcid, result: newResultUris };
      }
    }

    if (Object.keys(delta.queries).length > 0 || Object.keys(delta.documents).length > 0) {
      if (client.readyState === WebSocket.OPEN) {
        try {
          client.send(JSON.stringify({ type: 'query-delta', ...delta }));
        } catch (err) {
          console.error("Failed to send delta to client:", err);
        }
      }
    }
  }
}