Openstatus www.openstatus.dev
at main 515 lines 15 kB view raw
import { TRPCError } from "@trpc/server";
import { z } from "zod";

import { type SQL, and, count, db, eq, inArray } from "@openstatus/db";
import {
  NotificationDataSchema,
  googleChatDataSchema,
  insertNotificationSchema,
  monitor,
  notification,
  notificationProvider,
  notificationsToMonitors,
  selectMonitorSchema,
  selectNotificationSchema,
  telegramDataSchema,
  whatsappDataSchema,
} from "@openstatus/db/src/schema";

import { Events } from "@openstatus/analytics";
import { SchemaError } from "@openstatus/error";
import { sendTest as sendGoogleChatTest } from "@openstatus/notification-google-chat";
import { sendTest as sendTelegramTest } from "@openstatus/notification-telegram";
import { sendTest as sendWhatsAppTest } from "@openstatus/notification-twillio-whatsapp";

import { createTRPCRouter, protectedProcedure } from "../trpc";

/**
 * Decodes the JSON-encoded `data` string of a notification.
 *
 * @param raw - JSON text supplied by the client.
 * @returns The decoded value (not yet validated against any schema).
 * @throws TRPCError `BAD_REQUEST` when `raw` is not valid JSON, so malformed
 *   client input is reported as a client error instead of an unhandled
 *   `SyntaxError`.
 */
function parseNotificationJson(raw: string): unknown {
  try {
    return JSON.parse(raw);
  } catch {
    throw new TRPCError({
      code: "BAD_REQUEST",
      message: "Notification data must be valid JSON.",
    });
  }
}

/**
 * tRPC router for notification channels of the current workspace.
 *
 * Every procedure is a `protectedProcedure` and scopes its queries with
 * `opts.ctx.workspace.id`.
 */
export const notificationRouter = createTRPCRouter({
  /**
   * Creates a notification channel and links it to the given monitors.
   *
   * @throws TRPCError `FORBIDDEN` when the workspace reached its
   *   "notification-channels" limit, when the provider is one of the limited
   *   providers and not enabled in the workspace limits, or when one of the
   *   monitors does not belong to the workspace.
   * @throws TRPCError `BAD_REQUEST` when `data` is not valid JSON or does not
   *   match `NotificationDataSchema`.
   * @returns The inserted notification row.
   */
  create: protectedProcedure
    .meta({ track: Events.CreateNotification, trackProps: ["provider"] })
    .input(insertNotificationSchema)
    .mutation(async (opts) => {
      const { monitors, ...props } = opts.input;

      const notificationLimit =
        opts.ctx.workspace.limits["notification-channels"];

      const notificationNumber = (
        await opts.ctx.db.query.notification.findMany({
          where: eq(notification.workspaceId, opts.ctx.workspace.id),
        })
      ).length;

      // the user has reached the limits
      if (notificationNumber >= notificationLimit) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You reached your notification limits.",
        });
      }

      const limitedProviders = ["sms", "pagerduty", "opsgenie"];
      if (limitedProviders.includes(props.provider)) {
        const isAllowed =
          opts.ctx.workspace.limits[
            props.provider as "sms" | "pagerduty" | "opsgenie"
          ];

        if (!isAllowed) {
          throw new TRPCError({
            code: "FORBIDDEN",
            message: "Upgrade to use the notification channel.",
          });
        }
      }

      const _data = NotificationDataSchema.safeParse(
        parseNotificationJson(props.data),
      );

      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, props).message,
        });
      }

      // Only monitors owned by this workspace may be linked; same check as
      // the other mutating procedures of this router.
      if (monitors.length) {
        const allMonitors = await opts.ctx.db.query.monitor.findMany({
          where: and(
            eq(monitor.workspaceId, opts.ctx.workspace.id),
            inArray(monitor.id, monitors),
          ),
        });

        if (allMonitors.length !== monitors.length) {
          throw new TRPCError({
            code: "FORBIDDEN",
            message: "You don't have access to all the monitors.",
          });
        }
      }

      const _notification = await opts.ctx.db
        .insert(notification)
        .values({ ...props, workspaceId: opts.ctx.workspace.id })
        .returning()
        .get();

      const values = monitors.map((monitorId) => ({
        notificationId: _notification.id,
        monitorId,
      }));

      if (values.length) {
        await opts.ctx.db.insert(notificationsToMonitors).values(values);
      }

      return _notification;
    }),

  /**
   * Updates a notification channel of the workspace and synchronises its
   * monitor links (removes links no longer listed, adds the new ones).
   *
   * Returns `undefined` without doing anything when the input has no `id`.
   *
   * @throws TRPCError `BAD_REQUEST` when `data` is not valid JSON or does not
   *   match `NotificationDataSchema`.
   * @throws TRPCError `FORBIDDEN` when one of the monitors does not belong to
   *   the workspace.
   * @throws TRPCError `NOT_FOUND` when no notification with that id exists in
   *   the workspace.
   * @returns The updated notification row.
   */
  update: protectedProcedure
    .meta({ track: Events.UpdateNotification })
    .input(insertNotificationSchema)
    .mutation(async (opts) => {
      if (!opts.input.id) return;

      const { monitors, ...props } = opts.input;

      const _data = NotificationDataSchema.safeParse(
        parseNotificationJson(props.data),
      );

      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, props).message,
        });
      }

      // Verify monitor access before writing anything, so a rejected request
      // does not leave the notification row already modified.
      if (monitors.length) {
        const allMonitors = await opts.ctx.db.query.monitor.findMany({
          where: and(
            eq(monitor.workspaceId, opts.ctx.workspace.id),
            inArray(monitor.id, monitors),
          ),
        });

        if (allMonitors.length !== monitors.length) {
          throw new TRPCError({
            code: "FORBIDDEN",
            message: "You don't have access to all the monitors.",
          });
        }
      }

      const currentNotification = await opts.ctx.db
        .update(notification)
        .set({ ...props, updatedAt: new Date() })
        .where(
          and(
            eq(notification.id, opts.input.id),
            eq(notification.workspaceId, opts.ctx.workspace.id),
          ),
        )
        .returning()
        .get();

      // No row matched: the id is unknown or belongs to another workspace.
      if (!currentNotification) {
        throw new TRPCError({
          code: "NOT_FOUND",
          message: "Notification not found.",
        });
      }

      // TODO: relation

      const currentMonitorsToNotifications = await opts.ctx.db
        .select()
        .from(notificationsToMonitors)
        .where(
          eq(notificationsToMonitors.notificationId, currentNotification.id),
        )
        .all();

      const removedMonitors = currentMonitorsToNotifications
        .map(({ monitorId }) => monitorId)
        .filter((x) => !monitors.includes(x));

      if (removedMonitors.length) {
        await opts.ctx.db
          .delete(notificationsToMonitors)
          .where(
            and(
              inArray(notificationsToMonitors.monitorId, removedMonitors),
              eq(
                notificationsToMonitors.notificationId,
                currentNotification.id,
              ),
            ),
          );
      }

      const values = monitors.map((monitorId) => ({
        notificationId: currentNotification.id,
        monitorId,
      }));

      if (values.length) {
        // Links that already exist are skipped instead of failing the insert.
        await opts.ctx.db
          .insert(notificationsToMonitors)
          .values(values)
          .onConflictDoNothing();
      }

      return currentNotification;
    }),

  /**
   * Deletes the notification with the given id, restricted to the current
   * workspace. Deleting an id that does not match any row is a no-op.
   */
  deleteNotification: protectedProcedure
    .meta({ track: Events.DeleteNotification })
    .input(z.object({ id: z.number() }))
    .mutation(async (opts) => {
      await opts.ctx.db
        .delete(notification)
        .where(
          and(
            eq(notification.id, opts.input.id),
            // Scope to the caller's workspace so ids of other workspaces
            // cannot be deleted.
            eq(notification.workspaceId, opts.ctx.workspace.id),
          ),
        )
        .run();
    }),

  /**
   * Loads one notification of the workspace together with its linked
   * monitors and validates the result against the select schema.
   *
   * The schema parse throws when no matching row is found.
   */
  getNotificationById: protectedProcedure
    .input(z.object({ id: z.number() }))
    .query(async (opts) => {
      const _notification = await opts.ctx.db.query.notification.findFirst({
        where: and(
          eq(notification.id, opts.input.id),
          eq(notification.workspaceId, opts.ctx.workspace.id),
        ),
        // FIXME: plural
        with: { monitor: { with: { monitor: true } } },
      });

      const schema = selectNotificationSchema.extend({
        monitor: z.array(
          z.object({
            monitor: selectMonitorSchema,
          }),
        ),
      });

      return schema.parse(_notification);
    }),

  /**
   * Lists all notifications of the workspace, each with its join rows under
   * `monitor` (every join row carries the linked `monitor`).
   */
  getNotificationsByWorkspace: protectedProcedure.query(async (opts) => {
    const notifications = await opts.ctx.db.query.notification.findMany({
      where: eq(notification.workspaceId, opts.ctx.workspace.id),
      with: {
        // FIXME: first should be plurals!
        monitor: { with: { monitor: true } },
      },
    });

    const schema = selectNotificationSchema.extend({
      monitor: z.array(
        z.object({
          monitor: selectMonitorSchema,
        }),
      ),
    });

    return z.array(schema).parse(notifications);
  }),

  /**
   * Returns `true` when the number of notifications in the workspace is at
   * or above the workspace's "notification-channels" limit.
   */
  isNotificationLimitReached: protectedProcedure.query(async (opts) => {
    const notificationLimit =
      opts.ctx.workspace.limits["notification-channels"];
    const notificationNumbers = (
      await opts.ctx.db.query.notification.findMany({
        where: eq(notification.workspaceId, opts.ctx.workspace.id),
      })
    ).length;

    return notificationNumbers >= notificationLimit;
  }),

  /**
   * Lists all notifications of the workspace with the linked monitors
   * flattened into a `monitors` array.
   */
  list: protectedProcedure.query(async (opts) => {
    const notifications = await opts.ctx.db.query.notification.findMany({
      where: eq(notification.workspaceId, opts.ctx.workspace.id),
      with: {
        monitor: {
          with: {
            monitor: true,
          },
        },
      },
    });

    return selectNotificationSchema
      .extend({
        monitors: selectMonitorSchema.array(),
      })
      .array()
      .parse(
        notifications.map((notification) => ({
          ...notification,
          // Unwrap the join rows so callers get the monitors directly.
          monitors: notification.monitor.map(({ monitor }) => monitor),
        })),
      );
  }),

  // TODO: rename to update after migration
  /**
   * Updates name and data of a notification and replaces its monitor links
   * with exactly `monitors`, all inside one transaction.
   *
   * @throws TRPCError `FORBIDDEN` when one of the monitors does not belong to
   *   the workspace.
   * @throws TRPCError `BAD_REQUEST` when `data` does not match
   *   `NotificationDataSchema`.
   * @throws TRPCError `NOT_FOUND` when no notification with that id exists in
   *   the workspace; nothing is modified in that case.
   */
  updateNotifier: protectedProcedure
    .meta({ track: Events.UpdateNotification })
    .input(
      z.object({
        id: z.number(),
        name: z.string(),
        data: z.partialRecord(
          z.enum(notificationProvider),
          z.string().or(z.record(z.string(), z.string())),
        ),
        monitors: z.array(z.number()),
      }),
    )
    .mutation(async (opts) => {
      const whereCondition: SQL[] = [
        eq(notification.id, opts.input.id),
        eq(notification.workspaceId, opts.ctx.workspace.id),
      ];

      const allMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, opts.ctx.workspace.id),
          inArray(monitor.id, opts.input.monitors),
        ),
      });

      if (allMonitors.length !== opts.input.monitors.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }

      const _data = NotificationDataSchema.safeParse(opts.input.data);

      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, opts.input).message,
        });
      }

      await db.transaction(async (tx) => {
        const updated = await tx
          .update(notification)
          .set({
            name: opts.input.name,
            data: JSON.stringify(opts.input.data),
            updatedAt: new Date(),
          })
          .where(and(...whereCondition))
          .returning()
          .get();

        // The link rewrite below is keyed by notification id only, so it
        // must not run unless the workspace-scoped update matched a row.
        // Throwing here aborts the transaction.
        if (!updated) {
          throw new TRPCError({
            code: "NOT_FOUND",
            message: "Notification not found.",
          });
        }

        await tx
          .delete(notificationsToMonitors)
          .where(eq(notificationsToMonitors.notificationId, opts.input.id));

        if (opts.input.monitors.length) {
          await tx.insert(notificationsToMonitors).values(
            opts.input.monitors.map((monitorId) => ({
              notificationId: opts.input.id,
              monitorId,
            })),
          );
        }
      });
    }),

  /**
   * Creates a notification channel and its monitor links in one transaction.
   *
   * @throws TRPCError `FORBIDDEN` when the workspace reached its
   *   "notification-channels" limit, when one of the monitors does not belong
   *   to the workspace, or when the provider is one of the limited providers
   *   and not enabled in the workspace limits.
   * @throws TRPCError `BAD_REQUEST` when `data` does not match
   *   `NotificationDataSchema`.
   * @returns The inserted notification row.
   */
  new: protectedProcedure
    .meta({ track: Events.CreateNotification, trackProps: ["provider"] })
    .input(
      z.object({
        provider: z.enum(notificationProvider),
        data: z.partialRecord(
          z.enum(notificationProvider),
          z.record(z.string(), z.string()).or(z.string()),
        ),
        name: z.string(),
        monitors: z.array(z.number()).prefault([]),
      }),
    )
    .mutation(async (opts) => {
      const limits = opts.ctx.workspace.limits;

      const res = await opts.ctx.db
        .select({ count: count() })
        .from(notification)
        .where(eq(notification.workspaceId, opts.ctx.workspace.id))
        .get();

      // the user has reached the limits
      if (res && res.count >= limits["notification-channels"]) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You reached your notification limits.",
        });
      }

      const allMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, opts.ctx.workspace.id),
          inArray(monitor.id, opts.input.monitors),
        ),
      });

      if (allMonitors.length !== opts.input.monitors.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }

      const limitedProviders = ["sms", "pagerduty", "opsgenie"] as const;
      // biome-ignore lint/suspicious/noExplicitAny: <explanation>
      if (limitedProviders.includes(opts.input.provider as any)) {
        const isAllowed =
          opts.ctx.workspace.limits[
            opts.input.provider as "sms" | "pagerduty" | "opsgenie"
          ];

        if (!isAllowed) {
          throw new TRPCError({
            code: "FORBIDDEN",
            message: "Upgrade to use the notification channel.",
          });
        }
      }

      const _data = NotificationDataSchema.safeParse(opts.input.data);

      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, opts.input).message,
        });
      }

      return db.transaction(async (tx) => {
        // Both inserts go through `tx` so a failure while linking monitors
        // also rolls back the notification row.
        const created = await tx
          .insert(notification)
          .values({
            name: opts.input.name,
            provider: opts.input.provider,
            data: JSON.stringify(opts.input.data),
            workspaceId: opts.ctx.workspace.id,
          })
          .returning()
          .get();

        if (opts.input.monitors.length) {
          await tx.insert(notificationsToMonitors).values(
            opts.input.monitors.map((monitorId) => ({
              notificationId: created.id,
              monitorId,
            })),
          );
        }

        return created;
      });
    }),

  /**
   * Deletes the notification with the given id, restricted to the current
   * workspace.
   */
  delete: protectedProcedure
    .meta({ track: Events.DeleteNotification })
    .input(z.object({ id: z.number() }))
    .mutation(async (opts) => {
      await opts.ctx.db
        .delete(notification)
        .where(
          and(
            eq(notification.id, opts.input.id),
            eq(notification.workspaceId, opts.ctx.workspace.id),
          ),
        )
        .run();
    }),

  /**
   * Sends a test message through the selected provider using the supplied
   * (unsaved) data. Handles "telegram", "whatsapp" and "google-chat".
   *
   * @throws TRPCError `BAD_REQUEST` when the data does not match the
   *   provider's schema, or with "Invalid provider" for any other provider.
   */
  sendTest: protectedProcedure
    .input(
      z.object({
        provider: z.enum(notificationProvider),
        data: z.partialRecord(
          z.enum(notificationProvider),
          z.record(z.string(), z.string()).or(z.string()),
        ),
      }),
    )
    .mutation(async (opts) => {
      if (opts.input.provider === "telegram") {
        const _data = telegramDataSchema.safeParse(opts.input.data);
        if (!_data.success) {
          throw new TRPCError({
            code: "BAD_REQUEST",
            message: SchemaError.fromZod(_data.error, opts.input).message,
          });
        }
        await sendTelegramTest({
          chatId: _data.data.telegram.chatId,
        });

        return;
      }
      if (opts.input.provider === "whatsapp") {
        const _data = whatsappDataSchema.safeParse(opts.input.data);
        if (!_data.success) {
          throw new TRPCError({
            code: "BAD_REQUEST",
            message: SchemaError.fromZod(_data.error, opts.input).message,
          });
        }
        await sendWhatsAppTest({ phoneNumber: _data.data.whatsapp });

        return;
      }
      if (opts.input.provider === "google-chat") {
        const _data = googleChatDataSchema.safeParse(opts.input.data);
        if (!_data.success) {
          throw new TRPCError({
            code: "BAD_REQUEST",
            message: SchemaError.fromZod(_data.error, opts.input).message,
          });
        }
        await sendGoogleChatTest(_data.data["google-chat"]);
        return;
      }

      throw new TRPCError({
        code: "BAD_REQUEST",
        message: "Invalid provider",
      });
    }),
});