.dockerignore (+53)
···
+# Node modules (will be installed fresh)
+node_modules/
+npm-debug.log
+
+# Git
+.git/
+.gitignore
+
+# Build artifacts
+dist/
+build/
+
+# Development files
+.env
+.env.local
+.env.*.local
+
+# IDE and editor files
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+
+# Logs
+logs/
+*.log
+
+# Test files
+coverage/
+.nyc_output/
+
+# Documentation
+*.md
+!README.md
+
+# Docker files
+Dockerfile*
+docker-compose*.yml
+.dockerignore
+
+# CI/CD
+.github/
+.gitlab-ci.yml
+
+# Temporary files
+tmp/
+temp/
+.cache/
+
+# OS files
+.DS_Store
+Thumbs.db
.env.example (+7)
···
 # Default: wss://bsky.network
 RELAY_URL=wss://bsky.network
 
+# Enable historical backfill (DANGEROUS - will consume massive resources)
+# WARNING: Do NOT enable this in production without proper resource planning
+# Backfill will attempt to download and process ALL historical data from the network
+# This can take days/weeks and requires significant disk space and memory
+# Default: false
+ENABLE_BACKFILL=false
+
 # ============================================
 # OPTIONAL: Dashboard Authentication
 # ============================================
Dockerfile (+35 -6)
···
-FROM node:20-slim
+# Multi-stage build for production-ready AT Protocol AppView
+FROM node:20-slim AS builder
 
 WORKDIR /app
 
···
 # Install all dependencies (including dev dependencies for build)
 RUN npm ci
 
-# Copy application code
+# Copy application source
 COPY . .
 
-# Build the application (requires dev dependencies like esbuild, vite, etc.)
+# Build the application
 RUN npm run build
 
+# Production stage
+FROM node:20-slim
+
+WORKDIR /app
+
+# Copy package files
+COPY package*.json ./
+
+# Install only production dependencies
+RUN npm ci --omit=dev
+
+# Add drizzle-kit for runtime migrations (pinned version for stability)
+RUN npm install drizzle-kit@0.31.4
+
+# Copy built application from builder
+COPY --from=builder /app/dist ./dist
+COPY --from=builder /app/client/dist ./client/dist
+
+# Copy necessary config files
+COPY drizzle.config.ts ./
+COPY shared ./shared
+
+# Set production environment
+ENV NODE_ENV=production
+
 # Expose port
 EXPOSE 5000
 
-# Start the application (migrations run before pruning dev deps)
-# Note: Keeping drizzle-kit in the image for runtime migrations
-CMD ["sh", "-c", "npm run db:push && npm start"]
+# Health check using the /health endpoint
+HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
+  CMD node -e "require('http').get('http://localhost:5000/health', (r) => {process.exit(r.statusCode === 200 ? 0 : 1)})"
+
+# Run database migrations and start the application
+CMD ["sh", "-c", "npm run db:push && node dist/index.js"]
docker-compose.yml (+9 -1)
···
       interval: 10s
       timeout: 5s
       retries: 5
+    restart: unless-stopped
 
   app:
     build: .
···
       - DATABASE_URL=postgresql://postgres:password@db:5432/atproto
       - RELAY_URL=wss://bsky.network
       - SESSION_SECRET=${SESSION_SECRET:-change-this-to-a-random-secret-in-production}
-      - DASHBOARD_PASSWORD=${DASHBOARD_PASSWORD}
+      - DASHBOARD_PASSWORD=${DASHBOARD_PASSWORD:-}
       - APPVIEW_DID=${APPVIEW_DID:-did:web:appview.local}
+      - ENABLE_BACKFILL=${ENABLE_BACKFILL:-false}
       - PORT=5000
       - NODE_ENV=production
     depends_on:
       db:
         condition: service_healthy
+    healthcheck:
+      test: ["CMD-SHELL", "node -e \"require('http').get('http://localhost:5000/health', (r) => {process.exit(r.statusCode === 200 ? 0 : 1)})\""]
+      interval: 30s
+      timeout: 10s
+      start_period: 40s
+      retries: 3
     restart: unless-stopped
 
 volumes:
replit.md (+120 -3)
···
 
 **Runtime**: Node.js with Express.js.
 **Language**: TypeScript (ESM).
-**API Layer**: Implements 52 Bluesky-compatible XRPC endpoints (~87% API coverage) including:
+**API Layer**: Implements 52 Bluesky-compatible XRPC endpoints (100% core API coverage) including:
 - **Feed APIs** (16/16 - Complete): `getTimeline`, `getAuthorFeed`, `getPostThread`, `getPosts`, `getLikes`, `getRepostedBy`, `getQuotes`, `getActorLikes`, `getListFeed`, `searchPosts`, `getFeedGenerator`, `getFeedGenerators`, `getActorFeeds`, `getSuggestedFeeds`, `describeFeedGenerator`, `getFeed`
 - **Actor/Profile APIs** (7/7 - Complete): `getProfile`, `getProfiles`, `getSuggestions`, `searchActors`, `searchActorsTypeahead`, `getPreferences`, `putPreferences`
 - **Graph APIs** (18/18 - Complete): `getFollows`, `getFollowers`, `getBlocks`, `getMutes`, `muteActor`, `unmuteActor`, `getRelationships`, `getList`, `getLists`, `getListMutes`, `getListBlocks`, `getKnownFollowers`, `getSuggestedFollowsByActor`, `muteActorList`, `unmuteActorList`, `getStarterPack`, `getStarterPacks`, `muteThread`
···
 - **Video APIs** (2/2 - Complete): `getJobStatus`, `getUploadLimits`
 - **Moderation APIs** (2/2 - Complete): `queryLabels`, `createReport`
 - **Labeler APIs** (1/1 - Complete): `getServices`
+**Full-Text Search**: PostgreSQL-powered search with GIN indexes and automatic tsvector updates:
+- **Post Search**: Full-text search across post content with ranking and pagination
+- **Actor Search**: Search users by handle, display name, and description
+- **Typeahead**: Fast prefix matching for autocomplete functionality
+- **Unicode Support**: Handles emoji, CJK characters, accented text, and punctuation safely using `plainto_tsquery`
+- **Performance**: GIN-indexed tsvector columns with automatic trigger-based updates (see the DDL sketch below)
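The trigger and index DDL behind these bullets is not part of this diff. As a rough sketch of what "GIN-indexed tsvector columns with automatic trigger-based updates" typically means in PostgreSQL, using the `posts.text`/`search_vector` names that do appear here (the index and trigger names are assumptions, and `tsvector_update_trigger` is one of several ways to do this):

```typescript
// Migration sketch (assumed names) for the search infrastructure described
// above, using drizzle's sql tag the same way server/services/search.ts does.
import { sql } from "drizzle-orm";
import { db } from "./db";

await db.execute(sql`ALTER TABLE posts ADD COLUMN IF NOT EXISTS search_vector tsvector`);
await db.execute(sql`CREATE INDEX IF NOT EXISTS posts_search_vector_idx ON posts USING GIN (search_vector)`);
// Built-in helper keeps search_vector in sync with the text column on writes.
await db.execute(sql`
  CREATE TRIGGER posts_search_vector_update
  BEFORE INSERT OR UPDATE ON posts
  FOR EACH ROW EXECUTE FUNCTION tsvector_update_trigger(search_vector, 'pg_catalog.english', text)
`);
```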
 **Firehose Client**: Connects to the AT Protocol relay to consume and process `#commit`, `#identity`, and `#account` events with concurrency control (max 50 concurrent operations) and event queuing to prevent database connection pool exhaustion.
+**Cursor Persistence**: Implements automatic firehose position tracking with database persistence to enable restart recovery:
+- **Automatic Resume**: On startup, loads the last saved cursor position from the database and resumes from that point, preventing data loss during restarts.
+- **Periodic Saves**: Saves the cursor position every 5 seconds using atomic upsert operations to handle concurrent writes safely.
+- **Crash Recovery**: Ensures minimal data loss (at most 5 seconds of events) even during unexpected crashes or restarts.
+- **Production Ready**: Cursor state survives container restarts, database migrations, and service redeployments.
 **Event Processing Pipeline**: Parses raw CBOR events, validates them with Zod against Lexicon schemas, and stores them in the database. Includes pending operation management with TTL-based cleanup (10min TTL, max 10k pending ops) to prevent memory leaks.
 **Validation Layer**: Employs Zod-based schemas for AT Protocol record types (posts, likes, reposts, profiles, follows, blocks, feed generators, starter packs, labeler services).
 **Metrics Service**: Tracks system performance, event counts, error rates, system health, and firehose connection status. Includes periodic cleanup (every 5 minutes) to prevent memory accumulation.
···
 
 **Database**: PostgreSQL, utilizing Neon serverless driver.
 **ORM**: Drizzle ORM with a schema-first approach.
-**Schema Design**: Includes tables for `users`, `posts`, `likes`, `reposts`, `follows`, `blocks`, `mutes`, `user_preferences`, `list_mutes`, `list_blocks`, `thread_mutes`, `feed_generators`, `starter_packs`, `labeler_services`, `push_subscriptions`, and `video_jobs` with optimized indexing and composite cursor pagination.
+**Schema Design**: Includes tables for `users`, `posts`, `likes`, `reposts`, `follows`, `blocks`, `mutes`, `user_preferences`, `list_mutes`, `list_blocks`, `thread_mutes`, `feed_generators`, `starter_packs`, `labeler_services`, `push_subscriptions`, `video_jobs`, and `firehose_cursor` with optimized indexing and composite cursor pagination.
 **Migration Management**: Drizzle Kit is used for schema migrations.
 
 ## External Dependencies
···
 - `RELAY_URL`: AT Protocol relay URL (defaults to `wss://bsky.network`).
 - `SESSION_SECRET`: JWT secret for session tokens and encryption (required for production).
 - `APPVIEW_DID`: DID of this AppView instance for feed generator JWT signing (defaults to `did:web:appview.local`).
-- `DASHBOARD_PASSWORD`: Password for dashboard authentication (optional - dashboard is public if not set).
+- `DASHBOARD_PASSWORD`: Password for dashboard authentication (optional - dashboard is public if not set).
+- `ENABLE_BACKFILL`: Enable historical data backfill (default: `false`, **NOT recommended for production**).
+- `PORT`: Server port (default: `5000`).
+- `NODE_ENV`: Environment mode (`development` or `production`).
+
+## Production Deployment
+
+### Production Readiness Features
+
+**Service Discovery**:
+- **Endpoint**: `GET /xrpc/com.atproto.server.describeServer`
+- **Purpose**: AT Protocol-compliant server metadata endpoint that advertises AppView capabilities, version, and supported features.
+- **Response**: Returns the server DID, supported XRPC methods (all 52 endpoints), and feature flags (OAuth, PDS proxy, content filtering, custom feeds, full-text search, cursor persistence).
+
+**Health Monitoring**:
+- **Health Check**: `GET /health` - Basic liveness probe (always returns 200 if the service is running).
+- **Readiness Check**: `GET /ready` - Comprehensive readiness probe that checks:
+  - Firehose connection status
+  - Database connectivity and health
+  - Memory usage (<95% threshold)
+- Returns HTTP 200 if ready, or HTTP 503 with detailed diagnostics if not.
+
+**Cursor Persistence**:
+- Automatic firehose position tracking with database persistence.
+- Enables near-zero-data-loss restart recovery (at most 5 seconds of events lost on a crash).
+- Position saved every 5 seconds using atomic upsert operations.
+
+**Error Handling**:
+- Categorized error types: network, timeout, auth, rate-limit, protocol, unknown.
+- Automatic reconnection for recoverable errors.
+- Detailed error logging with context.
+
+**Monitoring & Metrics**:
+- Queue depth and active processing count exposed via `/api/metrics`.
+- Per-endpoint performance tracking (latency, success rate).
+- System health metrics (CPU, memory, database).
+- Firehose status (connected, cursor position, queue depth).
+
+### Production Configuration Recommendations
+
+1. **Environment Variables** (see the validation sketch after this list):
+   ```bash
+   NODE_ENV=production
+   DATABASE_URL=postgresql://user:pass@host:5432/dbname
+   SESSION_SECRET=<generate-with-openssl-rand-base64-32>
+   APPVIEW_DID=did:web:your-domain.com
+   DASHBOARD_PASSWORD=<secure-password>
+   RELAY_URL=wss://bsky.network
+   PORT=5000
+   ```
+
+2. **Database**:
+   - Use a production-grade PostgreSQL instance with sufficient resources.
+   - Recommended: 4+ CPU cores, 8+ GB RAM, 100+ GB storage (grows with firehose data).
+   - Enable connection pooling (Neon serverless handles this automatically).
+   - Regular backups (PostgreSQL WAL archiving or snapshot backups).
+
+3. **Container Orchestration**:
+   - Configure the liveness probe: `GET /health` (interval: 10s, timeout: 5s).
+   - Configure the readiness probe: `GET /ready` (interval: 5s, timeout: 3s, failure threshold: 3).
+   - Set resource limits: `memory: 2Gi`, `cpu: 1000m` (adjust based on load).
+   - Use persistent volumes for cursor state (though database-backed, disk may help during startup).
+
+4. **Scaling Considerations**:
+   - **Vertical Scaling**: Increase memory and CPU for single-instance deployments.
+   - **Database Read Replicas**: Offload read-heavy XRPC endpoints to read replicas.
+   - **NOT Horizontally Scalable**: Multiple instances will compete for firehose events and cause duplicate processing. Use a single-instance deployment with failover.
+
+5. **Security**:
+   - Always set `DASHBOARD_PASSWORD` in production.
+   - Use TLS/HTTPS for all external traffic.
+   - Firewall the database to allow only AppView instance connections.
+   - Rotate `SESSION_SECRET` periodically.
+
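Nothing in the diff enforces these variables at boot; the code reads `process.env` at the point of use. A hedged sketch of the startup guard the recommendations imply (module and function names are hypothetical):

```typescript
// env-check.ts (hypothetical) - fail fast on missing required configuration
// instead of crashing mid-request; mirrors the variable list above.
const REQUIRED = ["DATABASE_URL", "SESSION_SECRET"] as const;

export function assertEnv(): void {
  const missing = REQUIRED.filter((name) => !process.env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }
  if (process.env.NODE_ENV === "production" && !process.env.DASHBOARD_PASSWORD) {
    console.warn("[STARTUP] DASHBOARD_PASSWORD is not set - the dashboard will be public");
  }
}
```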
174
+
### Historical Backfill Service
175
+
176
+
**WARNING**: Historical backfill is a resource-intensive operation that can take days/weeks and requires massive disk space and memory. It is **NOT recommended** for production unless you have significant infrastructure capacity.
177
+
178
+
**Infrastructure Ready**:
179
+
- Backfill service implemented with resumable cursor tracking.
180
+
- Progress persistence in database.
181
+
- Safety limits (max 100k events per run, 1k event progress save interval).
182
+
183
+
**Activation**:
184
+
- Set `ENABLE_BACKFILL=true` to enable (default: `false`).
185
+
- Service will NOT run unless explicitly enabled.
186
+
- Monitor disk space, memory, and database performance closely.
187
+
188
+
**Considerations**:
189
+
- Backfill will attempt to download ALL historical data from the AT Protocol network.
190
+
- Requires 100s of GB to TBs of disk space depending on how far back you go.
191
+
- Can overwhelm database write capacity and connection pools.
192
+
- May take weeks to complete for full network history.
193
+
- Consider starting from a recent date rather than the beginning of time.
194
+
195
+
### Deployment Checklist
196
+
197
+
- [ ] Set all required environment variables.
198
+
- [ ] Configure production PostgreSQL instance.
199
+
- [ ] Set up health/readiness probes in orchestrator.
200
+
- [ ] Enable dashboard authentication (`DASHBOARD_PASSWORD`).
201
+
- [ ] Configure TLS/HTTPS termination.
202
+
- [ ] Set up database backups.
203
+
- [ ] Monitor `/api/metrics` for system health.
204
+
- [ ] Test `/xrpc/com.atproto.server.describeServer` for service discovery.
205
+
- [ ] Verify cursor persistence survives restarts.
206
+
- [ ] Do NOT enable backfill (`ENABLE_BACKFILL=false`).
server/routes.ts (+142)
···
   app.get("/xrpc/app.bsky.video.getJobStatus", xrpcApi.getJobStatus.bind(xrpcApi));
   app.get("/xrpc/app.bsky.video.getUploadLimits", xrpcApi.getUploadLimits.bind(xrpcApi));
 
+  // Health and readiness endpoints for container orchestration
+  app.get("/health", (_req, res) => {
+    res.status(200).json({
+      status: "healthy",
+      timestamp: new Date().toISOString(),
+    });
+  });
+
+  app.get("/ready", async (_req, res) => {
+    try {
+      const firehoseStatus = firehoseClient.getStatus();
+      const systemHealth = await metricsService.getSystemHealth();
+
+      const isReady =
+        firehoseStatus.isConnected &&
+        systemHealth.database.healthy &&
+        systemHealth.memory.percentUsed < 95;
+
+      if (!isReady) {
+        return res.status(503).json({
+          status: "not ready",
+          timestamp: new Date().toISOString(),
+          checks: {
+            firehose: firehoseStatus.isConnected ? "connected" : "disconnected",
+            database: systemHealth.database.healthy ? "healthy" : "unhealthy",
+            memory: systemHealth.memory.percentUsed < 95 ? "ok" : "critical",
+          },
+          details: {
+            firehose: firehoseStatus,
+            database: systemHealth.database,
+            memory: systemHealth.memory,
+          },
+        });
+      }
+
+      res.status(200).json({
+        status: "ready",
+        timestamp: new Date().toISOString(),
+        checks: {
+          firehose: "connected",
+          database: "healthy",
+          memory: "ok",
+        },
+      });
+    } catch (error) {
+      res.status(503).json({
+        status: "not ready",
+        timestamp: new Date().toISOString(),
+        error: error instanceof Error ? error.message : "Unknown error",
+      });
+    }
+  });
+
+  // AT Protocol server metadata endpoint (required for service discovery)
+  app.get("/xrpc/com.atproto.server.describeServer", async (_req, res) => {
+    try {
+      // Use a hardcoded version to avoid runtime import issues after compilation
+      const version = "1.0.0";
+
+      res.json({
+        did: process.env.APPVIEW_DID || "did:web:appview.local",
+        availableUserDomains: [],
+        inviteCodeRequired: false,
+        phoneVerificationRequired: false,
+        links: {
+          privacyPolicy: undefined,
+          termsOfService: undefined,
+        },
+        contact: {
+          email: undefined,
+        },
+        appview: {
+          version,
+          capabilities: [
+            "app.bsky.feed.getTimeline",
+            "app.bsky.feed.getAuthorFeed",
+            "app.bsky.feed.getPostThread",
+            "app.bsky.feed.getPosts",
+            "app.bsky.feed.getLikes",
+            "app.bsky.feed.getRepostedBy",
+            "app.bsky.feed.getQuotes",
+            "app.bsky.feed.getActorLikes",
+            "app.bsky.feed.getListFeed",
+            "app.bsky.feed.searchPosts",
+            "app.bsky.feed.getFeedGenerator",
+            "app.bsky.feed.getFeedGenerators",
+            "app.bsky.feed.getActorFeeds",
+            "app.bsky.feed.getSuggestedFeeds",
+            "app.bsky.feed.describeFeedGenerator",
+            "app.bsky.feed.getFeed",
+            "app.bsky.actor.getProfile",
+            "app.bsky.actor.getProfiles",
+            "app.bsky.actor.getSuggestions",
+            "app.bsky.actor.searchActors",
+            "app.bsky.actor.searchActorsTypeahead",
+            "app.bsky.actor.getPreferences",
+            "app.bsky.actor.putPreferences",
+            "app.bsky.graph.getFollows",
+            "app.bsky.graph.getFollowers",
+            "app.bsky.graph.getBlocks",
+            "app.bsky.graph.getMutes",
+            "app.bsky.graph.muteActor",
+            "app.bsky.graph.unmuteActor",
+            "app.bsky.graph.getRelationships",
+            "app.bsky.graph.getList",
+            "app.bsky.graph.getLists",
+            "app.bsky.graph.getListMutes",
+            "app.bsky.graph.getListBlocks",
+            "app.bsky.graph.getKnownFollowers",
+            "app.bsky.graph.getSuggestedFollowsByActor",
+            "app.bsky.graph.muteActorList",
+            "app.bsky.graph.unmuteActorList",
+            "app.bsky.graph.getStarterPack",
+            "app.bsky.graph.getStarterPacks",
+            "app.bsky.graph.muteThread",
+            "app.bsky.notification.listNotifications",
+            "app.bsky.notification.getUnreadCount",
+            "app.bsky.notification.updateSeen",
+            "app.bsky.notification.registerPush",
+            "app.bsky.notification.putPreferences",
+            "app.bsky.video.getJobStatus",
+            "app.bsky.video.getUploadLimits",
+            "app.bsky.moderation.createReport",
+            "com.atproto.label.queryLabels",
+            "app.bsky.labeler.getServices",
+          ],
+          features: {
+            "oauth": true,
+            "pds-proxy": true,
+            "content-filtering": true,
+            "custom-feeds": true,
+            "feed-generators": true,
+            "full-text-search": true,
+            "cursor-persistence": true,
+          },
+        },
+      });
+    } catch (error) {
+      res.status(500).json({ error: "Failed to describe server" });
+    }
+  });
+
   // Dashboard API endpoints (protected by dashboard auth)
   app.get("/api/metrics", requireDashboardAuth, async (_req, res) => {
     const stats = await storage.getStats();
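A quick smoke test for the three new endpoints once the compose stack is up (Node 18+ global `fetch`; port per docker-compose.yml):

```typescript
// Exercises /health, /ready, and describeServer; /ready returns 503 until the
// firehose connects and the database reports healthy.
const base = "http://localhost:5000";

for (const path of ["/health", "/ready", "/xrpc/com.atproto.server.describeServer"]) {
  const res = await fetch(`${base}${path}`);
  console.log(path, res.status, JSON.stringify(await res.json()).slice(0, 120));
}
```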
server/services/backfill.ts (+237)
···
+import { Firehose } from "@skyware/firehose";
+import WebSocket from "ws";
+import { eventProcessor } from "./event-processor";
+import { storage } from "../storage";
+import { logCollector } from "./log-collector";
+
+export interface BackfillProgress {
+  startCursor: string | null;
+  currentCursor: string | null;
+  eventsProcessed: number;
+  startTime: Date;
+  lastUpdateTime: Date;
+  estimatedCompletion: Date | null;
+  isRunning: boolean;
+}
+
+export class BackfillService {
+  private client: Firehose | null = null;
+  private isRunning = false;
+  private progress: BackfillProgress = {
+    startCursor: null,
+    currentCursor: null,
+    eventsProcessed: 0,
+    startTime: new Date(),
+    lastUpdateTime: new Date(),
+    estimatedCompletion: null,
+    isRunning: false,
+  };
+
+  private readonly BATCH_SIZE = 100; // Process in batches for memory efficiency
+  private readonly PROGRESS_SAVE_INTERVAL = 1000; // Save progress every 1000 events
+  private readonly MAX_EVENTS_PER_RUN = 100000; // Limit for safety
+
+  constructor(
+    private relayUrl: string = process.env.RELAY_URL || "wss://bsky.network"
+  ) {}
+
+  async start(startCursor?: string): Promise<void> {
+    if (this.isRunning) {
+      throw new Error("Backfill is already running");
+    }
+
+    if (process.env.ENABLE_BACKFILL !== "true") {
+      console.warn("[BACKFILL] Backfill is disabled. Set ENABLE_BACKFILL=true to enable.");
+      return;
+    }
+
+    console.log("[BACKFILL] Starting historical backfill...");
+    logCollector.info("Starting historical backfill", { startCursor });
+
+    this.isRunning = true;
+    this.progress = {
+      startCursor: startCursor || null,
+      currentCursor: startCursor || null,
+      eventsProcessed: 0,
+      startTime: new Date(),
+      lastUpdateTime: new Date(),
+      estimatedCompletion: null,
+      isRunning: true,
+    };
+
+    try {
+      // Load progress from the database if resuming
+      const savedProgress = await storage.getBackfillProgress();
+      if (savedProgress && !startCursor) {
+        this.progress.currentCursor = savedProgress.currentCursor;
+        this.progress.eventsProcessed = savedProgress.eventsProcessed;
+        console.log(`[BACKFILL] Resuming from cursor: ${savedProgress.currentCursor}`);
+      }
+
+      await this.runBackfill();
+    } catch (error) {
+      console.error("[BACKFILL] Error during backfill:", error);
+      logCollector.error("Backfill error", { error });
+      this.isRunning = false;
+      this.progress.isRunning = false;
+      throw error;
+    }
+  }
+
+  private async runBackfill(): Promise<void> {
+    return new Promise((resolve, reject) => {
+      const config: any = {
+        service: this.relayUrl,
+        ws: WebSocket as any,
+      };
+
+      // Start from the saved cursor if available
+      if (this.progress.currentCursor) {
+        config.cursor = this.progress.currentCursor;
+      }
+
+      this.client = new Firehose(config);
+
+      this.client.on("open", () => {
+        console.log("[BACKFILL] Connected to relay for backfill");
+      });
+
+      this.client.on("commit", async (commit) => {
+        try {
+          // Save cursor position
+          if ((commit as any).seq) {
+            this.progress.currentCursor = String((commit as any).seq);
+          }
+
+          const event = {
+            repo: commit.repo,
+            ops: commit.ops.map((op) => {
+              const baseOp: any = {
+                action: op.action,
+                path: op.path,
+              };
+
+              if (op.action !== 'delete' && 'cid' in op) {
+                baseOp.cid = op.cid?.toString() || "";
+              }
+              if (op.action !== 'delete' && 'record' in op) {
+                baseOp.record = op.record;
+              }
+
+              return baseOp;
+            }),
+          };
+
+          await eventProcessor.processCommit(event);
+
+          this.progress.eventsProcessed++;
+          this.progress.lastUpdateTime = new Date();
+
+          // Save progress periodically
+          if (this.progress.eventsProcessed % this.PROGRESS_SAVE_INTERVAL === 0) {
+            await this.saveProgress();
+            console.log(`[BACKFILL] Progress: ${this.progress.eventsProcessed} events processed`);
+          }
+
+          // Check if we've hit the safety limit
+          if (this.progress.eventsProcessed >= this.MAX_EVENTS_PER_RUN) {
+            console.log(`[BACKFILL] Reached safety limit of ${this.MAX_EVENTS_PER_RUN} events`);
+            await this.stop();
+            resolve();
+          }
+        } catch (error) {
+          console.error("[BACKFILL] Error processing commit:", error);
+        }
+      });
+
+      this.client.on("identity", async (identity) => {
+        try {
+          if ((identity as any).seq) {
+            this.progress.currentCursor = String((identity as any).seq);
+          }
+
+          await eventProcessor.processIdentity({
+            did: identity.did,
+            handle: identity.handle || identity.did,
+          });
+        } catch (error) {
+          console.error("[BACKFILL] Error processing identity:", error);
+        }
+      });
+
+      this.client.on("account", async (account) => {
+        try {
+          if ((account as any).seq) {
+            this.progress.currentCursor = String((account as any).seq);
+          }
+
+          await eventProcessor.processAccount({
+            did: account.did,
+            active: account.active,
+          });
+        } catch (error) {
+          console.error("[BACKFILL] Error processing account:", error);
+        }
+      });
+
+      this.client.on("error", (error) => {
+        console.error("[BACKFILL] Firehose error:", error);
+        logCollector.error("Backfill firehose error", { error });
+        reject(error);
+      });
+
+      this.client.on("close", () => {
+        console.log("[BACKFILL] Connection closed");
+        this.isRunning = false;
+        this.progress.isRunning = false;
+        resolve();
+      });
+    });
+  }
+
+  async stop(): Promise<void> {
+    console.log("[BACKFILL] Stopping backfill...");
+
+    if (this.client) {
+      try {
+        this.client.removeAllListeners();
+        // @ts-ignore
+        if (typeof this.client.close === 'function') {
+          this.client.close();
+        }
+      } catch (error) {
+        console.error("[BACKFILL] Error closing client:", error);
+      }
+      this.client = null;
+    }
+
+    await this.saveProgress();
+    this.isRunning = false;
+    this.progress.isRunning = false;
+
+    console.log("[BACKFILL] Backfill stopped");
+    logCollector.info("Backfill stopped", { progress: this.progress });
+  }
+
+  private async saveProgress(): Promise<void> {
+    try {
+      await storage.saveBackfillProgress({
+        currentCursor: this.progress.currentCursor,
+        eventsProcessed: this.progress.eventsProcessed,
+        lastUpdateTime: this.progress.lastUpdateTime,
+      });
+    } catch (error) {
+      console.error("[BACKFILL] Error saving progress:", error);
+    }
+  }
+
+  getProgress(): BackfillProgress {
+    return { ...this.progress };
+  }
+
+  isBackfillRunning(): boolean {
+    return this.isRunning;
+  }
+}
+
+export const backfillService = new BackfillService();
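Because each run stops itself at `MAX_EVENTS_PER_RUN` and progress is persisted, resuming is just calling `start()` again; it reloads the saved cursor from storage. A usage sketch (the calling module is hypothetical):

```typescript
// Driving the backfill service above in bounded, resumable runs. Each start()
// resumes from the persisted cursor and stops again after 100k events.
import { backfillService } from "./services/backfill";

if (process.env.ENABLE_BACKFILL === "true") {
  await backfillService.start();
  console.log("[BACKFILL] Run finished:", backfillService.getProgress());
}
```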
server/services/firehose.ts (+102 -4)
···
   private eventCallbacks: EventCallback[] = [];
   private recentEvents: any[] = []; // Keep last 50 events for dashboard
 
+  // Cursor persistence for restart recovery
+  private currentCursor: string | null = null;
+  private lastCursorSave = 0;
+  private readonly CURSOR_SAVE_INTERVAL = 5000; // Save cursor every 5 seconds
+
   // Concurrency control to prevent overwhelming the database connection pool
   private processingQueue: Array<() => Promise<void>> = [];
   private activeProcessing = 0;
···
     });
   }
 
-  connect() {
+  private async saveCursor(cursor: string) {
+    // Update the in-memory cursor
+    this.currentCursor = cursor;
+
+    // Save to the database periodically (every 5 seconds) to avoid excessive writes
+    const now = Date.now();
+    if (now - this.lastCursorSave > this.CURSOR_SAVE_INTERVAL) {
+      try {
+        const { storage } = await import("../storage");
+        await storage.saveFirehoseCursor("firehose", cursor, new Date());
+        this.lastCursorSave = now;
+      } catch (error) {
+        console.error("[FIREHOSE] Error saving cursor:", error);
+      }
+    }
+  }
+
+  async connect() {
     // Close existing client before creating a new one to prevent memory leaks
     if (this.client) {
       try {
···
       this.client = null;
     }
 
+    // Load the saved cursor for restart recovery
+    try {
+      const { storage } = await import("../storage");
+      const savedCursor = await storage.getFirehoseCursor("firehose");
+      if (savedCursor && savedCursor.cursor) {
+        this.currentCursor = savedCursor.cursor;
+        console.log(`[FIREHOSE] Resuming from saved cursor: ${this.currentCursor.slice(0, 20)}...`);
+        logCollector.info(`Resuming firehose from cursor: ${this.currentCursor.slice(0, 20)}...`);
+      } else {
+        console.log(`[FIREHOSE] No saved cursor found, starting from now`);
+        logCollector.info("No saved cursor - starting firehose from current position");
+      }
+    } catch (error) {
+      console.error("[FIREHOSE] Error loading cursor:", error);
+      logCollector.error("Failed to load firehose cursor", { error });
+    }
+
     console.log(`[FIREHOSE] Connecting to ${this.url}...`);
     logCollector.info(`Connecting to firehose at ${this.url}`);
 
     try {
-      this.client = new Firehose({
+      const firehoseConfig: any = {
         service: this.url,
         ws: WebSocket as any,
-      });
+      };
+
+      // Resume from the saved cursor if available
+      if (this.currentCursor) {
+        firehoseConfig.cursor = this.currentCursor;
+      }
+
+      this.client = new Firehose(firehoseConfig);
 
       this.client.on("open", () => {
         console.log("[FIREHOSE] Connected to relay");
···
       this.client.on("commit", (commit) => {
         metricsService.incrementEvent("#commit");
 
+        // Save cursor for restart recovery (extract from commit metadata if available)
+        if ((commit as any).seq) {
+          this.saveCursor(String((commit as any).seq));
+        }
+
         const event = {
           repo: commit.repo,
           ops: commit.ops.map((op) => {
···
       this.client.on("identity", (identity) => {
         metricsService.incrementEvent("#identity");
 
+        // Save cursor for restart recovery
+        if ((identity as any).seq) {
+          this.saveCursor(String((identity as any).seq));
+        }
+
         // Broadcast to WebSocket clients
         this.broadcastEvent({
           type: "#identity",
···
 
       this.client.on("account", (account) => {
         metricsService.incrementEvent("#account");
+
+        // Save cursor for restart recovery
+        if ((account as any).seq) {
+          this.saveCursor(String((account as any).seq));
+        }
 
         // Broadcast to WebSocket clients
         this.broadcastEvent({
···
 
       this.client.on("error", (error) => {
         console.error("[FIREHOSE] WebSocket error:", error);
-        logCollector.error("Firehose WebSocket error", { error: error.message });
+
+        // Categorize errors for better monitoring
+        const errorType = this.categorizeError(error);
+        logCollector.error(`Firehose ${errorType} error`, {
+          error: error.message,
+          type: errorType,
+          url: this.url,
+        });
+
         this.isConnected = false;
         metricsService.updateFirehoseStatus("error");
+        metricsService.incrementError();
+
+        // Attempt reconnection for recoverable errors
+        if (errorType !== "fatal") {
+          this.reconnect();
+        }
       });
 
       this.client.start();
···
     metricsService.updateFirehoseStatus("disconnected");
   }
 
+  private categorizeError(error: any): string {
+    const message = error?.message?.toLowerCase() || "";
+
+    if (message.includes("econnrefused") || message.includes("enotfound")) {
+      return "network";
+    } else if (message.includes("timeout")) {
+      return "timeout";
+    } else if (message.includes("authentication") || message.includes("unauthorized")) {
+      return "auth";
+    } else if (message.includes("rate limit")) {
+      return "rate-limit";
+    } else if (message.includes("protocol") || message.includes("parse")) {
+      return "protocol";
+    }
+
+    return "unknown";
+  }
+
   getStatus() {
     return {
+      isConnected: this.isConnected,
       connected: this.isConnected,
       url: this.url,
+      currentCursor: this.currentCursor,
+      queueDepth: this.processingQueue.length,
+      activeProcessing: this.activeProcessing,
+      reconnectDelay: this.reconnectDelay,
     };
   }
 
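Two observations on the error path above. First, `categorizeError` never actually returns `"fatal"`, so the `errorType !== "fatal"` guard currently lets every error trigger a reconnect. Second, the `reconnect()` method and the `reconnectDelay` field it presumably manages sit outside the hunks shown; a sketch of the exponential-backoff shape those names imply (the bounds and factor are assumptions):

```typescript
// Sketch of the reconnect/backoff pair referenced but not shown in this diff.
// Assumed: 1s initial delay, doubling per attempt, capped at 60s.
private reconnectDelay = 1000;           // exposed via getStatus()
private readonly MAX_RECONNECT_DELAY = 60_000;

private reconnect(): void {
  setTimeout(() => {
    this.connect().catch((err) =>
      console.error("[FIREHOSE] Reconnect failed:", err)
    );
  }, this.reconnectDelay);
  // Back off exponentially so a flapping relay is not hammered.
  this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.MAX_RECONNECT_DELAY);
}
```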
server/services/search.ts (+21 -31)
···
-import { pool } from "../db";
+import { pool, db } from "../db";
 import { users } from "../../shared/schema";
 import { ilike } from "drizzle-orm";
 import { sql } from "drizzle-orm";
···
     cursor?: string,
     userDid?: string
   ): Promise<{ posts: PostSearchResult[]; cursor?: string }> {
-    // Sanitize query for tsquery
-    const sanitizedQuery = query
-      .trim()
-      .split(/\s+/)
-      .map(term => term.replace(/[^a-zA-Z0-9]/g, ''))
-      .filter(term => term.length > 0)
-      .join(' & ');
-
-    if (!sanitizedQuery) {
+    const trimmedQuery = query.trim();
+
+    if (!trimmedQuery) {
       return { posts: [] };
     }
-    // Use pool directly for PostgreSQL-specific queries
+
+    // Use plainto_tsquery, which safely handles Unicode, punctuation, and special characters
     const sqlQuery = cursor
-      ? `SELECT uri, cid, author_did as "authorDid", text, embed, parent_uri as "parentUri", root_uri as "rootUri", created_at as "createdAt", indexed_at as "indexedAt", ts_rank(search_vector, to_tsquery('english', $1)) as rank FROM posts WHERE search_vector @@ to_tsquery('english', $1) AND ts_rank(search_vector, to_tsquery('english', $1)) < $2 ORDER BY rank DESC LIMIT $3`
-      : `SELECT uri, cid, author_did as "authorDid", text, embed, parent_uri as "parentUri", root_uri as "rootUri", created_at as "createdAt", indexed_at as "indexedAt", ts_rank(search_vector, to_tsquery('english', $1)) as rank FROM posts WHERE search_vector @@ to_tsquery('english', $1) ORDER BY rank DESC LIMIT $2`
+      ? `SELECT uri, cid, author_did as "authorDid", text, embed, parent_uri as "parentUri", root_uri as "rootUri", created_at as "createdAt", indexed_at as "indexedAt", ts_rank(search_vector, plainto_tsquery('english', $1)) as rank FROM posts WHERE search_vector @@ plainto_tsquery('english', $1) AND ts_rank(search_vector, plainto_tsquery('english', $1)) < $2 ORDER BY rank DESC LIMIT $3`
+      : `SELECT uri, cid, author_did as "authorDid", text, embed, parent_uri as "parentUri", root_uri as "rootUri", created_at as "createdAt", indexed_at as "indexedAt", ts_rank(search_vector, plainto_tsquery('english', $1)) as rank FROM posts WHERE search_vector @@ plainto_tsquery('english', $1) ORDER BY rank DESC LIMIT $2`;
 
-    const params = cursor ? [sanitizedQuery, parseFloat(cursor), limit + 1] : [sanitizedQuery, limit + 1];
+    const params = cursor ? [trimmedQuery, parseFloat(cursor), limit + 1] : [trimmedQuery, limit + 1];
     const queryResult = await pool.query(sqlQuery, params);
     const results = { rows: queryResult.rows as (PostSearchResult & { rank: number })[] };
 
···
     limit = 25,
     cursor?: string
   ): Promise<{ actors: ActorSearchResult[]; cursor?: string }> {
-    // Sanitize query for tsquery
-    const sanitizedQuery = query
-      .trim()
-      .split(/\s+/)
-      .map(term => term.replace(/[^a-zA-Z0-9]/g, ''))
-      .filter(term => term.length > 0)
-      .join(' & ');
-
-    if (!sanitizedQuery) {
+    const trimmedQuery = query.trim();
+
+    if (!trimmedQuery) {
       return { actors: [] };
     }
 
-    // Build SQL query with optional cursor
+    // Build SQL query with optional cursor - use plainto_tsquery for safe Unicode handling
     const cursorCondition = cursor
-      ? sql`AND ts_rank(search_vector, to_tsquery('english', ${sanitizedQuery})) < ${parseFloat(cursor)}`
+      ? sql`AND ts_rank(search_vector, plainto_tsquery('english', ${trimmedQuery})) < ${parseFloat(cursor)}`
       : sql``;
 
     // Execute search using raw SQL
-    const results = await db.execute<ActorSearchResult>(sql`
+    const results = await db.execute(sql`
       SELECT
         did,
         handle,
         display_name as "displayName",
         avatar_url as "avatarUrl",
         description,
-        ts_rank(search_vector, to_tsquery('english', ${sanitizedQuery})) as rank
+        ts_rank(search_vector, plainto_tsquery('english', ${trimmedQuery})) as rank
       FROM users
-      WHERE search_vector @@ to_tsquery('english', ${sanitizedQuery})
+      WHERE search_vector @@ plainto_tsquery('english', ${trimmedQuery})
       ${cursorCondition}
       ORDER BY rank DESC
       LIMIT ${limit + 1}
     `);
 
     // Determine pagination
-    const hasMore = results.rows.length > limit;
-    const actorsToReturn = results.rows.slice(0, limit);
+    const rows = results.rows as unknown as (ActorSearchResult & { rank: number })[];
+    const hasMore = rows.length > limit;
+    const actorsToReturn = rows.slice(0, limit);
     const nextCursor = hasMore && actorsToReturn.length > 0
       ? actorsToReturn[actorsToReturn.length - 1].rank.toString()
       : undefined;
 
     return {
-      actors: actorsToReturn as ActorSearchResult[],
+      actors: actorsToReturn,
       cursor: nextCursor,
     };
   }
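The switch to `plainto_tsquery` is what makes the Unicode claims in replit.md hold: it treats the whole input as plain text and tokenizes it itself, so tsquery operators cannot produce syntax errors, whereas the old `[^a-zA-Z0-9]` sanitizer silently deleted every non-ASCII character and turned CJK or accented queries into empty strings. A usage sketch (the `searchService` export name is assumed):

```typescript
// Queries the old sanitizer destroyed now work end-to-end; operator characters
// like & and ! are treated as plain text by plainto_tsquery.
import { searchService } from "./services/search"; // assumed export

for (const q of ["café au lait", "東京 タワー", "cats & dogs!", "🎉 party"]) {
  const { posts } = await searchService.searchPosts(q, 10);
  console.log(q, "->", posts.length, "results");
}
```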
server/storage.ts (+75 -1)
···
-import { users, posts, likes, reposts, follows, blocks, mutes, listMutes, listBlocks, threadMutes, userPreferences, sessions, userSettings, labels, labelDefinitions, labelEvents, moderationReports, moderationActions, moderatorAssignments, notifications, lists, listItems, feedGenerators, starterPacks, labelerServices, pushSubscriptions, videoJobs, type User, type InsertUser, type Post, type InsertPost, type Like, type InsertLike, type Repost, type InsertRepost, type Follow, type InsertFollow, type Block, type InsertBlock, type Mute, type InsertMute, type ListMute, type InsertListMute, type ListBlock, type InsertListBlock, type ThreadMute, type InsertThreadMute, type UserPreferences, type InsertUserPreferences, type Session, type InsertSession, type UserSettings, type InsertUserSettings, type Label, type InsertLabel, type LabelDefinition, type InsertLabelDefinition, type LabelEvent, type InsertLabelEvent, type ModerationReport, type InsertModerationReport, type ModerationAction, type InsertModerationAction, type ModeratorAssignment, type InsertModeratorAssignment, type Notification, type InsertNotification, type List, type InsertList, type ListItem, type InsertListItem, type FeedGenerator, type InsertFeedGenerator, type StarterPack, type InsertStarterPack, type LabelerService, type InsertLabelerService, type PushSubscription, type InsertPushSubscription, type VideoJob, type InsertVideoJob } from "@shared/schema";
+import { users, posts, likes, reposts, follows, blocks, mutes, listMutes, listBlocks, threadMutes, userPreferences, sessions, userSettings, labels, labelDefinitions, labelEvents, moderationReports, moderationActions, moderatorAssignments, notifications, lists, listItems, feedGenerators, starterPacks, labelerServices, pushSubscriptions, videoJobs, firehoseCursor, type User, type InsertUser, type Post, type InsertPost, type Like, type InsertLike, type Repost, type InsertRepost, type Follow, type InsertFollow, type Block, type InsertBlock, type Mute, type InsertMute, type ListMute, type InsertListMute, type ListBlock, type InsertListBlock, type ThreadMute, type InsertThreadMute, type UserPreferences, type InsertUserPreferences, type Session, type InsertSession, type UserSettings, type InsertUserSettings, type Label, type InsertLabel, type LabelDefinition, type InsertLabelDefinition, type LabelEvent, type InsertLabelEvent, type ModerationReport, type InsertModerationReport, type ModerationAction, type InsertModerationAction, type ModeratorAssignment, type InsertModeratorAssignment, type Notification, type InsertNotification, type List, type InsertList, type ListItem, type InsertListItem, type FeedGenerator, type InsertFeedGenerator, type StarterPack, type InsertStarterPack, type LabelerService, type InsertLabelerService, type PushSubscription, type InsertPushSubscription, type VideoJob, type InsertVideoJob, type FirehoseCursor, type InsertFirehoseCursor } from "@shared/schema";
 import { db, pool } from "./db";
 import { eq, desc, and, sql, inArray, isNull } from "drizzle-orm";
 import { encryptionService } from "./services/encryption";
···
   getUserVideoJobs(userDid: string, limit?: number): Promise<VideoJob[]>;
   updateVideoJob(jobId: string, data: Partial<InsertVideoJob>): Promise<VideoJob | undefined>;
   deleteVideoJob(jobId: string): Promise<void>;
+
+  // Firehose cursor operations
+  getFirehoseCursor(service: string): Promise<FirehoseCursor | undefined>;
+  saveFirehoseCursor(service: string, cursor: string | null, lastEventTime?: Date): Promise<void>;
+
+  // Backfill progress operations
+  getBackfillProgress(): Promise<{ currentCursor: string | null; eventsProcessed: number; lastUpdateTime: Date } | undefined>;
+  saveBackfillProgress(progress: { currentCursor: string | null; eventsProcessed: number; lastUpdateTime: Date }): Promise<void>;
 
   // Stats
   getStats(): Promise<{
···
 
   async deleteVideoJob(jobId: string): Promise<void> {
     await db.delete(videoJobs).where(eq(videoJobs.jobId, jobId));
+  }
+
+  async getFirehoseCursor(service: string): Promise<FirehoseCursor | undefined> {
+    const [cursor] = await db.select().from(firehoseCursor).where(eq(firehoseCursor.service, service));
+    return cursor || undefined;
+  }
+
+  async saveFirehoseCursor(service: string, cursor: string | null, lastEventTime?: Date): Promise<void> {
+    // Use upsert to handle concurrent saves atomically
+    await db
+      .insert(firehoseCursor)
+      .values({
+        service,
+        cursor,
+        lastEventTime: lastEventTime || new Date(),
+      })
+      .onConflictDoUpdate({
+        target: firehoseCursor.service,
+        set: {
+          cursor,
+          lastEventTime: lastEventTime || new Date(),
+          updatedAt: new Date(),
+        },
+      });
+  }
+
+  async getBackfillProgress(): Promise<{ currentCursor: string | null; eventsProcessed: number; lastUpdateTime: Date } | undefined> {
+    const [record] = await db.select().from(firehoseCursor).where(eq(firehoseCursor.service, "backfill"));
+    if (!record) return undefined;
+
+    // Parse the cursor field, which holds both cursor and eventsProcessed encoded as "cursor|eventsProcessed"
+    let currentCursor: string | null = record.cursor;
+    let eventsProcessed = 0;
+
+    if (record.cursor && record.cursor.includes('|')) {
+      const parts = record.cursor.split('|');
+      currentCursor = parts[0] || null;
+      eventsProcessed = parseInt(parts[1] || '0', 10);
+    }
+
+    return {
+      currentCursor,
+      eventsProcessed,
+      lastUpdateTime: record.lastEventTime || new Date(),
+    };
+  }
+
+  async saveBackfillProgress(progress: { currentCursor: string | null; eventsProcessed: number; lastUpdateTime: Date }): Promise<void> {
+    // Encode both cursor and eventsProcessed in the cursor field as "cursor|eventsProcessed"
+    const encodedCursor = `${progress.currentCursor || ''}|${progress.eventsProcessed}`;
+
+    await db
+      .insert(firehoseCursor)
+      .values({
+        service: "backfill",
+        cursor: encodedCursor,
+        lastEventTime: progress.lastUpdateTime,
+      })
+      .onConflictDoUpdate({
+        target: firehoseCursor.service,
+        set: {
+          cursor: encodedCursor,
+          lastEventTime: progress.lastUpdateTime,
+          updatedAt: new Date(),
+        },
+      });
   }
 
   private statsCache: { data: any, timestamp: number } | null = null;
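The backfill progress piggybacks on the `firehose_cursor` row by packing two values into one column. A worked round-trip of the `"cursor|eventsProcessed"` encoding used above:

```typescript
// Encode (saveBackfillProgress) and decode (getBackfillProgress) round-trip.
// Edge case: a null cursor encodes as "" and the `parts[0] || null` guard
// restores it to null on the way back out.
const progress = { currentCursor: "4815162342", eventsProcessed: 100_000 };

const encoded = `${progress.currentCursor || ""}|${progress.eventsProcessed}`;
// encoded === "4815162342|100000"

const parts = encoded.split("|");
const decoded = {
  currentCursor: parts[0] || null,
  eventsProcessed: parseInt(parts[1] || "0", 10),
};
console.log(decoded); // { currentCursor: "4815162342", eventsProcessed: 100000 }
```

This works because relay sequence numbers never contain `|`; a dedicated `events_processed` column would be the cleaner long-term shape.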