A tool for tailing a labeler's firehose, rehydrating labeled content, and storing records for future analysis of moderation decisions.

Clone this repository

https://tangled.org/skywatch.blue/skywatch-tail
git@tangled.org:skywatch.blue/skywatch-tail

For self-hosted knots, clone URLs may differ based on your setup.


Skywatch Tail

A high-performance label capture and content hydration service for Bluesky moderation systems. Subscribe to a labeler's firehose, capture label events, and automatically hydrate associated content for machine learning training.

Features

  • Real-time Label Capture: Subscribe to any Bluesky labeler's firehose via WebSocket
  • Automatic Content Hydration: Fetch full post records and user profiles for labeled content
  • Label Filtering: Optionally restrict capture to an allow-list of label values
  • Cursor Persistence: Resume from where you left off after restarts
  • Automatic Reconnection: Exponential backoff with delays from 1 s up to 30 s
  • DuckDB Storage: Embedded analytics database optimized for ML pipelines
  • Docker Ready: Containerized deployment with volume persistence
  • Type-Safe: Full TypeScript implementation with Zod validation
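The reconnection behaviour listed above can be sketched as a capped exponential backoff. This is a minimal sketch, not the project's code: it assumes the delay starts at 1 s and doubles after each failed attempt until it reaches the 30 s cap, and the names `backoffDelayMs` and `connectWithBackoff` are hypothetical.

```typescript
// Hypothetical reconnect policy: exponential backoff between 1 s and 30 s.
// The doubling factor is an assumption; the README only states the 1s-30s range.
const BASE_DELAY_MS = 1_000;
const MAX_DELAY_MS = 30_000;

function backoffDelayMs(attempt: number): number {
  // attempt 0 -> 1 s, 1 -> 2 s, 2 -> 4 s, 3 -> 8 s, 4 -> 16 s, then capped at 30 s
  const delay = BASE_DELAY_MS * 2 ** Math.max(0, attempt);
  return Math.min(delay, MAX_DELAY_MS);
}

// Retries `connect` until it resolves, sleeping longer after each failure.
async function connectWithBackoff(
  connect: () => Promise<void>,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      await connect();
      return;
    } catch (err) {
      const delay = backoffDelayMs(attempt);
      console.warn(`connect failed, retrying in ${delay} ms`, err);
      await sleep(delay);
    }
  }
}
```

Capping the delay keeps a long labeler outage from pushing retries minutes apart, while the doubling avoids hammering an endpoint that is briefly down.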

Architecture

Firehose → Label Event → Filter → Store Label → Hydration Queue
                                        ↓              ↓
                                   DuckDB ← [Post/Profile Fetch]

Components

  • Firehose Subscriber: WebSocket client with DAG-CBOR decoding
  • Label Filter: Configurable allow-list for label types
  • Hydration Services: Automatic post and profile data fetching
  • Hydration Queue: Async queue with deduplication
  • Repository Layer: Clean database abstraction for all entities
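The hydration queue's deduplication can be sketched as a FIFO keyed by subject: a post or profile that is already queued or in flight is not enqueued a second time. This is a minimal sketch under assumptions; the `HydrationQueue` class, its methods, and the one-task-at-a-time worker are hypothetical and not taken from the project's source.

```typescript
// Hypothetical hydration queue: FIFO with deduplication by subject key.
type HydrationTask = { kind: "post" | "profile"; subject: string };

class HydrationQueue {
  private readonly queue: HydrationTask[] = [];
  // Keys that are queued or currently being processed.
  private readonly pending = new Set<string>();
  private readonly worker: (task: HydrationTask) => Promise<void>;

  constructor(worker: (task: HydrationTask) => Promise<void>) {
    this.worker = worker;
  }

  private static keyOf(task: HydrationTask): string {
    return `${task.kind}:${task.subject}`;
  }

  /** Adds a task unless the same subject is already queued or in flight. */
  enqueue(task: HydrationTask): boolean {
    const key = HydrationQueue.keyOf(task);
    if (this.pending.has(key)) return false;
    this.pending.add(key);
    this.queue.push(task);
    return true;
  }

  get size(): number {
    return this.queue.length;
  }

  /** Processes tasks one at a time until the queue is empty. */
  async drain(): Promise<void> {
    while (this.queue.length > 0) {
      const task = this.queue.shift() as HydrationTask;
      const key = HydrationQueue.keyOf(task);
      try {
        await this.worker(task);
      } catch (err) {
        console.error(`hydration failed for ${key}`, err);
      } finally {
        // Release the key so a later label can trigger re-hydration.
        this.pending.delete(key);
      }
    }
  }
}
```

Keying on both kind and subject lets the same DID be hydrated as a profile while one of its posts is hydrated separately, but stops a burst of labels on one post from triggering repeated fetches.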

Quick Start

Prerequisites

  • Docker and Docker Compose
  • Bluesky account with app password
  • Access to a labeler firehose endpoint

Installation

  1. Clone the repository:
git clone <repository-url>
cd skywatch-tail
  2. Copy the example environment file:
cp .env.example .env
  3. Configure your .env file:
# Bluesky Credentials
BSKY_HANDLE=your-handle.bsky.social
BSKY_PASSWORD=your-app-password

# Labeler Configuration
WSS_URL=wss://your-labeler.com/xrpc/com.atproto.label.subscribeLabels

# Optional: Filter specific labels
CAPTURE_LABELS=spam,hate-speech,csam

# Logging
LOG_LEVEL=info
  4. Start with Docker Compose:
docker-compose up -d

Local Development

Install dependencies with Bun:

bun install

Run in development mode:

bun run dev

Run tests:

bun test

Configuration

All configuration is managed via environment variables:

Required

  • BSKY_HANDLE: Your Bluesky handle
  • BSKY_PASSWORD: App password (not your main password)
  • WSS_URL: Labeler firehose WebSocket URL

Optional

  • PDS: Bluesky PDS host (default: bsky.social)
  • CAPTURE_LABELS: Comma-separated list of label values to capture
  • DB_PATH: Path to DuckDB database file (default: ./data/skywatch.duckdb)
  • LOG_LEVEL: Logging level (default: info)
  • HYDRATE_BLOBS: Enable blob download (default: false) - Phase 4
  • BLOB_STORAGE_TYPE: Storage backend for blobs (local or s3) - Phase 4
  • BLOB_STORAGE_PATH: Local path for blob storage - Phase 4
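Loading these variables can be sketched as a validator that fails fast on missing required values and fills in the documented defaults. The project itself uses Zod for this; the dependency-free sketch below only mirrors the documented variable names and defaults, and the `loadConfig` function and `Config` shape are assumptions for illustration.

```typescript
// Hypothetical config loader mirroring the documented variables and defaults.
// The real project validates with Zod; this sketch avoids dependencies.
interface Config {
  bskyHandle: string;
  bskyPassword: string;
  wssUrl: string;
  pds: string;
  captureLabels?: string;
  dbPath: string;
  logLevel: string;
  hydrateBlobs: boolean;
}

function loadConfig(env: Record<string, string | undefined>): Config {
  const missing = ["BSKY_HANDLE", "BSKY_PASSWORD", "WSS_URL"].filter(
    (name) => !env[name],
  );
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }
  const wssUrl = env.WSS_URL as string;
  if (!/^wss?:\/\//.test(wssUrl)) {
    throw new Error("WSS_URL must be a ws:// or wss:// URL");
  }
  return {
    bskyHandle: env.BSKY_HANDLE as string,
    bskyPassword: env.BSKY_PASSWORD as string,
    wssUrl,
    pds: env.PDS ?? "bsky.social",
    captureLabels: env.CAPTURE_LABELS, // parsed later by the label filter
    dbPath: env.DB_PATH ?? "./data/skywatch.duckdb",
    logLevel: env.LOG_LEVEL ?? "info",
    hydrateBlobs: env.HYDRATE_BLOBS === "true", // anything else means false
  };
}
```

Called once at startup, for example as `loadConfig(process.env)`, a loader like this stops the container immediately on a missing credential instead of failing later on the first connection attempt.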

S3 Configuration (Phase 4, Optional)

  • S3_BUCKET: S3 bucket name
  • S3_REGION: AWS region
  • AWS_ACCESS_KEY_ID: AWS access key
  • AWS_SECRET_ACCESS_KEY: AWS secret key

Database Schema

Labels Table

Stores raw label events from the firehose.

  • id: Auto-incrementing primary key
  • uri: AT-URI or DID of labeled content
  • cid: CID of the specific record version the label applies to (optional)
  • val: Label value (e.g., "spam")
  • neg: Negation flag (true when the event removes a previously applied label)
  • cts: Label creation timestamp
  • exp: Expiration timestamp (optional)
  • src: Labeler DID
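Because a label's `uri` may be either an AT-URI or a bare DID, the hydration step has to decide what to fetch. A minimal sketch, assuming DIDs are hydrated as profiles and AT-URIs in the `app.bsky.feed.post` collection as posts; the `classifySubject` helper and its return shape are hypothetical.

```typescript
// Hypothetical helper: decide what to hydrate for a label's `uri` field.
type Subject =
  | { kind: "profile"; did: string }
  | { kind: "post"; did: string; rkey: string; uri: string }
  | { kind: "other"; uri: string };

function classifySubject(uri: string): Subject {
  // Account-level labels target a DID directly.
  if (uri.startsWith("did:")) {
    return { kind: "profile", did: uri };
  }
  // Record labels use at://<did>/<collection>/<rkey>.
  const match = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (match && match[2] === "app.bsky.feed.post") {
    return { kind: "post", did: match[1], rkey: match[3], uri };
  }
  // Other record types (e.g. lists or feed generators) are not hydrated here.
  return { kind: "other", uri };
}
```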

Posts Table

Hydrated post data for labeled content.

  • uri: AT-URI (primary key)
  • did: Author DID
  • text: Post content
  • facets: Rich text annotations (JSON)
  • embeds: Embedded content (JSON)
  • langs: Language codes (JSON)
  • tags: Hashtags (JSON)
  • created_at: Post creation timestamp
  • is_reply: Reply flag
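The JSON columns above imply that nested record fields are serialized before insertion. A minimal sketch, assuming the hydrated value is an `app.bsky.feed.post` record (fields `text`, `facets`, `embed`, `langs`, `tags`, `createdAt`, `reply`) and that absent fields are stored as NULL; the `PostRecord` subset, `PostRow` type, and `toPostRow` function are hypothetical.

```typescript
// Minimal subset of an app.bsky.feed.post record (assumed shape).
interface PostRecord {
  text: string;
  createdAt: string;
  facets?: unknown[];
  embed?: unknown;
  langs?: string[];
  tags?: string[];
  reply?: { root: unknown; parent: unknown };
}

// Hypothetical row shape matching the documented posts columns.
interface PostRow {
  uri: string;
  did: string;
  text: string;
  facets: string | null;
  embeds: string | null;
  langs: string | null;
  tags: string | null;
  created_at: string;
  is_reply: boolean;
}

// Serialize optional nested fields to JSON text; keep absent fields as null.
function toJson(value: unknown): string | null {
  return value === undefined ? null : JSON.stringify(value);
}

function toPostRow(uri: string, did: string, record: PostRecord): PostRow {
  return {
    uri,
    did,
    text: record.text,
    facets: toJson(record.facets),
    embeds: toJson(record.embed),
    langs: toJson(record.langs),
    tags: toJson(record.tags),
    created_at: record.createdAt,
    is_reply: record.reply !== undefined, // replies carry a `reply` reference
  };
}
```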

Profiles Table

Hydrated user profile data.

  • did: User DID (primary key)
  • handle: User handle
  • display_name: Display name
  • description: Bio/description

Blobs Table (Phase 4)

Image and video blob metadata.

  • post_uri: Associated post URI
  • blob_cid: Blob content identifier
  • sha256: SHA-256 hash of the blob bytes
  • phash: Perceptual hash
  • storage_path: Local or S3 path (if downloaded)
  • mimetype: Content type
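The `sha256` column can be filled with Node's built-in `crypto` module, which Bun also provides. This is a minimal sketch, assuming the blob bytes are already in memory (how they are fetched is not covered) and that `storage_path` stays null unless `HYDRATE_BLOBS` is enabled; `toBlobRow` and its `persist` callback are hypothetical, and perceptual hashing is omitted because the README does not name an algorithm.

```typescript
import { createHash } from "node:crypto";

// Hypothetical row shape for the documented blobs columns (phash omitted).
interface BlobRow {
  post_uri: string;
  blob_cid: string;
  sha256: string;
  storage_path: string | null;
  mimetype: string;
}

function sha256Hex(bytes: Uint8Array): string {
  return createHash("sha256").update(bytes).digest("hex");
}

// Hashes the blob in memory; only calls `persist` when blob download is enabled.
function toBlobRow(
  postUri: string,
  blobCid: string,
  mimetype: string,
  bytes: Uint8Array,
  hydrateBlobs: boolean,
  persist: (cid: string, bytes: Uint8Array) => string,
): BlobRow {
  return {
    post_uri: postUri,
    blob_cid: blobCid,
    sha256: sha256Hex(bytes),
    storage_path: hydrateBlobs ? persist(blobCid, bytes) : null,
    mimetype,
  };
}
```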

Label Filtering

Filter labels by providing a comma-separated list in CAPTURE_LABELS:

CAPTURE_LABELS=spam,hate-speech,csam,scam

If not set, all labels are captured.
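The allow-list can be sketched as a predicate built once from `CAPTURE_LABELS`. This minimal sketch assumes entries are trimmed, empty entries are ignored, and matching on `val` is exact and case-sensitive; `makeLabelFilter` and the `LabelEvent` subset are hypothetical. In this sketch negation events pass the same check, so removals of captured labels are recorded too.

```typescript
// Hypothetical allow-list filter built from CAPTURE_LABELS.
interface LabelEvent {
  uri: string;
  val: string;
  neg?: boolean;
}

function makeLabelFilter(captureLabels: string | undefined): (label: LabelEvent) => boolean {
  const allowed = new Set(
    (captureLabels ?? "")
      .split(",")
      .map((value) => value.trim())
      .filter((value) => value.length > 0),
  );
  // Unset or empty CAPTURE_LABELS means "capture everything".
  if (allowed.size === 0) return () => true;
  return (label) => allowed.has(label.val);
}
```

Building the `Set` once keeps the per-event check constant-time, which matters on a busy labeler stream.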

Data Persistence

Cursor Persistence

The application saves its position in the firehose to data/cursor.txt. On restart, it resumes from this cursor, preventing duplicate processing.
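Cursor handling can be sketched with synchronous file I/O plus the `cursor` query parameter that `com.atproto.label.subscribeLabels` accepts. This is a minimal sketch: the helper names are hypothetical, storing a single integer sequence number in the file is an assumption, and a real subscriber would likely throttle writes rather than save after every event.

```typescript
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { dirname } from "node:path";

// Returns the saved sequence number, or undefined if none is stored yet.
function loadCursor(path: string): number | undefined {
  if (!existsSync(path)) return undefined;
  const value = Number.parseInt(readFileSync(path, "utf8").trim(), 10);
  return Number.isFinite(value) ? value : undefined;
}

// Writes to a temp file and renames it so a crash mid-write cannot truncate the cursor.
function saveCursor(path: string, seq: number): void {
  mkdirSync(dirname(path), { recursive: true });
  const tmp = `${path}.tmp`;
  writeFileSync(tmp, String(seq), "utf8");
  renameSync(tmp, path);
}

// Adds ?cursor=<seq> so the labeler can backfill from the saved position.
function subscribeUrl(wssUrl: string, cursor: number | undefined): string {
  const url = new URL(wssUrl);
  if (cursor !== undefined) url.searchParams.set("cursor", String(cursor));
  return url.toString();
}
```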

Database Persistence

The DuckDB database is stored in the data/ directory, which is mounted as a Docker volume. Your data persists across container restarts.

Monitoring

Logs are output in structured JSON format (production) or pretty-printed (development).

Key log events:

  • Firehose connected: Successfully connected to labeler
  • Firehose disconnected: Connection lost, will auto-reconnect
  • Received label: Label captured and stored
  • Post hydrated successfully: Post data fetched
  • Profile hydrated successfully: Profile data fetched

Development

Project Structure

skywatch-tail/
├── src/
│   ├── config/           # Environment validation
│   ├── database/         # Schema and repositories
│   ├── firehose/         # WebSocket subscriber
│   ├── hydration/        # Content hydration services
│   ├── logger/           # Pino logger setup
│   └── index.ts          # Main entry point
├── tests/
│   ├── integration/      # Database integration tests
│   └── unit/             # Unit tests
├── data/                 # Database and cursor storage
├── docker-compose.yml
├── Dockerfile
└── .env.example

Running Tests

# All tests
bun test

# Specific test file
bun test tests/unit/decoder.test.ts

# Watch mode
bun test --watch

Database Access

Open the DuckDB database directly with the DuckDB CLI:

duckdb data/skywatch.duckdb

Then, at the DuckDB prompt:

-- Count labels by value
SELECT val, COUNT(*) FROM labels GROUP BY val;

-- Most recent hydrated posts
SELECT uri, text FROM posts ORDER BY created_at DESC LIMIT 10;

Roadmap

  • Phase 1: Core infrastructure (Docker, config, database, logging)
  • Phase 2: Firehose connection and label capture
  • Phase 3: Content hydration (posts and profiles)
  • Phase 4: Blob processing (image/video hashing and storage)
  • Phase 5: Rate limiting and optimization
  • Phase 6: Comprehensive testing
  • Phase 7: Documentation

Safety Features

Blob Hydration

By default, HYDRATE_BLOBS is false. This prevents accidental download of potentially harmful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes for ML training.

Only enable blob download if:

  1. You understand the legal and safety implications
  2. You have proper content storage policies in place
  3. You're operating in a jurisdiction where possessing such content for moderation is legal

License

See LICENSE file for details.

Contributing

Contributions are welcome. Please ensure all tests pass before submitting pull requests.

Support

For issues and feature requests, please use this repository's issue tracker.