Openstatus
www.openstatus.dev
1import { TRPCError } from "@trpc/server";
2import { z } from "zod";
3
4import { type SQL, and, count, db, eq, inArray } from "@openstatus/db";
5import {
6 NotificationDataSchema,
7 googleChatDataSchema,
8 insertNotificationSchema,
9 monitor,
10 notification,
11 notificationProvider,
12 notificationsToMonitors,
13 selectMonitorSchema,
14 selectNotificationSchema,
15 telegramDataSchema,
16 whatsappDataSchema,
17} from "@openstatus/db/src/schema";
18
19import { Events } from "@openstatus/analytics";
20import { SchemaError } from "@openstatus/error";
21import { sendTest as sendGoogleChatTest } from "@openstatus/notification-google-chat";
22import { sendTest as sendTelegramTest } from "@openstatus/notification-telegram";
23import { sendTest as sendWhatsAppTest } from "@openstatus/notification-twillio-whatsapp";
24
25import { createTRPCRouter, protectedProcedure } from "../trpc";
26
27export const notificationRouter = createTRPCRouter({
/**
 * Creates a notification channel in the caller's workspace and links it to
 * the requested monitors.
 *
 * Throws FORBIDDEN when the workspace has reached its
 * "notification-channels" limit, when the provider is one of the gated
 * providers ("sms", "pagerduty", "opsgenie") and is not enabled in the
 * workspace limits, or when at least one monitor id is not found in the
 * caller's workspace.
 * Throws BAD_REQUEST when `data` is not valid JSON or does not satisfy
 * NotificationDataSchema.
 *
 * @returns the inserted notification row.
 */
create: protectedProcedure
  .meta({ track: Events.CreateNotification, trackProps: ["provider"] })
  .input(insertNotificationSchema)
  .mutation(async (opts) => {
    const { monitors, ...props } = opts.input;
    const workspaceId = opts.ctx.workspace.id;

    const notificationLimit =
      opts.ctx.workspace.limits["notification-channels"];

    // Count in the database instead of loading every row just to read `.length`.
    const res = await opts.ctx.db
      .select({ count: count() })
      .from(notification)
      .where(eq(notification.workspaceId, workspaceId))
      .get();

    // the user has reached the limits
    if (res && res.count >= notificationLimit) {
      throw new TRPCError({
        code: "FORBIDDEN",
        message: "You reached your notification limits.",
      });
    }

    const limitedProviders = ["sms", "pagerduty", "opsgenie"];
    if (limitedProviders.includes(props.provider)) {
      const isAllowed =
        opts.ctx.workspace.limits[
          props.provider as "sms" | "pagerduty" | "opsgenie"
        ];

      if (!isAllowed) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "Upgrade to use the notification channel.",
        });
      }
    }

    // Only link monitors that belong to the caller's workspace. Duplicated ids
    // are collapsed first so the length comparison below stays meaningful.
    const monitorIds = [...new Set(monitors)];
    if (monitorIds.length) {
      const ownedMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, workspaceId),
          inArray(monitor.id, monitorIds),
        ),
      });

      if (ownedMonitors.length !== monitorIds.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }
    }

    // Malformed JSON is a client error, not an internal one.
    let parsedData: unknown;
    try {
      parsedData = JSON.parse(props.data);
    } catch {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: "Notification data is not valid JSON.",
      });
    }

    const _data = NotificationDataSchema.safeParse(parsedData);

    if (!_data.success) {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: SchemaError.fromZod(_data.error, props).message,
      });
    }

    const _notification = await opts.ctx.db
      .insert(notification)
      .values({ ...props, workspaceId })
      .returning()
      .get();

    if (monitorIds.length) {
      await opts.ctx.db.insert(notificationsToMonitors).values(
        monitorIds.map((monitorId) => ({
          notificationId: _notification.id,
          monitorId,
        })),
      );
    }

    return _notification;
  }),
92
/**
 * Updates a notification channel of the caller's workspace and synchronises
 * its monitor links with the `monitors` list from the input.
 *
 * Returns nothing when the input carries no `id`.
 * Throws BAD_REQUEST when `data` is not valid JSON or does not satisfy
 * NotificationDataSchema, FORBIDDEN when at least one monitor id is not found
 * in the caller's workspace, and NOT_FOUND when no notification with that id
 * exists in the caller's workspace.
 *
 * @returns the updated notification row.
 */
update: protectedProcedure
  .meta({ track: Events.UpdateNotification })
  .input(insertNotificationSchema)
  .mutation(async (opts) => {
    if (!opts.input.id) return;

    const notificationId = opts.input.id;
    const workspaceId = opts.ctx.workspace.id;
    const { monitors, ...props } = opts.input;

    // Malformed JSON is a client error, not an internal one.
    let parsedData: unknown;
    try {
      parsedData = JSON.parse(props.data);
    } catch {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: "Notification data is not valid JSON.",
      });
    }

    const _data = NotificationDataSchema.safeParse(parsedData);

    if (!_data.success) {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: SchemaError.fromZod(_data.error, props).message,
      });
    }

    // Check monitor ownership BEFORE writing anything, so a rejected request
    // leaves the notification untouched.
    const monitorIds = [...new Set(monitors)];
    if (monitorIds.length) {
      const ownedMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, workspaceId),
          inArray(monitor.id, monitorIds),
        ),
      });

      if (ownedMonitors.length !== monitorIds.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }
    }

    const currentNotification = await opts.ctx.db
      .update(notification)
      .set({ ...props, updatedAt: new Date() })
      .where(
        and(
          eq(notification.id, notificationId),
          eq(notification.workspaceId, workspaceId),
        ),
      )
      .returning()
      .get();

    // No row matched: unknown id, or it belongs to another workspace.
    if (!currentNotification) {
      throw new TRPCError({
        code: "NOT_FOUND",
        message: "Notification not found.",
      });
    }

    // TODO: relation

    const currentMonitorsToNotifications = await opts.ctx.db
      .select()
      .from(notificationsToMonitors)
      .where(
        eq(notificationsToMonitors.notificationId, currentNotification.id),
      )
      .all();

    const requested = new Set(monitorIds);
    const removedMonitors = currentMonitorsToNotifications
      .map(({ monitorId }) => monitorId)
      .filter((monitorId) => !requested.has(monitorId));

    if (removedMonitors.length) {
      await opts.ctx.db
        .delete(notificationsToMonitors)
        .where(
          and(
            inArray(notificationsToMonitors.monitorId, removedMonitors),
            eq(
              notificationsToMonitors.notificationId,
              currentNotification.id,
            ),
          ),
        );
    }

    if (monitorIds.length) {
      // Links that already exist are left alone.
      await opts.ctx.db
        .insert(notificationsToMonitors)
        .values(
          monitorIds.map((monitorId) => ({
            notificationId: currentNotification.id,
            monitorId,
          })),
        )
        .onConflictDoNothing();
    }

    return currentNotification;
  }),
180
/**
 * Deletes the notification with the given id, restricted to the caller's
 * workspace. A request for an id outside the workspace deletes nothing.
 */
deleteNotification: protectedProcedure
  .meta({ track: Events.DeleteNotification })
  .input(z.object({ id: z.number() }))
  .mutation(async (opts) => {
    await opts.ctx.db
      .delete(notification)
      .where(
        and(
          eq(notification.id, opts.input.id),
          // Scope by workspace: the id alone would let any authenticated
          // caller delete notifications that belong to other workspaces.
          eq(notification.workspaceId, opts.ctx.workspace.id),
        ),
      )
      .run();
  }),
195
/**
 * Loads one notification of the caller's workspace together with its linked
 * monitors.
 *
 * Throws NOT_FOUND when no notification with that id exists in the caller's
 * workspace.
 *
 * @returns the notification parsed with selectNotificationSchema, extended by
 *   a `monitor` array whose entries each hold a `monitor` object.
 */
getNotificationById: protectedProcedure
  .input(z.object({ id: z.number() }))
  .query(async (opts) => {
    const _notification = await opts.ctx.db.query.notification.findFirst({
      where: and(
        eq(notification.id, opts.input.id),
        eq(notification.workspaceId, opts.ctx.workspace.id),
      ),
      // FIXME: plural
      with: { monitor: { with: { monitor: true } } },
    });

    // Report a missing row explicitly instead of letting schema parsing fail
    // on `undefined`.
    if (!_notification) {
      throw new TRPCError({
        code: "NOT_FOUND",
        message: "Notification not found.",
      });
    }

    const schema = selectNotificationSchema.extend({
      monitor: z.array(
        z.object({
          monitor: selectMonitorSchema,
        }),
      ),
    });

    return schema.parse(_notification);
  }),
219
/**
 * Returns every notification of the caller's workspace, each with its linked
 * monitors nested under `monitor[].monitor`.
 */
getNotificationsByWorkspace: protectedProcedure.query(async ({ ctx }) => {
  const withMonitorsSchema = selectNotificationSchema.extend({
    monitor: z.array(z.object({ monitor: selectMonitorSchema })),
  });

  const inWorkspace = and(eq(notification.workspaceId, ctx.workspace.id));

  const rows = await ctx.db.query.notification.findMany({
    where: inWorkspace,
    with: {
      // FIXME: first should be plurals!
      monitor: { with: { monitor: true } },
    },
  });

  return z.array(withMonitorsSchema).parse(rows);
}),
239
/**
 * Tells whether the caller's workspace already holds at least as many
 * notifications as its "notification-channels" limit allows.
 *
 * @returns `true` when the limit is reached, otherwise `false`.
 */
isNotificationLimitReached: protectedProcedure.query(async (opts) => {
  const notificationLimit =
    opts.ctx.workspace.limits["notification-channels"];

  // Count in the database instead of loading every row just to read `.length`.
  const res = await opts.ctx.db
    .select({ count: count() })
    .from(notification)
    .where(eq(notification.workspaceId, opts.ctx.workspace.id))
    .get();

  const notificationNumbers = res?.count ?? 0;

  return notificationNumbers >= notificationLimit;
}),
251
/**
 * Lists the notifications of the caller's workspace, flattening the join rows
 * so that each notification exposes its monitors directly under `monitors`.
 */
list: protectedProcedure.query(async ({ ctx }) => {
  const listSchema = selectNotificationSchema
    .extend({
      monitors: selectMonitorSchema.array(),
    })
    .array();

  const rows = await ctx.db.query.notification.findMany({
    where: eq(notification.workspaceId, ctx.workspace.id),
    with: {
      monitor: {
        with: {
          monitor: true,
        },
      },
    },
  });

  const flattened = rows.map((row) => {
    // Each join row wraps the actual monitor under its `monitor` key.
    const monitors = row.monitor.map((link) => link.monitor);
    return { ...row, monitors };
  });

  return listSchema.parse(flattened);
}),
276
277 // TODO: rename to update after migration
/**
 * Renames a notification of the caller's workspace, replaces its provider
 * data and replaces its monitor links with the given list.
 *
 * Throws FORBIDDEN when at least one monitor id is not found in the caller's
 * workspace, BAD_REQUEST when `data` does not satisfy NotificationDataSchema,
 * and NOT_FOUND when no notification with that id exists in the caller's
 * workspace. The row update and the link replacement run in one transaction.
 */
updateNotifier: protectedProcedure
  .meta({ track: Events.UpdateNotification })
  .input(
    z.object({
      id: z.number(),
      name: z.string(),
      data: z.partialRecord(
        z.enum(notificationProvider),
        z.string().or(z.record(z.string(), z.string())),
      ),
      monitors: z.array(z.number()),
    }),
  )
  .mutation(async (opts) => {
    const whereCondition: SQL[] = [
      eq(notification.id, opts.input.id),
      eq(notification.workspaceId, opts.ctx.workspace.id),
    ];

    // Collapse duplicated ids so the ownership check and the insert below do
    // not trip over the same monitor twice.
    const monitorIds = [...new Set(opts.input.monitors)];

    if (monitorIds.length) {
      const allMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, opts.ctx.workspace.id),
          inArray(monitor.id, monitorIds),
        ),
      });

      if (allMonitors.length !== monitorIds.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }
    }

    const _data = NotificationDataSchema.safeParse(opts.input.data);

    if (!_data.success) {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: SchemaError.fromZod(_data.error, opts.input).message,
      });
    }

    await db.transaction(async (tx) => {
      const updated = await tx
        .update(notification)
        .set({
          name: opts.input.name,
          data: JSON.stringify(opts.input.data),
          updatedAt: new Date(),
        })
        .where(and(...whereCondition))
        .returning()
        .get();

      // Nothing matched: the id is unknown or belongs to another workspace.
      // Abort before touching the links, which are addressed by notification
      // id alone and would otherwise be rewritten for a foreign notification.
      if (!updated) {
        throw new TRPCError({
          code: "NOT_FOUND",
          message: "Notification not found.",
        });
      }

      await tx
        .delete(notificationsToMonitors)
        .where(eq(notificationsToMonitors.notificationId, opts.input.id));

      if (monitorIds.length) {
        await tx.insert(notificationsToMonitors).values(
          monitorIds.map((monitorId) => ({
            notificationId: opts.input.id,
            monitorId,
          })),
        );
      }
    });
  }),
347
/**
 * Creates a notification channel in the caller's workspace and links it to
 * the given monitors; both writes run in one transaction.
 *
 * Throws FORBIDDEN when the workspace has reached its
 * "notification-channels" limit, when at least one monitor id is not found in
 * the caller's workspace, or when the provider is one of the gated providers
 * ("sms", "pagerduty", "opsgenie") and is not enabled in the workspace
 * limits. Throws BAD_REQUEST when `data` does not satisfy
 * NotificationDataSchema.
 *
 * @returns the inserted notification row.
 */
new: protectedProcedure
  .meta({ track: Events.CreateNotification, trackProps: ["provider"] })
  .input(
    z.object({
      provider: z.enum(notificationProvider),
      data: z.partialRecord(
        z.enum(notificationProvider),
        z.record(z.string(), z.string()).or(z.string()),
      ),
      name: z.string(),
      monitors: z.array(z.number()).prefault([]),
    }),
  )
  .mutation(async (opts) => {
    const limits = opts.ctx.workspace.limits;

    const res = await opts.ctx.db
      .select({ count: count() })
      .from(notification)
      .where(eq(notification.workspaceId, opts.ctx.workspace.id))
      .get();

    // the user has reached the limits
    if (res && res.count >= limits["notification-channels"]) {
      throw new TRPCError({
        code: "FORBIDDEN",
        message: "You reached your notification limits.",
      });
    }

    // Collapse duplicated ids so the ownership check and the insert below do
    // not trip over the same monitor twice.
    const monitorIds = [...new Set(opts.input.monitors)];

    if (monitorIds.length) {
      const allMonitors = await opts.ctx.db.query.monitor.findMany({
        where: and(
          eq(monitor.workspaceId, opts.ctx.workspace.id),
          inArray(monitor.id, monitorIds),
        ),
      });

      if (allMonitors.length !== monitorIds.length) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You don't have access to all the monitors.",
        });
      }
    }

    // Providers that are only available when the workspace limits enable them.
    const limitedProviders = ["sms", "pagerduty", "opsgenie"] as const;
    const limitedProvider = limitedProviders.find(
      (provider) => provider === opts.input.provider,
    );

    if (limitedProvider && !limits[limitedProvider]) {
      throw new TRPCError({
        code: "FORBIDDEN",
        message: "Upgrade to use the notification channel.",
      });
    }

    const _data = NotificationDataSchema.safeParse(opts.input.data);

    if (!_data.success) {
      throw new TRPCError({
        code: "BAD_REQUEST",
        message: SchemaError.fromZod(_data.error, opts.input).message,
      });
    }

    return db.transaction(async (tx) => {
      // Insert through `tx` so the row is rolled back if linking fails.
      const _notification = await tx
        .insert(notification)
        .values({
          name: opts.input.name,
          provider: opts.input.provider,
          data: JSON.stringify(opts.input.data),
          workspaceId: opts.ctx.workspace.id,
        })
        .returning()
        .get();

      if (monitorIds.length) {
        await tx.insert(notificationsToMonitors).values(
          monitorIds.map((monitorId) => ({
            notificationId: _notification.id,
            monitorId,
          })),
        );
      }

      return _notification;
    });
  }),
443
/**
 * Removes the notification with the given id, provided it belongs to the
 * caller's workspace.
 */
delete: protectedProcedure
  .meta({ track: Events.DeleteNotification })
  .input(z.object({ id: z.number() }))
  .mutation(async ({ ctx, input }) => {
    const ownedById = and(
      eq(notification.id, input.id),
      eq(notification.workspaceId, ctx.workspace.id),
    );

    await ctx.db.delete(notification).where(ownedById).run();
  }),
458
/**
 * Sends a test message through the selected provider using the supplied
 * channel data.
 *
 * Only "telegram", "whatsapp" and "google-chat" are handled here. Throws
 * BAD_REQUEST when `data` does not satisfy the selected provider's schema and
 * BAD_REQUEST ("Invalid provider") for every other provider.
 */
sendTest: protectedProcedure
  .input(
    z.object({
      provider: z.enum(notificationProvider),
      data: z.partialRecord(
        z.enum(notificationProvider),
        z.record(z.string(), z.string()).or(z.string()),
      ),
    }),
  )
  .mutation(async (opts) => {
    if (opts.input.provider === "telegram") {
      const _data = telegramDataSchema.safeParse(opts.input.data);
      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, opts.input).message,
        });
      }
      await sendTelegramTest({
        chatId: _data.data.telegram.chatId,
      });

      return;
    }

    if (opts.input.provider === "whatsapp") {
      const _data = whatsappDataSchema.safeParse(opts.input.data);
      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, opts.input).message,
        });
      }
      await sendWhatsAppTest({ phoneNumber: _data.data.whatsapp });

      return;
    }

    if (opts.input.provider === "google-chat") {
      // Channel data is deliberately not logged here: it is user-supplied
      // and should stay out of the server logs.
      const _data = googleChatDataSchema.safeParse(opts.input.data);
      if (!_data.success) {
        throw new TRPCError({
          code: "BAD_REQUEST",
          message: SchemaError.fromZod(_data.error, opts.input).message,
        });
      }
      await sendGoogleChatTest(_data.data["google-chat"]);

      return;
    }

    throw new TRPCError({
      code: "BAD_REQUEST",
      message: "Invalid provider",
    });
  }),
515});