elasticsearch-based configurable generic appview for prototyping ideas

Compare changes

Choose any two refs to compare.

+1
.gitignore
··· 1 1 deno.lock 2 2 reindex.sh 3 + scripts/reindex.ts
+56 -2
config.json
··· 1 1 { 2 2 "jetstream_url": "wss://jetstream2.us-west.bsky.network/subscribe", 3 3 "es_url": "http://localhost:9200", 4 - "record_types": ["xyz.statusphere.status", "com.example.test.esav"], 4 + "record_types": [ 5 + "xyz.statusphere.status", 6 + "com.example.test.esav", 7 + "party.whey.ft.topic.post", 8 + "party.whey.ft.topic.reaction", 9 + "party.whey.ft.topic.moderation", 10 + "party.whey.ft.forum.definition", 11 + "party.whey.ft.forum.layout", 12 + "party.whey.ft.forum.request", 13 + "party.whey.ft.forum.accept", 14 + "party.whey.ft.forum.category", 15 + "party.whey.ft.user.profile" 16 + ], 5 17 "index_name": "appview-poc", 6 18 "serve_port": "8370", 7 19 "index_fields": { 20 + "party.whey.ft.topic.reaction": { 21 + "subject": { 22 + "id": "reactionSubject", 23 + "type": "keyword" 24 + }, 25 + "reactionEmoji": { 26 + "id": "reactionEmoji", 27 + "type": "keyword" 28 + } 29 + }, 30 + "party.whey.ft.topic.post": { 31 + "text": { 32 + "id": "text", 33 + "type": "text" 34 + }, 35 + "title": { 36 + "id": "title", 37 + "type": "text" 38 + }, 39 + "reply.root.uri": { 40 + "id": "root", 41 + "type": "keyword" 42 + }, 43 + "reply.parent.uri": { 44 + "id": "parent", 45 + "type": "keyword" 46 + }, 47 + "forum": { 48 + "id": "forum", 49 + "type": "keyword" 50 + } 51 + }, 52 + "party.whey.ft.forum.definition": { 53 + "description": { 54 + "id": "description", 55 + "type": "text" 56 + }, 57 + "displayName": { 58 + "id": "displayName", 59 + "type": "text" 60 + } 61 + }, 8 62 "xyz.statusphere.status": { 9 63 "status": { 10 64 "id": "status", 11 - "type": "text" 65 + "type": "keyword" 12 66 } 13 67 }, 14 68 "com.example.test.esav": {
+143 -6
readme.md
··· 1 1 # ESAV: ElasticSearch AppView 2 - terrible i know right lmao 2 + Jetstream plugged into ElasticSearch with a queryable api 3 + (also now supports live queries) 4 + 5 + ## Queries 6 + ### the boring one 7 + send ES queries to `/xrpc/party.whey.esav.esQuery` either POST or GET q= 8 + 9 + the format is just a normal elasticsearch query, any of them should work 10 + 11 + ### the cooler Live one (sync over websocket) 12 + ESAV Live is a real-time indexing and query service ESAV. 13 + 14 + the websocket provides a live stream of new document at:// URIs that match a specific query 15 + 16 + send Live queries to `wss://{your domain}/xrpc/party.whey.esav.esSync` 17 + 18 + once a connection is open, the client must send a subscription request (the live query itelf). this message is a JSON object containing the query that defines the feed you are interested in. the query format is basically a dumbed down Elasticsearch Query DSL (stripped to the few stuff thats actually implemented, and thatll produce a deterministic order / result) 19 + 20 + for example, to subscribe to all statuses from the `xyz.statusphere.status` collection, send the JSON request bleow: 21 + 22 + ```json 23 + { 24 + "type": "subscribe", 25 + "queryId": "statusphere-main-page", 26 + "esquery": { 27 + "query": { 28 + "term": { 29 + "$metadata.collection": "xyz.statusphere.status" 30 + } 31 + }, 32 + "sort": [ 33 + { 34 + "$metadata.indexedAt": "desc" 35 + } 36 + ], 37 + "size": 100 38 + } 39 + } 40 + ``` 41 + 42 + 43 + and then server will send back JSON messages as new documents matching your query are indexed. the primary message type youll get is a `query-delta` which will contain an array of new document URIs for each active query, and also the set of new or modified documents to store. 44 + 45 + example server message: 46 + 47 + ```json 48 + { 49 + "type": "query-delta", 50 + "documents": { 51 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n": { 52 + "cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy", 53 + "doc": { 54 + "$metadata.uri": "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n", 55 + "$metadata.cid": "bafyreidym6ad6k7grbz7nrqrnddht2rwxlnimbgbfddthrmftv7mbfk7gy", 56 + "$metadata.indexedAt": "2025-08-07T12:40:09.426Z", 57 + "$metadata.did": "did:plc:mn45tewwnse5btfftvd3powc", 58 + "$metadata.collection": "xyz.statusphere.status", 59 + "$metadata.rkey": "3lvsr2zqf3k2n", 60 + "status": "๐Ÿ‘", 61 + "$raw": { 62 + "$type": "xyz.statusphere.status", 63 + "createdAt": "2025-08-07T12:40:08.602Z", 64 + "status": "๐Ÿ‘" 65 + } 66 + } 67 + } 68 + }, 69 + "queries": { 70 + "statusphere-main-page": { 71 + "ecid": "bafyreidsxtbtnwce2o72wrwtwhcvc7olosdtvb2i4g6ezbs4kaxn3v3qna", 72 + "result": [ 73 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsr2zqf3k2n", 74 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsov4tatf2n", 75 + "at://did:plc:mn45tewwnse5btfftvd3powc/xyz.statusphere.status/3lvsouyvrbr2t" 76 + ] 77 + } 78 + } 79 + } 80 + ``` 81 + 82 + in here, only one document is given because this is not the initial response after a registered live query. or alternatively, a live query was registered using a still-valid "ecid" value (the window is like 5 minutes so its really only for dropped connections). 83 + 84 + your application should listen for these messages and prepend the new URIs to its list of statuses, creating the real-time effect 85 + 86 + ### helper functions 87 + 88 + because we need to sometimes resolve their did from handle, or handle from did 89 + 90 + and also because despite your usage of a custom lexicon, we still need to fetch app.bsky.actor.profile for their pfp right 91 + 92 + fetch `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity` 93 + 94 + with these params: 95 + - `did`: The DID of the user. 96 + - `handle`: The handle of the user. 97 + - `includeBskyProfile` (optional): `true` to include the the entire bsky profile object as well. 3 98 4 - send ES queries to `/xrpc/com.example.prototypeESQuery` either POST or GET q= 99 + **Example Request:** 5 100 6 - change config in `config.json` 101 + `https://esav.whey.party/xrpc/party.whey.esav.resolveIdentity?did=did:plc:cjfima2v3vnyfuzieu7bvjx7&includeBskyProfile=true` 7 102 8 - run `deno task dev` to start 103 + gets me 9 104 10 - you need elasticsearch configured and running first though, good luck with that 105 + **Example Response (`200 OK`):** 11 106 12 - ## ForumTest, built with ESAV 107 + ```json 108 + { 109 + "did":"did:plc:cjfima2v3vnyfuzieu7bvjx7", 110 + "pdsUrl":"https://pds-nd.whey.party", 111 + "handle":"forumtest.whey.party", 112 + "profile": { 113 + "$type": "app.bsky.actor.profile", 114 + "avatar": { 115 + "$type": "blob", 116 + "ref": { 117 + "$link": "bafkreiabb6nrbpgguh5xaake3shoyxh2i7lyj7nj7j3ejk77hdlvt4lhmu" 118 + }, 119 + "mimeType": "image/png", 120 + "size": 35262 121 + }, 122 + "banner": { 123 + "$type": "blob", 124 + "ref": { 125 + "$link": "bafkreieupktgazj4vxuxbj6dyf4trvltgok5ukdsnvvblsqqw26cmsvn7y" 126 + }, 127 + "mimeType": "image/jpeg", 128 + "size": 931415 129 + }, 130 + "createdAt": "2025-08-04T06:27:34.561Z", 131 + "description": "ForumTest discussion and development", 132 + "displayName": "ForumTest" 133 + } 134 + } 135 + ``` 136 + 137 + > **Note on Performance**: This endpoint is called frequently. It is highly recommended to implement a client-side cache (e.g., a simple JavaScript `Map` or object) to store profile data keyed by DID. This will prevent your application from making redundant network requests for the same user profile, significantly improving performance. 138 + 139 + 140 + ## stuff built with ESAV 141 + 142 + ### Statusphere ESAV Live 143 + an example usage of ESAV Live with the "hello world!" of atproto 144 + 145 + check it out here -> [https://statusphere.whey.party/](https://statusphere.whey.party/) 146 + repo: [https://tangled.sh/@whey.party/statusphere-esav-live](https://tangled.sh/@whey.party/statusphere-esav-live) 147 + 148 + ### ForumTest 149 + atproto forum thing test 13 150 14 151 check it out here -> [https://forumtest.whey.party](https://forumtest.whey.party) 15 152 repo: [https://tangled.sh/@whey.party/forumtest](https://tangled.sh/@whey.party/forumtest)
+54 -26
src/firehose.ts
··· 1 1 import { AppConfig } from "./config.ts"; 2 - import { indexDocument, deleteDocument } from "./indexer.ts"; 2 + import { indexDocument, deleteDocument, type OnEventCallback } from "./indexer.ts"; 3 3 4 - export async function startFirehose(config: AppConfig) { 5 - const ws = new WebSocket(config.jetstream_url); 4 + interface FirehoseOptions { 5 + config: AppConfig; 6 + onEvent: OnEventCallback; 7 + forcedCursor?: string; 8 + } 6 9 7 - ws.onopen = () => { 8 - console.log("Connected to Jetstream"); 9 - }; 10 + export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) { 11 + let lastCursor: number | null = null; 10 12 11 - ws.onmessage = async (msg) => { 12 - const data = msg.data instanceof Blob ? await msg.data.text() : msg.data; 13 - const evt = JSON.parse(data); 13 + const connect = () => { 14 + const url = new URL(config.jetstream_url); 15 + if (lastCursor !== null) { 16 + url.searchParams.set("cursor", lastCursor.toString()); 17 + } 18 + if (forcedCursor) { 19 + console.log("using forced cursor: ", forcedCursor) 20 + url.searchParams.set("cursor", forcedCursor); 21 + } 14 22 15 - if (!evt?.commit) return; 23 + const ws = new WebSocket(url.toString()); 16 24 17 - const opType = evt.commit.operation; 18 - const cid = evt.commit.cid; 19 - const collection = evt.commit.collection; 20 - const rkey = evt.commit.rkey; 21 - const aturi = `at://${evt.did}/${collection}/${rkey}`; 22 - const record = evt.commit.record; 23 - const indexedAt = new Date(evt.time_us / 1000).toISOString(); 25 + ws.onopen = () => { 26 + console.log("Connected to Jetstream", lastCursor !== null ? `(resuming from cursor ${lastCursor})` : ""); 27 + }; 24 28 25 - if (!config.record_types.includes(collection)) return; 29 + ws.onmessage = async (msg) => { 30 + const data = msg.data instanceof Blob ? await msg.data.text() : msg.data; 31 + const evt = JSON.parse(data); 26 32 27 - if (opType === "create" || opType === "update") { 28 - await indexDocument(config, aturi, record, cid, indexedAt); 29 - } else if (opType === "delete") { 30 - await deleteDocument(config, aturi); 31 - } 32 - }; 33 + if (!evt?.commit) return; 33 34 34 - ws.onerror = (e) => { 35 - console.error("Jetstream error:", e); 35 + lastCursor = evt.time_us; 36 + 37 + const opType = evt.commit.operation; 38 + const cid = evt.commit.cid; 39 + const collection = evt.commit.collection; 40 + const rkey = evt.commit.rkey; 41 + const aturi = `at://${evt.did}/${collection}/${rkey}`; 42 + const record = evt.commit.record; 43 + const indexedAt = new Date(evt.time_us / 1000).toISOString(); 44 + 45 + if (!config.record_types.includes(collection)) return; 46 + 47 + if (opType === "create" || opType === "update") { 48 + await indexDocument(config, onEvent, aturi, record, cid, indexedAt); 49 + } else if (opType === "delete") { 50 + await deleteDocument(config, onEvent, aturi); 51 + } 52 + }; 53 + 54 + ws.onerror = (e) => { 55 + console.error("Jetstream WebSocket error:", e); 56 + }; 57 + 58 + ws.onclose = (e) => { 59 + console.warn("Jetstream disconnected. Attempting to reconnect in 1 second...", e.reason); 60 + setTimeout(connect, 1000); 61 + }; 36 62 }; 63 + 64 + connect(); 37 65 }
+54 -6
src/indexer.ts
··· 9 9 }; 10 10 } 11 11 12 + function flattenValues(input: unknown): unknown[] { 13 + const result: unknown[] = []; 14 + 15 + function recurse(value: unknown) { 16 + if (value === null || typeof value !== "object") { 17 + result.push(value); 18 + } else if (Array.isArray(value)) { 19 + for (const item of value) recurse(item); 20 + } else if (value) { 21 + for (const key in value) { 22 + recurse((value as Record<string, unknown>)[key]); 23 + } 24 + } 25 + } 26 + 27 + recurse(input); 28 + return result; 29 + } 30 + 12 31 export async function ensureIndexMapping(config: AppConfig) { 13 32 const properties: Record<string, any> = {}; 14 33 ··· 46 65 ): Record<string, unknown> { 47 66 const result: Record<string, unknown> = {}; 48 67 49 - for (const [jsonPath, { id }] of Object.entries(indexSpec)) { 50 - const pathSegments = jsonPath.split("."); 68 + for (const [rawPath, { id }] of Object.entries(indexSpec)) { 69 + const [path, modifier] = rawPath.split("#"); 70 + const pathSegments = path.split("."); 71 + 51 72 let value = record; 52 73 for (const segment of pathSegments) { 53 74 if (typeof value !== "object" || value === null) { ··· 57 78 value = value[segment]; 58 79 } 59 80 60 - if (value !== undefined) { 81 + if (value === undefined) continue; 82 + 83 + if (modifier === "flatten") { 84 + const flattened = flattenValues(value); 85 + if (flattened.length > 0) { 86 + result[id] = flattened; 87 + } 88 + } else { 61 89 result[id] = value; 62 90 } 63 91 } ··· 65 93 return result; 66 94 } 67 95 96 + export type IndexerEvent = 97 + | { type: 'index'; uri: string; data: Record<string, unknown> } 98 + | { type: 'delete'; uri: string }; 99 + 100 + export type OnEventCallback = (event: IndexerEvent) => Promise<void> | void; 101 + 68 102 export async function indexDocument( 69 103 config: AppConfig, 104 + onEvent: OnEventCallback, 70 105 uri: string, 71 - record: any, 106 + record: Record<string, unknown>, 72 107 cid: string, 73 108 indexedAt: string 74 109 ) { ··· 76 111 const recordType = collection; 77 112 78 113 const indexSpec = config.index_fields?.[recordType]; 79 - if (!indexSpec) return; // skip if record type not configured 114 + //if (!indexSpec) return; 80 115 81 116 const indexedFields = extractIndexedFields(record, indexSpec); 82 117 ··· 103 138 body: JSON.stringify(body), 104 139 } 105 140 ); 141 + await onEvent({ 142 + type: 'index', 143 + uri, 144 + data: body 145 + }); 106 146 107 147 if (!res.ok) { 108 148 console.error("Indexing failed:", await res.text()); 109 149 } 110 150 } 111 151 112 - export async function deleteDocument(config: AppConfig, uri: string) { 152 + export async function deleteDocument( 153 + config: AppConfig, 154 + onEvent: OnEventCallback, 155 + uri: string 156 + ) { 113 157 const res = await fetch( 114 158 `${config.es_url}/${config.index_name}/_doc/${encodeURIComponent(uri)}`, 115 159 { 116 160 method: "DELETE", 117 161 } 118 162 ); 163 + await onEvent({ 164 + type: 'delete', 165 + uri 166 + }); 119 167 120 168 if (!res.ok && res.status !== 404) { 121 169 console.error("Delete failed:", await res.text());
+224
src/live-utils.ts
··· 1 + type QueryClause = { 2 + match?: Record<string, unknown>; 3 + term?: Record<string, unknown>; 4 + bool?: { 5 + must?: QueryClause[]; 6 + must_not?: QueryClause[]; 7 + should?: QueryClause[]; 8 + filter?: QueryClause[]; 9 + }; 10 + range?: Record<string, Record<string, unknown>>; 11 + nested?: { 12 + path: string; 13 + query: QueryClause; 14 + }; 15 + exists?: { 16 + field: string; 17 + }; 18 + }; 19 + 20 + // function getField(obj: Record<string, any>, path: string): any { 21 + // return path.split('.').reduce((acc, part) => acc && acc[part], obj); 22 + // } 23 + 24 + export function getSafeField(obj: Record<string, any>, path: string): any { 25 + if (obj && typeof obj === 'object' && Object.prototype.hasOwnProperty.call(obj, path)) { 26 + return obj[path]; 27 + } 28 + return path.split('.').reduce((acc, part) => acc?.[part], obj); 29 + } 30 + 31 + export function matchDocAgainstQuery(doc: Record<string, unknown>, query: QueryClause): boolean { 32 + console.log(JSON.stringify(doc, null, 2)); 33 + if (!query || Object.keys(query).length === 0) { 34 + console.log("โœ… Accepted: No query provided, accepting all docs."); 35 + return true; 36 + } 37 + 38 + for (const key in query) { 39 + const clauseValue = (query as any)[key]; 40 + 41 + switch (key) { 42 + case 'term': { 43 + const [field, value] = Object.entries(clauseValue as object)[0]; 44 + const docValue = getSafeField(doc, field); 45 + if (docValue !== value) { 46 + console.log(`โŒ Rejected: term mismatch on field '${field}', expected '${value}', got '${docValue}'`); 47 + return false; 48 + } 49 + console.log(`โœ… Passed: term match on field '${field}' with value '${value}'`); 50 + break; 51 + } 52 + 53 + case 'terms': { 54 + const [field, queryValues] = Object.entries(clauseValue as object)[0]; 55 + if (!Array.isArray(queryValues)) return false; 56 + 57 + const docValue = getSafeField(doc, field); 58 + if (docValue === undefined) return false; 59 + 60 + if (Array.isArray(docValue)) { 61 + return docValue.some(v => queryValues.includes(v)); 62 + } else { 63 + return queryValues.includes(docValue); 64 + } 65 + } 66 + 67 + case 'match': { 68 + const [field, value] = Object.entries(clauseValue as object)[0]; 69 + const fieldValue = getSafeField(doc, field); 70 + if (typeof fieldValue !== 'string') { 71 + console.log(`โŒ Rejected: match expected string field '${field}', got non-string:`, fieldValue); 72 + return false; 73 + } 74 + if (!fieldValue.includes(String(value))) { 75 + console.log(`โŒ Rejected: match failed on field '${field}', expected to include '${value}', got '${fieldValue}'`); 76 + return false; 77 + } 78 + console.log(`โœ… Passed: match succeeded on field '${field}' includes '${value}'`); 79 + break; 80 + } 81 + 82 + case 'bool': { 83 + const boolQuery = clauseValue as QueryClause['bool']; 84 + 85 + if (boolQuery?.must?.some(q => !matchDocAgainstQuery(doc, q))) { 86 + console.log("โŒ Rejected: bool.must clause failed."); 87 + return false; 88 + } 89 + if (boolQuery?.filter?.some(q => !matchDocAgainstQuery(doc, q))) { 90 + console.log("โŒ Rejected: bool.filter clause failed."); 91 + return false; 92 + } 93 + if (boolQuery?.must_not?.some(q => matchDocAgainstQuery(doc, q))) { 94 + console.log("โŒ Rejected: bool.must_not clause matched (should not have)."); 95 + return false; 96 + } 97 + if (boolQuery?.should?.length && !boolQuery.should.some(q => matchDocAgainstQuery(doc, q))) { 98 + console.log("โŒ Rejected: bool.should clause did not match any."); 99 + return false; 100 + } 101 + console.log("โœ… Passed: bool clause matched."); 102 + break; 103 + } 104 + 105 + case 'range': { 106 + const [field, range] = Object.entries(clauseValue as object)[0]; 107 + const value = getSafeField(doc, field); 108 + if (value === undefined) { 109 + console.log(`โŒ Rejected: range field '${field}' is undefined in doc.`); 110 + return false; 111 + } 112 + 113 + for (const [op, opValue] of Object.entries(range as object)) { 114 + const numValue = Number(value); 115 + const numOpValue = Number(opValue); 116 + const isNumeric = !isNaN(numValue) && !isNaN(numOpValue); 117 + let passed = true; 118 + 119 + switch (op) { 120 + case 'gt': 121 + passed = isNumeric ? numValue > numOpValue : String(value) > String(opValue); 122 + break; 123 + case 'gte': 124 + passed = isNumeric ? numValue >= numOpValue : String(value) >= String(opValue); 125 + break; 126 + case 'lt': 127 + passed = isNumeric ? numValue < numOpValue : String(value) < String(opValue); 128 + break; 129 + case 'lte': 130 + passed = isNumeric ? numValue <= numOpValue : String(value) <= String(opValue); 131 + break; 132 + } 133 + 134 + if (!passed) { 135 + console.log(`โŒ Rejected: range.${op} failed on field '${field}', value '${value}' vs '${opValue}'`); 136 + return false; 137 + } 138 + } 139 + 140 + console.log(`โœ… Passed: range match on field '${field}'`); 141 + break; 142 + } 143 + 144 + case 'exists': { 145 + const field = (clauseValue as { field: string }).field; 146 + if (getSafeField(doc, field) === undefined) { 147 + console.log(`โŒ Rejected: exists failed, field '${field}' not found`); 148 + return false; 149 + } 150 + console.log(`โœ… Passed: exists matched, field '${field}' exists`); 151 + break; 152 + } 153 + 154 + default: 155 + console.log(`โ„น๏ธ Ignored unknown query clause: '${key}'`); 156 + break; 157 + } 158 + } 159 + 160 + console.log("โœ… Accepted: Document matched all clauses."); 161 + return true; 162 + } 163 + 164 + const ALLOWED_QUERY_CLAUSES = new Set([ 165 + 'term', 166 + 'terms', 167 + 'bool', 168 + 'range', 169 + 'exists', 170 + ]); 171 + 172 + const FORBIDDEN_QUERY_CLAUSES = new Set([ 173 + 'match', 174 + 'multi_match', 175 + 'match_phrase', 176 + 'query_string', 177 + 'simple_query_string', 178 + 'fuzzy', 179 + 'script', 180 + 'function_score', 181 + 'more_like_this', 182 + 'percolate', 183 + ]); 184 + 185 + function validateClause(clause: Record<string, unknown>) { 186 + for (const key in clause) { 187 + if (FORBIDDEN_QUERY_CLAUSES.has(key)) { 188 + throw new Error(`Query clause '${key}' is not allowed for live sync due to non-deterministic behavior. Please use deterministic clauses like 'term' or 'range'.`); 189 + } 190 + 191 + if (!ALLOWED_QUERY_CLAUSES.has(key)) { 192 + throw new Error(`Query clause '${key}' is not supported for live sync.`); 193 + } 194 + 195 + const value = clause[key]; 196 + if (key === 'bool' && typeof value === 'object' && value !== null) { 197 + const boolClauses = value as Record<string, unknown[]>; 198 + for (const boolType of ['must', 'filter', 'should', 'must_not']) { 199 + if (Array.isArray(boolClauses[boolType])) { 200 + for (const subClause of boolClauses[boolType]) { 201 + if (typeof subClause === 'object' && subClause !== null) { 202 + validateClause(subClause as Record<string, unknown>); 203 + } 204 + } 205 + } 206 + } 207 + } 208 + } 209 + } 210 + 211 + export function validateLiveQuery(esQuery: Record<string, unknown>): void { 212 + // a query must have an explicit 'sort' clause 213 + // which prevents non-deterministic sorting (bad) 214 + const sortClause = esQuery.sort as any[]; 215 + if (!Array.isArray(sortClause) || sortClause.length === 0) { 216 + throw new Error("Live queries must include an explicit 'sort' clause for deterministic ordering (e.g., sort: [{ '$metadata.indexedAt': 'desc' }])."); 217 + } 218 + 219 + // and also of the supported deterministic clauses (no text searches) 220 + const queryPart = esQuery.query as Record<string, unknown>; 221 + if (queryPart) { 222 + validateClause(queryPart); 223 + } 224 + }
+26 -6
src/main.ts
··· 1 1 import { readConfig } from "./config.ts"; 2 2 import { startFirehose } from "./firehose.ts"; 3 - import { ensureIndexMapping } from "./indexer.ts"; 3 + import { ensureIndexMapping, type IndexerEvent } from "./indexer.ts"; 4 4 import { setupXRPCServer } from "./xrpc.ts"; 5 + import { processEventForSync } from "./sync.ts"; 6 + import { parseArgs } from "jsr:@std/cli"; 5 7 6 - const config = await readConfig("./config.json"); 8 + async function main() { 9 + const config = await readConfig("./config.json"); 7 10 8 - // prepare indexes 9 - ensureIndexMapping(config); 11 + const args = parseArgs(Deno.args) 10 12 11 - startFirehose(config); 13 + // prepare indexes 14 + await ensureIndexMapping(config); 12 15 13 - setupXRPCServer(config); 16 + setupXRPCServer(config); 17 + 18 + startFirehose({ 19 + config, 20 + onEvent: (event: IndexerEvent) => { 21 + // ESAV Live !!! 22 + return processEventForSync(event); 23 + }, 24 + forcedCursor: args["force-cursor"] 25 + }); 26 + 27 + console.log("Server started and listening for events"); 28 + } 29 + 30 + main().catch(err => { 31 + console.error("Fatal error in main:", err); 32 + Deno.exit(1); 33 + });
+185
src/sync.ts
··· 1 + import { subscriptions, computeCid, type Subscription, type SubscriptionResult } from "./xrpc.ts"; 2 + import { type IndexerEvent } from "./indexer.ts"; 3 + import { getSafeField, matchDocAgainstQuery } from "./live-utils.ts"; 4 + 5 + interface ClientDelta { 6 + documents: Record<string, { cid: string; doc: Record<string, unknown> }>; 7 + queries: Record<string, { ecid: string; result: string[] }>; 8 + } 9 + 10 + function extractSortValues(doc: Record<string, any>, sortClause: any[]): unknown[] { 11 + const values: unknown[] = []; 12 + for (const sortField of sortClause) { 13 + const key = Object.keys(sortField)[0]; 14 + const value = getSafeField(doc, key); 15 + values.push(value); 16 + } 17 + return values; 18 + } 19 + 20 + function compareItems(aSorts: unknown[], bSorts: unknown[], sortClause: any[]): number { 21 + for (let i = 0; i < sortClause.length; i++) { 22 + const direction = Object.values(sortClause[i])[0]; 23 + const valA = aSorts[i]; 24 + const valB = bSorts[i]; 25 + 26 + if (valA === valB) continue; 27 + 28 + if (valA === undefined || valA === null) return 1; 29 + if (valB === undefined || valB === null) return -1; 30 + 31 + let comparison = 0; 32 + if (valA < valB) { 33 + comparison = -1; 34 + } else if (valA > valB) { 35 + comparison = 1; 36 + } 37 + 38 + if (comparison !== 0) { 39 + return direction === 'desc' ? -comparison : comparison; 40 + } 41 + } 42 + return 0; 43 + } 44 + 45 + function findInsertIndex(sortedArray: Subscription['result'], newItem: SubscriptionResult, sortClause: any[]): number { 46 + let low = 0; 47 + let high = sortedArray.length; 48 + 49 + while (low < high) { 50 + const mid = Math.floor((low + high) / 2); 51 + const comparison = compareItems(newItem.sortValues, sortedArray[mid].sortValues, sortClause); 52 + if (comparison < 0) { 53 + high = mid; 54 + } else { 55 + low = mid + 1; 56 + } 57 + } 58 + return low; 59 + } 60 + 61 + export async function processEventForSync(event: IndexerEvent) { 62 + const requiredDocuments = new Set<string>(); 63 + 64 + const changedQueries: { 65 + queryId: string; 66 + newEcid: string; 67 + newRichResult: SubscriptionResult[]; 68 + newResultUris: string[]; 69 + sub: Subscription; 70 + }[] = []; 71 + 72 + for (const [queryId, sub] of subscriptions.entries()) { 73 + let newRichResult: SubscriptionResult[] | undefined = undefined; 74 + const sortClause = (sub.esQuery as any).sort; 75 + 76 + if (event.type === 'index') { 77 + const queryClause = (sub.esQuery as any).query ?? {}; 78 + const doesMatch = matchDocAgainstQuery(event.data, queryClause); 79 + const currentIndex = sub.result.findIndex(item => item.uri === event.uri); 80 + const wasInResult = currentIndex !== -1; 81 + 82 + if (doesMatch && !wasInResult) { 83 + const newItem = { 84 + uri: event.uri, 85 + sortValues: extractSortValues(event.data, sortClause), 86 + }; 87 + const insertIndex = findInsertIndex(sub.result, newItem, sortClause); 88 + 89 + newRichResult = [...sub.result]; 90 + newRichResult.splice(insertIndex, 0, newItem); 91 + requiredDocuments.add(event.uri); 92 + 93 + } else if (!doesMatch && wasInResult) { 94 + newRichResult = sub.result.filter(item => item.uri !== event.uri); 95 + 96 + } else if (doesMatch && wasInResult) { 97 + const newItem = { 98 + uri: event.uri, 99 + sortValues: extractSortValues(event.data, sortClause), 100 + }; 101 + if (compareItems(newItem.sortValues, sub.result[currentIndex].sortValues, sortClause) !== 0) { 102 + const tempResult = sub.result.filter(item => item.uri !== event.uri); 103 + const insertIndex = findInsertIndex(tempResult, newItem, sortClause); 104 + 105 + newRichResult = [...tempResult]; 106 + newRichResult.splice(insertIndex, 0, newItem); 107 + } 108 + requiredDocuments.add(event.uri); 109 + } 110 + } else if (event.type === 'delete') { 111 + if (sub.result.some(item => item.uri === event.uri)) { 112 + newRichResult = sub.result.filter(item => item.uri !== event.uri); 113 + } 114 + } 115 + 116 + if (newRichResult) { 117 + const newResultUris = newRichResult.map(item => item.uri); 118 + const newEcid = await computeCid(newResultUris); 119 + 120 + if (newEcid !== sub.ecid) { 121 + changedQueries.push({ 122 + queryId, 123 + newEcid, 124 + newRichResult, 125 + newResultUris, 126 + sub 127 + }); 128 + } 129 + } 130 + } 131 + 132 + for (const { queryId, newEcid, newRichResult } of changedQueries) { 133 + const sub = subscriptions.get(queryId); 134 + if (sub) { 135 + sub.ecid = newEcid; 136 + sub.result = newRichResult; 137 + } 138 + } 139 + 140 + const documentsPayload: Record<string, { cid: string; doc: Record<string, unknown> }> = {}; 141 + if (event.type === 'index') { 142 + for (const uri of requiredDocuments) { 143 + documentsPayload[uri] = { 144 + cid: event.data["$metadata.cid"] as string, 145 + doc: event.data as Record<string, unknown>, 146 + }; 147 + } 148 + } 149 + 150 + if (changedQueries.length > 0 || Object.keys(documentsPayload).length > 0) { 151 + const affectedClients = new Set<WebSocket>(); 152 + changedQueries.forEach(cq => cq.sub.clients.forEach(c => affectedClients.add(c))); 153 + 154 + if (Object.keys(documentsPayload).length > 0 && event.type === 'index') { 155 + for (const [, sub] of subscriptions.entries()) { 156 + if (sub.result.some(item => item.uri === event.uri)) { 157 + sub.clients.forEach(c => affectedClients.add(c)); 158 + } 159 + } 160 + } 161 + 162 + for (const client of affectedClients) { 163 + const delta: ClientDelta = { 164 + documents: documentsPayload, 165 + queries: {}, 166 + }; 167 + 168 + for (const { queryId, newEcid, newResultUris, sub } of changedQueries) { 169 + if (sub.clients.has(client)) { 170 + delta.queries[queryId] = { ecid: newEcid, result: newResultUris }; 171 + } 172 + } 173 + 174 + if (Object.keys(delta.queries).length > 0 || Object.keys(documentsPayload).length > 0) { 175 + if (client.readyState === WebSocket.OPEN) { 176 + try { 177 + client.send(JSON.stringify({ type: 'query-delta', ...delta })); 178 + } catch (err) { 179 + console.error("Failed to send delta to client:", err); 180 + } 181 + } 182 + } 183 + } 184 + } 185 + }
+506 -54
src/xrpc.ts
··· 1 - import { AppConfig } from "../src/config.ts"; 1 + import { AppConfig } from "./config.ts"; 2 2 import { serve } from "https://deno.land/std/http/server.ts"; 3 + import { CID } from "npm:multiformats/cid"; 4 + import { sha256 } from "npm:multiformats/hashes/sha2"; 5 + import { DidResolver, HandleResolver } from "npm:@atproto/identity"; 6 + import * as dagCbor from "npm:@ipld/dag-cbor"; 7 + import { validateLiveQuery } from "./live-utils.ts"; 8 + 9 + const ZOMBIE_LIFETIME_MS = 5 * 60 * 1000; 10 + 11 + declare global { 12 + interface WebSocket { 13 + isAlive: boolean; 14 + ping(): void; 15 + terminate?(): void; 16 + } 17 + } 18 + 19 + export interface SubscriptionResult { 20 + uri: string; 21 + sortValues: unknown[]; 22 + } 23 + 24 + export interface Subscription { 25 + esQuery: Record<string, unknown>; 26 + clients: Set<WebSocket>; 27 + ecid: string; 28 + result: SubscriptionResult[]; 29 + zombieTimer?: number; 30 + } 31 + 32 + export const subscriptions = new Map<string, Subscription>(); 33 + 34 + export async function computeCid(data: unknown): Promise<string> { 35 + const encodedBytes = dagCbor.encode(data); 36 + const hash = await sha256.digest(encodedBytes); 37 + return CID.create(1, dagCbor.code, hash).toString(); 38 + } 39 + 40 + function corsHeaders(): HeadersInit { 41 + return { 42 + "Access-Control-Allow-Origin": "*", 43 + "Access-Control-Allow-Methods": "GET,POST,OPTIONS", 44 + "Access-Control-Allow-Headers": "Content-Type", 45 + }; 46 + } 3 47 4 48 export function setupXRPCServer(config: AppConfig) { 5 - serve(async (req) => { 6 - const url = new URL(req.url); 49 + serve( 50 + async (req) => { 51 + const { pathname, searchParams } = new URL(req.url); 7 52 8 - if (req.method === "OPTIONS") { 9 - return new Response(null, { 10 - status: 204, 11 - headers: corsHeaders(), 12 - }); 13 - } 53 + if (req.method === "OPTIONS") { 54 + return new Response(null, { status: 204, headers: corsHeaders() }); 55 + } 14 56 15 - if (url.pathname === "/xrpc/com.example.prototypeESQuery") { 16 - let esQuery: any; 57 + if ( 58 + pathname === "/xrpc/party.whey.esav.esSync" && 59 + req.headers.get("upgrade") === "websocket" 60 + ) { 61 + const { socket, response } = Deno.upgradeWebSocket(req); 62 + handleWebSocket(socket, config); 63 + return response; 64 + } 17 65 18 - if (req.method === "POST") { 66 + if (pathname === "/xrpc/party.whey.esav.resolveIdentity") { 19 67 try { 20 - esQuery = await req.json(); 21 - } catch { 22 - return new Response("Invalid JSON body", { 23 - status: 400, 68 + const handle = searchParams.get("handle"); 69 + const did = searchParams.get("did"); 70 + const clientCid = searchParams.get("cid"); 71 + const includeBskyProfile = searchParams.get("includeBskyProfile"); 72 + 73 + if (!handle && !did) { 74 + return new Response("handle or did parameter is required", { 75 + status: 400, 76 + headers: corsHeaders(), 77 + }); 78 + } 79 + 80 + const identity = await resolveWithHandleOrDid({ 81 + handle: handle ?? undefined, 82 + did: did ?? undefined, 83 + }); 84 + 85 + const finalResponse: Record<string, unknown> = { 86 + ...identity, 87 + }; 88 + 89 + if (includeBskyProfile) { 90 + finalResponse.profile = await getProfileRecord(identity.pdsUrl, identity.did); 91 + } 92 + 93 + const newCid = await computeCid(finalResponse); 94 + 95 + if (clientCid && clientCid === newCid) { 96 + return new Response(null, { status: 304, headers: corsHeaders() }); 97 + } 98 + 99 + return new Response(JSON.stringify(finalResponse), { 100 + status: 200, 101 + headers: { ...corsHeaders(), "Content-Type": "application/json" }, 102 + }); 103 + } catch (error) { 104 + const errorMessage = 105 + error instanceof Error 106 + ? error.message 107 + : "An unknown error occurred"; 108 + return new Response(errorMessage, { 109 + status: 500, 24 110 headers: corsHeaders(), 25 111 }); 26 112 } 27 - } else if (req.method === "GET") { 28 - const queryParam = url.searchParams.get("q"); 29 - if (!queryParam) { 30 - return new Response("Missing 'q'", { 31 - status: 400, 113 + } 114 + 115 + if ( 116 + pathname === "/xrpc/com.example.prototypeESQuery" || 117 + pathname === "/xrpc/party.whey.esav.esQuery" 118 + ) { 119 + let esQuery: Record<string, unknown>; 120 + const clientCid: string | null = searchParams.get("cid"); 121 + 122 + if (req.method === "POST") { 123 + try { 124 + esQuery = await req.json(); 125 + } catch { 126 + return new Response("Invalid JSON body", { 127 + status: 400, 128 + headers: corsHeaders(), 129 + }); 130 + } 131 + } else if (req.method === "GET") { 132 + const queryParam = searchParams.get("q"); 133 + if (!queryParam) 134 + return new Response("Missing 'q'", { 135 + status: 400, 136 + headers: corsHeaders(), 137 + }); 138 + 139 + try { 140 + esQuery = JSON.parse(queryParam); 141 + } catch { 142 + return new Response("Invalid JSON in 'q'", { 143 + status: 400, 144 + headers: corsHeaders(), 145 + }); 146 + } 147 + } else { 148 + return new Response("Method not allowed", { 149 + status: 405, 32 150 headers: corsHeaders(), 33 151 }); 34 152 } 35 153 36 154 try { 37 - esQuery = JSON.parse(queryParam); 38 - } catch { 39 - return new Response("Invalid JSON in 'q'", { 40 - status: 400, 155 + const esRes = await fetch( 156 + `${config.es_url}/${config.index_name}/_search`, 157 + { 158 + method: "POST", 159 + headers: { "Content-Type": "application/json" }, 160 + body: JSON.stringify(esQuery), 161 + } 162 + ); 163 + 164 + if (!esRes.ok) { 165 + const errText = await esRes.text(); 166 + return new Response(`Elasticsearch error: ${errText}`, { 167 + status: 500, 168 + headers: corsHeaders(), 169 + }); 170 + } 171 + 172 + const result = await esRes.json(); 173 + const newCid = await computeCid(result); 174 + 175 + if (clientCid && clientCid === newCid) { 176 + return new Response(null, { status: 304, headers: corsHeaders() }); 177 + } 178 + 179 + return new Response(JSON.stringify(result), { 180 + status: 200, 181 + headers: { ...corsHeaders(), "Content-Type": "application/json" }, 182 + }); 183 + } catch (error) { 184 + const errorMessage = 185 + error instanceof Error 186 + ? error.message 187 + : "An unknown error occurred"; 188 + return new Response(errorMessage, { 189 + status: 500, 41 190 headers: corsHeaders(), 42 191 }); 43 192 } 44 - } else { 45 - return new Response("Method not allowed", { 46 - status: 405, 47 - headers: corsHeaders(), 48 - }); 49 193 } 50 194 51 - const esRes = await fetch(`${config.es_url}/${config.index_name}/_search`, { 195 + return new Response("Not found", { status: 404, headers: corsHeaders() }); 196 + }, 197 + { port: Number(config.serve_port) } 198 + ); 199 + console.log(`XRPC server running on http://localhost:${config.serve_port}`); 200 + } 201 + 202 + function cleanupClient(ws: WebSocket) { 203 + console.log("WebSocket client disconnected. Cleaning up subscriptions."); 204 + for (const [queryId, sub] of subscriptions.entries()) { 205 + if (sub.clients.has(ws)) { 206 + handleUnsubscribe(ws, queryId); 207 + // sub.clients.delete(ws); 208 + // if (sub.clients.size === 0) { 209 + // subscriptions.delete(queryId); 210 + // console.log(`Cleaned up empty subscription for queryId: ${queryId}`); 211 + // } 212 + } 213 + } 214 + } 215 + 216 + function handleWebSocket(ws: WebSocket, config: AppConfig) { 217 + console.log("WebSocket client connected."); 218 + ws.isAlive = true; 219 + 220 + const pingInterval = setInterval(() => { 221 + if (!ws.isAlive) { 222 + console.log('Terminating dead WebSocket connection'); 223 + return ws.close(); 224 + } 225 + ws.isAlive = false; 226 + if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'ping' })); 227 + }, 30000); 228 + 229 + const onMessage = async (event: MessageEvent) => { 230 + if (typeof event.data !== 'string') return; 231 + 232 + let msg; 233 + try { 234 + msg = JSON.parse(event.data); 235 + } catch (_err) { 236 + if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' })); 237 + return; 238 + } 239 + 240 + switch (msg.type) { 241 + case 'pong': 242 + ws.isAlive = true; 243 + break; 244 + case 'subscribe': 245 + if (msg.queryId && msg.esquery) { 246 + await handleSubscribe(ws, msg.queryId, msg.esquery, config, msg.ecid); 247 + } else { 248 + ws.send(JSON.stringify({ type: 'error', error: 'Invalid subscribe message', request: msg })); 249 + } 250 + break; 251 + case 'unsubscribe': 252 + if (msg.queryId) { 253 + handleUnsubscribe(ws, msg.queryId); 254 + } else { 255 + ws.send(JSON.stringify({ type: 'error', error: 'Invalid unsubscribe message', request: msg })); 256 + } 257 + break; 258 + default: 259 + ws.send(JSON.stringify({ type: 'error', error: 'Unknown message type', request: msg })); 260 + break; 261 + } 262 + }; 263 + 264 + const onClose = () => { clearInterval(pingInterval); cleanupClient(ws); }; 265 + const onError = (e: Event) => { console.error('WebSocket error:', e); onClose(); }; 266 + 267 + ws.addEventListener('message', onMessage); 268 + ws.addEventListener('close', onClose); 269 + ws.addEventListener('error', onError); 270 + } 271 + 272 + async function handleSubscribe( 273 + ws: WebSocket, 274 + queryId: string, 275 + esQuery: Record<string, unknown>, 276 + config: AppConfig, 277 + clientEcid?: string 278 + ) { 279 + try { 280 + validateLiveQuery(esQuery); 281 + } catch (err) { 282 + console.error(`Invalid live query for queryId ${queryId}: ${err}`); 283 + if (ws.readyState === WebSocket.OPEN) { 284 + ws.send(JSON.stringify({ 285 + type: "error", 286 + queryId, 287 + error: `Invalid query for live subscription: ${err}` 288 + })); 289 + } 290 + return; 291 + } 292 + 293 + let existingSub = subscriptions.get(queryId); 294 + 295 + // a subscription (active or zombie) already exists 296 + if (existingSub) { 297 + // if the new query is different, the old state is invalid. 298 + if (JSON.stringify(esQuery) !== JSON.stringify(existingSub.esQuery)) { 299 + console.log(`[Sync] Query for ${queryId} has changed. Purging old state and re-running.`); 300 + if (existingSub.zombieTimer) clearTimeout(existingSub.zombieTimer); 301 + subscriptions.delete(queryId); 302 + existingSub = undefined; 303 + } else { 304 + // if the query is the same. lets revive it if its a zombie 305 + if (existingSub.zombieTimer) { 306 + console.log(`[Zombie] Reviving zombie subscription: ${queryId}`); 307 + clearTimeout(existingSub.zombieTimer); 308 + delete existingSub.zombieTimer; 309 + } 310 + 311 + existingSub.clients.add(ws); 312 + 313 + // only if the ecid is still salvageable 314 + if (clientEcid && existingSub.ecid === clientEcid) { 315 + console.log(`[Sync] Client for ${queryId} is already up-to-date. No-op.`); 316 + return; 317 + } 318 + 319 + console.log(`[Sync] Client for ${queryId} is out of sync. Re-running ES query.`); 320 + } 321 + } 322 + 323 + 324 + 325 + // if (clientEcid && existingSub && existingSub.ecid === clientEcid) { 326 + // console.log(`Client re-subscribed to ${queryId} with matching ECID.`); 327 + // existingSub.clients.add(ws); 328 + // return; 329 + // } 330 + 331 + console.log(`Performing full sync for queryId: ${queryId}`); 332 + try { 333 + const esRes = await fetch( 334 + `${config.es_url}/${config.index_name}/_search`, 335 + { 52 336 method: "POST", 53 337 headers: { "Content-Type": "application/json" }, 54 338 body: JSON.stringify(esQuery), 339 + } 340 + ); 341 + 342 + if (!esRes.ok) throw new Error(`Elasticsearch error: ${await esRes.text()}`); 343 + 344 + const esResult = await esRes.json(); 345 + const hits = esResult.hits?.hits ?? []; 346 + 347 + const documents: Record<string, { cid: string, doc: Record<string, unknown> }> = {}; 348 + const newResultState: SubscriptionResult[] = []; 349 + 350 + for (const hit of hits) { 351 + const uri = hit._id as string; 352 + const source = hit._source; 353 + 354 + newResultState.push({ 355 + uri: uri, 356 + sortValues: hit.sort || [], 55 357 }); 56 358 57 - if (!esRes.ok) { 58 - const errText = await esRes.text(); 59 - return new Response(`Elasticsearch error: ${errText}`, { 60 - status: 500, 61 - headers: corsHeaders(), 62 - }); 63 - } 359 + documents[uri] = { 360 + cid: source['$metadata.cid'], 361 + doc: source, 362 + }; 363 + } 364 + 365 + const resultUris = newResultState.map(item => item.uri); 366 + const newEcid = await computeCid(resultUris); 64 367 65 - const result = await esRes.json(); 66 - return new Response(JSON.stringify(result), { 67 - status: 200, 68 - headers: { 69 - ...corsHeaders(), 70 - "Content-Type": "application/json", 71 - }, 368 + if (existingSub) { 369 + existingSub.clients.add(ws); 370 + existingSub.result = newResultState; 371 + existingSub.ecid = newEcid; 372 + existingSub.esQuery = esQuery; 373 + } else { 374 + subscriptions.set(queryId, { 375 + esQuery, 376 + clients: new Set([ws]), 377 + ecid: newEcid, 378 + result: newResultState, 72 379 }); 73 380 } 74 381 75 - return new Response("Not found", { status: 404, headers: corsHeaders() }); 76 - }, { port: Number(config.serve_port) }); 382 + ws.send(JSON.stringify({ 383 + type: "query-delta", 384 + documents, 385 + queries: { 386 + [queryId]: { 387 + ecid: newEcid, 388 + result: resultUris, 389 + } 390 + } 391 + })); 392 + 393 + } catch (err) { 394 + console.error(`Error subscribing client to query ${queryId}:`, err); 395 + if (ws.readyState === WebSocket.OPEN) { 396 + ws.send(JSON.stringify({ type: "error", queryId, error: `Subscription failed: ${err instanceof Error ? err.message : 'Unknown error'}` })); 397 + } 398 + } 77 399 } 78 400 79 - function corsHeaders(): HeadersInit { 80 - return { 81 - "Access-Control-Allow-Origin": "*", 82 - "Access-Control-Allow-Methods": "GET,POST,OPTIONS", 83 - "Access-Control-Allow-Headers": "Content-Type", 84 - }; 85 - } 401 + function transitionToZombie(queryId: string) { 402 + const sub = subscriptions.get(queryId); 403 + if (!sub || sub.clients.size > 0) { 404 + return; 405 + } 406 + 407 + if (sub.zombieTimer) { 408 + clearTimeout(sub.zombieTimer); 409 + } 410 + 411 + console.log(`[Zombie] Subscription ${queryId} has no clients. Entering zombie state for 5 minutes.`); 412 + 413 + sub.zombieTimer = setTimeout(() => { 414 + subscriptions.delete(queryId); 415 + console.log(`[Zombie] Purged zombie subscription: ${queryId}`); 416 + }, ZOMBIE_LIFETIME_MS); 417 + } 418 + 419 + function handleUnsubscribe(ws: WebSocket, queryId: string) { 420 + const sub = subscriptions.get(queryId); 421 + if (!sub) return; 422 + sub.clients.delete(ws); 423 + console.log(`[Sync] Client unsubscribed from ${queryId}. Remaining clients: ${sub.clients.size}`); 424 + 425 + if (sub.clients.size === 0) { 426 + transitionToZombie(queryId); 427 + } 428 + // if (sub.clients.size === 0) { 429 + // subscriptions.delete(queryId); 430 + // console.log(`Subscription removed for queryId: ${queryId} (last client disconnected)`); 431 + // } else { 432 + // console.log(`Client unsubscribed from ${queryId}`); 433 + // } 434 + } 435 + 436 + declare global { 437 + interface WebSocket { 438 + isAlive: boolean; 439 + ping(): void; 440 + terminate?(): void; 441 + } 442 + } 443 + 444 + async function getServiceEndpointFromDid( 445 + did: string 446 + ): Promise<{ serviceEndpoint: string; handle?: string }> { 447 + const didres = new DidResolver({}); 448 + const doc = await didres.resolve(did) as { 449 + service?: Array<{ 450 + type: string; 451 + serviceEndpoint: string; 452 + }>; 453 + alsoKnownAs?: string[]; 454 + [key: string]: unknown; 455 + } | null; 456 + if (!doc) { 457 + throw new Error(`Could not resolve DID document for: ${did}`); 458 + } 459 + const endpointRaw = doc.service?.find( 460 + (s: any) => s.type === "AtprotoPersonalDataServer" 461 + )?.serviceEndpoint; 462 + if (!endpointRaw) { 463 + throw new Error(`Service endpoint not found for DID: ${did}`); 464 + } 465 + function getServiceEndpointString(endpoint: unknown): string { 466 + if (typeof endpoint === "string") return endpoint; 467 + if ( 468 + typeof endpoint === "object" && 469 + endpoint && 470 + "uri" in endpoint && 471 + typeof endpoint.uri === "string" 472 + ) { 473 + return endpoint.uri; 474 + } 475 + throw new Error("Unsupported serviceEndpoint format"); 476 + } 477 + const serviceEndpoint = getServiceEndpointString(endpointRaw); 478 + let handle: string | undefined; 479 + if (Array.isArray(doc.alsoKnownAs) && doc.alsoKnownAs.length > 0) { 480 + const aka = doc.alsoKnownAs[0]; 481 + if (typeof aka === "string" && aka.startsWith("at://")) { 482 + handle = aka.slice("at://".length); 483 + } 484 + } 485 + return { serviceEndpoint, handle }; 486 + } 487 + 488 + async function resolveWithHandleOrDid({ 489 + handle, 490 + did, 491 + }: { 492 + handle?: string; 493 + did?: string; 494 + }): Promise<{ did: string; pdsUrl: string; handle?: string }> { 495 + if (did) { 496 + const { serviceEndpoint, handle: resolvedHandle } = 497 + await getServiceEndpointFromDid(did); 498 + return { 499 + did, 500 + pdsUrl: serviceEndpoint, 501 + handle: resolvedHandle, 502 + }; 503 + } else if (handle) { 504 + const hdlres = new HandleResolver(); 505 + const resolvedDid = await hdlres.resolve(handle); 506 + if (!resolvedDid) { 507 + throw new Error(`Could not resolve handle: ${handle}`); 508 + } 509 + const { serviceEndpoint } = await getServiceEndpointFromDid(resolvedDid); 510 + return { did: resolvedDid, pdsUrl: serviceEndpoint, handle }; 511 + } else { 512 + throw new Error("Either handle or did must be provided"); 513 + } 514 + } 515 + 516 + async function getProfileRecord( 517 + pdsUrl: string, 518 + did: string 519 + ): Promise<string | undefined> { 520 + try { 521 + const profileUrl = `${pdsUrl}/xrpc/com.atproto.repo.getRecord?repo=${did}&collection=app.bsky.actor.profile&rkey=self`; 522 + const response = await fetch(profileUrl); 523 + if (!response.ok) { 524 + console.warn( 525 + `Failed to fetch profile for ${did} from ${pdsUrl}: ${response.status}` 526 + ); 527 + return undefined; 528 + } 529 + const data = await response.json(); 530 + //const cid = data?.value?.avatar?.ref?.["$link"]; 531 + //const pfpurl = `${pdsUrl}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 532 + return data.value; 533 + } catch (error) { 534 + console.error(`Error fetching profile record for ${did}:`, error); 535 + return undefined; 536 + } 537 + }