+194
cmd/querycheck/main.go
+194
cmd/querycheck/main.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"log"
7
+
"os"
8
+
"os/signal"
9
+
"sync"
10
+
"syscall"
11
+
"time"
12
+
13
+
"net/http"
14
+
_ "net/http/pprof"
15
+
16
+
"github.com/bluesky-social/indigo/querycheck"
17
+
"github.com/bluesky-social/indigo/util/tracing"
18
+
"github.com/labstack/echo-contrib/pprof"
19
+
"github.com/labstack/echo/v4"
20
+
21
+
"github.com/labstack/echo/v4/middleware"
22
+
23
+
"github.com/prometheus/client_golang/prometheus/promhttp"
24
+
"go.opentelemetry.io/otel/trace"
25
+
"go.uber.org/zap"
26
+
27
+
"github.com/urfave/cli/v2"
28
+
)
29
+
30
+
func main() {
31
+
app := cli.App{
32
+
Name: "querycheck",
33
+
Usage: "a postgresql query plan checker",
34
+
Version: "0.0.1",
35
+
}
36
+
37
+
app.Flags = []cli.Flag{
38
+
&cli.StringFlag{
39
+
Name: "postgres-url",
40
+
Usage: "postgres url for storing events",
41
+
Value: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable",
42
+
EnvVars: []string{"POSTGRES_URL"},
43
+
},
44
+
&cli.IntFlag{
45
+
Name: "port",
46
+
Usage: "port to serve metrics on",
47
+
Value: 8080,
48
+
EnvVars: []string{"PORT"},
49
+
},
50
+
&cli.StringFlag{
51
+
Name: "auth-token",
52
+
Usage: "auth token for accessing the querycheck api",
53
+
Value: "localdev",
54
+
EnvVars: []string{"AUTH_TOKEN"},
55
+
},
56
+
}
57
+
58
+
app.Action = Querycheck
59
+
60
+
err := app.Run(os.Args)
61
+
if err != nil {
62
+
log.Fatal(err)
63
+
}
64
+
}
65
+
66
+
// tracer is a package-level tracer handle. It is declared here but is not
// assigned or referenced anywhere in the visible part of this file.
var tracer trace.Tracer
67
+
68
+
// Querycheck is the main function for querycheck
69
+
func Querycheck(cctx *cli.Context) error {
70
+
ctx := cctx.Context
71
+
ctx, cancel := context.WithCancel(ctx)
72
+
defer cancel()
73
+
74
+
// Trap SIGINT to trigger a shutdown.
75
+
signals := make(chan os.Signal, 1)
76
+
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
77
+
78
+
rawlog, err := zap.NewDevelopment()
79
+
if err != nil {
80
+
log.Fatalf("failed to create logger: %+v\n", err)
81
+
}
82
+
defer func() {
83
+
log.Printf("main function teardown\n")
84
+
err := rawlog.Sync()
85
+
if err != nil {
86
+
log.Printf("failed to sync logger on teardown: %+v", err.Error())
87
+
}
88
+
}()
89
+
90
+
log := rawlog.Sugar().With("source", "querycheck_main")
91
+
92
+
log.Info("starting querycheck")
93
+
94
+
// Registers a tracer Provider globally if the exporter endpoint is set
95
+
if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" {
96
+
log.Info("initializing tracer...")
97
+
shutdown, err := tracing.InstallExportPipeline(ctx, "Querycheck", 1)
98
+
if err != nil {
99
+
log.Fatal(err)
100
+
}
101
+
defer func() {
102
+
if err := shutdown(ctx); err != nil {
103
+
log.Fatal(err)
104
+
}
105
+
}()
106
+
}
107
+
108
+
wg := sync.WaitGroup{}
109
+
110
+
// HTTP Server setup and Middleware Plumbing
111
+
e := echo.New()
112
+
e.HideBanner = true
113
+
e.HidePort = true
114
+
pprof.Register(e)
115
+
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
116
+
e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig))
117
+
118
+
// Start the query checker
119
+
querychecker, err := querycheck.NewQuerychecker(ctx, cctx.String("postgres-url"))
120
+
if err != nil {
121
+
log.Fatalf("failed to create querychecker: %+v\n", err)
122
+
}
123
+
124
+
getLikesQuery := `SELECT *
125
+
FROM likes
126
+
WHERE subject_actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt'
127
+
AND subject_namespace = 'app.bsky.feed.post'
128
+
AND subject_rkey = '3k3jf5lgbsw24'
129
+
ORDER BY created_at DESC
130
+
LIMIT 50;`
131
+
132
+
querychecker.AddQuery(ctx, "get_likes", getLikesQuery, time.Second*30)
133
+
134
+
getLikeCountQuery := `SELECT *
135
+
FROM like_counts
136
+
WHERE actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt'
137
+
AND ns = 'app.bsky.feed.post'
138
+
AND rkey = '3k3jf5lgbsw24'
139
+
LIMIT 1;`
140
+
141
+
querychecker.AddQuery(ctx, "get_like_count", getLikeCountQuery, time.Second*20)
142
+
143
+
err = querychecker.Start()
144
+
if err != nil {
145
+
log.Fatalf("failed to start querychecker: %+v\n", err)
146
+
}
147
+
148
+
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
149
+
return func(c echo.Context) error {
150
+
if c.Request().Header.Get("Authorization") != cctx.String("auth-token") {
151
+
return c.String(http.StatusUnauthorized, "unauthorized")
152
+
}
153
+
return next(c)
154
+
}
155
+
})
156
+
157
+
e.GET("/query", querychecker.HandleGetQuery)
158
+
e.GET("/queries", querychecker.HandleGetQueries)
159
+
e.POST("/query", querychecker.HandleAddQuery)
160
+
e.PUT("/query", querychecker.HandleUpdateQuery)
161
+
e.DELETE("/query", querychecker.HandleDeleteQuery)
162
+
163
+
// Start the metrics server
164
+
wg.Add(1)
165
+
go func() {
166
+
log.Infof("starting metrics server on port %d", cctx.Int("port"))
167
+
if err := e.Start(fmt.Sprintf(":%d", cctx.Int("port"))); err != nil {
168
+
log.Errorf("failed to start metrics server: %+v\n", err)
169
+
}
170
+
wg.Done()
171
+
}()
172
+
173
+
select {
174
+
case <-signals:
175
+
cancel()
176
+
fmt.Println("shutting down on signal")
177
+
case <-ctx.Done():
178
+
fmt.Println("shutting down on context done")
179
+
}
180
+
181
+
log.Info("shutting down, waiting for workers to clean up...")
182
+
183
+
if err := e.Shutdown(ctx); err != nil {
184
+
log.Errorf("failed to shut down metrics server: %+v\n", err)
185
+
wg.Done()
186
+
}
187
+
188
+
querychecker.Stop()
189
+
190
+
wg.Wait()
191
+
log.Info("shut down successfully")
192
+
193
+
return nil
194
+
}
+3
-3
go.mod
+3
-3
go.mod
···
26
26
github.com/ipfs/go-log/v2 v2.5.1
27
27
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4
28
28
github.com/ipld/go-car/v2 v2.9.0
29
+
github.com/jackc/pgx/v5 v5.3.0
29
30
github.com/joho/godotenv v1.5.1
30
31
github.com/labstack/echo-contrib v0.15.0
31
32
github.com/labstack/echo/v4 v4.10.2
···
45
46
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
46
47
go.opentelemetry.io/otel v1.16.0
47
48
go.opentelemetry.io/otel/exporters/jaeger v1.14.0
49
+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0
48
50
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0
49
51
go.opentelemetry.io/otel/sdk v1.16.0
52
+
go.opentelemetry.io/otel/trace v1.16.0
50
53
go.uber.org/zap v1.24.0
51
54
golang.org/x/crypto v0.11.0
52
55
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
···
93
96
github.com/ipld/go-ipld-prime v0.20.0 // indirect
94
97
github.com/jackc/pgpassfile v1.0.0 // indirect
95
98
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
96
-
github.com/jackc/pgx/v5 v5.3.0 // indirect
97
99
github.com/jbenet/goprocess v0.1.4 // indirect
98
100
github.com/jinzhu/inflection v1.0.0 // indirect
99
101
github.com/jinzhu/now v1.1.5 // indirect
···
130
132
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
131
133
go.opencensus.io v0.24.0 // indirect
132
134
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
133
-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
134
135
go.opentelemetry.io/otel/metric v1.16.0 // indirect
135
-
go.opentelemetry.io/otel/trace v1.16.0 // indirect
136
136
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
137
137
go.uber.org/atomic v1.10.0 // indirect
138
138
go.uber.org/multierr v1.11.0 // indirect
+319
querycheck/check.go
+319
querycheck/check.go
···
1
+
package querycheck
2
+
3
+
import (
4
+
"context"
5
+
"log"
6
+
"math"
7
+
"sync"
8
+
"time"
9
+
10
+
"github.com/jackc/pgx/v5"
11
+
"go.opentelemetry.io/otel"
12
+
"go.opentelemetry.io/otel/attribute"
13
+
"go.uber.org/zap"
14
+
)
15
+
16
+
// tracer is the package-wide OpenTelemetry tracer, obtained from the global
// otel provider under the name "querycheck"; the Querychecker methods in this
// file use it to start their spans.
var tracer = otel.Tracer("querycheck")
17
+
18
+
// Query is a query to check.
//
// Name identifies the query (it is used as the metrics label and for lookups),
// Query is the SQL text handed to CheckQueryPlan, and CheckEvery is the
// interval used to create the checker routine's ticker. LastPlan, LastChecked
// and LastError hold the outcome of the most recent checks.
//
// lk is taken by the checker routine and by UpdateQuery when they read or
// write the mutable fields. in and out form the shutdown handshake with the
// checker routine: Stop sends on in, and the routine acknowledges on out
// before returning.
type Query struct {
	Name        string
	Query       string
	LastPlan    *QueryPlan
	LastChecked time.Time
	LastError   error
	CheckEvery  time.Duration

	lk  sync.RWMutex
	in  chan struct{}
	out chan struct{}
}
31
+
32
+
// Querychecker is a query checker meta object.
//
// Queries is the list of registered queries and Logger is the logger used by
// the manager itself (see Stop). connectionURL is the Postgres connection
// string passed to pgx.Connect for every plan check. lk is held by
// AddQuery, RemoveQuery, GetQuery and GetQueries while they touch Queries.
type Querychecker struct {
	Queries []*Query
	Logger  *zap.SugaredLogger

	connectionURL string
	lk            sync.RWMutex
}
40
+
41
+
// NewQuerychecker creates a new querychecker
42
+
func NewQuerychecker(ctx context.Context, connectionURL string) (*Querychecker, error) {
43
+
logger, err := zap.NewDevelopment()
44
+
if err != nil {
45
+
return nil, err
46
+
}
47
+
l := logger.Sugar().With("source", "querychecker_manager")
48
+
49
+
return &Querychecker{
50
+
connectionURL: connectionURL,
51
+
Logger: l,
52
+
Queries: []*Query{},
53
+
}, nil
54
+
}
55
+
56
+
// AddQuery adds a query to the checker
57
+
func (q *Querychecker) AddQuery(ctx context.Context, name, query string, checkEvery time.Duration) {
58
+
ctx, span := tracer.Start(ctx, "AddQuery")
59
+
defer span.End()
60
+
61
+
span.SetAttributes(attribute.String("name", name))
62
+
span.SetAttributes(attribute.String("query", query))
63
+
span.SetAttributes(attribute.String("checkEvery", checkEvery.String()))
64
+
65
+
q.lk.Lock()
66
+
q.Queries = append(q.Queries, &Query{
67
+
Name: name,
68
+
Query: query,
69
+
CheckEvery: checkEvery,
70
+
71
+
in: make(chan struct{}),
72
+
out: make(chan struct{}),
73
+
})
74
+
q.lk.Unlock()
75
+
}
76
+
77
+
// RemoveQuery removes a query from the checker
78
+
func (q *Querychecker) RemoveQuery(ctx context.Context, name string) {
79
+
ctx, span := tracer.Start(ctx, "RemoveQuery")
80
+
defer span.End()
81
+
82
+
span.SetAttributes(attribute.String("name", name))
83
+
84
+
q.lk.Lock()
85
+
defer q.lk.Unlock()
86
+
for i, qu := range q.Queries {
87
+
if qu.Name == name {
88
+
q.Queries = append(q.Queries[:i], q.Queries[i+1:]...)
89
+
return
90
+
}
91
+
}
92
+
}
93
+
94
+
// GetQuery returns a copy of the query
95
+
func (q *Querychecker) GetQuery(ctx context.Context, name string) *Query {
96
+
ctx, span := tracer.Start(ctx, "GetQuery")
97
+
defer span.End()
98
+
99
+
span.SetAttributes(attribute.String("name", name))
100
+
101
+
q.lk.RLock()
102
+
defer q.lk.RUnlock()
103
+
for _, qu := range q.Queries {
104
+
if qu.Name == name {
105
+
return &Query{
106
+
Name: qu.Name,
107
+
Query: qu.Query,
108
+
LastPlan: qu.LastPlan,
109
+
LastChecked: qu.LastChecked,
110
+
LastError: qu.LastError,
111
+
CheckEvery: qu.CheckEvery,
112
+
}
113
+
}
114
+
}
115
+
return nil
116
+
}
117
+
118
+
// GetQueries returns a copy of the queries
119
+
func (q *Querychecker) GetQueries(ctx context.Context) []*Query {
120
+
ctx, span := tracer.Start(ctx, "GetQueries")
121
+
defer span.End()
122
+
123
+
q.lk.RLock()
124
+
defer q.lk.RUnlock()
125
+
queries := make([]*Query, len(q.Queries))
126
+
for i, qu := range q.Queries {
127
+
queries[i] = &Query{
128
+
Name: qu.Name,
129
+
Query: qu.Query,
130
+
LastPlan: qu.LastPlan,
131
+
LastChecked: qu.LastChecked,
132
+
LastError: qu.LastError,
133
+
CheckEvery: qu.CheckEvery,
134
+
}
135
+
}
136
+
137
+
return queries
138
+
}
139
+
140
+
// UpdateQuery updates a query
141
+
func (q *Querychecker) UpdateQuery(ctx context.Context, name, query string, checkEvery time.Duration) {
142
+
ctx, span := tracer.Start(ctx, "UpdateQuery")
143
+
defer span.End()
144
+
145
+
span.SetAttributes(attribute.String("name", name))
146
+
span.SetAttributes(attribute.String("query", query))
147
+
span.SetAttributes(attribute.String("checkEvery", checkEvery.String()))
148
+
149
+
for _, qu := range q.Queries {
150
+
if qu.Name == name {
151
+
qu.lk.Lock()
152
+
qu.Query = query
153
+
qu.CheckEvery = checkEvery
154
+
qu.lk.Unlock()
155
+
return
156
+
}
157
+
}
158
+
}
159
+
160
+
// Start starts the query checker routines
161
+
func (q *Querychecker) Start() error {
162
+
ctx, span := tracer.Start(context.Background(), "Start")
163
+
defer span.End()
164
+
165
+
for _, qu := range q.Queries {
166
+
go func(query *Query) {
167
+
rawlog, err := zap.NewDevelopment()
168
+
if err != nil {
169
+
log.Fatalf("failed to create logger: %+v\n", err)
170
+
}
171
+
log := rawlog.Sugar().With("source", "query_checker_routine", "query", query.Name)
172
+
173
+
log.Infof("query checker routine started for query: %s\n", query.Name)
174
+
log.Infof("Query: \n%s\n", query.Query)
175
+
176
+
// Check the query plan every CheckEvery duration
177
+
ticker := time.NewTicker(query.CheckEvery)
178
+
defer ticker.Stop()
179
+
180
+
query.LastPlan, err = q.CheckQueryPlan(ctx, query.Query)
181
+
if err != nil {
182
+
log.Errorf("failed to check query plan: %+v\n", err)
183
+
}
184
+
185
+
if query.LastPlan != nil {
186
+
log.Infof("Initial plan:\n%+v\n", query.LastPlan.String())
187
+
query.RecordPlanMetrics(*query.LastPlan)
188
+
query.LastChecked = time.Now()
189
+
}
190
+
191
+
for {
192
+
select {
193
+
case <-ticker.C:
194
+
log.Info("checking query plan")
195
+
196
+
query.lk.RLock()
197
+
queryString := query.Query
198
+
query.lk.RUnlock()
199
+
200
+
qp, err := q.CheckQueryPlan(ctx, queryString)
201
+
202
+
query.lk.Lock()
203
+
query.LastChecked = time.Now()
204
+
query.LastError = err
205
+
query.lk.Unlock()
206
+
207
+
execCounter.WithLabelValues(query.Name).Inc()
208
+
209
+
if err != nil || qp == nil {
210
+
if qp == nil {
211
+
log.Errorf("query plan is nil")
212
+
}
213
+
log.Errorf("failed to check query plan: %+v\n", err)
214
+
errorCounter.WithLabelValues(query.Name).Inc()
215
+
continue
216
+
}
217
+
218
+
query.lk.RLock()
219
+
lastPlan := *query.LastPlan
220
+
query.lk.RUnlock()
221
+
222
+
query.RecordPlanMetrics(*qp)
223
+
224
+
if !qp.HasSameStructureAs(lastPlan) {
225
+
sign := "+"
226
+
diff := math.Abs(lastPlan.Plan.ActualTotalTime - qp.Plan.ActualTotalTime)
227
+
if lastPlan.Plan.ActualTotalTime > qp.Plan.ActualTotalTime {
228
+
sign = "-"
229
+
}
230
+
231
+
log.Infof("query plan has changed (%s%.03fms): \n%+v\n", sign, diff, qp.String())
232
+
233
+
query.lk.Lock()
234
+
query.LastPlan = qp
235
+
query.lk.Unlock()
236
+
}
237
+
case <-query.in:
238
+
log.Info("shutting down query checker routine")
239
+
query.out <- struct{}{}
240
+
return
241
+
}
242
+
}
243
+
}(qu)
244
+
}
245
+
246
+
return nil
247
+
}
248
+
249
+
// Stop stops the query checker routines
250
+
func (q *Querychecker) Stop() {
251
+
_, span := tracer.Start(context.Background(), "Stop")
252
+
defer span.End()
253
+
254
+
q.Logger.Info("stopping query checker")
255
+
256
+
for _, qu := range q.Queries {
257
+
qu.in <- struct{}{}
258
+
}
259
+
260
+
for _, qu := range q.Queries {
261
+
<-qu.out
262
+
}
263
+
264
+
q.Logger.Info("query checker stopped")
265
+
}
266
+
267
+
// CheckQueryPlan checks the query plan for a given query
268
+
func (q *Querychecker) CheckQueryPlan(ctx context.Context, query string) (*QueryPlan, error) {
269
+
ctx, span := tracer.Start(ctx, "CheckQueryPlan")
270
+
defer span.End()
271
+
272
+
conn, err := pgx.Connect(ctx, q.connectionURL)
273
+
if err != nil {
274
+
return nil, err
275
+
}
276
+
defer conn.Close(ctx)
277
+
278
+
rows, err := conn.Query(ctx, "EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON) "+query)
279
+
if err != nil {
280
+
return nil, err
281
+
}
282
+
283
+
var plan QueryPlan
284
+
285
+
for rows.Next() {
286
+
var plans QueryPlans
287
+
err := rows.Scan(&plans)
288
+
if err != nil {
289
+
return nil, err
290
+
}
291
+
for _, p := range plans {
292
+
plan = p
293
+
}
294
+
}
295
+
296
+
return &plan, nil
297
+
}
298
+
299
+
// RecordPlanMetrics records the query plan metrics
300
+
func (qu *Query) RecordPlanMetrics(qp QueryPlan) {
301
+
execDurationCounter.WithLabelValues(qu.Name).Add(qp.Plan.ActualTotalTime)
302
+
blocksHitCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedHitBlocks))
303
+
blocksReadCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedReadBlocks))
304
+
blocksWrittenCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedWrittenBlocks))
305
+
blocksDirtyCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedDirtiedBlocks))
306
+
ioReadTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOReadTime)
307
+
ioWriteTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOWriteTime)
308
+
tempWrittenBlocksCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.TempWrittenBlocks))
309
+
310
+
qu.RecordPlanNode(qp.Plan)
311
+
}
312
+
313
+
// RecordPlanNode records the query plan node metrics
314
+
func (qu *Query) RecordPlanNode(p Plan) {
315
+
planNodeCounter.WithLabelValues(qu.Name, p.NodeType).Inc()
316
+
for _, n := range p.Plans {
317
+
qu.RecordPlanNode(n)
318
+
}
319
+
}
+95
querycheck/handlers.go
+95
querycheck/handlers.go
···
1
+
package querycheck
2
+
3
+
import (
4
+
"time"
5
+
6
+
"github.com/labstack/echo/v4"
7
+
)
8
+
9
+
// RetQuery is the JSON representation of a Query returned by the HTTP
// handlers. It mirrors Query's exported fields, except that CheckEvery is
// rendered as a duration string.
//
// NOTE(review): LastError is an error interface value; encoding/json encodes
// the dynamic value behind it, so error types without exported fields will
// presumably not render their message — confirm this is the intended output.
type RetQuery struct {
	Name        string     `json:"name"`
	Query       string     `json:"query"`
	LastPlan    *QueryPlan `json:"last_plan"`
	LastChecked time.Time  `json:"last_checked"`
	LastError   error      `json:"last_error"`
	CheckEvery  string     `json:"check_every"`
}
17
+
18
+
func (q *Querychecker) HandleGetQueries(c echo.Context) error {
19
+
queries := q.GetQueries(c.Request().Context())
20
+
retQueries := []RetQuery{}
21
+
for _, query := range queries {
22
+
retQueries = append(retQueries, RetQuery{
23
+
Name: query.Name,
24
+
Query: query.Query,
25
+
LastPlan: query.LastPlan,
26
+
LastChecked: query.LastChecked,
27
+
LastError: query.LastError,
28
+
CheckEvery: query.CheckEvery.String(),
29
+
})
30
+
}
31
+
32
+
return c.JSON(200, retQueries)
33
+
}
34
+
35
+
func (q *Querychecker) HandleGetQuery(c echo.Context) error {
36
+
query := q.GetQuery(c.Request().Context(), c.Param("name"))
37
+
if query == nil {
38
+
return c.JSON(404, echo.Map{
39
+
"message": "not found",
40
+
})
41
+
}
42
+
43
+
retQuery := RetQuery{
44
+
Name: query.Name,
45
+
Query: query.Query,
46
+
LastPlan: query.LastPlan,
47
+
LastChecked: query.LastChecked,
48
+
LastError: query.LastError,
49
+
CheckEvery: query.CheckEvery.String(),
50
+
}
51
+
52
+
return c.JSON(200, retQuery)
53
+
}
54
+
55
+
func (q *Querychecker) HandleDeleteQuery(c echo.Context) error {
56
+
q.RemoveQuery(c.Request().Context(), c.Param("name"))
57
+
return c.JSON(200, echo.Map{
58
+
"message": "success",
59
+
})
60
+
}
61
+
62
+
// AddQueryRequest is the request body bound by HandleAddQuery. CheckEvery is
// the check interval in milliseconds (the handler multiplies it by
// time.Millisecond).
type AddQueryRequest struct {
	Name       string `json:"name"`
	Query      string `json:"query"`
	CheckEvery int64  `json:"check_every_ms"`
}
67
+
68
+
func (q *Querychecker) HandleAddQuery(c echo.Context) error {
69
+
var req AddQueryRequest
70
+
c.Bind(&req)
71
+
q.AddQuery(c.Request().Context(), req.Name, req.Query, time.Duration(req.CheckEvery)*time.Millisecond)
72
+
return c.JSON(200, echo.Map{
73
+
"name": req.Name,
74
+
"query": req.Query,
75
+
"check_every_ms": req.CheckEvery,
76
+
"message": "success",
77
+
})
78
+
}
79
+
80
+
// UpdateQueryRequest is the request body bound by HandleUpdateQuery.
// CheckEvery is the check interval in milliseconds (the handler multiplies it
// by time.Millisecond). The query to update is named by the request, not by
// this body.
type UpdateQueryRequest struct {
	Query      string `json:"query"`
	CheckEvery int64  `json:"check_every_ms"`
}
84
+
85
+
func (q *Querychecker) HandleUpdateQuery(c echo.Context) error {
86
+
var req UpdateQueryRequest
87
+
c.Bind(&req)
88
+
q.UpdateQuery(c.Request().Context(), c.Param("name"), req.Query, time.Duration(req.CheckEvery)*time.Millisecond)
89
+
return c.JSON(200, echo.Map{
90
+
"name": c.Param("name"),
91
+
"query": req.Query,
92
+
"check_every_ms": req.CheckEvery,
93
+
"message": "success",
94
+
})
95
+
}
+61
querycheck/metrics.go
+61
querycheck/metrics.go
···
1
+
package querycheck
2
+
3
+
import (
4
+
"github.com/prometheus/client_golang/prometheus"
5
+
"github.com/prometheus/client_golang/prometheus/promauto"
6
+
)
7
+
8
+
var execCounter = promauto.NewCounterVec(prometheus.CounterOpts{
9
+
Name: "querycheck_exec_total",
10
+
Help: "total number of executions since starting the querycheck",
11
+
}, []string{"query"})
12
+
13
+
var execDurationCounter = promauto.NewCounterVec(prometheus.CounterOpts{
14
+
Name: "querycheck_exec_duration_ms_total",
15
+
Help: "total ms spent executing the query since starting the querycheck",
16
+
}, []string{"query"})
17
+
18
+
var errorCounter = promauto.NewCounterVec(prometheus.CounterOpts{
19
+
Name: "querycheck_errors_total",
20
+
Help: "number of errors encountered since starting the querycheck",
21
+
}, []string{"query"})
22
+
23
+
var blocksHitCounter = promauto.NewCounterVec(prometheus.CounterOpts{
24
+
Name: "querycheck_blocks_hit_total",
25
+
Help: "blocks hit total since starting the querycheck",
26
+
}, []string{"query"})
27
+
28
+
var blocksReadCounter = promauto.NewCounterVec(prometheus.CounterOpts{
29
+
Name: "querycheck_blocks_read_total",
30
+
Help: "blocks read total since starting the querycheck",
31
+
}, []string{"query"})
32
+
33
+
var blocksWrittenCounter = promauto.NewCounterVec(prometheus.CounterOpts{
34
+
Name: "querycheck_blocks_written_total",
35
+
Help: "blocks written total since starting the querycheck",
36
+
}, []string{"query"})
37
+
38
+
var blocksDirtyCounter = promauto.NewCounterVec(prometheus.CounterOpts{
39
+
Name: "querycheck_blocks_dirty_total",
40
+
Help: "blocks dirty total since starting the querycheck",
41
+
}, []string{"query"})
42
+
43
+
var ioReadTimeCounter = promauto.NewCounterVec(prometheus.CounterOpts{
44
+
Name: "querycheck_io_read_ms_total",
45
+
Help: "io read time (in ms) total since starting the querycheck",
46
+
}, []string{"query"})
47
+
48
+
var ioWriteTimeCounter = promauto.NewCounterVec(prometheus.CounterOpts{
49
+
Name: "querycheck_io_write_ms_total",
50
+
Help: "io write time (in ms) total since starting the querycheck",
51
+
}, []string{"query"})
52
+
53
+
var tempWrittenBlocksCounter = promauto.NewCounterVec(prometheus.CounterOpts{
54
+
Name: "querycheck_temp_written_blocks_total",
55
+
Help: "temp written blocks total since starting the querycheck",
56
+
}, []string{"query"})
57
+
58
+
var planNodeCounter = promauto.NewCounterVec(prometheus.CounterOpts{
59
+
Name: "querycheck_plan_node_count_total",
60
+
Help: "plan node count total since starting the querycheck",
61
+
}, []string{"query", "node_type"})
+166
querycheck/plan.go
+166
querycheck/plan.go
···
1
+
package querycheck
2
+
3
+
import "fmt"
4
+
5
+
// Plan is one node of a query plan tree as decoded from the JSON produced by
// the EXPLAIN statement issued in CheckQueryPlan. The JSON tags are the keys
// of that output. Child nodes are nested under Plans, and the fields tagged
// omitempty are only relevant to some node types.
//
// The time fields are treated as milliseconds by the metrics and log output
// in this package.
type Plan struct {
	NodeType            string   `json:"Node Type"`
	ParallelAware       bool     `json:"Parallel Aware"`
	AsyncCapable        bool     `json:"Async Capable"`
	StartupCost         float64  `json:"Startup Cost"`
	TotalCost           float64  `json:"Total Cost"`
	PlanRows            int      `json:"Plan Rows"`
	PlanWidth           int      `json:"Plan Width"`
	ActualStartupTime   float64  `json:"Actual Startup Time"`
	ActualTotalTime     float64  `json:"Actual Total Time"`
	ActualRows          int      `json:"Actual Rows"`
	ActualLoops         int      `json:"Actual Loops"`
	Output              []string `json:"Output"`
	SharedHitBlocks     int      `json:"Shared Hit Blocks"`
	SharedReadBlocks    int      `json:"Shared Read Blocks"`
	SharedDirtiedBlocks int      `json:"Shared Dirtied Blocks"`
	SharedWrittenBlocks int      `json:"Shared Written Blocks"`
	LocalHitBlocks      int      `json:"Local Hit Blocks"`
	LocalReadBlocks     int      `json:"Local Read Blocks"`
	LocalDirtiedBlocks  int      `json:"Local Dirtied Blocks"`
	LocalWrittenBlocks  int      `json:"Local Written Blocks"`
	TempReadBlocks      int      `json:"Temp Read Blocks"`
	TempWrittenBlocks   int      `json:"Temp Written Blocks"`
	IOReadTime          float64  `json:"I/O Read Time"`
	IOWriteTime         float64  `json:"I/O Write Time"`
	Plans               []Plan   `json:"Plans,omitempty"`
	ParentRelationship  string   `json:"Parent Relationship,omitempty"`
	SortKey             []string `json:"Sort Key,omitempty"`
	SortMethod          string   `json:"Sort Method,omitempty"`
	SortSpaceUsed       int      `json:"Sort Space Used,omitempty"`
	SortSpaceType       string   `json:"Sort Space Type,omitempty"`
	WorkersPlanned      int      `json:"Workers Planned,omitempty"`
	WorkersLaunched     int      `json:"Workers Launched,omitempty"`
	SingleCopy          bool     `json:"Single Copy,omitempty"`
	RelationName        string   `json:"Relation Name,omitempty"`
	Schema              string   `json:"Schema,omitempty"`
	Alias               string   `json:"Alias,omitempty"`
	Filter              string   `json:"Filter,omitempty"`
	RowsRemovedByFilter int      `json:"Rows Removed by Filter,omitempty"`
	Workers             []Worker `json:"Workers,omitempty"`
}
46
+
47
+
// Worker is one entry of a plan node's "Workers" array: the timing, row, JIT
// and block/I-O figures reported for a single worker, identified by
// WorkerNumber.
type Worker struct {
	WorkerNumber        int     `json:"Worker Number"`
	ActualStartupTime   float64 `json:"Actual Startup Time"`
	ActualTotalTime     float64 `json:"Actual Total Time"`
	ActualRows          int     `json:"Actual Rows"`
	ActualLoops         int     `json:"Actual Loops"`
	JIT                 JIT     `json:"JIT"`
	SharedHitBlocks     int     `json:"Shared Hit Blocks"`
	SharedReadBlocks    int     `json:"Shared Read Blocks"`
	SharedDirtiedBlocks int     `json:"Shared Dirtied Blocks"`
	SharedWrittenBlocks int     `json:"Shared Written Blocks"`
	LocalHitBlocks      int     `json:"Local Hit Blocks"`
	LocalReadBlocks     int     `json:"Local Read Blocks"`
	LocalDirtiedBlocks  int     `json:"Local Dirtied Blocks"`
	LocalWrittenBlocks  int     `json:"Local Written Blocks"`
	TempReadBlocks      int     `json:"Temp Read Blocks"`
	TempWrittenBlocks   int     `json:"Temp Written Blocks"`
	IOReadTime          float64 `json:"I/O Read Time"`
	IOWriteTime         float64 `json:"I/O Write Time"`
}
67
+
68
+
// JIT is the "JIT" section of the explain output: the number of functions
// involved, which JIT options were in effect, and the time spent per phase.
type JIT struct {
	Functions int     `json:"Functions"`
	Options   Options `json:"Options"`
	Timing    Timing  `json:"Timing"`
}

// Options is the "Options" object of a JIT section; each field is an on/off
// flag for the JIT feature of the same name.
type Options struct {
	Inlining     bool `json:"Inlining"`
	Optimization bool `json:"Optimization"`
	Expressions  bool `json:"Expressions"`
	Deforming    bool `json:"Deforming"`
}

// Timing is the "Timing" object of a JIT section: the time reported for each
// JIT phase plus the total.
type Timing struct {
	Generation   float64 `json:"Generation"`
	Inlining     float64 `json:"Inlining"`
	Optimization float64 `json:"Optimization"`
	Emission     float64 `json:"Emission"`
	Total        float64 `json:"Total"`
}
88
+
89
+
// QueryPlan is one top-level element of the explain JSON output: the root
// Plan node together with the query-wide planning, trigger, JIT and execution
// figures reported alongside it.
type QueryPlan struct {
	Plan            Plan     `json:"Plan"`
	QueryIdentifier int64    `json:"Query Identifier"`
	Planning        Planning `json:"Planning"`
	PlanningTime    float64  `json:"Planning Time"`
	Triggers        []string `json:"Triggers"`
	JIT             JIT      `json:"JIT"`
	ExecutionTime   float64  `json:"Execution Time"`
}
98
+
99
+
// Planning is the "Planning" object of a QueryPlan: the block and I/O-time
// figures reported for the planning phase, using the same keys as the
// per-node figures in Plan.
type Planning struct {
	SharedHitBlocks     int     `json:"Shared Hit Blocks"`
	SharedReadBlocks    int     `json:"Shared Read Blocks"`
	SharedDirtiedBlocks int     `json:"Shared Dirtied Blocks"`
	SharedWrittenBlocks int     `json:"Shared Written Blocks"`
	LocalHitBlocks      int     `json:"Local Hit Blocks"`
	LocalReadBlocks     int     `json:"Local Read Blocks"`
	LocalDirtiedBlocks  int     `json:"Local Dirtied Blocks"`
	LocalWrittenBlocks  int     `json:"Local Written Blocks"`
	TempReadBlocks      int     `json:"Temp Read Blocks"`
	TempWrittenBlocks   int     `json:"Temp Written Blocks"`
	IOReadTime          float64 `json:"I/O Read Time"`
	IOWriteTime         float64 `json:"I/O Write Time"`
}
113
+
114
+
// QueryPlans is the top-level JSON array of the explain output;
// CheckQueryPlan scans each result row into a value of this type.
type QueryPlans []QueryPlan
115
+
116
+
func (q *QueryPlan) String() string {
117
+
ret := ""
118
+
ret += q.Plan.String(1)
119
+
return ret
120
+
}
121
+
122
+
func (p *Plan) String(i int) string {
123
+
ret := ""
124
+
ret += fmt.Sprintf("(%s) Timing: %fms | IO Read: %fms (H %d R %d D %d W %d) | IO Write: %fms",
125
+
p.NodeType,
126
+
p.ActualTotalTime,
127
+
p.IOReadTime,
128
+
p.SharedHitBlocks,
129
+
p.SharedReadBlocks,
130
+
p.SharedWrittenBlocks,
131
+
p.SharedDirtiedBlocks,
132
+
p.IOWriteTime,
133
+
)
134
+
135
+
for _, plan := range p.Plans {
136
+
ret += "\n"
137
+
for j := 0; j < i; j++ {
138
+
ret += "\t"
139
+
}
140
+
ret += plan.String(i + 1)
141
+
}
142
+
143
+
return ret
144
+
}
145
+
146
+
// HasSameStructureAs reports whether the root plan trees of q and other have
// the same structure, by delegating to Plan.HasSameStructureAs on the two
// root nodes. The query-wide fields of QueryPlan are not compared.
func (q *QueryPlan) HasSameStructureAs(other QueryPlan) bool {
	return q.Plan.HasSameStructureAs(other.Plan)
}
149
+
150
+
func (p *Plan) HasSameStructureAs(other Plan) bool {
151
+
if p.NodeType != other.NodeType {
152
+
return false
153
+
}
154
+
155
+
if len(p.Plans) != len(other.Plans) {
156
+
return false
157
+
}
158
+
159
+
for i, plan := range p.Plans {
160
+
if !plan.HasSameStructureAs(other.Plans[i]) {
161
+
return false
162
+
}
163
+
}
164
+
165
+
return true
166
+
}
+50
util/tracing/tracing.go
+50
util/tracing/tracing.go
···
1
+
package tracing
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
7
+
"go.opentelemetry.io/otel"
8
+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
9
+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
10
+
"go.opentelemetry.io/otel/sdk/resource"
11
+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
12
+
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
13
+
)
14
+
15
+
func InstallExportPipeline(ctx context.Context, serviceName string, sampleRatio float64) (func(context.Context) error, error) {
16
+
client := otlptracehttp.NewClient()
17
+
exporter, err := otlptrace.New(ctx, client)
18
+
if err != nil {
19
+
return nil, fmt.Errorf("creating OTLP trace exporter: %w", err)
20
+
}
21
+
22
+
tracerProvider := newTraceProvider(exporter, serviceName, sampleRatio)
23
+
otel.SetTracerProvider(tracerProvider)
24
+
25
+
return tracerProvider.Shutdown, nil
26
+
}
27
+
28
+
func newTraceProvider(exp sdktrace.SpanExporter, serviceName string, sampleRatio float64) *sdktrace.TracerProvider {
29
+
// Ensure default SDK resources and the required service name are set.
30
+
r, err := resource.Merge(
31
+
resource.Default(),
32
+
resource.NewWithAttributes(
33
+
semconv.SchemaURL,
34
+
semconv.ServiceName(serviceName),
35
+
),
36
+
)
37
+
38
+
if err != nil {
39
+
panic(err)
40
+
}
41
+
42
+
// initialize the traceIDRatioBasedSampler
43
+
traceIDRatioBasedSampler := sdktrace.TraceIDRatioBased(sampleRatio)
44
+
45
+
return sdktrace.NewTracerProvider(
46
+
sdktrace.WithSampler(traceIDRatioBasedSampler),
47
+
sdktrace.WithBatcher(exp),
48
+
sdktrace.WithResource(r),
49
+
)
50
+
}