Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rtmps 157 lines 3.2 kB view raw
1package media 2 3import ( 4 "bufio" 5 "fmt" 6 "os" 7 "path/filepath" 8 "regexp" 9 "runtime" 10 "runtime/debug" 11 "strings" 12 "sync" 13 "syscall" 14 "testing" 15 "time" 16 17 "github.com/acarl005/stripansi" 18 "github.com/stretchr/testify/require" 19 "stream.place/streamplace/pkg/gstinit" 20) 21 22const IGNORE_LEAKS = "STREAMPLACE_IGNORE_LEAKS" 23const GST_DEBUG_NEEDED = "leaks:9,GST_TRACER:9" 24const LEAK_LINE = "GST_TRACER :0:: object-alive" 25 26var LEAK_DONE_REGEX = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`) 27 28var LeakReport = []string{} 29var LeakReportMutex sync.Mutex 30var LeakDoneCh = make(chan struct{}) 31 32func TestMain(m *testing.M) { 33 if os.Getenv(IGNORE_LEAKS) != "" { 34 gstinit.InitGST() 35 os.Exit(m.Run()) 36 return 37 } 38 gstDebug := os.Getenv("GST_DEBUG") 39 if gstDebug == "" { 40 gstDebug = GST_DEBUG_NEEDED 41 } else { 42 gstDebug = fmt.Sprintf("%s,%s", gstDebug, GST_DEBUG_NEEDED) 43 } 44 os.Setenv("GST_DEBUG", gstDebug) 45 os.Setenv("GST_TRACERS", "leaks") 46 os.Setenv("GST_LEAKS_TRACER_SIG", "1") 47 debug.SetGCPercent(5) 48 49 f, err := os.MkdirTemp("", "") 50 if err != nil { 51 panic(err) 52 } 53 fName := filepath.Join(f, "leak.log") 54 err = syscall.Mkfifo(fName, 0640) 55 if err != nil { 56 panic(err) 57 } 58 os.Setenv("GST_DEBUG_FILE", fName) 59 60 go func() { 61 pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640) 62 if err != nil { 63 panic(err) 64 } 65 defer pipe.Close() 66 // Read and print each line from FD 67 scanner := bufio.NewScanner(pipe) 68 for scanner.Scan() { 69 line := scanner.Text() 70 fmt.Println(line) 71 line = stripansi.Strip(line) 72 if strings.Contains(line, LEAK_LINE) { 73 LeakReportMutex.Lock() 74 LeakReport = append(LeakReport, line) 75 LeakReportMutex.Unlock() 76 } else if LEAK_DONE_REGEX.MatchString(line) { 77 LeakDoneCh <- struct{}{} 78 } else { 79 continue 80 } 81 } 82 if err := scanner.Err(); err != nil { 83 panic(err) 84 } 85 }() 86 gstinit.InitGST() 87 os.Exit(m.Run()) 88} 89 90func getLeakCount(t *testing.T) int { 91 if os.Getenv(IGNORE_LEAKS) != "" { 92 return 0 93 } 94 process, err := os.FindProcess(os.Getpid()) 95 LeakReportMutex.Lock() 96 LeakReport = []string{} 97 LeakReportMutex.Unlock() 98 99 // we want CI to be extra reliable here and a little slower is okay 100 flushes := 2 101 if os.Getenv("CI") != "" { 102 flushes = 5 103 } 104 105 for i := 0; i < flushes; i++ { 106 ch := make(chan struct{}) 107 done := false 108 go func() { 109 thing := &[]byte{} 110 runtime.SetFinalizer(thing, func(thing *[]byte) { 111 done = true 112 ch <- struct{}{} 113 }) 114 }() 115 116 go func() { 117 runtime.GC() 118 runtime.GC() 119 for { 120 if done { 121 break 122 } 123 runtime.GC() 124 runtime.GC() 125 time.Sleep(500 * time.Millisecond) 126 } 127 <-ch 128 }() 129 } 130 131 time.Sleep(time.Duration(flushes) * time.Second) 132 133 err = process.Signal(os.Signal(syscall.SIGUSR1)) 134 require.NoError(t, err) 135 136 <-LeakDoneCh 137 138 LeakReportMutex.Lock() 139 after := len(LeakReport) 140 LeakReportMutex.Unlock() 141 return after 142} 143 144func checkGStreamerLeaks(t *testing.T, expected int) { 145 if os.Getenv(IGNORE_LEAKS) != "" { 146 return 147 } 148 leaks := getLeakCount(t) 149 if leaks > expected { 150 LeakReportMutex.Lock() 151 for _, l := range LeakReport { 152 fmt.Println(l) 153 } 154 LeakReportMutex.Unlock() 155 } 156 require.Equal(t, expected, len(LeakReport), "Leaks found") 157}