alf: the atproto Latency Fabric alf.fly.dev/
at main 1134 lines 38 kB view raw
// ABOUTME: Express + XRPC server with write proxying (draft creation) and draft management endpoints

import express from 'express';
import cors from 'cors';
import pinoHttp from 'pino-http';
import { readFileSync } from 'fs';
import path from 'path';
import { randomUUID } from 'node:crypto';
import { CID } from 'multiformats/cid';
import { sha256 } from 'multiformats/hashes/sha2';
import { LexiconDoc } from '@atproto/lexicon';
import * as xrpc from '@atproto/xrpc-server';
import { schemas as atprotoSchemas } from '@atproto/api';
import { TID } from '@atproto/common';
import { cidForRecord } from '@atproto/repo';
import type { ServiceConfig } from './config.js';
import { createLogger, rootLogger } from './logger.js';
import { verifyRequestAuth, extractPdsUrlFromToken, extractBearerToken, verifyDpopBoundToken } from './auth.js';
import { httpRequestsTotal, httpRequestDuration } from './metrics.js';
import { extractDidFromAtUri } from './schema.js';
import { createOAuthRouter } from './routes/oauth.js';
import {
  createDraft,
  getDraft,
  getDraftRawRow,
  getDraftByTriggerKeyHash,
  listDrafts,
  scheduleDraft,
  updateDraft,
  cancelDraft,
  storeDraftBlob,
  getUserAuthorization,
  countActiveDraftsForUser,
  deleteUserData,
  createSchedule,
  getSchedule,
  getRawSchedule,
  listSchedules,
  updateSchedule,
  updateScheduleNextDraft,
  deleteSchedule,
} from './storage.js';
import { publishDraft, notifyScheduler } from './scheduler.js';
import { encrypt, decrypt, hmac } from './encrypt.js';
import { computeNextOccurrence } from '@newpublic/recurrence';
import type { RecurrenceRule } from '@newpublic/recurrence';

/** Multicodec code for "raw" binary content, used when building blob CIDs. */
const RAW_CODEC = 0x55;

// Module-wide logger. Declared ahead of the helpers below so that none of them
// references it before its declaration in source order.
const logger = createLogger('Server');

/**
 * Compute a blob CID using ATProto's algorithm: CIDv1, raw codec (0x55), SHA-256 multihash.
 *
 * @param data - Raw blob bytes to hash.
 * @returns The CID's string form (base32, `bafkrei...` prefix).
 */
async function computeBlobCid(data: Uint8Array): Promise<string> {
  const hash = await sha256.digest(data);
  const cid = CID.createV1(RAW_CODEC, hash);
  return cid.toString();
}

/**
 * Create a 'once' schedule and its first (only) draft, routing the draft through schedule machinery.
 * Used by createRecord/putRecord when the x-scheduled-at header is present.
 *
 * Steps, in order:
 *   1. insert a schedule whose recurrence rule is a single `once` datetime (UTC),
 *   2. insert the draft linked to that schedule via `scheduleId`,
 *   3. point the schedule at the draft with `updateScheduleNextDraft`.
 *
 * If step 2 fails (for example with the `DuplicateDraft` error that callers translate
 * into an XRPC error), the schedule from step 1 is removed again so no orphaned
 * schedule row is left behind. The original error is always rethrown unchanged.
 *
 * @param params - Draft fields plus `scheduledAt` as epoch milliseconds.
 * @throws Whatever `createSchedule`, `createDraft` or `updateScheduleNextDraft` throw.
 */
async function createOnceDraftThroughSchedule(params: {
  userDid: string;
  collection: string;
  rkey: string;
  uri: string;
  cid: string;
  record: Record<string, unknown>;
  scheduledAt: number;
  action: 'create' | 'put';
  triggerKeyHash?: string;
  triggerKeyEncrypted?: string;
}): Promise<void> {
  const scheduleId = randomUUID();
  const onceRule: RecurrenceRule = {
    rule: { type: 'once', datetime: new Date(params.scheduledAt).toISOString() },
  };

  await createSchedule({
    id: scheduleId,
    userDid: params.userDid,
    collection: params.collection,
    record: params.record,
    contentUrl: null,
    recurrenceRule: onceRule as unknown as Record<string, unknown>,
    timezone: 'UTC',
  });

  try {
    await createDraft({
      uri: params.uri,
      userDid: params.userDid,
      collection: params.collection,
      rkey: params.rkey,
      record: params.record,
      recordCid: params.cid,
      action: params.action,
      scheduledAt: params.scheduledAt,
      scheduleId,
      triggerKeyHash: params.triggerKeyHash,
      triggerKeyEncrypted: params.triggerKeyEncrypted,
    });
  } catch (err) {
    // Best-effort rollback: the schedule has no draft, so it must not stay behind.
    // NOTE(review): assumes deleteSchedule(id) removes the schedule row by id — confirm against storage.js.
    try {
      await deleteSchedule(scheduleId);
    } catch (cleanupErr) {
      // Never let a cleanup failure mask the error the caller needs to see.
      logger.warn('Failed to remove schedule after draft creation error', {
        scheduleId,
        error: cleanupErr instanceof Error ? cleanupErr.message : String(cleanupErr),
      });
    }
    throw err;
  }

  await updateScheduleNextDraft(scheduleId, params.uri);
}

/**
 * Load every lexicon document the XRPC server is created with.
 *
 * Combines the bundled `town.roundabout.scheduledPosts` lexicons (read synchronously
 * from JSON files under `../lexicons/town/roundabout/scheduledPosts` relative to this
 * module's directory) with the three `com.atproto.repo` write lexicons taken from
 * the schemas exported by `@atproto/api`.
 *
 * @returns Bundled lexicons followed by the ATProto write lexicons.
 * @throws Error if a bundled file cannot be read or parsed, or if a required
 *         ATProto lexicon id is missing from `@atproto/api`.
 */
function loadLexicons(): LexiconDoc[] {
  // Load town.roundabout.scheduledPosts lexicons from bundled JSON files
  const lexiconDir = path.join(__dirname, '..', 'lexicons', 'town', 'roundabout', 'scheduledPosts');
  const bundledNames = [
    'defs',
    'listPosts', 'getPost', 'schedulePost', 'publishPost', 'updatePost', 'deletePost',
    'createSchedule', 'listSchedules', 'getSchedule', 'updateSchedule', 'deleteSchedule',
  ];
  const bundled = bundledNames.map(name =>
    JSON.parse(readFileSync(path.join(lexiconDir, `${name}.json`), 'utf8')) as LexiconDoc,
  );

  // Load standard ATProto write lexicons from @atproto/api
  const atprotoIds = [
    'com.atproto.repo.createRecord',
    'com.atproto.repo.putRecord',
    'com.atproto.repo.deleteRecord',
  ];
  const atproto = atprotoIds.map(id => {
    const schema = atprotoSchemas.find(s => s.id === id);
    /* istanbul ignore next */
    if (!schema) throw new Error(`Required lexicon not found: ${id}`);
    return schema;
  });

  return [...bundled, ...atproto];
}

/**
 * Compute an AT-URI from the user DID, collection, and rkey.
 *
 * @returns A string of the form `at://<did>/<collection>/<rkey>`. The parts are
 *          concatenated as given, with no validation or escaping.
 */
function buildAtUri(did: string, collection: string, rkey: string): string {
  return `at://${did}/${collection}/${rkey}`;
}

/**
 * Resolve the PDS URL to use for a Bearer token.
 *
 * Thin wrapper that delegates to `extractPdsUrlFromToken(token, defaultPdsUrl)`.
 * Presumably `defaultPdsUrl` is returned when the token carries no PDS URL of its
 * own — confirm against auth.js.
 */
function getPdsUrlFromToken(token: string, defaultPdsUrl: string): string {
  return extractPdsUrlFromToken(token, defaultPdsUrl);
}

/**
 * Build the one-time trigger URL from a plaintext key.
152 */ 153function buildTriggerUrl(serviceUrl: string, plainKey: string): string { 154 return `${serviceUrl}/triggers/${plainKey}`; 155} 156 157export function createServer(config: ServiceConfig): express.Application { 158 const app = express(); 159 160 app.use(express.json({ limit: '1mb' })); 161 app.use(cors({ origin: true })); 162 163 // Structured HTTP request logging (skip health checks to reduce noise) 164 app.use(pinoHttp({ 165 logger: rootLogger, 166 autoLogging: { 167 ignore: (req) => req.url === '/health' || req.url === '/xrpc/_health', 168 }, 169 })); 170 171 // HTTP metrics middleware 172 app.use((req, res, next) => { 173 const start = Date.now(); 174 res.on('finish', () => { 175 const endpoint = (req.route as { path?: string } | undefined)?.path ?? req.path ?? /* istanbul ignore next */ 'unknown'; 176 const duration = (Date.now() - start) / 1000; 177 httpRequestsTotal.inc({ method: req.method, endpoint, status_code: String(res.statusCode) }); 178 httpRequestDuration.observe({ method: req.method, endpoint }, duration); 179 }); 180 next(); 181 }); 182 183 // Health check 184 app.get('/xrpc/_health', (_req, res) => { 185 res.json({ version: '1.0.0', service: 'alf' }); 186 }); 187 188 app.get('/health', (_req, res) => { 189 res.json({ status: 'ok', service: 'alf' }); 190 }); 191 192 // OAuth routes 193 app.use('/oauth', createOAuthRouter(config)); 194 195 // ------------------------------------------------------------------------- 196 // Blob upload endpoint — stores raw image bytes for deferred upload at publish time 197 // POST /blob 198 // Auth: Bearer token 199 // Body: raw image bytes 200 // Content-Type: image/jpeg | image/png | etc. 
201 // Returns: { cid, mimeType, size } 202 // ------------------------------------------------------------------------- 203 app.post('/blob', express.raw({ type: '*/*', limit: '10mb' }), async (req, res) => { 204 let user: { did: string }; 205 try { 206 user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined); 207 } catch (err) { 208 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err); 209 res.status(401).json({ error: 'AuthRequired', message: msg }); 210 return; 211 } 212 try { 213 const data = req.body as Buffer; 214 if (!data || data.length === 0) { 215 res.status(400).json({ error: 'InvalidRequest', message: 'Request body is required' }); 216 return; 217 } 218 const mimeType = (req.headers['content-type'] || /* istanbul ignore next */ 'application/octet-stream').split(';')[0]?.trim() || /* istanbul ignore next */ 'application/octet-stream'; 219 const cid = await computeBlobCid(data); 220 await storeDraftBlob(user.did, cid, data, mimeType, data.length); 221 logger.info('Blob stored', { did: user.did, cid, size: data.length }); 222 res.json({ cid, mimeType, size: data.length }); 223 } catch (err) { 224 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err); 225 logger.error('Blob upload failed', err instanceof Error ? 
err : /* istanbul ignore next */ undefined); 226 res.status(500).json({ error: 'InternalError', message: msg }); 227 } 228 }); 229 230 // ------------------------------------------------------------------------- 231 // OAuth status endpoint — returns whether the requesting user has an authorization 232 // GET /oauth/status 233 // Auth: Bearer token 234 // Returns: { authorized: boolean, authType: string | null } 235 // ------------------------------------------------------------------------- 236 app.get('/oauth/status', async (req, res) => { 237 try { 238 const user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined); 239 const auth = await getUserAuthorization(user.did); 240 res.json({ 241 authorized: !!auth, 242 authType: auth?.auth_type ?? null, 243 disableRecurring: config.disableRecurring, 244 }); 245 } catch (err) { 246 logger.warn('oauth/status auth failed', { error: err instanceof Error ? err.message : /* istanbul ignore next */ String(err) }); 247 res.json({ authorized: false, authType: null }); 248 } 249 }); 250 251 // DELETE /account — cancel all drafts and remove authorization for the authenticated user 252 app.delete('/account', async (req, res) => { 253 try { 254 const user = await requireAuthFromRequest(req.headers.authorization, req.headers.dpop as string | undefined); 255 await deleteUserData(user.did); 256 res.status(200).json({ deleted: true }); 257 } catch (err) { 258 logger.warn('delete account failed', { error: err instanceof Error ? err.message : /* istanbul ignore next */ String(err) }); 259 res.status(401).json({ error: 'Unauthorized' }); 260 } 261 }); 262 263 // ------------------------------------------------------------------------- 264 // Webhook trigger endpoint — no auth required (the URL is the secret) 265 // POST /triggers/:key 266 // Returns: { published: true, uri: "at://..." 
} or error 267 // ------------------------------------------------------------------------- 268 app.post('/triggers/:key', async (req, res) => { 269 const plainKey = req.params.key; 270 /* istanbul ignore next */ 271 if (!plainKey) { 272 res.status(400).json({ error: 'InvalidRequest', message: 'Trigger key is required' }); 273 return; 274 } 275 276 try { 277 // Compute HMAC of the incoming key to look up the draft 278 const keyHash = hmac(plainKey, config.encryptionKey); 279 const draftRow = await getDraftByTriggerKeyHash(keyHash); 280 281 if (!draftRow) { 282 res.status(404).json({ error: 'NotFound', message: 'Trigger key not found' }); 283 return; 284 } 285 286 // Check if already in terminal state 287 const terminalStatuses = ['published', 'failed', 'cancelled']; 288 if (terminalStatuses.includes(draftRow.status)) { 289 res.status(409).json({ error: 'TriggerAlreadyFired', message: 'This trigger has already been used or the draft is no longer active' }); 290 return; 291 } 292 293 // Publish the draft (same path as publishPost) 294 await publishDraft(draftRow.uri, config); 295 notifyScheduler(); 296 297 const published = await getDraft(draftRow.uri); 298 const actualStatus = published?.status ?? /* istanbul ignore next */ 'unknown'; 299 if (actualStatus === 'published') { 300 res.json({ published: true, uri: draftRow.uri }); 301 } else { 302 res.status(500).json({ 303 error: 'PublishFailed', 304 message: published?.failureReason ?? 'Draft failed to publish', 305 status: actualStatus, 306 uri: draftRow.uri, 307 }); 308 } 309 } catch (err) { 310 const msg = err instanceof Error ? err.message : /* istanbul ignore next */ String(err); 311 logger.error('Trigger endpoint error', err instanceof Error ? 
err : /* istanbul ignore next */ undefined); 312 res.status(500).json({ error: 'InternalError', message: msg }); 313 } 314 }); 315 316 // Load lexicons 317 const lexicons = loadLexicons(); 318 logger.info(`Loaded ${lexicons.length} lexicons`); 319 320 // Create XRPC server 321 const server = xrpc.createServer(lexicons, { 322 payload: { 323 jsonLimit: 1024 * 1024, // 1MB 324 textLimit: 100 * 1024, 325 blobLimit: 5 * 1024 * 1024, 326 }, 327 }); 328 329 // Default PDS URL for resolving user PDS (used as fallback for legacy tokens) 330 const defaultPdsUrl = 'http://localhost:2583'; 331 332 // Auth helper for raw Express handlers (throws plain Error) 333 async function requireAuthFromRequest( 334 authHeader: string | undefined, 335 dpopHeader?: string, 336 ): Promise<{ did: string }> { 337 const scheme = authHeader?.split(' ')[0]; 338 if (scheme === 'DPoP' && dpopHeader) { 339 const token = extractBearerToken(authHeader); 340 return await verifyDpopBoundToken(token, dpopHeader); 341 } 342 const token = extractBearerToken(authHeader); 343 const pdsUrl = getPdsUrlFromToken(token, defaultPdsUrl); 344 return await verifyRequestAuth(authHeader, pdsUrl); 345 } 346 347 // Auth helper: extracts and verifies the Bearer token, throws AuthRequiredError on failure 348 async function requireAuth(authHeader: string | undefined, dpopHeader?: string): Promise<{ did: string }> { 349 try { 350 return await requireAuthFromRequest(authHeader, dpopHeader); 351 } catch (err) { 352 /* istanbul ignore next */ 353 const msg = err instanceof Error ? 
err.message : String(err); 354 throw new xrpc.AuthRequiredError(msg); 355 } 356 } 357 358 // URI helper: extracts DID from an AT-URI, throws InvalidRequestError on invalid format 359 function parseDid(uri: string): string { 360 try { 361 return extractDidFromAtUri(uri); 362 } catch { 363 /* istanbul ignore next */ 364 throw new xrpc.InvalidRequestError('Invalid uri format'); 365 } 366 } 367 368 // ------------------------------------------------------------------------- 369 // Write Interface - all three endpoints always create drafts 370 // Supports x-trigger: webhook header to create a one-time trigger URL 371 // ------------------------------------------------------------------------- 372 373 server.method('com.atproto.repo.createRecord', async (ctx: xrpc.HandlerContext) => { 374 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 375 376 const body = ctx.input?.body as { 377 repo: string; 378 collection: string; 379 rkey?: string; 380 record: Record<string, unknown>; 381 }; 382 383 /* istanbul ignore next */ 384 if (!body?.collection || !body?.record) { 385 throw new xrpc.InvalidRequestError('collection and record are required'); 386 } 387 388 if (body.repo && body.repo !== user.did) { 389 throw new xrpc.InvalidRequestError('repo must match authenticated user'); 390 } 391 392 if (config.maxDraftsPerUser !== null) { 393 const activeCount = await countActiveDraftsForUser(user.did); 394 if (activeCount >= config.maxDraftsPerUser) { 395 throw new xrpc.InvalidRequestError( 396 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`, 397 'DraftLimitExceeded', 398 ); 399 } 400 } 401 402 const rkey = body.rkey ?? 
TID.nextStr(); 403 const uri = buildAtUri(user.did, body.collection, rkey); 404 405 // Compute CID deterministically 406 const cid = (await cidForRecord(body.record)).toString(); 407 408 // Check for scheduling header 409 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined; 410 const scheduledAt = scheduledAtHeader ? new Date(scheduledAtHeader).getTime() : undefined; 411 412 // Check for webhook trigger header 413 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined; 414 let triggerKeyHash: string | undefined; 415 let triggerKeyEncrypted: string | undefined; 416 let triggerUrl: string | undefined; 417 418 if (triggerHeader === 'webhook') { 419 const plainKey = randomUUID(); 420 triggerKeyHash = hmac(plainKey, config.encryptionKey); 421 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey); 422 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey); 423 } 424 425 try { 426 if (scheduledAt) { 427 await createOnceDraftThroughSchedule({ 428 userDid: user.did, 429 collection: body.collection, 430 rkey, 431 uri, 432 cid, 433 record: body.record, 434 scheduledAt, 435 action: 'create', 436 triggerKeyHash, 437 triggerKeyEncrypted, 438 }); 439 } else { 440 await createDraft({ 441 uri, 442 userDid: user.did, 443 collection: body.collection, 444 rkey, 445 record: body.record, 446 recordCid: cid, 447 action: 'create', 448 scheduledAt: undefined, 449 triggerKeyHash, 450 triggerKeyEncrypted, 451 }); 452 } 453 } catch (err) { 454 if ((err as { code?: string }).code === 'DuplicateDraft') { 455 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft'); 456 } 457 throw err; 458 } 459 460 if (scheduledAt) notifyScheduler(); 461 logger.info('createRecord draft created', { uri, collection: body.collection }); 462 463 return { 464 encoding: 'application/json', 465 body: { 466 uri, 467 cid, 468 validationStatus: 'unknown', 469 ...(triggerUrl ? 
{ triggerUrl } : {}), 470 }, 471 }; 472 }); 473 474 server.method('com.atproto.repo.putRecord', async (ctx: xrpc.HandlerContext) => { 475 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 476 477 const body = ctx.input?.body as { 478 repo: string; 479 collection: string; 480 rkey: string; 481 record: Record<string, unknown>; 482 }; 483 484 /* istanbul ignore next */ 485 if (!body?.collection || !body?.rkey || !body?.record) { 486 throw new xrpc.InvalidRequestError('collection, rkey, and record are required'); 487 } 488 489 if (body.repo && body.repo !== user.did) { 490 throw new xrpc.InvalidRequestError('repo must match authenticated user'); 491 } 492 493 if (config.maxDraftsPerUser !== null) { 494 const activeCount = await countActiveDraftsForUser(user.did); 495 if (activeCount >= config.maxDraftsPerUser) { 496 throw new xrpc.InvalidRequestError( 497 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`, 498 'DraftLimitExceeded', 499 ); 500 } 501 } 502 503 const uri = buildAtUri(user.did, body.collection, body.rkey); 504 const cid = (await cidForRecord(body.record)).toString(); 505 506 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined; 507 const scheduledAt = scheduledAtHeader ? 
new Date(scheduledAtHeader).getTime() : undefined; 508 509 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined; 510 let triggerKeyHash: string | undefined; 511 let triggerKeyEncrypted: string | undefined; 512 let triggerUrl: string | undefined; 513 514 if (triggerHeader === 'webhook') { 515 const plainKey = randomUUID(); 516 triggerKeyHash = hmac(plainKey, config.encryptionKey); 517 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey); 518 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey); 519 } 520 521 try { 522 if (scheduledAt) { 523 await createOnceDraftThroughSchedule({ 524 userDid: user.did, 525 collection: body.collection, 526 rkey: body.rkey, 527 uri, 528 cid, 529 record: body.record, 530 scheduledAt, 531 action: 'put', 532 triggerKeyHash, 533 triggerKeyEncrypted, 534 }); 535 } else { 536 await createDraft({ 537 uri, 538 userDid: user.did, 539 collection: body.collection, 540 rkey: body.rkey, 541 record: body.record, 542 recordCid: cid, 543 action: 'put', 544 scheduledAt: undefined, 545 triggerKeyHash, 546 triggerKeyEncrypted, 547 }); 548 } 549 } catch (err) { 550 if ((err as { code?: string }).code === 'DuplicateDraft') { 551 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft'); 552 } 553 throw err; 554 } 555 556 if (scheduledAt) notifyScheduler(); 557 logger.info('putRecord draft created', { uri, collection: body.collection }); 558 559 return { 560 encoding: 'application/json', 561 body: { 562 uri, 563 cid, 564 validationStatus: 'unknown', 565 ...(triggerUrl ? 
{ triggerUrl } : {}), 566 }, 567 }; 568 }); 569 570 server.method('com.atproto.repo.deleteRecord', async (ctx: xrpc.HandlerContext) => { 571 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 572 573 const body = ctx.input?.body as { 574 repo: string; 575 collection: string; 576 rkey: string; 577 }; 578 579 /* istanbul ignore next */ 580 if (!body?.collection || !body?.rkey) { 581 throw new xrpc.InvalidRequestError('collection and rkey are required'); 582 } 583 584 if (body.repo && body.repo !== user.did) { 585 throw new xrpc.InvalidRequestError('repo must match authenticated user'); 586 } 587 588 if (config.maxDraftsPerUser !== null) { 589 const activeCount = await countActiveDraftsForUser(user.did); 590 if (activeCount >= config.maxDraftsPerUser) { 591 throw new xrpc.InvalidRequestError( 592 `Draft limit reached: you may have at most ${config.maxDraftsPerUser} active drafts`, 593 'DraftLimitExceeded', 594 ); 595 } 596 } 597 598 const uri = buildAtUri(user.did, body.collection, body.rkey); 599 600 const scheduledAtHeader = ctx.req.headers['x-scheduled-at'] as string | undefined; 601 const scheduledAt = scheduledAtHeader ? 
new Date(scheduledAtHeader).getTime() : undefined; 602 603 const triggerHeader = ctx.req.headers['x-trigger'] as string | undefined; 604 let triggerKeyHash: string | undefined; 605 let triggerKeyEncrypted: string | undefined; 606 let triggerUrl: string | undefined; 607 608 if (triggerHeader === 'webhook') { 609 const plainKey = randomUUID(); 610 triggerKeyHash = hmac(plainKey, config.encryptionKey); 611 triggerKeyEncrypted = encrypt(plainKey, config.encryptionKey); 612 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey); 613 } 614 615 try { 616 await createDraft({ 617 uri, 618 userDid: user.did, 619 collection: body.collection, 620 rkey: body.rkey, 621 record: null, 622 recordCid: null, 623 action: 'delete', 624 scheduledAt, 625 triggerKeyHash, 626 triggerKeyEncrypted, 627 }); 628 } catch (err) { 629 if ((err as { code?: string }).code === 'DuplicateDraft') { 630 throw new xrpc.InvalidRequestError((err as Error).message, 'DuplicateDraft'); 631 } 632 throw err; 633 } 634 635 if (scheduledAt) notifyScheduler(); 636 logger.info('deleteRecord draft created', { uri, collection: body.collection }); 637 638 return { 639 encoding: 'application/json', 640 body: { 641 ...(triggerUrl ? 
{ triggerUrl } : {}), 642 }, 643 }; 644 }); 645 646 // ------------------------------------------------------------------------- 647 // Draft Management Endpoints 648 // ------------------------------------------------------------------------- 649 650 server.method('town.roundabout.scheduledPosts.listPosts', async (ctx: xrpc.HandlerContext) => { 651 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 652 653 const params = ctx.params as { 654 repo: string; 655 status?: string; 656 limit?: number; 657 cursor?: string; 658 }; 659 660 // Users can only list their own drafts 661 if (params.repo !== user.did) { 662 throw new xrpc.AuthRequiredError('You can only list your own drafts'); 663 } 664 665 const result = await listDrafts({ 666 userDid: user.did, 667 status: params.status, 668 limit: Number(params.limit ?? /* istanbul ignore next */ 50), 669 cursor: params.cursor, 670 }); 671 672 // Decrypt trigger keys so clients can retrieve the webhook URL from the list 673 const posts = result.drafts.map(({ triggerKeyEncrypted, ...view }) => { 674 if (triggerKeyEncrypted) { 675 try { 676 const plainKey = decrypt(triggerKeyEncrypted, config.encryptionKey); 677 return { ...view, triggerUrl: buildTriggerUrl(config.serviceUrl, plainKey) }; 678 } catch { 679 // Decryption failure — omit triggerUrl for this draft 680 } 681 } 682 return view; 683 }); 684 685 return { 686 encoding: 'application/json', 687 body: { 688 posts, 689 cursor: result.cursor, 690 }, 691 }; 692 }); 693 694 server.method('town.roundabout.scheduledPosts.getPost', async (ctx: xrpc.HandlerContext) => { 695 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 696 697 const params = ctx.params as { uri: string }; 698 /* istanbul ignore next */ 699 if (!params.uri) { 700 throw new xrpc.InvalidRequestError('uri is required'); 701 } 702 703 const draft = await getDraft(params.uri); 704 if (!draft) { 705 throw 
new xrpc.InvalidRequestError('Draft not found', 'NotFound'); 706 } 707 708 // Verify the draft belongs to the requesting user 709 const draftDid = parseDid(params.uri); 710 if (draftDid !== user.did) { 711 throw new xrpc.AuthRequiredError('You can only view your own drafts'); 712 } 713 714 // Populate triggerUrl if the draft has a webhook trigger 715 const rawRow = await getDraftRawRow(params.uri); 716 let triggerUrl: string | undefined; 717 if (rawRow?.trigger_key_encrypted) { 718 try { 719 const plainKey = decrypt(rawRow.trigger_key_encrypted, config.encryptionKey); 720 triggerUrl = buildTriggerUrl(config.serviceUrl, plainKey); 721 } catch { 722 // Decryption failure — don't expose the URL, just omit it 723 logger.warn('Failed to decrypt trigger key for getPost', { uri: params.uri }); 724 } 725 } 726 727 return { 728 encoding: 'application/json', 729 body: { 730 ...draft, 731 ...(triggerUrl ? { triggerUrl } : {}), 732 }, 733 }; 734 }); 735 736 server.method('town.roundabout.scheduledPosts.schedulePost', async (ctx: xrpc.HandlerContext) => { 737 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 738 739 const body = ctx.input?.body as { uri: string; publishAt: string }; 740 /* istanbul ignore next */ 741 if (!body?.uri || !body?.publishAt) { 742 throw new xrpc.InvalidRequestError('uri and publishAt are required'); 743 } 744 745 // Verify ownership 746 const draftDid = parseDid(body.uri); 747 if (draftDid !== user.did) { 748 throw new xrpc.AuthRequiredError('You can only schedule your own drafts'); 749 } 750 751 const publishAt = new Date(body.publishAt).getTime(); 752 /* istanbul ignore next */ 753 if (isNaN(publishAt)) { 754 throw new xrpc.InvalidRequestError('publishAt must be a valid ISO 8601 datetime'); 755 } 756 757 const draft = await scheduleDraft(body.uri, publishAt); 758 if (!draft) { 759 throw new xrpc.InvalidRequestError('Draft not found or not in a schedulable state', 'NotFound'); 760 } 761 762 
notifyScheduler(); 763 764 return { 765 encoding: 'application/json', 766 body: draft, 767 }; 768 }); 769 770 server.method('town.roundabout.scheduledPosts.publishPost', async (ctx: xrpc.HandlerContext) => { 771 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 772 773 const body = ctx.input?.body as { uri: string }; 774 /* istanbul ignore next */ 775 if (!body?.uri) { 776 throw new xrpc.InvalidRequestError('uri is required'); 777 } 778 779 const draftDid = parseDid(body.uri); 780 if (draftDid !== user.did) { 781 throw new xrpc.AuthRequiredError('You can only publish your own drafts'); 782 } 783 784 const draftBefore = await getDraft(body.uri); 785 if (!draftBefore) { 786 throw new xrpc.InvalidRequestError('Draft not found', 'NotFound'); 787 } 788 789 // Publish synchronously 790 await publishDraft(body.uri, config); 791 notifyScheduler(); 792 793 const draft = await getDraft(body.uri); 794 if (!draft) { 795 throw new xrpc.InvalidRequestError('Draft not found after publish', 'NotFound'); 796 } 797 798 return { 799 encoding: 'application/json', 800 body: draft, 801 }; 802 }); 803 804 server.method('town.roundabout.scheduledPosts.updatePost', async (ctx: xrpc.HandlerContext) => { 805 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 806 807 const body = ctx.input?.body as { 808 uri: string; 809 record?: Record<string, unknown>; 810 scheduledAt?: string; 811 }; 812 /* istanbul ignore next */ 813 if (!body?.uri) { 814 throw new xrpc.InvalidRequestError('uri is required'); 815 } 816 817 const draftDid = parseDid(body.uri); 818 if (draftDid !== user.did) { 819 throw new xrpc.AuthRequiredError('You can only update your own drafts'); 820 } 821 822 const updateParams: { 823 record?: Record<string, unknown>; 824 recordCid?: string; 825 scheduledAt?: number; 826 } = {}; 827 828 if (body.record !== undefined) { 829 updateParams.record = body.record; 830 
updateParams.recordCid = (await cidForRecord(body.record)).toString(); 831 } 832 833 if (body.scheduledAt !== undefined) { 834 const scheduledAt = new Date(body.scheduledAt).getTime(); 835 /* istanbul ignore next */ 836 if (isNaN(scheduledAt)) { 837 throw new xrpc.InvalidRequestError('scheduledAt must be a valid ISO 8601 datetime'); 838 } 839 updateParams.scheduledAt = scheduledAt; 840 } 841 842 const draft = await updateDraft(body.uri, updateParams); 843 if (!draft) { 844 throw new xrpc.InvalidRequestError('Draft not found or not in an updatable state', 'NotFound'); 845 } 846 847 notifyScheduler(); 848 849 return { 850 encoding: 'application/json', 851 body: draft, 852 }; 853 }); 854 855 server.method('town.roundabout.scheduledPosts.deletePost', async (ctx: xrpc.HandlerContext) => { 856 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 857 858 const body = ctx.input?.body as { uri: string }; 859 /* istanbul ignore next */ 860 if (!body?.uri) { 861 throw new xrpc.InvalidRequestError('uri is required'); 862 } 863 864 const draftDid = parseDid(body.uri); 865 if (draftDid !== user.did) { 866 throw new xrpc.AuthRequiredError('You can only delete your own drafts'); 867 } 868 869 await cancelDraft(body.uri); 870 notifyScheduler(); 871 872 return { 873 encoding: 'application/json', 874 body: {}, 875 }; 876 }); 877 878 // ------------------------------------------------------------------------- 879 // Schedule Management Endpoints 880 // ------------------------------------------------------------------------- 881 882 server.method('town.roundabout.scheduledPosts.createSchedule', async (ctx: xrpc.HandlerContext) => { 883 if (config.disableRecurring) { 884 throw new xrpc.AuthRequiredError('Recurring schedules are disabled on this server'); 885 } 886 887 const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined); 888 889 const body = ctx.input?.body as { 890 collection: 
string;
      recurrenceRule: Record<string, unknown>;
      timezone: string;
      record?: Record<string, unknown>;
      contentUrl?: string;
    };

    /* istanbul ignore next */
    if (!body?.collection || !body?.recurrenceRule || !body?.timezone) {
      throw new xrpc.InvalidRequestError('collection, recurrenceRule, and timezone are required');
    }

    // A schedule carries either an inline record or a content URL, never both
    if (body.record && body.contentUrl) {
      throw new xrpc.InvalidRequestError('record and contentUrl are mutually exclusive');
    }

    // Validate the recurrence rule by computing the first occurrence
    const rule = body.recurrenceRule as unknown as RecurrenceRule;

    const nextFireAt = computeNextOccurrence(rule, new Date());
    if (!nextFireAt) {
      throw new xrpc.InvalidRequestError('Recurrence rule produces no future occurrences');
    }

    const scheduleId = randomUUID();
    await createSchedule({
      id: scheduleId,
      userDid: user.did,
      collection: body.collection,
      record: body.record ?? null,
      contentUrl: body.contentUrl ?? null,
      recurrenceRule: body.recurrenceRule,
      timezone: body.timezone,
    });

    // Create the first draft
    const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
    const uri = buildAtUri(user.did, body.collection, rkey);
    // Drafts for contentUrl-based schedules are created without an inline record
    const draftRecord = body.contentUrl ? null : (body.record ?? /* istanbul ignore next */ null);

    await createDraft({
      uri,
      userDid: user.did,
      collection: body.collection,
      rkey,
      record: draftRecord,
      recordCid: null,
      action: 'create',
      scheduledAt: nextFireAt.getTime(),
      scheduleId,
    });

    await updateScheduleNextDraft(scheduleId, uri);
    notifyScheduler();

    // Re-read the schedule after the next-draft pointer has been set
    const updatedSchedule = await getSchedule(scheduleId);
    logger.info('Schedule created', { scheduleId, nextFireAt: nextFireAt.toISOString() });

    return {
      encoding: 'application/json',
      body: { schedule: updatedSchedule },
    };
  });

  /**
   * listSchedules: returns a page of the authenticated user's schedules.
   * Rejects the request when `repo` is not the caller's own DID.
   */
  server.method('town.roundabout.scheduledPosts.listSchedules', async (ctx: xrpc.HandlerContext) => {
    const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);

    const params = ctx.params as {
      repo: string;
      status?: string;
      limit?: number;
      cursor?: string;
    };

    if (params.repo !== user.did) {
      throw new xrpc.AuthRequiredError('You can only list your own schedules');
    }

    const result = await listSchedules({
      userDid: user.did,
      status: params.status,
      // Page size defaults to 50 when the caller does not pass a limit
      limit: Number(params.limit ?? /* istanbul ignore next */ 50),
      cursor: params.cursor,
    });

    return {
      encoding: 'application/json',
      body: {
        schedules: result.schedules,
        cursor: result.cursor,
      },
    };
  });

  /**
   * getSchedule: returns a single schedule by id.
   * Responds with NotFound when the id is unknown and with an auth error when
   * the schedule belongs to a different DID.
   */
  server.method('town.roundabout.scheduledPosts.getSchedule', async (ctx: xrpc.HandlerContext) => {
    const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);

    const params = ctx.params as { id: string };
    /* istanbul ignore next */
    if (!params.id) {
      throw new xrpc.InvalidRequestError('id is required');
    }

    const schedule = await getSchedule(params.id);
    if (!schedule) {
      throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
    }

    // Verify ownership (the owner DID is read from the raw row)
    const raw = await getRawSchedule(params.id);
    if (raw?.user_did !== user.did) {
      throw new xrpc.AuthRequiredError('You can only view your own schedules');
    }

    return {
      encoding: 'application/json',
      body: { schedule },
    };
  });

  /**
   * updateSchedule: patches a schedule's rule, timezone, content and/or status.
   *
   * - active -> paused: cancels the pending draft (if any) and clears the
   *   schedule's next-draft pointer.
   * - paused -> active: computes the next occurrence and, when there is one,
   *   creates a fresh draft for it.
   * - any other status value is passed through to storage unchanged.
   */
  server.method('town.roundabout.scheduledPosts.updateSchedule', async (ctx: xrpc.HandlerContext) => {
    const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);

    const body = ctx.input?.body as {
      id: string;
      recurrenceRule?: Record<string, unknown>;
      timezone?: string;
      record?: Record<string, unknown> | null;
      contentUrl?: string | null;
      status?: 'active' | 'paused';
    };

    /* istanbul ignore next */
    if (!body?.id) {
      throw new xrpc.InvalidRequestError('id is required');
    }

    const raw = await getRawSchedule(body.id);
    if (!raw) {
      throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
    }
    if (raw.user_did !== user.did) {
      throw new xrpc.AuthRequiredError('You can only update your own schedules');
    }

    // `in` (rather than `!== undefined`) lets callers clear a field by sending an explicit null
    const updateParams: Parameters<typeof updateSchedule>[1] = {};
    if ('record' in body) updateParams.record = body.record ?? /* istanbul ignore next */ null;
    if ('contentUrl' in body) updateParams.contentUrl = body.contentUrl ?? /* istanbul ignore next */ null;
    if (body.recurrenceRule !== undefined) updateParams.recurrenceRule = body.recurrenceRule;
    if (body.timezone !== undefined) updateParams.timezone = body.timezone;

    // Handle pause/resume
    if (body.status === 'paused' && raw.status === 'active') {
      // Cancel the pending draft
      if (raw.next_draft_uri) {
        await cancelDraft(raw.next_draft_uri);
      }
      updateParams.status = 'paused';
      await updateScheduleNextDraft(body.id, null);
    } else if (body.status === 'active' && raw.status === 'paused') {
      // Resume: create a new next draft, preferring a rule sent in this request
      const ruleJson = body.recurrenceRule ?? (JSON.parse(raw.recurrence_rule) as Record<string, unknown>);
      const rule = ruleJson as unknown as RecurrenceRule;
      const nextFireAt = computeNextOccurrence(rule, new Date());
      // When the rule has no future occurrence the schedule is still marked
      // active below, but no draft is created.
      if (nextFireAt) {
        const rkey = `sched-${Date.now()}-${randomUUID().substring(0, 8)}`;
        const collection = raw.collection;
        const uri = buildAtUri(user.did, collection, rkey);

        // Build the draft from the content this request is about to persist,
        // falling back to the stored values for fields the request omits.
        // Using only the stored row here would give the first draft after a
        // combined "update content + resume" request the stale content.
        const effectiveContentUrl = 'contentUrl' in body ? (body.contentUrl ?? null) : raw.content_url;
        const storedRecord = raw.record ? (JSON.parse(raw.record) as Record<string, unknown>) : null;
        const effectiveRecord = 'record' in body ? (body.record ?? null) : storedRecord;
        const draftRecord = effectiveContentUrl ? null : effectiveRecord;

        await createDraft({
          uri,
          userDid: user.did,
          collection,
          rkey,
          record: draftRecord,
          recordCid: null,
          action: 'create',
          scheduledAt: nextFireAt.getTime(),
          scheduleId: body.id,
        });

        await updateScheduleNextDraft(body.id, uri);
        notifyScheduler();
      }
      updateParams.status = 'active';
    } else if (body.status !== undefined) {
      updateParams.status = body.status;
    }

    const schedule = await updateSchedule(body.id, updateParams);

    return {
      encoding: 'application/json',
      body: { schedule },
    };
  });

  /**
   * deleteSchedule: removes a schedule owned by the caller and notifies the
   * scheduler. Responds with an empty JSON body.
   */
  server.method('town.roundabout.scheduledPosts.deleteSchedule', async (ctx: xrpc.HandlerContext) => {
    const user = await requireAuth(ctx.req.headers.authorization, ctx.req.headers.dpop as string | undefined);

    const body = ctx.input?.body as { id: string };
    /* istanbul ignore next */
    if (!body?.id) {
      throw new xrpc.InvalidRequestError('id is required');
    }

    const raw = await getRawSchedule(body.id);
    if (!raw) {
      throw new xrpc.InvalidRequestError('Schedule not found', 'NotFound');
    }
    if (raw.user_did !== user.did) {
      throw new xrpc.AuthRequiredError('You can only delete your own schedules');
    }

    await deleteSchedule(body.id);
    notifyScheduler();

    return {
      encoding: 'application/json',
      body: {},
    };
  });

  // Mount XRPC router
  app.use(server.router);

  // Error handler: logs the error and answers with a generic 500 unless a
  // response has already started.
  /* istanbul ignore next */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
    const error = err as Error;
    logger.error('Express error handler caught', error);
    if (!res.headersSent) {
      res.status(500).json({ error: 'InternalServerError', message: 'Internal Server Error' });
    }
  });

  // 404 handler
  app.use((_req, res) => {
    res.status(404).json({ error: 'Not found' });
  });

  return app;
}