+56
-2
config.json
+56
-2
config.json
···
1
1
{
2
2
"jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe",
3
3
"es_url": "http://localhost:9200",
4
-
"record_types": ["xyz.statusphere.status", "com.example.test.esav"],
4
+
"record_types": [
5
+
"xyz.statusphere.status",
6
+
"com.example.test.esav",
7
+
"party.whey.ft.topic.post",
8
+
"party.whey.ft.topic.reaction",
9
+
"party.whey.ft.topic.moderation",
10
+
"party.whey.ft.forum.definition",
11
+
"party.whey.ft.forum.layout",
12
+
"party.whey.ft.forum.request",
13
+
"party.whey.ft.forum.accept",
14
+
"party.whey.ft.forum.category",
15
+
"party.whey.ft.user.profile"
16
+
],
5
17
"index_name": "appview-poc",
6
18
"serve_port": "8370",
7
19
"index_fields": {
20
+
"party.whey.ft.topic.reaction": {
21
+
"subject": {
22
+
"id": "reactionSubject",
23
+
"type": "keyword"
24
+
},
25
+
"reactionEmoji": {
26
+
"id": "reactionEmoji",
27
+
"type": "keyword"
28
+
}
29
+
},
30
+
"party.whey.ft.topic.post": {
31
+
"text": {
32
+
"id": "text",
33
+
"type": "text"
34
+
},
35
+
"title": {
36
+
"id": "title",
37
+
"type": "text"
38
+
},
39
+
"reply.root.uri": {
40
+
"id": "root",
41
+
"type": "keyword"
42
+
},
43
+
"reply.parent.uri": {
44
+
"id": "parent",
45
+
"type": "keyword"
46
+
},
47
+
"forum": {
48
+
"id": "forum",
49
+
"type": "keyword"
50
+
}
51
+
},
52
+
"party.whey.ft.forum.definition": {
53
+
"description": {
54
+
"id": "description",
55
+
"type": "text"
56
+
},
57
+
"displayName": {
58
+
"id": "displayName",
59
+
"type": "text"
60
+
}
61
+
},
8
62
"xyz.statusphere.status": {
9
63
"status": {
10
64
"id": "status",
11
-
"type": "text"
65
+
"type": "keyword"
12
66
}
13
67
},
14
68
"com.example.test.esav": {
+143
-6
readme.md
+143
-6
readme.md
···
1
1
# ESAV: ElasticSearch AppView
2
-
terrible i know right lmao
2
+
Jetstream plugged into ElasticSearch with a queryable api
3
+
(also now supports live queries)
4
+
5
+
## Queries
6
+
### the boring one
7
+
send ES queries to `/xrpc/party.whey.esav.esQuery` either POST or GET q=
8
+
9
+
the format is just a normal elasticsearch query, any of them should work
10
+
11
+
### the cooler Live one (sync over websocket)
12
+
ESAV Live is a real-time indexing and query service ESAV.
13
+
14
+
the websocket provides a live stream of new document at:// URIs that match a specific query
15
+
16
+
send Live queries to `wss://{your domain}/xrpc/party.whey.esav.esSync`
17
+
18
+
once a connection is open, the client must send a subscription request (the live query itelf). this message is a JSON object containing the query that defines the feed you are interested in. the query format is basically a dumbed down Elasticsearch Query DSL (stripped to the few stuff thats actually implemented, and thatll produce a deterministic order / result)
19
+
20
+
for example, to subscribe to all statuses from the `xyz.statusphere.status` collection, send the JSON request bleow:
21
+
22
+
```json
23
+
{
24
+
"type": "subscribe",
25
+
"queryId": "statusphere-main-page",
26
+
"esquery": {
27
+
"query": {
28
+
"term": {
29
+
"$metadata.collection": "xyz.statusphere.status"
30
+
}
31
+
},
32
+
"sort": [
33
+
{
34
+
"$metadata.indexedAt": "desc"
35
+
}
36
+
],
37
+
"size": 100
38
+
}
39
+
}
40
+
```
41
+
42
+
43
+
and then server will send back JSON messages as new documents matching your query are indexed. the primary message type youll get is a `query-delta` which will contain an array of new document URIs for each active query, and also the set of new or modified documents to store.
44
+
45
+
example server message:
46
+
47
+
```json
48
+
{
49
+
"type": "query-delta",
50
+
"documents": {
51
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n": {
52
+
"cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy",
53
+
"doc": {
54
+
"$metadata.uri": "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n",
55
+
"$metadata.cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy",
56
+
"$metadata.indexedAt": "2025-08-07T12:40:09.426Z",
57
+
"$metadata.did": "did:plc:mn45tewwnse5btfftvd3powc",
58
+
"$metadata.collection": "xyz.statusphere.status",
59
+
"$metadata.rkey": "3lvsr2zqf3k2n",
60
+
"status": "๐",
61
+
"$raw": {
62
+
"$type": "xyz.statusphere.status",
63
+
"createdAt": "2025-08-07T12:40:08.602Z",
64
+
"status": "๐"
65
+
}
66
+
}
67
+
}
68
+
},
69
+
"queries": {
70
+
"statusphere-main-page": {
71
+
"ecid": "bafyreidsxtbtnwce2o72wrwtwhcvc7olosdtvb2i4g6ezbs4kaxn3v3qna",
72
+
"result": [
73
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n",
74
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsov4tatf2n",
75
+
"at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsouyvrbr2t"
76
+
]
77
+
}
78
+
}
79
+
}
80
+
```
81
+
82
+
in here, only one document is given because this is not the initial response after a registered live query. or alternatively, a live query was registered using a still-valid "ecid" value (the window is like 5 minutes so its really only for dropped connections).
83
+
84
+
your application should listen for these messages and prepend the new URIs to its list of statuses, creating the real-time effect
85
+
86
+
### helper functions
87
+
88
+
because we need to sometimes resolve their did from handle, or handle from did
89
+
90
+
and also because despite your usage of a custom lexicon, we still need to fetch app.bsky.actor.profile for their pfp right
91
+
92
+
fetch `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity`
93
+
94
+
with these params:
95
+
- `did`: The DID of the user.
96
+
- `handle`: The handle of the user.
97
+
- `includeBskyProfile` (optional): `true` to include the the entire bsky profile object as well.
3
98
4
-
send ES queries to `/xrpc/com.example.prototypeESQuery` either POST or GET q=
99
+
**Example Request:**
5
100
6
-
change config in `config.json`
101
+
`https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity?did=did:plc:cjfima2v3vnyfuzieu7bvjx7&includeBskyProfile=true`
7
102
8
-
run `deno task dev` to start
103
+
gets me
9
104
10
-
you need elasticsearch configured and running first though, good luck with that
105
+
**Example Response (`200 OK`):**
11
106
12
-
## ForumTest, built with ESAV
107
+
```json
108
+
{
109
+
"did":"did:plc:cjfima2v3vnyfuzieu7bvjx7",
110
+
"pdsUrl":"https://pds-nd.whey.party",
111
+
"handle":"forumtest.whey.party",
112
+
"profile": {
113
+
"$type": "app.bsky.actor.profile",
114
+
"avatar": {
115
+
"$type": "blob",
116
+
"ref": {
117
+
"$link": "bafkreiabb6nrbpgguh5xaake3shoyxh2i7lyj7nj7j3ejk77hdlvt4lhmu"
118
+
},
119
+
"mimeType": "image/png",
120
+
"size": 35262
121
+
},
122
+
"banner": {
123
+
"$type": "blob",
124
+
"ref": {
125
+
"$link": "bafkreieupktgazj4vxuxbj6dyf4trvltgok5ukdsnvvblsqqw26cmsvn7y"
126
+
},
127
+
"mimeType": "image/jpeg",
128
+
"size": 931415
129
+
},
130
+
"createdAt": "2025-08-04T06:27:34.561Z",
131
+
"description": "ForumTest discussion and development",
132
+
"displayName": "ForumTest"
133
+
}
134
+
}
135
+
```
136
+
137
+
> **Note on Performance**: This endpoint is called frequently. It is highly recommended to implement a client-side cache (e.g., a simple JavaScript `Map` or object) to store profile data keyed by DID. This will prevent your application from making redundant network requests for the same user profile, significantly improving performance.
138
+
139
+
140
+
## stuff built with ESAV
141
+
142
+
### Statusphere ESAV Live
143
+
an example usage of ESAV Live with the "hello world!" of atproto
144
+
145
+
check it out here -> [https://statusphere.whey.party/](https://statusphere.whey.party/)
146
+
repo: [https://tangled.sh/@whey.party/statusphere-esav-live](https://tangled.sh/@whey.party/statusphere-esav-live)
147
+
148
+
### ForumTest
149
+
atproto forum thing test
13
150
14
151
check it out here -> [https://forumtest.whey.party](https://forumtest.whey.party)
15
152
repo: [https://tangled.sh/@whey.party/forumtest](https://tangled.sh/@whey.party/forumtest)
+6
-1
src/firehose.ts
+6
-1
src/firehose.ts
···
4
4
interface FirehoseOptions {
5
5
config: AppConfig;
6
6
onEvent: OnEventCallback;
7
+
forcedCursor?: string;
7
8
}
8
9
9
-
export function startFirehose({ config, onEvent }: FirehoseOptions) {
10
+
export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) {
10
11
let lastCursor: number | null = null;
11
12
12
13
const connect = () => {
13
14
const url = new URL(config.jetstream_url);
14
15
if (lastCursor !== null) {
15
16
url.searchParams.set("cursor", lastCursor.toString());
17
+
}
18
+
if (forcedCursor) {
19
+
console.log("using forced cursor: ", forcedCursor)
20
+
url.searchParams.set("cursor", forcedCursor);
16
21
}
17
22
18
23
const ws = new WebSocket(url.toString());
+14
src/live-utils.ts
+14
src/live-utils.ts
···
50
50
break;
51
51
}
52
52
53
+
case 'terms': {
54
+
const [field, queryValues] = Object.entries(clauseValue as object)[0];
55
+
if (!Array.isArray(queryValues)) return false;
56
+
57
+
const docValue = getSafeField(doc, field);
58
+
if (docValue === undefined) return false;
59
+
60
+
if (Array.isArray(docValue)) {
61
+
return docValue.some(v => queryValues.includes(v));
62
+
} else {
63
+
return queryValues.includes(docValue);
64
+
}
65
+
}
66
+
53
67
case 'match': {
54
68
const [field, value] = Object.entries(clauseValue as object)[0];
55
69
const fieldValue = getSafeField(doc, field);
+5
-1
src/main.ts
+5
-1
src/main.ts
···
3
3
import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts";
4
4
import { setupXRPCServer } from "./xrpc.ts";
5
5
import { processEventForSync } from "./sync.ts";
6
+
import { parseArgs } from "jsr:@std/cli";
6
7
7
8
async function main() {
8
9
const config = await readConfig("./config.json");
10
+
11
+
const args = parseArgs(Deno.args)
9
12
10
13
// prepare indexes
11
14
await ensureIndexMapping(config);
···
17
20
onEvent: (event: IndexerEvent) => {
18
21
// ESAV Live !!!
19
22
return processEventForSync(event);
20
-
}
23
+
},
24
+
forcedCursor: args["force-cursor"]
21
25
});
22
26
23
27
console.log("Server started and listening for events");
+8
-8
src/xrpc.ts
+8
-8
src/xrpc.ts
···
68
68
const handle = searchParams.get("handle");
69
69
const did = searchParams.get("did");
70
70
const clientCid = searchParams.get("cid");
71
-
const includePfp = searchParams.get("includePfp");
71
+
const includeBskyProfile = searchParams.get("includeBskyProfile");
72
72
73
73
if (!handle && !did) {
74
74
return new Response("handle or did parameter is required", {
···
86
86
...identity,
87
87
};
88
88
89
-
if (includePfp) {
90
-
finalResponse.pfp = await getPfpUrl(identity.pdsUrl, identity.did);
89
+
if (includeBskyProfile) {
90
+
finalResponse.profile = await getProfileRecord(identity.pdsUrl, identity.did);
91
91
}
92
92
93
93
const newCid = await computeCid(finalResponse);
···
513
513
}
514
514
}
515
515
516
-
async function getPfpUrl(
516
+
async function getProfileRecord(
517
517
pdsUrl: string,
518
518
did: string
519
519
): Promise<string | undefined> {
···
527
527
return undefined;
528
528
}
529
529
const data = await response.json();
530
-
const cid = data?.value?.avatar?.ref?.["$link"];
531
-
const pfpurl = `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
532
-
return pfpurl;
530
+
//const cid = data?.value?.avatar?.ref?.["$link"];
531
+
//const pfpurl = `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
532
+
return data.value;
533
533
} catch (error) {
534
-
console.error(`Error fetching PFP for ${did}:`, error);
534
+
console.error(`Error fetching profile record for ${did}:`, error);
535
535
return undefined;
536
536
}
537
537
}