+305
knotserver/ingester.go
+305
knotserver/ingester.go
···
1
+
package knotserver
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"fmt"
7
+
"io"
8
+
"net/http"
9
+
"net/url"
10
+
"path/filepath"
11
+
"slices"
12
+
"strings"
13
+
14
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
+
"github.com/bluesky-social/indigo/atproto/syntax"
16
+
"github.com/bluesky-social/indigo/xrpc"
17
+
"github.com/bluesky-social/jetstream/pkg/models"
18
+
securejoin "github.com/cyphar/filepath-securejoin"
19
+
"tangled.sh/tangled.sh/core/api/tangled"
20
+
"tangled.sh/tangled.sh/core/appview/idresolver"
21
+
"tangled.sh/tangled.sh/core/knotserver/db"
22
+
"tangled.sh/tangled.sh/core/knotserver/git"
23
+
"tangled.sh/tangled.sh/core/log"
24
+
"tangled.sh/tangled.sh/core/workflow"
25
+
)
26
+
27
+
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
28
+
l := log.FromContext(ctx)
29
+
pk := db.PublicKey{
30
+
Did: did,
31
+
PublicKey: record,
32
+
}
33
+
if err := h.db.AddPublicKey(pk); err != nil {
34
+
l.Error("failed to add public key", "error", err)
35
+
return fmt.Errorf("failed to add public key: %w", err)
36
+
}
37
+
l.Info("added public key from firehose", "did", did)
38
+
return nil
39
+
}
40
+
41
+
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
42
+
l := log.FromContext(ctx)
43
+
44
+
if record.Domain != h.c.Server.Hostname {
45
+
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
46
+
return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
47
+
}
48
+
49
+
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
50
+
if err != nil || !ok {
51
+
l.Error("failed to add member", "did", did)
52
+
return fmt.Errorf("failed to enforce permissions: %w", err)
53
+
}
54
+
55
+
if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil {
56
+
l.Error("failed to add member", "error", err)
57
+
return fmt.Errorf("failed to add member: %w", err)
58
+
}
59
+
l.Info("added member from firehose", "member", record.Subject)
60
+
61
+
if err := h.db.AddDid(did); err != nil {
62
+
l.Error("failed to add did", "error", err)
63
+
return fmt.Errorf("failed to add did: %w", err)
64
+
}
65
+
h.jc.AddDid(did)
66
+
67
+
if err := h.fetchAndAddKeys(ctx, did); err != nil {
68
+
return fmt.Errorf("failed to fetch and add keys: %w", err)
69
+
}
70
+
71
+
return nil
72
+
}
73
+
74
+
func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
75
+
l := log.FromContext(ctx)
76
+
l = l.With("handler", "processPull")
77
+
l = l.With("did", did)
78
+
l = l.With("target_repo", record.TargetRepo)
79
+
l = l.With("target_branch", record.TargetBranch)
80
+
81
+
if record.Source == nil {
82
+
reason := "not a branch-based pull request"
83
+
l.Info("ignoring pull record", "reason", reason)
84
+
return fmt.Errorf("ignoring pull record: %s", reason)
85
+
}
86
+
87
+
if record.Source.Repo != nil {
88
+
reason := "fork based pull"
89
+
l.Info("ignoring pull record", "reason", reason)
90
+
return fmt.Errorf("ignoring pull record: %s", reason)
91
+
}
92
+
93
+
allDids, err := h.db.GetAllDids()
94
+
if err != nil {
95
+
return err
96
+
}
97
+
98
+
// presently: we only process PRs from collaborators for pipelines
99
+
if !slices.Contains(allDids, did) {
100
+
reason := "not a known did"
101
+
l.Info("rejecting pull record", "reason", reason)
102
+
return fmt.Errorf("rejected pull record: %s, %s", reason, did)
103
+
}
104
+
105
+
repoAt, err := syntax.ParseATURI(record.TargetRepo)
106
+
if err != nil {
107
+
return err
108
+
}
109
+
110
+
// resolve this aturi to extract the repo record
111
+
resolver := idresolver.DefaultResolver()
112
+
ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
113
+
if err != nil || ident.Handle.IsInvalidHandle() {
114
+
return fmt.Errorf("failed to resolve handle: %w", err)
115
+
}
116
+
117
+
xrpcc := xrpc.Client{
118
+
Host: ident.PDSEndpoint(),
119
+
}
120
+
121
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
122
+
if err != nil {
123
+
return err
124
+
}
125
+
126
+
repo := resp.Value.Val.(*tangled.Repo)
127
+
128
+
if repo.Knot != h.c.Server.Hostname {
129
+
reason := "not this knot"
130
+
l.Info("rejecting pull record", "reason", reason)
131
+
return fmt.Errorf("rejected pull record: %s", reason)
132
+
}
133
+
134
+
didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
135
+
if err != nil {
136
+
return err
137
+
}
138
+
139
+
repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
140
+
if err != nil {
141
+
return err
142
+
}
143
+
144
+
gr, err := git.Open(repoPath, record.Source.Branch)
145
+
if err != nil {
146
+
return err
147
+
}
148
+
149
+
workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
150
+
if err != nil {
151
+
return err
152
+
}
153
+
154
+
var pipeline workflow.Pipeline
155
+
for _, e := range workflowDir {
156
+
if !e.IsFile {
157
+
continue
158
+
}
159
+
160
+
fpath := filepath.Join(workflow.WorkflowDir, e.Name)
161
+
contents, err := gr.RawContent(fpath)
162
+
if err != nil {
163
+
continue
164
+
}
165
+
166
+
wf, err := workflow.FromFile(e.Name, contents)
167
+
if err != nil {
168
+
// TODO: log here, respond to client that is pushing
169
+
h.l.Error("failed to parse workflow", "err", err, "path", fpath)
170
+
continue
171
+
}
172
+
173
+
pipeline = append(pipeline, wf)
174
+
}
175
+
176
+
trigger := tangled.Pipeline_PullRequestTriggerData{
177
+
Action: "create",
178
+
SourceBranch: record.Source.Branch,
179
+
SourceSha: record.Source.Sha,
180
+
TargetBranch: record.TargetBranch,
181
+
}
182
+
183
+
compiler := workflow.Compiler{
184
+
Trigger: tangled.Pipeline_TriggerMetadata{
185
+
Kind: string(workflow.TriggerKindPullRequest),
186
+
PullRequest: &trigger,
187
+
Repo: &tangled.Pipeline_TriggerRepo{
188
+
Did: repo.Owner,
189
+
Knot: repo.Knot,
190
+
Repo: repo.Name,
191
+
},
192
+
},
193
+
}
194
+
195
+
cp := compiler.Compile(pipeline)
196
+
eventJson, err := json.Marshal(cp)
197
+
if err != nil {
198
+
return err
199
+
}
200
+
201
+
// do not run empty pipelines
202
+
if cp.Workflows == nil {
203
+
return nil
204
+
}
205
+
206
+
event := db.Event{
207
+
Rkey: TID(),
208
+
Nsid: tangled.PipelineNSID,
209
+
EventJson: string(eventJson),
210
+
}
211
+
212
+
return h.db.InsertEvent(event, h.n)
213
+
}
214
+
215
+
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
216
+
l := log.FromContext(ctx)
217
+
218
+
keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
219
+
if err != nil {
220
+
l.Error("error building endpoint url", "did", did, "error", err.Error())
221
+
return fmt.Errorf("error building endpoint url: %w", err)
222
+
}
223
+
224
+
resp, err := http.Get(keysEndpoint)
225
+
if err != nil {
226
+
l.Error("error getting keys", "did", did, "error", err)
227
+
return fmt.Errorf("error getting keys: %w", err)
228
+
}
229
+
defer resp.Body.Close()
230
+
231
+
if resp.StatusCode == http.StatusNotFound {
232
+
l.Info("no keys found for did", "did", did)
233
+
return nil
234
+
}
235
+
236
+
plaintext, err := io.ReadAll(resp.Body)
237
+
if err != nil {
238
+
l.Error("error reading response body", "error", err)
239
+
return fmt.Errorf("error reading response body: %w", err)
240
+
}
241
+
242
+
for _, key := range strings.Split(string(plaintext), "\n") {
243
+
if key == "" {
244
+
continue
245
+
}
246
+
pk := db.PublicKey{
247
+
Did: did,
248
+
}
249
+
pk.Key = key
250
+
if err := h.db.AddPublicKey(pk); err != nil {
251
+
l.Error("failed to add public key", "error", err)
252
+
return fmt.Errorf("failed to add public key: %w", err)
253
+
}
254
+
}
255
+
return nil
256
+
}
257
+
258
+
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
259
+
did := event.Did
260
+
if event.Kind != models.EventKindCommit {
261
+
return nil
262
+
}
263
+
264
+
var err error
265
+
defer func() {
266
+
eventTime := event.TimeUS
267
+
lastTimeUs := eventTime + 1
268
+
fmt.Println("lastTimeUs", lastTimeUs)
269
+
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
270
+
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
271
+
}
272
+
}()
273
+
274
+
raw := json.RawMessage(event.Commit.Record)
275
+
276
+
switch event.Commit.Collection {
277
+
case tangled.PublicKeyNSID:
278
+
var record tangled.PublicKey
279
+
if err := json.Unmarshal(raw, &record); err != nil {
280
+
return fmt.Errorf("failed to unmarshal record: %w", err)
281
+
}
282
+
if err := h.processPublicKey(ctx, did, record); err != nil {
283
+
return fmt.Errorf("failed to process public key: %w", err)
284
+
}
285
+
286
+
case tangled.KnotMemberNSID:
287
+
var record tangled.KnotMember
288
+
if err := json.Unmarshal(raw, &record); err != nil {
289
+
return fmt.Errorf("failed to unmarshal record: %w", err)
290
+
}
291
+
if err := h.processKnotMember(ctx, did, record); err != nil {
292
+
return fmt.Errorf("failed to process knot member: %w", err)
293
+
}
294
+
case tangled.RepoPullNSID:
295
+
var record tangled.RepoPull
296
+
if err := json.Unmarshal(raw, &record); err != nil {
297
+
return fmt.Errorf("failed to unmarshal record: %w", err)
298
+
}
299
+
if err := h.processPull(ctx, did, record); err != nil {
300
+
return fmt.Errorf("failed to process knot member: %w", err)
301
+
}
302
+
}
303
+
304
+
return err
305
+
}
-147
knotserver/jetstream.go
-147
knotserver/jetstream.go
···
1
-
package knotserver
2
-
3
-
import (
4
-
"context"
5
-
"encoding/json"
6
-
"fmt"
7
-
"io"
8
-
"net/http"
9
-
"net/url"
10
-
"strings"
11
-
12
-
"github.com/bluesky-social/jetstream/pkg/models"
13
-
"tangled.sh/tangled.sh/core/api/tangled"
14
-
"tangled.sh/tangled.sh/core/knotserver/db"
15
-
"tangled.sh/tangled.sh/core/log"
16
-
)
17
-
18
-
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
19
-
l := log.FromContext(ctx)
20
-
pk := db.PublicKey{
21
-
Did: did,
22
-
PublicKey: record,
23
-
}
24
-
if err := h.db.AddPublicKey(pk); err != nil {
25
-
l.Error("failed to add public key", "error", err)
26
-
return fmt.Errorf("failed to add public key: %w", err)
27
-
}
28
-
l.Info("added public key from firehose", "did", did)
29
-
return nil
30
-
}
31
-
32
-
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
33
-
l := log.FromContext(ctx)
34
-
35
-
if record.Domain != h.c.Server.Hostname {
36
-
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
37
-
return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
38
-
}
39
-
40
-
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
41
-
if err != nil || !ok {
42
-
l.Error("failed to add member", "did", did)
43
-
return fmt.Errorf("failed to enforce permissions: %w", err)
44
-
}
45
-
46
-
if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil {
47
-
l.Error("failed to add member", "error", err)
48
-
return fmt.Errorf("failed to add member: %w", err)
49
-
}
50
-
l.Info("added member from firehose", "member", record.Subject)
51
-
52
-
if err := h.db.AddDid(did); err != nil {
53
-
l.Error("failed to add did", "error", err)
54
-
return fmt.Errorf("failed to add did: %w", err)
55
-
}
56
-
h.jc.AddDid(did)
57
-
58
-
if err := h.fetchAndAddKeys(ctx, did); err != nil {
59
-
return fmt.Errorf("failed to fetch and add keys: %w", err)
60
-
}
61
-
62
-
return nil
63
-
}
64
-
65
-
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
66
-
l := log.FromContext(ctx)
67
-
68
-
keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
69
-
if err != nil {
70
-
l.Error("error building endpoint url", "did", did, "error", err.Error())
71
-
return fmt.Errorf("error building endpoint url: %w", err)
72
-
}
73
-
74
-
resp, err := http.Get(keysEndpoint)
75
-
if err != nil {
76
-
l.Error("error getting keys", "did", did, "error", err)
77
-
return fmt.Errorf("error getting keys: %w", err)
78
-
}
79
-
defer resp.Body.Close()
80
-
81
-
if resp.StatusCode == http.StatusNotFound {
82
-
l.Info("no keys found for did", "did", did)
83
-
return nil
84
-
}
85
-
86
-
plaintext, err := io.ReadAll(resp.Body)
87
-
if err != nil {
88
-
l.Error("error reading response body", "error", err)
89
-
return fmt.Errorf("error reading response body: %w", err)
90
-
}
91
-
92
-
for _, key := range strings.Split(string(plaintext), "\n") {
93
-
if key == "" {
94
-
continue
95
-
}
96
-
pk := db.PublicKey{
97
-
Did: did,
98
-
}
99
-
pk.Key = key
100
-
if err := h.db.AddPublicKey(pk); err != nil {
101
-
l.Error("failed to add public key", "error", err)
102
-
return fmt.Errorf("failed to add public key: %w", err)
103
-
}
104
-
}
105
-
return nil
106
-
}
107
-
108
-
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
109
-
did := event.Did
110
-
if event.Kind != models.EventKindCommit {
111
-
return nil
112
-
}
113
-
114
-
var err error
115
-
defer func() {
116
-
eventTime := event.TimeUS
117
-
lastTimeUs := eventTime + 1
118
-
fmt.Println("lastTimeUs", lastTimeUs)
119
-
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
120
-
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
121
-
}
122
-
}()
123
-
124
-
raw := json.RawMessage(event.Commit.Record)
125
-
126
-
switch event.Commit.Collection {
127
-
case tangled.PublicKeyNSID:
128
-
var record tangled.PublicKey
129
-
if err := json.Unmarshal(raw, &record); err != nil {
130
-
return fmt.Errorf("failed to unmarshal record: %w", err)
131
-
}
132
-
if err := h.processPublicKey(ctx, did, record); err != nil {
133
-
return fmt.Errorf("failed to process public key: %w", err)
134
-
}
135
-
136
-
case tangled.KnotMemberNSID:
137
-
var record tangled.KnotMember
138
-
if err := json.Unmarshal(raw, &record); err != nil {
139
-
return fmt.Errorf("failed to unmarshal record: %w", err)
140
-
}
141
-
if err := h.processKnotMember(ctx, did, record); err != nil {
142
-
return fmt.Errorf("failed to process knot member: %w", err)
143
-
}
144
-
}
145
-
146
-
return err
147
-
}