A tool for tailing a labeler's firehose, hydrating the labeled content, and storing records for future analysis of moderation decisions.

Skywatch Tail#

A high-performance label capture and content hydration service for Bluesky moderation systems. Subscribe to a labeler's firehose, capture label events, and automatically hydrate associated content for machine learning training.

Features#

  • Real-time Label Capture: Subscribe to any Bluesky labeler's firehose via WebSocket
  • Automatic Content Hydration: Fetch full post records and user profiles for labeled content
  • Blob Processing: SHA-256 and perceptual hashing for images/videos with optional download
  • Intelligent Filtering: Optionally filter labels by type to capture only what you need
  • Rate Limiting: Respects Bluesky API limits (3000 req/5min) with p-ratelimit
  • Retry Logic: Automatic retry with exponential backoff for transient failures
  • Cursor Persistence: Resume from where you left off after restarts
  • Automatic Reconnection: Exponential backoff reconnection (1s-60s) for stability
  • DuckDB Storage: Embedded analytics database optimized for ML pipelines
  • Docker Ready: Containerized deployment with volume persistence
  • Type-Safe: Full TypeScript implementation with Zod validation

Architecture#

Firehose → Label Event → Filter → Store Label → Hydration Queue
                                        ↓              ↓
                                   DuckDB ← [Post/Profile Fetch] → Blob Processing
                                                                         ↓
                                                                    Hash + Store

Components#

  • Firehose Subscriber: WebSocket client with DAG-CBOR decoding
  • Label Filter: Configurable allow-list for label types
  • Hydration Services: Automatic post and profile data fetching with rate limiting
  • Blob Processor: SHA-256 and perceptual hash computation with optional download
  • Hydration Queue: Async queue with deduplication (see the sketch after this list)
  • Rate Limiter: p-ratelimit enforcing 3000 requests per 5 minutes
  • Retry Logic: Exponential backoff for transient failures
  • Repository Layer: Clean database abstraction for all entities
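
Of these, the hydration queue is simple enough to sketch. The following is illustrative TypeScript, not the actual implementation; the hydrate callback stands in for whatever fetch-and-store logic the service wires in:

// Minimal sketch: a FIFO hydration queue that deduplicates by URI.
type HydrateFn = (uri: string) => Promise<void>;

class HydrationQueue {
  private seen = new Set<string>();   // URIs already queued or processed
  private queue: string[] = [];
  private running = false;

  constructor(private hydrate: HydrateFn) {}

  enqueue(uri: string): void {
    if (this.seen.has(uri)) return;   // drop duplicates
    this.seen.add(uri);
    this.queue.push(uri);
    void this.drain();
  }

  private async drain(): Promise<void> {
    if (this.running) return;         // single consumer at a time
    this.running = true;
    while (this.queue.length > 0) {
      const uri = this.queue.shift()!;
      try {
        await this.hydrate(uri);      // e.g. fetch post/profile, write to DuckDB
      } catch (err) {
        console.error("hydration failed", uri, err);
      }
    }
    this.running = false;
  }
}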

Quick Start#

Prerequisites#

  • Docker and Docker Compose
  • Bluesky account with app password
  • Access to a labeler firehose endpoint

Installation#

  1. Clone the repository:

git clone <repository-url>
cd skywatch-tail

  2. Copy the example environment file:

cp .env.example .env

  3. Configure your .env file:

# Bluesky Credentials
BSKY_HANDLE=your-handle.bsky.social
BSKY_PASSWORD=your-app-password

# Labeler Configuration
WSS_URL=wss://your-labeler.com/xrpc/com.atproto.label.subscribeLabels

# Optional: Filter specific labels
CAPTURE_LABELS=spam,hate-speech

# Logging
LOG_LEVEL=info

  4. Start with Docker Compose:

docker-compose up -d

Local Development#

Install dependencies with Bun:

bun install

Run in development mode:

bun run dev

Run tests:

bun test

Configuration#

All configuration is managed via environment variables:

Required#

  • BSKY_HANDLE: Your Bluesky handle
  • BSKY_PASSWORD: App password (not your main password)
  • WSS_URL: Labeler firehose WebSocket URL

Optional#

  • PDS: Bluesky PDS host (default: bsky.social)
  • CAPTURE_LABELS: Comma-separated list of label values to capture
  • DB_PATH: Path to DuckDB database file (default: ./data/skywatch.duckdb)
  • LOG_LEVEL: Logging level (default: info)
  • HYDRATE_BLOBS: Enable blob download (default: false)
  • BLOB_STORAGE_TYPE: Storage backend for blobs (local or s3)
  • BLOB_STORAGE_PATH: Local path for blob storage (default: ./data/blobs)

S3 Configuration (Optional)#

  • S3_BUCKET: S3 bucket name
  • S3_REGION: AWS region
  • AWS_ACCESS_KEY_ID: AWS access key
  • AWS_SECRET_ACCESS_KEY: AWS secret key
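
Given the variables above, the Zod validation mentioned under Features might look roughly like the sketch below. It is not the project's actual schema; in particular, the log-level set and the local default for BLOB_STORAGE_TYPE are assumptions:

import { z } from "zod";

// Illustrative environment schema for the variables listed above.
const envSchema = z.object({
  BSKY_HANDLE: z.string().min(1),
  BSKY_PASSWORD: z.string().min(1),
  WSS_URL: z.string().url(),
  PDS: z.string().default("bsky.social"),
  CAPTURE_LABELS: z.string().optional(),                       // comma-separated, parsed elsewhere
  DB_PATH: z.string().default("./data/skywatch.duckdb"),
  LOG_LEVEL: z.enum(["trace", "debug", "info", "warn", "error", "fatal"]).default("info"),
  HYDRATE_BLOBS: z.string().default("false").transform((v) => v === "true"),
  BLOB_STORAGE_TYPE: z.enum(["local", "s3"]).default("local"), // assumed default
  BLOB_STORAGE_PATH: z.string().default("./data/blobs"),
  S3_BUCKET: z.string().optional(),
  S3_REGION: z.string().optional(),
  AWS_ACCESS_KEY_ID: z.string().optional(),
  AWS_SECRET_ACCESS_KEY: z.string().optional(),
});

export const config = envSchema.parse(process.env);            // throws a readable error on bad config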

Database Schema#

Labels Table#

Stores raw label events from the firehose.

  • id: Auto-incrementing primary key
  • uri: AT-URI or DID of labeled content
  • cid: Content identifier (optional)
  • val: Label value (e.g., "spam")
  • neg: Negation flag
  • cts: Created timestamp
  • exp: Expiration timestamp (optional)
  • src: Labeler DID
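
For downstream analysis, a row from this table could be modeled in TypeScript along the following lines (illustrative only; field names mirror the columns above):

// Illustrative shape of a labels row.
interface LabelRow {
  id: number;      // auto-incrementing primary key
  uri: string;     // AT-URI or DID of labeled content
  cid?: string;    // content identifier
  val: string;     // label value, e.g. "spam"
  neg: boolean;    // negation flag
  cts: string;     // created timestamp
  exp?: string;    // expiration timestamp
  src: string;     // labeler DID
}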

Posts Table#

Hydrated post data for labeled content.

  • uri: AT-URI (primary key)
  • did: Author DID
  • text: Post content
  • facets: Rich text annotations (JSON)
  • embeds: Embedded content (JSON)
  • langs: Language codes (JSON)
  • tags: Hashtags (JSON)
  • created_at: Post creation timestamp
  • is_reply: Reply flag

Profiles Table#

Hydrated user profile data.

  • did: User DID (primary key)
  • handle: User handle
  • display_name: Display name
  • description: Bio/description

Blobs Table#

Image and video blob metadata.

  • post_uri: Associated post URI
  • blob_cid: Blob content identifier
  • sha256: Cryptographic hash
  • phash: Perceptual hash
  • storage_path: Local or S3 path (if downloaded)
  • mimetype: Content type
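
As a sketch of the hashing step, the SHA-256 digest can be computed from a blob's raw bytes with node:crypto. The perceptual hash would come from an image-hashing library and is omitted here; the fetch step is also assumed:

import { createHash } from "node:crypto";

// Compute the SHA-256 hex digest of a blob's raw bytes (sketch only).
function sha256Hex(bytes: Uint8Array): string {
  return createHash("sha256").update(bytes).digest("hex");
}

// Hypothetical usage with bytes fetched from the PDS:
// const bytes = new Uint8Array(await res.arrayBuffer());
// const digest = sha256Hex(bytes); // stored in the blobs.sha256 column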

Label Filtering#

Filter labels by providing a comma-separated list in CAPTURE_LABELS:

CAPTURE_LABELS=spam,hate-speech,scam

If not set, all labels are captured.
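
A minimal sketch of how such an allow-list might be applied (the names here are illustrative, not the project's actual code):

// Parse the allow-list once; an empty or unset value means "capture everything".
const allowList = new Set(
  (process.env.CAPTURE_LABELS ?? "")
    .split(",")
    .map((v) => v.trim())
    .filter((v) => v.length > 0)
);

function shouldCapture(labelVal: string): boolean {
  return allowList.size === 0 || allowList.has(labelVal);
}

// With CAPTURE_LABELS=spam,hate-speech,scam:
//   shouldCapture("spam")   -> true
//   shouldCapture("nudity") -> false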

Data Persistence#

Cursor Persistence#

The application saves its position in the firehose to data/cursor.txt. On restart, it resumes from this cursor, preventing duplicate processing.
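
In outline, this only requires writing the latest sequence number to disk after each event and reading it back on startup. A sketch, assuming the cursor is a plain numeric sequence value:

import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";

const CURSOR_FILE = "data/cursor.txt";

// Load the last saved cursor, or undefined on first run.
function loadCursor(): number | undefined {
  if (!existsSync(CURSOR_FILE)) return undefined;
  const raw = readFileSync(CURSOR_FILE, "utf8").trim();
  return raw ? Number(raw) : undefined;
}

// Persist the latest sequence number after each processed label event.
function saveCursor(seq: number): void {
  mkdirSync("data", { recursive: true });
  writeFileSync(CURSOR_FILE, String(seq));
}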

Database Persistence#

The DuckDB database is stored in the data/ directory, which is mounted as a Docker volume. Your data persists across container restarts.

Monitoring#

Logs are output in structured JSON format (production) or pretty-printed (development).

Key log events:

  • Firehose connected: Successfully connected to labeler
  • Firehose disconnected: Connection lost, will auto-reconnect
  • Received label: Label captured and stored
  • Post hydrated successfully: Post data fetched
  • Profile hydrated successfully: Profile data fetched
  • Blob processed: Blob hashed and optionally stored
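
A Pino setup matching this behavior (JSON by default, pretty-printed locally) might look like the sketch below. Whether the project switches on NODE_ENV, and whether pino-pretty is installed, are assumptions:

import pino from "pino";

// Structured JSON logs by default; pretty-print in development via pino-pretty.
export const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  ...(process.env.NODE_ENV !== "production"
    ? { transport: { target: "pino-pretty" } }
    : {}),
});

// logger.info({ uri, val }, "Received label");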

Rate Limiting#

The service uses the p-ratelimit library to respect Bluesky's API limits:

  • Limit: 3000 requests per 5 minutes per IP address
  • Concurrency: Up to 48 concurrent requests
  • Backoff: Automatic delays when approaching limits
  • Retry Logic: Exponential backoff for rate limit errors (1s-10s)
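
A limiter matching the numbers above could be built with p-ratelimit roughly as follows. This is a sketch, not the project's exact wiring, and fetchPost is a hypothetical helper:

import { pRateLimit } from "p-ratelimit";

// 3000 requests per 5-minute window, at most 48 in flight at once.
const limit = pRateLimit({
  interval: 5 * 60 * 1000, // window in milliseconds
  rate: 3000,
  concurrency: 48,
});

// Every Bluesky API call goes through the limiter, e.g.:
// const post = await limit(() => fetchPost(uri));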

Development#

Project Structure#

skywatch-tail/
├── src/
│   ├── blobs/            # Blob processing and storage
│   ├── config/           # Environment validation
│   ├── database/         # Schema and repositories
│   ├── firehose/         # WebSocket subscriber
│   ├── hydration/        # Content hydration services
│   ├── logger/           # Pino logger setup
│   ├── utils/            # Retry logic and helpers
│   └── index.ts          # Main entry point
├── tests/
│   ├── integration/      # Database integration tests
│   └── unit/             # Unit tests
├── data/                 # Database and cursor storage
├── docker-compose.yml
├── Dockerfile
└── .env.example

Running Tests#

# All tests
bun test

# Specific test file
bun test tests/unit/decoder.test.ts

# Watch mode
bun test --watch

Database Access#

Access the DuckDB database directly:

# Using DuckDB CLI
duckdb data/skywatch.duckdb

# Query labels
SELECT val, COUNT(*) FROM labels GROUP BY val;

# Query recent posts
SELECT uri, text FROM posts ORDER BY created_at DESC LIMIT 10;
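
For programmatic access (for example, exporting training data), the same file can be opened from TypeScript. The sketch below assumes the duckdb Node bindings rather than the CLI:

import duckdb from "duckdb";

// Open the database the service writes to; read-only analysis is the typical use.
const db = new duckdb.Database("data/skywatch.duckdb");

db.all(
  "SELECT val, COUNT(*) AS n FROM labels GROUP BY val ORDER BY n DESC",
  (err, rows) => {
    if (err) throw err;
    console.table(rows);
  }
);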

Roadmap#

  • Phase 1: Core infrastructure (Docker, config, database, logging)
  • Phase 2: Firehose connection and label capture
  • Phase 3: Content hydration (posts and profiles)
  • Phase 4: Blob processing (image/video hashing and storage)
  • Phase 5: Rate limiting and optimization
  • Phase 6: Comprehensive testing
  • Phase 7: Documentation

Safety Features#

Blob Hydration#

By default, HYDRATE_BLOBS is false. This prevents accidental download of potentially harmful and/or unlawful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes.

Only enable blob download if:

  1. You understand the legal and safety implications
  2. You have proper content storage policies in place
  3. You're operating in a jurisdiction where possessing such content for moderation is legal

License#

See LICENSE file for details.

Contributing#

Contributions welcome. Please ensure all tests pass before submitting PRs.

Support#

For issues and feature requests, please use the GitHub issue tracker.