GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
at main 808 lines 30 kB view raw
//! Sync from Turso to local SQLite
//! Full sync on startup, incremental sync every 5 minutes

const std = @import("std");
const io = std.Options.debug_io;
const zqlite = @import("zqlite");
const logfire = @import("logfire");
const Allocator = std.mem.Allocator;
const TursoClient = @import("TursoClient.zig");
const LocalDb = @import("LocalDb.zig");
const Col = LocalDb.Col;

const log = std.log.scoped(.sync);

/// Current wall-clock time in whole seconds (floored toward negative infinity).
fn timestamp() i64 {
    return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s));
}

/// Current wall-clock time in whole milliseconds (floored); used for phase timing.
fn milliTimestamp() i64 {
    return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_ms));
}

/// index of the first column appended after actor_cols (rowid or updated_at)
const trailing_col: usize = Col.pds + 1;

// Rows fetched per keyset-pagination query. NOTE(review): the SQL strings below
// must use the same value in their LIMIT clause — see the drain checks.
const BATCH_SIZE = 2000;
const SYNC_INTERVAL_S = 300; // 5 minutes
const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning

/// Full sync: fetch all searchable actors from Turso using keyset pagination.
/// Does NOT run stale cleanup — that's handled by incremental sync (tombstones +
/// "became unsearchable" queries). This avoids races with the dual-write path.
/// When `wipe` is true, deletes all local data first (used by stale rebuild path).
pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void {
    const span = logfire.span("sync.full", .{});
    defer span.end();

    const conn = local.getConn() orelse {
        span.setStatus(.@"error", "LocalNotOpen");
        return error.LocalNotOpen;
    };

    // Did a previous full sync run to completion? (sync_meta flag set at the
    // end of both the bootstrap and resync paths.)
    const was_completed = blk: {
        local.lock();
        defer local.unlock();
        const maybe_row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'sync_complete'",
            .{},
        ) catch break :blk false;
        const r = maybe_row orelse break :blk false;
        defer r.deinit();
        break :blk std.mem.eql(u8, r.text(0), "1");
    };

    // No completed sync (or explicit wipe) → staging-table bootstrap, which
    // takes the DB offline while it rebuilds.
    if (wipe or !was_completed) {
        log.info("full_sync: taking bootstrap path (wipe={}, was_completed={})", .{ wipe, was_completed });
        span.setAttribute("mode", "bootstrap");
        return fullSyncBootstrap(turso, local, conn, wipe);
    }

    // re-sync path: previous sync completed, keep serving while re-syncing
    log.info("full_sync: taking resync path", .{});
    local.setReady(true);
    span.setAttribute("mode", "resync");
    return fullSyncResync(turso, local, conn);
}

/// Bootstrap path: load into staging tables (no indexes), build indexes in bulk,
/// then atomically swap into place. Live schema is never in a degraded state.
fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void {
    const span = logfire.span("sync.bootstrap", .{});
    defer span.end();

    const total_t0 = milliTimestamp();
    local.setReady(false);

    // close read connection during bootstrap — not serving traffic,
    // avoids WAL reader lock interference with DDL/index builds
    local.closeReadConn();
    errdefer local.reopenReadConn() catch {};

    if (wipe) {
        local.lock();
        defer local.unlock();
        conn.exec("DELETE FROM sync_meta", .{}) catch {};
    }

    // clean up any leftover staging tables from a previous failed attempt
    {
        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
    }

    // create staging table — no PK, no indexes for fast sequential appends
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE TABLE actors_stage (
            \\  did TEXT NOT NULL,
            \\  handle TEXT NOT NULL DEFAULT '',
            \\  display_name TEXT DEFAULT '',
            \\  avatar_url TEXT DEFAULT '',
            \\  hidden INTEGER NOT NULL DEFAULT 0,
            \\  labels TEXT NOT NULL DEFAULT '[]',
            \\  created_at TEXT DEFAULT '',
            \\  associated TEXT DEFAULT '{}',
            \\  pds TEXT DEFAULT ''
            \\)
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }

    // keyset pagination: fetch rows WHERE rowid > last_rowid ORDER BY rowid
    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;
    const load_t0 = milliTimestamp();

    while (true) {
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const t0 = milliTimestamp();

        // fetch from turso (no lock held)
        // FIX: LIMIT rendered from BATCH_SIZE at comptime instead of a
        // hard-coded 2000 — keeps it in lockstep with the drain check below.
        var result = turso.query(
            "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT " ++ std.fmt.comptimePrint("{d}", .{BATCH_SIZE}),
            &.{rowid_str},
        ) catch |err| {
            log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
            had_turso_error = true;
            break;
        };
        defer result.deinit();

        const t1 = milliTimestamp();

        if (result.rows.len == 0) break;

        // write batch to staging table — no indexes, fast sequential appends
        {
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorRow(conn, "INSERT INTO actors_stage", row) catch |err| {
                    log.err("insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }

        const t2 = milliTimestamp();

        last_rowid = result.rows[result.rows.len - 1].int(trailing_col);

        // high-frequency batch progress — keep on stderr only
        log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            result.rows.len, last_rowid, actor_count,
            t1 - t0,         t2 - t1,
        });

        // short batch means the source is drained — skip the final empty
        // round-trip (same check incrementalSync already does)
        if (result.rows.len < BATCH_SIZE) break;
    }

    if (had_turso_error or error_count > 0) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "bootstrap incomplete");
        // clean up staging table
        {
            local.lock();
            defer local.unlock();
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
        }
        // restore read connection so health endpoint works
        local.reopenReadConn() catch {};
        return;
    }

    const load_ms = milliTimestamp() - load_t0;

    // checkpoint WAL before heavy DDL — clean slate for index builds
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch {};
    }

    // build indexes on staging table
    const idx_did_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| {
            span.recordError(err);
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_did_ms = milliTimestamp() - idx_did_t0;

    const idx_handle_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| {
            span.recordError(err);
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_handle_ms = milliTimestamp() - idx_handle_t0;

    // swap actors table first, then build FTS with final name.
    {
        local.lock();
        defer local.unlock();
        conn.exec("BEGIN", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {};
        conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| {
            span.recordError(err);
            conn.exec("ROLLBACK", .{}) catch {};
            return err;
        };
        conn.exec("COMMIT", .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }

    // build FTS with its final name — no rename needed
    const fts_create_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\  did UNINDEXED, handle, display_name,
            \\  tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }
    const fts_create_ms = milliTimestamp() - fts_create_t0;

    const fts_pop_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }
    const fts_pop_ms = milliTimestamp() - fts_pop_t0;

    // post-swap: optimize, record completion + actor count, checkpoint
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA optimize", .{}) catch {};

        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    const total_ms = milliTimestamp() - total_t0;

    // reopen read connection before serving traffic
    local.reopenReadConn() catch {};

    local.setReady(true);
    span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    span.setAttribute("load_ms", load_ms);
    span.setAttribute("idx_did_ms", idx_did_ms);
    span.setAttribute("idx_handle_ms", idx_handle_ms);
    span.setAttribute("fts_create_ms", fts_create_ms);
    span.setAttribute("fts_populate_ms", fts_pop_ms);
    span.setAttribute("total_ms", total_ms);
    span.setStatus(.ok, null);
}

/// Re-sync path: previous sync completed, update live table directly.
/// Only processes actors that may have changed — index maintenance cost is negligible.
fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void {
    const span = logfire.span("sync.resync", .{});
    defer span.end();

    log.info("resync: starting full table resync", .{});

    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;
    var batch_num: usize = 0;

    while (true) {
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const q_span = logfire.span("sync.query.resync_batch", .{});
        const t0 = milliTimestamp();

        var result = turso.query(
            "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT " ++ batch_limit,
            &.{rowid_str},
        ) catch |err| {
            log.err("resync: turso query failed at rowid {d}: {}", .{ last_rowid, err });
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break;
        };
        defer result.deinit();

        const t1 = milliTimestamp();
        q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
        q_span.setAttribute("fetch_ms", t1 - t0);
        q_span.setStatus(.ok, null);
        q_span.end();

        if (result.rows.len == 0) break;

        // apply batch to the live table under lock
        {
            const a_span = logfire.span("sync.apply.resync_batch", .{});
            defer a_span.end();

            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch |err| {
                    log.err("resync: insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};

            a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
            a_span.setStatus(.ok, null);
        }

        const t2 = milliTimestamp();
        last_rowid = result.rows[result.rows.len - 1].int(trailing_col);
        batch_num += 1;

        log.info("resync: batch {d}: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            batch_num, result.rows.len, last_rowid, actor_count,
            t1 - t0,   t2 - t1,
        });

        // short batch means the source is drained — skip the final empty
        // round-trip (same check incrementalSync already does)
        if (result.rows.len < BATCH_SIZE) break;
    }

    if (had_turso_error or error_count > 0) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "resync incomplete");
        log.err("resync: incomplete (actors={d}, errors={d})", .{ actor_count, error_count });
        return;
    }

    log.info("resync: download complete ({d} actors), rebuilding FTS", .{actor_count});

    // rebuild FTS from scratch
    {
        const fts_span = logfire.span("sync.fts_rebuild", .{});
        defer fts_span.end();

        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\  did UNINDEXED, handle, display_name,
            \\  tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            // FIX: previously swallowed — sync was then marked complete with a
            // dropped/missing actors_fts table. Bail without advancing the
            // watermark so the next cycle retries the rebuild.
            log.err("resync: FTS create failed: {}", .{err});
            fts_span.recordError(err);
            fts_span.setStatus(.@"error", "create failed");
            span.setStatus(.@"error", "FTS rebuild failed");
            return;
        };
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            // FIX: same as above — do not record sync_complete on failure.
            log.err("resync: FTS populate failed: {}", .{err});
            fts_span.recordError(err);
            fts_span.setStatus(.@"error", "populate failed");
            span.setStatus(.@"error", "FTS rebuild failed");
            return;
        };
        fts_span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    }

    log.info("resync: FTS rebuild complete", .{});

    // record sync time + mark complete + actor count
    {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    local.setReady(true);
    span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    span.setStatus(.ok, null);
    log.info("resync: complete ({d} actors)", .{actor_count});
}

/// Incremental sync: fetch actors updated since last sync + tombstones.
/// Uses keyset pagination on (updated_at, did) to drain all updates.
/// Only advances watermark when all turso queries succeed and all local writes succeed.
pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void {
    const span = logfire.span("sync.incremental", .{});
    defer span.end();

    const conn = local.getConn() orelse {
        span.setStatus(.@"error", "LocalNotOpen");
        return error.LocalNotOpen;
    };

    // get last sync time
    const last_sync_ts = blk: {
        const s = logfire.span("sync.read_last_sync", .{});
        defer s.end();
        local.lock();
        defer local.unlock();
        const row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'last_sync'",
            .{},
        ) catch {
            s.setStatus(.@"error", "query failed");
            break :blk @as(i64, 0);
        };
        if (row) |r| {
            defer r.deinit();
            const val = r.text(0);
            const ts = if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0;
            s.setAttribute("last_sync", ts);
            s.setStatus(.ok, null);
            break :blk ts;
        }
        s.setStatus(.ok, null);
        break :blk @as(i64, 0);
    };

    log.info("incremental: last_sync={d}", .{last_sync_ts});

    if (last_sync_ts == 0) {
        log.info("incremental: no last_sync, falling back to full sync", .{});
        span.setAttribute("fallback", "full_sync");
        return fullSync(turso, local, false);
    }

    // if last sync is older than tombstone retention, local data is too stale
    const now = timestamp();
    if (now - last_sync_ts > TOMBSTONE_RETENTION_S) {
        log.info("incremental: stale ({d}s old), falling back to full sync with wipe", .{now - last_sync_ts});
        span.setAttribute("fallback", "stale_wipe");
        return fullSync(turso, local, true);
    }

    local.setReady(true);

    // buffer: subtract 300s to catch stragglers (matches leaflet-search)
    const since_ts = last_sync_ts - 300;
    var since_buf: [20]u8 = undefined;
    const since_str = std.fmt.bufPrint(&since_buf, "{d}", .{since_ts}) catch return;

    // fetch updated searchable actors — keyset pagination to drain all
    var updated: usize = 0;
    var error_count: usize = 0;
    var had_turso_error = false;
    {
        var cursor_ts_buf: [20]u8 = undefined;
        var cursor_did_buf: [128]u8 = undefined;
        @memcpy(cursor_ts_buf[0..since_str.len], since_str);
        var cursor_ts: []const u8 = cursor_ts_buf[0..since_str.len];
        var cursor_did_len: usize = 0;

        while (true) {
            const cursor_did: []const u8 = if (cursor_did_len > 0) cursor_did_buf[0..cursor_did_len] else "";

            const args: []const []const u8 = if (cursor_did_len > 0)
                &.{ cursor_ts, cursor_ts, cursor_did }
            else
                &.{cursor_ts};

            // first page filters on updated_at alone; subsequent pages use the
            // (updated_at, did) compound cursor to break ties
            const sql = if (cursor_did_len > 0)
                "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3)) ORDER BY updated_at, did LIMIT " ++ batch_limit
            else
                "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND updated_at > ?1 ORDER BY updated_at, did LIMIT " ++ batch_limit;

            log.info("incremental: querying updated actors (cursor_ts={s})", .{cursor_ts});
            const q_span = logfire.span("sync.query.updated", .{});

            var result = turso.query(sql, args) catch |err| {
                log.err("incremental query failed: {}", .{err});
                q_span.setStatus(.@"error", "query failed");
                q_span.end();
                had_turso_error = true;
                break;
            };
            defer result.deinit();

            q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
            q_span.setStatus(.ok, null);
            q_span.end();

            if (result.rows.len == 0) break;

            log.info("incremental: applying {d} updated actors", .{result.rows.len});
            {
                const a_span = logfire.span("sync.apply.updated", .{});
                defer a_span.end();

                local.lock();
                defer local.unlock();
                conn.exec("BEGIN", .{}) catch {};
                for (result.rows) |row| {
                    insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch {
                        error_count += 1;
                        continue;
                    };
                    updated += 1;
                }
                conn.exec("COMMIT", .{}) catch {};

                a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
                a_span.setStatus(.ok, null);
            }

            const last_row = result.rows[result.rows.len - 1];
            const last_ua_int = last_row.int(trailing_col);
            const last_did = last_row.text(Col.did);

            // serialize updated_at integer to string for next query
            const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break;
            cursor_ts = ua_str;

            if (last_did.len > 0 and last_did.len <= cursor_did_buf.len) {
                @memcpy(cursor_did_buf[0..last_did.len], last_did);
                cursor_did_len = last_did.len;
            }

            // safety: stop if batch was less than full (we've drained)
            if (result.rows.len < BATCH_SIZE) break;
        }
    }

    // fetch actors that became unsearchable (handle cleared)
    var cleared: usize = 0;
    unsearchable: {
        log.info("incremental: querying unsearchable actors", .{});
        const q_span = logfire.span("sync.query.unsearchable", .{});

        var result = turso.query(
            "SELECT did FROM actors WHERE updated_at > ? AND handle = ''",
            &.{since_str},
        ) catch |err| {
            log.err("unsearchable query failed: {}", .{err});
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break :unsearchable;
        };
        defer result.deinit();

        q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
        q_span.setStatus(.ok, null);
        q_span.end();

        if (result.rows.len > 0) {
            log.info("incremental: clearing {d} unsearchable actors", .{result.rows.len});
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                cleared += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }
    }

    // fetch tombstones
    var deleted: usize = 0;
    tombstone: {
        log.info("incremental: querying tombstones", .{});
        const q_span = logfire.span("sync.query.tombstones", .{});

        var tomb_result = turso.query(
            "SELECT did FROM tombstones WHERE deleted_at > ?",
            &.{since_str},
        ) catch |err| {
            log.err("tombstone query failed: {}", .{err});
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break :tombstone;
        };
        defer tomb_result.deinit();

        q_span.setAttribute("rows", @as(i64, @intCast(tomb_result.rows.len)));
        q_span.setStatus(.ok, null);
        q_span.end();

        if (tomb_result.rows.len > 0) {
            log.info("incremental: deleting {d} tombstoned actors", .{tomb_result.rows.len});
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (tomb_result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                deleted += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }
    }

    // rebuild FTS from actors table if anything changed.
    // per-row FTS maintenance is O(N) per delete because `did` is UNINDEXED —
    // bulk rebuild is O(N) total regardless of how many rows changed.
    if (updated > 0 or cleared > 0 or deleted > 0) {
        // checkpoint WAL before FTS rebuild — large WAL from batch deletes
        // makes the INSERT SELECT scan extremely slow
        {
            local.lock();
            defer local.unlock();
            conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
        }

        const fts_span = logfire.span("sync.fts_rebuild", .{});
        log.info("incremental: rebuilding FTS ({d} changes)", .{updated + cleared + deleted});
        {
            local.lock();
            defer local.unlock();
            conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
            conn.exec(
                \\CREATE VIRTUAL TABLE actors_fts USING fts5(
                \\  did UNINDEXED, handle, display_name,
                \\  tokenize='unicode61 remove_diacritics 2'
                \\)
            , .{}) catch |err| {
                log.err("FTS rebuild: create failed: {}", .{err});
                fts_span.recordError(err);
                fts_span.setStatus(.@"error", "create failed");
                fts_span.end();
                return;
            };
            conn.exec(
                \\INSERT INTO actors_fts (did, handle, display_name)
                \\SELECT did, handle, display_name FROM actors WHERE handle != ''
            , .{}) catch |err| {
                log.err("FTS rebuild: populate failed: {}", .{err});
                fts_span.recordError(err);
                fts_span.setStatus(.@"error", "populate failed");
                fts_span.end();
                return;
            };
        }
        fts_span.setStatus(.ok, null);
        fts_span.end();
        log.info("incremental: FTS rebuild complete", .{});
    }

    // only advance watermark when everything succeeded
    const had_error = had_turso_error or error_count > 0;
    if (had_error) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "incremental sync had errors");
        log.err("incremental: done with errors (updated={d}, cleared={d}, deleted={d}, errors={d})", .{
            updated, cleared, deleted, error_count,
        });
    } else {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};

        // refresh cached actor count when data changed
        if (updated > 0 or deleted > 0 or cleared > 0) {
            updateActorCount(conn);
        }

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
        span.setStatus(.ok, null);
        log.info("incremental: done (updated={d}, cleared={d}, deleted={d})", .{
            updated, cleared, deleted,
        });
    }

    span.setAttribute("updated", @as(i64, @intCast(updated)));
    span.setAttribute("deleted", @as(i64, @intCast(deleted)));
    span.setAttribute("cleared", @as(i64, @intCast(cleared)));
}

/// Comptime-rendered LIMIT literal — keeps the SQL batch size and the
/// `rows.len < BATCH_SIZE` drain checks from ever diverging.
const batch_limit = std.fmt.comptimePrint("{d}", .{BATCH_SIZE});

/// Background sync loop: full sync on boot, then incremental every 5 min
pub fn syncLoop(allocator: Allocator, local: *LocalDb) void {
    log.info("sync loop started", .{});
    var turso = TursoClient.init(allocator) catch |err| {
        log.err("turso client init failed: {}, sync disabled", .{err});
        return;
    };
    defer turso.deinit();

    // initial sync (full or incremental depending on state)
    incrementalSync(&turso, local) catch |err| {
        log.err("initial sync failed: {}", .{err});
    };

    // periodic incremental sync
    while (true) {
        io.sleep(.{ .nanoseconds = SYNC_INTERVAL_S * std.time.ns_per_s }, .real) catch {};

        incrementalSync(&turso, local) catch |err| {
            log.err("periodic sync failed: {}", .{err});
        };
    }
}

/// Insert actor row into a table. `prefix` is the INSERT clause, e.g.
/// "INSERT INTO actors_stage" or "INSERT OR REPLACE INTO actors".
fn insertActorRow(conn: zqlite.Conn, comptime prefix: []const u8, row: anytype) !void {
    // FIX: `catch |err| return err` was a no-op wrapper — propagate directly.
    // 9 placeholders match the 9 columns in LocalDb.actor_cols (did..pds).
    return conn.exec(prefix ++ " (" ++ LocalDb.actor_cols ++ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", .{
        row.text(Col.did),
        row.text(Col.handle),
        row.text(Col.display_name),
        row.text(Col.avatar_url),
        row.int(Col.hidden),
        row.text(Col.labels),
        row.text(Col.created_at),
        row.text(Col.associated),
        row.text(Col.pds),
    });
}

/// Update cached actor count in sync_meta (avoids COUNT(*) on health checks)
fn updateActorCount(conn: zqlite.Conn) void {
    // best-effort: failures here only leave the cached count stale
    const row = conn.row("SELECT COUNT(*) FROM actors WHERE handle != ''", .{}) catch return;
    if (row) |r| {
        defer r.deinit();
        var buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&buf, "{d}", .{r.int(0)}) catch return;
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};
    }
}