commits
Add unit tests for the firehose subscriber
Covers label event handling and reconnection logic.
fix: blob deduplication logic using proper CID lookup
feat: hydrate profile avatars and banners
Initialize LocalBlobStorage or S3BlobStorage when HYDRATE_BLOBS=true.
Actually store profile blob files to disk and save storage_path to database.
Previously only computed hashes without saving actual image data.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Comprehensive documentation of key learnings including:
- CID deserialization quirks in @atproto/api
- PDS endpoint resolution via PLC directory
- Proper blob fetching with AT Protocol API
- Database schema design for profile vs post blobs
- Change tracking and re-hydration logic
- Common errors and solutions
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Add captured_at timestamp to track when blobs were observed
- Change PK to (did, blob_type, captured_at) to allow history
- Add findLatestByDidAndType method to check current state
- Only insert new row if CID has changed from latest
- Enables tracking when users change avatars/banners
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
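The change-tracking rule in the commit above (insert a history row only when the CID differs from the latest one) could look roughly like the sketch below. The row shape, the in-memory store, and the `recordIfChanged` name are assumptions for illustration. Only `findLatestByDidAndType` is named in the commit, and the real repository is backed by the database.

```typescript
// Hypothetical row shape; the real profile_blobs schema may differ.
interface ProfileBlobRow {
  did: string;
  blobType: "avatar" | "banner";
  cid: string;
  capturedAt: number; // epoch milliseconds
}

// In-memory stand-in for the repository, for illustration only.
class InMemoryProfileBlobs {
  private rows: ProfileBlobRow[] = [];

  // Return the most recently captured row for a (did, blobType) pair.
  findLatestByDidAndType(
    did: string,
    blobType: "avatar" | "banner",
  ): ProfileBlobRow | undefined {
    let latest: ProfileBlobRow | undefined;
    for (const row of this.rows) {
      if (row.did !== did || row.blobType !== blobType) continue;
      if (latest === undefined || row.capturedAt > latest.capturedAt) {
        latest = row;
      }
    }
    return latest;
  }

  // Insert a new history row only when the CID differs from the latest one.
  // Returns true if a row was inserted.
  recordIfChanged(row: ProfileBlobRow): boolean {
    const latest = this.findLatestByDidAndType(row.did, row.blobType);
    if (latest !== undefined && latest.cid === row.cid) return false;
    this.rows.push(row);
    return true;
  }

  count(): number {
    return this.rows.length;
  }
}
```

Because `captured_at` is part of the key, an unchanged avatar produces no new row, while a changed one adds a row and leaves the older history in place.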
- New profile_blobs table with FK to profiles(did)
- Restored FK constraint on blobs table to posts(uri)
- Created ProfileBlobsRepository for profile blob operations
- Primary key on (did, blob_type) ensures one avatar and one banner per profile
- Proper relational model for later analysis and queries
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Profile blobs use profile:// URIs that don't exist in the posts table. Drop the foreign key constraint to allow storing blobs from both posts and profiles.
Add a migration that recreates the blobs table without the constraint for existing databases.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Defaults to https://plc.wtf for fast DID resolution. Can be overridden to use plc.directory or a custom PLC instance.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Query plc.wtf to get each user's actual PDS endpoint instead of assuming bsky.social. Fixes RepoNotFound errors when fetching blobs from users on different PDS instances.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
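As a rough illustration of the PDS lookup described above, the sketch below picks the PDS endpoint out of a DID document. The interface and function names are hypothetical. It assumes the PLC instance serves the DID document at `<base>/<did>`, as plc.directory does, and it relies on the AT Protocol convention that the PDS service entry has an id ending in `#atproto_pds` and type `AtprotoPersonalDataServer`.

```typescript
// Minimal subset of a DID document needed for PDS resolution.
interface DidService {
  id: string;
  type: string;
  serviceEndpoint: string;
}

interface DidDocument {
  id: string;
  service?: DidService[];
}

// Return the first service entry that looks like an atproto PDS, if any.
function getPdsEndpoint(doc: DidDocument): string | undefined {
  const services = doc.service ?? [];
  for (const s of services) {
    if (s.id.endsWith("#atproto_pds") && s.type === "AtprotoPersonalDataServer") {
      return s.serviceEndpoint;
    }
  }
  return undefined;
}

// Hypothetical helper: build the URL of the DID document on a PLC instance.
// Assumes the instance serves documents at <base>/<did>.
function plcUrl(plcBase: string, did: string): string {
  return plcBase.replace(/\/+$/, "") + "/" + did;
}
```

The returned endpoint, not bsky.social, would then be used as the service URL when fetching that user's blobs.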
The @atproto/api library deserializes blob refs into CID objects, not plain objects with $link. Call toString() to get the CID string.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
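A defensive extractor covering the ref shapes mentioned in this commit and the one below it might look like the following. This is a sketch with a hypothetical `extractCid` name, not the project's code. It uses duck typing instead of importing the CID class, so any object with a custom `toString()` is treated as a CID.

```typescript
// Accepts the shapes a blob `ref` might take: a bare CID string, a plain
// object carrying `$link`, or a CID-like object with a custom toString().
function extractCid(ref: unknown): string | undefined {
  if (typeof ref === "string") return ref.length > 0 ? ref : undefined;
  if (ref === null || typeof ref !== "object") return undefined;

  const link = (ref as { $link?: unknown }).$link;
  if (typeof link === "string") return link;

  // Plain objects inherit Object.prototype.toString, which would yield
  // "[object Object]"; only trust a toString() that has been overridden.
  const toStr = (ref as { toString?: unknown }).toString;
  if (typeof toStr === "function" && toStr !== Object.prototype.toString) {
    return String(ref);
  }
  return undefined;
}
```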
Add a fallback that checks whether ref is itself a CID string, and log the ref structure to diagnose why CIDs aren't being extracted despite being present in the response.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Remove blob processor dependency for profile images
- Fetch blobs directly using AtpAgent.com.atproto.sync.getBlob()
- Compute hashes and store in blobs table directly
- Simpler, cleaner implementation without profile:// URI hack
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Replace CDN URL fetching with com.atproto.sync.getBlob XRPC endpoint.
Works for all blob types (posts, avatars, banners) via PDS.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Add URI parsing for profile:// scheme (profile://did/avatar or profile://did/banner)
- Use correct CDN paths for avatars (img/avatar) and banners (img/banner)
- Keep existing feed_thumbnail/feed_fullsize paths for post blobs
- Add type tracking to blob processing logs
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
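For reference, parsing the profile:// scheme described above could be sketched as below. The function and type names are hypothetical. The only details taken from the commit are the `profile://did/avatar` and `profile://did/banner` forms and the `img/avatar` and `img/banner` path segments.

```typescript
type ProfileBlobType = "avatar" | "banner";

interface ProfileBlobUri {
  did: string;
  blobType: ProfileBlobType;
}

// Parse profile://<did>/<avatar|banner>; returns undefined for anything else.
function parseProfileUri(uri: string): ProfileBlobUri | undefined {
  const prefix = "profile://";
  if (!uri.startsWith(prefix)) return undefined;

  const rest = uri.slice(prefix.length);
  const slash = rest.lastIndexOf("/");
  if (slash <= 0) return undefined;

  const did = rest.slice(0, slash);
  const kind = rest.slice(slash + 1);
  if (!did.startsWith("did:")) return undefined;

  const blobType: ProfileBlobType | undefined =
    kind === "avatar" ? "avatar" : kind === "banner" ? "banner" : undefined;
  return blobType === undefined ? undefined : { did, blobType };
}

// CDN path segment per blob type, as named in the commit message.
function cdnPathFor(blobType: ProfileBlobType): string {
  return blobType === "avatar" ? "img/avatar" : "img/banner";
}
```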
- Re-hydrate profiles with NULL avatar_cid/banner_cid fields
- Use empty string as sentinel for "no avatar/banner" vs NULL for "not checked"
- Add debug logging to inspect profile record structure
- Skip blob processing for empty CID strings
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
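The NULL versus empty-string convention above amounts to a three-state value. A minimal sketch follows, with hypothetical helper names and the assumption that a profile is re-hydrated when either field is still NULL:

```typescript
type CidState = "not_checked" | "none" | "present";

// NULL means the profile was never checked; "" means it was checked and
// has no avatar/banner; any other string is a real CID.
function classifyCid(value: string | null): CidState {
  if (value === null) return "not_checked";
  return value === "" ? "none" : "present";
}

// Assumption: re-hydrate when either field has not been checked yet.
function needsRehydration(avatarCid: string | null, bannerCid: string | null): boolean {
  return avatarCid === null || bannerCid === null;
}

// Blob processing is skipped for both NULL and the empty-string sentinel.
function shouldProcessBlob(cid: string | null): boolean {
  return classifyCid(cid) === "present";
}
```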
Handles the case where the profiles table exists but lacks the avatar_cid/banner_cid columns by checking the schema and running ALTER TABLE if needed.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Extends profile hydration to capture and process avatar and banner images
from app.bsky.actor.profile records. Profile blobs are processed using the
existing blob infrastructure for deduplication, hashing, and storage.
Changes:
- Add avatar_cid and banner_cid columns to profiles table
- Update Profile interface and repository to support new fields
- Extract avatar/banner blob references from profile records
- Process profile blobs using BlobProcessor with special URI format
(profile://{did}/avatar and profile://{did}/banner)
- Add test coverage for profiles with avatar/banner CIDs
- Reuse existing blob deduplication and storage logic
Profile blobs are treated the same as post blobs, respecting the
HYDRATE_BLOBS configuration and benefiting from cross-entity deduplication.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
The blob processor was using `findBySha256(ref.cid)` which incorrectly
passed a CID to a method expecting a SHA256 hash. This caused the
deduplication check to never find matches, resulting in blobs being
unnecessarily reprocessed regardless of the hydrate_blobs setting.
Changes:
- Add `findByCid` method to BlobsRepository for proper CID lookup
- Update processor to use CID-based deduplication
- When blob exists, reuse hashes but still insert post+blob relationship
- Add index on blob_cid column for query performance
- Add test coverage for new findByCid method
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
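To make the fix above concrete, the sketch below shows CID-based deduplication against an in-memory stand-in for the repository. The class, row shape, and `processBlob` signature are assumptions for illustration. Only the `findByCid` and `findBySha256` method names come from the commit.

```typescript
interface BlobRow {
  postUri: string;
  blobCid: string;
  sha256: string;
}

// In-memory stand-in for BlobsRepository, for illustration only.
class InMemoryBlobsRepository {
  private rows: BlobRow[] = [];

  findByCid(cid: string): BlobRow | undefined {
    return this.rows.find((r) => r.blobCid === cid);
  }

  findBySha256(hash: string): BlobRow | undefined {
    return this.rows.find((r) => r.sha256 === hash);
  }

  insert(row: BlobRow): void {
    this.rows.push(row);
  }

  size(): number {
    return this.rows.length;
  }
}

// Look the blob up by CID. If it is already known, reuse its hash but still
// record the post+blob relationship; otherwise compute the hash first.
function processBlob(
  repo: InMemoryBlobsRepository,
  postUri: string,
  cid: string,
  computeSha256: () => string,
): "reused" | "hashed" {
  const existing = repo.findByCid(cid);
  if (existing !== undefined) {
    repo.insert({ postUri, blobCid: cid, sha256: existing.sha256 });
    return "reused";
  }
  repo.insert({ postUri, blobCid: cid, sha256: computeSha256() });
  return "hashed";
}
```

Passing a CID to `findBySha256` never matches because the two values live in different columns, which is why the original check always fell through to reprocessing.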
- Add isRecordNotFoundError helper to detect permanent failures
- Handle RecordNotFound errors in profile and post hydration
- Log warning and skip missing records instead of throwing
- Prevents retries for deleted or non-existent content
Deleted posts and profiles are common in moderation contexts.
This change treats them as expected conditions rather than errors.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
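A helper along these lines could detect the permanent failure described above. This is a sketch under assumptions: it treats an error as "record not found" when it has an `error` field equal to `RecordNotFound` or a message containing that string, and the project's actual check may differ.

```typescript
// Returns true for errors that indicate the record is permanently missing,
// so the caller can log a warning and skip instead of retrying.
function isRecordNotFoundError(err: unknown): boolean {
  if (err === null || typeof err !== "object") return false;
  const e = err as { error?: unknown; message?: unknown };
  if (e.error === "RecordNotFound") return true;
  return typeof e.message === "string" && e.message.indexOf("RecordNotFound") !== -1;
}
```

A hydration service would call this in its catch block and rethrow anything that does not match, so transient failures still reach the retry logic.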
- Updated architecture diagram to include blob processing flow
- Added blob processor and rate limiter to components list
- Removed "Phase 4" markers from configuration options
- Updated project structure to show blobs/ and utils/ directories
- Marked phases 4-5 as complete in roadmap
- Added Rate Limiting section explaining p-ratelimit behavior
- Added blob processing log event to monitoring section
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Delete src/agent.ts (superseded by hydration services)
- Delete src/firehose.ts (superseded by firehose/ directory)
These were reference implementations used during development.
All tests still passing.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Replace custom RateLimiter with p-ratelimit library
- Configure: 3000 requests per 5min, 48 concurrency, 60s max delay
- Wrap API calls with p-ratelimit + retry logic
- All tests passing
Uses existing dependency instead of reinventing the wheel.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Add rate limiter to post and profile hydration (3k per 5min)
- Configure rate limiter: 3000 tokens, refill of 10 tokens/s (600/min)
- Wrap API calls with retry logic (3 attempts, exponential backoff)
- Handle rate limit, network, and server errors gracefully
- All tests passing
Rate limits match Bluesky API: 3000 requests per 5 minutes per IP.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
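The retry policy described above (retry on rate limit, network, and server errors, with exponentially growing delays) can be sketched as two pure functions. The error shape, the list of network error codes, and the function names are assumptions made for this example.

```typescript
// Hypothetical error shape: an HTTP status and/or a Node-style error code.
interface MaybeHttpError {
  status?: number;
  code?: string;
}

// Common transient network error codes; the list is illustrative.
const NETWORK_CODES = ["ECONNRESET", "ETIMEDOUT", "ECONNREFUSED", "EAI_AGAIN"];

function isRetryable(err: MaybeHttpError): boolean {
  if (err.status === 429) return true; // rate limited
  if (err.status !== undefined && err.status >= 500) return true; // server error
  return err.code !== undefined && NETWORK_CODES.indexOf(err.code) !== -1;
}

// Delay before retry number `retry` (1-based): base, 2*base, 4*base, ...
function retryDelayMs(retry: number, baseMs: number): number {
  return baseMs * Math.pow(2, retry - 1);
}
```

With 3 attempts there are at most two retries, so a 500 ms base would wait 500 ms and then 1000 ms before giving up.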
- Add token bucket rate limiter with configurable refill
- Implement multi-endpoint rate limiting
- Create retry utility with exponential backoff
- Add retryable error detection (rate limit, network, server errors)
- All tests passing
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
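For readers unfamiliar with the technique, a token bucket with a configurable refill rate might look like the sketch below. It is not the project's implementation. The class shape and the injected clock are assumptions chosen to keep the example deterministic.

```typescript
// Token bucket: holds up to `capacity` tokens and regains them continuously
// at `refillPerSecond`. The clock is injected so behaviour can be tested.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    private readonly now: () => number, // returns epoch milliseconds
  ) {
    this.tokens = capacity;
    this.lastRefill = now();
  }

  // Add tokens for the time elapsed since the last refill, capped at capacity.
  private refill(): void {
    const t = this.now();
    const elapsedSec = (t - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSecond);
    this.lastRefill = t;
  }

  // Take `count` tokens if available; returns false without blocking otherwise.
  tryRemove(count = 1): boolean {
    this.refill();
    if (this.tokens < count) return false;
    this.tokens -= count;
    return true;
  }
}
```

A budget of 3000 requests per 5 minutes corresponds to a capacity of 3000 and a refill rate of 10 tokens per second.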
- Add blob hashing utilities (SHA-256 and perceptual hash)
- Implement blob processor to extract references from embeds
- Create local and S3 storage backends
- Integrate blob processing into post hydration pipeline
- Update config schema for blob hydration settings
- Fix decoder tests for plural extractLabelsFromMessage
Blobs are always hashed for fingerprinting, but the files themselves are only stored
if HYDRATE_BLOBS=true, for safety (CSAM/sensitive content).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Switch from decode to decodeFirst for proper frame handling
- Decode header and body separately to handle message framing
- Update label extraction to return array (messages can have multiple labels)
- Process seq for cursor tracking before label processing
- Improve error logging with actual error messages
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Comprehensive documentation covering:
- Feature overview and architecture
- Quick start with Docker
- Complete configuration reference
- Database schema documentation
- Development guidelines
- Safety features and warnings
Includes examples for common tasks and monitoring.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implements automatic post and profile hydration:
- Post hydration service via @atproto/api
* Fetches full post records (text, facets, embeds, langs, tags)
* Detects reply status
* Skips already-hydrated content
- Profile hydration service
* Fetches profile records (displayName, description)
* Resolves handles via getProfile
* Links DID to handle
- Asynchronous hydration queue
* Deduplicates tasks
* Prevents concurrent processing of same resource
* FIFO ordering
- Automatic URI parsing and routing
* at:// URIs with 3 parts → post hydration
* did: URIs → profile hydration
Integration:
- Labels trigger hydration on receipt
- Queue processes tasks asynchronously
- Both services authenticate with Bluesky on startup
Tests: 4 new queue tests (27 total, all passing)
All Phase 3 deliverables complete.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
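The routing and deduplication rules listed in this commit can be illustrated with the sketch below. All names are hypothetical, and the real queue processes tasks asynchronously. Here `next()` and `done()` are called by hand to show the FIFO order and the in-flight guard.

```typescript
type HydrationTask =
  | { kind: "post"; uri: string }
  | { kind: "profile"; did: string };

// Route a label subject: did: values go to profile hydration, at:// URIs
// with three path parts (authority/collection/rkey) go to post hydration.
function routeSubject(subject: string): HydrationTask | undefined {
  if (subject.startsWith("did:")) return { kind: "profile", did: subject };
  if (subject.startsWith("at://")) {
    const parts = subject.slice("at://".length).split("/");
    if (parts.length === 3 && parts.every((p) => p.length > 0)) {
      return { kind: "post", uri: subject };
    }
  }
  return undefined;
}

function taskKey(task: HydrationTask): string {
  return task.kind === "post" ? "post:" + task.uri : "profile:" + task.did;
}

// FIFO queue that rejects a task while the same resource is queued or in flight.
class HydrationQueue {
  private pending: HydrationTask[] = [];
  private active = new Set<string>(); // keys that are queued or being processed

  enqueue(task: HydrationTask): boolean {
    const key = taskKey(task);
    if (this.active.has(key)) return false;
    this.active.add(key);
    this.pending.push(task);
    return true;
  }

  // Take the oldest task; its key stays active until done() is called.
  next(): HydrationTask | undefined {
    return this.pending.shift();
  }

  done(task: HydrationTask): void {
    this.active.delete(taskKey(task));
  }
}
```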
Implements complete firehose subscription pipeline:
- WebSocket subscriber with exponential backoff reconnection
- DAG-CBOR decoding for label events
- Label filtering with configurable allow-list
- Cursor persistence for resume capability (cursor.txt)
- Integration with database repositories
- Graceful error handling and logging
Key features:
- Automatic reconnection with 1s-30s backoff
- Filter labels via CAPTURE_LABELS env var
- Stores cursor to resume from last processed event
- Validates all label events before processing
- Complete unit test coverage (14 new tests)
All Phase 2 deliverables complete and tested (23 tests passing).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
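Two of the behaviours above, the 1s-30s reconnection backoff and the CAPTURE_LABELS allow-list, are small enough to sketch as pure functions. The doubling factor, the comma-separated format, and the "unset means capture everything" rule are assumptions, as are the function names.

```typescript
// Delay before reconnect attempt `attempt` (0-based): doubles from minMs
// and is capped at maxMs. Assumes a doubling factor.
function reconnectDelayMs(attempt: number, minMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, minMs * Math.pow(2, attempt));
}

// Parse a comma-separated allow-list. Assumption: an unset or empty value
// returns null, meaning "capture every label".
function parseCaptureLabels(raw: string | undefined): Set<string> | null {
  if (raw === undefined) return null;
  const names = raw.split(",").map((s) => s.trim()).filter((s) => s.length > 0);
  return names.length === 0 ? null : new Set(names);
}

function shouldCapture(labelValue: string, allow: Set<string> | null): boolean {
  return allow === null || allow.has(labelValue);
}
```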
Implements the foundation for Skywatch Tail:
- Docker setup with docker-compose for containerized deployment
- Type-safe configuration using Zod with .env validation
- Pino logger with pretty printing for development
- DuckDB database with complete schema:
* labels table with auto-incrementing sequence
* posts table for hydrated post data
* profiles table for user account data
* blobs table for image/video metadata
- Repository pattern for all database entities
- Integration tests for all repositories (100% passing)
All Phase 1 deliverables complete and tested.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>