Openstatus
www.openstatus.dev
1import { and, count, db, eq, gte, inArray, schema } from "@openstatus/db";
2import type { Incident, MonitorStatus } from "@openstatus/db/src/schema";
3import {
4 selectMonitorSchema,
5 selectNotificationSchema,
6 selectWorkspaceSchema,
7} from "@openstatus/db/src/schema";
8
9import { getLogger } from "@logtape/logtape";
10import type { Region } from "@openstatus/db/src/schema/constants";
11import { Effect, Schedule } from "effect";
12import { checkerAudit } from "../utils/audit-log";
13import { providerToFunction } from "./utils";
14
// Module-wide LogTape logger registered under the "workflow" category; used
// by every function in this file for structured log output.
const logger = getLogger("workflow");
16
/**
 * Maps each notification type to the provider method that delivers it.
 * Keeps the send path below free of per-type duplicated branches.
 */
const SEND_METHOD_BY_TYPE = {
  alert: "sendAlert",
  recovery: "sendRecovery",
  degraded: "sendDegraded",
} as const;

/** Turns an unknown thrown value into a loggable message. */
const describeError = (err: unknown): string =>
  err instanceof Error ? err.message : String(err);

/**
 * Decides whether an SMS notification may still be sent for a workspace.
 *
 * Returns `false` when the notification has no workspace, when the workspace
 * row cannot be resolved to exactly one record, or when the number of SMS
 * notification triggers recorded during the last month has reached the
 * workspace's `sms-limit`.
 *
 * @param workspaceId - Workspace owning the SMS notification, or `null`.
 * @returns `true` when another SMS may be sent.
 */
const hasSmsQuotaLeft = async (
  workspaceId: number | null,
): Promise<boolean> => {
  if (workspaceId === null) {
    return false;
  }

  const workspace = await db
    .select()
    .from(schema.workspace)
    .where(eq(schema.workspace.id, workspaceId));

  if (workspace.length !== 1) {
    return false;
  }

  const data = selectWorkspaceSchema.parse(workspace[0]);

  const oneMonthAgo = new Date();
  oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);

  // All SMS notifications of the workspace share one quota.
  const smsNotification = await db
    .select()
    .from(schema.notification)
    .where(
      and(
        eq(schema.notification.workspaceId, workspaceId),
        eq(schema.notification.provider, "sms"),
      ),
    );
  const ids = smsNotification.map((notification) => notification.id);

  // NOTE(review): the threshold is expressed in epoch seconds; this assumes
  // notificationTrigger.cronTimestamp is stored in seconds too — confirm.
  const smsSent = await db
    .select({ count: count() })
    .from(schema.notificationTrigger)
    .where(
      and(
        gte(
          schema.notificationTrigger.cronTimestamp,
          Math.floor(oneMonthAgo.getTime() / 1000),
        ),
        inArray(schema.notificationTrigger.notificationId, ids),
      ),
    )
    .all();

  // This check runs BEFORE the trigger for the current message is recorded,
  // so the quota is exhausted as soon as the count reaches the limit. Using
  // `>` here would let limit + 1 messages through (and one message when the
  // limit is 0).
  if ((smsSent[0]?.count ?? 0) >= data.limits["sms-limit"]) {
    logger.warn(`SMS quota exceeded for workspace ${workspaceId}`);
    return false;
  }

  return true;
};

/**
 * Sends a notification of the given type through every notification channel
 * attached to a monitor.
 *
 * For each channel the function:
 *  1. skips SMS channels whose workspace has no SMS quota left,
 *  2. records a notification trigger for `cronTimestamp`; if that insert
 *     fails the channel is skipped (the failure is treated as "already
 *     sent for this run"),
 *  3. calls the provider method matching `notifType`, retrying up to three
 *     times with exponential backoff starting at one second,
 *  4. publishes a `notification.sent` audit log entry. The entry is published
 *     after the send attempt even when delivery failed; failures are only
 *     logged.
 *
 * @param monitorId - Monitor id; converted with `Number()` for the lookup.
 * @param statusCode - Forwarded unchanged to the provider.
 * @param message - Forwarded unchanged to the provider.
 * @param notifType - Selects which provider method is invoked.
 * @param cronTimestamp - Stored with the notification trigger and forwarded.
 * @param incidentId - When truthy, the incident is loaded and forwarded; a
 *   failed lookup is logged and sending continues without incident data.
 * @param regions - Forwarded unchanged to the provider.
 * @param latency - Forwarded unchanged to the provider.
 */
export const triggerNotifications = async ({
  monitorId,
  statusCode,
  message,
  notifType,
  cronTimestamp,
  incidentId,
  regions,
  latency,
}: {
  monitorId: string;
  statusCode?: number;
  message?: string;
  notifType: "alert" | "recovery" | "degraded";
  cronTimestamp: number;
  incidentId?: number;
  regions?: string[];
  latency?: number;
}) => {
  logger.info("Triggering alerting", {
    monitor_id: monitorId,
    notification_type: notifType,
  });

  // Incident data is optional context for the providers: a failed lookup
  // must not prevent the notification from going out.
  let incident: Incident | undefined;
  if (incidentId) {
    try {
      incident = await db.query.incidentTable.findFirst({
        where: eq(schema.incidentTable.id, incidentId),
      });
    } catch (err) {
      logger.warn("Failed to fetch incident data", {
        incident_id: incidentId,
        error_message: describeError(err),
      });
    }
  }

  const notifications = await db
    .select()
    .from(schema.notificationsToMonitors)
    .innerJoin(
      schema.notification,
      eq(schema.notification.id, schema.notificationsToMonitors.notificationId),
    )
    .innerJoin(
      schema.monitor,
      eq(schema.monitor.id, schema.notificationsToMonitors.monitorId),
    )
    .where(eq(schema.monitor.id, Number(monitorId)))
    .all();

  const sendMethod = SEND_METHOD_BY_TYPE[notifType];

  for (const notif of notifications) {
    const provider = notif.notification.provider;

    // for sms check we are in the quota
    if (
      provider === "sms" &&
      !(await hasSmsQuotaLeft(notif.notification.workspaceId))
    ) {
      continue;
    }

    logger.info("Sending notification", {
      monitor_id: monitorId,
      provider,
      notification_type: notifType,
      notification_id: notif.notification.id,
    });

    const monitor = selectMonitorSchema.parse(notif.monitor);

    try {
      await insertNotificationTrigger({
        monitorId: monitor.id,
        notificationId: notif.notification.id,
        cronTimestamp,
      });
    } catch (err) {
      // Presumably a uniqueness violation (same monitor / notification /
      // cron run); the underlying message is logged so that any other
      // database failure is not mistaken for a duplicate.
      logger.error("notification trigger already exists dont send again", {
        monitor_id: monitorId,
        notification_id: notif.notification.id,
        error_message: describeError(err),
      });
      continue;
    }

    const sendEffect = Effect.tryPromise({
      // The notification is parsed inside the thunk so that a schema failure
      // is handled like any other send failure instead of aborting the loop.
      try: () =>
        providerToFunction[provider][sendMethod]({
          monitor,
          notification: selectNotificationSchema.parse(notif.notification),
          statusCode,
          message,
          incident,
          cronTimestamp,
          regions,
          latency,
        }),
      // Keep the provider's own error text; without it the log cannot tell
      // why delivery failed.
      catch: (cause) =>
        new Error(
          `Failed sending notification via ${provider} for monitor ${monitorId}: ${describeError(cause)}`,
        ),
    }).pipe(
      Effect.retry({
        times: 3,
        schedule: Schedule.exponential("1000 millis"),
      }),
    );

    await Effect.runPromise(sendEffect).catch((err) =>
      logger.error(`Failed to send ${notifType} notification`, {
        monitor_id: monitorId,
        provider,
        error_message: describeError(err),
      }),
    );

    // ALPHA
    await checkerAudit.publishAuditLog({
      id: `monitor:${monitorId}`,
      action: "notification.sent",
      targets: [{ id: monitorId, type: "monitor" }],
      metadata: {
        provider,
        cronTimestamp,
        type: notifType,
        notificationId: notif.notification.id,
      },
    });
  }
};
248
/**
 * Persists one row in the notification trigger table for the given monitor,
 * notification and cron run.
 *
 * The returned promise rejects when the insert fails; the caller in this file
 * treats such a rejection as "trigger already exists" and skips sending.
 *
 * @param trigger - Identifiers of the monitor and notification plus the cron
 *   timestamp of the run that caused the notification.
 */
const insertNotificationTrigger = async (trigger: {
  monitorId: number;
  notificationId: number;
  cronTimestamp: number;
}) => {
  const row = {
    monitorId: Number(trigger.monitorId),
    notificationId: trigger.notificationId,
    cronTimestamp: trigger.cronTimestamp,
  };

  await db.insert(schema.notificationTrigger).values(row).returning();
};
267
/**
 * Stores the current status of a monitor for a single region.
 *
 * Inserts a row into the monitor status table; when a row for the same
 * (monitorId, region) pair already exists, its status and `updatedAt` are
 * overwritten instead. The outcome is logged at debug level.
 *
 * @param monitorId - Monitor id; converted with `Number()` before writing.
 * @param status - Status to store.
 * @param region - Region the status applies to.
 */
export const upsertMonitorStatus = async ({
  monitorId,
  status,
  region,
}: {
  monitorId: string;
  status: MonitorStatus;
  region: Region;
}) => {
  const { monitorStatusTable } = schema;
  const numericMonitorId = Number(monitorId);

  const upsertedRows = await db
    .insert(monitorStatusTable)
    .values({ status, region, monitorId: numericMonitorId })
    .onConflictDoUpdate({
      // Conflict key: one status row per monitor and region.
      target: [monitorStatusTable.monitorId, monitorStatusTable.region],
      set: { status, updatedAt: new Date() },
    })
    .returning();

  const [firstRow] = upsertedRows;
  logger.debug("Upserted monitor status", {
    monitor_id: monitorId,
    region,
    status,
    updated_at: firstRow?.updatedAt,
  });
};