a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
README.md

@atcute/firehose#

lightweight XRPC subscription client for AT Protocol.

npm install @atcute/firehose

this package provides a generic client for XRPC subscriptions - the WebSocket-based streaming protocol used by AT Protocol. it handles CBOR frame decoding, automatic reconnection, and optional schema validation.

for consuming the Bluesky network firehose specifically, consider using @atcute/jetstream instead, which provides a simpler JSON-based interface.

usage#

subscribing to the relay firehose#

import { FirehoseSubscription } from '@atcute/firehose';
import { ComAtprotoSyncSubscribeRepos } from '@atcute/atproto';

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
});

for await (const message of subscription) {
	console.log(message.$type, message.seq);
}

the connection opens when you start iterating and closes when you break out of the loop. the underlying WebSocket automatically reconnects on disconnection.

handling message types#

messages include a $type field indicating their type:

for await (const message of subscription) {
	switch (message.$type) {
		case 'com.atproto.sync.subscribeRepos#commit': {
			// repository commit (record creates, updates, deletes)
			console.log('commit:', message.repo, message.rev);
			break;
		}

		case 'com.atproto.sync.subscribeRepos#handle': {
			// handle change
			console.log('handle:', message.did, message.handle);
			break;
		}

		case 'com.atproto.sync.subscribeRepos#migrate': {
			// account migration
			console.log('migrate:', message.did, message.migrateTo);
			break;
		}

		case 'com.atproto.sync.subscribeRepos#tombstone': {
			// account deletion
			console.log('tombstone:', message.did);
			break;
		}

		case 'com.atproto.sync.subscribeRepos#identity': {
			// identity update
			console.log('identity:', message.did);
			break;
		}

		case 'com.atproto.sync.subscribeRepos#account': {
			// account status change
			console.log('account:', message.did, message.active);
			break;
		}
	}
}

tracking cursor for resumption#

use a function for params to provide the current cursor on each connection attempt:

let cursor: number | undefined;

// load saved cursor if resuming
const saved = localStorage.getItem('firehose-cursor');
if (saved) {
	cursor = Number(saved);
}

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
	// function is called on each connection/reconnection
	params: () => ({ cursor }),
});

for await (const message of subscription) {
	if ('seq' in message) {
		cursor = message.seq;

		// periodically save cursor for recovery
		if (cursor % 1000 === 0) {
			localStorage.setItem('firehose-cursor', String(cursor));
		}
	}
}

the params function is called on each connection attempt, so when the WebSocket reconnects after a disconnection, it automatically uses the latest cursor value.

using multiple servers#

pass an array of URLs for automatic failover. the client randomly selects one on each connection:

const subscription = new FirehoseSubscription({
	service: ['wss://bsky.network', 'wss://bsky-relay.example.com'],
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
});

handling errors#

XRPC subscriptions can send error frames. handle them with the onError callback:

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
	onError(error, message) {
		console.error('firehose error:', error, message);
		// common errors:
		// - "FutureCursor": cursor is ahead of the server
		// - "ConsumerTooSlow": client is not consuming messages fast enough
	},
});

connection lifecycle callbacks#

handle connection events for logging or UI updates:

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
	onConnectionOpen(event) {
		console.log('connected to firehose');
	},
	onConnectionClose(event) {
		console.log('disconnected:', event.code, event.reason);
	},
	onConnectionError(event) {
		console.error('connection error:', event.error);
	},
});

updating options at runtime#

change options using updateOptions(). this triggers a reconnection:

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
});

// later, switch to a different service
subscription.updateOptions({
	service: 'wss://different-relay.example.com',
});

disabling message validation#

by default, messages are validated against the schema. disable this for better performance if you trust the server:

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
	validateMessages: false,
});

WebSocket options#

pass options to the underlying partysocket WebSocket for custom reconnection behavior:

const subscription = new FirehoseSubscription({
	service: 'wss://bsky.network',
	nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
	ws: {
		maxRetries: 10,
		minReconnectionDelay: 1000,
		maxReconnectionDelay: 30000,
	},
});