1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "os"
9 "os/signal"
10 "sync"
11 "syscall"
12
13 "net/http"
14 _ "net/http/pprof"
15
16 "github.com/bluesky-social/indigo/querycheck"
17 "github.com/bluesky-social/indigo/util/tracing"
18 "github.com/labstack/echo-contrib/pprof"
19 "github.com/labstack/echo/v4"
20
21 "github.com/labstack/echo/v4/middleware"
22
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 "go.opentelemetry.io/otel/trace"
25
26 "github.com/carlmjohnson/versioninfo"
27 "github.com/urfave/cli/v2"
28)
29
30func main() {
31 app := cli.App{
32 Name: "querycheck",
33 Usage: "a postgresql query plan checker",
34 Version: versioninfo.Short(),
35 }
36
37 app.Flags = []cli.Flag{
38 &cli.StringFlag{
39 Name: "postgres-url",
40 Usage: "postgres url for storing events",
41 Value: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable",
42 EnvVars: []string{"POSTGRES_URL"},
43 },
44 &cli.IntFlag{
45 Name: "port",
46 Usage: "port to serve metrics on",
47 Value: 8080,
48 EnvVars: []string{"PORT"},
49 },
50 &cli.StringFlag{
51 Name: "auth-token",
52 Usage: "auth token for accessing the querycheck api",
53 Value: "",
54 EnvVars: []string{"AUTH_TOKEN"},
55 },
56 }
57
58 app.Action = Querycheck
59
60 err := app.Run(os.Args)
61 if err != nil {
62 log.Fatal(err)
63 }
64}
65
66var tracer trace.Tracer
67
68// Querycheck is the main function for querycheck
69func Querycheck(cctx *cli.Context) error {
70 ctx := cctx.Context
71 ctx, cancel := context.WithCancel(ctx)
72 defer cancel()
73
74 // Trap SIGINT to trigger a shutdown.
75 signals := make(chan os.Signal, 1)
76 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
77
78 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
79 Level: slog.LevelInfo,
80 }))
81 defer func() {
82 logger.Info("main function teardown")
83 }()
84
85 logger = logger.With("source", "querycheck_main")
86 logger.Info("starting querycheck")
87
88 // Registers a tracer Provider globally if the exporter endpoint is set
89 if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" {
90 logger.Info("initializing tracer")
91 shutdown, err := tracing.InstallExportPipeline(ctx, "Querycheck", 1)
92 if err != nil {
93 log.Fatal(err)
94 }
95 defer func() {
96 if err := shutdown(ctx); err != nil {
97 log.Fatal(err)
98 }
99 }()
100 }
101
102 wg := sync.WaitGroup{}
103
104 // HTTP Server setup and Middleware Plumbing
105 e := echo.New()
106 e.HideBanner = true
107 e.HidePort = true
108 pprof.Register(e)
109 e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
110 e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig))
111
112 // Start the query checker
113 querychecker, err := querycheck.NewQuerychecker(ctx, cctx.String("postgres-url"))
114 if err != nil {
115 log.Fatalf("failed to create querychecker: %+v\n", err)
116 }
117
118 // getLikeCountQuery := `SELECT *
119 // FROM like_counts
120 // WHERE actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt'
121 // AND ns = 'app.bsky.feed.post'
122 // AND rkey = '3k3jf5lgbsw24'
123 // LIMIT 1;`
124
125 // querychecker.AddQuery(ctx, "get_like_count", getLikeCountQuery, time.Second*20)
126
127 err = querychecker.Start()
128 if err != nil {
129 log.Fatalf("failed to start querychecker: %+v\n", err)
130 }
131
132 e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
133 return func(c echo.Context) error {
134 if cctx.String("auth-token") != "" && c.Request().Header.Get("Authorization") != cctx.String("auth-token") {
135 return c.String(http.StatusUnauthorized, "unauthorized")
136 }
137 return next(c)
138 }
139 })
140
141 e.GET("/query", querychecker.HandleGetQuery)
142 e.GET("/queries", querychecker.HandleGetQueries)
143 e.POST("/query", querychecker.HandleAddQuery)
144 e.PUT("/query", querychecker.HandleUpdateQuery)
145 e.DELETE("/query", querychecker.HandleDeleteQuery)
146
147 // Start the metrics server
148 wg.Add(1)
149 go func() {
150 logger.Info("starting metrics serverd", "port", cctx.Int("port"))
151 if err := e.Start(fmt.Sprintf(":%d", cctx.Int("port"))); err != nil {
152 logger.Error("failed to start metrics server", "err", err)
153 }
154 wg.Done()
155 }()
156
157 select {
158 case <-signals:
159 cancel()
160 fmt.Println("shutting down on signal")
161 case <-ctx.Done():
162 fmt.Println("shutting down on context done")
163 }
164
165 logger.Info("shutting down, waiting for workers to clean up")
166
167 if err := e.Shutdown(ctx); err != nil {
168 logger.Error("failed to shut down metrics server", "err", err)
169 wg.Done()
170 }
171
172 querychecker.Stop()
173
174 wg.Wait()
175 logger.Info("shut down successfully")
176
177 return nil
178}