Openstatus
www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import type { NextRequest } from "next/server";
4import { z } from "zod";
5
6import { and, db, eq, gte, lte, notInArray } from "@openstatus/db";
7import type { MonitorStatus } from "@openstatus/db/src/schema";
8import {
9 maintenance,
10 maintenancesToMonitors,
11 monitor,
12 monitorStatusTable,
13 selectMonitorSchema,
14 selectMonitorStatusSchema,
15} from "@openstatus/db/src/schema";
16
17import { env } from "@/env";
18import type { Region } from "@openstatus/db/src/schema/constants";
19import { regionDict } from "@openstatus/regions";
20import {
21 type httpPayloadSchema,
22 type tpcPayloadSchema,
23 transformHeaders,
24} from "@openstatus/utils";
25
26const periodicityAvailable = selectMonitorSchema.pick({ periodicity: true });
27
28// FIXME: do coerce in zod instead
29
30const DEFAULT_URL = process.env.VERCEL_URL
31 ? `https://${process.env.VERCEL_URL}`
32 : "http://localhost:3000";
33
34// We can't secure cron endpoint by vercel thus we should make sure they are called by the generated url
35export const isAuthorizedDomain = (url: string) => {
36 return url.includes(DEFAULT_URL);
37};
38
39export const cron = async ({
40 periodicity,
41 // biome-ignore lint/correctness/noUnusedVariables: <explanation>
42 req,
43}: z.infer<typeof periodicityAvailable> & { req: NextRequest }) => {
44 const client = new CloudTasksClient({
45 projectId: env.GCP_PROJECT_ID,
46 credentials: {
47 client_email: process.env.GCP_CLIENT_EMAIL,
48 private_key: env.GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
49 },
50 });
51 const parent = client.queuePath(
52 env.GCP_PROJECT_ID,
53 env.GCP_LOCATION,
54 periodicity,
55 );
56
57 const timestamp = Date.now();
58
59 const currentMaintenance = db
60 .select({ id: maintenance.id })
61 .from(maintenance)
62 .where(
63 and(lte(maintenance.from, new Date()), gte(maintenance.to, new Date())),
64 )
65 .as("currentMaintenance");
66
67 const currentMaintenanceMonitors = db
68 .select({ id: maintenancesToMonitors.monitorId })
69 .from(maintenancesToMonitors)
70 .innerJoin(
71 currentMaintenance,
72 eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
73 );
74
75 const result = await db
76 .select()
77 .from(monitor)
78 .where(
79 and(
80 eq(monitor.periodicity, periodicity),
81 eq(monitor.active, true),
82 notInArray(monitor.id, currentMaintenanceMonitors),
83 ),
84 )
85 .all();
86
87 console.log(`Start cron for ${periodicity}`);
88
89 const monitors = z.array(selectMonitorSchema).safeParse(result);
90 const allResult = [];
91 if (!monitors.success) {
92 console.error(
93 `Error while fetching the monitors ${monitors.error.issues.map((issue) => issue.message).join(", ")}`,
94 );
95 throw new Error("Error while fetching the monitors");
96 }
97
98 for (const row of monitors.data) {
99 // const selectedRegions = row.regions.length > 0 ? row.regions : ["ams"];
100
101 const result = await db
102 .select()
103 .from(monitorStatusTable)
104 .where(eq(monitorStatusTable.monitorId, row.id))
105 .all();
106 const monitorStatus = z.array(selectMonitorStatusSchema).safeParse(result);
107 if (!monitorStatus.success) {
108 console.error(
109 `Error while fetching the monitor status ${monitorStatus.error.issues.map((issue) => issue.message).join(", ")}`,
110 );
111 continue;
112 }
113
114 for (const region of row.regions) {
115 const status =
116 monitorStatus.data.find((m) => region === m.region)?.status || "active";
117
118 const r = regionDict[region as keyof typeof regionDict];
119
120 if (!r) {
121 console.error(`Invalid region ${region}`);
122 continue;
123 }
124 if (r.deprecated) {
125 // Let's uncomment this when we are ready to remove deprecated regions
126 // We should not use deprecated regions anymore
127 console.error(`Deprecated region ${region}`);
128 continue;
129 }
130 const response = createCronTask({
131 row,
132 timestamp,
133 client,
134 parent,
135 status,
136 region,
137 });
138 allResult.push(response);
139 if (periodicity === "30s") {
140 // we schedule another task in 30s
141 const scheduledAt = timestamp + 30 * 1000;
142 const response = createCronTask({
143 row,
144 timestamp: scheduledAt,
145 client,
146 parent,
147 status,
148 region,
149 });
150 allResult.push(response);
151 }
152 }
153 }
154
155 const allRequests = await Promise.allSettled(allResult);
156
157 const success = allRequests.filter((r) => r.status === "fulfilled").length;
158 const failed = allRequests.filter((r) => r.status === "rejected").length;
159
160 console.log(
161 `End cron for ${periodicity} with ${allResult.length} jobs with ${success} success and ${failed} failed`,
162 );
163};
164// timestamp needs to be in ms
165const createCronTask = async ({
166 row,
167 timestamp,
168 client,
169 parent,
170 status,
171 region,
172}: {
173 row: z.infer<typeof selectMonitorSchema>;
174 timestamp: number;
175 client: CloudTasksClient;
176 parent: string;
177 status: MonitorStatus;
178 region: Region;
179}) => {
180 let payload:
181 | z.infer<typeof httpPayloadSchema>
182 | z.infer<typeof tpcPayloadSchema>
183 | null = null;
184
185 //
186 if (row.jobType === "http") {
187 payload = {
188 workspaceId: String(row.workspaceId),
189 monitorId: String(row.id),
190 url: row.url,
191 method: row.method || "GET",
192 cronTimestamp: timestamp,
193 body: row.body,
194 headers: row.headers,
195 status: status,
196 assertions: row.assertions ? JSON.parse(row.assertions) : null,
197 degradedAfter: row.degradedAfter,
198 timeout: row.timeout,
199 trigger: "cron",
200 otelConfig: row.otelEndpoint
201 ? {
202 endpoint: row.otelEndpoint,
203 headers: transformHeaders(row.otelHeaders),
204 }
205 : undefined,
206 retry: row.retry || 3,
207 followRedirects: row.followRedirects || true,
208 };
209 }
210 if (row.jobType === "tcp") {
211 payload = {
212 workspaceId: String(row.workspaceId),
213 monitorId: String(row.id),
214 uri: row.url,
215 status: status,
216 assertions: row.assertions ? JSON.parse(row.assertions) : null,
217 cronTimestamp: timestamp,
218 degradedAfter: row.degradedAfter,
219 timeout: row.timeout,
220 trigger: "cron",
221 retry: row.retry || 3,
222 otelConfig: row.otelEndpoint
223 ? {
224 endpoint: row.otelEndpoint,
225 headers: transformHeaders(row.otelHeaders),
226 }
227 : undefined,
228 };
229 }
230
231 if (!payload) {
232 throw new Error("Invalid jobType");
233 }
234 const regionInfo = regionDict[region];
235 let regionHeader = {};
236 if (regionInfo.provider === "fly") {
237 regionHeader = { "fly-prefer-region": region };
238 }
239 if (regionInfo.provider === "koyeb") {
240 regionHeader = { "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", "") };
241 }
242 if (regionInfo.provider === "railway") {
243 regionHeader = { "railway-region": region.replace("railway_", "") };
244 }
245 const newTask: google.cloud.tasks.v2beta3.ITask = {
246 httpRequest: {
247 headers: {
248 "Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing
249 ...regionHeader,
250 Authorization: `Basic ${env.CRON_SECRET}`,
251 },
252 httpMethod: "POST",
253 url: generateUrl({ row, region }),
254 body: Buffer.from(JSON.stringify(payload)).toString("base64"),
255 },
256 scheduleTime: {
257 seconds: timestamp / 1000,
258 },
259 };
260
261 const request = { parent: parent, task: newTask };
262 return client.createTask(request);
263};
264
265function generateUrl({
266 row,
267 region,
268}: {
269 row: z.infer<typeof selectMonitorSchema>;
270 region: Region;
271}) {
272 const regionInfo = regionDict[region];
273
274 switch (regionInfo.provider) {
275 case "fly":
276 return `https://openstatus-checker.fly.dev/checker/${row.jobType}?monitor_id=${row.id}`;
277 case "koyeb":
278 return `https://openstatus-checker.koyeb.app/checker/${row.jobType}?monitor_id=${row.id}`;
279 case "railway":
280 return `https://railway-proxy-production-9cb1.up.railway.app/checker/${row.jobType}?monitor_id=${row.id}`;
281
282 default:
283 throw new Error("Invalid jobType");
284 }
285}