appview,knotserver,spindle: rework jetstream #480

merged
opened by oppi.li targeting master from push-mtsxyxnkznyy

do not return errors from ingesters, this causes the read loop to be killed.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+70 -58
appview
jetstream
knotserver
log
spindle
+2 -2
appview/ingester.go
··· 71 71 } 72 72 73 73 if err != nil { 74 - l.Error("error ingesting record", "err", err) 74 + l.Debug("error ingesting record", "err", err) 75 75 } 76 76 77 - return err 77 + return nil 78 78 } 79 79 } 80 80
+6 -4
jetstream/jetstream.go
··· 68 68 type processor func(context.Context, *models.Event) error 69 69 70 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 - // empty filter => all dids allowed 72 - if len(j.wantedDids) == 0 { 73 - return processFunc 74 - } 75 71 // since this closure references j.WantedDids; it should auto-update 76 72 // existing instances of the closure when j.WantedDids is mutated 77 73 return func(ctx context.Context, evt *models.Event) error { 74 + 75 + // empty filter => all dids allowed 76 + if len(j.wantedDids) == 0 { 77 + return processFunc(ctx, evt) 78 + } 79 + 78 80 if _, ok := j.wantedDids[evt.Did]; ok { 79 81 return processFunc(ctx, evt) 80 82 } else {
+5 -5
knotserver/handler.go
··· 52 52 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 53 53 } 54 54 55 - err = h.jc.StartJetstream(ctx, h.processMessages) 56 - if err != nil { 57 - return nil, fmt.Errorf("failed to start jetstream: %w", err) 58 - } 59 - 60 55 // Check if the knot knows about any Dids; 61 56 // if it does, it is already initialized and we can repopulate the 62 57 // Jetstream subscriptions. ··· 73 68 } 74 69 } 75 70 71 + err = h.jc.StartJetstream(ctx, h.processMessages) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to start jetstream: %w", err) 74 + } 75 + 76 76 r.Get("/", h.Index) 77 77 r.Get("/capabilities", h.Capabilities) 78 78 r.Get("/version", h.Version)
+46 -42
knotserver/ingester.go
··· 25 25 "tangled.sh/tangled.sh/core/workflow" 26 26 ) 27 27 28 - func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 28 + func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { 29 29 l := log.FromContext(ctx) 30 + raw := json.RawMessage(event.Commit.Record) 31 + did := event.Did 32 + 33 + var record tangled.PublicKey 34 + if err := json.Unmarshal(raw, &record); err != nil { 35 + return fmt.Errorf("failed to unmarshal record: %w", err) 36 + } 37 + 30 38 pk := db.PublicKey{ 31 39 Did: did, 32 40 PublicKey: record, ··· 39 47 return nil 40 48 } 41 49 42 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 50 + func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { 43 51 l := log.FromContext(ctx) 52 + raw := json.RawMessage(event.Commit.Record) 53 + did := event.Did 54 + 55 + var record tangled.KnotMember 56 + if err := json.Unmarshal(raw, &record); err != nil { 57 + return fmt.Errorf("failed to unmarshal record: %w", err) 58 + } 44 59 45 60 if record.Domain != h.c.Server.Hostname { 46 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) ··· 72 87 return nil 73 88 } 74 89 75 - func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 90 + func (h *Handle) processPull(ctx context.Context, event *models.Event) error { 91 + raw := json.RawMessage(event.Commit.Record) 92 + did := event.Did 93 + 94 + var record tangled.RepoPull 95 + if err := json.Unmarshal(raw, &record); err != nil { 96 + return fmt.Errorf("failed to unmarshal record: %w", err) 97 + } 98 + 76 99 l := log.FromContext(ctx) 77 100 l = l.With("handler", "processPull") 78 101 l = l.With("did", did) ··· 204 227 return nil 205 228 } 206 229 207 - event := db.Event{ 230 + ev := db.Event{ 208 231 Rkey: TID(), 209 232 Nsid: tangled.PipelineNSID, 210 233 EventJson: string(eventJson), 211 234 } 212 235 213 - return h.db.InsertEvent(event, h.n) 236 + return h.db.InsertEvent(ev, h.n) 214 237 } 215 238 216 239 // duplicated from add collaborator 217 - func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 240 + func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { 241 + raw := json.RawMessage(event.Commit.Record) 242 + did := event.Did 243 + 244 + var record tangled.RepoCollaborator 245 + if err := json.Unmarshal(raw, &record); err != nil { 246 + return fmt.Errorf("failed to unmarshal record: %w", err) 247 + } 248 + 218 249 repoAt, err := syntax.ParseATURI(record.Repo) 219 250 if err != nil { 220 251 return err ··· 247 278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 248 279 249 280 // check perms for this user 250 - if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 281 + if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { 251 282 return fmt.Errorf("insufficient permissions: %w", err) 252 283 } 253 284 ··· 307 338 } 308 339 309 340 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 310 - did := event.Did 311 341 if event.Kind != models.EventKindCommit { 312 342 return nil 313 343 } ··· 321 351 } 322 352 }() 323 353 324 - raw := json.RawMessage(event.Commit.Record) 325 - 326 354 switch event.Commit.Collection { 327 355 case tangled.PublicKeyNSID: 328 - var record tangled.PublicKey 329 - if err := json.Unmarshal(raw, &record); err != nil { 330 - return fmt.Errorf("failed to unmarshal record: %w", err) 331 - } 332 - if err := h.processPublicKey(ctx, did, record); err != nil { 333 - return fmt.Errorf("failed to process public key: %w", err) 334 - } 335 - 356 + err = h.processPublicKey(ctx, event) 336 357 case tangled.KnotMemberNSID: 337 - var record tangled.KnotMember 338 - if err := json.Unmarshal(raw, &record); err != nil { 339 - return fmt.Errorf("failed to unmarshal record: %w", err) 340 - } 341 - if err := h.processKnotMember(ctx, did, record); err != nil { 342 - return fmt.Errorf("failed to process knot member: %w", err) 343 - } 344 - 358 + err = h.processKnotMember(ctx, event) 345 359 case tangled.RepoPullNSID: 346 - var record tangled.RepoPull 347 - if err := json.Unmarshal(raw, &record); err != nil { 348 - return fmt.Errorf("failed to unmarshal record: %w", err) 349 - } 350 - if err := h.processPull(ctx, did, record); err != nil { 351 - return fmt.Errorf("failed to process knot member: %w", err) 352 - } 353 - 360 + err = h.processPull(ctx, event) 354 361 case tangled.RepoCollaboratorNSID: 355 - var record tangled.RepoCollaborator 356 - if err := json.Unmarshal(raw, &record); err != nil { 357 - return fmt.Errorf("failed to unmarshal record: %w", err) 358 - } 359 - if err := h.processCollaborator(ctx, did, record); err != nil { 360 - return fmt.Errorf("failed to process knot member: %w", err) 361 - } 362 + err = h.processCollaborator(ctx, event) 363 + } 362 364 365 + if err != nil { 366 + h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 363 367 } 364 368 365 - return err 369 + return nil 366 370 }
+3 -1
log/log.go
··· 9 9 // NewHandler sets up a new slog.Handler with the service name 10 10 // as an attribute 11 11 func NewHandler(name string) slog.Handler { 12 - handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}) 12 + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 13 + Level: slog.LevelDebug, 14 + }) 13 15 14 16 var attrs []slog.Attr 15 17 attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
+8 -4
spindle/ingester.go
··· 40 40 41 41 switch e.Commit.Collection { 42 42 case tangled.SpindleMemberNSID: 43 - s.ingestMember(ctx, e) 43 + err = s.ingestMember(ctx, e) 44 44 case tangled.RepoNSID: 45 - s.ingestRepo(ctx, e) 45 + err = s.ingestRepo(ctx, e) 46 46 case tangled.RepoCollaboratorNSID: 47 - s.ingestCollaborator(ctx, e) 47 + err = s.ingestCollaborator(ctx, e) 48 48 } 49 49 50 - return err 50 + if err != nil { 51 + s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 52 + } 53 + 54 + return nil 51 55 } 52 56 } 53 57