Live video on the AT Protocol
1package media
2
3import (
4 "bufio"
5 "fmt"
6 "os"
7 "path/filepath"
8 "regexp"
9 "runtime"
10 "runtime/debug"
11 "strconv"
12 "strings"
13 "sync"
14 "syscall"
15 "testing"
16 "time"
17
18 "github.com/acarl005/stripansi"
19 "github.com/cenkalti/backoff/v5"
20 "github.com/stretchr/testify/require"
21 "stream.place/streamplace/pkg/gstinit"
22)
23
// streamplaceTestCount is a repetition count for tests in this package. It
// defaults to 50 and can be overridden through the STREAMPLACE_TEST_COUNT
// environment variable.
var streamplaceTestCount = 50

// init applies the STREAMPLACE_TEST_COUNT override, if one is present. A value
// that does not parse as an integer aborts the test binary with a panic.
func init() {
	raw := os.Getenv("STREAMPLACE_TEST_COUNT")
	if raw == "" {
		// No override requested; keep the built-in default.
		return
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", raw))
	}
	streamplaceTestCount = parsed
}
36
const (
	// IgnoreLeaks names the environment variable that, when non-empty,
	// disables leak checking in this package.
	IgnoreLeaks = "STREAMPLACE_IGNORE_LEAKS"
	// ShowTrace names the environment variable that, when non-empty, makes
	// TestMain echo TRACE-level log lines as well.
	ShowTrace = "STREAMPLACE_SHOW_TRACE"
	// GSTDebugNeeded is the GST_DEBUG fragment TestMain appends so that the
	// leak tracer output is emitted.
	GSTDebugNeeded = "leaks:9,GST_TRACER:9"
	// LeakLine is the substring that marks a log line as describing one
	// still-alive object.
	LeakLine = "GST_TRACER :0:: object-alive"
)

var (
	// LeakTestMutex is held by withNoGSTLeaks for the duration of a
	// leak-checked test body.
	LeakTestMutex sync.Mutex

	// LeakDoneRegex matches the log line that terminates a listing of alive
	// objects; the capture group is the reported object count.
	LeakDoneRegex = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`)

	// LeakReport accumulates the alive-object log lines seen since it was
	// last reset. Guarded by LeakReportMutex.
	LeakReport = []string{}
	// LeakReportMutex protects LeakReport.
	LeakReportMutex sync.Mutex

	// LeakDoneCh receives one value each time the log reader sees a line
	// matching LeakDoneRegex. It is unbuffered.
	LeakDoneCh = make(chan struct{})
)
49
50func TestMain(m *testing.M) {
51 if os.Getenv(IgnoreLeaks) != "" {
52 gstinit.InitGST()
53 os.Exit(m.Run())
54 return
55 }
56 showTrace := os.Getenv(ShowTrace) != ""
57 gstDebug := os.Getenv("GST_DEBUG")
58 if gstDebug == "" {
59 gstDebug = GSTDebugNeeded
60 } else {
61 gstDebug = fmt.Sprintf("%s,%s", gstDebug, GSTDebugNeeded)
62 }
63 os.Setenv("GST_DEBUG", gstDebug)
64 os.Setenv("GST_TRACERS", "leaks")
65 os.Setenv("GST_LEAKS_TRACER_SIG", "1")
66 debug.SetGCPercent(5)
67
68 f, err := os.MkdirTemp("", "")
69 if err != nil {
70 panic(err)
71 }
72 fName := filepath.Join(f, "leak.log")
73 err = syscall.Mkfifo(fName, 0640)
74 if err != nil {
75 panic(err)
76 }
77 os.Setenv("GST_DEBUG_FILE", fName)
78
79 go func() {
80 pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640)
81 if err != nil {
82 panic(err)
83 }
84 defer pipe.Close()
85 // Read and print each line from FD
86 scanner := bufio.NewScanner(pipe)
87 for scanner.Scan() {
88 lineAnsi := scanner.Text()
89 line := stripansi.Strip(lineAnsi)
90 if !strings.Contains(line, " TRACE ") || showTrace {
91 fmt.Println(lineAnsi)
92 }
93 if strings.Contains(line, LeakLine) {
94 LeakReportMutex.Lock()
95 LeakReport = append(LeakReport, line)
96 LeakReportMutex.Unlock()
97 } else if LeakDoneRegex.MatchString(line) {
98 LeakDoneCh <- struct{}{}
99 } else {
100 continue
101 }
102 }
103 if err := scanner.Err(); err != nil {
104 panic(err)
105 }
106 }()
107 gstinit.InitGST()
108 os.Exit(m.Run())
109}
110
111// Often the GC is instance, but sometimes it takes a while. So, we retry a few times
112// with exponential backoff, giving the GC more time to do its thing.
113func getLeakCount(t *testing.T) int {
114 ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
115 defer ticker.Stop()
116 var leaks int
117 for i := 0; i < 10; i++ {
118 leaks = getLeakCountInner(t)
119 if leaks == 0 {
120 return leaks
121 }
122 if i < 9 {
123 <-ticker.C
124 }
125 }
126 return leaks
127}
128
129func getLeakCountInner(t *testing.T) int {
130 if os.Getenv(IgnoreLeaks) != "" {
131 return 0
132 }
133 process, err := os.FindProcess(os.Getpid())
134 require.NoError(t, err)
135
136 LeakReportMutex.Lock()
137 LeakReport = []string{}
138 LeakReportMutex.Unlock()
139
140 flushes := 2
141
142 for range flushes {
143 ch := make(chan struct{})
144 done := false
145 go func() {
146 thing := &[]byte{}
147 runtime.SetFinalizer(thing, func(thing *[]byte) {
148 done = true
149 ch <- struct{}{}
150 })
151 }()
152
153 go func() {
154 runtime.GC()
155 runtime.GC()
156 for !done {
157
158 runtime.GC()
159 runtime.GC()
160 time.Sleep(500 * time.Millisecond)
161 }
162 <-ch
163 }()
164 }
165
166 err = process.Signal(os.Signal(syscall.SIGUSR1))
167 require.NoError(t, err)
168
169 <-LeakDoneCh
170
171 LeakReportMutex.Lock()
172 after := len(LeakReport)
173 LeakReportMutex.Unlock()
174 return after
175}
176
177func checkGStreamerLeaks(t *testing.T, expected int) {
178 if os.Getenv(IgnoreLeaks) != "" {
179 return
180 }
181 leaks := getLeakCount(t)
182 if leaks > expected {
183 LeakReportMutex.Lock()
184 for _, l := range LeakReport {
185 fmt.Println(l)
186 }
187 LeakReportMutex.Unlock()
188 require.Equal(t, expected, len(LeakReport), "Leaks found")
189 }
190}
191
192func withNoGSTLeaks(t *testing.T, f func()) {
193 LeakTestMutex.Lock()
194 defer LeakTestMutex.Unlock()
195 gstinit.InitGST()
196 before := getLeakCount(t)
197 defer checkGStreamerLeaks(t, before)
198 // ignore := goleak.IgnoreCurrent()
199 // defer goleak.VerifyNone(t, ignore)
200 f()
201}