tangled
alpha
login
or
join now
moll.dev
/
core
forked from
tangled.org/core
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
knotserver: remove wantedDids filter for now
anirudh.fi
10 months ago
65dda689
a7da1394
+50
-261
5 changed files
expand all
collapse all
unified
split
cmd
knotserver
main.go
jetstream
jetstream.go
knotserver
handler.go
jetstream.go
routes.go
+1
-1
cmd/knotserver/main.go
···
49
49
jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{
50
50
tangled.PublicKeyNSID,
51
51
tangled.KnotMemberNSID,
52
52
-
}, nil, l, db, true)
52
52
+
}, nil, l, db, false)
53
53
if err != nil {
54
54
l.Error("failed to setup jetstream", "error", err)
55
55
}
+46
-257
jetstream/jetstream.go
···
19
19
UpdateLastTimeUs(int64) error
20
20
}
21
21
22
22
-
type JetstreamSubscriber struct {
23
23
-
client *client.Client
24
24
-
cancel context.CancelFunc
25
25
-
dids []string
26
26
-
ident string
27
27
-
running bool
28
28
-
}
29
29
-
30
22
type JetstreamClient struct {
31
31
-
cfg *client.ClientConfig
32
32
-
baseIdent string
33
33
-
l *slog.Logger
34
34
-
db DB
35
35
-
waitForDid bool
36
36
-
maxDidsPerSubscriber int
23
23
+
cfg *client.ClientConfig
24
24
+
client *client.Client
25
25
+
ident string
26
26
+
l *slog.Logger
37
27
38
38
-
mu sync.RWMutex
39
39
-
subscribers []*JetstreamSubscriber
40
40
-
processFunc func(context.Context, *models.Event) error
41
41
-
subscriberWg sync.WaitGroup
28
28
+
db DB
29
29
+
waitForDid bool
30
30
+
mu sync.RWMutex
31
31
+
32
32
+
cancel context.CancelFunc
33
33
+
cancelMu sync.Mutex
42
34
}
43
35
44
36
func (j *JetstreamClient) AddDid(did string) {
···
46
38
return
47
39
}
48
40
j.mu.Lock()
49
49
-
defer j.mu.Unlock()
50
50
-
51
51
-
// Just add to the config for now, actual subscriber management happens in UpdateDids
52
41
j.cfg.WantedDids = append(j.cfg.WantedDids, did)
42
42
+
j.mu.Unlock()
53
43
}
54
44
55
45
func (j *JetstreamClient) UpdateDids(dids []string) {
···
59
49
j.cfg.WantedDids = append(j.cfg.WantedDids, did)
60
50
}
61
51
}
62
62
-
63
63
-
needRebalance := j.processFunc != nil
64
52
j.mu.Unlock()
65
53
66
66
-
if needRebalance {
67
67
-
j.rebalanceSubscribers()
54
54
+
j.cancelMu.Lock()
55
55
+
if j.cancel != nil {
56
56
+
j.cancel()
68
57
}
58
58
+
j.cancelMu.Unlock()
69
59
}
70
60
71
61
func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {
···
76
66
}
77
67
78
68
return &JetstreamClient{
79
79
-
cfg: cfg,
80
80
-
baseIdent: ident,
81
81
-
db: db,
82
82
-
l: logger,
83
83
-
waitForDid: waitForDid,
84
84
-
subscribers: make([]*JetstreamSubscriber, 0),
85
85
-
maxDidsPerSubscriber: 100,
69
69
+
cfg: cfg,
70
70
+
ident: ident,
71
71
+
db: db,
72
72
+
l: logger,
73
73
+
74
74
+
// This will make the goroutine in StartJetstream wait until
75
75
+
// cfg.WantedDids has been populated, typically using UpdateDids.
76
76
+
waitForDid: waitForDid,
86
77
}, nil
87
78
}
88
79
89
80
// StartJetstream starts the jetstream client and processes events using the provided processFunc.
90
81
// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
91
82
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
92
92
-
j.mu.Lock()
93
93
-
j.processFunc = processFunc
94
94
-
j.mu.Unlock()
95
95
-
96
96
-
if j.waitForDid {
97
97
-
// Start a goroutine to wait for DIDs and then start subscribers
98
98
-
go func() {
99
99
-
for {
100
100
-
j.mu.RLock()
101
101
-
hasDids := len(j.cfg.WantedDids) > 0
102
102
-
j.mu.RUnlock()
103
103
-
104
104
-
if hasDids {
105
105
-
j.l.Info("done waiting for did, starting subscribers")
106
106
-
j.rebalanceSubscribers()
107
107
-
return
108
108
-
}
109
109
-
time.Sleep(time.Second)
110
110
-
}
111
111
-
}()
112
112
-
} else {
113
113
-
// Start subscribers immediately
114
114
-
j.rebalanceSubscribers()
115
115
-
}
116
116
-
117
117
-
return nil
118
118
-
}
119
119
-
120
120
-
// rebalanceSubscribers creates, updates, or removes subscribers based on the current list of DIDs
121
121
-
func (j *JetstreamClient) rebalanceSubscribers() {
122
122
-
j.mu.Lock()
123
123
-
defer j.mu.Unlock()
124
124
-
125
125
-
if j.processFunc == nil {
126
126
-
j.l.Warn("cannot rebalance subscribers without a process function")
127
127
-
return
128
128
-
}
129
129
-
130
130
-
// calculate how many subscribers we need
131
131
-
totalDids := len(j.cfg.WantedDids)
132
132
-
subscribersNeeded := (totalDids + j.maxDidsPerSubscriber - 1) / j.maxDidsPerSubscriber // ceiling division
133
133
-
134
134
-
// first case: no subscribers yet; create all needed subscribers
135
135
-
if len(j.subscribers) == 0 {
136
136
-
for i := range subscribersNeeded {
137
137
-
startIdx := i * j.maxDidsPerSubscriber
138
138
-
endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids)
139
139
-
140
140
-
subscriberDids := j.cfg.WantedDids[startIdx:endIdx]
83
83
+
logger := j.l
141
84
142
142
-
subCfg := *j.cfg
143
143
-
subCfg.WantedDids = subscriberDids
144
144
-
145
145
-
ident := fmt.Sprintf("%s-%d", j.baseIdent, i)
146
146
-
subscriber := &JetstreamSubscriber{
147
147
-
dids: subscriberDids,
148
148
-
ident: ident,
149
149
-
}
150
150
-
j.subscribers = append(j.subscribers, subscriber)
85
85
+
sched := sequential.NewScheduler(j.ident, logger, processFunc)
151
86
152
152
-
j.subscriberWg.Add(1)
153
153
-
go j.startSubscriber(subscriber, &subCfg)
154
154
-
}
155
155
-
return
87
87
+
client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
88
88
+
if err != nil {
89
89
+
return fmt.Errorf("failed to create jetstream client: %w", err)
156
90
}
91
91
+
j.client = client
157
92
158
158
-
// second case: we have more subscribers than needed, stop extra subscribers
159
159
-
if len(j.subscribers) > subscribersNeeded {
160
160
-
for i := subscribersNeeded; i < len(j.subscribers); i++ {
161
161
-
sub := j.subscribers[i]
162
162
-
if sub.running && sub.cancel != nil {
163
163
-
sub.cancel()
164
164
-
sub.running = false
93
93
+
go func() {
94
94
+
if j.waitForDid {
95
95
+
for len(j.cfg.WantedDids) == 0 {
96
96
+
time.Sleep(time.Second)
165
97
}
166
98
}
167
167
-
j.subscribers = j.subscribers[:subscribersNeeded]
168
168
-
}
99
99
+
logger.Info("done waiting for did")
100
100
+
j.connectAndRead(ctx)
101
101
+
}()
169
102
170
170
-
// third case: we need more subscribers
171
171
-
if len(j.subscribers) < subscribersNeeded {
172
172
-
existingCount := len(j.subscribers)
173
173
-
// Create additional subscribers
174
174
-
for i := existingCount; i < subscribersNeeded; i++ {
175
175
-
startIdx := i * j.maxDidsPerSubscriber
176
176
-
endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids)
177
177
-
178
178
-
subscriberDids := j.cfg.WantedDids[startIdx:endIdx]
179
179
-
180
180
-
subCfg := *j.cfg
181
181
-
subCfg.WantedDids = subscriberDids
182
182
-
183
183
-
ident := fmt.Sprintf("%s-%d", j.baseIdent, i)
184
184
-
subscriber := &JetstreamSubscriber{
185
185
-
dids: subscriberDids,
186
186
-
ident: ident,
187
187
-
}
188
188
-
j.subscribers = append(j.subscribers, subscriber)
189
189
-
190
190
-
j.subscriberWg.Add(1)
191
191
-
go j.startSubscriber(subscriber, &subCfg)
192
192
-
}
193
193
-
}
194
194
-
195
195
-
// fourth case: update existing subscribers with new wantedDids
196
196
-
for i := 0; i < subscribersNeeded && i < len(j.subscribers); i++ {
197
197
-
startIdx := i * j.maxDidsPerSubscriber
198
198
-
endIdx := min((i+1)*j.maxDidsPerSubscriber, totalDids)
199
199
-
newDids := j.cfg.WantedDids[startIdx:endIdx]
200
200
-
201
201
-
// if the dids for this subscriber have changed, restart it
202
202
-
sub := j.subscribers[i]
203
203
-
if !didSlicesEqual(sub.dids, newDids) {
204
204
-
j.l.Info("subscriber DIDs changed, updating",
205
205
-
"subscriber", sub.ident,
206
206
-
"old_count", len(sub.dids),
207
207
-
"new_count", len(newDids))
208
208
-
209
209
-
if sub.running && sub.cancel != nil {
210
210
-
sub.cancel()
211
211
-
sub.running = false
212
212
-
}
213
213
-
214
214
-
subCfg := *j.cfg
215
215
-
subCfg.WantedDids = newDids
216
216
-
217
217
-
sub.dids = newDids
218
218
-
219
219
-
j.subscriberWg.Add(1)
220
220
-
go j.startSubscriber(sub, &subCfg)
221
221
-
}
222
222
-
}
103
103
+
return nil
223
104
}
224
105
225
225
-
func didSlicesEqual(a, b []string) bool {
226
226
-
if len(a) != len(b) {
227
227
-
return false
228
228
-
}
229
229
-
230
230
-
aMap := make(map[string]struct{}, len(a))
231
231
-
for _, did := range a {
232
232
-
aMap[did] = struct{}{}
233
233
-
}
234
234
-
235
235
-
for _, did := range b {
236
236
-
if _, exists := aMap[did]; !exists {
237
237
-
return false
238
238
-
}
239
239
-
}
240
240
-
241
241
-
return true
242
242
-
}
243
243
-
244
244
-
// startSubscriber initializes and starts a single subscriber
245
245
-
func (j *JetstreamClient) startSubscriber(sub *JetstreamSubscriber, cfg *client.ClientConfig) {
246
246
-
defer j.subscriberWg.Done()
247
247
-
248
248
-
logger := j.l.With("subscriber", sub.ident)
249
249
-
logger.Info("starting subscriber", "dids_count", len(sub.dids))
250
250
-
251
251
-
sched := sequential.NewScheduler(sub.ident, logger, j.processFunc)
252
252
-
253
253
-
client, err := client.NewClient(cfg, log.New("jetstream-"+sub.ident), sched)
254
254
-
if err != nil {
255
255
-
logger.Error("failed to create jetstream client", "error", err)
256
256
-
return
257
257
-
}
258
258
-
259
259
-
sub.client = client
260
260
-
261
261
-
j.mu.Lock()
262
262
-
sub.running = true
263
263
-
j.mu.Unlock()
264
264
-
265
265
-
j.connectAndReadForSubscriber(sub)
266
266
-
}
267
267
-
268
268
-
func (j *JetstreamClient) connectAndReadForSubscriber(sub *JetstreamSubscriber) {
269
269
-
ctx := context.Background()
270
270
-
l := j.l.With("subscriber", sub.ident)
271
271
-
106
106
+
func (j *JetstreamClient) connectAndRead(ctx context.Context) {
107
107
+
l := log.FromContext(ctx)
272
108
for {
273
273
-
// Check if this subscriber should still be running
274
274
-
j.mu.RLock()
275
275
-
running := sub.running
276
276
-
j.mu.RUnlock()
277
277
-
278
278
-
if !running {
279
279
-
l.Info("subscriber marked for shutdown")
280
280
-
return
281
281
-
}
282
282
-
283
109
cursor := j.getLastTimeUs(ctx)
284
110
285
111
connCtx, cancel := context.WithCancel(ctx)
112
112
+
j.cancelMu.Lock()
113
113
+
j.cancel = cancel
114
114
+
j.cancelMu.Unlock()
286
115
287
287
-
j.mu.Lock()
288
288
-
sub.cancel = cancel
289
289
-
j.mu.Unlock()
290
290
-
291
291
-
l.Info("connecting subscriber to jetstream")
292
292
-
if err := sub.client.ConnectAndRead(connCtx, cursor); err != nil {
116
116
+
if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
293
117
l.Error("error reading jetstream", "error", err)
294
118
cancel()
295
295
-
time.Sleep(time.Second) // Small backoff before retry
296
119
continue
297
120
}
298
121
299
122
select {
300
123
case <-ctx.Done():
301
301
-
l.Info("context done, stopping subscriber")
124
124
+
l.Info("context done, stopping jetstream")
302
125
return
303
126
case <-connCtx.Done():
304
127
l.Info("connection context done, reconnecting")
···
307
130
}
308
131
}
309
132
310
310
-
// GetRunningSubscribersCount returns the total number of currently running subscribers
311
311
-
func (j *JetstreamClient) GetRunningSubscribersCount() int {
312
312
-
j.mu.RLock()
313
313
-
defer j.mu.RUnlock()
314
314
-
315
315
-
runningCount := 0
316
316
-
for _, sub := range j.subscribers {
317
317
-
if sub.running {
318
318
-
runningCount++
319
319
-
}
320
320
-
}
321
321
-
322
322
-
return runningCount
323
323
-
}
324
324
-
325
325
-
// Shutdown gracefully stops all subscribers
326
326
-
func (j *JetstreamClient) Shutdown() {
327
327
-
j.mu.Lock()
328
328
-
329
329
-
// Cancel all subscribers
330
330
-
for _, sub := range j.subscribers {
331
331
-
if sub.running && sub.cancel != nil {
332
332
-
sub.cancel()
333
333
-
sub.running = false
334
334
-
}
335
335
-
}
336
336
-
337
337
-
j.mu.Unlock()
338
338
-
339
339
-
// Wait for all subscribers to complete
340
340
-
j.subscriberWg.Wait()
341
341
-
j.l.Info("all subscribers shut down", "total_subscribers", len(j.subscribers), "running_subscribers", j.GetRunningSubscribersCount())
342
342
-
}
343
343
-
344
133
func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
345
134
l := log.FromContext(ctx)
346
135
lastTimeUs, err := j.db.GetLastTimeUs()
···
353
142
}
354
143
}
355
144
356
356
-
// If last time is older than 2 days, start from now
145
145
+
// If last time is older than 2 days, start from now
357
146
if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
358
147
lastTimeUs = time.Now().UnixMicro()
359
148
l.Warn("last time us is older than 2 days; discarding that and starting from now")
···
363
152
}
364
153
}
365
154
366
366
-
l.Info("found last time_us", "time_us", lastTimeUs, "running_subscribers", j.GetRunningSubscribersCount())
155
155
+
l.Info("found last time_us", "time_us", lastTimeUs)
367
156
return &lastTimeUs
368
157
}
+1
-1
knotserver/handler.go
···
62
62
if len(dids) > 0 {
63
63
h.knotInitialized = true
64
64
close(h.init)
65
65
-
h.jc.UpdateDids(dids)
65
65
+
// h.jc.UpdateDids(dids)
66
66
}
67
67
68
68
r.Get("/", h.Index)
+1
-1
knotserver/jetstream.go
···
118
118
if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil {
119
119
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
120
120
}
121
121
-
h.jc.UpdateDids([]string{did})
121
121
+
// h.jc.UpdateDids([]string{did})
122
122
}()
123
123
124
124
raw := json.RawMessage(event.Commit.Record)
+1
-1
knotserver/routes.go
···
769
769
return
770
770
}
771
771
772
772
-
h.jc.UpdateDids([]string{data.Did})
772
772
+
// h.jc.UpdateDids([]string{data.Did})
773
773
if err := h.e.AddOwner(ThisServer, data.Did); err != nil {
774
774
l.Error("adding owner", "error", err.Error())
775
775
writeError(w, err.Error(), http.StatusInternalServerError)