forked from tangled.org/core
Monorepo for Tangled

spindle/queue: enqueue pipeline jobs

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi c3b287b1 7f7d89ac

verified
Changed files
+70 -13
spindle
+37
spindle/queue/queue.go
···
··· 1 + package queue 2 + 3 + type Job struct { 4 + Run func() error 5 + OnFail func(error) 6 + } 7 + 8 + type Queue struct { 9 + jobs chan Job 10 + } 11 + 12 + func NewQueue(size int) *Queue { 13 + return &Queue{ 14 + jobs: make(chan Job, size), 15 + } 16 + } 17 + 18 + func (q *Queue) Enqueue(job Job) bool { 19 + select { 20 + case q.jobs <- job: 21 + return true 22 + default: 23 + return false 24 + } 25 + } 26 + 27 + func (q *Queue) StartRunner() { 28 + go func() { 29 + for job := range q.jobs { 30 + if err := job.Run(); err != nil { 31 + if job.OnFail != nil { 32 + job.OnFail(err) 33 + } 34 + } 35 + } 36 + }() 37 + }
+33 -13
spindle/server.go
··· 1 package spindle 2 3 import ( 4 "encoding/json" 5 "fmt" 6 "log/slog" 7 "net/http" 8 9 "github.com/go-chi/chi/v5" 10 - "golang.org/x/net/context" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" ··· 17 "tangled.sh/tangled.sh/core/spindle/config" 18 "tangled.sh/tangled.sh/core/spindle/db" 19 "tangled.sh/tangled.sh/core/spindle/engine" 20 ) 21 22 type Spindle struct { ··· 26 l *slog.Logger 27 n *notifier.Notifier 28 eng *engine.Engine 29 } 30 31 func Run(ctx context.Context) error { ··· 58 return err 59 } 60 61 spindle := Spindle{ 62 jc: jc, 63 e: e, ··· 65 l: logger, 66 n: &n, 67 eng: eng, 68 } 69 70 go func() { 71 logger.Info("starting event consumer") 72 knotEventSource := knotclient.NewEventSource("localhost:5555") ··· 74 ccfg := knotclient.NewConsumerConfig() 75 ccfg.Logger = logger 76 ccfg.Dev = cfg.Server.Dev 77 - ccfg.ProcessFunc = spindle.exec 78 ccfg.AddEventSource(knotEventSource) 79 80 ec := knotclient.NewEventConsumer(*ccfg) ··· 96 return mux 97 } 98 99 - func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 100 if msg.Nsid == tangled.PipelineNSID { 101 pipeline := tangled.Pipeline{} 102 err := json.Unmarshal(msg.EventJson, &pipeline) ··· 105 return err 106 } 107 108 - // this is a "fake" at uri for now 109 - pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 110 111 - rkey := TID() 112 - err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) 113 - if err != nil { 114 - return err 115 - } 116 - err = s.eng.StartWorkflows(ctx, &pipeline, rkey) 117 - if err != nil { 118 - return err 119 } 120 } 121
··· 1 package spindle 2 3 import ( 4 + "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" ··· 17 "tangled.sh/tangled.sh/core/spindle/config" 18 "tangled.sh/tangled.sh/core/spindle/db" 19 "tangled.sh/tangled.sh/core/spindle/engine" 20 + "tangled.sh/tangled.sh/core/spindle/queue" 21 ) 22 23 type Spindle struct { ··· 27 l *slog.Logger 28 n *notifier.Notifier 29 eng *engine.Engine 30 + jq *queue.Queue 31 } 32 33 func Run(ctx context.Context) error { ··· 60 return err 61 } 62 63 + jq := queue.NewQueue(100) 64 + 65 + // starts a job queue runner in the background 66 + jq.StartRunner() 67 + 68 spindle := Spindle{ 69 jc: jc, 70 e: e, ··· 72 l: logger, 73 n: &n, 74 eng: eng, 75 + jq: jq, 76 } 77 78 + // for each incoming sh.tangled.pipeline, we execute 79 + // spindle.processPipeline, which in turn enqueues the pipeline 80 + // job in the above registered queue. 81 go func() { 82 logger.Info("starting event consumer") 83 knotEventSource := knotclient.NewEventSource("localhost:5555") ··· 85 ccfg := knotclient.NewConsumerConfig() 86 ccfg.Logger = logger 87 ccfg.Dev = cfg.Server.Dev 88 + ccfg.ProcessFunc = spindle.processPipeline 89 ccfg.AddEventSource(knotEventSource) 90 91 ec := knotclient.NewEventConsumer(*ccfg) ··· 107 return mux 108 } 109 110 + func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 111 if msg.Nsid == tangled.PipelineNSID { 112 pipeline := tangled.Pipeline{} 113 err := json.Unmarshal(msg.EventJson, &pipeline) ··· 116 return err 117 } 118 119 + ok := s.jq.Enqueue(queue.Job{ 120 + Run: func() error { 121 + // this is a "fake" at uri for now 122 + pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 123 124 + rkey := TID() 125 + err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) 126 + if err != nil { 127 + return err 128 + } 129 + return s.eng.StartWorkflows(ctx, &pipeline, rkey) 130 + }, 131 + OnFail: func(error) { 132 + s.l.Error("pipeline setup failed", "error", err) 133 + }, 134 + }) 135 + if ok { 136 + s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 137 + } else { 138 + s.l.Error("failed to enqueue pipeline: queue is full") 139 } 140 } 141