Skywatch-Phash: Complete Architecture & Implementation Guide#
Overview#
skywatch-phash is a real-time perceptual hash-based image moderation service for Bluesky/ATProto. It:
- Subscribes to the Bluesky firehose (Jetstream) in real-time
- Extracts images from posts and computes perceptual hashes (phash)
- Compares against known harmful image hashes using Hamming distance
- Automatically applies moderation labels and reports to Bluesky Ozone moderators
Tech Stack: Bun/TypeScript, Redis, Jetstream, ATProto API, Sharp (image processing)
Architecture Overview#
┌──────────────────────────────────────────────────────┐
│                   Main Application                   │
│  (main.ts - Jetstream event handler + orchestrator)  │
└──────────────────────────┬───────────────────────────┘
                           │
                 ┌─────────┼─────────┐
                 │         │         │
            ┌────▼──┐   ┌──▼────┐ ┌──▼─────────┐
            │ Redis │   │ Jet-  │ │ Moderation │
            │ Queue │   │stream │ │ Agent      │
            └────┬──┘   └──┬────┘ └──┬─────────┘
                 │         │         │
                 └─────────┼─────────┘
                           │
                 ┌─────────▼──────────────┐
                 │      Queue Worker      │
                 │ (concurrency control)  │
                 └─────────┬──────────────┘
                           │
                 ┌─────────▼──────────────────────────┐
                 │          Image Processor           │
                 │      (phash + matching logic)      │
                 ├────────────────────────────────────┤
                 │ ▪ Phash Computation (Sharp)        │
                 │ ▪ Hamming Distance Matching        │
                 │ ▪ Redis Phash Cache                │
                 │ ▪ Moderation Claims Deduplication  │
                 └────────────────────────────────────┘
1. Entry Point & Main Event Loop#
File: src/main.ts#
Responsibilities:
- Jetstream connection management
- Cursor persistence (recovery from crashes)
- Queue/worker initialization
- Moderation action dispatch
Key Flow:
// 1. Load cursor from disk (enables resuming from last processed event)
cursor = fs.existsSync("cursor.txt")
  ? Number(fs.readFileSync("cursor.txt", "utf8"))
  : Date.now() * 1000 // fallback: "now" as a microsecond timestamp
// 2. Connect to Jetstream
jetstream = new Jetstream({
endpoint: "wss://jetstream1.us-east.fire.hose.cam/subscribe",
cursor,
wantedCollections: ["app.bsky.feed.post"]
})
// 3. Register handler for new posts
jetstream.onCreate("app.bsky.feed.post", async (event) => {
// Extract image blobs (CIDs) from post
const blobs = extractBlobsFromEvent(event)
// Enqueue for processing
const job = { postUri, postCid, postDid, blobs, timestamp, attempts: 0 }
await queue.enqueue(job)
})
// 4. Start processing
await worker.start()
jetstream.start()
// 5. On match found, execute moderation actions
worker.onMatchFound(async (postUri, postCid, postDid, match) => {
if (match.matchedCheck.toLabel)
await createPostLabel(...)
if (match.matchedCheck.reportPost)
await createPostReport(...)
if (match.matchedCheck.labelAcct)
await createAccountLabel(...)
if (match.matchedCheck.reportAcct)
await createAccountReport(...)
})
Cursor Handling (CRITICAL):
- Cursor is microsecond timestamp (µs, not ms) from Jetstream
- Saved every 10 seconds (configurable via CURSOR_UPDATE_INTERVAL)
- On restart, reads from cursor.txt to resume from the last processed event
- This prevents duplicate processing of the same posts (see the sketch below)
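A minimal sketch of this persistence loop, assuming the file name and interval from above (atomic writes and error handling omitted):

// Sketch only: cursor persistence as described above.
import fs from "node:fs"

const CURSOR_FILE = "cursor.txt"
const CURSOR_UPDATE_INTERVAL = 10_000 // ms: save every 10 seconds

// Resume from disk if present; otherwise start "now" in microseconds
let cursor = fs.existsSync(CURSOR_FILE)
  ? Number(fs.readFileSync(CURSOR_FILE, "utf8"))
  : Date.now() * 1000 // Jetstream cursors are µs, not ms

setInterval(() => {
  fs.writeFileSync(CURSOR_FILE, String(cursor)) // overwritten every interval
}, CURSOR_UPDATE_INTERVAL)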
Graceful Shutdown:
- Saves cursor before exit
- Waits for active jobs to complete
- Closes all connections
2. Jetstream Connection & Event Processing#
Event Structure#
Posts with embedded images trigger:
CommitCreateEvent<"app.bsky.feed.post"> {
did: string // Author DID
commit: {
cid: string // Post CID
collection: "app.bsky.feed.post"
rkey: string // Post key
record: {
embed: {
images?: [{image: {ref: {$link: CID}, mimeType}}]
media?: {images: [...]}
}
}
}
}
Image Extraction Logic:
- Handles both embed.images and embed.media.images (both are valid)
- Filters out SVG and non-image types
- Extracts the CID from the nested ref.$link field
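A hedged sketch of that extraction, assuming the event shape shown above (the function name matches the one used in section 1; the exact module may differ):

// Sketch only: blob extraction per the rules above.
interface BlobReference { cid: string; mimeType?: string }

function extractBlobsFromEvent(event: any): BlobReference[] {
  const embed = event.commit?.record?.embed
  // Both embed.images and embed.media.images are valid locations
  const images = embed?.images ?? embed?.media?.images ?? []
  return images
    .map((img: any) => ({
      cid: img.image?.ref?.$link,    // CID lives in the nested ref.$link field
      mimeType: img.image?.mimeType, // may be missing (optional field)
    }))
    .filter((b: BlobReference) =>
      b.cid &&
      // skip SVG; keep image/* (missing mimeType is kept here - an assumption)
      (!b.mimeType || (b.mimeType.startsWith("image/") && b.mimeType !== "image/svg+xml"))
    )
}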
3. Perceptual Hash Algorithm (CRITICAL - Exact Implementation)#
File: src/hasher/phash.ts#
async function computePerceptualHash(buffer: Buffer): Promise<string> {
// Step 1: Decode image via Sharp
const image = sharp(buffer)
const metadata = await image.metadata()
// Step 2: Resize to 8x8 grayscale
// CRITICAL: fit: "fill" stretches to exactly 8x8, ignoring aspect ratio (no crop, no padding)
const resized = await image
.resize(8, 8, { fit: "fill" })
.grayscale()
.raw()
.toBuffer()
// Step 3: Extract pixel values (Uint8Array, 0-255 range)
const pixels = new Uint8Array(resized)
// Step 4: Compute average brightness
const avg = pixels.reduce((sum, val) => sum + val, 0) / pixels.length
// Step 5: Create 64-bit hash (8x8 = 64 pixels)
let hash = ""
for (let i = 0; i < pixels.length; i++) {
hash += pixels[i] > avg ? "1" : "0"
}
// Step 6: Convert binary string to hex (16 character string)
return BigInt(`0b${hash}`).toString(16).padStart(16, "0")
}
Output Format:
- Exactly 16 hex characters (64 bits for 8x8 image)
- Lowercase
- Zero-padded
Example: "a1b2c3d4e5f6a7b8"
Important Details:
- Sharp resize behavior:
  - fit: "fill" stretches the image to exactly 8x8, ignoring aspect ratio (no cropping, no padding)
  - This is intentional - it handles images of any aspect ratio uniformly
- Grayscale conversion:
  - Sharp converts RGB to a single channel (standard luminosity formula)
  - Range: 0-255
- No normalization:
  - Raw pixel values are compared against the mean (not normalized)
  - This is correct for perceptual hashing
- Deterministic:
  - The same image buffer always produces the same hash
  - Different images (even slight variations) can produce different hashes
  - BUT: they can still match within the Hamming distance threshold
4. Hamming Distance Matching#
File: src/matcher/hamming.ts#
function hammingDistance(hash1: string, hash2: string): number {
// Convert hex to BigInt
const a = BigInt(`0x${hash1}`)
const b = BigInt(`0x${hash2}`)
// XOR finds differing bits
const xor = a ^ b
// Count set bits (Brian Kernighan's algorithm)
let count = 0
let n = xor
while (n > 0n) {
count++
n &= n - 1n // Remove rightmost set bit
}
return count
}
function findMatch(phash: string, checks: BlobCheck[]): BlobCheck | null {
for (const check of checks) {
const threshold = check.hammingThreshold ?? 5
for (const checkPhash of check.phashes) {
const distance = hammingDistance(phash, checkPhash)
if (distance <= threshold) {
return check // First match wins
}
}
}
return null
}
Key Semantics:
- Hamming distance = number of differing bits out of 64
- Range: 0-64
- Threshold comparison: distance <= threshold (inclusive)
Threshold Guidelines (from README):
- 0 = Exact match only (very strict)
- 1-2 = Nearly identical (minor compression artifacts)
- 3-4 = Very similar (slight edits, crops)
- 5-8 = Similar (moderate edits)
- 10+ = Loosely similar (too permissive)
Default threshold: 5 (configurable per check or globally)
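For example, two hashes that differ in a single bit still match at the default threshold:

// Example: a 1-bit difference matches at the default threshold of 5.
const known = "a1b2c3d4e5f6a7b8"
const candidate = "a1b2c3d4e5f6a7b0" // last nibble: 0x8 vs 0x0 → one differing bit
console.log(hammingDistance(known, candidate)) // 1 → within threshold, match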
5. Queue & Worker Implementation#
File: src/queue/redis-queue.ts#
Redis Keys:
phash:queue:pending → List (FIFO for new jobs)
phash:queue:processing → List (jobs being worked on)
phash:queue:failed → List (jobs that exceeded retries)
State Transitions:
Pending → Pop (LPOP) → Processing (RPUSH) → Complete (LREM)
└─→ Retry (RPUSH to Pending)
└─→ Failed (RPUSH to Failed)
Job Structure:
interface ImageJob {
postUri: string // "at://did/app.bsky.feed.post/rkey"
postCid: string // CID of the post
postDid: string // Author DID
blobs: BlobReference[] // [{cid, mimeType?}]
timestamp: number // When job was created
attempts: number // Retry counter
}
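A hedged sketch of the state transitions using the keys above (LPOP/RPUSH/LREM are real Redis commands via an ioredis-style client; the surrounding method names are assumptions, and it assumes stable JSON serialization of jobs):

// Sketch only: queue transitions per the diagram above.
const PENDING = "phash:queue:pending"
const PROCESSING = "phash:queue:processing"
const FAILED = "phash:queue:failed"

async function dequeue(redis: any): Promise<ImageJob | null> {
  const raw = await redis.lpop(PENDING)   // Pending → pop
  if (!raw) return null
  await redis.rpush(PROCESSING, raw)      // → Processing
  return JSON.parse(raw)
}

async function complete(redis: any, job: ImageJob) {
  await redis.lrem(PROCESSING, 1, JSON.stringify(job)) // Complete (LREM)
}

async function retryOrFail(redis: any, job: ImageJob, maxAttempts: number) {
  await redis.lrem(PROCESSING, 1, JSON.stringify(job)) // remove before mutating
  job.attempts += 1
  const dest = job.attempts < maxAttempts ? PENDING : FAILED
  await redis.rpush(dest, JSON.stringify(job)) // Retry → Pending, else → Failed
}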
File: src/queue/worker.ts#
Worker Configuration:
interface WorkerConfig {
concurrency: number // Number of parallel workers (default: 10)
retryAttempts: number // Max retries (default: 3)
retryDelay: number // MS between retries (default: 1000)
pollInterval?: number // MS to wait if queue empty (default: 1000)
}
Processing Loop (per worker thread):
1. Dequeue job
2. For each blob in job:
a. Check cache
b. If miss, fetch blob from PDS
c. Compute phash
d. Store in cache
e. Match against checks
f. If match, emit onMatchFound event
3. Mark job complete
4. If error and retries < max:
- Re-enqueue with incremented attempts
5. If error and retries exhausted:
- Move to failed queue
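A condensed, hedged sketch of that loop (queue methods and processJob are assumed helpers wrapping steps 2a-2f; ImageJob is the interface from above):

// Sketch only: per-worker processing loop.
async function workerLoop(deps: {
  queue: {
    dequeue(): Promise<ImageJob | null>
    complete(j: ImageJob): Promise<void>
    retryOrFail(j: ImageJob): Promise<void>
  }
  processJob(job: ImageJob): Promise<void> // steps 2a-2f: cache → fetch → phash → match → emit
  pollInterval: number
}) {
  while (true) {
    const job = await deps.queue.dequeue()
    if (!job) {
      await new Promise((r) => setTimeout(r, deps.pollInterval)) // queue empty: back off
      continue
    }
    try {
      await deps.processJob(job)
      await deps.queue.complete(job)    // remove from processing list
    } catch {
      await deps.queue.retryOrFail(job) // re-enqueue or move to failed queue
    }
  }
}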
Error Handling:
- Corrupt/invalid images → logged at debug level (expected)
- Network errors → retry (with backoff)
- Moderation action failures → logged but don't block further processing
Concurrency Strategy:
- N workers running in parallel (default 10)
- Each worker independently polls queue
- Active job count tracked
- Graceful shutdown waits for all jobs
6. Image Processing Pipeline#
File: src/processor/image-processor.ts#
High-Level Flow:
1. DID/Check filtering (ignoreDID)
2. Cache lookup → if hit, use cached phash
3. Cache miss:
a. Resolve PDS endpoint from DID document (PLC directory)
b. Check if repo has been taken down
c. Fetch blob from PDS
d. Compute phash
e. Cache result
4. Match against checks
5. Return MatchResult (if match found)
Key Implementation Details:
DID → PDS Resolution:
async function resolvePds(did: string): Promise<string | null> {
  // GET https://plc.directory/{did} → DID document
  const res = await fetch(`https://plc.directory/${did}`)
  if (!res.ok) return null
  const doc: any = await res.json()
  // Find the service with id "atproto_pds" and type "AtprotoPersonalDataServer"
  const svc = doc.service?.find(
    (s: any) => s.id.endsWith("atproto_pds") && s.type === "AtprotoPersonalDataServer"
  )
  // Caller caches the result per DID to avoid repeated lookups
  return svc?.serviceEndpoint ?? null
}
Blob Fetch:
async function fetchBlob(pdsEndpoint: string, did: string, cid: string): Promise<Buffer> {
  // GET {pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}
  const url = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`
  const res = await fetch(url, { redirect: "follow" }) // PDSes may redirect blob requests
  if (!res.ok) throw new Error(`Blob fetch failed: ${res.status}`)
  return Buffer.from(await res.arrayBuffer()) // raw blob data as Buffer
}
Taken-Down Repo Check:
async function checkRepoTakendown(did: string, pdsEndpoint: string): Promise<boolean> {
  // GET {pdsEndpoint}/xrpc/com.atproto.repo.describeRepo?repo={did}
  const res = await fetch(`${pdsEndpoint}/xrpc/com.atproto.repo.describeRepo?repo=${did}`)
  if (!res.ok) {
    const body: any = await res.json().catch(() => null)
    if (body?.error === "RepoTakendown") return true // skip processing
  }
  return false // caller caches the result per DID
}
MatchResult Creation:
createMatchResult(phash: string, check: BlobCheck): MatchResult | null {
for (const checkPhash of check.phashes) {
const distance = hammingDistance(phash, checkPhash)
const threshold = check.hammingThreshold ?? defaultThreshold
if (distance <= threshold) {
return {
phash,
matchedCheck: check,
matchedPhash: checkPhash,
hammingDistance: distance
}
}
}
return null
}
Caches (in-memory, in ImageProcessor):
pdsCache: Map<did, pdsEndpoint|null> // PDS resolution
repoTakendownCache: Map<did, boolean> // Taken-down status
These are NOT persisted - they're per-process memory optimization.
7. Redis Cache Structure#
File: src/cache/phash-cache.ts#
Purpose: Cache computed phashes to avoid re-fetching viral images
Key Pattern: phash:cache:{cid} → hex string (phash)
TTL: Configurable (default: 86400 seconds = 24 hours)
Operations:
- get(cid) → returns cached phash or null
- set(cid, phash) → stores with TTL
- delete(cid) → immediate removal
- clear() → removes all cache entries
- getStats() → returns cache size
Cache Hit Rate:
- Metrics track cache hits vs misses
- Typical: 20-40% hit rate (viral images)
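A minimal sketch of the cache wrapper, assuming an ioredis-style client (SET with "EX" is a real Redis option; the class shape is an assumption):

// Sketch only: Redis phash cache with TTL.
class PhashCache {
  constructor(private redis: any, private ttlSeconds = 86_400) {}
  private key(cid: string) { return `phash:cache:${cid}` }

  async get(cid: string): Promise<string | null> {
    return this.redis.get(this.key(cid)) // cached phash or null
  }
  async set(cid: string, phash: string): Promise<void> {
    await this.redis.set(this.key(cid), phash, "EX", this.ttlSeconds) // store with TTL
  }
  async delete(cid: string): Promise<void> {
    await this.redis.del(this.key(cid)) // immediate removal
  }
}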
8. Moderation Claims (Deduplication)#
File: src/cache/moderation-claims.ts#
Purpose: Prevent duplicate moderation actions (labels/reports) within 7 days
Key Patterns:
claim:post:label:{uri}:{label} → "1" (TTL: 7 days)
claim:account:label:{did}:{label} → "1" (TTL: 7 days)
claim:account:comment:{did}:{uri} → "1" (TTL: 7 days)
TTL: 604800 seconds = 7 days (hardcoded, not configurable)
Operations (all use SET ... NX):
- tryClaimPostLabel(uri, label) → tries to acquire claim, returns boolean
- tryClaimAccountLabel(did, label) → tries to acquire claim, returns boolean
- tryClaimAccountComment(did, uri) → tries to acquire claim, returns boolean
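A sketch of the atomic claim using SET ... NX with the 7-day TTL (SET key value EX ttl NX is standard Redis; the function shape mirrors the operations above):

// Sketch only: atomic claim acquisition.
const CLAIM_TTL_SECONDS = 604_800 // 7 days, hardcoded

async function tryClaimPostLabel(redis: any, uri: string, label: string): Promise<boolean> {
  // SET key "1" EX 604800 NX → returns "OK" only if the key did not already exist
  const result = await redis.set(
    `claim:post:label:${uri}:${label}`, "1", "EX", CLAIM_TTL_SECONDS, "NX"
  )
  return result === "OK" // false → already claimed within the past 7 days
}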
Flow in Moderation Actions:
// Before creating post label
const claimed = await claims.tryClaimPostLabel(uri, label)
if (!claimed) {
// Already labeled in past 7 days
metrics?.increment("labelsCached")
return
}
// Then check if label already exists in Ozone (race condition safety)
const hasLabel = await checkRecordLabels(uri, label)
if (hasLabel) {
metrics?.increment("labelsCached")
return
}
// Otherwise, create label via Ozone API
Why both checks?
- Redis claim = "we acted" (fast, distributed)
- Ozone check = "label still exists" (authoritative)
- This handles race conditions if multiple services label simultaneously
9. Moderation Action Flows#
File: src/moderation/post.ts#
createPostLabel:
// 1. Try claim → if fail, exit (already labeled recently)
// 2. Check if label exists → if yes, exit
// 3. Emit event via Ozone API:
// - $type: "tools.ozone.moderation.defs#modEventLabel"
// - subject: {uri, cid} (strongRef)
// - createLabelVals: [label]
// - comment: "{timestamp}: {comment} at {uri} with phash \"{phash}\""
// 4. Rate-limited via limit() function
createPostReport:
// Same flow but:
// - $type: "tools.ozone.moderation.defs#modEventReport"
// - reportType: "com.atproto.moderation.defs#reasonOther"
// - No claim check needed (reports can repeat)
File: src/moderation/account.ts#
createAccountLabel:
// Similar to post label but:
// - subject: {$type: "com.atproto.admin.defs#repoRef", did}
// - Claims work per (did, label) pair
createAccountReport:
// Similar to post report but:
// - subject: {$type: "com.atproto.admin.defs#repoRef", did}
createAccountComment:
// Comments on account (metadata only, no label)
// Claims prevent duplicate comments for same (did, uri) pair
API Headers (All Moderation Calls):
{
encoding: "application/json",
headers: {
"atproto-proxy": `${modDid}#atproto_labeler`,
"atproto-accept-labelers": "did:plc:ar7c4by46qjdydhdevvrndac;redact"
}
}
These headers:
- Tell Ozone to proxy request through moderation account
- Request redaction of sensitive PII
- Are critical for proper audit trails
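A hedged sketch combining the label flow and headers above, assuming @atproto/api's Ozone client (exact emitEvent field names may differ; limit() is the rate limiter from src/limits.ts):

// Sketch only: post label emission via Ozone.
async function emitPostLabel(
  agent: any, uri: string, cid: string, label: string, comment: string, modDid: string
) {
  await limit(() =>
    agent.tools.ozone.moderation.emitEvent(
      {
        event: {
          $type: "tools.ozone.moderation.defs#modEventLabel",
          createLabelVals: [label],
          negateLabelVals: [],
          comment,
        },
        subject: { $type: "com.atproto.repo.strongRef", uri, cid }, // strongRef subject
        createdBy: agent.session?.did,
      },
      {
        encoding: "application/json",
        headers: {
          "atproto-proxy": `${modDid}#atproto_labeler`,
          "atproto-accept-labelers": "did:plc:ar7c4by46qjdydhdevvrndac;redact",
        },
      }
    )
  )
}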
10. Rate Limiting#
File: src/limits.ts#
Two-Level Strategy:
Level 1: Concurrency Control
- p-ratelimit library
- Limits to 24 concurrent requests
- Prevents overwhelming Ozone API with simultaneous requests
Level 2: Header-Based Rate Limit Tracking
- Ozone API returns headers: RateLimit-Limit, RateLimit-Remaining, RateLimit-Reset
- Service maintains state and waits if the budget is critically low
- Safety buffer: 5 requests reserved
- Only waits if remaining <= 5
Dynamic Rate Limit State:
interface RateLimitState {
limit: number // e.g., 280 requests per window
remaining: number // Current budget
reset: number // Unix timestamp when resets
}
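A hedged sketch of the header-based tracking, using the RateLimitState above (function names are assumptions; Headers is the standard fetch API type):

// Sketch only: header parsing + safety-buffer wait.
const SAFETY_BUFFER = 5
let state: RateLimitState | null = null

function updateFromHeaders(headers: Headers) {
  const limit = headers.get("RateLimit-Limit")
  const remaining = headers.get("RateLimit-Remaining")
  const reset = headers.get("RateLimit-Reset")
  if (limit && remaining && reset) {
    state = { limit: +limit, remaining: +remaining, reset: +reset }
  }
}

async function waitIfNeeded() {
  if (state && state.remaining <= SAFETY_BUFFER) {
    const waitMs = Math.max(state.reset * 1000 - Date.now(), 0)
    await new Promise((r) => setTimeout(r, waitMs)) // sleep until the window resets
  }
}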
How moderation actions use limits:
await limit(async () => {
// This will:
// 1. Check concurrency (max 24 concurrent)
// 2. Check remaining budget (wait if <5 remaining)
// 3. Execute request
// 4. Parse RateLimit headers and update state
return await agent.tools.ozone.moderation.emitEvent(...)
})
Prometheus Metrics:
- rate_limit_waits_total → how many times we waited
- rate_limit_wait_duration_seconds → how long waits took
- concurrent_requests → live count of concurrent calls
11. Agent & Authentication#
File: src/agent.ts#
Session Management:
// Session file: .session (chmod 600, NOT in git)
// Contains: {accessJwt, refreshJwt, did, handle, email, ...}
// Login flow:
1. Try load session from .session file
2. If loaded, try resume session (verify with getProfile call)
3. If resume fails or no saved session, perform fresh login
4. Save session after successful login
// Token refresh:
- JWT lifetime: 2 hours
- Refresh scheduled at 80% of lifetime (~96 minutes)
- On refresh failure, fallback to fresh login with retries
Retry Strategy:
- MAX_LOGIN_RETRIES = 3
- RETRY_DELAY_MS = 2000
- If all retries fail, process exits with error
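A sketch of the refresh schedule described above (refreshSession/freshLogin are assumed hooks; retry handling lives behind freshLogin):

// Sketch only: refresh at 80% of the ~2-hour JWT lifetime.
const JWT_LIFETIME_MS = 2 * 60 * 60 * 1000                  // tokens live ~2 hours
const REFRESH_DELAY_MS = Math.floor(JWT_LIFETIME_MS * 0.8)  // ~96 minutes

function scheduleRefresh(hooks: {
  refreshSession(): Promise<void>
  freshLogin(): Promise<void> // retried up to MAX_LOGIN_RETRIES by its implementation
}) {
  setTimeout(async () => {
    try {
      await hooks.refreshSession()
    } catch {
      await hooks.freshLogin() // fallback on refresh failure
    } finally {
      scheduleRefresh(hooks)   // re-arm for the next window
    }
  }, REFRESH_DELAY_MS)
}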
Undici Configuration:
const dispatcher = new Agent({
connect: { timeout: 20_000 },
keepAliveTimeout: 10_000,
keepAliveMaxTimeout: 20_000
})
setGlobalDispatcher(dispatcher)
This ensures robust HTTP handling for PDS/Ozone API calls.
12. Configuration Management#
File: src/config/index.ts#
All settings sourced from environment variables:
// REQUIRED
LABELER_DID // e.g., "did:plc:..."
LABELER_HANDLE // e.g., "labeler.bsky.social"
LABELER_PASSWORD // app password (NOT user password)
// Processing
JETSTREAM_URL // (default: wss://jetstream.atproto.tools/subscribe)
REDIS_URL // (default: redis://localhost:6379)
PROCESSING_CONCURRENCY // (default: 10)
RETRY_ATTEMPTS // (default: 3)
RETRY_DELAY_MS // (default: 1000)
CACHE_ENABLED // (default: true)
CACHE_TTL_SECONDS // (default: 86400)
// PDS/Ozone
PDS_ENDPOINT // (default: https://bsky.social)
OZONE_URL // (no default)
OZONE_PDS // (no default - used for agent.service)
MOD_DID // (no default - used in moderation headers)
// Phash
PHASH_HAMMING_THRESHOLD // (default: 3)
// Logging
LOG_LEVEL // (default: debug/info)
NODE_ENV // (defaults to production)
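A minimal sketch of the parsing pattern, assuming Bun/Node process.env semantics (the helper and config shape are assumptions, with values from the list above):

// Sketch only: env parsing with required vs defaulted settings.
const env = (key: string, fallback?: string): string => {
  const v = process.env[key] ?? fallback
  if (v === undefined) throw new Error(`Missing required env var: ${key}`)
  return v
}

export const config = {
  labelerDid: env("LABELER_DID"), // required, no default
  jetstreamUrl: env("JETSTREAM_URL", "wss://jetstream.atproto.tools/subscribe"),
  concurrency: Number(env("PROCESSING_CONCURRENCY", "10")),
  cacheTtlSeconds: Number(env("CACHE_TTL_SECONDS", "86400")),
}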
13. Metrics & Observability#
File: src/metrics/collector.ts#
Simple in-memory metrics:
class MetricsCollector {
counters: Map<string, number>
increment(metric: string, value?: number)
get(metric: string): number
getAll(): Record<string, number>
getWithRates(): Record<string, number|string>
}
Key Metrics Tracked:
postsProcessed // Jobs completed
blobsFetched // Blobs downloaded
blobsProcessed // Blobs hashed successfully
blobsCorrupted // Invalid images skipped
fetchErrors // Blob fetch failures
cacheHits // Phash cache hits
cacheMisses // Phash cache misses
matchesFound // Images matching known phashes
labelsApplied // Labels successfully created
labelsCached // Labels skipped (already claimed)
reportsCreated // Reports successfully created
Derived Rates (calculated on-demand):
postsPerSecond = postsProcessed / uptimeSeconds
blobsPerSecond = blobsProcessed / uptimeSeconds
cacheHitRate = cacheHits / (cacheHits + cacheMisses) * 100
matchRate = matchesFound / blobsProcessed * 100
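A minimal sketch of that derivation, with the counter names assumed from the list above:

// Sketch only: derived rates from raw counters.
function withRates(counters: Map<string, number>, startTime: number) {
  const uptimeSeconds = Math.max((Date.now() - startTime) / 1000, 1)
  const get = (k: string) => counters.get(k) ?? 0
  const hits = get("cacheHits")
  const misses = get("cacheMisses")
  return {
    ...Object.fromEntries(counters),
    postsPerSecond: +(get("postsProcessed") / uptimeSeconds).toFixed(2),
    blobsPerSecond: +(get("blobsProcessed") / uptimeSeconds).toFixed(2),
    cacheHitRate: hits + misses ? +((hits / (hits + misses)) * 100).toFixed(1) : 0,
    matchRate: get("blobsProcessed")
      ? +((get("matchesFound") / get("blobsProcessed")) * 100).toFixed(1)
      : 0,
  }
}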
Prometheus Integration:
- Rate limit metrics via prom-client
- Not exposed as metrics server (would need HTTP server)
Stats Logged:
- Every 60 seconds to logger
- Includes worker stats, cache stats, metrics with rates
14. Logging#
File: src/logger/index.ts#
Pino-based structured logging:
logger.debug(...) // Development details
logger.info(...) // Normal operation milestones
logger.warn(...) // Unexpected but recoverable
logger.error(...) // Errors (usually don't stop service)
Development Mode:
- Pretty-printed output
- Colors, timestamps, field names
- Ignores pid/hostname
Production Mode:
- JSON logs (newline-delimited)
- Parse with jq or log aggregators
- Structured for monitoring
Log Levels:
- A process field is added manually to track sub-components
- Examples: {process: "MAIN"}, {process: "MODERATION"}
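A sketch of the dev/prod split described above, assuming pino and pino-pretty (the option names are real pino/pino-pretty options):

// Sketch only: structured logging setup.
import pino from "pino"

const isDev = process.env.NODE_ENV !== "production"
export const logger = pino({
  level: process.env.LOG_LEVEL ?? (isDev ? "debug" : "info"),
  ...(isDev
    ? {
        transport: {
          target: "pino-pretty", // pretty-printed output in development
          options: { colorize: true, ignore: "pid,hostname" },
        },
      }
    : {}), // production: newline-delimited JSON
})

// Sub-component tracking via the manually added `process` field:
const log = logger.child({ process: "MAIN" })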
15. Rules Configuration#
File: rules/blobs.ts#
BlobCheck structure:
interface BlobCheck {
phashes: string[] // List of known bad phashes
label: string // Label to apply (e.g., "troll")
comment: string // Description of harm
reportAcct: boolean // Report the account?
labelAcct: boolean // Label the account?
reportPost: boolean // Report the post?
toLabel: boolean // Label the post?
hammingThreshold?: number // Per-check threshold (overrides default)
description?: string // Internal documentation
ignoreDID?: string[] // Exempt accounts
}
Multiple checks evaluated in order:
- First match wins (no combining of multiple checks)
- Each check can independently configure moderation actions
Example:
{
phashes: ["e0e0e0e0e0fcfefe", "9b9e00008f8fffff"],
label: "harassment-image",
comment: "Known harassment image",
toLabel: true, // Label post
reportPost: false, // Don't report post
labelAcct: true, // Label account
reportAcct: false, // Don't report account
hammingThreshold: 3, // Strict threshold
ignoreDID: ["did:plc:trusted-account"] // Don't check this account
}
16. Critical Gotchas & Nuances for Rust Rewrite#
1. Phash Algorithm Details#
- Output MUST be exactly 16 hex chars (64-bit)
- Lowercase
- Zero-padded
- Grayscale conversion via luminosity formula (not simple average)
- 8x8 resize with fit: "fill" (aspect ratio ignored - the image is stretched to 8x8)
2. Hamming Distance#
- Uses Brian Kernighan's bit-counting algorithm
- Range 0-64
- Threshold comparison is distance <= threshold (INCLUSIVE)
3. Cursor Persistence#
- Is microsecond timestamp (NOT millisecond)
- Must persist every 10 seconds
- Resume from cursor prevents duplicate processing
- Write to cursor.txt in the working directory
4. Queue State Machine#
- Never lose jobs mid-processing
- Pending → Processing → Complete is atomic per job
- On crash, jobs in Processing list are reprocessed on restart
- Failed queue accumulates jobs beyond retry limit
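A hedged sketch of the restart recovery implied above (assumes an ioredis-style client; a real implementation might inspect or re-order jobs):

// Sketch only: on startup, move jobs stranded in `processing` back to `pending`.
async function recoverStrandedJobs(redis: {
  lpop(key: string): Promise<string | null>
  rpush(key: string, value: string): Promise<number>
}) {
  let job: string | null
  while ((job = await redis.lpop("phash:queue:processing")) !== null) {
    await redis.rpush("phash:queue:pending", job) // will be reprocessed by workers
  }
}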
5. Blob Fetch Strategy#
- Requires PDS resolution from the PLC directory first
- Many PDSes redirect blob endpoints
- Must follow redirects
- Check repo.takedown status before fetching
- Cache both PDS endpoint and takedown status (per process)
6. Moderation Claims Dedup#
- 7-day TTL (hardcoded in moderation-claims.ts)
- Uses Redis SET ... NX (atomic)
- Must still check Ozone API (race condition safety)
- Account claims are per (did, label) pair
7. Rate Limiting#
- TWO separate mechanisms (concurrency + header-based)
- Only waits when remaining <= 5 (safety buffer)
- Ozone returns RateLimit headers
- Conservative defaults: 280 requests per 30-second window
- Must not block job processing (only moderation action calls)
8. Session Token Management#
- Tokens live ~2 hours
- Refresh at 80% of lifetime (~96 minutes)
- Session saved to .session file (must be chmod 600)
- Resume session before fresh login
- Multiple login attempts = shared promise (singleton)
9. Image Mime Type Filtering#
- Skip SVG images
- Only process image/* types
- mimeType may be missing (optional field)
10. Error Handling Patterns#
- Corrupt images → debug log, skip (expected)
- Network errors → retry with backoff
- Moderation action failures → log but continue (don't block)
- Taken-down repos → skip silently
- Missing PDS → warn and skip blob
11. Config Environment Variables#
- No defaults for labeler credentials (must be provided)
- Ozone URL/PDS used differently (URL is separate from service endpoint)
- OZONE_PDS is the agent.service endpoint
- OZONE_URL is for display/configuration only
12. Redis Key Patterns#
phash:queue:pending → List
phash:queue:processing → List
phash:queue:failed → List
phash:cache:{cid} → String
claim:post:label:{uri}:{label} → String
claim:account:label:{did}:{label} → String
claim:account:comment:{did}:{uri} → String
13. Jetstream Event Structure#
- Only processes app.bsky.feed.post creates
- Ignores updates/deletes
- Image extraction handles nested embed structures
- Blob CID is in the ref.$link field
14. Metrics Naming Conventions#
- Snake_case for Prometheus metrics
- Counter suffixes: _total
- Gauge suffixes: none (just the name)
- Histogram suffixes: _seconds (for durations)
15. Graceful Shutdown#
- Saves cursor to disk
- Waits for active jobs (polling every 100ms)
- Closes all connections
- SIGINT/SIGTERM trigger shutdown
Architecture Strengths & Assumptions#
What Works Well:
- ✅ Jetstream ingestion decoupled from processing (the queue isolates the two)
- ✅ Per-job retry strategy with exponential backoff
- ✅ Cache hit rate for viral images reduces PDS load
- ✅ Redis claims prevent duplicate moderation actions
- ✅ Cursor persistence enables recovery
- ✅ Rate limiting respects Ozone API limits
Assumptions/Constraints:
- Redis is always available (no in-memory fallback)
- PDS/Ozone/PLC directory endpoints are reachable
- Jetstream can resume from cursor
- Single-process (no distributed workers)
- In-memory caches (PDS/takedown) lost on restart
Typical Performance Characteristics#
VM Requirements (from README):
- Minimal: 2GB RAM, 2 vCPU, 10GB disk
- Recommended: 4GB RAM, 2-4 vCPU, 20GB disk
Throughput:
- Concurrency default: 10 workers
- Each worker can process ~1-5 images/second (depends on network)
- Total: 10-50 images/second throughput
- Cache hit rate: 20-40% (viral images)
Latency:
- Jetstream event → job enqueue: <100ms
- Job dequeue → phash computed: 200-500ms (depends on network/image size)
- Phash → moderation action: 100-200ms (rate-limited)
Testing Coverage#
Unit Tests Exist For:
- ✅ Phash algorithm (format, determinism, slight variations)
- ✅ Hamming distance (exact match, thresholds, edge cases)
- ✅ Cache hit/miss
- ✅ Queue operations
- ✅ Image processor logic
Integration Tests:
- ✅ Queue persistence across restart
- ✅ End-to-end job processing
No Tests For:
- Jetstream connection handling
- Moderation action API calls
- Session management
- Rate limiting
File Structure Summary#
src/
├── main.ts # Entry point, Jetstream orchestrator
├── types.ts # Core interfaces (BlobCheck, ImageJob, MatchResult)
├── agent.ts # ATProto agent, session management
├── session.ts # Session file persistence
├── limits.ts # Rate limiting (concurrency + header-based)
├── config/
│ └── index.ts # Environment variable parsing
├── logger/
│ └── index.ts # Pino structured logging
├── hasher/
│ └── phash.ts # Perceptual hash computation
├── matcher/
│ └── hamming.ts # Hamming distance, match finding
├── cache/
│ ├── phash-cache.ts # Redis phash cache
│ └── moderation-claims.ts # Redis deduplication claims
├── queue/
│ ├── redis-queue.ts # Redis job queue
│ └── worker.ts # Worker pool, job processing loop
├── processor/
│ └── image-processor.ts # Blob fetch, phash compute, matching
├── moderation/
│ ├── post.ts # Post label/report actions
│ └── account.ts # Account label/report/comment actions
└── metrics/
└── collector.ts # In-memory metrics
rules/
└── blobs.ts # BlobCheck definitions
tests/
├── unit/
│ ├── phash.test.ts
│ ├── hamming.test.ts
│ ├── phash-cache.test.ts
│ └── image-processor.test.ts
└── integration/
└── queue.test.ts
Entry Point Summary#
The entire system starts with:
- main.ts loads config, connects Redis, logs in to Ozone
- Jetstream connects and starts emitting post events
- Queue receives jobs (post + blob CIDs)
- Worker pool dequeues jobs and processes them in parallel
- ImageProcessor fetches blobs, computes phashes, finds matches
- Moderation actions (label/report) triggered on matches
- Metrics/Logging tracked throughout
- Graceful shutdown saves cursor and exits cleanly
This is a real-time, event-driven system - no polling, no batch processing. Every post on Bluesky is potentially seen and processed within seconds.