import { is } from "@atcute/lexicons";
import { Jetstream } from "@skyware/jetstream";
import { and, eq } from "drizzle-orm";

import { db } from "./drizzle/db";
import { reviews } from "./drizzle/schema";
import { AppFooodzReview } from "./lexicons";

// Subscribe only to review records.
const jetstream = new Jetstream({
  wantedCollections: ["app.fooodz.review"],
});

jetstream.onCreate("app.fooodz.review", async (event) => {
  console.info(`new review ${event.commit.rkey}`);

  // Skip records that do not validate against the lexicon schema.
  if (is(AppFooodzReview.mainSchema, event.commit.record)) {
    const { place, rating, review, createdAt } = event.commit.record;

    // The author DID and rkey are stored alongside the record fields.
    await db.insert(reviews).values({
      rkey: event.commit.rkey,
      authorDid: event.did,
      place,
      rating,
      review,
      createdAt,
    });
  }
});

jetstream.onUpdate("app.fooodz.review", async (event) => {
  if (is(AppFooodzReview.mainSchema, event.commit.record)) {
    const { place, rating, review, createdAt } = event.commit.record;

    // Upsert: insert the review if it was never seen, otherwise update it.
    // Assumes `rkey` carries a unique constraint in the schema. Record keys
    // are only unique within one repo, so a composite (authorDid, rkey) key
    // would be the safer conflict target.
    await db
      .insert(reviews)
      .values({
        rkey: event.commit.rkey,
        authorDid: event.did,
        place,
        rating,
        review,
        createdAt,
      })
      .onConflictDoUpdate({
        target: reviews.rkey,
        set: {
          place,
          rating,
          review,
        },
      });
  }
});

jetstream.onDelete("app.fooodz.review", async (event) => {
  // Match on the author DID as well as the rkey, so a delete in one repo
  // cannot remove another author's review that happens to share the rkey.
  await db
    .delete(reviews)
    .where(
      and(
        eq(reviews.authorDid, event.did),
        eq(reviews.rkey, event.commit.rkey),
      ),
    );
  console.log("deleted record", event.commit.rkey);
});

jetstream.on("open", () => {
  console.log("jetstream ingester started");
});

export { jetstream };