Openstatus www.openstatus.dev
at main 285 lines 8.2 kB view raw
import { CloudTasksClient } from "@google-cloud/tasks";
import type { google } from "@google-cloud/tasks/build/protos/protos";
import type { NextRequest } from "next/server";
import { z } from "zod";

import { and, db, eq, gte, lte, notInArray } from "@openstatus/db";
import type { MonitorStatus } from "@openstatus/db/src/schema";
import {
  maintenance,
  maintenancesToMonitors,
  monitor,
  monitorStatusTable,
  selectMonitorSchema,
  selectMonitorStatusSchema,
} from "@openstatus/db/src/schema";

import { env } from "@/env";
import type { Region } from "@openstatus/db/src/schema/constants";
import { regionDict } from "@openstatus/regions";
import {
  type httpPayloadSchema,
  type tpcPayloadSchema,
  transformHeaders,
} from "@openstatus/utils";

const periodicityAvailable = selectMonitorSchema.pick({ periodicity: true });

// FIXME: do coerce in zod instead

const DEFAULT_URL = process.env.VERCEL_URL
  ? `https://${process.env.VERCEL_URL}`
  : "http://localhost:3000";

/**
 * Checks that a request URL targets the deployment's own generated URL.
 *
 * We can't secure the cron endpoint through Vercel, so we make sure it is
 * called through the generated URL. The comparison is done on the URL origin
 * (scheme + host + port) rather than with a substring search, so the expected
 * URL appearing in a path or query string of another host does not pass.
 *
 * @param url - Absolute URL of the incoming request.
 * @returns `true` when the origin of `url` equals the origin of `DEFAULT_URL`;
 *   `false` otherwise, including when `url` cannot be parsed.
 */
export const isAuthorizedDomain = (url: string) => {
  try {
    return new URL(url).origin === new URL(DEFAULT_URL).origin;
  } catch {
    // Relative or malformed URLs can never match the expected origin.
    return false;
  }
};

/**
 * Enqueues one Cloud Task per (monitor, region) for every active monitor of
 * the given periodicity that is not attached to a maintenance window
 * containing the current time.
 *
 * For the "30s" periodicity a second task, scheduled 30 seconds later, is
 * enqueued for each (monitor, region) pair.
 *
 * Task creation failures are counted and logged, not thrown.
 *
 * @param periodicity - Monitor periodicity; also used as the Cloud Tasks queue name.
 * @param req - Incoming request. Not used by this function.
 * @throws Error when the monitors read from the database fail schema validation.
 */
export const cron = async ({
  periodicity,
  // biome-ignore lint/correctness/noUnusedVariables: kept for the public signature
  req,
}: z.infer<typeof periodicityAvailable> & { req: NextRequest }) => {
  const client = new CloudTasksClient({
    projectId: env.GCP_PROJECT_ID,
    credentials: {
      client_email: process.env.GCP_CLIENT_EMAIL,
      private_key: env.GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
    },
  });
  const parent = client.queuePath(
    env.GCP_PROJECT_ID,
    env.GCP_LOCATION,
    periodicity,
  );

  // One instant is used for both the maintenance window check and scheduling.
  const timestamp = Date.now();
  const now = new Date(timestamp);

  const currentMaintenance = db
    .select({ id: maintenance.id })
    .from(maintenance)
    .where(and(lte(maintenance.from, now), gte(maintenance.to, now)))
    .as("currentMaintenance");

  const currentMaintenanceMonitors = db
    .select({ id: maintenancesToMonitors.monitorId })
    .from(maintenancesToMonitors)
    .innerJoin(
      currentMaintenance,
      eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
    );

  const monitorRows = await db
    .select()
    .from(monitor)
    .where(
      and(
        eq(monitor.periodicity, periodicity),
        eq(monitor.active, true),
        notInArray(monitor.id, currentMaintenanceMonitors),
      ),
    )
    .all();

  console.log(`Start cron for ${periodicity}`);

  const monitors = z.array(selectMonitorSchema).safeParse(monitorRows);
  const allResult: Promise<unknown>[] = [];
  if (!monitors.success) {
    console.error(
      `Error while fetching the monitors ${monitors.error.issues.map((issue) => issue.message).join(", ")}`,
    );
    throw new Error("Error while fetching the monitors");
  }

  // The 30s periodicity gets a second task scheduled 30 seconds later.
  const scheduleTimes =
    periodicity === "30s" ? [timestamp, timestamp + 30 * 1000] : [timestamp];

  for (const row of monitors.data) {
    const statusRows = await db
      .select()
      .from(monitorStatusTable)
      .where(eq(monitorStatusTable.monitorId, row.id))
      .all();
    const monitorStatus = z
      .array(selectMonitorStatusSchema)
      .safeParse(statusRows);
    if (!monitorStatus.success) {
      console.error(
        `Error while fetching the monitor status ${monitorStatus.error.issues.map((issue) => issue.message).join(", ")}`,
      );
      continue;
    }

    for (const region of row.regions) {
      // A region without a stored status row is treated as "active".
      const status =
        monitorStatus.data.find((m) => region === m.region)?.status ??
        "active";

      const regionInfo = regionDict[region as keyof typeof regionDict];

      if (!regionInfo) {
        console.error(`Invalid region ${region}`);
        continue;
      }
      if (regionInfo.deprecated) {
        // Deprecated regions are skipped: no task is scheduled for them.
        console.error(`Deprecated region ${region}`);
        continue;
      }

      for (const scheduledAt of scheduleTimes) {
        // Not awaited here: all tasks are settled together below.
        allResult.push(
          createCronTask({
            row,
            timestamp: scheduledAt,
            client,
            parent,
            status,
            region,
          }),
        );
      }
    }
  }

  const allRequests = await Promise.allSettled(allResult);

  const success = allRequests.filter((r) => r.status === "fulfilled").length;
  const failed = allRequests.filter((r) => r.status === "rejected").length;

  console.log(
    `End cron for ${periodicity} with ${allResult.length} jobs with ${success} success and ${failed} failed`,
  );
};

/**
 * Builds the checker payload for one monitor/region pair and creates the
 * corresponding Cloud Task.
 *
 * @param row - Monitor to check.
 * @param timestamp - Schedule time in milliseconds since the Unix epoch; also
 *   sent to the checker as `cronTimestamp`.
 * @param client - Cloud Tasks client used to create the task.
 * @param parent - Fully qualified queue path the task is created in.
 * @param status - Status of the monitor in `region`, forwarded in the payload.
 * @param region - Region the check is routed to.
 * @returns The promise returned by `client.createTask`.
 * @throws Error ("Invalid jobType") when `row.jobType` is neither "http" nor
 *   "tcp"; surfaced as a rejected promise since the function is async.
 */
const createCronTask = async ({
  row,
  timestamp,
  client,
  parent,
  status,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  timestamp: number;
  client: CloudTasksClient;
  parent: string;
  status: MonitorStatus;
  region: Region;
}) => {
  let payload:
    | z.infer<typeof httpPayloadSchema>
    | z.infer<typeof tpcPayloadSchema>
    | null = null;

  if (row.jobType === "http") {
    payload = {
      workspaceId: String(row.workspaceId),
      monitorId: String(row.id),
      url: row.url,
      method: row.method || "GET",
      cronTimestamp: timestamp,
      body: row.body,
      headers: row.headers,
      status: status,
      assertions: row.assertions ? JSON.parse(row.assertions) : null,
      degradedAfter: row.degradedAfter,
      timeout: row.timeout,
      trigger: "cron",
      otelConfig: row.otelEndpoint
        ? {
            endpoint: row.otelEndpoint,
            headers: transformHeaders(row.otelHeaders),
          }
        : undefined,
      // NOTE(review): `||` also replaces an explicit retry of 0 with 3 —
      // confirm whether 0 should be a valid setting before switching to `??`.
      retry: row.retry || 3,
      // `??` so that an explicit `false` is respected; `|| true` always
      // evaluated to true.
      followRedirects: row.followRedirects ?? true,
    };
  }
  if (row.jobType === "tcp") {
    payload = {
      workspaceId: String(row.workspaceId),
      monitorId: String(row.id),
      uri: row.url,
      status: status,
      assertions: row.assertions ? JSON.parse(row.assertions) : null,
      cronTimestamp: timestamp,
      degradedAfter: row.degradedAfter,
      timeout: row.timeout,
      trigger: "cron",
      // NOTE(review): same `|| 3` caveat as the http payload above.
      retry: row.retry || 3,
      otelConfig: row.otelEndpoint
        ? {
            endpoint: row.otelEndpoint,
            headers: transformHeaders(row.otelHeaders),
          }
        : undefined,
    };
  }

  if (!payload) {
    throw new Error("Invalid jobType");
  }

  // Provider-specific header carrying the target region.
  const regionInfo = regionDict[region];
  let regionHeader: Record<string, string> = {};
  switch (regionInfo.provider) {
    case "fly":
      regionHeader = { "fly-prefer-region": region };
      break;
    case "koyeb":
      regionHeader = {
        "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", ""),
      };
      break;
    case "railway":
      regionHeader = { "railway-region": region.replace("railway_", "") };
      break;
  }

  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        "Content-Type": "application/json", // Set content type to ensure compatibility with your application's request parsing
        ...regionHeader,
        Authorization: `Basic ${env.CRON_SECRET}`,
      },
      httpMethod: "POST",
      url: generateUrl({ row, region }),
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
    scheduleTime: {
      // `seconds` is an integer field; drop the millisecond remainder explicitly.
      seconds: Math.floor(timestamp / 1000),
    },
  };

  const request = { parent: parent, task: newTask };
  return client.createTask(request);
};

/**
 * Returns the checker endpoint URL for a monitor, chosen by the provider of
 * the given region.
 *
 * @param row - Monitor whose `jobType` and `id` are embedded in the URL.
 * @param region - Region whose provider selects the checker host.
 * @returns The checker URL for the region's provider.
 * @throws Error when the region's provider is not "fly", "koyeb" or "railway".
 */
function generateUrl({
  row,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  region: Region;
}) {
  const regionInfo = regionDict[region];

  switch (regionInfo.provider) {
    case "fly":
      return `https://openstatus-checker.fly.dev/checker/${row.jobType}?monitor_id=${row.id}`;
    case "koyeb":
      return `https://openstatus-checker.koyeb.app/checker/${row.jobType}?monitor_id=${row.id}`;
    case "railway":
      return `https://railway-proxy-production-9cb1.up.railway.app/checker/${row.jobType}?monitor_id=${row.id}`;

    default:
      // The switch is on the provider, so report the region, not the jobType.
      throw new Error(`Invalid provider for region ${region}`);
  }
}