Openstatus www.openstatus.dev
at main 280 lines 7.2 kB view raw
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "math/rand/v2" 9 "net/http" 10 "os" 11 "os/signal" 12 "syscall" 13 "time" 14 15 "github.com/gin-gonic/gin" 16 "github.com/google/uuid" 17 "github.com/openstatushq/openstatus/apps/checker/handlers" 18 19 "github.com/openstatushq/openstatus/apps/checker/pkg/logger" 20 "github.com/openstatushq/openstatus/apps/checker/pkg/tinybird" 21 "github.com/rs/zerolog/log" 22 "go.opentelemetry.io/contrib/bridges/otelslog" 23 // otelz "go.opentelemetry.io/contrib/bridges/otelzerolog" 24 "go.opentelemetry.io/otel/log/global" 25 "go.opentelemetry.io/otel/attribute" 26 otlploghttp "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" 27 sdklog "go.opentelemetry.io/otel/sdk/log" 28 "go.opentelemetry.io/otel/sdk/resource" 29 semconv "go.opentelemetry.io/otel/semconv/v1.24.0" 30) 31 32func shouldSample(event map[string]any) bool { 33 statusCode, _ := event["status_code"].(int) 34 durationMs, _ := event["duration_ms"].(int) 35 36 // Always capture: server errors 37 if statusCode >= 500 { 38 return true 39 } 40 41 // Always capture: explicit errors 42 if _, hasError := event["error"]; hasError { 43 return true 44 } 45 46 // Always capture: slow requests (above p99 - 2s threshold) 47 if durationMs > 2000 { 48 return true 49 } 50 51 // Higher sampling for client errors (4xx) - 100% 52 if statusCode >= 400 && statusCode < 500 { 53 return true 54 } 55 56 // Random sample successful, fast requests at 20% 57 return rand.Float64() < 0.2 58} 59 60// MapToAttrs converts a map[string]any to a slice of slog.Attr 61func MapToAttrs(m map[string]any) []slog.Attr { 62 attrs := make([]slog.Attr, 0, len(m)) 63 for k, v := range m { 64 attrs = append(attrs, toAttr(k, v)) 65 } 66 return attrs 67} 68 69func toAttr(key string, value any) slog.Attr { 70 switch v := value.(type) { 71 case string: 72 return slog.String(key, v) 73 case int: 74 return slog.Int(key, v) 75 case int64: 76 return slog.Int64(key, v) 77 case float64: 78 return slog.Float64(key, v) 79 case bool: 80 return slog.Bool(key, v) 81 case time.Time: 82 return slog.Time(key, v) 83 case time.Duration: 84 return slog.Duration(key, v) 85 case map[string]any: 86 return slog.Group(key, mapToAny(v)...) 87 default: 88 return slog.Any(key, v) 89 } 90} 91 92func mapToAny(m map[string]any) []any { 93 args := make([]any, 0, len(m)*2) 94 for k, v := range m { 95 args = append(args, toAttr(k, v)) 96 } 97 return args 98} 99 100func Logger() gin.HandlerFunc { 101 return func(c *gin.Context) { 102 startTime := time.Now() 103 104 // Generate or get request ID 105 requestID := c.GetHeader("X-Request-ID") 106 if requestID == "" { 107 requestID = uuid.New().String() 108 } 109 c.Set("requestId", requestID) 110 111 // Build wide event context at request start 112 event := map[string]any{ 113 "timestamp": startTime.Format(time.RFC3339), 114 "request_id": requestID, 115 "method": c.Request.Method, 116 "path": c.Request.URL.Path, 117 "url": c.Request.Host + c.Request.URL.String(), 118 "user_agent": c.GetHeader("User-Agent"), 119 "content_type": c.GetHeader("Content-Type"), 120 } 121 c.Set("event", event) 122 123 // Process request 124 c.Next() 125 126 // After request - capture response details 127 duration := time.Since(startTime).Milliseconds() 128 status := c.Writer.Status() 129 130 event["status_code"] = status 131 event["duration_ms"] = int(duration) 132 133 // var requestErr error 134 if len(c.Errors) > 0 { 135 event["outcome"] = "error" 136 lastErr := c.Errors.Last() 137 event["error"] = map[string]any{ 138 "type": "GinError", 139 "message": lastErr.Error(), 140 } 141 } else { 142 event["outcome"] = "success" 143 } 144 145 if shouldSample(event) { 146 attrs := MapToAttrs(event) 147 slog.LogAttrs(c.Request.Context(),slog.LevelInfo, "request done", attrs...) 148 } 149 150 log.Debug(). 151 Int("status_code", status). 152 Int64("duration_ms", duration). 153 Str("request_id", requestID). 154 Msg("Request completed") 155 } 156} 157 158func main() { 159 ctx, cancel := context.WithCancel(context.Background()) 160 defer cancel() 161 162 done := make(chan os.Signal, 1) 163 signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 164 165 go func() { 166 <-done 167 cancel() 168 }() 169 170 // environment variables. 171 var region string 172 cronSecret := env("CRON_SECRET", "") 173 tinyBirdToken := env("TINYBIRD_TOKEN", "") 174 logLevel := env("LOG_LEVEL", "info") 175 cloudProvider := env("CLOUD_PROVIDER", "fly") 176 axiomToken := env("AXIOM_TOKEN", "") 177 axiomDataset := env("AXIOM_DATASET", "dev") 178 switch cloudProvider { 179 case "fly": 180 region = env("FLY_REGION", env("REGION", "local")) 181 182 case "koyeb": 183 region = fmt.Sprintf("koyeb_%s", env("KOYEB_REGION", env("REGION", "local"))) 184 185 case "railway": 186 region = fmt.Sprintf("railway_%s", env("RAILWAY_REPLICA_REGION", env("REGION", "local"))) 187 default: 188 log.Fatal().Msgf("unsupported cloud provider: %s", cloudProvider) 189 } 190 logger.Configure(logLevel) 191 192 // Define resource with service name, version, and environment 193 res := resource.NewWithAttributes( 194 semconv.SchemaURL, 195 semconv.ServiceNameKey.String("openstatus-checker"), 196 semconv.ServiceVersionKey.String("1.0.0"), 197 attribute.String("environment", "production"), 198 attribute.String("cloud.provider", cloudProvider), 199 attribute.String("cloud.region", region), 200 ) 201 202 // Set up OTLP log exporter for Axiom 203 exporter, err := otlploghttp.New(ctx, 204 otlploghttp.WithEndpointURL("https://eu-central-1.aws.edge.axiom.co/v1/logs"), 205 otlploghttp.WithHeaders(map[string]string{ 206 "Authorization": "Bearer " + axiomToken, 207 "X-Axiom-Dataset": axiomDataset, 208 }), 209 ) 210 if err != nil { 211 log.Fatal().Err(err).Msg("failed to create OTLP exporter") 212 } 213 214 // Create log provider with resource and batch processor 215 logProvider := sdklog.NewLoggerProvider( 216 sdklog.WithResource(res), 217 sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)), 218 219 ) 220 defer logProvider.Shutdown(ctx) 221 222 global.SetLoggerProvider(logProvider) 223 slog.SetDefault(otelslog.NewLogger("openstatus-checker")) 224 httpClient := &http.Client{ 225 Timeout: 45 * time.Second, 226 } 227 228 defer httpClient.CloseIdleConnections() 229 230 tinybirdClient := tinybird.NewClient(httpClient, tinyBirdToken) 231 232 h := &handlers.Handler{ 233 Secret: cronSecret, 234 CloudProvider: cloudProvider, 235 Region: region, 236 TbClient: tinybirdClient, 237 } 238 239 router := gin.New() 240 router.Use(gin.Recovery()) 241 router.Use(Logger()) 242 router.POST("/checker", h.HTTPCheckerHandler) 243 router.POST("/checker/http", h.HTTPCheckerHandler) 244 router.POST("/checker/tcp", h.TCPHandler) 245 router.POST("/checker/dns", h.DNSHandler) 246 router.POST("/ping/:region", h.PingRegionHandler) 247 router.POST("/tcp/:region", h.TCPHandlerRegion) 248 router.POST("/dns/:region", h.DNSHandlerRegion) 249 250 router.GET("/health", func(c *gin.Context) { 251 c.JSON(http.StatusOK, gin.H{"message": "pong", "region": region, "provider": cloudProvider}) 252 }) 253 254 httpServer := &http.Server{ 255 Addr: fmt.Sprintf("0.0.0.0:%s", env("PORT", "8080")), 256 Handler: router, 257 } 258 259 go func() { 260 if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { 261 log.Ctx(ctx).Error().Err(err).Msg("failed to start http server") 262 cancel() 263 } 264 }() 265 266 <-ctx.Done() 267 if err := httpServer.Shutdown(ctx); err != nil { 268 log.Ctx(ctx).Error().Err(err).Msg("failed to shutdown http server") 269 270 return 271 } 272} 273 274func env(key, fallback string) string { 275 if value, ok := os.LookupEnv(key); ok { 276 return value 277 } 278 279 return fallback 280}