Monorepo for Aesthetic.Computer aesthetic.computer
at main 994 lines 32 kB view raw
// Chat Manager, 25.11.28
// Multi-instance chat support for session-server
// Adapted from nanos/chat.mjs to run multiple chat instances in one process

import { WebSocket } from "ws";
import { promises as fs } from "fs";
import { timingSafeEqual } from "crypto";
import fetch from "node-fetch";
import https from "https";

import { filter } from "./filter.mjs";
import { redact, unredact } from "./redact.mjs";
import { ensureIndexes as ensureHeartsIndexes, toggleHeart, countHearts } from "./hearts.mjs";

import { MongoClient, ObjectId } from "mongodb";
import { getMessaging } from "firebase-admin/messaging";

// Maximum number of messages kept in memory (and loaded from the DB) per instance.
const MAX_MESSAGES = 500;

// Maximum number of characters accepted in a single chat message.
const MAX_MESSAGE_LENGTH = 128;

// How often each socket is pinged to detect dead connections.
const PING_INTERVAL_MS = 15000;

// Chat instance configurations
export const chatInstances = {
  "chat-system.aesthetic.computer": {
    name: "chat-system",
    allowedHost: "chat-system.aesthetic.computer",
    userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
    topic: "mood",
  },
  "chat.sotce.net": {
    name: "chat-sotce",
    allowedHost: "chat.sotce.net",
    userInfoEndpoint: "https://sotce.us.auth0.com/userinfo",
    topic: "mood",
  },
  "chat-clock.aesthetic.computer": {
    name: "chat-clock",
    allowedHost: "chat-clock.aesthetic.computer",
    userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
    topic: "mood",
  },
};

// Development host mappings (localhost ports)
const devHostMappings = {
  "localhost:8083": "chat-system.aesthetic.computer",
  "localhost:8084": "chat.sotce.net",
  "localhost:8085": "chat-clock.aesthetic.computer",
};

/**
 * Constant-time comparison of two secrets.
 * Returns false unless both values are strings of identical byte length and content.
 */
function secretsMatch(given, expected) {
  if (typeof given !== "string" || typeof expected !== "string") return false;
  const givenBytes = Buffer.from(given);
  const expectedBytes = Buffer.from(expected);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return givenBytes.length === expectedBytes.length && timingSafeEqual(givenBytes, expectedBytes);
}

/**
 * Runs several independent chat rooms ("instances") inside one process.
 * Each instance is keyed by the host name it is served on and owns its own
 * message window, socket table, authorization cache and handle cache.
 */
export class ChatManager {
  /**
   * @param {object} [options]
   * @param {boolean} [options.dev] - Development mode (localhost host mappings, no DB writes, no push).
   * @param {boolean} [options.filterDebug] - Passed through to the text filter.
   * @param {string} [options.loggerKey] - Shared secret for the HTTP log endpoint (defaults to LOGGER_KEY).
   * @param {Function} [options.activityEmitter] - Callback invoked with activity payloads.
   * @param {string} [options.mongoConnectionString] - Defaults to MONGODB_CONNECTION_STRING.
   * @param {string} [options.mongoDbName] - Defaults to MONGODB_NAME, then "aesthetic".
   */
  constructor(options = {}) {
    this.dev = options.dev || false;
    this.filterDebug = options.filterDebug || false;
    this.loggerKey = options.loggerKey || process.env.LOGGER_KEY;
    this.activityEmitter = options.activityEmitter || null;
    this.presenceResolver = null;

    // MongoDB
    this.mongoClient = null;
    this.db = null;
    this.mongoConnectionString = options.mongoConnectionString || process.env.MONGODB_CONNECTION_STRING;
    this.mongoDbName = options.mongoDbName || process.env.MONGODB_NAME || "aesthetic";

    // Collections whose `when` index has already been ensured by this process.
    this.indexedCollections = new Set();

    // In-memory messages that SHOULD have been stored but could not be because
    // no database was available, mapped to their original (unfiltered) text.
    // Dev-mode messages are deliberately never added here.
    this.pendingPersistence = new WeakMap();

    // HTTPS agent for dev mode
    this.agent = this.dev ? new https.Agent({ rejectUnauthorized: false }) : null;

    // Per-instance state
    this.instances = {};
    for (const [host, config] of Object.entries(chatInstances)) {
      this.instances[host] = {
        config,
        messages: [],
        connections: {},
        connectionId: 0,
        authorizedConnections: {},
        subsToHandles: {},
        subsToSubscribers: {},
      };
    }

    console.log("💬 ChatManager initialized with instances:", Object.keys(chatInstances).join(", "));
  }

  /** Install (or clear, when not a function) the activity callback. */
  setActivityEmitter(emitter) {
    this.activityEmitter = typeof emitter === "function" ? emitter : null;
  }

  /** Forward an activity payload to the emitter; emitter failures are logged, never thrown. */
  emitActivity(payload) {
    if (!this.activityEmitter) return;
    try {
      this.activityEmitter(payload);
    } catch (err) {
      console.error("💬 Activity emitter failure:", err);
    }
  }

  /**
   * Connect to MongoDB (when configured) and load the message window of every instance.
   * Never throws: without a working database the manager runs without persistence.
   */
  async init() {
    if (!this.mongoConnectionString) {
      console.log("💬 No MongoDB connection string, running without persistence");
      return;
    }

    try {
      console.log("💬 Connecting to MongoDB...");
      this.mongoClient = new MongoClient(this.mongoConnectionString);
      await this.mongoClient.connect();
      this.db = this.mongoClient.db(this.mongoDbName);
      console.log("💬 MongoDB connected!");
    } catch (err) {
      console.error("💬 MongoDB connection failed:", err);
      // Drop the half-open client so a later persistAllMessages() starts clean.
      const failedClient = this.mongoClient;
      this.mongoClient = null;
      this.db = null;
      try {
        await failedClient?.close();
      } catch (closeErr) {
        console.error("💬 Failed to close MongoDB client:", closeErr);
      }
      return;
    }

    // An index failure must not prevent chat history from loading.
    try {
      await ensureHeartsIndexes(this.db);
    } catch (err) {
      console.error("💬 Failed to ensure hearts indexes:", err);
    }

    for (const instance of Object.values(this.instances)) {
      await this.loadMessages(instance);
    }
  }

  /**
   * Replace `instance.messages` with the newest MAX_MESSAGES documents from the
   * instance's collection (plus the "logs" collection for chat-system), oldest first.
   * Failures are logged and leave whatever was loaded so far.
   */
  async loadMessages(instance) {
    if (!this.db) return;

    const collectionName = instance.config.name;
    console.log(`💬 Loading messages for ${collectionName}...`);

    try {
      const chatCollection = this.db.collection(collectionName);

      const cursor =
        collectionName === "chat-system"
          ? // chat-system includes logs
            chatCollection.aggregate([
              {
                $unionWith: {
                  coll: "logs",
                  pipeline: [{ $match: {} }],
                },
              },
              { $sort: { when: -1 } },
              { $limit: MAX_MESSAGES },
            ])
          : chatCollection.find({}).sort({ when: -1 }).limit(MAX_MESSAGES);

      // Newest-first from the DB; flip so the window reads oldest-first.
      let combinedMessages = (await cursor.toArray()).reverse();

      // Deduplicate messages (removes duplicates caused by persist-on-shutdown bug)
      const seen = new Set();
      combinedMessages = combinedMessages.filter((msg) => {
        const key = `${msg.user || msg.from}:${msg.text}:${msg.when?.getTime?.() ?? msg.when}`;
        if (seen.has(key)) return false;
        seen.add(key);
        return true;
      });

      // Check mutes for chat-system (one lookup per distinct user, not per message)
      if (collectionName === "chat-system") {
        const muteCache = new Map();
        for (const msg of combinedMessages) {
          if (!msg.user) continue;
          if (!muteCache.has(msg.user)) {
            muteCache.set(msg.user, await this.isMuted(instance, msg.user));
          }
          if (muteCache.get(msg.user)) redact(msg);
        }
      }

      instance.messages = [];
      for (const message of combinedMessages) {
        const from = message.user
          ? await this.getHandleFromSub(instance, message.user)
          : message.from || "deleted";

        let text;
        if (message.deleted) {
          text = "[deleted]";
        } else {
          // chat-clock text is shown unfiltered.
          const shown =
            instance.config.name === "chat-clock" ? message.text : filter(message.text, this.filterDebug);
          text = shown || "message forgotten";
        }

        const msg = {
          from,
          text,
          redactedText: message.redactedText,
          when: message.when,
          sub: message.user || undefined,
          font: message.font || "font_1", // 🔤 Include font from DB (default for old messages)
        };
        if (message._id) msg.id = message._id.toString();
        if (message.deleted) msg.deleted = true;
        instance.messages.push(msg);
      }

      console.log(`💬 Loaded ${instance.messages.length} messages for ${collectionName}`);
    } catch (err) {
      console.error(`💬 Failed to load messages for ${collectionName}:`, err);
    }
  }

  // Check if a host should be handled by chat manager
  isChatHost(host) {
    // Direct match
    if (chatInstances[host]) return true;
    // Dev mapping
    if (this.dev && devHostMappings[host]) return true;
    return false;
  }

  // Get the instance for a host (null when the host is not a chat host)
  getInstance(host) {
    // Direct match
    if (this.instances[host]) return this.instances[host];
    // Dev mapping
    if (this.dev && devHostMappings[host]) {
      return this.instances[devHostMappings[host]];
    }
    return null;
  }

  /**
   * Handle a new WebSocket connection: register it with the instance selected by
   * the request's Host header, wire up liveness pings and message handling, send
   * the "connected" snapshot and announce the join to everyone else.
   */
  async handleConnection(ws, req) {
    const host = req.headers.host;
    const instance = this.getInstance(host);

    if (!instance) {
      console.log("💬 Unknown chat host:", host);
      ws.close(1008, "Unknown host");
      return;
    }

    // Validate host in production
    if (!this.dev && host !== instance.config.allowedHost) {
      ws.close(1008, "Policy violation");
      return;
    }

    const id = instance.connectionId++;
    instance.connections[id] = ws;

    const ip = req.socket.remoteAddress || "localhost";
    ws.isAlive = true;

    ws.on("pong", () => {
      ws.isAlive = true;
    });

    // An "error" event with no listener is thrown by EventEmitter and would
    // take the whole process down; the subsequent "close" does the cleanup.
    ws.on("error", (err) => {
      console.error(`💬 [${instance.config.name}] Connection ${id} error:`, err);
    });

    console.log(
      `💬 [${instance.config.name}] Connection ${id} from ${ip}, online: ${Object.keys(instance.connections).length}`
    );

    // Set up ping interval for this connection
    const pingInterval = setInterval(() => {
      if (ws.isAlive === false) {
        clearInterval(pingInterval);
        return ws.terminate();
      }
      ws.isAlive = false;
      ws.ping();
    }, PING_INTERVAL_MS);

    ws.on("message", (data) => {
      // Never let a handler failure become an unhandled rejection.
      this.handleMessage(instance, ws, id, data).catch((err) => {
        console.error(`💬 [${instance.config.name}] Message handler error:`, err);
      });
    });

    ws.on("close", () => {
      clearInterval(pingInterval);
      delete instance.connections[id];
      delete instance.authorizedConnections[id];

      console.log(
        `💬 [${instance.config.name}] Connection ${id} closed, online: ${Object.keys(instance.connections).length}`
      );

      // Broadcast updated online handles after removing this connection
      this.broadcastOnlineHandles(instance);

      this.broadcast(instance, this.pack("left", {
        chatters: Object.keys(instance.connections).length,
        handles: this.getOnlineHandles(instance),
      }, id));
    });

    // Load heart counts for current message window
    let heartCounts = {};
    if (this.db) {
      try {
        const messageIds = instance.messages.filter((m) => m.id).map((m) => m.id);
        heartCounts = await countHearts(this.db, instance.config.name, messageIds);
      } catch (err) {
        // Hearts are decorative; still welcome the client without them.
        console.error(`💬 [${instance.config.name}] Failed to load heart counts:`, err);
      }
    }

    // The socket may have closed while heart counts were loading.
    if (!instance.connections[id] || ws.readyState !== WebSocket.OPEN) return;

    const chatters = Object.keys(instance.connections).length;

    // Send welcome message
    ws.send(
      this.pack(
        "connected",
        {
          message: `Joined \`${instance.config.name}\` • 🧑‍🤝‍🧑 ${chatters}`,
          chatters,
          handles: this.getOnlineHandles(instance),
          messages: instance.messages,
          heartCounts,
          id,
        },
        id,
      ),
    );

    // Notify others
    this.broadcastOthers(instance, ws, this.pack(
      "joined",
      {
        text: `${id} has joined. Connections open: ${chatters}`,
        chatters,
        handles: this.getOnlineHandles(instance),
      },
      id,
    ));
  }

  /**
   * Parse one raw socket frame and dispatch it by `type`.
   * Frames that are not JSON objects are logged and ignored.
   */
  async handleMessage(instance, ws, id, data) {
    let msg;
    try {
      msg = JSON.parse(data.toString());
    } catch (err) {
      console.log("💬 Failed to parse message:", err);
      return;
    }

    // JSON such as `null`, `42` or `"text"` parses fine but is not a message.
    if (!msg || typeof msg !== "object") {
      console.log("💬 Ignoring non-object message");
      return;
    }

    msg.id = id;

    if (msg.type === "logout") {
      console.log(`💬 [${instance.config.name}] User logged out`);
      delete instance.authorizedConnections[id];
      // Keep everyone's presence list in sync, as on socket close.
      this.broadcastOnlineHandles(instance);
    } else if (msg.type === "chat:message") {
      await this.handleChatMessage(instance, ws, id, msg);
    } else if (msg.type === "chat:delete") {
      await this.handleDeleteMessage(instance, ws, id, msg);
    } else if (msg.type === "chat:heart") {
      await this.handleChatHeart(instance, ws, id, msg);
    }
  }

  /**
   * Return the user cached for this connection when `token` matches the cached
   * token, otherwise undefined. A missing token never matches.
   */
  getCachedUser(instance, id, token) {
    const cached = instance.authorizedConnections[id];
    if (!token || !cached || cached.token !== token) return undefined;
    return cached.user;
  }

  /**
   * Remember a successful authorization for a connection. Skipped when the socket
   * has already closed (so no entry outlives its connection); keeps the known
   * handle when the same user re-authorizes with a new token.
   */
  cacheAuthorization(instance, id, token, user) {
    if (!instance.connections[id]) return;
    const previous = instance.authorizedConnections[id];
    const entry = { token, user };
    if (previous?.handle && previous.user?.sub === user.sub) {
      entry.handle = previous.handle;
    }
    instance.authorizedConnections[id] = entry;
  }

  /** Resolve a token to a user via the per-connection cache, then the userinfo endpoint. */
  async resolveUser(instance, id, token) {
    const cached = this.getCachedUser(instance, id, token);
    if (cached) return cached;

    const user = await this.authorize(instance, token);
    if (user) this.cacheAuthorization(instance, id, token, user);
    return user;
  }

  /**
   * Current time for stamping a message. In production this asks the
   * aesthetic.computer clock API and falls back to local time when the request
   * fails or returns something that is not a valid date.
   */
  async getServerTime() {
    if (!this.dev) {
      try {
        const clockResponse = await fetch("https://aesthetic.computer/api/clock");
        if (clockResponse.ok) {
          const serverTime = new Date((await clockResponse.text()).trim());
          if (!Number.isNaN(serverTime.getTime())) return serverTime;
        }
      } catch (err) {
        console.log("💬 Clock fetch failed, using local time");
      }
    }
    return new Date();
  }

  /**
   * Validate, authorize, store and broadcast one chat message.
   * Replies to the sender only with "muted", "too-long" or "unauthorized" when
   * the message is rejected; malformed payloads are logged and dropped.
   */
  async handleChatMessage(instance, ws, id, msg) {
    const message = msg.content;
    if (!message || typeof message !== "object" || typeof message.text !== "string") {
      console.log(`💬 [${instance.config.name}] Ignoring malformed chat message from connection ${id}`);
      return;
    }

    const token = typeof message.token === "string" ? message.token : undefined;

    console.log(
      `💬 [${instance.config.name}] Message from ${message.sub} (${message.text.length} chars)`
    );

    // Mute check on the claimed sub: a cheap early exit before any auth round trip.
    if (await this.isMuted(instance, message.sub)) {
      ws.send(this.pack("muted", { message: "Your user has been muted." }));
      return;
    }

    // Length limit
    if (message.text.length > MAX_MESSAGE_LENGTH) {
      ws.send(this.pack("too-long", { message: `Please limit to ${MAX_MESSAGE_LENGTH} characters.` }));
      return;
    }

    // Authorization (Auth0 token or AC device token)
    let authorized = this.getCachedUser(instance, id, token);
    if (authorized) {
      console.log("💬 Pre-authorized");
    } else {
      console.log("💬 Authorizing...", "handle:", message.handle, "token:", token?.slice(0, 10) + "...", "content-type:", typeof msg.content);
      authorized = await this.authorize(instance, token);
      // Fallback: AC device token (from ac-native device-token API)
      if (!authorized && message.handle && token) {
        console.log("💬 Trying device token auth for @" + message.handle);
        authorized = await this.authorizeDeviceToken(instance, message.handle, token);
      }
      if (authorized) this.cacheAuthorization(instance, id, token, authorized);
    }

    // Get handle
    let handle;
    let subscribed;
    if (authorized) {
      handle = await this.getHandleFromSub(instance, authorized.sub);

      // Store handle in authorizedConnections for online list. The entry is gone
      // if the socket closed (or the user logged out) during the lookups above.
      const cached = instance.authorizedConnections[id];
      if (cached) cached.handle = handle;

      // Broadcast updated online handles to all clients
      this.broadcastOnlineHandles(instance);

      // Subscription check for sotce
      if (instance.config.name === "chat-sotce") {
        subscribed = await this.checkSubscription(instance, authorized.sub, token);
      } else {
        subscribed = true;
      }
    }

    if (!authorized || !handle || !subscribed) {
      console.error("💬 Unauthorized:", { authorized: !!authorized, handle, subscribed });
      ws.send(this.pack("unauthorized", { message: "Please login and/or subscribe." }, id));
      return;
    }

    // From here on only the AUTHENTICATED identity is used; the client-supplied
    // `sub` is a claim and must not decide attribution or mute status.
    const fromSub = authorized.sub;
    if (fromSub !== message.sub && (await this.isMuted(instance, fromSub))) {
      ws.send(this.pack("muted", { message: "Your user has been muted." }));
      return;
    }

    // Process and store message
    try {
      const filteredText =
        instance.config.name === "chat-clock" ? message.text : filter(message.text, this.filterDebug);
      const font = message.font || "font_1"; // 🔤 User's font preference
      const when = await this.getServerTime();

      // Store in MongoDB (production only)
      let insertedId;
      if (!this.dev && this.db) {
        const collection = this.db.collection(instance.config.name);
        // Ensuring the index once per process is enough; it used to run per message.
        if (!this.indexedCollections.has(instance.config.name)) {
          await collection.createIndex({ when: 1 });
          this.indexedCollections.add(instance.config.name);
        }
        const result = await collection.insertOne({
          user: fromSub,
          text: message.text,
          when,
          font,
        });
        insertedId = result.insertedId?.toString();
        console.log("💬 Message stored");
      }

      // `redactedText` is intentionally not copied from the client payload.
      const out = {
        from: handle,
        text: filteredText,
        when,
        sub: fromSub,
        font, // 🔤 Include font in broadcast
      };
      if (insertedId) out.id = insertedId;

      // Production message that could not be stored because the DB is down:
      // remember its raw text so persistAllMessages() can save it later.
      if (!this.dev && !this.db) this.pendingPersistence.set(out, message.text);

      // Duplicate detection
      // NOTE(review): the `!lastMsg.count` guard means a run of identical messages
      // is only ever collapsed up to a count of 2; the next repeat starts a new
      // entry. Left as-is because clients may depend on it — confirm the intent.
      const lastMsg = instance.messages[instance.messages.length - 1];
      if (lastMsg && lastMsg.sub === out.sub && lastMsg.text === out.text && !lastMsg.count) {
        lastMsg.count = (lastMsg.count || 1) + 1;
        this.broadcast(instance, this.pack("message:update", { index: instance.messages.length - 1, count: lastMsg.count }));
      } else {
        instance.messages.push(out);
        if (instance.messages.length > MAX_MESSAGES) instance.messages.shift();
        this.broadcast(instance, this.pack("message", out));
      }

      if (handle && handle.startsWith("@")) {
        const compactText = `${filteredText || ""}`.replace(/\s+/g, " ").trim();
        const preview =
          compactText.length > 80 ? `${compactText.slice(0, 77)}...` : compactText;

        this.emitActivity({
          handle,
          event: {
            type: "chat",
            when: when?.getTime?.() || Date.now(),
            label: `Chat ${instance.config.name.replace("chat-", "")}: ${preview}`,
            piece: instance.config.name,
            ref: insertedId || null,
            text: compactText,
          },
          countsDelta: { chats: 1 },
        });
      }

      // Push notification (production only, chat-system and chat-clock only)
      // Note: chat-sotce is intentionally excluded from push notifications
      if (!this.dev && instance.config.name !== "chat-sotce") {
        this.notify(instance, handle, filteredText, when);
      }
    } catch (err) {
      console.error("💬 Message handling error:", err);
    }
  }

  /**
   * Soft-delete one of the requester's own messages: marks it deleted in MongoDB
   * (when available), blanks it in memory and broadcasts "message:delete".
   */
  async handleDeleteMessage(instance, ws, id, msg) {
    const { token, sub, id: messageId } = msg.content || {};
    console.log(
      `💬 [${instance.config.name}] Delete request from ${sub} for message ${messageId}`
    );

    if (!messageId) {
      ws.send(this.pack("error", { message: "Missing message id." }));
      return;
    }

    // Authorize the user
    const authorized = await this.resolveUser(instance, id, token);
    if (!authorized || authorized.sub !== sub) {
      ws.send(this.pack("unauthorized", { message: "Please login." }, id));
      return;
    }

    // Find the message in memory and verify ownership
    const message = instance.messages.find((m) => m.id === messageId);
    if (!message) {
      ws.send(this.pack("error", { message: "Message not found." }));
      return;
    }
    if (message.sub !== sub) {
      ws.send(this.pack("error", { message: "You can only delete your own messages." }));
      return;
    }

    // Already deleted
    if (message.deleted) return;

    // Soft-delete in MongoDB (a DB failure is logged; the in-memory delete still happens)
    if (this.db) {
      try {
        const collection = this.db.collection(instance.config.name);
        await collection.updateOne(
          { _id: new ObjectId(messageId) },
          { $set: { deleted: true } }
        );
        console.log("💬 Message soft-deleted in DB");
      } catch (err) {
        console.error("💬 Failed to delete message in DB:", err);
      }
    }

    // Update in-memory
    message.text = "[deleted]";
    message.deleted = true;
    delete message.redactedText;

    // Broadcast deletion to all clients
    this.broadcast(instance, this.pack("message:delete", { id: messageId }));
  }

  /**
   * Toggle the requester's heart on a message and broadcast the new count.
   * Silently ignored when the payload is incomplete or no database is available.
   */
  async handleChatHeart(instance, ws, id, msg) {
    const { for: forId, token } = msg.content || {};
    if (!forId || !token) return;

    // Reuse cached auth or re-authorize
    const authorized = await this.resolveUser(instance, id, token);
    if (!authorized) {
      ws.send(this.pack("unauthorized", { message: "Please login." }, id));
      return;
    }

    if (!this.db) return;

    try {
      const { hearted, count } = await toggleHeart(this.db, {
        user: authorized.sub,
        type: instance.config.name,
        for: forId,
      });
      console.log(`💬 [${instance.config.name}] Heart ${hearted ? "+" : "-"} on ${forId} (count: ${count})`);
      this.broadcast(instance, this.pack("message:hearts", { for: forId, count }));
    } catch (err) {
      console.error("💬 Heart error:", err);
    }
  }

  /**
   * Fallback authorization for AC device tokens ("hmac.timestamp").
   * Looks the handle up in the "@handles" collection and returns `{ sub }` built
   * from the document's `_id`, or undefined.
   *
   * FIXME(security): only the token's SHAPE is checked here (non-empty and
   * containing a "."); nothing in this method verifies the HMAC or the timestamp,
   * so any caller who knows a handle passes. Verify the signature before relying
   * on this path.
   */
  async authorizeDeviceToken(instance, handle, token) {
    if (typeof token !== "string" || !token.includes(".")) return undefined;
    if (typeof handle !== "string" || !handle) return undefined;
    if (!this.db) return undefined;

    try {
      const cleanHandle = handle.replace("@", "");
      const doc = await this.db.collection("@handles").findOne({ handle: cleanHandle });
      if (doc && doc._id) {
        console.log("💬 Device token authorized for @" + cleanHandle + " (sub: " + doc._id + ")");
        return { sub: doc._id };
      }
      console.log("💬 Device token: handle @" + cleanHandle + " not found in DB");
      return undefined;
    } catch (err) {
      console.error("💬 Device token auth error:", err);
      return undefined;
    }
  }

  /**
   * Ask the instance's userinfo endpoint who a bearer token belongs to.
   * @returns {Promise<object|undefined>} The userinfo JSON on HTTP 200, otherwise undefined.
   */
  async authorize(instance, token) {
    // No token, no request: avoids sending "Bearer undefined".
    if (!token) return undefined;

    try {
      const response = await fetch(instance.config.userInfoEndpoint, {
        headers: {
          Authorization: "Bearer " + token,
          "Content-Type": "application/json",
        },
      });

      if (response.status === 200) {
        // Awaited so a body-parsing failure lands in the catch below.
        return await response.json();
      }
      return undefined;
    } catch (err) {
      console.error("💬 Authorization error:", err);
      return undefined;
    }
  }

  /**
   * Resolve a user sub to its "@handle", caching successful lookups per instance.
   * Returns "@unknown" when the lookup fails (failures are not cached).
   */
  async getHandleFromSub(instance, fromSub) {
    const known = instance.subsToHandles[fromSub];
    if (known) return "@" + known;

    try {
      const isSotce = instance.config.name === "chat-sotce";
      const prefix = isSotce ? "sotce-" : "";

      let host = "https://localhost:8888";
      if (!this.dev) host = isSotce ? "https://sotce.net" : "https://aesthetic.computer";

      const options = {};
      if (this.dev) options.agent = this.agent;

      const response = await fetch(`${host}/handle?for=${prefix}${fromSub}`, options);
      if (response.status === 200) {
        const data = await response.json();
        instance.subsToHandles[fromSub] = data.handle;
        return "@" + data.handle;
      }
    } catch (err) {
      console.error("💬 Handle lookup error:", err);
    }

    return "@unknown";
  }

  /**
   * Check whether a user holds a sotce subscription, caching the answer per sub.
   * Network errors and 5xx responses yield false WITHOUT caching, so a transient
   * outage does not lock a subscriber out until the next restart.
   */
  async checkSubscription(instance, sub, token) {
    if (instance.subsToSubscribers[sub] !== undefined) {
      return instance.subsToSubscribers[sub];
    }

    const host = this.dev ? "https://localhost:8888" : "https://sotce.net";
    const options = {
      method: "POST",
      body: JSON.stringify({ retrieve: "subscription" }),
      headers: {
        Authorization: "Bearer " + token,
        "Content-Type": "application/json",
      },
    };

    if (this.dev) options.agent = this.agent;

    try {
      const response = await fetch(`${host}/sotce-net/subscribed`, options);
      if (response.status === 200) {
        const data = await response.json();
        instance.subsToSubscribers[sub] = data.subscribed;
        return data.subscribed;
      }
      // Any other non-5xx answer is treated as a definitive "no" and cached.
      if (response.status < 500) instance.subsToSubscribers[sub] = false;
    } catch (err) {
      console.error("💬 Subscription check error:", err);
    }

    return false;
  }

  /**
   * Whether `sub` has an entry in the instance's "<name>-mutes" collection.
   * Returns false without a sub, without a database, or when the lookup fails.
   */
  async isMuted(instance, sub) {
    if (!sub || !this.db) return false;
    try {
      const mutesCollection = this.db.collection(instance.config.name + "-mutes");
      const mute = await mutesCollection.findOne({ user: sub });
      return !!mute;
    } catch (err) {
      console.error("💬 Mute lookup error:", err);
      return false;
    }
  }

  /**
   * Fire-and-forget push notification to the instance's topic.
   * Never throws; send failures are logged.
   */
  notify(instance, handle, text, when) {
    try {
      let title = handle + " 💬";

      if (instance.config.name === "chat-clock") {
        // Clock-face emoji for the message time: U+1F550.. are the whole hours
        // (1 to 12 o'clock), U+1F55C.. the matching half hours.
        const hour = when.getHours() % 12 || 12;
        const base = when.getMinutes() < 30 ? 0x1f550 : 0x1f55c;
        title = handle + " " + String.fromCodePoint(base + hour - 1);
      }

      getMessaging().send({
        notification: { title, body: text },
        apns: {
          payload: {
            aps: {
              "mutable-content": 1,
              "interruption-level": "time-sensitive",
              priority: 10,
              "content-available": 1,
            },
          },
          headers: {
            "apns-priority": "10",
            "apns-push-type": "alert",
            "apns-expiration": "0",
          },
        },
        webpush: {
          headers: {
            Urgency: "high",
            TTL: "0",
            image: "https://aesthetic.computer/api/logo.png",
          },
        },
        topic: instance.config.topic,
        data: { piece: "chat" },
      }).then((response) => {
        console.log("💬 Notification sent:", response);
      }).catch((err) => {
        console.log("💬 Notification error:", err);
      });
    } catch (err) {
      console.error("💬 Notification error:", err);
    }
  }

  /**
   * Handle HTTP log endpoint.
   * @param {object} instance - Target chat instance.
   * @param {string|object} body - Log entry, as JSON text or an already-parsed object.
   * @param {string} [authHeader] - "Bearer <loggerKey>".
   * @returns {Promise<{status: number, body: object}>} 200 on success, 403 on a bad
   *   or missing key, 400 when the entry cannot be parsed or processed.
   */
  async handleLog(instance, body, authHeader) {
    const token = authHeader?.split(" ")[1];
    // With no key configured the endpoint is closed. Previously an unset key
    // compared equal to a missing token and let unauthenticated requests in.
    if (!this.loggerKey || !secretsMatch(token, this.loggerKey)) {
      return { status: 403, body: { status: "error", message: "Forbidden" } };
    }

    try {
      const parsed = typeof body === "string" ? JSON.parse(body) : body;
      console.log(`💬 [${instance.config.name}] Log received from: ${parsed.from || 'unknown'}`);

      instance.messages.push(parsed);
      if (instance.messages.length > MAX_MESSAGES) instance.messages.shift();

      // Handle actions (mute/unmute, handle updates)
      if (parsed.action) {
        await this.handleLogAction(instance, parsed);
      }

      this.broadcast(instance, this.pack("message", parsed));

      if (instance.config.name === "chat-system") {
        this.notify(instance, "log 🪵", parsed.text, new Date());
      }

      return { status: 200, body: { status: "success", message: "Log received" } };
    } catch (err) {
      return { status: 400, body: { status: "error", message: "Malformed log JSON" } };
    }
  }

  /**
   * Apply the side effects of a log entry's `action` ("object:behavior"):
   * chat-system mute/unmute, and handle colors/update/strip.
   * @throws {Error} When a user-targeted action carries no target user.
   */
  async handleLogAction(instance, parsed) {
    let [object, behavior] = (parsed.action || "").split(":");
    if (!behavior) {
      behavior = object;
      object = null;
    }

    const isMuteAction =
      object === "chat-system" && (behavior === "mute" || behavior === "unmute");
    if (!isMuteAction && object !== "handle") return;

    // Without a target, `msg.sub === user` would match every message that has no sub.
    const user = Array.isArray(parsed.users) ? parsed.users[0] : undefined;
    if (!user) throw new Error(`Log action "${parsed.action}" requires a target user`);

    if (isMuteAction) {
      const apply = behavior === "mute" ? redact : unredact;
      instance.messages.forEach((msg) => {
        if (msg.sub === user) apply(msg);
      });
      this.broadcast(instance, this.pack(parsed.action, { user }));
      return;
    }

    // object === "handle"
    if (behavior === "colors") {
      // Broadcast handle color changes to all connected clients.
      const data = JSON.parse(parsed.value);
      this.broadcast(instance, this.pack("handle:colors", { user, handle: data.handle, colors: data.colors }));
      return;
    }

    instance.subsToHandles[user] = parsed.value;

    if (behavior === "update" || behavior === "strip") {
      const from = behavior === "update" ? "@" + parsed.value : "nohandle";
      instance.messages.forEach((msg) => {
        if (msg.sub === user) msg.from = from;
      });
      this.broadcast(instance, this.pack(parsed.action, { user, handle: from }));
    }
  }

  /**
   * Serialize a wire frame. Object content is stringified first, so clients
   * receive `content` as a JSON string nested inside the JSON frame.
   */
  pack(type, content, id) {
    if (typeof content === "object") content = JSON.stringify(content);
    return JSON.stringify({ type, content, id });
  }

  /** Send a packed frame to every open socket of the instance. */
  broadcast(instance, message) {
    Object.values(instance.connections).forEach((c) => {
      if (c?.readyState === WebSocket.OPEN) c.send(message);
    });
  }

  /** Send a packed frame to every open socket of the instance except `exclude`. */
  broadcastOthers(instance, exclude, message) {
    Object.values(instance.connections).forEach((c) => {
      if (c !== exclude && c?.readyState === WebSocket.OPEN) c.send(message);
    });
  }

  // Set the presence resolver function (called from session.mjs)
  // This allows us to query which users are on a specific piece
  setPresenceResolver(resolver) {
    this.presenceResolver = resolver;
  }

  // Get list of online handles for an instance (all connected & authorized)
  getOnlineHandles(instance) {
    const handles = new Set();
    for (const [id, auth] of Object.entries(instance.authorizedConnections)) {
      if (auth.handle && instance.connections[id]) handles.add(auth.handle);
    }
    return [...handles]; // Set removes duplicates (same user on several sockets)
  }

  // Get handles of users actually viewing the chat piece right now
  getHereHandles(instance) {
    if (!this.presenceResolver) return [];
    // Get all users on "chat" piece from session server
    const onChatPiece = this.presenceResolver("chat");
    if (!Array.isArray(onChatPiece)) return [];
    // Intersect with authorized handles for this chat instance
    const onlineHandles = new Set(this.getOnlineHandles(instance));
    return onChatPiece.filter((h) => onlineHandles.has(h));
  }

  // Broadcast presence data (online + here) to all clients
  broadcastOnlineHandles(instance) {
    const online = this.getOnlineHandles(instance);
    const here = this.getHereHandles(instance);
    // Send both for backwards compatibility and new "here" feature
    this.broadcast(instance, this.pack("presence", {
      handles: online, // backwards compat with old "online-handles"
      online, // all authorized chat connections
      here, // only those actually on chat piece
    }));
  }

  // Get status for a specific instance (null if the host is unknown) or all
  getStatus(host = null) {
    if (host) {
      const instance = this.getInstance(host);
      if (!instance) return null;
      return {
        name: instance.config.name,
        connections: Object.keys(instance.connections).length,
        messages: instance.messages.length,
      };
    }

    return Object.entries(this.instances).map(([host, instance]) => ({
      host,
      name: instance.config.name,
      connections: Object.keys(instance.connections).length,
      messages: instance.messages.length,
    }));
  }

  // Get recent messages for a specific instance (for dashboard)
  getRecentMessages(host, count = 10) {
    const instance = this.getInstance(host);
    if (!instance) return [];
    // slice(-0) is slice(0) and would return the WHOLE window.
    if (!(count > 0)) return [];

    // Return the most recent messages, but don't expose sensitive data
    return instance.messages.slice(-count).map((msg) => ({
      from: msg.from || 'unknown',
      text: msg.text || '',
      when: msg.when,
    })).reverse(); // Most recent first
  }

  /**
   * Persist in-memory messages that should have been stored but were received
   * while no database was available. Dev-mode messages and messages that already
   * carry an `id` are never written. Each saved message receives its new `id`,
   * so calling this twice does not insert it twice.
   * @returns {Promise<number>} Number of documents inserted.
   */
  async persistAllMessages() {
    // Establish a connection if we don't have one
    if (!this.db && this.mongoConnectionString) {
      try {
        console.log("💬 Connecting to MongoDB for message persistence...");
        this.mongoClient = new MongoClient(this.mongoConnectionString);
        await this.mongoClient.connect();
        this.db = this.mongoClient.db(this.mongoDbName);
        console.log("💬 MongoDB connected for persistence!");
      } catch (err) {
        console.error("💬 Failed to connect to MongoDB for persistence:", err);
        return 0;
      }
    }

    if (!this.db) {
      console.error("💬 No MongoDB connection available, cannot persist messages");
      return 0;
    }

    let totalPersisted = 0;

    for (const instance of Object.values(this.instances)) {
      const collectionName = instance.config.name;
      // DB-loaded and successfully-stored messages have an `id` (from MongoDB _id);
      // only messages explicitly queued while the DB was down are candidates.
      const unpersisted = instance.messages.filter(
        (msg) => msg.sub && !msg.id && this.pendingPersistence.has(msg)
      );

      if (unpersisted.length === 0) {
        console.log(`💬 [${collectionName}] No unpersisted messages`);
        continue;
      }

      try {
        const collection = this.db.collection(collectionName);
        const docs = unpersisted.map((msg) => ({
          user: msg.sub,
          // Store the original text, as the live insert path does; filtering
          // is applied again when messages are loaded.
          text: this.pendingPersistence.get(msg),
          when: msg.when,
          font: msg.font || "font_1",
        }));

        const result = await collection.insertMany(docs, { ordered: false });
        totalPersisted += result.insertedCount;

        // insertedIds maps each document's index to its new _id.
        unpersisted.forEach((msg, index) => {
          const insertedId = result.insertedIds?.[index];
          if (!insertedId) return;
          msg.id = insertedId.toString();
          this.pendingPersistence.delete(msg);
        });

        console.log(`💬 [${collectionName}] Persisted ${result.insertedCount} messages`);
      } catch (err) {
        // insertMany with ordered:false continues on duplicate errors
        if (err.insertedCount) totalPersisted += err.insertedCount;
        console.error(`💬 [${collectionName}] Persistence error:`, err.message);
      }
    }

    return totalPersisted;
  }

  /** Flush pending messages to MongoDB and close the database connection. */
  async shutdown() {
    console.log("💬 ChatManager shutting down...");
    const count = await this.persistAllMessages();
    console.log(`💬 Persisted ${count} total messages`);

    if (this.mongoClient) {
      await this.mongoClient.close();
      console.log("💬 MongoDB connection closed");
    }
  }
}