appview,knotserver,spindle: rework jetstream #480

merged
opened by oppi.li targeting master from push-mtsxyxnkznyy

do not return errors from ingesters, this causes the read loop to be killed.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+69 -58
appview
jetstream
knotserver
log
spindle
+2 -2
appview/ingester.go
··· 71 } 72 73 if err != nil { 74 - l.Error("error ingesting record", "err", err) 75 } 76 77 - return err 78 } 79 } 80
··· 71 } 72 73 if err != nil { 74 + l.Debug("error ingesting record", "err", err) 75 } 76 77 + return nil 78 } 79 } 80
+5 -4
jetstream/jetstream.go
··· 68 type processor func(context.Context, *models.Event) error 69 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 - // empty filter => all dids allowed 72 - if len(j.wantedDids) == 0 { 73 - return processFunc 74 - } 75 // since this closure references j.WantedDids; it should auto-update 76 // existing instances of the closure when j.WantedDids is mutated 77 return func(ctx context.Context, evt *models.Event) error { 78 if _, ok := j.wantedDids[evt.Did]; ok { 79 return processFunc(ctx, evt) 80 } else {
··· 68 type processor func(context.Context, *models.Event) error 69 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 // since this closure references j.WantedDids; it should auto-update 72 // existing instances of the closure when j.WantedDids is mutated 73 return func(ctx context.Context, evt *models.Event) error { 74 + // empty filter => all dids allowed 75 + if len(j.wantedDids) == 0 { 76 + return processFunc(ctx, evt) 77 + } 78 + 79 if _, ok := j.wantedDids[evt.Did]; ok { 80 return processFunc(ctx, evt) 81 } else {
+5 -5
knotserver/handler.go
··· 52 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 53 } 54 55 - err = h.jc.StartJetstream(ctx, h.processMessages) 56 - if err != nil { 57 - return nil, fmt.Errorf("failed to start jetstream: %w", err) 58 - } 59 - 60 // Check if the knot knows about any Dids; 61 // if it does, it is already initialized and we can repopulate the 62 // Jetstream subscriptions. ··· 73 } 74 } 75 76 r.Get("/", h.Index) 77 r.Get("/capabilities", h.Capabilities) 78 r.Get("/version", h.Version)
··· 52 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 53 } 54 55 // Check if the knot knows about any Dids; 56 // if it does, it is already initialized and we can repopulate the 57 // Jetstream subscriptions. ··· 68 } 69 } 70 71 + err = h.jc.StartJetstream(ctx, h.processMessages) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to start jetstream: %w", err) 74 + } 75 + 76 r.Get("/", h.Index) 77 r.Get("/capabilities", h.Capabilities) 78 r.Get("/version", h.Version)
+46 -42
knotserver/ingester.go
··· 25 "tangled.sh/tangled.sh/core/workflow" 26 ) 27 28 - func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 29 l := log.FromContext(ctx) 30 pk := db.PublicKey{ 31 Did: did, 32 PublicKey: record, ··· 39 return nil 40 } 41 42 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 43 l := log.FromContext(ctx) 44 45 if record.Domain != h.c.Server.Hostname { 46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) ··· 72 return nil 73 } 74 75 - func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 76 l := log.FromContext(ctx) 77 l = l.With("handler", "processPull") 78 l = l.With("did", did) ··· 204 return nil 205 } 206 207 - event := db.Event{ 208 Rkey: TID(), 209 Nsid: tangled.PipelineNSID, 210 EventJson: string(eventJson), 211 } 212 213 - return h.db.InsertEvent(event, h.n) 214 } 215 216 // duplicated from add collaborator 217 - func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 218 repoAt, err := syntax.ParseATURI(record.Repo) 219 if err != nil { 220 return err ··· 247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 248 249 // check perms for this user 250 - if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 251 return fmt.Errorf("insufficient permissions: %w", err) 252 } 253 ··· 307 } 308 309 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 310 - did := event.Did 311 if event.Kind != models.EventKindCommit { 312 return nil 313 } ··· 321 } 322 }() 323 324 - raw := json.RawMessage(event.Commit.Record) 325 - 326 switch event.Commit.Collection { 327 case tangled.PublicKeyNSID: 328 - var record tangled.PublicKey 329 - if err := json.Unmarshal(raw, &record); err != nil { 330 - return fmt.Errorf("failed to unmarshal record: %w", err) 331 - } 332 - if err := h.processPublicKey(ctx, did, record); err != nil { 333 - return fmt.Errorf("failed to process public key: %w", err) 334 - } 335 - 336 case tangled.KnotMemberNSID: 337 - var record tangled.KnotMember 338 - if err := json.Unmarshal(raw, &record); err != nil { 339 - return fmt.Errorf("failed to unmarshal record: %w", err) 340 - } 341 - if err := h.processKnotMember(ctx, did, record); err != nil { 342 - return fmt.Errorf("failed to process knot member: %w", err) 343 - } 344 - 345 case tangled.RepoPullNSID: 346 - var record tangled.RepoPull 347 - if err := json.Unmarshal(raw, &record); err != nil { 348 - return fmt.Errorf("failed to unmarshal record: %w", err) 349 - } 350 - if err := h.processPull(ctx, did, record); err != nil { 351 - return fmt.Errorf("failed to process knot member: %w", err) 352 - } 353 - 354 case tangled.RepoCollaboratorNSID: 355 - var record tangled.RepoCollaborator 356 - if err := json.Unmarshal(raw, &record); err != nil { 357 - return fmt.Errorf("failed to unmarshal record: %w", err) 358 - } 359 - if err := h.processCollaborator(ctx, did, record); err != nil { 360 - return fmt.Errorf("failed to process knot member: %w", err) 361 - } 362 363 } 364 365 - return err 366 }
··· 25 "tangled.sh/tangled.sh/core/workflow" 26 ) 27 28 + func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { 29 l := log.FromContext(ctx) 30 + raw := json.RawMessage(event.Commit.Record) 31 + did := event.Did 32 + 33 + var record tangled.PublicKey 34 + if err := json.Unmarshal(raw, &record); err != nil { 35 + return fmt.Errorf("failed to unmarshal record: %w", err) 36 + } 37 + 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, ··· 47 return nil 48 } 49 50 + func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { 51 l := log.FromContext(ctx) 52 + raw := json.RawMessage(event.Commit.Record) 53 + did := event.Did 54 + 55 + var record tangled.KnotMember 56 + if err := json.Unmarshal(raw, &record); err != nil { 57 + return fmt.Errorf("failed to unmarshal record: %w", err) 58 + } 59 60 if record.Domain != h.c.Server.Hostname { 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) ··· 87 return nil 88 } 89 90 + func (h *Handle) processPull(ctx context.Context, event *models.Event) error { 91 + raw := json.RawMessage(event.Commit.Record) 92 + did := event.Did 93 + 94 + var record tangled.RepoPull 95 + if err := json.Unmarshal(raw, &record); err != nil { 96 + return fmt.Errorf("failed to unmarshal record: %w", err) 97 + } 98 + 99 l := log.FromContext(ctx) 100 l = l.With("handler", "processPull") 101 l = l.With("did", did) ··· 227 return nil 228 } 229 230 + ev := db.Event{ 231 Rkey: TID(), 232 Nsid: tangled.PipelineNSID, 233 EventJson: string(eventJson), 234 } 235 236 + return h.db.InsertEvent(ev, h.n) 237 } 238 239 // duplicated from add collaborator 240 + func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { 241 + raw := json.RawMessage(event.Commit.Record) 242 + did := event.Did 243 + 244 + var record tangled.RepoCollaborator 245 + if err := json.Unmarshal(raw, &record); err != nil { 246 + return fmt.Errorf("failed to unmarshal record: %w", err) 247 + } 248 + 249 repoAt, err := syntax.ParseATURI(record.Repo) 250 if err != nil { 251 return err ··· 278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 279 280 // check perms for this user 281 + if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { 282 return fmt.Errorf("insufficient permissions: %w", err) 283 } 284 ··· 338 } 339 340 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 341 if event.Kind != models.EventKindCommit { 342 return nil 343 } ··· 351 } 352 }() 353 354 switch event.Commit.Collection { 355 case tangled.PublicKeyNSID: 356 + err = h.processPublicKey(ctx, event) 357 case tangled.KnotMemberNSID: 358 + err = h.processKnotMember(ctx, event) 359 case tangled.RepoPullNSID: 360 + err = h.processPull(ctx, event) 361 case tangled.RepoCollaboratorNSID: 362 + err = h.processCollaborator(ctx, event) 363 + } 364 365 + if err != nil { 366 + h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 367 } 368 369 + return nil 370 }
+3 -1
log/log.go
··· 9 // NewHandler sets up a new slog.Handler with the service name 10 // as an attribute 11 func NewHandler(name string) slog.Handler { 12 - handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}) 13 14 var attrs []slog.Attr 15 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
··· 9 // NewHandler sets up a new slog.Handler with the service name 10 // as an attribute 11 func NewHandler(name string) slog.Handler { 12 + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 13 + Level: slog.LevelDebug, 14 + }) 15 16 var attrs []slog.Attr 17 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
+8 -4
spindle/ingester.go
··· 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 - s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 - s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 - s.ingestCollaborator(ctx, e) 48 } 49 50 - return err 51 } 52 } 53
··· 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 + err = s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 + err = s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 + err = s.ingestCollaborator(ctx, e) 48 } 49 50 + if err != nil { 51 + s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 52 + } 53 + 54 + return nil 55 } 56 } 57