Openstatus www.openstatus.dev
at main 345 lines 8.9 kB view raw
import { Hono } from "hono";
import { z } from "zod";

import { and, db, eq, inArray, isNull, schema } from "@openstatus/db";
import { incidentTable } from "@openstatus/db/src/schema";
import {
  monitorStatusSchema,
  selectMonitorSchema,
} from "@openstatus/db/src/schema/monitors/validation";

import { getLogger } from "@logtape/logtape";
import { monitorRegions } from "@openstatus/db/src/schema/constants";
import { env } from "../env";
import type { Env } from "../index";
import { checkerAudit } from "../utils/audit-log";
import { triggerNotifications, upsertMonitorStatus } from "./alerting";

export const checkerRoute = new Hono<Env>();

/** Shape of the JSON body accepted by `POST /updateStatus`. */
const payloadSchema = z.object({
  monitorId: z.string(),
  message: z.string().optional(),
  statusCode: z.number().optional(),
  region: z.enum(monitorRegions),
  cronTimestamp: z.number(),
  status: monitorStatusSchema,
  latency: z.number().optional(),
});

const logger = getLogger(["workflow"]);

/**
 * Finds an open incident for the given monitor, i.e. a row in the incident
 * table for that monitor whose `resolvedAt` is still null.
 *
 * Note: only `resolvedAt` is checked; acknowledgement state is not part of
 * the filter.
 *
 * @param monitorId - Numeric id of the monitor.
 * @returns The row returned by `.get()`; callers treat a falsy result as
 *   "no open incident".
 */
async function findOpenIncident(monitorId: number) {
  return db
    .select()
    .from(incidentTable)
    .where(
      and(
        eq(incidentTable.monitorId, monitorId),
        isNull(incidentTable.resolvedAt),
      ),
    )
    .get();
}

/**
 * Resolves the open incident of a monitor by setting `resolvedAt` to the cron
 * timestamp and flagging it as `autoResolved`, then publishes an
 * `incident.resolved` audit log entry.
 *
 * @param params.monitorId - Monitor id as received in the payload (string).
 * @param params.cronTimestamp - Timestamp passed to `new Date(...)` for
 *   `resolvedAt`.
 * @returns The incident row as it was read before the update, or `null` when
 *   there is no open incident to resolve.
 */
async function resolveIncident(params: {
  monitorId: string;
  cronTimestamp: number;
}) {
  const { monitorId, cronTimestamp } = params;
  const incident = await findOpenIncident(Number(monitorId));

  // The resolvedAt check is defensive: findOpenIncident already filters on it.
  if (!incident || incident.resolvedAt) {
    return null;
  }

  logger.info("Recovering incident", {
    incident_id: incident.id,
    monitor_id: monitorId,
  });

  await db
    .update(incidentTable)
    .set({
      resolvedAt: new Date(cronTimestamp),
      autoResolved: true,
    })
    .where(eq(incidentTable.id, incident.id))
    .run();

  await checkerAudit.publishAuditLog({
    id: `monitor:${monitorId}`,
    action: "incident.resolved",
    targets: [{ id: monitorId, type: "monitor" }],
    metadata: { cronTimestamp, incidentId: incident.id },
  });

  return incident;
}

/**
 * POST /updateStatus
 *
 * Receives the result of a single regional check and:
 *  1. authenticates the caller against `CRON_SECRET`,
 *  2. validates the payload (422 on malformed JSON or schema mismatch),
 *  3. upserts the per-region monitor status,
 *  4. publishes an audit log entry for the reported status,
 *  5. when at least half of the monitor's regions report the same status and
 *     that status differs from the monitor's stored status, updates the
 *     monitor, opens or resolves an incident, and triggers notifications.
 *
 * Responses: 401 on bad credentials, 422 on an invalid body, 404 when the
 * monitor does not exist, otherwise 200.
 */
checkerRoute.post("/updateStatus", async (c) => {
  const auth = c.req.header("Authorization");
  if (auth !== `Basic ${env().CRON_SECRET}`) {
    logger.error("Unauthorized");
    return c.text("Unauthorized", 401);
  }

  const event = c.get("event");

  // A body that is not valid JSON makes `c.req.json()` throw; answer it the
  // same way as a body that fails schema validation instead of letting the
  // error escape the handler.
  let json: unknown;
  try {
    json = await c.req.json();
  } catch {
    return c.text("Unprocessable Entity", 422);
  }

  const result = payloadSchema.safeParse(json);

  if (!result.success) {
    return c.text("Unprocessable Entity", 422);
  }
  event.status_update = {
    status: result.data.status,
    message: result.data.message,
    region: result.data.region,
    status_code: result.data.statusCode,
    cron_timestamp: result.data.cronTimestamp,
    latency_ms: result.data.latency,
  };

  const {
    monitorId,
    message,
    region,
    statusCode,
    cronTimestamp,
    status,
    latency,
  } = result.data;

  logger.info("Updating monitor status", {
    monitor_id: monitorId,
    region,
    status,
    status_code: statusCode,
    cron_timestamp: cronTimestamp,
    latency_ms: latency,
  });

  // First we upsert the monitor status
  await upsertMonitorStatus({
    monitorId: monitorId,
    status,
    region: region,
  });

  const currentMonitor = await db
    .select()
    .from(schema.monitor)
    .where(eq(schema.monitor.id, Number(monitorId)))
    .get();

  // Without a row there is nothing to evaluate; report it explicitly rather
  // than letting schema parsing throw on a missing value.
  if (!currentMonitor) {
    logger.warning("Monitor not found", { monitor_id: monitorId });
    return c.text("Not Found", 404);
  }

  const monitor = selectMonitorSchema.parse(currentMonitor);
  const numberOfRegions = monitor.regions.length;

  // Fetch all affected regions for notifications (single query): the regions
  // configured on the monitor whose stored status equals the reported one.
  const affectedRegions = await db
    .select({ region: schema.monitorStatusTable.region })
    .from(schema.monitorStatusTable)
    .where(
      and(
        eq(schema.monitorStatusTable.monitorId, monitor.id),
        eq(schema.monitorStatusTable.status, status),
        inArray(schema.monitorStatusTable.region, monitor.regions),
      ),
    )
    .all();

  const affectedRegionsList = affectedRegions.map((r) => r.region);
  const affectedRegionCount = affectedRegionsList.length;

  if (affectedRegionCount === 0) {
    return c.json({ success: true }, 200);
  }

  // audit log the current state of the ping
  switch (status) {
    case "active":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.recovered",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata: {
          region,
          statusCode: statusCode ?? -1,
          cronTimestamp,
          latency,
        },
      });
      break;
    case "degraded":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.degraded",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata: {
          region,
          statusCode: statusCode ?? -1,
          cronTimestamp,
          latency,
        },
      });
      break;
    case "error":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.failed",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata: {
          region,
          statusCode: statusCode ?? -1,
          message,
          cronTimestamp,
          latency,
        },
      });
      break;
  }

  // Only act once at least half of the configured regions agree on the
  // reported status. (affectedRegionCount is >= 1 at this point, so the
  // single-region clause is already implied; it is kept for readability.)
  if (affectedRegionCount >= numberOfRegions / 2 || numberOfRegions === 1) {
    switch (status) {
      case "active": {
        if (monitor.status === "active") {
          break;
        }

        logger.info("Monitor status changed to active", {
          monitor_id: monitor.id,
          workspace_id: monitor.workspaceId,
        });
        await db
          .update(schema.monitor)
          .set({ status: "active" })
          .where(eq(schema.monitor.id, monitor.id));

        // Only a monitor coming out of "error" has an incident to resolve.
        let incident = null;
        if (monitor.status === "error") {
          incident = await resolveIncident({ monitorId, cronTimestamp });
        }

        await triggerNotifications({
          monitorId,
          statusCode,
          message,
          notifType: "recovery",
          cronTimestamp,
          regions: affectedRegionsList,
          latency,
          incidentId: incident?.id,
        });

        break;
      }
      case "degraded": {
        if (monitor.status === "degraded") {
          break;
        }

        logger.info("Monitor status changed to degraded", {
          monitor_id: monitor.id,
          workspace_id: monitor.workspaceId,
        });

        await db
          .update(schema.monitor)
          .set({ status: "degraded" })
          .where(eq(schema.monitor.id, monitor.id));

        // Moving from "error" to "degraded" also closes the open incident.
        let incident = null;
        if (monitor.status === "error") {
          incident = await resolveIncident({
            monitorId,
            cronTimestamp,
          });
        }

        await triggerNotifications({
          monitorId,
          statusCode,
          message,
          notifType: "degraded",
          cronTimestamp,
          latency,
          regions: affectedRegionsList,
          incidentId: incident?.id,
        });

        break;
      }
      case "error": {
        if (monitor.status === "error") {
          break;
        }

        logger.info("Monitor status changed to error", {
          monitor_id: monitor.id,
          workspace_id: monitor.workspaceId,
        });

        await db
          .update(schema.monitor)
          .set({ status: "error" })
          .where(eq(schema.monitor.id, monitor.id));

        // Incident creation is best-effort: a failure is logged and the
        // request still completes with 200.
        try {
          const existingIncident = await findOpenIncident(Number(monitorId));
          if (existingIncident) {
            // An incident is already open: do not create a second one and do
            // not send another alert.
            logger.info("Already in incident", {
              incident_id: existingIncident.id,
            });
            break;
          }

          const [newIncident] = await db
            .insert(incidentTable)
            .values({
              monitorId: Number(monitorId),
              workspaceId: monitor.workspaceId,
              startedAt: new Date(cronTimestamp),
            })
            .returning();

          if (!newIncident?.id) {
            break;
          }

          await checkerAudit.publishAuditLog({
            id: `monitor:${monitorId}`,
            action: "incident.created",
            targets: [{ id: monitorId, type: "monitor" }],
            metadata: { cronTimestamp, incidentId: newIncident.id },
          });

          await triggerNotifications({
            monitorId,
            statusCode,
            message,
            notifType: "alert",
            cronTimestamp,
            latency,
            regions: affectedRegionsList,
            incidentId: newIncident.id,
          });
        } catch (error) {
          logger.warning("Failed to create incident", { error });
        }

        break;
      }
      default:
        logger.error("should not happen");
        break;
    }
  }

  return c.text("Ok", 200);
});