···5252 return nil, fmt.Errorf("failed to setup enforcer: %w", err)
5353 }
54545555- err = h.jc.StartJetstream(ctx, h.processMessages)
5656- if err != nil {
5757- return nil, fmt.Errorf("failed to start jetstream: %w", err)
5858- }
5959-6055 // Check if the knot knows about any Dids;
6156 // if it does, it is already initialized and we can repopulate the
6257 // Jetstream subscriptions.
···7368 }
7469 }
75707171+ err = h.jc.StartJetstream(ctx, h.processMessages)
7272+ if err != nil {
7373+ return nil, fmt.Errorf("failed to start jetstream: %w", err)
7474+ }
7575+7676 r.Get("/", h.Index)
7777 r.Get("/capabilities", h.Capabilities)
7878 r.Get("/version", h.Version)
+46-42
knotserver/ingester.go
···2525 "tangled.sh/tangled.sh/core/workflow"
2626)
27272828-func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
2828+func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
2929 l := log.FromContext(ctx)
3030+ raw := json.RawMessage(event.Commit.Record)
3131+ did := event.Did
3232+3333+ var record tangled.PublicKey
3434+ if err := json.Unmarshal(raw, &record); err != nil {
3535+ return fmt.Errorf("failed to unmarshal record: %w", err)
3636+ }
3737+3038 pk := db.PublicKey{
3139 Did: did,
3240 PublicKey: record,
···3947 return nil
4048}
41494242-func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
5050+func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
4351 l := log.FromContext(ctx)
5252+ raw := json.RawMessage(event.Commit.Record)
5353+ did := event.Did
5454+5555+ var record tangled.KnotMember
5656+ if err := json.Unmarshal(raw, &record); err != nil {
5757+ return fmt.Errorf("failed to unmarshal record: %w", err)
5858+ }
44594560 if record.Domain != h.c.Server.Hostname {
4661 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···7287 return nil
7388}
74897575-func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
9090+func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
9191+ raw := json.RawMessage(event.Commit.Record)
9292+ did := event.Did
9393+9494+ var record tangled.RepoPull
9595+ if err := json.Unmarshal(raw, &record); err != nil {
9696+ return fmt.Errorf("failed to unmarshal record: %w", err)
9797+ }
9898+7699 l := log.FromContext(ctx)
77100 l = l.With("handler", "processPull")
78101 l = l.With("did", did)
···204227 return nil
205228 }
206229207207- event := db.Event{
230230+ ev := db.Event{
208231 Rkey: TID(),
209232 Nsid: tangled.PipelineNSID,
210233 EventJson: string(eventJson),
211234 }
212235213213- return h.db.InsertEvent(event, h.n)
236236+ return h.db.InsertEvent(ev, h.n)
214237}
215238216239// duplicated from add collaborator
217217-func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
240240+func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
241241+ raw := json.RawMessage(event.Commit.Record)
242242+ did := event.Did
243243+244244+ var record tangled.RepoCollaborator
245245+ if err := json.Unmarshal(raw, &record); err != nil {
246246+ return fmt.Errorf("failed to unmarshal record: %w", err)
247247+ }
248248+218249 repoAt, err := syntax.ParseATURI(record.Repo)
219250 if err != nil {
220251 return err
···247278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248279249280 // check perms for this user
250250- if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
281281+ if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
251282 return fmt.Errorf("insufficient permissions: %w", err)
252283 }
253284···307338}
308339309340func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
310310- did := event.Did
311341 if event.Kind != models.EventKindCommit {
312342 return nil
313343 }
···321351 }
322352 }()
323353324324- raw := json.RawMessage(event.Commit.Record)
325325-326354 switch event.Commit.Collection {
327355 case tangled.PublicKeyNSID:
328328- var record tangled.PublicKey
329329- if err := json.Unmarshal(raw, &record); err != nil {
330330- return fmt.Errorf("failed to unmarshal record: %w", err)
331331- }
332332- if err := h.processPublicKey(ctx, did, record); err != nil {
333333- return fmt.Errorf("failed to process public key: %w", err)
334334- }
335335-356356+ err = h.processPublicKey(ctx, event)
336357 case tangled.KnotMemberNSID:
337337- var record tangled.KnotMember
338338- if err := json.Unmarshal(raw, &record); err != nil {
339339- return fmt.Errorf("failed to unmarshal record: %w", err)
340340- }
341341- if err := h.processKnotMember(ctx, did, record); err != nil {
342342- return fmt.Errorf("failed to process knot member: %w", err)
343343- }
344344-358358+ err = h.processKnotMember(ctx, event)
345359 case tangled.RepoPullNSID:
346346- var record tangled.RepoPull
347347- if err := json.Unmarshal(raw, &record); err != nil {
348348- return fmt.Errorf("failed to unmarshal record: %w", err)
349349- }
350350- if err := h.processPull(ctx, did, record); err != nil {
351351- return fmt.Errorf("failed to process knot member: %w", err)
352352- }
353353-360360+ err = h.processPull(ctx, event)
354361 case tangled.RepoCollaboratorNSID:
355355- var record tangled.RepoCollaborator
356356- if err := json.Unmarshal(raw, &record); err != nil {
357357- return fmt.Errorf("failed to unmarshal record: %w", err)
358358- }
359359- if err := h.processCollaborator(ctx, did, record); err != nil {
360360- return fmt.Errorf("failed to process knot member: %w", err)
361361- }
362362+ err = h.processCollaborator(ctx, event)
363363+ }
362364365365+ if err != nil {
366366+ h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
363367 }
364368365365- return err
369369+ return nil
366370}
+3-1
log/log.go
···99// NewHandler sets up a new slog.Handler with the service name
1010// as an attribute
1111func NewHandler(name string) slog.Handler {
1212- handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})
1212+ handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
1313+ Level: slog.LevelDebug,
1414+ })
13151416 var attrs []slog.Attr
1517 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})