import { JetstreamSubscription } from "@atcute/jetstream";
import {
  is,
  parse,
  parseCanonicalResourceUri,
  type RecordKey,
} from "@atcute/lexicons";
import {
  SystemsGazeBarometerService,
  SystemsGazeBarometerCheck,
} from "barometer-lexicon";
import { config } from "./config";
import store, { type Service } from "./store";
import { expect, getRecord, getUri, log, type ServiceUri } from "./utils";

/**
 * Jetstream subscription restricted to the barometer collections of the
 * configured repository (`config.repoDid`).
 */
const subscription = new JetstreamSubscription({
  url: "wss://jetstream2.us-east.bsky.network",
  wantedCollections: [
    "systems.gaze.barometer.service",
    "systems.gaze.barometer.check",
    "systems.gaze.barometer.state",
  ],
  wantedDids: [config.repoDid],
});

/**
 * Validates a created/updated service record and stores it when it is hosted
 * by this host.
 *
 * @param record raw record payload taken from the commit event
 * @param rkey record key of the service record
 * @returns `true` when the record was skipped (no `hostedBy`, or hosted by a
 *   different host), `false` when it was written to the store
 * @throws whatever `parse` or `expect` throw for a record that does not match
 *   the schema or whose `hostedBy` is not a canonical resource URI
 */
const handleService = async (
  record: Record<string, unknown>,
  rkey: RecordKey,
) => {
  const collection = "systems.gaze.barometer.service";
  const serviceRecord = parse(SystemsGazeBarometerService.mainSchema, record);

  // we dont care if its a dangling service
  if (!serviceRecord.hostedBy) return true;

  const hostAtUri = expect(parseCanonicalResourceUri(serviceRecord.hostedBy));
  // not our host
  if (hostAtUri.rkey !== store.hostname) return true;

  const serviceUri = getUri(collection, rkey);
  // Reuse the existing entry (and its set of checks) when the service is
  // already known; only the record itself is refreshed.
  const service: Service = store.services.get(serviceUri) ?? {
    record: serviceRecord,
    checks: new Set(),
    rkey,
  };
  store.services.set(serviceUri, {
    ...service,
    record: serviceRecord,
  });
  return false;
};

/**
 * Validates a created/updated check record, links it to the service it
 * targets and stores it.
 *
 * @param record raw record payload taken from the commit event
 * @param rkey record key of the check record
 * @returns `true` when the target service could not be fetched (the check is
 *   skipped), `false` when the check was written to the store
 * @throws whatever `parse` throws for a record that does not match the schema
 */
const handleCheck = async (
  record: Record<string, unknown>,
  rkey: RecordKey,
) => {
  const collection = "systems.gaze.barometer.check";
  const checkRecord = parse(SystemsGazeBarometerCheck.mainSchema, record);
  const checkUri = getUri(collection, rkey);
  const serviceUri = checkRecord.forService as ServiceUri;

  const maybeService = await store.getOrFetch(serviceUri);
  if (!maybeService.ok) {
    log.error(
      `can't fetch service record (${serviceUri}) for check record (${checkUri})`,
    );
    return true;
  }

  // An update may re-point the check at a different service. Detach it from
  // the service it used to belong to, otherwise deleting that old service
  // later would also drop this check from the store.
  const previousCheck = store.checks.get(checkUri);
  if (previousCheck) {
    const previousServiceUri = previousCheck.record.forService as ServiceUri;
    if (previousServiceUri !== serviceUri) {
      const previousService = store.services.get(previousServiceUri);
      if (previousService) {
        previousService.checks.delete(rkey);
        store.services.set(previousServiceUri, previousService);
      }
    }
  }

  const service = maybeService.value;
  service.checks.add(rkey);
  store.checks.set(checkUri, {
    record: checkRecord,
    rkey,
  });
  store.services.set(serviceUri, service);
  return false;
};

/**
 * Consumes the Jetstream subscription and mirrors commits into the store.
 *
 * Create/update commits are delegated to `handleService` / `handleCheck`.
 * Any other commit operation removes the corresponding entry: removing a
 * service also removes its checks, removing a check unlinks it from its
 * service, and removing a state drops it from `store.states`.
 *
 * A failure while handling one event is logged and the loop moves on to the
 * next event; errors raised by the subscription iterator itself still
 * propagate to the caller.
 */
export const handleEvents = async () => {
  for await (const event of subscription) {
    if (event.kind !== "commit") continue;

    const { operation, collection, rkey } = event.commit;
    // log.info(`${operation} at://${event.did}/${collection}/${rkey}`);

    try {
      if (operation === "create" || operation === "update") {
        const record = event.commit.record;
        switch (collection) {
          case "systems.gaze.barometer.service": {
            await handleService(record, rkey);
            break;
          }
          case "systems.gaze.barometer.check": {
            await handleCheck(record, rkey);
            break;
          }
        }
      } else {
        switch (collection) {
          case "systems.gaze.barometer.service": {
            const serviceUri = getUri(collection, rkey);
            const service = store.services.get(serviceUri);
            if (!service) break;
            // Checks cannot outlive the service they belong to.
            for (const checkRkey of service.checks) {
              store.checks.delete(
                getUri("systems.gaze.barometer.check", checkRkey),
              );
            }
            store.services.delete(serviceUri);
            break;
          }
          case "systems.gaze.barometer.check": {
            const checkUri = getUri(collection, rkey);
            const check = store.checks.get(checkUri);
            if (!check) break;
            const serviceUri = check.record.forService as ServiceUri;
            const service = store.services.get(serviceUri);
            if (service) {
              service.checks.delete(rkey);
              store.services.set(serviceUri, service);
            }
            store.checks.delete(checkUri);
            break;
          }
          case "systems.gaze.barometer.state": {
            store.states.delete(getUri(collection, rkey));
            break;
          }
        }
      }
    } catch (err) {
      // One malformed record must not terminate the whole subscription.
      const reason = err instanceof Error ? err.message : String(err);
      log.error(
        `failed to handle ${operation} of ${collection}/${rkey}: ${reason}`,
      );
    }
  }
};