alf: the atproto Latency Fabric
alf.fly.dev/
1// ABOUTME: Express + XRPC server with write proxying (draft creation) and draft management endpoints
2
3import express from 'express';
4import cors from 'cors';
5import pinoHttp from 'pino-http';
6import { readFileSync } from 'fs';
7import path from 'path';
8import { randomUUID } from 'node:crypto';
9import { CID } from 'multiformats/cid';
10import { sha256 } from 'multiformats/hashes/sha2';
11import { LexiconDoc } from '@atproto/lexicon';
12import * as xrpc from '@atproto/xrpc-server';
13import { schemas as atprotoSchemas } from '@atproto/api';
14import { TID } from '@atproto/common';
15import { cidForRecord } from '@atproto/repo';
16import type { ServiceConfig } from './config.js';
17import { createLogger, rootLogger } from './logger.js';
18import { verifyRequestAuth, extractPdsUrlFromToken, extractBearerToken, verifyDpopBoundToken } from './auth.js';
19import { httpRequestsTotal, httpRequestDuration } from './metrics.js';
20import { extractDidFromAtUri } from './schema.js';
21import { createOAuthRouter } from './routes/oauth.js';
22import {
23 createDraft,
24 getDraft,
25 getDraftRawRow,
26 getDraftByTriggerKeyHash,
27 listDrafts,
28 scheduleDraft,
29 updateDraft,
30 cancelDraft,
31 storeDraftBlob,
32 getUserAuthorization,
33 countActiveDraftsForUser,
34 deleteUserData,
35 createSchedule,
36 getSchedule,
37 getRawSchedule,
38 listSchedules,
39 updateSchedule,
40 updateScheduleNextDraft,
41 deleteSchedule,
42} from './storage.js';
43import { publishDraft, notifyScheduler } from './scheduler.js';
44import { encrypt, decrypt, hmac } from './encrypt.js';
45import { computeNextOccurrence } from '@newpublic/recurrence';
46import type { RecurrenceRule } from '@newpublic/recurrence';
47
48const RAW_CODEC = 0x55;
49
50/**
51 * Compute a blob CID using ATProto's algorithm: CIDv1, raw codec (0x55), SHA-256 multihash.
52 * Returns base32-encoded string (bafkrei... prefix).
53 */
54async function computeBlobCid(data: Uint8Array): Promise<string> {
55 const hash = await sha256.digest(data);
56 const cid = CID.createV1(RAW_CODEC, hash);
57 return cid.toString();
58}
59
60/**
61 * Create a 'once' schedule and its first (only) draft, routing the draft through schedule machinery.
62 * Used by createRecord/putRecord when x-scheduled-at header is present.
63 */
64async function createOnceDraftThroughSchedule(params: {
65 userDid: string;
66 collection: string;
67 rkey: string;
68 uri: string;
69 cid: string;
70 record: Record<string, unknown>;
71 scheduledAt: number;
72 action: 'create' | 'put';
73 triggerKeyHash?: string;
74 triggerKeyEncrypted?: string;
75}): Promise<void> {
76 const scheduleId = randomUUID();
77 const onceRule: RecurrenceRule = {
78 rule: { type: 'once', datetime: new Date(params.scheduledAt).toISOString() },
79 };
80 await createSchedule({
81 id: scheduleId,
82 userDid: params.userDid,
83 collection: params.collection,
84 record: params.record,
85 contentUrl: null,
86 recurrenceRule: onceRule as unknown as Record<string, unknown>,
87 timezone: 'UTC',
88 });
89 await createDraft({
90 uri: params.uri,
91 userDid: params.userDid,
92 collection: params.collection,
93 rkey: params.rkey,
94 record: params.record,
95 recordCid: params.cid,
96 action: params.action,
97 scheduledAt: params.scheduledAt,
98 scheduleId,
99 triggerKeyHash: params.triggerKeyHash,
100 triggerKeyEncrypted: params.triggerKeyEncrypted,
101 });
102 await updateScheduleNextDraft(scheduleId, params.uri);
103}
104
105const logger = createLogger('Server');
106
107function loadLexicons(): LexiconDoc[] {
108 // Load town.roundabout.scheduledPosts lexicons from bundled JSON files
109 const lexiconDir = path.join(__dirname, '..', 'lexicons', 'town', 'roundabout', 'scheduledPosts');
110 const bundledNames = [
111 'defs',
112 'listPosts', 'getPost', 'schedulePost', 'publishPost', 'updatePost', 'deletePost',
113 'createSchedule', 'listSchedules', 'getSchedule', 'updateSchedule', 'deleteSchedule',
114 ];
115 const bundled = bundledNames.map(name =>
116 JSON.parse(readFileSync(path.join(lexiconDir, `${name}.json`), 'utf8')) as LexiconDoc,
117 );
118
119 // Load standard ATProto write lexicons from @atproto/api
120 const atprotoIds = [
121 'com.atproto.repo.createRecord',
122 'com.atproto.repo.putRecord',
123 'com.atproto.repo.deleteRecord',
124 ];
125 const atproto = atprotoIds.map(id => {
126 const schema = atprotoSchemas.find(s => s.id === id);
127 /* istanbul ignore next */
128 if (!schema) throw new Error(`Required lexicon not found: ${id}`);
129 return schema;
130 });
131
132 return [...bundled, ...atproto];
133}
134
135/**
136 * Compute an AT-URI from the user DID, collection, and rkey
137 */
138function buildAtUri(did: string, collection: string, rkey: string): string {
139 return `at://${did}/${collection}/${rkey}`;
140}
141
142/**
143 * Extract PDS URL from a Bearer token, falling back to a PLC-resolved URL.
144 * For simplicity in dev, use the config's plcRoot to derive a default PDS URL.
145 */
146function getPdsUrlFromToken(token: string, defaultPdsUrl: string): string {
147 return extractPdsUrlFromToken(token, defaultPdsUrl);
148}
149
150/**
151 * Build the one-time trigger URL from a plaintext key.
152 */
153function buildTriggerUrl(serviceUrl: string, plainKey: string): string {
154 return `${serviceUrl}/triggers/${plainKey}`;
155}
156
157export function createServer(config: ServiceConfig): express.Application {
158 const app = express();
159
160 app.use(express.json({ limit: '1mb' }));
161 app.use(cors({ origin: true }));
162
163 // Structured HTTP request logging (skip health checks to reduce noise)
164 app.use(pinoHttp({
165 logger: rootLogger,
166 autoLogging: {
167 ignore: (req) => req.url === '/health' || req.url === '/xrpc/_health',
168 },
169 }));
170
171 // HTTP metrics middleware
172 app.use((req, res, next) => {
173 const start = Date.now();
174 res.on('finish', () => {
175 const endpoint = (req.route as { path?: string } | undefined)?.path ?? req.path ?? /* istanbul ignore next */ 'unknown';
176 const duration = (Date.now() - start) / 1000;
177 httpRequestsTotal.inc({ method: req.method, endpoint, status_code: String(res.statusCode) });
178 httpRequestDuration.observe({ method: req.method, endpoint }, duration);
179 });
180 next();
181 });
182
183 // Health check
184 app.get('/xrpc/_health', (_req, res) => {
185 res.json({ version: '1.0.0', service: 'alf' });
186 });
187
188 app.get('/health', (_req, res) => {
189 res.json({ status: 'ok', service: 'alf' });
190 });
191
192 // OAuth routes
193 app.use('/oauth', createOAuthRouter(config));
194
195 // -------------------------------------------------------------------------
196 // Blob upload endpoint — stores raw image bytes for deferred upload at publish time
197 // POST /blob
198 // Auth: Bearer token
199 // Body: raw image bytes
200 // Content-Type: image/jpeg | image/png | etc.
201 // Returns: { cid, mimeType, size }
202 // -------------------------------------------------------------------------
203 app.post('/blob', express.raw({ type: '*/*', limit: '10mb' }), async (req, res) => {
204 let user: { did: string };
205 try {
206 user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined);
207 } catch (err) {
208 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err);
209 res.status(401).json({ error: 'AuthRequired', message: msg });
210 return;
211 }
212 try {
213 const data = req.body as Buffer;
214 if (!data || data.length === 0) {
215 res.status(400).json({ error: 'InvalidRequest', message: 'Request body is required' });
216 return;
217 }
218 const mimeType = (req.headers['content-type'] || /* istanbul ignore next */ 'application/octet-stream').split(';')[0]?.trim() || /* istanbul ignore next */ 'application/octet-stream';
219 const cid = await computeBlobCid(data);
220 await storeDraftBlob(user.did, cid, data, mimeType, data.length);
221 logger.info('Blob stored', { did: user.did, cid, size: data.length });
222 res.json({ cid, mimeType, size: data.length });
223 } catch (err) {
224 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err);
225 logger.error('Blob upload failed', err instanceof Error ? err : /* istanbul ignore next */ undefined);
226 res.status(500).json({ error: 'InternalError', message: msg });
227 }
228 });
229
230 // -------------------------------------------------------------------------
231 // OAuth status endpoint — returns whether the requesting user has an authorization
232 // GET /oauth/status
233 // Auth: Bearer token
234 // Returns: { authorized: boolean, authType: string | null }
235 // -------------------------------------------------------------------------
236 app.get('/oauth/status', async (req, res) => {
237 try {
238 const user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined);
239 const auth = await getUserAuthorization(user.did);
240 res.json({
241 authorized: !!auth,
242 authType: auth?.auth_type ?? null,
243 disableRecurring: config.disableRecurring,
244 });
245 } catch (err) {
246 logger.warn('oauth/status auth failed', { error: err instanceof Error ? err.message : /* istanbul ignore next */ String(err) });
247 res.json({ authorized: false, authType: null });
248 }
249 });
250
251 // DELETE /account — cancel all drafts and remove authorization for the authenticated user
252 app.delete('/account', async (req, res) => {
253 try {
254 const user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined);
255 await deleteUserData(user.did);
256 res.status(200).json({ deleted: true });
257 } catch (err) {
258 logger.warn('delete account failed', { error: err instanceof Error ? err.message : /* istanbul ignore next */ String(err) });
259 res.status(401).json({ error: 'Unauthorized' });
260 }
261 });
262
263 // -------------------------------------------------------------------------
264 // Webhook trigger endpoint — no auth required (the URL is the secret)
265 // POST /triggers/:key
266 // Returns: { published: true, uri: "at://..." } or error
267 // -------------------------------------------------------------------------
268 app.post('/triggers/:key', async (req, res) => {
269 const plainKey = req.params.key;
270 /* istanbul ignore next */
271 if (!plainKey) {
272 res.status(400).json({ error: 'InvalidRequest', message: 'Trigger key is required' });
273 return;
274 }
275
276 try {
277 // Compute HMAC of the incoming key to look up the draft
278 const keyHash = hmac(plainKey, config.encryptionKey);
279 const draftRow = await getDraftByTriggerKeyHash(keyHash);
280
281 if (!draftRow) {
282 res.status(404).json({ error: 'NotFound', message: 'Trigger key not found' });
283 return;
284 }
285
286 // Check if already in terminal state
287 const terminalStatuses = ['published', 'failed', 'cancelled'];
288 if (terminalStatuses.includes(draftRow.status)) {
289 res.status(409).json({ error: 'TriggerAlreadyFired', message: 'This trigger has already been used or the draft is no longer active' });
290 return;
291 }
292
293 // Publish the draft (same path as publishPost)
294 await publishDraft(draftRow.uri, config);
295 notifyScheduler();
296
297 const published = await getDraft(draftRow.uri);
298 const actualStatus = published?.status ?? /* istanbul ignore next */ 'unknown';
299 if (actualStatus === 'published') {
300 res.json({ published: true, uri: draftRow.uri });
301 } else {
302 res.status(500).json({
303 error: 'PublishFailed',
304 message: published?.failureReason ?? 'Draft failed to publish',
305 status: actualStatus,
306 uri: draftRow.uri,
307 });
308 }
309 } catch (err) {
310 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err);
311 logger.error('Trigger endpoint error', err instanceof Error ? err : /* istanbul ignore next */ undefined);
312 res.status(500).json({ error: 'InternalError', message: msg });
313 }
314 });
315
316 // Load lexicons
317 const lexicons = loadLexicons();
318 logger.info(`Loaded ${lexicons.length} lexicons`);
319
320 // Create XRPC server
321 const server = xrpc.createServer(lexicons, {
322 payload: {
323 jsonLimit: 1024 * 1024, // 1MB
324 textLimit: 100 * 1024,
325 blobLimit: 5 * 1024 * 1024,
326 },
327 });
328
329 // Default PDS URL for resolving user PDS (used as fallback for legacy tokens)
330 const defaultPdsUrl = 'http://localhost:2583';
331
332 // Auth helper for raw Express handlers (throws plain Error)
333 async function requireAuthFromRequest(
334 authHeader: string | undefined,
335 dpopHeader?: string,
336 ): Promise<{ did: string }> {
337 const scheme = authHeader?.split(' ')[0];
338 if (scheme === 'DPoP' && dpopHeader) {
339 const token = extractBearerToken(authHeader);
340 return await verifyDpopBoundToken(token, dpopHeader);
341 }
342 const token = extractBearerToken(authHeader);
343 const pdsUrl = getPdsUrlFromToken(token, defaultPdsUrl);
344 return await verifyRequestAuth(authHeader, pdsUrl);
345 }
346
347 // Auth helper: extracts and verifies the Bearer token, throws AuthRequiredError on failure
348 async function requireAuth(authHeader: string | undefined, dpopHeader?: string): Promise<{ did: string }> {
349 try {
350 return await requireAuthFromRequest(authHeader, dpopHeader);
351 } catch (err) {
352 /* istanbul ignore next */
353 const msg = err instanceof Error ? err.message : String(err);
354 throw new xrpc.AuthRequiredError(msg);
355 }
356 }
357
358 // URI helper: extracts DID from an AT-URI, throws InvalidRequestError on invalid format
359 function parseDid(uri: string): string {
360 try {
361 return extractDidFromAtUri(uri);
362 } catch {
363 /* istanbul ignore next */
364 throw new xrpc.InvalidRequestError('Invalid uri format');
365 }
366 }
367
368 // -------------------------------------------------------------------------
369 // Write Interface - all three endpoints always create drafts
370 // Supports x-trigger: webhook header to create a one-time trigger URL
371 // -------------------------------------------------------------------------
372
373 server.method('com.atproto.repo.createRecord', async (ctx: xrpc.HandlerContext) => {
374 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
375
376 const body = ctx.input?.body as {
377 repo: string;
378 collection: string;
379 rkey?: string;
380 record: Record<string, unknown>;
381 };
382
383 /* istanbul ignore next */
384 if (!body?.collection || !body?.record) {
385 throw new xrpc.InvalidRequestError('collection and record are required');
386 }
387
388 if (body.repo && body.repo !== user.did) {
389 throw new xrpc.InvalidRequestError('repo must match authenticated user');
390 }
391
392 if (config.maxDraftsPerUser !== null) {
393 const activeCount = await countActiveDraftsForUser(user.did);
394 if (activeCount >= config.maxDraftsPerUser) {
395 throw new xrpc.InvalidRequestError(
396 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`,
397 'DraftLimitExceeded',
398 );
399 }
400 }
401
402 const rkey = body.rkey ?? TID.nextStr();
403 const uri = buildAtUri(user.did, body.collection, rkey);
404
405 // Compute CID deterministically
406 const cid = (await cidForRecord(body.record)).toString();
407
408 // Check for scheduling header
409 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined;
410 const scheduledAt = scheduledAtHeader ? new Date(scheduledAtHeader).getTime() : undefined;
411
412 // Check for webhook trigger header
413 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined;
414 let triggerKeyHash: string | undefined;
415 let triggerKeyEncrypted: string | undefined;
416 let triggerUrl: string | undefined;
417
418 if (triggerHeader === 'webhook') {
419 const plainKey = randomUUID();
420 triggerKeyHash = hmac(plainKey, config.encryptionKey);
421 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey);
422 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey);
423 }
424
425 try {
426 if (scheduledAt) {
427 await createOnceDraftThroughSchedule({
428 userDid: user.did,
429 collection: body.collection,
430 rkey,
431 uri,
432 cid,
433 record: body.record,
434 scheduledAt,
435 action: 'create',
436 triggerKeyHash,
437 triggerKeyEncrypted,
438 });
439 } else {
440 await createDraft({
441 uri,
442 userDid: user.did,
443 collection: body.collection,
444 rkey,
445 record: body.record,
446 recordCid: cid,
447 action: 'create',
448 scheduledAt: undefined,
449 triggerKeyHash,
450 triggerKeyEncrypted,
451 });
452 }
453 } catch (err) {
454 if ((err as { code?: string }).code === 'DuplicateDraft') {
455 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft');
456 }
457 throw err;
458 }
459
460 if (scheduledAt) notifyScheduler();
461 logger.info('createRecord draft created', { uri, collection: body.collection });
462
463 return {
464 encoding: 'application/json',
465 body: {
466 uri,
467 cid,
468 validationStatus: 'unknown',
469 ...(triggerUrl ? { triggerUrl } : {}),
470 },
471 };
472 });
473
474 server.method('com.atproto.repo.putRecord', async (ctx: xrpc.HandlerContext) => {
475 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
476
477 const body = ctx.input?.body as {
478 repo: string;
479 collection: string;
480 rkey: string;
481 record: Record<string, unknown>;
482 };
483
484 /* istanbul ignore next */
485 if (!body?.collection || !body?.rkey || !body?.record) {
486 throw new xrpc.InvalidRequestError('collection, rkey, and record are required');
487 }
488
489 if (body.repo && body.repo !== user.did) {
490 throw new xrpc.InvalidRequestError('repo must match authenticated user');
491 }
492
493 if (config.maxDraftsPerUser !== null) {
494 const activeCount = await countActiveDraftsForUser(user.did);
495 if (activeCount >= config.maxDraftsPerUser) {
496 throw new xrpc.InvalidRequestError(
497 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`,
498 'DraftLimitExceeded',
499 );
500 }
501 }
502
503 const uri = buildAtUri(user.did, body.collection, body.rkey);
504 const cid = (await cidForRecord(body.record)).toString();
505
506 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined;
507 const scheduledAt = scheduledAtHeader ? new Date(scheduledAtHeader).getTime() : undefined;
508
509 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined;
510 let triggerKeyHash: string | undefined;
511 let triggerKeyEncrypted: string | undefined;
512 let triggerUrl: string | undefined;
513
514 if (triggerHeader === 'webhook') {
515 const plainKey = randomUUID();
516 triggerKeyHash = hmac(plainKey, config.encryptionKey);
517 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey);
518 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey);
519 }
520
521 try {
522 if (scheduledAt) {
523 await createOnceDraftThroughSchedule({
524 userDid: user.did,
525 collection: body.collection,
526 rkey: body.rkey,
527 uri,
528 cid,
529 record: body.record,
530 scheduledAt,
531 action: 'put',
532 triggerKeyHash,
533 triggerKeyEncrypted,
534 });
535 } else {
536 await createDraft({
537 uri,
538 userDid: user.did,
539 collection: body.collection,
540 rkey: body.rkey,
541 record: body.record,
542 recordCid: cid,
543 action: 'put',
544 scheduledAt: undefined,
545 triggerKeyHash,
546 triggerKeyEncrypted,
547 });
548 }
549 } catch (err) {
550 if ((err as { code?: string }).code === 'DuplicateDraft') {
551 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft');
552 }
553 throw err;
554 }
555
556 if (scheduledAt) notifyScheduler();
557 logger.info('putRecord draft created', { uri, collection: body.collection });
558
559 return {
560 encoding: 'application/json',
561 body: {
562 uri,
563 cid,
564 validationStatus: 'unknown',
565 ...(triggerUrl ? { triggerUrl } : {}),
566 },
567 };
568 });
569
570 server.method('com.atproto.repo.deleteRecord', async (ctx: xrpc.HandlerContext) => {
571 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
572
573 const body = ctx.input?.body as {
574 repo: string;
575 collection: string;
576 rkey: string;
577 };
578
579 /* istanbul ignore next */
580 if (!body?.collection || !body?.rkey) {
581 throw new xrpc.InvalidRequestError('collection and rkey are required');
582 }
583
584 if (body.repo && body.repo !== user.did) {
585 throw new xrpc.InvalidRequestError('repo must match authenticated user');
586 }
587
588 if (config.maxDraftsPerUser !== null) {
589 const activeCount = await countActiveDraftsForUser(user.did);
590 if (activeCount >= config.maxDraftsPerUser) {
591 throw new xrpc.InvalidRequestError(
592 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`,
593 'DraftLimitExceeded',
594 );
595 }
596 }
597
598 const uri = buildAtUri(user.did, body.collection, body.rkey);
599
600 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined;
601 const scheduledAt = scheduledAtHeader ? new Date(scheduledAtHeader).getTime() : undefined;
602
603 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined;
604 let triggerKeyHash: string | undefined;
605 let triggerKeyEncrypted: string | undefined;
606 let triggerUrl: string | undefined;
607
608 if (triggerHeader === 'webhook') {
609 const plainKey = randomUUID();
610 triggerKeyHash = hmac(plainKey, config.encryptionKey);
611 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey);
612 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey);
613 }
614
615 try {
616 await createDraft({
617 uri,
618 userDid: user.did,
619 collection: body.collection,
620 rkey: body.rkey,
621 record: null,
622 recordCid: null,
623 action: 'delete',
624 scheduledAt,
625 triggerKeyHash,
626 triggerKeyEncrypted,
627 });
628 } catch (err) {
629 if ((err as { code?: string }).code === 'DuplicateDraft') {
630 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft');
631 }
632 throw err;
633 }
634
635 if (scheduledAt) notifyScheduler();
636 logger.info('deleteRecord draft created', { uri, collection: body.collection });
637
638 return {
639 encoding: 'application/json',
640 body: {
641 ...(triggerUrl ? { triggerUrl } : {}),
642 },
643 };
644 });
645
646 // -------------------------------------------------------------------------
647 // Draft Management Endpoints
648 // -------------------------------------------------------------------------
649
650 server.method('town.roundabout.scheduledPosts.listPosts', async (ctx: xrpc.HandlerContext) => {
651 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
652
653 const params = ctx.params as {
654 repo: string;
655 status?: string;
656 limit?: number;
657 cursor?: string;
658 };
659
660 // Users can only list their own drafts
661 if (params.repo !== user.did) {
662 throw new xrpc.AuthRequiredError('You can only list your own drafts');
663 }
664
665 const result = await listDrafts({
666 userDid: user.did,
667 status: params.status,
668 limit: Number(params.limit ?? /* istanbul ignore next */ 50),
669 cursor: params.cursor,
670 });
671
672 // Decrypt trigger keys so clients can retrieve the webhook URL from the list
673 const posts = result.drafts.map(({ triggerKeyEncrypted, ...view }) => {
674 if (triggerKeyEncrypted) {
675 try {
676 const plainKey = decrypt(triggerKeyEncrypted, config.encryptionKey);
677 return { ...view, triggerUrl: buildTriggerUrl(config.serviceUrl, plainKey) };
678 } catch {
679 // Decryption failure — omit triggerUrl for this draft
680 }
681 }
682 return view;
683 });
684
685 return {
686 encoding: 'application/json',
687 body: {
688 posts,
689 cursor: result.cursor,
690 },
691 };
692 });
693
694 server.method('town.roundabout.scheduledPosts.getPost', async (ctx: xrpc.HandlerContext) => {
695 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
696
697 const params = ctx.params as { uri: string };
698 /* istanbul ignore next */
699 if (!params.uri) {
700 throw new xrpc.InvalidRequestError('uri is required');
701 }
702
703 const draft = await getDraft(params.uri);
704 if (!draft) {
705 throw new xrpc.InvalidRequestError('Draft not found', 'NotFound');
706 }
707
708 // Verify the draft belongs to the requesting user
709 const draftDid = parseDid(params.uri);
710 if (draftDid !== user.did) {
711 throw new xrpc.AuthRequiredError('You can only view your own drafts');
712 }
713
714 // Populate triggerUrl if the draft has a webhook trigger
715 const rawRow = await getDraftRawRow(params.uri);
716 let triggerUrl: string | undefined;
717 if (rawRow?.trigger_key_encrypted) {
718 try {
719 const plainKey = decrypt(rawRow.trigger_key_encrypted, config.encryptionKey);
720 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey);
721 } catch {
722 // Decryption failure — don't expose the URL, just omit it
723 logger.warn('Failed to decrypt trigger key for getPost', { uri: params.uri });
724 }
725 }
726
727 return {
728 encoding: 'application/json',
729 body: {
730 ...draft,
731 ...(triggerUrl ? { triggerUrl } : {}),
732 },
733 };
734 });
735
736 server.method('town.roundabout.scheduledPosts.schedulePost', async (ctx: xrpc.HandlerContext) => {
737 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
738
739 const body = ctx.input?.body as { uri: string; publishAt: string };
740 /* istanbul ignore next */
741 if (!body?.uri || !body?.publishAt) {
742 throw new xrpc.InvalidRequestError('uri and publishAt are required');
743 }
744
745 // Verify ownership
746 const draftDid = parseDid(body.uri);
747 if (draftDid !== user.did) {
748 throw new xrpc.AuthRequiredError('You can only schedule your own drafts');
749 }
750
751 const publishAt = new Date(body.publishAt).getTime();
752 /* istanbul ignore next */
753 if (isNaN(publishAt)) {
754 throw new xrpc.InvalidRequestError('publishAt must be a valid ISO 8601 datetime');
755 }
756
757 const draft = await scheduleDraft(body.uri, publishAt);
758 if (!draft) {
759 throw new xrpc.InvalidRequestError('Draft not found or not in a schedulable state', 'NotFound');
760 }
761
762 notifyScheduler();
763
764 return {
765 encoding: 'application/json',
766 body: draft,
767 };
768 });
769
770 server.method('town.roundabout.scheduledPosts.publishPost', async (ctx: xrpc.HandlerContext) => {
771 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
772
773 const body = ctx.input?.body as { uri: string };
774 /* istanbul ignore next */
775 if (!body?.uri) {
776 throw new xrpc.InvalidRequestError('uri is required');
777 }
778
779 const draftDid = parseDid(body.uri);
780 if (draftDid !== user.did) {
781 throw new xrpc.AuthRequiredError('You can only publish your own drafts');
782 }
783
784 const draftBefore = await getDraft(body.uri);
785 if (!draftBefore) {
786 throw new xrpc.InvalidRequestError('Draft not found', 'NotFound');
787 }
788
789 // Publish synchronously
790 await publishDraft(body.uri, config);
791 notifyScheduler();
792
793 const draft = await getDraft(body.uri);
794 if (!draft) {
795 throw new xrpc.InvalidRequestError('Draft not found after publish', 'NotFound');
796 }
797
798 return {
799 encoding: 'application/json',
800 body: draft,
801 };
802 });
803
804 server.method('town.roundabout.scheduledPosts.updatePost', async (ctx: xrpc.HandlerContext) => {
805 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
806
807 const body = ctx.input?.body as {
808 uri: string;
809 record?: Record<string, unknown>;
810 scheduledAt?: string;
811 };
812 /* istanbul ignore next */
813 if (!body?.uri) {
814 throw new xrpc.InvalidRequestError('uri is required');
815 }
816
817 const draftDid = parseDid(body.uri);
818 if (draftDid !== user.did) {
819 throw new xrpc.AuthRequiredError('You can only update your own drafts');
820 }
821
822 const updateParams: {
823 record?: Record<string, unknown>;
824 recordCid?: string;
825 scheduledAt?: number;
826 } = {};
827
828 if (body.record !== undefined) {
829 updateParams.record = body.record;
830 updateParams.recordCid = (await cidForRecord(body.record)).toString();
831 }
832
833 if (body.scheduledAt !== undefined) {
834 const scheduledAt = new Date(body.scheduledAt).getTime();
835 /* istanbul ignore next */
836 if (isNaN(scheduledAt)) {
837 throw new xrpc.InvalidRequestError('scheduledAt must be a valid ISO 8601 datetime');
838 }
839 updateParams.scheduledAt = scheduledAt;
840 }
841
842 const draft = await updateDraft(body.uri, updateParams);
843 if (!draft) {
844 throw new xrpc.InvalidRequestError('Draft not found or not in an updatable state', 'NotFound');
845 }
846
847 notifyScheduler();
848
849 return {
850 encoding: 'application/json',
851 body: draft,
852 };
853 });
854
855 server.method('town.roundabout.scheduledPosts.deletePost', async (ctx: xrpc.HandlerContext) => {
856 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
857
858 const body = ctx.input?.body as { uri: string };
859 /* istanbul ignore next */
860 if (!body?.uri) {
861 throw new xrpc.InvalidRequestError('uri is required');
862 }
863
864 const draftDid = parseDid(body.uri);
865 if (draftDid !== user.did) {
866 throw new xrpc.AuthRequiredError('You can only delete your own drafts');
867 }
868
869 await cancelDraft(body.uri);
870 notifyScheduler();
871
872 return {
873 encoding: 'application/json',
874 body: {},
875 };
876 });
877
878 // -------------------------------------------------------------------------
879 // Schedule Management Endpoints
880 // -------------------------------------------------------------------------
881
882 server.method('town.roundabout.scheduledPosts.createSchedule', async (ctx: xrpc.HandlerContext) => {
883 if (config.disableRecurring) {
884 throw new xrpc.AuthRequiredError('Recurring schedules are disabled on this server');
885 }
886
887 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
888
889 const body = ctx.input?.body as {
890 collection: string;
891 recurrenceRule: Record<string, unknown>;
892 timezone: string;
893 record?: Record<string, unknown>;
894 contentUrl?: string;
895 };
896
897 /* istanbul ignore next */
898 if (!body?.collection || !body?.recurrenceRule || !body?.timezone) {
899 throw new xrpc.InvalidRequestError('collection, recurrenceRule, and timezone are required');
900 }
901
902 if (body.record && body.contentUrl) {
903 throw new xrpc.InvalidRequestError('record and contentUrl are mutually exclusive');
904 }
905
906 // Validate the recurrence rule by computing the first occurrence
907 const rule = body.recurrenceRule as unknown as RecurrenceRule;
908
909 const nextFireAt = computeNextOccurrence(rule, new Date());
910 if (!nextFireAt) {
911 throw new xrpc.InvalidRequestError('Recurrence rule produces no future occurrences');
912 }
913
914 const scheduleId = randomUUID();
915 await createSchedule({
916 id: scheduleId,
917 userDid: user.did,
918 collection: body.collection,
919 record: body.record ?? null,
920 contentUrl: body.contentUrl ?? null,
921 recurrenceRule: body.recurrenceRule,
922 timezone: body.timezone,
923 });
924
925 // Create the first draft
926 const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
927 const uri = buildAtUri(user.did, body.collection, rkey);
928 const draftRecord = body.contentUrl ? null : (body.record ?? /* istanbul ignore next */ null);
929
930 await createDraft({
931 uri,
932 userDid: user.did,
933 collection: body.collection,
934 rkey,
935 record: draftRecord,
936 recordCid: null,
937 action: 'create',
938 scheduledAt: nextFireAt.getTime(),
939 scheduleId,
940 });
941
942 await updateScheduleNextDraft(scheduleId, uri);
943 notifyScheduler();
944
945 const updatedSchedule = await getSchedule(scheduleId);
946 logger.info('Schedule created', { scheduleId, nextFireAt: nextFireAt.toISOString() });
947
948 return {
949 encoding: 'application/json',
950 body: { schedule: updatedSchedule },
951 };
952 });
953
954 server.method('town.roundabout.scheduledPosts.listSchedules', async (ctx: xrpc.HandlerContext) => {
955 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
956
957 const params = ctx.params as {
958 repo: string;
959 status?: string;
960 limit?: number;
961 cursor?: string;
962 };
963
964 if (params.repo !== user.did) {
965 throw new xrpc.AuthRequiredError('You can only list your own schedules');
966 }
967
968 const result = await listSchedules({
969 userDid: user.did,
970 status: params.status,
971 limit: Number(params.limit ?? /* istanbul ignore next */ 50),
972 cursor: params.cursor,
973 });
974
975 return {
976 encoding: 'application/json',
977 body: {
978 schedules: result.schedules,
979 cursor: result.cursor,
980 },
981 };
982 });
983
984 server.method('town.roundabout.scheduledPosts.getSchedule', async (ctx: xrpc.HandlerContext) => {
985 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
986
987 const params = ctx.params as { id: string };
988 /* istanbul ignore next */
989 if (!params.id) {
990 throw new xrpc.InvalidRequestError('id is required');
991 }
992
993 const schedule = await getSchedule(params.id);
994 if (!schedule) {
995 throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
996 }
997
998 // Verify ownership
999 const raw = await getRawSchedule(params.id);
1000 if (raw?.user_did !== user.did) {
1001 throw new xrpc.AuthRequiredError('You can only view your own schedules');
1002 }
1003
1004 return {
1005 encoding: 'application/json',
1006 body: { schedule },
1007 };
1008 });
1009
1010 server.method('town.roundabout.scheduledPosts.updateSchedule', async (ctx: xrpc.HandlerContext) => {
1011 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
1012
1013 const body = ctx.input?.body as {
1014 id: string;
1015 recurrenceRule?: Record<string, unknown>;
1016 timezone?: string;
1017 record?: Record<string, unknown> | null;
1018 contentUrl?: string | null;
1019 status?: 'active' | 'paused';
1020 };
1021
1022 /* istanbul ignore next */
1023 if (!body?.id) {
1024 throw new xrpc.InvalidRequestError('id is required');
1025 }
1026
1027 const raw = await getRawSchedule(body.id);
1028 if (!raw) {
1029 throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
1030 }
1031 if (raw.user_did !== user.did) {
1032 throw new xrpc.AuthRequiredError('You can only update your own schedules');
1033 }
1034
1035 const updateParams: Parameters<typeof updateSchedule>[1] = {};
1036 if ('record' in body) updateParams.record = body.record ?? /* istanbul ignore next */ null;
1037 if ('contentUrl' in body) updateParams.contentUrl = body.contentUrl ?? /* istanbul ignore next */ null;
1038 if (body.recurrenceRule !== undefined) updateParams.recurrenceRule = body.recurrenceRule;
1039 if (body.timezone !== undefined) updateParams.timezone = body.timezone;
1040
1041 // Handle pause/resume
1042 if (body.status === 'paused' && raw.status === 'active') {
1043 // Cancel the pending draft
1044 if (raw.next_draft_uri) {
1045 await cancelDraft(raw.next_draft_uri);
1046 }
1047 updateParams.status = 'paused';
1048 await updateScheduleNextDraft(body.id, null);
1049 } else if (body.status === 'active' && raw.status === 'paused') {
1050 // Resume: create a new next draft
1051 const ruleJson = body.recurrenceRule ?? JSON.parse(raw.recurrence_rule) as Record<string, unknown>;
1052 const rule = ruleJson as unknown as RecurrenceRule;
1053 const nextFireAt = computeNextOccurrence(rule, new Date());
1054 if (nextFireAt) {
1055 const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
1056 const collection = raw.collection;
1057 const uri = buildAtUri(user.did, collection, rkey);
1058 const draftRecord = raw.content_url ? null : (raw.record ? JSON.parse(raw.record) as Record<string, unknown> : null);
1059
1060 await createDraft({
1061 uri,
1062 userDid: user.did,
1063 collection,
1064 rkey,
1065 record: draftRecord,
1066 recordCid: null,
1067 action: 'create',
1068 scheduledAt: nextFireAt.getTime(),
1069 scheduleId: body.id,
1070 });
1071
1072 await updateScheduleNextDraft(body.id, uri);
1073 notifyScheduler();
1074 }
1075 updateParams.status = 'active';
1076 } else if (body.status !== undefined) {
1077 updateParams.status = body.status;
1078 }
1079
1080 const schedule = await updateSchedule(body.id, updateParams);
1081
1082 return {
1083 encoding: 'application/json',
1084 body: { schedule },
1085 };
1086 });
1087
1088 server.method('town.roundabout.scheduledPosts.deleteSchedule', async (ctx: xrpc.HandlerContext) => {
1089 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);
1090
1091 const body = ctx.input?.body as { id: string };
1092 /* istanbul ignore next */
1093 if (!body?.id) {
1094 throw new xrpc.InvalidRequestError('id is required');
1095 }
1096
1097 const raw = await getRawSchedule(body.id);
1098 if (!raw) {
1099 throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
1100 }
1101 if (raw.user_did !== user.did) {
1102 throw new xrpc.AuthRequiredError('You can only delete your own schedules');
1103 }
1104
1105 await deleteSchedule(body.id);
1106 notifyScheduler();
1107
1108 return {
1109 encoding: 'application/json',
1110 body: {},
1111 };
1112 });
1113
1114 // Mount XRPC router
1115 app.use(server.router);
1116
1117 // Error handler
1118 /* istanbul ignore next */
1119 // eslint-disable-next-line @typescript-eslint/no-unused-vars
1120 app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
1121 const error = err as Error;
1122 logger.error('Express error handler caught', error);
1123 if (!res.headersSent) {
1124 res.status(500).json({ error: 'InternalServerError', message: 'Internal Server Error' });
1125 }
1126 });
1127
1128 // 404 handler
1129 app.use((_req, res) => {
1130 res.status(404).json({ error: 'Not found' });
1131 });
1132
1133 return app;
1134}