1package querycheck
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "math"
8 "os"
9 "sync"
10 "time"
11
12 "github.com/jackc/pgx/v5"
13 "go.opentelemetry.io/otel"
14 "go.opentelemetry.io/otel/attribute"
15)
16
// tracer is the OpenTelemetry tracer, registered under the instrumentation
// name "querycheck", that the functions in this file use to start their spans.
var tracer = otel.Tracer("querycheck")
18
// Query is a single SQL statement whose execution plan is checked
// periodically by a Querychecker.
//
// A Query embeds a mutex and therefore must not be copied by value; it is
// always handled through a *Query.
type Query struct {
	// Name identifies the query; it is the lookup key for the Querychecker
	// methods and the label value attached to the recorded metrics.
	Name string
	// Query is the SQL text that gets appended to the EXPLAIN statement.
	Query string
	// LatestPlan is the most recent plan adopted for the query. It stays nil
	// until a check has succeeded.
	LatestPlan *QueryPlan
	// PreviousPlan is the plan LatestPlan replaced the last time the plan
	// structure changed.
	PreviousPlan *QueryPlan
	// LastChecked is the time of the most recent check.
	LastChecked time.Time
	// LastError is the error returned by the most recent periodic check
	// (nil on success).
	LastError error
	// CheckEvery is the interval used for the ticker of the checker routine.
	CheckEvery time.Duration

	// lk guards the mutable fields above.
	lk sync.RWMutex
	// in receives the shutdown request sent by Querychecker.Stop.
	in chan struct{}
	// out is used by the checker routine to acknowledge the shutdown.
	out chan struct{}
}
33
// Querychecker owns a set of queries and the background routines that
// periodically check their execution plans.
type Querychecker struct {
	// Queries is the list of registered queries. It is modified by AddQuery
	// and RemoveQuery while holding lk.
	Queries []*Query
	// Logger is the base logger; each checker routine derives its own logger
	// from it.
	Logger *slog.Logger

	// connectionURL is the connection string handed to pgx.Connect for every
	// plan check.
	connectionURL string
	// lk guards Queries.
	lk sync.RWMutex
}
42
43// NewQuerychecker creates a new querychecker
44func NewQuerychecker(ctx context.Context, connectionURL string) (*Querychecker, error) {
45 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
46 Level: slog.LevelInfo,
47 }))
48 logger = logger.With("source", "querychecker_manager")
49
50 return &Querychecker{
51 connectionURL: connectionURL,
52 Logger: logger,
53 Queries: []*Query{},
54 }, nil
55}
56
57// AddQuery adds a query to the checker
58func (q *Querychecker) AddQuery(ctx context.Context, name, query string, checkEvery time.Duration) {
59 ctx, span := tracer.Start(ctx, "AddQuery")
60 defer span.End()
61
62 span.SetAttributes(attribute.String("name", name))
63 span.SetAttributes(attribute.String("query", query))
64 span.SetAttributes(attribute.String("checkEvery", checkEvery.String()))
65
66 q.lk.Lock()
67 q.Queries = append(q.Queries, &Query{
68 Name: name,
69 Query: query,
70 CheckEvery: checkEvery,
71
72 in: make(chan struct{}),
73 out: make(chan struct{}),
74 })
75 q.lk.Unlock()
76}
77
78// RemoveQuery removes a query from the checker
79func (q *Querychecker) RemoveQuery(ctx context.Context, name string) {
80 ctx, span := tracer.Start(ctx, "RemoveQuery")
81 defer span.End()
82
83 span.SetAttributes(attribute.String("name", name))
84
85 q.lk.Lock()
86 defer q.lk.Unlock()
87 for i, qu := range q.Queries {
88 if qu.Name == name {
89 q.Queries = append(q.Queries[:i], q.Queries[i+1:]...)
90 return
91 }
92 }
93}
94
95// GetQuery returns a copy of the query
96func (q *Querychecker) GetQuery(ctx context.Context, name string) *Query {
97 ctx, span := tracer.Start(ctx, "GetQuery")
98 defer span.End()
99
100 span.SetAttributes(attribute.String("name", name))
101
102 q.lk.RLock()
103 defer q.lk.RUnlock()
104 for _, qu := range q.Queries {
105 if qu.Name == name {
106 return &Query{
107 Name: qu.Name,
108 Query: qu.Query,
109 LatestPlan: qu.LatestPlan,
110 PreviousPlan: qu.PreviousPlan,
111 LastChecked: qu.LastChecked,
112 LastError: qu.LastError,
113 CheckEvery: qu.CheckEvery,
114 }
115 }
116 }
117 return nil
118}
119
120// GetQueries returns a copy of the queries
121func (q *Querychecker) GetQueries(ctx context.Context) []*Query {
122 ctx, span := tracer.Start(ctx, "GetQueries")
123 defer span.End()
124
125 q.lk.RLock()
126 defer q.lk.RUnlock()
127 queries := make([]*Query, len(q.Queries))
128 for i, qu := range q.Queries {
129 queries[i] = &Query{
130 Name: qu.Name,
131 Query: qu.Query,
132 LatestPlan: qu.LatestPlan,
133 PreviousPlan: qu.PreviousPlan,
134 LastChecked: qu.LastChecked,
135 LastError: qu.LastError,
136 CheckEvery: qu.CheckEvery,
137 }
138 }
139
140 return queries
141}
142
143// UpdateQuery updates a query
144func (q *Querychecker) UpdateQuery(ctx context.Context, name, query string, checkEvery time.Duration) {
145 ctx, span := tracer.Start(ctx, "UpdateQuery")
146 defer span.End()
147
148 span.SetAttributes(attribute.String("name", name))
149 span.SetAttributes(attribute.String("query", query))
150 span.SetAttributes(attribute.String("checkEvery", checkEvery.String()))
151
152 for _, qu := range q.Queries {
153 if qu.Name == name {
154 qu.lk.Lock()
155 qu.Query = query
156 qu.CheckEvery = checkEvery
157 qu.lk.Unlock()
158 return
159 }
160 }
161}
162
163// Start starts the query checker routines
164func (q *Querychecker) Start() error {
165 ctx, span := tracer.Start(context.Background(), "Start")
166 defer span.End()
167
168 for _, qu := range q.Queries {
169 go func(query *Query) {
170 log := q.Logger.With("source", "query_checker_routine", "query", query.Name)
171
172 log.Info("query checker routine started for query", "query", query.Name)
173 log.Info(fmt.Sprintf("Query: \n%s\n", query.Query))
174
175 // Check the query plan every CheckEvery duration
176 ticker := time.NewTicker(query.CheckEvery)
177 defer ticker.Stop()
178
179 var err error
180 query.LatestPlan, err = q.CheckQueryPlan(ctx, query.Query)
181 if err != nil {
182 log.Error("failed to check query plan", "err", err)
183 }
184
185 if query.LatestPlan != nil {
186 log.Info(fmt.Sprintf("Initial plan:\n%+v\n", query.LatestPlan.String()))
187 query.RecordPlanMetrics(*query.LatestPlan)
188 query.LastChecked = time.Now()
189 }
190
191 for {
192 select {
193 case <-ticker.C:
194 log.Info("checking query plan")
195
196 query.lk.RLock()
197 queryString := query.Query
198 query.lk.RUnlock()
199
200 qp, err := q.CheckQueryPlan(ctx, queryString)
201
202 query.lk.Lock()
203 query.LastChecked = time.Now()
204 query.LastError = err
205 query.lk.Unlock()
206
207 execCounter.WithLabelValues(query.Name).Inc()
208
209 if err != nil || qp == nil {
210 if qp == nil {
211 log.Error("query plan is nil")
212 }
213 log.Error("failed to check query plan", "err", err)
214 errorCounter.WithLabelValues(query.Name).Inc()
215 continue
216 }
217
218 query.lk.RLock()
219 lastPlan := *query.LatestPlan
220 query.lk.RUnlock()
221
222 query.RecordPlanMetrics(*qp)
223
224 if !qp.HasSameStructureAs(lastPlan) {
225 sign := "+"
226 diff := math.Abs(lastPlan.Plan.ActualTotalTime - qp.Plan.ActualTotalTime)
227 if lastPlan.Plan.ActualTotalTime > qp.Plan.ActualTotalTime {
228 sign = "-"
229 }
230
231 log.Info("query plan has changed", "diff", fmt.Sprintf("%s%.03fms", sign, diff), "query_plan", qp.String())
232
233 query.lk.Lock()
234 query.PreviousPlan = query.LatestPlan
235 query.LatestPlan = qp
236 query.lk.Unlock()
237 }
238 case <-query.in:
239 log.Info("shutting down query checker routine")
240 query.out <- struct{}{}
241 return
242 }
243 }
244 }(qu)
245 }
246
247 return nil
248}
249
250// Stop stops the query checker routines
251func (q *Querychecker) Stop() {
252 _, span := tracer.Start(context.Background(), "Stop")
253 defer span.End()
254
255 q.Logger.Info("stopping query checker")
256
257 for _, qu := range q.Queries {
258 qu.in <- struct{}{}
259 }
260
261 for _, qu := range q.Queries {
262 <-qu.out
263 }
264
265 q.Logger.Info("query checker stopped")
266}
267
268// CheckQueryPlan checks the query plan for a given query
269func (q *Querychecker) CheckQueryPlan(ctx context.Context, query string) (*QueryPlan, error) {
270 ctx, span := tracer.Start(ctx, "CheckQueryPlan")
271 defer span.End()
272
273 conn, err := pgx.Connect(ctx, q.connectionURL)
274 if err != nil {
275 return nil, err
276 }
277 defer conn.Close(ctx)
278
279 rows, err := conn.Query(ctx, "EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON) "+query)
280 if err != nil {
281 return nil, err
282 }
283
284 var plan QueryPlan
285
286 for rows.Next() {
287 var plans QueryPlans
288 err := rows.Scan(&plans)
289 if err != nil {
290 return nil, err
291 }
292 for _, p := range plans {
293 plan = p
294 }
295 }
296
297 return &plan, nil
298}
299
300// RecordPlanMetrics records the query plan metrics
301func (qu *Query) RecordPlanMetrics(qp QueryPlan) {
302 execDurationCounter.WithLabelValues(qu.Name).Add(qp.Plan.ActualTotalTime)
303 blocksHitCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedHitBlocks))
304 blocksReadCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedReadBlocks))
305 blocksWrittenCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedWrittenBlocks))
306 blocksDirtyCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedDirtiedBlocks))
307 ioReadTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOReadTime)
308 ioWriteTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOWriteTime)
309 tempWrittenBlocksCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.TempWrittenBlocks))
310
311 qu.RecordPlanNode(qp.Plan)
312}
313
314// RecordPlanNode records the query plan node metrics
315func (qu *Query) RecordPlanNode(p Plan) {
316 planNodeCounter.WithLabelValues(qu.Name, p.NodeType).Inc()
317 for _, n := range p.Plans {
318 qu.RecordPlanNode(n)
319 }
320}