porting all github actions from bluesky-social/indigo to tangled CI
at main 8.0 kB view raw
1package querycheck 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "math" 8 "os" 9 "sync" 10 "time" 11 12 "github.com/jackc/pgx/v5" 13 "go.opentelemetry.io/otel" 14 "go.opentelemetry.io/otel/attribute" 15) 16 17var tracer = otel.Tracer("querycheck") 18 19// Query is a query to check 20type Query struct { 21 Name string 22 Query string 23 LatestPlan *QueryPlan 24 PreviousPlan *QueryPlan 25 LastChecked time.Time 26 LastError error 27 CheckEvery time.Duration 28 29 lk sync.RWMutex 30 in chan struct{} 31 out chan struct{} 32} 33 34// Querychecker is a query checker meta object 35type Querychecker struct { 36 Queries []*Query 37 Logger *slog.Logger 38 39 connectionURL string 40 lk sync.RWMutex 41} 42 43// NewQuerychecker creates a new querychecker 44func NewQuerychecker(ctx context.Context, connectionURL string) (*Querychecker, error) { 45 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 46 Level: slog.LevelInfo, 47 })) 48 logger = logger.With("source", "querychecker_manager") 49 50 return &Querychecker{ 51 connectionURL: connectionURL, 52 Logger: logger, 53 Queries: []*Query{}, 54 }, nil 55} 56 57// AddQuery adds a query to the checker 58func (q *Querychecker) AddQuery(ctx context.Context, name, query string, checkEvery time.Duration) { 59 ctx, span := tracer.Start(ctx, "AddQuery") 60 defer span.End() 61 62 span.SetAttributes(attribute.String("name", name)) 63 span.SetAttributes(attribute.String("query", query)) 64 span.SetAttributes(attribute.String("checkEvery", checkEvery.String())) 65 66 q.lk.Lock() 67 q.Queries = append(q.Queries, &Query{ 68 Name: name, 69 Query: query, 70 CheckEvery: checkEvery, 71 72 in: make(chan struct{}), 73 out: make(chan struct{}), 74 }) 75 q.lk.Unlock() 76} 77 78// RemoveQuery removes a query from the checker 79func (q *Querychecker) RemoveQuery(ctx context.Context, name string) { 80 ctx, span := tracer.Start(ctx, "RemoveQuery") 81 defer span.End() 82 83 span.SetAttributes(attribute.String("name", name)) 84 85 q.lk.Lock() 86 defer q.lk.Unlock() 87 for i, qu := range q.Queries { 88 if qu.Name == name { 89 q.Queries = append(q.Queries[:i], q.Queries[i+1:]...) 90 return 91 } 92 } 93} 94 95// GetQuery returns a copy of the query 96func (q *Querychecker) GetQuery(ctx context.Context, name string) *Query { 97 ctx, span := tracer.Start(ctx, "GetQuery") 98 defer span.End() 99 100 span.SetAttributes(attribute.String("name", name)) 101 102 q.lk.RLock() 103 defer q.lk.RUnlock() 104 for _, qu := range q.Queries { 105 if qu.Name == name { 106 return &Query{ 107 Name: qu.Name, 108 Query: qu.Query, 109 LatestPlan: qu.LatestPlan, 110 PreviousPlan: qu.PreviousPlan, 111 LastChecked: qu.LastChecked, 112 LastError: qu.LastError, 113 CheckEvery: qu.CheckEvery, 114 } 115 } 116 } 117 return nil 118} 119 120// GetQueries returns a copy of the queries 121func (q *Querychecker) GetQueries(ctx context.Context) []*Query { 122 ctx, span := tracer.Start(ctx, "GetQueries") 123 defer span.End() 124 125 q.lk.RLock() 126 defer q.lk.RUnlock() 127 queries := make([]*Query, len(q.Queries)) 128 for i, qu := range q.Queries { 129 queries[i] = &Query{ 130 Name: qu.Name, 131 Query: qu.Query, 132 LatestPlan: qu.LatestPlan, 133 PreviousPlan: qu.PreviousPlan, 134 LastChecked: qu.LastChecked, 135 LastError: qu.LastError, 136 CheckEvery: qu.CheckEvery, 137 } 138 } 139 140 return queries 141} 142 143// UpdateQuery updates a query 144func (q *Querychecker) UpdateQuery(ctx context.Context, name, query string, checkEvery time.Duration) { 145 ctx, span := tracer.Start(ctx, "UpdateQuery") 146 defer span.End() 147 148 span.SetAttributes(attribute.String("name", name)) 149 span.SetAttributes(attribute.String("query", query)) 150 span.SetAttributes(attribute.String("checkEvery", checkEvery.String())) 151 152 for _, qu := range q.Queries { 153 if qu.Name == name { 154 qu.lk.Lock() 155 qu.Query = query 156 qu.CheckEvery = checkEvery 157 qu.lk.Unlock() 158 return 159 } 160 } 161} 162 163// Start starts the query checker routines 164func (q *Querychecker) Start() error { 165 ctx, span := tracer.Start(context.Background(), "Start") 166 defer span.End() 167 168 for _, qu := range q.Queries { 169 go func(query *Query) { 170 log := q.Logger.With("source", "query_checker_routine", "query", query.Name) 171 172 log.Info("query checker routine started for query", "query", query.Name) 173 log.Info(fmt.Sprintf("Query: \n%s\n", query.Query)) 174 175 // Check the query plan every CheckEvery duration 176 ticker := time.NewTicker(query.CheckEvery) 177 defer ticker.Stop() 178 179 var err error 180 query.LatestPlan, err = q.CheckQueryPlan(ctx, query.Query) 181 if err != nil { 182 log.Error("failed to check query plan", "err", err) 183 } 184 185 if query.LatestPlan != nil { 186 log.Info(fmt.Sprintf("Initial plan:\n%+v\n", query.LatestPlan.String())) 187 query.RecordPlanMetrics(*query.LatestPlan) 188 query.LastChecked = time.Now() 189 } 190 191 for { 192 select { 193 case <-ticker.C: 194 log.Info("checking query plan") 195 196 query.lk.RLock() 197 queryString := query.Query 198 query.lk.RUnlock() 199 200 qp, err := q.CheckQueryPlan(ctx, queryString) 201 202 query.lk.Lock() 203 query.LastChecked = time.Now() 204 query.LastError = err 205 query.lk.Unlock() 206 207 execCounter.WithLabelValues(query.Name).Inc() 208 209 if err != nil || qp == nil { 210 if qp == nil { 211 log.Error("query plan is nil") 212 } 213 log.Error("failed to check query plan", "err", err) 214 errorCounter.WithLabelValues(query.Name).Inc() 215 continue 216 } 217 218 query.lk.RLock() 219 lastPlan := *query.LatestPlan 220 query.lk.RUnlock() 221 222 query.RecordPlanMetrics(*qp) 223 224 if !qp.HasSameStructureAs(lastPlan) { 225 sign := "+" 226 diff := math.Abs(lastPlan.Plan.ActualTotalTime - qp.Plan.ActualTotalTime) 227 if lastPlan.Plan.ActualTotalTime > qp.Plan.ActualTotalTime { 228 sign = "-" 229 } 230 231 log.Info("query plan has changed", "diff", fmt.Sprintf("%s%.03fms", sign, diff), "query_plan", qp.String()) 232 233 query.lk.Lock() 234 query.PreviousPlan = query.LatestPlan 235 query.LatestPlan = qp 236 query.lk.Unlock() 237 } 238 case <-query.in: 239 log.Info("shutting down query checker routine") 240 query.out <- struct{}{} 241 return 242 } 243 } 244 }(qu) 245 } 246 247 return nil 248} 249 250// Stop stops the query checker routines 251func (q *Querychecker) Stop() { 252 _, span := tracer.Start(context.Background(), "Stop") 253 defer span.End() 254 255 q.Logger.Info("stopping query checker") 256 257 for _, qu := range q.Queries { 258 qu.in <- struct{}{} 259 } 260 261 for _, qu := range q.Queries { 262 <-qu.out 263 } 264 265 q.Logger.Info("query checker stopped") 266} 267 268// CheckQueryPlan checks the query plan for a given query 269func (q *Querychecker) CheckQueryPlan(ctx context.Context, query string) (*QueryPlan, error) { 270 ctx, span := tracer.Start(ctx, "CheckQueryPlan") 271 defer span.End() 272 273 conn, err := pgx.Connect(ctx, q.connectionURL) 274 if err != nil { 275 return nil, err 276 } 277 defer conn.Close(ctx) 278 279 rows, err := conn.Query(ctx, "EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON) "+query) 280 if err != nil { 281 return nil, err 282 } 283 284 var plan QueryPlan 285 286 for rows.Next() { 287 var plans QueryPlans 288 err := rows.Scan(&plans) 289 if err != nil { 290 return nil, err 291 } 292 for _, p := range plans { 293 plan = p 294 } 295 } 296 297 return &plan, nil 298} 299 300// RecordPlanMetrics records the query plan metrics 301func (qu *Query) RecordPlanMetrics(qp QueryPlan) { 302 execDurationCounter.WithLabelValues(qu.Name).Add(qp.Plan.ActualTotalTime) 303 blocksHitCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedHitBlocks)) 304 blocksReadCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedReadBlocks)) 305 blocksWrittenCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedWrittenBlocks)) 306 blocksDirtyCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedDirtiedBlocks)) 307 ioReadTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOReadTime) 308 ioWriteTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOWriteTime) 309 tempWrittenBlocksCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.TempWrittenBlocks)) 310 311 qu.RecordPlanNode(qp.Plan) 312} 313 314// RecordPlanNode records the query plan node metrics 315func (qu *Query) RecordPlanNode(p Plan) { 316 planNodeCounter.WithLabelValues(qu.Name, p.NodeType).Inc() 317 for _, n := range p.Plans { 318 qu.RecordPlanNode(n) 319 } 320}