nonbinary.computer / weaver
atproto blogging
more resilience
Orual, 1 month ago
e86033fb f79bbd8d
+362 -176, 4 changed files
crates/weaver-index/src/clickhouse/client.rs
crates/weaver-index/src/clickhouse/resilient_inserter.rs
crates/weaver-index/src/indexer.rs
docker-compose.yml
crates/weaver-index/src/clickhouse/client.rs  +2 -2

···
             .inserter(table)
             .with_max_rows(1000)
             .with_period_bias(0.1)
-            .with_period(Some(Duration::from_secs(1)))
-            .with_max_bytes(1_048_576)
+            .with_period(Some(Duration::from_secs(2)))
+            .with_max_bytes(1_048_576 * 2)
     }

     /// Query table sizes from system.parts
crates/weaver-index/src/clickhouse/resilient_inserter.rs  +236 -31

···
 impl Default for InserterConfig {
     fn default() -> Self {
         Self {
-            max_rows: 1000,
-            max_bytes: 1_048_576, // 1MB
-            period: Some(Duration::from_secs(1)),
+            max_rows: 10000,
+            max_bytes: 1_048_576 * 2, // 1MB
+            period: Some(Duration::from_secs(2)),
             period_bias: 0.1,
         }
     }
···
         self.pending.len()
     }

-    /// Handle a batch failure by retrying rows individually
+    /// Get time remaining until the next scheduled flush
+    pub fn time_left(&mut self) -> Option<std::time::Duration> {
+        self.inner.time_left()
+    }
+
+    /// Handle a batch failure by retrying rows
+    ///
+    /// Attempts to extract the failing row number from the error message.
+    /// If found, batches rows before/after the failure point for efficiency.
+    /// Falls back to individual retries if row number unavailable or sub-batches fail.
     async fn handle_batch_failure(
         &mut self,
         original_error: clickhouse::error::Error,
···
         // Create fresh inserter (old one is poisoned after error)
         self.inner = Self::create_inserter(&self.client, &self.config);

+        // Try to extract failing row number for smart retry
+        if let Some(failing_row) = extract_failing_row(&original_error) {
+            // Subtract 2 for safety margin (1-indexed to 0-indexed, plus buffer)
+            let safe_row = failing_row.saturating_sub(2);
+
+            if safe_row > 0 && safe_row < total {
+                debug!(
+                    failing_row,
+                    safe_row, total, "extracted failing row, attempting smart retry"
+                );
+                return self.smart_retry(rows, safe_row, &original_error).await;
+            }
+        }
+
+        // Fall back to individual retries
+        debug!(total, "no row number found, retrying individually");
+        self.retry_individually(rows).await
+    }
+
+    /// Smart retry: batch rows before failure, DLQ the bad row, batch rows after
+    async fn smart_retry(
+        &mut self,
+        rows: Vec<RawRecordInsert>,
+        failing_idx: usize,
+        original_error: &clickhouse::error::Error,
+    ) -> Result<Quantities, IndexError> {
+        let total = rows.len();
         let mut succeeded = 0u64;
         let mut failed = 0u64;

-        for row in rows {
-            match self.try_single_insert(&row).await {
+        // Try to batch insert rows before the failure point
+        if failing_idx > 0 {
+            let before = &rows[..failing_idx];
+            debug!(count = before.len(), "batch inserting rows before failure");
+
+            match self.batch_insert(before).await {
+                Ok(count) => {
+                    succeeded += count;
+                    debug!(count, "pre-failure batch succeeded");
+                }
+                Err(e) => {
+                    // Sub-batch failed, fall back to individual for this chunk
+                    warn!(error = ?e, "pre-failure batch failed, retrying individually");
+                    let (s, f) = self.retry_individually_slice(before).await?;
+                    succeeded += s;
+                    failed += f;
+                }
+            }
+        }
+
+        // Send the failing row (and a couple around it) to DLQ
+        let dlq_start = failing_idx;
+        let dlq_end = (failing_idx + 3).min(total); // failing row + 2 more for safety
+        for row in &rows[dlq_start..dlq_end] {
+            warn!(
+                did = %row.did,
+                collection = %row.collection,
+                rkey = %row.rkey,
+                seq = row.seq,
+                "sending suspected bad row to DLQ"
+            );
+            self.send_to_dlq(row, original_error).await?;
+            failed += 1;
+        }
+
+        // Try to batch insert rows after the failure point
+        if dlq_end < total {
+            let after = &rows[dlq_end..];
+            debug!(count = after.len(), "batch inserting rows after failure");
+
+            match self.batch_insert(after).await {
+                Ok(count) => {
+                    succeeded += count;
+                    debug!(count, "post-failure batch succeeded");
+                }
+                Err(e) => {
+                    // Sub-batch failed, fall back to individual for this chunk
+                    warn!(error = ?e, "post-failure batch failed, retrying individually");
+                    let (s, f) = self.retry_individually_slice(after).await?;
+                    succeeded += s;
+                    failed += f;
+                }
+            }
+        }
+
+        debug!(total, succeeded, failed, "smart retry complete");
+
+        Ok(Quantities {
+            rows: succeeded,
+            bytes: 0,
+            transactions: 0,
+        })
+    }
+
+    /// Batch insert a slice of rows using a fresh one-shot inserter
+    async fn batch_insert(
+        &mut self,
+        rows: &[RawRecordInsert],
+    ) -> Result<u64, clickhouse::error::Error> {
+        batch_insert_rows(&self.client, rows).await
+    }
+
+    /// Retry a vec of rows individually, returning (succeeded, failed) counts
+    async fn retry_individually(
+        &mut self,
+        rows: Vec<RawRecordInsert>,
+    ) -> Result<Quantities, IndexError> {
+        let (succeeded, failed) = self.retry_individually_slice(&rows).await?;
+
+        if failed > 0 {
+            warn!(
+                succeeded,
+                failed, "individual retry had failures sent to DLQ"
+            );
+        }
+
+        Ok(Quantities {
+            rows: succeeded,
+            bytes: 0,
+            transactions: 0,
+        })
+    }
+
+    /// Retry a slice of rows individually, returning (succeeded, failed) counts
+    async fn retry_individually_slice(
+        &mut self,
+        rows: &[RawRecordInsert],
+    ) -> Result<(u64, u64), IndexError> {
+        let total = rows.len();
+        let mut succeeded = 0u64;
+        let mut failed = 0u64;
+
+        let client = self.client.clone();
+
+        for (i, row) in rows.iter().enumerate() {
+            debug!(i, total, did = %row.did, "retrying row individually");
+            match try_single_insert(&client, row).await {
                 Ok(()) => {
                     succeeded += 1;
+                    debug!(i, "row succeeded");
                 }
                 Err(e) => {
                     failed += 1;
···
                         error = ?e,
                         "row insert failed, sending to DLQ"
                     );
-                    self.send_to_dlq(&row, &e).await?;
+                    debug!(i, "sending to DLQ");
+                    self.send_to_dlq(row, &e).await?;
+                    debug!(i, "DLQ write complete");
                 }
             }
         }

-        debug!(total, succeeded, failed, "batch failure recovery complete");
-
-        Ok(Quantities {
-            rows: succeeded,
-            bytes: 0,
-            transactions: 0,
-        })
-    }
-
-    /// Try to insert a single row using a fresh one-shot inserter
-    async fn try_single_insert(
-        &self,
-        row: &RawRecordInsert,
-    ) -> Result<(), clickhouse::error::Error> {
-        let mut inserter: Inserter<RawRecordInsert> =
-            self.client.inserter(Tables::RAW_RECORDS).with_max_rows(1);
-
-        inserter.write(row).await?;
-        inserter.end().await?;
-        Ok(())
+        debug!(total, succeeded, failed, "individual retry complete");
+        Ok((succeeded, failed))
     }

     /// Send a failed row to the dead-letter queue
···
         let raw_data = serde_json::to_string(row)
             .unwrap_or_else(|e| format!("{{\"serialization_error\": \"{}\"}}", e));

+        self.write_raw_to_dlq(row.operation.clone(), raw_data, error.to_string(), row.seq)
+            .await
+    }
+
+    /// Write a pre-insert failure directly to the DLQ
+    ///
+    /// Use this for failures that happen before we even have a valid RawRecordInsert,
+    /// like JSON serialization errors.
+    pub async fn write_raw_to_dlq(
+        &mut self,
+        event_type: SmolStr,
+        raw_data: String,
+        error_message: String,
+        seq: u64,
+    ) -> Result<(), IndexError> {
         let dlq_row = RawEventDlq {
-            event_type: row.operation.clone(),
+            event_type,
             raw_data: raw_data.to_smolstr(),
-            error_message: error.to_smolstr(),
-            seq: row.seq,
+            error_message: error_message.to_smolstr(),
+            seq,
         };

         self.dlq
···
     }
 }

+/// Try to insert a single row using a fresh one-shot inserter
+///
+/// Free function to avoid &self borrow across await points (Sync issues)
+async fn try_single_insert(
+    client: &clickhouse::Client,
+    row: &RawRecordInsert,
+) -> Result<(), clickhouse::error::Error> {
+    let mut inserter: Inserter<RawRecordInsert> =
+        client.inserter(Tables::RAW_RECORDS).with_max_rows(1);
+
+    inserter.write(row).await?;
+    inserter.force_commit().await?;
+    inserter.end().await?;
+    Ok(())
+}
+
+/// Batch insert rows using a fresh inserter
+///
+/// Free function to avoid &self borrow across await points (Sync issues)
+async fn batch_insert_rows(
+    client: &clickhouse::Client,
+    rows: &[RawRecordInsert],
+) -> Result<u64, clickhouse::error::Error> {
+    let mut inserter: Inserter<RawRecordInsert> = client
+        .inserter(Tables::RAW_RECORDS)
+        .with_max_rows(rows.len() as u64);
+
+    for row in rows {
+        inserter.write(row).await?;
+    }
+    inserter.end().await?;
+    Ok(rows.len() as u64)
+}
+
+/// Extract the failing row number from a ClickHouse error message
+///
+/// Looks for patterns like "(at row 791)" in the error text.
+/// Returns 1-indexed row number if found.
+fn extract_failing_row(error: &clickhouse::error::Error) -> Option<usize> {
+    let msg = error.to_string();
+    // Look for "(at row N)"
+    if let Some(start) = msg.find("(at row ") {
+        let rest = &msg[start + 8..];
+        if let Some(end) = rest.find(')') {
+            return rest[..end].parse().ok();
+        }
+    }
+    None
+}
+
 #[cfg(test)]
 mod tests {
-    // TODO: Add tests with mock clickhouse client
+    use super::*;
+
+    #[test]
+    fn test_extract_failing_row() {
+        // Simulate the error message format from ClickHouse
+        let msg = "Code: 117. DB::Exception: Cannot parse JSON object here: : (at row 791)\n: While executing BinaryRowInputFormat.";
+
+        // We can't easily construct a clickhouse::error::Error, but we can test the parsing logic
+        assert!(msg.contains("(at row "));
+        let start = msg.find("(at row ").unwrap();
+        let rest = &msg[start + 8..];
+        let end = rest.find(')').unwrap();
+        let row: usize = rest[..end].parse().unwrap();
+        assert_eq!(row, 791);
+    }
 }
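
Aside: the slicing arithmetic behind smart_retry is easy to check in isolation. Below is a minimal, self-contained sketch (not part of this commit) with a hypothetical split_for_retry helper; only the index math mirrors the diff above.

/// Hypothetical helper mirroring the smart-retry split: rows before the
/// suspect index go back out as one batch, a small window starting at the
/// suspect index is quarantined for the DLQ, and the remainder is batched again.
fn split_for_retry<T: Clone>(rows: &[T], failing_idx: usize) -> (Vec<T>, Vec<T>, Vec<T>) {
    let total = rows.len();
    let start = failing_idx.min(total);
    let dlq_end = (failing_idx + 3).min(total); // suspect row + 2 more for safety
    (
        rows[..start].to_vec(),        // retried as a single batch
        rows[start..dlq_end].to_vec(), // sent to the dead-letter queue
        rows[dlq_end..].to_vec(),      // retried as a single batch
    )
}

fn main() {
    let rows: Vec<u32> = (0..10).collect();
    // Suppose ClickHouse reported "(at row 7)"; with the commit's safety margin
    // of 2, the suspect index becomes 5.
    let (before, dlq, after) = split_for_retry(&rows, 5);
    assert_eq!(before, vec![0, 1, 2, 3, 4]);
    assert_eq!(dlq, vec![5, 6, 7]);
    assert_eq!(after, vec![8, 9]);
    println!("before={before:?} dlq={dlq:?} after={after:?}");
}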
crates/weaver-index/src/indexer.rs  +123 -142

···
 use chrono::DateTime;

 use crate::clickhouse::{
-    AccountRevState, Client, FirehoseCursor, RawAccountEvent, RawIdentityEvent, RawRecordInsert,
+    AccountRevState, Client, FirehoseCursor, InserterConfig, RawAccountEvent, RawIdentityEvent,
+    RawRecordInsert, ResilientRecordInserter,
 };
 use crate::config::IndexerConfig;
 use crate::config::TapConfig;
 use crate::error::{ClickHouseError, IndexError, Result};
 use crate::firehose::{
-    Account, Commit, ExtractedRecord, FirehoseConsumer, Identity, MessageStream,
-    SubscribeReposMessage, extract_records,
+    Account, ExtractedRecord, FirehoseConsumer, Identity, MessageStream, SubscribeReposMessage,
+    extract_records,
 };
 use crate::tap::{TapConfig as TapConsumerConfig, TapConsumer, TapEvent};

···
         let mut stream: MessageStream = self.consumer.connect().await?;

         // Inserters handle batching internally based on config
-        let mut records = self.client.inserter::<RawRecordInsert>("raw_records");
+        // Use resilient inserter for records since that's where untrusted JSON enters
+        let mut records =
+            ResilientRecordInserter::new(self.client.inner().clone(), InserterConfig::default());
         let mut identities = self
             .client
             .inserter::<RawIdentityEvent>("raw_identity_events");
···
             let accounts_time = accounts.time_left().unwrap_or(Duration::from_secs(10));
             let time_left = records_time.min(identities_time).min(accounts_time);

-            let result =
-                match tokio::time::timeout(time_left, stream.next()).await {
-                    Ok(Some(result)) => result,
-                    Ok(None) => {
-                        // Stream ended
-                        break;
-                    }
-                    Err(_) => {
-                        // Timeout - flush inserters to keep INSERT alive
-                        debug!("flush timeout, committing inserters");
-                        records.commit().await.map_err(|e| {
-                            crate::error::ClickHouseError::Query {
-                                message: "periodic records commit failed".into(),
-                                source: e,
-                            }
-                        })?;
-                        identities.commit().await.map_err(|e| {
-                            crate::error::ClickHouseError::Query {
-                                message: "periodic identities commit failed".into(),
-                                source: e,
-                            }
-                        })?;
-                        accounts.commit().await.map_err(|e| {
-                            crate::error::ClickHouseError::Query {
-                                message: "periodic accounts commit failed".into(),
-                                source: e,
-                            }
-                        })?;
-                        continue;
-                    }
-                };
+            let result = match tokio::time::timeout(time_left, stream.next()).await {
+                Ok(Some(result)) => result,
+                Ok(None) => {
+                    // Stream ended
+                    break;
+                }
+                Err(_) => {
+                    // Timeout - flush inserters to keep INSERT alive
+                    debug!("flush timeout, committing inserters");
+                    records.commit().await?;
+                    identities.commit().await.map_err(|e| {
+                        crate::error::ClickHouseError::Query {
+                            message: "periodic identities commit failed".into(),
+                            source: e,
+                        }
+                    })?;
+                    accounts
+                        .commit()
+                        .await
+                        .map_err(|e| crate::error::ClickHouseError::Query {
+                            message: "periodic accounts commit failed".into(),
+                            source: e,
+                        })?;
+                    continue;
+                }
+            };

             let msg = match result {
                 Ok(msg) => msg,
···

             match msg {
                 SubscribeReposMessage::Commit(commit) => {
-                    if self
-                        .process_commit(&commit, &mut records, &mut skipped)
-                        .await?
-                    {
-                        processed += 1;
-                    }
+                    let did = commit.repo.as_ref();
+                    let rev = commit.rev.as_ref();
+
+                    // Dedup check
+                    if !self.rev_cache.should_process(did, rev) {
+                        skipped += 1;
+                        continue;
+                    }
+
+                    // Extract and write records
+                    for record in extract_records(&commit).await? {
+                        // Collection filter - skip early before JSON conversion
+                        if !self.config.collections.matches(&record.collection) {
+                            continue;
+                        }
+
+                        let json = record.to_json()?.unwrap_or_else(|| "{}".to_string());
+
+                        // Fire and forget delete handling
+                        if record.operation == "delete" {
+                            let client = self.client.clone();
+                            let record_clone = record.clone();
+                            tokio::spawn(async move {
+                                if let Err(e) = handle_delete(&client, record_clone).await {
+                                    warn!(error = ?e, "delete handling failed");
+                                }
+                            });
+                        }
+
+                        records
+                            .write(RawRecordInsert {
+                                did: record.did.clone(),
+                                collection: record.collection.clone(),
+                                rkey: record.rkey.clone(),
+                                cid: record.cid.clone(),
+                                rev: record.rev.clone(),
+                                record: json.to_smolstr(),
+                                operation: record.operation.clone(),
+                                seq: record.seq as u64,
+                                event_time: record.event_time,
+                                is_live: true,
+                            })
+                            .await?;
+                    }
+
+                    // Update rev cache
+                    self.rev_cache.update(
+                        &SmolStr::new(did),
+                        &SmolStr::new(rev),
+                        &commit.commit.0.to_smolstr(),
+                    );
+
+                    processed += 1;
                 }
                 SubscribeReposMessage::Identity(identity) => {
                     write_identity(&identity, &mut identities).await?;
···
             }

             // commit() flushes if internal thresholds met, otherwise no-op
-            records
-                .commit()
-                .await
-                .map_err(|e| crate::error::ClickHouseError::Query {
-                    message: "commit failed".into(),
-                    source: e,
-                })?;
+            records.commit().await?;

             // Periodic stats and cursor save (every 10s)
             if last_stats.elapsed() >= Duration::from_secs(10) {
···
         }

         // Final flush
-        records
-            .end()
-            .await
-            .map_err(|e| crate::error::ClickHouseError::Query {
-                message: "final flush failed".into(),
-                source: e,
-            })?;
+        records.end().await?;
         identities
             .end()
             .await
···
         info!(last_seq, "firehose stream ended");
         Ok(())
     }
-
-    async fn process_commit(
-        &self,
-        commit: &Commit<'_>,
-        inserter: &mut clickhouse::inserter::Inserter<RawRecordInsert>,
-        skipped: &mut u64,
-    ) -> Result<bool> {
-        let did = commit.repo.as_ref();
-        let rev = commit.rev.as_ref();
-
-        // Dedup check
-        if !self.rev_cache.should_process(did, rev) {
-            *skipped += 1;
-            return Ok(false);
-        }
-
-        // Extract and write records
-        for record in extract_records(commit).await? {
-            // Collection filter - skip early before JSON conversion
-            if !self.config.collections.matches(&record.collection) {
-                continue;
-            }
-
-            let json = record.to_json()?.unwrap_or_else(|| "{}".to_string());
-
-            // Fire and forget delete handling
-            if record.operation == "delete" {
-                let client = self.client.clone();
-                let record_clone = record.clone();
-                tokio::spawn(async move {
-                    if let Err(e) = handle_delete(&client, record_clone).await {
-                        warn!(error = ?e, "delete handling failed");
-                    }
-                });
-            }
-
-            inserter
-                .write(&RawRecordInsert {
-                    did: record.did.clone(),
-                    collection: record.collection.clone(),
-                    rkey: record.rkey.clone(),
-                    cid: record.cid.clone(),
-                    rev: record.rev.clone(),
-                    record: json.to_smolstr(),
-                    operation: record.operation.clone(),
-                    seq: record.seq as u64,
-                    event_time: record.event_time,
-                    is_live: true,
-                })
-                .await
-                .map_err(|e| crate::error::ClickHouseError::Query {
-                    message: "write failed".into(),
-                    source: e,
-                })?;
-        }
-
-        // Update rev cache
-        self.rev_cache.update(
-            &SmolStr::new(did),
-            &SmolStr::new(rev),
-            &commit.commit.0.to_smolstr(),
-        );
-
-        Ok(true)
-    }
 }

 async fn write_identity(
···
         let (mut events, ack_tx) = consumer.connect().await?;

-        let mut records = self.client.inserter::<RawRecordInsert>("raw_records");
+        // Use resilient inserter for records since that's where untrusted JSON enters
+        let mut records =
+            ResilientRecordInserter::new(self.client.inner().clone(), InserterConfig::default());
         let mut identities = self
             .client
             .inserter::<RawIdentityEvent>("raw_identity_events");
···
                 Err(_) => {
                     // Timeout - flush inserters to keep INSERT alive
                     trace!("flush timeout, committing inserters");
-                    records.commit().await.map_err(|e| ClickHouseError::Query {
-                        message: "periodic records commit failed".into(),
-                        source: e,
-                    })?;
+                    records.commit().await?;
                     identities
                         .commit()
                         .await
···
                     continue;
                 }

-                let json = record
-                    .record
-                    .as_ref()
-                    .map(|v| serde_json::to_string(v).unwrap_or_default())
-                    .unwrap_or_default();
+                let json = match &record.record {
+                    Some(v) => match serde_json::to_string(v) {
+                        Ok(s) => s,
+                        Err(e) => {
+                            warn!(
+                                did = %record.did,
+                                collection = %record.collection,
+                                rkey = %record.rkey,
+                                error = ?e,
+                                "failed to serialize record, sending to DLQ"
+                            );
+                            let raw_data = format!(
+                                r#"{{"did":"{}","collection":"{}","rkey":"{}","cid":"{}","error":"serialization_failed"}}"#,
+                                record.did, record.collection, record.rkey, record.cid
+                            );
+                            records
+                                .write_raw_to_dlq(
+                                    record.action.as_str().to_smolstr(),
+                                    raw_data,
+                                    e.to_string(),
+                                    event_id,
+                                )
+                                .await?;
+                            let _ = ack_tx.send(event_id).await;
+                            continue;
+                        }
+                    },
+                    None => "{}".to_string(),
+                };

                 debug!(
                     op = record.action.as_str(),
···
                 );

                 records
-                    .write(&RawRecordInsert {
+                    .write(RawRecordInsert {
                         did: record.did.clone(),
                         collection: record.collection.clone(),
                         rkey: record.rkey.clone(),
···
                         event_time: Utc::now(),
                         is_live: record.live,
                     })
-                    .await
-                    .map_err(|e| ClickHouseError::Query {
-                        message: "record write failed".into(),
-                        source: e,
-                    })?;
-                records.commit().await.map_err(|e| ClickHouseError::Query {
-                    message: format!("record commit failed for id {}:\n{}", event_id, json),
-                    source: e,
-                })?;
+                    .await?;
+                records.commit().await?;

                 processed += 1;
             }
···
         }

         // Final flush
-        records.end().await.map_err(|e| ClickHouseError::Query {
-            message: "final records flush failed".into(),
-            source: e,
-        })?;
+        records.end().await?;
         identities.end().await.map_err(|e| ClickHouseError::Query {
             message: "final identities flush failed".into(),
             source: e,
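
Aside: the timeout-then-commit shape of the indexer loop, shown without any ClickHouse dependency. Below is a self-contained sketch (not part of this commit) using a hypothetical Batcher in place of the inserter; tokio::time::timeout, tokio::sync::mpsc, and tokio::time::sleep are the only real APIs used, assuming tokio with its full feature set.

use std::time::Duration;
use tokio::sync::mpsc;

/// Hypothetical stand-in for the inserter: buffers items and flushes on commit().
struct Batcher {
    pending: Vec<String>,
}

impl Batcher {
    fn write(&mut self, item: String) {
        self.pending.push(item);
    }
    fn commit(&mut self) {
        if !self.pending.is_empty() {
            println!("flushing {} items", self.pending.len());
            self.pending.clear();
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(16);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("event {i}")).await.unwrap();
            tokio::time::sleep(Duration::from_millis(300)).await;
        }
        // dropping tx ends the stream
    });

    let mut batcher = Batcher { pending: Vec::new() };
    loop {
        // Same shape as the indexer loop: wait for the next message, but never
        // longer than the flush period, so buffered rows keep moving.
        match tokio::time::timeout(Duration::from_millis(200), rx.recv()).await {
            Ok(Some(msg)) => batcher.write(msg),
            Ok(None) => break,          // stream ended
            Err(_) => batcher.commit(), // timeout: flush to keep the insert alive
        }
    }
    batcher.commit(); // final flush
}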
docker-compose.yml  +1 -1

···
     ports:
       - "3000:3000"
     environment:
-      RUST_LOG: debug,weaver_index=debug
+      RUST_LOG: debug,weaver_index=debug,hyper_util::client::legacy::pool=info
       # ClickHouse connection (set these for your cloud/homelab instance)
       CLICKHOUSE_URL: ${CLICKHOUSE_URL}
       CLICKHOUSE_DATABASE: ${CLICKHOUSE_DATABASE:-weaver}
CLICKHOUSE_DATABASE: ${CLICKHOUSE_DATABASE:-weaver}