tangled
alpha
login
or
join now
stream.place
/
streamplace
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
ope fix formatting
seiso.moe
9 months ago
891b8a68
ce468d15
+39
-39
2 changed files
expand all
collapse all
unified
split
pkg
api
api.go
websocket.go
+25
-25
pkg/api/api.go
···
747
}
748
749
func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
750
-
a.limitersMu.Lock()
751
-
defer a.limitersMu.Unlock()
752
-
753
-
limiter, exists := a.limiters[ip]
754
-
if !exists {
755
-
// 5 actions per second with a burst of 3
756
-
limiter = rate.NewLimiter(rate.Limit(5.0), 3)
757
-
a.limiters[ip] = limiter
758
-
}
759
-
760
-
return limiter
761
}
762
763
func (a *StreamplaceAPI) getMsgLimiter(connID string) *rate.Limiter {
764
-
a.msgLimitersMu.Lock()
765
-
defer a.msgLimitersMu.Unlock()
766
-
767
-
limiter, exists := a.msgLimiters[connID]
768
-
if !exists {
769
-
// 10 messages per second with a burst of 20
770
-
limiter = rate.NewLimiter(rate.Limit(10), 20)
771
-
a.msgLimiters[connID] = limiter
772
-
}
773
-
774
-
return limiter
775
}
776
777
func (a *StreamplaceAPI) removeMsgLimiter(connID string) {
778
-
a.msgLimitersMu.Lock()
779
-
defer a.msgLimitersMu.Unlock()
780
-
delete(a.msgLimiters, connID)
781
}
···
747
}
748
749
func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
750
+
a.limitersMu.Lock()
751
+
defer a.limitersMu.Unlock()
752
+
753
+
limiter, exists := a.limiters[ip]
754
+
if !exists {
755
+
// 5 actions per second with a burst of 3
756
+
limiter = rate.NewLimiter(rate.Limit(5.0), 3)
757
+
a.limiters[ip] = limiter
758
+
}
759
+
760
+
return limiter
761
}
762
763
func (a *StreamplaceAPI) getMsgLimiter(connID string) *rate.Limiter {
764
+
a.msgLimitersMu.Lock()
765
+
defer a.msgLimitersMu.Unlock()
766
+
767
+
limiter, exists := a.msgLimiters[connID]
768
+
if !exists {
769
+
// 10 messages per second with a burst of 20
770
+
limiter = rate.NewLimiter(rate.Limit(10), 20)
771
+
a.msgLimiters[connID] = limiter
772
+
}
773
+
774
+
return limiter
775
}
776
777
func (a *StreamplaceAPI) removeMsgLimiter(connID string) {
778
+
a.msgLimitersMu.Lock()
779
+
defer a.msgLimitersMu.Unlock()
780
+
delete(a.msgLimiters, connID)
781
}
+14
-14
pkg/api/websocket.go
···
37
38
limiter := a.getLimiter(clientIP)
39
if !limiter.Allow() {
40
-
apierrors.WriteHTTPTooManyRequests(w, "rate limit")
41
-
return
42
}
43
44
uu, _ := uuid.NewV7()
···
69
70
msgLimiter := a.getMsgLimiter(connID)
71
defer a.removeMsgLimiter(connID)
72
-
73
initialBurst := make(chan any, 200)
74
err = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
75
if err != nil {
···
227
if !r.OK() {
228
log.Error(ctx, "rate limit exceeded, message rejected")
229
230
-
errorMsg := map[string]string{"error": "rate limit exceeded"}
231
-
errorBytes, _ := json.Marshal(errorMsg)
232
-
conn.WriteMessage(websocket.TextMessage, errorBytes)
233
-
234
-
continue
235
}
236
237
-
// wait for rate limit delay if there is one
238
delay := r.Delay()
239
if delay > 0 {
240
-
select {
241
-
case <-time.After(delay):
242
-
case <-ctx.Done():
243
-
return
244
-
}
245
}
246
247
messageType, message, err := conn.ReadMessage()
···
37
38
limiter := a.getLimiter(clientIP)
39
if !limiter.Allow() {
40
+
apierrors.WriteHTTPTooManyRequests(w, "rate limit")
41
+
return
42
}
43
44
uu, _ := uuid.NewV7()
···
69
70
msgLimiter := a.getMsgLimiter(connID)
71
defer a.removeMsgLimiter(connID)
72
+
73
initialBurst := make(chan any, 200)
74
err = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
75
if err != nil {
···
227
if !r.OK() {
228
log.Error(ctx, "rate limit exceeded, message rejected")
229
230
+
errorMsg := map[string]string{"error": "rate limit exceeded"}
231
+
errorBytes, _ := json.Marshal(errorMsg)
232
+
conn.WriteMessage(websocket.TextMessage, errorBytes)
233
+
234
+
continue
235
}
236
237
+
// wait for rate limit delay if there is one
238
delay := r.Delay()
239
if delay > 0 {
240
+
select {
241
+
case <-time.After(delay):
242
+
case <-ctx.Done():
243
+
return
244
+
}
245
}
246
247
messageType, message, err := conn.ReadMessage()