From b2c5bc8e4543bc644b10910abea18e015721f6cc Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Fri, 5 Dec 2025 10:27:14 +1100 Subject: [PATCH] jetstream: fix data race filtering events by did Signed-off-by: Thomas Karpiniec --- jetstream/jetstream.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index e5d3e98b..fc4480fd 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -72,12 +72,17 @@ func (j *JetstreamClient) withDidFilter(processFunc processor) processor { // existing instances of the closure when j.WantedDids is mutated return func(ctx context.Context, evt *models.Event) error { + j.mu.RLock() // empty filter => all dids allowed - if len(j.wantedDids) == 0 { - return processFunc(ctx, evt) + matches := len(j.wantedDids) == 0 + if !matches { + if _, ok := j.wantedDids[evt.Did]; ok { + matches = true + } } + j.mu.RUnlock() - if _, ok := j.wantedDids[evt.Did]; ok { + if matches { return processFunc(ctx, evt) } else { return nil @@ -122,7 +127,13 @@ func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(c go func() { if j.waitForDid { - for len(j.wantedDids) == 0 { + for { + j.mu.RLock() + hasDid := len(j.wantedDids) != 0 + j.mu.RUnlock() + if hasDid { + break + } time.Sleep(time.Second) } } -- 2.43.0