···68type processor func(context.Context, *models.Event) error
6970func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
71- // empty filter => all dids allowed
72- if len(j.wantedDids) == 0 {
73- return processFunc
74- }
75 // since this closure references j.WantedDids; it should auto-update
76 // existing instances of the closure when j.WantedDids is mutated
77 return func(ctx context.Context, evt *models.Event) error {
00000078 if _, ok := j.wantedDids[evt.Did]; ok {
79 return processFunc(ctx, evt)
80 } else {
···68type processor func(context.Context, *models.Event) error
6970func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
000071 // since this closure references j.WantedDids; it should auto-update
72 // existing instances of the closure when j.WantedDids is mutated
73 return func(ctx context.Context, evt *models.Event) error {
74+75+ // empty filter => all dids allowed
76+ if len(j.wantedDids) == 0 {
77+ return processFunc(ctx, evt)
78+ }
79+80 if _, ok := j.wantedDids[evt.Did]; ok {
81 return processFunc(ctx, evt)
82 } else {
+5-5
knotserver/handler.go
···52 return nil, fmt.Errorf("failed to setup enforcer: %w", err)
53 }
5455- err = h.jc.StartJetstream(ctx, h.processMessages)
56- if err != nil {
57- return nil, fmt.Errorf("failed to start jetstream: %w", err)
58- }
59-60 // Check if the knot knows about any Dids;
61 // if it does, it is already initialized and we can repopulate the
62 // Jetstream subscriptions.
···73 }
74 }
750000076 r.Get("/", h.Index)
77 r.Get("/capabilities", h.Capabilities)
78 r.Get("/version", h.Version)
···52 return nil, fmt.Errorf("failed to setup enforcer: %w", err)
53 }
540000055 // Check if the knot knows about any Dids;
56 // if it does, it is already initialized and we can repopulate the
57 // Jetstream subscriptions.
···68 }
69 }
7071+ err = h.jc.StartJetstream(ctx, h.processMessages)
72+ if err != nil {
73+ return nil, fmt.Errorf("failed to start jetstream: %w", err)
74+ }
75+76 r.Get("/", h.Index)
77 r.Get("/capabilities", h.Capabilities)
78 r.Get("/version", h.Version)
+46-42
knotserver/ingester.go
···25 "tangled.sh/tangled.sh/core/workflow"
26)
2728-func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
29 l := log.FromContext(ctx)
0000000030 pk := db.PublicKey{
31 Did: did,
32 PublicKey: record,
···39 return nil
40}
4142-func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
43 l := log.FromContext(ctx)
00000004445 if record.Domain != h.c.Server.Hostname {
46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···72 return nil
73}
7475-func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
0000000076 l := log.FromContext(ctx)
77 l = l.With("handler", "processPull")
78 l = l.With("did", did)
···204 return nil
205 }
206207- event := db.Event{
208 Rkey: TID(),
209 Nsid: tangled.PipelineNSID,
210 EventJson: string(eventJson),
211 }
212213- return h.db.InsertEvent(event, h.n)
214}
215216// duplicated from add collaborator
217-func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
00000000218 repoAt, err := syntax.ParseATURI(record.Repo)
219 if err != nil {
220 return err
···247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248249 // check perms for this user
250- if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
251 return fmt.Errorf("insufficient permissions: %w", err)
252 }
253···307}
308309func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
310- did := event.Did
311 if event.Kind != models.EventKindCommit {
312 return nil
313 }
···321 }
322 }()
323324- raw := json.RawMessage(event.Commit.Record)
325-326 switch event.Commit.Collection {
327 case tangled.PublicKeyNSID:
328- var record tangled.PublicKey
329- if err := json.Unmarshal(raw, &record); err != nil {
330- return fmt.Errorf("failed to unmarshal record: %w", err)
331- }
332- if err := h.processPublicKey(ctx, did, record); err != nil {
333- return fmt.Errorf("failed to process public key: %w", err)
334- }
335-336 case tangled.KnotMemberNSID:
337- var record tangled.KnotMember
338- if err := json.Unmarshal(raw, &record); err != nil {
339- return fmt.Errorf("failed to unmarshal record: %w", err)
340- }
341- if err := h.processKnotMember(ctx, did, record); err != nil {
342- return fmt.Errorf("failed to process knot member: %w", err)
343- }
344-345 case tangled.RepoPullNSID:
346- var record tangled.RepoPull
347- if err := json.Unmarshal(raw, &record); err != nil {
348- return fmt.Errorf("failed to unmarshal record: %w", err)
349- }
350- if err := h.processPull(ctx, did, record); err != nil {
351- return fmt.Errorf("failed to process knot member: %w", err)
352- }
353-354 case tangled.RepoCollaboratorNSID:
355- var record tangled.RepoCollaborator
356- if err := json.Unmarshal(raw, &record); err != nil {
357- return fmt.Errorf("failed to unmarshal record: %w", err)
358- }
359- if err := h.processCollaborator(ctx, did, record); err != nil {
360- return fmt.Errorf("failed to process knot member: %w", err)
361- }
36200363 }
364365- return err
366}
···25 "tangled.sh/tangled.sh/core/workflow"
26)
2728+func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
29 l := log.FromContext(ctx)
30+ raw := json.RawMessage(event.Commit.Record)
31+ did := event.Did
32+33+ var record tangled.PublicKey
34+ if err := json.Unmarshal(raw, &record); err != nil {
35+ return fmt.Errorf("failed to unmarshal record: %w", err)
36+ }
37+38 pk := db.PublicKey{
39 Did: did,
40 PublicKey: record,
···47 return nil
48}
4950+func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
51 l := log.FromContext(ctx)
52+ raw := json.RawMessage(event.Commit.Record)
53+ did := event.Did
54+55+ var record tangled.KnotMember
56+ if err := json.Unmarshal(raw, &record); err != nil {
57+ return fmt.Errorf("failed to unmarshal record: %w", err)
58+ }
5960 if record.Domain != h.c.Server.Hostname {
61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···87 return nil
88}
8990+func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
91+ raw := json.RawMessage(event.Commit.Record)
92+ did := event.Did
93+94+ var record tangled.RepoPull
95+ if err := json.Unmarshal(raw, &record); err != nil {
96+ return fmt.Errorf("failed to unmarshal record: %w", err)
97+ }
98+99 l := log.FromContext(ctx)
100 l = l.With("handler", "processPull")
101 l = l.With("did", did)
···227 return nil
228 }
229230+ ev := db.Event{
231 Rkey: TID(),
232 Nsid: tangled.PipelineNSID,
233 EventJson: string(eventJson),
234 }
235236+ return h.db.InsertEvent(ev, h.n)
237}
238239// duplicated from add collaborator
240+func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
241+ raw := json.RawMessage(event.Commit.Record)
242+ did := event.Did
243+244+ var record tangled.RepoCollaborator
245+ if err := json.Unmarshal(raw, &record); err != nil {
246+ return fmt.Errorf("failed to unmarshal record: %w", err)
247+ }
248+249 repoAt, err := syntax.ParseATURI(record.Repo)
250 if err != nil {
251 return err
···278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
279280 // check perms for this user
281+ if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
282 return fmt.Errorf("insufficient permissions: %w", err)
283 }
284···338}
339340func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
0341 if event.Kind != models.EventKindCommit {
342 return nil
343 }
···351 }
352 }()
35300354 switch event.Commit.Collection {
355 case tangled.PublicKeyNSID:
356+ err = h.processPublicKey(ctx, event)
0000000357 case tangled.KnotMemberNSID:
358+ err = h.processKnotMember(ctx, event)
0000000359 case tangled.RepoPullNSID:
360+ err = h.processPull(ctx, event)
0000000361 case tangled.RepoCollaboratorNSID:
362+ err = h.processCollaborator(ctx, event)
363+ }
00000364365+ if err != nil {
366+ h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
367 }
368369+ return nil
370}
+3-1
log/log.go
···9// NewHandler sets up a new slog.Handler with the service name
10// as an attribute
11func NewHandler(name string) slog.Handler {
12- handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})
001314 var attrs []slog.Attr
15 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
···9// NewHandler sets up a new slog.Handler with the service name
10// as an attribute
11func NewHandler(name string) slog.Handler {
12+ handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
13+ Level: slog.LevelDebug,
14+ })
1516 var attrs []slog.Attr
17 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})