Monorepo for Tangled
at local-dev 343 lines 9.5 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 securejoin "github.com/cyphar/filepath-securejoin" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/knotserver/db" 19 "tangled.org/core/knotserver/git" 20 "tangled.org/core/log" 21 "tangled.org/core/rbac" 22 "tangled.org/core/workflow" 23) 24 25func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 26 l := log.FromContext(ctx) 27 raw := json.RawMessage(event.Commit.Record) 28 did := event.Did 29 30 var record tangled.PublicKey 31 if err := json.Unmarshal(raw, &record); err != nil { 32 return fmt.Errorf("failed to unmarshal record: %w", err) 33 } 34 35 pk := db.PublicKey{ 36 Did: did, 37 PublicKey: record, 38 } 39 if err := h.db.AddPublicKey(pk); err != nil { 40 l.Error("failed to add public key", "error", err) 41 return fmt.Errorf("failed to add public key: %w", err) 42 } 43 l.Info("added public key from firehose", "did", did) 44 return nil 45} 46 47func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 48 l := log.FromContext(ctx) 49 raw := json.RawMessage(event.Commit.Record) 50 did := event.Did 51 52 var record tangled.KnotMember 53 if err := json.Unmarshal(raw, &record); err != nil { 54 return fmt.Errorf("failed to unmarshal record: %w", err) 55 } 56 57 if record.Domain != h.c.Server.Hostname { 58 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 59 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 60 } 61 62 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 63 if err != nil || !ok { 64 l.Error("failed to add member", "did", did) 65 return fmt.Errorf("failed to enforce permissions: %w", err) 66 } 67 68 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 69 l.Error("failed to add member", "error", err) 70 return fmt.Errorf("failed to add member: %w", err) 71 } 72 l.Info("added member from firehose", "member", record.Subject) 73 74 if err := h.db.AddDid(record.Subject); err != nil { 75 l.Error("failed to add did", "error", err) 76 return fmt.Errorf("failed to add did: %w", err) 77 } 78 h.jc.AddDid(record.Subject) 79 80 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 81 return fmt.Errorf("failed to fetch and add keys: %w", err) 82 } 83 84 return nil 85} 86 87func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 88 raw := json.RawMessage(event.Commit.Record) 89 did := event.Did 90 91 var record tangled.RepoPull 92 if err := json.Unmarshal(raw, &record); err != nil { 93 return fmt.Errorf("failed to unmarshal record: %w", err) 94 } 95 96 l := log.FromContext(ctx) 97 l = l.With("handler", "processPull") 98 l = l.With("did", did) 99 100 if record.Target == nil { 101 return fmt.Errorf("ignoring pull record: target repo is nil") 102 } 103 104 l = l.With("target_repo", record.Target.Repo) 105 l = l.With("target_branch", record.Target.Branch) 106 107 if record.Source == nil { 108 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 109 } 110 111 if record.Source.Repo != nil { 112 return fmt.Errorf("ignoring pull record: fork based pull") 113 } 114 115 repoAt, err := syntax.ParseATURI(record.Target.Repo) 116 if err != nil { 117 return fmt.Errorf("failed to parse ATURI: %w", err) 118 } 119 120 // resolve this aturi to extract the repo record 121 ident, pdsClient, err := h.resolver.PDSClient(ctx, repoAt.Authority().String()) 122 if err != nil { 123 return fmt.Errorf("failed to resolve identity: %w", err) 124 } 125 126 resp, err := comatproto.RepoGetRecord(ctx, pdsClient, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 127 if err != nil { 128 return fmt.Errorf("failed to resolve repo: %w", err) 129 } 130 131 repo := resp.Value.Val.(*tangled.Repo) 132 133 if repo.Knot != h.c.Server.Hostname { 134 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 135 } 136 137 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 138 if err != nil { 139 return fmt.Errorf("failed to construct relative repo path: %w", err) 140 } 141 142 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 143 if err != nil { 144 return fmt.Errorf("failed to construct absolute repo path: %w", err) 145 } 146 147 gr, err := git.Open(repoPath, record.Source.Sha) 148 if err != nil { 149 return fmt.Errorf("failed to open git repository: %w", err) 150 } 151 152 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 153 if err != nil { 154 return fmt.Errorf("failed to open workflow directory: %w", err) 155 } 156 157 var pipeline workflow.RawPipeline 158 for _, e := range workflowDir { 159 if !e.IsFile() { 160 continue 161 } 162 163 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 164 contents, err := gr.RawContent(fpath) 165 if err != nil { 166 continue 167 } 168 169 pipeline = append(pipeline, workflow.RawWorkflow{ 170 Name: e.Name, 171 Contents: contents, 172 }) 173 } 174 175 trigger := tangled.Pipeline_PullRequestTriggerData{ 176 Action: "create", 177 SourceBranch: record.Source.Branch, 178 SourceSha: record.Source.Sha, 179 TargetBranch: record.Target.Branch, 180 } 181 182 compiler := workflow.Compiler{ 183 Trigger: tangled.Pipeline_TriggerMetadata{ 184 Kind: string(workflow.TriggerKindPullRequest), 185 PullRequest: &trigger, 186 Repo: &tangled.Pipeline_TriggerRepo{ 187 Did: ident.DID.String(), 188 Knot: repo.Knot, 189 Repo: repo.Name, 190 }, 191 }, 192 } 193 194 cp := compiler.Compile(compiler.Parse(pipeline)) 195 eventJson, err := json.Marshal(cp) 196 if err != nil { 197 return fmt.Errorf("failed to marshal pipeline event: %w", err) 198 } 199 200 // do not run empty pipelines 201 if cp.Workflows == nil { 202 return nil 203 } 204 205 ev := db.Event{ 206 Rkey: TID(), 207 Nsid: tangled.PipelineNSID, 208 EventJson: string(eventJson), 209 } 210 211 return h.db.InsertEvent(ev, h.n) 212} 213 214// duplicated from add collaborator 215func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 216 raw := json.RawMessage(event.Commit.Record) 217 did := event.Did 218 219 var record tangled.RepoCollaborator 220 if err := json.Unmarshal(raw, &record); err != nil { 221 return fmt.Errorf("failed to unmarshal record: %w", err) 222 } 223 224 repoAt, err := syntax.ParseATURI(record.Repo) 225 if err != nil { 226 return err 227 } 228 229 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 230 if err != nil || subjectId.Handle.IsInvalidHandle() { 231 return err 232 } 233 234 // TODO: fix this for good, we need to fetch the record here unfortunately 235 // resolve this aturi to extract the repo record 236 owner, pdsClient, err := h.resolver.PDSClient(ctx, repoAt.Authority().String()) 237 if err != nil { 238 return fmt.Errorf("failed to resolve owner: %w", err) 239 } 240 241 resp, err := comatproto.RepoGetRecord(ctx, pdsClient, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 242 if err != nil { 243 return err 244 } 245 246 repo := resp.Value.Val.(*tangled.Repo) 247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 248 249 // check perms for this user 250 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 251 if err != nil { 252 return fmt.Errorf("failed to check permissions: %w", err) 253 } 254 if !ok { 255 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 256 } 257 258 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 259 return err 260 } 261 h.jc.AddDid(subjectId.DID.String()) 262 263 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 264 return err 265 } 266 267 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 268} 269 270func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 271 l := log.FromContext(ctx) 272 273 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 274 if err != nil { 275 l.Error("error building endpoint url", "did", did, "error", err.Error()) 276 return fmt.Errorf("error building endpoint url: %w", err) 277 } 278 279 resp, err := http.Get(keysEndpoint) 280 if err != nil { 281 l.Error("error getting keys", "did", did, "error", err) 282 return fmt.Errorf("error getting keys: %w", err) 283 } 284 defer resp.Body.Close() 285 286 if resp.StatusCode == http.StatusNotFound { 287 l.Info("no keys found for did", "did", did) 288 return nil 289 } 290 291 plaintext, err := io.ReadAll(resp.Body) 292 if err != nil { 293 l.Error("error reading response body", "error", err) 294 return fmt.Errorf("error reading response body: %w", err) 295 } 296 297 for key := range strings.SplitSeq(string(plaintext), "\n") { 298 if key == "" { 299 continue 300 } 301 pk := db.PublicKey{ 302 Did: did, 303 } 304 pk.Key = key 305 if err := h.db.AddPublicKey(pk); err != nil { 306 l.Error("failed to add public key", "error", err) 307 return fmt.Errorf("failed to add public key: %w", err) 308 } 309 } 310 return nil 311} 312 313func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 314 if event.Kind != models.EventKindCommit { 315 return nil 316 } 317 318 var err error 319 defer func() { 320 eventTime := event.TimeUS 321 lastTimeUs := eventTime + 1 322 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 323 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 324 } 325 }() 326 327 switch event.Commit.Collection { 328 case tangled.PublicKeyNSID: 329 err = h.processPublicKey(ctx, event) 330 case tangled.KnotMemberNSID: 331 err = h.processKnotMember(ctx, event) 332 case tangled.RepoPullNSID: 333 err = h.processPull(ctx, event) 334 case tangled.RepoCollaboratorNSID: 335 err = h.processCollaborator(ctx, event) 336 } 337 338 if err != nil { 339 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 340 } 341 342 return nil 343}