forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package db
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "tangled.sh/tangled.sh/core/api/tangled"
9 "tangled.sh/tangled.sh/core/notifier"
10 "tangled.sh/tangled.sh/core/spindle/models"
11 "tangled.sh/tangled.sh/core/tid"
12)
13
14type Event struct {
15 Rkey string `json:"rkey"`
16 Nsid string `json:"nsid"`
17 Created int64 `json:"created"`
18 EventJson string `json:"event"`
19}
20
21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
22 _, err := d.Exec(
23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24 event.Rkey,
25 event.Nsid,
26 event.EventJson,
27 time.Now().UnixNano(),
28 )
29
30 notifier.NotifyAll()
31
32 return err
33}
34
35func (d *DB) GetEvents(cursor int64) ([]Event, error) {
36 whereClause := ""
37 args := []any{}
38 if cursor > 0 {
39 whereClause = "where created > ?"
40 args = append(args, cursor)
41 }
42
43 query := fmt.Sprintf(`
44 select rkey, nsid, event, created
45 from events
46 %s
47 order by created asc
48 limit 100
49 `, whereClause)
50
51 rows, err := d.Query(query, args...)
52 if err != nil {
53 return nil, err
54 }
55 defer rows.Close()
56
57 var evts []Event
58 for rows.Next() {
59 var ev Event
60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
61 return nil, err
62 }
63 evts = append(evts, ev)
64 }
65
66 if err := rows.Err(); err != nil {
67 return nil, err
68 }
69
70 return evts, nil
71}
72
73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error {
74 eventJson, err := json.Marshal(s)
75 if err != nil {
76 return err
77 }
78
79 event := Event{
80 Rkey: rkey,
81 Nsid: tangled.PipelineStatusNSID,
82 Created: time.Now().UnixNano(),
83 EventJson: string(eventJson),
84 }
85
86 return d.InsertEvent(event, n)
87}
88
89func (d *DB) createStatusEvent(
90 workflowId models.WorkflowId,
91 statusKind models.StatusKind,
92 workflowError *string,
93 exitCode *int64,
94 n *notifier.Notifier,
95) error {
96 now := time.Now()
97 pipelineAtUri := workflowId.PipelineId.AtUri()
98 s := tangled.PipelineStatus{
99 CreatedAt: now.Format(time.RFC3339),
100 Error: workflowError,
101 ExitCode: exitCode,
102 Pipeline: string(pipelineAtUri),
103 Workflow: workflowId.Name,
104 Status: string(statusKind),
105 }
106
107 eventJson, err := json.Marshal(s)
108 if err != nil {
109 return err
110 }
111
112 event := Event{
113 Rkey: tid.TID(),
114 Nsid: tangled.PipelineStatusNSID,
115 Created: now.UnixNano(),
116 EventJson: string(eventJson),
117 }
118
119 return d.InsertEvent(event, n)
120
121}
122
123func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
124 pipelineAtUri := workflowId.PipelineId.AtUri()
125
126 var eventJson string
127 err := d.QueryRow(
128 `
129 select
130 event from events
131 where
132 nsid = ?
133 and json_extract(event, '$.pipeline') = ?
134 and json_extract(event, '$.workflow') = ?
135 order by
136 created desc
137 limit
138 1
139 `,
140 tangled.PipelineStatusNSID,
141 string(pipelineAtUri),
142 workflowId.Name,
143 ).Scan(&eventJson)
144
145 if err != nil {
146 return nil, err
147 }
148
149 var status tangled.PipelineStatus
150 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
151 return nil, err
152 }
153
154 return &status, nil
155}
156
157func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
158 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
159}
160
161func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
162 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
163}
164
165func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
166 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
167}
168
169func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
170 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
171}
172
173func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
174 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
175}