Monorepo for Tangled tangled.org

knotserver,spindle: remove all pipeline logics from knotserver

`sh.tangled.pipeline` events are now completely generated & streamed
from spindle

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me d4ce7472 dd7b57ae

verified
Changed files
+1 -286
knotserver
spindle
-136
knotserver/ingester.go
··· 7 7 "io" 8 8 "net/http" 9 9 "net/url" 10 - "path/filepath" 11 10 "strings" 12 11 13 12 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 17 16 securejoin "github.com/cyphar/filepath-securejoin" 18 17 "tangled.org/core/api/tangled" 19 18 "tangled.org/core/knotserver/db" 20 - "tangled.org/core/knotserver/git" 21 19 "tangled.org/core/log" 22 20 "tangled.org/core/rbac" 23 - "tangled.org/core/workflow" 24 21 ) 25 22 26 23 func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { ··· 85 82 return nil 86 83 } 87 84 88 - func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 89 - raw := json.RawMessage(event.Commit.Record) 90 - did := event.Did 91 - 92 - var record tangled.RepoPull 93 - if err := json.Unmarshal(raw, &record); err != nil { 94 - return fmt.Errorf("failed to unmarshal record: %w", err) 95 - } 96 - 97 - l := log.FromContext(ctx) 98 - l = l.With("handler", "processPull") 99 - l = l.With("did", did) 100 - 101 - if record.Target == nil { 102 - return fmt.Errorf("ignoring pull record: target repo is nil") 103 - } 104 - 105 - l = l.With("target_repo", record.Target.Repo) 106 - l = l.With("target_branch", record.Target.Branch) 107 - 108 - if record.Source == nil { 109 - return fmt.Errorf("ignoring pull record: not a branch-based pull request") 110 - } 111 - 112 - if record.Source.Repo != nil { 113 - return fmt.Errorf("ignoring pull record: fork based pull") 114 - } 115 - 116 - repoAt, err := syntax.ParseATURI(record.Target.Repo) 117 - if err != nil { 118 - return fmt.Errorf("failed to parse ATURI: %w", err) 119 - } 120 - 121 - // resolve this aturi to extract the repo record 122 - ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 123 - if err != nil || ident.Handle.IsInvalidHandle() { 124 - return fmt.Errorf("failed to resolve handle: %w", err) 125 - } 126 - 127 - xrpcc := xrpc.Client{ 128 - Host: ident.PDSEndpoint(), 129 - } 130 - 131 - resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 132 - if err != nil { 133 - return fmt.Errorf("failed to resolver repo: %w", err) 134 - } 135 - 136 - repo := resp.Value.Val.(*tangled.Repo) 137 - 138 - if repo.Knot != h.c.Server.Hostname { 139 - return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 140 - } 141 - 142 - didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 143 - if err != nil { 144 - return fmt.Errorf("failed to construct relative repo path: %w", err) 145 - } 146 - 147 - repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 148 - if err != nil { 149 - return fmt.Errorf("failed to construct absolute repo path: %w", err) 150 - } 151 - 152 - gr, err := git.Open(repoPath, record.Source.Sha) 153 - if err != nil { 154 - return fmt.Errorf("failed to open git repository: %w", err) 155 - } 156 - 157 - workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 158 - if err != nil { 159 - return fmt.Errorf("failed to open workflow directory: %w", err) 160 - } 161 - 162 - var pipeline workflow.RawPipeline 163 - for _, e := range workflowDir { 164 - if !e.IsFile() { 165 - continue 166 - } 167 - 168 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 169 - contents, err := gr.RawContent(fpath) 170 - if err != nil { 171 - continue 172 - } 173 - 174 - pipeline = append(pipeline, workflow.RawWorkflow{ 175 - Name: e.Name, 176 - Contents: contents, 177 - }) 178 - } 179 - 180 - trigger := tangled.Pipeline_PullRequestTriggerData{ 181 - Action: "create", 182 - SourceBranch: record.Source.Branch, 183 - SourceSha: record.Source.Sha, 184 - TargetBranch: record.Target.Branch, 185 - } 186 - 187 - compiler := workflow.Compiler{ 188 - Trigger: tangled.Pipeline_TriggerMetadata{ 189 - Kind: string(workflow.TriggerKindPullRequest), 190 - PullRequest: &trigger, 191 - Repo: &tangled.Pipeline_TriggerRepo{ 192 - Did: ident.DID.String(), 193 - Knot: repo.Knot, 194 - Repo: repo.Name, 195 - }, 196 - }, 197 - } 198 - 199 - cp := compiler.Compile(compiler.Parse(pipeline)) 200 - eventJson, err := json.Marshal(cp) 201 - if err != nil { 202 - return fmt.Errorf("failed to marshal pipeline event: %w", err) 203 - } 204 - 205 - // do not run empty pipelines 206 - if cp.Workflows == nil { 207 - return nil 208 - } 209 - 210 - ev := db.Event{ 211 - Rkey: TID(), 212 - Nsid: tangled.PipelineNSID, 213 - EventJson: string(eventJson), 214 - } 215 - 216 - return h.db.InsertEvent(ev, h.n) 217 - } 218 - 219 85 // duplicated from add collaborator 220 86 func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 221 87 raw := json.RawMessage(event.Commit.Record) ··· 338 204 err = h.processPublicKey(ctx, event) 339 205 case tangled.KnotMemberNSID: 340 206 err = h.processKnotMember(ctx, event) 341 - case tangled.RepoPullNSID: 342 - err = h.processPull(ctx, event) 343 207 case tangled.RepoCollaboratorNSID: 344 208 err = h.processCollaborator(ctx, event) 345 209 }
-109
knotserver/internal.go
··· 23 23 "tangled.org/core/log" 24 24 "tangled.org/core/notifier" 25 25 "tangled.org/core/rbac" 26 - "tangled.org/core/workflow" 27 26 ) 28 27 29 28 type InternalHandle struct { ··· 188 187 l.Error("failed to reply with compare link", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 189 188 // non-fatal 190 189 } 191 - 192 - err = h.triggerPipeline(&resp.Messages, line, gitUserDid, repoDid, repoName, pushOptions) 193 - if err != nil { 194 - l.Error("failed to trigger pipeline", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 195 - // non-fatal 196 - } 197 190 } 198 191 199 192 writeJSON(w, resp) ··· 242 235 } 243 236 244 237 return errors.Join(errs, h.db.InsertEvent(event, h.n)) 245 - } 246 - 247 - func (h *InternalHandle) triggerPipeline( 248 - clientMsgs *[]string, 249 - line git.PostReceiveLine, 250 - gitUserDid string, 251 - repoDid string, 252 - repoName string, 253 - pushOptions PushOptions, 254 - ) error { 255 - if pushOptions.skipCi { 256 - return nil 257 - } 258 - 259 - didSlashRepo, err := securejoin.SecureJoin(repoDid, repoName) 260 - if err != nil { 261 - return err 262 - } 263 - 264 - repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 265 - if err != nil { 266 - return err 267 - } 268 - 269 - gr, err := git.Open(repoPath, line.Ref) 270 - if err != nil { 271 - return err 272 - } 273 - 274 - workflowDir, err := gr.FileTree(context.Background(), workflow.WorkflowDir) 275 - if err != nil { 276 - return err 277 - } 278 - 279 - var pipeline workflow.RawPipeline 280 - for _, e := range workflowDir { 281 - if !e.IsFile() { 282 - continue 283 - } 284 - 285 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 286 - contents, err := gr.RawContent(fpath) 287 - if err != nil { 288 - continue 289 - } 290 - 291 - pipeline = append(pipeline, workflow.RawWorkflow{ 292 - Name: e.Name, 293 - Contents: contents, 294 - }) 295 - } 296 - 297 - trigger := tangled.Pipeline_PushTriggerData{ 298 - Ref: line.Ref, 299 - OldSha: line.OldSha.String(), 300 - NewSha: line.NewSha.String(), 301 - } 302 - 303 - compiler := workflow.Compiler{ 304 - Trigger: tangled.Pipeline_TriggerMetadata{ 305 - Kind: string(workflow.TriggerKindPush), 306 - Push: &trigger, 307 - Repo: &tangled.Pipeline_TriggerRepo{ 308 - Did: repoDid, 309 - Knot: h.c.Server.Hostname, 310 - Repo: repoName, 311 - }, 312 - }, 313 - } 314 - 315 - cp := compiler.Compile(compiler.Parse(pipeline)) 316 - eventJson, err := json.Marshal(cp) 317 - if err != nil { 318 - return err 319 - } 320 - 321 - for _, e := range compiler.Diagnostics.Errors { 322 - *clientMsgs = append(*clientMsgs, e.String()) 323 - } 324 - 325 - if pushOptions.verboseCi { 326 - if compiler.Diagnostics.IsEmpty() { 327 - *clientMsgs = append(*clientMsgs, "success: pipeline compiled with no diagnostics") 328 - } 329 - 330 - for _, w := range compiler.Diagnostics.Warnings { 331 - *clientMsgs = append(*clientMsgs, w.String()) 332 - } 333 - } 334 - 335 - // do not run empty pipelines 336 - if cp.Workflows == nil { 337 - return nil 338 - } 339 - 340 - event := db.Event{ 341 - Rkey: TID(), 342 - Nsid: tangled.PipelineNSID, 343 - EventJson: string(eventJson), 344 - } 345 - 346 - return h.db.InsertEvent(event, h.n) 347 238 } 348 239 349 240 func (h *InternalHandle) emitCompareLink(
-1
knotserver/server.go
··· 79 79 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 80 80 tangled.PublicKeyNSID, 81 81 tangled.KnotMemberNSID, 82 - tangled.RepoPullNSID, 83 82 tangled.RepoCollaboratorNSID, 84 83 }, nil, log.SubLogger(logger, "jetstream"), db, true, c.Server.LogDids) 85 84 if err != nil {
+1 -40
spindle/server.go
··· 271 271 func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 272 272 l := log.FromContext(ctx).With("handler", "processKnotStream") 273 273 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 274 - if msg.Nsid == tangled.PipelineNSID { 275 - return nil 276 - tpl := tangled.Pipeline{} 277 - err := json.Unmarshal(msg.EventJson, &tpl) 278 - if err != nil { 279 - fmt.Println("error unmarshalling", err) 280 - return err 281 - } 282 - 283 - if tpl.TriggerMetadata == nil { 284 - return fmt.Errorf("no trigger metadata found") 285 - } 286 - 287 - if tpl.TriggerMetadata.Repo == nil { 288 - return fmt.Errorf("no repo data found") 289 - } 290 - 291 - if src.Key() != tpl.TriggerMetadata.Repo.Knot { 292 - return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 293 - } 294 - 295 - // filter by repos 296 - _, err = s.db.GetRepoWithName( 297 - syntax.DID(tpl.TriggerMetadata.Repo.Did), 298 - tpl.TriggerMetadata.Repo.Repo, 299 - ) 300 - if err != nil { 301 - return fmt.Errorf("failed to get repo: %w", err) 302 - } 303 - 304 - pipelineId := models.PipelineId{ 305 - Knot: src.Key(), 306 - Rkey: msg.Rkey, 307 - } 308 - 309 - err = s.processPipeline(ctx, tpl, pipelineId) 310 - if err != nil { 311 - return err 312 - } 313 - } else if msg.Nsid == tangled.GitRefUpdateNSID { 274 + if msg.Nsid == tangled.GitRefUpdateNSID { 314 275 event := tangled.GitRefUpdate{} 315 276 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 316 277 l.Error("error unmarshalling", "err", err)