Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/orm"
12)
13
14func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
15 var pipelines []models.Pipeline
16
17 var conditions []string
18 var args []any
19 for _, filter := range filters {
20 conditions = append(conditions, filter.Condition())
21 args = append(args, filter.Arg()...)
22 }
23
24 whereClause := ""
25 if conditions != nil {
26 whereClause = " where " + strings.Join(conditions, " and ")
27 }
28
29 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
30
31 rows, err := e.Query(query, args...)
32
33 if err != nil {
34 return nil, err
35 }
36 defer rows.Close()
37
38 for rows.Next() {
39 var pipeline models.Pipeline
40 var createdAt string
41 err = rows.Scan(
42 &pipeline.Id,
43 &pipeline.Rkey,
44 &pipeline.Knot,
45 &pipeline.RepoOwner,
46 &pipeline.RepoName,
47 &pipeline.Sha,
48 &createdAt,
49 )
50 if err != nil {
51 return nil, err
52 }
53
54 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
55 pipeline.Created = t
56 }
57
58 pipelines = append(pipelines, pipeline)
59 }
60
61 if err = rows.Err(); err != nil {
62 return nil, err
63 }
64
65 return pipelines, nil
66}
67
68func AddPipeline(e Execer, pipeline models.Pipeline) error {
69 args := []any{
70 pipeline.Rkey,
71 pipeline.Knot,
72 pipeline.RepoOwner,
73 pipeline.RepoName,
74 pipeline.TriggerId,
75 pipeline.Sha,
76 }
77
78 placeholders := make([]string, len(args))
79 for i := range placeholders {
80 placeholders[i] = "?"
81 }
82
83 query := fmt.Sprintf(`
84 insert or ignore into pipelines (
85 rkey,
86 knot,
87 repo_owner,
88 repo_name,
89 trigger_id,
90 sha
91 ) values (%s)
92 `, strings.Join(placeholders, ","))
93
94 _, err := e.Exec(query, args...)
95
96 return err
97}
98
99func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
100 args := []any{
101 trigger.Kind,
102 trigger.PushRef,
103 trigger.PushNewSha,
104 trigger.PushOldSha,
105 trigger.PRSourceBranch,
106 trigger.PRTargetBranch,
107 trigger.PRSourceSha,
108 trigger.PRAction,
109 }
110
111 placeholders := make([]string, len(args))
112 for i := range placeholders {
113 placeholders[i] = "?"
114 }
115
116 query := fmt.Sprintf(`insert or ignore into triggers (
117 kind,
118 push_ref,
119 push_new_sha,
120 push_old_sha,
121 pr_source_branch,
122 pr_target_branch,
123 pr_source_sha,
124 pr_action
125 ) values (%s)`, strings.Join(placeholders, ","))
126
127 res, err := e.Exec(query, args...)
128 if err != nil {
129 return 0, err
130 }
131
132 return res.LastInsertId()
133}
134
135func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
136 args := []any{
137 status.Spindle,
138 status.Rkey,
139 status.PipelineKnot,
140 status.PipelineRkey,
141 status.Workflow,
142 status.Status,
143 status.Error,
144 status.ExitCode,
145 status.Created.Format(time.RFC3339),
146 }
147
148 placeholders := make([]string, len(args))
149 for i := range placeholders {
150 placeholders[i] = "?"
151 }
152
153 query := fmt.Sprintf(`
154 insert or ignore into pipeline_statuses (
155 spindle,
156 rkey,
157 pipeline_knot,
158 pipeline_rkey,
159 workflow,
160 status,
161 error,
162 exit_code,
163 created
164 ) values (%s)
165 `, strings.Join(placeholders, ","))
166
167 _, err := e.Exec(query, args...)
168 return err
169}
170
171// this is a mega query, but the most useful one:
172// get N pipelines, for each one get the latest status of its N workflows
173func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
174 var conditions []string
175 var args []any
176 for _, filter := range filters {
177 filter.Key = "p." + filter.Key // the table is aliased in the query to `p`
178 conditions = append(conditions, filter.Condition())
179 args = append(args, filter.Arg()...)
180 }
181
182 whereClause := ""
183 if conditions != nil {
184 whereClause = " where " + strings.Join(conditions, " and ")
185 }
186
187 query := fmt.Sprintf(`
188 select
189 p.id,
190 p.knot,
191 p.rkey,
192 p.repo_owner,
193 p.repo_name,
194 p.sha,
195 p.created,
196 t.id,
197 t.kind,
198 t.push_ref,
199 t.push_new_sha,
200 t.push_old_sha,
201 t.pr_source_branch,
202 t.pr_target_branch,
203 t.pr_source_sha,
204 t.pr_action
205 from
206 pipelines p
207 join
208 triggers t ON p.trigger_id = t.id
209 %s
210 order by p.created desc
211 limit %d
212 `, whereClause, limit)
213
214 rows, err := e.Query(query, args...)
215 if err != nil {
216 return nil, err
217 }
218 defer rows.Close()
219
220 pipelines := make(map[syntax.ATURI]models.Pipeline)
221 for rows.Next() {
222 var p models.Pipeline
223 var t models.Trigger
224 var created string
225
226 err := rows.Scan(
227 &p.Id,
228 &p.Knot,
229 &p.Rkey,
230 &p.RepoOwner,
231 &p.RepoName,
232 &p.Sha,
233 &created,
234 &p.TriggerId,
235 &t.Kind,
236 &t.PushRef,
237 &t.PushNewSha,
238 &t.PushOldSha,
239 &t.PRSourceBranch,
240 &t.PRTargetBranch,
241 &t.PRSourceSha,
242 &t.PRAction,
243 )
244 if err != nil {
245 return nil, err
246 }
247
248 p.Created, err = time.Parse(time.RFC3339, created)
249 if err != nil {
250 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
251 }
252
253 t.Id = p.TriggerId
254 p.Trigger = &t
255 p.Statuses = make(map[string]models.WorkflowStatus)
256
257 pipelines[p.AtUri()] = p
258 }
259
260 // get all statuses
261 // the where clause here is of the form:
262 //
263 // where (pipeline_knot = k1 and pipeline_rkey = r1)
264 // or (pipeline_knot = k2 and pipeline_rkey = r2)
265 conditions = nil
266 args = nil
267 for _, p := range pipelines {
268 knotFilter := orm.FilterEq("pipeline_knot", p.Knot)
269 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey)
270 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
271 args = append(args, p.Knot)
272 args = append(args, p.Rkey)
273 }
274 whereClause = ""
275 if conditions != nil {
276 whereClause = "where " + strings.Join(conditions, " or ")
277 }
278 query = fmt.Sprintf(`
279 select
280 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
281 from
282 pipeline_statuses
283 %s
284 `, whereClause)
285
286 rows, err = e.Query(query, args...)
287 if err != nil {
288 return nil, err
289 }
290 defer rows.Close()
291
292 for rows.Next() {
293 var ps models.PipelineStatus
294 var created string
295
296 err := rows.Scan(
297 &ps.ID,
298 &ps.Spindle,
299 &ps.Rkey,
300 &ps.PipelineKnot,
301 &ps.PipelineRkey,
302 &created,
303 &ps.Workflow,
304 &ps.Status,
305 &ps.Error,
306 &ps.ExitCode,
307 )
308 if err != nil {
309 return nil, err
310 }
311
312 ps.Created, err = time.Parse(time.RFC3339, created)
313 if err != nil {
314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
315 }
316
317 pipelineAt := ps.PipelineAt()
318
319 // extract
320 pipeline, ok := pipelines[pipelineAt]
321 if !ok {
322 continue
323 }
324 statuses, _ := pipeline.Statuses[ps.Workflow]
325 if !ok {
326 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
327 }
328
329 // append
330 statuses.Data = append(statuses.Data, ps)
331
332 // reassign
333 pipeline.Statuses[ps.Workflow] = statuses
334 pipelines[pipelineAt] = pipeline
335 }
336
337 var all []models.Pipeline
338 for _, p := range pipelines {
339 for _, s := range p.Statuses {
340 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
341 if a.Created.After(b.Created) {
342 return 1
343 }
344 if a.Created.Before(b.Created) {
345 return -1
346 }
347 if a.ID > b.ID {
348 return 1
349 }
350 if a.ID < b.ID {
351 return -1
352 }
353 return 0
354 })
355 }
356 all = append(all, p)
357 }
358
359 // sort pipelines by date
360 slices.SortFunc(all, func(a, b models.Pipeline) int {
361 if a.Created.After(b.Created) {
362 return -1
363 }
364 return 1
365 })
366
367 return all, nil
368}