forked from tangled.org/core
Monorepo for Tangled

jetstream: selectively update subscribers

No need to restart all subscribers when we get a new wantedDid.

Changed files
+99 -21
jetstream
+99 -21
jetstream/jetstream.go
··· 127 127 return 128 128 } 129 129 130 - // stop all subscribers first 131 - for _, sub := range j.subscribers { 132 - if sub.running && sub.cancel != nil { 133 - sub.cancel() 134 - sub.running = false 135 - } 136 - } 137 - 138 130 // calculate how many subscribers we need 139 131 totalDids := len(j.cfg.WantedDids) 140 132 subscribersNeeded := (totalDids + j.maxDidsPerSubscriber - 1) / j.maxDidsPerSubscriber // ceiling division 141 133 142 - // create or reuse subscribers as needed 143 - j.subscribers = j.subscribers[:0] 134 + // first case: no subscribers yet; create all needed subscribers 135 + if len(j.subscribers) == 0 { 136 + for i := range subscribersNeeded { 137 + startIdx := i * j.maxDidsPerSubscriber 138 + endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 144 139 145 - for i := range subscribersNeeded { 140 + subscriberDids := j.cfg.WantedDids[startIdx:endIdx] 141 + 142 + subCfg := *j.cfg 143 + subCfg.WantedDids = subscriberDids 144 + 145 + ident := fmt.Sprintf("%s-%d", j.baseIdent, i) 146 + subscriber := &JetstreamSubscriber{ 147 + dids: subscriberDids, 148 + ident: ident, 149 + } 150 + j.subscribers = append(j.subscribers, subscriber) 151 + 152 + j.subscriberWg.Add(1) 153 + go j.startSubscriber(subscriber, &subCfg) 154 + } 155 + return 156 + } 157 + 158 + // second case: we have more subscribers than needed, stop extra subscribers 159 + if len(j.subscribers) > subscribersNeeded { 160 + for i := subscribersNeeded; i < len(j.subscribers); i++ { 161 + sub := j.subscribers[i] 162 + if sub.running && sub.cancel != nil { 163 + sub.cancel() 164 + sub.running = false 165 + } 166 + } 167 + j.subscribers = j.subscribers[:subscribersNeeded] 168 + } 169 + 170 + // third case: we need more subscribers 171 + if len(j.subscribers) < subscribersNeeded { 172 + existingCount := len(j.subscribers) 173 + // Create additional subscribers 174 + for i := existingCount; i < subscribersNeeded; i++ { 175 + startIdx := i * j.maxDidsPerSubscriber 176 + endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 177 + 178 + subscriberDids := j.cfg.WantedDids[startIdx:endIdx] 179 + 180 + subCfg := *j.cfg 181 + subCfg.WantedDids = subscriberDids 182 + 183 + ident := fmt.Sprintf("%s-%d", j.baseIdent, i) 184 + subscriber := &JetstreamSubscriber{ 185 + dids: subscriberDids, 186 + ident: ident, 187 + } 188 + j.subscribers = append(j.subscribers, subscriber) 189 + 190 + j.subscriberWg.Add(1) 191 + go j.startSubscriber(subscriber, &subCfg) 192 + } 193 + } 194 + 195 + // fourth case: update existing subscribers with new wantedDids 196 + for i := 0; i < subscribersNeeded && i < len(j.subscribers); i++ { 146 197 startIdx := i * j.maxDidsPerSubscriber 147 198 endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 199 + newDids := j.cfg.WantedDids[startIdx:endIdx] 148 200 149 - subscriberDids := j.cfg.WantedDids[startIdx:endIdx] 201 + // if the dids for this subscriber have changed, restart it 202 + sub := j.subscribers[i] 203 + if !didSlicesEqual(sub.dids, newDids) { 204 + j.l.Info("subscriber DIDs changed, updating", 205 + "subscriber", sub.ident, 206 + "old_count", len(sub.dids), 207 + "new_count", len(newDids)) 150 208 151 - subCfg := *j.cfg 152 - subCfg.WantedDids = subscriberDids 209 + if sub.running && sub.cancel != nil { 210 + sub.cancel() 211 + sub.running = false 212 + } 153 213 154 - ident := fmt.Sprintf("%s-%d", j.baseIdent, i) 155 - subscriber := &JetstreamSubscriber{ 156 - dids: subscriberDids, 157 - ident: ident, 214 + subCfg := *j.cfg 215 + subCfg.WantedDids = newDids 216 + 217 + sub.dids = newDids 218 + 219 + j.subscriberWg.Add(1) 220 + go j.startSubscriber(sub, &subCfg) 158 221 } 159 - j.subscribers = append(j.subscribers, subscriber) 222 + } 223 + } 160 224 161 - j.subscriberWg.Add(1) 162 - go j.startSubscriber(subscriber, &subCfg) 225 + func didSlicesEqual(a, b []string) bool { 226 + if len(a) != len(b) { 227 + return false 163 228 } 229 + 230 + aMap := make(map[string]struct{}, len(a)) 231 + for _, did := range a { 232 + aMap[did] = struct{}{} 233 + } 234 + 235 + for _, did := range b { 236 + if _, exists := aMap[did]; !exists { 237 + return false 238 + } 239 + } 240 + 241 + return true 164 242 } 165 243 166 244 // startSubscriber initializes and starts a single subscriber