+1
appview/state/state.go
+1
appview/state/state.go
+3
-14
jetstream/jetstream.go
+3
-14
jetstream/jetstream.go
···
61
61
}, nil
62
62
}
63
63
64
+
// StartJetstream starts the jetstream client and processes events using the provided processFunc.
65
+
// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
64
66
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
65
67
logger := log.FromContext(ctx)
66
68
67
-
pf := func(ctx context.Context, e *models.Event) error {
68
-
err := processFunc(ctx, e)
69
-
if err != nil {
70
-
return err
71
-
}
72
-
73
-
if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil {
74
-
return err
75
-
}
76
-
77
-
return nil
78
-
}
79
-
80
-
sched := sequential.NewScheduler(j.ident, logger, pf)
69
+
sched := sequential.NewScheduler(j.ident, logger, processFunc)
81
70
82
71
client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
83
72
if err != nil {