+1
.gitignore
+1
.gitignore
···
1
+
deno.lock
+51
config.json
+51
config.json
···
1
+
{
2
+
"jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe",
3
+
"es_url": "http://localhost:9200",
4
+
"record_types": ["xyz.statusphere.status", "com.example.test.esav"],
5
+
"index_name": "appview-poc",
6
+
"serve_port": "8370",
7
+
"index_fields": {
8
+
"xyz.statusphere.status": {
9
+
"status": {
10
+
"id": "status",
11
+
"type": "text"
12
+
}
13
+
},
14
+
"com.example.test.esav": {
15
+
"text": {
16
+
"id": "text",
17
+
"type": "text"
18
+
},
19
+
"reply.parent.uri": {
20
+
"id": "reply.parent.uri",
21
+
"type": "keyword"
22
+
},
23
+
"reply.root.uri": {
24
+
"id": "reply.root.uri",
25
+
"type": "keyword"
26
+
}
27
+
},
28
+
"app.bsky.feed.post": {
29
+
"text": {
30
+
"id": "text",
31
+
"type": "text"
32
+
},
33
+
"reply.parent.uri": {
34
+
"id": "reply.parent.uri",
35
+
"type": "keyword"
36
+
},
37
+
"reply.root.uri": {
38
+
"id": "reply.root.uri",
39
+
"type": "keyword"
40
+
}
41
+
},
42
+
"app.bsky.feed.like": {
43
+
"subject.uri":{
44
+
"id": "subject.uri",
45
+
"type": "keyword"
46
+
}
47
+
}
48
+
}
49
+
}
50
+
51
+
+10
readme.md
+10
readme.md
···
1
+
# Elastic Search AppView
2
+
terrible i know right lmao
3
+
4
+
send ES queries to `/xrpc/com.example.prototypeESQuery` either POST or GET q=
5
+
6
+
change config in `config.json`
7
+
8
+
run `deno task dev` to start
9
+
10
+
you need elasticsearch configured and running first though, good luck with that
+22
src/config.ts
+22
src/config.ts
···
1
+
export interface AppConfig {
2
+
jetstream_url: string;
3
+
es_url: string;
4
+
record_types: string[];
5
+
index_name: string;
6
+
serve_port: string;
7
+
index_fields: Record<
8
+
string,
9
+
Record<
10
+
string,
11
+
{
12
+
id: string;
13
+
type: "text" | "keyword";
14
+
}
15
+
>
16
+
>;
17
+
}
18
+
19
+
export async function readConfig(path: string): Promise<AppConfig> {
20
+
const text = await Deno.readTextFile(path);
21
+
return JSON.parse(text);
22
+
}
+37
src/firehose.ts
+37
src/firehose.ts
···
1
+
import { AppConfig } from "./config.ts";
2
+
import { indexDocument, deleteDocument } from "./indexer.ts";
3
+
4
+
export async function startFirehose(config: AppConfig) {
5
+
const ws = new WebSocket(config.jetstream_url);
6
+
7
+
ws.onopen = () => {
8
+
console.log("Connected to Jetstream");
9
+
};
10
+
11
+
ws.onmessage = async (msg) => {
12
+
const data = msg.data instanceof Blob ? await msg.data.text() : msg.data;
13
+
const evt = JSON.parse(data);
14
+
15
+
if (!evt?.commit) return;
16
+
17
+
const opType = evt.commit.operation;
18
+
const cid = evt.commit.cid;
19
+
const collection = evt.commit.collection;
20
+
const rkey = evt.commit.rkey;
21
+
const aturi = `at://${evt.did}/${collection}/${rkey}`;
22
+
const record = evt.commit.record;
23
+
const indexedAt = new Date(evt.time_us / 1000).toISOString();
24
+
25
+
if (!config.record_types.includes(collection)) return;
26
+
27
+
if (opType === "create" || opType === "edit") {
28
+
await indexDocument(config, aturi, record, cid, indexedAt);
29
+
} else if (opType === "delete") {
30
+
await deleteDocument(config, aturi);
31
+
}
32
+
};
33
+
34
+
ws.onerror = (e) => {
35
+
console.error("Jetstream error:", e);
36
+
};
37
+
}
+123
src/indexer.ts
+123
src/indexer.ts
···
1
+
import { AppConfig } from "./config.ts";
2
+
3
+
function atUriParts(uri: string) {
4
+
const parts = uri.split("/");
5
+
return {
6
+
did: parts[2],
7
+
collection: parts[3],
8
+
rkey: parts[4],
9
+
};
10
+
}
11
+
12
+
export async function ensureIndexMapping(config: AppConfig) {
13
+
const properties: Record<string, any> = {};
14
+
15
+
for (const fields of Object.values(config.index_fields)) {
16
+
for (const { id, type } of Object.values(fields)) {
17
+
properties[id] = {
18
+
type: type === "text" ? "text" : "keyword"
19
+
};
20
+
}
21
+
}
22
+
23
+
for (const key of [
24
+
"$metadata.uri",
25
+
"$metadata.cid",
26
+
"$metadata.indexedAt",
27
+
"$metadata.did",
28
+
"$metadata.collection",
29
+
"$metadata.rkey",
30
+
]) {
31
+
properties[key] = { type: "keyword" };
32
+
}
33
+
34
+
await fetch(`${config.es_url}/${config.index_name}`, {
35
+
method: "PUT",
36
+
headers: { "Content-Type": "application/json" },
37
+
body: JSON.stringify({
38
+
mappings: { properties }
39
+
}),
40
+
});
41
+
}
42
+
43
+
function extractIndexedFields(
44
+
record: any,
45
+
indexSpec: Record<string, { id: string; type: string }>
46
+
): Record<string, unknown> {
47
+
const result: Record<string, unknown> = {};
48
+
49
+
for (const [jsonPath, { id }] of Object.entries(indexSpec)) {
50
+
const pathSegments = jsonPath.split(".");
51
+
let value = record;
52
+
for (const segment of pathSegments) {
53
+
if (typeof value !== "object" || value === null) {
54
+
value = undefined;
55
+
break;
56
+
}
57
+
value = value[segment];
58
+
}
59
+
60
+
if (value !== undefined) {
61
+
result[id] = value;
62
+
}
63
+
}
64
+
65
+
return result;
66
+
}
67
+
68
+
export async function indexDocument(
69
+
config: AppConfig,
70
+
uri: string,
71
+
record: any,
72
+
cid: string,
73
+
indexedAt: string
74
+
) {
75
+
const { did, collection, rkey } = atUriParts(uri);
76
+
const recordType = collection;
77
+
78
+
const indexSpec = config.index_fields?.[recordType];
79
+
if (!indexSpec) return; // skip if record type not configured
80
+
81
+
const indexedFields = extractIndexedFields(record, indexSpec);
82
+
83
+
const metadata = {
84
+
"$metadata.uri": uri,
85
+
"$metadata.cid": cid,
86
+
"$metadata.indexedAt": indexedAt,
87
+
"$metadata.did": did,
88
+
"$metadata.collection": collection,
89
+
"$metadata.rkey": rkey,
90
+
};
91
+
92
+
const body = {
93
+
...metadata,
94
+
...indexedFields,
95
+
"$raw": record,
96
+
};
97
+
98
+
const res = await fetch(
99
+
`${config.es_url}/${config.index_name}/_doc/${encodeURIComponent(uri)}`,
100
+
{
101
+
method: "PUT",
102
+
headers: { "Content-Type": "application/json" },
103
+
body: JSON.stringify(body),
104
+
}
105
+
);
106
+
107
+
if (!res.ok) {
108
+
console.error("Indexing failed:", await res.text());
109
+
}
110
+
}
111
+
112
+
export async function deleteDocument(config: AppConfig, uri: string) {
113
+
const res = await fetch(
114
+
`${config.es_url}/${config.index_name}/_doc/${encodeURIComponent(uri)}`,
115
+
{
116
+
method: "DELETE",
117
+
}
118
+
);
119
+
120
+
if (!res.ok && res.status !== 404) {
121
+
console.error("Delete failed:", await res.text());
122
+
}
123
+
}
+13
src/main.ts
+13
src/main.ts
···
1
+
import { readConfig } from "./config.ts";
2
+
import { startFirehose } from "./firehose.ts";
3
+
import { ensureIndexMapping } from "./indexer.ts";
4
+
import { setupXRPCServer } from "./xrpc.ts";
5
+
6
+
const config = await readConfig("./config.json");
7
+
8
+
// prepare indexes
9
+
ensureIndexMapping(config);
10
+
11
+
startFirehose(config);
12
+
13
+
setupXRPCServer(config);
+49
src/xrpc.ts
+49
src/xrpc.ts
···
1
+
import { AppConfig } from "../src/config.ts";
2
+
import { serve } from "https://deno.land/std/http/server.ts";
3
+
4
+
export function setupXRPCServer(config: AppConfig) {
5
+
serve(async (req) => {
6
+
const url = new URL(req.url);
7
+
8
+
if (url.pathname === "/xrpc/com.example.prototypeESQuery") {
9
+
let esQuery: any;
10
+
11
+
if (req.method === "POST") {
12
+
try {
13
+
esQuery = await req.json();
14
+
} catch {
15
+
return new Response("Invalid JSON body", { status: 400 });
16
+
}
17
+
} else if (req.method === "GET") {
18
+
const queryParam = url.searchParams.get("q");
19
+
if (!queryParam) {
20
+
return new Response("Missing 'q'", { status: 400 });
21
+
}
22
+
23
+
try {
24
+
esQuery = JSON.parse(queryParam);
25
+
} catch {
26
+
return new Response("Invalid JSON in 'q'", { status: 400 });
27
+
}
28
+
} else {
29
+
return new Response("Method not allowed", { status: 405 });
30
+
}
31
+
32
+
const esRes = await fetch(`${config.es_url}/${config.index_name}/_search`, {
33
+
method: "POST",
34
+
headers: { "Content-Type": "application/json" },
35
+
body: JSON.stringify(esQuery),
36
+
});
37
+
38
+
if (!esRes.ok) {
39
+
const errText = await esRes.text();
40
+
return new Response(`Elasticsearch error: ${errText}`, { status: 500 });
41
+
}
42
+
43
+
const result = await esRes.json();
44
+
return Response.json(result.hits.hits.map((hit: any) => hit._source));
45
+
}
46
+
47
+
return new Response("Not found", { status: 404 });
48
+
}, { port: Number(config.serve_port) });
49
+
}