tangled
alpha
login
or
join now
nonbinary.computer
/
weaver
atproto blogging
24
fork
atom
overview
issues
2
pulls
pipelines
couple fixes
Orual
1 month ago
0186121d
25871687
+10
-9
2 changed files
expand all
collapse all
unified
split
crates
weaver-index
src
clickhouse.rs
parallel_tap.rs
+3
-1
crates/weaver-index/src/clickhouse.rs
···
6
6
7
7
pub use client::{Client, TableSize};
8
8
pub use migrations::{DbObject, MigrationResult, Migrator, ObjectType};
9
9
-
pub use queries::{EntryRow, HandleMappingRow, NotebookRow, ProfileCountsRow, ProfileRow, ProfileWithCounts};
9
9
+
pub use queries::{
10
10
+
EntryRow, HandleMappingRow, NotebookRow, ProfileCountsRow, ProfileRow, ProfileWithCounts,
11
11
+
};
10
12
pub use resilient_inserter::{InserterConfig, ResilientRecordInserter};
11
13
pub use schema::{
12
14
AccountRevState, FirehoseCursor, RawAccountEvent, RawEventDlq, RawIdentityEvent,
+7
-8
crates/weaver-index/src/parallel_tap.rs
···
1
1
-
use std::sync::atomic::{AtomicBool, Ordering};
2
1
use std::sync::Arc;
2
2
+
use std::sync::atomic::{AtomicBool, Ordering};
3
3
use std::time::{Duration, Instant};
4
4
5
5
use chrono::Utc;
···
7
7
use tokio::task::JoinHandle;
8
8
use tracing::{debug, error, info, trace, warn};
9
9
10
10
-
use crate::clickhouse::migrations::Migrator;
11
10
use crate::clickhouse::{
12
12
-
Client, InserterConfig, RawIdentityEvent, RawRecordInsert, ResilientRecordInserter,
11
11
+
Client, InserterConfig, Migrator, RawIdentityEvent, RawRecordInsert, ResilientRecordInserter,
13
12
};
14
13
use crate::config::{IndexerConfig, TapConfig};
15
14
use crate::error::{ClickHouseError, Result};
···
239
238
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
240
239
.is_ok()
241
240
{
242
242
-
info!(
243
243
-
worker_id,
244
244
-
"first live event received, scheduling backfill"
245
245
-
);
241
241
+
info!(worker_id, "first live event received, scheduling backfill");
246
242
let backfill_client = client.clone();
247
243
tokio::spawn(async move {
248
244
run_backfill(backfill_client).await;
···
310
306
return;
311
307
}
312
308
313
313
-
info!(count = mvs.len(), "backfill: starting incremental MV backfill");
309
309
+
info!(
310
310
+
count = mvs.len(),
311
311
+
"backfill: starting incremental MV backfill"
312
312
+
);
314
313
315
314
for mv in mvs {
316
315
info!(