Live video on the AT Protocol
1package spmetrics
2
3import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10 "stream.place/streamplace/pkg/log"
11)
12
// SessionExpireTime is the maximum age of a session's last-seen timestamp.
// ExpireSessions removes any session that was last seen longer ago than this.
const SessionExpireTime time.Duration = 30 * time.Second //nolint:all
14
var (
	// viewers holds the current viewer count for each user. It is read and
	// written only while viewersLock is held.
	viewers = make(map[string]int)
	// viewersLock guards viewers.
	viewersLock sync.RWMutex
)
17
var (
	// sessions maps a user to that user's sessions, each keyed by session
	// identifier and holding the time the session was last seen. It is
	// accessed only while sessionsLock is held.
	sessions = make(map[string]map[string]time.Time)
	// sessionsLock guards sessions and the per-user maps inside it.
	sessionsLock sync.RWMutex
)
20var Viewers = promauto.NewGaugeVec(prometheus.GaugeOpts{
21 Name: "streamplace_viewers",
22 Help: "number of current viewers per user",
23}, []string{"streamer"})
24
25var ViewersTotal = promauto.NewGauge(prometheus.GaugeOpts{
26 Name: "streamplace_viewers_total",
27 Help: "total number of viewers",
28})
29
30var TranscodeAttemptsTotal = promauto.NewCounter(prometheus.CounterOpts{
31 Name: "streamplace_transcode_attempts_total",
32 Help: "total number of transcode attempts",
33})
34
35var TranscodeSuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{
36 Name: "streamplace_transcode_successes_total",
37 Help: "total number of transcode successes",
38})
39
40var TranscodeErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
41 Name: "streamplace_transcode_errors_total",
42 Help: "total number of transcode errors",
43})
44
45var TranscodeDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
46 Name: "streamplace_transcode_duration_ms",
47 Help: "duration of transcode in ms",
48 Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000},
49}, []string{"streamer"})
50
51var SigningDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
52 Name: "streamplace_signing_duration_ms",
53 Help: "duration of transcode in ms",
54 Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000, 20000, 30000, 60000},
55}, []string{"streamer"})
56
57var QueuedTranscodeDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
58 Name: "streamplace_queued_transcode_duration_ms",
59 Help: "duration of transcode in ms, including time spent waiting",
60}, []string{"streamer"})
61
62var Version = promauto.NewCounterVec(prometheus.CounterOpts{
63 Name: "streamplace_version",
64 Help: "version of streamplace",
65}, []string{"version"})
66
67var WebsocketsOpen = promauto.NewGauge(prometheus.GaugeOpts{
68 Name: "streamplace_websockets_open",
69 Help: "number of open websockets",
70})
71
72var SegmentSubscriptionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
73 Name: "streamplace_segment_subscriptions_open",
74 Help: "number of open new segment subscriptions",
75}, []string{"streamer", "rendition"})
76
77func ViewerInc(user string) {
78 go func() {
79 viewersLock.Lock()
80 defer viewersLock.Unlock()
81 viewers[user]++
82 Viewers.WithLabelValues(user).Set(float64(viewers[user]))
83 ViewersTotal.Inc()
84 }()
85}
86
87func ViewerDec(user string) {
88 go func() {
89 viewersLock.Lock()
90 defer viewersLock.Unlock()
91 viewers[user]--
92 if viewers[user] == 0 {
93 Viewers.DeleteLabelValues(user)
94 } else {
95 Viewers.WithLabelValues(user).Set(float64(viewers[user]))
96 }
97 ViewersTotal.Dec()
98 }()
99}
100
101func GetViewCount(user string) int {
102 viewersLock.RLock()
103 defer viewersLock.RUnlock()
104 return viewers[user]
105}
106
107func SessionSeen(user string, session string) {
108 now := time.Now()
109 go func() {
110 sessionsLock.Lock()
111 defer sessionsLock.Unlock()
112 if _, ok := sessions[user]; !ok {
113 sessions[user] = map[string]time.Time{}
114 }
115 if _, ok := sessions[user][session]; !ok {
116 log.Warn(context.TODO(), "ViewerInc", "user", user, "session", session)
117 ViewerInc(user)
118 }
119 sessions[user][session] = now
120 }()
121}
122
123func ExpireSessions(ctx context.Context) error {
124 for {
125 select {
126 case <-ctx.Done():
127 return nil
128 case <-time.After(5 * time.Second):
129 sessionsLock.Lock()
130 for user, sessions := range sessions {
131 for session, seen := range sessions {
132 if time.Since(seen) > SessionExpireTime {
133 delete(sessions, session)
134 ViewerDec(user)
135 }
136 }
137 }
138 sessionsLock.Unlock()
139 }
140 }
141}