Openstatus www.openstatus.dev
at main 294 lines 8.6 kB view raw
import { and, count, db, eq, gte, inArray, schema } from "@openstatus/db";
import type { Incident, MonitorStatus } from "@openstatus/db/src/schema";
import {
  selectMonitorSchema,
  selectNotificationSchema,
  selectWorkspaceSchema,
} from "@openstatus/db/src/schema";

import { getLogger } from "@logtape/logtape";
import type { Region } from "@openstatus/db/src/schema/constants";
import { Effect, Schedule } from "effect";
import { checkerAudit } from "../utils/audit-log";
import { providerToFunction } from "./utils";

const logger = getLogger("workflow");

type NotificationType = "alert" | "recovery" | "degraded";

/**
 * Decides whether an SMS notification may still be sent for a workspace.
 *
 * Returns `false` when the notification has no workspace, when the workspace
 * lookup does not yield exactly one row, or when the number of notification
 * triggers recorded for the workspace's SMS notifications since one month ago
 * has reached the workspace's `sms-limit`.
 *
 * @throws whatever `selectWorkspaceSchema.parse` or the database queries throw.
 */
const hasSmsQuota = async (workspaceId: number | null): Promise<boolean> => {
  if (workspaceId === null) {
    return false;
  }

  const workspaces = await db
    .select()
    .from(schema.workspace)
    .where(eq(schema.workspace.id, workspaceId));

  if (workspaces.length !== 1) {
    return false;
  }

  const { limits } = selectWorkspaceSchema.parse(workspaces[0]);

  const oneMonthAgo = new Date();
  oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);

  // The quota is per workspace, so count triggers across every SMS
  // notification that belongs to it, not only the current one.
  const smsNotifications = await db
    .select()
    .from(schema.notification)
    .where(
      and(
        eq(schema.notification.workspaceId, workspaceId),
        eq(schema.notification.provider, "sms"),
      ),
    );
  const smsNotificationIds = smsNotifications.map(
    (notification) => notification.id,
  );

  // NOTE(review): the lower bound is expressed in epoch seconds — confirm that
  // `cronTimestamp` is stored in seconds as well, otherwise the window is wrong.
  const smsSent = await db
    .select({ count: count() })
    .from(schema.notificationTrigger)
    .where(
      and(
        gte(
          schema.notificationTrigger.cronTimestamp,
          Math.floor(oneMonthAgo.getTime() / 1000),
        ),
        inArray(schema.notificationTrigger.notificationId, smsNotificationIds),
      ),
    )
    .all();

  // `>=` rather than `>`: the trigger for the message about to be sent has not
  // been recorded yet, so `count === limit` means the quota is already used up.
  if ((smsSent[0]?.count ?? 0) >= limits["sms-limit"]) {
    logger.warn(`SMS quota exceeded for workspace ${workspaceId}`);
    return false;
  }

  return true;
};

/**
 * Runs `send` through Effect with up to three retries on an exponential
 * schedule starting at one second. A final failure is logged and swallowed so
 * that one failing provider does not abort the remaining notifications.
 */
const deliverWithRetry = async ({
  send,
  monitorId,
  provider,
  notifType,
}: {
  send: () => Promise<unknown>;
  monitorId: string;
  provider: string;
  notifType: NotificationType;
}): Promise<void> => {
  const delivery = Effect.tryPromise({
    try: () => send(),
    catch: (_unknown) =>
      new Error(
        `Failed sending notification via ${provider} for monitor ${monitorId}`,
      ),
  }).pipe(
    Effect.retry({
      times: 3,
      schedule: Schedule.exponential("1000 millis"),
    }),
  );

  await Effect.runPromise(delivery).catch((err) =>
    logger.error(`Failed to send ${notifType} notification`, {
      monitor_id: monitorId,
      provider,
      error_message: err instanceof Error ? err.message : String(err),
    }),
  );
};

/**
 * Sends every notification attached to a monitor for the given event type.
 *
 * For each notification linked to the monitor this:
 *  1. skips SMS notifications whose workspace has no remaining quota,
 *  2. records a notification trigger and skips the notification if that
 *     insert fails,
 *  3. calls the provider's alert / recovery / degraded sender with retries,
 *  4. publishes a `notification.sent` audit entry.
 *
 * Delivery failures are logged, not thrown; the audit entry is published
 * regardless of whether delivery succeeded.
 *
 * @param monitorId - Monitor id as a string; converted with `Number()` for the query.
 * @param notifType - Which provider sender to invoke.
 * @param cronTimestamp - Stored on the notification trigger row and forwarded to the provider.
 * @param incidentId - When truthy, the incident is loaded and forwarded to the provider.
 */
export const triggerNotifications = async ({
  monitorId,
  statusCode,
  message,
  notifType,
  cronTimestamp,
  incidentId,
  regions,
  latency,
}: {
  monitorId: string;
  statusCode?: number;
  message?: string;
  notifType: NotificationType;
  cronTimestamp: number;
  incidentId?: number;
  regions?: string[];
  latency?: number;
}) => {
  logger.info("Triggering alerting", {
    monitor_id: monitorId,
    notification_type: notifType,
  });

  // Incident data is optional context for the providers; a failed lookup must
  // not prevent the notifications from going out.
  let incident: Incident | undefined;
  if (incidentId) {
    try {
      incident = await db.query.incidentTable.findFirst({
        where: eq(schema.incidentTable.id, incidentId),
      });
    } catch (err) {
      logger.warn("Failed to fetch incident data", {
        incident_id: incidentId,
        error_message: err instanceof Error ? err.message : String(err),
      });
    }
  }

  const notifications = await db
    .select()
    .from(schema.notificationsToMonitors)
    .innerJoin(
      schema.notification,
      eq(schema.notification.id, schema.notificationsToMonitors.notificationId),
    )
    .innerJoin(
      schema.monitor,
      eq(schema.monitor.id, schema.notificationsToMonitors.monitorId),
    )
    .where(eq(schema.monitor.id, Number(monitorId)))
    .all();

  for (const notif of notifications) {
    const provider = notif.notification.provider;

    // for sms check we are in the quota
    if (
      provider === "sms" &&
      !(await hasSmsQuota(notif.notification.workspaceId))
    ) {
      continue;
    }

    logger.info("Sending notification", {
      monitor_id: monitorId,
      provider,
      notification_type: notifType,
      notification_id: notif.notification.id,
    });

    const monitor = selectMonitorSchema.parse(notif.monitor);

    // Any insert failure is treated as "already triggered" and the
    // notification is skipped (presumably a uniqueness constraint on the
    // trigger table — confirm against the schema). The underlying error is
    // logged so that other failures are not hidden behind that message.
    try {
      await insertNotificationTrigger({
        monitorId: monitor.id,
        notificationId: notif.notification.id,
        cronTimestamp: cronTimestamp,
      });
    } catch (err) {
      logger.error("notification trigger already exists dont send again", {
        monitor_id: monitorId,
        notification_id: notif.notification.id,
        error_message: err instanceof Error ? err.message : String(err),
      });
      continue;
    }

    // Built lazily inside the retried thunk so that a schema parse failure is
    // handled by the same error path as a provider failure.
    const buildPayload = () => ({
      monitor,
      notification: selectNotificationSchema.parse(notif.notification),
      statusCode,
      message,
      incident,
      cronTimestamp,
      regions,
      latency,
    });

    const sendByType: Record<NotificationType, () => Promise<unknown>> = {
      alert: () => providerToFunction[provider].sendAlert(buildPayload()),
      recovery: () => providerToFunction[provider].sendRecovery(buildPayload()),
      degraded: () => providerToFunction[provider].sendDegraded(buildPayload()),
    };

    await deliverWithRetry({
      send: sendByType[notifType],
      monitorId,
      provider,
      notifType,
    });

    // ALPHA
    await checkerAudit.publishAuditLog({
      id: `monitor:${monitorId}`,
      action: "notification.sent",
      targets: [{ id: monitorId, type: "monitor" }],
      metadata: {
        provider,
        cronTimestamp,
        type: notifType,
        notificationId: notif.notification.id,
      },
    });
  }
};

/**
 * Inserts one row into the notification trigger table for the given
 * monitor / notification / cron timestamp. Rejects if the insert fails.
 */
const insertNotificationTrigger = async ({
  monitorId,
  notificationId,
  cronTimestamp,
}: {
  monitorId: number;
  notificationId: number;
  cronTimestamp: number;
}) => {
  await db
    .insert(schema.notificationTrigger)
    .values({
      monitorId,
      notificationId,
      cronTimestamp,
    })
    .returning();
};

/**
 * Inserts the status of a monitor in a region, or updates `status` and
 * `updatedAt` when a row for that (monitor, region) pair already exists.
 *
 * @param monitorId - Monitor id as a string; converted with `Number()` before writing.
 */
export const upsertMonitorStatus = async ({
  monitorId,
  status,
  region,
}: {
  monitorId: string;
  status: MonitorStatus;
  region: Region;
}) => {
  const newData = await db
    .insert(schema.monitorStatusTable)
    .values({ status, region, monitorId: Number(monitorId) })
    .onConflictDoUpdate({
      target: [
        schema.monitorStatusTable.monitorId,
        schema.monitorStatusTable.region,
      ],
      set: { status, updatedAt: new Date() },
    })
    .returning();
  logger.debug("Upserted monitor status", {
    monitor_id: monitorId,
    region,
    status,
    updated_at: newData[0]?.updatedAt,
  });
};