Implementation Plan: Skywatch Tail#
Overview#
Build a TypeScript/Bun application that subscribes to a Bluesky labeler firehose, captures label events, hydrates associated content (posts/profiles), and stores everything in DuckDB for ML training.
Architecture#
Core Components#
- Firehose Subscriber (src/firehose/)
  - WebSocket connection to the labeler service
  - DAG-CBOR/CAR decoding using @atcute/cbor and @atcute/car
  - Label filtering based on the CAPTURE_LABELS config
  - Event queue management for the hydration pipeline
- Hydration Service (src/hydration/)
  - Post hydrator: fetches app.bsky.feed.post records via @atproto/api
  - Profile hydrator: fetches app.bsky.actor.profile records and resolves handles
  - Blob processor: extracts blob references from embeds
  - Rate limiting and retry logic for API calls
- Blob Handler (src/blobs/)
  - Conditional blob download based on the HYDRATE_BLOBS flag
  - SHA-256 cryptographic hashing (always computed)
  - Perceptual hashing (pHash) for image similarity (always computed)
  - Storage abstraction: local filesystem or S3
  - Support for image and video blobs
- Database Layer (src/database/)
  - DuckDB connection management
  - Schema initialization and migrations
  - Repository pattern: CRUD operations for each entity (labels, posts, etc.) are encapsulated in a dedicated repository module to isolate data access logic
  - Transaction handling for atomic writes
- Configuration (src/config/)
  - Environment variable parsing with validation (using Zod)
  - Produces a single, immutable, type-safe configuration object
  - Defaults and optional parameters
- Logging (src/logger/)
  - Pino logger setup with pretty printing in dev
  - Structured logging for debugging and monitoring
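As an illustration of the logging component above, a minimal Pino setup might look like the following sketch; the LOG_LEVEL/NODE_ENV switches are assumptions rather than settled decisions:

```ts
import pino from "pino";

// Pretty, colorized output during development; plain structured JSON otherwise.
export const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  transport:
    process.env.NODE_ENV !== "production"
      ? { target: "pino-pretty", options: { colorize: true } }
      : undefined,
});

// Structured fields keep events queryable, e.g.:
// logger.info({ uri, label: "spam" }, "label stored");
```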
Architectural Patterns#
- Repository/Data Access Layer (DAL): Database interactions will be abstracted into repository modules. Business logic will call these repositories instead of directly querying the database, improving separation of concerns.
- Dependency Injection (DI): Services will receive their dependencies (like other services or database connections) via their constructor. This decouples components and makes unit testing significantly easier by allowing for mock dependencies.
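To make these two patterns concrete, here is a minimal sketch: a LabelsRepository interface and a LabelIngestService that receives it through its constructor. The names and the row shape are illustrative, not final.

```ts
// labels.repository.ts -- data access only, no business logic
export interface LabelRow {
  uri: string; // labeled subject (at:// URI or DID)
  val: string; // label value, e.g. "spam"
  cts: string; // label creation timestamp
}

export interface LabelsRepository {
  insert(label: LabelRow): Promise<void>;
  findByUri(uri: string): Promise<LabelRow[]>;
}

// ingest.service.ts -- business logic; dependencies injected via the constructor
export class LabelIngestService {
  constructor(
    private readonly labels: LabelsRepository,
    private readonly captureLabels: Set<string>,
  ) {}

  async handle(label: LabelRow): Promise<void> {
    if (!this.captureLabels.has(label.val)) return; // filtered out
    await this.labels.insert(label);
  }
}

// In tests, an in-memory fake satisfies the same interface:
// new LabelIngestService(fakeRepo, new Set(["spam"]))
```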
Data Flow#
Firehose → Label Event → Filter Check → Hydration Queue
↓
[Post OR Profile Hydration]
↓
Extract Blob References
↓
[Compute Hashes / Optional Download]
↓
Store in DuckDB
Implementation Phases#
Testing will be conducted throughout each phase, not just at the end. Unit and integration tests should be written as components are built.
Commit changes after each phase passes testing to allow for easy rollback and debugging.
Phase 1: Foundation (Core Infrastructure)#
Goal: Set up project structure, dependencies, and basic configuration.
- Initialize TypeScript/Bun project
- Set up Docker and docker-compose.yml
- Implement type-safe configuration (src/config) using Zod.
- Initialize Pino logger (src/logger).
- Set up DuckDB connection and schema (src/database).
- Test: Write integration tests for database connection and schema validation.
Deliverables:
- Working Docker setup with volume mounts.
- Type-safe configuration loading and validation.
- Database schema initialized and tested.
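A rough sketch of the Zod-based configuration from this phase; variable names beyond CAPTURE_LABELS and HYDRATE_BLOBS are assumptions:

```ts
import { z } from "zod";

// All environment parsing lives in one schema; invalid config fails fast at startup.
const ConfigSchema = z.object({
  LABELER_WS_URL: z.string().url(),                          // assumed name for the labeler endpoint
  CAPTURE_LABELS: z.string().transform((s) => s.split(",")), // comma-separated label values to capture
  HYDRATE_BLOBS: z
    .string()
    .default("false")
    .transform((s) => s === "true"),                         // safety: blobs are not downloaded unless explicitly enabled
  DUCKDB_PATH: z.string().default("data/skywatch.duckdb"),
  LOG_LEVEL: z.enum(["debug", "info", "warn", "error"]).default("info"),
});

export type Config = z.infer<typeof ConfigSchema>;

// A single, frozen, type-safe config object shared across the app.
export const config: Readonly<Config> = Object.freeze(ConfigSchema.parse(process.env));
```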
Phase 2: Firehose Connection#
Goal: Connect to labeler firehose and parse label events.
- Implement WebSocket client for com.atproto.label.subscribeLabels.
- Implement DAG-CBOR decoding of label events.
- Implement label filtering logic.
- Store raw labels in the database using the Label Repository.
- Implement connection recovery and error handling.
- Test: Write unit tests for the decoder and filter. Write integration tests for firehose connection and data insertion.
Deliverables:
- Labels flowing from firehose into the database.
- Filter working correctly.
- Stable reconnection logic.
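A bare-bones sketch of this phase's subscriber, assuming the labeler exposes the standard com.atproto.label.subscribeLabels endpoint over WebSocket; DAG-CBOR decoding of each frame is left to the decoder module, and the host name is a placeholder:

```ts
// Minimal subscriber sketch using the global WebSocket available in Bun.
const ENDPOINT = "/xrpc/com.atproto.label.subscribeLabels";

export function subscribe(
  host: string,                      // e.g. "wss://labeler.example.com" (placeholder)
  cursor: number | undefined,
  onFrame: (frame: ArrayBuffer) => void,
): WebSocket {
  const url = new URL(ENDPOINT, host);
  if (cursor !== undefined) url.searchParams.set("cursor", String(cursor));

  const ws = new WebSocket(url.toString());
  ws.binaryType = "arraybuffer";

  // Each frame carries DAG-CBOR data (header + body) to be decoded downstream.
  ws.onmessage = (ev) => onFrame(ev.data as ArrayBuffer);
  ws.onclose = () => {
    // Reconnect after a delay; a real implementation would use exponential backoff,
    // reload the persisted cursor, and log the event via Pino.
    setTimeout(() => subscribe(host, cursor, onFrame), 5_000);
  };
  return ws;
}
```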
Phase 3: Content Hydration#
Goal: Fetch and store post and profile data.
- Implement post and profile hydration services (src/hydration).
- Use the Post and Profile Repositories to store data.
- Link hydrated content to labels via URI/DID.
- Test: Write unit tests for the data extraction and transformation logic.
Deliverables:
- Posts and profiles are automatically hydrated when labels are received.
- Data is correctly stored and linked in the database.
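A sketch of this phase's hydrators using @atproto/api's XRPC client; the public AppView host and the flattened row shapes are assumptions for illustration:

```ts
import { AtpAgent } from "@atproto/api";

// Unauthenticated reads against the public AppView (assumed host for illustration).
const agent = new AtpAgent({ service: "https://public.api.bsky.app" });

export async function hydratePosts(uris: string[]) {
  // getPosts accepts up to 25 URIs per call, so hydrate in chunks.
  const chunk = uris.slice(0, 25);
  const res = await agent.app.bsky.feed.getPosts({ uris: chunk });
  return res.data.posts.map((post) => ({
    uri: post.uri,
    cid: post.cid,
    authorDid: post.author.did,
    record: post.record, // the app.bsky.feed.post record itself
    indexedAt: post.indexedAt,
  }));
}

export async function hydrateProfile(did: string) {
  const res = await agent.app.bsky.actor.getProfile({ actor: did });
  return {
    did: res.data.did,
    handle: res.data.handle, // resolved handle for the DID
    displayName: res.data.displayName,
  };
}
```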
Phase 4: Blob Processing#
Goal: Handle image/video blobs with hashing and optional download.
- Implement blob processor to extract blob references.
- Implement hashing utilities (SHA-256, pHash).
- Implement conditional blob download and storage (local and S3).
- Use the Blob Repository to store metadata.
- Test: Write unit tests for hashing logic and integration tests for storage mechanisms.
Deliverables:
- Both hash types are captured for all blobs.
- Optional blob download works for both local and S3 storage.
- Blob metadata is linked to posts.
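As a sketch of this phase's hashing utilities: SHA-256 uses the built-in crypto module, and the perceptual hash below is a simplified 8x8 average-hash built on sharp. A production pHash (DCT-based) would likely come from a dedicated library, so treat this as a stand-in.

```ts
import { createHash } from "node:crypto";
import sharp from "sharp";

// Content hash: stable fingerprint for deduplication and integrity checks.
export function sha256(bytes: Uint8Array): string {
  return createHash("sha256").update(bytes).digest("hex");
}

// Simplified perceptual hash (average hash): resize to 8x8 grayscale,
// then set a bit for every pixel brighter than the mean.
export async function averageHash(imageBytes: Uint8Array): Promise<string> {
  const pixels = await sharp(imageBytes)
    .grayscale()
    .resize(8, 8, { fit: "fill" })
    .raw()
    .toBuffer();

  const mean = pixels.reduce((sum, p) => sum + p, 0) / pixels.length;
  let bits = 0n;
  for (const p of pixels) bits = (bits << 1n) | (p > mean ? 1n : 0n);
  return bits.toString(16).padStart(16, "0"); // 64 bits as 16 hex chars
}
```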
Phase 5: Rate Limiting & Optimization#
Goal: Ensure API compliance and performance.
- Implement a rate-limiting utility using a token bucket or similar algorithm.
- Integrate rate limiting into the hydration service.
- Implement a robust hydration queue.
- Add retry logic with exponential backoff for API calls.
- Test: Write unit tests for the rate limiter and retry logic.
Deliverables:
- No API rate limit violations under normal operation.
- Efficient resource usage and observable performance metrics.
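The retry behaviour from this phase could look roughly like the helper below; the attempt count and delays are placeholders:

```ts
// Generic retry helper with exponential backoff and jitter.
export async function withRetry<T>(
  fn: () => Promise<T>,
  { retries = 5, baseDelayMs = 500 } = {},
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt === retries) break;
      // 500 ms, 1 s, 2 s, ... plus up to 100 ms of jitter to avoid thundering herds.
      const delay = baseDelayMs * 2 ** attempt + Math.random() * 100;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Usage: await withRetry(() => hydratePosts(uris));
```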
Phase 6: Final Testing & Validation#
Goal: Ensure end-to-end reliability and expand test coverage.
- Implement end-to-end tests using a mock firehose.
- Conduct validation testing with a real labeler firehose.
- Review and improve test coverage, aiming for >80% on critical paths.
- Validate schema integrity and data relationships.
- Test and finalize Docker deployment.
Deliverables:
- High test coverage for critical paths.
- Docker deployment verified.
- End-to-end validation complete.
Phase 7: Documentation & Portability#
Goal: Make it easy for others to use.
- Write a comprehensive README with setup and deployment instructions.
- Create a fully commented .env.example file.
- Document the database schema.
- Provide a troubleshooting guide.
Deliverables:
- A new user can clone, configure, and run the application with minimal effort.
Technical Decisions#
Why DuckDB?#
- Embedded database (no separate server)
- Excellent analytics performance
- Easy to backup (single file)
- Great for ML pipelines
- JSON column support for complex fields
Why Bun?#
- Fast TypeScript runtime
- Native support for TypeScript, JSX, and Web APIs (like WebSocket).
- All-in-one toolchain (runtime, bundler, test runner).
Blob Storage Strategy#
- Always compute hashes: Even if not downloading, we need SHA-256 and pHash for data fingerprinting.
- Conditional download: A critical safety feature, especially when dealing with CSAM or other sensitive content labels.
- Storage abstraction: Allows for easy extension to other storage backends in the future.
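One possible shape for that storage abstraction, with local and S3 backends behind a shared interface (class and method names are illustrative):

```ts
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

export interface BlobStorage {
  put(key: string, bytes: Uint8Array): Promise<void>;
}

// Local filesystem backend using Bun's file API.
export class LocalBlobStorage implements BlobStorage {
  constructor(private readonly dir: string) {}
  async put(key: string, bytes: Uint8Array): Promise<void> {
    await Bun.write(`${this.dir}/${key}`, bytes);
  }
}

// S3 backend using the optional @aws-sdk/client-s3 dependency.
export class S3BlobStorage implements BlobStorage {
  constructor(
    private readonly client: S3Client,
    private readonly bucket: string,
  ) {}
  async put(key: string, bytes: Uint8Array): Promise<void> {
    await this.client.send(
      new PutObjectCommand({ Bucket: this.bucket, Key: key, Body: bytes }),
    );
  }
}
```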
Rate Limiting Approach#
- Track API calls per endpoint.
- Implement a token bucket algorithm to manage request rates.
- Queue hydration requests and process them according to rate limits.
- Process the queue in FIFO order by label timestamp.
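A minimal token-bucket sketch matching the approach above; in practice the capacity and refill rate would come from configuration rather than the hard-coded values shown here:

```ts
// Minimal token-bucket rate limiter (one instance per endpoint).
export class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(
    private readonly capacity: number,        // max burst size
    private readonly refillPerSecond: number, // sustained request rate
  ) {
    this.tokens = capacity;
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillPerSecond);
    this.lastRefill = now;
  }

  // Resolves when a token is available; callers await this before each API call.
  async take(): Promise<void> {
    for (;;) {
      this.refill();
      if (this.tokens >= 1) {
        this.tokens -= 1;
        return;
      }
      await new Promise((resolve) => setTimeout(resolve, 50));
    }
  }
}
```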
Risk Mitigation#
Firehose Disconnection#
- Implement automatic reconnection with exponential backoff.
- Persist the last processed cursor to a local file (cursor.txt) to allow seamless resumption.
- Log all disconnection and reconnection events for monitoring.
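A small sketch of the cursor persistence using Bun's file helpers, assuming the data/cursor.txt path from the project structure below:

```ts
const CURSOR_PATH = "data/cursor.txt";

// Load the last processed cursor, if any, so the firehose can resume where it left off.
export async function loadCursor(): Promise<number | undefined> {
  const file = Bun.file(CURSOR_PATH);
  if (!(await file.exists())) return undefined;
  const text = (await file.text()).trim();
  return text ? Number(text) : undefined;
}

// Persist after each processed event (or in batches, to reduce disk writes).
export async function saveCursor(cursor: number): Promise<void> {
  await Bun.write(CURSOR_PATH, String(cursor));
}
```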
API Rate Limits#
- Implement conservative rate limiting to stay well under official limits.
- Graceful degradation: The hydration queue will grow if limits are hit, but the application will not crash.
- Monitor queue depth and API response headers.
Blob Safety#
- HYDRATE_BLOBS=false by default to prevent accidental download of sensitive material.
- Provide clear documentation about the risks of enabling blob hydration.
- Hashes are computed from metadata or partial reads where possible, without downloading the full blob unless required.
Data Integrity#
- Use atomic database transactions for related inserts (e.g., a label and its hydrated post).
- Enforce data integrity with foreign key constraints in the database schema.
- Perform validation before writing data to the database.
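A sketch of an atomic label-plus-post insert using the duckdb Node bindings' callback API wrapped in a small promise helper; the table and column names are placeholders, not the final schema:

```ts
import duckdb from "duckdb";

const db = new duckdb.Database("data/skywatch.duckdb");
const conn = db.connect();

// Promisified wrapper around the callback-style run().
function run(sql: string, ...params: unknown[]): Promise<void> {
  return new Promise((resolve, reject) =>
    conn.run(sql, ...params, (err: Error | null) => (err ? reject(err) : resolve())),
  );
}

// Either both rows are written, or neither is.
export async function insertLabelWithPost(
  label: { uri: string; val: string },
  postJson: string,
) {
  await run("BEGIN TRANSACTION");
  try {
    await run("INSERT INTO labels (uri, val) VALUES (?, ?)", label.uri, label.val);
    await run("INSERT INTO posts (uri, record) VALUES (?, ?)", label.uri, postJson);
    await run("COMMIT");
  } catch (err) {
    await run("ROLLBACK"); // nothing is persisted if either insert fails
    throw err;
  }
}
```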
Project Structure#
skywatch-capture/
├── src/
│ ├── index.ts # Main entry point
│ ├── config/
│ │ └── index.ts # Configuration object (with Zod validation)
│ ├── logger/
│ │ └── index.ts # Pino logger setup
│ ├── database/
│ │ ├── connection.ts # DuckDB connection
│ │ ├── schema.ts # Table definitions
│ │ ├── labels.repository.ts # Label repository
│ │ ├── posts.repository.ts # Post repository
│ │ ├── profiles.repository.ts# Profile repository
│ │ └── blobs.repository.ts # Blob repository
│ ├── firehose/
│ │ ├── subscriber.ts # WebSocket client
│ │ ├── decoder.ts # CBOR decoding
│ │ └── filter.ts # Label filtering
│ ├── hydration/
│ │ ├── posts.service.ts # Post hydration service
│ │ ├── profiles.service.ts # Profile hydration service
│ │ └── queue.ts # Hydration queue
│ ├── blobs/
│ │ ├── processor.ts # Blob extraction
│ │ ├── hasher.ts # SHA-256 & pHash
│ │ ├── downloader.ts # Blob download
│ │ └── storage/
│ │ ├── local.ts # Local filesystem storage
│ │ └── s3.ts # S3 storage
│ └── utils/
│ ├── rate-limit.ts # Rate limiting
│ └── retry.ts # Retry logic
├── tests/
│ ├── unit/
│ └── integration/
├── data/ # Volume mount point
│ ├── skywatch.duckdb # Database file
│ ├── cursor.txt # Last processed firehose cursor
│ └── blobs/ # Local blob storage
├── .env.example
├── docker-compose.yml
├── Dockerfile
├── package.json
├── tsconfig.json
└── README.md
Dependencies#
Core#
- bun - Runtime & test runner
- typescript - Language
- @atproto/api - Bluesky API client
- @atcute/cbor - CBOR decoding
- @atcute/car - CAR file handling
- duckdb - Database
- pino & pino-pretty - Logging
- dotenv - Environment variables
- zod - Type-safe validation
Blob Processing#
- crypto (built-in) - SHA-256 hashing
- sharp or jimp - Image processing for pHash
- @aws-sdk/client-s3 - S3 storage (optional)
Testing#
- @types/* - TypeScript definitions
Success Criteria#
- ✅ Successfully connects to labeler firehose
- ✅ Correctly parses and stores label events
- ✅ Hydrates posts and profiles automatically
- ✅ Computes both SHA-256 and pHash for all blobs
- ✅ Conditionally downloads blobs based on config
- ✅ Stores all data in DuckDB with proper relationships
- ✅ Respects API rate limits
- ✅ Handles disconnections gracefully
- ✅ Runs in Docker with persistent storage
- ✅ Configurable via environment variables
- ✅ Documented and portable
Timeline Estimate#
- Phase 1: 1-2 days
- Phase 2: 2-3 days
- Phase 3: 2-3 days
- Phase 4: 3-4 days
- Phase 5: 2-3 days
- Phase 6: 2-3 days
- Phase 7: 1-2 days
Total: ~13-20 days for complete implementation
Next Steps#
- Review and approve this plan
- Set up development environment
- Begin Phase 1 implementation
- Iterate based on testing and feedback