+49
-26
src/firehose.ts
+49
-26
src/firehose.ts
···
1
1
import { AppConfig } from "./config.ts";
2
-
import { indexDocument, deleteDocument } from "./indexer.ts";
2
+
import { indexDocument, deleteDocument, type OnEventCallback } from "./indexer.ts";
3
3
4
-
export async function startFirehose(config: AppConfig) {
5
-
const ws = new WebSocket(config.jetstream_url);
4
+
/** Options accepted by `startFirehose`. */
interface FirehoseOptions {
  /** Application configuration; supplies `jetstream_url` and `record_types`. */
  config: AppConfig;
  /** Callback forwarded to `indexDocument` / `deleteDocument`. */
  onEvent: OnEventCallback;
}

/**
 * Connects to the Jetstream WebSocket at `config.jetstream_url` and feeds
 * commit events for the configured `record_types` to the indexer.
 *
 * The `time_us` of the last commit event seen is remembered and sent as the
 * `cursor` query parameter when reconnecting. A reconnect is scheduled one
 * second after every close.
 *
 * @param options.config  Application configuration.
 * @param options.onEvent Callback passed through to the indexer functions.
 */
export function startFirehose({ config, onEvent }: FirehoseOptions) {
  let lastCursor: number | null = null;

  const connect = () => {
    const url = new URL(config.jetstream_url);
    if (lastCursor !== null) {
      url.searchParams.set("cursor", lastCursor.toString());
    }

    const ws = new WebSocket(url.toString());

    ws.onopen = () => {
      console.log("Connected to Jetstream", lastCursor !== null ? `(resuming from cursor ${lastCursor})` : "");
    };

    ws.onmessage = async (msg) => {
      // The handler is async, so anything thrown here would surface as an
      // unhandled promise rejection; contain it and keep the stream alive.
      try {
        const data = msg.data instanceof Blob ? await msg.data.text() : msg.data;
        const evt = JSON.parse(data);

        if (!evt?.commit) return;

        // Only a numeric cursor is usable; anything else would break the
        // `lastCursor.toString()` call on the next reconnect.
        if (typeof evt.time_us === "number") {
          lastCursor = evt.time_us;
        }

        const opType = evt.commit.operation;
        const cid = evt.commit.cid;
        const collection = evt.commit.collection;
        const rkey = evt.commit.rkey;
        const aturi = `at://${evt.did}/${collection}/${rkey}`;
        const record = evt.commit.record;
        // time_us is divided by 1000 to obtain milliseconds for Date.
        const indexedAt = new Date(evt.time_us / 1000).toISOString();

        if (!config.record_types.includes(collection)) return;

        if (opType === "create" || opType === "update") {
          await indexDocument(config, onEvent, aturi, record, cid, indexedAt);
        } else if (opType === "delete") {
          await deleteDocument(config, onEvent, aturi);
        }
      } catch (err) {
        console.error("Failed to process Jetstream event:", err);
      }
    };

    ws.onerror = (e) => {
      console.error("Jetstream WebSocket error:", e);
    };

    ws.onclose = (e) => {
      console.warn("Jetstream disconnected. Attempting to reconnect in 1 second...", e.reason);
      setTimeout(connect, 1000);
    };
  };

  connect();
}
+54
-6
src/indexer.ts
+54
-6
src/indexer.ts
···
9
9
};
10
10
}
11
11
12
+
/**
 * Collects every leaf of a nested structure into a flat array, depth-first
 * and in iteration order.
 *
 * Arrays contribute their elements, objects contribute the values of their
 * enumerable keys, and anything else (primitives, `null`, `undefined`,
 * functions) is treated as a leaf.
 *
 * @param input Value to flatten.
 * @returns The leaves in traversal order; empty for empty containers.
 */
function flattenValues(input: unknown): unknown[] {
  const walk = function* (node: unknown): Generator<unknown> {
    if (Array.isArray(node)) {
      for (const element of node) yield* walk(element);
      return;
    }
    if (typeof node === "object" && node !== null) {
      const container = node as Record<string, unknown>;
      for (const property in container) {
        yield* walk(container[property]);
      }
      return;
    }
    yield node;
  };

  return [...walk(input)];
}
30
+
12
31
export async function ensureIndexMapping(config: AppConfig) {
13
32
const properties: Record<string, any> = {};
14
33
···
46
65
): Record<string, unknown> {
47
66
const result: Record<string, unknown> = {};
48
67
49
-
for (const [jsonPath, { id }] of Object.entries(indexSpec)) {
50
-
const pathSegments = jsonPath.split(".");
68
+
for (const [rawPath, { id }] of Object.entries(indexSpec)) {
69
+
const [path, modifier] = rawPath.split("#");
70
+
const pathSegments = path.split(".");
71
+
51
72
let value = record;
52
73
for (const segment of pathSegments) {
53
74
if (typeof value !== "object" || value === null) {
···
57
78
value = value[segment];
58
79
}
59
80
60
-
if (value !== undefined) {
81
+
if (value === undefined) continue;
82
+
83
+
if (modifier === "flatten") {
84
+
const flattened = flattenValues(value);
85
+
if (flattened.length > 0) {
86
+
result[id] = flattened;
87
+
}
88
+
} else {
61
89
result[id] = value;
62
90
}
63
91
}
···
65
93
return result;
66
94
}
67
95
96
+
/** Event describing a document that was sent for indexing. */
type DocumentIndexedEvent = {
  type: "index";
  uri: string;
  data: Record<string, unknown>;
};

/** Event describing a document that was sent for deletion. */
type DocumentDeletedEvent = {
  type: "delete";
  uri: string;
};

/** Events handed to an `OnEventCallback`, discriminated by `type`. */
export type IndexerEvent = DocumentIndexedEvent | DocumentDeletedEvent;

/** Listener for indexer events; may be synchronous or return a promise. */
export type OnEventCallback = (event: IndexerEvent) => Promise<void> | void;
101
+
68
102
export async function indexDocument(
69
103
config: AppConfig,
104
+
onEvent: OnEventCallback,
70
105
uri: string,
71
-
record: any,
106
+
record: Record<string, unknown>,
72
107
cid: string,
73
108
indexedAt: string
74
109
) {
···
76
111
const recordType = collection;
77
112
78
113
const indexSpec = config.index_fields?.[recordType];
79
-
if (!indexSpec) return; // skip if record type not configured
114
+
//if (!indexSpec) return;
80
115
81
116
const indexedFields = extractIndexedFields(record, indexSpec);
82
117
···
103
138
body: JSON.stringify(body),
104
139
}
105
140
);
141
+
await onEvent({
142
+
type: 'index',
143
+
uri,
144
+
data: body
145
+
});
106
146
107
147
if (!res.ok) {
108
148
console.error("Indexing failed:", await res.text());
109
149
}
110
150
}
111
151
112
-
export async function deleteDocument(config: AppConfig, uri: string) {
152
+
export async function deleteDocument(
153
+
config: AppConfig,
154
+
onEvent: OnEventCallback,
155
+
uri: string
156
+
) {
113
157
const res = await fetch(
114
158
`${config.es_url}/${config.index_name}/_doc/${encodeURIComponent(uri)}`,
115
159
{
116
160
method: "DELETE",
117
161
}
118
162
);
163
+
await onEvent({
164
+
type: 'delete',
165
+
uri
166
+
});
119
167
120
168
if (!res.ok && res.status !== 404) {
121
169
console.error("Delete failed:", await res.text());
+210
src/live-utils.ts
+210
src/live-utils.ts
···
1
+
/**
 * Subset of the Elasticsearch query DSL understood by the live-sync helpers.
 * Each property is one clause kind; a clause object normally carries one.
 */
type QueryClause = {
  match?: Record<string, unknown>;
  term?: Record<string, unknown>;
  /** Field name mapped to the list of accepted values. */
  terms?: Record<string, unknown[]>;
  bool?: {
    must?: QueryClause[];
    must_not?: QueryClause[];
    should?: QueryClause[];
    filter?: QueryClause[];
  };
  /** Field name mapped to its bounds (`gt`, `gte`, `lt`, `lte`). */
  range?: Record<string, Record<string, unknown>>;
  nested?: {
    path: string;
    query: QueryClause;
  };
  exists?: {
    field: string;
  };
};
19
+
20
+
// function getField(obj: Record<string, any>, path: string): any {
21
+
// return path.split('.').reduce((acc, part) => acc && acc[part], obj);
22
+
// }
23
+
24
+
/**
 * Reads a field from `obj`.
 *
 * An own property whose name equals `path` verbatim (dots included) takes
 * precedence. Otherwise `path` is split on `.` and followed one segment at a
 * time, yielding `undefined` as soon as a segment is missing.
 *
 * @param obj  Object to read from.
 * @param path Literal key or dotted path.
 * @returns The value found, or `undefined`.
 */
export function getSafeField(obj: Record<string, any>, path: string): any {
  const hasLiteralKey =
    Boolean(obj) &&
    typeof obj === 'object' &&
    Object.prototype.hasOwnProperty.call(obj, path);
  if (hasLiteralKey) {
    return obj[path];
  }

  let cursor: any = obj;
  for (const segment of path.split('.')) {
    cursor = cursor?.[segment];
  }
  return cursor;
}
30
+
31
+
/**
 * Evaluates a document against a query clause in memory.
 *
 * Supported clause kinds: `term`, `terms`, `match` (substring test on string
 * fields), `bool`, `range` and `exists`. Unknown kinds are ignored, i.e. they
 * never reject a document. An empty or missing query accepts every document.
 *
 * Notes on semantics:
 * - `term` / `terms` compare with `===`; when the document field is an array
 *   the clause matches if any element matches.
 * - `term` accepts both `{ field: value }` and `{ field: { value } }`.
 * - `bool` sub-clauses may be an array or a single clause object.
 * - `bool.should` must match at least once only when there is no `must` or
 *   `filter`, unless a numeric `minimum_should_match` says otherwise.
 * - `range` and `exists` treat `null` like a missing field.
 *
 * @param doc   Document to test; fields are read with `getSafeField`.
 * @param query Clause to evaluate.
 * @returns `true` when every clause in `query` accepts the document.
 */
export function matchDocAgainstQuery(doc: Record<string, unknown>, query: QueryClause): boolean {
  if (!query || Object.keys(query).length === 0) {
    return true;
  }

  // Normalizes "one clause or a list of clauses" to a list.
  const asList = (clauses: unknown): QueryClause[] => {
    if (clauses === undefined || clauses === null) return [];
    return Array.isArray(clauses) ? (clauses as QueryClause[]) : [clauses as QueryClause];
  };

  // First [field, value] pair of a clause body, or undefined if it has none.
  const firstEntry = (body: unknown): [string, unknown] | undefined =>
    typeof body === 'object' && body !== null ? Object.entries(body)[0] : undefined;

  // Equality that also accepts membership in an array-valued field.
  const fieldHasValue = (docValue: unknown, expected: unknown): boolean =>
    Array.isArray(docValue) ? docValue.includes(expected) : docValue === expected;

  for (const [key, clauseValue] of Object.entries(query)) {
    switch (key) {
      case 'term': {
        const entry = firstEntry(clauseValue);
        if (!entry) return false;
        const [field, raw] = entry;
        const expected =
          typeof raw === 'object' && raw !== null && 'value' in raw
            ? (raw as { value: unknown }).value
            : raw;
        if (!fieldHasValue(getSafeField(doc, field), expected)) return false;
        break;
      }

      case 'terms': {
        const body = typeof clauseValue === 'object' && clauseValue !== null ? clauseValue : {};
        // Skip non-list options that may sit next to the field entry.
        const entry = Object.entries(body).find(([, candidate]) => Array.isArray(candidate));
        if (!entry) return false;
        const [field, candidates] = entry as [string, unknown[]];
        const docValue = getSafeField(doc, field);
        if (!candidates.some((candidate) => fieldHasValue(docValue, candidate))) return false;
        break;
      }

      case 'match': {
        const entry = firstEntry(clauseValue);
        if (!entry) return false;
        const [field, value] = entry;
        const fieldValue = getSafeField(doc, field);
        if (typeof fieldValue !== 'string') return false;
        if (!fieldValue.includes(String(value))) return false;
        break;
      }

      case 'bool': {
        const boolQuery = (typeof clauseValue === 'object' && clauseValue !== null
          ? clauseValue
          : {}) as Record<string, unknown>;
        const must = asList(boolQuery.must);
        const filter = asList(boolQuery.filter);
        const mustNot = asList(boolQuery.must_not);
        const should = asList(boolQuery.should);

        if (!must.every((q) => matchDocAgainstQuery(doc, q))) return false;
        if (!filter.every((q) => matchDocAgainstQuery(doc, q))) return false;
        if (mustNot.some((q) => matchDocAgainstQuery(doc, q))) return false;

        const configuredMinimum = boolQuery.minimum_should_match;
        const minimumShould =
          typeof configuredMinimum === 'number'
            ? configuredMinimum
            : must.length === 0 && filter.length === 0
              ? 1
              : 0;
        if (should.length > 0 && minimumShould > 0) {
          const matched = should.filter((q) => matchDocAgainstQuery(doc, q)).length;
          if (matched < minimumShould) return false;
        }
        break;
      }

      case 'range': {
        const entry = firstEntry(clauseValue);
        if (!entry) return false;
        const [field, bounds] = entry;
        const value = getSafeField(doc, field);
        if (value === undefined || value === null) return false;

        const boundEntries =
          typeof bounds === 'object' && bounds !== null ? Object.entries(bounds) : [];
        for (const [op, opValue] of boundEntries) {
          const numValue = Number(value);
          const numOpValue = Number(opValue);
          // Compare numerically when both sides are numeric, else as strings.
          const isNumeric = !isNaN(numValue) && !isNaN(numOpValue);
          let passed = true;

          switch (op) {
            case 'gt':
              passed = isNumeric ? numValue > numOpValue : String(value) > String(opValue);
              break;
            case 'gte':
              passed = isNumeric ? numValue >= numOpValue : String(value) >= String(opValue);
              break;
            case 'lt':
              passed = isNumeric ? numValue < numOpValue : String(value) < String(opValue);
              break;
            case 'lte':
              passed = isNumeric ? numValue <= numOpValue : String(value) <= String(opValue);
              break;
          }

          if (!passed) return false;
        }
        break;
      }

      case 'exists': {
        const field = (clauseValue as { field?: unknown } | null)?.field;
        if (typeof field !== 'string') return false;
        const value = getSafeField(doc, field);
        if (value === undefined || value === null) return false;
        break;
      }

      default:
        // Unknown clause kinds are ignored rather than rejecting the document.
        break;
    }
  }

  return true;
}
149
+
150
+
/** Clause kinds that live sync accepts. */
const ALLOWED_QUERY_CLAUSES = new Set([
  'term',
  'terms',
  'bool',
  'range',
  'exists',
]);

/** Clause kinds rejected with a dedicated "non-deterministic" error message. */
const FORBIDDEN_QUERY_CLAUSES = new Set([
  'match',
  'multi_match',
  'match_phrase',
  'query_string',
  'simple_query_string',
  'fuzzy',
  'script',
  'function_score',
  'more_like_this',
  'percolate',
]);

/**
 * Recursively checks that a query clause only uses clause kinds allowed for
 * live sync.
 *
 * `bool` clauses are descended into through `must`, `filter`, `should` and
 * `must_not`. Each of those may hold an array of clauses or a single clause
 * object; both forms are validated so a forbidden clause cannot hide behind
 * the single-object form.
 *
 * @param clause Clause object to validate.
 * @throws Error when a clause kind is forbidden or not supported.
 */
function validateClause(clause: Record<string, unknown>) {
  for (const [key, value] of Object.entries(clause)) {
    if (FORBIDDEN_QUERY_CLAUSES.has(key)) {
      throw new Error(`Query clause '${key}' is not allowed for live sync due to non-deterministic behavior. Please use deterministic clauses like 'term' or 'range'.`);
    }

    if (!ALLOWED_QUERY_CLAUSES.has(key)) {
      throw new Error(`Query clause '${key}' is not supported for live sync.`);
    }

    if (key !== 'bool' || typeof value !== 'object' || value === null) continue;

    const boolBody = value as Record<string, unknown>;
    for (const occurrence of ['must', 'filter', 'should', 'must_not']) {
      const raw = boolBody[occurrence];
      if (raw === undefined || raw === null) continue;

      const subClauses: unknown[] = Array.isArray(raw) ? raw : [raw];
      for (const subClause of subClauses) {
        if (typeof subClause === 'object' && subClause !== null) {
          validateClause(subClause as Record<string, unknown>);
        }
      }
    }
  }
}
196
+
197
+
/**
 * Validates a search body before it is used for a live subscription.
 *
 * The body must carry a non-empty `sort` array so that result ordering is
 * explicit, and its `query` part (when present) must pass `validateClause`.
 *
 * @param esQuery Search body to validate.
 * @throws Error when `sort` is missing or empty, or when `validateClause`
 *   rejects the query part.
 */
export function validateLiveQuery(esQuery: Record<string, unknown>): void {
  const { sort, query } = esQuery;

  // An explicit sort is required; without it ordering is non-deterministic.
  const hasExplicitSort = Array.isArray(sort) && sort.length > 0;
  if (!hasExplicitSort) {
    throw new Error("Live queries must include an explicit 'sort' clause for deterministic ordering (e.g., sort: [{ '$metadata.indexedAt': 'desc' }]).");
  }

  // Only the supported deterministic clauses are allowed (no text searches).
  if (query) {
    validateClause(query as Record<string, unknown>);
  }
}
+22
-6
src/main.ts
+22
-6
src/main.ts
···
1
1
import { readConfig } from "./config.ts";
2
2
import { startFirehose } from "./firehose.ts";
3
-
import { ensureIndexMapping } from "./indexer.ts";
3
+
import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts";
4
4
import { setupXRPCServer } from "./xrpc.ts";
5
+
import { processEventForSync } from "./sync.ts";
5
6
6
-
const config = await readConfig("./config.json");
7
+
/**
 * Application entry point: loads `./config.json`, waits for the index mapping
 * to be ensured, starts the XRPC server, then starts the firehose with every
 * indexer event routed to `processEventForSync`.
 */
async function main() {
  const config = await readConfig("./config.json");

  // The mapping is awaited before anything starts indexing.
  await ensureIndexMapping(config);

  setupXRPCServer(config);

  // Live sync: each indexer event is handed to the subscription processor.
  const forwardToSync = (event: IndexerEvent) => processEventForSync(event);
  startFirehose({ config, onEvent: forwardToSync });

  console.log("Server started and listening for events");
}

main().catch((err) => {
  console.error("Fatal error in main:", err);
  Deno.exit(1);
});
+185
src/sync.ts
+185
src/sync.ts
···
1
+
import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts";
2
+
import { type IndexerEvent } from "./indexer.ts";
3
+
import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts";
4
+
5
+
/** A document shipped to a client together with its content identifier. */
type DeltaDocument = { cid: string; doc: Record<string, unknown> };

/** Updated state of one query: its new ecid and the ordered result URIs. */
type DeltaQuery = { ecid: string; result: string[] };

/** Payload of a `query-delta` message sent to a subscribed client. */
interface ClientDelta {
  /** Documents keyed by URI. */
  documents: Record<string, DeltaDocument>;
  /** Changed queries keyed by query id. */
  queries: Record<string, DeltaQuery>;
}
9
+
10
+
/**
 * Reads from `doc` the value of each field named in `sortClause`.
 *
 * The first key of each sort entry is taken as the field name and resolved
 * with `getSafeField`.
 *
 * @param doc        Document to read from.
 * @param sortClause Sort entries, one object per sort field.
 * @returns One value per sort entry, in the same order.
 */
function extractSortValues(doc: Record<string, any>, sortClause: any[]): unknown[] {
  return Array.from(sortClause, (sortEntry) => {
    const [fieldName] = Object.keys(sortEntry);
    return getSafeField(doc, fieldName);
  });
}
19
+
20
+
/**
 * Compares two sort-value tuples according to `sortClause`.
 *
 * For each sort entry, in order:
 * - missing values (`null` or `undefined`) sort last regardless of direction,
 *   and two missing values are considered equal;
 * - the direction is read from either `{ field: "desc" }` or
 *   `{ field: { order: "desc" } }`; anything other than `"desc"` is ascending;
 * - when one side is a number and the other a string, the string is converted
 *   to a number (numeric string, otherwise `Date.parse`) so the two can be
 *   compared. NOTE(review): this assumes the numeric side is an epoch-millis
 *   sort value for a date field; confirm against the index mapping.
 *
 * @param aSorts     Sort values of the first item.
 * @param bSorts     Sort values of the second item.
 * @param sortClause Sort entries, one object per sort field.
 * @returns Negative if `a` sorts before `b`, positive if after, `0` if equal.
 */
function compareItems(aSorts: unknown[], bSorts: unknown[], sortClause: any[]): number {
  // Aligns a string with a numeric counterpart; other values pass through.
  const alignWith = (value: unknown, counterpart: unknown): unknown => {
    if (typeof value !== 'string' || typeof counterpart !== 'number') return value;
    if (value.trim() !== '') {
      const numeric = Number(value);
      if (!Number.isNaN(numeric)) return numeric;
    }
    const epochMs = Date.parse(value);
    return Number.isNaN(epochMs) ? value : epochMs;
  };

  for (let i = 0; i < sortClause.length; i++) {
    const spec = Object.values(sortClause[i] ?? {})[0];
    const order =
      typeof spec === 'object' && spec !== null ? (spec as { order?: unknown }).order : spec;
    const descending = order === 'desc';

    const rawA = aSorts[i];
    const rawB = bSorts[i];

    const aMissing = rawA === undefined || rawA === null;
    const bMissing = rawB === undefined || rawB === null;
    if (aMissing || bMissing) {
      if (aMissing && bMissing) continue;
      return aMissing ? 1 : -1;
    }

    const valA = alignWith(rawA, rawB) as number | string;
    const valB = alignWith(rawB, rawA) as number | string;
    if (valA === valB) continue;

    let comparison = 0;
    if (valA < valB) {
      comparison = -1;
    } else if (valA > valB) {
      comparison = 1;
    }

    if (comparison !== 0) {
      return descending ? -comparison : comparison;
    }
  }
  return 0;
}
44
+
45
+
/**
 * Binary-searches `sortedArray` for the position at which `newItem` should be
 * inserted to keep the order defined by `compareItems` and `sortClause`.
 *
 * Items that compare equal to `newItem` stay in front of it, so the returned
 * index is the first position whose item sorts strictly after `newItem`.
 *
 * @param sortedArray Result list already ordered by `sortClause`.
 * @param newItem     Item to place.
 * @param sortClause  Sort entries used for comparison.
 * @returns Index in `[0, sortedArray.length]`.
 */
function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number {
  let lower = 0;
  let upper = sortedArray.length;

  while (lower < upper) {
    const pivot = lower + Math.floor((upper - lower) / 2);
    const sortsBeforePivot =
      compareItems(newItem.sortValues, sortedArray[pivot].sortValues, sortClause) < 0;

    if (sortsBeforePivot) {
      upper = pivot;
    } else {
      lower = pivot + 1;
    }
  }

  return lower;
}
60
+
61
+
/**
 * Applies one indexer event to every live subscription and pushes the
 * resulting `query-delta` messages to the affected clients.
 *
 * Steps:
 * 1. For each subscription, work out whether the event changes its ordered
 *    result list (insert, move or removal) and compute the new ecid.
 * 2. Store the new ecid/result on every subscription whose ecid changed.
 * 3. Build the document payload (index events only).
 * 4. Send one delta per affected open client, containing the document
 *    payload plus the changed queries that client is subscribed to.
 *
 * @param event Index or delete event coming from the indexer.
 */
export async function processEventForSync(event: IndexerEvent) {
  const requiredDocuments = new Set<string>();

  const changedQueries: Array<{
    queryId: string;
    newEcid: string;
    newRichResult: SubscriptionResult[];
    newResultUris: string[];
    sub: Subscription;
  }> = [];

  // Copy of a result list without the event's URI.
  const withoutEventUri = (items: SubscriptionResult[]): SubscriptionResult[] =>
    items.filter((entry) => entry.uri !== event.uri);

  // Copy of `items` with `item` inserted at its sorted position.
  const insertSorted = (
    items: SubscriptionResult[],
    item: SubscriptionResult,
    sortClause: any[],
  ): SubscriptionResult[] => {
    const next = [...items];
    next.splice(findInsertIndex(items, item, sortClause), 0, item);
    return next;
  };

  // New result list for `sub`, or undefined when the event leaves it as is.
  const computeNextResult = (sub: Subscription): SubscriptionResult[] | undefined => {
    const sortClause = (sub.esQuery as any).sort;

    if (event.type === 'delete') {
      const listed = sub.result.some((entry) => entry.uri === event.uri);
      return listed ? withoutEventUri(sub.result) : undefined;
    }
    if (event.type !== 'index') return undefined;

    const queryClause = (sub.esQuery as any).query ?? {};
    const matches = matchDocAgainstQuery(event.data, queryClause);
    const position = sub.result.findIndex((entry) => entry.uri === event.uri);
    const alreadyListed = position !== -1;

    if (!matches) {
      // Stopped matching: drop it if it was listed, otherwise nothing to do.
      return alreadyListed ? withoutEventUri(sub.result) : undefined;
    }

    const candidate: SubscriptionResult = {
      uri: event.uri,
      sortValues: extractSortValues(event.data, sortClause),
    };

    if (!alreadyListed) {
      const inserted = insertSorted(sub.result, candidate, sortClause);
      requiredDocuments.add(event.uri);
      return inserted;
    }

    // Still matching: reposition only if its sort values changed.
    let moved: SubscriptionResult[] | undefined;
    if (compareItems(candidate.sortValues, sub.result[position].sortValues, sortClause) !== 0) {
      moved = insertSorted(withoutEventUri(sub.result), candidate, sortClause);
    }
    requiredDocuments.add(event.uri);
    return moved;
  };

  for (const [queryId, sub] of subscriptions.entries()) {
    const newRichResult = computeNextResult(sub);
    if (!newRichResult) continue;

    const newResultUris = newRichResult.map((entry) => entry.uri);
    const newEcid = await computeCid(newResultUris);
    if (newEcid === sub.ecid) continue;

    changedQueries.push({ queryId, newEcid, newRichResult, newResultUris, sub });
  }

  // Commit the new state only after every subscription has been evaluated.
  for (const change of changedQueries) {
    const current = subscriptions.get(change.queryId);
    if (!current) continue;
    current.ecid = change.newEcid;
    current.result = change.newRichResult;
  }

  const documentsPayload: Record<string, { cid: string; doc: Record<string, unknown> }> = {};
  if (event.type === 'index') {
    for (const uri of requiredDocuments) {
      documentsPayload[uri] = {
        cid: event.data["$metadata.cid"] as string,
        doc: event.data as Record<string, unknown>,
      };
    }
  }

  const hasDocuments = Object.keys(documentsPayload).length > 0;
  if (changedQueries.length === 0 && !hasDocuments) return;

  const affectedClients = new Set<WebSocket>();
  for (const change of changedQueries) {
    for (const client of change.sub.clients) affectedClients.add(client);
  }

  if (hasDocuments && event.type === 'index') {
    // Clients of any subscription that lists the document also get the update.
    for (const [, sub] of subscriptions.entries()) {
      if (sub.result.some((entry) => entry.uri === event.uri)) {
        for (const client of sub.clients) affectedClients.add(client);
      }
    }
  }

  for (const client of affectedClients) {
    const delta: ClientDelta = {
      documents: documentsPayload,
      queries: {},
    };

    for (const { queryId, newEcid, newResultUris, sub } of changedQueries) {
      if (sub.clients.has(client)) {
        delta.queries[queryId] = { ecid: newEcid, result: newResultUris };
      }
    }

    const nothingToSend = Object.keys(delta.queries).length === 0 && !hasDocuments;
    if (nothingToSend) continue;
    if (client.readyState !== WebSocket.OPEN) continue;

    try {
      client.send(JSON.stringify({ type: 'query-delta', ...delta }));
    } catch (err) {
      console.error("Failed to send delta to client:", err);
    }
  }
}
+506
-54
src/xrpc.ts
+506
-54
src/xrpc.ts
···
1
-
import { AppConfig } from "../src/config.ts";
1
+
import { AppConfig } from "./config.ts";
2
2
import { serve } from "https://deno.land/std/http/server.ts";
3
+
import { CID } from "npm:multiformats/cid";
4
+
import { sha256 } from "npm:multiformats/hashes/sha2";
5
+
import { DidResolver, HandleResolver } from "npm:@atproto/identity";
6
+
import * as dagCbor from "npm:@ipld/dag-cbor";
7
+
import { validateLiveQuery } from "./live-utils.ts";
8
+
9
+
const ZOMBIE_LIFETIME_MS = 5 * 60 * 1000;
10
+
11
+
declare global {
12
+
interface WebSocket {
13
+
isAlive: boolean;
14
+
ping(): void;
15
+
terminate?(): void;
16
+
}
17
+
}
18
+
19
+
/** One entry of a subscription's ordered result list. */
export interface SubscriptionResult {
  /** Document URI. */
  uri: string;
  /** Values the entry is ordered by, one per sort field of the query. */
  sortValues: unknown[];
}

/** Server-side state kept for one live query. */
export interface Subscription {
  /** Search body the subscription was created with. */
  esQuery: Record<string, unknown>;
  /** Sockets currently attached to this query. */
  clients: Set<WebSocket>;
  /** Content identifier of the current ordered list of result URIs. */
  ecid: string;
  /** Current ordered result list. */
  result: SubscriptionResult[];
  /** Timer handle; set while the subscription is a "zombie". */
  zombieTimer?: number;
}

/** All live subscriptions, keyed by query id. */
export const subscriptions: Map<string, Subscription> = new Map();
33
+
34
+
/**
 * Computes a content identifier for `data`.
 *
 * The value is encoded with dag-cbor, the bytes are hashed with sha256, and a
 * version 1 CID with the dag-cbor codec is built from that digest.
 *
 * @param data Value to identify; must be encodable by dag-cbor.
 * @returns The CID in its string form.
 */
export async function computeCid(data: unknown): Promise<string> {
  const digest = await sha256.digest(dagCbor.encode(data));
  const cid = CID.create(1, dagCbor.code, digest);
  return cid.toString();
}
39
+
40
+
/**
 * Builds the CORS headers attached to HTTP responses: any origin, the
 * `GET`, `POST` and `OPTIONS` methods, and the `Content-Type` request header.
 *
 * @returns A fresh headers object on every call.
 */
function corsHeaders(): HeadersInit {
  const headers: Record<string, string> = {};
  headers["Access-Control-Allow-Origin"] = "*";
  headers["Access-Control-Allow-Methods"] = "GET,POST,OPTIONS";
  headers["Access-Control-Allow-Headers"] = "Content-Type";
  return headers;
}
3
47
4
48
/**
 * Starts the HTTP server on `config.serve_port`.
 *
 * Routes:
 * - `OPTIONS *`: 204 with CORS headers.
 * - `/xrpc/party.whey.esav.esSync` with a WebSocket upgrade: live sync socket.
 * - `/xrpc/party.whey.esav.resolveIdentity`: resolves `handle` or `did`
 *   (optionally adding `pfp`), answering 304 when the client's `cid` matches.
 * - `/xrpc/com.example.prototypeESQuery` and `/xrpc/party.whey.esav.esQuery`:
 *   forwards the search body (POST body or `q` parameter) to Elasticsearch,
 *   answering 304 when the client's `cid` matches the result's CID.
 * - anything else: 404.
 *
 * @param config Application configuration.
 */
export function setupXRPCServer(config: AppConfig) {
  // Plain-text response carrying the CORS headers.
  const plain = (message: string | null, status: number) =>
    new Response(message, { status, headers: corsHeaders() });

  // 200 JSON response carrying the CORS headers.
  const json = (payload: unknown) =>
    new Response(JSON.stringify(payload), {
      status: 200,
      headers: { ...corsHeaders(), "Content-Type": "application/json" },
    });

  const describeError = (error: unknown) =>
    error instanceof Error ? error.message : "An unknown error occurred";

  serve(
    async (req) => {
      const { pathname, searchParams } = new URL(req.url);

      if (req.method === "OPTIONS") {
        return plain(null, 204);
      }

      // The Upgrade header value is case-insensitive, so normalize it.
      if (
        pathname === "/xrpc/party.whey.esav.esSync" &&
        req.headers.get("upgrade")?.toLowerCase() === "websocket"
      ) {
        const { socket, response } = Deno.upgradeWebSocket(req);
        handleWebSocket(socket, config);
        return response;
      }

      if (pathname === "/xrpc/party.whey.esav.resolveIdentity") {
        try {
          const handle = searchParams.get("handle");
          const did = searchParams.get("did");
          const clientCid = searchParams.get("cid");
          const includePfp = searchParams.get("includePfp");

          if (!handle && !did) {
            return plain("handle or did parameter is required", 400);
          }

          const identity = await resolveWithHandleOrDid({
            handle: handle ?? undefined,
            did: did ?? undefined,
          });

          const finalResponse: Record<string, unknown> = {
            ...identity,
          };

          if (includePfp) {
            finalResponse.pfp = await getPfpUrl(identity.pdsUrl, identity.did);
          }

          const newCid = await computeCid(finalResponse);

          // The client already holds this exact response.
          if (clientCid && clientCid === newCid) {
            return plain(null, 304);
          }

          return json(finalResponse);
        } catch (error) {
          return plain(describeError(error), 500);
        }
      }

      if (
        pathname === "/xrpc/com.example.prototypeESQuery" ||
        pathname === "/xrpc/party.whey.esav.esQuery"
      ) {
        let esQuery: Record<string, unknown>;
        const clientCid: string | null = searchParams.get("cid");

        if (req.method === "POST") {
          try {
            esQuery = await req.json();
          } catch {
            return plain("Invalid JSON body", 400);
          }
        } else if (req.method === "GET") {
          const queryParam = searchParams.get("q");
          if (!queryParam) {
            return plain("Missing 'q'", 400);
          }

          try {
            esQuery = JSON.parse(queryParam);
          } catch {
            return plain("Invalid JSON in 'q'", 400);
          }
        } else {
          return plain("Method not allowed", 405);
        }

        try {
          const esRes = await fetch(
            `${config.es_url}/${config.index_name}/_search`,
            {
              method: "POST",
              headers: { "Content-Type": "application/json" },
              body: JSON.stringify(esQuery),
            }
          );

          if (!esRes.ok) {
            const errText = await esRes.text();
            return plain(`Elasticsearch error: ${errText}`, 500);
          }

          const result = await esRes.json();
          const newCid = await computeCid(result);

          if (clientCid && clientCid === newCid) {
            return plain(null, 304);
          }

          return json(result);
        } catch (error) {
          return plain(describeError(error), 500);
        }
      }

      return plain("Not found", 404);
    },
    { port: Number(config.serve_port) }
  );
  console.log(`XRPC server running on http://localhost:${config.serve_port}`);
}
201
+
202
+
/**
 * Detaches a disconnected socket from every subscription it belongs to by
 * calling `handleUnsubscribe` for each matching query id.
 *
 * @param ws Socket that has gone away.
 */
function cleanupClient(ws: WebSocket) {
  console.log("WebSocket client disconnected. Cleaning up subscriptions.");

  subscriptions.forEach((sub, queryId) => {
    // Removal (and any follow-up handling) is left to handleUnsubscribe.
    if (sub.clients.has(ws)) {
      handleUnsubscribe(ws, queryId);
    }
  });
}
215
+
216
+
/**
 * Wires up a freshly upgraded live-sync socket.
 *
 * - Every 30 seconds a `{ type: 'ping' }` message is sent; a socket that has
 *   not answered with `pong` since the previous ping is closed.
 * - Incoming text messages are parsed as JSON and dispatched on `type`
 *   (`pong`, `subscribe`, `unsubscribe`); anything else gets an error reply.
 * - On close or error the ping timer is stopped and the socket is removed
 *   from its subscriptions.
 *
 * @param ws     Upgraded socket.
 * @param config Application configuration, passed on to `handleSubscribe`.
 */
function handleWebSocket(ws: WebSocket, config: AppConfig) {
  console.log("WebSocket client connected.");
  ws.isAlive = true;

  // Sends a JSON message only while the socket is open.
  const sendJson = (payload: unknown) => {
    if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(payload));
  };

  const pingInterval = setInterval(() => {
    if (!ws.isAlive) {
      console.log('Terminating dead WebSocket connection');
      return ws.close();
    }
    ws.isAlive = false;
    sendJson({ type: 'ping' });
  }, 30000);

  const onMessage = async (event: MessageEvent) => {
    if (typeof event.data !== 'string') return;

    let msg;
    try {
      msg = JSON.parse(event.data);
    } catch (_err) {
      sendJson({ type: 'error', error: 'Invalid JSON' });
      return;
    }

    // Valid JSON need not be an object (`null`, `42`, ...); reading `.type`
    // from `null` would throw inside this async listener.
    if (typeof msg !== 'object' || msg === null) {
      sendJson({ type: 'error', error: 'Unknown message type', request: msg });
      return;
    }

    try {
      switch (msg.type) {
        case 'pong':
          ws.isAlive = true;
          break;
        case 'subscribe':
          if (msg.queryId && msg.esquery) {
            await handleSubscribe(ws, msg.queryId, msg.esquery, config, msg.ecid);
          } else {
            sendJson({ type: 'error', error: 'Invalid subscribe message', request: msg });
          }
          break;
        case 'unsubscribe':
          if (msg.queryId) {
            handleUnsubscribe(ws, msg.queryId);
          } else {
            sendJson({ type: 'error', error: 'Invalid unsubscribe message', request: msg });
          }
          break;
        default:
          sendJson({ type: 'error', error: 'Unknown message type', request: msg });
          break;
      }
    } catch (err) {
      // Keep one bad message from becoming an unhandled rejection.
      console.error('Error handling WebSocket message:', err);
    }
  };

  const onClose = () => { clearInterval(pingInterval); cleanupClient(ws); };
  const onError = (e: Event) => { console.error('WebSocket error:', e); onClose(); };

  ws.addEventListener('message', onMessage);
  ws.addEventListener('close', onClose);
  ws.addEventListener('error', onError);
}
271
+
272
+
/**
 * Subscribes a socket to a live query.
 *
 * 1. The query is validated with `validateLiveQuery`; on failure an error
 *    message is sent and nothing else happens.
 * 2. If a subscription with this id exists but its query differs, the old
 *    state is purged. If the query is identical, a zombie subscription is
 *    revived, the socket is attached, and when the client's ecid already
 *    matches no further work is done.
 * 3. Otherwise the query is run against Elasticsearch, the subscription
 *    state is (re)built from the hits, and a full `query-delta` is sent.
 *
 * The socket may close while the search is in flight, and the subscription
 * map may change in the meantime, so both are re-checked before registering.
 *
 * @param ws         Client socket.
 * @param queryId    Client-chosen id of the query.
 * @param esQuery    Search body to run and keep live.
 * @param config     Application configuration (Elasticsearch URL and index).
 * @param clientEcid ecid the client already holds for this query, if any.
 */
async function handleSubscribe(
  ws: WebSocket,
  queryId: string,
  esQuery: Record<string, unknown>,
  config: AppConfig,
  clientEcid?: string
) {
  try {
    validateLiveQuery(esQuery);
  } catch (err) {
    console.error(`Invalid live query for queryId ${queryId}: ${err}`);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({
        type: "error",
        queryId,
        error: `Invalid query for live subscription: ${err}`
      }));
    }
    return;
  }

  const serializedQuery = JSON.stringify(esQuery);
  const existingSub = subscriptions.get(queryId);

  // a subscription (active or zombie) already exists
  if (existingSub) {
    if (serializedQuery !== JSON.stringify(existingSub.esQuery)) {
      // the query changed, so the old state is invalid
      console.log(`[Sync] Query for ${queryId} has changed. Purging old state and re-running.`);
      if (existingSub.zombieTimer) clearTimeout(existingSub.zombieTimer);
      subscriptions.delete(queryId);
    } else {
      // same query: revive it if it is a zombie
      if (existingSub.zombieTimer) {
        console.log(`[Zombie] Reviving zombie subscription: ${queryId}`);
        clearTimeout(existingSub.zombieTimer);
        delete existingSub.zombieTimer;
      }

      existingSub.clients.add(ws);

      // the client's copy is still current; nothing to send
      if (clientEcid && existingSub.ecid === clientEcid) {
        console.log(`[Sync] Client for ${queryId} is already up-to-date. No-op.`);
        return;
      }

      console.log(`[Sync] Client for ${queryId} is out of sync. Re-running ES query.`);
    }
  }

  console.log(`Performing full sync for queryId: ${queryId}`);
  try {
    const esRes = await fetch(
      `${config.es_url}/${config.index_name}/_search`,
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: serializedQuery,
      }
    );

    if (!esRes.ok) throw new Error(`Elasticsearch error: ${await esRes.text()}`);

    const esResult = await esRes.json();
    const hits = esResult.hits?.hits ?? [];

    const documents: Record<string, { cid: string, doc: Record<string, unknown> }> = {};
    const newResultState: SubscriptionResult[] = [];

    for (const hit of hits) {
      const uri = hit._id as string;
      const source = hit._source;

      newResultState.push({
        uri: uri,
        sortValues: hit.sort || [],
      });

      documents[uri] = {
        cid: source['$metadata.cid'],
        doc: source,
      };
    }

    const resultUris = newResultState.map(item => item.uri);
    const newEcid = await computeCid(resultUris);

    // The socket may have closed during the awaits above. Its cleanup has
    // then already run, so registering it now would leave a dead client
    // attached to the subscription indefinitely.
    if (ws.readyState !== WebSocket.OPEN) {
      console.log(`[Sync] Client for ${queryId} disconnected during full sync. Skipping registration.`);
      return;
    }

    // Re-read the map: the subscription captured before the awaits may have
    // been removed, or another one created, in the meantime.
    const currentSub = subscriptions.get(queryId);
    if (currentSub && JSON.stringify(currentSub.esQuery) === serializedQuery) {
      if (currentSub.zombieTimer) {
        clearTimeout(currentSub.zombieTimer);
        delete currentSub.zombieTimer;
      }
      currentSub.clients.add(ws);
      currentSub.result = newResultState;
      currentSub.ecid = newEcid;
      currentSub.esQuery = esQuery;
    } else {
      if (currentSub?.zombieTimer) clearTimeout(currentSub.zombieTimer);
      subscriptions.set(queryId, {
        esQuery,
        clients: new Set([ws]),
        ecid: newEcid,
        result: newResultState,
      });
    }

    ws.send(JSON.stringify({
      type: "query-delta",
      documents,
      queries: {
        [queryId]: {
          ecid: newEcid,
          result: resultUris,
        }
      }
    }));

  } catch (err) {
    console.error(`Error subscribing client to query ${queryId}:`, err);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: "error", queryId, error: `Subscription failed: ${err instanceof Error ? err.message : 'Unknown error'}` }));
    }
  }
}
78
400
79
-
function corsHeaders(): HeadersInit {
80
-
return {
81
-
"Access-Control-Allow-Origin": "*",
82
-
"Access-Control-Allow-Methods": "GET,POST,OPTIONS",
83
-
"Access-Control-Allow-Headers": "Content-Type",
84
-
};
85
-
}
401
+
/**
 * Starts the purge countdown for a subscription that no longer has clients.
 *
 * Does nothing when `queryId` is unknown or the subscription still has at
 * least one client. Any countdown already pending is cancelled and restarted.
 * When the countdown fires, the entry is removed from `subscriptions`.
 *
 * NOTE(review): the log message hard-codes "5 minutes" while the actual delay
 * is `ZOMBIE_LIFETIME_MS` — confirm the two agree.
 *
 * @param queryId - Key of the subscription in `subscriptions`.
 */
function transitionToZombie(queryId: string) {
  const subscription = subscriptions.get(queryId);
  const stillInUse = !subscription || subscription.clients.size > 0;
  if (stillInUse) return;

  // Restart rather than stack timers if a countdown is already running.
  if (subscription.zombieTimer) {
    clearTimeout(subscription.zombieTimer);
  }

  console.log(`[Zombie] Subscription ${queryId} has no clients. Entering zombie state for 5 minutes.`);

  const purge = () => {
    subscriptions.delete(queryId);
    console.log(`[Zombie] Purged zombie subscription: ${queryId}`);
  };
  subscription.zombieTimer = setTimeout(purge, ZOMBIE_LIFETIME_MS);
}
418
+
419
+
/**
 * Detaches a client socket from a subscription.
 *
 * Unknown query ids are ignored. Once the last client is gone the
 * subscription is handed to `transitionToZombie` instead of being deleted
 * immediately.
 *
 * @param ws - The client socket to remove.
 * @param queryId - Key of the subscription in `subscriptions`.
 */
function handleUnsubscribe(ws: WebSocket, queryId: string) {
  const subscription = subscriptions.get(queryId);
  if (!subscription) {
    return;
  }

  const { clients } = subscription;
  clients.delete(ws);
  console.log(`[Sync] Client unsubscribed from ${queryId}. Remaining clients: ${clients.size}`);

  // The last client left: keep the state around as a zombie for a while.
  if (clients.size === 0) transitionToZombie(queryId);
}
435
+
436
+
/**
 * Augments the global `WebSocket` interface with extra members.
 *
 * This is a type-only declaration: it adds nothing at runtime, so whatever
 * creates the sockets must actually provide these members.
 * NOTE(review): presumably consumed by a keep-alive / heartbeat check
 * elsewhere in this file — confirm.
 */
declare global {
  interface WebSocket {
    /** Declared as always present on the socket. */
    ping(): void;
    /** Optional; callers must check for it before invoking. */
    terminate?(): void;
    /** Boolean flag stored directly on the socket object. */
    isAlive: boolean;
  }
}
443
+
444
+
/**
 * Resolves a DID document and extracts the account's PDS endpoint and handle.
 *
 * The endpoint is taken from the first service entry whose `type` is
 * `AtprotoPersonalDataServer`. The handle is taken from the first
 * `alsoKnownAs` entry that is an `at://` URI (not only from index 0, so a
 * document listing another URI first still yields its handle).
 *
 * @param did - The DID to resolve.
 * @returns The service endpoint and, when the document lists one, the handle
 *   with its `at://` prefix removed.
 * @throws Error when the DID document cannot be resolved, when no
 *   `AtprotoPersonalDataServer` endpoint is present, or when the endpoint is
 *   neither a string nor an object with a string `uri`.
 */
async function getServiceEndpointFromDid(
  did: string
): Promise<{ serviceEndpoint: string; handle?: string }> {
  // Accepts either a plain URL string or an object carrying a string `uri`.
  function getServiceEndpointString(endpoint: unknown): string {
    if (typeof endpoint === "string") return endpoint;
    if (
      typeof endpoint === "object" &&
      endpoint &&
      "uri" in endpoint &&
      typeof endpoint.uri === "string"
    ) {
      return endpoint.uri;
    }
    throw new Error("Unsupported serviceEndpoint format");
  }

  const didres = new DidResolver({});
  const doc = (await didres.resolve(did)) as {
    service?: Array<{
      type: string;
      // Typed as unknown because both string and object forms are handled.
      serviceEndpoint: unknown;
    }>;
    alsoKnownAs?: unknown;
    [key: string]: unknown;
  } | null;
  if (!doc) {
    throw new Error(`Could not resolve DID document for: ${did}`);
  }

  // Guard against a malformed document where `service` is not an array.
  const services = Array.isArray(doc.service) ? doc.service : [];
  const endpointRaw = services.find(
    (s) => s?.type === "AtprotoPersonalDataServer"
  )?.serviceEndpoint;
  if (!endpointRaw) {
    throw new Error(`Service endpoint not found for DID: ${did}`);
  }
  const serviceEndpoint = getServiceEndpointString(endpointRaw);

  // The handle is the first at:// URI, which is not necessarily the first entry.
  const akaEntries: unknown[] = Array.isArray(doc.alsoKnownAs)
    ? doc.alsoKnownAs
    : [];
  const handleUri = akaEntries.find(
    (aka): aka is string => typeof aka === "string" && aka.startsWith("at://")
  );
  const handle = handleUri?.slice("at://".length);

  return { serviceEndpoint, handle };
}
487
+
488
+
/**
 * Resolves an account to its DID and PDS URL from either identifier.
 *
 * A truthy `did` takes precedence: its document is resolved and the handle
 * found there (if any) is returned. Otherwise a truthy `handle` is resolved to
 * a DID first, and the handle passed in is echoed back unchanged.
 *
 * @returns The DID, the PDS URL and, when available, the handle.
 * @throws Error when neither identifier is provided, when the handle cannot
 *   be resolved, or when `getServiceEndpointFromDid` throws.
 */
async function resolveWithHandleOrDid({
  handle,
  did,
}: {
  handle?: string;
  did?: string;
}): Promise<{ did: string; pdsUrl: string; handle?: string }> {
  if (did) {
    const fromDid = await getServiceEndpointFromDid(did);
    return {
      did,
      pdsUrl: fromDid.serviceEndpoint,
      handle: fromDid.handle,
    };
  }

  if (!handle) {
    throw new Error("Either handle or did must be provided");
  }

  const handleResolver = new HandleResolver();
  const didForHandle = await handleResolver.resolve(handle);
  if (!didForHandle) {
    throw new Error(`Could not resolve handle: ${handle}`);
  }

  const endpointInfo = await getServiceEndpointFromDid(didForHandle);
  return { did: didForHandle, pdsUrl: endpointInfo.serviceEndpoint, handle };
}
515
+
516
+
/**
 * Builds the URL of an account's avatar blob on its PDS.
 *
 * Fetches the `app.bsky.actor.profile` / `self` record through
 * `com.atproto.repo.getRecord` and, when that record references an avatar
 * blob, returns the matching `com.atproto.sync.getBlob` URL.
 *
 * @param pdsUrl - Base URL of the PDS; concatenated as-is.
 * @param did - DID of the account whose avatar is wanted.
 * @returns The avatar blob URL, or `undefined` when the profile request is
 *   not OK, the profile carries no avatar reference, or any error is thrown
 *   (errors are logged, never propagated).
 */
async function getPfpUrl(
  pdsUrl: string,
  did: string
): Promise<string | undefined> {
  try {
    const profileUrl = `${pdsUrl}/xrpc/com.atproto.repo.getRecord?repo=${did}&collection=app.bsky.actor.profile&rkey=self`;
    const response = await fetch(profileUrl);
    if (!response.ok) {
      console.warn(
        `Failed to fetch profile for ${did} from ${pdsUrl}: ${response.status}`
      );
      return undefined;
    }

    const data = (await response.json()) as {
      value?: { avatar?: { ref?: { $link?: unknown } } };
    } | null;
    const cid = data?.value?.avatar?.ref?.["$link"];

    // A profile without an avatar must not yield a "...&cid=undefined" URL.
    if (typeof cid !== "string" || cid.length === 0) {
      return undefined;
    }

    return `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
  } catch (error) {
    console.error(`Error fetching PFP for ${did}:`, error);
    return undefined;
  }
}