work-in-progress atproto PDS
typescript atproto pds atcute

refactor: perform service proxying after local handlers

mary.my.id f840ace6 681d8a28

verified
Changed files
+173 -43
packages
+4 -2
packages/danaus/src/api/com.atproto/identity.resolveHandle.ts
··· 10 10 * @param context app context 11 11 */ 12 12 export const resolveHandle = (router: XRPCRouter, context: AppContext) => { 13 - const { accountManager, config, handleResolver } = context; 13 + const { accountManager, config, handleResolver, proxy } = context; 14 14 15 15 router.addQuery(ComAtprotoIdentityResolveHandle, { 16 - async handler({ params }) { 16 + async handler({ params, request }) { 17 + await proxy.passthrough(request); 18 + 17 19 const handle = params.handle.toLowerCase(); 18 20 19 21 if (!isHandle(handle)) {
+4 -2
packages/danaus/src/api/com.atproto/repo.getRecord.ts
··· 9 9 * @param context app context 10 10 */ 11 11 export const getRecord = (router: XRPCRouter, context: AppContext) => { 12 - const { accountManager, actorManager } = context; 12 + const { accountManager, actorManager, proxy } = context; 13 13 14 14 router.addQuery(ComAtprotoRepoGetRecord, { 15 - async handler({ params }) { 15 + async handler({ params, request }) { 16 + await proxy.passthrough(request); 17 + 16 18 const { repo, collection, rkey, cid } = params; 17 19 18 20 const did = accountManager.getAccountDid(repo);
+1 -8
packages/danaus/src/bin/pds.ts
··· 23 23 24 24 targets.set('did:web:api.bsky.app#bsky_appview', { 25 25 to: `did:web:localhost%3A${BSKY_PORT}#bsky_appview`, 26 - exclude: [ 27 - 'app.bsky.actor.getPreferences', 28 - 'app.bsky.actor.putPreferences', 29 - 'com.atproto.repo.applyWrites', 30 - 'com.atproto.repo.createRecord', 31 - 'com.atproto.repo.putRecord', 32 - 'com.atproto.server.getSession', 33 - ], 26 + exclude: ['app.bsky.actor.getPreferences', 'app.bsky.actor.putPreferences'], 34 27 }); 35 28 36 29 const pds = await TestPds.create({
+13
packages/danaus/src/context.ts
··· 21 21 import { CachedDidDocumentResolver } from './identity/cached-did-document-resolver'; 22 22 import { CachedHandleResolver } from './identity/cached-handle-resolver'; 23 23 import { IdentityCache } from './identity/manager'; 24 + import { createServiceProxy, type ServiceProxy } from './proxy/index'; 24 25 import { Sequencer } from './sequencer/sequencer'; 25 26 26 27 export interface AppContext { ··· 38 39 authVerifier: AuthVerifier; 39 40 40 41 sequencer: Sequencer; 42 + 43 + /** service proxy for forwarding requests to atproto-proxy targets */ 44 + proxy: ServiceProxy; 41 45 } 42 46 43 47 export const createAppContext = (config: AppConfig): AppContext => { ··· 116 120 didDocumentResolver: didDocumentResolver, 117 121 }); 118 122 123 + const proxy = createServiceProxy({ 124 + targets: config.proxy.targets, 125 + authVerifier: authVerifier, 126 + actorManager: actorManager, 127 + didDocumentResolver: didDocumentResolver, 128 + }); 129 + 119 130 return { 120 131 config: config, 121 132 ··· 131 142 authVerifier: authVerifier, 132 143 133 144 sequencer: sequencer, 145 + 146 + proxy: proxy, 134 147 }; 135 148 };
+1 -2
packages/danaus/src/pds-server.ts
··· 8 8 import { localDanaus } from './api/local.danaus/index.ts'; 9 9 import type { AppConfig } from './config.ts'; 10 10 import { createAppContext, type AppContext } from './context.ts'; 11 - import { createProxyMiddleware } from './proxy/index.ts'; 12 11 import { createWebRouter } from './web/router.ts'; 13 12 import styles from './web/styles/main.out.css' with { type: 'file' }; 14 13 ··· 61 60 allowedHeaders: ['x-bsky-topics'], 62 61 allowPrivateNetwork: true, 63 62 }), 64 - createProxyMiddleware(context), 65 63 ], 64 + handleNotFound: context.proxy.handleNotFound, 66 65 handleException(err, request) { 67 66 return defaultExceptionHandler(err, request); 68 67 },
+103 -24
packages/danaus/src/proxy/index.ts
··· 1 - import { InvalidRequestError, type FetchMiddleware } from '@atcute/xrpc-server'; 1 + import type { DidDocumentResolver } from '@atcute/identity-resolver'; 2 + import { defaultNotFoundHandler, InvalidRequestError, type NotFoundHandler } from '@atcute/xrpc-server'; 2 3 import { createServiceJwt } from '@atcute/xrpc-server/auth'; 3 4 4 - import type { AppContext } from '#app/context.ts'; 5 + import type { ActorManager } from '#app/actors/manager.ts'; 6 + import type { AuthVerifier } from '#app/auth/verifier.ts'; 7 + import type { ProxyTargetConfig } from '#app/config.ts'; 5 8 6 9 import { 7 10 buildProxyRequestHeaders, 11 + buildProxyRequestHeadersWithInput, 8 12 filterResponseHeaders, 9 13 parseProxyHeader, 10 14 parseRequestNsid, 11 15 } from './utils.ts'; 16 + 17 + export type { ProxyTarget } from './utils.ts'; 18 + 19 + export type PassthroughFn = (request: Request, input?: unknown) => Promise<void>; 20 + 21 + export interface ServiceProxy { 22 + /** not-found handler for XRPCRouter that proxies unhandled requests */ 23 + handleNotFound: NotFoundHandler; 24 + /** 25 + * proxy the request to an external service and throw the response. 26 + * use this in local handlers that want to delegate to the atproto-proxy target. 27 + * @param request original request 28 + * @param input parsed input body for POST requests (if body was already consumed) 29 + * @throws Response from the proxied request, or returns if no atproto-proxy header 30 + */ 31 + passthrough: PassthroughFn; 32 + } 33 + 34 + export interface ServiceProxyOptions { 35 + targets: Map<string, ProxyTargetConfig>; 36 + authVerifier: AuthVerifier; 37 + actorManager: ActorManager; 38 + didDocumentResolver: DidDocumentResolver<string>; 39 + } 12 40 13 41 /** 14 - * create proxy middleware that forwards requests to external services. 15 - * the middleware activates when the `atproto-proxy` header is present. 16 - * @param ctx app context 17 - * @returns fetch middleware 42 + * create service proxy handlers. 43 + * @param options proxy dependencies 44 + * @returns handleNotFound for XRPCRouter and passthrough for local handlers 18 45 */ 19 - export const createProxyMiddleware = (ctx: AppContext): FetchMiddleware => { 20 - return async (request, next) => { 46 + export const createServiceProxy = (options: ServiceProxyOptions): ServiceProxy => { 47 + const { targets, authVerifier, actorManager, didDocumentResolver } = options; 48 + 49 + /** 50 + * core proxy logic - performs the actual proxying. 51 + */ 52 + const proxyRequest = async (request: Request, input?: unknown): Promise<Response> => { 21 53 const proxyHeader = request.headers.get('atproto-proxy'); 22 54 if (!proxyHeader) { 23 - return next(request); 55 + throw new InvalidRequestError({ description: `missing atproto-proxy header` }); 24 56 } 25 57 26 58 // only allow GET, HEAD, POST ··· 28 60 throw new InvalidRequestError({ description: `XRPC requests only support GET, HEAD, and POST` }); 29 61 } 30 62 63 + // dev-only check: input should not be provided for GET requests 64 + if (import.meta.env?.DEV && input !== undefined && request.method === 'GET') { 65 + throw new Error(`passthrough: input provided for GET request`); 66 + } 67 + 31 68 // parse NSID from request path 32 69 const lxm = parseRequestNsid(request); 33 70 34 71 // parse proxy header and resolve target 35 - const target = await parseProxyHeader(ctx, proxyHeader, lxm); 72 + const target = await parseProxyHeader(targets, didDocumentResolver, proxyHeader, lxm); 36 73 if (target === null) { 37 74 // NSID is excluded from proxying for this target 38 - return next(request); 75 + throw new InvalidRequestError({ description: `method not found` }); 39 76 } 40 77 41 78 // verify authorization and get user DID 42 - const auth = await ctx.authVerifier.authorization(request); 79 + const auth = await authVerifier.authorization(request); 43 80 44 81 // load user's signing keypair 45 - const keypair = await ctx.actorManager.importKeypair(auth.did); 82 + const keypair = await actorManager.importKeypair(auth.did); 46 83 47 84 // create service auth JWT signed with user's key 48 85 const serviceJwt = await createServiceJwt({ ··· 57 94 upstreamUrl.protocol = new URL(target.url).protocol; 58 95 upstreamUrl.host = new URL(target.url).host; 59 96 60 - const upstreamHeaders = buildProxyRequestHeaders(request, serviceJwt); 97 + let upstreamHeaders: Headers; 98 + let upstreamBody: Bun.BodyInit | null = null; 99 + 100 + if (input !== undefined) { 101 + // body was already consumed, reserialize from input 102 + upstreamHeaders = buildProxyRequestHeadersWithInput(request, serviceJwt); 103 + upstreamBody = JSON.stringify(input); 104 + } else if (request.method === 'POST') { 105 + // stream the original body 106 + upstreamHeaders = buildProxyRequestHeaders(request, serviceJwt); 107 + upstreamBody = request.body; 108 + } else { 109 + upstreamHeaders = buildProxyRequestHeaders(request, serviceJwt); 110 + } 61 111 62 112 const upstreamRequest = new Request(upstreamUrl.toString(), { 63 113 method: request.method, 64 114 headers: upstreamHeaders, 65 - body: request.method === 'POST' ? request.body : null, 66 - duplex: request.method === 'POST' ? 'half' : undefined, 115 + body: upstreamBody, 116 + duplex: upstreamBody !== null ? 'half' : undefined, 67 117 }); 68 118 69 119 // forward request 70 120 const upstreamResponse = await fetch(upstreamRequest); 71 121 72 - upstreamResponse 73 - .clone() 74 - .text() 75 - .then((text) => { 76 - console.log(`${auth.did} -> ${upstreamUrl.toString()}`); 77 - console.log(text); 78 - }); 79 - 80 122 // build response with filtered headers 81 123 const responseHeaders = filterResponseHeaders(upstreamResponse.headers); 82 124 ··· 86 128 headers: responseHeaders, 87 129 }); 88 130 }; 131 + 132 + const handleNotFound: NotFoundHandler = async (request) => { 133 + const proxyHeader = request.headers.get('atproto-proxy'); 134 + if (!proxyHeader) { 135 + return defaultNotFoundHandler(request); 136 + } 137 + 138 + return proxyRequest(request); 139 + }; 140 + 141 + const passthrough: PassthroughFn = async (request, input) => { 142 + const proxyHeader = request.headers.get('atproto-proxy'); 143 + if (!proxyHeader) { 144 + // no proxy header - continue with local handler logic 145 + return; 146 + } 147 + 148 + // dev-only check: input should not be provided for GET requests 149 + if (import.meta.env?.DEV && input !== undefined && request.method === 'GET') { 150 + throw new Error(`passthrough: input provided for GET request`); 151 + } 152 + 153 + // parse NSID from request path 154 + const lxm = parseRequestNsid(request); 155 + 156 + // check if NSID is excluded for this target 157 + const target = await parseProxyHeader(targets, didDocumentResolver, proxyHeader, lxm); 158 + if (target === null) { 159 + // NSID is excluded - continue with local handler logic 160 + return; 161 + } 162 + 163 + const response = await proxyRequest(request, input); 164 + throw response; 165 + }; 166 + 167 + return { handleNotFound, passthrough }; 89 168 };
+47 -5
packages/danaus/src/proxy/utils.ts
··· 1 1 import { getAtprotoServiceEndpoint, isAtprotoAudience } from '@atcute/identity'; 2 + import type { DidDocumentResolver } from '@atcute/identity-resolver'; 2 3 import type { Did, Nsid } from '@atcute/lexicons'; 3 4 import { isNsid, type AtprotoDid } from '@atcute/lexicons/syntax'; 4 5 import { InvalidRequestError } from '@atcute/xrpc-server'; 5 6 6 - import type { AppContext } from '#app/context.ts'; 7 + import type { ProxyTargetConfig } from '#app/config.ts'; 7 8 8 9 export interface ProxyTarget { 9 10 did: Did; ··· 12 13 13 14 /** 14 15 * parse atproto-proxy header and resolve service endpoint. 15 - * @param ctx app context 16 + * @param targets proxy target configurations 17 + * @param didDocumentResolver DID document resolver 16 18 * @param header proxy header value (format: `did#serviceId`) 17 19 * @param nsid request NSID to check exclusions 18 20 * @returns resolved proxy target with DID and URL, or null if NSID is excluded 19 21 */ 20 22 export const parseProxyHeader = async ( 21 - ctx: AppContext, 23 + targets: Map<string, ProxyTargetConfig>, 24 + didDocumentResolver: DidDocumentResolver<string>, 22 25 header: string, 23 26 nsid: Nsid, 24 27 ): Promise<ProxyTarget | null> => { ··· 26 29 throw new InvalidRequestError({ description: `invalid atproto-proxy header` }); 27 30 } 28 31 29 - const targetConfig = ctx.config.proxy.targets.get(header); 32 + const targetConfig = targets.get(header); 30 33 31 34 // check if NSID is excluded for this target 32 35 if (targetConfig?.exclude?.includes(nsid)) { ··· 40 43 const did = audience.slice(0, hashIndex) as AtprotoDid; 41 44 const serviceId = audience.slice(hashIndex) as `#${string}`; 42 45 43 - const didDoc = await ctx.didDocumentResolver.resolve(did); 46 + const didDoc = await didDocumentResolver.resolve(did); 44 47 if (!didDoc) { 45 48 throw new InvalidRequestError({ description: `could not resolve proxy did` }); 46 49 } ··· 97 100 } 98 101 } 99 102 } 103 + 104 + // set service auth 105 + headers.set('authorization', `Bearer ${serviceJwt}`); 106 + 107 + return headers; 108 + }; 109 + 110 + /** 111 + * build headers for upstream proxy request when input body was already consumed. 112 + * sets content-type to application/json since the body will be reserialized. 113 + * @param req original request 114 + * @param serviceJwt service auth JWT 115 + * @returns headers for upstream request 116 + */ 117 + export const buildProxyRequestHeadersWithInput = (req: Request, serviceJwt: string): Headers => { 118 + const headers = new Headers(); 119 + 120 + // forward standard headers 121 + for (const name of REQUEST_HEADERS_TO_FORWARD) { 122 + const value = req.headers.get(name); 123 + if (value) { 124 + headers.set(name, value); 125 + } 126 + } 127 + 128 + // ensure accept-encoding has a value 129 + if (!headers.has('accept-encoding')) { 130 + headers.set('accept-encoding', 'identity'); 131 + } 132 + 133 + // forward all x-* headers 134 + for (const [name, value] of req.headers) { 135 + if (name.startsWith('x-')) { 136 + headers.set(name, value); 137 + } 138 + } 139 + 140 + // set content-type for reserialized JSON body 141 + headers.set('content-type', 'application/json'); 100 142 101 143 // set service auth 102 144 headers.set('authorization', `Bearer ${serviceJwt}`);