forked from tangled.org/core
Monorepo for Tangled

knotserver: remove wantedDids filter for now

anirudh.fi 93c95212 cfc4b6e6

verified
Changed files
+50 -261
cmd
knotserver
jetstream
knotserver
+1 -1
cmd/knotserver/main.go
··· 49 49 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 50 50 tangled.PublicKeyNSID, 51 51 tangled.KnotMemberNSID, 52 - }, nil, l, db, true) 52 + }, nil, l, db, false) 53 53 if err != nil { 54 54 l.Error("failed to setup jetstream", "error", err) 55 55 }
+46 -257
jetstream/jetstream.go
··· 19 19 UpdateLastTimeUs(int64) error 20 20 } 21 21 22 - type JetstreamSubscriber struct { 23 - client *client.Client 24 - cancel context.CancelFunc 25 - dids []string 26 - ident string 27 - running bool 28 - } 29 - 30 22 type JetstreamClient struct { 31 - cfg *client.ClientConfig 32 - baseIdent string 33 - l *slog.Logger 34 - db DB 35 - waitForDid bool 36 - maxDidsPerSubscriber int 23 + cfg *client.ClientConfig 24 + client *client.Client 25 + ident string 26 + l *slog.Logger 37 27 38 - mu sync.RWMutex 39 - subscribers []*JetstreamSubscriber 40 - processFunc func(context.Context, *models.Event) error 41 - subscriberWg sync.WaitGroup 28 + db DB 29 + waitForDid bool 30 + mu sync.RWMutex 31 + 32 + cancel context.CancelFunc 33 + cancelMu sync.Mutex 42 34 } 43 35 44 36 func (j *JetstreamClient) AddDid(did string) { ··· 46 38 return 47 39 } 48 40 j.mu.Lock() 49 - defer j.mu.Unlock() 50 - 51 - // Just add to the config for now, actual subscriber management happens in UpdateDids 52 41 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 42 + j.mu.Unlock() 53 43 } 54 44 55 45 func (j *JetstreamClient) UpdateDids(dids []string) { ··· 59 49 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 60 50 } 61 51 } 62 - 63 - needRebalance := j.processFunc != nil 64 52 j.mu.Unlock() 65 53 66 - if needRebalance { 67 - j.rebalanceSubscribers() 54 + j.cancelMu.Lock() 55 + if j.cancel != nil { 56 + j.cancel() 68 57 } 58 + j.cancelMu.Unlock() 69 59 } 70 60 71 61 func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) { ··· 76 66 } 77 67 78 68 return &JetstreamClient{ 79 - cfg: cfg, 80 - baseIdent: ident, 81 - db: db, 82 - l: logger, 83 - waitForDid: waitForDid, 84 - subscribers: make([]*JetstreamSubscriber, 0), 85 - maxDidsPerSubscriber: 100, 69 + cfg: cfg, 70 + ident: ident, 71 + db: db, 72 + l: logger, 73 + 74 + // This will make the goroutine in StartJetstream wait until 75 + // cfg.WantedDids has been populated, typically using UpdateDids. 76 + waitForDid: waitForDid, 86 77 }, nil 87 78 } 88 79 89 80 // StartJetstream starts the jetstream client and processes events using the provided processFunc. 90 81 // The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs). 91 82 func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 92 - j.mu.Lock() 93 - j.processFunc = processFunc 94 - j.mu.Unlock() 95 - 96 - if j.waitForDid { 97 - // Start a goroutine to wait for DIDs and then start subscribers 98 - go func() { 99 - for { 100 - j.mu.RLock() 101 - hasDids := len(j.cfg.WantedDids) > 0 102 - j.mu.RUnlock() 103 - 104 - if hasDids { 105 - j.l.Info("done waiting for did, starting subscribers") 106 - j.rebalanceSubscribers() 107 - return 108 - } 109 - time.Sleep(time.Second) 110 - } 111 - }() 112 - } else { 113 - // Start subscribers immediately 114 - j.rebalanceSubscribers() 115 - } 116 - 117 - return nil 118 - } 119 - 120 - // rebalanceSubscribers creates, updates, or removes subscribers based on the current list of DIDs 121 - func (j *JetstreamClient) rebalanceSubscribers() { 122 - j.mu.Lock() 123 - defer j.mu.Unlock() 124 - 125 - if j.processFunc == nil { 126 - j.l.Warn("cannot rebalance subscribers without a process function") 127 - return 128 - } 129 - 130 - // calculate how many subscribers we need 131 - totalDids := len(j.cfg.WantedDids) 132 - subscribersNeeded := (totalDids + j.maxDidsPerSubscriber - 1) / j.maxDidsPerSubscriber // ceiling division 133 - 134 - // first case: no subscribers yet; create all needed subscribers 135 - if len(j.subscribers) == 0 { 136 - for i := range subscribersNeeded { 137 - startIdx := i * j.maxDidsPerSubscriber 138 - endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 139 - 140 - subscriberDids := j.cfg.WantedDids[startIdx:endIdx] 83 + logger := j.l 141 84 142 - subCfg := *j.cfg 143 - subCfg.WantedDids = subscriberDids 144 - 145 - ident := fmt.Sprintf("%s-%d", j.baseIdent, i) 146 - subscriber := &JetstreamSubscriber{ 147 - dids: subscriberDids, 148 - ident: ident, 149 - } 150 - j.subscribers = append(j.subscribers, subscriber) 85 + sched := sequential.NewScheduler(j.ident, logger, processFunc) 151 86 152 - j.subscriberWg.Add(1) 153 - go j.startSubscriber(subscriber, &subCfg) 154 - } 155 - return 87 + client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 88 + if err != nil { 89 + return fmt.Errorf("failed to create jetstream client: %w", err) 156 90 } 91 + j.client = client 157 92 158 - // second case: we have more subscribers than needed, stop extra subscribers 159 - if len(j.subscribers) > subscribersNeeded { 160 - for i := subscribersNeeded; i < len(j.subscribers); i++ { 161 - sub := j.subscribers[i] 162 - if sub.running && sub.cancel != nil { 163 - sub.cancel() 164 - sub.running = false 93 + go func() { 94 + if j.waitForDid { 95 + for len(j.cfg.WantedDids) == 0 { 96 + time.Sleep(time.Second) 165 97 } 166 98 } 167 - j.subscribers = j.subscribers[:subscribersNeeded] 168 - } 99 + logger.Info("done waiting for did") 100 + j.connectAndRead(ctx) 101 + }() 169 102 170 - // third case: we need more subscribers 171 - if len(j.subscribers) < subscribersNeeded { 172 - existingCount := len(j.subscribers) 173 - // Create additional subscribers 174 - for i := existingCount; i < subscribersNeeded; i++ { 175 - startIdx := i * j.maxDidsPerSubscriber 176 - endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 177 - 178 - subscriberDids := j.cfg.WantedDids[startIdx:endIdx] 179 - 180 - subCfg := *j.cfg 181 - subCfg.WantedDids = subscriberDids 182 - 183 - ident := fmt.Sprintf("%s-%d", j.baseIdent, i) 184 - subscriber := &JetstreamSubscriber{ 185 - dids: subscriberDids, 186 - ident: ident, 187 - } 188 - j.subscribers = append(j.subscribers, subscriber) 189 - 190 - j.subscriberWg.Add(1) 191 - go j.startSubscriber(subscriber, &subCfg) 192 - } 193 - } 194 - 195 - // fourth case: update existing subscribers with new wantedDids 196 - for i := 0; i < subscribersNeeded && i < len(j.subscribers); i++ { 197 - startIdx := i * j.maxDidsPerSubscriber 198 - endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids) 199 - newDids := j.cfg.WantedDids[startIdx:endIdx] 200 - 201 - // if the dids for this subscriber have changed, restart it 202 - sub := j.subscribers[i] 203 - if !didSlicesEqual(sub.dids, newDids) { 204 - j.l.Info("subscriber DIDs changed, updating", 205 - "subscriber", sub.ident, 206 - "old_count", len(sub.dids), 207 - "new_count", len(newDids)) 208 - 209 - if sub.running && sub.cancel != nil { 210 - sub.cancel() 211 - sub.running = false 212 - } 213 - 214 - subCfg := *j.cfg 215 - subCfg.WantedDids = newDids 216 - 217 - sub.dids = newDids 218 - 219 - j.subscriberWg.Add(1) 220 - go j.startSubscriber(sub, &subCfg) 221 - } 222 - } 103 + return nil 223 104 } 224 105 225 - func didSlicesEqual(a, b []string) bool { 226 - if len(a) != len(b) { 227 - return false 228 - } 229 - 230 - aMap := make(map[string]struct{}, len(a)) 231 - for _, did := range a { 232 - aMap[did] = struct{}{} 233 - } 234 - 235 - for _, did := range b { 236 - if _, exists := aMap[did]; !exists { 237 - return false 238 - } 239 - } 240 - 241 - return true 242 - } 243 - 244 - // startSubscriber initializes and starts a single subscriber 245 - func (j *JetstreamClient) startSubscriber(sub *JetstreamSubscriber, cfg *client.ClientConfig) { 246 - defer j.subscriberWg.Done() 247 - 248 - logger := j.l.With("subscriber", sub.ident) 249 - logger.Info("starting subscriber", "dids_count", len(sub.dids)) 250 - 251 - sched := sequential.NewScheduler(sub.ident, logger, j.processFunc) 252 - 253 - client, err := client.NewClient(cfg, log.New("jetstream-"+sub.ident), sched) 254 - if err != nil { 255 - logger.Error("failed to create jetstream client", "error", err) 256 - return 257 - } 258 - 259 - sub.client = client 260 - 261 - j.mu.Lock() 262 - sub.running = true 263 - j.mu.Unlock() 264 - 265 - j.connectAndReadForSubscriber(sub) 266 - } 267 - 268 - func (j *JetstreamClient) connectAndReadForSubscriber(sub *JetstreamSubscriber) { 269 - ctx := context.Background() 270 - l := j.l.With("subscriber", sub.ident) 271 - 106 + func (j *JetstreamClient) connectAndRead(ctx context.Context) { 107 + l := log.FromContext(ctx) 272 108 for { 273 - // Check if this subscriber should still be running 274 - j.mu.RLock() 275 - running := sub.running 276 - j.mu.RUnlock() 277 - 278 - if !running { 279 - l.Info("subscriber marked for shutdown") 280 - return 281 - } 282 - 283 109 cursor := j.getLastTimeUs(ctx) 284 110 285 111 connCtx, cancel := context.WithCancel(ctx) 112 + j.cancelMu.Lock() 113 + j.cancel = cancel 114 + j.cancelMu.Unlock() 286 115 287 - j.mu.Lock() 288 - sub.cancel = cancel 289 - j.mu.Unlock() 290 - 291 - l.Info("connecting subscriber to jetstream") 292 - if err := sub.client.ConnectAndRead(connCtx, cursor); err != nil { 116 + if err := j.client.ConnectAndRead(connCtx, cursor); err != nil { 293 117 l.Error("error reading jetstream", "error", err) 294 118 cancel() 295 - time.Sleep(time.Second) // Small backoff before retry 296 119 continue 297 120 } 298 121 299 122 select { 300 123 case <-ctx.Done(): 301 - l.Info("context done, stopping subscriber") 124 + l.Info("context done, stopping jetstream") 302 125 return 303 126 case <-connCtx.Done(): 304 127 l.Info("connection context done, reconnecting") ··· 307 130 } 308 131 } 309 132 310 - // GetRunningSubscribersCount returns the total number of currently running subscribers 311 - func (j *JetstreamClient) GetRunningSubscribersCount() int { 312 - j.mu.RLock() 313 - defer j.mu.RUnlock() 314 - 315 - runningCount := 0 316 - for _, sub := range j.subscribers { 317 - if sub.running { 318 - runningCount++ 319 - } 320 - } 321 - 322 - return runningCount 323 - } 324 - 325 - // Shutdown gracefully stops all subscribers 326 - func (j *JetstreamClient) Shutdown() { 327 - j.mu.Lock() 328 - 329 - // Cancel all subscribers 330 - for _, sub := range j.subscribers { 331 - if sub.running && sub.cancel != nil { 332 - sub.cancel() 333 - sub.running = false 334 - } 335 - } 336 - 337 - j.mu.Unlock() 338 - 339 - // Wait for all subscribers to complete 340 - j.subscriberWg.Wait() 341 - j.l.Info("all subscribers shut down", "total_subscribers", len(j.subscribers), "running_subscribers", j.GetRunningSubscribersCount()) 342 - } 343 - 344 133 func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 345 134 l := log.FromContext(ctx) 346 135 lastTimeUs, err := j.db.GetLastTimeUs() ··· 353 142 } 354 143 } 355 144 356 - // If last time is older than 2 days, start from now 145 + // If last time is older than a week, start from now 357 146 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 358 147 lastTimeUs = time.Now().UnixMicro() 359 148 l.Warn("last time us is older than 2 days; discarding that and starting from now") ··· 363 152 } 364 153 } 365 154 366 - l.Info("found last time_us", "time_us", lastTimeUs, "running_subscribers", j.GetRunningSubscribersCount()) 155 + l.Info("found last time_us", "time_us", lastTimeUs) 367 156 return &lastTimeUs 368 157 }
+1 -1
knotserver/handler.go
··· 62 62 if len(dids) > 0 { 63 63 h.knotInitialized = true 64 64 close(h.init) 65 - h.jc.UpdateDids(dids) 65 + // h.jc.UpdateDids(dids) 66 66 } 67 67 68 68 r.Get("/", h.Index)
+1 -1
knotserver/jetstream.go
··· 118 118 if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil { 119 119 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 120 120 } 121 - h.jc.UpdateDids([]string{did}) 121 + // h.jc.UpdateDids([]string{did}) 122 122 }() 123 123 124 124 raw := json.RawMessage(event.Commit.Record)
+1 -1
knotserver/routes.go
··· 769 769 return 770 770 } 771 771 772 - h.jc.UpdateDids([]string{data.Did}) 772 + // h.jc.UpdateDids([]string{data.Did}) 773 773 if err := h.e.AddOwner(ThisServer, data.Did); err != nil { 774 774 l.Error("adding owner", "error", err.Error()) 775 775 writeError(w, err.Error(), http.StatusInternalServerError)