Monorepo for Tangled tangled.org

spindle/{db,engine}: rework db to use rkey

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi d643cad8 55bf8204

verified
Changed files
+86 -63
spindle
+1
spindle/config/config.go
··· 11 11 DBPath string `env:"DB_PATH, default=spindle.db"` 12 12 Hostname string `env:"HOSTNAME, required"` 13 13 JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 14 + Dev bool `env:"DEV, default=false"` 14 15 } 15 16 16 17 type Config struct {
+4 -3
spindle/db/db.go
··· 30 30 did text primary key 31 31 ); 32 32 33 - create table if not exists pipelines ( 34 - at_uri text not null, 33 + create table if not exists pipeline_status ( 34 + rkey text not null, 35 + pipeline text not null, 35 36 status text not null, 36 37 37 38 -- only set if status is 'failed' ··· 42 43 updated_at timestamp not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 43 44 finished_at timestamp, 44 45 45 - primary key (at_uri) 46 + primary key (rkey) 46 47 ); 47 48 `) 48 49 if err != nil {
+48 -42
spindle/db/pipelines.go
··· 8 8 "tangled.sh/tangled.sh/core/knotserver/notifier" 9 9 ) 10 10 11 - type PipelineStatus string 11 + type PipelineRunStatus string 12 12 13 13 var ( 14 - PipelinePending PipelineStatus = "pending" 15 - PipelineRunning PipelineStatus = "running" 16 - PipelineFailed PipelineStatus = "failed" 17 - PipelineTimeout PipelineStatus = "timeout" 18 - PipelineCancelled PipelineStatus = "cancelled" 19 - PipelineSuccess PipelineStatus = "success" 14 + PipelinePending PipelineRunStatus = "pending" 15 + PipelineRunning PipelineRunStatus = "running" 16 + PipelineFailed PipelineRunStatus = "failed" 17 + PipelineTimeout PipelineRunStatus = "timeout" 18 + PipelineCancelled PipelineRunStatus = "cancelled" 19 + PipelineSuccess PipelineRunStatus = "success" 20 20 ) 21 21 22 - type Pipeline struct { 23 - Rkey string `json:"rkey"` 24 - Knot string `json:"knot"` 25 - Status PipelineStatus `json:"status"` 22 + type PipelineStatus struct { 23 + Rkey string `json:"rkey"` 24 + Pipeline string `json:"pipeline"` 25 + Status PipelineRunStatus `json:"status"` 26 26 27 27 // only if Failed 28 28 Error string `json:"error"` ··· 33 33 FinishedAt time.Time `json:"finished_at"` 34 34 } 35 35 36 - func (p Pipeline) AsRecord() *tangled.PipelineStatus { 36 + func (p PipelineStatus) AsRecord() *tangled.PipelineStatus { 37 37 exitCode64 := int64(p.ExitCode) 38 38 finishedAt := p.FinishedAt.String() 39 39 40 40 return &tangled.PipelineStatus{ 41 - Pipeline: fmt.Sprintf("at://%s/%s", p.Knot, p.Rkey), 42 - Status: string(p.Status), 41 + LexiconTypeID: tangled.PipelineStatusNSID, 42 + Pipeline: p.Pipeline, 43 + Status: string(p.Status), 43 44 44 45 ExitCode: &exitCode64, 45 46 Error: &p.Error, ··· 54 55 return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 55 56 } 56 57 57 - func (db *DB) CreatePipeline(rkey, knot string, n *notifier.Notifier) error { 58 + func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error { 58 59 _, err := db.Exec(` 59 - insert into pipelines (at_uri, status) 60 - values (?, ?) 61 - `, pipelineAtUri(rkey, knot), PipelinePending) 60 + insert into pipeline_status (rkey, status, pipeline) 61 + values (?, ?, ?) 62 + `, rkey, PipelinePending, pipeline) 62 63 63 64 if err != nil { 64 65 return err ··· 67 68 return nil 68 69 } 69 70 70 - func (db *DB) MarkPipelineRunning(rkey, knot string, n *notifier.Notifier) error { 71 + func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error { 71 72 _, err := db.Exec(` 72 - update pipelines 73 + update pipeline_status 73 74 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 74 - where at_uri = ? 75 - `, PipelineRunning, pipelineAtUri(rkey, knot)) 75 + where rkey = ? 76 + `, PipelineRunning, rkey) 76 77 77 78 if err != nil { 78 79 return err ··· 81 82 return nil 82 83 } 83 84 84 - func (db *DB) MarkPipelineFailed(rkey, knot string, exitCode int, errorMsg string, n *notifier.Notifier) error { 85 + func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error { 85 86 _, err := db.Exec(` 86 - update pipelines 87 + update pipeline_status 87 88 set status = ?, 88 89 exit_code = ?, 89 90 error = ?, 90 91 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 91 92 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 92 - where at_uri = ? 93 - `, PipelineFailed, exitCode, errorMsg, pipelineAtUri(rkey, knot)) 93 + where rkey = ? 94 + `, PipelineFailed, exitCode, errorMsg, rkey) 94 95 if err != nil { 95 96 return err 96 97 } ··· 98 99 return nil 99 100 } 100 101 101 - func (db *DB) MarkPipelineTimeout(rkey, knot string, n *notifier.Notifier) error { 102 + func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error { 102 103 _, err := db.Exec(` 103 - update pipelines 104 + update pipeline_status 104 105 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 105 - where at_uri = ? 106 - `, PipelineTimeout, pipelineAtUri(rkey, knot)) 106 + where rkey = ? 107 + `, PipelineTimeout, rkey) 107 108 if err != nil { 108 109 return err 109 110 } ··· 111 112 return nil 112 113 } 113 114 114 - func (db *DB) MarkPipelineSuccess(rkey, knot string, n *notifier.Notifier) error { 115 + func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error { 115 116 _, err := db.Exec(` 116 - update pipelines 117 + update pipeline_status 117 118 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 118 119 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 119 - where at_uri = ? 120 - `, PipelineSuccess, pipelineAtUri(rkey, knot)) 120 + where rkey = ? 121 + `, PipelineSuccess, rkey) 121 122 122 123 if err != nil { 123 124 return err ··· 126 127 return nil 127 128 } 128 129 129 - func (db *DB) GetPipeline(rkey, knot string) (Pipeline, error) { 130 - var p Pipeline 130 + func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) { 131 + var p PipelineStatus 131 132 err := db.QueryRow(` 132 133 select rkey, status, error, exit_code, started_at, updated_at, finished_at 133 134 from pipelines 134 - where at_uri = ? 135 - `, pipelineAtUri(rkey, knot)).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 135 + where rkey = ? 136 + `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 136 137 return p, err 137 138 } 138 139 139 - func (db *DB) GetPipelines(cursor string) ([]Pipeline, error) { 140 + func (db *DB) GetPipelineStatusAsRecords(cursor string) ([]PipelineStatus, error) { 140 141 whereClause := "" 141 142 args := []any{} 142 143 if cursor != "" { ··· 146 147 147 148 query := fmt.Sprintf(` 148 149 select rkey, status, error, exit_code, started_at, updated_at, finished_at 149 - from pipelines 150 + from pipeline_status 150 151 %s 151 152 order by rkey asc 152 153 limit 100 ··· 158 159 } 159 160 defer rows.Close() 160 161 161 - var pipelines []Pipeline 162 + var pipelines []PipelineStatus 162 163 for rows.Next() { 163 - var p Pipeline 164 + var p PipelineStatus 164 165 rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 165 166 pipelines = append(pipelines, p) 166 167 } 167 168 168 169 if err := rows.Err(); err != nil { 169 170 return nil, err 171 + } 172 + 173 + records := []*tangled.PipelineStatus{} 174 + for _, p := range pipelines { 175 + records = append(records, p.AsRecord()) 170 176 } 171 177 172 178 return pipelines, nil
+2 -2
spindle/engine/engine.go
··· 47 47 48 48 // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. 49 49 // in the future. In here also goes other setup steps. 50 - func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 50 + func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error { 51 51 e.l.Info("setting up pipeline", "pipeline", id) 52 52 53 53 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ ··· 73 73 return err 74 74 } 75 75 76 - err = e.db.CreatePipeline(id, e.n) 76 + err = e.db.CreatePipeline(id, atUri, e.n) 77 77 return err 78 78 } 79 79
+19 -14
spindle/server.go
··· 70 70 go func() { 71 71 logger.Info("starting event consumer") 72 72 knotEventSource := knotclient.NewEventSource("localhost:5555") 73 - ccfg := knotclient.ConsumerConfig{ 74 - Logger: logger, 75 - ProcessFunc: spindle.exec, 76 - } 73 + 74 + ccfg := knotclient.NewConsumerConfig() 75 + ccfg.Logger = logger 76 + ccfg.Dev = cfg.Server.Dev 77 + ccfg.ProcessFunc = spindle.exec 77 78 ccfg.AddEventSource(knotEventSource) 78 79 79 - ec := knotclient.NewEventConsumer(ccfg) 80 + ec := knotclient.NewEventConsumer(*ccfg) 80 81 81 82 ec.Start(ctx) 82 83 }() ··· 95 96 } 96 97 97 98 func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 98 - pipeline := tangled.Pipeline{} 99 - err := json.Unmarshal(msg.EventJson, &pipeline) 100 - if err != nil { 101 - fmt.Println("error unmarshalling", err) 102 - return err 103 - } 99 + if msg.Nsid == tangled.PipelineNSID { 100 + pipeline := tangled.Pipeline{} 101 + err := json.Unmarshal(msg.EventJson, &pipeline) 102 + if err != nil { 103 + fmt.Println("error unmarshalling", err) 104 + return err 105 + } 104 106 105 - if msg.Nsid == tangled.PipelineNSID { 106 - err = s.eng.SetupPipeline(ctx, &pipeline, msg.Rkey) 107 + // this is a "fake" at uri for now 108 + pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 109 + 110 + rkey := TID() 111 + err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) 107 112 if err != nil { 108 113 return err 109 114 } 110 - err = s.eng.StartWorkflows(ctx, &pipeline, msg.Rkey) 115 + err = s.eng.StartWorkflows(ctx, &pipeline, rkey) 111 116 if err != nil { 112 117 return err 113 118 }
+3 -2
spindle/stream.go
··· 4 4 "net/http" 5 5 "time" 6 6 7 + "context" 8 + 7 9 "github.com/gorilla/websocket" 8 - "golang.org/x/net/context" 9 10 ) 10 11 11 12 var upgrader = websocket.Upgrader{ ··· 74 75 } 75 76 76 77 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error { 77 - ops, err := s.db.GetPipelines(*cursor) 78 + ops, err := s.db.GetPipelineStatusAsRecords(*cursor) 78 79 if err != nil { 79 80 s.l.Debug("err", "err", err) 80 81 return err
+9
spindle/tid.go
··· 1 + package spindle 2 + 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 5 + var TIDClock = syntax.NewTIDClock(0) 6 + 7 + func TID() string { 8 + return TIDClock.Next().String() 9 + }