this repo has no description
1const std = @import("std");
2const sqlite = @import("sqlite");
3const uuid = @import("uuid");
4const xev = @import("xev");
5
6const log = @import("log.zig");
7const irc = @import("irc.zig");
8
9const Allocator = std.mem.Allocator;
10const Channel = irc.Channel;
11const ChannelPrivileges = irc.ChannelPrivileges;
12const ChatHistory = irc.ChatHistory;
13const Connection = Server.Connection;
14const HeapArena = @import("HeapArena.zig");
15const Message = irc.Message;
16const Server = @import("Server.zig");
17const Timestamp = irc.Timestamp;
18const User = irc.User;
19const WorkerQueue = Server.WorkerQueue;
20
21const schema = @embedFile("schema.sql");
22
23/// Called on first db connection
24pub fn createTables(conn: sqlite.Conn) anyerror!void {
25 try conn.execNoArgs(schema);
26}
27
28/// Called for each db connection
29pub fn setPragmas(conn: sqlite.Conn) anyerror!void {
30 try conn.busyTimeout(5000);
31 try conn.execNoArgs("PRAGMA synchronous = normal");
32 try conn.execNoArgs("PRAGMA journal_mode = wal");
33 try conn.execNoArgs("PRAGMA foreign_keys = on");
34}
35
36/// Called in the main thread when the server starts. This loads all the channels in the server
37/// and stores them in memory
38pub fn loadChannels(server: *Server) !void {
39 const conn = server.db_pool.acquire();
40 defer server.db_pool.release(conn);
41 var rows = try conn.rows("SELECT name, topic FROM channels", .{});
42 defer rows.deinit();
43 while (rows.next()) |row| {
44 const channel = try server.gpa.create(Channel);
45 channel.* = .{
46 .name = try server.gpa.dupe(u8, row.text(0)),
47 .topic = try server.gpa.dupe(u8, row.text(1)),
48 .members = .empty,
49 .streams = .empty,
50 };
51 try server.channels.put(server.gpa, channel.name, channel);
52 }
53}
54
55/// Called in the main thread when the server starts. This loads all the users in the server
56/// and stores them in memory
57pub fn loadUsers(server: *Server) !void {
58 const conn = server.db_pool.acquire();
59 defer server.db_pool.release(conn);
60 var rows = try conn.rows("SELECT did, nick, modes FROM users", .{});
61 defer rows.deinit();
62 while (rows.next()) |row| {
63 const user = try server.gpa.create(User);
64 const modes: u1 = @intCast(row.int(2));
65 user.* = .{
66 .username = try server.gpa.dupe(u8, row.text(0)),
67 .nick = try server.gpa.dupe(u8, row.text(1)),
68 .real = "",
69 .avatar_url = "",
70 .connections = .empty,
71 .channels = .empty,
72 .away = false,
73 .modes = @bitCast(modes),
74 };
75 try server.nick_map.put(server.gpa, user.nick, user);
76 }
77}
78
79/// Called in the main thread when the server starts. This loads all the channel memberships in
80/// the server
81pub fn loadChannelMembership(server: *Server) !void {
82 const conn = server.db_pool.acquire();
83 defer server.db_pool.release(conn);
84 const sql =
85 \\SELECT u.nick, c.name, cm.privileges
86 \\FROM channel_membership cm
87 \\JOIN users u ON cm.user_id = u.id
88 \\JOIN channels c ON cm.channel_id = c.id;
89 ;
90 var rows = try conn.rows(sql, .{});
91 defer rows.deinit();
92 while (rows.next()) |row| {
93 const nick = row.text(0);
94 const ch_name = row.text(1);
95 const privileges_val: u1 = @intCast(row.int(2));
96 const privileges: ChannelPrivileges = @bitCast(privileges_val);
97
98 const user = server.nick_map.get(nick) orelse {
99 log.warn("user with nick {s} not found", .{nick});
100 continue;
101 };
102 const channel = server.channels.get(ch_name) orelse {
103 log.warn("channel with name {s} not found", .{ch_name});
104 continue;
105 };
106
107 // Add the user to the channel
108 try channel.members.append(server.gpa, .{ .user = user, .privileges = privileges });
109 // Add the channel to the user
110 try user.channels.append(server.gpa, channel);
111 }
112}
113
114pub fn updatePrivileges(
115 pool: *sqlite.Pool,
116 user: *User,
117 privs: ChannelPrivileges,
118 channel: []const u8,
119) !void {
120 const conn = pool.acquire();
121 defer pool.release(conn);
122
123 const sql =
124 \\UPDATE channel_membership
125 \\SET privileges = ?
126 \\WHERE channel_id = (
127 \\ SELECT id FROM channels WHERE name = ?
128 \\)
129 \\AND user_id = (
130 \\ SELECT id FROM users WHERE nick = ?
131 \\);
132 ;
133 const privs_as_int: u1 = @bitCast(privs);
134 conn.exec(sql, .{ privs_as_int, channel, user.nick }) catch |err| {
135 log.err("updating privileges: {}: {s}", .{ err, conn.lastError() });
136 return;
137 };
138}
139
140/// Checks if a user is already in the db. If they are, checks their nick is the same. Updates
141/// it as needed.
142///
143/// Creates a user if they don't exist
144pub fn storeUser(pool: *sqlite.Pool, user: *User) !void {
145 const conn = pool.acquire();
146 defer pool.release(conn);
147
148 // First we see if the user exists
149 const maybe_row = conn.row("SELECT id, nick FROM users WHERE did = ?;", .{user.username}) catch |err| {
150 log.err("finding user: {}: {s}", .{ err, conn.lastError() });
151 return;
152 };
153 if (maybe_row) |row| {
154 defer row.deinit();
155 const nick = row.text(1);
156 // If the nick is the same, we are done
157 if (std.mem.eql(u8, nick, user.nick)) return;
158 const id = row.int(0);
159 // They aren't equal. Update the nick
160 conn.exec("UPDATE users SET nick = ? WHERE id = ?;", .{ user.nick, id }) catch |err| {
161 log.err("updating user nick: {}: {s}", .{ err, conn.lastError() });
162 return;
163 };
164 return;
165 }
166
167 // This is a new user. Create them
168 conn.exec("INSERT INTO users (did, nick) VALUES (?, ?);", .{ user.username, user.nick }) catch |err| {
169 log.err("creating user: {}: {s}", .{ err, conn.lastError() });
170 return;
171 };
172}
173
174/// Creates a channel
175pub fn createChannel(arena: HeapArena, pool: *sqlite.Pool, channel: []const u8) !void {
176 defer arena.deinit();
177 const conn = pool.acquire();
178 defer pool.release(conn);
179 conn.exec("INSERT OR IGNORE INTO channels (name) VALUES (?);", .{channel}) catch |err| {
180 log.err("creating channel: {}: {s}", .{ err, conn.lastError() });
181 return;
182 };
183}
184
185pub fn createChannelMembership(pool: *sqlite.Pool, channel: []const u8, nick: []const u8) !void {
186 const conn = pool.acquire();
187 defer pool.release(conn);
188 const sql =
189 \\INSERT OR IGNORE INTO channel_membership (user_id, channel_id)
190 \\SELECT u.id, c.id
191 \\FROM users u
192 \\JOIN channels c ON c.name = ? -- Channel name
193 \\WHERE u.nick = ?; -- User nick
194 ;
195 conn.exec(sql, .{ channel, nick }) catch |err| {
196 log.err("creating channel membership: {}: {s}", .{ err, conn.lastError() });
197 return;
198 };
199}
200
201pub fn removeChannelMembership(pool: *sqlite.Pool, channel: []const u8, nick: []const u8) !void {
202 const conn = pool.acquire();
203 defer pool.release(conn);
204 const sql =
205 \\DELETE FROM channel_membership
206 \\WHERE user_id = (SELECT id FROM users WHERE nick = ?)
207 \\ AND channel_id = (SELECT id FROM channels WHERE name = ?);
208 ;
209 conn.exec(sql, .{ nick, channel }) catch |err| {
210 log.err("creating channel membership: {}: {s}", .{ err, conn.lastError() });
211 return;
212 };
213}
214
215/// Stores a message between two users
216pub fn storePrivateMessage(
217 arena: HeapArena,
218 pool: *sqlite.Pool,
219 sender_nick: []const u8,
220 target: []const u8,
221 msg: irc.Message,
222) !void {
223 defer arena.deinit();
224 const sql =
225 \\INSERT INTO messages (uuid, timestamp_ms, sender_id, sender_nick, recipient_id, recipient_type, message)
226 \\VALUES (
227 \\ ?, -- uuid
228 \\ ?, -- timestamp_ms
229 \\ (SELECT id FROM users WHERE nick = ?), -- sender_id
230 \\ ?, -- sender_nick
231 \\ (SELECT id FROM users WHERE nick = ?), -- recipient_id
232 \\ 1, -- recipient_type (user to user)
233 \\ ? -- message
234 \\);
235 ;
236
237 const urn = uuid.urn.serialize(msg.uuid);
238 const conn = pool.acquire();
239 defer pool.release(conn);
240 conn.exec(sql, .{
241 &urn,
242 msg.timestamp.milliseconds,
243 sender_nick,
244 sender_nick,
245 target,
246 msg.bytes,
247 }) catch |err| {
248 log.err("storing message: {}: {s}", .{ err, conn.lastError() });
249 return;
250 };
251}
252
253/// Stores a message to a channel
254pub fn storeChannelMessage(
255 arena: HeapArena,
256 pool: *sqlite.Pool,
257 sender_nick: []const u8,
258 target: []const u8,
259 msg: irc.Message,
260) !void {
261 defer arena.deinit();
262 const sql =
263 \\INSERT INTO messages (uuid, timestamp_ms, sender_id, sender_nick, recipient_id, recipient_type, message)
264 \\VALUES (
265 \\ ?, -- uuid
266 \\ ?, -- timestamp_ms
267 \\ (SELECT id FROM users WHERE nick = ?), -- sender_id
268 \\ ?, -- sender_nick
269 \\ (SELECT id FROM channels WHERE name = ?), -- recipient_id
270 \\ 0, -- recipient_type (0 = channel message)
271 \\ ? -- message
272 \\);
273 ;
274
275 const urn = uuid.urn.serialize(msg.uuid);
276 const conn = pool.acquire();
277 defer pool.release(conn);
278 conn.exec(sql, .{
279 &urn,
280 msg.timestamp.milliseconds,
281 sender_nick,
282 sender_nick,
283 target,
284 msg.bytes,
285 }) catch |err| {
286 log.err("storing message: {}: {s}", .{ err, conn.lastError() });
287 return;
288 };
289}
290
291pub fn chathistoryTargets(
292 arena: HeapArena,
293 pool: *sqlite.Pool,
294 queue: *WorkerQueue,
295 fd: xev.TCP,
296 nick: []const u8,
297 req: ChatHistory.TargetsRequest,
298) !void {
299 errdefer {
300 // On error, we send an empty target response
301 queue.push(.{
302 .history_targets = .{ .arena = arena, .fd = fd, .items = &.{} },
303 });
304 }
305 const db_conn = pool.acquire();
306 defer pool.release(db_conn);
307
308 var results: std.ArrayListUnmanaged(irc.ChatHistory.Target) = .empty;
309
310 {
311 // First we get all users we've had exchanges with over the time period
312 const sql =
313 \\WITH user_id AS (
314 \\ SELECT id FROM users WHERE nick = ?
315 \\)
316 \\SELECT
317 \\ u1.nick AS sender_nick,
318 \\ u2.nick AS recipient_nick,
319 \\ MAX(m.timestamp_ms) AS latest_timestamp
320 \\FROM messages m
321 \\JOIN users u1 ON m.sender_id = u1.id
322 \\JOIN users u2 ON m.recipient_id = u2.id
323 \\WHERE (m.sender_id = (SELECT id FROM user_id)
324 \\ OR m.recipient_id = (SELECT id FROM user_id))
325 \\ AND m.recipient_type = 1
326 \\ AND m.timestamp_ms BETWEEN ? AND ?
327 \\GROUP BY u1.nick, u2.nick;
328 ;
329
330 var rows = db_conn.rows(
331 sql,
332 .{ nick, req.from.milliseconds, req.to.milliseconds },
333 ) catch |err| {
334 log.err("querying messages: {}: {s}", .{ err, db_conn.lastError() });
335 return;
336 };
337 defer rows.deinit();
338
339 while (rows.next()) |row| {
340 const sender = row.text(0);
341 const recpt = row.text(1);
342 const ts = row.int(2);
343 // We report whichever isn't *us*
344 if (std.ascii.eqlIgnoreCase(sender, nick)) {
345 // We are the sender, report recpt
346 const result: ChatHistory.Target = .{
347 .nick_or_channel = try arena.allocator().dupe(u8, recpt),
348 .latest_timestamp = .{ .milliseconds = ts },
349 };
350 try results.append(arena.allocator(), result);
351 } else {
352 // We are the recpt, report sender
353 const result: ChatHistory.Target = .{
354 .nick_or_channel = try arena.allocator().dupe(u8, sender),
355 .latest_timestamp = .{ .milliseconds = ts },
356 };
357 try results.append(arena.allocator(), result);
358 }
359 }
360 }
361
362 {
363 // Next we get all the channels we are a member of and the latest message
364 const sql =
365 \\WITH user_id AS (
366 \\ SELECT id FROM users WHERE nick = ?
367 \\)
368 \\SELECT
369 \\ c.name AS channel_name,
370 \\ MAX(m.timestamp_ms) AS latest_timestamp
371 \\FROM messages m
372 \\JOIN channels c ON m.recipient_id = c.id
373 \\JOIN channel_membership cm ON cm.channel_id = c.id
374 \\WHERE cm.user_id = (SELECT id FROM user_id)
375 \\ AND m.recipient_type = 0 -- recipient_type for channels
376 \\ AND m.timestamp_ms BETWEEN ? AND ?
377 \\GROUP BY c.name;
378 ;
379
380 var rows = db_conn.rows(
381 sql,
382 .{ nick, req.from.milliseconds, req.to.milliseconds },
383 ) catch |err| {
384 log.err("querying messages: {}: {s}", .{ err, db_conn.lastError() });
385 return;
386 };
387 defer rows.deinit();
388
389 while (rows.next()) |row| {
390 const channel = row.text(0);
391 const ts = row.int(1);
392 const result: ChatHistory.Target = .{
393 .nick_or_channel = try arena.allocator().dupe(u8, channel),
394 .latest_timestamp = .{ .milliseconds = ts },
395 };
396 try results.append(arena.allocator(), result);
397 }
398 }
399
400 // If the number of results is too many, we sort and truncate
401 if (results.items.len > req.limit) {
402 // TODO: Sort and prune
403 }
404
405 const batch: ChatHistory.TargetBatch = .{
406 .arena = arena,
407 .fd = fd,
408 .items = results.items,
409 };
410
411 queue.push(.{ .history_targets = batch });
412}
413
414pub fn chathistoryAfter(
415 arena: HeapArena,
416 pool: *sqlite.Pool,
417 queue: *WorkerQueue,
418 fd: xev.TCP,
419 req: ChatHistory.AfterRequest,
420) !void {
421 errdefer {
422 queue.push(.{
423 .history_batch = .{ .fd = fd, .arena = arena, .items = &.{}, .target = req.target },
424 });
425 }
426 const sql = switch (req.target[0]) {
427 '#' =>
428 \\SELECT
429 \\ uuid,
430 \\ timestamp_ms,
431 \\ sender_nick,
432 \\ message
433 \\FROM messages m
434 \\WHERE recipient_type = 0
435 \\AND recipient_id = (SELECT id FROM channels WHERE name = ?)
436 \\AND m.timestamp_ms > ?
437 \\ORDER BY timestamp_ms ASC
438 \\LIMIT ?;
439 ,
440 else =>
441 \\SELECT
442 \\ uuid,
443 \\ timestamp_ms,
444 \\ sender_nick,
445 \\ message
446 \\FROM messages m
447 \\WHERE recipient_type = 1
448 \\AND recipient_id = (SELECT id FROM users WHERE nick = ?)
449 \\AND m.timestamp_ms > ?
450 \\ORDER BY timestamp_ms ASC
451 \\LIMIT ?;
452 ,
453 };
454
455 const conn = pool.acquire();
456 defer pool.release(conn);
457
458 var rows = conn.rows(sql, .{ req.target, req.after_ms.milliseconds, req.limit }) catch |err| {
459 log.err("querying messages: {}: {s}", .{ err, conn.lastError() });
460 return err;
461 };
462 defer rows.deinit();
463
464 try collectChathistoryRows(arena, queue, &rows, req.target, fd, req.limit);
465}
466
467pub fn chathistoryBefore(
468 arena: HeapArena,
469 pool: *sqlite.Pool,
470 queue: *WorkerQueue,
471 fd: xev.TCP,
472 req: ChatHistory.BeforeRequest,
473) !void {
474 errdefer {
475 queue.push(.{
476 .history_batch = .{ .fd = fd, .arena = arena, .items = &.{}, .target = req.target },
477 });
478 }
479 const sql = switch (req.target[0]) {
480 '#' =>
481 \\SELECT
482 \\ uuid,
483 \\ timestamp_ms,
484 \\ sender_nick,
485 \\ message
486 \\FROM messages m
487 \\WHERE recipient_type = 0
488 \\AND recipient_id = (SELECT id FROM channels WHERE name = ?)
489 \\AND m.timestamp_ms < ?
490 \\ORDER BY timestamp_ms DESC
491 \\LIMIT ?;
492 ,
493 else =>
494 \\SELECT
495 \\ uuid,
496 \\ timestamp_ms,
497 \\ sender_nick,
498 \\ message
499 \\FROM messages m
500 \\WHERE recipient_type = 1
501 \\AND recipient_id = (SELECT id FROM users WHERE nick = ?)
502 \\AND m.timestamp_ms < ?
503 \\ORDER BY timestamp_ms DESC
504 \\LIMIT ?;
505 ,
506 };
507
508 const conn = pool.acquire();
509 defer pool.release(conn);
510
511 var rows = conn.rows(sql, .{ req.target, req.before_ms.milliseconds, req.limit }) catch |err| {
512 log.err("querying messages: {}: {s}", .{ err, conn.lastError() });
513 return err;
514 };
515 defer rows.deinit();
516
517 try collectChathistoryRows(arena, queue, &rows, req.target, fd, req.limit);
518}
519
520pub fn chathistoryLatest(
521 arena: HeapArena,
522 pool: *sqlite.Pool,
523 queue: *WorkerQueue,
524 fd: xev.TCP,
525 req: ChatHistory.LatestRequest,
526) !void {
527 errdefer {
528 queue.push(.{
529 .history_batch = .{ .fd = fd, .arena = arena, .items = &.{}, .target = req.target },
530 });
531 }
532 const sql = switch (req.target[0]) {
533 '#' =>
534 \\SELECT
535 \\ uuid,
536 \\ timestamp_ms,
537 \\ sender_nick,
538 \\ message
539 \\FROM messages m
540 \\WHERE recipient_type = 0
541 \\AND recipient_id = (SELECT id FROM channels WHERE name = ?)
542 \\ORDER BY m.timestamp_ms DESC
543 \\LIMIT ?;
544 ,
545 else =>
546 \\SELECT
547 \\ uuid,
548 \\ timestamp_ms,
549 \\ sender_nick,
550 \\ message
551 \\FROM messages m
552 \\WHERE recipient_type = 1
553 \\AND recipient_id = (SELECT id FROM users WHERE nick = ?)
554 \\ORDER BY m.timestamp_ms DESC
555 \\LIMIT ?;
556 ,
557 };
558
559 const conn = pool.acquire();
560 defer pool.release(conn);
561 var rows = conn.rows(sql, .{ req.target, req.limit }) catch |err| {
562 log.err("querying messages: {}: {s}", .{ err, conn.lastError() });
563 return err;
564 };
565 defer rows.deinit();
566
567 try collectChathistoryRows(arena, queue, &rows, req.target, fd, req.limit);
568}
569
570fn collectChathistoryRows(
571 arena: HeapArena,
572 queue: *WorkerQueue,
573 rows: *sqlite.Rows,
574 target: []const u8,
575 fd: xev.TCP,
576 limit: u16,
577) Allocator.Error!void {
578 var msgs = try std.ArrayListUnmanaged(ChatHistory.HistoryMessage).initCapacity(
579 arena.allocator(),
580 limit,
581 );
582
583 while (rows.next()) |row| {
584 const msg: ChatHistory.HistoryMessage = .{
585 .uuid = try arena.allocator().dupe(u8, row.text(0)),
586 .timestamp = .{ .milliseconds = row.int(1) },
587 .sender = try arena.allocator().dupe(u8, row.text(2)),
588 .message = try arena.allocator().dupe(u8, row.text(3)),
589 };
590 msgs.appendAssumeCapacity(msg);
591 }
592
593 // Sort to ascending
594 std.sort.insertion(
595 ChatHistory.HistoryMessage,
596 msgs.items,
597 {},
598 ChatHistory.HistoryMessage.lessThan,
599 );
600
601 const batch: ChatHistory.HistoryBatch = .{
602 .fd = fd,
603 .arena = arena,
604 .items = msgs.items,
605 .target = target,
606 };
607
608 queue.push(.{ .history_batch = batch });
609}
610
611pub fn setMarkRead(
612 arena: HeapArena,
613 pool: *sqlite.Pool,
614 queue: *WorkerQueue,
615 fd: xev.TCP,
616 nick: []const u8,
617 target: []const u8,
618 ts: Timestamp,
619) !void {
620 errdefer arena.deinit();
621 if (target.len == 0) return error.NoTarget;
622 const sql = switch (target[0]) {
623 '#' =>
624 \\INSERT INTO read_marker (user_id, target_id, target_kind, timestamp_ms)
625 \\VALUES (
626 \\ (SELECT id FROM users WHERE nick = ?),
627 \\ (SELECT id FROM channels WHERE name = ?),
628 \\ 0,
629 \\ ?
630 \\)
631 \\ON CONFLICT(user_id, target_id, target_kind)
632 \\DO UPDATE SET timestamp_ms = excluded.timestamp_ms;
633 ,
634 else =>
635 \\INSERT INTO read_marker (user_id, target_id, target_kind, timestamp_ms)
636 \\VALUES (
637 \\ (SELECT id FROM users WHERE nick = ?),
638 \\ (SELECT id FROM users WHERE nick = ?),
639 \\ 1,
640 \\ ?
641 \\)
642 \\ON CONFLICT(user_id, target_id, target_kind)
643 \\DO UPDATE SET timestamp_ms = excluded.timestamp_ms;
644 };
645
646 const db_conn = pool.acquire();
647 defer pool.release(db_conn);
648
649 db_conn.exec(sql, .{ nick, target, ts.milliseconds }) catch |err| {
650 log.err("setting mark read: {}: {s}", .{ err, db_conn.lastError() });
651 return;
652 };
653
654 queue.push(.{ .mark_read = .{
655 .arena = arena,
656 .fd = fd,
657 .target = target,
658 .timestamp = ts,
659 } });
660}
661
662pub fn getMarkRead(
663 arena: HeapArena,
664 pool: *sqlite.Pool,
665 queue: *WorkerQueue,
666 fd: xev.TCP,
667 nick: []const u8,
668 target: []const u8,
669) !void {
670 errdefer arena.deinit();
671 if (target.len == 0) return error.NoTarget;
672 const sql = switch (target[0]) {
673 '#' =>
674 \\SELECT timestamp_ms
675 \\FROM read_marker
676 \\WHERE user_id = (SELECT id FROM users WHERE nick = ?)
677 \\AND target_id = (SELECT id FROM channels WHERE name = ?)
678 \\AND target_kind = 0;
679 ,
680 else =>
681 \\SELECT timestamp_ms
682 \\FROM read_marker
683 \\WHERE user_id = (SELECT id FROM users WHERE nick = ?)
684 \\AND target_id = (SELECT id FROM users WHERE nick = ?)
685 \\AND target_kind = 1;
686 };
687
688 const db_conn = pool.acquire();
689 defer pool.release(db_conn);
690
691 const maybe_row = db_conn.row(sql, .{ nick, target }) catch |err| {
692 log.err("setting mark read: {}: {s}", .{ err, db_conn.lastError() });
693 return err;
694 };
695
696 const timestamp: ?Timestamp = if (maybe_row) |row| blk: {
697 defer row.deinit();
698 break :blk .{ .milliseconds = row.int(0) };
699 } else null;
700
701 queue.push(.{
702 .mark_read = .{
703 .arena = arena,
704 .fd = fd,
705 .target = target,
706 .timestamp = timestamp,
707 },
708 });
709}
710
711pub fn updateTopic(server: *Server, channel: []const u8, topic: []const u8) !void {
712 const sql =
713 \\UPDATE channels
714 \\SET topic = ?
715 \\WHERE name = ?;
716 ;
717 const db_conn = server.db_pool.acquire();
718 defer server.db_pool.release(db_conn);
719 db_conn.exec(sql, .{ topic, channel }) catch |err| {
720 log.err("updating topic: {}: {s}", .{ err, db_conn.lastError() });
721 };
722}