Sifa professional network API (Fastify, AT Protocol, Jetstream)
sifa.id/
1import { TID } from '@atproto/common-web';
2import { Agent } from '@atproto/api';
3import type { OAuthSession } from '@atproto/oauth-client';
4import type { FastifyReply } from 'fastify';
5
6export function generateTid(): string {
7 return TID.next().toString();
8}
9
10export type ApplyWritesCreate = {
11 $type: 'com.atproto.repo.applyWrites#create';
12 collection: string;
13 rkey: string;
14 value: Record<string, unknown>;
15};
16
17export type ApplyWritesUpdate = {
18 $type: 'com.atproto.repo.applyWrites#update';
19 collection: string;
20 rkey: string;
21 value: Record<string, unknown>;
22};
23
24export type ApplyWritesDelete = {
25 $type: 'com.atproto.repo.applyWrites#delete';
26 collection: string;
27 rkey: string;
28};
29
30export type ApplyWritesOp = ApplyWritesCreate | ApplyWritesUpdate | ApplyWritesDelete;
31
32export function buildApplyWritesOp(
33 action: 'create' | 'update' | 'delete',
34 collection: string,
35 rkey: string,
36 record?: Record<string, unknown>,
37): ApplyWritesOp {
38 if (action === 'delete') {
39 return {
40 $type: 'com.atproto.repo.applyWrites#delete' as const,
41 collection,
42 rkey,
43 };
44 }
45
46 return {
47 $type:
48 action === 'create'
49 ? ('com.atproto.repo.applyWrites#create' as const)
50 : ('com.atproto.repo.applyWrites#update' as const),
51 collection,
52 rkey,
53 value: { $type: collection, ...record },
54 };
55}
56
57const MAX_RETRIES = 3;
58const BASE_DELAY_MS = 1000;
59
60function getRetryDelay(err: unknown, attempt: number): number {
61 if (err instanceof Error && 'headers' in err) {
62 const headers = (err as { headers?: Record<string, string> }).headers;
63 const retryAfter = headers?.['retry-after'];
64 if (retryAfter) {
65 const seconds = Number(retryAfter);
66 if (!Number.isNaN(seconds) && seconds > 0 && seconds <= 60) {
67 return seconds * 1000;
68 }
69 }
70 }
71 return BASE_DELAY_MS * 2 ** (attempt - 1);
72}
73
74function isPdsRateLimit(err: unknown): boolean {
75 return err instanceof Error && 'status' in err && (err as { status: number }).status === 429;
76}
77
78export async function writeToUserPds(session: OAuthSession, repo: string, writes: ApplyWritesOp[]) {
79 const agent = new Agent(session);
80 for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
81 try {
82 return await agent.com.atproto.repo.applyWrites({
83 repo,
84 validate: false,
85 writes,
86 });
87 } catch (err) {
88 if (isPdsRateLimit(err) && attempt < MAX_RETRIES) {
89 const delay = getRetryDelay(err, attempt + 1);
90 await new Promise((resolve) => setTimeout(resolve, delay));
91 continue;
92 }
93 throw err;
94 }
95 }
96 throw new Error('writeToUserPds: unreachable');
97}
98
99export async function pdsRecordExists(
100 session: OAuthSession,
101 repo: string,
102 collection: string,
103 rkey: string,
104): Promise<boolean> {
105 const agent = new Agent(session);
106 try {
107 await agent.com.atproto.repo.getRecord({ repo, collection, rkey });
108 return true;
109 } catch {
110 return false;
111 }
112}
113
114export function isPdsRecordNotFound(err: unknown): boolean {
115 return (
116 err instanceof Error &&
117 'status' in err &&
118 (err as { status: number }).status === 400 &&
119 'error' in err &&
120 (err as { error: string }).error === 'InvalidRequest' &&
121 err.message?.includes('Could not find record')
122 );
123}
124
125export function handlePdsError(err: unknown, reply: FastifyReply): FastifyReply {
126 if (err instanceof Error && 'status' in err) {
127 const status = (err as unknown as { status: number }).status;
128 const error = 'error' in err ? (err as unknown as { error: string }).error : 'PdsError';
129 return reply
130 .status(status >= 400 && status < 600 ? status : 502)
131 .send({ error, message: err.message ?? 'PDS request failed' });
132 }
133 throw err;
134}