this repo has no description
at main 80 kB view raw
1const Server = @This(); 2 3const builtin = @import("builtin"); 4const httpz = @import("httpz"); 5const std = @import("std"); 6const sqlite = @import("sqlite"); 7const uuid = @import("uuid"); 8const xev = @import("xev"); 9const zeit = @import("zeit"); 10 11const atproto = @import("atproto.zig"); 12const db = @import("db.zig"); 13const github = @import("github.zig"); 14const http = @import("http.zig"); 15const irc = @import("irc.zig"); 16const log = @import("log.zig"); 17 18const Allocator = std.mem.Allocator; 19const Capability = irc.Capability; 20const Channel = irc.Channel; 21const ChannelPrivileges = irc.ChannelPrivileges; 22const ChatHistory = irc.ChatHistory; 23const ClientMessage = irc.ClientMessage; 24const HeapArena = @import("HeapArena.zig"); 25const Http = @import("http.zig"); 26const Message = irc.Message; 27const MessageIterator = irc.MessageIterator; 28const Queue = @import("queue.zig").Queue; 29const Sanitize = @import("sanitize.zig"); 30const SaslMechanism = irc.SaslMechanism; 31const ThreadPool = @import("ThreadPool.zig"); 32const Timestamp = irc.Timestamp; 33const User = irc.User; 34 35const assert = std.debug.assert; 36 37// We allow tags, so our maximum is 4096 + 512 38const max_message_len = 4096 + 512; 39 40const garbage_collect_ms = 1 * std.time.ms_per_min; 41 42const max_chathistory: u16 = 100; 43 44const ProcessMessageError = error{ClientQuit} || Allocator.Error; 45 46pub const WakeupResult = union(enum) { 47 auth_success: AuthSuccess, 48 auth_failure: struct { 49 arena: HeapArena, 50 fd: xev.TCP, 51 msg: []const u8, 52 }, 53 history_batch: ChatHistory.HistoryBatch, 54 history_targets: ChatHistory.TargetBatch, 55 mark_read: struct { 56 arena: HeapArena, 57 fd: xev.TCP, 58 target: []const u8, 59 timestamp: ?Timestamp, 60 }, 61 // a new event stream has connected. This happens in another thread, so we coalesce in the main 62 // via wakeup 63 event_stream: *http.EventStream, 64 65 pub const AuthSuccess = struct { 66 arena: HeapArena, 67 fd: xev.TCP, 68 avatar_url: []const u8, 69 nick: []const u8, 70 user: []const u8, 71 realname: []const u8, 72 }; 73}; 74 75pub const WorkerQueue = Queue(WakeupResult, 128); 76 77pub const Options = struct { 78 hostname: []const u8 = "localhost", 79 irc_port: u16 = 6667, 80 /// If http_port is null, the http server will not be started 81 http_port: ?u16 = 8080, 82 auth: AuthProvider = .none, 83 db_path: [:0]const u8 = "apollo.db", 84}; 85 86const Capabilities = packed struct { 87 @"away-notify": bool = false, 88 @"draft/chathistory": bool = false, 89 @"draft/read-marker": bool = false, 90 @"draft/no-implicit-names": bool = false, 91 @"echo-message": bool = false, 92 @"message-tags": bool = false, 93 @"server-time": bool = false, 94 @"standard-replies": bool = false, 95}; 96 97const AuthProvider = enum { 98 none, 99 github, 100 atproto, 101}; 102 103const PendingAuth = struct { 104 conn: *Connection, 105 mechanism: SaslMechanism, 106}; 107 108gpa: std.mem.Allocator, 109loop: xev.Loop, 110address: std.net.Address, 111tcp: xev.TCP, 112hostname: []const u8, 113auth: AuthProvider, 114 115http_client: std.http.Client, 116 117http_server: Http.Server, 118httpz_server: httpz.Server(*Http.Server), 119http_server_thread: ?std.Thread, 120 121/// maps a tcp connection to an EventStream object 122event_streams: std.AutoArrayHashMapUnmanaged(xev.TCP, *http.EventStream), 123/// maps a tcp connection to a connection object 124connections: std.AutoArrayHashMapUnmanaged(xev.TCP, *Connection), 125/// maps a nick to a user 126nick_map: std.StringArrayHashMapUnmanaged(*User), 127/// maps channel name to channel 128channels: std.StringArrayHashMapUnmanaged(*Channel), 129 130pending_auth: std.ArrayListUnmanaged(PendingAuth), 131 132garbage_collect_timer: xev.Timer, 133gc_cycle: u8, 134 135thread_pool: ThreadPool, 136db_pool: *sqlite.Pool, 137wakeup: xev.Async, 138wakeup_queue: WorkerQueue, 139 140completion_pool: MemoryPoolUnmanaged, 141next_batch: u32, 142 143pub fn init( 144 self: *Server, 145 gpa: std.mem.Allocator, 146 opts: Options, 147) !void { 148 const addr = try std.net.Address.parseIp4("127.0.0.1", opts.irc_port); 149 150 const core_count = if (builtin.is_test) 1 else @max(4, std.Thread.getCpuCount() catch 0); 151 const db_config: sqlite.Pool.Config = .{ 152 .size = core_count, 153 .path = opts.db_path, 154 .flags = sqlite.OpenFlags.Create | 155 sqlite.OpenFlags.EXResCode | 156 sqlite.OpenFlags.ReadWrite, 157 .on_first_connection = db.createTables, 158 .on_connection = db.setPragmas, 159 }; 160 161 const n_jobs: u16 = @intCast(core_count); 162 163 const db_pool: *sqlite.Pool = try .init(gpa, db_config); 164 165 self.* = .{ 166 .gpa = gpa, 167 .loop = try xev.Loop.init(.{ .entries = 1024 }), 168 .tcp = try .init(addr), 169 .address = addr, 170 .event_streams = .empty, 171 .connections = .empty, 172 .nick_map = .empty, 173 .channels = .empty, 174 .hostname = opts.hostname, 175 .auth = opts.auth, 176 .http_client = .{ .allocator = gpa }, 177 .http_server = undefined, 178 .httpz_server = undefined, 179 .http_server_thread = null, 180 .garbage_collect_timer = try .init(), 181 .gc_cycle = 0, 182 .thread_pool = undefined, 183 .db_pool = db_pool, 184 .wakeup = try .init(), 185 .wakeup_queue = .{}, 186 .completion_pool = .empty, 187 .next_batch = 0, 188 .pending_auth = .empty, 189 }; 190 191 if (opts.http_port) |http_port| { 192 // If we have an http port, we start the server and spawn it's thread 193 self.http_server = .{ 194 .gpa = gpa, 195 .channels = &self.channels, 196 .db_pool = db_pool, 197 .irc_server = self, 198 }; 199 200 self.httpz_server = try httpz.Server(*Http.Server).init( 201 gpa, 202 .{ 203 .port = http_port, 204 .request = .{ .max_form_count = 1 }, 205 .thread_pool = .{ .count = n_jobs }, 206 }, 207 &self.http_server, 208 ); 209 self.http_server_thread = try .spawn( 210 .{}, 211 webMain, 212 .{ self, http_port }, 213 ); 214 } 215 try self.thread_pool.init(.{ .allocator = gpa, .n_jobs = n_jobs }); 216 self.wakeup_queue.eventfd = self.wakeup.fd; 217 218 const tcp_c = try self.completion_pool.create(self.gpa); 219 self.tcp.accept(&self.loop, tcp_c, Server, self, Server.onAccept); 220 log.info("Listening at {}", .{self.address}); 221 222 // Start listening for our wakeup 223 const wakeup_c = try self.completion_pool.create(self.gpa); 224 self.wakeup.wait(&self.loop, wakeup_c, Server, self, Server.onWakeup); 225 226 // Start the rehash timer. This is a timer to rehash our hashmaps 227 const rehash_c = try self.completion_pool.create(self.gpa); 228 self.garbage_collect_timer.run(&self.loop, rehash_c, garbage_collect_ms, Server, self, Server.onGarbageCollect); 229 230 try self.tcp.bind(addr); 231 try self.tcp.listen(256); 232 233 // get the bound port 234 var sock_len = self.address.getOsSockLen(); 235 try std.posix.getsockname(self.tcp.fd, &self.address.any, &sock_len); 236 237 // Load initial db data 238 try db.loadChannels(self); 239 try db.loadUsers(self); 240 try db.loadChannelMembership(self); 241 242 log.info("{d} users in {d} channels", .{ self.nick_map.count(), self.channels.count() }); 243} 244 245fn onGarbageCollect( 246 ud: ?*Server, 247 _: *xev.Loop, 248 c: *xev.Completion, 249 r: xev.Timer.RunError!void, 250) xev.CallbackAction { 251 const self = ud.?; 252 defer self.gc_cycle +|= 1; 253 _ = r catch |err| { 254 log.err("timer error: {}", .{err}); 255 }; 256 257 self.garbage_collect_timer.run( 258 &self.loop, 259 c, 260 garbage_collect_ms, 261 Server, 262 self, 263 Server.onGarbageCollect, 264 ); 265 266 // Close unauthenticated connections. Every cycle we do this 267 { 268 var iter = self.connections.iterator(); 269 const now = std.time.timestamp(); 270 while (iter.next()) |entry| { 271 const conn = entry.value_ptr.*; 272 if (conn.isAuthenticated()) continue; 273 // If it's been more than 60 seconds since this connection connected and it isn't 274 // authenticated, we close it 275 if (conn.connected_at + 60 < now) { 276 log.debug("closing unauthenticated connection: {d}", .{conn.client.fd}); 277 // Cancel the read completion. One cancelled, we close and clean up the 278 // connection 279 const close_c = self.completion_pool.create(self.gpa) catch { 280 @panic("Out of memory"); 281 }; 282 self.loop.cancel(conn.read_c, close_c, Server, self, Server.onCancel); 283 } 284 } 285 } 286 287 // Increment this in each GC block so we stagger the cycles 288 var stagger: u8 = 0; 289 290 if ((self.gc_cycle + stagger) % 10 == 0) { 291 stagger += 1; 292 // Clean up connections hash map. Every 10th cycle 293 connections: { 294 const keys = self.gpa.dupe(xev.TCP, self.connections.keys()) catch break :connections; 295 defer self.gpa.free(keys); 296 const values = self.gpa.dupe(*Connection, self.connections.values()) catch break :connections; 297 defer self.gpa.free(values); 298 self.connections.shrinkAndFree(self.gpa, keys.len); 299 self.connections.reinit(self.gpa, keys, values) catch break :connections; 300 } 301 } 302 303 if ((self.gc_cycle + stagger) % 10 == 0) { 304 stagger += 1; 305 // Clean up pending auth list. We shrink it to the size of items it has (effecitvely 306 // clearing it's capacity) 307 const len = self.pending_auth.items.len; 308 self.pending_auth.shrinkAndFree(self.gpa, len); 309 } 310 311 // TODO: GC completion memory pool 312 // TODO: GC Event stream hash map 313 // TODO: rehash all hashmaps 314 315 return .disarm; 316} 317 318pub fn deinit(self: *Server) void { 319 log.info("shutting down", .{}); 320 { 321 var iter = self.connections.iterator(); 322 while (iter.next()) |entry| { 323 const tcp = entry.key_ptr.*; 324 std.posix.close(tcp.fd); 325 const conn = entry.value_ptr.*; 326 conn.deinit(self.gpa); 327 self.gpa.destroy(conn); 328 } 329 } 330 { 331 var iter = self.event_streams.iterator(); 332 while (iter.next()) |entry| { 333 const tcp = entry.key_ptr.*; 334 std.posix.close(tcp.fd); 335 const es = entry.value_ptr.*; 336 es.write_buf.deinit(self.gpa); 337 self.gpa.destroy(es); 338 } 339 } 340 self.wakeup_queue.lock(); 341 defer self.wakeup_queue.unlock(); 342 while (self.wakeup_queue.drain()) |result| { 343 switch (result) { 344 .event_stream => |v| self.gpa.destroy(v), 345 inline else => |v| v.arena.deinit(), 346 } 347 } 348 for (self.nick_map.values()) |v| { 349 v.deinit(self.gpa); 350 self.gpa.destroy(v); 351 } 352 self.nick_map.deinit(self.gpa); 353 self.connections.deinit(self.gpa); 354 self.event_streams.deinit(self.gpa); 355 self.completion_pool.deinit(self.gpa); 356 self.loop.deinit(); 357 self.http_client.deinit(); 358 if (self.http_server_thread) |thread| { 359 // We have an http server. Clean it up 360 self.httpz_server.stop(); 361 self.httpz_server.deinit(); 362 thread.join(); 363 } 364 self.thread_pool.deinit(); 365 366 // Do a couple last minute pragmas 367 { 368 const conn = self.db_pool.acquire(); 369 defer self.db_pool.release(conn); 370 conn.execNoArgs("PRAGMA analysis_limit = 400") catch {}; 371 conn.execNoArgs("PRAGMA optimize") catch {}; 372 } 373 self.db_pool.deinit(); 374} 375 376// Runs while value is true 377fn runUntil(self: *Server, value: *const std.atomic.Value(bool), wg: *std.Thread.WaitGroup) !void { 378 wg.finish(); 379 while (value.load(.unordered)) { 380 try self.loop.run(.once); 381 } 382} 383 384fn onWakeup( 385 ud: ?*Server, 386 _: *xev.Loop, 387 _: *xev.Completion, 388 r: xev.Async.WaitError!void, 389) xev.CallbackAction { 390 const self = ud.?; 391 _ = r catch |err| { 392 log.err("wait error: {}", .{err}); 393 }; 394 395 // Drain anything that may have woken us up 396 self.wakeup_queue.lock(); 397 defer self.wakeup_queue.unlock(); 398 while (self.wakeup_queue.drain()) |result| { 399 switch (result) { 400 .auth_success => |v| { 401 defer v.arena.deinit(); 402 self.onSuccessfulAuth( 403 v.fd, 404 v.nick, 405 v.user, 406 v.realname, 407 v.avatar_url, 408 ) catch |err| { 409 log.err("could finish auth: {}", .{err}); 410 const conn = self.connections.get(v.fd) orelse continue; 411 self.errSaslFail(conn, "failed to finish authentication") catch {}; 412 }; 413 }, 414 .auth_failure => |v| { 415 defer v.arena.deinit(); 416 log.debug("auth response={s}", .{v.msg}); 417 const conn = self.connections.get(v.fd) orelse continue; 418 self.errSaslFail(conn, v.msg) catch {}; 419 }, 420 .history_batch => |v| { 421 defer v.arena.deinit(); 422 const conn = self.connections.get(v.fd) orelse continue; 423 const batch_id = self.next_batch; 424 self.next_batch +|= 1; 425 conn.print( 426 self.gpa, 427 ":{s} BATCH +{d} chathistory {s}\r\n", 428 .{ self.hostname, batch_id, v.target }, 429 ) catch @panic("TODO"); 430 for (v.items) |msg| { 431 conn.print( 432 self.gpa, 433 "@time={};batch={d};msgid={s} :{s} {s}\r\n", 434 .{ 435 msg.timestamp, 436 batch_id, 437 msg.uuid, 438 msg.sender, 439 msg.message, 440 }, 441 ) catch @panic("TODO"); 442 } 443 conn.print( 444 self.gpa, 445 ":{s} BATCH -{d} chathistory {s}\r\n", 446 .{ self.hostname, batch_id, v.target }, 447 ) catch @panic("TODO"); 448 449 self.queueWrite(conn.client, conn) catch {}; 450 }, 451 .history_targets => |v| { 452 defer v.arena.deinit(); 453 const conn = self.connections.get(v.fd) orelse continue; 454 const batch_id = self.next_batch; 455 self.next_batch +|= 1; 456 conn.print( 457 self.gpa, 458 ":{s} BATCH +{d} draft/chathistory-targets\r\n", 459 .{ self.hostname, batch_id }, 460 ) catch @panic("TODO"); 461 for (v.items) |target| { 462 conn.print( 463 self.gpa, 464 "@batch={d} CHATHISTORY TARGETS {s} {s}\r\n", 465 .{ 466 batch_id, 467 target.nick_or_channel, 468 target.latest_timestamp, 469 }, 470 ) catch @panic("TODO"); 471 } 472 conn.print( 473 self.gpa, 474 ":{s} BATCH -{d} draft/chathistory-targets\r\n", 475 .{ self.hostname, batch_id }, 476 ) catch @panic("TODO"); 477 478 self.queueWrite(conn.client, conn) catch {}; 479 }, 480 .mark_read => |v| { 481 defer v.arena.deinit(); 482 const c = self.connections.get(v.fd) orelse continue; 483 const user = c.user orelse continue; 484 // We report the markread to all connections 485 for (user.connections.items) |conn| { 486 if (v.timestamp) |timestamp| { 487 conn.print( 488 self.gpa, 489 ":{s} MARKREAD {s} timestamp={s}\r\n", 490 .{ self.hostname, v.target, timestamp }, 491 ) catch @panic("TODO"); 492 } else { 493 conn.print( 494 self.gpa, 495 ":{s} MARKREAD {s} *\r\n", 496 .{ self.hostname, v.target }, 497 ) catch @panic("TODO"); 498 } 499 } 500 }, 501 .event_stream => |v| { 502 self.event_streams.put(self.gpa, v.stream, v) catch @panic("OOM"); 503 v.channel.streams.append(self.gpa, v) catch @panic("OOM"); 504 }, 505 } 506 } 507 return .rearm; 508} 509 510fn onSuccessfulAuth( 511 self: *Server, 512 fd: xev.TCP, 513 nick: []const u8, 514 username: []const u8, 515 realname: []const u8, 516 avatar_url: []const u8, 517) Allocator.Error!void { 518 const conn = self.connections.get(fd) orelse { 519 log.warn("connection not found: fd={d}", .{fd.fd}); 520 return; 521 }; 522 523 // If we don't have a user in the map, we will create one and insert it 524 if (!self.nick_map.contains(nick)) { 525 const user = try self.gpa.create(User); 526 user.* = .init(); 527 user.real = try self.gpa.dupe(u8, realname); 528 user.username = try self.gpa.dupe(u8, username); 529 user.avatar_url = try self.gpa.dupe(u8, avatar_url); 530 user.nick = try self.gpa.dupe(u8, nick); 531 try self.nick_map.put(self.gpa, nick, user); 532 } 533 const user = self.nick_map.get(nick).?; 534 535 // Store or update the user in the db. We do this in a worker thread 536 try self.thread_pool.spawn(db.storeUser, .{ self.db_pool, user }); 537 try user.connections.append(self.gpa, conn); 538 conn.user = user; 539 540 try conn.print( 541 self.gpa, 542 ":{s} 900 {s} {s}!{s}@{s} {s} :You are now logged in\r\n", 543 .{ 544 self.hostname, 545 user.nick, 546 user.nick, 547 user.username, 548 self.hostname, 549 user.nick, 550 }, 551 ); 552 try conn.print(self.gpa, ":{s} 903 {s} :SASL successful\r\n", .{ self.hostname, user.nick }); 553 // RPL_WELCOME 554 try conn.print(self.gpa, ":{s} 001 {s} :Good Apollo, I'm burning Star IV!\r\n", .{ self.hostname, user.nick }); 555 // RPL_YOURHOST 556 try conn.print(self.gpa, ":{s} 002 {s} :Your host is {s}\r\n", .{ self.hostname, user.nick, self.hostname }); 557 // RPL_CREATED 558 try conn.print(self.gpa, ":{s} 003 {s} :This server exists\r\n", .{ self.hostname, user.nick }); 559 // RPL_MYINFO 560 // TODO: include any user or channel modes? 561 try conn.print(self.gpa, ":{s} 004 {s} apollo v0.0.0 \r\n", .{ self.hostname, user.nick }); 562 // ISUPPORT 563 try conn.print( 564 self.gpa, 565 ":{s} 005 {s} WHOX CHATHISTORY={d} MSGREFTYPES=timestamp PREFIX=(o)@ :are supported\r\n", 566 .{ self.hostname, user.nick, max_chathistory }, 567 ); 568 569 // MOTD. Some clients check for these, so we need to send them unilaterally (eg goguma) 570 try conn.print(self.gpa, ":{s} 375 {s} :Message of the day -\r\n", .{ self.hostname, user.nick }); 571 try conn.print(self.gpa, ":{s} 376 {s} :End of Message of the day -\r\n", .{ self.hostname, user.nick }); 572 573 // If this is the only connection the user has, we notify all channels they are a member of 574 // that they are back 575 if (user.connections.items.len == 1) { 576 for (user.channels.items) |chan| { 577 try chan.notifyBack(self, user); 578 } 579 } 580 581 // Send a join to the user for all of their channels 582 for (user.channels.items) |chan| { 583 var buf: [128]u8 = undefined; 584 const m = std.fmt.bufPrint(&buf, "JOIN {s}", .{chan.name}) catch unreachable; 585 try self.handleJoin(conn, .init(m)); 586 } 587 588 // If the client isn't part of any channels, we'll force them into #apollo 589 if (user.channels.items.len == 0) { 590 try self.handleJoin(conn, .init("JOIN #apollo")); 591 } 592 593 try self.queueWrite(conn.client, conn); 594} 595 596/// xev callback when a connection occurs 597fn onAccept( 598 ud: ?*Server, 599 loop: *xev.Loop, 600 _: *xev.Completion, 601 result: xev.AcceptError!xev.TCP, 602) xev.CallbackAction { 603 const self = ud.?; 604 605 // Accept the connection 606 const client = result catch |err| { 607 log.err("accept error: {}", .{err}); 608 return .rearm; 609 }; 610 611 self.accept(loop, client) catch |err| { 612 log.err("couldn't accept connection: fd={d}", .{client.fd}); 613 switch (err) { 614 error.OutOfMemory => return .disarm, 615 } 616 }; 617 618 return .rearm; 619} 620 621// Initializes the connection 622fn accept(self: *Server, loop: *xev.Loop, client: xev.TCP) Allocator.Error!void { 623 log.debug("accepted connection: fd={d}", .{client.fd}); 624 const completion = try self.completion_pool.create(self.gpa); 625 const conn = try self.gpa.create(Connection); 626 conn.init(client, completion); 627 628 try self.connections.put(self.gpa, client, conn); 629 630 client.read( 631 loop, 632 completion, 633 .{ .slice = &conn.read_buf }, 634 Server, 635 self, 636 Server.onRead, 637 ); 638} 639 640fn handleClientDisconnect(self: *Server, client: xev.TCP) void { 641 log.info("client disconnected: fd={d}", .{client.fd}); 642 643 const conn = self.connections.get(client) orelse { 644 log.warn("connection not found: fd={d}", .{client.fd}); 645 return; 646 }; 647 648 // Remove this connection from any pending auth state 649 for (self.pending_auth.items, 0..) |v, i| { 650 if (v.conn == conn) { 651 _ = self.pending_auth.swapRemove(i); 652 } 653 } 654 655 if (conn.user) |user| { 656 // Remove this connection from the nick_map 657 if (self.nick_map.get(user.nick)) |v| { 658 for (v.connections.items, 0..) |c, i| { 659 if (c == conn) { 660 _ = v.connections.swapRemove(i); 661 break; 662 } 663 } 664 665 // No more connections 666 if (v.connections.items.len == 0) { 667 for (v.channels.items) |chan| { 668 chan.notifyAway(self, v) catch {}; 669 } 670 } 671 } 672 } 673 conn.deinit(self.gpa); 674 _ = self.connections.swapRemove(client); 675 self.gpa.destroy(conn); 676} 677 678/// queues a write of the pending buffer. If there is nothing to queue, this is a noop 679pub fn queueWrite(self: *Server, tcp: xev.TCP, conn: *Connection) Allocator.Error!void { 680 if (conn.write_buf.items.len == 0) return; 681 const buf = try conn.write_buf.toOwnedSlice(self.gpa); 682 const write_c = try self.completion_pool.create(self.gpa); 683 tcp.write( 684 &self.loop, 685 write_c, 686 .{ .slice = buf }, 687 Server, 688 self, 689 Server.onWrite, 690 ); 691} 692 693pub fn queueWriteEventStream(self: *Server, stream: *http.EventStream) Allocator.Error!void { 694 if (stream.write_buf.items.len == 0) return; 695 const buf = try stream.write_buf.toOwnedSlice(self.gpa); 696 const tcp: xev.TCP = .{ .fd = stream.stream.fd }; 697 tcp.write( 698 &self.loop, 699 &stream.write_c, 700 .{ .slice = buf }, 701 Server, 702 self, 703 Server.onEventStreamWrite, 704 ); 705} 706 707fn onWrite( 708 ud: ?*Server, 709 _: *xev.Loop, 710 c: *xev.Completion, 711 tcp: xev.TCP, 712 wb: xev.WriteBuffer, 713 result: xev.WriteError!usize, 714) xev.CallbackAction { 715 const self = ud.?; 716 self.completion_pool.destroy(c); 717 defer self.gpa.free(wb.slice); 718 719 const n = result catch |err| { 720 log.err("write error: {}", .{err}); 721 return .disarm; 722 }; 723 log.debug("write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")}); 724 725 const conn = self.connections.get(tcp) orelse { 726 log.err("connection not found: {d}", .{tcp.fd}); 727 return .disarm; 728 }; 729 730 // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue 731 if (n < wb.slice.len) { 732 conn.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| { 733 log.err("couldn't insert unwritten bytes: {}", .{err}); 734 return .disarm; 735 }; 736 } 737 self.queueWrite(tcp, conn) catch {}; 738 return .disarm; 739} 740 741fn onEventStreamWrite( 742 ud: ?*Server, 743 _: *xev.Loop, 744 _: *xev.Completion, 745 tcp: xev.TCP, 746 wb: xev.WriteBuffer, 747 result: xev.WriteError!usize, 748) xev.CallbackAction { 749 const self = ud.?; 750 defer self.gpa.free(wb.slice); 751 752 const n = result catch |err| { 753 log.warn("Event stream error, probably closed: {}", .{err}); 754 // Clean up the stream. We remove it from the Server streams, and the channel streams 755 const kv = self.event_streams.fetchSwapRemove(tcp) orelse return .disarm; 756 const es = kv.value; 757 const channel = es.channel; 758 for (channel.streams.items, 0..) |item, i| { 759 if (item == es) { 760 _ = channel.streams.swapRemove(i); 761 break; 762 } 763 } 764 std.posix.close(es.stream.fd); 765 es.write_buf.deinit(self.gpa); 766 self.gpa.destroy(es); 767 return .disarm; 768 }; 769 770 log.debug("event stream write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")}); 771 772 const es = self.event_streams.get(tcp) orelse { 773 log.err("event_stream not found: {d}", .{tcp.fd}); 774 std.posix.close(tcp.fd); 775 return .disarm; 776 }; 777 778 // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue 779 if (n < wb.slice.len) { 780 es.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| { 781 log.err("couldn't insert unwritten bytes: {}", .{err}); 782 return .disarm; 783 }; 784 } 785 self.queueWriteEventStream(es) catch {}; 786 return .disarm; 787} 788 789/// queues a write of the pending buffer. On completion, closes the conenction 790fn queueFinalWrite(self: *Server, tcp: xev.TCP, conn: *Connection) Allocator.Error!void { 791 if (conn.write_buf.items.len == 0) { 792 // client disconnected. Close the fd 793 const write_c = try self.completion_pool.create(self.gpa); 794 conn.client.close(&self.loop, write_c, Server, self, Server.onClose); 795 return; 796 } 797 const buf = try conn.write_buf.toOwnedSlice(self.gpa); 798 const write_c = try self.completion_pool.create(self.gpa); 799 tcp.write( 800 &self.loop, 801 write_c, 802 .{ .slice = buf }, 803 Server, 804 self, 805 Server.onFinalWrite, 806 ); 807} 808 809fn onFinalWrite( 810 ud: ?*Server, 811 _: *xev.Loop, 812 c: *xev.Completion, 813 tcp: xev.TCP, 814 wb: xev.WriteBuffer, 815 result: xev.WriteError!usize, 816) xev.CallbackAction { 817 const self = ud.?; 818 self.completion_pool.destroy(c); 819 defer self.gpa.free(wb.slice); 820 821 const n = result catch |err| { 822 log.err("write error: {}", .{err}); 823 return .disarm; 824 }; 825 log.debug("write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")}); 826 827 const conn = self.connections.get(tcp) orelse { 828 log.err("connection not found: {d}", .{tcp.fd}); 829 return .disarm; 830 }; 831 832 // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue 833 if (n < wb.slice.len) { 834 conn.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| { 835 log.err("couldn't insert unwritten bytes: {}", .{err}); 836 return .disarm; 837 }; 838 } 839 self.queueFinalWrite(tcp, conn) catch {}; 840 return .disarm; 841} 842 843fn onCancel( 844 ud: ?*Server, 845 _: *xev.Loop, 846 c: *xev.Completion, 847 result: xev.CancelError!void, 848) xev.CallbackAction { 849 log.debug("cancelled completion {x}", .{@intFromPtr(c)}); 850 const self = ud.?; 851 self.completion_pool.destroy(c); 852 _ = result catch |err| { 853 log.err("close error: {}", .{err}); 854 }; 855 return .disarm; 856} 857 858fn onClose( 859 ud: ?*Server, 860 _: *xev.Loop, 861 c: *xev.Completion, 862 client: xev.TCP, 863 result: xev.CloseError!void, 864) xev.CallbackAction { 865 const self = ud.?; 866 _ = result catch |err| { 867 log.err("close error: {}", .{err}); 868 }; 869 self.handleClientDisconnect(client); 870 self.completion_pool.destroy(c); 871 return .disarm; 872} 873 874fn onRead( 875 ud: ?*Server, 876 loop: *xev.Loop, 877 c: *xev.Completion, 878 client: xev.TCP, 879 rb: xev.ReadBuffer, 880 result: xev.ReadError!usize, 881) xev.CallbackAction { 882 const self = ud.?; 883 884 // Get the read result 885 const n = result catch |err| { 886 switch (err) { 887 error.Canceled => log.info("read canceled: fd={d}", .{client.fd}), 888 error.EOF => log.info("client eof: fd={d}", .{client.fd}), 889 else => log.err("read error: {}", .{err}), 890 } 891 // client disconnected. Close the fd 892 client.close(loop, c, Server, self, Server.onClose); 893 return .disarm; 894 }; 895 896 // Handle a disconnected client 897 if (n == 0) { 898 client.close(loop, c, Server, self, Server.onClose); 899 return .disarm; 900 } 901 902 // Get the client 903 const conn = self.connections.get(client) orelse { 904 log.warn("client not found: fd={d}", .{client.fd}); 905 client.close(loop, c, Server, self, Server.onClose); 906 return .disarm; 907 }; 908 909 // Process the newly read bytes 910 self.processMessages(conn, rb.slice[0..n]) catch |err| { 911 log.err("couldn't process message: fd={d}", .{client.fd}); 912 switch (err) { 913 error.OutOfMemory => return .disarm, 914 error.ClientQuit => { 915 self.queueFinalWrite(client, conn) catch { 916 // On error, we just close the fd 917 client.close(loop, c, Server, self, Server.onClose); 918 }; 919 return .disarm; 920 }, 921 } 922 }; 923 924 self.queueWrite(client, conn) catch |err| { 925 log.err("couldn't queue write: fd={d}", .{client.fd}); 926 switch (err) { 927 error.OutOfMemory => return .disarm, 928 } 929 }; 930 return .rearm; 931} 932 933fn processMessages(self: *Server, conn: *Connection, bytes: []const u8) ProcessMessageError!void { 934 const buf: []const u8 = 935 // If we have no queue, and this message is complete, we can process without allocating 936 if (conn.read_queue.items.len == 0 and std.mem.endsWith(u8, bytes, "\n")) 937 bytes 938 else blk: { 939 try conn.read_queue.appendSlice(self.gpa, bytes); 940 break :blk conn.read_queue.items; 941 }; 942 943 var iter: MessageIterator = .{ .bytes = buf }; 944 while (iter.next()) |raw| { 945 log.debug("read: {s}", .{raw}); 946 const msg: Message = .init(raw); 947 try self.handleMessage(conn, msg); 948 } 949 950 // if our read_queue is empty, we are done 951 if (conn.read_queue.items.len == 0) return; 952 953 // Clean up the read_queue 954 955 // Replace the amount we read 956 conn.read_queue.replaceRangeAssumeCapacity(0, iter.bytesRead(), ""); 957 958 if (conn.read_queue.items.len == 0) { 959 // If we consumed the entire thing, reclaim the memory 960 conn.read_queue.clearAndFree(self.gpa); 961 } else if (conn.read_queue.items.len > Server.max_message_len) { 962 // If we have > max_message_size bytes, we send an error 963 conn.read_queue.clearAndFree(self.gpa); 964 return self.errInputTooLong(conn); 965 } 966} 967 968fn handleMessage(self: *Server, conn: *Connection, msg: Message) ProcessMessageError!void { 969 const cmd = msg.command(); 970 971 const client_msg = ClientMessage.fromString(cmd) orelse { 972 return self.errUnknownCommand(conn, cmd); 973 }; 974 975 switch (client_msg) { 976 .CAP => try self.handleCap(conn, msg), 977 .NICK => try self.handleNick(conn, msg), 978 .USER => try self.handleUser(conn, msg), 979 .AUTHENTICATE => try self.handleAuthenticate(conn, msg), 980 .PASS => {}, 981 .PING => try self.handlePing(conn, msg), 982 .QUIT => try self.handleQuit(conn, msg), 983 .MODE => try self.handleMode(conn, msg), 984 985 .JOIN => try self.handleJoin(conn, msg), 986 .TOPIC => try self.handleTopic(conn, msg), 987 .PART => try self.handlePart(conn, msg), 988 .NAMES => try self.handleNames(conn, msg), 989 .LIST => try self.handleList(conn, msg), 990 991 .PRIVMSG => try self.handlePrivMsg(conn, msg), 992 .TAGMSG => try self.handleTagMsg(conn, msg), 993 994 .WHO => try self.handleWho(conn, msg), 995 996 .AWAY => try self.handleAway(conn, msg), 997 .CHATHISTORY => try self.handleChathistory(conn, msg), 998 .MARKREAD => try self.handleMarkread(conn, msg), 999 else => try self.errUnknownCommand(conn, cmd), 1000 } 1001} 1002 1003fn handleCap(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1004 var iter = msg.paramIterator(); 1005 const subcmd = iter.next() orelse return self.errNeedMoreParams(conn, "CAP"); 1006 1007 if (std.mem.eql(u8, subcmd, "LS")) { 1008 // LS lists available capabilities 1009 // We expect a 302, but we don't actually care 1010 try conn.print( 1011 self.gpa, 1012 ":{s} CAP {s} LS :", 1013 .{ self.hostname, conn.nickname() }, 1014 ); 1015 for (std.meta.fieldNames(Capability), 0..) |cap, i| { 1016 if (i > 0) try conn.write(self.gpa, " "); 1017 try conn.write(self.gpa, cap); 1018 if (std.mem.eql(u8, "sasl", cap)) try conn.write(self.gpa, "=PLAIN"); 1019 } 1020 try conn.write(self.gpa, "\r\n"); 1021 } else if (std.mem.eql(u8, subcmd, "LIST")) { 1022 // LIST lists enabled capabilities 1023 } else if (std.mem.eql(u8, subcmd, "REQ")) { 1024 // REQ tries to enable the given capability 1025 const caps = iter.next() orelse return; 1026 var cap_iter = std.mem.splitScalar(u8, caps, ' '); 1027 while (cap_iter.next()) |cap_str| { 1028 const cap = std.meta.stringToEnum(Capability, cap_str) orelse { 1029 try conn.print( 1030 self.gpa, 1031 ":{s} CAP {s} NAK {s}\r\n", 1032 .{ self.hostname, conn.nickname(), cap_str }, 1033 ); 1034 continue; 1035 }; 1036 try conn.enableCap(cap); 1037 try conn.print( 1038 self.gpa, 1039 ":{s} CAP {s} ACK {s}\r\n", 1040 .{ self.hostname, conn.nickname(), cap_str }, 1041 ); 1042 } 1043 } else if (std.mem.eql(u8, subcmd, "END")) { 1044 // END signals the end of capability negotiation. It's possible to be authenticated 1045 // already if it happened really fast 1046 } else return conn.print( 1047 self.gpa, 1048 ":{s} 410 {s} {s} :Invalid CAP command\r\n", 1049 .{ self.hostname, conn.nickname(), subcmd }, 1050 ); 1051} 1052 1053fn handleNick(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1054 // Silently ignore nick commands. We get this from the Github auth 1055 _ = self; 1056 _ = conn; 1057 _ = msg; 1058} 1059 1060fn handleUser(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1061 // Silently ignore user commands. We get this from the Github auth 1062 _ = self; 1063 _ = conn; 1064 _ = msg; 1065} 1066 1067fn handleAuthenticate(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1068 var iter = msg.paramIterator(); 1069 const pending_mech: ?SaslMechanism = for (self.pending_auth.items, 0..) |pending_conn, i| { 1070 if (pending_conn.conn == conn) { 1071 // This connection is already pending auth. We can remove it from the list now 1072 _ = self.pending_auth.swapRemove(i); 1073 break pending_conn.mechanism; 1074 } 1075 } else null; 1076 1077 if (pending_mech == null) { 1078 // We "unauthenticate" the connection by setting it's user field to null. We do this 1079 // even for failure 1080 conn.user = null; 1081 // If we aren't pending auth, this should be AUTHENTICATE <mechanism> 1082 // We first must get AUTHENTICATE <mechanism> 1083 const mechanism = iter.next() orelse 1084 return self.errNeedMoreParams(conn, "AUTHENTICATE"); 1085 if (std.ascii.eqlIgnoreCase("PLAIN", mechanism)) { 1086 const pending: PendingAuth = .{ .mechanism = .plain, .conn = conn }; 1087 try self.pending_auth.append(self.gpa, pending); 1088 return conn.write(self.gpa, "AUTHENTICATE +\r\n"); 1089 } 1090 return conn.print( 1091 self.gpa, 1092 ":{s} 908 {s} PLAIN :are available SASL mechanisms\r\n", 1093 .{ self.hostname, conn.nickname() }, 1094 ); 1095 } 1096 1097 // This is our username + password 1098 const str = iter.next() orelse return self.errNeedMoreParams(conn, "AUTHENTICATE"); 1099 1100 var buf: [1024]u8 = undefined; 1101 const Decoder = std.base64.standard.Decoder; 1102 const len = Decoder.calcSizeForSlice(str) catch |err| { 1103 switch (err) { 1104 error.InvalidPadding => return self.errSaslFail(conn, "invalid base64 encoding"), 1105 else => unreachable, 1106 } 1107 }; 1108 const decode_buf = buf[0..len]; 1109 std.base64.standard.Decoder.decode(decode_buf, str) catch |err| { 1110 switch (err) { 1111 error.InvalidPadding => return self.errSaslFail(conn, "invalid base64 encoding"), 1112 error.InvalidCharacter => return self.errSaslFail(conn, "invalid base64 encoding"), 1113 error.NoSpaceLeft => return self.errSaslFail(conn, "auth too long"), 1114 } 1115 }; 1116 1117 var sasl_iter = std.mem.splitScalar(u8, decode_buf, 0x00); 1118 1119 // Authorized as is the identity we will act as. We generally ignore this 1120 const authorized_as = sasl_iter.next() orelse 1121 return self.errSaslFail(conn, "invalid SASL message"); 1122 _ = authorized_as; 1123 1124 // Authenticate as is the identity that belongs to the password 1125 const authenticate_as = sasl_iter.next() orelse 1126 return self.errSaslFail(conn, "invalid SASL message"); 1127 1128 // The password 1129 const password = sasl_iter.next() orelse 1130 return self.errSaslFail(conn, "invalid SASL message"); 1131 1132 const arena: HeapArena = try .init(self.gpa); 1133 errdefer arena.deinit(); 1134 switch (self.auth) { 1135 .none => { 1136 self.wakeup_queue.push(.{ 1137 .auth_success = .{ 1138 .arena = arena, 1139 .fd = conn.client, 1140 .nick = try arena.allocator().dupe(u8, authenticate_as), 1141 .user = try arena.allocator().dupe(u8, authenticate_as), 1142 .realname = try arena.allocator().dupe(u8, authenticate_as), 1143 .avatar_url = "", 1144 }, 1145 }); 1146 }, 1147 .github => { 1148 const auth_header = try std.fmt.allocPrint( 1149 arena.allocator(), 1150 "Bearer {s}", 1151 .{password}, 1152 ); 1153 try self.thread_pool.spawn(github.authenticate, .{ 1154 arena, 1155 &self.http_client, 1156 &self.wakeup_queue, 1157 conn.client, 1158 auth_header, 1159 }); 1160 }, 1161 .atproto => { 1162 const handle = try arena.allocator().dupe(u8, authenticate_as); 1163 const dupe_pass = try arena.allocator().dupe(u8, password); 1164 try self.thread_pool.spawn( 1165 atproto.authenticateConnection, 1166 .{ 1167 arena, 1168 &self.http_client, 1169 &self.wakeup_queue, 1170 self.db_pool, 1171 conn.client, 1172 handle, 1173 dupe_pass, 1174 }, 1175 ); 1176 }, 1177 } 1178} 1179 1180fn handlePing(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1181 try conn.print( 1182 self.gpa, 1183 ":{s} PONG {s} :{s}\r\n", 1184 .{ self.hostname, self.hostname, msg.rawParameters() }, 1185 ); 1186} 1187 1188fn handleQuit(self: *Server, conn: *Connection, msg: Message) error{ClientQuit}!void { 1189 _ = msg; 1190 conn.print(self.gpa, ":{s} ERROR :Client quit\r\n", .{self.hostname}) catch {}; 1191 return error.ClientQuit; 1192} 1193 1194fn handleMode(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1195 var iter = msg.paramIterator(); 1196 const target = iter.next() orelse return self.errNeedMoreParams(conn, "MODE"); 1197 if (target.len == 0) return self.errNeedMoreParams(conn, "MODE"); 1198 1199 if (target[0] != '#') { 1200 // User MODE 1201 const source = conn.user orelse 1202 return self.errUnknownError(conn, "MODE", "must be authenticated for MODE"); 1203 _ = source; 1204 // TODO: implement this 1205 return; 1206 } 1207 1208 // Target is a channel 1209 const channel = self.channels.get(target) orelse { 1210 return self.errNoSuchChannel(conn, target); 1211 }; 1212 1213 const modestring = iter.next() orelse { 1214 // TODO: send the channel mode. We don't have any right now 1215 return; 1216 }; 1217 1218 // If we have a modestring, we also have to be authenticated 1219 const user = conn.user orelse { 1220 return self.errChanOpPrivsNeeded(conn, channel.name); 1221 }; 1222 1223 const privs = channel.getPrivileges(user); 1224 1225 if (!user.modes.operator and !privs.operator) { 1226 // User either needs to be global ops or chanops 1227 return self.errChanOpPrivsNeeded(conn, channel.name); 1228 } 1229 1230 // We have the right privileges. Get the arguments (we should have one, a nickname) 1231 const arg = iter.next() orelse return self.errNeedMoreParams(conn, "MODE"); 1232 1233 // Validate the argument 1234 if (arg.len == 0) return self.errNeedMoreParams(conn, "MODE"); 1235 if (arg[0] == '#') return self.errUnknownError(conn, "MODE", "argument cannot be a channel"); 1236 1237 // Get the target user we are modifying the mode of 1238 const target_user = self.nick_map.get(arg) orelse { 1239 return self.errNoSuchNick(conn, arg); 1240 }; 1241 1242 // Get the target_users current privileges 1243 var target_privs = channel.getPrivileges(target_user); 1244 1245 // Parse and apply the privileges 1246 const State = enum { none, add, remove }; 1247 var state: State = .none; 1248 for (modestring) |b| { 1249 switch (b) { 1250 '+' => state = .add, 1251 '-' => state = .remove, 1252 'o' => { 1253 switch (state) { 1254 .add => target_privs.operator = true, 1255 .remove => target_privs.operator = false, 1256 .none => {}, 1257 } 1258 }, 1259 else => log.warn("unsupported mode byte: {c}", .{b}), 1260 } 1261 } 1262 1263 // Update the state in mem and db 1264 return channel.storePrivileges(self, target_user, target_privs); 1265} 1266 1267fn handlePrivMsg(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1268 const source = conn.user orelse 1269 return self.errUnknownError(conn, "PRIVMSG", "cannot PRIVMSG before authentication"); 1270 1271 var iter = msg.paramIterator(); 1272 const target = iter.next() orelse return self.errNoRecipient(conn); 1273 const text = iter.next() orelse return self.errNoTextToSend(conn); 1274 1275 if (target.len == 0) return self.errNoRecipient(conn); 1276 switch (target[0]) { 1277 '#' => { 1278 const channel = self.channels.get(target) orelse { 1279 return self.errNoSuchChannel(conn, target); 1280 }; 1281 1282 { 1283 // store the message 1284 const arena: HeapArena = try .init(self.gpa); 1285 errdefer arena.deinit(); 1286 const sender_nick = try arena.allocator().dupe(u8, source.nick); 1287 const target_name = try arena.allocator().dupe(u8, target); 1288 const msg_dupe = try msg.copy(arena.allocator()); 1289 try self.thread_pool.spawn( 1290 db.storeChannelMessage, 1291 .{ arena, self.db_pool, sender_nick, target_name, msg_dupe }, 1292 ); 1293 } 1294 1295 // Send message to any http clients. 1296 try channel.sendPrivMsgToStreams(self, source, msg); 1297 1298 for (channel.members.items) |m| { 1299 const u = m.user; 1300 for (u.connections.items) |c| { 1301 if (c.caps.@"server-time" or c.caps.@"message-tags") { 1302 const urn = uuid.urn.serialize(msg.uuid); 1303 try c.print( 1304 self.gpa, 1305 "@time={};msgid={s} ", 1306 .{ msg.timestamp, &urn }, 1307 ); 1308 } 1309 1310 // If this is our account, we only send if we have echo-message enabled 1311 if (u == source and !c.caps.@"echo-message") continue; 1312 1313 try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text }); 1314 try self.queueWrite(c.client, c); 1315 } 1316 } 1317 }, 1318 else => { 1319 // Get the connections for this nick 1320 const user = self.nick_map.get(target) orelse { 1321 return self.errNoSuchNick(conn, target); 1322 }; 1323 1324 { 1325 // store the message 1326 const arena: HeapArena = try .init(self.gpa); 1327 errdefer arena.deinit(); 1328 const sender_nick = try arena.allocator().dupe(u8, source.nick); 1329 const target_name = try arena.allocator().dupe(u8, target); 1330 const msg_dupe = try msg.copy(arena.allocator()); 1331 try self.thread_pool.spawn( 1332 db.storePrivateMessage, 1333 .{ arena, self.db_pool, sender_nick, target_name, msg_dupe }, 1334 ); 1335 } 1336 1337 for (user.connections.items) |c| { 1338 if (c.caps.@"server-time" or c.caps.@"message-tags") { 1339 const urn = uuid.urn.serialize(msg.uuid); 1340 try c.print( 1341 self.gpa, 1342 "@time={};msgid={s} ", 1343 .{ msg.timestamp, &urn }, 1344 ); 1345 } 1346 1347 try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text }); 1348 try self.queueWrite(c.client, c); 1349 } 1350 1351 for (source.connections.items) |c| { 1352 if (!c.caps.@"echo-message") continue; 1353 if (c.caps.@"server-time" or c.caps.@"message-tags") { 1354 const urn = uuid.urn.serialize(msg.uuid); 1355 try c.print( 1356 self.gpa, 1357 "@time={};msgid={s} ", 1358 .{ msg.timestamp, &urn }, 1359 ); 1360 } 1361 1362 try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text }); 1363 try self.queueWrite(c.client, c); 1364 } 1365 }, 1366 } 1367} 1368 1369fn handleTagMsg(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1370 const source = conn.user orelse 1371 return self.errUnknownError(conn, "TAGMSG", "cannot TAGMSG before authentication"); 1372 // TODO: store the message in database 1373 var iter = msg.paramIterator(); 1374 const target = iter.next() orelse return self.errNoRecipient(conn); 1375 1376 if (target.len == 0) return self.errNoRecipient(conn); 1377 switch (target[0]) { 1378 '#' => { 1379 const channel = self.channels.get(target) orelse { 1380 return self.errNoSuchChannel(conn, target); 1381 }; 1382 for (channel.members.items) |m| { 1383 const u = m.user; 1384 for (u.connections.items) |c| { 1385 // We don't send tag messages to connections which haven't enabled 1386 // message-tags 1387 if (!c.caps.@"message-tags") continue; 1388 if (c.caps.@"server-time") { 1389 try c.print(self.gpa, "@time={}", .{msg.timestamp}); 1390 } 1391 1392 // If this is our account, we only send if we have echo-message enabled 1393 if (u == source and !c.caps.@"echo-message") continue; 1394 var tag_iter = msg.tagIterator(); 1395 while (tag_iter.next()) |tag| { 1396 try c.write(self.gpa, ";"); 1397 try c.write(self.gpa, tag.key); 1398 try c.write(self.gpa, "="); 1399 try c.write(self.gpa, tag.value); 1400 } 1401 try c.write(self.gpa, " "); 1402 1403 try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target }); 1404 try self.queueWrite(c.client, c); 1405 } 1406 } 1407 }, 1408 else => { 1409 // Get the connections for this nick 1410 const user = self.nick_map.get(target) orelse { 1411 return self.errNoSuchNick(conn, target); 1412 }; 1413 1414 for (user.connections.items) |c| { 1415 // We don't send tag messages to connections which haven't enabled 1416 // message-tags 1417 if (!c.caps.@"message-tags") continue; 1418 if (c.caps.@"server-time") { 1419 try c.print(self.gpa, "@time={}", .{msg.timestamp}); 1420 } 1421 1422 var tag_iter = msg.tagIterator(); 1423 while (tag_iter.next()) |tag| { 1424 try c.write(self.gpa, ";"); 1425 try c.write(self.gpa, tag.key); 1426 try c.write(self.gpa, "="); 1427 try c.write(self.gpa, tag.value); 1428 } 1429 try c.write(self.gpa, " "); 1430 1431 try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target }); 1432 try self.queueWrite(c.client, c); 1433 } 1434 1435 for (source.connections.items) |c| { 1436 if (!c.caps.@"echo-message") continue; 1437 if (c.caps.@"server-time") { 1438 try c.print(self.gpa, "@time={}", .{msg.timestamp}); 1439 } 1440 1441 var tag_iter = msg.tagIterator(); 1442 while (tag_iter.next()) |tag| { 1443 try c.write(self.gpa, ";"); 1444 try c.write(self.gpa, tag.key); 1445 try c.write(self.gpa, "="); 1446 try c.write(self.gpa, tag.value); 1447 } 1448 try c.write(self.gpa, " "); 1449 1450 try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target }); 1451 try self.queueWrite(c.client, c); 1452 } 1453 }, 1454 } 1455} 1456 1457fn handleJoin(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1458 const user = conn.user orelse 1459 return self.errUnknownError(conn, "JOIN", "cannot join before authentication"); 1460 var iter = msg.paramIterator(); 1461 const target = iter.next() orelse return self.errNeedMoreParams(conn, "JOIN"); 1462 1463 if (target.len > 32) { 1464 return conn.print( 1465 self.gpa, 1466 ":{s} 476 {s} :Channel name is too long\r\n", 1467 .{ self.hostname, target }, 1468 ); 1469 } 1470 1471 if (target.len == 0) return self.errNeedMoreParams(conn, "JOIN"); 1472 switch (target[0]) { 1473 '#' => {}, 1474 else => return self.errNoSuchChannel(conn, target), 1475 } 1476 1477 if (!self.channels.contains(target)) { 1478 const arena: HeapArena = try .init(self.gpa); 1479 const target_dupe = try arena.allocator().dupe(u8, target); 1480 // Create the channel in the db 1481 try self.thread_pool.spawn(db.createChannel, .{ arena, self.db_pool, target_dupe }); 1482 1483 const channel = try self.gpa.create(Channel); 1484 const name = try self.gpa.dupe(u8, target); 1485 channel.* = .init(name, ""); 1486 try self.channels.put(self.gpa, name, channel); 1487 } 1488 1489 const channel = self.channels.get(target).?; 1490 try channel.addUser(self, user, conn); 1491 1492 // drafts/read-marker requires us to send a MARKREAD on join 1493 if (conn.caps.@"draft/read-marker") { 1494 const arena: HeapArena = try .init(self.gpa); 1495 const target2 = try arena.allocator().dupe(u8, target); 1496 const nick = try arena.allocator().dupe(u8, user.nick); 1497 1498 try self.thread_pool.spawn(db.getMarkRead, .{ 1499 arena, 1500 self.db_pool, 1501 &self.wakeup_queue, 1502 conn.client, 1503 nick, 1504 target2, 1505 }); 1506 } 1507} 1508 1509fn handleTopic(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1510 var iter = msg.paramIterator(); 1511 const target = iter.next() orelse return self.errNeedMoreParams(conn, "TOPIC"); 1512 const topic = iter.next() orelse ""; 1513 if (topic.len > 0 and conn.user == null) { 1514 return self.errUnknownError(conn, "TOPIC", "cannot set topic without authentication"); 1515 } 1516 1517 const channel = self.channels.get(target) orelse { 1518 return self.errNoSuchChannel(conn, target); 1519 }; 1520 try channel.setTopic(self, conn, topic); 1521} 1522 1523fn handlePart(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1524 const user = conn.user orelse 1525 return self.errUnknownError(conn, "PART", "cannot part before authentication"); 1526 var iter = msg.paramIterator(); 1527 const target = iter.next() orelse return self.errNeedMoreParams(conn, "JOIN"); 1528 1529 var chan_iter = std.mem.splitScalar(u8, target, ','); 1530 while (chan_iter.next()) |chan| { 1531 const channel = self.channels.get(chan) orelse continue; 1532 try channel.removeUser(self, user); 1533 } 1534} 1535 1536fn handleNames(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1537 const cmd = "NAMES"; 1538 var iter = msg.paramIterator(); 1539 const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1540 1541 if (target.len == 0) return self.errNeedMoreParams(conn, cmd); 1542 switch (target[0]) { 1543 '#' => {}, 1544 else => return self.errUnknownError(conn, cmd, "not a valid channel name"), 1545 } 1546 1547 const channel = self.channels.get(target) orelse { 1548 return self.errNoSuchChannel(conn, target); 1549 }; 1550 1551 try channel.names(self, conn); 1552} 1553 1554fn handleList(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1555 // TODO: handle masks 1556 _ = msg; 1557 try conn.print( 1558 self.gpa, 1559 ":{s} 321 {s} Channel :Start of LIST\r\n", 1560 .{ self.hostname, conn.nickname() }, 1561 ); 1562 1563 for (self.channels.values()) |channel| { 1564 try conn.print( 1565 self.gpa, 1566 ":{s} 322 {s} {s} {d} :{s}\r\n", 1567 .{ 1568 self.hostname, 1569 conn.nickname(), 1570 channel.name, 1571 channel.members.items.len, 1572 channel.topic, 1573 }, 1574 ); 1575 } 1576 1577 try conn.print( 1578 self.gpa, 1579 ":{s} 323 {s} :End of LIST\r\n", 1580 .{ self.hostname, conn.nickname() }, 1581 ); 1582} 1583 1584fn handleWho(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1585 const cmd = "WHO"; 1586 var iter = msg.paramIterator(); 1587 const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1588 1589 if (target.len == 0) return self.errNeedMoreParams(conn, cmd); 1590 switch (target[0]) { 1591 '#' => { 1592 const channel = self.channels.get(target) orelse { 1593 return self.errNoSuchChannel(conn, target); 1594 }; 1595 1596 try channel.who(self, conn, msg); 1597 }, 1598 else => { 1599 const client: []const u8 = if (conn.user) |user| user.nick else "*"; 1600 const user = self.nick_map.get(target) orelse { 1601 return self.errNoSuchNick(conn, target); 1602 }; 1603 const args = iter.next() orelse ""; 1604 const token = iter.next(); 1605 if (args.len == 0) { 1606 try conn.print( 1607 self.gpa, 1608 ":{s} 352 {s} * {s} {s} {s} {s} {s} :0 {s}\r\n", 1609 .{ 1610 self.hostname, 1611 client, 1612 user.username, 1613 self.hostname, 1614 self.hostname, 1615 user.nick, 1616 "H", // TODO: flags, now we just always say the user is H="here" 1617 user.real, 1618 }, 1619 ); 1620 } else { 1621 try conn.print( 1622 self.gpa, 1623 ":{s} 354 {s}", 1624 .{ self.hostname, client }, 1625 ); 1626 1627 // Find the index of the standard field indicator 1628 const std_idx = std.mem.indexOfScalar(u8, args, '%') orelse args.len; 1629 // TODO: any nonstandard fields 1630 1631 // Handle standard fields, in order. The order is tcuihsnfdlaor 1632 if (std.mem.indexOfScalarPos(u8, args, std_idx, 't')) |_| { 1633 if (token) |t| try conn.print(self.gpa, " {s}", .{t}); 1634 } 1635 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'c')) |_| { 1636 try conn.print(self.gpa, " *", .{}); 1637 } 1638 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'u')) |_| { 1639 try conn.print(self.gpa, " {s}", .{user.username}); 1640 } 1641 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'i')) |_| { 1642 try conn.print(self.gpa, " {s}", .{"127.0.0.1"}); 1643 } 1644 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'h')) |_| { 1645 try conn.print(self.gpa, " {s}", .{self.hostname}); 1646 } 1647 if (std.mem.indexOfScalarPos(u8, args, std_idx, 's')) |_| { 1648 try conn.print(self.gpa, " {s}", .{self.hostname}); 1649 } 1650 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'n')) |_| { 1651 try conn.print(self.gpa, " {s}", .{user.nick}); 1652 } 1653 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'f')) |_| { 1654 // TODO: user flags 1655 try conn.print(self.gpa, " H", .{}); 1656 } 1657 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'd')) |_| { 1658 try conn.write(self.gpa, " 0"); 1659 } 1660 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'l')) |_| { 1661 try conn.write(self.gpa, " 0"); 1662 } 1663 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'a')) |_| { 1664 try conn.print(self.gpa, " {s}", .{user.username}); 1665 } 1666 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'o')) |_| { 1667 // TODO: chan op level 1668 try conn.print(self.gpa, " {s}", .{user.username}); 1669 } 1670 if (std.mem.indexOfScalarPos(u8, args, std_idx, 'r')) |_| { 1671 try conn.print(self.gpa, " :{s}", .{user.real}); 1672 } 1673 try conn.write(self.gpa, "\r\n"); 1674 } 1675 try conn.print( 1676 self.gpa, 1677 ":{s} 315 {s} * :End of WHO list\r\n", 1678 .{ self.hostname, client }, 1679 ); 1680 try self.queueWrite(conn.client, conn); 1681 }, 1682 } 1683} 1684 1685fn handleAway(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1686 const user = conn.user orelse 1687 return self.errUnknownError(conn, "AWAY", "cannot set AWAY without authentication"); 1688 1689 var iter = msg.paramIterator(); 1690 if (iter.next()) |_| { 1691 user.away = true; 1692 for (user.channels.items) |chan| { 1693 try chan.notifyAway(self, user); 1694 } 1695 for (user.connections.items) |c| { 1696 try c.print( 1697 self.gpa, 1698 ":{s} 306 {s} :You have been marked as away\r\n", 1699 .{ self.hostname, c.nickname() }, 1700 ); 1701 try self.queueWrite(c.client, c); 1702 } 1703 } else { 1704 user.away = false; 1705 for (user.channels.items) |chan| { 1706 try chan.notifyBack(self, user); 1707 } 1708 for (user.connections.items) |c| { 1709 try c.print( 1710 self.gpa, 1711 ":{s} 305 {s} :You are no longer marked as away\r\n", 1712 .{ self.hostname, c.nickname() }, 1713 ); 1714 try self.queueWrite(c.client, c); 1715 } 1716 } 1717} 1718 1719fn handleChathistory(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1720 const user = conn.user orelse return self.errUnknownError( 1721 conn, 1722 "CHATHISTORY TARGETS", 1723 "cannot CHATHISTORY without authentication", 1724 ); 1725 const cmd = "CHATHISTORY"; 1726 var iter = msg.paramIterator(); 1727 1728 const subcmd = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1729 1730 if (std.ascii.eqlIgnoreCase("TARGETS", subcmd)) { 1731 const sub = "TARGETS"; 1732 // "TARGETS <ts_one> <ts_two> <limit>". There is no requirement that ts_one < ts_two 1733 const ts_one = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub); 1734 const ts_two = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub); 1735 const limit = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub); 1736 1737 const ts_one_str = blk: { 1738 var ts_iter = std.mem.splitScalar(u8, ts_one, '='); 1739 _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param"); 1740 break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param"); 1741 }; 1742 const ts_two_str = blk: { 1743 var ts_iter = std.mem.splitScalar(u8, ts_two, '='); 1744 _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param"); 1745 break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param"); 1746 }; 1747 1748 const ts_one_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_one_str } }) catch { 1749 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp"); 1750 }; 1751 const ts_two_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_two_str } }) catch { 1752 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp"); 1753 }; 1754 // NOTE: For TARGETS, we don't have an internal limit 1755 const limit_int: u16 = std.fmt.parseUnsigned(u16, limit, 10) catch { 1756 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1757 }; 1758 1759 const from = if (ts_one_inst.timestamp < ts_two_inst.timestamp) ts_one_inst else ts_two_inst; 1760 const to = if (ts_one_inst.timestamp > ts_two_inst.timestamp) ts_one_inst else ts_two_inst; 1761 1762 const req: ChatHistory.TargetsRequest = .{ 1763 .from = .{ .milliseconds = @intCast(from.milliTimestamp()) }, 1764 .to = .{ .milliseconds = @intCast(to.milliTimestamp()) }, 1765 .limit = limit_int, 1766 }; 1767 const arena: HeapArena = try .init(self.gpa); 1768 const nick = try arena.allocator().dupe(u8, user.nick); 1769 // Spawn a db query 1770 try self.thread_pool.spawn( 1771 db.chathistoryTargets, 1772 .{ arena, self.db_pool, &self.wakeup_queue, conn.client, nick, req }, 1773 ); 1774 return; 1775 } 1776 1777 if (std.ascii.eqlIgnoreCase("AFTER", subcmd)) { 1778 const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1779 if (target.len == 0) { 1780 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target"); 1781 } 1782 1783 const ts = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1784 const ts_str = blk: { 1785 var ts_iter = std.mem.splitScalar(u8, ts, '='); 1786 _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no timestamp param"); 1787 break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no '=' separator"); 1788 }; 1789 const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch { 1790 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp"); 1791 }; 1792 1793 const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1794 1795 const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch { 1796 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1797 }; 1798 if (limit_int > max_chathistory) { 1799 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1800 } 1801 1802 const arena: HeapArena = try .init(self.gpa); 1803 const req: ChatHistory.AfterRequest = .{ 1804 .after_ms = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) }, 1805 .limit = limit_int, 1806 .target = try arena.allocator().dupe(u8, target), 1807 }; 1808 try self.thread_pool.spawn( 1809 db.chathistoryAfter, 1810 .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req }, 1811 ); 1812 return; 1813 } 1814 1815 if (std.ascii.eqlIgnoreCase("BEFORE", subcmd)) { 1816 const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1817 if (target.len == 0) { 1818 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target"); 1819 } 1820 1821 const ts = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1822 const ts_str = blk: { 1823 var ts_iter = std.mem.splitScalar(u8, ts, '='); 1824 _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no timestamp param"); 1825 break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no '=' separator"); 1826 }; 1827 const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch { 1828 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp"); 1829 }; 1830 1831 const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1832 1833 const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch { 1834 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1835 }; 1836 if (limit_int > max_chathistory) { 1837 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1838 } 1839 1840 const arena: HeapArena = try .init(self.gpa); 1841 const req: ChatHistory.BeforeRequest = .{ 1842 .conn = conn, 1843 .before_ms = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) }, 1844 .limit = limit_int, 1845 .target = try arena.allocator().dupe(u8, target), 1846 }; 1847 try self.thread_pool.spawn( 1848 db.chathistoryBefore, 1849 .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req }, 1850 ); 1851 return; 1852 } 1853 1854 if (std.ascii.eqlIgnoreCase("LATEST", subcmd)) { 1855 const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1856 if (target.len == 0) { 1857 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target"); 1858 } 1859 1860 const restriction = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1861 // TODO: handle the restriction. This could be "*", or a timestamp, or a msgid 1862 _ = restriction; 1863 1864 const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd); 1865 const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch { 1866 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1867 }; 1868 if (limit_int > max_chathistory) { 1869 return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit"); 1870 } 1871 1872 const arena: HeapArena = try .init(self.gpa); 1873 const req: ChatHistory.LatestRequest = .{ 1874 .conn = conn, 1875 .limit = limit_int, 1876 .target = try arena.allocator().dupe(u8, target), 1877 }; 1878 try self.thread_pool.spawn( 1879 db.chathistoryLatest, 1880 .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req }, 1881 ); 1882 return; 1883 } 1884} 1885 1886fn handleMarkread(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void { 1887 const user = conn.user orelse return; 1888 var iter = msg.paramIterator(); 1889 const target = iter.next() orelse 1890 return self.fail(conn, "MARKREAD", "NEED_MORE_PARAMS", "Missing parameters"); 1891 if (iter.next()) |timestamp| { 1892 const ts_str = blk: { 1893 var ts_iter = std.mem.splitScalar(u8, timestamp, '='); 1894 _ = ts_iter.next() orelse return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "no timestamp param"); 1895 break :blk ts_iter.next() orelse return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "no '=' separator"); 1896 }; 1897 const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch { 1898 return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "invalid timestamp"); 1899 }; 1900 1901 const arena: HeapArena = try .init(self.gpa); 1902 const target_duped = try arena.allocator().dupe(u8, target); 1903 const ts_: Timestamp = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) }; 1904 try self.thread_pool.spawn(db.setMarkRead, .{ 1905 arena, 1906 self.db_pool, 1907 &self.wakeup_queue, 1908 conn.client, 1909 user.nick, 1910 target_duped, 1911 ts_, 1912 }); 1913 } 1914} 1915 1916/// Sends a "standard reply" FAIL 1917fn fail( 1918 self: *Server, 1919 conn: *Connection, 1920 cmd: []const u8, 1921 code: []const u8, 1922 description: []const u8, 1923) Allocator.Error!void { 1924 return self.standardReply(conn, "FAIL", cmd, code, description); 1925} 1926 1927fn standardReply( 1928 self: *Server, 1929 conn: *Connection, 1930 kind: []const u8, 1931 cmd: []const u8, 1932 code: []const u8, 1933 description: []const u8, 1934) Allocator.Error!void { 1935 try conn.print( 1936 self.gpa, 1937 ":{s} {s} {s} {s} :{s}\r\n", 1938 .{ self.hostname, kind, cmd, code, description }, 1939 ); 1940} 1941 1942fn errNoSuchNick(self: *Server, conn: *Connection, nick_or_chan: []const u8) Allocator.Error!void { 1943 try conn.print( 1944 self.gpa, 1945 ":{s} 401 {s} {s} :No such nick/channel\r\n", 1946 .{ self.hostname, conn.nickname(), nick_or_chan }, 1947 ); 1948} 1949 1950fn errNoSuchChannel(self: *Server, conn: *Connection, chan: []const u8) Allocator.Error!void { 1951 try conn.print( 1952 self.gpa, 1953 ":{s} 403 {s} {s} :No such channel\r\n", 1954 .{ self.hostname, conn.nickname(), chan }, 1955 ); 1956} 1957 1958fn errNoRecipient(self: *Server, conn: *Connection) Allocator.Error!void { 1959 try conn.print( 1960 self.gpa, 1961 ":{s} 411 {s} :No recipient given\r\n", 1962 .{ self.hostname, conn.nickname() }, 1963 ); 1964} 1965 1966fn errNoTextToSend(self: *Server, conn: *Connection) Allocator.Error!void { 1967 try conn.print( 1968 self.gpa, 1969 ":{s} 412 {s} :No text to send\r\n", 1970 .{ self.hostname, conn.nickname() }, 1971 ); 1972} 1973 1974fn errInputTooLong(self: *Server, conn: *Connection) Allocator.Error!void { 1975 try conn.print( 1976 self.gpa, 1977 ":{s} 417 {s} :Input too long\r\n", 1978 .{ self.hostname, conn.nickname() }, 1979 ); 1980} 1981 1982fn errNeedMoreParams(self: *Server, conn: *Connection, cmd: []const u8) Allocator.Error!void { 1983 try conn.print( 1984 self.gpa, 1985 ":{s} 461 {s} {s} :Not enough paramaters\r\n", 1986 .{ self.hostname, conn.nickname(), cmd }, 1987 ); 1988} 1989 1990fn errUnknownCommand(self: *Server, conn: *Connection, cmd: []const u8) Allocator.Error!void { 1991 log.err("unknown command: {s}", .{cmd}); 1992 try conn.print( 1993 self.gpa, 1994 ":{s} 421 {s} {s} :Unknown command\r\n", 1995 .{ self.hostname, conn.nickname(), cmd }, 1996 ); 1997} 1998 1999pub fn errChanOpPrivsNeeded(self: *Server, conn: *Connection, channel: []const u8) Allocator.Error!void { 2000 try conn.print( 2001 self.gpa, 2002 ":{s} 482 {s} {s} :You must be a channel operator to perform that command\r\n", 2003 .{ self.hostname, conn.nickname(), channel }, 2004 ); 2005} 2006 2007fn errUnknownError( 2008 self: *Server, 2009 conn: *Connection, 2010 cmd: []const u8, 2011 err: []const u8, 2012) Allocator.Error!void { 2013 try conn.print( 2014 self.gpa, 2015 ":{s} 400 {s} {s} :{s}\r\n", 2016 .{ self.hostname, conn.nickname(), cmd, err }, 2017 ); 2018} 2019 2020fn errSaslFail(self: *Server, conn: *Connection, msg: []const u8) Allocator.Error!void { 2021 try conn.print( 2022 self.gpa, 2023 ":{s} 904 {s} :SASL authenticated failed: {s}\r\n", 2024 .{ self.hostname, conn.nickname(), msg }, 2025 ); 2026} 2027 2028fn webMain( 2029 self: *Server, 2030 port: u16, 2031) !void { 2032 const server = &self.httpz_server; 2033 var router = try server.router(.{}); 2034 router.get("/", Http.getIndex, .{}); 2035 router.get("/assets/:type/:name", Http.getAsset, .{}); 2036 router.get("/channels/:channel", Http.getChannel, .{}); 2037 router.get("/channels/:channel/events", Http.startChannelEventStream, .{}); 2038 router.get("/channels", Http.getChannels, .{}); 2039 2040 log.info("HTTP server listening on http://localhost:{d}", .{port}); 2041 try self.httpz_server.listen(); 2042} 2043 2044pub const Connection = struct { 2045 client: xev.TCP, 2046 2047 read_buf: [512]u8, 2048 read_queue: std.ArrayListUnmanaged(u8), 2049 read_c: *xev.Completion, 2050 2051 write_buf: std.ArrayListUnmanaged(u8), 2052 2053 /// User will always be non-null for authenticated connections 2054 user: ?*User, 2055 2056 caps: Capabilities, 2057 // Time the connection started 2058 connected_at: u32, 2059 2060 fn init(self: *Connection, client: xev.TCP, completion: *xev.Completion) void { 2061 self.* = .{ 2062 .client = client, 2063 2064 .read_buf = undefined, 2065 .read_queue = .empty, 2066 .read_c = completion, 2067 2068 .write_buf = .empty, 2069 2070 .user = null, 2071 2072 .caps = .{}, 2073 .connected_at = @intCast(std.time.timestamp()), 2074 }; 2075 } 2076 2077 fn isAuthenticated(self: *Connection) bool { 2078 return self.user != null; 2079 } 2080 2081 pub fn nickname(self: *Connection) []const u8 { 2082 if (self.user) |user| return user.nick; 2083 return "*"; 2084 } 2085 2086 fn deinit(self: *Connection, gpa: Allocator) void { 2087 self.read_queue.deinit(gpa); 2088 self.write_buf.deinit(gpa); 2089 } 2090 2091 pub fn write(self: *Connection, gpa: Allocator, bytes: []const u8) Allocator.Error!void { 2092 try self.write_buf.appendSlice(gpa, bytes); 2093 } 2094 2095 pub fn print(self: *Connection, gpa: Allocator, comptime fmt: []const u8, args: anytype) Allocator.Error!void { 2096 return self.write_buf.writer(gpa).print(fmt, args); 2097 } 2098 2099 fn enableCap(self: *Connection, cap: Capability) Allocator.Error!void { 2100 switch (cap) { 2101 .@"away-notify" => self.caps.@"away-notify" = true, 2102 .batch => {}, // TODO: do we care? 2103 .@"draft/chathistory" => self.caps.@"draft/chathistory" = true, 2104 .@"draft/read-marker" => self.caps.@"draft/read-marker" = true, 2105 .@"draft/no-implicit-names" => self.caps.@"draft/no-implicit-names" = true, 2106 .@"echo-message" => self.caps.@"echo-message" = true, 2107 .@"message-tags" => self.caps.@"message-tags" = true, 2108 .@"server-time" => self.caps.@"server-time" = true, 2109 .sasl => {}, // We don't track sasl as a requested cap, we just respond to AUTHENTICATE 2110 } 2111 } 2112}; 2113 2114// TODO: GC this. We need to move all used completions to the start, and then prune unused to some 2115// percentage 2116const MemoryPoolUnmanaged = struct { 2117 list: std.ArrayListUnmanaged(*xev.Completion), 2118 free_list: std.ArrayListUnmanaged(bool), 2119 2120 const empty: MemoryPoolUnmanaged = .{ .list = .empty, .free_list = .empty }; 2121 2122 fn create(self: *MemoryPoolUnmanaged, gpa: Allocator) Allocator.Error!*xev.Completion { 2123 // Look in our list for the first free item 2124 for (self.free_list.items, 0..) |free, i| { 2125 if (free) { 2126 self.free_list.items[i] = false; 2127 return self.list.items[i]; 2128 } 2129 } 2130 // Otherwise, we create a new node and add it to the list 2131 const c = try gpa.create(xev.Completion); 2132 c.* = .{}; 2133 try self.list.append(gpa, c); 2134 try self.free_list.append(gpa, false); 2135 return c; 2136 } 2137 2138 fn destroy(self: *MemoryPoolUnmanaged, item: *xev.Completion) void { 2139 for (self.list.items, 0..) |c, i| { 2140 if (c == item) { 2141 self.free_list.items[i] = true; 2142 return; 2143 } 2144 } 2145 unreachable; 2146 } 2147 2148 fn deinit(self: *MemoryPoolUnmanaged, gpa: Allocator) void { 2149 for (self.list.items) |node| { 2150 gpa.destroy(node); 2151 } 2152 self.list.deinit(gpa); 2153 self.free_list.deinit(gpa); 2154 } 2155}; 2156 2157/// Reads one line from the stream. If the command does not match, we fail the test 2158fn expectResponse(stream: std.net.Stream, response: []const u8) !void { 2159 var buf: [512]u8 = undefined; 2160 const actual = try stream.reader().readUntilDelimiter(&buf, '\n'); 2161 try std.testing.expectEqualStrings(response, std.mem.trimRight(u8, actual, "\r\n")); 2162} 2163 2164const TestServer = struct { 2165 server: Server, 2166 cond: std.atomic.Value(bool), 2167 thread: std.Thread, 2168 2169 streams: std.ArrayListUnmanaged(std.net.Stream), 2170 2171 fn init(self: *TestServer, gpa: Allocator) !void { 2172 self.* = .{ 2173 .server = undefined, 2174 .cond = .init(true), 2175 .thread = undefined, 2176 .streams = .empty, 2177 }; 2178 try self.server.init(gpa, .{ 2179 .hostname = "localhost", 2180 .irc_port = 0, 2181 .http_port = null, 2182 .auth = .none, 2183 .db_path = ":memory:", 2184 }); 2185 var wg: std.Thread.WaitGroup = .{}; 2186 wg.start(); 2187 self.thread = try std.Thread.spawn(.{}, Server.runUntil, .{ &self.server, &self.cond, &wg }); 2188 wg.wait(); 2189 } 2190 2191 fn deinit(self: *TestServer) void { 2192 // Close the connection 2193 self.cond.store(false, .unordered); 2194 2195 if (self.server.wakeup.notify()) { 2196 self.thread.join(); 2197 } else |err| { 2198 log.err("Failed to notify wakeup: {}", .{err}); 2199 self.thread.detach(); 2200 } 2201 self.server.deinit(); 2202 for (self.streams.items) |stream| { 2203 stream.close(); 2204 } 2205 self.streams.deinit(std.testing.allocator); 2206 self.* = undefined; 2207 } 2208 2209 fn port(self: *TestServer) u16 { 2210 return self.server.address.getPort(); 2211 } 2212 2213 fn createConnections(self: *TestServer, n: usize) ![]*Connection { 2214 try self.streams.ensureUnusedCapacity(std.testing.allocator, n); 2215 for (0..n) |_| { 2216 const stream = try std.net.tcpConnectToHost( 2217 std.testing.allocator, 2218 "localhost", 2219 self.port(), 2220 ); 2221 self.streams.appendAssumeCapacity(stream); 2222 } 2223 2224 // Sleep for up to 1 second to wait for all the connections 2225 for (0..1_000) |_| { 2226 if (self.server.connections.count() == n) break; 2227 std.time.sleep(1 * std.time.ns_per_ms); 2228 } else return error.Timeout; 2229 2230 return self.server.connections.values(); 2231 } 2232}; 2233 2234test "Message: CAP" { 2235 var ts: TestServer = undefined; 2236 try ts.init(std.testing.allocator); 2237 defer ts.deinit(); 2238 2239 const conns = try ts.createConnections(1); 2240 const client = conns[0]; 2241 2242 { 2243 // Happy path 2244 try ts.server.handleMessage(client, .init("CAP LS 302")); 2245 try std.testing.expectStringStartsWith(client.write_buf.items, ":localhost CAP * LS"); 2246 try std.testing.expectStringEndsWith(client.write_buf.items, "\r\n"); 2247 client.write_buf.clearRetainingCapacity(); 2248 2249 // ACK 2250 try ts.server.handleMessage(client, .init("CAP REQ sasl")); 2251 try std.testing.expectStringStartsWith(client.write_buf.items, ":localhost CAP * ACK sasl\r\n"); 2252 client.write_buf.clearRetainingCapacity(); 2253 2254 // NAK 2255 try ts.server.handleMessage(client, .init("CAP REQ foo")); 2256 try std.testing.expectStringStartsWith(client.write_buf.items, ":localhost CAP * NAK foo\r\n"); 2257 client.write_buf.clearRetainingCapacity(); 2258 } 2259 2260 { 2261 // Not enough parameters 2262 try ts.server.handleMessage(client, .init("CAP")); 2263 try std.testing.expectStringStartsWith(client.write_buf.items, ":localhost 461 * CAP"); 2264 try std.testing.expectStringEndsWith(client.write_buf.items, "\r\n"); 2265 client.write_buf.clearRetainingCapacity(); 2266 } 2267 2268 { 2269 // Invalid Parameters 2270 try ts.server.handleMessage(client, .init("CAP foo")); 2271 try std.testing.expectStringStartsWith(client.write_buf.items, ":localhost 410 * foo"); 2272 try std.testing.expectStringEndsWith(client.write_buf.items, "\r\n"); 2273 client.write_buf.clearRetainingCapacity(); 2274 } 2275}