elasticsearch-based configurable generic appview for prototyping ideas

Compare changes

Choose any two refs to compare.

+56 -2
config.json
··· 1 { 2 "jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe", 3 "es_url": "http://localhost:9200", 4 - "record_types": ["xyz.statusphere.status", "com.example.test.esav"], 5 "index_name": "appview-poc", 6 "serve_port": "8370", 7 "index_fields": { 8 "xyz.statusphere.status": { 9 "status": { 10 "id": "status", 11 - "type": "text" 12 } 13 }, 14 "com.example.test.esav": {
··· 1 { 2 "jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe", 3 "es_url": "http://localhost:9200", 4 + "record_types": [ 5 + "xyz.statusphere.status", 6 + "com.example.test.esav", 7 + "party.whey.ft.topic.post", 8 + "party.whey.ft.topic.reaction", 9 + "party.whey.ft.topic.moderation", 10 + "party.whey.ft.forum.definition", 11 + "party.whey.ft.forum.layout", 12 + "party.whey.ft.forum.request", 13 + "party.whey.ft.forum.accept", 14 + "party.whey.ft.forum.category", 15 + "party.whey.ft.user.profile" 16 + ], 17 "index_name": "appview-poc", 18 "serve_port": "8370", 19 "index_fields": { 20 + "party.whey.ft.topic.reaction": { 21 + "subject": { 22 + "id": "reactionSubject", 23 + "type": "keyword" 24 + }, 25 + "reactionEmoji": { 26 + "id": "reactionEmoji", 27 + "type": "keyword" 28 + } 29 + }, 30 + "party.whey.ft.topic.post": { 31 + "text": { 32 + "id": "text", 33 + "type": "text" 34 + }, 35 + "title": { 36 + "id": "title", 37 + "type": "text" 38 + }, 39 + "reply.root.uri": { 40 + "id": "root", 41 + "type": "keyword" 42 + }, 43 + "reply.parent.uri": { 44 + "id": "parent", 45 + "type": "keyword" 46 + }, 47 + "forum": { 48 + "id": "forum", 49 + "type": "keyword" 50 + } 51 + }, 52 + "party.whey.ft.forum.definition": { 53 + "description": { 54 + "id": "description", 55 + "type": "text" 56 + }, 57 + "displayName": { 58 + "id": "displayName", 59 + "type": "text" 60 + } 61 + }, 62 "xyz.statusphere.status": { 63 "status": { 64 "id": "status", 65 + "type": "keyword" 66 } 67 }, 68 "com.example.test.esav": {
+143 -6
readme.md
··· 1 # ESAV: ElasticSearch AppView 2 - terrible i know right lmao 3 4 - send ES queries to `/xrpc/com.example.prototypeESQuery` either POST or GET q= 5 6 - change config in `config.json` 7 8 - run `deno task dev` to start 9 10 - you need elasticsearch configured and running first though, good luck with that 11 12 - ## ForumTest, built with ESAV 13 14 check it out here -> [https://forumtest.whey.party](https://forumtest.whey.party) 15 repo: [https://tangled.sh/@whey.party/forumtest](https://tangled.sh/@whey.party/forumtest)
··· 1 # ESAV: ElasticSearch AppView 2 + Jetstream plugged into ElasticSearch with a queryable api 3 + (also now supports live queries) 4 + 5 + ## Queries 6 + ### the boring one 7 + send ES queries to `/xrpc/party.whey.esav.esQuery` either POST or GET q= 8 + 9 + the format is just a normal elasticsearch query, any of them should work 10 + 11 + ### the cooler Live one (sync over websocket) 12 + ESAV Live is a real-time indexing and query service ESAV. 13 + 14 + the websocket provides a live stream of new document at:// URIs that match a specific query 15 + 16 + send Live queries to `wss://{your domain}/xrpc/party.whey.esav.esSync` 17 + 18 + once a connection is open, the client must send a subscription request (the live query itelf). this message is a JSON object containing the query that defines the feed you are interested in. the query format is basically a dumbed down Elasticsearch Query DSL (stripped to the few stuff thats actually implemented, and thatll produce a deterministic order / result) 19 + 20 + for example, to subscribe to all statuses from the `xyz.statusphere.status` collection, send the JSON request bleow: 21 + 22 + ```json 23 + { 24 + "type": "subscribe", 25 + "queryId": "statusphere-main-page", 26 + "esquery": { 27 + "query": { 28 + "term": { 29 + "$metadata.collection": "xyz.statusphere.status" 30 + } 31 + }, 32 + "sort": [ 33 + { 34 + "$metadata.indexedAt": "desc" 35 + } 36 + ], 37 + "size": 100 38 + } 39 + } 40 + ``` 41 + 42 + 43 + and then server will send back JSON messages as new documents matching your query are indexed. the primary message type youll get is a `query-delta` which will contain an array of new document URIs for each active query, and also the set of new or modified documents to store. 44 + 45 + example server message: 46 + 47 + ```json 48 + { 49 + "type": "query-delta", 50 + "documents": { 51 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n": { 52 + "cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy", 53 + "doc": { 54 + "$metadata.uri": "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n", 55 + "$metadata.cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy", 56 + "$metadata.indexedAt": "2025-08-07T12:40:09.426Z", 57 + "$metadata.did": "did:plc:mn45tewwnse5btfftvd3powc", 58 + "$metadata.collection": "xyz.statusphere.status", 59 + "$metadata.rkey": "3lvsr2zqf3k2n", 60 + "status": "๐Ÿ‘", 61 + "$raw": { 62 + "$type": "xyz.statusphere.status", 63 + "createdAt": "2025-08-07T12:40:08.602Z", 64 + "status": "๐Ÿ‘" 65 + } 66 + } 67 + } 68 + }, 69 + "queries": { 70 + "statusphere-main-page": { 71 + "ecid": "bafyreidsxtbtnwce2o72wrwtwhcvc7olosdtvb2i4g6ezbs4kaxn3v3qna", 72 + "result": [ 73 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n", 74 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsov4tatf2n", 75 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsouyvrbr2t" 76 + ] 77 + } 78 + } 79 + } 80 + ``` 81 + 82 + in here, only one document is given because this is not the initial response after a registered live query. or alternatively, a live query was registered using a still-valid "ecid" value (the window is like 5 minutes so its really only for dropped connections). 83 + 84 + your application should listen for these messages and prepend the new URIs to its list of statuses, creating the real-time effect 85 + 86 + ### helper functions 87 + 88 + because we need to sometimes resolve their did from handle, or handle from did 89 + 90 + and also because despite your usage of a custom lexicon, we still need to fetch app.bsky.actor.profile for their pfp right 91 + 92 + fetch `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity` 93 + 94 + with these params: 95 + - `did`: The DID of the user. 96 + - `handle`: The handle of the user. 97 + - `includeBskyProfile` (optional): `true` to include the the entire bsky profile object as well. 98 99 + **Example Request:** 100 101 + `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity?did=did:plc:cjfima2v3vnyfuzieu7bvjx7&includeBskyProfile=true` 102 103 + gets me 104 105 + **Example Response (`200 OK`):** 106 107 + ```json 108 + { 109 + "did":"did:plc:cjfima2v3vnyfuzieu7bvjx7", 110 + "pdsUrl":"https://pds-nd.whey.party", 111 + "handle":"forumtest.whey.party", 112 + "profile": { 113 + "$type": "app.bsky.actor.profile", 114 + "avatar": { 115 + "$type": "blob", 116 + "ref": { 117 + "$link": "bafkreiabb6nrbpgguh5xaake3shoyxh2i7lyj7nj7j3ejk77hdlvt4lhmu" 118 + }, 119 + "mimeType": "image/png", 120 + "size": 35262 121 + }, 122 + "banner": { 123 + "$type": "blob", 124 + "ref": { 125 + "$link": "bafkreieupktgazj4vxuxbj6dyf4trvltgok5ukdsnvvblsqqw26cmsvn7y" 126 + }, 127 + "mimeType": "image/jpeg", 128 + "size": 931415 129 + }, 130 + "createdAt": "2025-08-04T06:27:34.561Z", 131 + "description": "ForumTest discussion and development", 132 + "displayName": "ForumTest" 133 + } 134 + } 135 + ``` 136 + 137 + > **Note on Performance**: This endpoint is called frequently. It is highly recommended to implement a client-side cache (e.g., a simple JavaScript `Map` or object) to store profile data keyed by DID. This will prevent your application from making redundant network requests for the same user profile, significantly improving performance. 138 + 139 + 140 + ## stuff built with ESAV 141 + 142 + ### Statusphere ESAV Live 143 + an example usage of ESAV Live with the "hello world!" of atproto 144 + 145 + check it out here -> [https://statusphere.whey.party/](https://statusphere.whey.party/) 146 + repo: [https://tangled.sh/@whey.party/statusphere-esav-live](https://tangled.sh/@whey.party/statusphere-esav-live) 147 + 148 + ### ForumTest 149 + atproto forum thing test 150 151 check it out here -> [https://forumtest.whey.party](https://forumtest.whey.party) 152 repo: [https://tangled.sh/@whey.party/forumtest](https://tangled.sh/@whey.party/forumtest)
+6 -1
src/firehose.ts
··· 4 interface FirehoseOptions { 5 config: AppConfig; 6 onEvent: OnEventCallback; 7 } 8 9 - export function startFirehose({ config, onEvent }: FirehoseOptions) { 10 let lastCursor: number | null = null; 11 12 const connect = () => { 13 const url = new URL(config.jetstream_url); 14 if (lastCursor !== null) { 15 url.searchParams.set("cursor", lastCursor.toString()); 16 } 17 18 const ws = new WebSocket(url.toString());
··· 4 interface FirehoseOptions { 5 config: AppConfig; 6 onEvent: OnEventCallback; 7 + forcedCursor?: string; 8 } 9 10 + export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) { 11 let lastCursor: number | null = null; 12 13 const connect = () => { 14 const url = new URL(config.jetstream_url); 15 if (lastCursor !== null) { 16 url.searchParams.set("cursor", lastCursor.toString()); 17 + } 18 + if (forcedCursor) { 19 + console.log("using forced cursor: ", forcedCursor) 20 + url.searchParams.set("cursor", forcedCursor); 21 } 22 23 const ws = new WebSocket(url.toString());
+14
src/live-utils.ts
··· 50 break; 51 } 52 53 case 'match': { 54 const [field, value] = Object.entries(clauseValue as object)[0]; 55 const fieldValue = getSafeField(doc, field);
··· 50 break; 51 } 52 53 + case 'terms': { 54 + const [field, queryValues] = Object.entries(clauseValue as object)[0]; 55 + if (!Array.isArray(queryValues)) return false; 56 + 57 + const docValue = getSafeField(doc, field); 58 + if (docValue === undefined) return false; 59 + 60 + if (Array.isArray(docValue)) { 61 + return docValue.some(v => queryValues.includes(v)); 62 + } else { 63 + return queryValues.includes(docValue); 64 + } 65 + } 66 + 67 case 'match': { 68 const [field, value] = Object.entries(clauseValue as object)[0]; 69 const fieldValue = getSafeField(doc, field);
+5 -1
src/main.ts
··· 3 import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts"; 4 import { setupXRPCServer } from "./xrpc.ts"; 5 import { processEventForSync } from "./sync.ts"; 6 7 async function main() { 8 const config = await readConfig("./config.json"); 9 10 // prepare indexes 11 await ensureIndexMapping(config); ··· 17 onEvent: (event: IndexerEvent) => { 18 // ESAV Live !!! 19 return processEventForSync(event); 20 - } 21 }); 22 23 console.log("Server started and listening for events");
··· 3 import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts"; 4 import { setupXRPCServer } from "./xrpc.ts"; 5 import { processEventForSync } from "./sync.ts"; 6 + import { parseArgs } from "jsr:@std/cli"; 7 8 async function main() { 9 const config = await readConfig("./config.json"); 10 + 11 + const args = parseArgs(Deno.args) 12 13 // prepare indexes 14 await ensureIndexMapping(config); ··· 20 onEvent: (event: IndexerEvent) => { 21 // ESAV Live !!! 22 return processEventForSync(event); 23 + }, 24 + forcedCursor: args["force-cursor"] 25 }); 26 27 console.log("Server started and listening for events");
+8 -8
src/xrpc.ts
··· 68 const handle = searchParams.get("handle"); 69 const did = searchParams.get("did"); 70 const clientCid = searchParams.get("cid"); 71 - const includePfp = searchParams.get("includePfp"); 72 73 if (!handle && !did) { 74 return new Response("handle or did parameter is required", { ··· 86 ...identity, 87 }; 88 89 - if (includePfp) { 90 - finalResponse.pfp = await getPfpUrl(identity.pdsUrl, identity.did); 91 } 92 93 const newCid = await computeCid(finalResponse); ··· 513 } 514 } 515 516 - async function getPfpUrl( 517 pdsUrl: string, 518 did: string 519 ): Promise<string | undefined> { ··· 527 return undefined; 528 } 529 const data = await response.json(); 530 - const cid = data?.value?.avatar?.ref?.["$link"]; 531 - const pfpurl = `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 532 - return pfpurl; 533 } catch (error) { 534 - console.error(`Error fetching PFP for ${did}:`, error); 535 return undefined; 536 } 537 }
··· 68 const handle = searchParams.get("handle"); 69 const did = searchParams.get("did"); 70 const clientCid = searchParams.get("cid"); 71 + const includeBskyProfile = searchParams.get("includeBskyProfile"); 72 73 if (!handle && !did) { 74 return new Response("handle or did parameter is required", { ··· 86 ...identity, 87 }; 88 89 + if (includeBskyProfile) { 90 + finalResponse.profile = await getProfileRecord(identity.pdsUrl, identity.did); 91 } 92 93 const newCid = await computeCid(finalResponse); ··· 513 } 514 } 515 516 + async function getProfileRecord( 517 pdsUrl: string, 518 did: string 519 ): Promise<string | undefined> { ··· 527 return undefined; 528 } 529 const data = await response.json(); 530 + //const cid = data?.value?.avatar?.ref?.["$link"]; 531 + //const pfpurl = `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 532 + return data.value; 533 } catch (error) { 534 + console.error(`Error fetching profile record for ${did}:`, error); 535 return undefined; 536 } 537 }