knotserver: ingest pull records to execute pipelines #275

merged
opened by oppi.li targeting master from push-vquoltwpkuny

also rename jetstream consumer to ingester

Signed-off-by: oppiliappan me@oppi.li

Changed files
+305 -147
knotserver
+305
knotserver/ingester.go
··· 1 + package knotserver 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "net/url" 10 + "path/filepath" 11 + "slices" 12 + "strings" 13 + 14 + comatproto "github.com/bluesky-social/indigo/api/atproto" 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/bluesky-social/indigo/xrpc" 17 + "github.com/bluesky-social/jetstream/pkg/models" 18 + securejoin "github.com/cyphar/filepath-securejoin" 19 + "tangled.sh/tangled.sh/core/api/tangled" 20 + "tangled.sh/tangled.sh/core/appview/idresolver" 21 + "tangled.sh/tangled.sh/core/knotserver/db" 22 + "tangled.sh/tangled.sh/core/knotserver/git" 23 + "tangled.sh/tangled.sh/core/log" 24 + "tangled.sh/tangled.sh/core/workflow" 25 + ) 26 + 27 + func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 28 + l := log.FromContext(ctx) 29 + pk := db.PublicKey{ 30 + Did: did, 31 + PublicKey: record, 32 + } 33 + if err := h.db.AddPublicKey(pk); err != nil { 34 + l.Error("failed to add public key", "error", err) 35 + return fmt.Errorf("failed to add public key: %w", err) 36 + } 37 + l.Info("added public key from firehose", "did", did) 38 + return nil 39 + } 40 + 41 + func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 42 + l := log.FromContext(ctx) 43 + 44 + if record.Domain != h.c.Server.Hostname { 45 + l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 46 + return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 47 + } 48 + 49 + ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 50 + if err != nil || !ok { 51 + l.Error("failed to add member", "did", did) 52 + return fmt.Errorf("failed to enforce permissions: %w", err) 53 + } 54 + 55 + if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil { 56 + l.Error("failed to add member", "error", err) 57 + return fmt.Errorf("failed to add member: %w", err) 58 + } 59 + l.Info("added member from firehose", "member", record.Subject) 60 + 61 + if err := h.db.AddDid(did); err != nil { 62 + l.Error("failed to add did", "error", err) 63 + return fmt.Errorf("failed to add did: %w", err) 64 + } 65 + h.jc.AddDid(did) 66 + 67 + if err := h.fetchAndAddKeys(ctx, did); err != nil { 68 + return fmt.Errorf("failed to fetch and add keys: %w", err) 69 + } 70 + 71 + return nil 72 + } 73 + 74 + func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 75 + l := log.FromContext(ctx) 76 + l = l.With("handler", "processPull") 77 + l = l.With("did", did) 78 + l = l.With("target_repo", record.TargetRepo) 79 + l = l.With("target_branch", record.TargetBranch) 80 + 81 + if record.Source == nil { 82 + reason := "not a branch-based pull request" 83 + l.Info("ignoring pull record", "reason", reason) 84 + return fmt.Errorf("ignoring pull record: %s", reason) 85 + } 86 + 87 + if record.Source.Repo != nil { 88 + reason := "fork based pull" 89 + l.Info("ignoring pull record", "reason", reason) 90 + return fmt.Errorf("ignoring pull record: %s", reason) 91 + } 92 + 93 + allDids, err := h.db.GetAllDids() 94 + if err != nil { 95 + return err 96 + } 97 + 98 + // presently: we only process PRs from collaborators for pipelines 99 + if !slices.Contains(allDids, did) { 100 + reason := "not a known did" 101 + l.Info("rejecting pull record", "reason", reason) 102 + return fmt.Errorf("rejected pull record: %s, %s", reason, did) 103 + } 104 + 105 + repoAt, err := syntax.ParseATURI(record.TargetRepo) 106 + if err != nil { 107 + return err 108 + } 109 + 110 + // resolve this aturi to extract the repo record 111 + resolver := idresolver.DefaultResolver() 112 + ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 113 + if err != nil || ident.Handle.IsInvalidHandle() { 114 + return fmt.Errorf("failed to resolve handle: %w", err) 115 + } 116 + 117 + xrpcc := xrpc.Client{ 118 + Host: ident.PDSEndpoint(), 119 + } 120 + 121 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 122 + if err != nil { 123 + return err 124 + } 125 + 126 + repo := resp.Value.Val.(*tangled.Repo) 127 + 128 + if repo.Knot != h.c.Server.Hostname { 129 + reason := "not this knot" 130 + l.Info("rejecting pull record", "reason", reason) 131 + return fmt.Errorf("rejected pull record: %s", reason) 132 + } 133 + 134 + didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 135 + if err != nil { 136 + return err 137 + } 138 + 139 + repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 140 + if err != nil { 141 + return err 142 + } 143 + 144 + gr, err := git.Open(repoPath, record.Source.Branch) 145 + if err != nil { 146 + return err 147 + } 148 + 149 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 150 + if err != nil { 151 + return err 152 + } 153 + 154 + var pipeline workflow.Pipeline 155 + for _, e := range workflowDir { 156 + if !e.IsFile { 157 + continue 158 + } 159 + 160 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 161 + contents, err := gr.RawContent(fpath) 162 + if err != nil { 163 + continue 164 + } 165 + 166 + wf, err := workflow.FromFile(e.Name, contents) 167 + if err != nil { 168 + // TODO: log here, respond to client that is pushing 169 + h.l.Error("failed to parse workflow", "err", err, "path", fpath) 170 + continue 171 + } 172 + 173 + pipeline = append(pipeline, wf) 174 + } 175 + 176 + trigger := tangled.Pipeline_PullRequestTriggerData{ 177 + Action: "create", 178 + SourceBranch: record.Source.Branch, 179 + SourceSha: record.Source.Sha, 180 + TargetBranch: record.TargetBranch, 181 + } 182 + 183 + compiler := workflow.Compiler{ 184 + Trigger: tangled.Pipeline_TriggerMetadata{ 185 + Kind: string(workflow.TriggerKindPullRequest), 186 + PullRequest: &trigger, 187 + Repo: &tangled.Pipeline_TriggerRepo{ 188 + Did: repo.Owner, 189 + Knot: repo.Knot, 190 + Repo: repo.Name, 191 + }, 192 + }, 193 + } 194 + 195 + cp := compiler.Compile(pipeline) 196 + eventJson, err := json.Marshal(cp) 197 + if err != nil { 198 + return err 199 + } 200 + 201 + // do not run empty pipelines 202 + if cp.Workflows == nil { 203 + return nil 204 + } 205 + 206 + event := db.Event{ 207 + Rkey: TID(), 208 + Nsid: tangled.PipelineNSID, 209 + EventJson: string(eventJson), 210 + } 211 + 212 + return h.db.InsertEvent(event, h.n) 213 + } 214 + 215 + func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 216 + l := log.FromContext(ctx) 217 + 218 + keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 219 + if err != nil { 220 + l.Error("error building endpoint url", "did", did, "error", err.Error()) 221 + return fmt.Errorf("error building endpoint url: %w", err) 222 + } 223 + 224 + resp, err := http.Get(keysEndpoint) 225 + if err != nil { 226 + l.Error("error getting keys", "did", did, "error", err) 227 + return fmt.Errorf("error getting keys: %w", err) 228 + } 229 + defer resp.Body.Close() 230 + 231 + if resp.StatusCode == http.StatusNotFound { 232 + l.Info("no keys found for did", "did", did) 233 + return nil 234 + } 235 + 236 + plaintext, err := io.ReadAll(resp.Body) 237 + if err != nil { 238 + l.Error("error reading response body", "error", err) 239 + return fmt.Errorf("error reading response body: %w", err) 240 + } 241 + 242 + for _, key := range strings.Split(string(plaintext), "\n") { 243 + if key == "" { 244 + continue 245 + } 246 + pk := db.PublicKey{ 247 + Did: did, 248 + } 249 + pk.Key = key 250 + if err := h.db.AddPublicKey(pk); err != nil { 251 + l.Error("failed to add public key", "error", err) 252 + return fmt.Errorf("failed to add public key: %w", err) 253 + } 254 + } 255 + return nil 256 + } 257 + 258 + func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 259 + did := event.Did 260 + if event.Kind != models.EventKindCommit { 261 + return nil 262 + } 263 + 264 + var err error 265 + defer func() { 266 + eventTime := event.TimeUS 267 + lastTimeUs := eventTime + 1 268 + fmt.Println("lastTimeUs", lastTimeUs) 269 + if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 270 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 271 + } 272 + }() 273 + 274 + raw := json.RawMessage(event.Commit.Record) 275 + 276 + switch event.Commit.Collection { 277 + case tangled.PublicKeyNSID: 278 + var record tangled.PublicKey 279 + if err := json.Unmarshal(raw, &record); err != nil { 280 + return fmt.Errorf("failed to unmarshal record: %w", err) 281 + } 282 + if err := h.processPublicKey(ctx, did, record); err != nil { 283 + return fmt.Errorf("failed to process public key: %w", err) 284 + } 285 + 286 + case tangled.KnotMemberNSID: 287 + var record tangled.KnotMember 288 + if err := json.Unmarshal(raw, &record); err != nil { 289 + return fmt.Errorf("failed to unmarshal record: %w", err) 290 + } 291 + if err := h.processKnotMember(ctx, did, record); err != nil { 292 + return fmt.Errorf("failed to process knot member: %w", err) 293 + } 294 + case tangled.RepoPullNSID: 295 + var record tangled.RepoPull 296 + if err := json.Unmarshal(raw, &record); err != nil { 297 + return fmt.Errorf("failed to unmarshal record: %w", err) 298 + } 299 + if err := h.processPull(ctx, did, record); err != nil { 300 + return fmt.Errorf("failed to process knot member: %w", err) 301 + } 302 + } 303 + 304 + return err 305 + }
-147
knotserver/jetstream.go
··· 1 - package knotserver 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "io" 8 - "net/http" 9 - "net/url" 10 - "strings" 11 - 12 - "github.com/bluesky-social/jetstream/pkg/models" 13 - "tangled.sh/tangled.sh/core/api/tangled" 14 - "tangled.sh/tangled.sh/core/knotserver/db" 15 - "tangled.sh/tangled.sh/core/log" 16 - ) 17 - 18 - func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 19 - l := log.FromContext(ctx) 20 - pk := db.PublicKey{ 21 - Did: did, 22 - PublicKey: record, 23 - } 24 - if err := h.db.AddPublicKey(pk); err != nil { 25 - l.Error("failed to add public key", "error", err) 26 - return fmt.Errorf("failed to add public key: %w", err) 27 - } 28 - l.Info("added public key from firehose", "did", did) 29 - return nil 30 - } 31 - 32 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 33 - l := log.FromContext(ctx) 34 - 35 - if record.Domain != h.c.Server.Hostname { 36 - l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 37 - return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 38 - } 39 - 40 - ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 41 - if err != nil || !ok { 42 - l.Error("failed to add member", "did", did) 43 - return fmt.Errorf("failed to enforce permissions: %w", err) 44 - } 45 - 46 - if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil { 47 - l.Error("failed to add member", "error", err) 48 - return fmt.Errorf("failed to add member: %w", err) 49 - } 50 - l.Info("added member from firehose", "member", record.Subject) 51 - 52 - if err := h.db.AddDid(did); err != nil { 53 - l.Error("failed to add did", "error", err) 54 - return fmt.Errorf("failed to add did: %w", err) 55 - } 56 - h.jc.AddDid(did) 57 - 58 - if err := h.fetchAndAddKeys(ctx, did); err != nil { 59 - return fmt.Errorf("failed to fetch and add keys: %w", err) 60 - } 61 - 62 - return nil 63 - } 64 - 65 - func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 66 - l := log.FromContext(ctx) 67 - 68 - keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 69 - if err != nil { 70 - l.Error("error building endpoint url", "did", did, "error", err.Error()) 71 - return fmt.Errorf("error building endpoint url: %w", err) 72 - } 73 - 74 - resp, err := http.Get(keysEndpoint) 75 - if err != nil { 76 - l.Error("error getting keys", "did", did, "error", err) 77 - return fmt.Errorf("error getting keys: %w", err) 78 - } 79 - defer resp.Body.Close() 80 - 81 - if resp.StatusCode == http.StatusNotFound { 82 - l.Info("no keys found for did", "did", did) 83 - return nil 84 - } 85 - 86 - plaintext, err := io.ReadAll(resp.Body) 87 - if err != nil { 88 - l.Error("error reading response body", "error", err) 89 - return fmt.Errorf("error reading response body: %w", err) 90 - } 91 - 92 - for _, key := range strings.Split(string(plaintext), "\n") { 93 - if key == "" { 94 - continue 95 - } 96 - pk := db.PublicKey{ 97 - Did: did, 98 - } 99 - pk.Key = key 100 - if err := h.db.AddPublicKey(pk); err != nil { 101 - l.Error("failed to add public key", "error", err) 102 - return fmt.Errorf("failed to add public key: %w", err) 103 - } 104 - } 105 - return nil 106 - } 107 - 108 - func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 109 - did := event.Did 110 - if event.Kind != models.EventKindCommit { 111 - return nil 112 - } 113 - 114 - var err error 115 - defer func() { 116 - eventTime := event.TimeUS 117 - lastTimeUs := eventTime + 1 118 - fmt.Println("lastTimeUs", lastTimeUs) 119 - if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 120 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 121 - } 122 - }() 123 - 124 - raw := json.RawMessage(event.Commit.Record) 125 - 126 - switch event.Commit.Collection { 127 - case tangled.PublicKeyNSID: 128 - var record tangled.PublicKey 129 - if err := json.Unmarshal(raw, &record); err != nil { 130 - return fmt.Errorf("failed to unmarshal record: %w", err) 131 - } 132 - if err := h.processPublicKey(ctx, did, record); err != nil { 133 - return fmt.Errorf("failed to process public key: %w", err) 134 - } 135 - 136 - case tangled.KnotMemberNSID: 137 - var record tangled.KnotMember 138 - if err := json.Unmarshal(raw, &record); err != nil { 139 - return fmt.Errorf("failed to unmarshal record: %w", err) 140 - } 141 - if err := h.processKnotMember(ctx, did, record); err != nil { 142 - return fmt.Errorf("failed to process knot member: %w", err) 143 - } 144 - } 145 - 146 - return err 147 - }