forked from tangled.org/core
Monorepo for Tangled

appview/db: add query to fetch pipeline statuses

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 3a36b3f4 1ac0de7c

verified
+2 -2
appview/db/artifact.go
··· 64 64 var args []any 65 65 for _, filter := range filters { 66 66 conditions = append(conditions, filter.Condition()) 67 - args = append(args, filter.arg) 67 + args = append(args, filter.Arg()...) 68 68 } 69 69 70 70 whereClause := "" ··· 135 135 var args []any 136 136 for _, filter := range filters { 137 137 conditions = append(conditions, filter.Condition()) 138 - args = append(args, filter.arg) 138 + args = append(args, filter.Arg()...) 139 139 } 140 140 141 141 whereClause := ""
+39
appview/db/db.go
··· 5 5 "database/sql" 6 6 "fmt" 7 7 "log" 8 + "reflect" 9 + "strings" 8 10 9 11 _ "github.com/mattn/go-sqlite3" 10 12 ) ··· 342 344 343 345 -- every pipeline must be associated with exactly one commit 344 346 sha text not null check (length(sha) = 40), 347 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 345 348 346 349 -- trigger data 347 350 trigger_id integer not null, ··· 600 603 func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) } 601 604 func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) } 602 605 func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) } 606 + func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) } 603 607 604 608 func (f filter) Condition() string { 609 + rv := reflect.ValueOf(f.arg) 610 + kind := rv.Kind() 611 + 612 + // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)` 613 + if kind == reflect.Slice || kind == reflect.Array { 614 + if rv.Len() == 0 { 615 + panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) 616 + } 617 + 618 + placeholders := make([]string, rv.Len()) 619 + for i := range placeholders { 620 + placeholders[i] = "?" 621 + } 622 + 623 + return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", ")) 624 + } 625 + 605 626 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 606 627 } 628 + 629 + func (f filter) Arg() []any { 630 + rv := reflect.ValueOf(f.arg) 631 + kind := rv.Kind() 632 + if kind == reflect.Slice || kind == reflect.Array { 633 + if rv.Len() == 0 { 634 + panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) 635 + } 636 + 637 + out := make([]any, rv.Len()) 638 + for i := range rv.Len() { 639 + out[i] = rv.Index(i).Interface() 640 + } 641 + return out 642 + } 643 + 644 + return []any{f.arg} 645 + }
+234 -5
appview/db/pipeline.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 + "slices" 5 6 "strings" 6 7 "time" 7 8 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 + spindle "tangled.sh/tangled.sh/core/spindle/models" 9 11 ) 10 12 11 13 type Pipeline struct { ··· 16 18 RepoName string 17 19 TriggerId int 18 20 Sha string 21 + Created time.Time 22 + 23 + // populate when querying for reverse mappings 24 + Trigger *Trigger 25 + Statuses map[string]WorkflowStatus 26 + } 19 27 20 - // populate when querying for revers mappings 21 - Trigger *Trigger 28 + type WorkflowStatus struct { 29 + data []PipelineStatus 30 + } 31 + 32 + func (w WorkflowStatus) Latest() PipelineStatus { 33 + return w.data[len(w.data)-1] 34 + } 35 + 36 + // time taken by this workflow to reach an "end state" 37 + func (w WorkflowStatus) TimeTaken() time.Duration { 38 + var start, end *time.Time 39 + for _, s := range w.data { 40 + if s.Status.IsStart() { 41 + start = &s.Created 42 + } 43 + if s.Status.IsFinish() { 44 + end = &s.Created 45 + } 46 + } 47 + 48 + if start != nil && end != nil && end.After(*start) { 49 + return end.Sub(*start) 50 + } 51 + 52 + return 0 53 + } 54 + 55 + func (p Pipeline) Counts() map[string]int { 56 + m := make(map[string]int) 57 + for _, w := range p.Statuses { 58 + m[w.Latest().Status.String()] += 1 59 + } 60 + return m 22 61 } 23 62 24 63 type Trigger struct { ··· 45 84 PipelineRkey string 46 85 Created time.Time 47 86 Workflow string 48 - Status string 87 + Status spindle.StatusKind 49 88 Error *string 50 89 ExitCode int 51 90 } ··· 57 96 var args []any 58 97 for _, filter := range filters { 59 98 conditions = append(conditions, filter.Condition()) 60 - args = append(args, filter.arg) 99 + args = append(args, filter.Arg()...) 61 100 } 62 101 63 102 whereClause := "" ··· 65 104 whereClause = " where " + strings.Join(conditions, " and ") 66 105 } 67 106 68 - query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha from pipelines %s`, whereClause) 107 + query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 69 108 70 109 rows, err := e.Query(query, args...) 71 110 ··· 76 115 77 116 for rows.Next() { 78 117 var pipeline Pipeline 118 + var createdAt string 79 119 err = rows.Scan( 80 120 &pipeline.Id, 81 121 &pipeline.Rkey, ··· 83 123 &pipeline.RepoOwner, 84 124 &pipeline.RepoName, 85 125 &pipeline.Sha, 126 + &createdAt, 86 127 ) 87 128 if err != nil { 88 129 return nil, err 130 + } 131 + 132 + if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 133 + pipeline.Created = t 89 134 } 90 135 91 136 pipelines = append(pipelines, pipeline) ··· 198 243 _, err := e.Exec(query, args...) 199 244 return err 200 245 } 246 + 247 + // this is a mega query, but the most useful one: 248 + // get N pipelines, for each one get the latest status of its N workflows 249 + func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { 250 + var conditions []string 251 + var args []any 252 + for _, filter := range filters { 253 + filter.key = "p." + filter.key // the table is aliased in the query to `p` 254 + conditions = append(conditions, filter.Condition()) 255 + args = append(args, filter.Arg()...) 256 + } 257 + 258 + whereClause := "" 259 + if conditions != nil { 260 + whereClause = " where " + strings.Join(conditions, " and ") 261 + } 262 + 263 + query := fmt.Sprintf(` 264 + select 265 + p.id AS pipeline_id, 266 + p.knot, 267 + p.rkey, 268 + p.repo_owner, 269 + p.repo_name, 270 + p.sha, 271 + p.created, 272 + t.id AS trigger_id, 273 + t.kind, 274 + t.push_ref, 275 + t.push_new_sha, 276 + t.push_old_sha, 277 + t.pr_source_branch, 278 + t.pr_target_branch, 279 + t.pr_source_sha, 280 + t.pr_action 281 + from 282 + pipelines p 283 + join 284 + triggers t ON p.trigger_id = t.id 285 + %s 286 + order by p.created desc 287 + `, whereClause) 288 + 289 + rows, err := e.Query(query, args...) 290 + if err != nil { 291 + return nil, err 292 + } 293 + defer rows.Close() 294 + 295 + pipelines := make(map[string]Pipeline) 296 + for rows.Next() { 297 + var p Pipeline 298 + var t Trigger 299 + var created string 300 + 301 + err := rows.Scan( 302 + &p.Id, 303 + &p.Knot, 304 + &p.Rkey, 305 + &p.RepoOwner, 306 + &p.RepoName, 307 + &p.Sha, 308 + &created, 309 + &p.TriggerId, 310 + &t.Kind, 311 + &t.PushRef, 312 + &t.PushNewSha, 313 + &t.PushOldSha, 314 + &t.PRSourceBranch, 315 + &t.PRTargetBranch, 316 + &t.PRSourceSha, 317 + &t.PRAction, 318 + ) 319 + if err != nil { 320 + return nil, err 321 + } 322 + 323 + // Parse created time manually 324 + p.Created, err = time.Parse(time.RFC3339, created) 325 + if err != nil { 326 + return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 327 + } 328 + 329 + // Link trigger to pipeline 330 + t.Id = p.TriggerId 331 + p.Trigger = &t 332 + p.Statuses = make(map[string]WorkflowStatus) 333 + 334 + k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 335 + pipelines[k] = p 336 + } 337 + 338 + // get all statuses 339 + // the where clause here is of the form: 340 + // 341 + // where (pipeline_knot = k1 and pipeline_rkey = r1) 342 + // or (pipeline_knot = k2 and pipeline_rkey = r2) 343 + conditions = nil 344 + args = nil 345 + for _, p := range pipelines { 346 + knotFilter := FilterEq("pipeline_knot", p.Knot) 347 + rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 348 + conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 349 + args = append(args, p.Knot) 350 + args = append(args, p.Rkey) 351 + } 352 + whereClause = "" 353 + if conditions != nil { 354 + whereClause = "where " + strings.Join(conditions, " or ") 355 + } 356 + query = fmt.Sprintf(` 357 + select 358 + id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 359 + from 360 + pipeline_statuses 361 + %s 362 + `, whereClause) 363 + 364 + rows, err = e.Query(query, args...) 365 + if err != nil { 366 + return nil, err 367 + } 368 + defer rows.Close() 369 + 370 + for rows.Next() { 371 + var ps PipelineStatus 372 + var created string 373 + 374 + err := rows.Scan( 375 + &ps.ID, 376 + &ps.Spindle, 377 + &ps.Rkey, 378 + &ps.PipelineKnot, 379 + &ps.PipelineRkey, 380 + &created, 381 + &ps.Workflow, 382 + &ps.Status, 383 + &ps.Error, 384 + &ps.ExitCode, 385 + ) 386 + if err != nil { 387 + return nil, err 388 + } 389 + 390 + ps.Created, err = time.Parse(time.RFC3339, created) 391 + if err != nil { 392 + return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 393 + } 394 + 395 + key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 396 + 397 + // extract 398 + pipeline, ok := pipelines[key] 399 + if !ok { 400 + continue 401 + } 402 + statuses, _ := pipeline.Statuses[ps.Workflow] 403 + if !ok { 404 + pipeline.Statuses[ps.Workflow] = WorkflowStatus{} 405 + } 406 + 407 + // append 408 + statuses.data = append(statuses.data, ps) 409 + 410 + // reassign 411 + pipeline.Statuses[ps.Workflow] = statuses 412 + pipelines[key] = pipeline 413 + } 414 + 415 + var all []Pipeline 416 + for _, p := range pipelines { 417 + for _, s := range p.Statuses { 418 + slices.SortFunc(s.data, func(a, b PipelineStatus) int { 419 + if a.Created.After(b.Created) { 420 + return 1 421 + } 422 + return -1 423 + }) 424 + } 425 + all = append(all, p) 426 + } 427 + 428 + return all, nil 429 + }
+3 -3
appview/db/pulls.go
··· 311 311 var args []any 312 312 for _, filter := range filters { 313 313 conditions = append(conditions, filter.Condition()) 314 - args = append(args, filter.arg) 314 + args = append(args, filter.Arg()...) 315 315 } 316 316 317 317 whereClause := "" ··· 866 866 867 867 for _, filter := range filters { 868 868 conditions = append(conditions, filter.Condition()) 869 - args = append(args, filter.arg) 869 + args = append(args, filter.Arg()...) 870 870 } 871 871 872 872 whereClause := "" ··· 891 891 892 892 for _, filter := range filters { 893 893 conditions = append(conditions, filter.Condition()) 894 - args = append(args, filter.arg) 894 + args = append(args, filter.Arg()...) 895 895 } 896 896 897 897 whereClause := ""
+1 -1
appview/db/punchcard.go
··· 45 45 var args []any 46 46 for _, filter := range filters { 47 47 conditions = append(conditions, filter.Condition()) 48 - args = append(args, filter.arg) 48 + args = append(args, filter.Arg()...) 49 49 } 50 50 51 51 whereClause := ""
+3 -3
appview/db/spindle.go
··· 24 24 var args []any 25 25 for _, filter := range filters { 26 26 conditions = append(conditions, filter.Condition()) 27 - args = append(args, filter.arg) 27 + args = append(args, filter.Arg()...) 28 28 } 29 29 30 30 whereClause := "" ··· 98 98 var args []any 99 99 for _, filter := range filters { 100 100 conditions = append(conditions, filter.Condition()) 101 - args = append(args, filter.arg) 101 + args = append(args, filter.Arg()...) 102 102 } 103 103 104 104 whereClause := "" ··· 121 121 var args []any 122 122 for _, filter := range filters { 123 123 conditions = append(conditions, filter.Condition()) 124 - args = append(args, filter.arg) 124 + args = append(args, filter.Arg()...) 125 125 } 126 126 127 127 whereClause := ""