fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package media
2
3import (
4 "bufio"
5 "fmt"
6 "os"
7 "path/filepath"
8 "regexp"
9 "runtime"
10 "runtime/debug"
11 "strconv"
12 "strings"
13 "sync"
14 "syscall"
15 "testing"
16 "time"
17
18 "github.com/acarl005/stripansi"
19 "github.com/cenkalti/backoff/v5"
20 "github.com/stretchr/testify/require"
21 "stream.place/streamplace/pkg/gstinit"
22)
23
// streamplaceTestCount defaults to 50 and can be overridden through the
// STREAMPLACE_TEST_COUNT environment variable (see init).
// NOTE(review): presumably the number of iterations repeated tests run —
// confirm against the callers, which are outside this file section.
var streamplaceTestCount = 50

// init applies the STREAMPLACE_TEST_COUNT override, if one is set.
// It panics when the variable is set to something that is not an integer.
func init() {
	raw := os.Getenv("STREAMPLACE_TEST_COUNT")
	if raw == "" {
		// Unset (or empty): keep the compiled-in default.
		return
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", raw))
	}
	streamplaceTestCount = parsed
}
36
const (
	// IgnoreLeaks names the environment variable that, when non-empty,
	// disables GStreamer leak checking for the test run.
	IgnoreLeaks = "STREAMPLACE_IGNORE_LEAKS"
	// ShowTrace names the environment variable that, when non-empty, makes
	// TRACE-level GStreamer log lines get echoed to stdout as well.
	ShowTrace = "STREAMPLACE_SHOW_TRACE"
	// GSTDebugNeeded is the GST_DEBUG category spec appended to whatever the
	// user already configured so that the leak report lines are emitted.
	GSTDebugNeeded = "leaks:9,GST_TRACER:9"
	// LeakLine is the substring identifying one "object still alive" entry
	// in the GStreamer debug log.
	LeakLine = "GST_TRACER :0:: object-alive"
)

var (
	// LeakTestMutex serializes tests that run under withNoGSTLeaks.
	LeakTestMutex sync.Mutex

	// LeakDoneRegex matches the log line that terminates a leak listing and
	// captures the number of alive objects reported.
	LeakDoneRegex = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`)

	// LeakReport accumulates the leak lines seen since it was last reset.
	// Guard every access with LeakReportMutex.
	LeakReport = []string{}
	// LeakReportMutex protects LeakReport.
	LeakReportMutex sync.Mutex
	// LeakDoneCh receives one value each time a leak listing has been fully
	// read from the log. It is unbuffered, so the log reader blocks until a
	// receiver picks the value up.
	LeakDoneCh = make(chan struct{})
)
49
50func TestMain(m *testing.M) {
51 if os.Getenv(IgnoreLeaks) != "" {
52 gstinit.InitGST()
53 os.Exit(m.Run())
54 return
55 }
56 showTrace := os.Getenv(ShowTrace) != ""
57 gstDebug := os.Getenv("GST_DEBUG")
58 if gstDebug == "" {
59 gstDebug = GSTDebugNeeded
60 } else {
61 gstDebug = fmt.Sprintf("%s,%s", gstDebug, GSTDebugNeeded)
62 }
63 os.Setenv("GST_DEBUG", gstDebug)
64 os.Setenv("GST_TRACERS", "leaks")
65 os.Setenv("GST_LEAKS_TRACER_SIG", "1")
66 debug.SetGCPercent(5)
67
68 f, err := os.MkdirTemp("", "")
69 if err != nil {
70 panic(err)
71 }
72 fName := filepath.Join(f, "leak.log")
73 err = syscall.Mkfifo(fName, 0640)
74 if err != nil {
75 panic(err)
76 }
77 os.Setenv("GST_DEBUG_FILE", fName)
78
79 go func() {
80 pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640)
81 if err != nil {
82 panic(err)
83 }
84 defer pipe.Close()
85 // Read and print each line from FD
86 scanner := bufio.NewScanner(pipe)
87 for scanner.Scan() {
88 lineAnsi := scanner.Text()
89 line := stripansi.Strip(lineAnsi)
90 if !strings.Contains(line, " TRACE ") || showTrace {
91 fmt.Println(lineAnsi)
92 }
93 if strings.Contains(line, LeakLine) {
94 LeakReportMutex.Lock()
95 LeakReport = append(LeakReport, line)
96 LeakReportMutex.Unlock()
97 } else if LeakDoneRegex.MatchString(line) {
98 LeakDoneCh <- struct{}{}
99 } else {
100 continue
101 }
102 }
103 if err := scanner.Err(); err != nil {
104 panic(err)
105 }
106 }()
107 gstinit.InitGST()
108 os.Exit(m.Run())
109}
110
111// Often the GC is instance, but sometimes it takes a while. So, we retry a few times
112// with exponential backoff, giving the GC more time to do its thing.
113func getLeakCount(t *testing.T) int {
114 ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
115 defer ticker.Stop()
116 var leaks int
117 for i := 0; i < 10; i++ {
118 leaks = getLeakCountInner(t)
119 if leaks == 0 {
120 return leaks
121 }
122 if i < 9 {
123 <-ticker.C
124 }
125 }
126 return leaks
127}
128
129func getLeakCountInner(t *testing.T) int {
130 if os.Getenv(IgnoreLeaks) != "" {
131 return 0
132 }
133 process, err := os.FindProcess(os.Getpid())
134 require.NoError(t, err)
135
136 LeakReportMutex.Lock()
137 LeakReport = []string{}
138 LeakReportMutex.Unlock()
139
140 flushes := 2
141
142 for range flushes {
143 ch := make(chan struct{})
144 done := false
145 go func() {
146 thing := &[]byte{}
147 runtime.SetFinalizer(thing, func(thing *[]byte) {
148 done = true
149 ch <- struct{}{}
150 })
151 }()
152
153 go func() {
154 runtime.GC()
155 runtime.GC()
156 for !done {
157
158 runtime.GC()
159 runtime.GC()
160 time.Sleep(500 * time.Millisecond)
161 }
162 <-ch
163 }()
164 }
165
166 err = process.Signal(os.Signal(syscall.SIGUSR1))
167 require.NoError(t, err)
168
169 <-LeakDoneCh
170
171 LeakReportMutex.Lock()
172 after := len(LeakReport)
173 LeakReportMutex.Unlock()
174 return after
175}
176
177func checkGStreamerLeaks(t *testing.T, expected int) {
178 if os.Getenv(IgnoreLeaks) != "" {
179 return
180 }
181 leaks := getLeakCount(t)
182 if leaks > expected {
183 LeakReportMutex.Lock()
184 for _, l := range LeakReport {
185 fmt.Println(l)
186 }
187 LeakReportMutex.Unlock()
188 require.Equal(t, expected, len(LeakReport), "Leaks found")
189 }
190}
191
192func withNoGSTLeaks(t *testing.T, f func()) {
193 LeakTestMutex.Lock()
194 defer LeakTestMutex.Unlock()
195 gstinit.InitGST()
196 before := getLeakCount(t)
197 defer checkGStreamerLeaks(t, before)
198 // ignore := goleak.IgnoreCurrent()
199 // defer goleak.VerifyNone(t, ignore)
200 f()
201}