Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/spindle/git"
14 "tangled.org/core/spindle/models"
15 "tangled.org/core/tap"
16 "tangled.org/core/tid"
17 "tangled.org/core/workflow"
18)
19
20func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
21 l := s.l.With("component", "tapIndexer")
22
23 var err error
24 switch evt.Type {
25 case tap.EvtRecord:
26 switch evt.Record.Collection.String() {
27 case tangled.SpindleMemberNSID:
28 err = s.processMember(ctx, evt)
29 case tangled.RepoNSID:
30 err = s.processRepo(ctx, evt)
31 case tangled.RepoCollaboratorNSID:
32 err = s.processCollaborator(ctx, evt)
33 case tangled.RepoPullNSID:
34 err = s.processPull(ctx, evt)
35 }
36 case tap.EvtIdentity:
37 // no-op
38 }
39
40 if err != nil {
41 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
42 return err
43 }
44 return nil
45}
46
47// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated)
48
49func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
50 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
51
52 l.Info("processing spindle.member record")
53
54 // only listen to members
55 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
56 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err)
57 return nil
58 }
59
60 switch evt.Record.Action {
61 case tap.RecordCreateAction, tap.RecordUpdateAction:
62 record := tangled.SpindleMember{}
63 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
64 return fmt.Errorf("parsing record: %w", err)
65 }
66
67 domain := s.cfg.Server.Hostname
68 if record.Instance != domain {
69 l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
70 return nil
71 }
72
73 created, err := time.Parse(record.CreatedAt, time.RFC3339)
74 if err != nil {
75 created = time.Now()
76 }
77 if err := db.AddSpindleMember(s.db, db.SpindleMember{
78 Did: evt.Record.Did,
79 Rkey: evt.Record.Rkey.String(),
80 Instance: record.Instance,
81 Subject: syntax.DID(record.Subject),
82 Created: created,
83 }); err != nil {
84 l.Error("failed to add member", "error", err)
85 return fmt.Errorf("adding member to db: %w", err)
86 }
87 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
88 return fmt.Errorf("adding member to rbac: %w", err)
89 }
90 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
91 return fmt.Errorf("adding did to tap", err)
92 }
93
94 l.Info("added member", "member", record.Subject)
95 return nil
96
97 case tap.RecordDeleteAction:
98 var (
99 did = evt.Record.Did.String()
100 rkey = evt.Record.Rkey.String()
101 )
102 member, err := db.GetSpindleMember(s.db, did, rkey)
103 if err != nil {
104 return fmt.Errorf("finding member: %w", err)
105 }
106
107 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
108 return fmt.Errorf("removing member from db: %w", err)
109 }
110 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
111 return fmt.Errorf("removing member from rbac: %w", err)
112 }
113 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
114 return fmt.Errorf("removing did from tap: %w", err)
115 }
116
117 l.Info("removed member", "member", member.Subject)
118 return nil
119 }
120 return nil
121}
122
123func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
124 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
125
126 l.Info("processing repo.collaborator record")
127
128 // only listen to members
129 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
130 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
131 return nil
132 }
133
134 switch evt.Record.Action {
135 case tap.RecordCreateAction, tap.RecordUpdateAction:
136 record := tangled.RepoCollaborator{}
137 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
138 l.Error("invalid record", "err", err)
139 return fmt.Errorf("parsing record: %w", err)
140 }
141
142 // retry later if target repo is not ingested yet
143 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil {
144 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err)
145 return fmt.Errorf("target repo is unknown")
146 }
147
148 // check perms for this user
149 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
150 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
151 return nil
152 }
153
154 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
155 Did: evt.Record.Did,
156 Rkey: evt.Record.Rkey,
157 Repo: syntax.ATURI(record.Repo),
158 Subject: syntax.DID(record.Subject),
159 }); err != nil {
160 return fmt.Errorf("adding collaborator to db: %w", err)
161 }
162 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
163 return fmt.Errorf("adding collaborator to rbac: %w", err)
164 }
165 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
166 return fmt.Errorf("adding did to tap: %w", err)
167 }
168
169 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
170 return nil
171
172 case tap.RecordDeleteAction:
173 // get existing collaborator
174 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
175 if err != nil {
176 return fmt.Errorf("failed to get existing collaborator info: %w", err)
177 }
178
179 // check perms for this user
180 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
181 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
182 return nil
183 }
184
185 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
186 return fmt.Errorf("removing collaborator from db: %w", err)
187 }
188 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
189 return fmt.Errorf("removing collaborator from rbac: %w", err)
190 }
191 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
192 return fmt.Errorf("removing did from tap: %w", err)
193 }
194
195 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
196 return nil
197 }
198 return nil
199}
200
201func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
202 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
203
204 l.Info("processing repo record")
205
206 // only listen to members
207 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
208 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
209 return nil
210 }
211
212 switch evt.Record.Action {
213 case tap.RecordCreateAction, tap.RecordUpdateAction:
214 record := tangled.Repo{}
215 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
216 return fmt.Errorf("parsing record: %w", err)
217 }
218
219 domain := s.cfg.Server.Hostname
220 if record.Spindle == nil || *record.Spindle != domain {
221 if record.Spindle == nil {
222 l.Info("spindle isn't configured", "name", record.Name)
223 } else {
224 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
225 }
226 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
227 return fmt.Errorf("deleting repo from db: %w", err)
228 }
229 return nil
230 }
231
232 repo := &db.Repo{
233 Did: evt.Record.Did,
234 Rkey: evt.Record.Rkey,
235 Name: record.Name,
236 Knot: record.Knot,
237 }
238
239 if err := s.db.PutRepo(repo); err != nil {
240 return fmt.Errorf("adding repo to db: %w", err)
241 }
242
243 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
244 return fmt.Errorf("adding repo to rbac")
245 }
246
247 // add this knot to the event consumer
248 src := eventconsumer.NewKnotSource(record.Knot)
249 s.ks.AddSource(context.Background(), src)
250
251 // setup sparse sync
252 repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
253 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
254 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
255 return fmt.Errorf("setting up sparse-clone git repo: %w", err)
256 }
257
258 l.Info("added repo", "repo", evt.Record.AtUri())
259 return nil
260
261 case tap.RecordDeleteAction:
262 // check perms for this user
263 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
264 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err)
265 return nil
266 }
267
268 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
269 return fmt.Errorf("deleting repo from db: %w", err)
270 }
271
272 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
273 return fmt.Errorf("deleting repo from rbac: %w", err)
274 }
275
276 l.Info("deleted repo", "repo", evt.Record.AtUri())
277 return nil
278 }
279 return nil
280}
281
282func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
283 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
284
285 l.Info("processing pull record")
286
287 // only listen to live events
288 if !evt.Record.Live {
289 l.Info("skipping backfill event", "event", evt.Record.AtUri())
290 return nil
291 }
292
293 switch evt.Record.Action {
294 case tap.RecordCreateAction, tap.RecordUpdateAction:
295 record := tangled.RepoPull{}
296 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
297 l.Error("invalid record", "err", err)
298 return fmt.Errorf("parsing record: %w", err)
299 }
300
301 // ignore legacy records
302 if record.Target == nil {
303 l.Info("ignoring pull record: target repo is nil")
304 return nil
305 }
306
307 // ignore patch-based and fork-based PRs
308 if record.Source == nil || record.Source.Repo != nil {
309 l.Info("ignoring pull record: not a branch-based pull request")
310 return nil
311 }
312
313 // skip if target repo is unknown
314 repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo))
315 if err != nil {
316 l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err)
317 return fmt.Errorf("target repo is unknown")
318 }
319
320 compiler := workflow.Compiler{
321 Trigger: tangled.Pipeline_TriggerMetadata{
322 Kind: string(workflow.TriggerKindPullRequest),
323 PullRequest: &tangled.Pipeline_PullRequestTriggerData{
324 Action: "create",
325 SourceBranch: record.Source.Branch,
326 SourceSha: record.Source.Sha,
327 TargetBranch: record.Target.Branch,
328 },
329 Repo: &tangled.Pipeline_TriggerRepo{
330 Did: repo.Did.String(),
331 Knot: repo.Knot,
332 Repo: repo.Name,
333 },
334 },
335 }
336
337 repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
338 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
339
340 // load workflow definitions from rev (without spindle context)
341 rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha)
342 if err != nil {
343 // don't retry
344 l.Error("failed loading pipeline", "err", err)
345 return nil
346 }
347 if len(rawPipeline) == 0 {
348 l.Info("no workflow definition find for the repo. skipping the event")
349 return nil
350 }
351 tpl := compiler.Compile(compiler.Parse(rawPipeline))
352 // TODO: pass compile error to workflow log
353 for _, w := range compiler.Diagnostics.Errors {
354 l.Error(w.String())
355 }
356 for _, w := range compiler.Diagnostics.Warnings {
357 l.Warn(w.String())
358 }
359
360 pipelineId := models.PipelineId{
361 Knot: tpl.TriggerMetadata.Repo.Knot,
362 Rkey: tid.TID(),
363 }
364 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
365 l.Error("failed to create pipeline event", "err", err)
366 return nil
367 }
368 err = s.processPipeline(ctx, tpl, pipelineId)
369 if err != nil {
370 // don't retry
371 l.Error("failed processing pipeline", "err", err)
372 return nil
373 }
374 case tap.RecordDeleteAction:
375 // no-op
376 }
377 return nil
378}
379
380func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
381 known, err := s.db.IsKnownDid(syntax.DID(did))
382 if err != nil {
383 return fmt.Errorf("ensuring did known state: %w", err)
384 }
385 if !known {
386 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
387 return fmt.Errorf("removing did from tap: %w", err)
388 }
389 }
390 return nil
391}