# Serverless #atproto Jetstream to Webhook Connector
A Cloudflare Worker that connects to the atproto Jetstream to process events and forward them to a webhook endpoint.
Why might this be useful? This is an experimental setup for small atproto apps running on serverless platforms (e.g., Next.js on Vercel) that want to subscribe to real-time events from the firehose/Jetstream, but would rather consume those events as webhooks than run a separate service that consumes the Jetstream or firehose directly.
Flow: CF Worker + Durable Object → listens to Jetstream for the specified collection events → adds new events to a CF Queue → a CF Queue worker processes the events and forwards them to your webhook.
## Features

- **Real-time Event Processing**: Connects to the Bluesky Jetstream WebSocket to receive live AT Protocol events
- **Configurable Collections**: Subscribe to a set of collections via environment variables
- **Queue-based Architecture**: Uses Cloudflare Queues for reliable event processing and webhook delivery
- **Webhook Integration**: Forwards events to your webhook endpoint with optional bearer token authentication
- **Cursor Tracking**: Maintains cursor position for gapless playback during reconnections
- **Stats Collection**: Tracks event counts per collection and total processing stats
- **Web Dashboard**: HTML interface to view processing statistics
- **Auto-Reconnection**: Handles WebSocket disconnections with exponential backoff
- **Persistent Storage**: Uses Durable Object storage to maintain state across deployments
- **Fully Configurable**: No hardcoded URLs or collections; easy to adapt for any project
## Quick Start

### Local Development
```shell
# 1. Clone and install dependencies
pnpm install

# 2. Set up local environment (creates .dev.vars from the template)
pnpm run setup-local

# 3. Edit .dev.vars with your configuration
nano .dev.vars

# 4. Create Cloudflare Queues (one-time setup)
pnpm run setup-queues

# 5. Start development server
pnpm run dev

# 6. View dashboard
open http://localhost:8787/stats/html
```
### Production Deployment

```shell
# 1. Configure environment variables and secrets
pnpm run setup-config

# 2. Deploy to Cloudflare
pnpm run deploy
```
## Configuration

This worker is fully configurable via environment variables:

| Variable | Description | Example |
|---|---|---|
| `WEBHOOK_URL` | **Required** - your webhook endpoint | `https://example.com/api/webhooks/jetstream` |
| `JETSTREAM_COLLECTIONS` | **Required** - collections to watch (comma-separated) | `app.bsky.feed.post,app.bsky.graph.follow` |
| `WEBHOOK_BEARER_TOKEN` | Optional - bearer token for webhook authentication | `your-secret-token` |
### Example Collections

```shell
# Social media activity
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow,app.bsky.feed.like

# Profile updates
JETSTREAM_COLLECTIONS=app.bsky.actor.profile

# Custom AT Protocol collections
JETSTREAM_COLLECTIONS=com.example.app.*,org.myproject.data.*

# Watch everything (high volume!)
JETSTREAM_COLLECTIONS=*
```
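The wildcard patterns above are treated as prefix matches. A minimal sketch of the matching semantics assumed here (`matchesCollection` is a hypothetical helper for illustration, not part of this worker's API):

```typescript
// Hypothetical sketch of the prefix-wildcard semantics assumed above:
// "*" matches everything, "com.example.app.*" matches any collection
// under that prefix, and a plain NSID must match exactly.
function matchesCollection(pattern: string, collection: string): boolean {
  if (pattern === "*") return true;
  if (pattern.endsWith(".*")) {
    return collection.startsWith(pattern.slice(0, -1)); // keep the trailing "."
  }
  return pattern === collection;
}

function matchesAny(patterns: string[], collection: string): boolean {
  return patterns.some((p) => matchesCollection(p, collection));
}
```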
### Local Development (`.dev.vars`)

```shell
# Copy template and edit
cp .dev.vars.example .dev.vars

# Example configuration
WEBHOOK_URL=https://example.com/api/webhooks/jetstream-event
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow
WEBHOOK_BEARER_TOKEN=your-development-token
```
### Production (Cloudflare Secrets)

```shell
# Set via interactive script
pnpm run setup-config

# Or manually
wrangler secret put WEBHOOK_URL
wrangler secret put JETSTREAM_COLLECTIONS
wrangler secret put WEBHOOK_BEARER_TOKEN
```
## Endpoints

- `GET /` - API information and available endpoints
- `GET /stats` - JSON statistics of processed events
- `GET /stats/html` - HTML dashboard with real-time statistics (auto-refreshes every 30s)
- `GET /status` - WebSocket connection status
- `GET /health` - Health check endpoint
- `POST /reset` - Reset all statistics
- `POST /reconnect` - Force WebSocket reconnection
## Architecture

### Unified Worker Design

This worker handles both event processing and queue consumption in a single deployment:

- **Jetstream Processing (Durable Object)**: WebSocket connection, event filtering, queueing
- **Queue Consumption (Queue Handler)**: Batch processing and webhook delivery
- **HTTP API (Fetch Handler)**: Stats, dashboard, and control endpoints

```
Jetstream Events → Durable Object → Cloudflare Queue → Queue Handler → Your Webhook
```
### Durable Object: `JetstreamProcessor`
The core processing logic runs in a single Durable Object instance that:
1. **Establishes WebSocket Connection**: Connects to `wss://jetstream1.us-west.bsky.network/subscribe`
2. **Filters Events**: Only receives events from collections specified in `JETSTREAM_COLLECTIONS`
3. **Processes Events**: For each received commit event:
   - Skips identity and account events (only commits are processed)
   - Updates the cursor with the event's `time_us`
   - Increments collection-specific counters
   - Queues the event for webhook delivery
   - Persists statistics every 100 events
4. **Handles Reconnections**: Automatically reconnects on disconnection, resuming from the stored cursor for gapless playback
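The per-event bookkeeping above can be sketched as a pure function over the stored stats. This is a simplified sketch only: field names follow the `StoredStats` interface under Data Persistence, and the real Durable Object also queues the event and persists state every 100 events.

```typescript
// Simplified sketch of the Durable Object's per-event bookkeeping.
// Field names follow the StoredStats interface; queueing and the
// every-100-events persistence step are omitted here.
interface StoredStats {
  cursor: number;
  eventCounts: Record<string, number>;
  totalEvents: number;
  totalReceived: number;
  lastEventTime: string;
}

interface JetstreamEvent {
  kind: "commit" | "identity" | "account";
  time_us: number;
  commit?: { collection: string };
}

// Returns true when the event should be queued for webhook delivery.
function handleEvent(stats: StoredStats, event: JetstreamEvent): boolean {
  stats.totalReceived++;
  if (event.kind !== "commit" || !event.commit) return false; // skip identity/account
  stats.cursor = Math.max(stats.cursor, event.time_us);
  const col = event.commit.collection;
  stats.eventCounts[col] = (stats.eventCounts[col] ?? 0) + 1;
  stats.totalEvents++;
  stats.lastEventTime = new Date().toISOString();
  return true;
}
```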
### Queue Consumer
The queue handler processes events in batches and delivers them to your webhook with:
- **Batch processing**: Up to 10 events per batch
- **Automatic retries**: 3 retry attempts, with a dead letter queue
- **Bearer token authentication**: Optional secure webhook delivery
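The delivery request the consumer sends can be sketched as follows. This only shows request assembly; the real handler also performs the `fetch` and acks or retries each queue message. Environment variable names match the configuration table above.

```typescript
// Sketch of how the queue consumer could assemble the webhook request
// for one event. The actual fetch/ack/retry handling is omitted.
interface DeliveryEnv {
  WEBHOOK_URL: string;
  WEBHOOK_BEARER_TOKEN?: string;
}

interface WebhookRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

function buildWebhookRequest(env: DeliveryEnv, event: unknown): WebhookRequest {
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "User-Agent": "Jetstream-Unified/1.0",
  };
  if (env.WEBHOOK_BEARER_TOKEN) {
    headers["Authorization"] = `Bearer ${env.WEBHOOK_BEARER_TOKEN}`;
  }
  return { url: env.WEBHOOK_URL, method: "POST", headers, body: JSON.stringify(event) };
}
```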
### Event Types Processed

The processor handles Jetstream events with these `kind` values:

- `commit`: Repository commits with operations (create, update, delete) - **processed**
- `identity`: Identity/handle updates - **skipped**
- `account`: Account status changes - **skipped**
### Data Persistence

Statistics are stored in Durable Object storage:

```typescript
interface StoredStats {
  cursor: number;                       // Latest time_us for reconnection
  eventCounts: Record<string, number>;  // Events per collection
  totalEvents: number;                  // Total commit events processed
  totalReceived: number;                // Total events received (including skipped)
  lastEventTime: string;                // ISO timestamp of last processing
}
```
## Monitoring

### Real-time Dashboard

Visit `/stats/html` for a web interface showing:

- **Commit events processed** - only the events you care about
- **Total events received** - all events from Jetstream (including skipped)
- **Processing efficiency** - percentage of useful vs. total events
- **Unique collections** - number of different collections processed
- **Last event timestamp** - when the most recent event was received
- **Events breakdown by collection** - detailed stats per collection
- **Auto-refresh** every 30 seconds
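Processing efficiency here is assumed to be the share of received events that were processed commits, i.e. `totalEvents / totalReceived`. A hypothetical helper mirroring that calculation:

```typescript
// Hypothetical helper mirroring the dashboard's efficiency figure:
// the share of received events that were actually processed commits,
// as a percentage rounded to two decimals.
function processingEfficiency(totalEvents: number, totalReceived: number): number {
  if (totalReceived === 0) return 0;
  return Math.round((totalEvents / totalReceived) * 10000) / 100;
}
```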
### API Monitoring

```shell
# Check connection status
curl http://localhost:8787/status

# Get current statistics
curl http://localhost:8787/stats

# Check health
curl http://localhost:8787/health

# Force reconnection if needed
curl -X POST http://localhost:8787/reconnect
```
## Webhook Integration
Each commit event is posted to your webhook endpoint as JSON with optional bearer token authentication:
```http
POST {WEBHOOK_URL}
Content-Type: application/json
Authorization: Bearer {WEBHOOK_BEARER_TOKEN}
User-Agent: Jetstream-Unified/1.0

{
  "did": "did:plc:...",
  "time_us": 1725911162329308,
  "kind": "commit",
  "commit": {
    "rev": "3l3qo2vutsw2b",
    "operation": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "3l3qo2vuowo2b",
    "record": {
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-09-09T19:46:02.102Z",
      "text": "Hello, world!",
      // ... record data
    },
    "cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
  }
}
```
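On the receiving side, a handler typically just needs to verify the bearer token and dispatch on the commit's collection. A minimal framework-agnostic sketch (both helpers are hypothetical, written for your webhook endpoint rather than shipped with this worker):

```typescript
// Hypothetical receiver-side helper: verify the bearer token this
// worker sends before trusting the event payload.
function isAuthorized(authHeader: string | null, expectedToken: string): boolean {
  return authHeader === `Bearer ${expectedToken}`;
}

interface IncomingEvent {
  kind: string;
  commit?: { collection: string; operation: string };
}

// Decide what to do with an event; returns a routing label for illustration.
function routeEvent(event: IncomingEvent): string {
  if (event.kind !== "commit" || !event.commit) return "ignored";
  return `${event.commit.operation}:${event.commit.collection}`;
}
```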
## Error Handling

- **WebSocket Errors**: Automatic reconnection with exponential backoff
- **Webhook Failures**: Automatic retries via Cloudflare Queues, with a dead letter queue
- **Parse Errors**: Individual event failures don't crash the processor
- **Storage Errors**: Graceful degradation with an in-memory fallback
- **Configuration Errors**: Clear error messages for missing required environment variables
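The exponential backoff mentioned above can be sketched as a doubling delay with a cap and jitter. The base delay and cap here are illustrative assumptions, not the worker's actual settings:

```typescript
// Illustrative reconnect backoff: the delay doubles per attempt, is
// capped, and carries random jitter to avoid synchronized reconnects.
// baseMs/maxMs are assumptions, not the worker's actual values.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 60000): number {
  const exp = Math.min(baseMs * 2 ** attempt, maxMs);
  return exp / 2 + Math.random() * (exp / 2); // jitter within [exp/2, exp)
}
```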
## Development Commands

```shell
# Local development
pnpm run dev          # Start development server
pnpm run setup-local  # Set up local environment

# Configuration
pnpm run setup-config # Interactive production setup
pnpm run setup-queues # Create Cloudflare Queues (one-time)

# Deployment
pnpm run deploy       # Deploy to Cloudflare
pnpm run cf-typegen   # Regenerate TypeScript types
```
## Project Structure

```
src/
├── types.ts          # Shared TypeScript interfaces
└── index.ts          # Main worker (Durable Object + Queue Consumer)
wrangler.jsonc        # Cloudflare Worker configuration
.dev.vars.example     # Environment variables template
setup-*.sh            # Setup scripts for queues and configuration
```
## Deployment
The worker automatically starts processing events upon deployment. The Durable Object ensures only one instance runs globally, maintaining connection state across worker invocations.
## Adapting for Your Project

This worker is designed to be easily adaptable:

1. **Fork** the repository
2. **Configure** your environment variables:
   - Set your webhook URL
   - Choose your AT Protocol collections
   - Add authentication if needed
3. **Deploy** to Cloudflare
4. **Monitor** via the dashboard
No code changes required - everything is configurable via environment variables!
## License
MIT