+17
-139
appview/db/pipeline.go
+17
-139
appview/db/pipeline.go
···
6
6
"strings"
7
7
"time"
8
8
9
-
"github.com/bluesky-social/indigo/atproto/syntax"
10
-
"github.com/go-git/go-git/v5/plumbing"
11
-
spindle "tangled.org/core/spindle/models"
12
-
"tangled.org/core/workflow"
9
+
"tangled.org/core/appview/models"
13
10
)
14
11
15
-
type Pipeline struct {
16
-
Id int
17
-
Rkey string
18
-
Knot string
19
-
RepoOwner syntax.DID
20
-
RepoName string
21
-
TriggerId int
22
-
Sha string
23
-
Created time.Time
24
-
25
-
// populate when querying for reverse mappings
26
-
Trigger *Trigger
27
-
Statuses map[string]WorkflowStatus
28
-
}
29
-
30
-
type WorkflowStatus struct {
31
-
Data []PipelineStatus
32
-
}
33
-
34
-
func (w WorkflowStatus) Latest() PipelineStatus {
35
-
return w.Data[len(w.Data)-1]
36
-
}
37
-
38
-
// time taken by this workflow to reach an "end state"
39
-
func (w WorkflowStatus) TimeTaken() time.Duration {
40
-
var start, end *time.Time
41
-
for _, s := range w.Data {
42
-
if s.Status.IsStart() {
43
-
start = &s.Created
44
-
}
45
-
if s.Status.IsFinish() {
46
-
end = &s.Created
47
-
}
48
-
}
49
-
50
-
if start != nil && end != nil && end.After(*start) {
51
-
return end.Sub(*start)
52
-
}
53
-
54
-
return 0
55
-
}
56
-
57
-
func (p Pipeline) Counts() map[string]int {
58
-
m := make(map[string]int)
59
-
for _, w := range p.Statuses {
60
-
m[w.Latest().Status.String()] += 1
61
-
}
62
-
return m
63
-
}
64
-
65
-
func (p Pipeline) TimeTaken() time.Duration {
66
-
var s time.Duration
67
-
for _, w := range p.Statuses {
68
-
s += w.TimeTaken()
69
-
}
70
-
return s
71
-
}
72
-
73
-
func (p Pipeline) Workflows() []string {
74
-
var ws []string
75
-
for v := range p.Statuses {
76
-
ws = append(ws, v)
77
-
}
78
-
slices.Sort(ws)
79
-
return ws
80
-
}
81
-
82
-
// if we know that a spindle has picked up this pipeline, then it is Responding
83
-
func (p Pipeline) IsResponding() bool {
84
-
return len(p.Statuses) != 0
85
-
}
86
-
87
-
type Trigger struct {
88
-
Id int
89
-
Kind workflow.TriggerKind
90
-
91
-
// push trigger fields
92
-
PushRef *string
93
-
PushNewSha *string
94
-
PushOldSha *string
95
-
96
-
// pull request trigger fields
97
-
PRSourceBranch *string
98
-
PRTargetBranch *string
99
-
PRSourceSha *string
100
-
PRAction *string
101
-
}
102
-
103
-
func (t *Trigger) IsPush() bool {
104
-
return t != nil && t.Kind == workflow.TriggerKindPush
105
-
}
106
-
107
-
func (t *Trigger) IsPullRequest() bool {
108
-
return t != nil && t.Kind == workflow.TriggerKindPullRequest
109
-
}
110
-
111
-
func (t *Trigger) TargetRef() string {
112
-
if t.IsPush() {
113
-
return plumbing.ReferenceName(*t.PushRef).Short()
114
-
} else if t.IsPullRequest() {
115
-
return *t.PRTargetBranch
116
-
}
117
-
118
-
return ""
119
-
}
120
-
121
-
type PipelineStatus struct {
122
-
ID int
123
-
Spindle string
124
-
Rkey string
125
-
PipelineKnot string
126
-
PipelineRkey string
127
-
Created time.Time
128
-
Workflow string
129
-
Status spindle.StatusKind
130
-
Error *string
131
-
ExitCode int
132
-
}
133
-
134
-
func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
135
-
var pipelines []Pipeline
12
+
func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) {
13
+
var pipelines []models.Pipeline
136
14
137
15
var conditions []string
138
16
var args []any
···
156
34
defer rows.Close()
157
35
158
36
for rows.Next() {
159
-
var pipeline Pipeline
37
+
var pipeline models.Pipeline
160
38
var createdAt string
161
39
err = rows.Scan(
162
40
&pipeline.Id,
···
185
63
return pipelines, nil
186
64
}
187
65
188
-
func AddPipeline(e Execer, pipeline Pipeline) error {
66
+
func AddPipeline(e Execer, pipeline models.Pipeline) error {
189
67
args := []any{
190
68
pipeline.Rkey,
191
69
pipeline.Knot,
···
216
94
return err
217
95
}
218
96
219
-
func AddTrigger(e Execer, trigger Trigger) (int64, error) {
97
+
func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
220
98
args := []any{
221
99
trigger.Kind,
222
100
trigger.PushRef,
···
252
130
return res.LastInsertId()
253
131
}
254
132
255
-
func AddPipelineStatus(e Execer, status PipelineStatus) error {
133
+
func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
256
134
args := []any{
257
135
status.Spindle,
258
136
status.Rkey,
···
290
168
291
169
// this is a mega query, but the most useful one:
292
170
// get N pipelines, for each one get the latest status of its N workflows
293
-
func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
171
+
func GetPipelineStatuses(e Execer, filters ...filter) ([]models.Pipeline, error) {
294
172
var conditions []string
295
173
var args []any
296
174
for _, filter := range filters {
···
335
213
}
336
214
defer rows.Close()
337
215
338
-
pipelines := make(map[string]Pipeline)
216
+
pipelines := make(map[string]models.Pipeline)
339
217
for rows.Next() {
340
-
var p Pipeline
341
-
var t Trigger
218
+
var p models.Pipeline
219
+
var t models.Trigger
342
220
var created string
343
221
344
222
err := rows.Scan(
···
370
248
371
249
t.Id = p.TriggerId
372
250
p.Trigger = &t
373
-
p.Statuses = make(map[string]WorkflowStatus)
251
+
p.Statuses = make(map[string]models.WorkflowStatus)
374
252
375
253
k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
376
254
pipelines[k] = p
···
409
287
defer rows.Close()
410
288
411
289
for rows.Next() {
412
-
var ps PipelineStatus
290
+
var ps models.PipelineStatus
413
291
var created string
414
292
415
293
err := rows.Scan(
···
442
320
}
443
321
statuses, _ := pipeline.Statuses[ps.Workflow]
444
322
if !ok {
445
-
pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
323
+
pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
446
324
}
447
325
448
326
// append
···
453
331
pipelines[key] = pipeline
454
332
}
455
333
456
-
var all []Pipeline
334
+
var all []models.Pipeline
457
335
for _, p := range pipelines {
458
336
for _, s := range p.Statuses {
459
-
slices.SortFunc(s.Data, func(a, b PipelineStatus) int {
337
+
slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
460
338
if a.Created.After(b.Created) {
461
339
return 1
462
340
}
···
476
354
}
477
355
478
356
// sort pipelines by date
479
-
slices.SortFunc(all, func(a, b Pipeline) int {
357
+
slices.SortFunc(all, func(a, b models.Pipeline) int {
480
358
if a.Created.After(b.Created) {
481
359
return -1
482
360
}
+130
appview/models/pipeline.go
+130
appview/models/pipeline.go
···
1
+
package models
2
+
3
+
import (
4
+
"slices"
5
+
"time"
6
+
7
+
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
"github.com/go-git/go-git/v5/plumbing"
9
+
spindle "tangled.org/core/spindle/models"
10
+
"tangled.org/core/workflow"
11
+
)
12
+
13
+
type Pipeline struct {
14
+
Id int
15
+
Rkey string
16
+
Knot string
17
+
RepoOwner syntax.DID
18
+
RepoName string
19
+
TriggerId int
20
+
Sha string
21
+
Created time.Time
22
+
23
+
// populate when querying for reverse mappings
24
+
Trigger *Trigger
25
+
Statuses map[string]WorkflowStatus
26
+
}
27
+
28
+
type WorkflowStatus struct {
29
+
Data []PipelineStatus
30
+
}
31
+
32
+
func (w WorkflowStatus) Latest() PipelineStatus {
33
+
return w.Data[len(w.Data)-1]
34
+
}
35
+
36
+
// time taken by this workflow to reach an "end state"
37
+
func (w WorkflowStatus) TimeTaken() time.Duration {
38
+
var start, end *time.Time
39
+
for _, s := range w.Data {
40
+
if s.Status.IsStart() {
41
+
start = &s.Created
42
+
}
43
+
if s.Status.IsFinish() {
44
+
end = &s.Created
45
+
}
46
+
}
47
+
48
+
if start != nil && end != nil && end.After(*start) {
49
+
return end.Sub(*start)
50
+
}
51
+
52
+
return 0
53
+
}
54
+
55
+
func (p Pipeline) Counts() map[string]int {
56
+
m := make(map[string]int)
57
+
for _, w := range p.Statuses {
58
+
m[w.Latest().Status.String()] += 1
59
+
}
60
+
return m
61
+
}
62
+
63
+
func (p Pipeline) TimeTaken() time.Duration {
64
+
var s time.Duration
65
+
for _, w := range p.Statuses {
66
+
s += w.TimeTaken()
67
+
}
68
+
return s
69
+
}
70
+
71
+
func (p Pipeline) Workflows() []string {
72
+
var ws []string
73
+
for v := range p.Statuses {
74
+
ws = append(ws, v)
75
+
}
76
+
slices.Sort(ws)
77
+
return ws
78
+
}
79
+
80
+
// if we know that a spindle has picked up this pipeline, then it is Responding
81
+
func (p Pipeline) IsResponding() bool {
82
+
return len(p.Statuses) != 0
83
+
}
84
+
85
+
type Trigger struct {
86
+
Id int
87
+
Kind workflow.TriggerKind
88
+
89
+
// push trigger fields
90
+
PushRef *string
91
+
PushNewSha *string
92
+
PushOldSha *string
93
+
94
+
// pull request trigger fields
95
+
PRSourceBranch *string
96
+
PRTargetBranch *string
97
+
PRSourceSha *string
98
+
PRAction *string
99
+
}
100
+
101
+
func (t *Trigger) IsPush() bool {
102
+
return t != nil && t.Kind == workflow.TriggerKindPush
103
+
}
104
+
105
+
func (t *Trigger) IsPullRequest() bool {
106
+
return t != nil && t.Kind == workflow.TriggerKindPullRequest
107
+
}
108
+
109
+
func (t *Trigger) TargetRef() string {
110
+
if t.IsPush() {
111
+
return plumbing.ReferenceName(*t.PushRef).Short()
112
+
} else if t.IsPullRequest() {
113
+
return *t.PRTargetBranch
114
+
}
115
+
116
+
return ""
117
+
}
118
+
119
+
type PipelineStatus struct {
120
+
ID int
121
+
Spindle string
122
+
Rkey string
123
+
PipelineKnot string
124
+
PipelineRkey string
125
+
Created time.Time
126
+
Workflow string
127
+
Status spindle.StatusKind
128
+
Error *string
129
+
ExitCode int
130
+
}
+7
-7
appview/pages/pages.go
+7
-7
appview/pages/pages.go
···
588
588
EmailToDidOrHandle map[string]string
589
589
VerifiedCommits commitverify.VerifiedCommits
590
590
Languages []types.RepoLanguageDetails
591
-
Pipelines map[string]db.Pipeline
591
+
Pipelines map[string]models.Pipeline
592
592
NeedsKnotUpgrade bool
593
593
types.RepoIndexResponse
594
594
}
···
631
631
Active string
632
632
EmailToDidOrHandle map[string]string
633
633
VerifiedCommits commitverify.VerifiedCommits
634
-
Pipelines map[string]db.Pipeline
634
+
Pipelines map[string]models.Pipeline
635
635
}
636
636
637
637
func (p *Pages) RepoLog(w io.Writer, params RepoLogParams) error {
···
644
644
RepoInfo repoinfo.RepoInfo
645
645
Active string
646
646
EmailToDidOrHandle map[string]string
647
-
Pipeline *db.Pipeline
647
+
Pipeline *models.Pipeline
648
648
DiffOpts types.DiffOpts
649
649
650
650
// singular because it's always going to be just one
···
1023
1023
Active string
1024
1024
FilteringBy models.PullState
1025
1025
Stacks map[string]models.Stack
1026
-
Pipelines map[string]db.Pipeline
1026
+
Pipelines map[string]models.Pipeline
1027
1027
}
1028
1028
1029
1029
func (p *Pages) RepoPulls(w io.Writer, params RepoPullsParams) error {
···
1058
1058
AbandonedPulls []*models.Pull
1059
1059
MergeCheck types.MergeCheckResponse
1060
1060
ResubmitCheck ResubmitResult
1061
-
Pipelines map[string]db.Pipeline
1061
+
Pipelines map[string]models.Pipeline
1062
1062
1063
1063
OrderedReactionKinds []db.ReactionKind
1064
1064
Reactions map[db.ReactionKind]int
···
1260
1260
type PipelinesParams struct {
1261
1261
LoggedInUser *oauth.User
1262
1262
RepoInfo repoinfo.RepoInfo
1263
-
Pipelines []db.Pipeline
1263
+
Pipelines []models.Pipeline
1264
1264
Active string
1265
1265
}
1266
1266
···
1292
1292
type WorkflowParams struct {
1293
1293
LoggedInUser *oauth.User
1294
1294
RepoInfo repoinfo.RepoInfo
1295
-
Pipeline db.Pipeline
1295
+
Pipeline models.Pipeline
1296
1296
Workflow string
1297
1297
LogUrl string
1298
1298
Active string
+2
-2
appview/pulls/pulls.go
+2
-2
appview/pulls/pulls.go
···
160
160
161
161
repoInfo := f.RepoInfo(user)
162
162
163
-
m := make(map[string]db.Pipeline)
163
+
m := make(map[string]models.Pipeline)
164
164
165
165
var shas []string
166
166
for _, s := range pull.Submissions {
···
552
552
log.Printf("failed to fetch pipeline statuses: %s", err)
553
553
// non-fatal
554
554
}
555
-
m := make(map[string]db.Pipeline)
555
+
m := make(map[string]models.Pipeline)
556
556
for _, p := range ps {
557
557
m[p.Sha] = p
558
558
}
+1
-1
appview/repo/repo.go
+1
-1
appview/repo/repo.go
+3
-2
appview/repo/repo_util.go
+3
-2
appview/repo/repo_util.go
···
10
10
"strings"
11
11
12
12
"tangled.org/core/appview/db"
13
+
"tangled.org/core/appview/models"
13
14
"tangled.org/core/appview/pages/repoinfo"
14
15
"tangled.org/core/types"
15
16
···
143
144
d *db.DB,
144
145
repoInfo repoinfo.RepoInfo,
145
146
shas []string,
146
-
) (map[string]db.Pipeline, error) {
147
-
m := make(map[string]db.Pipeline)
147
+
) (map[string]models.Pipeline, error) {
148
+
m := make(map[string]models.Pipeline)
148
149
149
150
if len(shas) == 0 {
150
151
return m, nil
+2
-2
appview/state/knotstream.go
+2
-2
appview/state/knotstream.go
···
208
208
}
209
209
210
210
// trigger info
211
-
var trigger db.Trigger
211
+
var trigger models.Trigger
212
212
var sha string
213
213
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
214
214
switch trigger.Kind {
···
235
235
return fmt.Errorf("failed to add trigger entry: %w", err)
236
236
}
237
237
238
-
pipeline := db.Pipeline{
238
+
pipeline := models.Pipeline{
239
239
Rkey: msg.Rkey,
240
240
Knot: source.Key(),
241
241
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
+2
-1
appview/state/spindlestream.go
+2
-1
appview/state/spindlestream.go
···
13
13
"tangled.org/core/appview/cache"
14
14
"tangled.org/core/appview/config"
15
15
"tangled.org/core/appview/db"
16
+
"tangled.org/core/appview/models"
16
17
ec "tangled.org/core/eventconsumer"
17
18
"tangled.org/core/eventconsumer/cursor"
18
19
"tangled.org/core/log"
···
89
90
created = t
90
91
}
91
92
92
-
status := db.PipelineStatus{
93
+
status := models.PipelineStatus{
93
94
Spindle: source.Key(),
94
95
Rkey: msg.Rkey,
95
96
PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),