Monorepo for Aesthetic.Computer
aesthetic.computer
1// Chat Manager, 25.11.28
2// Multi-instance chat support for session-server
3// Adapted from nanos/chat.mjs to run multiple chat instances in one process
4
5import { WebSocket } from "ws";
6import { promises as fs } from "fs";
7import fetch from "node-fetch";
8import https from "https";
9
10import { filter } from "./filter.mjs";
11import { redact, unredact } from "./redact.mjs";
12import { ensureIndexes as ensureHeartsIndexes, toggleHeart, countHearts } from "./hearts.mjs";
13
14import { MongoClient, ObjectId } from "mongodb";
15import { getMessaging } from "firebase-admin/messaging";
16
17const MAX_MESSAGES = 500;
18
19// Chat instance configurations
20export const chatInstances = {
21 "chat-system.aesthetic.computer": {
22 name: "chat-system",
23 allowedHost: "chat-system.aesthetic.computer",
24 userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
25 topic: "mood",
26 },
27 "chat.sotce.net": {
28 name: "chat-sotce",
29 allowedHost: "chat.sotce.net",
30 userInfoEndpoint: "https://sotce.us.auth0.com/userinfo",
31 topic: "mood",
32 },
33 "chat-clock.aesthetic.computer": {
34 name: "chat-clock",
35 allowedHost: "chat-clock.aesthetic.computer",
36 userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
37 topic: "mood",
38 },
39};
40
41// Development host mappings (localhost ports)
42const devHostMappings = {
43 "localhost:8083": "chat-system.aesthetic.computer",
44 "localhost:8084": "chat.sotce.net",
45 "localhost:8085": "chat-clock.aesthetic.computer",
46};
47
48export class ChatManager {
49 constructor(options = {}) {
50 this.dev = options.dev || false;
51 this.filterDebug = options.filterDebug || false;
52 this.loggerKey = options.loggerKey || process.env.LOGGER_KEY;
53 this.activityEmitter = options.activityEmitter || null;
54
55 // MongoDB
56 this.mongoClient = null;
57 this.db = null;
58 this.mongoConnectionString = options.mongoConnectionString || process.env.MONGODB_CONNECTION_STRING;
59 this.mongoDbName = options.mongoDbName || process.env.MONGODB_NAME || "aesthetic";
60
61 // HTTPS agent for dev mode
62 this.agent = this.dev ? new https.Agent({ rejectUnauthorized: false }) : null;
63
64 // Per-instance state
65 this.instances = {};
66 for (const [host, config] of Object.entries(chatInstances)) {
67 this.instances[host] = {
68 config,
69 messages: [],
70 connections: {},
71 connectionId: 0,
72 authorizedConnections: {},
73 subsToHandles: {},
74 subsToSubscribers: {},
75 };
76 }
77
78 console.log("💬 ChatManager initialized with instances:", Object.keys(chatInstances).join(", "));
79 }
80
81 setActivityEmitter(emitter) {
82 this.activityEmitter = typeof emitter === "function" ? emitter : null;
83 }
84
85 emitActivity(payload) {
86 if (!this.activityEmitter) return;
87 try {
88 this.activityEmitter(payload);
89 } catch (err) {
90 console.error("💬 Activity emitter failure:", err);
91 }
92 }
93
94 async init() {
95 // Connect to MongoDB
96 if (this.mongoConnectionString) {
97 try {
98 console.log("💬 Connecting to MongoDB...");
99 this.mongoClient = new MongoClient(this.mongoConnectionString);
100 await this.mongoClient.connect();
101 this.db = this.mongoClient.db(this.mongoDbName);
102 console.log("💬 MongoDB connected!");
103
104 // Ensure hearts indexes exist
105 await ensureHeartsIndexes(this.db);
106
107 // Load messages for each instance
108 for (const [host, instance] of Object.entries(this.instances)) {
109 await this.loadMessages(instance);
110 }
111 } catch (err) {
112 console.error("💬 MongoDB connection failed:", err);
113 }
114 } else {
115 console.log("💬 No MongoDB connection string, running without persistence");
116 }
117 }
118
119 async loadMessages(instance) {
120 if (!this.db) return;
121
122 const collectionName = instance.config.name;
123 console.log(`💬 Loading messages for ${collectionName}...`);
124
125 try {
126 const chatCollection = this.db.collection(collectionName);
127 let combinedMessages;
128
129 if (collectionName === "chat-sotce") {
130 combinedMessages = (await chatCollection
131 .find({})
132 .sort({ when: -1 })
133 .limit(MAX_MESSAGES)
134 .toArray()).reverse();
135 } else if (collectionName !== "chat-system") {
136 combinedMessages = (await chatCollection
137 .find({})
138 .sort({ when: -1 })
139 .limit(MAX_MESSAGES)
140 .toArray()).reverse();
141 } else {
142 // chat-system includes logs
143 combinedMessages = (
144 await chatCollection
145 .aggregate([
146 {
147 $unionWith: {
148 coll: "logs",
149 pipeline: [{ $match: {} }],
150 },
151 },
152 { $sort: { when: -1 } },
153 { $limit: MAX_MESSAGES },
154 ])
155 .toArray()
156 ).reverse();
157 }
158
159 // Deduplicate messages (removes duplicates caused by persist-on-shutdown bug)
160 {
161 const seen = new Set();
162 combinedMessages = combinedMessages.filter((msg) => {
163 const key = `${msg.user || msg.from}:${msg.text}:${msg.when?.getTime?.() ?? msg.when}`;
164 if (seen.has(key)) return false;
165 seen.add(key);
166 return true;
167 });
168 }
169
170 // Check mutes for chat-system
171 if (collectionName === "chat-system") {
172 for (const msg of combinedMessages) {
173 if (await this.isMuted(instance, msg.user)) {
174 redact(msg);
175 }
176 }
177 }
178
179 instance.messages = [];
180 for (const message of combinedMessages) {
181 let from;
182 if (message.user) {
183 from = await this.getHandleFromSub(instance, message.user);
184 } else {
185 from = message.from || "deleted";
186 }
187
188 const msg = {
189 from,
190 text: message.deleted
191 ? "[deleted]"
192 : (instance.config.name === "chat-clock" ? message.text : filter(message.text, this.filterDebug)) || "message forgotten",
193 redactedText: message.redactedText,
194 when: message.when,
195 sub: message.user || undefined,
196 font: message.font || "font_1", // 🔤 Include font from DB (default for old messages)
197 };
198 if (message._id) msg.id = message._id.toString();
199 if (message.deleted) msg.deleted = true;
200 instance.messages.push(msg);
201 }
202
203 console.log(`💬 Loaded ${instance.messages.length} messages for ${collectionName}`);
204 } catch (err) {
205 console.error(`💬 Failed to load messages for ${collectionName}:`, err);
206 }
207 }
208
209 // Check if a host should be handled by chat manager
210 isChatHost(host) {
211 // Direct match
212 if (chatInstances[host]) return true;
213 // Dev mapping
214 if (this.dev && devHostMappings[host]) return true;
215 return false;
216 }
217
218 // Get the instance for a host
219 getInstance(host) {
220 // Direct match
221 if (this.instances[host]) return this.instances[host];
222 // Dev mapping
223 if (this.dev && devHostMappings[host]) {
224 return this.instances[devHostMappings[host]];
225 }
226 return null;
227 }
228
229 // Handle a new WebSocket connection
230 async handleConnection(ws, req) {
231 const host = req.headers.host;
232 const instance = this.getInstance(host);
233
234 if (!instance) {
235 console.log("💬 Unknown chat host:", host);
236 ws.close(1008, "Unknown host");
237 return;
238 }
239
240 // Validate host in production
241 if (!this.dev && host !== instance.config.allowedHost) {
242 ws.close(1008, "Policy violation");
243 return;
244 }
245
246 const id = instance.connectionId++;
247 instance.connections[id] = ws;
248
249 const ip = req.socket.remoteAddress || "localhost";
250 ws.isAlive = true;
251
252 ws.on("pong", () => {
253 ws.isAlive = true;
254 });
255
256 console.log(
257 `💬 [${instance.config.name}] Connection ${id} from ${ip}, online: ${Object.keys(instance.connections).length}`
258 );
259
260 // Set up ping interval for this connection
261 const pingInterval = setInterval(() => {
262 if (ws.isAlive === false) {
263 clearInterval(pingInterval);
264 return ws.terminate();
265 }
266 ws.isAlive = false;
267 ws.ping();
268 }, 15000);
269
270 ws.on("message", async (data) => {
271 await this.handleMessage(instance, ws, id, data);
272 });
273
274 ws.on("close", () => {
275 clearInterval(pingInterval);
276 delete instance.connections[id];
277 delete instance.authorizedConnections[id];
278
279 console.log(
280 `💬 [${instance.config.name}] Connection ${id} closed, online: ${Object.keys(instance.connections).length}`
281 );
282
283 // Broadcast updated online handles after removing this connection
284 this.broadcastOnlineHandles(instance);
285
286 this.broadcast(instance, this.pack("left", {
287 chatters: Object.keys(instance.connections).length,
288 handles: this.getOnlineHandles(instance)
289 }, id));
290 });
291
292 // Load heart counts for current message window
293 let heartCounts = {};
294 if (this.db) {
295 const messageIds = instance.messages.filter((m) => m.id).map((m) => m.id);
296 heartCounts = await countHearts(this.db, instance.config.name, messageIds);
297 }
298
299 // Send welcome message
300 ws.send(
301 this.pack(
302 "connected",
303 {
304 message: `Joined \`${instance.config.name}\` • 🧑🤝🧑 ${Object.keys(instance.connections).length}`,
305 chatters: Object.keys(instance.connections).length,
306 handles: this.getOnlineHandles(instance),
307 messages: instance.messages,
308 heartCounts,
309 id,
310 },
311 id,
312 ),
313 );
314
315 // Notify others
316 this.broadcastOthers(instance, ws, this.pack(
317 "joined",
318 {
319 text: `${id} has joined. Connections open: ${Object.keys(instance.connections).length}`,
320 chatters: Object.keys(instance.connections).length,
321 handles: this.getOnlineHandles(instance),
322 },
323 id,
324 ));
325 }
326
327 async handleMessage(instance, ws, id, data) {
328 let msg;
329 try {
330 msg = JSON.parse(data.toString());
331 } catch (err) {
332 console.log("💬 Failed to parse message:", err);
333 return;
334 }
335
336 msg.id = id;
337
338 if (msg.type === "logout") {
339 console.log(`💬 [${instance.config.name}] User logged out`);
340 delete instance.authorizedConnections[id];
341 } else if (msg.type === "chat:message") {
342 await this.handleChatMessage(instance, ws, id, msg);
343 } else if (msg.type === "chat:delete") {
344 await this.handleDeleteMessage(instance, ws, id, msg);
345 } else if (msg.type === "chat:heart") {
346 await this.handleChatHeart(instance, ws, id, msg);
347 }
348 }
349
350 async handleChatMessage(instance, ws, id, msg) {
351 console.log(
352 `💬 [${instance.config.name}] Message from ${msg.content.sub} (${msg.content.text.length} chars)`
353 );
354
355 // Mute check
356 if (await this.isMuted(instance, msg.content.sub)) {
357 ws.send(this.pack("muted", { message: "Your user has been muted." }));
358 return;
359 }
360
361 // Length limit
362 const len = 128;
363 if (msg.content.text.length > len) {
364 ws.send(this.pack("too-long", { message: `Please limit to ${len} characters.` }));
365 return;
366 }
367
368 // Authorization (Auth0 token or AC device token)
369 let authorized;
370 if (instance.authorizedConnections[id]?.token === msg.content.token) {
371 authorized = instance.authorizedConnections[id].user;
372 console.log("💬 Pre-authorized");
373 } else {
374 console.log("💬 Authorizing...", "handle:", msg.content.handle, "token:", msg.content.token?.slice(0, 10) + "...", "content-type:", typeof msg.content);
375 authorized = await this.authorize(instance, msg.content.token);
376 // Fallback: AC device token (from ac-native device-token API)
377 if (!authorized && msg.content.handle && msg.content.token) {
378 console.log("💬 Trying device token auth for @" + msg.content.handle);
379 authorized = await this.authorizeDeviceToken(instance, msg.content.handle, msg.content.token);
380 }
381 if (authorized) {
382 instance.authorizedConnections[id] = {
383 token: msg.content.token,
384 user: authorized,
385 };
386 }
387 }
388
389 // Get handle
390 let handle, subscribed;
391 if (authorized) {
392 handle = await this.getHandleFromSub(instance, authorized.sub);
393
394 // Store handle in authorizedConnections for online list
395 instance.authorizedConnections[id].handle = handle;
396
397 // Broadcast updated online handles to all clients
398 this.broadcastOnlineHandles(instance);
399
400 // Subscription check for sotce
401 if (instance.config.name === "chat-sotce") {
402 subscribed = await this.checkSubscription(instance, authorized.sub, msg.content.token);
403 } else {
404 subscribed = true;
405 }
406 }
407
408 if (!authorized || !handle || !subscribed) {
409 console.error("💬 Unauthorized:", { authorized: !!authorized, handle, subscribed });
410 ws.send(this.pack("unauthorized", { message: "Please login and/or subscribe." }, id));
411 return;
412 }
413
414 // Process and store message
415 try {
416 const message = msg.content;
417 const fromSub = message.sub;
418 let filteredText;
419 const userIsMuted = await this.isMuted(instance, fromSub);
420
421 if (userIsMuted) {
422 redact(message);
423 filteredText = message.text;
424 } else {
425 filteredText = instance.config.name === "chat-clock" ? message.text : filter(message.text, this.filterDebug);
426 }
427
428 // Get server time
429 let when = new Date();
430 if (!this.dev) {
431 try {
432 const clockResponse = await fetch("https://aesthetic.computer/api/clock");
433 if (clockResponse.ok) {
434 const serverTimeISO = await clockResponse.text();
435 when = new Date(serverTimeISO);
436 }
437 } catch (err) {
438 console.log("💬 Clock fetch failed, using local time");
439 }
440 }
441
442 // Store in MongoDB (production only, non-muted users)
443 let insertedId;
444 if (!this.dev && !userIsMuted && this.db) {
445 const dbmsg = {
446 user: message.sub,
447 text: message.text,
448 when,
449 font: message.font || "font_1", // 🔤 Store user's font preference
450 };
451 const collection = this.db.collection(instance.config.name);
452 await collection.createIndex({ when: 1 });
453 const result = await collection.insertOne(dbmsg);
454 insertedId = result.insertedId?.toString();
455 console.log("💬 Message stored");
456 }
457
458 const out = {
459 from: handle,
460 text: filteredText,
461 redactedText: message.redactedText,
462 when,
463 sub: fromSub,
464 font: message.font || "font_1", // 🔤 Include font in broadcast
465 };
466 if (insertedId) out.id = insertedId;
467
468 // Duplicate detection
469 const lastMsg = instance.messages[instance.messages.length - 1];
470 if (lastMsg && lastMsg.sub === out.sub && lastMsg.text === out.text && !lastMsg.count) {
471 lastMsg.count = (lastMsg.count || 1) + 1;
472 this.broadcast(instance, this.pack("message:update", { index: instance.messages.length - 1, count: lastMsg.count }));
473 } else {
474 instance.messages.push(out);
475 if (instance.messages.length > MAX_MESSAGES) instance.messages.shift();
476 this.broadcast(instance, this.pack("message", out));
477 }
478
479 if (handle && handle.startsWith("@")) {
480 const compactText = `${filteredText || ""}`.replace(/\s+/g, " ").trim();
481 const preview =
482 compactText.length > 80 ? `${compactText.slice(0, 77)}...` : compactText;
483
484 this.emitActivity({
485 handle,
486 event: {
487 type: "chat",
488 when: when?.getTime?.() || Date.now(),
489 label: `Chat ${instance.config.name.replace("chat-", "")}: ${preview}`,
490 piece: instance.config.name,
491 ref: insertedId || null,
492 text: compactText,
493 },
494 countsDelta: { chats: 1 },
495 });
496 }
497
498 // Push notification (production only, non-muted, chat-system and chat-clock only)
499 // Note: chat-sotce is intentionally excluded from push notifications
500 if (!this.dev && !userIsMuted && instance.config.name !== "chat-sotce") {
501 this.notify(instance, handle, filteredText, when);
502 }
503 } catch (err) {
504 console.error("💬 Message handling error:", err);
505 }
506 }
507
508 async handleDeleteMessage(instance, ws, id, msg) {
509 const { token, sub, id: messageId } = msg.content;
510 console.log(
511 `💬 [${instance.config.name}] Delete request from ${sub} for message ${messageId}`
512 );
513
514 if (!messageId) {
515 ws.send(this.pack("error", { message: "Missing message id." }));
516 return;
517 }
518
519 // Authorize the user
520 let authorized;
521 if (instance.authorizedConnections[id]?.token === token) {
522 authorized = instance.authorizedConnections[id].user;
523 } else {
524 authorized = await this.authorize(instance, token);
525 if (authorized) {
526 instance.authorizedConnections[id] = { token, user: authorized };
527 }
528 }
529
530 if (!authorized || authorized.sub !== sub) {
531 ws.send(this.pack("unauthorized", { message: "Please login." }, id));
532 return;
533 }
534
535 // Find the message in memory and verify ownership
536 const message = instance.messages.find((m) => m.id === messageId);
537 if (!message) {
538 ws.send(this.pack("error", { message: "Message not found." }));
539 return;
540 }
541 if (message.sub !== sub) {
542 ws.send(this.pack("error", { message: "You can only delete your own messages." }));
543 return;
544 }
545
546 // Already deleted
547 if (message.deleted) return;
548
549 // Soft-delete in MongoDB
550 if (this.db) {
551 try {
552 const collection = this.db.collection(instance.config.name);
553 await collection.updateOne(
554 { _id: new ObjectId(messageId) },
555 { $set: { deleted: true } }
556 );
557 console.log("💬 Message soft-deleted in DB");
558 } catch (err) {
559 console.error("💬 Failed to delete message in DB:", err);
560 }
561 }
562
563 // Update in-memory
564 message.text = "[deleted]";
565 message.deleted = true;
566 delete message.redactedText;
567
568 // Broadcast deletion to all clients
569 this.broadcast(instance, this.pack("message:delete", { id: messageId }));
570 }
571
572 async handleChatHeart(instance, ws, id, msg) {
573 const { for: forId, token } = msg.content || {};
574 if (!forId || !token) return;
575
576 // Reuse cached auth or re-authorize
577 let authorized;
578 if (instance.authorizedConnections[id]?.token === token) {
579 authorized = instance.authorizedConnections[id].user;
580 } else {
581 authorized = await this.authorize(instance, token);
582 if (authorized) {
583 instance.authorizedConnections[id] = { token, user: authorized };
584 }
585 }
586
587 if (!authorized) {
588 ws.send(this.pack("unauthorized", { message: "Please login." }, id));
589 return;
590 }
591
592 if (!this.db) return;
593
594 try {
595 const { hearted, count } = await toggleHeart(this.db, {
596 user: authorized.sub,
597 type: instance.config.name,
598 for: forId,
599 });
600 console.log(`💬 [${instance.config.name}] Heart ${hearted ? "+" : "-"} on ${forId} → ${count}`);
601 this.broadcast(instance, this.pack("message:hearts", { for: forId, count }));
602 } catch (err) {
603 console.error("💬 Heart error:", err);
604 }
605 }
606
607 async authorizeDeviceToken(instance, handle, token) {
608 // AC device tokens are "hmac.timestamp" — validate by looking up handle in MongoDB
609 if (!token || !token.includes(".") || !handle) return undefined;
610 try {
611 const cleanHandle = handle.replace("@", "");
612 const doc = await this.db.collection("@handles").findOne({ handle: cleanHandle });
613 if (doc && doc._id) {
614 console.log("💬 Device token authorized for @" + cleanHandle + " (sub: " + doc._id + ")");
615 return { sub: doc._id };
616 }
617 console.log("💬 Device token: handle @" + cleanHandle + " not found in DB");
618 return undefined;
619 } catch (err) {
620 console.error("💬 Device token auth error:", err);
621 return undefined;
622 }
623 }
624
625 async authorize(instance, token) {
626 try {
627 const response = await fetch(instance.config.userInfoEndpoint, {
628 headers: {
629 Authorization: "Bearer " + token,
630 "Content-Type": "application/json",
631 },
632 });
633
634 if (response.status === 200) {
635 return response.json();
636 }
637 return undefined;
638 } catch (err) {
639 console.error("💬 Authorization error:", err);
640 return undefined;
641 }
642 }
643
644 async getHandleFromSub(instance, fromSub) {
645 if (instance.subsToHandles[fromSub]) {
646 return "@" + instance.subsToHandles[fromSub];
647 }
648
649 try {
650 let prefix = instance.config.name === "chat-sotce" ? "sotce-" : "";
651 let host = this.dev ? "https://localhost:8888" :
652 (instance.config.name === "chat-sotce" ? "https://sotce.net" : "https://aesthetic.computer");
653
654 const options = {};
655 if (this.dev) options.agent = this.agent;
656
657 const response = await fetch(`${host}/handle?for=${prefix}${fromSub}`, options);
658 if (response.status === 200) {
659 const data = await response.json();
660 instance.subsToHandles[fromSub] = data.handle;
661 return "@" + data.handle;
662 }
663 } catch (err) {
664 console.error("💬 Handle lookup error:", err);
665 }
666
667 return "@unknown";
668 }
669
670 async checkSubscription(instance, sub, token) {
671 if (instance.subsToSubscribers[sub] !== undefined) {
672 return instance.subsToSubscribers[sub];
673 }
674
675 const host = this.dev ? "https://localhost:8888" : "https://sotce.net";
676 const options = {
677 method: "POST",
678 body: JSON.stringify({ retrieve: "subscription" }),
679 headers: {
680 Authorization: "Bearer " + token,
681 "Content-Type": "application/json",
682 },
683 };
684
685 if (this.dev) options.agent = this.agent;
686
687 try {
688 const response = await fetch(`${host}/sotce-net/subscribed`, options);
689 if (response.status === 200) {
690 const data = await response.json();
691 instance.subsToSubscribers[sub] = data.subscribed;
692 return data.subscribed;
693 }
694 } catch (err) {
695 console.error("💬 Subscription check error:", err);
696 }
697
698 instance.subsToSubscribers[sub] = false;
699 return false;
700 }
701
702 async isMuted(instance, sub) {
703 if (!sub || !this.db) return false;
704 try {
705 const mutesCollection = this.db.collection(instance.config.name + "-mutes");
706 const mute = await mutesCollection.findOne({ user: sub });
707 return !!mute;
708 } catch (err) {
709 return false;
710 }
711 }
712
713 notify(instance, handle, text, when) {
714 let title = handle + " 💬";
715
716 if (instance.config.name === "chat-clock") {
717 const getClockEmoji = (date) => {
718 let hour = date.getHours() % 12 || 12;
719 const minutes = date.getMinutes();
720 const emojiCode = minutes < 30 ? (0x1f550 + hour - 1) : (0x1f55c + hour - 1);
721 return String.fromCodePoint(emojiCode);
722 };
723 title = handle + " " + getClockEmoji(when);
724 }
725
726 try {
727 getMessaging().send({
728 notification: { title, body: text },
729 apns: {
730 payload: {
731 aps: {
732 "mutable-content": 1,
733 "interruption-level": "time-sensitive",
734 priority: 10,
735 "content-available": 1,
736 },
737 },
738 headers: {
739 "apns-priority": "10",
740 "apns-push-type": "alert",
741 "apns-expiration": "0",
742 },
743 },
744 webpush: {
745 headers: {
746 Urgency: "high",
747 TTL: "0",
748 image: "https://aesthetic.computer/api/logo.png",
749 },
750 },
751 topic: instance.config.topic,
752 data: { piece: "chat" },
753 }).then((response) => {
754 console.log("💬 Notification sent:", response);
755 }).catch((err) => {
756 console.log("💬 Notification error:", err);
757 });
758 } catch (err) {
759 console.error("💬 Notification error:", err);
760 }
761 }
762
763 // Handle HTTP log endpoint
764 async handleLog(instance, body, authHeader) {
765 const token = authHeader?.split(" ")[1];
766 if (token !== this.loggerKey) {
767 return { status: 403, body: { status: "error", message: "Forbidden" } };
768 }
769
770 try {
771 const parsed = typeof body === "string" ? JSON.parse(body) : body;
772 console.log(`💬 [${instance.config.name}] Log received from: ${parsed.from || 'unknown'}`);
773
774 instance.messages.push(parsed);
775 if (instance.messages.length > MAX_MESSAGES) instance.messages.shift();
776
777 // Handle actions (mute/unmute, handle updates)
778 if (parsed.action) {
779 await this.handleLogAction(instance, parsed);
780 }
781
782 this.broadcast(instance, this.pack("message", parsed));
783
784 if (instance.config.name === "chat-system") {
785 this.notify(instance, "log 🪵", parsed.text, new Date());
786 }
787
788 return { status: 200, body: { status: "success", message: "Log received" } };
789 } catch (err) {
790 return { status: 400, body: { status: "error", message: "Malformed log JSON" } };
791 }
792 }
793
794 async handleLogAction(instance, parsed) {
795 let [object, behavior] = (parsed.action || "").split(":");
796 if (!behavior) {
797 behavior = object;
798 object = null;
799 }
800
801 if (object === "chat-system" && (behavior === "mute" || behavior === "unmute")) {
802 const user = parsed.users[0];
803 if (behavior === "mute") {
804 instance.messages.forEach((msg) => {
805 if (msg.sub === user) redact(msg);
806 });
807 } else {
808 instance.messages.forEach((msg) => {
809 if (msg.sub === user) unredact(msg);
810 });
811 }
812 this.broadcast(instance, this.pack(parsed.action, { user }));
813 }
814
815 if (object === "handle") {
816 if (behavior === "colors") {
817 // Broadcast handle color changes to all connected clients.
818 const data = JSON.parse(parsed.value);
819 this.broadcast(instance, this.pack("handle:colors", { user: parsed.users[0], handle: data.handle, colors: data.colors }));
820 } else {
821 instance.subsToHandles[parsed.users[0]] = parsed.value;
822
823 if (behavior === "update" || behavior === "strip") {
824 const from = behavior === "update" ? "@" + parsed.value : "nohandle";
825 instance.messages.forEach((msg) => {
826 if (msg.sub === parsed.users[0]) {
827 msg.from = from;
828 }
829 });
830 this.broadcast(instance, this.pack(parsed.action, { user: parsed.users[0], handle: from }));
831 }
832 }
833 }
834 }
835
836 pack(type, content, id) {
837 if (typeof content === "object") content = JSON.stringify(content);
838 return JSON.stringify({ type, content, id });
839 }
840
841 broadcast(instance, message) {
842 Object.values(instance.connections).forEach((c) => {
843 if (c?.readyState === WebSocket.OPEN) c.send(message);
844 });
845 }
846
847 broadcastOthers(instance, exclude, message) {
848 Object.values(instance.connections).forEach((c) => {
849 if (c !== exclude && c?.readyState === WebSocket.OPEN) c.send(message);
850 });
851 }
852
853 // Set the presence resolver function (called from session.mjs)
854 // This allows us to query which users are on a specific piece
855 setPresenceResolver(resolver) {
856 this.presenceResolver = resolver;
857 }
858
859 // Get list of online handles for an instance (all connected & authorized)
860 getOnlineHandles(instance) {
861 const handles = [];
862 for (const [id, auth] of Object.entries(instance.authorizedConnections)) {
863 if (auth.handle && instance.connections[id]) {
864 handles.push(auth.handle);
865 }
866 }
867 return [...new Set(handles)]; // Remove duplicates
868 }
869
870 // Get handles of users actually viewing the chat piece right now
871 getHereHandles(instance) {
872 if (!this.presenceResolver) return [];
873 // Get all users on "chat" piece from session server
874 const onChatPiece = this.presenceResolver("chat");
875 // Intersect with authorized handles for this chat instance
876 const onlineHandles = this.getOnlineHandles(instance);
877 return onChatPiece.filter(h => onlineHandles.includes(h));
878 }
879
880 // Broadcast presence data (online + here) to all clients
881 broadcastOnlineHandles(instance) {
882 const online = this.getOnlineHandles(instance);
883 const here = this.getHereHandles(instance);
884 // Send both for backwards compatibility and new "here" feature
885 this.broadcast(instance, this.pack("presence", {
886 handles: online, // backwards compat with old "online-handles"
887 online, // all authorized chat connections
888 here // only those actually on chat piece
889 }));
890 }
891
892 // Get status for a specific instance or all
893 getStatus(host = null) {
894 if (host) {
895 const instance = this.getInstance(host);
896 if (!instance) return null;
897 return {
898 name: instance.config.name,
899 connections: Object.keys(instance.connections).length,
900 messages: instance.messages.length,
901 };
902 }
903
904 return Object.entries(this.instances).map(([host, instance]) => ({
905 host,
906 name: instance.config.name,
907 connections: Object.keys(instance.connections).length,
908 messages: instance.messages.length,
909 }));
910 }
911
912 // Get recent messages for a specific instance (for dashboard)
913 getRecentMessages(host, count = 10) {
914 const instance = this.getInstance(host);
915 if (!instance) return [];
916
917 // Return the most recent messages, but don't expose sensitive data
918 return instance.messages.slice(-count).map(msg => ({
919 from: msg.from || 'unknown',
920 text: msg.text || '',
921 when: msg.when
922 })).reverse(); // Most recent first
923 }
924
925 // Persist any in-memory messages that weren't saved to MongoDB.
926 // Messages received while this.db was null (broken env) have a `sub` field
927 // but were never inserted. Messages loaded from DB on startup do not have `sub`.
928 async persistAllMessages() {
929 // Establish a connection if we don't have one
930 if (!this.db && this.mongoConnectionString) {
931 try {
932 console.log("💬 Connecting to MongoDB for message persistence...");
933 this.mongoClient = new MongoClient(this.mongoConnectionString);
934 await this.mongoClient.connect();
935 this.db = this.mongoClient.db(this.mongoDbName);
936 console.log("💬 MongoDB connected for persistence!");
937 } catch (err) {
938 console.error("💬 Failed to connect to MongoDB for persistence:", err);
939 return 0;
940 }
941 }
942
943 if (!this.db) {
944 console.error("💬 No MongoDB connection available, cannot persist messages");
945 return 0;
946 }
947
948 let totalPersisted = 0;
949
950 for (const [, instance] of Object.entries(this.instances)) {
951 const collectionName = instance.config.name;
952 // Only persist messages that were never saved to DB.
953 // DB-loaded and successfully-stored messages have an `id` (from MongoDB _id).
954 // Messages received while DB was unavailable have `sub` but no `id`.
955 const unpersisted = instance.messages.filter(msg => msg.sub && !msg.id);
956
957 if (unpersisted.length === 0) {
958 console.log(`💬 [${collectionName}] No unpersisted messages`);
959 continue;
960 }
961
962 try {
963 const collection = this.db.collection(collectionName);
964 const docs = unpersisted.map(msg => ({
965 user: msg.sub,
966 text: msg.text,
967 when: msg.when,
968 font: msg.font || "font_1",
969 }));
970
971 const result = await collection.insertMany(docs, { ordered: false });
972 totalPersisted += result.insertedCount;
973 console.log(`💬 [${collectionName}] Persisted ${result.insertedCount} messages`);
974 } catch (err) {
975 // insertMany with ordered:false continues on duplicate errors
976 if (err.insertedCount) totalPersisted += err.insertedCount;
977 console.error(`💬 [${collectionName}] Persistence error:`, err.message);
978 }
979 }
980
981 return totalPersisted;
982 }
983
984 async shutdown() {
985 console.log("💬 ChatManager shutting down...");
986 const count = await this.persistAllMessages();
987 console.log(`💬 Persisted ${count} total messages`);
988
989 if (this.mongoClient) {
990 await this.mongoClient.close();
991 console.log("💬 MongoDB connection closed");
992 }
993 }
994}