import ky from "npm:ky"; import { isGeneratorView } from "./indexserver/types/app/bsky/feed/defs.ts"; import * as ViewServerTypes from "./utils/viewservertypes.ts"; import * as ATPAPI from "npm:@atproto/api"; import { searchParamsToJson, resolveIdentity, buildBlobUrl, cachedFetch, didWebToHttps, getSlingshotRecord, withCors, } from "./utils/server.ts"; import QuickLRU from "npm:quick-lru"; import { validateRecord } from "./utils/records.ts"; import { indexHandlerContext } from "./index/types.ts"; import { Database } from "jsr:@db/sqlite@0.11"; import { JetstreamManager, SpacedustManager } from "./utils/sharders.ts"; import { SpacedustLinkMessage } from "./index/spacedust.ts"; import { setupUserDb } from "./utils/dbuser.ts"; import { config } from "./config.ts"; import { AtUri } from "npm:@atproto/api"; import { CID } from "../../Library/Caches/deno/npm/registry.npmjs.org/multiformats/9.9.0/cjs/src/cid.js"; import { uncid } from "./indexserver.ts"; import { getAuthenticatedDid } from "./utils/auth.ts"; const temporarydevelopmentblockednotiftypes: ATPAPI.AppBskyNotificationListNotifications.Notification["reason"][] = [ //'like', //'repost', //'follow', //'mention', //'reply', //'quote', //'starterpack-joined', //'liked-via-repost', //'repost-via-repost', ]; export interface ViewServerConfig { baseDbPath: string; systemDbPath: string; } interface BaseRow { uri: string; did: string; cid: string | null; rev: string | null; createdat: number | null; indexedAt: number; json: string | null; } interface GeneratorRow extends BaseRow { displayname: string | null; description: string | null; avatarcid: string | null; } interface LikeRow extends BaseRow { subject: string; } interface RepostRow extends BaseRow { subject: string; } interface BacklinkRow { srcuri: string; srcdid: string; } const FEED_LIMIT = 50; export class ViewServer { private config: ViewServerConfig; public userManager: ViewServerUserManager; public systemDB: Database; constructor(config: ViewServerConfig) { 
this.config = config; // We will initialize the system DB and user manager here this.systemDB = new Database(this.config.systemDbPath); // TODO: We need to setup the system DB schema if it's new this.userManager = new ViewServerUserManager(this); // Pass the server instance } public start() { // This is where we'll kick things off, like the cold start this.userManager.coldStart(this.systemDB); console.log("viewServer started."); } async unspeccedGetRegisteredUsers(): Promise<{ did: string; role: string; registrationdate: string; onboardingstatus: string; pfp?: string; displayname: string; handle: string; }[]|undefined> { const stmt = this.systemDB.prepare(` SELECT * FROM users; `); const result = stmt.all() as { did: string; role: string; registrationdate: string; onboardingstatus: string; }[]; const hydrated = await Promise.all( result.map(async (user)=>{ const identity = await resolveIdentity(user.did); const profile = (await getSlingshotRecord(identity.did,"app.bsky.actor.profile","self")).value as ATPAPI.AppBskyActorProfile.Record; const avatarcid = uncid(profile.avatar?.ref); const avatar = avatarcid ? buildBlobUrl(identity.pds, identity.did, avatarcid) : undefined; return {...user,handle: identity.handle,pfp: avatar, displayname:profile.displayName ?? identity.handle } })) //const exists = result !== undefined; return hydrated; } async viewServerHandler(req: Request): Promise { const url = new URL(req.url); const pathname = url.pathname; const bskyUrl = `https://api.bsky.app${pathname}${url.search}`; const hasAuth = req.headers.has("authorization"); const xrpcMethod = pathname.startsWith("/xrpc/") ? pathname.slice("/xrpc/".length) : null; const searchParams = searchParamsToJson(url.searchParams); const jsonUntyped = searchParams; let tempauthdid: string | undefined = undefined; try { tempauthdid = (await getAuthenticatedDid(req)) ?? undefined; } catch (_e) { // nothing lol } const authdid = tempauthdid ? this.handlesDid(tempauthdid) ? 
tempauthdid : undefined : undefined; console.log("authed:", authdid); if (xrpcMethod === "app.bsky.unspecced.getTrendingTopics") { // const jsonTyped = // jsonUntyped as ViewServerTypes.AppBskyUnspeccedGetTrendingTopics.QueryParams; const faketopics: ATPAPI.AppBskyUnspeccedDefs.TrendingTopic[] = [ { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "Git Repo", displayName: "Git Repo", description: "Git Repo", link: "https://tangled.sh/@whey.party/skylite", }, { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "this View Server url", displayName: "this View Server url", description: "this View Server url", link: config.viewServer.host, }, { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "this social-app fork url", displayName: "this social-app fork url", description: "this social-app fork url", link: "https://github.com/rimar1337/social-app/tree/publicappview-colorable", }, { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "whey dot party", displayName: "whey dot party", description: "whey dot party", link: "https://whey.party/", }, ]; const response: ViewServerTypes.AppBskyUnspeccedGetTrendingTopics.OutputSchema = { topics: faketopics, suggested: faketopics, }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } //if (xrpcMethod !== 'app.bsky.actor.getPreferences' && xrpcMethod !== 'app.bsky.notification.listNotifications') { if ( !hasAuth // (!hasAuth || // xrpcMethod === "app.bsky.labeler.getServices" || // xrpcMethod === "app.bsky.unspecced.getConfig") && // xrpcMethod !== "app.bsky.notification.putPreferences" ) { return new Response( JSON.stringify({ error: "XRPCNotSupported", message: "(no auth) HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that XRPC Not Supported", }), { status: 404, headers: withCors({ "Content-Type": "application/json" }), } ); //return await sendItToApiBskyApp(req); } if ( // 
!hasAuth || xrpcMethod === "app.bsky.labeler.getServices" || xrpcMethod === "app.bsky.unspecced.getConfig" //&& //xrpcMethod !== "app.bsky.notification.putPreferences" ) { return new Response( JSON.stringify({ error: "XRPCNotSupported", message: "(getservices / getconfig) HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that XRPC Not Supported", }), { status: 404, headers: withCors({ "Content-Type": "application/json" }), } ); //return await sendItToApiBskyApp(req); } const authDID = "did:plc:mn45tewwnse5btfftvd3powc"; //getAuthenticatedDid(req); switch (xrpcMethod) { case "app.bsky.feed.getFeedGenerators": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetFeedGenerators.QueryParams; const feeds: ATPAPI.AppBskyFeedDefs.GeneratorView[] = ( await Promise.all( jsonTyped.feeds.map(async (feed) => { try { const did = new ATPAPI.AtUri(feed).hostname; const rkey = new ATPAPI.AtUri(feed).rkey; const identity = await resolveIdentity(did); const feedgetRecord = await getSlingshotRecord( identity.did, "app.bsky.feed.generator", rkey ); const profile = ( await getSlingshotRecord( identity.did, "app.bsky.actor.profile", "self" ) ).value as ATPAPI.AppBskyActorProfile.Record; const anyprofile = profile as any; const value = feedgetRecord.value as ATPAPI.AppBskyFeedGenerator.Record; return { $type: "app.bsky.feed.defs#generatorView", uri: feed, cid: feedgetRecord.cid, did: identity.did, creator: /*AppBskyActorDefs.ProfileView*/ { $type: "app.bsky.actor.defs#profileView", did: identity.did, handle: identity.handle, displayName: profile.displayName, description: profile.description, avatar: buildBlobUrl( identity.pds, identity.did, anyprofile.avatar.ref["$link"] ), //associated?: ProfileAssociated //indexedAt?: string //createdAt?: string //viewer?: ViewerState //labels?: ComAtprotoLabelDefs.Label[] //verification?: VerificationState //status?: StatusView }, displayName: value.displayName, 
description: value.description, //descriptionFacets?: AppBskyRichtextFacet.Main[] avatar: buildBlobUrl( identity.pds, identity.did, (value as any).avatar.ref["$link"] ), //likeCount?: number //acceptsInteractions?: boolean //labels?: ComAtprotoLabelDefs.Label[] //viewer?: GeneratorViewerState contentMode: value.contentMode, indexedAt: new Date().toISOString(), }; } catch (err) { return undefined; } }) ) ).filter(isGeneratorView); const response: ViewServerTypes.AppBskyFeedGetFeedGenerators.OutputSchema = { feeds: feeds ? feeds : [], }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.feed.getFeed": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetFeed.QueryParams; const cursor = jsonTyped.cursor; const feed = jsonTyped.feed; const limit = jsonTyped.limit; const proxyauth = req.headers.get("authorization") || ""; const did = new ATPAPI.AtUri(feed).hostname; const rkey = new ATPAPI.AtUri(feed).rkey; const identity = await resolveIdentity(did); const feedgetRecord = ( await getSlingshotRecord( identity.did, "app.bsky.feed.generator", rkey ) ).value as ATPAPI.AppBskyFeedGenerator.Record; const skeleton = (await cachedFetch( `${didWebToHttps( feedgetRecord.did )}/xrpc/app.bsky.feed.getFeedSkeleton?feed=${jsonTyped.feed}${ cursor ? `&cursor=${cursor}` : "" }${limit ? 
`&limit=${limit}` : ""}`, proxyauth )) as ATPAPI.AppBskyFeedGetFeedSkeleton.OutputSchema; const nextcursor = skeleton.cursor; const dbgrqstid = skeleton.reqId; const uriarray = skeleton.feed; // Step 1: Chunk into 25 max const chunks = []; for (let i = 0; i < uriarray.length; i += 25) { chunks.push(uriarray.slice(i, i + 25)); } // Step 2: Hydrate via getPosts const hydratedPosts: ATPAPI.AppBskyFeedDefs.FeedViewPost[] = []; for (const chunk of chunks) { const searchParams = new URLSearchParams(); for (const uri of chunk.map((item) => item.post)) { searchParams.append("uris", uri); } const postResp = await ky // TODO aaaaaa dont do this please use the new getServiceEndpointFromIdentity() .get(`https://api.bsky.app/xrpc/app.bsky.feed.getPosts`, { // headers: { // Authorization: proxyauth, // }, searchParams, }) .json(); for (const post of postResp.posts) { const matchingSkeleton = uriarray.find( (item) => item.post === post.uri ); if (matchingSkeleton) { //post.author.handle = post.author.handle + ".percent40.api.bsky.app"; // or any logic to modify it hydratedPosts.push({ post, reason: matchingSkeleton.reason, //reply: matchingSkeleton, }); } } } // Step 3: Compose final response const response: ViewServerTypes.AppBskyFeedGetFeed.OutputSchema = { feed: hydratedPosts, cursor: nextcursor, }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.actor.getProfile": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyActorGetProfile.QueryParams; const response: ViewServerTypes.AppBskyActorGetProfile.OutputSchema = ((await this.resolveGetProfiles([jsonTyped.actor])) ?? 
[])[0]; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.actor.getProfiles": { const jsonhalfTyped = jsonUntyped as ViewServerTypes.AppBskyActorGetProfiles.QueryParams; const actors = jsonhalfTyped.actors as string[] | string const queryactors = Array.isArray(actors) ? actors : [actors]; //console.log("queryactors:",jsonTyped.actors) const response: ViewServerTypes.AppBskyActorGetProfiles.OutputSchema = { profiles: (await this.resolveGetProfiles(queryactors)) ?? [], }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.feed.getAuthorFeed": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetAuthorFeed.QueryParams; const userindexservice = ""; const isbskyfallback = true; if (isbskyfallback) { return this.sendItToApiBskyApp(req); } const response: ViewServerTypes.AppBskyFeedGetAuthorFeed.OutputSchema = {}; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.feed.getPostThread": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetPostThread.QueryParams; const userindexservice = ""; const isbskyfallback = true; if (isbskyfallback) { return this.sendItToApiBskyApp(req); } const response: ViewServerTypes.AppBskyFeedGetPostThread.OutputSchema = {}; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.unspecced.getPostThreadV2": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyUnspeccedGetPostThreadV2.QueryParams; const userindexservice = ""; const isbskyfallback = true; if (isbskyfallback) { return this.sendItToApiBskyApp(req); } const response: ViewServerTypes.AppBskyUnspeccedGetPostThreadV2.OutputSchema = {}; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } // case 
"app.bsky.actor.getProfile": { // const jsonTyped = // jsonUntyped as ViewServerTypes.AppBskyActorGetProfile.QueryParams; // const response: ViewServerTypes.AppBskyActorGetProfile.OutputSchema= {}; // return new Response(JSON.stringify(response), { // headers: withCors({ "Content-Type": "application/json" }), // }); // } // case "app.bsky.actor.getProfiles": { // const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyActorGetProfiles.QueryParams; // const response: ViewServerTypes.AppBskyActorGetProfiles.OutputSchema = {}; // return new Response(JSON.stringify(response), { // headers: withCors({ "Content-Type": "application/json" }), // }); // } // case "whatever": { // const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetAuthorFeed.QueryParams; // const response: ViewServerTypes.AppBskyFeedGetAuthorFeed.OutputSchema = {} // return new Response(JSON.stringify(response), { // headers: withCors({ "Content-Type": "application/json" }), // }); // } case "app.bsky.notification.listNotifications": { if (!authdid) return new Response("Not Found", { status: 404 }); const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyNotificationListNotifications.QueryParams; const response: ViewServerTypes.AppBskyNotificationListNotifications.OutputSchema = await this.queryNotificationsList(authdid, jsonTyped.cursor, jsonTyped.limit); return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.feed.getPosts": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyFeedGetPosts.QueryParams; const inputUris = Array.isArray(jsonTyped.uris) ? 
jsonTyped.uris : [jsonTyped.uris]; const response: ViewServerTypes.AppBskyFeedGetPosts.OutputSchema = { posts: await this.resolveGetPosts(inputUris), }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.unspecced.getConfig": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyUnspeccedGetConfig.QueryParams; const response: ViewServerTypes.AppBskyUnspeccedGetConfig.OutputSchema = { checkEmailConfirmed: true, liveNow: [ { $type: "app.bsky.unspecced.getConfig#liveNowConfig", did: "did:plc:mn45tewwnse5btfftvd3powc", domains: ["local3768forumtest.whey.party"], }, ], }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } case "app.bsky.graph.getLists": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyGraphGetLists.QueryParams; const response: ViewServerTypes.AppBskyGraphGetLists.OutputSchema = { lists: [], }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } //https://shimeji.us-east.host.bsky.network/xrpc/app.bsky.unspecced.getTrendingTopics?limit=14 case "app.bsky.unspecced.getTrendingTopics": { const jsonTyped = jsonUntyped as ViewServerTypes.AppBskyUnspeccedGetTrendingTopics.QueryParams; const faketopics: ATPAPI.AppBskyUnspeccedDefs.TrendingTopic[] = [ { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "Git Repo", displayName: "Git Repo", description: "Git Repo", link: "https://tangled.sh/@whey.party/skylite", }, { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "Red Dwarf Lite", displayName: "Red Dwarf Lite", description: "Red Dwarf Lite", link: "https://reddwarf.whey.party/", }, { $type: "app.bsky.unspecced.defs#trendingTopic", topic: "whey dot party", displayName: "whey dot party", description: "whey dot party", link: "https://whey.party/", }, ]; const response: ViewServerTypes.AppBskyUnspeccedGetTrendingTopics.OutputSchema = { topics: 
faketopics, suggested: faketopics, }; return new Response(JSON.stringify(response), { headers: withCors({ "Content-Type": "application/json" }), }); } default: { return new Response( JSON.stringify({ error: "XRPCNotSupported", message: "(default) HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that XRPC Not Supported", }), { status: 404, headers: withCors({ "Content-Type": "application/json" }), } ); } } // return new Response("Not Found", { status: 404 }); } async sendItToApiBskyApp(req: Request): Promise { const url = new URL(req.url); const pathname = url.pathname; const searchParams = searchParamsToJson(url.searchParams); let reqBody: undefined | string; let jsonbody: undefined | Record; if (req.body) { const body = await req.json(); jsonbody = body; // console.log( // `called at euh reqreqreqreq: ${pathname}\n\n${JSON.stringify(body)}` // ); reqBody = JSON.stringify(body, null, 2); } const bskyUrl = `https://public.api.bsky.app${pathname}${url.search}`; console.log("request", searchParams); const proxyHeaders = new Headers(req.headers); // Remove Authorization and set browser-like User-Agent proxyHeaders.delete("authorization"); proxyHeaders.delete("Access-Control-Allow-Origin"), proxyHeaders.set( "user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36" ); proxyHeaders.set("Access-Control-Allow-Origin", "*"); const proxyRes = await fetch(bskyUrl, { method: req.method, headers: proxyHeaders, body: ["GET", "HEAD"].includes(req.method.toUpperCase()) ? 
undefined : reqBody, }); const resBody = await proxyRes.text(); // console.log( // "← Response:", // JSON.stringify(await JSON.parse(resBody), null, 2) // ); return new Response(resBody, { status: proxyRes.status, headers: proxyRes.headers, }); } viewServerIndexer(ctx: indexHandlerContext) { const record = validateRecord(ctx.value); switch (record?.$type) { case "app.bsky.feed.like": { return; } default: { // what the hell return; } } } /** * please do not use this, use openDbForDid() instead * @param did * @returns */ internalCreateDbForDid(did: string): Database { const path = `${this.config.baseDbPath}/${did}.sqlite`; const db = new Database(path); // TODO maybe split the user db schema between view server and index server setupUserDb(db); //await db.exec(/* CREATE IF NOT EXISTS statements */); return db; } public handlesDid(did: string): boolean { return this.userManager.handlesDid(did); } async resolveGetPosts( uris: string[] ): Promise { const grouped: Record = {}; // Group URIs by resolved endpoint for (const uri of uris) { const did = new AtUri(uri).host; const endpoint = await getSkyliteEndpoint(did); if (!endpoint) continue; if (!grouped[endpoint]) { grouped[endpoint] = []; } grouped[endpoint].push(uri); } const postviews: ATPAPI.AppBskyFeedDefs.PostView[] = []; // Fetch posts per endpoint for (const [endpoint, urisForEndpoint] of Object.entries(grouped)) { const query = urisForEndpoint .map((u) => `uris=${encodeURIComponent(u)}`) .join("&"); const url = `${endpoint}/xrpc/app.bsky.feed.getPosts?${query}`; const resp = await fetch(url); if (!resp.ok) { throw new Error( `Failed to fetch posts from ${endpoint} for uris=${urisForEndpoint.join( "," )}` ); } const raw = (await resp.json()) as ATPAPI.AppBskyFeedGetPosts.OutputSchema; postviews.push(...raw.posts); } return postviews; } async resolveGetProfiles( dids: string[] ): Promise { const profiles: ATPAPI.AppBskyActorDefs.ProfileViewDetailed[] = []; for (const did of dids) { const endpoint = await 
getSkyliteEndpoint(did); const url = `${endpoint}/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent( did )}`; const resp = await fetch(url); if (!resp.ok) throw new Error(`Failed to fetch profile for ${did} via ${url}`); const raw = (await resp.json()) as ATPAPI.AppBskyActorGetProfile.OutputSchema; profiles.push(raw); } return profiles; } async queryNotificationsList( did: string, cursor?: string, limit?: number ): Promise { if (!this.handlesDid(did)) { return { notifications: [] }; } const db = this.userManager.getDbForDid(did); if (!db) { return { notifications: [] }; } const NOTIFS_LIMIT = limit ?? 30; const offset = cursor ? parseInt(cursor, 10) : 0; const mapReason = ( field: string ): | ATPAPI.AppBskyNotificationListNotifications.Notification["reason"] | undefined => { switch (field) { //'like' | 'repost' | 'follow' | 'mention' | 'reply' | 'quote' | 'starterpack-joined' | 'verified' | 'unverified' | 'like-via-repost' | 'repost-via-repost' | case "app.bsky.feed.like:subject.uri": return "like"; case "app.bsky.feed.like:via.uri": return "liked-via-repost"; case "app.bsky.feed.repost:subject.uri": return "repost"; case "app.bsky.feed.repost:via.uri": return "repost-via-repost"; case "app.bsky.feed.post:reply.root.uri": return "reply"; case "app.bsky.feed.post:reply.parent.uri": return "reply"; case "app.bsky.feed.post:embed.media.record.record.uri": return "quote"; case "app.bsky.feed.post:embed.record.uri": return "quote"; //case"app.bsky.feed.threadgate:post": return "threadgate subject //case"app.bsky.feed.threadgate:hiddenReplies": return "threadgate items (array) case "app.bsky.feed.post:facets.features.did": return "mention"; //case"app.bsky.graph.block:subject": return "blocks case "app.bsky.graph.follow:subject": return "follow"; //case"app.bsky.graph.listblock:subject": return "list item (blocks) //case"app.bsky.graph.listblock:list": return "blocklist mention (might not exist) //case"app.bsky.graph.listitem:subject": return "list item (blocks) 
//"app.bsky.graph.listitem:list": return "list mention // case "like": return "like"; // case "repost": return "repost"; // case "follow": return "follow"; // case "replyparent": return "reply"; // case "replyroot": return "reply"; // case "mention": return "mention"; default: return undefined; } }; // --- Build Query --- let query = ` SELECT srcuri, suburi, srcfield, indexedAt FROM backlink_skeleton WHERE -- Find actions targeting the user's content or profile (suburi LIKE ? OR suburi = ?) -- Exclude notifications from the user themselves AND srcuri NOT LIKE ? ORDER BY indexedAt DESC, srcuri DESC LIMIT ? OFFSET ? `; const params: (string | number)[] = [ `at://${did}/%`, did, `at://${did}/%`, NOTIFS_LIMIT, offset ]; // if (cursor) { // const [indexedAt, srcuri] = cursor.split("::"); // if (indexedAt && srcuri && !Number.isNaN(+indexedAt)) { // query += ` AND (indexedAt < ? OR (indexedAt = ? AND srcuri < ?))`; // params.push(+indexedAt, +indexedAt, srcuri); // } // } // query += ` ORDER BY indexedAt DESC, srcuri DESC LIMIT ${NOTIFS_LIMIT}`; // --- Fetch and Process --- const stmt = db.prepare(query); const rows = stmt.all(...params) as { srcuri: string; suburi: string; // might be uri, might be just a did srcfield: string; indexedAt: number; }[]; const notificationPromises = rows.map(async (row) => { try { const reason = mapReason(row.srcfield); // i have a hunch that follow notifs are crashing the client if (!reason || temporarydevelopmentblockednotiftypes.includes(reason)) { return null; } // Skip if it's a backlink type we don't have a notification for if (!reason) return null; const srcURI = new AtUri(row.srcuri); const [authorRes, recordRes] = await Promise.allSettled([ this.resolveProfileView(srcURI.host, ""), getSlingshotRecord(srcURI.host, srcURI.collection, srcURI.rkey), ]); const author = authorRes.status === "fulfilled" ? authorRes.value : null; const getrecord = recordRes.status === "fulfilled" ? 
recordRes.value : null; const reasonsubject = row.suburi.startsWith("at://") ? row.suburi : `at://${row.suburi}`; // If we can't resolve the author or the record, we can't form a valid notification if (!author || !getrecord || !reason || !reasonsubject) return null; author.viewer = { "muted": false, "blockedBy": false, //"following": } // TODO: proper mutes and blocks here if (!getrecord?.value?.$type) getrecord.value.$type = srcURI.collection return { uri: row.srcuri, cid: getrecord.cid, author: author, reason: reason, // The reasonSubject is the URI of the post that was liked, reposted, or replied to reasonSubject: reasonsubject, record: getrecord.value, isRead: false, // Placeholder for read-state logic indexedAt: new Date(row.indexedAt).toISOString(), labels: [], // Placeholder for label logic } as ATPAPI.AppBskyNotificationListNotifications.Notification; } catch (e) {console.log("error:",e)} }); const seen = new Set(); const notifications = (await Promise.all(notificationPromises)) .filter((n): n is ATPAPI.AppBskyNotificationListNotifications.Notification => { if (!n) return false; const key = `${n.uri}:${n.reason}:${n.reasonSubject}`; if (seen.has(key)) return false; seen.add(key); return true; }); // --- Create next cursor --- const nextCursor:number = Number(offset) + Number(limit ?? 0) // const lastItem = rows[rows.length - 1]; // const nextCursor = lastItem // ? 
`${lastItem.indexedAt}::${lastItem.srcuri}` // : undefined; return { cursor: `${nextCursor}`,//nextCursor, notifications: notifications, priority:false, seenAt: new Date().toISOString() }; } async resolveProfileView( did: string, type: "" ): Promise; async resolveProfileView( did: string, type: "Basic" ): Promise; async resolveProfileView( did: string, type: "Detailed" ): Promise; async resolveProfileView( did: string, type: "" | "Basic" | "Detailed" ): Promise< | ATPAPI.AppBskyActorDefs.ProfileView | ATPAPI.AppBskyActorDefs.ProfileViewBasic | ATPAPI.AppBskyActorDefs.ProfileViewDetailed | undefined > { const record = ( await getSlingshotRecord(did, "app.bsky.actor.profile", "self") ).value as ATPAPI.AppBskyActorProfile.Record; const identity = await resolveIdentity(did); const avatarcid = uncid(record.avatar?.ref); const avatar = avatarcid ? buildBlobUrl(identity.pds, identity.did, avatarcid) : undefined; const bannercid = uncid(record.banner?.ref); const banner = bannercid ? buildBlobUrl(identity.pds, identity.did, bannercid) : undefined; // simulate different types returned switch (type) { case "": { const result: ATPAPI.AppBskyActorDefs.ProfileView = { $type: "app.bsky.actor.defs#profileView", did: did, handle: identity.handle, displayName: record.displayName ?? identity.handle, description: record.description ?? undefined, avatar: avatar, // create profile URL from resolved identity //associated?: ProfileAssociated, indexedAt: record.createdAt ? new Date(record.createdAt).toISOString() : undefined, createdAt: record.createdAt ? new Date(record.createdAt).toISOString() : undefined, //viewer?: ViewerState, //labels?: ComAtprotoLabelDefs.Label[], //verification?: VerificationState, //status?: StatusView, }; return result; } case "Basic": { const result: ATPAPI.AppBskyActorDefs.ProfileViewBasic = { $type: "app.bsky.actor.defs#profileViewBasic", did: did, handle: identity.handle, displayName: record.displayName ?? 
identity.handle, avatar: avatar, // create profile URL from resolved identity //associated?: ProfileAssociated, createdAt: record.createdAt ? new Date(record.createdAt).toISOString() : undefined, //viewer?: ViewerState, //labels?: ComAtprotoLabelDefs.Label[], //verification?: VerificationState, //status?: StatusView, }; return result; } case "Detailed": { const response: ViewServerTypes.AppBskyActorGetProfile.OutputSchema = ((await this.resolveGetProfiles([did])) ?? [])[0]; return response; } default: throw new Error("Invalid type"); } } } export class ViewServerUserManager { public viewServer: ViewServer; constructor(viewServer: ViewServer) { this.viewServer = viewServer; } public users = new Map(); public handlesDid(did: string): boolean { return this.users.has(did); } /*async*/ addUser(did: string) { if (this.users.has(did)) return; const instance = new UserViewServer(this, did); //await instance.initialize(); this.users.set(did, instance); } // async handleRequest({ // did, // route, // req, // }: { // did: string; // route: string; // req: Request; // }) { // if (!this.users.has(did)) await this.addUser(did); // const user = this.users.get(did)!; // return await user.handleHttpRequest(route, req); // } removeUser(did: string) { const instance = this.users.get(did); if (!instance) return; /*await*/ instance.shutdown(); this.users.delete(did); } getDbForDid(did: string): Database | null { if (!this.users.has(did)) { return null; } return this.users.get(did)?.db ?? 
null; } coldStart(db: Database) { const rows = db.prepare("SELECT did FROM users").all(); for (const row of rows) { this.addUser(row.did); } } } class UserViewServer { public viewServerUserManager: ViewServerUserManager; did: string; db: Database; // | undefined; jetstream: JetstreamManager; // | undefined; spacedust: SpacedustManager; // | undefined; constructor(viewServerUserManager: ViewServerUserManager, did: string) { this.did = did; this.viewServerUserManager = viewServerUserManager; this.db = this.viewServerUserManager.viewServer.internalCreateDbForDid( this.did ); // should probably put the params of exactly what were listening to here this.jetstream = new JetstreamManager((msg) => { console.log("Received Jetstream message: ", msg); const op = msg.commit.operation; const doer = msg.did; const rev = msg.commit.rev; const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`; const value = msg.commit.record; if (!doer || !value) return; this.viewServerUserManager.viewServer.viewServerIndexer({ op, doer, cid: msg.commit.cid, rev, aturi, value, indexsrc: `jetstream-${op}`, db: this.db, }); }); this.jetstream.start({ // for realsies pls get from db or something instead of this shit wantedDids: [ this.did, // "did:plc:mn45tewwnse5btfftvd3powc", // "did:plc:yy6kbriyxtimkjqonqatv2rb", // "did:plc:zzhzjga3ab5fcs2vnsv2ist3", // "did:plc:jz4ibztn56hygfld6j6zjszg", ], wantedCollections: [ // View server only needs some of the things related to user views mutes, not all of them //"app.bsky.actor.profile", //"app.bsky.feed.generator", //"app.bsky.feed.like", //"app.bsky.feed.post", //"app.bsky.feed.repost", "app.bsky.feed.threadgate", // mod "app.bsky.graph.block", // mod "app.bsky.graph.follow", // graphing //"app.bsky.graph.list", "app.bsky.graph.listblock", // mod //"app.bsky.graph.listitem", "app.bsky.notification.declaration", // mod ], }); //await connectToJetstream(this.did, this.db); this.spacedust = new SpacedustManager((msg: SpacedustLinkMessage) => 
{ console.log("Received Spacedust message: ", msg); const operation = msg.link.operation; const sourceURI = new ATPAPI.AtUri(msg.link.source_record); const srcUri = msg.link.source_record; const srcDid = sourceURI.host; const srcField = msg.link.source; const srcCol = sourceURI.collection; const subjectURI = new ATPAPI.AtUri(msg.link.subject); const subUri = msg.link.subject; const subDid = subjectURI.host; const subCol = subjectURI.collection; if (operation === "delete") { this.db.run( `DELETE FROM backlink_skeleton WHERE srcuri = ? AND srcfield = ? AND suburi = ?`, [srcUri, srcField, subUri] ); } else if (operation === "create") { this.db.run( `INSERT OR REPLACE INTO backlink_skeleton ( srcuri, srcdid, srcfield, srccol, suburi, subdid, subcol, indexedAt ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, [ srcUri, // full AT URI of the source record srcDid, // did: of the source srcField, // e.g., "reply.parent.uri" or "facets.features.did" srcCol, // e.g., "app.bsky.feed.post" subUri, // full AT URI of the subject (linked record) subDid, // did: of the subject subCol, // subject collection (can be inferred or passed) Date.now(), ] ); } }); this.spacedust.start({ wantedSources: [ // view server keeps all of this because notifications are a thing "app.bsky.feed.like:subject.uri", // like "app.bsky.feed.like:via.uri", // liked repost "app.bsky.feed.repost:subject.uri", // repost "app.bsky.feed.repost:via.uri", // reposted repost "app.bsky.feed.post:reply.root.uri", // thread OP "app.bsky.feed.post:reply.parent.uri", // direct parent "app.bsky.feed.post:embed.media.record.record.uri", // quote with media "app.bsky.feed.post:embed.record.uri", // quote without media "app.bsky.feed.threadgate:post", // threadgate subject "app.bsky.feed.threadgate:hiddenReplies", // threadgate items (array) "app.bsky.feed.post:facets.features.did", // facet item (array): mention "app.bsky.graph.block:subject", // blocks "app.bsky.graph.follow:subject", // follow "app.bsky.graph.listblock:subject", // 
list item (blocks) "app.bsky.graph.listblock:list", // blocklist mention (might not exist) "app.bsky.graph.listitem:subject", // list item (blocks) "app.bsky.graph.listitem:list", // list mention ], // should be getting from DB but whatever right wantedSubjects: [ // as noted i dont need to write down each post, just the user to listen to ! // hell yeah // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybv7b6ic2h", // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybws4avc2h", // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvvkcxcscs2h", // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3l63ogxocq42f", // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3lw3wamvflu23", ], wantedSubjectDids: [ this.did, //"did:plc:mn45tewwnse5btfftvd3powc", //"did:plc:yy6kbriyxtimkjqonqatv2rb", //"did:plc:zzhzjga3ab5fcs2vnsv2ist3", //"did:plc:jz4ibztn56hygfld6j6zjszg", ], instant: ["true"] }); //await connectToConstellation(this.did, this.db); } // initialize() { // } // async handleHttpRequest(route: string, req: Request): Promise { // if (route === "posts") { // const posts = await this.queryPosts(); // return new Response(JSON.stringify(posts), { // headers: { "content-type": "application/json" }, // }); // } // return new Response("Unknown route", { status: 404 }); // } // private async queryPosts() { // return this.db.run( // "SELECT * FROM posts ORDER BY created_at DESC LIMIT 100" // ); // } shutdown() { this.jetstream.stop(); this.spacedust.stop(); this.db.close?.(); } } async function getServiceEndpointFromIdentity( did: string, kind: "skylite_index" | "bsky_appview" ): Promise { //const identity = await resolveIdentity(did); //const declUrl = `${identity.pds}/xrpc/com.atproto.repo.getRecord?repo=${identity.did}&collection=party.whey.skylite.declaration&rkey=self`; //const data = (await cachedFetch(declUrl)) as any; const data = await getSlingshotRecord(did,"party.whey.skylite.declaration","self") as any; //if 
//   (!resp.ok) throw new Error(`Failed to fetch declaration for ${did}`);
  //const data = await resp.json();
  const svc = data?.value?.service?.find((s: any) => s.id === `#${kind}`);
  return svc?.serviceEndpoint ?? null;
}

/**
 * Per-DID memo of resolved endpoints, bounded to 10000 entries.
 * Only successful (non-null) resolutions are stored, so a DID that had no
 * endpoint is retried on the next call.
 */
const cache = new QuickLRU<string, string>({ maxSize: 10000 });

/**
 * Resolves the endpoint to use for `did` by walking
 * `config.viewServer.indexPriority` in order.
 *
 * Each resolver entry has the form `<prefix>#<service kind>`:
 *  - `user#<kind>`: look up the service declared in the user's own
 *    `party.whey.skylite.declaration` record.
 *  - `did:web:<host>#<kind>`: use a fixed host, mapped to `https://<host>`.
 *
 * The first resolver that yields an endpoint wins and the result is cached.
 * A resolver that throws or yields nothing is skipped in favour of the next.
 *
 * @param did DID of the user whose endpoint is wanted.
 * @returns The endpoint URL, or `null` when no configured resolver produced one.
 */
async function getSkyliteEndpoint(did: string): Promise<string | null> {
  const cached = cache.get(did);
  if (cached !== undefined) return cached;

  for (const resolver of config.viewServer.indexPriority) {
    try {
      const [prefix, suffix] = resolver.split("#") as [
        "user" | `did:web:${string}`,
        "skylite_index" | "bsky_appview"
      ];

      let endpoint: string | null = null;
      if (prefix === "user") {
        endpoint = await getServiceEndpointFromIdentity(did, suffix);
      } else if (prefix.startsWith("did:web:")) {
        // map did:web:foo.com -> https://foo.com
        // NOTE(review): a plain prefix swap does not decode percent-encoded
        // ports or path segments in did:web identifiers. `didWebToHttps` is
        // imported at the top of the file and may be the better fit; confirm
        // its contract before switching.
        endpoint = prefix.replace("did:web:", "https://");
      }

      // Only stop on a real result. Returning null here would prevent the
      // lower-priority resolvers from ever being tried.
      if (endpoint) {
        cache.set(did, endpoint);
        return endpoint;
      }
    } catch (_err) {
      // Best-effort: a failing resolver must not block the remaining ones.
      continue;
    }
  }

  return null;
  //throw new Error(`No endpoint found for ${resolver}`);
}

// export interface Notification {
//   $type?: 'app.bsky.notification.listNotifications#notification';
//   uri: string;
//   cid: string;
//   author: AppBskyActorDefs.ProfileView;
//   /** The reason why this notification was delivered - e.g. your post was liked, or you received a new follower. */
//   reason: 'like' | 'repost' | 'follow' | 'mention' | 'reply' | 'quote' | 'starterpack-joined' | 'verified' | 'unverified' | 'like-via-repost' | 'repost-via-repost' | (string & {});
//   reasonSubject?: string;
//   record: {
//     [_ in string]: unknown;
//   };
//   isRead: boolean;
//   indexedAt: string;
//   labels?: ComAtprotoLabelDefs.Label[];
// }