An attempt to make a lightweight, easily self-hostable, scoped Bluesky AppView.
1import { indexHandlerContext } from "./index/types.ts";
2
3import { assertRecord, validateRecord } from "./utils/records.ts";
4import {
5buildBlobUrl,
6 getSlingshotRecord,
7 resolveIdentity,
8 searchParamsToJson,
9 withCors,
10} from "./utils/server.ts";
11import * as IndexServerTypes from "./utils/indexservertypes.ts";
12import { Database } from "jsr:@db/sqlite@0.11";
13import { setupUserDb } from "./utils/dbuser.ts";
14// import { systemDB } from "./env.ts";
15import { JetstreamManager, SpacedustManager } from "./utils/sharders.ts";
16import { handleSpacedust, SpacedustLinkMessage } from "./index/spacedust.ts";
17import { handleJetstream } from "./index/jetstream.ts";
18import * as ATPAPI from "npm:@atproto/api";
19import { AtUri } from "npm:@atproto/api";
20import * as IndexServerAPI from "./indexclient/index.ts";
21import * as IndexServerUtils from "./indexclient/util.ts";
22import { isPostView } from "./indexclient/types/app/bsky/feed/defs.ts";
23
/**
 * Construction-time settings for {@link IndexServer}.
 */
export interface IndexServerConfig {
  /**
   * Base path for databases other than the system one.
   * NOTE(review): presumably the directory holding per-user databases —
   * confirm against IndexServerUserManager.
   */
  baseDbPath: string;
  /** Path of the system database; opened with `new Database(...)` by the server. */
  systemDbPath: string;
}
28
/**
 * Columns shared by every indexed-record row read back from a database.
 */
interface BaseRow {
  /** Record URI (the indexer stores the record's AT-URI here). */
  uri: string;
  /** DID stored alongside the record. */
  did: string;
  /** Record CID, or null when none was stored. */
  cid: string | null;
  /** Revision stored with the record, or null. */
  rev: string | null;
  /** Creation timestamp; consumers pass it to `new Date(...)`. */
  createdat: number | null;
  /** Time the row was indexed (the indexer writes `Date.now()`). */
  indexedat: number;
  /** JSON-serialised record, or null when absent. */
  json: string | null;
}
/**
 * Feed-generator row: the shared base columns plus generator display fields.
 */
interface GeneratorRow extends BaseRow {
  /** Display name column; null when not set. */
  displayname: string | null;
  /** Description column; null when not set. */
  description: string | null;
  /** Avatar CID column; null when not set. */
  avatarcid: string | null;
}
/**
 * Like row: the shared base columns plus the liked subject.
 */
interface LikeRow extends BaseRow {
  /** Subject column of the like (presumably the liked record's URI — confirm against schema). */
  subject: string;
}
/**
 * Repost row: the shared base columns plus the reposted subject.
 */
interface RepostRow extends BaseRow {
  /** Subject column of the repost (presumably the reposted record's URI — confirm against schema). */
  subject: string;
}
/**
 * Minimal backlink row: identifies the record that links to something.
 */
interface BacklinkRow {
  /** URI of the linking (source) record. */
  srcuri: string;
  /** DID of the linking (source) record's author. */
  srcdid: string;
}
53
/**
 * Page size constant for feed queries.
 * NOTE(review): not referenced in the visible part of the file — confirm usage.
 */
const FEED_LIMIT = 50 as const;
55
56export class IndexServer {
57 private config: IndexServerConfig;
58 public userManager: IndexServerUserManager;
59 public systemDB: Database;
60
/**
 * Stores the configuration, opens the system database and creates the
 * user manager.
 *
 * @param config Paths used by the server; `systemDbPath` is opened right away.
 */
constructor(config: IndexServerConfig) {
  this.config = config;

  // The system database is opened before the user manager is built,
  // because the manager is handed this server instance.
  const { systemDbPath } = this.config;
  this.systemDB = new Database(systemDbPath);
  // TODO: set up the system DB schema when the database is new.

  // Back-reference: the manager receives the server itself.
  this.userManager = new IndexServerUserManager(this);
}
70
/**
 * Kicks the server off: runs the user manager's cold start against the
 * system database and logs that the server is running.
 */
public start() {
  const { userManager, systemDB } = this;
  userManager.coldStart(systemDB);
  console.log("IndexServer started.");
}
76
/**
 * Top-level router.
 *
 * Paths starting with `/xrpc/` go to the XRPC handler, paths starting with
 * `/links` go to the Constellation-style handler, anything else is a plain
 * 404.
 *
 * @param req Incoming HTTP request.
 * @returns The handler's response, or `Not Found` with status 404.
 */
public async handleRequest(req: Request): Promise<Response> {
  const { pathname } = new URL(req.url);

  const isXrpc = pathname.startsWith("/xrpc/");
  if (!isXrpc && !pathname.startsWith("/links")) {
    return new Response("Not Found", { status: 404 });
  }

  // The XRPC prefix wins when both checks could apply (they cannot overlap here).
  return isXrpc
    ? this.indexServerHandler(req)
    : this.constellationAPIHandler(req);
}
88
/**
 * Tells whether this server handles the given DID, as decided by the
 * user manager.
 *
 * @param did DID to look up.
 * @returns Whatever `userManager.handlesDid(did)` reports.
 */
public handlesDid(did: string): boolean {
  const handled = this.userManager.handlesDid(did);
  return handled;
}
/**
 * Lists every row of the system `users` table, enriched with the user's
 * handle, display name and avatar URL.
 *
 * For each user the identity is resolved and the `app.bsky.actor.profile`
 * record (`self`) is fetched; all users are processed concurrently, so a
 * single failing lookup rejects the whole call.
 *
 * @returns The hydrated users. `displayname` falls back to the handle and
 *          `pfp` is undefined when the profile has no avatar.
 */
async unspeccedGetRegisteredUsers(): Promise<{
  did: string;
  role: string;
  registrationdate: string;
  onboardingstatus: string;
  pfp?: string;
  displayname: string;
  handle: string;
}[]|undefined> {
  type RegisteredUserRow = {
    did: string;
    role: string;
    registrationdate: string;
    onboardingstatus: string;
  };

  const rows = this.systemDB
    .prepare(`
    SELECT *
    FROM users;
    `)
    .all() as RegisteredUserRow[];

  // Enrich one table row with data that lives outside the system database.
  const hydrateUser = async (user: RegisteredUserRow) => {
    const identity = await resolveIdentity(user.did);
    const profileRecord = await getSlingshotRecord(
      identity.did,
      "app.bsky.actor.profile",
      "self"
    );
    const profile = profileRecord.value as ATPAPI.AppBskyActorProfile.Record;

    const avatarCid = uncid(profile.avatar?.ref);
    const pfp = avatarCid
      ? buildBlobUrl(identity.pds, identity.did, avatarCid)
      : undefined;

    return {
      ...user,
      handle: identity.handle,
      pfp,
      displayname: profile.displayName ?? identity.handle,
    };
  };

  return await Promise.all(rows.map((user) => hydrateUser(user)));
}
124
125 // We will move all the global functions into this class as methods...
/**
 * Handles `/xrpc/<method>` requests.
 *
 * The query string is converted with `searchParamsToJson`, the method name
 * is taken from the path, and the matching query method of this class is
 * called. Every response is JSON with CORS headers; lookups that yield
 * nothing answer 404, as do unsupported methods.
 *
 * Array-valued parameters (`actors`, `uris`, `feeds`) are normalised to an
 * array first, because a parameter given only once arrives as a plain
 * string and a missing one as `undefined`.
 *
 * @param req Incoming HTTP request whose path starts with `/xrpc/`.
 * @returns The JSON response for the requested method.
 */
async indexServerHandler(req: Request): Promise<Response> {
  const url = new URL(req.url);
  const pathname = url.pathname;
  const xrpcMethod = pathname.startsWith("/xrpc/")
    ? pathname.slice("/xrpc/".length)
    : null;
  const searchParams = searchParamsToJson(url.searchParams);
  console.log(JSON.stringify(searchParams, null, 2));
  const jsonUntyped = searchParams;

  // Single place that builds the CORS-enabled JSON responses.
  const json = (body: unknown, status = 200): Response =>
    new Response(JSON.stringify(body), {
      status,
      headers: withCors({ "Content-Type": "application/json" }),
    });
  const notFound = (error: string): Response => json({ error }, 404);

  // A repeated query parameter is an array, a single one a string,
  // a missing one undefined; callers below always want an array.
  const toList = (value: unknown): string[] => {
    if (Array.isArray(value)) return value as string[];
    return typeof value === "string" ? [value] : [];
  };

  switch (xrpcMethod) {
    case "app.bsky.actor.getProfile": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyActorGetProfile.QueryParams;

      const res = await this.queryProfileView(jsonTyped.actor, "Detailed");
      if (!res) return notFound("User not found");

      const response: IndexServerTypes.AppBskyActorGetProfile.OutputSchema =
        res;
      return json(response);
    }
    case "app.bsky.actor.getProfiles": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyActorGetProfiles.QueryParams;
      const requested: unknown = jsonTyped.actors;

      // Await first, filter afterwards: filtering the un-awaited promises
      // never removes anything.
      const views = await Promise.all(
        toList(requested).map((actor) =>
          this.queryProfileView(actor, "Detailed")
        )
      );
      const profiles = views.filter(
        (view): view is ATPAPI.AppBskyActorDefs.ProfileViewDetailed =>
          view !== undefined
      );

      // Kept from the previous behaviour: a single unknown actor is a 404.
      if (typeof requested === "string" && profiles.length === 0) {
        return notFound("User not found");
      }

      const response: IndexServerTypes.AppBskyActorGetProfiles.OutputSchema =
        {
          profiles,
        };
      return json(response);
    }
    case "app.bsky.feed.getActorFeeds": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyFeedGetActorFeeds.QueryParams;

      const qresult = await this.queryActorFeeds(jsonTyped.actor);

      const response: IndexServerTypes.AppBskyFeedGetActorFeeds.OutputSchema =
        {
          feeds: qresult,
        };
      return json(response);
    }
    case "app.bsky.feed.getFeedGenerator": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyFeedGetFeedGenerator.QueryParams;

      const qresult = await this.queryFeedGenerator(jsonTyped.feed);
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.AppBskyFeedGetFeedGenerator.OutputSchema =
        {
          view: qresult,
          // Hard-coded: the generator is not actually probed.
          isOnline: true,
          isValid: true,
        };
      return json(response);
    }
    case "app.bsky.feed.getFeedGenerators": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyFeedGetFeedGenerators.QueryParams;

      const qresult = await this.queryFeedGenerators(
        toList(jsonTyped.feeds as unknown)
      );
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.AppBskyFeedGetFeedGenerators.OutputSchema =
        {
          feeds: qresult,
        };
      return json(response);
    }
    case "app.bsky.feed.getPosts": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.AppBskyFeedGetPosts.QueryParams;

      const hydrated = await Promise.all(
        toList(jsonTyped.uris as unknown).map((uri) => this.queryPostView(uri))
      );
      const posts: IndexServerTypes.AppBskyFeedGetPosts.OutputSchema["posts"] =
        hydrated.filter((p): p is ATPAPI.AppBskyFeedDefs.PostView =>
          Boolean(p)
        );

      const response: IndexServerTypes.AppBskyFeedGetPosts.OutputSchema = {
        posts,
      };
      return json(response);
    }
    case "party.whey.app.bsky.feed.getActorLikesPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetActorLikesPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      const qresult = await this.queryActorLikesPartial(
        jsonTyped.actor,
        jsonTyped.cursor
      );
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetActorLikesPartial.OutputSchema =
        {
          feed: qresult.items as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.FeedViewPost>[],
          cursor: qresult.cursor,
        };
      return json(response);
    }
    case "party.whey.app.bsky.feed.getAuthorFeedPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetAuthorFeedPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      const qresult = await this.queryAuthorFeedPartial(
        jsonTyped.actor,
        jsonTyped.cursor
      );
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetAuthorFeedPartial.OutputSchema =
        {
          feed: qresult.items as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.FeedViewPost>[],
          cursor: qresult.cursor,
        };
      return json(response);
    }
    case "party.whey.app.bsky.feed.getLikesPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetLikesPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      // Awaited like every other query: harmless for a synchronous result,
      // and required if queryLikes returns a promise.
      const qresult = await this.queryLikes(jsonTyped.uri);
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetLikesPartial.OutputSchema =
        {
          // @ts-ignore whatever i dont care TODO: fix ts ignores
          likes: qresult,
        };
      return json(response);
    }
    case "party.whey.app.bsky.feed.getPostThreadPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      const qresult = await this.queryPostThreadPartial(jsonTyped.uri);
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.OutputSchema =
        qresult;
      return json(response);
    }
    case "party.whey.app.bsky.feed.getQuotesPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetQuotesPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      const qresult = await this.queryQuotes(jsonTyped.uri);
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetQuotesPartial.OutputSchema =
        {
          uri: jsonTyped.uri,
          posts: qresult.map((feedviewpost) => {
            return feedviewpost.post as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.PostView>;
          }),
        };
      return json(response);
    }
    case "party.whey.app.bsky.feed.getRepostedByPartial": {
      const jsonTyped =
        jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetRepostedByPartial.QueryParams;

      // TODO: not partial yet, currently skips refs
      const qresult = await this.queryReposts(jsonTyped.uri);
      if (!qresult) return notFound("Feed not found");

      const response: IndexServerTypes.PartyWheyAppBskyFeedGetRepostedByPartial.OutputSchema =
        {
          uri: jsonTyped.uri,
          repostedBy:
            qresult as ATPAPI.$Typed<ATPAPI.AppBskyActorDefs.ProfileView>[],
        };
      return json(response);
    }
    // TODO: too hard for now
    //   party.whey.app.bsky.feed.getListFeedPartial
    /* three more coming soon
    app.bsky.graph.getLists
    app.bsky.graph.getList
    app.bsky.graph.getActorStarterPacks
    */
    default: {
      return json(
        {
          error: "XRPCNotSupported",
          message:
            "HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that XRPC Not Supported",
        },
        404
      );
    }
  }
}
522
/**
 * Handles the Constellation-style backlink endpoints under `/links`.
 *
 * Supported paths: `/links`, `/links/distinct-dids`, `/links/count`,
 * `/links/count/distinct-dids` and `/links/all`. The `target` parameter is
 * always required; every path except `/links/all` also needs `collection`
 * and `path`. The database queried is the one belonging to the DID found in
 * `target` (the target itself when it is a DID, otherwise the host part of
 * the AT-URI).
 *
 * Responses are JSON with CORS headers:
 * - 400 when a required parameter is missing or `target` cannot be parsed,
 * - 404 when no database exists for the target's DID or the path is unknown.
 *
 * Paging uses a fixed page size and an integer offset carried in `cursor`;
 * a cursor that is not a non-negative integer is treated as 0.
 *
 * @param req Incoming HTTP request whose path starts with `/links`.
 * @returns The JSON response for the requested endpoint.
 */
constellationAPIHandler(req: Request): Response {
  const url = new URL(req.url);
  const pathname = url.pathname;
  const searchParams = searchParamsToJson(url.searchParams) as linksQuery;

  // Single place that builds the CORS-enabled JSON responses.
  const json = (body: unknown, status = 200): Response =>
    new Response(JSON.stringify(body), {
      status,
      headers: withCors({ "Content-Type": "application/json" }),
    });

  if (!searchParams.target) {
    return json({ error: "Missing required parameter: target" }, 400);
  }
  const target = searchParams.target;

  // AtUri's constructor throws on strings it cannot parse; report that as
  // a client error instead of letting the exception escape.
  let did: string;
  try {
    did = isDid(target) ? target : new AtUri(target).host;
  } catch {
    return json({ error: "Invalid parameter: target" }, 400);
  }

  const db = this.userManager.getDbForDid(did);
  if (!db) {
    return json({ error: "User not found" }, 404);
  }

  const limit = 16; //Math.min(parseInt(searchParams.limit || "50", 10), 100);
  // The cursor is client input: fall back to 0 for NaN, negative or
  // unsafe values so they never reach the OFFSET clause.
  const requestedOffset = parseInt(searchParams.cursor || "0", 10);
  const offset =
    Number.isSafeInteger(requestedOffset) && requestedOffset > 0
      ? requestedOffset
      : 0;

  // Validates `collection`/`path` and derives the stored field name
  // (`<collection>:<path without leading dot>`).
  const linkParams = (): { collection: string; field: string } | undefined => {
    const { collection, path } = searchParams;
    if (!collection || !path) return undefined;
    return { collection, field: `${collection}:${path.replace(/^\./, "")}` };
  };
  const missingLinkParams = (): Response =>
    json({ error: "Missing required parameters: collection, path" }, 400);

  switch (pathname) {
    case "/links": {
      const params = linkParams();
      if (!params) return missingLinkParams();

      const rows = db
        .prepare(`${SQL.links} LIMIT ? OFFSET ?`)
        .all(target, params.collection, params.field, limit, offset);

      const countResult = db
        .prepare(SQL.count)
        .get(target, params.collection, params.field);
      const total = countResult ? Number(countResult.total) : 0;

      const linking_records: linksRecord[] = rows.map((row: any) => {
        // The record key is the last path segment of the source URI.
        const rkey = row.srcuri.split("/").pop()!;
        return {
          did: row.srcdid,
          collection: row.srccol,
          rkey,
        };
      });

      const response: linksRecordsResponse = {
        total: total.toString(),
        linking_records,
      };

      // Only advertise a cursor while more rows remain.
      const nextCursor = offset + linking_records.length;
      if (nextCursor < total) {
        response.cursor = nextCursor.toString();
      }

      return json(response);
    }
    case "/links/distinct-dids": {
      const params = linkParams();
      if (!params) return missingLinkParams();

      const rows = db
        .prepare(`${SQL.distinctDids} LIMIT ? OFFSET ?`)
        .all(target, params.collection, params.field, limit, offset);

      const countResult = db
        .prepare(SQL.countDistinctDids)
        .get(target, params.collection, params.field);
      const total = countResult ? Number(countResult.total) : 0;

      const linking_dids: string[] = rows.map((row: any) => row.srcdid);

      const response: linksDidsResponse = {
        total: total.toString(),
        linking_dids,
      };

      const nextCursor = offset + linking_dids.length;
      if (nextCursor < total) {
        response.cursor = nextCursor.toString();
      }

      return json(response);
    }
    case "/links/count": {
      const params = linkParams();
      if (!params) return missingLinkParams();

      const result = db
        .prepare(SQL.count)
        .get(target, params.collection, params.field);

      const response: linksCountResponse = {
        total: result && result.total ? result.total.toString() : "0",
      };

      return json(response);
    }
    case "/links/count/distinct-dids": {
      const params = linkParams();
      if (!params) return missingLinkParams();

      const result = db
        .prepare(SQL.countDistinctDids)
        .get(target, params.collection, params.field);

      const response: linksCountResponse = {
        total: result && result.total ? result.total.toString() : "0",
      };

      return json(response);
    }
    case "/links/all": {
      const rows = db.prepare(SQL.all).all(target) as any[];

      // Group the counts as links[suburi][srccol].
      const links: linksAllResponse["links"] = {};
      for (const row of rows) {
        if (!links[row.suburi]) {
          links[row.suburi] = {};
        }
        links[row.suburi][row.srccol] = {
          records: row.records,
          distinct_dids: row.distinct_dids,
        };
      }

      const response: linksAllResponse = {
        links,
      };

      return json(response);
    }
    default: {
      return json(
        {
          error: "NotSupported",
          message:
            "The requested endpoint is not supported by this Constellation implementation.",
        },
        404
      );
    }
  }
}
758
/**
 * Writes one incoming record into the database of the user who produced it.
 *
 * Only `app.bsky.actor.profile` and `app.bsky.feed.post` records are stored;
 * likes and every other record type are ignored. Nothing happens when no
 * database exists for `ctx.doer`. Failures of the insert are logged, not
 * thrown.
 *
 * @param ctx Indexing context; the fields read here are `value`, `doer`,
 *            `aturi`, `cid` and `rev`.
 */
indexServerIndexer(ctx: indexHandlerContext) {
  const record = assertRecord(ctx.value);
  //const record = validateRecord(ctx.value);
  const db = this.userManager.getDbForDid(ctx.doer);
  if (!db) return;
  console.log("indexering");

  if (record?.$type === "app.bsky.actor.profile") {
    console.log("bsky profuile");

    try {
      // NOTE(review): INSERT OR IGNORE keeps an existing row untouched; if
      // `uri` is unique, later versions of the same profile record would be
      // dropped here — confirm updates are handled elsewhere.
      const insertProfile = db.prepare(`
        INSERT OR IGNORE INTO app_bsky_actor_profile (
          uri, did, cid, rev, createdat, indexedat, json,
          displayname,
          description,
          avatarcid,
          avatarmime,
          bannercid,
          bannermime
        ) VALUES (?, ?, ?, ?, ?, ?, ?,
          ?, ?, ?,
          ?, ?, ?)
      `);
      console.log({
        uri: ctx.aturi,
        did: ctx.doer,
        cid: ctx.cid,
        rev: ctx.rev,
        createdat: record.createdAt,
        indexedat: Date.now(),
        json: JSON.stringify(record),
        displayname: record.displayName,
        description: record.description,
        avatarcid: uncid(record.avatar?.ref),
        avatarmime: record.avatar?.mimeType,
        bannercid: uncid(record.banner?.ref),
        bannermime: record.banner?.mimeType,
      });
      insertProfile.run(
        // base columns
        ctx.aturi ?? null,
        ctx.doer ?? null,
        ctx.cid ?? null,
        ctx.rev ?? null,
        record.createdAt ?? null,
        Date.now(),
        JSON.stringify(record),
        // profile columns
        record.displayName ?? null,
        record.description ?? null,
        uncid(record.avatar?.ref) ?? null,
        record.avatar?.mimeType ?? null,
        uncid(record.banner?.ref) ?? null,
        record.banner?.mimeType ?? null
        // TODO please add pinned posts
      );
    } catch (err) {
      console.error("stmt.run failed:", err);
    }
    return;
  }

  if (record?.$type === "app.bsky.feed.post") {
    console.log("bsky post");
    const insertPost = db.prepare(`
      INSERT OR IGNORE INTO app_bsky_feed_post (
        uri, did, cid, rev, createdat, indexedat, json,
        text, replyroot, replyparent, quote,
        imagecount, image1cid, image1mime, image1aspect,
        image2cid, image2mime, image2aspect,
        image3cid, image3mime, image3aspect,
        image4cid, image4mime, image4aspect,
        videocount, videocid, videomime, videoaspect
      ) VALUES (?, ?, ?, ?, ?, ?, ?,
        ?, ?, ?, ?,
        ?, ?, ?, ?,
        ?, ?, ?,
        ?, ?, ?,
        ?, ?, ?,
        ?, ?, ?, ?)
    `);

    const embed = record.embed;

    const images = extractImages(embed);
    const video = extractVideo(embed);
    const quoteUri = extractQuoteUri(embed);
    try {
      insertPost.run(
        // base columns
        ctx.aturi ?? null,
        ctx.doer ?? null,
        ctx.cid ?? null,
        ctx.rev ?? null,
        record.createdAt,
        Date.now(),
        JSON.stringify(record),
        // text and relations
        record.text ?? null,
        record.reply?.root?.uri ?? null,
        record.reply?.parent?.uri ?? null,
        quoteUri,
        // four image slots: cid, mime type, "width:height" (or nulls)
        images.length,
        ...[0, 1, 2, 3].flatMap((slot) => {
          const picture = images[slot];
          return [
            uncid(picture?.image?.ref) ?? null,
            picture?.image?.mimeType ?? null,
            picture?.aspectRatio &&
            picture.aspectRatio.width &&
            picture.aspectRatio.height
              ? `${picture.aspectRatio.width}:${picture.aspectRatio.height}`
              : null,
          ];
        }),
        // video slot; the mime type is recorded as a fixed "video/mp4"
        uncid(video?.video) ? 1 : 0,
        uncid(video?.video) ?? null,
        uncid(video?.video) ? "video/mp4" : null,
        video?.aspectRatio
          ? `${video.aspectRatio.width}:${video.aspectRatio.height}`
          : null
      );
    } catch (err) {
      console.error("stmt.run failed:", err);
    }
    return;
  }

  // "app.bsky.feed.like" and every other record type: nothing to index here.
  return;
}
915
916 // user data
/**
 * Builds a profile view for a registered user from the indexed profile row.
 *
 * The `type` argument selects the shape: `""` for `ProfileView`, `"Basic"`
 * for `ProfileViewBasic`, `"Detailed"` for `ProfileViewDetailed` (which
 * additionally counts followers, follows and posts).
 *
 * @param did  DID of the user to describe.
 * @param type Which view shape to produce.
 * @returns The view, or `undefined` when the DID is not a registered index
 *          user, has no database, or has no indexed profile row.
 * @throws Error("Invalid type") when `type` is not one of the three values.
 */
async queryProfileView(
  did: string,
  type: ""
): Promise<ATPAPI.AppBskyActorDefs.ProfileView | undefined>;
async queryProfileView(
  did: string,
  type: "Basic"
): Promise<ATPAPI.AppBskyActorDefs.ProfileViewBasic | undefined>;
async queryProfileView(
  did: string,
  type: "Detailed"
): Promise<ATPAPI.AppBskyActorDefs.ProfileViewDetailed | undefined>;
async queryProfileView(
  did: string,
  type: "" | "Basic" | "Detailed"
): Promise<
  | ATPAPI.AppBskyActorDefs.ProfileView
  | ATPAPI.AppBskyActorDefs.ProfileViewBasic
  | ATPAPI.AppBskyActorDefs.ProfileViewDetailed
  | undefined
> {
  if (!this.isRegisteredIndexUser(did)) return;
  const db = this.userManager.getDbForDid(did);
  if (!db) return;

  const row = db
    .prepare(`
      SELECT *
      FROM app_bsky_actor_profile
      WHERE did = ?
      LIMIT 1;
    `)
    .get(did) as ProfileRow | undefined;
  // No indexed profile: report "not found" instead of dereferencing undefined.
  if (!row) return;

  const identity = await resolveIdentity(did);

  // Blob URLs are built from the resolved identity's PDS and DID.
  const blobUrl = (cid: string | null | undefined) =>
    cid ? buildBlobUrl(identity.pds, identity.did, cid) : undefined;
  const avatar = blobUrl(row.avatarcid);
  const banner = blobUrl(row.bannercid);

  // Stored timestamps become ISO strings; empty or unparsable values are omitted.
  const toIso = (
    when: number | string | null | undefined
  ): string | undefined => {
    if (!when) return undefined;
    const date = new Date(when);
    return Number.isNaN(date.getTime()) ? undefined : date.toISOString();
  };
  const createdAt = toIso(row.createdat);
  // indexedAt comes from the row's own indexedat column, not from createdat.
  const indexedAt = toIso(row.indexedat);

  // simulate different types returned
  switch (type) {
    case "": {
      const result: ATPAPI.AppBskyActorDefs.ProfileView = {
        $type: "app.bsky.actor.defs#profileView",
        did: did,
        handle: identity.handle,
        displayName: row.displayname ?? undefined,
        description: row.description ?? undefined,
        avatar: avatar,
        //associated?: ProfileAssociated,
        indexedAt: indexedAt,
        createdAt: createdAt,
        //viewer?: ViewerState,
        //labels?: ComAtprotoLabelDefs.Label[],
        //verification?: VerificationState,
        //status?: StatusView,
      };
      return result;
    }
    case "Basic": {
      const result: ATPAPI.AppBskyActorDefs.ProfileViewBasic = {
        $type: "app.bsky.actor.defs#profileViewBasic",
        did: did,
        handle: identity.handle,
        displayName: row.displayname ?? undefined,
        avatar: avatar,
        //associated?: ProfileAssociated,
        createdAt: createdAt,
        //viewer?: ViewerState,
        //labels?: ComAtprotoLabelDefs.Label[],
        //verification?: VerificationState,
        //status?: StatusView,
      };
      return result;
    }
    case "Detailed": {
      // Runs a single-row COUNT(*) query bound to this DID.
      const countFor = (sql: string): number =>
        (db.prepare(sql).get(did) as { count: number } | undefined)?.count ??
        0;

      // Followers: follow records pointing at this DID in backlink_skeleton.
      const followersCount = countFor(`
        SELECT COUNT(*) as count
        FROM backlink_skeleton
        WHERE subdid = ? AND srccol = 'app.bsky.graph.follow'
      `);
      // Following: this DID's own follow records.
      const followsCount = countFor(`
        SELECT COUNT(*) as count
        FROM app_bsky_graph_follow
        WHERE did = ?
      `);
      // Posts authored by this DID.
      const postsCount = countFor(`
        SELECT COUNT(*) as count
        FROM app_bsky_feed_post
        WHERE did = ?
      `);

      const result: ATPAPI.AppBskyActorDefs.ProfileViewDetailed = {
        $type: "app.bsky.actor.defs#profileViewDetailed",
        did: did,
        handle: identity.handle,
        displayName: row.displayname ?? undefined,
        description: row.description ?? undefined,
        avatar: avatar,
        banner: banner,
        followersCount: followersCount,
        followsCount: followsCount,
        postsCount: postsCount,
        //associated?: ProfileAssociated,
        //joinedViaStarterPack?: // AppBskyGraphDefs.StarterPackViewBasic;
        indexedAt: indexedAt,
        createdAt: createdAt,
        //viewer?: ViewerState,
        //labels?: ComAtprotoLabelDefs.Label[],
        pinnedPost: undefined, // TODO: pinned posts are not in the db schema yet
        //verification?: VerificationState,
        //status?: StatusView,
      };
      return result;
    }
    default:
      throw new Error("Invalid type");
  }
}
1063
1064 // post hydration
/**
 * Hydrates a single post into a `PostView`.
 *
 * The author DID is taken from the host part of the AT-URI; the post row is
 * read from that user's database and combined with the author's basic
 * profile view.
 *
 * @param uri AT-URI of the post.
 * @returns The post view, or `undefined` when the URI cannot be parsed, the
 *          author is not a registered index user or has no database, the
 *          row is missing or lacks a CID/JSON body, the stored JSON cannot
 *          be parsed, or the author profile is unavailable.
 */
async queryPostView(
  uri: string
): Promise<ATPAPI.AppBskyFeedDefs.PostView | undefined> {
  // AtUri's constructor throws on malformed input; a bad URI is simply
  // "no such post" so one bad entry cannot fail a whole batch.
  let did: string;
  try {
    did = new AtUri(uri).host;
  } catch {
    return;
  }
  if (!this.isRegisteredIndexUser(did)) return;
  const db = this.userManager.getDbForDid(did);
  if (!db) return;

  const row = db
    .prepare(`
      SELECT *
      FROM app_bsky_feed_post
      WHERE uri = ?
      LIMIT 1;
    `)
    .get(uri) as PostRow | undefined;
  // Check the row before resolving the author, so a missing post does not
  // trigger profile hydration.
  if (!row || !row.cid || !row.json) return;

  const profileView = await this.queryProfileView(did, "Basic");
  if (!profileView) return;

  let value: ATPAPI.AppBskyFeedPost.Record;
  try {
    value = JSON.parse(row.json) as ATPAPI.AppBskyFeedPost.Record;
  } catch {
    return;
  }

  const post: ATPAPI.AppBskyFeedDefs.PostView = {
    uri: row.uri,
    cid: row.cid,
    author: profileView,
    record: value,
    indexedAt: new Date(row.indexedat).toISOString(),
    // NOTE(review): this is the embed exactly as stored in the record, not
    // a hydrated view embed — confirm clients accept that.
    embed: value.embed,
  };

  return post;
}
1097
/**
 * Builds a lightweight reference to a post from its URI alone.
 *
 * @param uri URI of the post being referenced.
 * @returns A ref carrying the URI and the placeholder CID `"cid.invalid"`.
 */
constructPostViewRef(
  uri: string
): IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef {
  // TODO: major design flaw — the real CID is not known at this point,
  // so a placeholder is returned instead.
  return {
    uri,
    cid: "cid.invalid",
  };
}
1108
/**
 * Wraps a hydrated post into a `FeedViewPost`.
 *
 * @param uri AT-URI of the post.
 * @returns The feed item, or `undefined` when the post cannot be hydrated.
 *          `reply` and `reason` are not populated.
 */
async queryFeedViewPost(
  uri: string
): Promise<ATPAPI.AppBskyFeedDefs.FeedViewPost | undefined> {
  const hydratedPost = await this.queryPostView(uri);
  if (hydratedPost === undefined) return undefined;

  const item: ATPAPI.AppBskyFeedDefs.FeedViewPost = {
    $type: "app.bsky.feed.defs#feedViewPost",
    post: hydratedPost,
  };
  return item;
}
1124
/**
 * Builds a feed item that only references a post (see
 * `constructPostViewRef`) instead of hydrating it.
 *
 * @param uri AT URI of the referenced post.
 */
constructFeedViewPostRef(
  uri: string
): IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef {
  return {
    $type: "party.whey.app.bsky.feed.defs#feedViewPostRef",
    post: this.constructPostViewRef(
      uri
    ) as IndexServerUtils.$Typed<IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef>,
  };
}
1138
1139 // user feedgens
1140
/**
 * Lists the feed generators stored for an actor, newest `createdat` first.
 *
 * Rows without json or cid, and rows whose json cannot be parsed, are
 * skipped (the same rule `queryFeedGenerators` applies).
 *
 * @param did DID of the actor whose generators are listed.
 * @returns the generator views; empty when the actor is not a registered
 *   index user, has no open database, has no generators, or the creator
 *   profile cannot be resolved.
 */
async queryActorFeeds(
  did: string
): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView[]> {
  if (!this.isRegisteredIndexUser(did)) return [];
  const db = this.userManager.getDbForDid(did);
  if (!db) return [];

  const rows = db
    .prepare(
      `
      SELECT uri, cid, did, json, indexedat
      FROM app_bsky_feed_generator
      WHERE did = ?
      ORDER BY createdat DESC;
    `
    )
    .all(did) as unknown as GeneratorRow[];
  if (rows.length === 0) return [];

  // Same profile flavour that queryFeedGenerators uses for `creator`.
  const creatorView = await this.queryProfileView(did, "");
  if (!creatorView) return [];

  const generators: ATPAPI.AppBskyFeedDefs.GeneratorView[] = [];
  for (const row of rows) {
    // A generator view without a cid is not usable, so drop the row.
    if (!row.json || !row.cid) continue;
    try {
      const record = JSON.parse(
        row.json
      ) as ATPAPI.AppBskyFeedGenerator.Record;
      generators.push({
        $type: "app.bsky.feed.defs#generatorView",
        uri: row.uri,
        cid: row.cid,
        did: row.did,
        creator: creatorView,
        displayName: record.displayName,
        description: record.description,
        descriptionFacets: record.descriptionFacets,
        // NOTE(review): this is the record's avatar value passed through
        // as-is; confirm whether it should be turned into a URL first.
        avatar: record.avatar as string | undefined,
        likeCount: 0, // TODO: this should be easy
        indexedAt: new Date(row.indexedat).toISOString(),
      });
    } catch {
      // Best effort: an unparsable row must not hide the remaining feeds.
    }
  }
  return generators;
}
1185
/**
 * Single-uri convenience wrapper around `queryFeedGenerators`.
 *
 * @param uri AT URI of the feed generator record.
 * @returns the first generator view returned, or `undefined` if none.
 */
async queryFeedGenerator(
  uri: string
): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView | undefined> {
  const [firstGenerator] = await this.queryFeedGenerators([uri]);
  return firstGenerator;
}
1192
/**
 * Hydrates several feed generators at once.
 *
 * The uris are bucketed by their host DID so each user database is queried
 * a single time. Unparsable uris, unregistered or unopened users, users
 * whose profile cannot be resolved, and rows without json/cid (or with
 * unparsable json) are skipped silently.
 *
 * @param uris AT URIs of feed generator records.
 * @returns the generator views that could be built, grouped per DID in
 *   first-seen order.
 */
async queryFeedGenerators(
  uris: string[]
): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView[]> {
  const buckets = new Map<string, string[]>();
  for (const uri of uris) {
    let ownerDid: string;
    try {
      ownerDid = new AtUri(uri).host;
    } catch {
      continue;
    }
    const bucket = buckets.get(ownerDid);
    if (bucket) {
      bucket.push(uri);
    } else {
      buckets.set(ownerDid, [uri]);
    }
  }

  const views: ATPAPI.AppBskyFeedDefs.GeneratorView[] = [];
  for (const [ownerDid, ownedUris] of buckets) {
    if (!this.isRegisteredIndexUser(ownerDid)) continue;
    const db = this.userManager.getDbForDid(ownerDid);
    if (!db) continue;

    // One "?" per uri keeps the IN (...) list fully parameterized.
    const marks = ownedUris.map(() => "?").join(",");
    const rows = db
      .prepare(
        `
        SELECT uri, cid, did, json, indexedat
        FROM app_bsky_feed_generator
        WHERE uri IN (${marks});
      `
      )
      .all(...ownedUris) as unknown as GeneratorRow[];
    if (rows.length === 0) continue;

    const creatorView = await this.queryProfileView(ownerDid, "");
    if (!creatorView) continue;

    for (const row of rows) {
      try {
        if (!row.json || !row.cid) continue;
        const parsed = JSON.parse(
          row.json
        ) as ATPAPI.AppBskyFeedGenerator.Record;
        views.push({
          $type: "app.bsky.feed.defs#generatorView",
          uri: row.uri,
          cid: row.cid,
          did: row.did,
          creator: creatorView,
          displayName: parsed.displayName,
          description: parsed.description,
          descriptionFacets: parsed.descriptionFacets,
          avatar: parsed.avatar as string | undefined,
          likeCount: 0,
          indexedAt: new Date(row.indexedat).toISOString(),
        });
      } catch {}
    }
  }
  return views;
}
1251
1252 // user feeds
1253
/**
 * Returns one page (at most `FEED_LIMIT` rows) of an author's feed: their
 * own posts plus their reposts, newest `indexedat` first.
 *
 * Reposts of posts by a DID this server handles are hydrated; other
 * subjects are emitted as reference items. Rows that cannot be hydrated
 * are dropped from `items` but still advance the cursor.
 *
 * @param did DID of the feed author.
 * @param cursor opaque `"<indexedat>::<cid>"` value from a previous page.
 * @returns the page, or `undefined` when the author is not a registered
 *   index user or has no open database. A malformed cursor yields an empty
 *   page.
 */
async queryAuthorFeedPartial(
  did: string,
  cursor?: string
): Promise<
  | {
      items: (
        | ATPAPI.AppBskyFeedDefs.FeedViewPost
        | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef
      )[];
      cursor: string | undefined;
    }
  | undefined
> {
  type FeedItem =
    | ATPAPI.AppBskyFeedDefs.FeedViewPost
    | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef;

  if (!this.isRegisteredIndexUser(did)) return;
  const db = this.userManager.getDbForDid(did);
  if (!db) return;

  const subquery = `
    SELECT uri, cid, indexedat, 'post' as type, null as subject
    FROM app_bsky_feed_post
    WHERE did = ?
    UNION ALL
    SELECT uri, cid, indexedat, 'repost' as type, subject
    FROM app_bsky_feed_repost
    WHERE did = ?
  `;

  let query = `SELECT * FROM (${subquery}) as feed_items`;
  const params: (string | number)[] = [did, did];

  if (cursor) {
    const [rawIndexedAt, cursorCid] = cursor.split("::");
    const cursorIndexedAt = parseInt(rawIndexedAt ?? "", 10);
    // Never bind NaN / undefined into the statement: a cursor we cannot
    // parse simply points at nothing.
    if (Number.isNaN(cursorIndexedAt) || !cursorCid) {
      return { items: [], cursor: undefined };
    }
    query += ` WHERE (indexedat < ? OR (indexedat = ? AND cid < ?))`;
    params.push(cursorIndexedAt, cursorIndexedAt, cursorCid);
  }

  query += ` ORDER BY indexedat DESC, cid DESC LIMIT ${FEED_LIMIT}`;

  const rows = db.prepare(query).all(...params) as {
    uri: string;
    indexedat: number;
    cid: string;
    type: "post" | "repost";
    subject: string | null;
  }[];

  const authorProfile = await this.queryProfileView(did, "Basic");

  const resolved = await Promise.all(
    rows.map(async (row): Promise<FeedItem | undefined> => {
      if (!(row.type === "repost" && row.subject)) {
        return await this.queryFeedViewPost(row.uri);
      }
      if (!authorProfile) return undefined;

      let subjectDid: string;
      try {
        subjectDid = new AtUri(row.subject).host;
      } catch {
        // One unparsable subject must not fail the whole page.
        return undefined;
      }

      // The hydration has to be awaited here; otherwise a pending promise
      // would end up inside the feed item.
      const original = this.handlesDid(subjectDid)
        ? await this.queryFeedViewPost(row.subject)
        : this.constructFeedViewPostRef(row.subject);
      if (!original) return undefined;

      // `original` is already a complete feed item ($type + post), so the
      // repost reason is attached to it rather than nesting it under `post`.
      return {
        ...original,
        reason: {
          $type: "app.bsky.feed.defs#reasonRepost",
          by: authorProfile,
          indexedAt: new Date(row.indexedat).toISOString(),
        },
      } as FeedItem;
    })
  );

  // Filter the resolved values (not the promises) so misses are removed.
  const items = resolved.filter(
    (item): item is FeedItem => item !== undefined
  );

  const lastItem = rows[rows.length - 1];
  const nextCursor = lastItem
    ? `${lastItem.indexedat}::${lastItem.cid}`
    : undefined;

  return { items, cursor: nextCursor };
}
1337
/**
 * Placeholder for list feeds: both arguments are currently ignored and an
 * empty page without a cursor is always returned.
 *
 * @param uri AT URI of the list (unused).
 * @param cursor pagination cursor (unused).
 */
queryListFeed(
  uri: string,
  cursor?: string
):
  | {
      items: ATPAPI.AppBskyFeedDefs.FeedViewPost[];
      cursor: string | undefined;
    }
  | undefined {
  const emptyPage: {
    items: ATPAPI.AppBskyFeedDefs.FeedViewPost[];
    cursor: string | undefined;
  } = { items: [], cursor: undefined };
  return emptyPage;
}
1349
/**
 * Returns one page (at most `FEED_LIMIT` rows) of the posts an actor has
 * liked, newest `indexedat` first.
 *
 * Liked posts by a DID this server handles are hydrated; all others are
 * emitted as reference items. Likes whose subject cannot be parsed or
 * hydrated are dropped from `items` but still advance the cursor.
 *
 * @param did DID of the actor whose likes are listed.
 * @param cursor opaque `"<indexedat>::<cid>"` value from a previous page.
 * @returns the page, or `undefined` when the actor is not a registered
 *   index user or has no open database. A malformed cursor yields an empty
 *   page.
 */
async queryActorLikesPartial(
  did: string,
  cursor?: string
): Promise<
  | {
      items: (
        | ATPAPI.AppBskyFeedDefs.FeedViewPost
        | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef
      )[];
      cursor: string | undefined;
    }
  | undefined
> {
  type FeedItem =
    | ATPAPI.AppBskyFeedDefs.FeedViewPost
    | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef;

  // early return only if the actor did is not registered
  if (!this.isRegisteredIndexUser(did)) return;
  const db = this.userManager.getDbForDid(did);
  if (!db) return;

  let query = `
    SELECT subject, indexedat, cid
    FROM app_bsky_feed_like
    WHERE did = ?
  `;
  const params: (string | number)[] = [did];

  if (cursor) {
    const [rawIndexedAt, cursorCid] = cursor.split("::");
    const cursorIndexedAt = parseInt(rawIndexedAt ?? "", 10);
    // Never bind NaN / undefined into the statement: a cursor we cannot
    // parse simply points at nothing.
    if (Number.isNaN(cursorIndexedAt) || !cursorCid) {
      return { items: [], cursor: undefined };
    }
    query += ` AND (indexedat < ? OR (indexedat = ? AND cid < ?))`;
    params.push(cursorIndexedAt, cursorIndexedAt, cursorCid);
  }

  query += ` ORDER BY indexedat DESC, cid DESC LIMIT ${FEED_LIMIT}`;

  const rows = db.prepare(query).all(...params) as {
    subject: string;
    indexedat: number;
    cid: string;
  }[];

  const resolved = await Promise.all(
    rows.map(async (row): Promise<FeedItem | undefined> => {
      let subjectDid: string;
      try {
        subjectDid = new AtUri(row.subject).host;
      } catch {
        // One unparsable subject must not fail the whole page.
        return undefined;
      }
      return this.handlesDid(subjectDid)
        ? await this.queryFeedViewPost(row.subject)
        : this.constructFeedViewPostRef(row.subject);
    })
  );

  // Filter the resolved values (not the promises) so misses are removed.
  const items = resolved.filter(
    (item): item is FeedItem => item !== undefined
  );

  const lastItem = rows[rows.length - 1];
  const nextCursor = lastItem
    ? `${lastItem.indexedat}::${lastItem.cid}`
    : undefined;

  return { items, cursor: nextCursor };
}
1418
1419 // post metadata
1420
/**
 * Lists the likes recorded in `backlink_skeleton` for a post, most recently
 * inserted backlink first.
 *
 * Likers whose profile cannot be resolved are omitted.
 *
 * @param uri AT URI of the liked post; the URI host is the post author DID.
 * @returns the likes, or `undefined` when the post author is not a
 *   registered index user or has no open database.
 * @throws if `uri` cannot be parsed by `AtUri`.
 */
async queryLikes(
  uri: string
): Promise<ATPAPI.AppBskyFeedGetLikes.Like[] | undefined> {
  const postAuthorDid = new AtUri(uri).hostname;
  if (!this.isRegisteredIndexUser(postAuthorDid)) return;
  const db = this.userManager.getDbForDid(postAuthorDid);
  if (!db) return;

  const rows = db
    .prepare(
      `
      SELECT b.srcdid, b.srcuri
      FROM backlink_skeleton AS b
      WHERE b.suburi = ? AND b.srccol = 'app_bsky_feed_like'
      ORDER BY b.id DESC;
    `
    )
    .all(uri) as unknown as BacklinkRow[];

  const resolved = await Promise.all(
    rows.map(
      async (
        row
      ): Promise<ATPAPI.AppBskyFeedGetLikes.Like | undefined> => {
        const actor = await this.queryProfileView(row.srcdid, "");
        if (!actor) return undefined;

        // TODO write indexedAt for spacedust indexes; until then the
        // current time stands in for both timestamps.
        const now = new Date().toISOString();
        return { createdAt: now, indexedAt: now, actor };
      }
    )
  );

  // Filter the resolved values (not the promises) so that unresolved
  // likers do not leave `undefined` holes in the result.
  return resolved.filter(
    (like): like is ATPAPI.AppBskyFeedGetLikes.Like => like !== undefined
  );
}
1457
/**
 * Lists the profiles that reposted a post, according to
 * `backlink_skeleton`, most recently inserted backlink first.
 *
 * Reposters whose profile cannot be resolved are omitted.
 *
 * @param uri AT URI of the reposted post; the URI host is the author DID.
 * @returns the reposter profiles; empty when the post author is not a
 *   registered index user or has no open database.
 * @throws if `uri` cannot be parsed by `AtUri`.
 */
async queryReposts(
  uri: string
): Promise<ATPAPI.AppBskyActorDefs.ProfileView[]> {
  const postAuthorDid = new AtUri(uri).hostname;
  if (!this.isRegisteredIndexUser(postAuthorDid)) return [];
  const db = this.userManager.getDbForDid(postAuthorDid);
  if (!db) return [];

  const rows = db
    .prepare(
      `
      SELECT srcdid
      FROM backlink_skeleton
      WHERE suburi = ? AND srccol = 'app_bsky_feed_repost'
      ORDER BY id DESC;
    `
    )
    .all(uri) as { srcdid: string }[];

  const resolved = await Promise.all(
    rows.map((row) => this.queryProfileView(row.srcdid, ""))
  );

  // Filter the resolved values (not the promises) so that unresolved
  // profiles do not leave `undefined` holes in the result.
  return resolved.filter(
    (profile): profile is ATPAPI.AppBskyActorDefs.ProfileView => !!profile
  );
}
1482
/**
 * Lists the posts that quote a post, according to `backlink_skeleton`,
 * most recently inserted backlink first.
 *
 * Quoting posts that cannot be hydrated by `queryFeedViewPost` are omitted.
 *
 * @param uri AT URI of the quoted post; the URI host is the author DID.
 * @returns the quoting posts; empty when the post author is not a
 *   registered index user or has no open database.
 * @throws if `uri` cannot be parsed by `AtUri`.
 */
async queryQuotes(
  uri: string
): Promise<ATPAPI.AppBskyFeedDefs.FeedViewPost[]> {
  const postAuthorDid = new AtUri(uri).hostname;
  if (!this.isRegisteredIndexUser(postAuthorDid)) return [];
  const db = this.userManager.getDbForDid(postAuthorDid);
  if (!db) return [];

  const rows = db
    .prepare(
      `
      SELECT srcuri
      FROM backlink_skeleton
      WHERE suburi = ? AND srccol = 'app_bsky_feed_post' AND srcfield = 'quote'
      ORDER BY id DESC;
    `
    )
    .all(uri) as { srcuri: string }[];

  const resolved = await Promise.all(
    rows.map((row) => this.queryFeedViewPost(row.srcuri))
  );

  // Filter the resolved values (not the promises) so that posts which
  // could not be hydrated do not leave `undefined` holes in the result.
  return resolved.filter(
    (quote): quote is ATPAPI.AppBskyFeedDefs.FeedViewPost => !!quote
  );
}
/**
 * Resolves a post either as a hydrated view (author DID handled by this
 * server) or as a reference stub (any other author).
 *
 * @param uri AT URI of the post.
 * @returns the view or reference; `undefined` when hydration yields nothing
 *   or anything in the lookup throws (including an unparsable uri).
 */
async _getPostViewUnion(
  uri: string
): Promise<
  | ATPAPI.AppBskyFeedDefs.PostView
  | IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef
  | undefined
> {
  try {
    const authorDid = new AtUri(uri).hostname;
    return this.handlesDid(authorDid)
      ? await this.queryPostView(uri)
      : this.constructPostViewRef(uri);
  } catch {
    return undefined;
  }
}
/**
 * Builds a partial thread around a post.
 *
 * Starting at the requested post, the parent chain is climbed for as long
 * as each post is a hydrated view whose record carries `reply.parent.uri`.
 * Replies are then discovered downwards from the requested post through
 * `backlink_skeleton`, which is only possible for authors this server
 * handles. The returned `thread` is the top-most ancestor that was reached.
 *
 * NOTE(review): nodes link to each other through both `parent` and
 * `replies`, so the returned graph contains cycles. Confirm that the caller
 * does not pass it straight to `JSON.stringify`.
 *
 * @param uri AT URI of the anchor post.
 * @returns the thread, or a `notFoundPost` thread when the anchor cannot be
 *   resolved.
 */
async queryPostThreadPartial(
  uri: string
): Promise<
  | IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.OutputSchema
  | undefined
> {
  type ThreadRef = IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef;
  type TypedThreadRef = IndexServerUtils.$Typed<ThreadRef>;
  type PostUnion =
    | ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.PostView>
    | IndexServerUtils.$Typed<IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef>;

  const post = await this._getPostViewUnion(uri);

  if (!post) {
    return {
      thread: {
        $type: "app.bsky.feed.defs#notFoundPost",
        uri: uri,
        notFound: true,
      } as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.NotFoundPost>,
    };
  }

  const thread: ThreadRef = {
    $type: "party.whey.app.bsky.feed.defs#threadViewPostRef",
    post: post as PostUnion,
    replies: [],
  };

  // We can only climb the parent tree if we have the full post record,
  // i.e. the node holds a hydrated view rather than a reference.
  const parentUriOf = (node: ThreadRef): string | undefined =>
    isPostView(node.post) && isFeedPostRecord(node.post.record)
      ? node.post.record?.reply?.parent?.uri
      : undefined;

  let current = thread;
  // Records are mutable, so guard against a parent chain that loops back.
  const climbedUris = new Set<string>([uri]);
  let parentUri = parentUriOf(current);

  // Keep climbing as long as we find a valid, not yet visited parent post.
  while (parentUri && !climbedUris.has(parentUri)) {
    climbedUris.add(parentUri);
    const parentPost = await this._getPostViewUnion(parentUri);
    if (!parentPost) break; // stop if a parent in the chain is not found.

    const parentThread: ThreadRef = {
      $type: "party.whey.app.bsky.feed.defs#threadViewPostRef",
      post: parentPost as PostUnion,
      replies: [current as TypedThreadRef],
    };
    current.parent = parentThread as TypedThreadRef;
    current = parentThread;

    parentUri = parentUriOf(current);
  }

  const seenUris = new Set<string>();
  const fetchReplies = async (parentThread: ThreadRef): Promise<void> => {
    if (!parentThread.post || !("uri" in parentThread.post)) {
      return;
    }
    const parentPostUri = parentThread.post.uri;
    if (seenUris.has(parentPostUri)) return;
    seenUris.add(parentPostUri);

    const parentAuthorDid = new AtUri(parentPostUri).hostname;

    // Replies can only be discovered for local posts where we have the
    // backlink data.
    if (!this.handlesDid(parentAuthorDid)) return;

    const db = this.userManager.getDbForDid(parentAuthorDid);
    if (!db) return;

    const replyRows = db
      .prepare(
        `
        SELECT srcuri
        FROM backlink_skeleton
        WHERE suburi = ? AND srccol = 'app_bsky_feed_post' AND srcfield = 'replyparent'
      `
      )
      .all(parentPostUri) as { srcuri: string }[];

    const replyPosts = await Promise.all(
      replyRows.map((row) => this._getPostViewUnion(row.srcuri))
    );

    for (const replyPost of replyPosts) {
      // Check the resolved value: unresolvable replies are skipped.
      if (!replyPost) continue;

      const replyThread: ThreadRef = {
        $type: "party.whey.app.bsky.feed.defs#threadViewPostRef",
        post: replyPost as PostUnion,
        parent: parentThread as TypedThreadRef,
        replies: [],
      };
      parentThread.replies?.push(replyThread as TypedThreadRef);

      // Await the recursion so the whole subtree exists before returning
      // and failures surface to the caller instead of going unhandled.
      await fetchReplies(replyThread);
    }
  };

  await fetchReplies(thread);

  return {
    thread: current as TypedThreadRef,
  };
}
1653
/**
 * please do not use this, use openDbForDid() instead
 *
 * Opens the database file `<baseDbPath>/<did>.sqlite` and runs
 * `setupUserDb` on it before handing it back.
 *
 * @param did DID whose database file is opened.
 * @returns the opened database handle.
 */
internalCreateDbForDid(did: string): Database {
  const userDb = new Database(`${this.config.baseDbPath}/${did}.sqlite`);
  setupUserDb(userDb);
  return userDb;
}
/**
 * Checks the system database for a `users` row with this DID whose
 * `onboardingstatus` is anything other than `'onboarding-backfill'`.
 *
 * @deprecated use handlesDid() instead
 * @param did DID to look up.
 * @returns `true` when such a row exists.
 */
isRegisteredIndexUser(did: string): boolean {
  const match = this.systemDB
    .prepare(
      `
      SELECT 1
      FROM users
      WHERE did = ?
      AND onboardingstatus != 'onboarding-backfill'
      LIMIT 1;
    `
    )
    .value<[number]>(did);
  return match !== undefined;
}
1683}
1684
/**
 * Keeps one `UserIndexServer` instance per DID and exposes their databases.
 */
export class IndexServerUserManager {
  public indexServer: IndexServer;
  /** Live per-user instances, keyed by DID. */
  public users = new Map<string, UserIndexServer>();

  constructor(indexServer: IndexServer) {
    this.indexServer = indexServer;
  }

  /** Whether an instance for `did` is currently held. */
  public handlesDid(did: string): boolean {
    return this.users.has(did);
  }

  /** Creates and stores an instance for `did`; no-op if one exists. */
  /*async*/ addUser(did: string) {
    if (this.users.has(did)) return;
    //await instance.initialize();
    this.users.set(did, new UserIndexServer(this, did));
  }

  // async handleRequest({
  //   did,
  //   route,
  //   req,
  // }: {
  //   did: string;
  //   route: string;
  //   req: Request;
  // }) {
  //   if (!this.users.has(did)) await this.addUser(did);
  //   const user = this.users.get(did)!;
  //   return await user.handleHttpRequest(route, req);
  // }

  /** Shuts down and forgets the instance for `did`, if there is one. */
  removeUser(did: string) {
    const existing = this.users.get(did);
    if (existing) {
      /*await*/ existing.shutdown();
      this.users.delete(did);
    }
  }

  /** Database of the instance held for `did`, or `null` if none. */
  getDbForDid(did: string): Database | null {
    return this.users.get(did)?.db ?? null;
  }

  /** Adds a user for every `did` found in the `users` table of `db`. */
  coldStart(db: Database) {
    const userRows = db.prepare("SELECT did FROM users").all();
    userRows.forEach((userRow) => {
      this.addUser(userRow.did);
    });
  }
}
1739
/**
 * Per-user indexing unit: owns the user's database plus one Jetstream and
 * one Spacedust subscription, both of which are started in the constructor.
 */
class UserIndexServer {
  public indexServerUserManager: IndexServerUserManager;
  did: string;
  db: Database; // | undefined;
  jetstream: JetstreamManager; // | undefined;
  spacedust: SpacedustManager; // | undefined;

  /**
   * @param indexServerUserManager owning manager; gives access to the
   *   index server used to open the database and to index records.
   * @param did DID of the user this instance indexes.
   */
  constructor(indexServerUserManager: IndexServerUserManager, did: string) {
    this.did = did;
    this.indexServerUserManager = indexServerUserManager;
    this.db = this.indexServerUserManager.indexServer.internalCreateDbForDid(
      this.did
    );

    // should probably put the params of exactly what were listening to here
    this.jetstream = new JetstreamManager((msg) => {
      console.log("Received Jetstream message: ", msg);

      // Jetstream also delivers identity/account events, which carry no
      // `commit`; without this guard they would throw in the handler.
      const commit = msg.commit;
      if (!commit) return;

      const op = commit.operation;
      const doer = msg.did;
      const rev = commit.rev;
      const aturi = `${msg.did}/${commit.collection}/${commit.rkey}`;
      const value = commit.record;

      // NOTE(review): commits without a record are skipped here, which
      // presumably includes deletes; confirm that this is intended.
      if (!doer || !value) return;
      this.indexServerUserManager.indexServer.indexServerIndexer({
        op,
        doer,
        cid: commit.cid,
        rev,
        aturi,
        value,
        indexsrc: `jetstream-${op}`,
        db: this.db,
      });
    });
    this.jetstream.start({
      // TODO: load this from the db instead of hard-coding it
      wantedDids: [this.did],
      wantedCollections: [
        "app.bsky.actor.profile",
        "app.bsky.feed.generator",
        "app.bsky.feed.like",
        "app.bsky.feed.post",
        "app.bsky.feed.repost",
        "app.bsky.feed.threadgate",
        "app.bsky.graph.block",
        "app.bsky.graph.follow",
        "app.bsky.graph.list",
        "app.bsky.graph.listblock",
        "app.bsky.graph.listitem",
        "app.bsky.notification.declaration",
      ],
    });

    this.spacedust = new SpacedustManager((msg: SpacedustLinkMessage) => {
      console.log("Received Spacedust message: ", msg);
      const operation = msg.link.operation;

      const sourceURI = new AtUri(msg.link.source_record);
      const srcUri = msg.link.source_record;
      const srcDid = sourceURI.host;
      const srcField = msg.link.source;
      const srcCol = sourceURI.collection;
      const subjectURI = new AtUri(msg.link.subject);
      const subUri = msg.link.subject;
      const subDid = subjectURI.host;
      const subCol = subjectURI.collection;

      // NOTE(review): srccol / srcfield are stored exactly as derived
      // above, while query methods elsewhere in this file filter on values
      // such as 'app_bsky_feed_like' and 'replyparent'; confirm that the
      // two naming schemes agree.
      if (operation === "delete") {
        this.db.run(
          `DELETE FROM backlink_skeleton
          WHERE srcuri = ? AND srcfield = ? AND suburi = ?`,
          [srcUri, srcField, subUri]
        );
      } else if (operation === "create") {
        this.db.run(
          `INSERT OR REPLACE INTO backlink_skeleton (
            srcuri,
            srcdid,
            srcfield,
            srccol,
            suburi,
            subdid,
            subcol,
            indexedAt
          ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
          [
            srcUri, // full AT URI of the source record
            srcDid, // did: of the source
            srcField, // e.g., "reply.parent.uri" or "facets.features.did"
            srcCol, // e.g., "app.bsky.feed.post"
            subUri, // full AT URI of the subject (linked record)
            subDid, // did: of the subject
            subCol, // subject collection (can be inferred or passed)
            Date.now(),
          ]
        );
      }
    });
    this.spacedust.start({
      wantedSources: [
        "app.bsky.feed.like:subject.uri", // like
        "app.bsky.feed.like:via.uri", // liked repost
        "app.bsky.feed.repost:subject.uri", // repost
        "app.bsky.feed.repost:via.uri", // reposted repost
        "app.bsky.feed.post:reply.root.uri", // thread OP
        "app.bsky.feed.post:reply.parent.uri", // direct parent
        "app.bsky.feed.post:embed.media.record.record.uri", // quote with media
        "app.bsky.feed.post:embed.record.uri", // quote without media
        "app.bsky.feed.threadgate:post", // threadgate subject
        "app.bsky.feed.threadgate:hiddenReplies", // threadgate items (array)
        "app.bsky.feed.post:facets.features.did", // facet item (array): mention
        "app.bsky.graph.block:subject", // blocks
        "app.bsky.graph.follow:subject", // follow
        "app.bsky.graph.listblock:subject", // list item (blocks)
        "app.bsky.graph.listblock:list", // blocklist mention (might not exist)
        "app.bsky.graph.listitem:subject", // list item (blocks)
        "app.bsky.graph.listitem:list", // list mention
      ],
      // should be getting from DB but whatever right
      // individual posts do not need listing here: subscribing to the
      // user's DID below is enough
      wantedSubjects: [],
      wantedSubjectDids: [this.did],
      instant: ["true"],
    });
  }

  /** Stops both subscriptions and closes the database. */
  shutdown() {
    this.jetstream.stop();
    this.spacedust.stop();
    this.db.close?.();
  }
}
1914
1915// /**
1916// * please do not use this, use openDbForDid() instead
1917// * @param did
1918// * @returns
1919// */
1920// function internalCreateDbForDid(did: string): Database {
1921// const path = `./dbs/${did}.sqlite`;
1922// const db = new Database(path);
1923// setupUserDb(db);
1924// //await db.exec(/* CREATE IF NOT EXISTS statements */);
1925// return db;
1926// }
1927
1928// function getDbForDid(did: string): Database | undefined {
1929// const db = indexerUserManager.getDbForDid(did);
1930// if (!db) return;
1931// return db;
1932// }
1933
1934// async function connectToJetstream(did: string, db: Database): Promise<WebSocket> {
1935// const url = `${jetstreamurl}/xrpc/com.atproto.sync.subscribeRepos?did=${did}`;
1936// const ws = new WebSocket(url);
1937// ws.onmessage = (msg) => {
1938// //handleJetstreamMessage(evt.data, db)
1939
1940// const op = msg.commit.operation;
1941// const doer = msg.did;
1942// const rev = msg.commit.rev;
1943// const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`;
1944// const value = msg.commit.record;
1945
1946// if (!doer || !value) return;
1947// indexServerIndexer({
1948// op,
1949// doer,
1950// rev,
1951// aturi,
1952// value,
1953// indexsrc: "onboarding_backfill",
1954// userdbname: did,
1955// })
1956// };
1957
1958// return ws;
1959// }
1960
1961// async function connectToConstellation(did: string, db: D1Database): Promise<WebSocket> {
1962// const url = `wss://bsky.social/xrpc/com.atproto.sync.subscribeLabels?did=${did}`;
1963// const ws = new WebSocket(url);
1964// ws.onmessage = (evt) => handleConstellationMessage(evt.data, db);
1965// return ws;
1966// }
1967
/**
 * Shape of a row read from the `app_bsky_feed_post` table.
 */
type PostRow = {
  // record identity and bookkeeping
  uri: string;
  did: string;
  cid: string | null;
  rev: string | null;
  createdat: number | null;
  indexedat: number;
  /** Serialized record; parsed with `JSON.parse` by the hydrators. */
  json: string | null;

  // denormalized post fields
  text: string | null;
  replyroot: string | null;
  replyparent: string | null;
  quote: string | null;

  // image columns, slots 1 to 4
  imagecount: number | null;
  image1cid: string | null;
  image1mime: string | null;
  image1aspect: string | null;
  image2cid: string | null;
  image2mime: string | null;
  image2aspect: string | null;
  image3cid: string | null;
  image3mime: string | null;
  image3aspect: string | null;
  image4cid: string | null;
  image4mime: string | null;
  image4aspect: string | null;

  // video columns
  videocount: number | null;
  videocid: string | null;
  videomime: string | null;
  videoaspect: string | null;
};
2001
/**
 * Shape of a stored profile row (note: unlike `PostRow` it has no `did`
 * column).
 */
type ProfileRow = {
  // record identity and bookkeeping
  uri: string;
  cid: string | null;
  rev: string | null;
  createdat: number | null;
  indexedat: number;
  /** Serialized record. */
  json: string | null;

  // denormalized profile fields
  displayname: string | null;
  description: string | null;
  avatarcid: string | null;
  avatarmime: string | null;
  bannercid: string | null;
  bannermime: string | null;
};
2016
/** Query for backlinks to `target` from one collection/path pair. */
type linksQuery = {
  target: string;
  collection: string;
  path: string;
  cursor?: string;
};

/** Identifies one linking record. */
type linksRecord = {
  did: string;
  collection: string;
  rkey: string;
};

/** Response listing linking records. */
type linksRecordsResponse = {
  total: string;
  linking_records: linksRecord[];
  cursor?: string;
};

/** Response listing linking DIDs. */
type linksDidsResponse = {
  total: string;
  linking_dids: string[];
  cursor?: string;
};

/** Response carrying only a total. */
type linksCountResponse = {
  total: string;
};

/** Response mapping two levels of string keys to link counters. */
type linksAllResponse = {
  links: Record<
    string,
    Record<string, { records: number; distinct_dids: number }>
  >;
};

/** Query for every kind of backlink to `target`. */
type linksAllQuery = {
  target: string;
};
2057
/**
 * Statement texts for reading `backlink_skeleton`.
 *
 * Every statement except `all` takes three positional parameters
 * (suburi, srccol, srcfield); `all` takes only the suburi.
 */
const SQL = {
  // linking records for one subject/collection/field
  links: `
    SELECT srcuri, srcdid, srccol
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  // distinct linking DIDs for one subject/collection/field
  distinctDids: `
    SELECT DISTINCT srcdid
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  // number of linking records
  count: `
    SELECT COUNT(*) as total
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  // number of distinct linking DIDs
  countDistinctDids: `
    SELECT COUNT(DISTINCT srcdid) as total
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  // per-collection counters for one subject
  all: `
    SELECT suburi, srccol, COUNT(*) as records, COUNT(DISTINCT srcdid) as distinct_dids
    FROM backlink_skeleton
    WHERE suburi = ?
    GROUP BY suburi, srccol
  `,
};
2086
/**
 * Cheap DID check: true for any string that starts with `"did:"`.
 * Non-string values yield `false`.
 */
export function isDid(str: string): boolean {
  if (typeof str !== "string") return false;
  return str.startsWith("did:");
}
2090
/**
 * Type guard: a non-null object whose `$type` is `"app.bsky.feed.post"`.
 * Only the `$type` tag is inspected.
 */
function isFeedPostRecord(
  post: unknown
): post is ATPAPI.AppBskyFeedPost.Record {
  if (typeof post !== "object" || post === null) return false;
  if (!("$type" in post)) return false;
  return (post as { $type: unknown }).$type === "app.bsky.feed.post";
}
2101
/**
 * Type guard: a non-null object whose `$type` is `"app.bsky.embed.images"`.
 * Only the `$type` tag is inspected.
 */
function isImageEmbed(embed: unknown): embed is ATPAPI.AppBskyEmbedImages.Main {
  if (typeof embed !== "object" || embed === null) return false;
  if (!("$type" in embed)) return false;
  return (embed as { $type: unknown }).$type === "app.bsky.embed.images";
}
2110
/**
 * Type guard: a non-null object whose `$type` is `"app.bsky.embed.video"`.
 * Only the `$type` tag is inspected.
 */
function isVideoEmbed(embed: unknown): embed is ATPAPI.AppBskyEmbedVideo.Main {
  if (typeof embed !== "object" || embed === null) return false;
  if (!("$type" in embed)) return false;
  return (embed as { $type: unknown }).$type === "app.bsky.embed.video";
}
2119
/**
 * Type guard: a non-null object whose `$type` is `"app.bsky.embed.record"`.
 * Only the `$type` tag is inspected.
 */
function isRecordEmbed(
  embed: unknown
): embed is ATPAPI.AppBskyEmbedRecord.Main {
  if (typeof embed !== "object" || embed === null) return false;
  if (!("$type" in embed)) return false;
  return (embed as { $type: unknown }).$type === "app.bsky.embed.record";
}
2130
/**
 * Type guard: a non-null object whose `$type` is
 * `"app.bsky.embed.recordWithMedia"`. Only the `$type` tag is inspected.
 */
function isRecordWithMediaEmbed(
  embed: unknown
): embed is ATPAPI.AppBskyEmbedRecordWithMedia.Main {
  if (typeof embed !== "object" || embed === null) return false;
  if (!("$type" in embed)) return false;
  return (
    (embed as { $type: unknown }).$type === "app.bsky.embed.recordWithMedia"
  );
}
2141
/**
 * Reads the `$link` property of a value.
 *
 * @param anything any value; null/undefined are tolerated.
 * @returns the `$link` value when it is truthy, otherwise `null`.
 */
export function uncid(anything: any): string | null {
  const link = (anything as Record<string, unknown>)?.["$link"];
  return (link as string | null) || null;
}
2147
/**
 * Returns the `images` of an images embed, looking inside `media` when the
 * embed is a record-with-media embed; any other input yields `[]`.
 */
function extractImages(embed: unknown) {
  // An embed carries a single $type, so it is either the media itself or a
  // record-with-media wrapper around it.
  const media = isRecordWithMediaEmbed(embed) ? embed.media : embed;
  return isImageEmbed(media) ? media.images : [];
}
2154
/**
 * Returns the video embed itself, looking inside `media` when the embed is
 * a record-with-media embed; any other input yields `null`.
 */
function extractVideo(embed: unknown) {
  // An embed carries a single $type, so it is either the media itself or a
  // record-with-media wrapper around it.
  const media = isRecordWithMediaEmbed(embed) ? embed.media : embed;
  return isVideoEmbed(media) ? media : null;
}
2161
/**
 * Returns the uri of the quoted record for a record embed
 * (`record.uri`) or a record-with-media embed (`record.record.uri`);
 * any other input yields `null`.
 */
function extractQuoteUri(embed: unknown): string | null {
  return isRecordEmbed(embed)
    ? embed.record.uri
    : isRecordWithMediaEmbed(embed)
      ? embed.record.record.uri
      : null;
}