// Openstatus — www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import {
4 and,
5 db,
6 desc,
7 eq,
8 isNull,
9 lte,
10 max,
11 or,
12 schema,
13} from "@openstatus/db";
14import { session, user } from "@openstatus/db/src/schema";
15import {
16 monitorDeactivationEmail,
17 monitorPausedEmail,
18} from "@openstatus/emails";
19import { sendBatchEmailHtml } from "@openstatus/emails/src/send";
20import { Redis } from "@openstatus/upstash";
21import { RateLimiter } from "limiter";
22import { z } from "zod";
23import { env } from "../env";
24
// Redis client built from environment variables; it holds the
// "workflow:users" set of users with a deactivation workflow in progress.
const redis = Redis.fromEnv();

// Google Cloud credentials, read in the same order the client options use them.
const gcpProjectId = env().GCP_PROJECT_ID;
const gcpClientEmail = env().GCP_CLIENT_EMAIL;
// Replace literal "\n" sequences in the configured key with real newlines.
const gcpPrivateKey = env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n");

// Cloud Tasks client using the REST fallback transport.
const client = new CloudTasksClient({
  projectId: gcpProjectId,
  fallback: "rest",
  credentials: {
    client_email: gcpClientEmail,
    private_key: gcpPrivateKey,
  },
});

// Fully qualified path of the "workflow" queue every task is created in.
const parent = client.queuePath(
  env().GCP_PROJECT_ID,
  env().GCP_LOCATION,
  "workflow",
);

// Rate limiter handing out 15 tokens per second; one token is taken per
// workflow launch.
const limiter = new RateLimiter({ interval: "second", tokensPerInterval: 15 });
43
/**
 * Cron entry point: finds free-plan users who have been inactive for roughly
 * three months and starts the monitor deactivation workflow for each of them.
 *
 * Two groups are collected and merged:
 *  - users without any session row whose `updatedAt` is missing or not newer
 *    than the cutoff;
 *  - users whose most recent session expiry is not newer than the cutoff.
 *
 * Launches are throttled through the module-level rate limiter. A failing
 * launch is logged and counted but never aborts the run.
 */
export async function LaunchMonitorWorkflow() {
  // Expires is one month after the last connection, so to reach people who
  // connected 3 months ago we look for sessions that expired 2 months ago.
  const date = new Date();
  date.setMonth(date.getMonth() - 2);

  // Users without any session row (left join with no matching session).
  const userWithoutSession = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      updatedAt: schema.user.updatedAt,
    })
    .from(schema.user)
    .leftJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .where(isNull(schema.session.userId))
    .as("query");

  // Only free users' monitors are paused.
  // We don't need to handle multiple users per workspace because free
  // workspaces only have one user.
  const u1 = await db
    .select({
      userId: userWithoutSession.userId,
      email: userWithoutSession.email,
      workspaceId: schema.workspace.id,
    })
    .from(userWithoutSession)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(userWithoutSession.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(
          lte(userWithoutSession.updatedAt, date),
          isNull(userWithoutSession.updatedAt),
        ),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  console.log(`Found ${u1.length} users without session to start the workflow`);

  // Latest session expiry per user, for users that have at least one session.
  const maxSessionPerUser = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      lastConnection: max(schema.session.expires).as("lastConnection"),
    })
    .from(schema.user)
    .innerJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .groupBy(schema.user.id)
    .as("maxSessionPerUser");

  // Same free-plan restriction as above.
  const u = await db
    .select({
      userId: maxSessionPerUser.userId,
      email: maxSessionPerUser.email,
      workspaceId: schema.workspace.id,
    })
    .from(maxSessionPerUser)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        lte(maxSessionPerUser.lastConnection, date),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  // Merge both result sets.
  const users = [...u, ...u1];

  const allResult: Promise<void>[] = [];

  for (const candidate of users) {
    await limiter.removeTokens(1);
    const workflow = workflowInit({ user: candidate });
    // Attach a rejection handler immediately. The loop keeps awaiting the
    // rate limiter, so a launch that fails before Promise.allSettled is
    // reached would otherwise be reported as an unhandled rejection.
    workflow.catch((error: unknown) => {
      console.error(
        `Failed to start workflow for ${candidate.userId}`,
        error,
      );
    });
    allResult.push(workflow);
  }

  const allRequests = await Promise.allSettled(allResult);

  const success = allRequests.filter((r) => r.status === "fulfilled").length;
  const failed = allRequests.filter((r) => r.status === "rejected").length;

  console.log(
    `End cron with ${allResult.length} jobs with ${success} success and ${failed} failed`,
  );
}
146
/**
 * Starts the deactivation workflow for a single user.
 *
 * Does nothing when the user id is already in the `workflow:users` Redis set
 * or when the workspace has no active, non-deleted monitor. Otherwise a
 * "14days" task is queued and the user id is added to the set.
 */
async function workflowInit({
  user,
}: {
  user: {
    userId: number;
    email: string | null;
    workspaceId: number;
  };
}) {
  const { userId, workspaceId } = user;
  console.log(`Starting workflow for ${userId}`);

  // Skip users whose workflow is already in progress.
  const alreadyStarted = await redis.sismember("workflow:users", userId);
  if (alreadyStarted) {
    console.log(`user workflow already started for ${userId}`);
    return;
  }

  // Nothing to pause when the workspace has no running monitor.
  const runningMonitorFilter = and(
    eq(schema.monitor.workspaceId, workspaceId),
    eq(schema.monitor.active, true),
    isNull(schema.monitor.deletedAt),
  );
  const runningMonitorCount = await db.$count(
    schema.monitor,
    runningMonitorFilter,
  );
  if (runningMonitorCount === 0) {
    console.log(`user has no running monitors for ${userId}`);
    return;
  }

  await CreateTask({
    parent,
    client,
    step: "14days",
    userId,
    initialRun: Date.now(),
  });

  // Record that the workflow has started for this user.
  await redis.sadd("workflow:users", userId);
  console.log(`user workflow started for ${userId}`);
}
188
/**
 * First workflow step: emails the user that their monitors will be paused in
 * 14 days, then queues the "3days" step.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started; forwarded unchanged to the next step.
 * @throws If the user does not exist or has no email (raised by `getUser`).
 */
export async function Step14Days(userId: number, workFlowRunTimestamp: number) {
  const user = await getUser(userId);

  // The task has only just been created, so there is no check here for
  // whether the user has logged in since.
  if (!user.email) {
    return;
  }

  const pauseDate = new Date();
  pauseDate.setDate(pauseDate.getDate() + 14);

  await sendBatchEmailHtml([
    {
      to: user.email,
      subject: "Your OpenStatus monitors will be paused in 14 days",
      from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
      reply_to: "thibault@openstatus.dev",
      html: monitorDeactivationEmail({ date: pauseDate.toDateString() }),
    },
  ]);

  await CreateTask({
    parent,
    client,
    step: "3days",
    userId: user.id,
    initialRun: workFlowRunTimestamp,
  });
}
221
/**
 * Second workflow step: if the user has logged in since the workflow started
 * the workflow ends; otherwise a "3 days left" email is sent and the "paused"
 * step is queued.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started.
 * @throws If the user does not exist or has no email (raised by `getUser`).
 */
export async function Step3Days(userId: number, workFlowRunTimestamp: number) {
  const workflowStart = new Date(workFlowRunTimestamp);
  const cameBack = await hasUserLoggedIn({ userId, date: workflowStart });

  if (cameBack) {
    // The user is active again: drop them from the workflow and stop.
    await redis.srem("workflow:users", userId);
    return;
  }

  const recipient = await getUser(userId);

  if (recipient.email) {
    const pauseDate = new Date();
    pauseDate.setDate(pauseDate.getDate() + 3);

    await sendBatchEmailHtml([
      {
        to: recipient.email,
        subject: "Your OpenStatus monitors will be paused in 3 days",
        from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
        reply_to: "thibault@openstatus.dev",
        html: monitorDeactivationEmail({ date: pauseDate.toDateString() }),
      },
    ]);
  }

  // Schedule the final step.
  await CreateTask({
    client,
    parent,
    step: "paused",
    userId,
    initialRun: workFlowRunTimestamp,
  });
}
264
/**
 * Final workflow step: deactivates the monitors of the user's free workspace,
 * emails the user that the monitors were paused, and removes the user from
 * the `workflow:users` Redis set.
 *
 * If the user has logged in since the workflow started, nothing is paused and
 * no email is sent; the user is only removed from the set.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started.
 * @throws If the user does not exist or has no email (raised by `getUser`).
 */
export async function StepPaused(userId: number, workFlowRunTimestamp: number) {
  const hasConnected = await hasUserLoggedIn({
    userId,
    date: new Date(workFlowRunTimestamp),
  });

  if (hasConnected) {
    // The user came back, so no monitor is paused and the "paused" email
    // must not go out.
    await redis.srem("workflow:users", userId);
    return;
  }

  // No join on the session table here: users who never had a session are
  // enrolled in the workflow too and must still be matched.
  const freeWorkspaceUser = await db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      workspaceId: schema.workspace.id,
    })
    .from(user)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(schema.user.id, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
        eq(schema.user.id, userId),
      ),
    )
    .get();

  if (!freeWorkspaceUser) {
    console.error(`No user found for ${userId}`);
    // The workflow ends here as well; do not leave the id in the set.
    await redis.srem("workflow:users", userId);
    return;
  }

  // Pause every monitor of the workspace.
  await db
    .update(schema.monitor)
    .set({ active: false })
    .where(eq(schema.monitor.workspaceId, freeWorkspaceUser.workspaceId));

  const currentUser = await getUser(userId);

  if (currentUser.email) {
    await sendBatchEmailHtml([
      {
        to: currentUser.email,
        subject: "Your monitors have been paused",
        from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
        reply_to: "thibault@openstatus.dev",
        html: monitorPausedEmail(),
      },
    ]);
  }

  // Remove the user from the workflow.
  await redis.srem("workflow:users", userId);
}
325
/**
 * Tells whether the user has a session that expires after `date`.
 *
 * @param userId - Id of the user to check.
 * @param date - Reference point; callers pass the workflow start time.
 * @returns `true` when the latest session expiry is later than `date`;
 *   `false` when the user has no session or the expiry is null.
 */
async function hasUserLoggedIn({
  userId,
  date,
}: {
  userId: number;
  date: Date;
}) {
  // Only the most recent expiry is needed, so fetch a single row.
  const [latest] = await db
    .select({ lastSession: schema.session.expires })
    .from(schema.session)
    .where(eq(schema.session.userId, userId))
    .orderBy(desc(schema.session.expires))
    .limit(1);

  if (latest === undefined || latest.lastSession === null) {
    return false;
  }
  return latest.lastSession > date;
}
348
/**
 * Queues a Cloud Task that issues a GET request to the workflow endpoint for
 * the given step.
 *
 * @param parent - Queue path the task is created in.
 * @param client - Cloud Tasks client used to create the task.
 * @param step - Workflow step; selects both the endpoint and the schedule time.
 * @param userId - Passed to the endpoint as the `userId` query parameter.
 * @param initialRun - Passed to the endpoint as the `initialRun` query parameter.
 * @returns The promise returned by `client.createTask`.
 */
function CreateTask({
  parent,
  client,
  step,
  userId,
  initialRun,
}: {
  parent: string;
  client: CloudTasksClient;
  step: z.infer<typeof workflowStepSchema>;
  userId: number;
  initialRun: number;
}) {
  const targetUrl = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`;

  const task: google.cloud.tasks.v2beta3.ITask = {
    // When the task should fire, in seconds since the epoch.
    scheduleTime: {
      seconds: getScheduledTime(step),
    },
    httpRequest: {
      httpMethod: "GET",
      url: targetUrl,
      headers: {
        // Content type is set for compatibility with the receiving
        // application's request parsing.
        "Content-Type": "application/json",
        Authorization: `${env().CRON_SECRET}`,
      },
    },
  };

  return client.createTask({ parent, task });
}
381
/**
 * Computes when the task for a workflow step should run.
 *
 * @param step - Workflow step being scheduled.
 * @returns Whole seconds since the Unix epoch (the value is floored because
 *   it is used as a protobuf `Timestamp.seconds` field, which is an integer).
 * @throws Error("Invalid step") when `step` is not a known step.
 */
function getScheduledTime(step: z.infer<typeof workflowStepSchema>) {
  const runAt = new Date();
  switch (step) {
    case "14days":
      // Triggered right away.
      break;
    case "3days":
      // 11 days after the 14-days step.
      runAt.setDate(runAt.getDate() + 11);
      break;
    case "paused":
      // 3 days after the 3-days step.
      runAt.setDate(runAt.getDate() + 3);
      break;
    default:
      throw new Error("Invalid step");
  }
  return Math.floor(runAt.getTime() / 1000);
}
397
/** Workflow steps, in the order they are run. */
export const workflowStep = ["14days", "3days", "paused"] as const;

/** Zod enum that validates a value against the workflow step names. */
export const workflowStepSchema = z.enum(workflowStep);
400
/**
 * Loads a user row by id.
 *
 * @param userId - Id of the user to load.
 * @returns The user row; its email is guaranteed to be non-empty.
 * @throws Error("User not found") when no row matches.
 * @throws Error("User email not found") when the row has no email.
 */
async function getUser(userId: number) {
  const matchesId = eq(schema.user.id, userId);
  const row = await db.select().from(user).where(matchesId).get();

  if (!row) {
    throw new Error("User not found");
  }
  if (!row.email) {
    throw new Error("User email not found");
  }
  return row;
}