forked from tangled.org/core
this repo has no description

spindle: db: setup pipelines table & helpers

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi baeaadbf 55ef369a

verified
Changed files
+141 -38
spindle
+17 -4
spindle/db/db.go
··· 1 1 package db 2 2 3 - import "database/sql" 3 + import ( 4 + "database/sql" 5 + 6 + _ "github.com/mattn/go-sqlite3" 7 + ) 4 8 5 9 type DB struct { 6 10 *sql.DB ··· 27 31 ); 28 32 29 33 create table if not exists pipelines ( 30 - rkey text not null, 31 - pipeline text not null, -- json 32 - primary key rkey 34 + at_uri text not null, 35 + status text not null, 36 + 37 + -- only set if status is 'failed' 38 + error text, 39 + exit_code integer, 40 + 41 + started_at timestamp not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 42 + updated_at timestamp not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 43 + finished_at timestamp, 44 + 45 + primary key (at_uri) 33 46 ); 34 47 `) 35 48 if err != nil {
+124 -34
spindle/db/pipelines.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 + "time" 6 + 7 + "tangled.sh/tangled.sh/core/api/tangled" 8 + "tangled.sh/tangled.sh/core/knotserver/notifier" 9 + ) 10 + 11 + type PipelineStatus string 12 + 13 + var ( 14 + PipelinePending PipelineStatus = "pending" 15 + PipelineRunning PipelineStatus = "running" 16 + PipelineFailed PipelineStatus = "failed" 17 + PipelineTimeout PipelineStatus = "timeout" 18 + PipelineCancelled PipelineStatus = "cancelled" 19 + PipelineSuccess PipelineStatus = "success" 5 20 ) 6 21 7 22 type Pipeline struct { 8 - Rkey string 9 - PipelineJson string 23 + Rkey string `json:"rkey"` 24 + Knot string `json:"knot"` 25 + Status PipelineStatus `json:"status"` 26 + 27 + // only if Failed 28 + Error string `json:"error"` 29 + ExitCode int `json:"exit_code"` 30 + 31 + StartedAt time.Time `json:"started_at"` 32 + UpdatedAt time.Time `json:"updated_at"` 33 + FinishedAt time.Time `json:"finished_at"` 10 34 } 11 35 12 - func (d *DB) InsertPipeline(pipeline Pipeline) error { 13 - _, err := d.Exec( 14 - `insert into pipelines (rkey, nsid, event) values (?, ?, ?)`, 15 - pipeline.Rkey, 16 - pipeline.PipelineJson, 17 - ) 36 + func (p Pipeline) AsRecord() *tangled.PipelineStatus { 37 + exitCode64 := int64(p.ExitCode) 38 + finishedAt := p.FinishedAt.String() 39 + 40 + return &tangled.PipelineStatus{ 41 + Pipeline: fmt.Sprintf("at://%s/%s", p.Knot, p.Rkey), 42 + Status: string(p.Status), 43 + 44 + ExitCode: &exitCode64, 45 + Error: &p.Error, 46 + 47 + StartedAt: p.StartedAt.String(), 48 + UpdatedAt: p.UpdatedAt.String(), 49 + FinishedAt: &finishedAt, 50 + } 51 + } 18 52 19 - return err 53 + func pipelineAtUri(rkey, knot string) string { 54 + return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 20 55 } 21 56 22 - func (d *DB) GetPipeline(rkey, cursor string) (Pipeline, error) { 23 - whereClause := "where rkey = ?" 24 - args := []any{rkey} 57 + func (db *DB) CreatePipeline(rkey, knot string, n *notifier.Notifier) error { 58 + _, err := db.Exec(` 59 + insert into pipelines (at_uri, status) 60 + values (?, ?) 61 + `, pipelineAtUri(rkey, knot), PipelinePending) 25 62 26 - if cursor != "" { 27 - whereClause += " and rkey > ?" 28 - args = append(args, cursor) 63 + if err != nil { 64 + return err 29 65 } 66 + n.NotifyAll() 67 + return nil 68 + } 30 69 31 - query := fmt.Sprintf(` 32 - select rkey, pipeline 33 - from pipelines 34 - %s 35 - limit 1 36 - `, whereClause) 70 + func (db *DB) MarkPipelineRunning(rkey, knot string, n *notifier.Notifier) error { 71 + _, err := db.Exec(` 72 + update pipelines 73 + set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 74 + where at_uri = ? 75 + `, PipelineRunning, pipelineAtUri(rkey, knot)) 37 76 38 - row := d.QueryRow(query, args...) 77 + if err != nil { 78 + return err 79 + } 80 + n.NotifyAll() 81 + return nil 82 + } 83 + 84 + func (db *DB) MarkPipelineFailed(rkey, knot string, exitCode int, errorMsg string, n *notifier.Notifier) error { 85 + _, err := db.Exec(` 86 + update pipelines 87 + set status = ?, 88 + exit_code = ?, 89 + error = ?, 90 + updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 91 + finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 92 + where at_uri = ? 93 + `, PipelineFailed, exitCode, errorMsg, pipelineAtUri(rkey, knot)) 94 + if err != nil { 95 + return err 96 + } 97 + n.NotifyAll() 98 + return nil 99 + } 100 + 101 + func (db *DB) MarkPipelineTimeout(rkey, knot string, n *notifier.Notifier) error { 102 + _, err := db.Exec(` 103 + update pipelines 104 + set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 105 + where at_uri = ? 106 + `, PipelineTimeout, pipelineAtUri(rkey, knot)) 107 + if err != nil { 108 + return err 109 + } 110 + n.NotifyAll() 111 + return nil 112 + } 113 + 114 + func (db *DB) MarkPipelineSuccess(rkey, knot string, n *notifier.Notifier) error { 115 + _, err := db.Exec(` 116 + update pipelines 117 + set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 118 + finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 119 + where at_uri = ? 120 + `, PipelineSuccess, pipelineAtUri(rkey, knot)) 39 121 40 - var p Pipeline 41 - err := row.Scan(&p.Rkey, &p.PipelineJson) 42 122 if err != nil { 43 - return Pipeline{}, err 123 + return err 44 124 } 125 + n.NotifyAll() 126 + return nil 127 + } 45 128 46 - return p, nil 129 + func (db *DB) GetPipeline(rkey, knot string) (Pipeline, error) { 130 + var p Pipeline 131 + err := db.QueryRow(` 132 + select rkey, status, error, exit_code, started_at, updated_at, finished_at 133 + from pipelines 134 + where at_uri = ? 135 + `, pipelineAtUri(rkey, knot)).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 136 + return p, err 47 137 } 48 138 49 - func (d *DB) GetPipelines(cursor string) ([]Pipeline, error) { 139 + func (db *DB) GetPipelines(cursor string) ([]Pipeline, error) { 50 140 whereClause := "" 51 141 args := []any{} 52 142 if cursor != "" { ··· 55 145 } 56 146 57 147 query := fmt.Sprintf(` 58 - select rkey, nsid, pipeline 148 + select rkey, status, error, exit_code, started_at, updated_at, finished_at 59 149 from pipelines 60 150 %s 61 151 order by rkey asc 62 152 limit 100 63 153 `, whereClause) 64 154 65 - rows, err := d.Query(query, args...) 155 + rows, err := db.Query(query, args...) 66 156 if err != nil { 67 157 return nil, err 68 158 } 69 159 defer rows.Close() 70 160 71 - var evts []Pipeline 161 + var pipelines []Pipeline 72 162 for rows.Next() { 73 - var ev Pipeline 74 - rows.Scan(&ev.Rkey, &ev.PipelineJson) 75 - evts = append(evts, ev) 163 + var p Pipeline 164 + rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 165 + pipelines = append(pipelines, p) 76 166 } 77 167 78 168 if err := rows.Err(); err != nil { 79 169 return nil, err 80 170 } 81 171 82 - return evts, nil 172 + return pipelines, nil 83 173 }