Live video on the AT Protocol
1package media
2
3import (
4 "bufio"
5 "fmt"
6 "os"
7 "path/filepath"
8 "regexp"
9 "runtime"
10 "runtime/debug"
11 "strings"
12 "sync"
13 "syscall"
14 "testing"
15 "time"
16
17 "github.com/acarl005/stripansi"
18 "github.com/stretchr/testify/require"
19 "stream.place/streamplace/pkg/gstinit"
20)
21
22const IgnoreLeaks = "STREAMPLACE_IGNORE_LEAKS"
23const GSTDebugNeeded = "leaks:9,GST_TRACER:9"
24const LeakLine = "GST_TRACER :0:: object-alive"
25
26var LeakDoneRegex = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`)
27
28var LeakReport = []string{}
29var LeakReportMutex sync.Mutex
30var LeakDoneCh = make(chan struct{})
31
32func TestMain(m *testing.M) {
33 if os.Getenv(IgnoreLeaks) != "" {
34 gstinit.InitGST()
35 os.Exit(m.Run())
36 return
37 }
38 gstDebug := os.Getenv("GST_DEBUG")
39 if gstDebug == "" {
40 gstDebug = GSTDebugNeeded
41 } else {
42 gstDebug = fmt.Sprintf("%s,%s", gstDebug, GSTDebugNeeded)
43 }
44 os.Setenv("GST_DEBUG", gstDebug)
45 os.Setenv("GST_TRACERS", "leaks")
46 os.Setenv("GST_LEAKS_TRACER_SIG", "1")
47 debug.SetGCPercent(5)
48
49 f, err := os.MkdirTemp("", "")
50 if err != nil {
51 panic(err)
52 }
53 fName := filepath.Join(f, "leak.log")
54 err = syscall.Mkfifo(fName, 0640)
55 if err != nil {
56 panic(err)
57 }
58 os.Setenv("GST_DEBUG_FILE", fName)
59
60 go func() {
61 pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640)
62 if err != nil {
63 panic(err)
64 }
65 defer pipe.Close()
66 // Read and print each line from FD
67 scanner := bufio.NewScanner(pipe)
68 for scanner.Scan() {
69 line := scanner.Text()
70 fmt.Println(line)
71 line = stripansi.Strip(line)
72 if strings.Contains(line, LeakLine) {
73 LeakReportMutex.Lock()
74 LeakReport = append(LeakReport, line)
75 LeakReportMutex.Unlock()
76 } else if LeakDoneRegex.MatchString(line) {
77 LeakDoneCh <- struct{}{}
78 } else {
79 continue
80 }
81 }
82 if err := scanner.Err(); err != nil {
83 panic(err)
84 }
85 }()
86 gstinit.InitGST()
87 os.Exit(m.Run())
88}
89
90func getLeakCount(t *testing.T) int {
91 if os.Getenv(IgnoreLeaks) != "" {
92 return 0
93 }
94 process, err := os.FindProcess(os.Getpid())
95 require.NoError(t, err)
96
97 LeakReportMutex.Lock()
98 LeakReport = []string{}
99 LeakReportMutex.Unlock()
100
101 // we want CI to be extra reliable here and a little slower is okay
102 flushes := 2
103 if os.Getenv("CI") != "" {
104 flushes = 5
105 }
106
107 for range flushes {
108 ch := make(chan struct{})
109 done := false
110 go func() {
111 thing := &[]byte{}
112 runtime.SetFinalizer(thing, func(thing *[]byte) {
113 done = true
114 ch <- struct{}{}
115 })
116 }()
117
118 go func() {
119 runtime.GC()
120 runtime.GC()
121 for !done {
122
123 runtime.GC()
124 runtime.GC()
125 time.Sleep(500 * time.Millisecond)
126 }
127 <-ch
128 }()
129 }
130
131 time.Sleep(time.Duration(flushes) * time.Second)
132
133 err = process.Signal(os.Signal(syscall.SIGUSR1))
134 require.NoError(t, err)
135
136 <-LeakDoneCh
137
138 LeakReportMutex.Lock()
139 after := len(LeakReport)
140 LeakReportMutex.Unlock()
141 return after
142}
143
144func checkGStreamerLeaks(t *testing.T, expected int) {
145 if os.Getenv(IgnoreLeaks) != "" {
146 return
147 }
148 leaks := getLeakCount(t)
149 if leaks > expected {
150 LeakReportMutex.Lock()
151 for _, l := range LeakReport {
152 fmt.Println(l)
153 }
154 LeakReportMutex.Unlock()
155 }
156 require.Equal(t, expected, len(LeakReport), "Leaks found")
157}