forked from tangled.org/core
Monorepo for Tangled

spindle: rework db schema

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 82d62fcf a8a3163d

verified
Changed files
+653 -408
api
cmd
spindle
lexicons
pipeline
spindle
tid
+47 -70
api/tangled/cbor_gen.go
··· 1816 1816 fieldCount-- 1817 1817 } 1818 1818 1819 - if t.FinishedAt == nil { 1820 - fieldCount-- 1821 - } 1822 - 1823 1819 if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 1824 1820 return err 1825 1821 } ··· 1930 1926 1931 1927 } 1932 1928 1933 - // t.StartedAt (string) (string) 1934 - if len("startedAt") > 1000000 { 1935 - return xerrors.Errorf("Value in field \"startedAt\" was too long") 1929 + // t.Pipeline (string) (string) 1930 + if len("pipeline") > 1000000 { 1931 + return xerrors.Errorf("Value in field \"pipeline\" was too long") 1936 1932 } 1937 1933 1938 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("startedAt"))); err != nil { 1934 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("pipeline"))); err != nil { 1939 1935 return err 1940 1936 } 1941 - if _, err := cw.WriteString(string("startedAt")); err != nil { 1937 + if _, err := cw.WriteString(string("pipeline")); err != nil { 1942 1938 return err 1943 1939 } 1944 1940 1945 - if len(t.StartedAt) > 1000000 { 1946 - return xerrors.Errorf("Value in field t.StartedAt was too long") 1941 + if len(t.Pipeline) > 1000000 { 1942 + return xerrors.Errorf("Value in field t.Pipeline was too long") 1947 1943 } 1948 1944 1949 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.StartedAt))); err != nil { 1945 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Pipeline))); err != nil { 1950 1946 return err 1951 1947 } 1952 - if _, err := cw.WriteString(string(t.StartedAt)); err != nil { 1948 + if _, err := cw.WriteString(string(t.Pipeline)); err != nil { 1953 1949 return err 1954 1950 } 1955 1951 1956 - // t.UpdatedAt (string) (string) 1957 - if len("updatedAt") > 1000000 { 1958 - return xerrors.Errorf("Value in field \"updatedAt\" was too long") 1952 + // t.Workflow (string) (string) 1953 + if len("workflow") > 1000000 { 1954 + return xerrors.Errorf("Value in field \"workflow\" was too long") 1959 1955 } 1960 1956 1961 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("updatedAt"))); err != nil { 1957 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("workflow"))); err != nil { 1962 1958 return err 1963 1959 } 1964 - if _, err := cw.WriteString(string("updatedAt")); err != nil { 1960 + if _, err := cw.WriteString(string("workflow")); err != nil { 1965 1961 return err 1966 1962 } 1967 1963 1968 - if len(t.UpdatedAt) > 1000000 { 1969 - return xerrors.Errorf("Value in field t.UpdatedAt was too long") 1964 + if len(t.Workflow) > 1000000 { 1965 + return xerrors.Errorf("Value in field t.Workflow was too long") 1970 1966 } 1971 1967 1972 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.UpdatedAt))); err != nil { 1968 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Workflow))); err != nil { 1973 1969 return err 1974 1970 } 1975 - if _, err := cw.WriteString(string(t.UpdatedAt)); err != nil { 1971 + if _, err := cw.WriteString(string(t.Workflow)); err != nil { 1976 1972 return err 1977 1973 } 1978 1974 1979 - // t.FinishedAt (string) (string) 1980 - if t.FinishedAt != nil { 1975 + // t.CreatedAt (string) (string) 1976 + if len("createdAt") > 1000000 { 1977 + return xerrors.Errorf("Value in field \"createdAt\" was too long") 1978 + } 1981 1979 1982 - if len("finishedAt") > 1000000 { 1983 - return xerrors.Errorf("Value in field \"finishedAt\" was too long") 1984 - } 1980 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("createdAt"))); err != nil { 1981 + return err 1982 + } 1983 + if _, err := cw.WriteString(string("createdAt")); err != nil { 1984 + return err 1985 + } 1985 1986 1986 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("finishedAt"))); err != nil { 1987 - return err 1988 - } 1989 - if _, err := cw.WriteString(string("finishedAt")); err != nil { 1990 - return err 1991 - } 1987 + if len(t.CreatedAt) > 1000000 { 1988 + return xerrors.Errorf("Value in field t.CreatedAt was too long") 1989 + } 1992 1990 1993 - if t.FinishedAt == nil { 1994 - if _, err := cw.Write(cbg.CborNull); err != nil { 1995 - return err 1996 - } 1997 - } else { 1998 - if len(*t.FinishedAt) > 1000000 { 1999 - return xerrors.Errorf("Value in field t.FinishedAt was too long") 2000 - } 2001 - 2002 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.FinishedAt))); err != nil { 2003 - return err 2004 - } 2005 - if _, err := cw.WriteString(string(*t.FinishedAt)); err != nil { 2006 - return err 2007 - } 2008 - } 1991 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.CreatedAt))); err != nil { 1992 + return err 1993 + } 1994 + if _, err := cw.WriteString(string(t.CreatedAt)); err != nil { 1995 + return err 2009 1996 } 2010 1997 return nil 2011 1998 } ··· 2035 2022 2036 2023 n := extra 2037 2024 2038 - nameBuf := make([]byte, 10) 2025 + nameBuf := make([]byte, 9) 2039 2026 for i := uint64(0); i < n; i++ { 2040 2027 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 2041 2028 if err != nil { ··· 2130 2117 t.ExitCode = (*int64)(&extraI) 2131 2118 } 2132 2119 } 2133 - // t.StartedAt (string) (string) 2134 - case "startedAt": 2120 + // t.Pipeline (string) (string) 2121 + case "pipeline": 2135 2122 2136 2123 { 2137 2124 sval, err := cbg.ReadStringWithMax(cr, 1000000) ··· 2139 2126 return err 2140 2127 } 2141 2128 2142 - t.StartedAt = string(sval) 2129 + t.Pipeline = string(sval) 2143 2130 } 2144 - // t.UpdatedAt (string) (string) 2145 - case "updatedAt": 2131 + // t.Workflow (string) (string) 2132 + case "workflow": 2146 2133 2147 2134 { 2148 2135 sval, err := cbg.ReadStringWithMax(cr, 1000000) ··· 2150 2137 return err 2151 2138 } 2152 2139 2153 - t.UpdatedAt = string(sval) 2140 + t.Workflow = string(sval) 2154 2141 } 2155 - // t.FinishedAt (string) (string) 2156 - case "finishedAt": 2142 + // t.CreatedAt (string) (string) 2143 + case "createdAt": 2157 2144 2158 2145 { 2159 - b, err := cr.ReadByte() 2146 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 2160 2147 if err != nil { 2161 2148 return err 2162 2149 } 2163 - if b != cbg.CborNull[0] { 2164 - if err := cr.UnreadByte(); err != nil { 2165 - return err 2166 - } 2167 2150 2168 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2169 - if err != nil { 2170 - return err 2171 - } 2172 - 2173 - t.FinishedAt = (*string)(&sval) 2174 - } 2151 + t.CreatedAt = string(sval) 2175 2152 } 2176 2153 2177 2154 default:
+6 -8
api/tangled/pipelinestatus.go
··· 18 18 // RECORDTYPE: PipelineStatus 19 19 type PipelineStatus struct { 20 20 LexiconTypeID string `json:"$type,const=sh.tangled.pipeline.status" cborgen:"$type,const=sh.tangled.pipeline.status"` 21 + // createdAt: time of creation of this status update 22 + CreatedAt string `json:"createdAt" cborgen:"createdAt"` 21 23 // error: error message if failed 22 24 Error *string `json:"error,omitempty" cborgen:"error,omitempty"` 23 25 // exitCode: exit code if failed 24 26 ExitCode *int64 `json:"exitCode,omitempty" cborgen:"exitCode,omitempty"` 25 - // finishedAt: pipeline finish time, if finished 26 - FinishedAt *string `json:"finishedAt,omitempty" cborgen:"finishedAt,omitempty"` 27 - // pipeline: pipeline at ref 27 + // pipeline: ATURI of the pipeline 28 28 Pipeline string `json:"pipeline" cborgen:"pipeline"` 29 - // startedAt: pipeline start time 30 - StartedAt string `json:"startedAt" cborgen:"startedAt"` 31 - // status: Pipeline status 29 + // status: status of the workflow 32 30 Status string `json:"status" cborgen:"status"` 33 - // updatedAt: pipeline last updated time 34 - UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` 31 + // workflow: name of the workflow within this pipeline 32 + Workflow string `json:"workflow" cborgen:"workflow"` 35 33 }
+1 -1
api/tangled/tangledpipeline.go
··· 85 85 // Pipeline_Workflow is a "workflow" in the sh.tangled.pipeline schema. 86 86 type Pipeline_Workflow struct { 87 87 Clone *Pipeline_CloneOpts `json:"clone" cborgen:"clone"` 88 - Dependencies []Pipeline_Dependencies_Elem `json:"dependencies" cborgen:"dependencies"` 88 + Dependencies []Pipeline_Dependencies_Elem `json:"dependencies" cborgen:"dependencies"` 89 89 Environment []*Pipeline_Workflow_Environment_Elem `json:"environment" cborgen:"environment"` 90 90 Name string `json:"name" cborgen:"name"` 91 91 Steps []*Pipeline_Step `json:"steps" cborgen:"steps"`
+1
cmd/spindle/main.go
··· 6 6 7 7 "tangled.sh/tangled.sh/core/log" 8 8 "tangled.sh/tangled.sh/core/spindle" 9 + _ "tangled.sh/tangled.sh/core/tid" 9 10 ) 10 11 11 12 func main() {
+13 -18
lexicons/pipeline/status.json
··· 9 9 "key": "tid", 10 10 "record": { 11 11 "type": "object", 12 - "required": ["pipeline", "status", "startedAt", "updatedAt"], 12 + "required": ["pipeline", "workflow", "status", "createdAt"], 13 13 "properties": { 14 14 "pipeline": { 15 15 "type": "string", 16 16 "format": "at-uri", 17 - "description": "pipeline at ref" 17 + "description": "ATURI of the pipeline" 18 + }, 19 + "workflow": { 20 + "type": "string", 21 + "format": "at-uri", 22 + "description": "name of the workflow within this pipeline" 18 23 }, 19 24 "status": { 20 25 "type": "string", 21 - "description": "Pipeline status", 26 + "description": "status of the workflow", 22 27 "enum": [ 23 28 "pending", 24 29 "running", ··· 27 32 "cancelled", 28 33 "success" 29 34 ] 35 + }, 36 + "createdAt": { 37 + "type": "string", 38 + "format": "datetime", 39 + "description": "time of creation of this status update" 30 40 }, 31 41 "error": { 32 42 "type": "string", ··· 35 45 "exitCode": { 36 46 "type": "integer", 37 47 "description": "exit code if failed" 38 - }, 39 - "startedAt": { 40 - "type": "string", 41 - "format": "datetime", 42 - "description": "pipeline start time" 43 - }, 44 - "updatedAt": { 45 - "type": "string", 46 - "format": "datetime", 47 - "description": "pipeline last updated time" 48 - }, 49 - "finishedAt": { 50 - "type": "string", 51 - "format": "datetime", 52 - "description": "pipeline finish time, if finished" 53 48 } 54 49 } 55 50 }
+5 -13
spindle/db/db.go
··· 30 30 did text primary key 31 31 ); 32 32 33 - create table if not exists pipeline_status ( 33 + -- status event for a single workflow 34 + create table if not exists events ( 34 35 rkey text not null, 35 - pipeline text not null, 36 - status text not null, 37 - 38 - -- only set if status is 'failed' 39 - error text, 40 - exit_code integer, 41 - 42 - started_at timestamp not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 43 - updated_at timestamp not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 44 - finished_at timestamp, 45 - 46 - primary key (rkey) 36 + nsid text not null, 37 + event text not null, -- json 38 + created integer not null -- unix nanos 47 39 ); 48 40 `) 49 41 if err != nil {
+148
spindle/db/events.go
··· 1 + package db 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "time" 7 + 8 + "tangled.sh/tangled.sh/core/api/tangled" 9 + "tangled.sh/tangled.sh/core/notifier" 10 + "tangled.sh/tangled.sh/core/spindle/models" 11 + "tangled.sh/tangled.sh/core/tid" 12 + ) 13 + 14 + type Event struct { 15 + Rkey string `json:"rkey"` 16 + Nsid string `json:"nsid"` 17 + Created int64 `json:"created"` 18 + EventJson string `json:"event"` 19 + } 20 + 21 + func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 + _, err := d.Exec( 23 + `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 + event.Rkey, 25 + event.Nsid, 26 + event.EventJson, 27 + time.Now().UnixNano(), 28 + ) 29 + 30 + notifier.NotifyAll() 31 + 32 + return err 33 + } 34 + 35 + func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 + whereClause := "" 37 + args := []any{} 38 + if cursor > 0 { 39 + whereClause = "where created > ?" 40 + args = append(args, cursor) 41 + } 42 + 43 + query := fmt.Sprintf(` 44 + select rkey, nsid, event, created 45 + from events 46 + %s 47 + order by created asc 48 + limit 100 49 + `, whereClause) 50 + 51 + rows, err := d.Query(query, args...) 52 + if err != nil { 53 + return nil, err 54 + } 55 + defer rows.Close() 56 + 57 + var evts []Event 58 + for rows.Next() { 59 + var ev Event 60 + if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 + return nil, err 62 + } 63 + evts = append(evts, ev) 64 + } 65 + 66 + if err := rows.Err(); err != nil { 67 + return nil, err 68 + } 69 + 70 + return evts, nil 71 + } 72 + 73 + func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(s) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + event := Event{ 80 + Rkey: rkey, 81 + Nsid: tangled.PipelineStatusNSID, 82 + Created: time.Now().UnixNano(), 83 + EventJson: string(eventJson), 84 + } 85 + 86 + return d.InsertEvent(event, n) 87 + } 88 + 89 + type StatusKind string 90 + 91 + var ( 92 + StatusKindPending StatusKind = "pending" 93 + StatusKindRunning StatusKind = "running" 94 + StatusKindFailed StatusKind = "failed" 95 + StatusKindTimeout StatusKind = "timeout" 96 + StatusKindCancelled StatusKind = "cancelled" 97 + StatusKindSuccess StatusKind = "success" 98 + ) 99 + 100 + func (d *DB) createStatusEvent( 101 + workflowId models.WorkflowId, 102 + statusKind StatusKind, 103 + workflowError *string, 104 + exitCode *int64, 105 + n *notifier.Notifier, 106 + ) error { 107 + now := time.Now() 108 + pipelineAtUri := workflowId.PipelineId.AtUri() 109 + s := tangled.PipelineStatus{ 110 + CreatedAt: now.Format(time.RFC3339), 111 + Error: workflowError, 112 + ExitCode: exitCode, 113 + Pipeline: string(pipelineAtUri), 114 + Workflow: workflowId.Name, 115 + Status: string(statusKind), 116 + } 117 + 118 + eventJson, err := json.Marshal(s) 119 + if err != nil { 120 + return err 121 + } 122 + 123 + event := Event{ 124 + Rkey: tid.TID(), 125 + Nsid: tangled.PipelineStatusNSID, 126 + Created: now.UnixNano(), 127 + EventJson: string(eventJson), 128 + } 129 + 130 + return d.InsertEvent(event, n) 131 + 132 + } 133 + 134 + func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 135 + return d.createStatusEvent(workflowId, StatusKindPending, nil, nil, n) 136 + } 137 + 138 + func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 139 + return d.createStatusEvent(workflowId, StatusKindRunning, nil, nil, n) 140 + } 141 + 142 + func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 143 + return d.createStatusEvent(workflowId, StatusKindFailed, &workflowError, &exitCode, n) 144 + } 145 + 146 + func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 147 + return d.createStatusEvent(workflowId, StatusKindSuccess, nil, nil, n) 148 + }
+205 -177
spindle/db/pipelines.go
··· 1 1 package db 2 2 3 - import ( 4 - "fmt" 5 - "time" 6 - 7 - "tangled.sh/tangled.sh/core/api/tangled" 8 - "tangled.sh/tangled.sh/core/notifier" 9 - ) 10 - 11 - type PipelineRunStatus string 12 - 13 - var ( 14 - PipelinePending PipelineRunStatus = "pending" 15 - PipelineRunning PipelineRunStatus = "running" 16 - PipelineFailed PipelineRunStatus = "failed" 17 - PipelineTimeout PipelineRunStatus = "timeout" 18 - PipelineCancelled PipelineRunStatus = "cancelled" 19 - PipelineSuccess PipelineRunStatus = "success" 20 - ) 21 - 22 - type PipelineStatus struct { 23 - Rkey string `json:"rkey"` 24 - Pipeline string `json:"pipeline"` 25 - Status PipelineRunStatus `json:"status"` 26 - 27 - // only if Failed 28 - Error string `json:"error"` 29 - ExitCode int `json:"exit_code"` 30 - 31 - StartedAt time.Time `json:"started_at"` 32 - UpdatedAt time.Time `json:"updated_at"` 33 - FinishedAt time.Time `json:"finished_at"` 34 - } 35 - 36 - func (p PipelineStatus) AsRecord() *tangled.PipelineStatus { 37 - exitCode64 := int64(p.ExitCode) 38 - finishedAt := p.FinishedAt.String() 39 - 40 - return &tangled.PipelineStatus{ 41 - LexiconTypeID: tangled.PipelineStatusNSID, 42 - Pipeline: p.Pipeline, 43 - Status: string(p.Status), 44 - 45 - ExitCode: &exitCode64, 46 - Error: &p.Error, 47 - 48 - StartedAt: p.StartedAt.String(), 49 - UpdatedAt: p.UpdatedAt.String(), 50 - FinishedAt: &finishedAt, 51 - } 52 - } 53 - 54 - func pipelineAtUri(rkey, knot string) string { 55 - return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 56 - } 57 - 58 - func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error { 59 - _, err := db.Exec(` 60 - insert into pipeline_status (rkey, status, pipeline) 61 - values (?, ?, ?) 62 - `, rkey, PipelinePending, pipeline) 63 - 64 - if err != nil { 65 - return err 66 - } 67 - n.NotifyAll() 68 - return nil 69 - } 70 - 71 - func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error { 72 - _, err := db.Exec(` 73 - update pipeline_status 74 - set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 75 - where rkey = ? 76 - `, PipelineRunning, rkey) 77 - 78 - if err != nil { 79 - return err 80 - } 81 - n.NotifyAll() 82 - return nil 83 - } 84 - 85 - func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error { 86 - _, err := db.Exec(` 87 - update pipeline_status 88 - set status = ?, 89 - exit_code = ?, 90 - error = ?, 91 - updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 92 - finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 93 - where rkey = ? 94 - `, PipelineFailed, exitCode, errorMsg, rkey) 95 - if err != nil { 96 - return err 97 - } 98 - n.NotifyAll() 99 - return nil 100 - } 101 - 102 - func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error { 103 - _, err := db.Exec(` 104 - update pipeline_status 105 - set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 106 - where rkey = ? 107 - `, PipelineTimeout, rkey) 108 - if err != nil { 109 - return err 110 - } 111 - n.NotifyAll() 112 - return nil 113 - } 114 - 115 - func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error { 116 - _, err := db.Exec(` 117 - update pipeline_status 118 - set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 119 - finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 120 - where rkey = ? 121 - `, PipelineSuccess, rkey) 122 - 123 - if err != nil { 124 - return err 125 - } 126 - n.NotifyAll() 127 - return nil 128 - } 129 - 130 - func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) { 131 - var p PipelineStatus 132 - err := db.QueryRow(` 133 - select rkey, status, error, exit_code, started_at, updated_at, finished_at 134 - from pipelines 135 - where rkey = ? 136 - `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 137 - return p, err 138 - } 139 - 140 - func (db *DB) GetPipelineStatusAsRecords(cursor string) ([]PipelineStatus, error) { 141 - whereClause := "" 142 - args := []any{} 143 - if cursor != "" { 144 - whereClause = "where rkey > ?" 145 - args = append(args, cursor) 146 - } 147 - 148 - query := fmt.Sprintf(` 149 - select rkey, status, error, exit_code, started_at, updated_at, finished_at 150 - from pipeline_status 151 - %s 152 - order by rkey asc 153 - limit 100 154 - `, whereClause) 155 - 156 - rows, err := db.Query(query, args...) 157 - if err != nil { 158 - return nil, err 159 - } 160 - defer rows.Close() 161 - 162 - var pipelines []PipelineStatus 163 - for rows.Next() { 164 - var p PipelineStatus 165 - rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 166 - pipelines = append(pipelines, p) 167 - } 168 - 169 - if err := rows.Err(); err != nil { 170 - return nil, err 171 - } 172 - 173 - records := []*tangled.PipelineStatus{} 174 - for _, p := range pipelines { 175 - records = append(records, p.AsRecord()) 176 - } 177 - 178 - return pipelines, nil 179 - } 3 + // 4 + // import ( 5 + // "database/sql" 6 + // "fmt" 7 + // "time" 8 + // 9 + // "tangled.sh/tangled.sh/core/api/tangled" 10 + // "tangled.sh/tangled.sh/core/notifier" 11 + // ) 12 + // 13 + // type PipelineRunStatus string 14 + // 15 + // var ( 16 + // PipelinePending PipelineRunStatus = "pending" 17 + // PipelineRunning PipelineRunStatus = "running" 18 + // PipelineFailed PipelineRunStatus = "failed" 19 + // PipelineTimeout PipelineRunStatus = "timeout" 20 + // PipelineCancelled PipelineRunStatus = "cancelled" 21 + // PipelineSuccess PipelineRunStatus = "success" 22 + // ) 23 + // 24 + // type PipelineStatus struct { 25 + // Rkey string `json:"rkey"` 26 + // Pipeline string `json:"pipeline"` 27 + // Status PipelineRunStatus `json:"status"` 28 + // 29 + // // only if Failed 30 + // Error string `json:"error"` 31 + // ExitCode int `json:"exit_code"` 32 + // 33 + // LastUpdate int64 `json:"last_update"` 34 + // StartedAt time.Time `json:"started_at"` 35 + // UpdatedAt time.Time `json:"updated_at"` 36 + // FinishedAt time.Time `json:"finished_at"` 37 + // } 38 + // 39 + // func (p PipelineStatus) AsRecord() *tangled.PipelineStatus { 40 + // exitCode64 := int64(p.ExitCode) 41 + // finishedAt := p.FinishedAt.String() 42 + // 43 + // return &tangled.PipelineStatus{ 44 + // LexiconTypeID: tangled.PipelineStatusNSID, 45 + // Pipeline: p.Pipeline, 46 + // Status: string(p.Status), 47 + // 48 + // ExitCode: &exitCode64, 49 + // Error: &p.Error, 50 + // 51 + // StartedAt: p.StartedAt.String(), 52 + // UpdatedAt: p.UpdatedAt.String(), 53 + // FinishedAt: &finishedAt, 54 + // } 55 + // } 56 + // 57 + // func pipelineAtUri(rkey, knot string) string { 58 + // return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 59 + // } 60 + // 61 + // func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error { 62 + // _, err := db.Exec(` 63 + // insert into pipeline_status (rkey, status, pipeline, last_update) 64 + // values (?, ?, ?, ?) 65 + // `, rkey, PipelinePending, pipeline, time.Now().UnixNano()) 66 + // 67 + // if err != nil { 68 + // return err 69 + // } 70 + // n.NotifyAll() 71 + // return nil 72 + // } 73 + // 74 + // func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error { 75 + // _, err := db.Exec(` 76 + // update pipeline_status 77 + // set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), last_update = ? 78 + // where rkey = ? 79 + // `, PipelineRunning, rkey, time.Now().UnixNano()) 80 + // 81 + // if err != nil { 82 + // return err 83 + // } 84 + // n.NotifyAll() 85 + // return nil 86 + // } 87 + // 88 + // func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error { 89 + // _, err := db.Exec(` 90 + // update pipeline_status 91 + // set status = ?, 92 + // exit_code = ?, 93 + // error = ?, 94 + // updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 95 + // finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 96 + // last_update = ? 97 + // where rkey = ? 98 + // `, PipelineFailed, exitCode, errorMsg, rkey, time.Now().UnixNano()) 99 + // if err != nil { 100 + // return err 101 + // } 102 + // n.NotifyAll() 103 + // return nil 104 + // } 105 + // 106 + // func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error { 107 + // _, err := db.Exec(` 108 + // update pipeline_status 109 + // set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 110 + // where rkey = ? 111 + // `, PipelineTimeout, rkey) 112 + // if err != nil { 113 + // return err 114 + // } 115 + // n.NotifyAll() 116 + // return nil 117 + // } 118 + // 119 + // func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error { 120 + // _, err := db.Exec(` 121 + // update pipeline_status 122 + // set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 123 + // finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 124 + // where rkey = ? 125 + // `, PipelineSuccess, rkey) 126 + // 127 + // if err != nil { 128 + // return err 129 + // } 130 + // n.NotifyAll() 131 + // return nil 132 + // } 133 + // 134 + // func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) { 135 + // var p PipelineStatus 136 + // err := db.QueryRow(` 137 + // select rkey, status, error, exit_code, started_at, updated_at, finished_at 138 + // from pipelines 139 + // where rkey = ? 140 + // `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 141 + // return p, err 142 + // } 143 + // 144 + // func (db *DB) GetPipelineStatusAsRecords(cursor int64) ([]PipelineStatus, error) { 145 + // whereClause := "" 146 + // args := []any{} 147 + // if cursor != 0 { 148 + // whereClause = "where created_at > ?" 149 + // args = append(args, cursor) 150 + // } 151 + // 152 + // query := fmt.Sprintf(` 153 + // select rkey, status, error, exit_code, created_at, started_at, updated_at, finished_at 154 + // from pipeline_status 155 + // %s 156 + // order by created_at asc 157 + // limit 100 158 + // `, whereClause) 159 + // 160 + // rows, err := db.Query(query, args...) 161 + // if err != nil { 162 + // return nil, err 163 + // } 164 + // defer rows.Close() 165 + // 166 + // var pipelines []PipelineStatus 167 + // for rows.Next() { 168 + // var p PipelineStatus 169 + // var pipelineError sql.NullString 170 + // var exitCode sql.NullInt64 171 + // var startedAt, updatedAt string 172 + // var finishedAt sql.NullTime 173 + // 174 + // err := rows.Scan(&p.Rkey, &p.Status, &pipelineError, &exitCode, &p.LastUpdate, &startedAt, &updatedAt, &finishedAt) 175 + // if err != nil { 176 + // return nil, err 177 + // } 178 + // 179 + // if pipelineError.Valid { 180 + // p.Error = pipelineError.String 181 + // } 182 + // 183 + // if exitCode.Valid { 184 + // p.ExitCode = int(exitCode.Int64) 185 + // } 186 + // 187 + // if v, err := time.Parse(time.RFC3339, startedAt); err == nil { 188 + // p.StartedAt = v 189 + // } 190 + // 191 + // if v, err := time.Parse(time.RFC3339, updatedAt); err == nil { 192 + // p.UpdatedAt = v 193 + // } 194 + // 195 + // if finishedAt.Valid { 196 + // p.FinishedAt = finishedAt.Time 197 + // } 198 + // 199 + // pipelines = append(pipelines, p) 200 + // } 201 + // 202 + // if err := rows.Err(); err != nil { 203 + // return nil, err 204 + // } 205 + // 206 + // return pipelines, nil 207 + // }
+83 -73
spindle/engine/engine.go
··· 10 10 "path" 11 11 "strings" 12 12 "sync" 13 - "syscall" 14 13 15 14 "github.com/docker/docker/api/types/container" 16 15 "github.com/docker/docker/api/types/image" ··· 19 18 "github.com/docker/docker/api/types/volume" 20 19 "github.com/docker/docker/client" 21 20 "github.com/docker/docker/pkg/stdcopy" 22 - "golang.org/x/sync/errgroup" 23 21 "tangled.sh/tangled.sh/core/api/tangled" 24 22 "tangled.sh/tangled.sh/core/log" 25 23 "tangled.sh/tangled.sh/core/notifier" 26 24 "tangled.sh/tangled.sh/core/spindle/db" 25 + "tangled.sh/tangled.sh/core/spindle/models" 27 26 ) 28 27 29 28 const ( ··· 69 68 return e, nil 70 69 } 71 70 72 - func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 73 - e.l.Info("starting all workflows in parallel", "pipeline", id) 71 + func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { 72 + e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 74 73 75 - err := e.db.MarkPipelineRunning(id, e.n) 76 - if err != nil { 77 - return err 78 - } 74 + wg := sync.WaitGroup{} 75 + for _, w := range pipeline.Workflows { 76 + wg.Add(1) 77 + go func() error { 78 + defer wg.Done() 79 + wid := models.WorkflowId{ 80 + PipelineId: pipelineId, 81 + Name: w.Name, 82 + } 79 83 80 - g := errgroup.Group{} 81 - for _, w := range pipeline.Workflows { 82 - g.Go(func() error { 83 - err := e.SetupWorkflow(ctx, id, w.Name) 84 + err := e.db.StatusRunning(wid, e.n) 84 85 if err != nil { 85 86 return err 86 87 } 87 88 88 - defer e.DestroyWorkflow(ctx, id, w.Name) 89 + err = e.SetupWorkflow(ctx, wid) 90 + if err != nil { 91 + e.l.Error("setting up worklow", "wid", wid, "err", err) 92 + return err 93 + } 94 + defer e.DestroyWorkflow(ctx, wid) 89 95 90 96 // TODO: actual checks for image/registry etc. 91 97 var deps string ··· 101 107 cimg := path.Join("nixery.dev", deps) 102 108 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 103 109 if err != nil { 104 - e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 105 - err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 110 + e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 111 + 112 + err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 106 113 if err != nil { 107 114 return err 108 115 } 116 + 109 117 return fmt.Errorf("pulling image: %w", err) 110 118 } 111 119 defer reader.Close() 112 120 io.Copy(os.Stdout, reader) 113 121 114 - err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg) 122 + err = e.StartSteps(ctx, w.Steps, wid, cimg) 115 123 if err != nil { 116 - e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 117 - return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 124 + e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 125 + 126 + err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 127 + if err != nil { 128 + return err 129 + } 130 + } 131 + 132 + err = e.db.StatusSuccess(wid, e.n) 133 + if err != nil { 134 + return err 118 135 } 119 136 120 137 return nil 121 - }) 138 + }() 122 139 } 123 140 124 - err = g.Wait() 125 - if err != nil { 126 - e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 127 - return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 128 - } 129 - 130 - e.l.Info("pipeline success!", "id", id) 131 - return e.db.MarkPipelineSuccess(id, e.n) 141 + wg.Wait() 132 142 } 133 143 134 144 // SetupWorkflow sets up a new network for the workflow and volumes for 135 145 // the workspace and Nix store. These are persisted across steps and are 136 146 // destroyed at the end of the workflow. 137 - func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error { 138 - e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName) 147 + func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 148 + e.l.Info("setting up workflow", "workflow", wid) 139 149 140 150 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 141 - Name: workspaceVolume(id, workflowName), 151 + Name: workspaceVolume(wid), 142 152 Driver: "local", 143 153 }) 144 154 if err != nil { 145 155 return err 146 156 } 147 - e.registerCleanup(id, workflowName, func(ctx context.Context) error { 148 - return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true) 157 + e.registerCleanup(wid, func(ctx context.Context) error { 158 + return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 149 159 }) 150 160 151 161 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 152 - Name: nixVolume(id, workflowName), 162 + Name: nixVolume(wid), 153 163 Driver: "local", 154 164 }) 155 165 if err != nil { 156 166 return err 157 167 } 158 - e.registerCleanup(id, workflowName, func(ctx context.Context) error { 159 - return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true) 168 + e.registerCleanup(wid, func(ctx context.Context) error { 169 + return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 160 170 }) 161 171 162 - _, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{ 172 + _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 163 173 Driver: "bridge", 164 174 }) 165 175 if err != nil { 166 176 return err 167 177 } 168 - e.registerCleanup(id, workflowName, func(ctx context.Context) error { 169 - return e.docker.NetworkRemove(ctx, networkName(id, workflowName)) 178 + e.registerCleanup(wid, func(ctx context.Context) error { 179 + return e.docker.NetworkRemove(ctx, networkName(wid)) 170 180 }) 171 181 172 182 return nil ··· 175 185 // StartSteps starts all steps sequentially with the same base image. 176 186 // ONLY marks pipeline as failed if container's exit code is non-zero. 177 187 // All other errors are bubbled up. 178 - func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error { 188 + func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { 179 189 // set up logging channels 180 190 e.chanMu.Lock() 181 - if _, exists := e.stdoutChans[id]; !exists { 182 - e.stdoutChans[id] = make(chan string, 100) 191 + if _, exists := e.stdoutChans[wid.String()]; !exists { 192 + e.stdoutChans[wid.String()] = make(chan string, 100) 183 193 } 184 - if _, exists := e.stderrChans[id]; !exists { 185 - e.stderrChans[id] = make(chan string, 100) 194 + if _, exists := e.stderrChans[wid.String()]; !exists { 195 + e.stderrChans[wid.String()] = make(chan string, 100) 186 196 } 187 197 e.chanMu.Unlock() 188 198 189 199 // close channels after all steps are complete 190 200 defer func() { 191 - close(e.stdoutChans[id]) 192 - close(e.stderrChans[id]) 201 + close(e.stdoutChans[wid.String()]) 202 + close(e.stderrChans[wid.String()]) 193 203 }() 194 204 195 205 for _, step := range steps { 196 - hostConfig := hostConfig(id, workflowName) 206 + hostConfig := hostConfig(wid) 197 207 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 198 208 Image: image, 199 209 Cmd: []string{"bash", "-c", step.Command}, ··· 206 216 return fmt.Errorf("creating container: %w", err) 207 217 } 208 218 209 - err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil) 219 + err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 210 220 if err != nil { 211 221 return fmt.Errorf("connecting network: %w", err) 212 222 } ··· 222 232 wg.Add(1) 223 233 go func() { 224 234 defer wg.Done() 225 - err := e.TailStep(ctx, resp.ID, id) 235 + err := e.TailStep(ctx, resp.ID, wid) 226 236 if err != nil { 227 237 e.l.Error("failed to tail container", "container", resp.ID) 228 238 return ··· 237 247 return err 238 248 } 239 249 240 - err = e.DestroyStep(ctx, resp.ID, id) 250 + err = e.DestroyStep(ctx, resp.ID) 241 251 if err != nil { 242 252 return err 243 253 } 244 254 245 255 if state.ExitCode != 0 { 246 - e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 247 - return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 256 + e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) 257 + // return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 248 258 } 249 259 } 250 260 ··· 272 282 return info.State, nil 273 283 } 274 284 275 - func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error { 285 + func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { 276 286 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 277 287 Follow: true, 278 288 ShowStdout: true, ··· 308 318 // once all steps are done. 309 319 go func() { 310 320 e.chanMu.RLock() 311 - stdoutCh := e.stdoutChans[pipelineID] 321 + stdoutCh := e.stdoutChans[wid.String()] 312 322 e.chanMu.RUnlock() 313 323 314 324 scanner := bufio.NewScanner(rpipeOut) ··· 325 335 // once all steps are done. 326 336 go func() { 327 337 e.chanMu.RLock() 328 - stderrCh := e.stderrChans[pipelineID] 338 + stderrCh := e.stderrChans[wid.String()] 329 339 e.chanMu.RUnlock() 330 340 331 341 scanner := bufio.NewScanner(rpipeErr) ··· 340 350 return nil 341 351 } 342 352 343 - func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error { 344 - err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String()) 353 + func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 354 + err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 345 355 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 346 356 return err 347 357 } ··· 357 367 return nil 358 368 } 359 369 360 - func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error { 370 + func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 361 371 e.cleanupMu.Lock() 362 - key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 372 + key := wid.String() 363 373 364 374 fns := e.cleanup[key] 365 375 delete(e.cleanup, key) ··· 367 377 368 378 for _, fn := range fns { 369 379 if err := fn(ctx); err != nil { 370 - e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err) 380 + e.l.Error("failed to cleanup workflow resource", "workflowId", wid) 371 381 } 372 382 } 373 383 return nil 374 384 } 375 385 376 - func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { 386 + func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) { 377 387 e.chanMu.RLock() 378 388 defer e.chanMu.RUnlock() 379 389 380 - stdoutCh, ok1 := e.stdoutChans[pipelineID] 381 - stderrCh, ok2 := e.stderrChans[pipelineID] 390 + stdoutCh, ok1 := e.stdoutChans[wid.String()] 391 + stderrCh, ok2 := e.stderrChans[wid.String()] 382 392 383 393 if !ok1 || !ok2 { 384 394 return nil, nil, false ··· 386 396 return stdoutCh, stderrCh, true 387 397 } 388 398 389 - func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) { 399 + func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 390 400 e.cleanupMu.Lock() 391 401 defer e.cleanupMu.Unlock() 392 402 393 - key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 403 + key := wid.String() 394 404 e.cleanup[key] = append(e.cleanup[key], fn) 395 405 } 396 406 397 - func workspaceVolume(id, name string) string { 398 - return fmt.Sprintf("workspace-%s-%s", id, name) 407 + func workspaceVolume(wid models.WorkflowId) string { 408 + return fmt.Sprintf("workspace-%s", wid) 399 409 } 400 410 401 - func nixVolume(id, name string) string { 402 - return fmt.Sprintf("nix-%s-%s", id, name) 411 + func nixVolume(wid models.WorkflowId) string { 412 + return fmt.Sprintf("nix-%s", wid) 403 413 } 404 414 405 - func networkName(id, name string) string { 406 - return fmt.Sprintf("workflow-network-%s-%s", id, name) 415 + func networkName(wid models.WorkflowId) string { 416 + return fmt.Sprintf("workflow-network-%s", wid) 407 417 } 408 418 409 - func hostConfig(id, name string) *container.HostConfig { 419 + func hostConfig(wid models.WorkflowId) *container.HostConfig { 410 420 hostConfig := &container.HostConfig{ 411 421 Mounts: []mount.Mount{ 412 422 { 413 423 Type: mount.TypeVolume, 414 - Source: workspaceVolume(id, name), 424 + Source: workspaceVolume(wid), 415 425 Target: workspaceDir, 416 426 }, 417 427 { 418 428 Type: mount.TypeVolume, 419 - Source: nixVolume(id, name), 429 + Source: nixVolume(wid), 420 430 Target: "/nix", 421 431 }, 422 432 },
+37
spindle/models/models.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + "regexp" 6 + 7 + "tangled.sh/tangled.sh/core/api/tangled" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + ) 11 + 12 + var ( 13 + re = regexp.MustCompile(`[^a-zA-Z0-9_.-]`) 14 + ) 15 + 16 + type PipelineId struct { 17 + Knot string 18 + Rkey string 19 + } 20 + 21 + func (p *PipelineId) AtUri() syntax.ATURI { 22 + return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", p.Knot, tangled.PipelineNSID, p.Rkey)) 23 + } 24 + 25 + type WorkflowId struct { 26 + PipelineId 27 + Name string 28 + } 29 + 30 + func (wid WorkflowId) String() string { 31 + return fmt.Sprintf("%s-%s-%s", normalize(wid.Knot), wid.Rkey, normalize(wid.Name)) 32 + } 33 + 34 + func normalize(name string) string { 35 + normalized := re.ReplaceAllString(name, "-") 36 + return normalized 37 + }
+29 -11
spindle/queue/queue.go
··· 1 1 package queue 2 2 3 + import ( 4 + "sync" 5 + ) 6 + 3 7 type Job struct { 4 8 Run func() error 5 9 OnFail func(error) 6 10 } 7 11 8 12 type Queue struct { 9 - jobs chan Job 13 + jobs chan Job 14 + workers int 15 + wg sync.WaitGroup 10 16 } 11 17 12 - func NewQueue(size int) *Queue { 18 + func NewQueue(queueSize, numWorkers int) *Queue { 13 19 return &Queue{ 14 - jobs: make(chan Job, size), 20 + jobs: make(chan Job, queueSize), 21 + workers: numWorkers, 15 22 } 16 23 } 17 24 ··· 24 31 } 25 32 } 26 33 27 - func (q *Queue) StartRunner() { 28 - go func() { 29 - for job := range q.jobs { 30 - if err := job.Run(); err != nil { 31 - if job.OnFail != nil { 32 - job.OnFail(err) 33 - } 34 + func (q *Queue) Start() { 35 + for range q.workers { 36 + q.wg.Add(1) 37 + go q.worker() 38 + } 39 + } 40 + 41 + func (q *Queue) worker() { 42 + defer q.wg.Done() 43 + for job := range q.jobs { 44 + if err := job.Run(); err != nil { 45 + if job.OnFail != nil { 46 + job.OnFail(err) 34 47 } 35 48 } 36 - }() 49 + } 50 + } 51 + 52 + func (q *Queue) Stop() { 53 + close(q.jobs) 54 + q.wg.Wait() 37 55 }
+23 -13
spindle/server.go
··· 18 18 "tangled.sh/tangled.sh/core/spindle/config" 19 19 "tangled.sh/tangled.sh/core/spindle/db" 20 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 + "tangled.sh/tangled.sh/core/spindle/models" 21 22 "tangled.sh/tangled.sh/core/spindle/queue" 22 23 ) 23 24 ··· 61 62 return err 62 63 } 63 64 64 - jq := queue.NewQueue(100) 65 + jq := queue.NewQueue(100, 2) 65 66 66 67 // starts a job queue runner in the background 67 - jq.StartRunner() 68 + jq.Start() 69 + defer jq.Stop() 68 70 69 71 spindle := Spindle{ 70 72 jc: jc, ··· 109 111 mux := chi.NewRouter() 110 112 111 113 mux.HandleFunc("/events", s.Events) 112 - mux.HandleFunc("/logs/{pipelineID}", s.Logs) 114 + mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 113 115 return mux 114 116 } 115 117 ··· 122 124 return err 123 125 } 124 126 125 - ok := s.jq.Enqueue(queue.Job{ 126 - Run: func() error { 127 - // this is a "fake" at uri for now 128 - pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 129 - 130 - rkey := TID() 127 + pipelineId := models.PipelineId{ 128 + Knot: src.Knot, 129 + Rkey: msg.Rkey, 130 + } 131 131 132 - err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n) 132 + for _, w := range pipeline.Workflows { 133 + if w != nil { 134 + err := s.db.StatusPending(models.WorkflowId{ 135 + PipelineId: pipelineId, 136 + Name: w.Name, 137 + }, s.n) 133 138 if err != nil { 134 139 return err 135 140 } 141 + } 142 + } 136 143 137 - return s.eng.StartWorkflows(ctx, &pipeline, rkey) 144 + ok := s.jq.Enqueue(queue.Job{ 145 + Run: func() error { 146 + s.eng.StartWorkflows(ctx, &pipeline, pipelineId) 147 + return nil 138 148 }, 139 - OnFail: func(error) { 140 - s.l.Error("pipeline run failed", "error", err) 149 + OnFail: func(jobError error) { 150 + s.l.Error("pipeline run failed", "error", jobError) 141 151 }, 142 152 }) 143 153 if ok {
+54 -23
spindle/stream.go
··· 1 1 package spindle 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "net/http" 7 + "strconv" 6 8 "time" 7 9 8 - "context" 10 + "tangled.sh/tangled.sh/core/spindle/models" 9 11 10 12 "github.com/go-chi/chi/v5" 11 13 "github.com/gorilla/websocket" ··· 18 20 19 21 func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 20 22 l := s.l.With("handler", "Events") 21 - l.Info("received new connection") 23 + l.Debug("received new connection") 22 24 23 25 conn, err := upgrader.Upgrade(w, r, nil) 24 26 if err != nil { ··· 27 29 return 28 30 } 29 31 defer conn.Close() 30 - l.Info("upgraded http to wss") 32 + l.Debug("upgraded http to wss") 31 33 32 34 ch := s.n.Subscribe() 33 35 defer s.n.Unsubscribe(ch) ··· 44 46 } 45 47 }() 46 48 47 - cursor := "" 49 + defaultCursor := time.Now().UnixNano() 50 + cursorStr := r.URL.Query().Get("cursor") 51 + cursor, err := strconv.ParseInt(cursorStr, 10, 64) 52 + if err != nil { 53 + l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 54 + } 55 + if cursor == 0 { 56 + cursor = defaultCursor 57 + } 48 58 49 59 // complete backfill first before going to live data 50 - l.Info("going through backfill", "cursor", cursor) 60 + l.Debug("going through backfill", "cursor", cursor) 51 61 if err := s.streamPipelines(conn, &cursor); err != nil { 52 62 l.Error("failed to backfill", "err", err) 53 63 return ··· 57 67 // wait for new data or timeout 58 68 select { 59 69 case <-ctx.Done(): 60 - l.Info("stopping stream: client closed connection") 70 + l.Debug("stopping stream: client closed connection") 61 71 return 62 72 case <-ch: 63 73 // we have been notified of new data 64 - l.Info("going through live data", "cursor", cursor) 74 + l.Debug("going through live data", "cursor", cursor) 65 75 if err := s.streamPipelines(conn, &cursor); err != nil { 66 76 l.Error("failed to stream", "err", err) 67 77 return 68 78 } 69 79 case <-time.After(30 * time.Second): 70 80 // send a keep-alive 71 - l.Info("sent keepalive") 81 + l.Debug("sent keepalive") 72 82 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 73 83 l.Error("failed to write control", "err", err) 74 84 } ··· 79 89 func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 80 90 l := s.l.With("handler", "Logs") 81 91 82 - pipelineID := chi.URLParam(r, "pipelineID") 83 - if pipelineID == "" { 84 - http.Error(w, "pipelineID required", http.StatusBadRequest) 92 + knot := chi.URLParam(r, "knot") 93 + if knot == "" { 94 + http.Error(w, "knot required", http.StatusBadRequest) 95 + return 96 + } 97 + 98 + rkey := chi.URLParam(r, "rkey") 99 + if rkey == "" { 100 + http.Error(w, "rkey required", http.StatusBadRequest) 101 + return 102 + } 103 + 104 + name := chi.URLParam(r, "name") 105 + if name == "" { 106 + http.Error(w, "name required", http.StatusBadRequest) 85 107 return 86 108 } 87 - l = l.With("pipelineID", pipelineID) 109 + 110 + wid := models.WorkflowId{ 111 + PipelineId: models.PipelineId{ 112 + Knot: knot, 113 + Rkey: rkey, 114 + }, 115 + Name: name, 116 + } 117 + 118 + l = l.With("knot", knot, "rkey", rkey, "name", name) 88 119 89 120 conn, err := upgrader.Upgrade(w, r, nil) 90 121 if err != nil { ··· 93 124 return 94 125 } 95 126 defer conn.Close() 96 - l.Info("upgraded http to wss") 127 + l.Debug("upgraded http to wss") 97 128 98 129 ctx, cancel := context.WithCancel(r.Context()) 99 130 defer cancel() ··· 101 132 go func() { 102 133 for { 103 134 if _, _, err := conn.NextReader(); err != nil { 104 - l.Info("client disconnected", "err", err) 135 + l.Debug("client disconnected", "err", err) 105 136 cancel() 106 137 return 107 138 } 108 139 } 109 140 }() 110 141 111 - if err := s.streamLogs(ctx, conn, pipelineID); err != nil { 142 + if err := s.streamLogs(ctx, conn, wid); err != nil { 112 143 l.Error("streamLogs failed", "err", err) 113 144 } 114 - l.Info("logs connection closed") 145 + l.Debug("logs connection closed") 115 146 } 116 147 117 - func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, pipelineID string) error { 118 - l := s.l.With("pipelineID", pipelineID) 148 + func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 149 + l := s.l.With("workflow_id", wid.String()) 119 150 120 - stdoutCh, stderrCh, ok := s.eng.LogChannels(pipelineID) 151 + stdoutCh, stderrCh, ok := s.eng.LogChannels(wid) 121 152 if !ok { 122 - return fmt.Errorf("pipelineID %q not found", pipelineID) 153 + return fmt.Errorf("workflow_id %q not found", wid.String()) 123 154 } 124 155 125 156 done := make(chan struct{}) ··· 174 205 return nil 175 206 } 176 207 177 - func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error { 178 - ops, err := s.db.GetPipelineStatusAsRecords(*cursor) 208 + func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 209 + ops, err := s.db.GetEvents(*cursor) 179 210 if err != nil { 180 211 s.l.Debug("err", "err", err) 181 212 return err ··· 187 218 s.l.Debug("err", "err", err) 188 219 return err 189 220 } 190 - *cursor = op.Rkey 221 + *cursor = op.Created 191 222 } 192 223 193 224 return nil
+1 -1
spindle/tid.go tid/tid.go
··· 1 - package spindle 1 + package tid 2 2 3 3 import "github.com/bluesky-social/indigo/atproto/syntax" 4 4