-86
appview/state/knotstream.go
-86
appview/state/knotstream.go
···
18
18
"tangled.org/core/log"
19
19
"tangled.org/core/orm"
20
20
"tangled.org/core/rbac"
21
-
"tangled.org/core/workflow"
22
21
23
-
"github.com/bluesky-social/indigo/atproto/syntax"
24
22
"github.com/go-git/go-git/v5/plumbing"
25
23
"github.com/posthog/posthog-go"
26
24
)
···
67
65
switch msg.Nsid {
68
66
case tangled.GitRefUpdateNSID:
69
67
return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
70
-
case tangled.PipelineNSID:
71
-
return ingestPipeline(d, source, msg)
72
68
}
73
69
74
70
return nil
···
190
186
191
187
return tx.Commit()
192
188
}
193
-
194
-
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
195
-
var record tangled.Pipeline
196
-
err := json.Unmarshal(msg.EventJson, &record)
197
-
if err != nil {
198
-
return err
199
-
}
200
-
201
-
if record.TriggerMetadata == nil {
202
-
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
203
-
}
204
-
205
-
if record.TriggerMetadata.Repo == nil {
206
-
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
207
-
}
208
-
209
-
// does this repo have a spindle configured?
210
-
repos, err := db.GetRepos(
211
-
d,
212
-
0,
213
-
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
214
-
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
215
-
)
216
-
if err != nil {
217
-
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
218
-
}
219
-
if len(repos) != 1 {
220
-
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
221
-
}
222
-
if repos[0].Spindle == "" {
223
-
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
224
-
}
225
-
226
-
// trigger info
227
-
var trigger models.Trigger
228
-
var sha string
229
-
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
230
-
switch trigger.Kind {
231
-
case workflow.TriggerKindPush:
232
-
trigger.PushRef = &record.TriggerMetadata.Push.Ref
233
-
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
234
-
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
235
-
sha = *trigger.PushNewSha
236
-
case workflow.TriggerKindPullRequest:
237
-
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
238
-
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
239
-
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
240
-
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
241
-
sha = *trigger.PRSourceSha
242
-
}
243
-
244
-
tx, err := d.Begin()
245
-
if err != nil {
246
-
return fmt.Errorf("failed to start txn: %w", err)
247
-
}
248
-
249
-
triggerId, err := db.AddTrigger(tx, trigger)
250
-
if err != nil {
251
-
return fmt.Errorf("failed to add trigger entry: %w", err)
252
-
}
253
-
254
-
pipeline := models.Pipeline{
255
-
Rkey: msg.Rkey,
256
-
Knot: source.Key(),
257
-
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
258
-
RepoName: record.TriggerMetadata.Repo.Repo,
259
-
TriggerId: int(triggerId),
260
-
Sha: sha,
261
-
}
262
-
263
-
err = db.AddPipeline(tx, pipeline)
264
-
if err != nil {
265
-
return fmt.Errorf("failed to add pipeline: %w", err)
266
-
}
267
-
268
-
err = tx.Commit()
269
-
if err != nil {
270
-
return fmt.Errorf("failed to commit txn: %w", err)
271
-
}
272
-
273
-
return nil
274
-
}
+89
appview/state/spindlestream.go
+89
appview/state/spindlestream.go
···
20
20
"tangled.org/core/orm"
21
21
"tangled.org/core/rbac"
22
22
spindle "tangled.org/core/spindle/models"
23
+
"tangled.org/core/workflow"
23
24
)
24
25
25
26
func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
···
62
63
func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
63
64
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
64
65
switch msg.Nsid {
66
+
case tangled.PipelineNSID:
67
+
return ingestPipeline(logger, d, source, msg)
65
68
case tangled.PipelineStatusNSID:
66
69
return ingestPipelineStatus(ctx, logger, d, source, msg)
67
70
}
68
71
69
72
return nil
70
73
}
74
+
}
75
+
76
+
func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
77
+
var record tangled.Pipeline
78
+
err := json.Unmarshal(msg.EventJson, &record)
79
+
if err != nil {
80
+
return err
81
+
}
82
+
83
+
if record.TriggerMetadata == nil {
84
+
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
85
+
}
86
+
87
+
if record.TriggerMetadata.Repo == nil {
88
+
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
89
+
}
90
+
91
+
// does this repo have a spindle configured?
92
+
repos, err := db.GetRepos(
93
+
d,
94
+
0,
95
+
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
96
+
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
97
+
)
98
+
if err != nil {
99
+
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
100
+
}
101
+
if len(repos) != 1 {
102
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
103
+
}
104
+
if repos[0].Spindle == "" {
105
+
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
106
+
}
107
+
108
+
// trigger info
109
+
var trigger models.Trigger
110
+
var sha string
111
+
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
112
+
switch trigger.Kind {
113
+
case workflow.TriggerKindPush:
114
+
trigger.PushRef = &record.TriggerMetadata.Push.Ref
115
+
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
116
+
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
117
+
sha = *trigger.PushNewSha
118
+
case workflow.TriggerKindPullRequest:
119
+
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
120
+
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
121
+
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
122
+
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
123
+
sha = *trigger.PRSourceSha
124
+
}
125
+
126
+
tx, err := d.Begin()
127
+
if err != nil {
128
+
return fmt.Errorf("failed to start txn: %w", err)
129
+
}
130
+
131
+
triggerId, err := db.AddTrigger(tx, trigger)
132
+
if err != nil {
133
+
return fmt.Errorf("failed to add trigger entry: %w", err)
134
+
}
135
+
136
+
// TODO: we shouldn't even use knot to identify pipelines
137
+
knot := record.TriggerMetadata.Repo.Knot
138
+
pipeline := models.Pipeline{
139
+
Rkey: msg.Rkey,
140
+
Knot: knot,
141
+
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
142
+
RepoName: record.TriggerMetadata.Repo.Repo,
143
+
TriggerId: int(triggerId),
144
+
Sha: sha,
145
+
}
146
+
147
+
err = db.AddPipeline(tx, pipeline)
148
+
if err != nil {
149
+
return fmt.Errorf("failed to add pipeline: %w", err)
150
+
}
151
+
152
+
err = tx.Commit()
153
+
if err != nil {
154
+
return fmt.Errorf("failed to commit txn: %w", err)
155
+
}
156
+
157
+
l.Info("added pipeline", "pipeline", pipeline)
158
+
159
+
return nil
71
160
}
72
161
73
162
func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {