forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/go-git/go-git/v5/plumbing"
11 spindle "tangled.sh/tangled.sh/core/spindle/models"
12)
13
14type Pipeline struct {
15 Id int
16 Rkey string
17 Knot string
18 RepoOwner syntax.DID
19 RepoName string
20 TriggerId int
21 Sha string
22 Created time.Time
23
24 // populate when querying for reverse mappings
25 Trigger *Trigger
26 Statuses map[string]WorkflowStatus
27}
28
29type WorkflowStatus struct {
30 Data []PipelineStatus
31}
32
33func (w WorkflowStatus) Latest() PipelineStatus {
34 return w.Data[len(w.Data)-1]
35}
36
37// time taken by this workflow to reach an "end state"
38func (w WorkflowStatus) TimeTaken() time.Duration {
39 var start, end *time.Time
40 for _, s := range w.Data {
41 if s.Status.IsStart() {
42 start = &s.Created
43 }
44 if s.Status.IsFinish() {
45 end = &s.Created
46 }
47 }
48
49 if start != nil && end != nil && end.After(*start) {
50 return end.Sub(*start)
51 }
52
53 return 0
54}
55
56func (p Pipeline) Counts() map[string]int {
57 m := make(map[string]int)
58 for _, w := range p.Statuses {
59 m[w.Latest().Status.String()] += 1
60 }
61 return m
62}
63
64func (p Pipeline) TimeTaken() time.Duration {
65 var s time.Duration
66 for _, w := range p.Statuses {
67 s += w.TimeTaken()
68 }
69 return s
70}
71
72func (p Pipeline) Workflows() []string {
73 var ws []string
74 for v := range p.Statuses {
75 ws = append(ws, v)
76 }
77 slices.Sort(ws)
78 return ws
79}
80
81// if we know that a spindle has picked up this pipeline, then it is Responding
82func (p Pipeline) IsResponding() bool {
83 return len(p.Statuses) != 0
84}
85
86type Trigger struct {
87 Id int
88 Kind string
89
90 // push trigger fields
91 PushRef *string
92 PushNewSha *string
93 PushOldSha *string
94
95 // pull request trigger fields
96 PRSourceBranch *string
97 PRTargetBranch *string
98 PRSourceSha *string
99 PRAction *string
100}
101
102func (t *Trigger) IsPush() bool {
103 return t != nil && t.Kind == "push"
104}
105
106func (t *Trigger) IsPullRequest() bool {
107 return t != nil && t.Kind == "pull_request"
108}
109
110func (t *Trigger) TargetRef() string {
111 if t.IsPush() {
112 return plumbing.ReferenceName(*t.PushRef).Short()
113 } else if t.IsPullRequest() {
114 return *t.PRTargetBranch
115 }
116
117 return ""
118}
119
120type PipelineStatus struct {
121 ID int
122 Spindle string
123 Rkey string
124 PipelineKnot string
125 PipelineRkey string
126 Created time.Time
127 Workflow string
128 Status spindle.StatusKind
129 Error *string
130 ExitCode int
131}
132
133func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
134 var pipelines []Pipeline
135
136 var conditions []string
137 var args []any
138 for _, filter := range filters {
139 conditions = append(conditions, filter.Condition())
140 args = append(args, filter.Arg()...)
141 }
142
143 whereClause := ""
144 if conditions != nil {
145 whereClause = " where " + strings.Join(conditions, " and ")
146 }
147
148 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
149
150 rows, err := e.Query(query, args...)
151
152 if err != nil {
153 return nil, err
154 }
155 defer rows.Close()
156
157 for rows.Next() {
158 var pipeline Pipeline
159 var createdAt string
160 err = rows.Scan(
161 &pipeline.Id,
162 &pipeline.Rkey,
163 &pipeline.Knot,
164 &pipeline.RepoOwner,
165 &pipeline.RepoName,
166 &pipeline.Sha,
167 &createdAt,
168 )
169 if err != nil {
170 return nil, err
171 }
172
173 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
174 pipeline.Created = t
175 }
176
177 pipelines = append(pipelines, pipeline)
178 }
179
180 if err = rows.Err(); err != nil {
181 return nil, err
182 }
183
184 return pipelines, nil
185}
186
187func AddPipeline(e Execer, pipeline Pipeline) error {
188 args := []any{
189 pipeline.Rkey,
190 pipeline.Knot,
191 pipeline.RepoOwner,
192 pipeline.RepoName,
193 pipeline.TriggerId,
194 pipeline.Sha,
195 }
196
197 placeholders := make([]string, len(args))
198 for i := range placeholders {
199 placeholders[i] = "?"
200 }
201
202 query := fmt.Sprintf(`
203 insert or ignore into pipelines (
204 rkey,
205 knot,
206 repo_owner,
207 repo_name,
208 trigger_id,
209 sha
210 ) values (%s)
211 `, strings.Join(placeholders, ","))
212
213 _, err := e.Exec(query, args...)
214
215 return err
216}
217
218func AddTrigger(e Execer, trigger Trigger) (int64, error) {
219 args := []any{
220 trigger.Kind,
221 trigger.PushRef,
222 trigger.PushNewSha,
223 trigger.PushOldSha,
224 trigger.PRSourceBranch,
225 trigger.PRTargetBranch,
226 trigger.PRSourceSha,
227 trigger.PRAction,
228 }
229
230 placeholders := make([]string, len(args))
231 for i := range placeholders {
232 placeholders[i] = "?"
233 }
234
235 query := fmt.Sprintf(`insert or ignore into triggers (
236 kind,
237 push_ref,
238 push_new_sha,
239 push_old_sha,
240 pr_source_branch,
241 pr_target_branch,
242 pr_source_sha,
243 pr_action
244 ) values (%s)`, strings.Join(placeholders, ","))
245
246 res, err := e.Exec(query, args...)
247 if err != nil {
248 return 0, err
249 }
250
251 return res.LastInsertId()
252}
253
254func AddPipelineStatus(e Execer, status PipelineStatus) error {
255 args := []any{
256 status.Spindle,
257 status.Rkey,
258 status.PipelineKnot,
259 status.PipelineRkey,
260 status.Workflow,
261 status.Status,
262 status.Error,
263 status.ExitCode,
264 status.Created.Format(time.RFC3339),
265 }
266
267 placeholders := make([]string, len(args))
268 for i := range placeholders {
269 placeholders[i] = "?"
270 }
271
272 query := fmt.Sprintf(`
273 insert or ignore into pipeline_statuses (
274 spindle,
275 rkey,
276 pipeline_knot,
277 pipeline_rkey,
278 workflow,
279 status,
280 error,
281 exit_code,
282 created
283 ) values (%s)
284 `, strings.Join(placeholders, ","))
285
286 _, err := e.Exec(query, args...)
287 return err
288}
289
290// this is a mega query, but the most useful one:
291// get N pipelines, for each one get the latest status of its N workflows
292func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
293 var conditions []string
294 var args []any
295 for _, filter := range filters {
296 filter.key = "p." + filter.key // the table is aliased in the query to `p`
297 conditions = append(conditions, filter.Condition())
298 args = append(args, filter.Arg()...)
299 }
300
301 whereClause := ""
302 if conditions != nil {
303 whereClause = " where " + strings.Join(conditions, " and ")
304 }
305
306 query := fmt.Sprintf(`
307 select
308 p.id,
309 p.knot,
310 p.rkey,
311 p.repo_owner,
312 p.repo_name,
313 p.sha,
314 p.created,
315 t.id,
316 t.kind,
317 t.push_ref,
318 t.push_new_sha,
319 t.push_old_sha,
320 t.pr_source_branch,
321 t.pr_target_branch,
322 t.pr_source_sha,
323 t.pr_action
324 from
325 pipelines p
326 join
327 triggers t ON p.trigger_id = t.id
328 %s
329 `, whereClause)
330
331 rows, err := e.Query(query, args...)
332 if err != nil {
333 return nil, err
334 }
335 defer rows.Close()
336
337 pipelines := make(map[string]Pipeline)
338 for rows.Next() {
339 var p Pipeline
340 var t Trigger
341 var created string
342
343 err := rows.Scan(
344 &p.Id,
345 &p.Knot,
346 &p.Rkey,
347 &p.RepoOwner,
348 &p.RepoName,
349 &p.Sha,
350 &created,
351 &p.TriggerId,
352 &t.Kind,
353 &t.PushRef,
354 &t.PushNewSha,
355 &t.PushOldSha,
356 &t.PRSourceBranch,
357 &t.PRTargetBranch,
358 &t.PRSourceSha,
359 &t.PRAction,
360 )
361 if err != nil {
362 return nil, err
363 }
364
365 p.Created, err = time.Parse(time.RFC3339, created)
366 if err != nil {
367 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
368 }
369
370 t.Id = p.TriggerId
371 p.Trigger = &t
372 p.Statuses = make(map[string]WorkflowStatus)
373
374 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
375 pipelines[k] = p
376 }
377
378 // get all statuses
379 // the where clause here is of the form:
380 //
381 // where (pipeline_knot = k1 and pipeline_rkey = r1)
382 // or (pipeline_knot = k2 and pipeline_rkey = r2)
383 conditions = nil
384 args = nil
385 for _, p := range pipelines {
386 knotFilter := FilterEq("pipeline_knot", p.Knot)
387 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
388 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
389 args = append(args, p.Knot)
390 args = append(args, p.Rkey)
391 }
392 whereClause = ""
393 if conditions != nil {
394 whereClause = "where " + strings.Join(conditions, " or ")
395 }
396 query = fmt.Sprintf(`
397 select
398 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
399 from
400 pipeline_statuses
401 %s
402 `, whereClause)
403
404 rows, err = e.Query(query, args...)
405 if err != nil {
406 return nil, err
407 }
408 defer rows.Close()
409
410 for rows.Next() {
411 var ps PipelineStatus
412 var created string
413
414 err := rows.Scan(
415 &ps.ID,
416 &ps.Spindle,
417 &ps.Rkey,
418 &ps.PipelineKnot,
419 &ps.PipelineRkey,
420 &created,
421 &ps.Workflow,
422 &ps.Status,
423 &ps.Error,
424 &ps.ExitCode,
425 )
426 if err != nil {
427 return nil, err
428 }
429
430 ps.Created, err = time.Parse(time.RFC3339, created)
431 if err != nil {
432 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
433 }
434
435 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
436
437 // extract
438 pipeline, ok := pipelines[key]
439 if !ok {
440 continue
441 }
442 statuses, _ := pipeline.Statuses[ps.Workflow]
443 if !ok {
444 pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
445 }
446
447 // append
448 statuses.Data = append(statuses.Data, ps)
449
450 // reassign
451 pipeline.Statuses[ps.Workflow] = statuses
452 pipelines[key] = pipeline
453 }
454
455 var all []Pipeline
456 for _, p := range pipelines {
457 for _, s := range p.Statuses {
458 slices.SortFunc(s.Data, func(a, b PipelineStatus) int {
459 if a.Created.After(b.Created) {
460 return 1
461 }
462 if a.Created.Before(b.Created) {
463 return -1
464 }
465 if a.ID > b.ID {
466 return 1
467 }
468 if a.ID < b.ID {
469 return -1
470 }
471 return 0
472 })
473 }
474 all = append(all, p)
475 }
476
477 // sort pipelines by date
478 slices.SortFunc(all, func(a, b Pipeline) int {
479 if a.Created.After(b.Created) {
480 return -1
481 }
482 return 1
483 })
484
485 return all, nil
486}