this repo has no description
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

irc: implement CHATHISTORY TARGETS

rockorager.dev 94424b28 80979c5a

verified
+166 -6
+33 -3
src/Server.zig
··· 49 49 msg: []const u8, 50 50 }, 51 51 history_batch: ChatHistory.HistoryBatch, 52 + history_targets: ChatHistory.TargetBatch, 52 53 mark_read: struct { 53 54 conn: *Connection, 54 55 target: []const u8, ··· 293 294 v.arena.deinit(); 294 295 self.gpa.free(v.target); 295 296 }, 297 + .history_targets => |v| { 298 + v.arena.deinit(); 299 + }, 296 300 .mark_read => |v| { 297 301 self.gpa.free(v.target); 298 302 }, ··· 388 392 self.gpa, 389 393 ":{s} BATCH -{d} chathistory {s}\r\n", 390 394 .{ self.hostname, batch_id, v.target }, 395 + ) catch @panic("TODO"); 396 + 397 + self.queueWrite(v.conn.client, v.conn) catch {}; 398 + }, 399 + .history_targets => |v| { 400 + defer v.arena.deinit(); 401 + const batch_id = self.next_batch; 402 + self.next_batch +|= 1; 403 + v.conn.print( 404 + self.gpa, 405 + ":{s} BATCH +{d} draft/chathistory-targets\r\n", 406 + .{ self.hostname, batch_id }, 407 + ) catch @panic("TODO"); 408 + for (v.items) |target| { 409 + v.conn.print( 410 + self.gpa, 411 + "@batch={d} CHATHISTORY TARGETS {s} {s}\r\n", 412 + .{ 413 + batch_id, 414 + target.nick_or_channel, 415 + target.latest_timestamp, 416 + }, 417 + ) catch @panic("TODO"); 418 + } 419 + v.conn.print( 420 + self.gpa, 421 + ":{s} BATCH -{d} draft/chathistory-targets\r\n", 422 + .{ self.hostname, batch_id }, 391 423 ) catch @panic("TODO"); 392 424 393 425 self.queueWrite(v.conn.client, v.conn) catch {}; ··· 1645 1677 const ts_two_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_two_str } }) catch { 1646 1678 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp"); 1647 1679 }; 1680 + // NOTE: For TARGETS, we don't have an internal limit 1648 1681 const limit_int: u16 = std.fmt.parseUnsigned(u16, limit, 10) catch { 1649 1682 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1650 1683 }; 1651 - if (limit_int > max_chathistory) { 1652 - return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1653 - } 1654 1684 1655 1685 const from = if (ts_one_inst.timestamp < ts_two_inst.timestamp) ts_one_inst else ts_two_inst; 1656 1686 const to = if (ts_one_inst.timestamp > ts_two_inst.timestamp) ts_one_inst else ts_two_inst;
+122 -3
src/db.zig
··· 271 271 } 272 272 273 273 pub fn chathistoryTargets(server: *Server, req: ChatHistory.TargetsRequest) void { 274 - // TODO: implement 275 - _ = server; 276 - _ = req; 274 + doTargets(server, req) catch |err| { 275 + log.err("querying chathistory targets: {}", .{err}); 276 + }; 277 + } 278 + 279 + fn doTargets(server: *Server, req: ChatHistory.TargetsRequest) !void { 280 + const user = req.conn.user orelse return; 281 + 282 + // On success, the arena will be passed with the result to the main thread and freed there 283 + var arena = std.heap.ArenaAllocator.init(server.gpa); 284 + errdefer arena.deinit(); 285 + 286 + const db_conn = server.db_pool.acquire(); 287 + defer server.db_pool.release(db_conn); 288 + 289 + var results: std.ArrayListUnmanaged(irc.ChatHistory.Target) = .empty; 290 + 291 + { 292 + // First we get all users we've had exchanges with over the time period 293 + const sql = 294 + \\WITH user_id AS ( 295 + \\ SELECT id FROM users WHERE nick = ? 296 + \\) 297 + \\SELECT 298 + \\ u1.nick AS sender_nick, 299 + \\ u2.nick AS recipient_nick, 300 + \\ MAX(m.timestamp_ms) AS latest_timestamp 301 + \\FROM messages m 302 + \\JOIN users u1 ON m.sender_id = u1.id 303 + \\JOIN users u2 ON m.recipient_id = u2.id 304 + \\WHERE (m.sender_id = (SELECT id FROM user_id) 305 + \\ OR m.recipient_id = (SELECT id FROM user_id)) 306 + \\ AND m.recipient_type = 1 307 + \\ AND m.timestamp_ms BETWEEN ? AND ? 308 + \\GROUP BY u1.nick, u2.nick; 309 + ; 310 + 311 + var rows = db_conn.rows( 312 + sql, 313 + .{ user.nick, req.from.milliseconds, req.to.milliseconds }, 314 + ) catch |err| { 315 + log.err("querying messages: {}: {s}", .{ err, db_conn.lastError() }); 316 + return; 317 + }; 318 + defer rows.deinit(); 319 + 320 + while (rows.next()) |row| { 321 + const sender = row.text(0); 322 + const recpt = row.text(1); 323 + const ts = row.int(2); 324 + // We report whichever isn't *us* 325 + if (std.ascii.eqlIgnoreCase(sender, user.nick)) { 326 + // We are the sender, report recpt 327 + const result: ChatHistory.Target = .{ 328 + .nick_or_channel = try arena.allocator().dupe(u8, recpt), 329 + .latest_timestamp = .{ .milliseconds = ts }, 330 + }; 331 + try results.append(arena.allocator(), result); 332 + } else { 333 + // We are the recpt, report sender 334 + const result: ChatHistory.Target = .{ 335 + .nick_or_channel = try arena.allocator().dupe(u8, sender), 336 + .latest_timestamp = .{ .milliseconds = ts }, 337 + }; 338 + try results.append(arena.allocator(), result); 339 + } 340 + } 341 + } 342 + 343 + { 344 + // Next we get all the channels we are a member of and the latest message 345 + const sql = 346 + \\WITH user_id AS ( 347 + \\ SELECT id FROM users WHERE nick = ? 348 + \\) 349 + \\SELECT 350 + \\ c.name AS channel_name, 351 + \\ MAX(m.timestamp_ms) AS latest_timestamp 352 + \\FROM messages m 353 + \\JOIN channels c ON m.recipient_id = c.id 354 + \\JOIN channel_membership cm ON cm.channel_id = c.id 355 + \\WHERE cm.user_id = (SELECT id FROM user_id) 356 + \\ AND m.recipient_type = 0 -- recipient_type for channels 357 + \\ AND m.timestamp_ms BETWEEN ? AND ? 358 + \\GROUP BY c.name; 359 + ; 360 + 361 + var rows = db_conn.rows( 362 + sql, 363 + .{ user.nick, req.from.milliseconds, req.to.milliseconds }, 364 + ) catch |err| { 365 + log.err("querying messages: {}: {s}", .{ err, db_conn.lastError() }); 366 + return; 367 + }; 368 + defer rows.deinit(); 369 + 370 + while (rows.next()) |row| { 371 + const channel = row.text(0); 372 + const ts = row.int(1); 373 + const result: ChatHistory.Target = .{ 374 + .nick_or_channel = try arena.allocator().dupe(u8, channel), 375 + .latest_timestamp = .{ .milliseconds = ts }, 376 + }; 377 + try results.append(arena.allocator(), result); 378 + } 379 + } 380 + 381 + // If the number of results is too many, we sort and truncate 382 + if (results.items.len > req.limit) { 383 + // TODO: Sort and prune 384 + } 385 + 386 + const batch: ChatHistory.TargetBatch = .{ 387 + .arena = arena, 388 + .conn = req.conn, 389 + .items = results.items, 390 + }; 391 + 392 + server.wakeup_mutex.lock(); 393 + defer server.wakeup_mutex.unlock(); 394 + try server.wakeup_results.append(server.gpa, .{ .history_targets = batch }); 395 + try server.wakeup.notify(); 277 396 } 278 397 279 398 pub fn chathistoryAfter(server: *Server, req: ChatHistory.AfterRequest) void {
+11
src/irc.zig
··· 97 97 items: []HistoryMessage, 98 98 target: []const u8, 99 99 }; 100 + 101 + pub const Target = struct { 102 + nick_or_channel: []const u8, 103 + latest_timestamp: Timestamp, 104 + }; 105 + 106 + pub const TargetBatch = struct { 107 + arena: std.heap.ArenaAllocator, 108 + conn: *Connection, 109 + items: []Target, 110 + }; 100 111 }; 101 112 102 113 pub const Message = struct {