Live video on the AT Protocol
1package media
2
3import (
4 "bufio"
5 "fmt"
6 "os"
7 "path/filepath"
8 "regexp"
9 "runtime"
10 "runtime/debug"
11 "strconv"
12 "strings"
13 "sync"
14 "syscall"
15 "testing"
16 "time"
17
18 "github.com/acarl005/stripansi"
19 "github.com/cenkalti/backoff/v5"
20 "github.com/stretchr/testify/require"
21 "stream.place/streamplace/pkg/gstinit"
22)
23
24var streamplaceTestCount = 50
25
26func init() {
27 testRunsStr := os.Getenv("STREAMPLACE_TEST_COUNT")
28 if testRunsStr != "" {
29 var err error
30 streamplaceTestCount, err = strconv.Atoi(testRunsStr)
31 if err != nil {
32 panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", testRunsStr))
33 }
34 }
35}
36
37var LeakTestMutex sync.Mutex
38
39const IgnoreLeaks = "STREAMPLACE_IGNORE_LEAKS"
40const ShowTrace = "STREAMPLACE_SHOW_TRACE"
41const GSTDebugNeeded = "leaks:9,GST_TRACER:9"
42const LeakLine = "GST_TRACER :0:: object-alive"
43
44var LeakDoneRegex = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`)
45
46var LeakReport = []string{}
47var LeakReportMutex sync.Mutex
48var LeakDoneCh = make(chan struct{})
49
50func TestMain(m *testing.M) {
51 if os.Getenv(IgnoreLeaks) != "" {
52 gstinit.InitGST()
53 os.Exit(m.Run())
54 return
55 }
56 showTrace := os.Getenv(ShowTrace) != ""
57 gstDebug := os.Getenv("GST_DEBUG")
58 if gstDebug == "" {
59 gstDebug = GSTDebugNeeded
60 } else {
61 gstDebug = fmt.Sprintf("%s,%s", gstDebug, GSTDebugNeeded)
62 }
63 os.Setenv("GST_DEBUG", gstDebug)
64 os.Setenv("GST_TRACERS", "leaks")
65 os.Setenv("GST_LEAKS_TRACER_SIG", "1")
66 debug.SetGCPercent(5)
67
68 f, err := os.MkdirTemp("", "")
69 if err != nil {
70 panic(err)
71 }
72 fName := filepath.Join(f, "leak.log")
73 err = syscall.Mkfifo(fName, 0640)
74 if err != nil {
75 panic(err)
76 }
77 os.Setenv("GST_DEBUG_FILE", fName)
78
79 go func() {
80 pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640)
81 if err != nil {
82 panic(err)
83 }
84 defer pipe.Close()
85 // Read and print each line from FD
86 scanner := bufio.NewScanner(pipe)
87 for scanner.Scan() {
88 lineAnsi := scanner.Text()
89 line := stripansi.Strip(lineAnsi)
90 if !strings.Contains(line, " TRACE ") || showTrace {
91 fmt.Println(lineAnsi)
92 }
93 if strings.Contains(line, LeakLine) {
94 LeakReportMutex.Lock()
95 LeakReport = append(LeakReport, line)
96 LeakReportMutex.Unlock()
97 } else if LeakDoneRegex.MatchString(line) {
98 LeakDoneCh <- struct{}{}
99 } else {
100 continue
101 }
102 }
103 if err := scanner.Err(); err != nil {
104 panic(err)
105 }
106 }()
107 gstinit.InitGST()
108 os.Exit(m.Run())
109}
110
111// Often the GC is instance, but sometimes it takes a while. So, we retry a few times
112// with exponential backoff, giving the GC more time to do its thing.
113func getLeakCount(t *testing.T) int {
114 ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
115 defer ticker.Stop()
116 var leaks int
117 for i := 0; i < 10; i++ {
118 leaks = getLeakCountInner(t)
119 if leaks == 0 {
120 return leaks
121 }
122 if i < 9 {
123 <-ticker.C
124 }
125 }
126 return leaks
127}
128
129func getLeakCountInner(t *testing.T) int {
130 if os.Getenv(IgnoreLeaks) != "" {
131 return 0
132 }
133 process, err := os.FindProcess(os.Getpid())
134 require.NoError(t, err)
135
136 LeakReportMutex.Lock()
137 LeakReport = []string{}
138 LeakReportMutex.Unlock()
139
140 // we want CI to be extra reliable here and a little slower is okay
141 flushes := 2
142
143 for range flushes {
144 ch := make(chan struct{})
145 done := false
146 go func() {
147 thing := &[]byte{}
148 runtime.SetFinalizer(thing, func(thing *[]byte) {
149 done = true
150 ch <- struct{}{}
151 })
152 }()
153
154 go func() {
155 runtime.GC()
156 runtime.GC()
157 for !done {
158
159 runtime.GC()
160 runtime.GC()
161 time.Sleep(500 * time.Millisecond)
162 }
163 <-ch
164 }()
165 }
166
167 err = process.Signal(os.Signal(syscall.SIGUSR1))
168 require.NoError(t, err)
169
170 <-LeakDoneCh
171
172 LeakReportMutex.Lock()
173 after := len(LeakReport)
174 LeakReportMutex.Unlock()
175 return after
176}
177
178func checkGStreamerLeaks(t *testing.T, expected int) {
179 if os.Getenv(IgnoreLeaks) != "" {
180 return
181 }
182 leaks := getLeakCount(t)
183 if leaks > expected {
184 LeakReportMutex.Lock()
185 for _, l := range LeakReport {
186 fmt.Println(l)
187 }
188 LeakReportMutex.Unlock()
189 require.Equal(t, expected, len(LeakReport), "Leaks found")
190 }
191}
192
193func withNoGSTLeaks(t *testing.T, f func()) {
194 LeakTestMutex.Lock()
195 defer LeakTestMutex.Unlock()
196 gstinit.InitGST()
197 before := getLeakCount(t)
198 defer checkGStreamerLeaks(t, before)
199 // ignore := goleak.IgnoreCurrent()
200 // defer goleak.VerifyNone(t, ignore)
201 f()
202}