Openstatus www.openstatus.dev
at main 415 lines 11 kB view raw
1import { CloudTasksClient } from "@google-cloud/tasks"; 2import type { google } from "@google-cloud/tasks/build/protos/protos"; 3import { 4 and, 5 db, 6 desc, 7 eq, 8 isNull, 9 lte, 10 max, 11 or, 12 schema, 13} from "@openstatus/db"; 14import { session, user } from "@openstatus/db/src/schema"; 15import { 16 monitorDeactivationEmail, 17 monitorPausedEmail, 18} from "@openstatus/emails"; 19import { sendBatchEmailHtml } from "@openstatus/emails/src/send"; 20import { Redis } from "@openstatus/upstash"; 21import { RateLimiter } from "limiter"; 22import { z } from "zod"; 23import { env } from "../env"; 24 25const redis = Redis.fromEnv(); 26 27const client = new CloudTasksClient({ 28 projectId: env().GCP_PROJECT_ID, 29 fallback: "rest", 30 credentials: { 31 client_email: env().GCP_CLIENT_EMAIL, 32 private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"), 33 }, 34}); 35 36const parent = client.queuePath( 37 env().GCP_PROJECT_ID, 38 env().GCP_LOCATION, 39 "workflow", 40); 41 42const limiter = new RateLimiter({ tokensPerInterval: 15, interval: "second" }); 43 44export async function LaunchMonitorWorkflow() { 45 // Expires is one month after last connection, so if we want to reach people who connected 3 months ago we need to check for people with expires 2 months ago 46 const twoMonthAgo = new Date().setMonth(new Date().getMonth() - 2); 47 48 const date = new Date(twoMonthAgo); 49 // User without session 50 const userWithoutSession = db 51 .select({ 52 userId: schema.user.id, 53 email: schema.user.email, 54 updatedAt: schema.user.updatedAt, 55 }) 56 .from(schema.user) 57 .leftJoin(schema.session, eq(schema.session.userId, schema.user.id)) 58 .where(isNull(schema.session.userId)) 59 .as("query"); 60 // Only free users monitors are paused 61 // We don't need to handle multi users per workspace because free workspaces only have one user 62 // Only free users monitors are paused 63 64 const u1 = await db 65 .select({ 66 userId: userWithoutSession.userId, 67 email: userWithoutSession.email, 68 workspaceId: schema.workspace.id, 69 }) 70 .from(userWithoutSession) 71 .innerJoin( 72 schema.usersToWorkspaces, 73 eq(userWithoutSession.userId, schema.usersToWorkspaces.userId), 74 ) 75 .innerJoin( 76 schema.workspace, 77 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 78 ) 79 .where( 80 and( 81 or( 82 lte(userWithoutSession.updatedAt, date), 83 isNull(userWithoutSession.updatedAt), 84 ), 85 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 86 ), 87 ); 88 89 console.log(`Found ${u1.length} users without session to start the workflow`); 90 const maxSessionPerUser = db 91 .select({ 92 userId: schema.user.id, 93 email: schema.user.email, 94 lastConnection: max(schema.session.expires).as("lastConnection"), 95 }) 96 .from(schema.user) 97 .innerJoin(schema.session, eq(schema.session.userId, schema.user.id)) 98 .groupBy(schema.user.id) 99 .as("maxSessionPerUser"); 100 // Only free users monitors are paused 101 // We don't need to handle multi users per workspace because free workspaces only have one user 102 // Only free users monitors are paused 103 104 const u = await db 105 .select({ 106 userId: maxSessionPerUser.userId, 107 email: maxSessionPerUser.email, 108 workspaceId: schema.workspace.id, 109 }) 110 .from(maxSessionPerUser) 111 .innerJoin( 112 schema.usersToWorkspaces, 113 eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId), 114 ) 115 .innerJoin( 116 schema.workspace, 117 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 118 ) 119 .where( 120 and( 121 lte(maxSessionPerUser.lastConnection, date), 122 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 123 ), 124 ); 125 // Let's merge both results 126 const users = [...u, ...u1]; 127 // iterate over users 128 129 const allResult = []; 130 131 for (const user of users) { 132 await limiter.removeTokens(1); 133 const workflow = workflowInit({ user }); 134 allResult.push(workflow); 135 } 136 137 const allRequests = await Promise.allSettled(allResult); 138 139 const success = allRequests.filter((r) => r.status === "fulfilled").length; 140 const failed = allRequests.filter((r) => r.status === "rejected").length; 141 142 console.log( 143 `End cron with ${allResult.length} jobs with ${success} success and ${failed} failed`, 144 ); 145} 146 147async function workflowInit({ 148 user, 149}: { 150 user: { 151 userId: number; 152 email: string | null; 153 workspaceId: number; 154 }; 155}) { 156 console.log(`Starting workflow for ${user.userId}`); 157 // Let's check if the user is in the workflow 158 const isMember = await redis.sismember("workflow:users", user.userId); 159 if (isMember) { 160 console.log(`user workflow already started for ${user.userId}`); 161 return; 162 } 163 // check if user has some running monitors 164 const nbRunningMonitor = await db.$count( 165 schema.monitor, 166 and( 167 eq(schema.monitor.workspaceId, user.workspaceId), 168 eq(schema.monitor.active, true), 169 isNull(schema.monitor.deletedAt), 170 ), 171 ); 172 if (nbRunningMonitor === 0) { 173 console.log(`user has no running monitors for ${user.userId}`); 174 return; 175 } 176 await CreateTask({ 177 parent, 178 client: client, 179 step: "14days", 180 userId: user.userId, 181 initialRun: new Date().getTime(), 182 }); 183 // // Add our user to the list of users that have started the workflow 184 185 await redis.sadd("workflow:users", user.userId); 186 console.log(`user workflow started for ${user.userId}`); 187} 188 189export async function Step14Days(userId: number, workFlowRunTimestamp: number) { 190 const user = await getUser(userId); 191 192 // Send email saying we are going to pause the monitors 193 // The task has just been created we don't double check if the user has logged in :scary: 194 // send First email 195 // TODO: Send email 196 197 if (user.email) { 198 await sendBatchEmailHtml([ 199 { 200 to: user.email, 201 subject: "Your OpenStatus monitors will be paused in 14 days", 202 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 203 reply_to: "thibault@openstatus.dev", 204 html: monitorDeactivationEmail({ 205 date: new Date( 206 new Date().setDate(new Date().getDate() + 14), 207 ).toDateString(), 208 }), 209 }, 210 ]); 211 212 await CreateTask({ 213 parent, 214 client: client, 215 step: "3days", 216 userId: user.id, 217 initialRun: workFlowRunTimestamp, 218 }); 219 } 220} 221 222export async function Step3Days(userId: number, workFlowRunTimestamp: number) { 223 // check if user has connected 224 const hasConnected = await hasUserLoggedIn({ 225 userId, 226 date: new Date(workFlowRunTimestamp), 227 }); 228 229 if (hasConnected) { 230 // 231 await redis.srem("workflow:users", userId); 232 return; 233 } 234 235 const user = await getUser(userId); 236 237 if (user.email) { 238 await sendBatchEmailHtml([ 239 { 240 to: user.email, 241 subject: "Your OpenStatus monitors will be paused in 3 days", 242 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 243 reply_to: "thibault@openstatus.dev", 244 html: monitorDeactivationEmail({ 245 date: new Date( 246 new Date().setDate(new Date().getDate() + 3), 247 ).toDateString(), 248 }), 249 }, 250 ]); 251 } 252 253 // Send second email 254 //TODO: Send email 255 // Let's schedule the next task 256 await CreateTask({ 257 client, 258 parent, 259 step: "paused", 260 userId, 261 initialRun: workFlowRunTimestamp, 262 }); 263} 264 265export async function StepPaused(userId: number, workFlowRunTimestamp: number) { 266 const hasConnected = await hasUserLoggedIn({ 267 userId, 268 date: new Date(workFlowRunTimestamp), 269 }); 270 if (!hasConnected) { 271 // sendSecond pause email 272 const users = await db 273 .select({ 274 userId: schema.user.id, 275 email: schema.user.email, 276 workspaceId: schema.workspace.id, 277 }) 278 .from(user) 279 .innerJoin(session, eq(schema.user.id, schema.session.userId)) 280 .innerJoin( 281 schema.usersToWorkspaces, 282 eq(schema.user.id, schema.usersToWorkspaces.userId), 283 ) 284 .innerJoin( 285 schema.workspace, 286 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 287 ) 288 .where( 289 and( 290 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 291 eq(schema.user.id, userId), 292 ), 293 ) 294 .get(); 295 // We should only have one user :) 296 if (!users) { 297 console.error(`No user found for ${userId}`); 298 return; 299 } 300 301 await db 302 .update(schema.monitor) 303 .set({ active: false }) 304 .where(eq(schema.monitor.workspaceId, users.workspaceId)); 305 // Send last email with pause monitor 306 } 307 308 const currentUser = await getUser(userId); 309 // TODO: Send email 310 // Remove user for workflow 311 312 if (currentUser.email) { 313 await sendBatchEmailHtml([ 314 { 315 to: currentUser.email, 316 subject: "Your monitors have been paused", 317 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 318 reply_to: "thibault@openstatus.dev", 319 html: monitorPausedEmail(), 320 }, 321 ]); 322 } 323 await redis.srem("workflow:users", userId); 324} 325 326async function hasUserLoggedIn({ 327 userId, 328 date, 329}: { 330 userId: number; 331 date: Date; 332}) { 333 const userResult = await db 334 .select({ lastSession: schema.session.expires }) 335 .from(schema.session) 336 .where(eq(schema.session.userId, userId)) 337 .orderBy(desc(schema.session.expires)); 338 339 if (userResult.length === 0) { 340 return false; 341 } 342 const user = userResult[0]; 343 if (user.lastSession === null) { 344 return false; 345 } 346 return user.lastSession > date; 347} 348 349function CreateTask({ 350 parent, 351 client, 352 step, 353 userId, 354 initialRun, 355}: { 356 parent: string; 357 client: CloudTasksClient; 358 step: z.infer<typeof workflowStepSchema>; 359 userId: number; 360 initialRun: number; 361}) { 362 const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`; 363 const timestamp = getScheduledTime(step); 364 const newTask: google.cloud.tasks.v2beta3.ITask = { 365 httpRequest: { 366 headers: { 367 "Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing 368 Authorization: `${env().CRON_SECRET}`, 369 }, 370 httpMethod: "GET", 371 url, 372 }, 373 scheduleTime: { 374 seconds: timestamp, 375 }, 376 }; 377 378 const request = { parent: parent, task: newTask }; 379 return client.createTask(request); 380} 381 382function getScheduledTime(step: z.infer<typeof workflowStepSchema>) { 383 switch (step) { 384 case "14days": 385 // let's triger it now 386 return new Date().getTime() / 1000; 387 case "3days": 388 // it's 11 days after the 14 days 389 return new Date().setDate(new Date().getDate() + 11) / 1000; 390 case "paused": 391 // it's 3 days after the 3 days step 392 return new Date().setDate(new Date().getDate() + 3) / 1000; 393 default: 394 throw new Error("Invalid step"); 395 } 396} 397 398export const workflowStep = ["14days", "3days", "paused"] as const; 399export const workflowStepSchema = z.enum(workflowStep); 400 401async function getUser(userId: number) { 402 const currentUser = await db 403 .select() 404 .from(user) 405 .where(eq(schema.user.id, userId)) 406 .get(); 407 408 if (!currentUser) { 409 throw new Error("User not found"); 410 } 411 if (!currentUser.email) { 412 throw new Error("User email not found"); 413 } 414 return currentUser; 415}