// GET /xrpc/app.bsky.actor.searchActorsTypeahead
// typeahead.waow.tech
1//! Sync from Turso to local SQLite
2//! Full sync on startup, incremental sync every 5 minutes
3
4const std = @import("std");
5const io = std.Options.debug_io;
6const zqlite = @import("zqlite");
7const logfire = @import("logfire");
8const Allocator = std.mem.Allocator;
9const TursoClient = @import("TursoClient.zig");
10const LocalDb = @import("LocalDb.zig");
11const Col = LocalDb.Col;
12
13const log = std.log.scoped(.sync);
14
/// Current wall-clock time in whole seconds since the Unix epoch.
fn timestamp() i64 {
    const now_ns = std.Io.Timestamp.now(io, .real).nanoseconds;
    return @intCast(@divFloor(now_ns, std.time.ns_per_s));
}
18
/// Current wall-clock time in whole milliseconds since the Unix epoch.
fn milliTimestamp() i64 {
    const now_ns = std.Io.Timestamp.now(io, .real).nanoseconds;
    return @intCast(@divFloor(now_ns, std.time.ns_per_ms));
}
22
/// Index of the first column appended after `LocalDb.actor_cols` in the sync
/// queries' SELECT lists (the trailing `rowid` or `updated_at` column).
const trailing_col: usize = Col.pds + 1;

// Rows fetched per keyset-pagination batch. NOTE(review): the SQL strings below
// hard-code `LIMIT 2000` — these must stay in agreement or the "short batch ==
// drained" checks misbehave.
const BATCH_SIZE = 2000;
const SYNC_INTERVAL_S = 300; // 5 minutes between incremental syncs
const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning
29
/// Full sync: fetch all searchable actors from Turso using keyset pagination.
/// Does NOT run stale cleanup — that's handled by incremental sync (tombstones +
/// "became unsearchable" queries). This avoids races with the dual-write path.
/// When `wipe` is true, deletes all local data first (used by stale rebuild path).
pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void {
    const span = logfire.span("sync.full", .{});
    defer span.end();

    const conn = local.getConn() orelse {
        span.setStatus(.@"error", "LocalNotOpen");
        return error.LocalNotOpen;
    };

    // did a previous full sync run to completion?
    const prev_complete = blk: {
        local.lock();
        defer local.unlock();
        const maybe_row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'sync_complete'",
            .{},
        ) catch break :blk false;
        const r = maybe_row orelse break :blk false;
        defer r.deinit();
        break :blk std.mem.eql(u8, r.text(0), "1");
    };

    if (!prev_complete or wipe) {
        log.info("full_sync: taking bootstrap path (wipe={}, was_completed={})", .{ wipe, prev_complete });
        span.setAttribute("mode", "bootstrap");
        return fullSyncBootstrap(turso, local, conn, wipe);
    }

    // re-sync path: previous sync completed, keep serving while re-syncing
    log.info("full_sync: taking resync path", .{});
    local.setReady(true);
    span.setAttribute("mode", "resync");
    return fullSyncResync(turso, local, conn);
}
70
/// Bootstrap path: load into staging tables (no indexes), build indexes in bulk,
/// then atomically swap into place. Live schema is never in a degraded state.
/// On any Turso or insert error the staging table is dropped and the function
/// returns without recording `sync_complete`, so the next attempt bootstraps again.
fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void {
    const span = logfire.span("sync.bootstrap", .{});
    defer span.end();

    const total_t0 = milliTimestamp();
    local.setReady(false);

    // close read connection during bootstrap — not serving traffic,
    // avoids WAL reader lock interference with DDL/index builds
    local.closeReadConn();
    errdefer local.reopenReadConn() catch {};

    if (wipe) {
        local.lock();
        defer local.unlock();
        conn.exec("DELETE FROM sync_meta", .{}) catch {};
    }

    // clean up any leftover staging tables from a previous failed attempt
    {
        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
    }

    // create staging table — no PK, no indexes for fast sequential appends
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE TABLE actors_stage (
            \\ did TEXT NOT NULL,
            \\ handle TEXT NOT NULL DEFAULT '',
            \\ display_name TEXT DEFAULT '',
            \\ avatar_url TEXT DEFAULT '',
            \\ hidden INTEGER NOT NULL DEFAULT 0,
            \\ labels TEXT NOT NULL DEFAULT '[]',
            \\ created_at TEXT DEFAULT '',
            \\ associated TEXT DEFAULT '{}',
            \\ pds TEXT DEFAULT ''
            \\)
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }

    // keyset pagination: fetch rows WHERE rowid > last_rowid ORDER BY rowid
    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;
    const load_t0 = milliTimestamp();

    while (true) {
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const t0 = milliTimestamp();

        // fetch from turso (no lock held); LIMIT is derived from BATCH_SIZE at
        // comptime so the drain check below cannot silently diverge from the SQL
        var result = turso.query(
            "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT " ++ std.fmt.comptimePrint("{d}", .{BATCH_SIZE}),
            &.{rowid_str},
        ) catch |err| {
            log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err });
            had_turso_error = true;
            break;
        };
        defer result.deinit();

        const t1 = milliTimestamp();

        if (result.rows.len == 0) break;

        // write batch to staging table — no indexes, fast sequential appends
        {
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorRow(conn, "INSERT INTO actors_stage", row) catch |err| {
                    log.err("insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }

        const t2 = milliTimestamp();

        last_rowid = result.rows[result.rows.len - 1].int(trailing_col);

        // high-frequency batch progress — keep on stderr only
        log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            result.rows.len, last_rowid, actor_count,
            t1 - t0, t2 - t1,
        });

        // short batch means the table is drained — skip the final empty round
        // trip (same drain rule incrementalSync uses)
        if (result.rows.len < BATCH_SIZE) break;
    }

    if (had_turso_error or error_count > 0) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "bootstrap incomplete");
        // clean up staging table
        {
            local.lock();
            defer local.unlock();
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
        }
        // restore read connection so health endpoint works
        local.reopenReadConn() catch {};
        return;
    }

    const load_ms = milliTimestamp() - load_t0;

    // checkpoint WAL before heavy DDL — clean slate for index builds
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch {};
    }

    // build indexes on staging table (bulk build is much faster than
    // maintaining them during the inserts above)
    const idx_did_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| {
            span.recordError(err);
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_did_ms = milliTimestamp() - idx_did_t0;

    const idx_handle_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| {
            span.recordError(err);
            conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
            return err;
        };
    }
    const idx_handle_ms = milliTimestamp() - idx_handle_t0;

    // swap actors table first, then build FTS with final name.
    {
        local.lock();
        defer local.unlock();
        conn.exec("BEGIN", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {};
        conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| {
            span.recordError(err);
            conn.exec("ROLLBACK", .{}) catch {};
            return err;
        };
        conn.exec("COMMIT", .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }

    // build FTS with its final name — no rename needed
    const fts_create_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\ did UNINDEXED, handle, display_name,
            \\ tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }
    const fts_create_ms = milliTimestamp() - fts_create_t0;

    const fts_pop_t0 = milliTimestamp();
    {
        local.lock();
        defer local.unlock();
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            span.recordError(err);
            return err;
        };
    }
    const fts_pop_ms = milliTimestamp() - fts_pop_t0;

    // post-swap: optimize, record completion + actor count, checkpoint
    {
        local.lock();
        defer local.unlock();
        conn.exec("PRAGMA optimize", .{}) catch {};

        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    const total_ms = milliTimestamp() - total_t0;

    // reopen read connection before serving traffic
    local.reopenReadConn() catch {};

    local.setReady(true);
    span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    span.setAttribute("load_ms", load_ms);
    span.setAttribute("idx_did_ms", idx_did_ms);
    span.setAttribute("idx_handle_ms", idx_handle_ms);
    span.setAttribute("fts_create_ms", fts_create_ms);
    span.setAttribute("fts_populate_ms", fts_pop_ms);
    span.setAttribute("total_ms", total_ms);
    span.setStatus(.ok, null);
}
315
/// Re-sync path: previous sync completed, update live table directly.
/// Only processes actors that may have changed — index maintenance cost is negligible.
/// Returns an error if the FTS rebuild fails: recording `sync_complete` while
/// the FTS table is missing or empty would break every search until the next
/// rebuild, so the failure must propagate instead of being swallowed.
fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void {
    const span = logfire.span("sync.resync", .{});
    defer span.end();

    log.info("resync: starting full table resync", .{});

    var actor_count: usize = 0;
    var error_count: usize = 0;
    var last_rowid: i64 = 0;
    var had_turso_error = false;
    var batch_num: usize = 0;

    while (true) {
        var rowid_buf: [20]u8 = undefined;
        const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break;

        const q_span = logfire.span("sync.query.resync_batch", .{});
        const t0 = milliTimestamp();

        // LIMIT derived from BATCH_SIZE at comptime so the SQL and the drain
        // check below cannot diverge
        var result = turso.query(
            "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT " ++ std.fmt.comptimePrint("{d}", .{BATCH_SIZE}),
            &.{rowid_str},
        ) catch |err| {
            log.err("resync: turso query failed at rowid {d}: {}", .{ last_rowid, err });
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break;
        };
        defer result.deinit();

        const t1 = milliTimestamp();
        q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
        q_span.setAttribute("fetch_ms", t1 - t0);
        q_span.setStatus(.ok, null);
        q_span.end();

        if (result.rows.len == 0) break;

        {
            const a_span = logfire.span("sync.apply.resync_batch", .{});
            defer a_span.end();

            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch |err| {
                    log.err("resync: insert actor failed: {}", .{err});
                    error_count += 1;
                    continue;
                };
                actor_count += 1;
            }
            conn.exec("COMMIT", .{}) catch {};

            a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
            a_span.setStatus(.ok, null);
        }

        const t2 = milliTimestamp();
        last_rowid = result.rows[result.rows.len - 1].int(trailing_col);
        batch_num += 1;

        log.info("resync: batch {d}: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{
            batch_num, result.rows.len, last_rowid, actor_count,
            t1 - t0, t2 - t1,
        });

        // short batch means we've drained — skip the final empty round trip
        if (result.rows.len < BATCH_SIZE) break;
    }

    if (had_turso_error or error_count > 0) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "resync incomplete");
        log.err("resync: incomplete (actors={d}, errors={d})", .{ actor_count, error_count });
        return;
    }

    log.info("resync: download complete ({d} actors), rebuilding FTS", .{actor_count});

    // rebuild FTS from scratch
    {
        const fts_span = logfire.span("sync.fts_rebuild", .{});
        defer fts_span.end();

        local.lock();
        defer local.unlock();
        conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        conn.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\ did UNINDEXED, handle, display_name,
            \\ tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |err| {
            // previously swallowed — would mark sync complete with no FTS table
            fts_span.recordError(err);
            fts_span.setStatus(.@"error", "create failed");
            span.recordError(err);
            return err;
        };
        conn.exec(
            \\INSERT INTO actors_fts (did, handle, display_name)
            \\SELECT did, handle, display_name FROM actors WHERE handle != ''
        , .{}) catch |err| {
            fts_span.recordError(err);
            fts_span.setStatus(.@"error", "populate failed");
            span.recordError(err);
            return err;
        };
        fts_span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    }

    log.info("resync: FTS rebuild complete", .{});

    // record sync time + mark complete + actor count
    {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')",
            .{},
        ) catch {};

        var count_buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }

    local.setReady(true);
    span.setAttribute("actors", @as(i64, @intCast(actor_count)));
    span.setStatus(.ok, null);
    log.info("resync: complete ({d} actors)", .{actor_count});
}
454
/// Incremental sync: fetch actors updated since last sync + tombstones.
/// Uses keyset pagination on (updated_at, did) to drain all updates.
/// Only advances the `last_sync` watermark when all turso queries succeed and
/// all local writes succeed — otherwise the next run retries the same window.
pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void {
    const span = logfire.span("sync.incremental", .{});
    defer span.end();

    const conn = local.getConn() orelse {
        span.setStatus(.@"error", "LocalNotOpen");
        return error.LocalNotOpen;
    };

    // get last sync time (0 means never synced / unreadable)
    const last_sync_ts = blk: {
        const s = logfire.span("sync.read_last_sync", .{});
        defer s.end();
        local.lock();
        defer local.unlock();
        const row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'last_sync'",
            .{},
        ) catch {
            s.setStatus(.@"error", "query failed");
            break :blk @as(i64, 0);
        };
        if (row) |r| {
            defer r.deinit();
            const val = r.text(0);
            const ts = if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0;
            s.setAttribute("last_sync", ts);
            s.setStatus(.ok, null);
            break :blk ts;
        }
        s.setStatus(.ok, null);
        break :blk @as(i64, 0);
    };

    log.info("incremental: last_sync={d}", .{last_sync_ts});

    if (last_sync_ts == 0) {
        log.info("incremental: no last_sync, falling back to full sync", .{});
        span.setAttribute("fallback", "full_sync");
        return fullSync(turso, local, false);
    }

    // if last sync is older than tombstone retention, local data is too stale —
    // tombstones we'd need may already be pruned upstream, so rebuild from scratch
    const now = timestamp();
    if (now - last_sync_ts > TOMBSTONE_RETENTION_S) {
        log.info("incremental: stale ({d}s old), falling back to full sync with wipe", .{now - last_sync_ts});
        span.setAttribute("fallback", "stale_wipe");
        return fullSync(turso, local, true);
    }

    local.setReady(true);

    // buffer: subtract 300s to catch stragglers (matches leaflet-search)
    const since_ts = last_sync_ts - 300;
    var since_buf: [20]u8 = undefined;
    const since_str = std.fmt.bufPrint(&since_buf, "{d}", .{since_ts}) catch return;

    // fetch updated searchable actors — keyset pagination to drain all
    var updated: usize = 0;
    var error_count: usize = 0;
    var had_turso_error = false;
    {
        var cursor_ts_buf: [20]u8 = undefined;
        var cursor_did_buf: [128]u8 = undefined;
        @memcpy(cursor_ts_buf[0..since_str.len], since_str);
        var cursor_ts: []const u8 = cursor_ts_buf[0..since_str.len];
        var cursor_did_len: usize = 0;

        while (true) {
            const cursor_did: []const u8 = if (cursor_did_len > 0) cursor_did_buf[0..cursor_did_len] else "";

            const args: []const []const u8 = if (cursor_did_len > 0)
                &.{ cursor_ts, cursor_ts, cursor_did }
            else
                &.{cursor_ts};

            // LIMIT derived from BATCH_SIZE at comptime — the drain check at
            // the bottom of this loop compares against the same constant
            const sql = if (cursor_did_len > 0)
                "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3)) ORDER BY updated_at, did LIMIT " ++ std.fmt.comptimePrint("{d}", .{BATCH_SIZE})
            else
                "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND updated_at > ?1 ORDER BY updated_at, did LIMIT " ++ std.fmt.comptimePrint("{d}", .{BATCH_SIZE});

            log.info("incremental: querying updated actors (cursor_ts={s})", .{cursor_ts});
            const q_span = logfire.span("sync.query.updated", .{});

            var result = turso.query(sql, args) catch |err| {
                log.err("incremental query failed: {}", .{err});
                q_span.setStatus(.@"error", "query failed");
                q_span.end();
                had_turso_error = true;
                break;
            };
            defer result.deinit();

            q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
            q_span.setStatus(.ok, null);
            q_span.end();

            if (result.rows.len == 0) break;

            log.info("incremental: applying {d} updated actors", .{result.rows.len});
            {
                const a_span = logfire.span("sync.apply.updated", .{});
                defer a_span.end();

                local.lock();
                defer local.unlock();
                conn.exec("BEGIN", .{}) catch {};
                for (result.rows) |row| {
                    insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch {
                        error_count += 1;
                        continue;
                    };
                    updated += 1;
                }
                conn.exec("COMMIT", .{}) catch {};

                a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
                a_span.setStatus(.ok, null);
            }

            const last_row = result.rows[result.rows.len - 1];
            const last_ua_int = last_row.int(trailing_col);
            const last_did = last_row.text(Col.did);

            // serialize updated_at integer to string for next query
            const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break;
            cursor_ts = ua_str;

            if (last_did.len > 0 and last_did.len <= cursor_did_buf.len) {
                @memcpy(cursor_did_buf[0..last_did.len], last_did);
                cursor_did_len = last_did.len;
            }

            // safety: stop if batch was less than full (we've drained)
            if (result.rows.len < BATCH_SIZE) break;
        }
    }

    // fetch actors that became unsearchable (handle cleared)
    var cleared: usize = 0;
    unsearchable: {
        log.info("incremental: querying unsearchable actors", .{});
        const q_span = logfire.span("sync.query.unsearchable", .{});

        var result = turso.query(
            "SELECT did FROM actors WHERE updated_at > ? AND handle = ''",
            &.{since_str},
        ) catch |err| {
            log.err("unsearchable query failed: {}", .{err});
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break :unsearchable;
        };
        defer result.deinit();

        q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len)));
        q_span.setStatus(.ok, null);
        q_span.end();

        if (result.rows.len > 0) {
            log.info("incremental: clearing {d} unsearchable actors", .{result.rows.len});
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                cleared += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }
    }

    // fetch tombstones (hard-deleted accounts)
    var deleted: usize = 0;
    tombstone: {
        log.info("incremental: querying tombstones", .{});
        const q_span = logfire.span("sync.query.tombstones", .{});

        var tomb_result = turso.query(
            "SELECT did FROM tombstones WHERE deleted_at > ?",
            &.{since_str},
        ) catch |err| {
            log.err("tombstone query failed: {}", .{err});
            q_span.setStatus(.@"error", "query failed");
            q_span.end();
            had_turso_error = true;
            break :tombstone;
        };
        defer tomb_result.deinit();

        q_span.setAttribute("rows", @as(i64, @intCast(tomb_result.rows.len)));
        q_span.setStatus(.ok, null);
        q_span.end();

        if (tomb_result.rows.len > 0) {
            log.info("incremental: deleting {d} tombstoned actors", .{tomb_result.rows.len});
            local.lock();
            defer local.unlock();
            conn.exec("BEGIN", .{}) catch {};
            for (tomb_result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                deleted += 1;
            }
            conn.exec("COMMIT", .{}) catch {};
        }
    }

    // rebuild FTS from actors table if anything changed.
    // per-row FTS maintenance is O(N) per delete because `did` is UNINDEXED —
    // bulk rebuild is O(N) total regardless of how many rows changed.
    if (updated > 0 or cleared > 0 or deleted > 0) {
        // checkpoint WAL before FTS rebuild — large WAL from batch deletes
        // makes the INSERT SELECT scan extremely slow
        {
            local.lock();
            defer local.unlock();
            conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
        }

        const fts_span = logfire.span("sync.fts_rebuild", .{});
        log.info("incremental: rebuilding FTS ({d} changes)", .{updated + cleared + deleted});
        {
            local.lock();
            defer local.unlock();
            conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
            conn.exec(
                \\CREATE VIRTUAL TABLE actors_fts USING fts5(
                \\ did UNINDEXED, handle, display_name,
                \\ tokenize='unicode61 remove_diacritics 2'
                \\)
            , .{}) catch |err| {
                // early return: watermark is NOT advanced, so next run retries;
                // search is degraded until a rebuild succeeds
                log.err("FTS rebuild: create failed: {}", .{err});
                fts_span.recordError(err);
                fts_span.setStatus(.@"error", "create failed");
                fts_span.end();
                return;
            };
            conn.exec(
                \\INSERT INTO actors_fts (did, handle, display_name)
                \\SELECT did, handle, display_name FROM actors WHERE handle != ''
            , .{}) catch |err| {
                log.err("FTS rebuild: populate failed: {}", .{err});
                fts_span.recordError(err);
                fts_span.setStatus(.@"error", "populate failed");
                fts_span.end();
                return;
            };
        }
        fts_span.setStatus(.ok, null);
        fts_span.end();
        log.info("incremental: FTS rebuild complete", .{});
    }

    // only advance watermark when everything succeeded
    const had_error = had_turso_error or error_count > 0;
    if (had_error) {
        span.setAttribute("error_count", @as(i64, @intCast(error_count)));
        span.setStatus(.@"error", "incremental sync had errors");
        log.err("incremental: done with errors (updated={d}, cleared={d}, deleted={d}, errors={d})", .{
            updated, cleared, deleted, error_count,
        });
    } else {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};

        // refresh cached actor count when data changed
        if (updated > 0 or deleted > 0 or cleared > 0) {
            updateActorCount(conn);
        }

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
        span.setStatus(.ok, null);
        log.info("incremental: done (updated={d}, cleared={d}, deleted={d})", .{
            updated, cleared, deleted,
        });
    }

    span.setAttribute("updated", @as(i64, @intCast(updated)));
    span.setAttribute("deleted", @as(i64, @intCast(deleted)));
    span.setAttribute("cleared", @as(i64, @intCast(cleared)));
}
755
/// Background sync loop: full sync on boot, then incremental every 5 min.
/// Returns early (sync disabled) only if the Turso client cannot be created.
pub fn syncLoop(allocator: Allocator, local: *LocalDb) void {
    log.info("sync loop started", .{});
    var client = TursoClient.init(allocator) catch |err| {
        log.err("turso client init failed: {}, sync disabled", .{err});
        return;
    };
    defer client.deinit();

    // initial sync (full or incremental depending on state)
    incrementalSync(&client, local) catch |err| {
        log.err("initial sync failed: {}", .{err});
    };

    // periodic incremental sync, forever
    while (true) {
        io.sleep(.{ .nanoseconds = SYNC_INTERVAL_S * std.time.ns_per_s }, .real) catch {};

        incrementalSync(&client, local) catch |err| {
            log.err("periodic sync failed: {}", .{err});
        };
    }
}
779
/// Insert actor row into a table. `prefix` is the INSERT clause, e.g.
/// "INSERT INTO actors_stage" or "INSERT OR REPLACE INTO actors".
/// Bound-value order must match `LocalDb.actor_cols` (9 columns).
fn insertActorRow(conn: zqlite.Conn, comptime prefix: []const u8, row: anytype) !void {
    // `catch |err| return err` was a redundant wrapper — `try` propagates identically
    try conn.exec(prefix ++ " (" ++ LocalDb.actor_cols ++ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", .{
        row.text(Col.did),
        row.text(Col.handle),
        row.text(Col.display_name),
        row.text(Col.avatar_url),
        row.int(Col.hidden),
        row.text(Col.labels),
        row.text(Col.created_at),
        row.text(Col.associated),
        row.text(Col.pds),
    });
}
795
/// Update cached actor count in sync_meta (avoids COUNT(*) on health checks).
/// Best-effort: any query/format/write failure leaves the cached value as-is.
fn updateActorCount(conn: zqlite.Conn) void {
    const maybe_row = conn.row("SELECT COUNT(*) FROM actors WHERE handle != ''", .{}) catch return;
    const r = maybe_row orelse return;
    defer r.deinit();

    var count_buf: [20]u8 = undefined;
    const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{r.int(0)}) catch return;
    conn.exec(
        "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
        .{count_str},
    ) catch {};
}
808}