forked from tangled.org/core
this repo has no description

appview,knotserver,spindle: rework jetstream

do not return errors from ingesters, this causes the read loop to be
killed.

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled c8d76b33 bb4ac7ec

Changed files
+70 -58
appview
jetstream
knotserver
log
spindle
+2 -2
appview/ingester.go
··· 71 } 72 73 if err != nil { 74 - l.Error("error ingesting record", "err", err) 75 } 76 77 - return err 78 } 79 } 80
··· 71 } 72 73 if err != nil { 74 + l.Debug("error ingesting record", "err", err) 75 } 76 77 + return nil 78 } 79 } 80
+6 -4
jetstream/jetstream.go
··· 68 type processor func(context.Context, *models.Event) error 69 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 - // empty filter => all dids allowed 72 - if len(j.wantedDids) == 0 { 73 - return processFunc 74 - } 75 // since this closure references j.WantedDids; it should auto-update 76 // existing instances of the closure when j.WantedDids is mutated 77 return func(ctx context.Context, evt *models.Event) error { 78 if _, ok := j.wantedDids[evt.Did]; ok { 79 return processFunc(ctx, evt) 80 } else {
··· 68 type processor func(context.Context, *models.Event) error 69 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 // since this closure references j.WantedDids; it should auto-update 72 // existing instances of the closure when j.WantedDids is mutated 73 return func(ctx context.Context, evt *models.Event) error { 74 + 75 + // empty filter => all dids allowed 76 + if len(j.wantedDids) == 0 { 77 + return processFunc(ctx, evt) 78 + } 79 + 80 if _, ok := j.wantedDids[evt.Did]; ok { 81 return processFunc(ctx, evt) 82 } else {
+5 -5
knotserver/handler.go
··· 52 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 53 } 54 55 - err = h.jc.StartJetstream(ctx, h.processMessages) 56 - if err != nil { 57 - return nil, fmt.Errorf("failed to start jetstream: %w", err) 58 - } 59 - 60 // Check if the knot knows about any Dids; 61 // if it does, it is already initialized and we can repopulate the 62 // Jetstream subscriptions. ··· 71 for _, d := range dids { 72 h.jc.AddDid(d) 73 } 74 } 75 76 r.Get("/", h.Index)
··· 52 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 53 } 54 55 // Check if the knot knows about any Dids; 56 // if it does, it is already initialized and we can repopulate the 57 // Jetstream subscriptions. ··· 66 for _, d := range dids { 67 h.jc.AddDid(d) 68 } 69 + } 70 + 71 + err = h.jc.StartJetstream(ctx, h.processMessages) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to start jetstream: %w", err) 74 } 75 76 r.Get("/", h.Index)
+46 -42
knotserver/ingester.go
··· 25 "tangled.sh/tangled.sh/core/workflow" 26 ) 27 28 - func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 29 l := log.FromContext(ctx) 30 pk := db.PublicKey{ 31 Did: did, 32 PublicKey: record, ··· 39 return nil 40 } 41 42 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 43 l := log.FromContext(ctx) 44 45 if record.Domain != h.c.Server.Hostname { 46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) ··· 72 return nil 73 } 74 75 - func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 76 l := log.FromContext(ctx) 77 l = l.With("handler", "processPull") 78 l = l.With("did", did) ··· 200 return nil 201 } 202 203 - event := db.Event{ 204 Rkey: TID(), 205 Nsid: tangled.PipelineNSID, 206 EventJson: string(eventJson), 207 } 208 209 - return h.db.InsertEvent(event, h.n) 210 } 211 212 // duplicated from add collaborator 213 - func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 214 repoAt, err := syntax.ParseATURI(record.Repo) 215 if err != nil { 216 return err ··· 243 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 244 245 // check perms for this user 246 - if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 247 return fmt.Errorf("insufficient permissions: %w", err) 248 } 249 ··· 303 } 304 305 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 306 - did := event.Did 307 if event.Kind != models.EventKindCommit { 308 return nil 309 } ··· 317 } 318 }() 319 320 - raw := json.RawMessage(event.Commit.Record) 321 - 322 switch event.Commit.Collection { 323 case tangled.PublicKeyNSID: 324 - var record tangled.PublicKey 325 - if err := json.Unmarshal(raw, &record); err != nil { 326 - return fmt.Errorf("failed to unmarshal record: %w", err) 327 - } 328 - if err := h.processPublicKey(ctx, did, record); err != nil { 329 - return fmt.Errorf("failed to process public key: %w", err) 330 - } 331 - 332 case tangled.KnotMemberNSID: 333 - var record tangled.KnotMember 334 - if err := json.Unmarshal(raw, &record); err != nil { 335 - return fmt.Errorf("failed to unmarshal record: %w", err) 336 - } 337 - if err := h.processKnotMember(ctx, did, record); err != nil { 338 - return fmt.Errorf("failed to process knot member: %w", err) 339 - } 340 - 341 case tangled.RepoPullNSID: 342 - var record tangled.RepoPull 343 - if err := json.Unmarshal(raw, &record); err != nil { 344 - return fmt.Errorf("failed to unmarshal record: %w", err) 345 - } 346 - if err := h.processPull(ctx, did, record); err != nil { 347 - return fmt.Errorf("failed to process knot member: %w", err) 348 - } 349 - 350 case tangled.RepoCollaboratorNSID: 351 - var record tangled.RepoCollaborator 352 - if err := json.Unmarshal(raw, &record); err != nil { 353 - return fmt.Errorf("failed to unmarshal record: %w", err) 354 - } 355 - if err := h.processCollaborator(ctx, did, record); err != nil { 356 - return fmt.Errorf("failed to process knot member: %w", err) 357 - } 358 359 } 360 361 - return err 362 }
··· 25 "tangled.sh/tangled.sh/core/workflow" 26 ) 27 28 + func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { 29 l := log.FromContext(ctx) 30 + raw := json.RawMessage(event.Commit.Record) 31 + did := event.Did 32 + 33 + var record tangled.PublicKey 34 + if err := json.Unmarshal(raw, &record); err != nil { 35 + return fmt.Errorf("failed to unmarshal record: %w", err) 36 + } 37 + 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, ··· 47 return nil 48 } 49 50 + func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { 51 l := log.FromContext(ctx) 52 + raw := json.RawMessage(event.Commit.Record) 53 + did := event.Did 54 + 55 + var record tangled.KnotMember 56 + if err := json.Unmarshal(raw, &record); err != nil { 57 + return fmt.Errorf("failed to unmarshal record: %w", err) 58 + } 59 60 if record.Domain != h.c.Server.Hostname { 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) ··· 87 return nil 88 } 89 90 + func (h *Handle) processPull(ctx context.Context, event *models.Event) error { 91 + raw := json.RawMessage(event.Commit.Record) 92 + did := event.Did 93 + 94 + var record tangled.RepoPull 95 + if err := json.Unmarshal(raw, &record); err != nil { 96 + return fmt.Errorf("failed to unmarshal record: %w", err) 97 + } 98 + 99 l := log.FromContext(ctx) 100 l = l.With("handler", "processPull") 101 l = l.With("did", did) ··· 223 return nil 224 } 225 226 + ev := db.Event{ 227 Rkey: TID(), 228 Nsid: tangled.PipelineNSID, 229 EventJson: string(eventJson), 230 } 231 232 + return h.db.InsertEvent(ev, h.n) 233 } 234 235 // duplicated from add collaborator 236 + func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { 237 + raw := json.RawMessage(event.Commit.Record) 238 + did := event.Did 239 + 240 + var record tangled.RepoCollaborator 241 + if err := json.Unmarshal(raw, &record); err != nil { 242 + return fmt.Errorf("failed to unmarshal record: %w", err) 243 + } 244 + 245 repoAt, err := syntax.ParseATURI(record.Repo) 246 if err != nil { 247 return err ··· 274 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 275 276 // check perms for this user 277 + if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { 278 return fmt.Errorf("insufficient permissions: %w", err) 279 } 280 ··· 334 } 335 336 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 337 if event.Kind != models.EventKindCommit { 338 return nil 339 } ··· 347 } 348 }() 349 350 switch event.Commit.Collection { 351 case tangled.PublicKeyNSID: 352 + err = h.processPublicKey(ctx, event) 353 case tangled.KnotMemberNSID: 354 + err = h.processKnotMember(ctx, event) 355 case tangled.RepoPullNSID: 356 + err = h.processPull(ctx, event) 357 case tangled.RepoCollaboratorNSID: 358 + err = h.processCollaborator(ctx, event) 359 + } 360 361 + if err != nil { 362 + h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 363 } 364 365 + return nil 366 }
+3 -1
log/log.go
··· 9 // NewHandler sets up a new slog.Handler with the service name 10 // as an attribute 11 func NewHandler(name string) slog.Handler { 12 - handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}) 13 14 var attrs []slog.Attr 15 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
··· 9 // NewHandler sets up a new slog.Handler with the service name 10 // as an attribute 11 func NewHandler(name string) slog.Handler { 12 + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 13 + Level: slog.LevelDebug, 14 + }) 15 16 var attrs []slog.Attr 17 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
+8 -4
spindle/ingester.go
··· 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 - s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 - s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 - s.ingestCollaborator(ctx, e) 48 } 49 50 - return err 51 } 52 } 53
··· 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 + err = s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 + err = s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 + err = s.ingestCollaborator(ctx, e) 48 } 49 50 + if err != nil { 51 + s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 52 + } 53 + 54 + return nil 55 } 56 } 57