Monorepo for Tangled tangled.org
1package db 2 3import ( 4 "encoding/json" 5 "fmt" 6 "time" 7 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/notifier" 10 "tangled.org/core/spindle/models" 11 "tangled.org/core/tid" 12) 13 14type Event struct { 15 Rkey string `json:"rkey"` 16 Nsid string `json:"nsid"` 17 Created int64 `json:"created"` 18 EventJson string `json:"event"` 19} 20 21func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, 25 event.Nsid, 26 event.EventJson, 27 time.Now().UnixNano(), 28 ) 29 30 notifier.NotifyAll() 31 32 return err 33} 34 35func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 whereClause := "" 37 args := []any{} 38 if cursor > 0 { 39 whereClause = "where created > ?" 40 args = append(args, cursor) 41 } 42 43 query := fmt.Sprintf(` 44 select rkey, nsid, event, created 45 from events 46 %s 47 order by created asc 48 limit 100 49 `, whereClause) 50 51 rows, err := d.Query(query, args...) 52 if err != nil { 53 return nil, err 54 } 55 defer rows.Close() 56 57 var evts []Event 58 for rows.Next() { 59 var ev Event 60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 return nil, err 62 } 63 evts = append(evts, ev) 64 } 65 66 if err := rows.Err(); err != nil { 67 return nil, err 68 } 69 70 return evts, nil 71} 72 73func (d *DB) createStatusEvent( 74 workflowId models.WorkflowId, 75 statusKind models.StatusKind, 76 workflowError *string, 77 exitCode *int64, 78 n *notifier.Notifier, 79) error { 80 now := time.Now() 81 pipelineAtUri := workflowId.PipelineId.AtUri() 82 s := tangled.PipelineStatus{ 83 CreatedAt: now.Format(time.RFC3339), 84 Error: workflowError, 85 ExitCode: exitCode, 86 Pipeline: string(pipelineAtUri), 87 Workflow: workflowId.Name, 88 Status: string(statusKind), 89 } 90 91 eventJson, err := json.Marshal(s) 92 if err != nil { 93 return err 94 } 95 96 event := Event{ 97 Rkey: tid.TID(), 98 Nsid: tangled.PipelineStatusNSID, 99 Created: now.UnixNano(), 100 EventJson: string(eventJson), 101 } 102 103 return d.insertEvent(event, n) 104 105} 106 107func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) { 108 pipelineAtUri := workflowId.PipelineId.AtUri() 109 110 var eventJson string 111 err := d.QueryRow( 112 ` 113 select 114 event from events 115 where 116 nsid = ? 117 and json_extract(event, '$.pipeline') = ? 118 and json_extract(event, '$.workflow') = ? 119 order by 120 created desc 121 limit 122 1 123 `, 124 tangled.PipelineStatusNSID, 125 string(pipelineAtUri), 126 workflowId.Name, 127 ).Scan(&eventJson) 128 129 if err != nil { 130 return nil, err 131 } 132 133 var status tangled.PipelineStatus 134 if err := json.Unmarshal([]byte(eventJson), &status); err != nil { 135 return nil, err 136 } 137 138 return &status, nil 139} 140 141func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 142 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 143} 144 145func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 146 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 147} 148 149func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151} 152 153func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155} 156 157func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 159} 160 161func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error { 162 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n) 163}