forked from
tangled.org/core
Monorepo for Tangled
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 securejoin "github.com/cyphar/filepath-securejoin"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/knotserver/db"
19 "tangled.org/core/knotserver/git"
20 "tangled.org/core/log"
21 "tangled.org/core/rbac"
22 "tangled.org/core/workflow"
23)
24
25func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
26 l := log.FromContext(ctx)
27 raw := json.RawMessage(event.Commit.Record)
28 did := event.Did
29
30 var record tangled.PublicKey
31 if err := json.Unmarshal(raw, &record); err != nil {
32 return fmt.Errorf("failed to unmarshal record: %w", err)
33 }
34
35 pk := db.PublicKey{
36 Did: did,
37 PublicKey: record,
38 }
39 if err := h.db.AddPublicKey(pk); err != nil {
40 l.Error("failed to add public key", "error", err)
41 return fmt.Errorf("failed to add public key: %w", err)
42 }
43 l.Info("added public key from firehose", "did", did)
44 return nil
45}
46
47func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
48 l := log.FromContext(ctx)
49 raw := json.RawMessage(event.Commit.Record)
50 did := event.Did
51
52 var record tangled.KnotMember
53 if err := json.Unmarshal(raw, &record); err != nil {
54 return fmt.Errorf("failed to unmarshal record: %w", err)
55 }
56
57 if record.Domain != h.c.Server.Hostname {
58 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
59 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
60 }
61
62 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
63 if err != nil || !ok {
64 l.Error("failed to add member", "did", did)
65 return fmt.Errorf("failed to enforce permissions: %w", err)
66 }
67
68 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
69 l.Error("failed to add member", "error", err)
70 return fmt.Errorf("failed to add member: %w", err)
71 }
72 l.Info("added member from firehose", "member", record.Subject)
73
74 if err := h.db.AddDid(record.Subject); err != nil {
75 l.Error("failed to add did", "error", err)
76 return fmt.Errorf("failed to add did: %w", err)
77 }
78 h.jc.AddDid(record.Subject)
79
80 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
81 return fmt.Errorf("failed to fetch and add keys: %w", err)
82 }
83
84 return nil
85}
86
87func (h *Knot) processPull(ctx context.Context, event *models.Event) error {
88 raw := json.RawMessage(event.Commit.Record)
89 did := event.Did
90
91 var record tangled.RepoPull
92 if err := json.Unmarshal(raw, &record); err != nil {
93 return fmt.Errorf("failed to unmarshal record: %w", err)
94 }
95
96 l := log.FromContext(ctx)
97 l = l.With("handler", "processPull")
98 l = l.With("did", did)
99
100 if record.Target == nil {
101 return fmt.Errorf("ignoring pull record: target repo is nil")
102 }
103
104 l = l.With("target_repo", record.Target.Repo)
105 l = l.With("target_branch", record.Target.Branch)
106
107 if record.Source == nil {
108 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
109 }
110
111 if record.Source.Repo != nil {
112 return fmt.Errorf("ignoring pull record: fork based pull")
113 }
114
115 repoAt, err := syntax.ParseATURI(record.Target.Repo)
116 if err != nil {
117 return fmt.Errorf("failed to parse ATURI: %w", err)
118 }
119
120 // resolve this aturi to extract the repo record
121 ident, pdsClient, err := h.resolver.PDSClient(ctx, repoAt.Authority().String())
122 if err != nil {
123 return fmt.Errorf("failed to resolve identity: %w", err)
124 }
125
126 resp, err := comatproto.RepoGetRecord(ctx, pdsClient, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
127 if err != nil {
128 return fmt.Errorf("failed to resolve repo: %w", err)
129 }
130
131 repo := resp.Value.Val.(*tangled.Repo)
132
133 if repo.Knot != h.c.Server.Hostname {
134 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
135 }
136
137 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name)
138 if err != nil {
139 return fmt.Errorf("failed to construct relative repo path: %w", err)
140 }
141
142 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
143 if err != nil {
144 return fmt.Errorf("failed to construct absolute repo path: %w", err)
145 }
146
147 gr, err := git.Open(repoPath, record.Source.Sha)
148 if err != nil {
149 return fmt.Errorf("failed to open git repository: %w", err)
150 }
151
152 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
153 if err != nil {
154 return fmt.Errorf("failed to open workflow directory: %w", err)
155 }
156
157 var pipeline workflow.RawPipeline
158 for _, e := range workflowDir {
159 if !e.IsFile() {
160 continue
161 }
162
163 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
164 contents, err := gr.RawContent(fpath)
165 if err != nil {
166 continue
167 }
168
169 pipeline = append(pipeline, workflow.RawWorkflow{
170 Name: e.Name,
171 Contents: contents,
172 })
173 }
174
175 trigger := tangled.Pipeline_PullRequestTriggerData{
176 Action: "create",
177 SourceBranch: record.Source.Branch,
178 SourceSha: record.Source.Sha,
179 TargetBranch: record.Target.Branch,
180 }
181
182 compiler := workflow.Compiler{
183 Trigger: tangled.Pipeline_TriggerMetadata{
184 Kind: string(workflow.TriggerKindPullRequest),
185 PullRequest: &trigger,
186 Repo: &tangled.Pipeline_TriggerRepo{
187 Did: ident.DID.String(),
188 Knot: repo.Knot,
189 Repo: repo.Name,
190 },
191 },
192 }
193
194 cp := compiler.Compile(compiler.Parse(pipeline))
195 eventJson, err := json.Marshal(cp)
196 if err != nil {
197 return fmt.Errorf("failed to marshal pipeline event: %w", err)
198 }
199
200 // do not run empty pipelines
201 if cp.Workflows == nil {
202 return nil
203 }
204
205 ev := db.Event{
206 Rkey: TID(),
207 Nsid: tangled.PipelineNSID,
208 EventJson: string(eventJson),
209 }
210
211 return h.db.InsertEvent(ev, h.n)
212}
213
214// duplicated from add collaborator
215func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
216 raw := json.RawMessage(event.Commit.Record)
217 did := event.Did
218
219 var record tangled.RepoCollaborator
220 if err := json.Unmarshal(raw, &record); err != nil {
221 return fmt.Errorf("failed to unmarshal record: %w", err)
222 }
223
224 repoAt, err := syntax.ParseATURI(record.Repo)
225 if err != nil {
226 return err
227 }
228
229 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
230 if err != nil || subjectId.Handle.IsInvalidHandle() {
231 return err
232 }
233
234 // TODO: fix this for good, we need to fetch the record here unfortunately
235 // resolve this aturi to extract the repo record
236 owner, pdsClient, err := h.resolver.PDSClient(ctx, repoAt.Authority().String())
237 if err != nil {
238 return fmt.Errorf("failed to resolve owner: %w", err)
239 }
240
241 resp, err := comatproto.RepoGetRecord(ctx, pdsClient, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
242 if err != nil {
243 return err
244 }
245
246 repo := resp.Value.Val.(*tangled.Repo)
247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248
249 // check perms for this user
250 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
251 if err != nil {
252 return fmt.Errorf("failed to check permissions: %w", err)
253 }
254 if !ok {
255 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
256 }
257
258 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
259 return err
260 }
261 h.jc.AddDid(subjectId.DID.String())
262
263 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
264 return err
265 }
266
267 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
268}
269
270func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
271 l := log.FromContext(ctx)
272
273 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
274 if err != nil {
275 l.Error("error building endpoint url", "did", did, "error", err.Error())
276 return fmt.Errorf("error building endpoint url: %w", err)
277 }
278
279 resp, err := http.Get(keysEndpoint)
280 if err != nil {
281 l.Error("error getting keys", "did", did, "error", err)
282 return fmt.Errorf("error getting keys: %w", err)
283 }
284 defer resp.Body.Close()
285
286 if resp.StatusCode == http.StatusNotFound {
287 l.Info("no keys found for did", "did", did)
288 return nil
289 }
290
291 plaintext, err := io.ReadAll(resp.Body)
292 if err != nil {
293 l.Error("error reading response body", "error", err)
294 return fmt.Errorf("error reading response body: %w", err)
295 }
296
297 for key := range strings.SplitSeq(string(plaintext), "\n") {
298 if key == "" {
299 continue
300 }
301 pk := db.PublicKey{
302 Did: did,
303 }
304 pk.Key = key
305 if err := h.db.AddPublicKey(pk); err != nil {
306 l.Error("failed to add public key", "error", err)
307 return fmt.Errorf("failed to add public key: %w", err)
308 }
309 }
310 return nil
311}
312
313func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
314 if event.Kind != models.EventKindCommit {
315 return nil
316 }
317
318 var err error
319 defer func() {
320 eventTime := event.TimeUS
321 lastTimeUs := eventTime + 1
322 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
323 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
324 }
325 }()
326
327 switch event.Commit.Collection {
328 case tangled.PublicKeyNSID:
329 err = h.processPublicKey(ctx, event)
330 case tangled.KnotMemberNSID:
331 err = h.processKnotMember(ctx, event)
332 case tangled.RepoPullNSID:
333 err = h.processPull(ctx, event)
334 case tangled.RepoCollaboratorNSID:
335 err = h.processCollaborator(ctx, event)
336 }
337
338 if err != nil {
339 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
340 }
341
342 return nil
343}