A couple of Bluesky feeds focused around PDSes
at main 5.5 kB view raw
1import { 2 AppBskyFeedDescribeFeedGenerator, 3 AppBskyFeedGetFeedSkeleton, 4} from "@atcute/bluesky"; 5import { 6 CompositeDidDocumentResolver, 7 PlcDidDocumentResolver, 8 WebDidDocumentResolver, 9} from "@atcute/identity-resolver"; 10import { 11 parseResourceUri, 12 type Nsid, 13 type ResourceUri, 14} from "@atcute/lexicons/syntax"; 15import { 16 AuthRequiredError, 17 InvalidRequestError, 18 XRPCRouter, 19 json, 20} from "@atcute/xrpc-server"; 21import { ServiceJwtVerifier, type VerifiedJwt } from "@atcute/xrpc-server/auth"; 22import { cors } from "@atcute/xrpc-server/middlewares/cors"; 23import type { Statement } from "@db/sqlite"; 24 25import { db } from "./common/db.ts"; 26import type { Author, DID, Post } from "./common/types.ts"; 27 28const publisher = Deno.env.get("PUBLISHER") ?? "did:example:bob"; 29const hostname = Deno.env.get("HOSTNAME"); 30if (!hostname) { 31 console.error("HOSTNAME not provided! Exiting now."); 32 Deno.exit(1); 33} 34 35const baseDID: DID = `did:web:${hostname}`; 36 37const app = new XRPCRouter({ middlewares: [cors()] }); 38const didResolver = new CompositeDidDocumentResolver({ 39 methods: { 40 plc: new PlcDidDocumentResolver(), 41 web: new WebDidDocumentResolver(), 42 }, 43}); 44const verifier = new ServiceJwtVerifier({ 45 serviceDid: baseDID, 46 resolver: didResolver, 47}); 48 49const feeds: Record< 50 string, 51 { default: Statement; cursor: Statement; pds: boolean } 52> = { 53 "your-pds": { 54 default: db.prepare( 55 `SELECT a.uri, a.indexed_at FROM posts a 56 INNER JOIN authors b ON a.author = b.did 57 WHERE b.pds = ?1 58 ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;` 59 ), 60 cursor: db.prepare( 61 `SELECT a.uri, a.indexed_at FROM posts a 62 INNER JOIN authors b ON a.author = b.did 63 WHERE b.pds = ?1 64 AND a.indexed_at < ?2 65 ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?3;` 66 ), 67 pds: true, 68 }, 69 "non-bsky-pds": { 70 default: db.prepare( 71 `SELECT a.uri, a.indexed_at FROM posts a 72 INNER JOIN authors b ON a.author = b.did 73 WHERE b.pds_base != 'bsky.network' 74 AND b.pds_base != 'brid.gy' 75 ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?1;` 76 ), 77 cursor: db.prepare( 78 `SELECT a.uri, a.indexed_at FROM posts a 79 INNER JOIN authors b ON a.author = b.did 80 WHERE b.pds_base != 'bsky.network' 81 AND b.pds_base != 'brid.gy' 82 AND a.indexed_at < ?1 83 ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;` 84 ), 85 pds: false, 86 }, 87}; 88 89const requireAuth = async ( 90 request: Request, 91 lxm: Nsid 92): Promise<VerifiedJwt> => { 93 const auth = request.headers.get("authorization"); 94 if (auth === null) { 95 throw new AuthRequiredError({ 96 description: `missing authorization header`, 97 }); 98 } 99 if (!auth.startsWith("Bearer ")) { 100 throw new AuthRequiredError({ 101 description: `invalid authorization scheme`, 102 }); 103 } 104 105 const jwtString = auth.slice("Bearer ".length).trim(); 106 107 const result = await verifier.verify(jwtString, { lxm }); 108 if (!result.ok) { 109 throw new AuthRequiredError(result.error); 110 } 111 112 return result.value; 113}; 114 115const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?"); 116 117app.add(AppBskyFeedGetFeedSkeleton.mainSchema, { 118 async handler({ request, params: { feed, limit, cursor } }) { 119 const feedUri = parseResourceUri(feed); 120 if (!feedUri.ok || !feedUri.value.rkey) { 121 throw new InvalidRequestError(); 122 } 123 124 const feedQuery = feeds[feedUri.value.rkey]; 125 126 if ( 127 feedUri.value.repo !== publisher || 128 feedUri.value.collection !== "app.bsky.feed.generator" || 129 !feedQuery 130 ) { 131 throw new InvalidRequestError({ 132 error: "UnsupportedAlgorithm", 133 description: "Unsupported algorithm", 134 }); 135 } 136 137 let pds = ""; 138 139 if (feedQuery.pds) { 140 const jwt = await requireAuth(request, "app.bsky.feed.getFeedSkeleton"); 141 142 const author = getAuthor.get<Author>(jwt.issuer); 143 if (author) { 144 pds = author.pds; 145 } else { 146 const resolved = await didResolver.resolve(jwt.issuer as DID); 147 for (const service of resolved.service ?? []) { 148 if ( 149 service.type == "AtprotoPersonalDataServer" && 150 typeof service.serviceEndpoint === "string" 151 ) { 152 pds = service.serviceEndpoint; 153 } 154 } 155 if (typeof pds !== "string") 156 throw new InvalidRequestError({ 157 error: "NoServiceEndpoint", 158 description: "No service endpoint", 159 }); 160 } 161 } 162 163 let res: Post[]; 164 if (cursor) { 165 const timeStr = new Date(parseInt(cursor, 10)).toISOString(); 166 if (feedQuery.pds) { 167 res = feedQuery.cursor.all(pds, timeStr, limit); 168 } else { 169 res = feedQuery.cursor.all(timeStr, limit); 170 } 171 } else { 172 if (feedQuery.pds) { 173 res = feedQuery.default.all(pds, limit); 174 } else { 175 res = feedQuery.default.all(limit); 176 } 177 } 178 179 const posts = res.map((row) => ({ 180 post: row.uri, 181 })); 182 183 let cs: string | undefined; 184 const last = res.at(-1); 185 if (last) { 186 cs = new Date(last.indexed_at).getTime().toString(10); 187 } 188 189 return json({ 190 cursor: cs, 191 feed: posts, 192 }); 193 }, 194}); 195 196app.add(AppBskyFeedDescribeFeedGenerator.mainSchema, { 197 handler() { 198 const feedArray = Object.keys(feeds).map((v) => ({ 199 uri: `at://${publisher}/app.bsky.feed.generator/${v}` as ResourceUri, 200 })); 201 return json({ 202 did: baseDID, 203 feeds: feedArray, 204 }); 205 }, 206}); 207 208export default app;