+2
-2
appview/ingester.go
+2
-2
appview/ingester.go
+6
-4
jetstream/jetstream.go
+6
-4
jetstream/jetstream.go
···
68
68
type processor func(context.Context, *models.Event) error
69
69
70
70
func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
71
-
// empty filter => all dids allowed
72
-
if len(j.wantedDids) == 0 {
73
-
return processFunc
74
-
}
75
71
// since this closure references j.WantedDids; it should auto-update
76
72
// existing instances of the closure when j.WantedDids is mutated
77
73
return func(ctx context.Context, evt *models.Event) error {
74
+
75
+
// empty filter => all dids allowed
76
+
if len(j.wantedDids) == 0 {
77
+
return processFunc(ctx, evt)
78
+
}
79
+
78
80
if _, ok := j.wantedDids[evt.Did]; ok {
79
81
return processFunc(ctx, evt)
80
82
} else {
+5
-5
knotserver/handler.go
+5
-5
knotserver/handler.go
···
52
52
return nil, fmt.Errorf("failed to setup enforcer: %w", err)
53
53
}
54
54
55
-
err = h.jc.StartJetstream(ctx, h.processMessages)
56
-
if err != nil {
57
-
return nil, fmt.Errorf("failed to start jetstream: %w", err)
58
-
}
59
-
60
55
// Check if the knot knows about any Dids;
61
56
// if it does, it is already initialized and we can repopulate the
62
57
// Jetstream subscriptions.
···
71
66
for _, d := range dids {
72
67
h.jc.AddDid(d)
73
68
}
69
+
}
70
+
71
+
err = h.jc.StartJetstream(ctx, h.processMessages)
72
+
if err != nil {
73
+
return nil, fmt.Errorf("failed to start jetstream: %w", err)
74
74
}
75
75
76
76
r.Get("/", h.Index)
+46
-42
knotserver/ingester.go
+46
-42
knotserver/ingester.go
···
25
25
"tangled.sh/tangled.sh/core/workflow"
26
26
)
27
27
28
-
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
28
+
func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
29
29
l := log.FromContext(ctx)
30
+
raw := json.RawMessage(event.Commit.Record)
31
+
did := event.Did
32
+
33
+
var record tangled.PublicKey
34
+
if err := json.Unmarshal(raw, &record); err != nil {
35
+
return fmt.Errorf("failed to unmarshal record: %w", err)
36
+
}
37
+
30
38
pk := db.PublicKey{
31
39
Did: did,
32
40
PublicKey: record,
···
39
47
return nil
40
48
}
41
49
42
-
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
50
+
func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
43
51
l := log.FromContext(ctx)
52
+
raw := json.RawMessage(event.Commit.Record)
53
+
did := event.Did
54
+
55
+
var record tangled.KnotMember
56
+
if err := json.Unmarshal(raw, &record); err != nil {
57
+
return fmt.Errorf("failed to unmarshal record: %w", err)
58
+
}
44
59
45
60
if record.Domain != h.c.Server.Hostname {
46
61
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···
72
87
return nil
73
88
}
74
89
75
-
func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
90
+
func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
91
+
raw := json.RawMessage(event.Commit.Record)
92
+
did := event.Did
93
+
94
+
var record tangled.RepoPull
95
+
if err := json.Unmarshal(raw, &record); err != nil {
96
+
return fmt.Errorf("failed to unmarshal record: %w", err)
97
+
}
98
+
76
99
l := log.FromContext(ctx)
77
100
l = l.With("handler", "processPull")
78
101
l = l.With("did", did)
···
204
227
return nil
205
228
}
206
229
207
-
event := db.Event{
230
+
ev := db.Event{
208
231
Rkey: TID(),
209
232
Nsid: tangled.PipelineNSID,
210
233
EventJson: string(eventJson),
211
234
}
212
235
213
-
return h.db.InsertEvent(event, h.n)
236
+
return h.db.InsertEvent(ev, h.n)
214
237
}
215
238
216
239
// duplicated from add collaborator
217
-
func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
240
+
func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
241
+
raw := json.RawMessage(event.Commit.Record)
242
+
did := event.Did
243
+
244
+
var record tangled.RepoCollaborator
245
+
if err := json.Unmarshal(raw, &record); err != nil {
246
+
return fmt.Errorf("failed to unmarshal record: %w", err)
247
+
}
248
+
218
249
repoAt, err := syntax.ParseATURI(record.Repo)
219
250
if err != nil {
220
251
return err
···
247
278
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248
279
249
280
// check perms for this user
250
-
if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
281
+
if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
251
282
return fmt.Errorf("insufficient permissions: %w", err)
252
283
}
253
284
···
307
338
}
308
339
309
340
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
310
-
did := event.Did
311
341
if event.Kind != models.EventKindCommit {
312
342
return nil
313
343
}
···
321
351
}
322
352
}()
323
353
324
-
raw := json.RawMessage(event.Commit.Record)
325
-
326
354
switch event.Commit.Collection {
327
355
case tangled.PublicKeyNSID:
328
-
var record tangled.PublicKey
329
-
if err := json.Unmarshal(raw, &record); err != nil {
330
-
return fmt.Errorf("failed to unmarshal record: %w", err)
331
-
}
332
-
if err := h.processPublicKey(ctx, did, record); err != nil {
333
-
return fmt.Errorf("failed to process public key: %w", err)
334
-
}
335
-
356
+
err = h.processPublicKey(ctx, event)
336
357
case tangled.KnotMemberNSID:
337
-
var record tangled.KnotMember
338
-
if err := json.Unmarshal(raw, &record); err != nil {
339
-
return fmt.Errorf("failed to unmarshal record: %w", err)
340
-
}
341
-
if err := h.processKnotMember(ctx, did, record); err != nil {
342
-
return fmt.Errorf("failed to process knot member: %w", err)
343
-
}
344
-
358
+
err = h.processKnotMember(ctx, event)
345
359
case tangled.RepoPullNSID:
346
-
var record tangled.RepoPull
347
-
if err := json.Unmarshal(raw, &record); err != nil {
348
-
return fmt.Errorf("failed to unmarshal record: %w", err)
349
-
}
350
-
if err := h.processPull(ctx, did, record); err != nil {
351
-
return fmt.Errorf("failed to process knot member: %w", err)
352
-
}
353
-
360
+
err = h.processPull(ctx, event)
354
361
case tangled.RepoCollaboratorNSID:
355
-
var record tangled.RepoCollaborator
356
-
if err := json.Unmarshal(raw, &record); err != nil {
357
-
return fmt.Errorf("failed to unmarshal record: %w", err)
358
-
}
359
-
if err := h.processCollaborator(ctx, did, record); err != nil {
360
-
return fmt.Errorf("failed to process knot member: %w", err)
361
-
}
362
+
err = h.processCollaborator(ctx, event)
363
+
}
362
364
365
+
if err != nil {
366
+
h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
363
367
}
364
368
365
-
return err
369
+
return nil
366
370
}
+3
-1
log/log.go
+3
-1
log/log.go
···
9
9
// NewHandler sets up a new slog.Handler with the service name
10
10
// as an attribute
11
11
func NewHandler(name string) slog.Handler {
12
-
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})
12
+
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
13
+
Level: slog.LevelDebug,
14
+
})
13
15
14
16
var attrs []slog.Attr
15
17
attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
+8
-4
spindle/ingester.go
+8
-4
spindle/ingester.go
···
40
40
41
41
switch e.Commit.Collection {
42
42
case tangled.SpindleMemberNSID:
43
-
s.ingestMember(ctx, e)
43
+
err = s.ingestMember(ctx, e)
44
44
case tangled.RepoNSID:
45
-
s.ingestRepo(ctx, e)
45
+
err = s.ingestRepo(ctx, e)
46
46
case tangled.RepoCollaboratorNSID:
47
-
s.ingestCollaborator(ctx, e)
47
+
err = s.ingestCollaborator(ctx, e)
48
48
}
49
49
50
-
return err
50
+
if err != nil {
51
+
s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
52
+
}
53
+
54
+
return nil
51
55
}
52
56
}
53
57