Openstatus www.openstatus.dev
at main 235 lines 6.9 kB view raw
1package scheduler 2 3import ( 4 "context" 5 "log" 6 "sync" 7 "time" 8 9 "connectrpc.com/connect" 10 "github.com/madflojo/tasks" 11 "github.com/openstatushq/openstatus/apps/checker/pkg/job" 12 v1 "github.com/openstatushq/openstatus/apps/checker/proto/private_location/v1" 13) 14 15const ( 16 Interval10s = "10s" 17 Interval30s = "30s" 18 Interval1m = "1m" 19 Interval5m = "5m" 20 Interval10m = "10m" 21 Interval30m = "30m" 22 Interval1h = "1h" 23) 24 25type MonitorManager struct { 26 Client v1.PrivateLocationServiceClient 27 JobRunner job.JobRunner 28 Scheduler *tasks.Scheduler 29 mu sync.Mutex 30} 31 32// UpdateMonitors fetches the latest monitors and starts/stops jobs as needed 33func (mm *MonitorManager) UpdateMonitors(ctx context.Context) { 34 res, err := mm.Client.Monitors(ctx, &connect.Request[v1.MonitorsRequest]{}) 35 if err != nil { 36 log.Printf("Failed to fetch monitors: %v", err) 37 return 38 } 39 40 currentIDs := make(map[string]struct{}) 41 42 // HTTP monitors: start jobs for new monitors 43 for _, m := range res.Msg.HttpMonitors { 44 currentIDs[m.Id] = struct{}{} 45 _, err := mm.Scheduler.Lookup(m.Id) 46 if err != nil { 47 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second 48 task := tasks.Task{ 49 Interval: interval, 50 RunOnce: false, 51 RunSingleInstance: true, 52 // StartAfter: time.Duration(1) * time.Second, 53 ErrFunc: func(e error) { 54 log.Printf("An error occurred when executing task %s", e) 55 }, 56 FuncWithTaskContext: func(ctx tasks.TaskContext) error { 57 monitor := m 58 c := context.Background() 59 log.Printf("Starting job for monitor %s (%s)", monitor.Id, monitor.Url) 60 data, err := mm.JobRunner.HTTPJob(c, monitor) 61 62 if err != nil { 63 log.Printf("Monitor check failed for %s (%s): %v", monitor.Id, monitor.Url, err) 64 return err 65 } 66 resp, ingestErr := mm.Client.IngestHTTP(c, &connect.Request[v1.IngestHTTPRequest]{ 67 Msg: &v1.IngestHTTPRequest{ 68 MonitorId: monitor.Id, 69 Id: data.ID, 70 Url: monitor.Url, 71 Message: data.Message, 72 Latency: data.Latency, 73 Timing: data.Timing, 74 Headers: data.Headers, 75 Body: data.Body, 76 RequestStatus: data.RequestStatus, 77 StatusCode: int64(data.StatusCode), 78 Error: int64(data.Error), 79 CronTimestamp: data.CronTimestamp, 80 Timestamp: data.Timestamp, 81 }, 82 }) 83 if ingestErr != nil { 84 log.Printf("Failed to ingest HTTP result for %s (%s): %v", monitor.Id, monitor.Url, ingestErr) 85 return ingestErr 86 } 87 log.Printf("Monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Url, resp) 88 return nil 89 }, 90 } 91 92 err := mm.Scheduler.AddWithID(m.Id, &task) 93 94 if err != nil { 95 log.Printf("Failed to add HTTP monitor job for %s (%s): %v", m.Id, m.Url, err) 96 continue 97 } 98 log.Printf("Started monitoring job for %s (%s)", m.Id, m.Url) 99 continue 100 } 101 102 } 103 104 // TCP monitors: start jobs for new monitors 105 for _, m := range res.Msg.TcpMonitors { 106 currentIDs[m.Id] = struct{}{} 107 _, err := mm.Scheduler.Lookup(m.Id) 108 if err != nil { 109 110 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second 111 task := tasks.Task{ 112 Interval: interval, 113 RunOnce: false, 114 // StartAfter: time.Now().Add(5 * time.Millisecond), 115 RunSingleInstance: true, 116 FuncWithTaskContext: func(ctx tasks.TaskContext) error { 117 118 monitor := m 119 log.Printf("Starting TCP job for monitor %s (%s)", monitor.Id, monitor.Uri) 120 data, err := mm.JobRunner.TCPJob(ctx.Context, monitor) 121 if err != nil { 122 log.Printf("TCP monitor check failed for %s (%s): %v", monitor.Id, monitor.Uri, err) 123 } 124 resp, ingestErr := mm.Client.IngestTCP(ctx.Context, &connect.Request[v1.IngestTCPRequest]{ 125 Msg: &v1.IngestTCPRequest{ 126 MonitorId: monitor.Id, 127 Id: data.ID, 128 Uri: monitor.Uri, 129 Message: data.Message, 130 Latency: data.Latency, 131 RequestStatus: data.RequestStatus, 132 Error: int64(data.Error), 133 CronTimestamp: data.CronTimestamp, 134 Timestamp: data.Timestamp, 135 }, 136 }) 137 if ingestErr != nil { 138 log.Printf("Failed to ingest TCP result for %s (%s): %v", monitor.Id, monitor.Uri, ingestErr) 139 return ingestErr 140 } 141 log.Printf("TCP monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Uri, resp) 142 143 return nil 144 }, 145 } 146 err := mm.Scheduler.AddWithID(m.Id, &task) 147 148 if err != nil { 149 log.Printf("Failed to add TCP monitor job for %s (%s): %v", m.Id, m.Uri, err) 150 continue 151 } 152 log.Printf("Started TCP monitoring job for %s (%s)", m.Id, m.Uri) 153 } 154 } 155 156 for _, m := range res.Msg.DnsMonitors { 157 currentIDs[m.Id] = struct{}{} 158 _, err := mm.Scheduler.Lookup(m.Id) 159 if err != nil { 160 161 interval := time.Duration(intervalToSecond(m.Periodicity)) * time.Second 162 task := tasks.Task{ 163 Interval: interval, 164 RunOnce: false, 165 // StartAfter: time.Now().Add(5 * time.Millisecond), 166 RunSingleInstance: true, 167 FuncWithTaskContext: func(ctx tasks.TaskContext) error { 168 169 monitor := m 170 log.Printf("Starting TCP job for monitor %s (%s)", monitor.Id, monitor.Uri) 171 _, err := mm.JobRunner.DNSJob(ctx.Context, monitor) 172 if err != nil { 173 log.Printf("TCP monitor check failed for %s (%s): %v", monitor.Id, monitor.Uri, err) 174 } 175 resp, ingestErr := mm.Client.IngestDNS(ctx.Context, &connect.Request[v1.IngestDNSRequest]{ 176 Msg: &v1.IngestDNSRequest{ 177 MonitorId: monitor.Id, 178 179 // Id: data.ID, 180 // Uri: monitor.Uri, 181 // Message: data.Message, 182 // Latency: data.Latency, 183 // RequestStatus: data.RequestStatus, 184 // Error: int64(data.Error), 185 // CronTimestamp: data.CronTimestamp, 186 // Timestamp: data.Timestamp, 187 }, 188 }) 189 if ingestErr != nil { 190 log.Printf("Failed to ingest TCP result for %s (%s): %v", monitor.Id, monitor.Uri, ingestErr) 191 return ingestErr 192 } 193 log.Printf("TCP monitor check succeeded for %s (%s), ingest response: %v", monitor.Id, monitor.Uri, resp) 194 195 return nil 196 }, 197 } 198 err := mm.Scheduler.AddWithID(m.Id, &task) 199 200 if err != nil { 201 log.Printf("Failed to add TCP monitor job for %s (%s): %v", m.Id, m.Uri, err) 202 continue 203 } 204 log.Printf("Started TCP monitoring job for %s (%s)", m.Id, m.Uri) 205 } 206 } 207 208 for id := range mm.Scheduler.Tasks() { 209 if _, stillExists := currentIDs[id]; !stillExists { 210 mm.Scheduler.Del(id) 211 } 212 } 213 214} 215 216func intervalToSecond(interval string) int { 217 switch interval { 218 case Interval30s: 219 return 30 220 case Interval1m: 221 return 60 222 case Interval5m: 223 return 300 224 case Interval10m: 225 return 600 226 case Interval30m: 227 return 1800 228 case Interval1h: 229 return 3600 230 case Interval10s: 231 return 10 232 default: 233 return 0 234 } 235}