+56
-2
config.json
+56
-2
config.json
···
1
1
{
2
2
"jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe",
3
3
"es_url": "http://localhost:9200",
4
-
"record_types": ["xyz.statusphere.status", "com.example.test.esav"],
4
+
"record_types": [
5
+
"xyz.statusphere.status",
6
+
"com.example.test.esav",
7
+
"party.whey.ft.topic.post",
8
+
"party.whey.ft.topic.reaction",
9
+
"party.whey.ft.topic.moderation",
10
+
"party.whey.ft.forum.definition",
11
+
"party.whey.ft.forum.layout",
12
+
"party.whey.ft.forum.request",
13
+
"party.whey.ft.forum.accept",
14
+
"party.whey.ft.forum.category",
15
+
"party.whey.ft.user.profile"
16
+
],
5
17
"index_name": "appview-poc",
6
18
"serve_port": "8370",
7
19
"index_fields": {
20
+
"party.whey.ft.topic.reaction": {
21
+
"subject": {
22
+
"id": "reactionSubject",
23
+
"type": "keyword"
24
+
},
25
+
"reactionEmoji": {
26
+
"id": "reactionEmoji",
27
+
"type": "keyword"
28
+
}
29
+
},
30
+
"party.whey.ft.topic.post": {
31
+
"text": {
32
+
"id": "text",
33
+
"type": "text"
34
+
},
35
+
"title": {
36
+
"id": "title",
37
+
"type": "text"
38
+
},
39
+
"reply.root.uri": {
40
+
"id": "root",
41
+
"type": "keyword"
42
+
},
43
+
"reply.parent.uri": {
44
+
"id": "parent",
45
+
"type": "keyword"
46
+
},
47
+
"forum": {
48
+
"id": "forum",
49
+
"type": "keyword"
50
+
}
51
+
},
52
+
"party.whey.ft.forum.definition": {
53
+
"description": {
54
+
"id": "description",
55
+
"type": "text"
56
+
},
57
+
"displayName": {
58
+
"id": "displayName",
59
+
"type": "text"
60
+
}
61
+
},
8
62
"xyz.statusphere.status": {
9
63
"status": {
10
64
"id": "status",
11
-
"type": "text"
65
+
"type": "keyword"
12
66
}
13
67
},
14
68
"com.example.test.esav": {
+143
-6
readme.md
+143
-6
readme.md
···
1
1
# ESAV: ElasticSearch AppView
2
-
terrible i know right lmao
2
+
Jetstream plugged into ElasticSearch with a queryable api
3
+
(also now supports live queries)
4
+
5
+
## Queries
6
+
### the boring one
7
+
send ES queries to `/xrpc/party.whey.esav.esQuery` either POST or GET q=
8
+
9
+
the format is just a normal elasticsearch query, any of them should work
10
+
11
+
### the cooler Live one (sync over websocket)
12
+
ESAV Live is a real-time indexing and query service built on ESAV.
13
+
14
+
the websocket provides a live stream of the at:// URIs of new documents that match a specific query
15
+
16
+
send Live queries to `wss://{your domain}/xrpc/party.whey.esav.esSync`
17
+
18
+
once a connection is open, the client must send a subscription request (the live query itself). this message is a JSON object containing the query that defines the feed you are interested in. the query format is basically a dumbed down Elasticsearch Query DSL (stripped down to the few parts that are actually implemented, and that'll produce a deterministic order / result)
19
+
20
+
for example, to subscribe to all statuses from the `xyz.statusphere.status` collection, send the JSON request below:
21
+
22
+
```json
23
+
{
24
+
"type": "subscribe",
25
+
"queryId": "statusphere-main-page",
26
+
"esquery": {
27
+
"query": {
28
+
"term": {
29
+
"$metadata.collection": "xyz.statusphere.status"
30
+
}
31
+
},
32
+
"sort": [
33
+
{
34
+
"$metadata.indexedAt": "desc"
35
+
}
36
+
],
37
+
"size": 100
38
+
}
39
+
}
40
+
```
41
+
42
+
43
+
and then the server will send back JSON messages as new documents matching your query are indexed. the primary message type you'll get is a `query-delta`, which contains an array of document URIs for each active query that changed, and also the set of new or modified documents to store.
44
+
45
+
example server message:
46
+
47
+
```json
48
+
{
49
+
"type": "query-delta",
50
+
"documents": {
51
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n": {
52
+
"cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy",
53
+
"doc": {
54
+
"$metadata.uri": "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n",
55
+
"$metadata.cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy",
56
+
"$metadata.indexedAt": "2025-08-07T12:40:09.426Z",
57
+
"$metadata.did": "did:plc:mn45tewwnse5btfftvd3powc",
58
+
"$metadata.collection": "xyz.statusphere.status",
59
+
"$metadata.rkey": "3lvsr2zqf3k2n",
60
+
"status": "๐",
61
+
"$raw": {
62
+
"$type": "xyz.statusphere.status",
63
+
"createdAt": "2025-08-07T12:40:08.602Z",
64
+
"status": "๐"
65
+
}
66
+
}
67
+
}
68
+
},
69
+
"queries": {
70
+
"statusphere-main-page": {
71
+
"ecid": "bafyreidsxtbtnwce2o72wrwtwhcvc7olosdtvb2i4g6ezbs4kaxn3v3qna",
72
+
"result": [
73
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n",
74
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsov4tatf2n",
75
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsouyvrbr2t"
76
+
]
77
+
}
78
+
}
79
+
}
80
+
```
81
+
82
+
in this example, only one document is included because this is not the initial response to a newly registered live query. alternatively, the live query was registered using a still-valid "ecid" value (the window is about 5 minutes, so it's really only useful for dropped connections).
83
+
84
+
your application should listen for these messages and prepend the new URIs to its list of statuses, creating the real-time effect
85
+
86
+
### helper functions
87
+
88
+
because we need to sometimes resolve their did from handle, or handle from did
89
+
90
+
and also because despite your usage of a custom lexicon, we still need to fetch app.bsky.actor.profile for their pfp right
91
+
92
+
fetch `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity`
93
+
94
+
with these params:
95
+
- `did`: The DID of the user.
96
+
- `handle`: The handle of the user.
97
+
- `includeBskyProfile` (optional): `true` to include the entire bsky profile object as well.
3
98
4
-
send ES queries to `/xrpc/com.example.prototypeESQuery` either POST or GET q=
99
+
**Example Request:**
5
100
6
-
change config in `config.json`
101
+
`https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity?did=did:plc:cjfima2v3vnyfuzieu7bvjx7&includeBskyProfile=true`
7
102
8
-
run `deno task dev` to start
103
+
gets me
9
104
10
-
you need elasticsearch configured and running first though, good luck with that
105
+
**Example Response (`200 OK`):**
11
106
12
-
## ForumTest, built with ESAV
107
+
```json
108
+
{
109
+
"did":"did:plc:cjfima2v3vnyfuzieu7bvjx7",
110
+
"pdsUrl":"https://pds-nd.whey.party",
111
+
"handle":"forumtest.whey.party",
112
+
"profile": {
113
+
"$type": "app.bsky.actor.profile",
114
+
"avatar": {
115
+
"$type": "blob",
116
+
"ref": {
117
+
"$link": "bafkreiabb6nrbpgguh5xaake3shoyxh2i7lyj7nj7j3ejk77hdlvt4lhmu"
118
+
},
119
+
"mimeType": "image/png",
120
+
"size": 35262
121
+
},
122
+
"banner": {
123
+
"$type": "blob",
124
+
"ref": {
125
+
"$link": "bafkreieupktgazj4vxuxbj6dyf4trvltgok5ukdsnvvblsqqw26cmsvn7y"
126
+
},
127
+
"mimeType": "image/jpeg",
128
+
"size": 931415
129
+
},
130
+
"createdAt": "2025-08-04T06:27:34.561Z",
131
+
"description": "ForumTest discussion and development",
132
+
"displayName": "ForumTest"
133
+
}
134
+
}
135
+
```
136
+
137
+
> **Note on Performance**: This endpoint is called frequently. It is highly recommended to implement a client-side cache (e.g., a simple JavaScript `Map` or object) to store profile data keyed by DID. This will prevent your application from making redundant network requests for the same user profile, significantly improving performance.
138
+
139
+
140
+
## stuff built with ESAV
141
+
142
+
### Statusphere ESAV Live
143
+
an example usage of ESAV Live with the "hello world!" of atproto
144
+
145
+
check it out here -> [https://statusphere.whey.party/](https://statusphere.whey.party/)
146
+
repo: [https://tangled.sh/@whey.party/statusphere-esav-live](https://tangled.sh/@whey.party/statusphere-esav-live)
147
+
148
+
### ForumTest
149
+
atproto forum thing test
13
150
14
151
check it out here -> [https://forumtest.whey.party](https://forumtest.whey.party)
15
152
repo: [https://tangled.sh/@whey.party/forumtest](https://tangled.sh/@whey.party/forumtest)
+54
-26
src/firehose.ts
+54
-26
src/firehose.ts
···
1
1
import { AppConfig } from "./config.ts";
2
-
import { indexDocument, deleteDocument } from "./indexer.ts";
2
+
import { indexDocument, deleteDocument, type OnEventCallback } from "./indexer.ts";
3
3
4
-
/** Options accepted by {@link startFirehose}. */
interface FirehoseOptions {
  /** Application config; `jetstream_url` and `record_types` are read here. */
  config: AppConfig;
  /** Callback handed to the indexer for every indexed / deleted document. */
  onEvent: OnEventCallback;
  /** Optional cursor used for the FIRST connection only (e.g. to backfill). */
  forcedCursor?: string;
}

/**
 * Connects to Jetstream and feeds commit events for the configured record
 * types into the indexer, reconnecting one second after the socket closes.
 *
 * Cursor handling: once any commit event has been seen, reconnects resume from
 * the `time_us` of the last one. `forcedCursor` only applies while nothing has
 * been received yet; otherwise every dropped connection would rewind the
 * stream to the forced position and replay everything again.
 */
export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) {
  let lastCursor: number | null = null;

  const connect = () => {
    const url = new URL(config.jetstream_url);
    if (lastCursor !== null) {
      url.searchParams.set("cursor", lastCursor.toString());
    } else if (forcedCursor) {
      console.log("using forced cursor: ", forcedCursor)
      // String(): CLI parsers may hand a numeric-looking flag over as a number.
      url.searchParams.set("cursor", String(forcedCursor));
    }

    const ws = new WebSocket(url.toString());

    ws.onopen = () => {
      console.log("Connected to Jetstream", lastCursor !== null ? `(resuming from cursor ${lastCursor})` : "");
    };

    ws.onmessage = async (msg) => {
      // The handler is async, so anything thrown here would surface as an
      // unhandled promise rejection; log and keep consuming the stream instead.
      try {
        const data = msg.data instanceof Blob ? await msg.data.text() : msg.data;
        const evt = JSON.parse(data);

        if (!evt?.commit) return;

        if (typeof evt.time_us === "number") {
          lastCursor = evt.time_us;
        }

        const opType = evt.commit.operation;
        const cid = evt.commit.cid;
        const collection = evt.commit.collection;
        const rkey = evt.commit.rkey;
        const aturi = `at://${evt.did}/${collection}/${rkey}`;
        const record = evt.commit.record;
        const indexedAt = new Date(evt.time_us / 1000).toISOString();

        if (!config.record_types.includes(collection)) return;

        if (opType === "create" || opType === "update") {
          await indexDocument(config, onEvent, aturi, record, cid, indexedAt);
        } else if (opType === "delete") {
          await deleteDocument(config, onEvent, aturi);
        }
      } catch (err) {
        console.error("Failed to process Jetstream event:", err);
      }
    };

    ws.onerror = (e) => {
      console.error("Jetstream WebSocket error:", e);
    };

    ws.onclose = (e) => {
      console.warn("Jetstream disconnected. Attempting to reconnect in 1 second...", e.reason);
      setTimeout(connect, 1000);
    };
  };

  connect();
}
+54
-6
src/indexer.ts
+54
-6
src/indexer.ts
···
9
9
};
10
10
}
11
11
12
+
/**
 * Collects every leaf of an arbitrarily nested value, depth-first and in
 * encounter order. Primitives (including `null` and `undefined`) are leaves;
 * arrays and objects are descended into and contribute nothing themselves.
 */
function flattenValues(input: unknown): unknown[] {
  // Anything that is not a container is itself the single leaf.
  if (input === null || typeof input !== "object") {
    return [input];
  }

  const children: unknown[] = [];
  if (Array.isArray(input)) {
    // Array.from walks the iterator, exactly like a for..of over the array.
    children.push(...Array.from(input));
  } else {
    const container = input as Record<string, unknown>;
    for (const key in container) {
      children.push(container[key]);
    }
  }

  return children.flatMap((child) => flattenValues(child));
}
30
+
12
31
export async function ensureIndexMapping(config: AppConfig) {
13
32
const properties: Record<string, any> = {};
14
33
···
46
65
): Record<string, unknown> {
47
66
const result: Record<string, unknown> = {};
48
67
49
-
for (const [jsonPath, { id }] of Object.entries(indexSpec)) {
50
-
const pathSegments = jsonPath.split(".");
68
+
for (const [rawPath, { id }] of Object.entries(indexSpec)) {
69
+
const [path, modifier] = rawPath.split("#");
70
+
const pathSegments = path.split(".");
71
+
51
72
let value = record;
52
73
for (const segment of pathSegments) {
53
74
if (typeof value !== "object" || value === null) {
···
57
78
value = value[segment];
58
79
}
59
80
60
-
if (value !== undefined) {
81
+
if (value === undefined) continue;
82
+
83
+
if (modifier === "flatten") {
84
+
const flattened = flattenValues(value);
85
+
if (flattened.length > 0) {
86
+
result[id] = flattened;
87
+
}
88
+
} else {
61
89
result[id] = value;
62
90
}
63
91
}
···
65
93
return result;
66
94
}
67
95
96
+
export type IndexerEvent =
97
+
| { type: 'index'; uri: string; data: Record<string, unknown> }
98
+
| { type: 'delete'; uri: string };
99
+
100
+
export type OnEventCallback = (event: IndexerEvent) => Promise<void> | void;
101
+
68
102
export async function indexDocument(
69
103
config: AppConfig,
104
+
onEvent: OnEventCallback,
70
105
uri: string,
71
-
record: any,
106
+
record: Record<string, unknown>,
72
107
cid: string,
73
108
indexedAt: string
74
109
) {
···
76
111
const recordType = collection;
77
112
78
113
const indexSpec = config.index_fields?.[recordType];
79
-
if (!indexSpec) return; // skip if record type not configured
114
+
//if (!indexSpec) return;
80
115
81
116
const indexedFields = extractIndexedFields(record, indexSpec);
82
117
···
103
138
body: JSON.stringify(body),
104
139
}
105
140
);
141
+
await onEvent({
142
+
type: 'index',
143
+
uri,
144
+
data: body
145
+
});
106
146
107
147
if (!res.ok) {
108
148
console.error("Indexing failed:", await res.text());
109
149
}
110
150
}
111
151
112
-
export async function deleteDocument(config: AppConfig, uri: string) {
152
+
export async function deleteDocument(
153
+
config: AppConfig,
154
+
onEvent: OnEventCallback,
155
+
uri: string
156
+
) {
113
157
const res = await fetch(
114
158
`${config.es_url}/${config.index_name}/_doc/${encodeURIComponent(uri)}`,
115
159
{
116
160
method: "DELETE",
117
161
}
118
162
);
163
+
await onEvent({
164
+
type: 'delete',
165
+
uri
166
+
});
119
167
120
168
if (!res.ok && res.status !== 404) {
121
169
console.error("Delete failed:", await res.text());
+224
src/live-utils.ts
+224
src/live-utils.ts
···
1
+
/**
 * Subset of the Elasticsearch Query DSL that the in-process matcher evaluates.
 * Bool sub-clauses may be a single clause or an array of clauses, as in
 * Elasticsearch itself.
 */
type QueryClause = {
  match?: Record<string, unknown>;
  term?: Record<string, unknown>;
  terms?: Record<string, unknown>;
  bool?: {
    must?: QueryClause | QueryClause[];
    must_not?: QueryClause | QueryClause[];
    should?: QueryClause | QueryClause[];
    filter?: QueryClause | QueryClause[];
    minimum_should_match?: number | string;
  };
  range?: Record<string, Record<string, unknown>>;
  nested?: {
    path: string;
    query: QueryClause;
  };
  exists?: {
    field: string;
  };
};

/**
 * Reads a field from a document.
 *
 * The whole `path` is first tried as a literal own key (documents carry dotted
 * keys such as `$metadata.collection`); otherwise it is split on `.` and
 * walked segment by segment.
 *
 * Only OWN properties are followed, so client-supplied paths such as
 * `constructor` or `toString` resolve to `undefined` instead of leaking
 * prototype members.
 *
 * @returns the value found, or `undefined` when any segment is missing.
 */
export function getSafeField(obj: Record<string, any>, path: string): any {
  const hasOwn = (target: unknown, key: string): target is Record<string, unknown> =>
    typeof target === 'object' &&
    target !== null &&
    Object.prototype.hasOwnProperty.call(target, key);

  if (hasOwn(obj, path)) {
    return obj[path];
  }

  let current: unknown = obj;
  for (const part of path.split('.')) {
    if (!hasOwn(current, part)) return undefined;
    current = current[part];
  }
  return current;
}

/** Returns the first `[field, value]` pair of a single-field clause body, if there is one. */
function firstClauseEntry(clauseValue: unknown): [string, unknown] | undefined {
  if (typeof clauseValue !== 'object' || clauseValue === null) return undefined;
  return Object.entries(clauseValue)[0];
}

/** Normalizes a bool sub-clause (single clause, array, or absent) to an array. */
function toClauseList(clauses: QueryClause | QueryClause[] | undefined | null): QueryClause[] {
  if (clauses === undefined || clauses === null) return [];
  return Array.isArray(clauses) ? clauses : [clauses];
}

/**
 * Evaluates a document against a query clause in memory.
 *
 * Every key of `query` must pass for the document to be accepted; unknown keys
 * are logged and ignored. An empty or missing query accepts everything.
 *
 * Semantics per clause:
 * - `term`: strict equality; an array-valued field matches when any element
 *   equals the value. Both `{ field: v }` and `{ field: { value: v } }` work.
 * - `terms`: the field (or any element of an array field) is in the list.
 * - `match`: plain substring test on string fields.
 * - `bool`: `must`/`filter` all pass, `must_not` none pass. `should` must
 *   reach `minimum_should_match`, which defaults to 1 only when there is no
 *   `must`/`filter` (the Elasticsearch default), otherwise 0.
 * - `range`: `gt`/`gte`/`lt`/`lte`, numeric when both sides parse as numbers,
 *   string comparison otherwise.
 * - `exists`: the field resolves to something other than `undefined`.
 *
 * Malformed clause bodies reject the document instead of throwing.
 */
export function matchDocAgainstQuery(doc: Record<string, unknown>, query: QueryClause): boolean {
  console.log(JSON.stringify(doc, null, 2));
  if (!query || Object.keys(query).length === 0) {
    console.log("✅ Accepted: No query provided, accepting all docs.");
    return true;
  }

  for (const key in query) {
    const clauseValue = (query as Record<string, unknown>)[key];

    switch (key) {
      case 'term': {
        const entry = firstClauseEntry(clauseValue);
        if (!entry) {
          console.log("❌ Rejected: malformed 'term' clause.");
          return false;
        }
        const [field, rawValue] = entry;
        // Accept the long form `{ field: { value: X } }` as well as `{ field: X }`.
        const value =
          typeof rawValue === 'object' && rawValue !== null && !Array.isArray(rawValue) && 'value' in rawValue
            ? (rawValue as { value: unknown }).value
            : rawValue;
        const docValue = getSafeField(doc, field);
        const matched = Array.isArray(docValue) ? docValue.includes(value) : docValue === value;
        if (!matched) {
          console.log(`❌ Rejected: term mismatch on field '${field}', expected '${value}', got '${docValue}'`);
          return false;
        }
        console.log(`✅ Passed: term match on field '${field}' with value '${value}'`);
        break;
      }

      case 'terms': {
        const entry = firstClauseEntry(clauseValue);
        if (!entry || !Array.isArray(entry[1])) {
          console.log("❌ Rejected: malformed 'terms' clause.");
          return false;
        }
        const [field, queryValues] = entry as [string, unknown[]];
        const docValue = getSafeField(doc, field);
        const matched =
          docValue !== undefined &&
          (Array.isArray(docValue)
            ? docValue.some(v => queryValues.includes(v))
            : queryValues.includes(docValue));
        if (!matched) {
          console.log(`❌ Rejected: terms mismatch on field '${field}'`);
          return false;
        }
        console.log(`✅ Passed: terms match on field '${field}'`);
        // Fall out of the switch (not the function) so sibling clauses still run.
        break;
      }

      case 'match': {
        const entry = firstClauseEntry(clauseValue);
        if (!entry) {
          console.log("❌ Rejected: malformed 'match' clause.");
          return false;
        }
        const [field, value] = entry;
        const fieldValue = getSafeField(doc, field);
        if (typeof fieldValue !== 'string') {
          console.log(`❌ Rejected: match expected string field '${field}', got non-string:`, fieldValue);
          return false;
        }
        if (!fieldValue.includes(String(value))) {
          console.log(`❌ Rejected: match failed on field '${field}', expected to include '${value}', got '${fieldValue}'`);
          return false;
        }
        console.log(`✅ Passed: match succeeded on field '${field}' includes '${value}'`);
        break;
      }

      case 'bool': {
        const boolQuery = (clauseValue ?? {}) as NonNullable<QueryClause['bool']>;
        const must = toClauseList(boolQuery.must);
        const filter = toClauseList(boolQuery.filter);
        const mustNot = toClauseList(boolQuery.must_not);
        const should = toClauseList(boolQuery.should);

        if (must.some(q => !matchDocAgainstQuery(doc, q))) {
          console.log("❌ Rejected: bool.must clause failed.");
          return false;
        }
        if (filter.some(q => !matchDocAgainstQuery(doc, q))) {
          console.log("❌ Rejected: bool.filter clause failed.");
          return false;
        }
        if (mustNot.some(q => matchDocAgainstQuery(doc, q))) {
          console.log("❌ Rejected: bool.must_not clause matched (should not have).");
          return false;
        }
        if (should.length > 0) {
          // With must/filter present, `should` only influences scoring in
          // Elasticsearch, so it must not filter documents out here either.
          const defaultMinimum = must.length > 0 || filter.length > 0 ? 0 : 1;
          const rawMinimum = boolQuery.minimum_should_match;
          const requested = rawMinimum === undefined || rawMinimum === null ? NaN : Number(rawMinimum);
          // Only plain non-negative integers are understood; anything else
          // (percentages, negatives) falls back to the default.
          const minimum = Number.isInteger(requested) && requested >= 0 ? requested : defaultMinimum;
          if (minimum > 0 && should.filter(q => matchDocAgainstQuery(doc, q)).length < minimum) {
            console.log("❌ Rejected: bool.should clause did not match any.");
            return false;
          }
        }
        console.log("✅ Passed: bool clause matched.");
        break;
      }

      case 'range': {
        const entry = firstClauseEntry(clauseValue);
        if (!entry || typeof entry[1] !== 'object' || entry[1] === null) {
          console.log("❌ Rejected: malformed 'range' clause.");
          return false;
        }
        const [field, range] = entry;
        const value = getSafeField(doc, field);
        if (value === undefined) {
          console.log(`❌ Rejected: range field '${field}' is undefined in doc.`);
          return false;
        }

        for (const [op, opValue] of Object.entries(range as object)) {
          const numValue = Number(value);
          const numOpValue = Number(opValue);
          const isNumeric = !isNaN(numValue) && !isNaN(numOpValue);
          let passed = true;

          switch (op) {
            case 'gt':
              passed = isNumeric ? numValue > numOpValue : String(value) > String(opValue);
              break;
            case 'gte':
              passed = isNumeric ? numValue >= numOpValue : String(value) >= String(opValue);
              break;
            case 'lt':
              passed = isNumeric ? numValue < numOpValue : String(value) < String(opValue);
              break;
            case 'lte':
              passed = isNumeric ? numValue <= numOpValue : String(value) <= String(opValue);
              break;
          }

          if (!passed) {
            console.log(`❌ Rejected: range.${op} failed on field '${field}', value '${value}' vs '${opValue}'`);
            return false;
          }
        }

        console.log(`✅ Passed: range match on field '${field}'`);
        break;
      }

      case 'exists': {
        const field = (clauseValue as { field?: unknown } | null)?.field;
        if (typeof field !== 'string' || getSafeField(doc, field) === undefined) {
          console.log(`❌ Rejected: exists failed, field '${field}' not found`);
          return false;
        }
        console.log(`✅ Passed: exists matched, field '${field}' exists`);
        break;
      }

      default:
        console.log(`ℹ️ Ignored unknown query clause: '${key}'`);
        break;
    }
  }

  console.log("✅ Accepted: Document matched all clauses.");
  return true;
}
163
+
164
+
// Clauses the live matcher can evaluate deterministically.
const ALLOWED_QUERY_CLAUSES = new Set([
  'term',
  'terms',
  'bool',
  'range',
  'exists',
]);

// Clauses rejected with a dedicated message (relevance / text-analysis based).
const FORBIDDEN_QUERY_CLAUSES = new Set([
  'match',
  'multi_match',
  'match_phrase',
  'query_string',
  'simple_query_string',
  'fuzzy',
  'script',
  'function_score',
  'more_like_this',
  'percolate',
]);

/**
 * Recursively checks that a query clause only uses allowed clause types.
 *
 * `bool` sub-clauses are validated whether they are given as an array or as a
 * single clause object; the single-object form used to skip validation
 * entirely, letting forbidden clauses through.
 *
 * @throws Error when a forbidden or unsupported clause is found, or when a
 *   bool sub-clause is not an object.
 */
function validateClause(clause: Record<string, unknown>) {
  for (const key in clause) {
    if (FORBIDDEN_QUERY_CLAUSES.has(key)) {
      throw new Error(`Query clause '${key}' is not allowed for live sync due to non-deterministic behavior. Please use deterministic clauses like 'term' or 'range'.`);
    }

    if (!ALLOWED_QUERY_CLAUSES.has(key)) {
      throw new Error(`Query clause '${key}' is not supported for live sync.`);
    }

    const value = clause[key];
    if (key !== 'bool' || typeof value !== 'object' || value === null) continue;

    const boolClauses = value as Record<string, unknown>;
    for (const boolType of ['must', 'filter', 'should', 'must_not']) {
      const raw = boolClauses[boolType];
      if (raw === undefined || raw === null) continue;

      const subClauses: unknown[] = Array.isArray(raw) ? raw : [raw];
      for (const subClause of subClauses) {
        if (typeof subClause !== 'object' || subClause === null) {
          throw new Error(`bool.${boolType} entries must be query clause objects for live sync.`);
        }
        validateClause(subClause as Record<string, unknown>);
      }
    }
  }
}

/**
 * Validates a client-supplied live query before it is registered.
 *
 * Requirements:
 * - a non-empty `sort` array, each entry of the form `{ field: 'asc' | 'desc' }`
 *   (the only sort shape the live result ordering understands);
 * - if `query` is present, it is an object built only from allowed clauses.
 *
 * @throws Error describing the first violation found.
 */
export function validateLiveQuery(esQuery: Record<string, unknown>): void {
  // a query must have an explicit 'sort' clause
  // which prevents non-deterministic sorting (bad)
  const sortClause = esQuery.sort;
  if (!Array.isArray(sortClause) || sortClause.length === 0) {
    throw new Error("Live queries must include an explicit 'sort' clause for deterministic ordering (e.g., sort: [{ '$metadata.indexedAt': 'desc' }]).");
  }

  for (const entry of sortClause as unknown[]) {
    const isPlainObject = typeof entry === 'object' && entry !== null && !Array.isArray(entry);
    const pairs = isPlainObject ? Object.entries(entry as Record<string, unknown>) : [];
    const direction = pairs[0]?.[1];
    if (pairs.length !== 1 || (direction !== 'asc' && direction !== 'desc')) {
      throw new Error("Each live query 'sort' entry must be an object with exactly one field mapped to 'asc' or 'desc' (e.g., { '$metadata.indexedAt': 'desc' }).");
    }
  }

  // and also of the supported deterministic clauses (no text searches)
  const queryPart = esQuery.query;
  if (queryPart === undefined || queryPart === null) return;
  if (typeof queryPart !== 'object' || Array.isArray(queryPart)) {
    throw new Error("Live query 'query' must be an object.");
  }
  validateClause(queryPart as Record<string, unknown>);
}
+26
-6
src/main.ts
+26
-6
src/main.ts
···
1
1
import { readConfig } from "./config.ts";
2
2
import { startFirehose } from "./firehose.ts";
3
-
import { ensureIndexMapping } from "./indexer.ts";
3
+
import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts";
4
4
import { setupXRPCServer } from "./xrpc.ts";
5
+
import { processEventForSync } from "./sync.ts";
6
+
import { parseArgs } from "jsr:@std/cli";
5
7
6
-
const config = await readConfig("./config.json");
8
+
async function main() {
9
+
const config = await readConfig("./config.json");
7
10
8
-
// prepare indexes
9
-
ensureIndexMapping(config);
11
+
const args = parseArgs(Deno.args)
10
12
11
-
startFirehose(config);
13
+
// prepare indexes
14
+
await ensureIndexMapping(config);
12
15
13
-
setupXRPCServer(config);
16
+
setupXRPCServer(config);
17
+
18
+
startFirehose({
19
+
config,
20
+
onEvent: (event: IndexerEvent) => {
21
+
// ESAV Live !!!
22
+
return processEventForSync(event);
23
+
},
24
+
forcedCursor: args["force-cursor"]
25
+
});
26
+
27
+
console.log("Server started and listening for events");
28
+
}
29
+
30
+
main().catch(err => {
31
+
console.error("Fatal error in main:", err);
32
+
Deno.exit(1);
33
+
});
+185
src/sync.ts
+185
src/sync.ts
···
1
+
import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts";
2
+
import { type IndexerEvent } from "./indexer.ts";
3
+
import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts";
4
+
5
+
interface ClientDelta {
6
+
documents: Record<string, { cid: string; doc: Record<string, unknown> }>;
7
+
queries: Record<string, { ecid: string; result: string[] }>;
8
+
}
9
+
10
+
/**
 * Reads, for each sort entry, the document value of the field named by that
 * entry's first key. The returned array is positionally aligned with
 * `sortClause`.
 */
function extractSortValues(doc: Record<string, any>, sortClause: any[]): unknown[] {
  return sortClause.map((sortField): unknown => {
    const [fieldName] = Object.keys(sortField);
    return getSafeField(doc, fieldName);
  });
}
19
+
20
+
/**
 * Comparator over two sort-value tuples, honoring each sort entry's direction.
 *
 * Entries are compared position by position; the first differing position
 * decides. `null` / `undefined` values always sort last, whatever the
 * direction. The direction is read from the entry's first value and may be the
 * string form (`{ field: 'desc' }`) or the object form
 * (`{ field: { order: 'desc' } }`); anything other than `'desc'` is ascending.
 *
 * @returns a negative number when `aSorts` goes first, positive when `bSorts`
 *   goes first, `0` when they are equivalent under `sortClause`.
 */
function compareItems(aSorts: unknown[], bSorts: unknown[], sortClause: any[]): number {
  for (let i = 0; i < sortClause.length; i++) {
    const spec: unknown = Object.values(sortClause[i] ?? {})[0];
    // The object form used to be compared against 'desc' directly and was
    // therefore always treated as ascending.
    const direction =
      typeof spec === 'object' && spec !== null ? (spec as { order?: unknown }).order : spec;
    const valA = aSorts[i];
    const valB = bSorts[i];

    if (valA === valB) continue;

    if (valA === undefined || valA === null) return 1;
    if (valB === undefined || valB === null) return -1;

    // Relational operators on JSON scalars; the values are untyped at this point.
    // deno-lint-ignore no-explicit-any
    const left = valA as any;
    // deno-lint-ignore no-explicit-any
    const right = valB as any;

    let comparison = 0;
    if (left < right) {
      comparison = -1;
    } else if (left > right) {
      comparison = 1;
    }

    if (comparison !== 0) {
      return direction === 'desc' ? -comparison : comparison;
    }
  }
  return 0;
}
44
+
45
+
/**
 * Binary search for the position at which `newItem` should be inserted into an
 * already sorted result list. Items that compare equal to `newItem` stay in
 * front of it (the returned index is past the last equal element).
 */
function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number {
  let lo = 0;
  let hi = sortedArray.length;

  while (lo < hi) {
    const probe = lo + Math.floor((hi - lo) / 2);
    const goesBeforeProbe =
      compareItems(newItem.sortValues, sortedArray[probe].sortValues, sortClause) < 0;

    if (goesBeforeProbe) {
      hi = probe;
    } else {
      lo = probe + 1;
    }
  }

  return lo;
}
60
+
61
+
// Tail of the chain of sync passes. Each pass reads `sub.result`, awaits
// `computeCid`, and only then writes the new result back, so two overlapping
// passes would each start from the same snapshot and the later write would
// drop the earlier change. Chaining the passes makes them run one at a time.
let syncQueue: Promise<void> = Promise.resolve();

/**
 * Applies one indexer event to every live subscription and pushes a
 * `query-delta` message to the affected clients.
 *
 * Calls are serialized in arrival order. The returned promise settles when
 * this event's pass has finished, and rejects if that pass throws; a failed
 * pass does not block later ones.
 */
export function processEventForSync(event: IndexerEvent): Promise<void> {
  const pass = syncQueue.then(() => applyEventToSubscriptions(event));
  // Keep the chain alive even if this pass rejects.
  syncQueue = pass.catch(() => undefined);
  return pass;
}

/**
 * One sync pass: recomputes the result list of each subscription touched by
 * `event`, commits the changed lists and their new ecids, then notifies
 * clients.
 */
async function applyEventToSubscriptions(event: IndexerEvent): Promise<void> {
  const requiredDocuments = new Set<string>();

  const changedQueries: {
    queryId: string;
    newEcid: string;
    newRichResult: SubscriptionResult[];
    newResultUris: string[];
    sub: Subscription;
  }[] = [];

  for (const [queryId, sub] of subscriptions.entries()) {
    let newRichResult: SubscriptionResult[] | undefined = undefined;
    const sortClause = (sub.esQuery as any).sort;

    if (event.type === 'index') {
      const queryClause = (sub.esQuery as any).query ?? {};
      const doesMatch = matchDocAgainstQuery(event.data, queryClause);
      const currentIndex = sub.result.findIndex(item => item.uri === event.uri);
      const wasInResult = currentIndex !== -1;

      if (doesMatch && !wasInResult) {
        // Newly matching document: insert at its sorted position.
        const newItem = {
          uri: event.uri,
          sortValues: extractSortValues(event.data, sortClause),
        };
        const insertIndex = findInsertIndex(sub.result, newItem, sortClause);

        newRichResult = [...sub.result];
        newRichResult.splice(insertIndex, 0, newItem);
        requiredDocuments.add(event.uri);

      } else if (!doesMatch && wasInResult) {
        // Document no longer matches: drop it from the list.
        newRichResult = sub.result.filter(item => item.uri !== event.uri);

      } else if (doesMatch && wasInResult) {
        // Still matching: reposition only if its sort values changed, but
        // always resend the document body.
        const newItem = {
          uri: event.uri,
          sortValues: extractSortValues(event.data, sortClause),
        };
        if (compareItems(newItem.sortValues, sub.result[currentIndex].sortValues, sortClause) !== 0) {
          const tempResult = sub.result.filter(item => item.uri !== event.uri);
          const insertIndex = findInsertIndex(tempResult, newItem, sortClause);

          newRichResult = [...tempResult];
          newRichResult.splice(insertIndex, 0, newItem);
        }
        requiredDocuments.add(event.uri);
      }
    } else if (event.type === 'delete') {
      if (sub.result.some(item => item.uri === event.uri)) {
        newRichResult = sub.result.filter(item => item.uri !== event.uri);
      }
    }

    if (newRichResult) {
      const newResultUris = newRichResult.map(item => item.uri);
      const newEcid = await computeCid(newResultUris);

      if (newEcid !== sub.ecid) {
        changedQueries.push({
          queryId,
          newEcid,
          newRichResult,
          newResultUris,
          sub
        });
      }
    }
  }

  // Commit the new state for every subscription that is still registered.
  for (const { queryId, newEcid, newRichResult } of changedQueries) {
    const sub = subscriptions.get(queryId);
    if (sub) {
      sub.ecid = newEcid;
      sub.result = newRichResult;
    }
  }

  const documentsPayload: Record<string, { cid: string; doc: Record<string, unknown> }> = {};
  if (event.type === 'index') {
    for (const uri of requiredDocuments) {
      documentsPayload[uri] = {
        cid: event.data["$metadata.cid"] as string,
        doc: event.data as Record<string, unknown>,
      };
    }
  }

  if (changedQueries.length > 0 || Object.keys(documentsPayload).length > 0) {
    const affectedClients = new Set<WebSocket>();
    changedQueries.forEach(cq => cq.sub.clients.forEach(c => affectedClients.add(c)));

    // Clients whose list order did not change still need the updated body.
    if (Object.keys(documentsPayload).length > 0 && event.type === 'index') {
      for (const [, sub] of subscriptions.entries()) {
        if (sub.result.some(item => item.uri === event.uri)) {
          sub.clients.forEach(c => affectedClients.add(c));
        }
      }
    }

    for (const client of affectedClients) {
      const delta: ClientDelta = {
        documents: documentsPayload,
        queries: {},
      };

      for (const { queryId, newEcid, newResultUris, sub } of changedQueries) {
        if (sub.clients.has(client)) {
          delta.queries[queryId] = { ecid: newEcid, result: newResultUris };
        }
      }

      if (Object.keys(delta.queries).length > 0 || Object.keys(documentsPayload).length > 0) {
        if (client.readyState === WebSocket.OPEN) {
          try {
            client.send(JSON.stringify({ type: 'query-delta', ...delta }));
          } catch (err) {
            console.error("Failed to send delta to client:", err);
          }
        }
      }
    }
  }
}
+506
-54
src/xrpc.ts
+506
-54
src/xrpc.ts
···
1
-
import { AppConfig } from "../src/config.ts";
1
+
import { AppConfig } from "./config.ts";
2
2
import { serve } from "https://deno.land/std/http/server.ts";
3
+
import { CID } from "npm:multiformats/cid";
4
+
import { sha256 } from "npm:multiformats/hashes/sha2";
5
+
import { DidResolver, HandleResolver } from "npm:@atproto/identity";
6
+
import * as dagCbor from "npm:@ipld/dag-cbor";
7
+
import { validateLiveQuery } from "./live-utils.ts";
8
+
9
+
const ZOMBIE_LIFETIME_MS = 5 * 60 * 1000;
10
+
11
+
declare global {
12
+
interface WebSocket {
13
+
isAlive: boolean;
14
+
ping(): void;
15
+
terminate?(): void;
16
+
}
17
+
}
18
+
19
+
export interface SubscriptionResult {
20
+
uri: string;
21
+
sortValues: unknown[];
22
+
}
23
+
24
+
export interface Subscription {
25
+
esQuery: Record<string, unknown>;
26
+
clients: Set<WebSocket>;
27
+
ecid: string;
28
+
result: SubscriptionResult[];
29
+
zombieTimer?: number;
30
+
}
31
+
32
+
export const subscriptions = new Map<string, Subscription>();
33
+
34
+
/**
 * Derives a content identifier for `data`: the value is DAG-CBOR encoded,
 * hashed with SHA-256, and wrapped as a CIDv1 rendered to its string form.
 */
export async function computeCid(data: unknown): Promise<string> {
  const digest = await sha256.digest(dagCbor.encode(data));
  const cid = CID.create(1, dagCbor.code, digest);
  return cid.toString();
}
39
+
40
+
/** Builds the permissive CORS header set attached to every HTTP response. */
function corsHeaders(): HeadersInit {
  const headers: Record<string, string> = {};
  headers["Access-Control-Allow-Origin"] = "*";
  headers["Access-Control-Allow-Methods"] = "GET,POST,OPTIONS";
  headers["Access-Control-Allow-Headers"] = "Content-Type";
  return headers;
}
3
47
4
48
/**
 * Starts the XRPC HTTP/WebSocket server on `config.serve_port`.
 *
 * Routes:
 * - `OPTIONS *` — CORS preflight, answered with 204.
 * - `/xrpc/party.whey.esav.esSync` with a WebSocket upgrade — live queries.
 * - `/xrpc/party.whey.esav.resolveIdentity` — handle/DID resolution.
 * - `/xrpc/party.whey.esav.esQuery` and the legacy
 *   `/xrpc/com.example.prototypeESQuery` — one-shot Elasticsearch queries.
 * - anything else — 404.
 *
 * @param config - Application configuration (Elasticsearch URL, index, port).
 */
export function setupXRPCServer(config: AppConfig) {
  serve(
    async (req) => {
      const { pathname, searchParams } = new URL(req.url);

      if (req.method === "OPTIONS") {
        return new Response(null, { status: 204, headers: corsHeaders() });
      }

      // The Upgrade token is case-insensitive, so normalise before comparing.
      const upgradeHeader = req.headers.get("upgrade") ?? "";
      if (
        pathname === "/xrpc/party.whey.esav.esSync" &&
        upgradeHeader.toLowerCase() === "websocket"
      ) {
        const { socket, response } = Deno.upgradeWebSocket(req);
        handleWebSocket(socket, config);
        return response;
      }

      if (pathname === "/xrpc/party.whey.esav.resolveIdentity") {
        return await handleResolveIdentityRequest(searchParams);
      }

      if (
        pathname === "/xrpc/com.example.prototypeESQuery" ||
        pathname === "/xrpc/party.whey.esav.esQuery"
      ) {
        return await handleEsQueryRequest(req, searchParams, config);
      }

      return new Response("Not found", { status: 404, headers: corsHeaders() });
    },
    { port: Number(config.serve_port) }
  );
  console.log(`XRPC server running on http://localhost:${config.serve_port}`);
}

/** Interprets a query-string flag: absent, empty, "false" and "0" mean off. */
function isQueryFlagEnabled(raw: string | null): boolean {
  if (!raw) return false;
  const normalised = raw.trim().toLowerCase();
  return normalised !== "false" && normalised !== "0";
}

/** Converts a caught value into the plain-text 500 response used by all routes. */
function internalErrorResponse(error: unknown): Response {
  const errorMessage =
    error instanceof Error ? error.message : "An unknown error occurred";
  return new Response(errorMessage, { status: 500, headers: corsHeaders() });
}

/**
 * Serves `resolveIdentity`: resolves `handle` or `did`, optionally attaches the
 * Bluesky profile record, and answers 304 when the caller's `cid` still matches.
 */
async function handleResolveIdentityRequest(
  searchParams: URLSearchParams
): Promise<Response> {
  try {
    const handle = searchParams.get("handle");
    const did = searchParams.get("did");
    const clientCid = searchParams.get("cid");

    if (!handle && !did) {
      return new Response("handle or did parameter is required", {
        status: 400,
        headers: corsHeaders(),
      });
    }

    const identity = await resolveWithHandleOrDid({
      handle: handle ?? undefined,
      did: did ?? undefined,
    });

    // Copy only defined fields: JSON.stringify would drop undefined ones
    // anyway, and the CID encoder must never be handed an undefined value.
    const finalResponse: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(identity)) {
      if (value !== undefined) finalResponse[key] = value;
    }

    if (isQueryFlagEnabled(searchParams.get("includeBskyProfile"))) {
      const profile = await getProfileRecord(identity.pdsUrl, identity.did);
      if (profile !== undefined) finalResponse.profile = profile;
    }

    const newCid = await computeCid(finalResponse);

    if (clientCid && clientCid === newCid) {
      return new Response(null, { status: 304, headers: corsHeaders() });
    }

    return new Response(JSON.stringify(finalResponse), {
      status: 200,
      headers: { ...corsHeaders(), "Content-Type": "application/json" },
    });
  } catch (error) {
    return internalErrorResponse(error);
  }
}

/**
 * Serves the one-shot query routes: reads the Elasticsearch request body from
 * the POST body or the GET `q` parameter, forwards it to `_search`, and
 * answers 304 when the caller's `cid` still matches the result.
 */
async function handleEsQueryRequest(
  req: Request,
  searchParams: URLSearchParams,
  config: AppConfig
): Promise<Response> {
  let esQuery: unknown;
  const clientCid = searchParams.get("cid");

  if (req.method === "POST") {
    try {
      esQuery = await req.json();
    } catch {
      return new Response("Invalid JSON body", {
        status: 400,
        headers: corsHeaders(),
      });
    }
  } else if (req.method === "GET") {
    const queryParam = searchParams.get("q");
    if (!queryParam) {
      return new Response("Missing 'q'", {
        status: 400,
        headers: corsHeaders(),
      });
    }

    try {
      esQuery = JSON.parse(queryParam);
    } catch {
      return new Response("Invalid JSON in 'q'", {
        status: 400,
        headers: corsHeaders(),
      });
    }
  } else {
    return new Response("Method not allowed", {
      status: 405,
      headers: corsHeaders(),
    });
  }

  try {
    const esRes = await fetch(
      `${config.es_url}/${config.index_name}/_search`,
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(esQuery),
      }
    );

    if (!esRes.ok) {
      const errText = await esRes.text();
      return new Response(`Elasticsearch error: ${errText}`, {
        status: 500,
        headers: corsHeaders(),
      });
    }

    const result = await esRes.json();
    const newCid = await computeCid(result);

    if (clientCid && clientCid === newCid) {
      return new Response(null, { status: 304, headers: corsHeaders() });
    }

    return new Response(JSON.stringify(result), {
      status: 200,
      headers: { ...corsHeaders(), "Content-Type": "application/json" },
    });
  } catch (error) {
    return internalErrorResponse(error);
  }
}
201
+
202
+
/**
 * Detaches a disconnected socket from every subscription it was attached to.
 *
 * Removal goes through `handleUnsubscribe`, so a subscription left without
 * clients follows the same path as an explicit unsubscribe.
 */
function cleanupClient(ws: WebSocket) {
  console.log("WebSocket client disconnected. Cleaning up subscriptions.");
  subscriptions.forEach((subscription, queryId) => {
    if (!subscription.clients.has(ws)) return;
    handleUnsubscribe(ws, queryId);
  });
}
215
+
216
+
/**
 * Wires up a freshly upgraded live-query socket.
 *
 * - Runs an application-level heartbeat: every 30 s a `{type:'ping'}` frame is
 *   sent and the socket is closed if no `{type:'pong'}` arrived since the
 *   previous tick.
 * - Dispatches `subscribe` / `unsubscribe` / `pong` messages.
 * - Detaches the socket from all subscriptions on close or error.
 *
 * @param ws - The server side of the upgraded connection.
 * @param config - Application configuration, forwarded to `handleSubscribe`.
 */
function handleWebSocket(ws: WebSocket, config: AppConfig) {
  console.log("WebSocket client connected.");
  ws.isAlive = true;

  // Single guarded send path: never write to a socket that is not OPEN.
  const sendJson = (payload: unknown) => {
    if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(payload));
  };

  const pingInterval = setInterval(() => {
    if (!ws.isAlive) {
      console.log('Terminating dead WebSocket connection');
      return ws.close();
    }
    ws.isAlive = false;
    sendJson({ type: 'ping' });
  }, 30000);

  const onMessage = async (event: MessageEvent) => {
    if (typeof event.data !== 'string') return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(event.data);
    } catch (_err) {
      sendJson({ type: 'error', error: 'Invalid JSON' });
      return;
    }

    // Valid JSON is not necessarily an object: `null`, numbers, strings and
    // arrays all parse. Reading `.type` off `null` would throw inside this
    // async listener and surface as an unhandled rejection.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      sendJson({ type: 'error', error: 'Unknown message type', request: parsed });
      return;
    }
    const msg = parsed as Record<string, unknown>;

    switch (msg.type) {
      case 'pong':
        ws.isAlive = true;
        break;
      case 'subscribe': {
        const { queryId, esquery, ecid } = msg;
        const hasQueryId = typeof queryId === 'string' && queryId !== '';
        const hasQuery = typeof esquery === 'object' && esquery !== null;
        if (hasQueryId && hasQuery) {
          await handleSubscribe(
            ws,
            queryId as string,
            esquery as Record<string, unknown>,
            config,
            typeof ecid === 'string' ? ecid : undefined
          );
        } else {
          sendJson({ type: 'error', error: 'Invalid subscribe message', request: msg });
        }
        break;
      }
      case 'unsubscribe':
        if (typeof msg.queryId === 'string' && msg.queryId !== '') {
          handleUnsubscribe(ws, msg.queryId);
        } else {
          sendJson({ type: 'error', error: 'Invalid unsubscribe message', request: msg });
        }
        break;
      default:
        sendJson({ type: 'error', error: 'Unknown message type', request: msg });
        break;
    }
  };

  const onClose = () => { clearInterval(pingInterval); cleanupClient(ws); };
  const onError = (e: Event) => { console.error('WebSocket error:', e); onClose(); };

  ws.addEventListener('message', onMessage);
  ws.addEventListener('close', onClose);
  ws.addEventListener('error', onError);
}
271
+
272
+
/**
 * Attaches a socket to a live query and brings the client up to date.
 *
 * Flow:
 * 1. Reject queries that `validateLiveQuery` refuses.
 * 2. If a subscription for `queryId` exists with a different query, purge it;
 *    if it exists with the same query, revive it from the zombie state, attach
 *    the socket, and return early when the client's `ecid` already matches.
 * 3. Otherwise run the query against Elasticsearch, store the fresh result
 *    state, and send a `query-delta` frame containing every document.
 *
 * The Elasticsearch request and the CID computation both yield to the event
 * loop, so all shared state is re-read afterwards: the socket may have closed
 * and the subscription map may have changed in the meantime.
 *
 * @param ws - Socket that requested the subscription.
 * @param queryId - Client-chosen identifier of the live query.
 * @param esQuery - Elasticsearch request body.
 * @param config - Application configuration (Elasticsearch URL and index).
 * @param clientEcid - The `ecid` the client last saw, if any.
 */
async function handleSubscribe(
  ws: WebSocket,
  queryId: string,
  esQuery: Record<string, unknown>,
  config: AppConfig,
  clientEcid?: string
) {
  try {
    validateLiveQuery(esQuery);
  } catch (err) {
    console.error(`Invalid live query for queryId ${queryId}: ${err}`);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({
        type: "error",
        queryId,
        error: `Invalid query for live subscription: ${err}`
      }));
    }
    return;
  }

  // Serialised form used for every "is this the same query?" comparison.
  const queryKey = JSON.stringify(esQuery);
  const existingSub = subscriptions.get(queryId);

  // A subscription (active or zombie) already exists.
  if (existingSub) {
    if (queryKey !== JSON.stringify(existingSub.esQuery)) {
      // The query changed, so the stored state is invalid.
      console.log(`[Sync] Query for ${queryId} has changed. Purging old state and re-running.`);
      if (existingSub.zombieTimer) clearTimeout(existingSub.zombieTimer);
      subscriptions.delete(queryId);
    } else {
      // Same query: revive it if it is a zombie.
      if (existingSub.zombieTimer) {
        console.log(`[Zombie] Reviving zombie subscription: ${queryId}`);
        clearTimeout(existingSub.zombieTimer);
        delete existingSub.zombieTimer;
      }

      existingSub.clients.add(ws);

      // Nothing to send when the client already holds the current state.
      if (clientEcid && existingSub.ecid === clientEcid) {
        console.log(`[Sync] Client for ${queryId} is already up-to-date. No-op.`);
        return;
      }

      console.log(`[Sync] Client for ${queryId} is out of sync. Re-running ES query.`);
    }
  }

  console.log(`Performing full sync for queryId: ${queryId}`);
  try {
    const esRes = await fetch(
      `${config.es_url}/${config.index_name}/_search`,
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: queryKey,
      }
    );

    if (!esRes.ok) throw new Error(`Elasticsearch error: ${await esRes.text()}`);

    const esResult = await esRes.json();
    const hits = esResult.hits?.hits ?? [];

    const documents: Record<string, { cid: string, doc: Record<string, unknown> }> = {};
    const newResultState: SubscriptionResult[] = [];

    for (const hit of hits) {
      const uri = hit._id as string;
      const source = hit._source;

      newResultState.push({
        uri: uri,
        sortValues: hit.sort || [],
      });

      documents[uri] = {
        cid: source['$metadata.cid'],
        doc: source,
      };
    }

    const resultUris = newResultState.map(item => item.uri);
    const newEcid = await computeCid(resultUris);

    // The socket may have closed while we were waiting. Registering it now
    // would pin the subscription forever, because its cleanup already ran.
    if (ws.readyState !== WebSocket.OPEN) {
      console.log(`[Sync] Client for ${queryId} disconnected during sync. Skipping registration.`);
      return;
    }

    // Re-read the map instead of trusting the pre-await snapshot: another
    // client may have created, replaced or zombified this subscription.
    const currentSub = subscriptions.get(queryId);
    if (currentSub && JSON.stringify(currentSub.esQuery) === queryKey) {
      if (currentSub.zombieTimer) {
        clearTimeout(currentSub.zombieTimer);
        delete currentSub.zombieTimer;
      }
      currentSub.clients.add(ws);
      currentSub.result = newResultState;
      currentSub.ecid = newEcid;
    } else {
      // No entry, or an entry for a different query: this request wins.
      if (currentSub?.zombieTimer) clearTimeout(currentSub.zombieTimer);
      subscriptions.set(queryId, {
        esQuery,
        clients: new Set([ws]),
        ecid: newEcid,
        result: newResultState,
      });
    }

    ws.send(JSON.stringify({
      type: "query-delta",
      documents,
      queries: {
        [queryId]: {
          ecid: newEcid,
          result: resultUris,
        }
      }
    }));

  } catch (err) {
    console.error(`Error subscribing client to query ${queryId}:`, err);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: "error", queryId, error: `Subscription failed: ${err instanceof Error ? err.message : 'Unknown error'}` }));
    }
  }
}
78
400
79
-
function corsHeaders(): HeadersInit {
80
-
return {
81
-
"Access-Control-Allow-Origin": "*",
82
-
"Access-Control-Allow-Methods": "GET,POST,OPTIONS",
83
-
"Access-Control-Allow-Headers": "Content-Type",
84
-
};
85
-
}
401
+
/**
 * Puts a client-less subscription into the zombie state.
 *
 * The subscription stays in the registry and is removed once
 * `ZOMBIE_LIFETIME_MS` elapses. Does nothing when the subscription is unknown
 * or still has clients. A timer that is already pending is restarted.
 */
function transitionToZombie(queryId: string) {
  const subscription = subscriptions.get(queryId);
  if (!subscription) return;
  if (subscription.clients.size > 0) return;

  if (subscription.zombieTimer) clearTimeout(subscription.zombieTimer);

  console.log(`[Zombie] Subscription ${queryId} has no clients. Entering zombie state for 5 minutes.`);

  const purge = () => {
    subscriptions.delete(queryId);
    console.log(`[Zombie] Purged zombie subscription: ${queryId}`);
  };
  subscription.zombieTimer = setTimeout(purge, ZOMBIE_LIFETIME_MS);
}
418
+
419
+
/**
 * Detaches one socket from one live query.
 *
 * Unknown query ids are ignored. When the last client leaves, the
 * subscription is handed to `transitionToZombie` rather than deleted.
 */
function handleUnsubscribe(ws: WebSocket, queryId: string) {
  const subscription = subscriptions.get(queryId);
  if (!subscription) return;

  const { clients } = subscription;
  clients.delete(ws);
  console.log(`[Sync] Client unsubscribed from ${queryId}. Remaining clients: ${clients.size}`);

  if (clients.size !== 0) return;
  transitionToZombie(queryId);
}
435
+
436
+
// Ambient additions to the built-in WebSocket type used by the live-query code.
declare global {
  interface WebSocket {
    /**
     * Heartbeat flag: set to true on connect and on every `pong` message,
     * reset to false each time the server sends a `ping`.
     */
    isAlive: boolean;
    /**
     * Declared for typing only; nothing in the visible code calls it.
     * NOTE(review): the standard WebSocket API has no `ping()` — confirm the
     * runtime provides it before relying on this.
     */
    ping(): void;
    /** Optional hard-close hook; not called in the visible code. */
    terminate?(): void;
  }
}
443
+
444
+
/**
 * Resolves a DID document and extracts the account's PDS endpoint and handle.
 *
 * @param did - The DID to resolve.
 * @returns The `AtprotoPersonalDataServer` service endpoint and, when the
 *   document lists one, the handle taken from the first `at://` entry of
 *   `alsoKnownAs`.
 * @throws Error if the document cannot be resolved, has no PDS service entry,
 *   or the endpoint is neither a string nor an object with a string `uri`.
 */
async function getServiceEndpointFromDid(
  did: string
): Promise<{ serviceEndpoint: string; handle?: string }> {
  // Accepts both endpoint shapes a DID document may use: a bare URL string or
  // an object carrying the URL in `uri`.
  function getServiceEndpointString(endpoint: unknown): string {
    if (typeof endpoint === "string") return endpoint;
    if (
      typeof endpoint === "object" &&
      endpoint &&
      "uri" in endpoint &&
      typeof endpoint.uri === "string"
    ) {
      return endpoint.uri;
    }
    throw new Error("Unsupported serviceEndpoint format");
  }

  const didres = new DidResolver({});
  const doc = await didres.resolve(did) as {
    service?: Array<{
      type: string;
      serviceEndpoint: string;
    }>;
    alsoKnownAs?: string[];
    [key: string]: unknown;
  } | null;
  if (!doc) {
    throw new Error(`Could not resolve DID document for: ${did}`);
  }

  const endpointRaw: unknown = doc.service?.find(
    (service) => service.type === "AtprotoPersonalDataServer"
  )?.serviceEndpoint;
  if (!endpointRaw) {
    throw new Error(`Service endpoint not found for DID: ${did}`);
  }
  const serviceEndpoint = getServiceEndpointString(endpointRaw);

  // alsoKnownAs may list other URI schemes ahead of the handle, so look for
  // the first at:// entry instead of inspecting index 0 only.
  const aliases: unknown[] = Array.isArray(doc.alsoKnownAs) ? doc.alsoKnownAs : [];
  const atAlias = aliases.find(
    (alias): alias is string =>
      typeof alias === "string" && alias.startsWith("at://")
  );
  const handle = atAlias === undefined ? undefined : atAlias.slice("at://".length);

  return { serviceEndpoint, handle };
}
487
+
488
+
/**
 * Resolves an identity from a DID or a handle; the DID wins when both are given.
 *
 * @returns The DID, its PDS URL, and the handle: the one found in the DID
 *   document for DID lookups, or the caller-supplied one for handle lookups.
 * @throws Error if neither argument is provided, the handle cannot be
 *   resolved, or the DID document lookup fails.
 */
async function resolveWithHandleOrDid({
  handle,
  did,
}: {
  handle?: string;
  did?: string;
}): Promise<{ did: string; pdsUrl: string; handle?: string }> {
  if (did) {
    const fromDocument = await getServiceEndpointFromDid(did);
    return {
      did,
      pdsUrl: fromDocument.serviceEndpoint,
      handle: fromDocument.handle,
    };
  }

  if (!handle) {
    throw new Error("Either handle or did must be provided");
  }

  const resolvedDid = await new HandleResolver().resolve(handle);
  if (!resolvedDid) {
    throw new Error(`Could not resolve handle: ${handle}`);
  }

  const { serviceEndpoint } = await getServiceEndpointFromDid(resolvedDid);
  return { did: resolvedDid, pdsUrl: serviceEndpoint, handle };
}
515
+
516
+
/**
 * Fetches the `app.bsky.actor.profile` record (`rkey=self`) of a repo from its PDS.
 *
 * This is best-effort: any failure is logged and reported as `undefined`
 * rather than thrown, so a missing profile never fails the surrounding request.
 *
 * @param pdsUrl - Base URL of the account's PDS; a trailing slash is tolerated.
 * @param did - DID of the repo whose profile is requested.
 * @returns The record's `value` object, or `undefined` when the request
 *   fails, the PDS answers with a non-2xx status, or no object is returned.
 */
async function getProfileRecord(
  pdsUrl: string,
  did: string
): Promise<Record<string, unknown> | undefined> {
  try {
    // URLSearchParams percent-encodes the values, so a DID containing
    // reserved characters cannot alter the query string.
    const query = new URLSearchParams({
      repo: did,
      collection: "app.bsky.actor.profile",
      rkey: "self",
    });
    const base = pdsUrl.replace(/\/+$/, "");
    const profileUrl = `${base}/xrpc/com.atproto.repo.getRecord?${query.toString()}`;

    const response = await fetch(profileUrl);
    if (!response.ok) {
      console.warn(
        `Failed to fetch profile for ${did} from ${pdsUrl}: ${response.status}`
      );
      return undefined;
    }

    const data: unknown = await response.json();
    if (typeof data !== "object" || data === null || !("value" in data)) {
      return undefined;
    }
    const value = (data as { value: unknown }).value;
    if (typeof value !== "object" || value === null) {
      return undefined;
    }
    return value as Record<string, unknown>;
  } catch (error) {
    console.error(`Error fetching profile record for ${did}:`, error);
    return undefined;
  }
}