A couple of Bluesky feeds focused on PDSes
1import {
2 AppBskyFeedDescribeFeedGenerator,
3 AppBskyFeedGetFeedSkeleton,
4} from "@atcute/bluesky";
5import {
6 CompositeDidDocumentResolver,
7 PlcDidDocumentResolver,
8 WebDidDocumentResolver,
9} from "@atcute/identity-resolver";
10import {
11 parseResourceUri,
12 type Nsid,
13 type ResourceUri,
14} from "@atcute/lexicons/syntax";
15import {
16 AuthRequiredError,
17 InvalidRequestError,
18 XRPCRouter,
19 json,
20} from "@atcute/xrpc-server";
21import { ServiceJwtVerifier, type VerifiedJwt } from "@atcute/xrpc-server/auth";
22import { cors } from "@atcute/xrpc-server/middlewares/cors";
23import type { Statement } from "@db/sqlite";
24
25import { db } from "./common/db.ts";
26import type { Author, DID, Post } from "./common/types.ts";
27
/**
 * DID used as the `repo` part of every feed generator URI served here.
 * Taken from the PUBLISHER environment variable, with a placeholder fallback.
 */
const publisher = Deno.env.get("PUBLISHER") ?? "did:example:bob";

/**
 * Hostname this service is reachable under, read from the HOSTNAME
 * environment variable. The process refuses to start without it because the
 * service's own did:web identifier is derived from it.
 */
const hostname = Deno.env.get("HOSTNAME");
if (hostname === undefined || hostname === "") {
  console.error("HOSTNAME not provided! Exiting now.");
  Deno.exit(1);
}

/** The did:web identifier of this feed generator service. */
const baseDID: DID = `did:web:${hostname}`;
36
/** Resolves DID documents for both the did:plc and did:web methods. */
const didResolver = new CompositeDidDocumentResolver({
  methods: {
    plc: new PlcDidDocumentResolver(),
    web: new WebDidDocumentResolver(),
  },
});

/**
 * Verifies service JWTs addressed to this service's DID, using the resolver
 * above to look up the issuer's DID document.
 */
const verifier = new ServiceJwtVerifier({
  resolver: didResolver,
  serviceDid: baseDID,
});

/** XRPC router that hosts the feed generator endpoints, with CORS enabled. */
const app = new XRPCRouter({ middlewares: [cors()] });
48
/**
 * Prepared statements backing a single feed.
 *
 * - `default` returns the newest page and is used when no cursor is given.
 * - `cursor` returns the page of posts indexed strictly before a timestamp.
 * - `pds` is true when both statements take a PDS value as their first bind
 *   parameter (`b.pds = ?1`), ahead of the timestamp/limit parameters.
 */
type FeedQueries = { default: Statement; cursor: Statement; pds: boolean };

/**
 * All feeds served by this generator, keyed by the record key (rkey) of the
 * feed generator record.
 *
 * NOTE(review): the cursor statements filter with a strict `<` on
 * `indexed_at` only, while ordering also uses `cid`; rows that share the
 * boundary `indexed_at` with the last row of a page are not returned on the
 * following page.
 */
const feeds: Record<string, FeedQueries> = {
  // Posts whose author is hosted on the PDS passed as ?1.
  "your-pds": {
    default: db.prepare(
      `SELECT a.uri, a.indexed_at FROM posts a
      INNER JOIN authors b ON a.author = b.did
      WHERE b.pds = ?1
      ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;`
    ),
    cursor: db.prepare(
      `SELECT a.uri, a.indexed_at FROM posts a
      INNER JOIN authors b ON a.author = b.did
      WHERE b.pds = ?1
      AND a.indexed_at < ?2
      ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?3;`
    ),
    pds: true,
  },
  // Posts whose author's pds_base is neither bsky.network nor brid.gy.
  "non-bsky-pds": {
    default: db.prepare(
      `SELECT a.uri, a.indexed_at FROM posts a
      INNER JOIN authors b ON a.author = b.did
      WHERE b.pds_base != 'bsky.network'
      AND b.pds_base != 'brid.gy'
      ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?1;`
    ),
    cursor: db.prepare(
      `SELECT a.uri, a.indexed_at FROM posts a
      INNER JOIN authors b ON a.author = b.did
      WHERE b.pds_base != 'bsky.network'
      AND b.pds_base != 'brid.gy'
      AND a.indexed_at < ?1
      ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;`
    ),
    pds: false,
  },
};
88
/**
 * Extracts the bearer token from a request's `authorization` header and
 * verifies it as a service JWT for the given lexicon method.
 *
 * @param request Incoming HTTP request.
 * @param lxm NSID of the method being called, passed to the verifier.
 * @returns The verified JWT.
 * @throws AuthRequiredError when the header is missing, does not start with
 *   `Bearer `, or the verifier rejects the token.
 */
const requireAuth = async (
  request: Request,
  lxm: Nsid
): Promise<VerifiedJwt> => {
  const scheme = "Bearer ";
  const header = request.headers.get("authorization");

  if (header === null) {
    throw new AuthRequiredError({
      description: `missing authorization header`,
    });
  }
  if (!header.startsWith(scheme)) {
    throw new AuthRequiredError({
      description: `invalid authorization scheme`,
    });
  }

  // Everything after the scheme prefix, minus surrounding whitespace.
  const token = header.slice(scheme.length).trim();

  const outcome = await verifier.verify(token, { lxm });
  if (outcome.ok) {
    return outcome.value;
  }

  throw new AuthRequiredError(outcome.error);
};
114
/** Looks up the `pds` column of the `authors` row for a DID. */
const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");

/**
 * Determines the PDS of the account that issued a request.
 *
 * The `authors` table is consulted first; when the DID has no row there, its
 * DID document is resolved and the first service entry of type
 * `AtprotoPersonalDataServer` with a string endpoint is used.
 *
 * @param issuer DID of the requesting account.
 * @returns The PDS value to bind into the feed query.
 * @throws InvalidRequestError (`NoServiceEndpoint`) when the DID document
 *   carries no usable PDS service entry.
 */
const resolveIssuerPds = async (issuer: DID): Promise<string> => {
  const author = getAuthor.get<Author>(issuer);
  if (author) {
    return author.pds;
  }

  const resolved = await didResolver.resolve(issuer);
  const endpoint = resolved.service?.find(
    (service) =>
      service.type === "AtprotoPersonalDataServer" &&
      typeof service.serviceEndpoint === "string"
  )?.serviceEndpoint;

  // Without this check an account lacking a PDS entry would silently be
  // served an empty feed instead of an error.
  if (typeof endpoint !== "string" || endpoint === "") {
    throw new InvalidRequestError({
      error: "NoServiceEndpoint",
      description: "No service endpoint",
    });
  }

  return endpoint;
};

/**
 * Converts a pagination cursor (milliseconds since the Unix epoch, as a
 * base-10 string) into the ISO-8601 timestamp bound into the cursor queries.
 *
 * @param cursor Cursor string supplied by the client.
 * @returns ISO-8601 representation of the cursor time.
 * @throws InvalidRequestError when the cursor is not a number or lies outside
 *   the range a Date can represent (toISOString would otherwise throw a
 *   RangeError).
 */
const cursorToTimestamp = (cursor: string): string => {
  const date = new Date(Number.parseInt(cursor, 10));
  if (Number.isNaN(date.getTime())) {
    throw new InvalidRequestError({
      error: "InvalidRequest",
      description: "Invalid cursor",
    });
  }
  return date.toISOString();
};

/**
 * app.bsky.feed.getFeedSkeleton: returns one page of post URIs for a feed.
 *
 * The feed URI must name this publisher, the `app.bsky.feed.generator`
 * collection and a record key present in `feeds`. Feeds flagged `pds`
 * require an authenticated request and are filtered by the requester's PDS.
 * The returned cursor is the `indexed_at` of the last row as epoch
 * milliseconds; it is omitted when the page is empty.
 */
app.add(AppBskyFeedGetFeedSkeleton.mainSchema, {
  async handler({ request, params: { feed, limit, cursor } }) {
    const feedUri = parseResourceUri(feed);
    if (!feedUri.ok || !feedUri.value.rkey) {
      throw new InvalidRequestError();
    }

    const feedQuery = feeds[feedUri.value.rkey];

    if (
      feedUri.value.repo !== publisher ||
      feedUri.value.collection !== "app.bsky.feed.generator" ||
      !feedQuery
    ) {
      throw new InvalidRequestError({
        error: "UnsupportedAlgorithm",
        description: "Unsupported algorithm",
      });
    }

    // Only PDS-scoped feeds need to know (and authenticate) the requester.
    let pds = "";
    if (feedQuery.pds) {
      const jwt = await requireAuth(request, "app.bsky.feed.getFeedSkeleton");
      pds = await resolveIssuerPds(jwt.issuer as DID);
    }

    // An absent or empty cursor selects the first page.
    const before = cursor ? cursorToTimestamp(cursor) : undefined;

    let res: Post[];
    if (before !== undefined) {
      if (feedQuery.pds) {
        res = feedQuery.cursor.all(pds, before, limit);
      } else {
        res = feedQuery.cursor.all(before, limit);
      }
    } else {
      if (feedQuery.pds) {
        res = feedQuery.default.all(pds, limit);
      } else {
        res = feedQuery.default.all(limit);
      }
    }

    const posts = res.map((row) => ({
      post: row.uri,
    }));

    let cs: string | undefined;
    const last = res.at(-1);
    if (last) {
      cs = new Date(last.indexed_at).getTime().toString(10);
    }

    return json({
      cursor: cs,
      feed: posts,
    });
  },
});
195
/**
 * app.bsky.feed.describeFeedGenerator: reports this service's DID together
 * with the AT URI of every feed registered in `feeds`.
 */
app.add(AppBskyFeedDescribeFeedGenerator.mainSchema, {
  handler() {
    const described: { uri: ResourceUri }[] = [];
    for (const rkey of Object.keys(feeds)) {
      described.push({
        uri: `at://${publisher}/app.bsky.feed.generator/${rkey}` as ResourceUri,
      });
    }

    return json({
      did: baseDID,
      feeds: described,
    });
  },
});

export default app;