// Monorepo for Tangled (tangled.org)

1package db
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/notifier"
10 "tangled.org/core/spindle/models"
11 "tangled.org/core/tid"
12)
13
// Event mirrors one row of the events table: it is what GetEvents scans rows
// into and what insertEvent writes out.
type Event struct {
	// Rkey is the value of the rkey column.
	Rkey string `json:"rkey"`

	// Nsid is the value of the nsid column.
	Nsid string `json:"nsid"`

	// Created is the value of the created column; rows are stamped with a
	// Unix-nanosecond timestamp, and GetEvents uses this column as its cursor.
	Created int64 `json:"created"`

	// EventJson holds the JSON-encoded event payload as a string. It is
	// serialized under the "event" key.
	EventJson string `json:"event"`
}
20
21func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error {
22 _, err := d.Exec(
23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24 event.Rkey,
25 event.Nsid,
26 event.EventJson,
27 time.Now().UnixNano(),
28 )
29
30 notifier.NotifyAll()
31
32 return err
33}
34
35func (d *DB) GetEvents(cursor int64) ([]Event, error) {
36 whereClause := ""
37 args := []any{}
38 if cursor > 0 {
39 whereClause = "where created > ?"
40 args = append(args, cursor)
41 }
42
43 query := fmt.Sprintf(`
44 select rkey, nsid, event, created
45 from events
46 %s
47 order by created asc
48 limit 100
49 `, whereClause)
50
51 rows, err := d.Query(query, args...)
52 if err != nil {
53 return nil, err
54 }
55 defer rows.Close()
56
57 var evts []Event
58 for rows.Next() {
59 var ev Event
60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
61 return nil, err
62 }
63 evts = append(evts, ev)
64 }
65
66 if err := rows.Err(); err != nil {
67 return nil, err
68 }
69
70 return evts, nil
71}
72
73func (d *DB) createStatusEvent(
74 workflowId models.WorkflowId,
75 statusKind models.StatusKind,
76 workflowError *string,
77 exitCode *int64,
78 n *notifier.Notifier,
79) error {
80 now := time.Now()
81 pipelineAtUri := workflowId.PipelineId.AtUri()
82 s := tangled.PipelineStatus{
83 CreatedAt: now.Format(time.RFC3339),
84 Error: workflowError,
85 ExitCode: exitCode,
86 Pipeline: string(pipelineAtUri),
87 Workflow: workflowId.Name,
88 Status: string(statusKind),
89 }
90
91 eventJson, err := json.Marshal(s)
92 if err != nil {
93 return err
94 }
95
96 event := Event{
97 Rkey: tid.TID(),
98 Nsid: tangled.PipelineStatusNSID,
99 Created: now.UnixNano(),
100 EventJson: string(eventJson),
101 }
102
103 return d.insertEvent(event, n)
104
105}
106
107func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
108 pipelineAtUri := workflowId.PipelineId.AtUri()
109
110 var eventJson string
111 err := d.QueryRow(
112 `
113 select
114 event from events
115 where
116 nsid = ?
117 and json_extract(event, '$.pipeline') = ?
118 and json_extract(event, '$.workflow') = ?
119 order by
120 created desc
121 limit
122 1
123 `,
124 tangled.PipelineStatusNSID,
125 string(pipelineAtUri),
126 workflowId.Name,
127 ).Scan(&eventJson)
128
129 if err != nil {
130 return nil, err
131 }
132
133 var status tangled.PipelineStatus
134 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
135 return nil, err
136 }
137
138 return &status, nil
139}
140
141func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
142 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
143}
144
145func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
146 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
147}
148
149func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
151}
152
153func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
154 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n)
155}
156
157func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
159}
160
161func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
162 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
163}