my monorepo for atproto based applications
at main 195 lines 6.1 kB view raw
import { Subscription } from "@atproto/xrpc-server";
import { cborToLexRecord, readCar } from "@atproto/repo";
import { BlobRef } from "@atproto/lexicon";
import { ids, lexicons } from "@my/lexicon/server/lexicons";
import { Record as PostRecord } from "@my/lexicon/server/types/app/bsky/feed/post";
import { Record as RepostRecord } from "@my/lexicon/server/types/app/bsky/feed/repost";
import { Record as LikeRecord } from "@my/lexicon/server/types/app/bsky/feed/like";
import { Record as FollowRecord } from "@my/lexicon/server/types/app/bsky/graph/follow";
import {
  Commit,
  OutputSchema as RepoEvent,
  isCommit,
} from "@my/lexicon/server/types/com/atproto/sync/subscribeRepos";
import { Database } from "@/db";

/**
 * Base class for consumers of the `com.atproto.sync.subscribeRepos` stream.
 *
 * It owns the XRPC subscription, reads and writes the stream cursor in the
 * `sub_state` table (rows are matched on `service`), and hands every received
 * event to the subclass-provided {@link FirehoseSubscriptionBase.handleEvent}.
 */
export abstract class FirehoseSubscriptionBase {
  public sub: Subscription<RepoEvent>;

  /**
   * @param db - Database handle used to load and store the cursor.
   * @param service - Endpoint the subscription connects to; also the value
   *   matched against the `service` column of `sub_state`.
   */
  constructor(
    public db: Database,
    public service: string,
  ) {
    this.sub = new Subscription({
      service,
      method: ids.ComAtprotoSyncSubscribeRepos,
      // The stored cursor (if any) is looked up each time params are requested.
      getParams: () => this.getCursor(),
      // A message that fails lexicon validation is logged and reported as
      // `undefined` instead of throwing.
      validate: (value: unknown) => {
        try {
          return lexicons.assertValidXrpcMessage<RepoEvent>(
            ids.ComAtprotoSyncSubscribeRepos,
            value,
          );
        } catch (err) {
          console.error("firehose: repo subscription skipped invalid message", err);
          return undefined;
        }
      },
    });
  }

  /** Processes one event from the stream; implemented by subclasses. */
  abstract handleEvent(evt: RepoEvent): Promise<void>;

  /**
   * Consumes the subscription and dispatches each event to `handleEvent`.
   *
   * When iteration (or a cursor write) throws, the error is logged and a new
   * `run` is scheduled via `setTimeout`. If the iteration ends without
   * throwing, no reconnect is scheduled.
   *
   * @param subscriptionReconnectDelay - Delay passed to `setTimeout` before
   *   the next attempt.
   */
  async run(subscriptionReconnectDelay: number) {
    try {
      for await (const evt of this.sub) {
        // The handler is started but not awaited: a slow or failing handler
        // never stalls the stream, and its rejection is only logged.
        this.handleEvent(evt).catch((err) => {
          console.error("firehose: repo subscription could not handle message", err);
        });

        // update stored cursor every 20 events or so
        if (!isCommit(evt) || evt.seq % 20 !== 0) continue;
        await this.updateCursor(evt.seq);
      }
    } catch (err) {
      console.error("repo subscription errored", err);
      const reconnect = () => this.run(subscriptionReconnectDelay);
      setTimeout(reconnect, subscriptionReconnectDelay);
    }
  }

  /**
   * Writes `cursor` to the `sub_state` row whose `service` matches this
   * subscription.
   *
   * NOTE(review): this is an UPDATE only. If no row exists for the service,
   * nothing is written — confirm the row is created elsewhere.
   */
  async updateCursor(cursor: number) {
    const statement = this.db
      .updateTable("sub_state")
      .set({ cursor })
      .where("service", "=", this.service);
    await statement.execute();
  }

  /**
   * Loads the stored cursor for this service.
   *
   * @returns `{ cursor }` when a `sub_state` row exists, otherwise `{}`.
   */
  async getCursor(): Promise<{ cursor?: number }> {
    const row = await this.db
      .selectFrom("sub_state")
      .selectAll()
      .where("service", "=", this.service)
      .executeTakeFirst();
    if (!row) {
      return {};
    }
    return { cursor: row.cursor };
  }
}

/**
 * Groups the operations of a commit by record collection.
 *
 * Creates are decoded from the commit's CAR blocks and kept only when the
 * record validates against the lexicon of its collection. Deletes carry just
 * the record URI. Operations on other collections are dropped.
 *
 * @param evt - Commit event whose `ops` and `blocks` are inspected.
 * @returns The creates and deletes for posts, reposts, likes and follows.
 */
export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
  const car = await readCar(evt.blocks);
  const grouped: OperationsByType = {
    posts: { creates: [], deletes: [] },
    reposts: { creates: [], deletes: [] },
    likes: { creates: [], deletes: [] },
    follows: { creates: [], deletes: [] },
  };

  for (const op of evt.ops) {
    // The collection NSID is the first segment of the op path.
    const [collection] = op.path.split("/");
    const uri = `at://${evt.repo}/${op.path}`;

    switch (op.action) {
      case "create": {
        if (!op.cid) continue;
        const bytes = car.blocks.get(op.cid);
        if (!bytes) continue;

        const record = cborToLexRecord(bytes);
        const meta = { uri, cid: op.cid.toString(), author: evt.repo };

        switch (collection) {
          case ids.AppBskyFeedPost:
            if (isPost(record)) grouped.posts.creates.push({ record, ...meta });
            break;
          case ids.AppBskyFeedRepost:
            if (isRepost(record)) grouped.reposts.creates.push({ record, ...meta });
            break;
          case ids.AppBskyFeedLike:
            if (isLike(record)) grouped.likes.creates.push({ record, ...meta });
            break;
          case ids.AppBskyGraphFollow:
            if (isFollow(record)) grouped.follows.creates.push({ record, ...meta });
            break;
        }
        break;
      }

      case "delete": {
        switch (collection) {
          case ids.AppBskyFeedPost:
            grouped.posts.deletes.push({ uri });
            break;
          case ids.AppBskyFeedRepost:
            grouped.reposts.deletes.push({ uri });
            break;
          case ids.AppBskyFeedLike:
            grouped.likes.deletes.push({ uri });
            break;
          case ids.AppBskyGraphFollow:
            grouped.follows.deletes.push({ uri });
            break;
        }
        break;
      }

      default:
        // updates not supported yet; any other action is ignored as well
        break;
    }
  }

  return grouped;
};

/** Creates and deletes of one commit, split per supported collection. */
export type OperationsByType = {
  posts: Operations<PostRecord>;
  reposts: Operations<RepostRecord>;
  likes: Operations<LikeRecord>;
  follows: Operations<FollowRecord>;
};

/** Creates and deletes for a single collection with record type `T`. */
export type Operations<T = Record<string, unknown>> = {
  creates: CreateOp<T>[];
  deletes: DeleteOp[];
};

/** A created record together with its URI, CID string and authoring repo. */
export type CreateOp<T> = {
  uri: string;
  cid: string;
  author: string;
  record: T;
};

/** A deleted record, identified by its URI only. */
export type DeleteOp = {
  uri: string;
};

/** True when `obj` validates as an `app.bsky.feed.post` record. */
export const isPost = (obj: unknown): obj is PostRecord =>
  isType(obj, ids.AppBskyFeedPost);

/** True when `obj` validates as an `app.bsky.feed.repost` record. */
export const isRepost = (obj: unknown): obj is RepostRecord =>
  isType(obj, ids.AppBskyFeedRepost);

/** True when `obj` validates as an `app.bsky.feed.like` record. */
export const isLike = (obj: unknown): obj is LikeRecord =>
  isType(obj, ids.AppBskyFeedLike);

/** True when `obj` validates as an `app.bsky.graph.follow` record. */
export const isFollow = (obj: unknown): obj is FollowRecord =>
  isType(obj, ids.AppBskyGraphFollow);

/**
 * Validates `obj` against the record lexicon `nsid`.
 * Any validation error is turned into a `false` result.
 */
const isType = (obj: unknown, nsid: string) => {
  try {
    lexicons.assertValidRecord(nsid, fixBlobRefs(obj));
  } catch {
    return false;
  }
  return true;
};

// @TODO right now record validation fails on BlobRefs
// simply because multiple packages have their own copy
// of the BlobRef class, causing instanceof checks to fail.
// This is a temporary solution.
/**
 * Returns a deep copy of `obj` in which every object whose constructor is
 * named "BlobRef" is re-created with the `BlobRef` class imported by this
 * module, so that `instanceof` checks against that class succeed.
 *
 * Arrays are mapped element by element, other objects are copied key by key
 * (own enumerable string keys only, the copy is a plain object), and
 * primitives, `null` and `undefined` are returned unchanged.
 *
 * @param obj - Arbitrary decoded record data.
 * @returns The rebuilt value.
 */
const fixBlobRefs = (obj: unknown): unknown => {
  if (Array.isArray(obj)) {
    return obj.map(fixBlobRefs);
  }
  if (obj && typeof obj === "object") {
    // Optional chaining: a null-prototype object, or a record that carries a
    // null/undefined own `constructor` field, has no constructor to inspect
    // and must not make this helper throw.
    if (obj.constructor?.name === "BlobRef") {
      const blob = obj as BlobRef;
      return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original);
    }
    // Object.fromEntries defines own data properties, so a "__proto__" key in
    // the input is copied as data instead of replacing the copy's prototype
    // (which is what assigning through Object.assign would do).
    return Object.fromEntries(
      Object.entries(obj).map(([key, val]) => [key, fixBlobRefs(val)]),
    );
  }
  return obj;
};