Skywatch Tail#
A high-performance label capture and content hydration service for Bluesky moderation systems. Subscribe to a labeler's firehose, capture label events, and automatically hydrate associated content for machine learning training.
Features#
- Real-time Label Capture: Subscribe to any Bluesky labeler's firehose via WebSocket
- Automatic Content Hydration: Fetch full post records and user profiles for labeled content
- Blob Processing: SHA-256 and perceptual hashing for images/videos with optional download
- Intelligent Filtering: Optionally filter labels by type to capture only what you need
- Rate Limiting: Respects Bluesky API limits (3000 req/5min) with p-ratelimit
- Retry Logic: Automatic retry with exponential backoff for transient failures
- Cursor Persistence: Resume from where you left off after restarts
- Automatic Reconnection: Exponential backoff reconnection (1s-60s) for stability
- DuckDB Storage: Embedded analytics database optimized for ML pipelines
- Docker Ready: Containerized deployment with volume persistence
- Type-Safe: Full TypeScript implementation with Zod validation
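The reconnection schedule listed above (exponential backoff between 1s and 60s) can be sketched as follows. This is a minimal illustration, not the project's actual code: the function name `reconnectDelayMs` and the choice of doubling the delay on each attempt are assumptions.

```typescript
// Hypothetical helper: delay before the nth reconnection attempt (0-based).
// Assumes the delay doubles from 1s and is capped at 60s.
function reconnectDelayMs(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  const exponent = Math.min(Math.max(attempt, 0), 30); // keep 2 ** exponent small
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

// Delays for the first eight attempts.
const exampleDelays = Array.from({ length: 8 }, (_, i) => reconnectDelayMs(i));
console.log(exampleDelays);
// [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

A production client would typically also reset the attempt counter after a successful connection and may add random jitter.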
Architecture#
Firehose → Label Event → Filter → Store Label → Hydration Queue
↓ ↓
DuckDB ← [Post/Profile Fetch] → Blob Processing
↓
Hash + Store
Components#
- Firehose Subscriber: WebSocket client with DAG-CBOR decoding
- Label Filter: Configurable allow-list for label types
- Hydration Services: Automatic post and profile data fetching with rate limiting
- Blob Processor: SHA-256 and perceptual hash computation with optional download
- Hydration Queue: Async queue with deduplication
- Rate Limiter: p-ratelimit enforcing 3000 requests per 5 minutes
- Retry Logic: Exponential backoff for transient failures
- Repository Layer: Clean database abstraction for all entities
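As a rough illustration of the hydration queue's deduplication, the sketch below skips any key that is already queued or being processed. The class name `DedupQueue` and its methods are hypothetical; the real queue may differ in ordering, concurrency, and error handling.

```typescript
// Hypothetical sketch: an async queue that ignores keys already pending or in flight.
type Task = () => Promise<void>;

class DedupQueue {
  private pending = new Map<string, Task>();
  private inFlight = new Set<string>();
  private running = false;

  // Returns false when the key is already queued or being processed.
  enqueue(key: string, task: Task): boolean {
    if (this.pending.has(key) || this.inFlight.has(key)) return false;
    this.pending.set(key, task);
    return true;
  }

  // Runs tasks one at a time, in insertion order, until the queue is empty.
  async drain(): Promise<void> {
    if (this.running) return;
    this.running = true;
    try {
      while (this.pending.size > 0) {
        const [key, task] = this.pending.entries().next().value as [string, Task];
        this.pending.delete(key);
        this.inFlight.add(key);
        try {
          await task();
        } catch (err) {
          console.error(`hydration task failed for ${key}`, err);
        } finally {
          this.inFlight.delete(key);
        }
      }
    } finally {
      this.running = false;
    }
  }
}

// Example: the second enqueue for the same URI is ignored.
const exampleQueue = new DedupQueue();
const exampleUri = "at://did:plc:example/app.bsky.feed.post/abc";
console.log(exampleQueue.enqueue(exampleUri, async () => {})); // true
console.log(exampleQueue.enqueue(exampleUri, async () => {})); // false
```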
Quick Start#
Prerequisites#
- Docker and Docker Compose
- Bluesky account with app password
- Access to a labeler firehose endpoint
Installation#
- Clone the repository:
git clone <repository-url>
cd skywatch-tail
- Copy the example environment file:
cp .env.example .env
- Configure your .env file:
# Bluesky Credentials
BSKY_HANDLE=your-handle.bsky.social
BSKY_PASSWORD=your-app-password
# Labeler Configuration
WSS_URL=wss://your-labeler.com/xrpc/com.atproto.label.subscribeLabels
# Optional: Filter specific labels
CAPTURE_LABELS=spam,hate-speech
# Logging
LOG_LEVEL=info
- Start with Docker Compose:
docker-compose up -d
Local Development#
Install dependencies with Bun:
bun install
Run in development mode:
bun run dev
Run tests:
bun test
Configuration#
All configuration is managed via environment variables:
Required#
- BSKY_HANDLE: Your Bluesky handle
- BSKY_PASSWORD: App password (not your main password)
- WSS_URL: Labeler firehose WebSocket URL
Optional#
- PDS: Bluesky PDS host (default: bsky.social)
- CAPTURE_LABELS: Comma-separated list of label values to capture
- DB_PATH: Path to DuckDB database file (default: ./data/skywatch.duckdb)
- LOG_LEVEL: Logging level (default: info)
- HYDRATE_BLOBS: Enable blob download (default: false)
- BLOB_STORAGE_TYPE: Storage backend for blobs (local or s3)
- BLOB_STORAGE_PATH: Local path for blob storage (default: ./data/blobs)
S3 Configuration (Optional)#
- S3_BUCKET: S3 bucket name
- S3_REGION: AWS region
- AWS_ACCESS_KEY_ID: AWS access key
- AWS_SECRET_ACCESS_KEY: AWS secret key
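To show how the required variables and documented defaults fit together, here is a dependency-free sketch. The project itself validates configuration with Zod; this example uses plain TypeScript so it runs on its own. The `Config` shape, the `loadConfig` name, and treating only the string `true` as enabling `HYDRATE_BLOBS` are assumptions, and only a subset of the variables is covered.

```typescript
// Hypothetical config shape covering a subset of the documented variables.
interface Config {
  bskyHandle: string;
  bskyPassword: string;
  wssUrl: string;
  pds: string;
  dbPath: string;
  logLevel: string;
  hydrateBlobs: boolean;
  blobStoragePath: string;
}

type Env = Record<string, string | undefined>;

function loadConfig(env: Env): Config {
  // Required variables fail fast with a descriptive error.
  const required = (name: string): string => {
    const value = env[name]?.trim();
    if (!value) throw new Error(`Missing required environment variable: ${name}`);
    return value;
  };
  // Optional variables fall back to the defaults documented above.
  const optional = (name: string, fallback: string): string => {
    const value = env[name]?.trim();
    return value ? value : fallback;
  };
  return {
    bskyHandle: required("BSKY_HANDLE"),
    bskyPassword: required("BSKY_PASSWORD"),
    wssUrl: required("WSS_URL"),
    pds: optional("PDS", "bsky.social"),
    dbPath: optional("DB_PATH", "./data/skywatch.duckdb"),
    logLevel: optional("LOG_LEVEL", "info"),
    // Assumption: only the literal string "true" (any case) enables blob download.
    hydrateBlobs: optional("HYDRATE_BLOBS", "false").toLowerCase() === "true",
    blobStoragePath: optional("BLOB_STORAGE_PATH", "./data/blobs"),
  };
}

// In the service this would be called with the process environment,
// e.g. loadConfig(process.env).
```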
Database Schema#
Labels Table#
Stores raw label events from the firehose.
- id: Auto-incrementing primary key
- uri: AT-URI or DID of labeled content
- cid: Content identifier (optional)
- val: Label value (e.g., "spam")
- neg: Negation flag
- cts: Created timestamp
- exp: Expiration timestamp (optional)
- src: Labeler DID
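Because the `uri` column holds either a bare DID (an account-level label) or an AT-URI (a record-level label), hydration has to tell the two apart. The sketch below shows one way to do that; the `LabelTarget` type and `parseLabelTarget` function are hypothetical names, and the parser only accepts the simple `at://authority/collection/rkey` form.

```typescript
// Hypothetical classification of a label's subject.
type LabelTarget =
  | { kind: "account"; did: string }
  | { kind: "record"; authority: string; collection: string; rkey: string };

function parseLabelTarget(uri: string): LabelTarget | null {
  // A bare DID means the label applies to the account itself.
  if (uri.startsWith("did:")) return { kind: "account", did: uri };

  // Otherwise expect at://<authority>/<collection>/<rkey>.
  const match = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (!match) return null;
  return { kind: "record", authority: match[1], collection: match[2], rkey: match[3] };
}

const exampleTarget = parseLabelTarget("at://did:plc:example/app.bsky.feed.post/3kabc");
console.log(exampleTarget);
```

With this split, account targets would lead to a profile fetch, while record targets in the `app.bsky.feed.post` collection would lead to a post fetch.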
Posts Table#
Hydrated post data for labeled content.
- uri: AT-URI (primary key)
- did: Author DID
- text: Post content
- facets: Rich text annotations (JSON)
- embeds: Embedded content (JSON)
- langs: Language codes (JSON)
- tags: Hashtags (JSON)
- created_at: Post creation timestamp
- is_reply: Reply flag
Profiles Table#
Hydrated user profile data.
- did: User DID (primary key)
- handle: User handle
- display_name: Display name
- description: Bio/description
Blobs Table#
Image and video blob metadata.
- post_uri: Associated post URI
- blob_cid: Blob content identifier
- sha256: Cryptographic hash
- phash: Perceptual hash
- storage_path: Local or S3 path (if downloaded)
- mimetype: Content type
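The `sha256` column can be computed from the raw blob bytes with the standard `node:crypto` module, which Bun also provides. This is a minimal sketch; the `sha256Hex` helper is a hypothetical name, and perceptual hashing is not covered because it depends on an image library.

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper: hex-encoded SHA-256 digest of a blob's bytes.
function sha256Hex(bytes: Uint8Array): string {
  return createHash("sha256").update(bytes).digest("hex");
}

// Standard test vector: the ASCII string "abc".
const exampleDigest = sha256Hex(new TextEncoder().encode("abc"));
console.log(exampleDigest);
// ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
```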
Label Filtering#
Filter labels by providing a comma-separated list in CAPTURE_LABELS:
CAPTURE_LABELS=spam,hate-speech,scam
If not set, all labels are captured.
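The filtering rule above can be sketched as an allow-list check. The function names are hypothetical, and treating an empty or whitespace-only value the same as an unset variable is an assumption.

```typescript
// Hypothetical parser: returns null when no filter is configured (capture everything).
function parseCaptureLabels(raw: string | undefined): Set<string> | null {
  if (raw === undefined) return null;
  const values = raw
    .split(",")
    .map((value) => value.trim())
    .filter((value) => value.length > 0);
  // Assumption: an empty list behaves like an unset variable.
  return values.length > 0 ? new Set(values) : null;
}

// A label is captured when there is no filter or its value is on the allow-list.
function shouldCapture(val: string, allowed: Set<string> | null): boolean {
  return allowed === null || allowed.has(val);
}

const exampleAllowed = parseCaptureLabels("spam,hate-speech,scam");
console.log(shouldCapture("spam", exampleAllowed)); // true
console.log(shouldCapture("porn", exampleAllowed)); // false
console.log(shouldCapture("porn", parseCaptureLabels(undefined))); // true
```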
Data Persistence#
Cursor Persistence#
The application saves its position in the firehose to data/cursor.txt. On restart, it resumes from this cursor, preventing duplicate processing.
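A minimal sketch of cursor persistence is shown below. It assumes the cursor is an integer sequence number stored as decimal text; the actual file format, the function names, and the write-then-rename strategy are assumptions rather than the project's confirmed behavior.

```typescript
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";

// Returns the saved cursor, or null when the file is missing or unreadable as an integer.
function loadCursor(path: string): number | null {
  if (!existsSync(path)) return null;
  const value = Number.parseInt(readFileSync(path, "utf8").trim(), 10);
  return Number.isSafeInteger(value) && value >= 0 ? value : null;
}

// Writes to a temporary file first, then renames it over the target so a crash
// mid-write is unlikely to leave a truncated cursor behind.
function saveCursor(path: string, cursor: number): void {
  mkdirSync(dirname(path), { recursive: true });
  const tmpPath = `${path}.tmp`;
  writeFileSync(tmpPath, String(cursor), "utf8");
  renameSync(tmpPath, path);
}

// Demo against a throwaway directory rather than data/cursor.txt.
const demoCursorPath = join(tmpdir(), `skywatch-cursor-demo-${Date.now()}`, "cursor.txt");
saveCursor(demoCursorPath, 12345);
console.log(loadCursor(demoCursorPath)); // 12345
```

On startup, a `null` result would mean connecting without a cursor; otherwise the saved value would be passed as the `cursor` query parameter of the subscription URL.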
Database Persistence#
The DuckDB database is stored in the data/ directory, which is mounted as a Docker volume. Your data persists across container restarts.
Monitoring#
Logs are emitted as structured JSON in production and pretty-printed in development.
Key log events:
- Firehose connected: Successfully connected to labeler
- Firehose disconnected: Connection lost, will auto-reconnect
- Received label: Label captured and stored
- Post hydrated successfully: Post data fetched
- Profile hydrated successfully: Profile data fetched
- Blob processed: Blob hashed and optionally stored
Rate Limiting#
The service uses p-ratelimit to stay within Bluesky's API limits:
- Limit: 3000 requests per 5 minutes per IP address
- Concurrency: Up to 48 concurrent requests
- Backoff: Automatic delays when approaching limits
- Retry Logic: Exponential backoff for rate limit errors (1s-10s)
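To make the 3000-requests-per-5-minutes budget concrete, here is a dependency-free sliding-window limiter. It is not p-ratelimit's API and not the service's implementation; the class and method names are hypothetical, and it only illustrates how such a quota can be tracked.

```typescript
// Hypothetical sliding-window limiter: at most `limit` requests in any `windowMs` span.
class SlidingWindowLimiter {
  private readonly limit: number;
  private readonly windowMs: number;
  private timestamps: number[] = [];

  constructor(limit = 3000, windowMs = 5 * 60 * 1000) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns 0 and records the request if it may proceed now;
  // otherwise returns how many milliseconds the caller should wait.
  tryAcquire(now: number = Date.now()): number {
    // Drop timestamps that have aged out of the window.
    while (this.timestamps.length > 0 && now - this.timestamps[0] >= this.windowMs) {
      this.timestamps.shift();
    }
    if (this.timestamps.length < this.limit) {
      this.timestamps.push(now);
      return 0;
    }
    return this.timestamps[0] + this.windowMs - now;
  }
}

const exampleLimiter = new SlidingWindowLimiter();
console.log(exampleLimiter.tryAcquire()); // 0 (first request is always allowed)
```

A caller would sleep for the returned number of milliseconds before retrying, which combines naturally with the exponential backoff used for rate limit errors.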
Development#
Project Structure#
skywatch-tail/
├── src/
│ ├── blobs/ # Blob processing and storage
│ ├── config/ # Environment validation
│ ├── database/ # Schema and repositories
│ ├── firehose/ # WebSocket subscriber
│ ├── hydration/ # Content hydration services
│ ├── logger/ # Pino logger setup
│ ├── utils/ # Retry logic and helpers
│ └── index.ts # Main entry point
├── tests/
│ ├── integration/ # Database integration tests
│ └── unit/ # Unit tests
├── data/ # Database and cursor storage
├── docker-compose.yml
├── Dockerfile
└── .env.example
Running Tests#
# All tests
bun test
# Specific test file
bun test tests/unit/decoder.test.ts
# Watch mode
bun test --watch
Database Access#
Access the DuckDB database directly:
# Open the database with the DuckDB CLI
duckdb data/skywatch.duckdb
-- Inside the DuckDB shell: count labels by value
SELECT val, COUNT(*) FROM labels GROUP BY val;
-- Inside the DuckDB shell: list the most recent posts
SELECT uri, text FROM posts ORDER BY created_at DESC LIMIT 10;
Roadmap#
- Phase 1: Core infrastructure (Docker, config, database, logging)
- Phase 2: Firehose connection and label capture
- Phase 3: Content hydration (posts and profiles)
- Phase 4: Blob processing (image/video hashing and storage)
- Phase 5: Rate limiting and optimization
- Phase 6: Comprehensive testing
- Phase 7: Documentation
Safety Features#
Blob Hydration#
By default, HYDRATE_BLOBS is false. This prevents accidental download of potentially harmful and/or unlawful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes.
Only enable blob download if:
- You understand the legal and safety implications
- You have proper content storage policies in place
- You're operating in a jurisdiction where possessing such content for moderation is legal
License#
See LICENSE file for details.
Contributing#
Contributions welcome. Please ensure all tests pass before submitting PRs.
Support#
For issues and feature requests, please use the GitHub issue tracker.