//! Apollo: an IRC server (with an optional HTTP front-end) built on xev, httpz, and sqlite.
1const Server = @This();
2
3const builtin = @import("builtin");
4const httpz = @import("httpz");
5const std = @import("std");
6const sqlite = @import("sqlite");
7const uuid = @import("uuid");
8const xev = @import("xev");
9const zeit = @import("zeit");
10
11const atproto = @import("atproto.zig");
12const db = @import("db.zig");
13const github = @import("github.zig");
14const http = @import("http.zig");
15const irc = @import("irc.zig");
16const log = @import("log.zig");
17
18const Allocator = std.mem.Allocator;
19const Capability = irc.Capability;
20const Channel = irc.Channel;
21const ChannelPrivileges = irc.ChannelPrivileges;
22const ChatHistory = irc.ChatHistory;
23const ClientMessage = irc.ClientMessage;
24const HeapArena = @import("HeapArena.zig");
25const Http = @import("http.zig");
26const Message = irc.Message;
27const MessageIterator = irc.MessageIterator;
28const Queue = @import("queue.zig").Queue;
29const Sanitize = @import("sanitize.zig");
30const SaslMechanism = irc.SaslMechanism;
31const ThreadPool = @import("ThreadPool.zig");
32const Timestamp = irc.Timestamp;
33const User = irc.User;
34
const assert = std.debug.assert;

// We allow tags, so our maximum is 4096 + 512
const max_message_len = 4096 + 512;

/// Interval between garbage-collection timer fires (one minute).
const garbage_collect_ms = 1 * std.time.ms_per_min;

/// Maximum number of messages served per CHATHISTORY request; advertised in ISUPPORT (005).
const max_chathistory: u16 = 100;

/// Errors produced while processing inbound client bytes. ClientQuit signals the
/// connection should be flushed and closed rather than treated as a failure.
const ProcessMessageError = error{ClientQuit} || Allocator.Error;
45
/// A result produced on a worker thread and queued for the main loop, which is woken via
/// the `wakeup` async handle and drains these in `onWakeup`. Variants carrying a
/// `HeapArena` own their slices via that arena; the drainer deinits it after use.
pub const WakeupResult = union(enum) {
    /// SASL authentication succeeded for the connection identified by `fd`.
    auth_success: AuthSuccess,
    /// SASL authentication failed; `msg` (arena-owned) explains why.
    auth_failure: struct {
        arena: HeapArena,
        fd: xev.TCP,
        msg: []const u8,
    },
    /// A batch of chat-history lines answering a CHATHISTORY request.
    history_batch: ChatHistory.HistoryBatch,
    /// The set of CHATHISTORY TARGETS for a connection.
    history_targets: ChatHistory.TargetBatch,
    /// A MARKREAD update, broadcast to all of the user's connections.
    mark_read: struct {
        arena: HeapArena,
        fd: xev.TCP,
        target: []const u8,
        timestamp: ?Timestamp,
    },
    // a new event stream has connected. This happens in another thread, so we coalesce in the main
    // via wakeup
    event_stream: *http.EventStream,

    /// Payload for a successful authentication. Slices are owned by `arena`.
    pub const AuthSuccess = struct {
        arena: HeapArena,
        fd: xev.TCP,
        avatar_url: []const u8,
        nick: []const u8,
        user: []const u8,
        realname: []const u8,
    };
};
74
/// Fixed-capacity (128) queue used to hand WakeupResults from worker threads to the main loop.
pub const WorkerQueue = Queue(WakeupResult, 128);
76
/// Server configuration passed to `init`.
pub const Options = struct {
    /// Hostname used as the server prefix in outgoing IRC messages.
    hostname: []const u8 = "localhost",
    /// Port for the IRC TCP listener (bound on 127.0.0.1; see `init`).
    irc_port: u16 = 6667,
    /// If http_port is null, the http server will not be started
    http_port: ?u16 = 8080,
    /// SASL identity backend.
    auth: AuthProvider = .none,
    /// Path to the sqlite database file.
    db_path: [:0]const u8 = "apollo.db",
};
85
/// Set of IRCv3 capability flags; field names match the wire names used in CAP
/// negotiation. NOTE(review): not referenced in this portion of the file — confirm
/// usage elsewhere (cf. `irc.Capability`, which `handleCap` negotiates against).
const Capabilities = packed struct {
    @"away-notify": bool = false,
    @"draft/chathistory": bool = false,
    @"draft/read-marker": bool = false,
    @"draft/no-implicit-names": bool = false,
    @"echo-message": bool = false,
    @"message-tags": bool = false,
    @"server-time": bool = false,
    @"standard-replies": bool = false,
};
96
/// Which external identity provider backs SASL authentication.
const AuthProvider = enum {
    none,
    github,
    atproto,
};
102
/// A connection that has sent `AUTHENTICATE <mechanism>` and is awaiting the
/// base64-encoded credential payload.
const PendingAuth = struct {
    conn: *Connection,
    mechanism: SaslMechanism,
};
107
/// General-purpose allocator owning all long-lived server state.
gpa: std.mem.Allocator,
/// Event loop driving all IRC socket I/O and timers.
loop: xev.Loop,
/// Address of the IRC listener; rewritten with the actually-bound port in `init`.
address: std.net.Address,
/// The IRC listening socket.
tcp: xev.TCP,
/// Hostname reported in server-originated IRC messages.
hostname: []const u8,
/// SASL identity backend.
auth: AuthProvider,

/// Outbound HTTP client; deinitialized in `deinit`. NOTE(review): callers not visible in
/// this chunk — presumably used by the auth workers; confirm.
http_client: std.http.Client,

/// HTTP front-end state; only initialized when Options.http_port is set.
http_server: Http.Server,
/// The httpz server instance backing `http_server`; undefined unless http_port is set.
httpz_server: httpz.Server(*Http.Server),
/// Thread running `webMain`; null when the HTTP front-end is disabled.
http_server_thread: ?std.Thread,

/// maps a tcp connection to an EventStream object
event_streams: std.AutoArrayHashMapUnmanaged(xev.TCP, *http.EventStream),
/// maps a tcp connection to a connection object
connections: std.AutoArrayHashMapUnmanaged(xev.TCP, *Connection),
/// maps a nick to a user
nick_map: std.StringArrayHashMapUnmanaged(*User),
/// maps channel name to channel
channels: std.StringArrayHashMapUnmanaged(*Channel),

/// Connections mid-SASL negotiation (see PendingAuth).
pending_auth: std.ArrayListUnmanaged(PendingAuth),

/// Periodic timer that closes stale unauthenticated connections and compacts storage.
garbage_collect_timer: xev.Timer,
/// Counts GC timer fires; used to stagger expensive cleanup across cycles.
gc_cycle: u8,

/// Worker pool for blocking work (auth requests, db writes).
thread_pool: ThreadPool,
/// Pool of sqlite connections.
db_pool: *sqlite.Pool,
/// Async handle worker threads poke to wake the main loop.
wakeup: xev.Async,
/// Results posted by workers; drained on the main thread in `onWakeup`.
wakeup_queue: WorkerQueue,

/// Pool of xev completions.
completion_pool: MemoryPoolUnmanaged,
/// Monotonic id used for IRC BATCH reference tags (saturating increment).
next_batch: u32,
142
/// Initializes the server in place: opens the sqlite pool, binds the IRC listener on
/// 127.0.0.1, optionally starts the HTTP front-end on its own thread, arms the wakeup
/// handle and GC timer, and loads channels/users/membership from the db.
/// The loop is not run here; callers drive it (see `runUntil`).
pub fn init(
    self: *Server,
    gpa: std.mem.Allocator,
    opts: Options,
) !void {
    // Loopback only: the IRC listener is not exposed directly.
    const addr = try std.net.Address.parseIp4("127.0.0.1", opts.irc_port);

    // At least 4 workers/db connections; getCpuCount failure degrades to the minimum.
    const core_count = if (builtin.is_test) 1 else @max(4, std.Thread.getCpuCount() catch 0);
    const db_config: sqlite.Pool.Config = .{
        .size = core_count,
        .path = opts.db_path,
        .flags = sqlite.OpenFlags.Create |
            sqlite.OpenFlags.EXResCode |
            sqlite.OpenFlags.ReadWrite,
        .on_first_connection = db.createTables,
        .on_connection = db.setPragmas,
    };

    const n_jobs: u16 = @intCast(core_count);

    const db_pool: *sqlite.Pool = try .init(gpa, db_config);

    self.* = .{
        .gpa = gpa,
        .loop = try xev.Loop.init(.{ .entries = 1024 }),
        .tcp = try .init(addr),
        .address = addr,
        .event_streams = .empty,
        .connections = .empty,
        .nick_map = .empty,
        .channels = .empty,
        .hostname = opts.hostname,
        .auth = opts.auth,
        .http_client = .{ .allocator = gpa },
        .http_server = undefined,
        .httpz_server = undefined,
        .http_server_thread = null,
        .garbage_collect_timer = try .init(),
        .gc_cycle = 0,
        .thread_pool = undefined,
        .db_pool = db_pool,
        .wakeup = try .init(),
        .wakeup_queue = .{},
        .completion_pool = .empty,
        .next_batch = 0,
        .pending_auth = .empty,
    };

    if (opts.http_port) |http_port| {
        // If we have an http port, we start the server and spawn its thread
        self.http_server = .{
            .gpa = gpa,
            .channels = &self.channels,
            .db_pool = db_pool,
            .irc_server = self,
        };

        self.httpz_server = try httpz.Server(*Http.Server).init(
            gpa,
            .{
                .port = http_port,
                .request = .{ .max_form_count = 1 },
                .thread_pool = .{ .count = n_jobs },
            },
            &self.http_server,
        );
        self.http_server_thread = try .spawn(
            .{},
            webMain,
            .{ self, http_port },
        );
    }
    try self.thread_pool.init(.{ .allocator = gpa, .n_jobs = n_jobs });
    // Hook the queue to the wakeup handle so producers can poke the loop.
    self.wakeup_queue.eventfd = self.wakeup.fd;

    // NOTE(review): accept is queued before bind/listen below. Completions are only
    // submitted when the loop runs, after init returns — confirm this ordering is
    // intentional and safe for the xev backend in use.
    const tcp_c = try self.completion_pool.create(self.gpa);
    self.tcp.accept(&self.loop, tcp_c, Server, self, Server.onAccept);
    log.info("Listening at {}", .{self.address});

    // Start listening for our wakeup
    const wakeup_c = try self.completion_pool.create(self.gpa);
    self.wakeup.wait(&self.loop, wakeup_c, Server, self, Server.onWakeup);

    // Start the rehash timer. This is a timer to rehash our hashmaps
    const rehash_c = try self.completion_pool.create(self.gpa);
    self.garbage_collect_timer.run(&self.loop, rehash_c, garbage_collect_ms, Server, self, Server.onGarbageCollect);

    try self.tcp.bind(addr);
    try self.tcp.listen(256);

    // get the bound port
    var sock_len = self.address.getOsSockLen();
    try std.posix.getsockname(self.tcp.fd, &self.address.any, &sock_len);

    // Load initial db data
    try db.loadChannels(self);
    try db.loadUsers(self);
    try db.loadChannelMembership(self);

    log.info("{d} users in {d} channels", .{ self.nick_map.count(), self.channels.count() });
}
244
/// xev timer callback fired every `garbage_collect_ms`. Re-arms itself, closes
/// connections that never authenticated, and staggers expensive compaction work so the
/// blocks below fire on different cycles (each every 10th cycle).
fn onGarbageCollect(
    ud: ?*Server,
    _: *xev.Loop,
    c: *xev.Completion,
    r: xev.Timer.RunError!void,
) xev.CallbackAction {
    const self = ud.?;
    // Wrapping add: the counter is only used modulo 10 below. The previous saturating
    // add (`+|=`) froze the counter at 255, permanently stopping the staggered blocks.
    defer self.gc_cycle +%= 1;
    _ = r catch |err| {
        log.err("timer error: {}", .{err});
    };

    // Re-arm the timer, reusing this completion.
    self.garbage_collect_timer.run(
        &self.loop,
        c,
        garbage_collect_ms,
        Server,
        self,
        Server.onGarbageCollect,
    );

    // Close unauthenticated connections. Every cycle we do this
    {
        var iter = self.connections.iterator();
        const now = std.time.timestamp();
        while (iter.next()) |entry| {
            const conn = entry.value_ptr.*;
            if (conn.isAuthenticated()) continue;
            // If it's been more than 60 seconds since this connection connected and it isn't
            // authenticated, we close it
            if (conn.connected_at + 60 < now) {
                log.debug("closing unauthenticated connection: {d}", .{conn.client.fd});
                // Cancel the read completion. Once cancelled, we close and clean up the
                // connection
                const close_c = self.completion_pool.create(self.gpa) catch {
                    @panic("Out of memory");
                };
                self.loop.cancel(conn.read_c, close_c, Server, self, Server.onCancel);
            }
        }
    }

    // `stagger` is bumped unconditionally after each block so consecutive blocks fire on
    // different cycles. (Previously it was only bumped inside the `if`, which made the
    // second block unreachable: it was always tested against a cycle value that had just
    // failed, or succeeded, the same modulo check.) Wrapping add avoids overflow when
    // gc_cycle is near the u8 maximum.
    var stagger: u8 = 0;

    if ((self.gc_cycle +% stagger) % 10 == 0) {
        // Clean up connections hash map. Every 10th cycle
        connections: {
            const keys = self.gpa.dupe(xev.TCP, self.connections.keys()) catch break :connections;
            defer self.gpa.free(keys);
            const values = self.gpa.dupe(*Connection, self.connections.values()) catch break :connections;
            defer self.gpa.free(values);
            self.connections.shrinkAndFree(self.gpa, keys.len);
            self.connections.reinit(self.gpa, keys, values) catch break :connections;
        }
    }
    stagger += 1;

    if ((self.gc_cycle +% stagger) % 10 == 0) {
        // Clean up pending auth list. We shrink it to the size of items it has (effectively
        // clearing its capacity)
        const len = self.pending_auth.items.len;
        self.pending_auth.shrinkAndFree(self.gpa, len);
    }

    // TODO: GC completion memory pool
    // TODO: GC Event stream hash map
    // TODO: rehash all hashmaps

    return .disarm;
}
317
/// Tears down the server: closes all client and event-stream fds, frees queued worker
/// results, users, maps and pools, stops the HTTP thread, and runs final sqlite pragmas.
pub fn deinit(self: *Server) void {
    log.info("shutting down", .{});
    {
        var iter = self.connections.iterator();
        while (iter.next()) |entry| {
            const tcp = entry.key_ptr.*;
            std.posix.close(tcp.fd);
            const conn = entry.value_ptr.*;
            conn.deinit(self.gpa);
            self.gpa.destroy(conn);
        }
    }
    {
        var iter = self.event_streams.iterator();
        while (iter.next()) |entry| {
            const tcp = entry.key_ptr.*;
            std.posix.close(tcp.fd);
            const es = entry.value_ptr.*;
            es.write_buf.deinit(self.gpa);
            self.gpa.destroy(es);
        }
    }
    {
        // Scope the lock: previously the function-level `defer unlock` held the queue
        // lock across `thread.join()` and `thread_pool.deinit()`, which could deadlock
        // with a worker trying to push to the queue during shutdown.
        self.wakeup_queue.lock();
        defer self.wakeup_queue.unlock();
        while (self.wakeup_queue.drain()) |result| {
            switch (result) {
                .event_stream => |v| self.gpa.destroy(v),
                inline else => |v| v.arena.deinit(),
            }
        }
    }
    for (self.nick_map.values()) |v| {
        v.deinit(self.gpa);
        self.gpa.destroy(v);
    }
    self.nick_map.deinit(self.gpa);
    self.connections.deinit(self.gpa);
    self.event_streams.deinit(self.gpa);
    // Free list/map backing storage that was previously leaked on shutdown.
    self.pending_auth.deinit(self.gpa);
    // NOTE(review): only the map's own storage is freed here; the *Channel values and
    // key strings are assumed to be owned elsewhere — confirm Channel ownership.
    self.channels.deinit(self.gpa);
    self.completion_pool.deinit(self.gpa);
    self.loop.deinit();
    self.http_client.deinit();
    if (self.http_server_thread) |thread| {
        // We have an http server. Clean it up
        self.httpz_server.stop();
        self.httpz_server.deinit();
        thread.join();
    }
    self.thread_pool.deinit();

    // Do a couple last minute pragmas
    {
        const conn = self.db_pool.acquire();
        defer self.db_pool.release(conn);
        conn.execNoArgs("PRAGMA analysis_limit = 400") catch {};
        conn.execNoArgs("PRAGMA optimize") catch {};
    }
    self.db_pool.deinit();
}
375
/// Drives the event loop one tick at a time until `value` is observed false.
/// Calls `wg.finish()` first so the spawning thread can wait for startup.
fn runUntil(self: *Server, value: *const std.atomic.Value(bool), wg: *std.Thread.WaitGroup) !void {
    wg.finish();
    while (value.load(.unordered)) try self.loop.run(.once);
}
383
/// xev callback for the `wakeup` async handle. Worker threads push WakeupResults onto
/// `wakeup_queue` and poke the handle; we drain everything here on the main thread so
/// connection state is only ever touched from the loop thread.
fn onWakeup(
    ud: ?*Server,
    _: *xev.Loop,
    _: *xev.Completion,
    r: xev.Async.WaitError!void,
) xev.CallbackAction {
    const self = ud.?;
    _ = r catch |err| {
        log.err("wait error: {}", .{err});
    };

    // Drain anything that may have woken us up
    self.wakeup_queue.lock();
    defer self.wakeup_queue.unlock();
    while (self.wakeup_queue.drain()) |result| {
        switch (result) {
            .auth_success => |v| {
                // Arena owns the slices; freed once the handler has duped what it keeps.
                defer v.arena.deinit();
                self.onSuccessfulAuth(
                    v.fd,
                    v.nick,
                    v.user,
                    v.realname,
                    v.avatar_url,
                ) catch |err| {
                    // Fixed log message: was "could finish auth".
                    log.err("couldn't finish auth: {}", .{err});
                    const conn = self.connections.get(v.fd) orelse continue;
                    self.errSaslFail(conn, "failed to finish authentication") catch {};
                };
            },
            .auth_failure => |v| {
                defer v.arena.deinit();
                log.debug("auth response={s}", .{v.msg});
                const conn = self.connections.get(v.fd) orelse continue;
                self.errSaslFail(conn, v.msg) catch {};
            },
            .history_batch => |v| {
                defer v.arena.deinit();
                const conn = self.connections.get(v.fd) orelse continue;
                // Wrap the history lines in a BATCH with a fresh reference id.
                const batch_id = self.next_batch;
                self.next_batch +|= 1;
                conn.print(
                    self.gpa,
                    ":{s} BATCH +{d} chathistory {s}\r\n",
                    .{ self.hostname, batch_id, v.target },
                ) catch @panic("TODO");
                for (v.items) |msg| {
                    conn.print(
                        self.gpa,
                        "@time={};batch={d};msgid={s} :{s} {s}\r\n",
                        .{
                            msg.timestamp,
                            batch_id,
                            msg.uuid,
                            msg.sender,
                            msg.message,
                        },
                    ) catch @panic("TODO");
                }
                conn.print(
                    self.gpa,
                    ":{s} BATCH -{d} chathistory {s}\r\n",
                    .{ self.hostname, batch_id, v.target },
                ) catch @panic("TODO");

                self.queueWrite(conn.client, conn) catch {};
            },
            .history_targets => |v| {
                defer v.arena.deinit();
                const conn = self.connections.get(v.fd) orelse continue;
                const batch_id = self.next_batch;
                self.next_batch +|= 1;
                conn.print(
                    self.gpa,
                    ":{s} BATCH +{d} draft/chathistory-targets\r\n",
                    .{ self.hostname, batch_id },
                ) catch @panic("TODO");
                for (v.items) |target| {
                    conn.print(
                        self.gpa,
                        "@batch={d} CHATHISTORY TARGETS {s} {s}\r\n",
                        .{
                            batch_id,
                            target.nick_or_channel,
                            target.latest_timestamp,
                        },
                    ) catch @panic("TODO");
                }
                conn.print(
                    self.gpa,
                    ":{s} BATCH -{d} draft/chathistory-targets\r\n",
                    .{ self.hostname, batch_id },
                ) catch @panic("TODO");

                self.queueWrite(conn.client, conn) catch {};
            },
            .mark_read => |v| {
                defer v.arena.deinit();
                const c = self.connections.get(v.fd) orelse continue;
                const user = c.user orelse continue;
                // We report the markread to all connections
                for (user.connections.items) |conn| {
                    if (v.timestamp) |timestamp| {
                        conn.print(
                            self.gpa,
                            ":{s} MARKREAD {s} timestamp={s}\r\n",
                            .{ self.hostname, v.target, timestamp },
                        ) catch @panic("TODO");
                    } else {
                        conn.print(
                            self.gpa,
                            ":{s} MARKREAD {s} *\r\n",
                            .{ self.hostname, v.target },
                        ) catch @panic("TODO");
                    }
                }
            },
            .event_stream => |v| {
                // Register the stream with both the server and its channel.
                self.event_streams.put(self.gpa, v.stream, v) catch @panic("OOM");
                v.channel.streams.append(self.gpa, v) catch @panic("OOM");
            },
        }
    }
    return .rearm;
}
509
/// Completes SASL auth for a connection: ensures a User exists in `nick_map`, attaches
/// the connection to it, persists the user via a worker, and sends the post-registration
/// numerics (900/903, 001-005, MOTD) plus JOINs for the user's channels.
///
/// The incoming slices may be owned by the caller's arena (freed right after this call),
/// so everything retained long-term is duped onto `gpa`.
fn onSuccessfulAuth(
    self: *Server,
    fd: xev.TCP,
    nick: []const u8,
    username: []const u8,
    realname: []const u8,
    avatar_url: []const u8,
) Allocator.Error!void {
    const conn = self.connections.get(fd) orelse {
        log.warn("connection not found: fd={d}", .{fd.fd});
        return;
    };

    // If we don't have a user in the map, we will create one and insert it
    if (!self.nick_map.contains(nick)) {
        const user = try self.gpa.create(User);
        // Don't leak the User if a dupe below fails. (Already-duped strings may still
        // leak on that path; full cleanup would require User.deinit.)
        errdefer self.gpa.destroy(user);
        user.* = .init();
        user.real = try self.gpa.dupe(u8, realname);
        user.username = try self.gpa.dupe(u8, username);
        user.avatar_url = try self.gpa.dupe(u8, avatar_url);
        user.nick = try self.gpa.dupe(u8, nick);
        // BUGFIX: key the map with the gpa-owned copy. The incoming `nick` slice is
        // typically arena memory freed by the caller right after this function returns,
        // so using it as the key left a dangling key in the map.
        try self.nick_map.put(self.gpa, user.nick, user);
    }
    const user = self.nick_map.get(nick).?;

    // Store or update the user in the db. We do this in a worker thread
    try self.thread_pool.spawn(db.storeUser, .{ self.db_pool, user });
    try user.connections.append(self.gpa, conn);
    conn.user = user;

    // 900 RPL_LOGGEDIN
    try conn.print(
        self.gpa,
        ":{s} 900 {s} {s}!{s}@{s} {s} :You are now logged in\r\n",
        .{
            self.hostname,
            user.nick,
            user.nick,
            user.username,
            self.hostname,
            user.nick,
        },
    );
    try conn.print(self.gpa, ":{s} 903 {s} :SASL successful\r\n", .{ self.hostname, user.nick });
    // RPL_WELCOME
    try conn.print(self.gpa, ":{s} 001 {s} :Good Apollo, I'm burning Star IV!\r\n", .{ self.hostname, user.nick });
    // RPL_YOURHOST
    try conn.print(self.gpa, ":{s} 002 {s} :Your host is {s}\r\n", .{ self.hostname, user.nick, self.hostname });
    // RPL_CREATED
    try conn.print(self.gpa, ":{s} 003 {s} :This server exists\r\n", .{ self.hostname, user.nick });
    // RPL_MYINFO
    // TODO: include any user or channel modes?
    try conn.print(self.gpa, ":{s} 004 {s} apollo v0.0.0 \r\n", .{ self.hostname, user.nick });
    // ISUPPORT
    try conn.print(
        self.gpa,
        ":{s} 005 {s} WHOX CHATHISTORY={d} MSGREFTYPES=timestamp PREFIX=(o)@ :are supported\r\n",
        .{ self.hostname, user.nick, max_chathistory },
    );

    // MOTD. Some clients check for these, so we need to send them unilaterally (eg goguma)
    try conn.print(self.gpa, ":{s} 375 {s} :Message of the day -\r\n", .{ self.hostname, user.nick });
    try conn.print(self.gpa, ":{s} 376 {s} :End of Message of the day -\r\n", .{ self.hostname, user.nick });

    // If this is the only connection the user has, we notify all channels they are a member of
    // that they are back
    if (user.connections.items.len == 1) {
        for (user.channels.items) |chan| {
            try chan.notifyBack(self, user);
        }
    }

    // Send a join to the user for all of their channels
    for (user.channels.items) |chan| {
        var buf: [128]u8 = undefined;
        const m = std.fmt.bufPrint(&buf, "JOIN {s}", .{chan.name}) catch unreachable;
        try self.handleJoin(conn, .init(m));
    }

    // If the client isn't part of any channels, we'll force them into #apollo
    if (user.channels.items.len == 0) {
        try self.handleJoin(conn, .init("JOIN #apollo"));
    }

    try self.queueWrite(conn.client, conn);
}
595
/// xev callback invoked when a new TCP connection arrives on the IRC listener.
/// Always re-arms so we keep accepting, except on OOM while setting up the connection.
fn onAccept(
    ud: ?*Server,
    loop: *xev.Loop,
    _: *xev.Completion,
    result: xev.AcceptError!xev.TCP,
) xev.CallbackAction {
    const self = ud.?;

    if (result) |client| {
        self.accept(loop, client) catch |err| {
            log.err("couldn't accept connection: fd={d}", .{client.fd});
            switch (err) {
                error.OutOfMemory => return .disarm,
            }
        };
        return .rearm;
    } else |err| {
        log.err("accept error: {}", .{err});
        return .rearm;
    }
}
620
/// Initializes a freshly accepted connection: allocates its Connection and read
/// completion, registers it in the connections map, and starts the first read.
fn accept(self: *Server, loop: *xev.Loop, client: xev.TCP) Allocator.Error!void {
    log.debug("accepted connection: fd={d}", .{client.fd});
    const completion = try self.completion_pool.create(self.gpa);
    // Don't leak the completion/connection if a later allocation fails.
    errdefer self.completion_pool.destroy(completion);
    const conn = try self.gpa.create(Connection);
    errdefer self.gpa.destroy(conn);
    conn.init(client, completion);

    try self.connections.put(self.gpa, client, conn);

    client.read(
        loop,
        completion,
        .{ .slice = &conn.read_buf },
        Server,
        self,
        Server.onRead,
    );
}
639
/// Cleans up all state for a disconnected client: pending-auth entries, the user's
/// connection list (notifying channels when the last connection goes away), and the
/// Connection object itself.
fn handleClientDisconnect(self: *Server, client: xev.TCP) void {
    log.info("client disconnected: fd={d}", .{client.fd});

    const conn = self.connections.get(client) orelse {
        log.warn("connection not found: fd={d}", .{client.fd});
        return;
    };

    // Remove this connection from any pending auth state. Iterate backwards: swapRemove
    // moves the last element into the hole, so a forward `for` over the captured slice
    // could skip an entry or index past the shortened list.
    var i: usize = self.pending_auth.items.len;
    while (i > 0) {
        i -= 1;
        if (self.pending_auth.items[i].conn == conn) {
            _ = self.pending_auth.swapRemove(i);
        }
    }

    if (conn.user) |user| {
        // Remove this connection from the nick_map
        if (self.nick_map.get(user.nick)) |v| {
            for (v.connections.items, 0..) |c, idx| {
                if (c == conn) {
                    _ = v.connections.swapRemove(idx);
                    break;
                }
            }

            // No more connections: let the user's channels know they are away
            if (v.connections.items.len == 0) {
                for (v.channels.items) |chan| {
                    chan.notifyAway(self, v) catch {};
                }
            }
        }
    }
    conn.deinit(self.gpa);
    _ = self.connections.swapRemove(client);
    self.gpa.destroy(conn);
}
677
/// queues a write of the pending buffer. If there is nothing to queue, this is a noop.
/// The buffer is handed off to the write completion and freed in `onWrite`.
pub fn queueWrite(self: *Server, tcp: xev.TCP, conn: *Connection) Allocator.Error!void {
    if (conn.write_buf.items.len == 0) return;
    // Create the completion before taking ownership of the buffer: previously a failed
    // completion allocation after toOwnedSlice leaked the buffer and lost its bytes.
    const write_c = try self.completion_pool.create(self.gpa);
    errdefer self.completion_pool.destroy(write_c);
    const buf = try conn.write_buf.toOwnedSlice(self.gpa);
    tcp.write(
        &self.loop,
        write_c,
        .{ .slice = buf },
        Server,
        self,
        Server.onWrite,
    );
}
692
/// Queues a write of an event stream's pending buffer; a no-op when the buffer is empty.
/// Uses the stream's own embedded completion, and frees the slice in `onEventStreamWrite`.
pub fn queueWriteEventStream(self: *Server, stream: *http.EventStream) Allocator.Error!void {
    const pending = &stream.write_buf;
    if (pending.items.len == 0) return;

    const owned = try pending.toOwnedSlice(self.gpa);
    const tcp: xev.TCP = .{ .fd = stream.stream.fd };
    tcp.write(
        &self.loop,
        &stream.write_c,
        .{ .slice = owned },
        Server,
        self,
        Server.onEventStreamWrite,
    );
}
706
/// xev callback after a write to an IRC client completes. Destroys the completion,
/// frees the written buffer, re-queues any unwritten tail at the FRONT of the pending
/// buffer (to preserve byte order), and kicks off the next queued write.
fn onWrite(
    ud: ?*Server,
    _: *xev.Loop,
    c: *xev.Completion,
    tcp: xev.TCP,
    wb: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    const self = ud.?;
    self.completion_pool.destroy(c);
    defer self.gpa.free(wb.slice);

    const n = result catch |err| {
        // Write failed; connection teardown happens via the read path, not here.
        log.err("write error: {}", .{err});
        return .disarm;
    };
    log.debug("write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")});

    const conn = self.connections.get(tcp) orelse {
        log.err("connection not found: {d}", .{tcp.fd});
        return .disarm;
    };

    // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue
    if (n < wb.slice.len) {
        conn.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| {
            log.err("couldn't insert unwritten bytes: {}", .{err});
            return .disarm;
        };
    }
    self.queueWrite(tcp, conn) catch {};
    return .disarm;
}
740
/// xev callback after a write to an HTTP event stream. On error the stream is treated as
/// closed: it is removed from the server map and its channel's stream list, its fd is
/// closed, and its memory freed. On success any unwritten tail is re-queued, like onWrite.
/// (No completion destroy here: the completion is embedded in the stream.)
fn onEventStreamWrite(
    ud: ?*Server,
    _: *xev.Loop,
    _: *xev.Completion,
    tcp: xev.TCP,
    wb: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    const self = ud.?;
    defer self.gpa.free(wb.slice);

    const n = result catch |err| {
        log.warn("Event stream error, probably closed: {}", .{err});
        // Clean up the stream. We remove it from the Server streams, and the channel streams
        const kv = self.event_streams.fetchSwapRemove(tcp) orelse return .disarm;
        const es = kv.value;
        const channel = es.channel;
        for (channel.streams.items, 0..) |item, i| {
            if (item == es) {
                _ = channel.streams.swapRemove(i);
                break;
            }
        }
        std.posix.close(es.stream.fd);
        es.write_buf.deinit(self.gpa);
        self.gpa.destroy(es);
        return .disarm;
    };

    log.debug("event stream write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")});

    const es = self.event_streams.get(tcp) orelse {
        log.err("event_stream not found: {d}", .{tcp.fd});
        std.posix.close(tcp.fd);
        return .disarm;
    };

    // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue
    if (n < wb.slice.len) {
        es.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| {
            log.err("couldn't insert unwritten bytes: {}", .{err});
            return .disarm;
        };
    }
    self.queueWriteEventStream(es) catch {};
    return .disarm;
}
788
/// queues a write of the pending buffer. On completion, closes the connection.
/// When the buffer is already empty, closes immediately instead of writing.
fn queueFinalWrite(self: *Server, tcp: xev.TCP, conn: *Connection) Allocator.Error!void {
    if (conn.write_buf.items.len == 0) {
        // Nothing left to flush. Close the fd
        const close_c = try self.completion_pool.create(self.gpa);
        conn.client.close(&self.loop, close_c, Server, self, Server.onClose);
        return;
    }
    // Create the completion before taking ownership of the buffer: previously a failed
    // completion allocation after toOwnedSlice leaked the buffer and lost its bytes.
    const write_c = try self.completion_pool.create(self.gpa);
    errdefer self.completion_pool.destroy(write_c);
    const buf = try conn.write_buf.toOwnedSlice(self.gpa);
    tcp.write(
        &self.loop,
        write_c,
        .{ .slice = buf },
        Server,
        self,
        Server.onFinalWrite,
    );
}
808
/// xev callback for writes queued by `queueFinalWrite`. Same partial-write handling as
/// `onWrite`, but re-queues through `queueFinalWrite`, so once the buffer fully drains
/// the connection is closed.
fn onFinalWrite(
    ud: ?*Server,
    _: *xev.Loop,
    c: *xev.Completion,
    tcp: xev.TCP,
    wb: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    const self = ud.?;
    self.completion_pool.destroy(c);
    defer self.gpa.free(wb.slice);

    const n = result catch |err| {
        log.err("write error: {}", .{err});
        return .disarm;
    };
    log.debug("write: {s}", .{std.mem.trimRight(u8, wb.slice[0..n], "\r\n")});

    const conn = self.connections.get(tcp) orelse {
        log.err("connection not found: {d}", .{tcp.fd});
        return .disarm;
    };

    // Incomplete write. Insert the unwritten portion at the front of the list and we'll requeue
    if (n < wb.slice.len) {
        conn.write_buf.insertSlice(self.gpa, 0, wb.slice[n..]) catch |err| {
            log.err("couldn't insert unwritten bytes: {}", .{err});
            return .disarm;
        };
    }
    self.queueFinalWrite(tcp, conn) catch {};
    return .disarm;
}
842
/// xev callback for a cancel request (used to tear down stale reads from the GC pass).
/// Frees the cancel completion; the cancelled read's own callback handles the cleanup.
fn onCancel(
    ud: ?*Server,
    _: *xev.Loop,
    c: *xev.Completion,
    result: xev.CancelError!void,
) xev.CallbackAction {
    const self = ud.?;
    log.debug("cancelled completion {x}", .{@intFromPtr(c)});
    self.completion_pool.destroy(c);
    if (result) |_| {} else |err| {
        log.err("close error: {}", .{err});
    }
    return .disarm;
}
857
/// xev callback after a client fd has been closed: tears down the connection state and
/// returns the completion to the pool.
fn onClose(
    ud: ?*Server,
    _: *xev.Loop,
    c: *xev.Completion,
    client: xev.TCP,
    result: xev.CloseError!void,
) xev.CallbackAction {
    const self = ud.?;
    defer self.completion_pool.destroy(c);
    if (result) |_| {} else |err| {
        log.err("close error: {}", .{err});
    }
    self.handleClientDisconnect(client);
    return .disarm;
}
873
/// xev callback when bytes arrive from an IRC client. Error/EOF/zero-length reads close
/// the fd (reusing this read completion for the close). Otherwise the bytes are fed to
/// `processMessages`; a ClientQuit result flushes remaining output then closes.
fn onRead(
    ud: ?*Server,
    loop: *xev.Loop,
    c: *xev.Completion,
    client: xev.TCP,
    rb: xev.ReadBuffer,
    result: xev.ReadError!usize,
) xev.CallbackAction {
    const self = ud.?;

    // Get the read result
    const n = result catch |err| {
        switch (err) {
            error.Canceled => log.info("read canceled: fd={d}", .{client.fd}),
            error.EOF => log.info("client eof: fd={d}", .{client.fd}),
            else => log.err("read error: {}", .{err}),
        }
        // client disconnected. Close the fd (reuses the read completion)
        client.close(loop, c, Server, self, Server.onClose);
        return .disarm;
    };

    // Handle a disconnected client
    if (n == 0) {
        client.close(loop, c, Server, self, Server.onClose);
        return .disarm;
    }

    // Get the client
    const conn = self.connections.get(client) orelse {
        log.warn("client not found: fd={d}", .{client.fd});
        client.close(loop, c, Server, self, Server.onClose);
        return .disarm;
    };

    // Process the newly read bytes
    self.processMessages(conn, rb.slice[0..n]) catch |err| {
        log.err("couldn't process message: fd={d}", .{client.fd});
        switch (err) {
            error.OutOfMemory => return .disarm,
            error.ClientQuit => {
                // Flush pending output, then close (queueFinalWrite closes when drained).
                // NOTE(review): on this path the read completion `c` is neither reused
                // nor returned to the pool — confirm whether it is reclaimed elsewhere.
                self.queueFinalWrite(client, conn) catch {
                    // On error, we just close the fd
                    client.close(loop, c, Server, self, Server.onClose);
                };
                return .disarm;
            },
        }
    };

    self.queueWrite(client, conn) catch |err| {
        log.err("couldn't queue write: fd={d}", .{client.fd});
        switch (err) {
            error.OutOfMemory => return .disarm,
        }
    };
    return .rearm;
}
932
/// Splits newly read bytes into IRC messages and dispatches each via `handleMessage`.
/// Fast path: when no partial message is queued and `bytes` ends on a line boundary, we
/// parse directly from the read buffer without copying. Otherwise bytes are appended to
/// the connection's read_queue and the consumed prefix is trimmed afterwards.
fn processMessages(self: *Server, conn: *Connection, bytes: []const u8) ProcessMessageError!void {
    const buf: []const u8 =
        // If we have no queue, and this message is complete, we can process without allocating
        if (conn.read_queue.items.len == 0 and std.mem.endsWith(u8, bytes, "\n"))
            bytes
        else blk: {
            try conn.read_queue.appendSlice(self.gpa, bytes);
            break :blk conn.read_queue.items;
        };

    var iter: MessageIterator = .{ .bytes = buf };
    while (iter.next()) |raw| {
        log.debug("read: {s}", .{raw});
        const msg: Message = .init(raw);
        try self.handleMessage(conn, msg);
    }

    // if our read_queue is empty, we are done (fast path never touches the queue)
    if (conn.read_queue.items.len == 0) return;

    // Clean up the read_queue

    // Remove the bytes the iterator consumed; any partial trailing message stays queued
    conn.read_queue.replaceRangeAssumeCapacity(0, iter.bytesRead(), "");

    if (conn.read_queue.items.len == 0) {
        // If we consumed the entire thing, reclaim the memory
        conn.read_queue.clearAndFree(self.gpa);
    } else if (conn.read_queue.items.len > Server.max_message_len) {
        // If we have > max_message_size bytes, we send an error
        conn.read_queue.clearAndFree(self.gpa);
        return self.errInputTooLong(conn);
    }
}
967
/// Dispatches a single parsed IRC message to its command handler. Unknown commands get
/// ERR_UNKNOWNCOMMAND; PASS is accepted and ignored.
fn handleMessage(self: *Server, conn: *Connection, msg: Message) ProcessMessageError!void {
    const cmd = msg.command();

    const client_msg = ClientMessage.fromString(cmd) orelse {
        return self.errUnknownCommand(conn, cmd);
    };

    switch (client_msg) {
        .CAP => try self.handleCap(conn, msg),
        .NICK => try self.handleNick(conn, msg),
        .USER => try self.handleUser(conn, msg),
        .AUTHENTICATE => try self.handleAuthenticate(conn, msg),
        .PASS => {},
        .PING => try self.handlePing(conn, msg),
        .QUIT => try self.handleQuit(conn, msg),
        .MODE => try self.handleMode(conn, msg),

        .JOIN => try self.handleJoin(conn, msg),
        .TOPIC => try self.handleTopic(conn, msg),
        .PART => try self.handlePart(conn, msg),
        .NAMES => try self.handleNames(conn, msg),
        .LIST => try self.handleList(conn, msg),

        .PRIVMSG => try self.handlePrivMsg(conn, msg),
        .TAGMSG => try self.handleTagMsg(conn, msg),

        .WHO => try self.handleWho(conn, msg),

        .AWAY => try self.handleAway(conn, msg),
        .CHATHISTORY => try self.handleChathistory(conn, msg),
        .MARKREAD => try self.handleMarkread(conn, msg),
        else => try self.errUnknownCommand(conn, cmd),
    }
}
1002
/// Handles IRCv3 CAP negotiation: LS (advertise capabilities, with sasl=PLAIN), LIST
/// (enabled caps; currently empty response), REQ (ACK/NAK each requested cap), and END.
/// Anything else gets numeric 410 (invalid CAP command).
fn handleCap(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    var iter = msg.paramIterator();
    const subcmd = iter.next() orelse return self.errNeedMoreParams(conn, "CAP");

    if (std.mem.eql(u8, subcmd, "LS")) {
        // LS lists available capabilities
        // We expect a 302, but we don't actually care
        try conn.print(
            self.gpa,
            ":{s} CAP {s} LS :",
            .{ self.hostname, conn.nickname() },
        );
        // Space-separated capability names, taken from the irc.Capability enum fields
        for (std.meta.fieldNames(Capability), 0..) |cap, i| {
            if (i > 0) try conn.write(self.gpa, " ");
            try conn.write(self.gpa, cap);
            if (std.mem.eql(u8, "sasl", cap)) try conn.write(self.gpa, "=PLAIN");
        }
        try conn.write(self.gpa, "\r\n");
    } else if (std.mem.eql(u8, subcmd, "LIST")) {
        // LIST lists enabled capabilities
    } else if (std.mem.eql(u8, subcmd, "REQ")) {
        // REQ tries to enable the given capability
        const caps = iter.next() orelse return;
        var cap_iter = std.mem.splitScalar(u8, caps, ' ');
        while (cap_iter.next()) |cap_str| {
            // Unknown capability names are NAK'd individually
            const cap = std.meta.stringToEnum(Capability, cap_str) orelse {
                try conn.print(
                    self.gpa,
                    ":{s} CAP {s} NAK {s}\r\n",
                    .{ self.hostname, conn.nickname(), cap_str },
                );
                continue;
            };
            try conn.enableCap(cap);
            try conn.print(
                self.gpa,
                ":{s} CAP {s} ACK {s}\r\n",
                .{ self.hostname, conn.nickname(), cap_str },
            );
        }
    } else if (std.mem.eql(u8, subcmd, "END")) {
        // END signals the end of capability negotiation. It's possible to be authenticated
        // already if it happened really fast
    } else return conn.print(
        self.gpa,
        ":{s} 410 {s} {s} :Invalid CAP command\r\n",
        .{ self.hostname, conn.nickname(), subcmd },
    );
}
1052
/// NICK is intentionally a no-op: identity is established via SASL (the Github auth
/// flow sends NICK, which we ignore).
fn handleNick(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    _ = .{ self, conn, msg };
}
1059
/// USER is intentionally a no-op: identity is established via SASL (the Github auth
/// flow sends USER, which we ignore).
fn handleUser(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    _ = .{ self, conn, msg };
}
1066
/// AUTHENTICATE — SASL PLAIN (IRCv3). The first AUTHENTICATE selects the
/// mechanism; the follow-up line carries the base64-encoded
/// "authzid\x00authcid\x00password" payload. Credential verification is
/// dispatched to the thread pool (or short-circuited when `self.auth` is
/// `.none`) and completes asynchronously via `self.wakeup_queue`.
fn handleAuthenticate(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    var iter = msg.paramIterator();
    // Find (and remove) any pending-auth entry for this connection. Its
    // presence tells us whether this line is the mechanism selection or the
    // credential payload.
    const pending_mech: ?SaslMechanism = for (self.pending_auth.items, 0..) |pending_conn, i| {
        if (pending_conn.conn == conn) {
            // This connection is already pending auth. We can remove it from the list now
            _ = self.pending_auth.swapRemove(i);
            break pending_conn.mechanism;
        }
    } else null;

    if (pending_mech == null) {
        // We "unauthenticate" the connection by setting it's user field to null. We do this
        // even for failure
        conn.user = null;
        // If we aren't pending auth, this should be AUTHENTICATE <mechanism>
        // We first must get AUTHENTICATE <mechanism>
        const mechanism = iter.next() orelse
            return self.errNeedMoreParams(conn, "AUTHENTICATE");
        if (std.ascii.eqlIgnoreCase("PLAIN", mechanism)) {
            // Remember the mechanism and prompt the client for credentials.
            const pending: PendingAuth = .{ .mechanism = .plain, .conn = conn };
            try self.pending_auth.append(self.gpa, pending);
            return conn.write(self.gpa, "AUTHENTICATE +\r\n");
        }
        // 908 RPL_SASLMECHS: advertise the only mechanism we support.
        return conn.print(
            self.gpa,
            ":{s} 908 {s} PLAIN :are available SASL mechanisms\r\n",
            .{ self.hostname, conn.nickname() },
        );
    }

    // This is our username + password
    const str = iter.next() orelse return self.errNeedMoreParams(conn, "AUTHENTICATE");

    // NOTE(review): if calcSizeForSlice returns a len > buf.len, the
    // `buf[0..len]` slice below trips the bounds check rather than reaching
    // the NoSpaceLeft branch — consider rejecting len > buf.len explicitly.
    var buf: [1024]u8 = undefined;
    const Decoder = std.base64.standard.Decoder;
    const len = Decoder.calcSizeForSlice(str) catch |err| {
        switch (err) {
            error.InvalidPadding => return self.errSaslFail(conn, "invalid base64 encoding"),
            else => unreachable,
        }
    };
    const decode_buf = buf[0..len];
    std.base64.standard.Decoder.decode(decode_buf, str) catch |err| {
        switch (err) {
            error.InvalidPadding => return self.errSaslFail(conn, "invalid base64 encoding"),
            error.InvalidCharacter => return self.errSaslFail(conn, "invalid base64 encoding"),
            error.NoSpaceLeft => return self.errSaslFail(conn, "auth too long"),
        }
    };

    // SASL PLAIN payload: three NUL-separated fields.
    var sasl_iter = std.mem.splitScalar(u8, decode_buf, 0x00);

    // Authorized as is the identity we will act as. We generally ignore this
    const authorized_as = sasl_iter.next() orelse
        return self.errSaslFail(conn, "invalid SASL message");
    _ = authorized_as;

    // Authenticate as is the identity that belongs to the password
    const authenticate_as = sasl_iter.next() orelse
        return self.errSaslFail(conn, "invalid SASL message");

    // The password
    const password = sasl_iter.next() orelse
        return self.errSaslFail(conn, "invalid SASL message");

    // The arena's ownership transfers to the wakeup consumer / worker task on
    // success; free it ourselves only on the error path.
    const arena: HeapArena = try .init(self.gpa);
    errdefer arena.deinit();
    switch (self.auth) {
        .none => {
            // No external auth backend: accept the claimed identity as-is.
            self.wakeup_queue.push(.{
                .auth_success = .{
                    .arena = arena,
                    .fd = conn.client,
                    .nick = try arena.allocator().dupe(u8, authenticate_as),
                    .user = try arena.allocator().dupe(u8, authenticate_as),
                    .realname = try arena.allocator().dupe(u8, authenticate_as),
                    .avatar_url = "",
                },
            });
        },
        .github => {
            // The password is a Github token; verify it off-thread.
            const auth_header = try std.fmt.allocPrint(
                arena.allocator(),
                "Bearer {s}",
                .{password},
            );
            try self.thread_pool.spawn(github.authenticate, .{
                arena,
                &self.http_client,
                &self.wakeup_queue,
                conn.client,
                auth_header,
            });
        },
        .atproto => {
            // Copies keep the handle/password alive for the worker thread.
            const handle = try arena.allocator().dupe(u8, authenticate_as);
            const dupe_pass = try arena.allocator().dupe(u8, password);
            try self.thread_pool.spawn(
                atproto.authenticateConnection,
                .{
                    arena,
                    &self.http_client,
                    &self.wakeup_queue,
                    self.db_pool,
                    conn.client,
                    handle,
                    dupe_pass,
                },
            );
        },
    }
}
1179
/// PING — reply with a PONG that echoes the client's token verbatim.
fn handlePing(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const token = msg.rawParameters();
    try conn.print(
        self.gpa,
        ":{s} PONG {s} :{s}\r\n",
        .{ self.hostname, self.hostname, token },
    );
}
1187
/// QUIT — send a best-effort ERROR farewell, then signal the caller to tear
/// this connection down via error.ClientQuit.
fn handleQuit(self: *Server, conn: *Connection, msg: Message) error{ClientQuit}!void {
    _ = msg;
    // The connection is going away regardless; a failed write is ignored.
    conn.print(self.gpa, ":{s} ERROR :Client quit\r\n", .{self.hostname}) catch {};
    return error.ClientQuit;
}
1193
/// MODE <target> [<modestring> [<arg>]] — only channel member modes are
/// implemented, and only the +o/-o (channel operator) flag. Setting a mode
/// requires the caller to be a global operator or a channel operator.
fn handleMode(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    var iter = msg.paramIterator();
    const target = iter.next() orelse return self.errNeedMoreParams(conn, "MODE");
    if (target.len == 0) return self.errNeedMoreParams(conn, "MODE");

    if (target[0] != '#') {
        // User MODE
        const source = conn.user orelse
            return self.errUnknownError(conn, "MODE", "must be authenticated for MODE");
        _ = source;
        // TODO: implement this
        return;
    }

    // Target is a channel
    const channel = self.channels.get(target) orelse {
        return self.errNoSuchChannel(conn, target);
    };

    const modestring = iter.next() orelse {
        // TODO: send the channel mode. We don't have any right now
        return;
    };

    // If we have a modestring, we also have to be authenticated
    const user = conn.user orelse {
        return self.errChanOpPrivsNeeded(conn, channel.name);
    };

    const privs = channel.getPrivileges(user);

    if (!user.modes.operator and !privs.operator) {
        // User either needs to be global ops or chanops
        return self.errChanOpPrivsNeeded(conn, channel.name);
    }

    // We have the right privileges. Get the arguments (we should have one, a nickname)
    const arg = iter.next() orelse return self.errNeedMoreParams(conn, "MODE");

    // Validate the argument
    if (arg.len == 0) return self.errNeedMoreParams(conn, "MODE");
    if (arg[0] == '#') return self.errUnknownError(conn, "MODE", "argument cannot be a channel");

    // Get the target user we are modifying the mode of
    const target_user = self.nick_map.get(arg) orelse {
        return self.errNoSuchNick(conn, arg);
    };

    // Get the target_users current privileges
    var target_privs = channel.getPrivileges(target_user);

    // Parse and apply the privileges
    const State = enum { none, add, remove };
    var state: State = .none;
    for (modestring) |b| {
        switch (b) {
            '+' => state = .add,
            '-' => state = .remove,
            'o' => {
                // NOTE(review): an 'o' with no preceding '+'/'-' is silently
                // ignored — confirm this is the intended behavior.
                switch (state) {
                    .add => target_privs.operator = true,
                    .remove => target_privs.operator = false,
                    .none => {},
                }
            },
            else => log.warn("unsupported mode byte: {c}", .{b}),
        }
    }

    // Update the state in mem and db
    return channel.storePrivileges(self, target_user, target_privs);
}
1266
/// PRIVMSG <target> <text> — deliver a message to a channel or a nick.
/// The message is persisted via the thread pool, forwarded to HTTP event
/// streams (channels only), and fanned out to the relevant IRC connections.
/// The sender's own connections only receive a copy when they negotiated
/// echo-message.
fn handlePrivMsg(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const source = conn.user orelse
        return self.errUnknownError(conn, "PRIVMSG", "cannot PRIVMSG before authentication");

    var iter = msg.paramIterator();
    const target = iter.next() orelse return self.errNoRecipient(conn);
    const text = iter.next() orelse return self.errNoTextToSend(conn);

    if (target.len == 0) return self.errNoRecipient(conn);
    switch (target[0]) {
        '#' => {
            const channel = self.channels.get(target) orelse {
                return self.errNoSuchChannel(conn, target);
            };

            {
                // Persist the message off-thread; the arena owns all copies
                // and is handed to the worker task.
                const arena: HeapArena = try .init(self.gpa);
                errdefer arena.deinit();
                const sender_nick = try arena.allocator().dupe(u8, source.nick);
                const target_name = try arena.allocator().dupe(u8, target);
                const msg_dupe = try msg.copy(arena.allocator());
                try self.thread_pool.spawn(
                    db.storeChannelMessage,
                    .{ arena, self.db_pool, sender_nick, target_name, msg_dupe },
                );
            }

            // Send message to any http clients.
            try channel.sendPrivMsgToStreams(self, source, msg);

            for (channel.members.items) |m| {
                const u = m.user;
                for (u.connections.items) |c| {
                    // BUG FIX: skip the sender's non-echo connections BEFORE
                    // writing anything. Previously the @time/msgid tag prefix
                    // was queued first and the echo-message `continue` left a
                    // dangling partial line in the write buffer.
                    if (u == source and !c.caps.@"echo-message") continue;

                    if (c.caps.@"server-time" or c.caps.@"message-tags") {
                        const urn = uuid.urn.serialize(msg.uuid);
                        try c.print(
                            self.gpa,
                            "@time={};msgid={s} ",
                            .{ msg.timestamp, &urn },
                        );
                    }

                    try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text });
                    try self.queueWrite(c.client, c);
                }
            }
        },
        else => {
            // Get the connections for this nick
            const user = self.nick_map.get(target) orelse {
                return self.errNoSuchNick(conn, target);
            };

            {
                // store the message
                const arena: HeapArena = try .init(self.gpa);
                errdefer arena.deinit();
                const sender_nick = try arena.allocator().dupe(u8, source.nick);
                const target_name = try arena.allocator().dupe(u8, target);
                const msg_dupe = try msg.copy(arena.allocator());
                try self.thread_pool.spawn(
                    db.storePrivateMessage,
                    .{ arena, self.db_pool, sender_nick, target_name, msg_dupe },
                );
            }

            // Deliver to every connection of the recipient.
            for (user.connections.items) |c| {
                if (c.caps.@"server-time" or c.caps.@"message-tags") {
                    const urn = uuid.urn.serialize(msg.uuid);
                    try c.print(
                        self.gpa,
                        "@time={};msgid={s} ",
                        .{ msg.timestamp, &urn },
                    );
                }

                try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text });
                try self.queueWrite(c.client, c);
            }

            // Echo back to the sender's own connections that opted in.
            for (source.connections.items) |c| {
                if (!c.caps.@"echo-message") continue;
                if (c.caps.@"server-time" or c.caps.@"message-tags") {
                    const urn = uuid.urn.serialize(msg.uuid);
                    try c.print(
                        self.gpa,
                        "@time={};msgid={s} ",
                        .{ msg.timestamp, &urn },
                    );
                }

                try c.print(self.gpa, ":{s} PRIVMSG {s} :{s}\r\n", .{ source.nick, target, text });
                try self.queueWrite(c.client, c);
            }
        },
    }
}
1368
/// TAGMSG <target> — relay a tags-only message to a channel or nick. Only
/// connections that negotiated message-tags receive it; the sender's own
/// connections additionally require echo-message.
fn handleTagMsg(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const source = conn.user orelse
        return self.errUnknownError(conn, "TAGMSG", "cannot TAGMSG before authentication");
    // TODO: store the message in database
    var iter = msg.paramIterator();
    const target = iter.next() orelse return self.errNoRecipient(conn);

    if (target.len == 0) return self.errNoRecipient(conn);
    switch (target[0]) {
        '#' => {
            const channel = self.channels.get(target) orelse {
                return self.errNoSuchChannel(conn, target);
            };
            for (channel.members.items) |m| {
                const u = m.user;
                for (u.connections.items) |c| {
                    // We don't send tag messages to connections which haven't enabled
                    // message-tags
                    if (!c.caps.@"message-tags") continue;
                    // BUG FIX: perform the echo-message check before any bytes
                    // are written. Previously the @time prefix could be queued
                    // and then abandoned by the `continue`, leaving a dangling
                    // partial line in the write buffer.
                    if (u == source and !c.caps.@"echo-message") continue;
                    if (c.caps.@"server-time") {
                        try c.print(self.gpa, "@time={}", .{msg.timestamp});
                    }

                    // NOTE(review): without server-time the tag block starts
                    // with ';' and no leading '@' — confirm clients accept this.
                    var tag_iter = msg.tagIterator();
                    while (tag_iter.next()) |tag| {
                        try c.write(self.gpa, ";");
                        try c.write(self.gpa, tag.key);
                        try c.write(self.gpa, "=");
                        try c.write(self.gpa, tag.value);
                    }
                    try c.write(self.gpa, " ");

                    try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target });
                    try self.queueWrite(c.client, c);
                }
            }
        },
        else => {
            // Get the connections for this nick
            const user = self.nick_map.get(target) orelse {
                return self.errNoSuchNick(conn, target);
            };

            for (user.connections.items) |c| {
                // We don't send tag messages to connections which haven't enabled
                // message-tags
                if (!c.caps.@"message-tags") continue;
                if (c.caps.@"server-time") {
                    try c.print(self.gpa, "@time={}", .{msg.timestamp});
                }

                var tag_iter = msg.tagIterator();
                while (tag_iter.next()) |tag| {
                    try c.write(self.gpa, ";");
                    try c.write(self.gpa, tag.key);
                    try c.write(self.gpa, "=");
                    try c.write(self.gpa, tag.value);
                }
                try c.write(self.gpa, " ");

                try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target });
                try self.queueWrite(c.client, c);
            }

            // Echo back to the sender's own opted-in connections.
            for (source.connections.items) |c| {
                if (!c.caps.@"echo-message") continue;
                if (c.caps.@"server-time") {
                    try c.print(self.gpa, "@time={}", .{msg.timestamp});
                }

                var tag_iter = msg.tagIterator();
                while (tag_iter.next()) |tag| {
                    try c.write(self.gpa, ";");
                    try c.write(self.gpa, tag.key);
                    try c.write(self.gpa, "=");
                    try c.write(self.gpa, tag.value);
                }
                try c.write(self.gpa, " ");

                try c.print(self.gpa, ":{s} TAGMSG {s}\r\n", .{ source.nick, target });
                try self.queueWrite(c.client, c);
            }
        },
    }
}
1456
/// JOIN <channel> — create the channel on first use (in memory, and
/// asynchronously in the database), add the user, and honor
/// draft/read-marker by kicking off a MARKREAD lookup.
fn handleJoin(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const user = conn.user orelse
        return self.errUnknownError(conn, "JOIN", "cannot join before authentication");
    var iter = msg.paramIterator();
    const target = iter.next() orelse return self.errNeedMoreParams(conn, "JOIN");

    if (target.len > 32) {
        // BUG FIX: 476 previously omitted the client slot, placing the
        // channel name where the nickname belongs in the numeric reply.
        return conn.print(
            self.gpa,
            ":{s} 476 {s} {s} :Channel name is too long\r\n",
            .{ self.hostname, conn.nickname(), target },
        );
    }

    if (target.len == 0) return self.errNeedMoreParams(conn, "JOIN");
    switch (target[0]) {
        '#' => {},
        else => return self.errNoSuchChannel(conn, target),
    }

    if (!self.channels.contains(target)) {
        {
            // Create the channel in the db. The arena is handed to the worker
            // on success; the scoped errdefer frees it only if the dupe or
            // spawn below fails (BUG FIX: it previously leaked on that path).
            const arena: HeapArena = try .init(self.gpa);
            errdefer arena.deinit();
            const target_dupe = try arena.allocator().dupe(u8, target);
            try self.thread_pool.spawn(db.createChannel, .{ arena, self.db_pool, target_dupe });
        }

        const channel = try self.gpa.create(Channel);
        // BUG FIX: don't leak the channel/name if a later allocation fails.
        errdefer self.gpa.destroy(channel);
        const name = try self.gpa.dupe(u8, target);
        errdefer self.gpa.free(name);
        channel.* = .init(name, "");
        try self.channels.put(self.gpa, name, channel);
    }

    const channel = self.channels.get(target).?;
    try channel.addUser(self, user, conn);

    // drafts/read-marker requires us to send a MARKREAD on join
    if (conn.caps.@"draft/read-marker") {
        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if a dupe or the spawn fails.
        errdefer arena.deinit();
        const target2 = try arena.allocator().dupe(u8, target);
        const nick = try arena.allocator().dupe(u8, user.nick);

        try self.thread_pool.spawn(db.getMarkRead, .{
            arena,
            self.db_pool,
            &self.wakeup_queue,
            conn.client,
            nick,
            target2,
        });
    }
}
1508
/// TOPIC <channel> [<topic>] — query a channel topic, or set it when a
/// topic parameter is given (setting requires authentication).
fn handleTopic(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    var params = msg.paramIterator();
    const chan_name = params.next() orelse return self.errNeedMoreParams(conn, "TOPIC");
    const new_topic = params.next() orelse "";

    // Only setting (a non-empty topic) requires an authenticated connection.
    if (new_topic.len > 0 and conn.user == null) {
        return self.errUnknownError(conn, "TOPIC", "cannot set topic without authentication");
    }

    const chan = self.channels.get(chan_name) orelse
        return self.errNoSuchChannel(conn, chan_name);
    try chan.setTopic(self, conn, new_topic);
}
1522
/// PART <channel>{,<channel>} — remove the user from each listed channel.
/// Channels that don't exist are skipped silently.
fn handlePart(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const user = conn.user orelse
        return self.errUnknownError(conn, "PART", "cannot part before authentication");
    var iter = msg.paramIterator();
    // BUG FIX: the 461 reply previously named "JOIN"; this is the PART handler.
    const target = iter.next() orelse return self.errNeedMoreParams(conn, "PART");

    var chan_iter = std.mem.splitScalar(u8, target, ',');
    while (chan_iter.next()) |chan| {
        const channel = self.channels.get(chan) orelse continue;
        try channel.removeUser(self, user);
    }
}
1535
/// NAMES <channel> — reply with the member list of a single channel.
fn handleNames(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    var params = msg.paramIterator();
    const target = params.next() orelse return self.errNeedMoreParams(conn, "NAMES");
    if (target.len == 0) return self.errNeedMoreParams(conn, "NAMES");

    // Only channel targets are supported.
    if (target[0] != '#') {
        return self.errUnknownError(conn, "NAMES", "not a valid channel name");
    }

    const channel = self.channels.get(target) orelse
        return self.errNoSuchChannel(conn, target);
    try channel.names(self, conn);
}
1553
/// LIST — reply with 321 (start), one 322 line per channel (name, member
/// count, topic), then 323 (end). Masks are not yet supported.
fn handleList(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    // TODO: handle masks
    _ = msg;

    try conn.print(
        self.gpa,
        ":{s} 321 {s} Channel :Start of LIST\r\n",
        .{ self.hostname, conn.nickname() },
    );

    for (self.channels.values()) |chan| {
        try conn.print(
            self.gpa,
            ":{s} 322 {s} {s} {d} :{s}\r\n",
            .{ self.hostname, conn.nickname(), chan.name, chan.members.items.len, chan.topic },
        );
    }

    try conn.print(
        self.gpa,
        ":{s} 323 {s} :End of LIST\r\n",
        .{ self.hostname, conn.nickname() },
    );
}
1583
/// WHO <mask> [<fields>[%<std_fields>] [<token>]] — channel WHO is delegated
/// to the channel; nick WHO answers either a classic 352 reply (no fields
/// argument) or a WHOX 354 reply built field-by-field in the standard
/// tcuihsnfdlaor order, then ends with 315.
fn handleWho(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const cmd = "WHO";
    var iter = msg.paramIterator();
    const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd);

    if (target.len == 0) return self.errNeedMoreParams(conn, cmd);
    switch (target[0]) {
        '#' => {
            const channel = self.channels.get(target) orelse {
                return self.errNoSuchChannel(conn, target);
            };

            try channel.who(self, conn, msg);
        },
        else => {
            // "client" slot of the numeric: our nick if authenticated, else "*".
            const client: []const u8 = if (conn.user) |user| user.nick else "*";
            const user = self.nick_map.get(target) orelse {
                return self.errNoSuchNick(conn, target);
            };
            const args = iter.next() orelse "";
            const token = iter.next();
            if (args.len == 0) {
                // Classic RPL_WHOREPLY (352).
                try conn.print(
                    self.gpa,
                    ":{s} 352 {s} * {s} {s} {s} {s} {s} :0 {s}\r\n",
                    .{
                        self.hostname,
                        client,
                        user.username,
                        self.hostname,
                        self.hostname,
                        user.nick,
                        "H", // TODO: flags, now we just always say the user is H="here"
                        user.real,
                    },
                );
            } else {
                // WHOX RPL_WHOSPCRPL (354); fields are appended piecewise below.
                try conn.print(
                    self.gpa,
                    ":{s} 354 {s}",
                    .{ self.hostname, client },
                );

                // Find the index of the standard field indicator
                const std_idx = std.mem.indexOfScalar(u8, args, '%') orelse args.len;
                // TODO: any nonstandard fields

                // Handle standard fields, in order. The order is tcuihsnfdlaor
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 't')) |_| {
                    if (token) |t| try conn.print(self.gpa, " {s}", .{t});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'c')) |_| {
                    try conn.print(self.gpa, " *", .{});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'u')) |_| {
                    try conn.print(self.gpa, " {s}", .{user.username});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'i')) |_| {
                    // Host/IP fields are intentionally masked.
                    try conn.print(self.gpa, " {s}", .{"127.0.0.1"});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'h')) |_| {
                    try conn.print(self.gpa, " {s}", .{self.hostname});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 's')) |_| {
                    try conn.print(self.gpa, " {s}", .{self.hostname});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'n')) |_| {
                    try conn.print(self.gpa, " {s}", .{user.nick});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'f')) |_| {
                    // TODO: user flags
                    try conn.print(self.gpa, " H", .{});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'd')) |_| {
                    try conn.write(self.gpa, " 0");
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'l')) |_| {
                    try conn.write(self.gpa, " 0");
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'a')) |_| {
                    try conn.print(self.gpa, " {s}", .{user.username});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'o')) |_| {
                    // TODO: chan op level
                    try conn.print(self.gpa, " {s}", .{user.username});
                }
                if (std.mem.indexOfScalarPos(u8, args, std_idx, 'r')) |_| {
                    try conn.print(self.gpa, " :{s}", .{user.real});
                }
                try conn.write(self.gpa, "\r\n");
            }
            // RPL_ENDOFWHO (315).
            try conn.print(
                self.gpa,
                ":{s} 315 {s} * :End of WHO list\r\n",
                .{ self.hostname, client },
            );
            try self.queueWrite(conn.client, conn);
        },
    }
}
1684
/// AWAY [<reason>] — mark the user away when any parameter is present,
/// back otherwise. Channels are notified and every connection of the user
/// receives the matching 306/305 numeric.
fn handleAway(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const user = conn.user orelse
        return self.errUnknownError(conn, "AWAY", "cannot set AWAY without authentication");

    var params = msg.paramIterator();
    const is_away = params.next() != null;
    user.away = is_away;

    if (is_away) {
        for (user.channels.items) |chan| try chan.notifyAway(self, user);
        for (user.connections.items) |c| {
            try c.print(
                self.gpa,
                ":{s} 306 {s} :You have been marked as away\r\n",
                .{ self.hostname, c.nickname() },
            );
            try self.queueWrite(c.client, c);
        }
    } else {
        for (user.channels.items) |chan| try chan.notifyBack(self, user);
        for (user.connections.items) |c| {
            try c.print(
                self.gpa,
                ":{s} 305 {s} :You are no longer marked as away\r\n",
                .{ self.hostname, c.nickname() },
            );
            try self.queueWrite(c.client, c);
        }
    }
}
1718
/// CHATHISTORY (IRCv3) — TARGETS/AFTER/BEFORE/LATEST subcommands. Parsing
/// and validation happen here; the actual queries run on the thread pool
/// and results are delivered via the wakeup queue. Unknown subcommands are
/// silently ignored.
fn handleChathistory(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const user = conn.user orelse return self.errUnknownError(
        conn,
        "CHATHISTORY TARGETS",
        "cannot CHATHISTORY without authentication",
    );
    const cmd = "CHATHISTORY";
    var iter = msg.paramIterator();

    const subcmd = iter.next() orelse return self.errNeedMoreParams(conn, cmd);

    if (std.ascii.eqlIgnoreCase("TARGETS", subcmd)) {
        const sub = "TARGETS";
        // "TARGETS <ts_one> <ts_two> <limit>". There is no requirement that ts_one < ts_two
        const ts_one = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub);
        const ts_two = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub);
        const limit = iter.next() orelse return self.errNeedMoreParams(conn, cmd ++ " " ++ sub);

        // Each timestamp arrives as "timestamp=<iso8601>"; keep the value part.
        const ts_one_str = blk: {
            var ts_iter = std.mem.splitScalar(u8, ts_one, '=');
            _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param");
            break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param");
        };
        const ts_two_str = blk: {
            var ts_iter = std.mem.splitScalar(u8, ts_two, '=');
            _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param");
            break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "invalid param");
        };

        const ts_one_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_one_str } }) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp");
        };
        const ts_two_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_two_str } }) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp");
        };
        // NOTE: For TARGETS, we don't have an internal limit
        const limit_int: u16 = std.fmt.parseUnsigned(u16, limit, 10) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        };

        // Normalize the (unordered) pair into a from..to range.
        const from = if (ts_one_inst.timestamp < ts_two_inst.timestamp) ts_one_inst else ts_two_inst;
        const to = if (ts_one_inst.timestamp > ts_two_inst.timestamp) ts_one_inst else ts_two_inst;

        const req: ChatHistory.TargetsRequest = .{
            .from = .{ .milliseconds = @intCast(from.milliTimestamp()) },
            .to = .{ .milliseconds = @intCast(to.milliTimestamp()) },
            .limit = limit_int,
        };
        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if the dupe or spawn fails; on success the
        // worker task takes ownership.
        errdefer arena.deinit();
        const nick = try arena.allocator().dupe(u8, user.nick);
        // Spawn a db query
        try self.thread_pool.spawn(
            db.chathistoryTargets,
            .{ arena, self.db_pool, &self.wakeup_queue, conn.client, nick, req },
        );
        return;
    }

    if (std.ascii.eqlIgnoreCase("AFTER", subcmd)) {
        const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        if (target.len == 0) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target");
        }

        const ts = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        const ts_str = blk: {
            var ts_iter = std.mem.splitScalar(u8, ts, '=');
            _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no timestamp param");
            break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no '=' separator");
        };
        const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp");
        };

        const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd);

        const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        };
        if (limit_int > max_chathistory) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        }

        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if the dupe or spawn fails.
        errdefer arena.deinit();
        const req: ChatHistory.AfterRequest = .{
            .after_ms = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) },
            .limit = limit_int,
            .target = try arena.allocator().dupe(u8, target),
        };
        try self.thread_pool.spawn(
            db.chathistoryAfter,
            .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req },
        );
        return;
    }

    if (std.ascii.eqlIgnoreCase("BEFORE", subcmd)) {
        const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        if (target.len == 0) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target");
        }

        const ts = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        const ts_str = blk: {
            var ts_iter = std.mem.splitScalar(u8, ts, '=');
            _ = ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no timestamp param");
            break :blk ts_iter.next() orelse return self.fail(conn, cmd, "INVALID_PARAMS", "no '=' separator");
        };
        const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid timestamp");
        };

        const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd);

        const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        };
        if (limit_int > max_chathistory) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        }

        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if the dupe or spawn fails.
        errdefer arena.deinit();
        const req: ChatHistory.BeforeRequest = .{
            .conn = conn,
            .before_ms = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) },
            .limit = limit_int,
            .target = try arena.allocator().dupe(u8, target),
        };
        try self.thread_pool.spawn(
            db.chathistoryBefore,
            .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req },
        );
        return;
    }

    if (std.ascii.eqlIgnoreCase("LATEST", subcmd)) {
        const target = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        if (target.len == 0) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid target");
        }

        const restriction = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        // TODO: handle the restriction. This could be "*", or a timestamp, or a msgid
        _ = restriction;

        const limit_str = iter.next() orelse return self.errNeedMoreParams(conn, cmd);
        const limit_int: u16 = std.fmt.parseUnsigned(u16, limit_str, 10) catch {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        };
        if (limit_int > max_chathistory) {
            return self.fail(conn, cmd, "INVALID_PARAMS", "invalid limit");
        }

        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if the dupe or spawn fails.
        errdefer arena.deinit();
        const req: ChatHistory.LatestRequest = .{
            .conn = conn,
            .limit = limit_int,
            .target = try arena.allocator().dupe(u8, target),
        };
        try self.thread_pool.spawn(
            db.chathistoryLatest,
            .{ arena, self.db_pool, &self.wakeup_queue, conn.client, req },
        );
        return;
    }
}
1885
/// MARKREAD <target> [timestamp=<iso8601>] — persist a read marker for the
/// authenticated user; the db write happens on the thread pool and the
/// confirmation returns via the wakeup queue.
fn handleMarkread(self: *Server, conn: *Connection, msg: Message) Allocator.Error!void {
    const user = conn.user orelse return;
    var iter = msg.paramIterator();
    const target = iter.next() orelse
        return self.fail(conn, "MARKREAD", "NEED_MORE_PARAMS", "Missing parameters");
    // TODO: MARKREAD with no timestamp is a get request per the draft spec;
    // currently it is silently ignored.
    if (iter.next()) |timestamp| {
        const ts_str = blk: {
            var ts_iter = std.mem.splitScalar(u8, timestamp, '=');
            _ = ts_iter.next() orelse return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "no timestamp param");
            break :blk ts_iter.next() orelse return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "no '=' separator");
        };
        const ts_inst = zeit.instant(.{ .source = .{ .iso8601 = ts_str } }) catch {
            return self.fail(conn, "MARKREAD", "INVALID_PARAMS", "invalid timestamp");
        };

        const arena: HeapArena = try .init(self.gpa);
        // BUG FIX: free the arena if a dupe or the spawn below fails.
        errdefer arena.deinit();
        const target_duped = try arena.allocator().dupe(u8, target);
        // BUG FIX: copy the nick into the arena. Every other spawn in this
        // file dupes its strings; handing user-owned memory to the worker
        // risks a use-after-free if the user is freed on disconnect.
        const nick_duped = try arena.allocator().dupe(u8, user.nick);
        const ts_: Timestamp = .{ .milliseconds = @intCast(ts_inst.milliTimestamp()) };
        try self.thread_pool.spawn(db.setMarkRead, .{
            arena,
            self.db_pool,
            &self.wakeup_queue,
            conn.client,
            nick_duped,
            target_duped,
            ts_,
        });
    }
}
1915
/// Sends a "standard reply" FAIL
/// (draft/standard-replies): `FAIL <cmd> <code> :<description>`.
fn fail(
    self: *Server,
    conn: *Connection,
    cmd: []const u8,
    code: []const u8,
    description: []const u8,
) Allocator.Error!void {
    return self.standardReply(conn, "FAIL", cmd, code, description);
}
1926
/// Formats and queues a "standard reply" line of the given kind
/// (e.g. FAIL/WARN/NOTE): `:<server> <kind> <cmd> <code> :<description>`.
fn standardReply(
    self: *Server,
    conn: *Connection,
    kind: []const u8,
    cmd: []const u8,
    code: []const u8,
    description: []const u8,
) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} {s} {s} {s} :{s}\r\n",
        .{ self.hostname, kind, cmd, code, description },
    );
}
1941
/// 401 ERR_NOSUCHNICK — `nick_or_chan` does not exist.
fn errNoSuchNick(self: *Server, conn: *Connection, nick_or_chan: []const u8) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 401 {s} {s} :No such nick/channel\r\n",
        .{ self.hostname, conn.nickname(), nick_or_chan },
    );
}
1949
/// 403 ERR_NOSUCHCHANNEL — `chan` does not exist.
fn errNoSuchChannel(self: *Server, conn: *Connection, chan: []const u8) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 403 {s} {s} :No such channel\r\n",
        .{ self.hostname, conn.nickname(), chan },
    );
}
1957
/// 411 ERR_NORECIPIENT — a message command was sent without a target.
fn errNoRecipient(self: *Server, conn: *Connection) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 411 {s} :No recipient given\r\n",
        .{ self.hostname, conn.nickname() },
    );
}
1965
/// 412 ERR_NOTEXTTOSEND — a message command was sent without any text.
fn errNoTextToSend(self: *Server, conn: *Connection) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 412 {s} :No text to send\r\n",
        .{ self.hostname, conn.nickname() },
    );
}
1973
/// 417 ERR_INPUTTOOLONG — the client's line exceeded the allowed length.
fn errInputTooLong(self: *Server, conn: *Connection) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 417 {s} :Input too long\r\n",
        .{ self.hostname, conn.nickname() },
    );
}
1981
/// 461 ERR_NEEDMOREPARAMS — the client sent `cmd` with too few parameters.
fn errNeedMoreParams(self: *Server, conn: *Connection, cmd: []const u8) Allocator.Error!void {
    try conn.print(
        self.gpa,
        // BUG FIX: "paramaters" was misspelled in the user-facing message.
        ":{s} 461 {s} {s} :Not enough parameters\r\n",
        .{ self.hostname, conn.nickname(), cmd },
    );
}
1989
/// 421 ERR_UNKNOWNCOMMAND — `cmd` is not a command this server handles.
/// Also logged server-side for visibility.
fn errUnknownCommand(self: *Server, conn: *Connection, cmd: []const u8) Allocator.Error!void {
    log.err("unknown command: {s}", .{cmd});
    try conn.print(
        self.gpa,
        ":{s} 421 {s} {s} :Unknown command\r\n",
        .{ self.hostname, conn.nickname(), cmd },
    );
}
1998
/// 482 ERR_CHANOPRIVSNEEDED — the command requires channel operator status
/// on `channel`.
pub fn errChanOpPrivsNeeded(self: *Server, conn: *Connection, channel: []const u8) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 482 {s} {s} :You must be a channel operator to perform that command\r\n",
        .{ self.hostname, conn.nickname(), channel },
    );
}
2006
/// ERR_UNKNOWNERROR (400): catch-all failure reply carrying a free-form
/// description (`err`) for the command that failed (`cmd`).
fn errUnknownError(
    self: *Server,
    conn: *Connection,
    cmd: []const u8,
    err: []const u8,
) Allocator.Error!void {
    const reply = ":{s} 400 {s} {s} :{s}\r\n";
    try conn.print(self.gpa, reply, .{ self.hostname, conn.nickname(), cmd, err });
}
2019
/// ERR_SASLFAIL (904): SASL authentication failed; `msg` carries the reason.
/// Fixes the ungrammatical reply text ("SASL authenticated failed") to the
/// standard "SASL authentication failed" from the IRCv3 SASL specification.
fn errSaslFail(self: *Server, conn: *Connection, msg: []const u8) Allocator.Error!void {
    try conn.print(
        self.gpa,
        ":{s} 904 {s} :SASL authentication failed: {s}\r\n",
        .{ self.hostname, conn.nickname(), msg },
    );
}
2027
/// Registers the web front-end routes and starts the HTTP server. Blocks in
/// `listen` until the server shuts down. `port` is used for logging only; the
/// actual bind configuration lives on `self.httpz_server`.
fn webMain(
    self: *Server,
    port: u16,
) !void {
    const server = &self.httpz_server;
    var router = try server.router(.{});

    // Route table for the web UI and its event streams.
    router.get("/", Http.getIndex, .{});
    router.get("/assets/:type/:name", Http.getAsset, .{});
    router.get("/channels/:channel", Http.getChannel, .{});
    router.get("/channels/:channel/events", Http.startChannelEventStream, .{});
    router.get("/channels", Http.getChannels, .{});

    log.info("HTTP server listening on http://localhost:{d}", .{port});
    try server.listen();
}
2043
/// A single client TCP connection and its per-connection IRC state.
pub const Connection = struct {
    client: xev.TCP,

    read_buf: [512]u8,
    read_queue: std.ArrayListUnmanaged(u8),
    read_c: *xev.Completion,

    write_buf: std.ArrayListUnmanaged(u8),

    /// User will always be non-null for authenticated connections
    user: ?*User,

    caps: Capabilities,
    // Time the connection started
    connected_at: u32,

    fn init(self: *Connection, client: xev.TCP, completion: *xev.Completion) void {
        self.* = .{
            .client = client,
            .read_buf = undefined,
            .read_queue = .empty,
            .read_c = completion,
            .write_buf = .empty,
            .user = null,
            .caps = .{},
            .connected_at = @intCast(std.time.timestamp()),
        };
    }

    /// A connection is authenticated once a user has been attached to it.
    fn isAuthenticated(self: *Connection) bool {
        return self.user != null;
    }

    /// Nickname used when addressing this connection in replies: the user's
    /// nick once authenticated, otherwise the "*" placeholder.
    pub fn nickname(self: *Connection) []const u8 {
        const user = self.user orelse return "*";
        return user.nick;
    }

    fn deinit(self: *Connection, gpa: Allocator) void {
        self.read_queue.deinit(gpa);
        self.write_buf.deinit(gpa);
    }

    /// Queue raw bytes for delivery to this client.
    pub fn write(self: *Connection, gpa: Allocator, bytes: []const u8) Allocator.Error!void {
        try self.write_buf.appendSlice(gpa, bytes);
    }

    /// Queue a formatted message for delivery to this client.
    pub fn print(self: *Connection, gpa: Allocator, comptime fmt: []const u8, args: anytype) Allocator.Error!void {
        return self.write_buf.writer(gpa).print(fmt, args);
    }

    /// Record a capability the client negotiated via CAP REQ.
    fn enableCap(self: *Connection, cap: Capability) Allocator.Error!void {
        switch (cap) {
            // batch: TODO: do we care?
            // sasl: not tracked as a requested cap; we just respond to AUTHENTICATE
            .batch, .sasl => {},
            // Every other capability has a matching bool field on `caps`.
            inline else => |tag| @field(self.caps, @tagName(tag)) = true,
        }
    }
};
2113
2114// TODO: GC this. We need to move all used completions to the start, and then prune unused to some
2115// percentage
const MemoryPoolUnmanaged = struct {
    // Invariant: list.items.len == free_list.items.len; free_list[i] tells
    // whether list[i] is available for reuse.
    list: std.ArrayListUnmanaged(*xev.Completion),
    free_list: std.ArrayListUnmanaged(bool),

    const empty: MemoryPoolUnmanaged = .{ .list = .empty, .free_list = .empty };

    /// Returns a completion in the default (`.{}`) state. Reuses a previously
    /// destroyed completion when one is available, otherwise allocates a new
    /// one and tracks it in `list`.
    fn create(self: *MemoryPoolUnmanaged, gpa: Allocator) Allocator.Error!*xev.Completion {
        // Reuse the first free slot, if any.
        if (std.mem.indexOfScalar(bool, self.free_list.items, true)) |i| {
            self.free_list.items[i] = false;
            const c = self.list.items[i];
            // Reset recycled completions so they start from the same clean
            // state as freshly allocated ones; previously stale completion
            // state leaked through on reuse.
            c.* = .{};
            return c;
        }
        // Otherwise allocate and track a new completion. The errdefers keep
        // the pool consistent (and leak-free) if either append fails:
        // previously a failed append leaked `c` or left `list` and
        // `free_list` with mismatched lengths.
        const c = try gpa.create(xev.Completion);
        errdefer gpa.destroy(c);
        c.* = .{};
        try self.list.append(gpa, c);
        errdefer _ = self.list.pop();
        try self.free_list.append(gpa, false);
        return c;
    }

    /// Marks `item` as free for reuse. `item` must have been returned by
    /// `create` on this pool; anything else is a programming error.
    fn destroy(self: *MemoryPoolUnmanaged, item: *xev.Completion) void {
        const i = std.mem.indexOfScalar(*xev.Completion, self.list.items, item) orelse unreachable;
        self.free_list.items[i] = true;
    }

    /// Frees every tracked completion and the bookkeeping arrays.
    fn deinit(self: *MemoryPoolUnmanaged, gpa: Allocator) void {
        for (self.list.items) |node| {
            gpa.destroy(node);
        }
        self.list.deinit(gpa);
        self.free_list.deinit(gpa);
    }
};
2156
2157/// Reads one line from the stream. If the command does not match, we fail the test
/// Reads one line from the stream and fails the test unless it equals
/// `response` (ignoring the trailing CRLF).
fn expectResponse(stream: std.net.Stream, response: []const u8) !void {
    var line_buf: [512]u8 = undefined;
    const line = try stream.reader().readUntilDelimiter(&line_buf, '\n');
    const trimmed = std.mem.trimRight(u8, line, "\r\n");
    try std.testing.expectEqualStrings(response, trimmed);
}
2163
/// Test fixture: runs a Server on its own thread and manages client-side
/// TCP streams that connect to it.
const TestServer = struct {
    server: Server,
    // Run flag polled by the server thread; set to false to request shutdown.
    cond: std.atomic.Value(bool),
    thread: std.Thread,

    // Client-side streams opened by createConnections; closed in deinit.
    streams: std.ArrayListUnmanaged(std.net.Stream),

    /// Initializes the server (in-memory DB, no auth, no HTTP) and starts its
    /// run loop on a background thread. Returns once the loop has signaled
    /// startup via the wait group.
    fn init(self: *TestServer, gpa: Allocator) !void {
        self.* = .{
            .server = undefined,
            .cond = .init(true),
            .thread = undefined,
            .streams = .empty,
        };
        try self.server.init(gpa, .{
            .hostname = "localhost",
            // Port 0: let the OS pick a free port; see port() below.
            .irc_port = 0,
            .http_port = null,
            .auth = .none,
            .db_path = ":memory:",
        });
        // start() before spawn so wait() blocks until runUntil finishes it —
        // presumably runUntil calls wg.finish() once listening; confirm there.
        var wg: std.Thread.WaitGroup = .{};
        wg.start();
        self.thread = try std.Thread.spawn(.{}, Server.runUntil, .{ &self.server, &self.cond, &wg });
        wg.wait();
    }

    /// Stops the server thread, then tears down server state and client
    /// streams. Must be called after init succeeded.
    fn deinit(self: *TestServer) void {
        // Request shutdown; the server thread observes this via `cond`.
        self.cond.store(false, .unordered);

        // Wake the event loop so it notices the flag. If the wakeup fails we
        // cannot safely join (the loop may never exit), so detach instead.
        if (self.server.wakeup.notify()) {
            self.thread.join();
        } else |err| {
            log.err("Failed to notify wakeup: {}", .{err});
            self.thread.detach();
        }
        self.server.deinit();
        for (self.streams.items) |stream| {
            stream.close();
        }
        self.streams.deinit(std.testing.allocator);
        self.* = undefined;
    }

    /// The OS-assigned port the server is actually listening on.
    fn port(self: *TestServer) u16 {
        return self.server.address.getPort();
    }

    /// Opens `n` client connections and waits (up to ~1s) for the server to
    /// register all of them. Returns the server-side Connection pointers.
    fn createConnections(self: *TestServer, n: usize) ![]*Connection {
        try self.streams.ensureUnusedCapacity(std.testing.allocator, n);
        for (0..n) |_| {
            const stream = try std.net.tcpConnectToHost(
                std.testing.allocator,
                "localhost",
                self.port(),
            );
            self.streams.appendAssumeCapacity(stream);
        }

        // Sleep for up to 1 second to wait for all the connections
        for (0..1_000) |_| {
            if (self.server.connections.count() == n) break;
            std.time.sleep(1 * std.time.ns_per_ms);
        } else return error.Timeout;

        return self.server.connections.values();
    }
};
2233
test "Message: CAP" {
    var srv: TestServer = undefined;
    try srv.init(std.testing.allocator);
    defer srv.deinit();

    const conn = (try srv.createConnections(1))[0];

    // Happy path: CAP LS advertises our capability list.
    try srv.server.handleMessage(conn, .init("CAP LS 302"));
    try std.testing.expectStringStartsWith(conn.write_buf.items, ":localhost CAP * LS");
    try std.testing.expectStringEndsWith(conn.write_buf.items, "\r\n");
    conn.write_buf.clearRetainingCapacity();

    // A known capability request is ACKed.
    try srv.server.handleMessage(conn, .init("CAP REQ sasl"));
    try std.testing.expectStringStartsWith(conn.write_buf.items, ":localhost CAP * ACK sasl\r\n");
    conn.write_buf.clearRetainingCapacity();

    // An unknown capability request is NAKed.
    try srv.server.handleMessage(conn, .init("CAP REQ foo"));
    try std.testing.expectStringStartsWith(conn.write_buf.items, ":localhost CAP * NAK foo\r\n");
    conn.write_buf.clearRetainingCapacity();

    // CAP with no subcommand: ERR_NEEDMOREPARAMS (461).
    try srv.server.handleMessage(conn, .init("CAP"));
    try std.testing.expectStringStartsWith(conn.write_buf.items, ":localhost 461 * CAP");
    try std.testing.expectStringEndsWith(conn.write_buf.items, "\r\n");
    conn.write_buf.clearRetainingCapacity();

    // CAP with an invalid subcommand: numeric 410.
    try srv.server.handleMessage(conn, .init("CAP foo"));
    try std.testing.expectStringStartsWith(conn.write_buf.items, ":localhost 410 * foo");
    try std.testing.expectStringEndsWith(conn.write_buf.items, "\r\n");
    conn.write_buf.clearRetainingCapacity();
}