porting all github actions from bluesky-social/indigo to tangled CI
at main 4.3 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "os" 9 "os/signal" 10 "sync" 11 "syscall" 12 13 "net/http" 14 _ "net/http/pprof" 15 16 "github.com/bluesky-social/indigo/querycheck" 17 "github.com/bluesky-social/indigo/util/tracing" 18 "github.com/labstack/echo-contrib/pprof" 19 "github.com/labstack/echo/v4" 20 21 "github.com/labstack/echo/v4/middleware" 22 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 "go.opentelemetry.io/otel/trace" 25 26 "github.com/carlmjohnson/versioninfo" 27 "github.com/urfave/cli/v2" 28) 29 30func main() { 31 app := cli.App{ 32 Name: "querycheck", 33 Usage: "a postgresql query plan checker", 34 Version: versioninfo.Short(), 35 } 36 37 app.Flags = []cli.Flag{ 38 &cli.StringFlag{ 39 Name: "postgres-url", 40 Usage: "postgres url for storing events", 41 Value: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable", 42 EnvVars: []string{"POSTGRES_URL"}, 43 }, 44 &cli.IntFlag{ 45 Name: "port", 46 Usage: "port to serve metrics on", 47 Value: 8080, 48 EnvVars: []string{"PORT"}, 49 }, 50 &cli.StringFlag{ 51 Name: "auth-token", 52 Usage: "auth token for accessing the querycheck api", 53 Value: "", 54 EnvVars: []string{"AUTH_TOKEN"}, 55 }, 56 } 57 58 app.Action = Querycheck 59 60 err := app.Run(os.Args) 61 if err != nil { 62 log.Fatal(err) 63 } 64} 65 66var tracer trace.Tracer 67 68// Querycheck is the main function for querycheck 69func Querycheck(cctx *cli.Context) error { 70 ctx := cctx.Context 71 ctx, cancel := context.WithCancel(ctx) 72 defer cancel() 73 74 // Trap SIGINT to trigger a shutdown. 75 signals := make(chan os.Signal, 1) 76 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 77 78 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 79 Level: slog.LevelInfo, 80 })) 81 defer func() { 82 logger.Info("main function teardown") 83 }() 84 85 logger = logger.With("source", "querycheck_main") 86 logger.Info("starting querycheck") 87 88 // Registers a tracer Provider globally if the exporter endpoint is set 89 if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" { 90 logger.Info("initializing tracer") 91 shutdown, err := tracing.InstallExportPipeline(ctx, "Querycheck", 1) 92 if err != nil { 93 log.Fatal(err) 94 } 95 defer func() { 96 if err := shutdown(ctx); err != nil { 97 log.Fatal(err) 98 } 99 }() 100 } 101 102 wg := sync.WaitGroup{} 103 104 // HTTP Server setup and Middleware Plumbing 105 e := echo.New() 106 e.HideBanner = true 107 e.HidePort = true 108 pprof.Register(e) 109 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 110 e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) 111 112 // Start the query checker 113 querychecker, err := querycheck.NewQuerychecker(ctx, cctx.String("postgres-url")) 114 if err != nil { 115 log.Fatalf("failed to create querychecker: %+v\n", err) 116 } 117 118 // getLikeCountQuery := `SELECT * 119 // FROM like_counts 120 // WHERE actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt' 121 // AND ns = 'app.bsky.feed.post' 122 // AND rkey = '3k3jf5lgbsw24' 123 // LIMIT 1;` 124 125 // querychecker.AddQuery(ctx, "get_like_count", getLikeCountQuery, time.Second*20) 126 127 err = querychecker.Start() 128 if err != nil { 129 log.Fatalf("failed to start querychecker: %+v\n", err) 130 } 131 132 e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { 133 return func(c echo.Context) error { 134 if cctx.String("auth-token") != "" && c.Request().Header.Get("Authorization") != cctx.String("auth-token") { 135 return c.String(http.StatusUnauthorized, "unauthorized") 136 } 137 return next(c) 138 } 139 }) 140 141 e.GET("/query", querychecker.HandleGetQuery) 142 e.GET("/queries", querychecker.HandleGetQueries) 143 e.POST("/query", querychecker.HandleAddQuery) 144 e.PUT("/query", querychecker.HandleUpdateQuery) 145 e.DELETE("/query", querychecker.HandleDeleteQuery) 146 147 // Start the metrics server 148 wg.Add(1) 149 go func() { 150 logger.Info("starting metrics serverd", "port", cctx.Int("port")) 151 if err := e.Start(fmt.Sprintf(":%d", cctx.Int("port"))); err != nil { 152 logger.Error("failed to start metrics server", "err", err) 153 } 154 wg.Done() 155 }() 156 157 select { 158 case <-signals: 159 cancel() 160 fmt.Println("shutting down on signal") 161 case <-ctx.Done(): 162 fmt.Println("shutting down on context done") 163 } 164 165 logger.Info("shutting down, waiting for workers to clean up") 166 167 if err := e.Shutdown(ctx); err != nil { 168 logger.Error("failed to shut down metrics server", "err", err) 169 wg.Done() 170 } 171 172 querychecker.Stop() 173 174 wg.Wait() 175 logger.Info("shut down successfully") 176 177 return nil 178}