Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-postgres-locking 141 lines 3.9 kB view raw
1package spmetrics 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 "stream.place/streamplace/pkg/log" 11) 12 13const SessionExpireTime = 30 * time.Second //nolint:all 14 15var viewers = map[string]int{} 16var viewersLock sync.RWMutex 17 18var sessions = map[string]map[string]time.Time{} 19var sessionsLock sync.RWMutex 20var Viewers = promauto.NewGaugeVec(prometheus.GaugeOpts{ 21 Name: "streamplace_viewers", 22 Help: "number of current viewers per user", 23}, []string{"streamer"}) 24 25var ViewersTotal = promauto.NewGauge(prometheus.GaugeOpts{ 26 Name: "streamplace_viewers_total", 27 Help: "total number of viewers", 28}) 29 30var TranscodeAttemptsTotal = promauto.NewCounter(prometheus.CounterOpts{ 31 Name: "streamplace_transcode_attempts_total", 32 Help: "total number of transcode attempts", 33}) 34 35var TranscodeSuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{ 36 Name: "streamplace_transcode_successes_total", 37 Help: "total number of transcode successes", 38}) 39 40var TranscodeErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 41 Name: "streamplace_transcode_errors_total", 42 Help: "total number of transcode errors", 43}) 44 45var TranscodeDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 46 Name: "streamplace_transcode_duration_ms", 47 Help: "duration of transcode in ms", 48 Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000}, 49}, []string{"streamer"}) 50 51var SigningDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 52 Name: "streamplace_signing_duration_ms", 53 Help: "duration of transcode in ms", 54 Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000, 20000, 30000, 60000}, 55}, []string{"streamer"}) 56 57var QueuedTranscodeDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ 58 Name: "streamplace_queued_transcode_duration_ms", 59 Help: "duration of transcode in ms, including time spent waiting", 60}, []string{"streamer"}) 61 62var Version = promauto.NewCounterVec(prometheus.CounterOpts{ 63 Name: "streamplace_version", 64 Help: "version of streamplace", 65}, []string{"version"}) 66 67var WebsocketsOpen = promauto.NewGauge(prometheus.GaugeOpts{ 68 Name: "streamplace_websockets_open", 69 Help: "number of open websockets", 70}) 71 72var SegmentSubscriptionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{ 73 Name: "streamplace_segment_subscriptions_open", 74 Help: "number of open new segment subscriptions", 75}, []string{"streamer", "rendition"}) 76 77func ViewerInc(user string) { 78 go func() { 79 viewersLock.Lock() 80 defer viewersLock.Unlock() 81 viewers[user]++ 82 Viewers.WithLabelValues(user).Set(float64(viewers[user])) 83 ViewersTotal.Inc() 84 }() 85} 86 87func ViewerDec(user string) { 88 go func() { 89 viewersLock.Lock() 90 defer viewersLock.Unlock() 91 viewers[user]-- 92 if viewers[user] == 0 { 93 Viewers.DeleteLabelValues(user) 94 } else { 95 Viewers.WithLabelValues(user).Set(float64(viewers[user])) 96 } 97 ViewersTotal.Dec() 98 }() 99} 100 101func GetViewCount(user string) int { 102 viewersLock.RLock() 103 defer viewersLock.RUnlock() 104 return viewers[user] 105} 106 107func SessionSeen(user string, session string) { 108 now := time.Now() 109 go func() { 110 sessionsLock.Lock() 111 defer sessionsLock.Unlock() 112 if _, ok := sessions[user]; !ok { 113 sessions[user] = map[string]time.Time{} 114 } 115 if _, ok := sessions[user][session]; !ok { 116 log.Warn(context.TODO(), "ViewerInc", "user", user, "session", session) 117 ViewerInc(user) 118 } 119 sessions[user][session] = now 120 }() 121} 122 123func ExpireSessions(ctx context.Context) error { 124 for { 125 select { 126 case <-ctx.Done(): 127 return nil 128 case <-time.After(5 * time.Second): 129 sessionsLock.Lock() 130 for user, sessions := range sessions { 131 for session, seen := range sessions { 132 if time.Since(seen) > SessionExpireTime { 133 delete(sessions, session) 134 ViewerDec(user) 135 } 136 } 137 } 138 sessionsLock.Unlock() 139 } 140 } 141}