Signed-off-by: Seongmin Lee git@boltless.me
+89
-86
Diff
round #5
-86
appview/state/knotstream.go
-86
appview/state/knotstream.go
···
22
22
"tangled.org/core/log"
23
23
"tangled.org/core/orm"
24
24
"tangled.org/core/rbac"
25
-
"tangled.org/core/workflow"
26
25
27
-
"github.com/bluesky-social/indigo/atproto/syntax"
28
26
"github.com/go-git/go-git/v5/plumbing"
29
27
"github.com/posthog/posthog-go"
30
28
)
···
71
69
switch msg.Nsid {
72
70
case tangled.GitRefUpdateNSID:
73
71
return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
74
-
case tangled.PipelineNSID:
75
-
return ingestPipeline(d, source, msg)
76
72
}
77
73
78
74
return nil
···
288
284
289
285
return tx.Commit()
290
286
}
291
-
292
-
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
293
-
var record tangled.Pipeline
294
-
err := json.Unmarshal(msg.EventJson, &record)
295
-
if err != nil {
296
-
return err
297
-
}
298
-
299
-
if record.TriggerMetadata == nil {
300
-
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
301
-
}
302
-
303
-
if record.TriggerMetadata.Repo == nil {
304
-
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
305
-
}
306
-
307
-
// does this repo have a spindle configured?
308
-
repos, err := db.GetRepos(
309
-
d,
310
-
0,
311
-
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
312
-
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
313
-
)
314
-
if err != nil {
315
-
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
316
-
}
317
-
if len(repos) != 1 {
318
-
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
319
-
}
320
-
if repos[0].Spindle == "" {
321
-
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
322
-
}
323
-
324
-
// trigger info
325
-
var trigger models.Trigger
326
-
var sha string
327
-
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
328
-
switch trigger.Kind {
329
-
case workflow.TriggerKindPush:
330
-
trigger.PushRef = &record.TriggerMetadata.Push.Ref
331
-
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
332
-
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
333
-
sha = *trigger.PushNewSha
334
-
case workflow.TriggerKindPullRequest:
335
-
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
336
-
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
337
-
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
338
-
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
339
-
sha = *trigger.PRSourceSha
340
-
}
341
-
342
-
tx, err := d.Begin()
343
-
if err != nil {
344
-
return fmt.Errorf("failed to start txn: %w", err)
345
-
}
346
-
347
-
triggerId, err := db.AddTrigger(tx, trigger)
348
-
if err != nil {
349
-
return fmt.Errorf("failed to add trigger entry: %w", err)
350
-
}
351
-
352
-
pipeline := models.Pipeline{
353
-
Rkey: msg.Rkey,
354
-
Knot: source.Key(),
355
-
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
356
-
RepoName: record.TriggerMetadata.Repo.Repo,
357
-
TriggerId: int(triggerId),
358
-
Sha: sha,
359
-
}
360
-
361
-
err = db.AddPipeline(tx, pipeline)
362
-
if err != nil {
363
-
return fmt.Errorf("failed to add pipeline: %w", err)
364
-
}
365
-
366
-
err = tx.Commit()
367
-
if err != nil {
368
-
return fmt.Errorf("failed to commit txn: %w", err)
369
-
}
370
-
371
-
return nil
372
-
}
+89
appview/state/spindlestream.go
+89
appview/state/spindlestream.go
···
20
20
"tangled.org/core/orm"
21
21
"tangled.org/core/rbac"
22
22
spindle "tangled.org/core/spindle/models"
23
+
"tangled.org/core/workflow"
23
24
)
24
25
25
26
func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
···
62
63
func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
63
64
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
64
65
switch msg.Nsid {
66
+
case tangled.PipelineNSID:
67
+
return ingestPipeline(logger, d, source, msg)
65
68
case tangled.PipelineStatusNSID:
66
69
return ingestPipelineStatus(ctx, logger, d, source, msg)
67
70
}
···
70
73
}
71
74
}
72
75
76
+
func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
77
+
var record tangled.Pipeline
78
+
err := json.Unmarshal(msg.EventJson, &record)
79
+
if err != nil {
80
+
return err
81
+
}
82
+
83
+
if record.TriggerMetadata == nil {
84
+
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
85
+
}
86
+
87
+
if record.TriggerMetadata.Repo == nil {
88
+
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
89
+
}
90
+
91
+
// does this repo have a spindle configured?
92
+
repos, err := db.GetRepos(
93
+
d,
94
+
0,
95
+
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
96
+
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
97
+
)
98
+
if err != nil {
99
+
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
100
+
}
101
+
if len(repos) != 1 {
102
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
103
+
}
104
+
if repos[0].Spindle == "" {
105
+
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
106
+
}
107
+
108
+
// trigger info
109
+
var trigger models.Trigger
110
+
var sha string
111
+
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
112
+
switch trigger.Kind {
113
+
case workflow.TriggerKindPush:
114
+
trigger.PushRef = &record.TriggerMetadata.Push.Ref
115
+
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
116
+
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
117
+
sha = *trigger.PushNewSha
118
+
case workflow.TriggerKindPullRequest:
119
+
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
120
+
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
121
+
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
122
+
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
123
+
sha = *trigger.PRSourceSha
124
+
}
125
+
126
+
tx, err := d.Begin()
127
+
if err != nil {
128
+
return fmt.Errorf("failed to start txn: %w", err)
129
+
}
130
+
131
+
triggerId, err := db.AddTrigger(tx, trigger)
132
+
if err != nil {
133
+
return fmt.Errorf("failed to add trigger entry: %w", err)
134
+
}
135
+
136
+
// TODO: we shouldn't even use knot to identify pipelines
137
+
knot := record.TriggerMetadata.Repo.Knot
138
+
pipeline := models.Pipeline{
139
+
Rkey: msg.Rkey,
140
+
Knot: knot,
141
+
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
142
+
RepoName: record.TriggerMetadata.Repo.Repo,
143
+
TriggerId: int(triggerId),
144
+
Sha: sha,
145
+
}
146
+
147
+
err = db.AddPipeline(tx, pipeline)
148
+
if err != nil {
149
+
return fmt.Errorf("failed to add pipeline: %w", err)
150
+
}
151
+
152
+
err = tx.Commit()
153
+
if err != nil {
154
+
return fmt.Errorf("failed to commit txn: %w", err)
155
+
}
156
+
157
+
l.Info("added pipeline", "pipeline", pipeline)
158
+
159
+
return nil
160
+
}
161
+
73
162
func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
74
163
var record tangled.PipelineStatus
75
164
err := json.Unmarshal(msg.EventJson, &record)
History
6 rounds
0 comments
boltless.me
submitted
#5
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
2/3 timeout, 1/3 success
expand
collapse
merge conflicts detected
expand
collapse
expand
collapse
- nix/vm.nix:58
expand 0 comments
boltless.me
submitted
#4
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
3/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#3
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#2
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#0
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>