import { WebSocketClient, WebSocketClientOptions } from "@vinerima/wah";
import { JetstreamPostCreateCommitSchema, JetstreamPostCreateCommit } from "../types/message";
import { commitToPost } from "./wsToFeed";
import type { Post } from "../types/post";
import { Logger } from "./logger";

/**
 * Options accepted by {@link createJetstreamClient}.
 *
 * Everything except `onPost` is forwarded unchanged to the underlying
 * `WebSocketClient` constructor.
 */
export interface JetstreamClientOptions {
  /** Single Jetstream service URL or array of URLs for failover. */
  service: string | string[];
  /**
   * Query parameters appended to the WebSocket URL (e.g. `wantedCollections`).
   * Typed from the underlying client option so the two cannot drift apart.
   */
  queryParams?: WebSocketClientOptions["queryParams"];
  /** Reconnection configuration. */
  reconnect?: WebSocketClientOptions["reconnect"];
  /** Heartbeat ping interval in milliseconds. Default: 10000. */
  pingInterval?: number;
  /**
   * Called for each new post received from the firehose. May be synchronous
   * or return a promise; a returned promise is awaited by the handler.
   */
  onPost?: (post: Post) => void | Promise<void>;
}

/**
 * Creates a `@vinerima/wah` WebSocketClient pre-configured for the
 * Bluesky Jetstream firehose. It registers a schema-validated handler
 * for post creation events and converts them to `Post` objects.
 *
 * The client's log output is routed through the project `Logger`.
 *
 * @param options - Jetstream connection and handler options.
 * @returns The configured `WebSocketClient`, on which `connect()` has
 *   already been called.
 */
export function createJetstreamClient(options: JetstreamClientOptions): WebSocketClient {
  const client = new WebSocketClient({
    service: options.service,
    queryParams: options.queryParams,
    reconnect: options.reconnect,
    pingInterval: options.pingInterval,
    logger: {
      // Forward every log level of the client to the project logger.
      custom: {
        debug: (msg, ctx) => Logger.debug(msg, ctx as object),
        info: (msg, ctx) => Logger.info(msg, ctx as object),
        warn: (msg, ctx) => Logger.warn(msg, ctx as object),
        error: (msg, ctx) => Logger.error(msg, ctx as object),
      },
    },
  });

  client.handle(JetstreamPostCreateCommitSchema, async ctx => {
    const post = commitToPost(ctx.data);
    // Skip commits that did not convert to a post, and do nothing when the
    // caller did not register a callback.
    if (post && options.onPost) {
      await options.onPost(post);
    }
  });

  // NOTE(review): the result of connect() is not awaited or inspected here.
  // If it returns a promise, confirm that the library handles rejections
  // itself (e.g. via its reconnect logic).
  client.connect();

  return client;
}