Monorepo for Tangled tangled.org
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/orm" 12) 13 14func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 15 var pipelines []models.Pipeline 16 17 var conditions []string 18 var args []any 19 for _, filter := range filters { 20 conditions = append(conditions, filter.Condition()) 21 args = append(args, filter.Arg()...) 22 } 23 24 whereClause := "" 25 if conditions != nil { 26 whereClause = " where " + strings.Join(conditions, " and ") 27 } 28 29 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 30 31 rows, err := e.Query(query, args...) 32 33 if err != nil { 34 return nil, err 35 } 36 defer rows.Close() 37 38 for rows.Next() { 39 var pipeline models.Pipeline 40 var createdAt string 41 err = rows.Scan( 42 &pipeline.Id, 43 &pipeline.Rkey, 44 &pipeline.Knot, 45 &pipeline.RepoOwner, 46 &pipeline.RepoName, 47 &pipeline.Sha, 48 &createdAt, 49 ) 50 if err != nil { 51 return nil, err 52 } 53 54 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 55 pipeline.Created = t 56 } 57 58 pipelines = append(pipelines, pipeline) 59 } 60 61 if err = rows.Err(); err != nil { 62 return nil, err 63 } 64 65 return pipelines, nil 66} 67 68func AddPipeline(e Execer, pipeline models.Pipeline) error { 69 args := []any{ 70 pipeline.Rkey, 71 pipeline.Knot, 72 pipeline.RepoOwner, 73 pipeline.RepoName, 74 pipeline.TriggerId, 75 pipeline.Sha, 76 } 77 78 placeholders := make([]string, len(args)) 79 for i := range placeholders { 80 placeholders[i] = "?" 81 } 82 83 query := fmt.Sprintf(` 84 insert or ignore into pipelines ( 85 rkey, 86 knot, 87 repo_owner, 88 repo_name, 89 trigger_id, 90 sha 91 ) values (%s) 92 `, strings.Join(placeholders, ",")) 93 94 _, err := e.Exec(query, args...) 95 96 return err 97} 98 99func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 100 args := []any{ 101 trigger.Kind, 102 trigger.PushRef, 103 trigger.PushNewSha, 104 trigger.PushOldSha, 105 trigger.PRSourceBranch, 106 trigger.PRTargetBranch, 107 trigger.PRSourceSha, 108 trigger.PRAction, 109 } 110 111 placeholders := make([]string, len(args)) 112 for i := range placeholders { 113 placeholders[i] = "?" 114 } 115 116 query := fmt.Sprintf(`insert or ignore into triggers ( 117 kind, 118 push_ref, 119 push_new_sha, 120 push_old_sha, 121 pr_source_branch, 122 pr_target_branch, 123 pr_source_sha, 124 pr_action 125 ) values (%s)`, strings.Join(placeholders, ",")) 126 127 res, err := e.Exec(query, args...) 128 if err != nil { 129 return 0, err 130 } 131 132 return res.LastInsertId() 133} 134 135func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 136 args := []any{ 137 status.Spindle, 138 status.Rkey, 139 status.PipelineKnot, 140 status.PipelineRkey, 141 status.Workflow, 142 status.Status, 143 status.Error, 144 status.ExitCode, 145 status.Created.Format(time.RFC3339), 146 } 147 148 placeholders := make([]string, len(args)) 149 for i := range placeholders { 150 placeholders[i] = "?" 151 } 152 153 query := fmt.Sprintf(` 154 insert or ignore into pipeline_statuses ( 155 spindle, 156 rkey, 157 pipeline_knot, 158 pipeline_rkey, 159 workflow, 160 status, 161 error, 162 exit_code, 163 created 164 ) values (%s) 165 `, strings.Join(placeholders, ",")) 166 167 _, err := e.Exec(query, args...) 168 return err 169} 170 171// this is a mega query, but the most useful one: 172// get N pipelines, for each one get the latest status of its N workflows 173func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 174 var conditions []string 175 var args []any 176 for _, filter := range filters { 177 filter.Key = "p." + filter.Key // the table is aliased in the query to `p` 178 conditions = append(conditions, filter.Condition()) 179 args = append(args, filter.Arg()...) 180 } 181 182 whereClause := "" 183 if conditions != nil { 184 whereClause = " where " + strings.Join(conditions, " and ") 185 } 186 187 query := fmt.Sprintf(` 188 select 189 p.id, 190 p.knot, 191 p.rkey, 192 p.repo_owner, 193 p.repo_name, 194 p.sha, 195 p.created, 196 t.id, 197 t.kind, 198 t.push_ref, 199 t.push_new_sha, 200 t.push_old_sha, 201 t.pr_source_branch, 202 t.pr_target_branch, 203 t.pr_source_sha, 204 t.pr_action 205 from 206 pipelines p 207 join 208 triggers t ON p.trigger_id = t.id 209 %s 210 order by p.created desc 211 limit %d 212 `, whereClause, limit) 213 214 rows, err := e.Query(query, args...) 215 if err != nil { 216 return nil, err 217 } 218 defer rows.Close() 219 220 pipelines := make(map[syntax.ATURI]models.Pipeline) 221 for rows.Next() { 222 var p models.Pipeline 223 var t models.Trigger 224 var created string 225 226 err := rows.Scan( 227 &p.Id, 228 &p.Knot, 229 &p.Rkey, 230 &p.RepoOwner, 231 &p.RepoName, 232 &p.Sha, 233 &created, 234 &p.TriggerId, 235 &t.Kind, 236 &t.PushRef, 237 &t.PushNewSha, 238 &t.PushOldSha, 239 &t.PRSourceBranch, 240 &t.PRTargetBranch, 241 &t.PRSourceSha, 242 &t.PRAction, 243 ) 244 if err != nil { 245 return nil, err 246 } 247 248 p.Created, err = time.Parse(time.RFC3339, created) 249 if err != nil { 250 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 251 } 252 253 t.Id = p.TriggerId 254 p.Trigger = &t 255 p.Statuses = make(map[string]models.WorkflowStatus) 256 257 pipelines[p.AtUri()] = p 258 } 259 260 // get all statuses 261 // the where clause here is of the form: 262 // 263 // where (pipeline_knot = k1 and pipeline_rkey = r1) 264 // or (pipeline_knot = k2 and pipeline_rkey = r2) 265 conditions = nil 266 args = nil 267 for _, p := range pipelines { 268 knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 269 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 270 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 271 args = append(args, p.Knot) 272 args = append(args, p.Rkey) 273 } 274 whereClause = "" 275 if conditions != nil { 276 whereClause = "where " + strings.Join(conditions, " or ") 277 } 278 query = fmt.Sprintf(` 279 select 280 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 281 from 282 pipeline_statuses 283 %s 284 `, whereClause) 285 286 rows, err = e.Query(query, args...) 287 if err != nil { 288 return nil, err 289 } 290 defer rows.Close() 291 292 for rows.Next() { 293 var ps models.PipelineStatus 294 var created string 295 296 err := rows.Scan( 297 &ps.ID, 298 &ps.Spindle, 299 &ps.Rkey, 300 &ps.PipelineKnot, 301 &ps.PipelineRkey, 302 &created, 303 &ps.Workflow, 304 &ps.Status, 305 &ps.Error, 306 &ps.ExitCode, 307 ) 308 if err != nil { 309 return nil, err 310 } 311 312 ps.Created, err = time.Parse(time.RFC3339, created) 313 if err != nil { 314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 315 } 316 317 pipelineAt := ps.PipelineAt() 318 319 // extract 320 pipeline, ok := pipelines[pipelineAt] 321 if !ok { 322 continue 323 } 324 statuses, _ := pipeline.Statuses[ps.Workflow] 325 if !ok { 326 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 327 } 328 329 // append 330 statuses.Data = append(statuses.Data, ps) 331 332 // reassign 333 pipeline.Statuses[ps.Workflow] = statuses 334 pipelines[pipelineAt] = pipeline 335 } 336 337 var all []models.Pipeline 338 for _, p := range pipelines { 339 for _, s := range p.Statuses { 340 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 341 if a.Created.After(b.Created) { 342 return 1 343 } 344 if a.Created.Before(b.Created) { 345 return -1 346 } 347 if a.ID > b.ID { 348 return 1 349 } 350 if a.ID < b.ID { 351 return -1 352 } 353 return 0 354 }) 355 } 356 all = append(all, p) 357 } 358 359 // sort pipelines by date 360 slices.SortFunc(all, func(a, b models.Pipeline) int { 361 if a.Created.After(b.Created) { 362 return -1 363 } 364 return 1 365 }) 366 367 return all, nil 368}