// My monorepo for atproto-based applications.
1import { Subscription } from "@atproto/xrpc-server";
2import { cborToLexRecord, readCar } from "@atproto/repo";
3import { BlobRef } from "@atproto/lexicon";
4import { ids, lexicons } from "@my/lexicon/server/lexicons";
5import { Record as PostRecord } from "@my/lexicon/server/types/app/bsky/feed/post";
6import { Record as RepostRecord } from "@my/lexicon/server/types/app/bsky/feed/repost";
7import { Record as LikeRecord } from "@my/lexicon/server/types/app/bsky/feed/like";
8import { Record as FollowRecord } from "@my/lexicon/server/types/app/bsky/graph/follow";
9import {
10 Commit,
11 OutputSchema as RepoEvent,
12 isCommit,
13} from "@my/lexicon/server/types/com/atproto/sync/subscribeRepos";
14import { Database } from "@/db";
15
/**
 * Base class for consumers of the repo subscription identified by
 * `ids.ComAtprotoSyncSubscribeRepos`.
 *
 * It owns the underlying {@link Subscription}, resumes from the cursor stored
 * in the `sub_state` table, and hands every received event to the
 * subclass-provided `handleEvent`.
 */
export abstract class FirehoseSubscriptionBase {
  /** Subscription that `run` iterates over. */
  public sub: Subscription<RepoEvent>;

  /**
   * @param db - Database handle used to read and write the `sub_state` table.
   * @param service - Service passed to the subscription; also the value the
   *   `sub_state.service` column is matched against for cursor storage.
   */
  constructor(
    public db: Database,
    public service: string,
  ) {
    const method = ids.ComAtprotoSyncSubscribeRepos;

    this.sub = new Subscription({
      service,
      method,
      // The stored cursor (if any) is sent as the subscription parameters.
      getParams: () => this.getCursor(),
      // Messages that fail lexicon validation are logged and reported as
      // `undefined` instead of raising.
      validate: (value: unknown) => {
        try {
          return lexicons.assertValidXrpcMessage<RepoEvent>(method, value);
        } catch (err) {
          console.error("firehose: repo subscription skipped invalid message", err);
          return undefined;
        }
      },
    });
  }

  /**
   * Processes a single event from the subscription.
   *
   * A rejected promise is logged by `run` and does not stop the loop.
   */
  abstract handleEvent(evt: RepoEvent): Promise<void>;

  /**
   * Consumes the subscription, dispatching each event to `handleEvent` and
   * periodically persisting the cursor.
   *
   * If iterating the subscription (or persisting the cursor) throws, the error
   * is logged and `run` is scheduled again after the given delay.
   *
   * @param subscriptionReconnectDelay - Milliseconds to wait before re-running
   *   after an error.
   */
  async run(subscriptionReconnectDelay: number) {
    // The cursor is written whenever a commit's seq is a multiple of this.
    const cursorFlushInterval = 20;
    const logHandlerFailure = (err: unknown) => {
      console.error("firehose: repo subscription could not handle message", err);
    };

    try {
      for await (const evt of this.sub) {
        // Not awaited: a failing handler is logged and the loop carries on
        // with the next event.
        void this.handleEvent(evt).catch(logHandlerFailure);

        if (!isCommit(evt)) continue;
        if (evt.seq % cursorFlushInterval !== 0) continue;
        await this.updateCursor(evt.seq);
      }
    } catch (err) {
      console.error("repo subscription errored", err);
      setTimeout(() => this.run(subscriptionReconnectDelay), subscriptionReconnectDelay);
    }
  }

  /**
   * Stores `cursor` on the `sub_state` row belonging to this service.
   *
   * NOTE(review): this is an UPDATE only; if no row exists for the service
   * nothing is written. Confirm the row is created elsewhere.
   *
   * @param cursor - Sequence number to persist.
   */
  async updateCursor(cursor: number) {
    const statement = this.db
      .updateTable("sub_state")
      .set({ cursor })
      .where("service", "=", this.service);
    await statement.execute();
  }

  /**
   * Loads the stored cursor for this service.
   *
   * @returns `{ cursor }` when a `sub_state` row exists, otherwise `{}`.
   */
  async getCursor(): Promise<{ cursor?: number }> {
    const row = await this.db
      .selectFrom("sub_state")
      .selectAll()
      .where("service", "=", this.service)
      .executeTakeFirst();

    if (!row) {
      return {};
    }
    return { cursor: row.cursor };
  }
}
77
/**
 * Sorts the operations of a commit into per-collection buckets of creates and
 * deletes for posts, reposts, likes and follows.
 *
 * - `update` operations are skipped.
 * - A `create` is skipped when it has no cid or its block is missing from the
 *   commit's CAR; otherwise the record is decoded and kept only if it
 *   validates for its collection.
 * - A `delete` is recorded by URI alone.
 * - Operations on any other collection are ignored.
 *
 * @param evt - Commit event whose `blocks` and `ops` are inspected.
 * @returns The grouped operations; every bucket is present, possibly empty.
 */
export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
  const car = await readCar(evt.blocks);
  const author = evt.repo;

  const grouped: OperationsByType = {
    posts: { creates: [], deletes: [] },
    reposts: { creates: [], deletes: [] },
    likes: { creates: [], deletes: [] },
    follows: { creates: [], deletes: [] },
  };

  for (const { action, path, cid: opCid } of evt.ops) {
    const uri = `at://${author}/${path}`;
    // The collection NSID is the first segment of the record path.
    const [collection] = path.split("/");

    switch (action) {
      case "create": {
        if (!opCid) break;
        const bytes = car.blocks.get(opCid);
        if (!bytes) break;

        // Decoding happens before the collection is looked at, so it runs for
        // every create that has a block.
        const record = cborToLexRecord(bytes);
        const cid = opCid.toString();

        switch (collection) {
          case ids.AppBskyFeedPost:
            if (isPost(record)) {
              grouped.posts.creates.push({ record, uri, cid, author });
            }
            break;
          case ids.AppBskyFeedRepost:
            if (isRepost(record)) {
              grouped.reposts.creates.push({ record, uri, cid, author });
            }
            break;
          case ids.AppBskyFeedLike:
            if (isLike(record)) {
              grouped.likes.creates.push({ record, uri, cid, author });
            }
            break;
          case ids.AppBskyGraphFollow:
            if (isFollow(record)) {
              grouped.follows.creates.push({ record, uri, cid, author });
            }
            break;
        }
        break;
      }

      case "delete": {
        switch (collection) {
          case ids.AppBskyFeedPost:
            grouped.posts.deletes.push({ uri });
            break;
          case ids.AppBskyFeedRepost:
            grouped.reposts.deletes.push({ uri });
            break;
          case ids.AppBskyFeedLike:
            grouped.likes.deletes.push({ uri });
            break;
          case ids.AppBskyGraphFollow:
            grouped.follows.deletes.push({ uri });
            break;
        }
        break;
      }

      default:
        // updates not supported yet; any other action is ignored as well
        break;
    }
  }

  return grouped;
};
125
/** A record deletion, identified only by the URI of the removed record. */
export type DeleteOp = {
  /** `at://<repo>/<path>` URI of the deleted record. */
  uri: string;
};

/** A record creation together with its decoded, validated record. */
export type CreateOp<T> = {
  /** `at://<repo>/<path>` URI of the created record. */
  uri: string;
  /** String form of the operation's cid. */
  cid: string;
  /** Repo the commit belongs to. */
  author: string;
  /** The decoded record. */
  record: T;
};

/** Creates and deletes collected for one record type. */
export type Operations<T = Record<string, unknown>> = {
  creates: Array<CreateOp<T>>;
  deletes: Array<DeleteOp>;
};

/** Result shape of `getOpsByType`: one bucket per supported collection. */
export type OperationsByType = {
  posts: Operations<PostRecord>;
  reposts: Operations<RepostRecord>;
  likes: Operations<LikeRecord>;
  follows: Operations<FollowRecord>;
};
148
/**
 * Type guard: `true` when `obj` validates as a record of the
 * `ids.AppBskyFeedPost` lexicon.
 */
export function isPost(obj: unknown): obj is PostRecord {
  return isType(obj, ids.AppBskyFeedPost);
}
152
/**
 * Type guard: `true` when `obj` validates as a record of the
 * `ids.AppBskyFeedRepost` lexicon.
 */
export function isRepost(obj: unknown): obj is RepostRecord {
  return isType(obj, ids.AppBskyFeedRepost);
}
156
/**
 * Type guard: `true` when `obj` validates as a record of the
 * `ids.AppBskyFeedLike` lexicon.
 */
export function isLike(obj: unknown): obj is LikeRecord {
  return isType(obj, ids.AppBskyFeedLike);
}
160
/**
 * Type guard: `true` when `obj` validates as a record of the
 * `ids.AppBskyGraphFollow` lexicon.
 */
export function isFollow(obj: unknown): obj is FollowRecord {
  return isType(obj, ids.AppBskyGraphFollow);
}
164
/**
 * Reports whether `obj` passes lexicon record validation for `nsid`.
 *
 * The value is run through `fixBlobRefs` before validation. Any error thrown
 * along the way is treated as "not this type" and yields `false`.
 *
 * @param obj - Candidate record.
 * @param nsid - Lexicon id to validate against.
 */
const isType = (obj: unknown, nsid: string): boolean => {
  try {
    lexicons.assertValidRecord(nsid, fixBlobRefs(obj));
  } catch {
    return false;
  }
  return true;
};
173
// @TODO right now record validation fails on BlobRefs
// simply because multiple packages have their own copy
// of the BlobRef class, causing instanceof checks to fail.
// This is a temporary solution.
/**
 * Returns a deep copy of `obj` in which every object whose class is named
 * "BlobRef" is re-created with this module's `BlobRef` class.
 *
 * - Arrays are mapped element by element.
 * - Plain objects (prototype `Object.prototype` or `null`) are copied key by
 *   key, recursing into the values.
 * - Any other object (typed arrays, other class instances, ...) is returned
 *   untouched so it keeps its prototype.
 * - Primitives, `null` and `undefined` are returned unchanged.
 *
 * The class name is read from the object's prototype rather than from the
 * object itself, so a data key called `constructor` can neither crash the
 * check nor impersonate a BlobRef.
 *
 * @param obj - Arbitrary decoded value.
 * @returns The value with foreign BlobRef instances rebuilt.
 */
const fixBlobRefs = (obj: unknown): unknown => {
  if (Array.isArray(obj)) {
    return obj.map((item) => fixBlobRefs(item));
  }
  if (obj === null || typeof obj !== "object") {
    return obj;
  }

  const proto = Object.getPrototypeOf(obj) as object | null;

  if (proto?.constructor?.name === "BlobRef") {
    const blob = obj as BlobRef;
    return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original);
  }

  // Only plain objects are rebuilt; flattening other instances would strip
  // their prototype.
  if (proto !== null && proto !== Object.prototype) {
    return obj;
  }

  // Object.fromEntries defines own properties, so a "__proto__" key is copied
  // as data instead of re-parenting the result.
  return Object.fromEntries(
    Object.entries(obj).map(([key, val]) => [key, fixBlobRefs(val)]),
  );
};