Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/
at main 134 lines 3.7 kB view raw
1import { TID } from '@atproto/common-web'; 2import { Agent } from '@atproto/api'; 3import type { OAuthSession } from '@atproto/oauth-client'; 4import type { FastifyReply } from 'fastify'; 5 6export function generateTid(): string { 7 return TID.next().toString(); 8} 9 10export type ApplyWritesCreate = { 11 $type: 'com.atproto.repo.applyWrites#create'; 12 collection: string; 13 rkey: string; 14 value: Record<string, unknown>; 15}; 16 17export type ApplyWritesUpdate = { 18 $type: 'com.atproto.repo.applyWrites#update'; 19 collection: string; 20 rkey: string; 21 value: Record<string, unknown>; 22}; 23 24export type ApplyWritesDelete = { 25 $type: 'com.atproto.repo.applyWrites#delete'; 26 collection: string; 27 rkey: string; 28}; 29 30export type ApplyWritesOp = ApplyWritesCreate | ApplyWritesUpdate | ApplyWritesDelete; 31 32export function buildApplyWritesOp( 33 action: 'create' | 'update' | 'delete', 34 collection: string, 35 rkey: string, 36 record?: Record<string, unknown>, 37): ApplyWritesOp { 38 if (action === 'delete') { 39 return { 40 $type: 'com.atproto.repo.applyWrites#delete' as const, 41 collection, 42 rkey, 43 }; 44 } 45 46 return { 47 $type: 48 action === 'create' 49 ? ('com.atproto.repo.applyWrites#create' as const) 50 : ('com.atproto.repo.applyWrites#update' as const), 51 collection, 52 rkey, 53 value: { $type: collection, ...record }, 54 }; 55} 56 57const MAX_RETRIES = 3; 58const BASE_DELAY_MS = 1000; 59 60function getRetryDelay(err: unknown, attempt: number): number { 61 if (err instanceof Error && 'headers' in err) { 62 const headers = (err as { headers?: Record<string, string> }).headers; 63 const retryAfter = headers?.['retry-after']; 64 if (retryAfter) { 65 const seconds = Number(retryAfter); 66 if (!Number.isNaN(seconds) && seconds > 0 && seconds <= 60) { 67 return seconds * 1000; 68 } 69 } 70 } 71 return BASE_DELAY_MS * 2 ** (attempt - 1); 72} 73 74function isPdsRateLimit(err: unknown): boolean { 75 return err instanceof Error && 'status' in err && (err as { status: number }).status === 429; 76} 77 78export async function writeToUserPds(session: OAuthSession, repo: string, writes: ApplyWritesOp[]) { 79 const agent = new Agent(session); 80 for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { 81 try { 82 return await agent.com.atproto.repo.applyWrites({ 83 repo, 84 validate: false, 85 writes, 86 }); 87 } catch (err) { 88 if (isPdsRateLimit(err) && attempt < MAX_RETRIES) { 89 const delay = getRetryDelay(err, attempt + 1); 90 await new Promise((resolve) => setTimeout(resolve, delay)); 91 continue; 92 } 93 throw err; 94 } 95 } 96 throw new Error('writeToUserPds: unreachable'); 97} 98 99export async function pdsRecordExists( 100 session: OAuthSession, 101 repo: string, 102 collection: string, 103 rkey: string, 104): Promise<boolean> { 105 const agent = new Agent(session); 106 try { 107 await agent.com.atproto.repo.getRecord({ repo, collection, rkey }); 108 return true; 109 } catch { 110 return false; 111 } 112} 113 114export function isPdsRecordNotFound(err: unknown): boolean { 115 return ( 116 err instanceof Error && 117 'status' in err && 118 (err as { status: number }).status === 400 && 119 'error' in err && 120 (err as { error: string }).error === 'InvalidRequest' && 121 err.message?.includes('Could not find record') 122 ); 123} 124 125export function handlePdsError(err: unknown, reply: FastifyReply): FastifyReply { 126 if (err instanceof Error && 'status' in err) { 127 const status = (err as unknown as { status: number }).status; 128 const error = 'error' in err ? (err as unknown as { error: string }).error : 'PdsError'; 129 return reply 130 .status(status >= 400 && status < 600 ? status : 502) 131 .send({ error, message: err.message ?? 'PDS request failed' }); 132 } 133 throw err; 134}