// OpenStatus — www.openstatus.dev
1package scheduler
2
3import (
4 "context"
5 "log"
6 "sync"
7 "time"
8
9 "connectrpc.com/connect"
10 "github.com/madflojo/tasks"
11 "github.com/openstatushq/openstatus/apps/checker/pkg/job"
12 v1 "github.com/openstatushq/openstatus/apps/checker/proto/private_location/v1"
13)
14
// Supported monitor periodicity values as delivered by the private-location
// API. Each is a valid Go duration string; intervalToSecond maps them to a
// number of seconds for the task scheduler.
const (
	Interval10s = "10s"
	Interval30s = "30s"
	Interval1m  = "1m"
	Interval5m  = "5m"
	Interval10m = "10m"
	Interval30m = "30m"
	Interval1h  = "1h"
)
24
// MonitorManager keeps the local task scheduler in sync with the monitors
// configured for this private location: it fetches monitor definitions over
// Client, executes checks through JobRunner, and maintains one recurring
// task per monitor ID in Scheduler.
type MonitorManager struct {
	// Client is the RPC client used to fetch monitors and ingest check results.
	Client v1.PrivateLocationServiceClient
	// JobRunner executes the actual HTTP/TCP/DNS checks.
	JobRunner job.JobRunner
	// Scheduler holds one recurring task per monitor ID.
	Scheduler *tasks.Scheduler
	// mu is presumably meant to serialize concurrent monitor reconciliations —
	// NOTE(review): confirm all call sites lock it. Because of this no-copy
	// field, MonitorManager must be passed by pointer.
	mu sync.Mutex
}
31
32// UpdateMonitors fetches the latest monitors and starts/stops jobs as needed
33func (mm *MonitorManager) UpdateMonitors(ctx context.Context) {
34 res, err := mm.Client.Monitors(ctx, &connect.Request[v1.MonitorsRequest]{})
35 if err != nil {
36 log.Printf("Failed to fetch monitors: %v", err)
37 return
38 }
39
40 currentIDs := make(map[string]struct{})
41
42 // HTTP monitors: start jobs for new monitors
43 for _, m := range res.Msg.HttpMonitors {
44 currentIDs[m.Id] = struct{}{}
45 _, err := mm.Scheduler.Lookup(m.Id)
46 if err != nil {
47 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second
48 task := tasks.Task{
49 Interval: interval,
50 RunOnce: false,
51 RunSingleInstance: true,
52 // StartAfter: time.Duration(1) * time.Second,
53 ErrFunc: func(e error) {
54 log.Printf("An error occurred when executing task %s", e)
55 },
56 FuncWithTaskContext: func(ctx tasks.TaskContext) error {
57 monitor := m
58 c := context.Background()
59 log.Printf("Starting job for monitor %s (%s)", monitor.Id, monitor.Url)
60 data, err := mm.JobRunner.HTTPJob(c, monitor)
61
62 if err != nil {
63 log.Printf("Monitor check failed for %s (%s): %v", monitor.Id, monitor.Url, err)
64 return err
65 }
66 resp, ingestErr := mm.Client.IngestHTTP(c, &connect.Request[v1.IngestHTTPRequest]{
67 Msg: &v1.IngestHTTPRequest{
68 MonitorId: monitor.Id,
69 Id: data.ID,
70 Url: monitor.Url,
71 Message: data.Message,
72 Latency: data.Latency,
73 Timing: data.Timing,
74 Headers: data.Headers,
75 Body: data.Body,
76 RequestStatus: data.RequestStatus,
77 StatusCode: int64(data.StatusCode),
78 Error: int64(data.Error),
79 CronTimestamp: data.CronTimestamp,
80 Timestamp: data.Timestamp,
81 },
82 })
83 if ingestErr != nil {
84 log.Printf("Failed to ingest HTTP result for %s (%s): %v", monitor.Id, monitor.Url, ingestErr)
85 return ingestErr
86 }
87 log.Printf("Monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Url, resp)
88 return nil
89 },
90 }
91
92 err := mm.Scheduler.AddWithID(m.Id, &task)
93
94 if err != nil {
95 log.Printf("Failed to add HTTP monitor job for %s (%s): %v", m.Id, m.Url, err)
96 continue
97 }
98 log.Printf("Started monitoring job for %s (%s)", m.Id, m.Url)
99 continue
100 }
101
102 }
103
104 // TCP monitors: start jobs for new monitors
105 for _, m := range res.Msg.TcpMonitors {
106 currentIDs[m.Id] = struct{}{}
107 _, err := mm.Scheduler.Lookup(m.Id)
108 if err != nil {
109
110 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second
111 task := tasks.Task{
112 Interval: interval,
113 RunOnce: false,
114 // StartAfter: time.Now().Add(5 * time.Millisecond),
115 RunSingleInstance: true,
116 FuncWithTaskContext: func(ctx tasks.TaskContext) error {
117
118 monitor := m
119 log.Printf("Starting TCP job for monitor %s (%s)", monitor.Id, monitor.Uri)
120 data, err := mm.JobRunner.TCPJob(ctx.Context, monitor)
121 if err != nil {
122 log.Printf("TCP monitor check failed for %s (%s): %v", monitor.Id, monitor.Uri, err)
123 }
124 resp, ingestErr := mm.Client.IngestTCP(ctx.Context, &connect.Request[v1.IngestTCPRequest]{
125 Msg: &v1.IngestTCPRequest{
126 MonitorId: monitor.Id,
127 Id: data.ID,
128 Uri: monitor.Uri,
129 Message: data.Message,
130 Latency: data.Latency,
131 RequestStatus: data.RequestStatus,
132 Error: int64(data.Error),
133 CronTimestamp: data.CronTimestamp,
134 Timestamp: data.Timestamp,
135 },
136 })
137 if ingestErr != nil {
138 log.Printf("Failed to ingest TCP result for %s (%s): %v", monitor.Id, monitor.Uri, ingestErr)
139 return ingestErr
140 }
141 log.Printf("TCP monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Uri, resp)
142
143 return nil
144 },
145 }
146 err := mm.Scheduler.AddWithID(m.Id, &task)
147
148 if err != nil {
149 log.Printf("Failed to add TCP monitor job for %s (%s): %v", m.Id, m.Uri, err)
150 continue
151 }
152 log.Printf("Started TCP monitoring job for %s (%s)", m.Id, m.Uri)
153 }
154 }
155
156 for _, m := range res.Msg.DnsMonitors {
157 currentIDs[m.Id] = struct{}{}
158 _, err := mm.Scheduler.Lookup(m.Id)
159 if err != nil {
160
161 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second
162 task := tasks.Task{
163 Interval: interval,
164 RunOnce: false,
165 // StartAfter: time.Now().Add(5 * time.Millisecond),
166 RunSingleInstance: true,
167 FuncWithTaskContext: func(ctx tasks.TaskContext) error {
168
169 monitor := m
170 log.Printf("Starting TCP job for monitor %s (%s)", monitor.Id, monitor.Uri)
171 _, err := mm.JobRunner.DNSJob(ctx.Context, monitor)
172 if err != nil {
173 log.Printf("TCP monitor check failed for %s (%s): %v", monitor.Id, monitor.Uri, err)
174 }
175 resp, ingestErr := mm.Client.IngestDNS(ctx.Context, &connect.Request[v1.IngestDNSRequest]{
176 Msg: &v1.IngestDNSRequest{
177 MonitorId: monitor.Id,
178
179 // Id: data.ID,
180 // Uri: monitor.Uri,
181 // Message: data.Message,
182 // Latency: data.Latency,
183 // RequestStatus: data.RequestStatus,
184 // Error: int64(data.Error),
185 // CronTimestamp: data.CronTimestamp,
186 // Timestamp: data.Timestamp,
187 },
188 })
189 if ingestErr != nil {
190 log.Printf("Failed to ingest TCP result for %s (%s): %v", monitor.Id, monitor.Uri, ingestErr)
191 return ingestErr
192 }
193 log.Printf("TCP monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Uri, resp)
194
195 return nil
196 },
197 }
198 err := mm.Scheduler.AddWithID(m.Id, &task)
199
200 if err != nil {
201 log.Printf("Failed to add TCP monitor job for %s (%s): %v", m.Id, m.Uri, err)
202 continue
203 }
204 log.Printf("Started TCP monitoring job for %s (%s)", m.Id, m.Uri)
205 }
206 }
207
208 for id := range mm.Scheduler.Tasks() {
209 if _, stillExists := currentIDs[id]; !stillExists {
210 mm.Scheduler.Del(id)
211 }
212 }
213
214}
215
// intervalToSecond converts a monitor periodicity string (e.g. Interval30s,
// "5m") into a number of seconds. Every value is a valid Go duration string,
// so time.ParseDuration covers the known constants and any future interval
// alike. Invalid or negative inputs yield 0, matching the previous behavior
// for unknown intervals (callers treat 0 as "cannot schedule").
func intervalToSecond(interval string) int {
	d, err := time.ParseDuration(interval)
	if err != nil || d < 0 {
		return 0
	}
	return int(d.Seconds())
}