Live video on the AT Protocol
at next 317 lines 10 kB view raw
1package multitest 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "net/http" 8 "os" 9 "os/exec" 10 "path/filepath" 11 "runtime" 12 "strings" 13 "testing" 14 "time" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 "github.com/bluesky-social/indigo/util" 19 scraper "github.com/starttoaster/prometheus-exporter-scraper" 20 "github.com/stretchr/testify/require" 21 "golang.org/x/sync/errgroup" 22 "stream.place/streamplace/pkg/cmd" 23 "stream.place/streamplace/pkg/crypto/spkey" 24 "stream.place/streamplace/pkg/devenv" 25 "stream.place/streamplace/pkg/gstinit" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/streamplace" 28 "stream.place/streamplace/test/remote" 29) 30 31func TestMultinodeSyndication(t *testing.T) { 32 if os.Getenv("GITHUB_ACTION") != "" { 33 t.Skip("Skipping multitest in GitHub Actions") 34 } 35 gstinit.InitGST() 36 dev := devenv.WithDevEnv(t) 37 acct1 := dev.CreateAccount(t) 38 acct2 := dev.CreateAccount(t) 39 ctx, cancel := context.WithCancel(context.Background()) 40 defer cancel() 41 node1 := startStreamplaceNode(ctx, "node1", t, dev) 42 node2 := startStreamplaceNode(ctx, "node2", t, dev) 43 node3 := startStreamplaceNode(ctx, "node3", t, dev) 44 node1.StartStream(t, acct1) 45 node2.PlayStream(t, acct1) 46 node3.PlayStream(t, acct1) 47 <-time.After(10 * time.Second) 48 node2.Shutdown(t) 49 <-time.After(20 * time.Second) 50 node4 := startStreamplaceNode(ctx, "node4", t, dev) 51 node4.StartStream(t, acct2) 52 node4.PlayStream(t, acct1) 53 node1.PlayStream(t, acct2) 54 node3.PlayStream(t, acct2) 55 <-time.After(30 * time.Second) 56} 57 58func TestOriginSwap(t *testing.T) { 59 if os.Getenv("GITHUB_ACTION") != "" { 60 t.Skip("Skipping multitest in GitHub Actions") 61 } 62 gstinit.InitGST() 63 ctx, cancel := context.WithCancel(context.Background()) 64 defer cancel() 65 dev := devenv.WithDevEnv(t) 66 acct1 := dev.CreateAccount(t) 67 acct2 := dev.CreateAccount(t) 68 node1 := startStreamplaceNode(ctx, "node1", t, dev) 69 node2 := startStreamplaceNode(ctx, "node2", t, dev) 70 node3 := startStreamplaceNode(ctx, "node3", t, dev) 71 // node4 := startStreamplaceNode(ctx, "node4", t, dev) 72 node1.StartStream(t, acct1) 73 node2.StartStream(t, acct2) 74 node1.PlayStream(t, acct1) 75 node2.PlayStream(t, acct1) 76 node3.PlayStream(t, acct1) 77 node1.PlayStream(t, acct2) 78 node2.PlayStream(t, acct2) 79 node3.PlayStream(t, acct2) 80 // node4.PlayStream(t, acct1) 81 <-time.After(30 * time.Second) 82 // node1.StopStream(t, acct1) 83 // node2.StartStream(t, acct1) 84 // <-time.After(20 * time.Second) 85 // // node2.StopStream(t, acct1) 86 // // node3.StartStream(t, acct1) 87 // // node4.Shutdown(t) 88 // // <-time.After(10 * time.Second) 89} 90 91var currentPort = 10000 92 93func nextPort() int { 94 currentPort++ 95 return currentPort 96} 97 98type TestNode struct { 99 Env map[string]string 100 Dev *devenv.DevEnv 101 Cmd *exec.Cmd 102 Ctx context.Context // don't ever do this, it's just a test 103 Shutdown func(t *testing.T) 104 ActiveStreams map[string]context.CancelFunc 105 Name string 106} 107 108func startStreamplaceNode(ctx context.Context, name string, t *testing.T, dev *devenv.DevEnv) *TestNode { 109 nodeCtx, nodeCancel := context.WithCancel(ctx) 110 dataDir := t.TempDir() 111 devAccountCreds := []string{} 112 for _, acct := range dev.Accounts { 113 devAccountCreds = append(devAccountCreds, fmt.Sprintf("%s=%s", acct.DID, acct.Password)) 114 } 115 apiPort := nextPort() 116 env := map[string]string{ 117 "SP_HTTP_ADDR": fmt.Sprintf("127.0.0.1:%d", apiPort), 118 "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 119 "SP_RTMP_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 120 "SP_RELAY_HOST": strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 121 "SP_PLC_URL": dev.PLCURL, 122 "SP_DATA_DIR": dataDir, 123 "SP_DEV_ACCOUNT_CREDS": strings.Join(devAccountCreds, ","), 124 "SP_STREAM_SESSION_TIMEOUT": "3s", 125 "SP_COLOR": "true", 126 "RUST_LOG": os.Getenv("RUST_LOG"), 127 "SP_BROADCASTER_HOST": fmt.Sprintf("%s.example.com", name), 128 "SP_WEBSOCKET_URL": fmt.Sprintf("ws://127.0.0.1:%d", apiPort), 129 } 130 _, file, _, _ := runtime.Caller(0) 131 buildDir := fmt.Sprintf("build-%s-%s", runtime.GOOS, runtime.GOARCH) 132 abs, err := filepath.Abs(filepath.Join(filepath.Dir(file), "..", "..", buildDir, "streamplace")) 133 require.NoErrorf(t, err, "[%s] failed to resolve absolute binary path", name) 134 // Run the streamplace binary at abs with the environment env 135 cmd := exec.Command(abs) 136 cmd.Env = []string{} 137 for k, v := range env { 138 cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v)) 139 } 140 141 stdoutPipe, err := cmd.StdoutPipe() 142 require.NoErrorf(t, err, "[%s] failed to get stdout pipe", name) 143 stderrPipe, err := cmd.StderrPipe() 144 require.NoErrorf(t, err, "[%s] failed to get stderr pipe", name) 145 146 stdoutDone := make(chan struct{}) 147 stderrDone := make(chan struct{}) 148 149 // Goroutine to read stdout and prefix lines 150 go func() { 151 defer close(stdoutDone) 152 scanner := bufio.NewScanner(stdoutPipe) 153 for scanner.Scan() { 154 fmt.Fprintf(os.Stdout, "[%s STDOUT] %s\n", name, scanner.Text()) 155 } 156 if err := scanner.Err(); err != nil { 157 fmt.Fprintf(os.Stdout, "[%s STDOUT] Error reading stdout: %v\n", name, err) 158 } 159 }() 160 // Goroutine to read stderr and prefix lines 161 go func() { 162 defer close(stderrDone) 163 scanner := bufio.NewScanner(stderrPipe) 164 for scanner.Scan() { 165 fmt.Fprintf(os.Stdout, "[%s STDERR] %s\n", name, scanner.Text()) 166 } 167 if err := scanner.Err(); err != nil { 168 fmt.Fprintf(os.Stdout, "[%s STDERR] Error reading stderr: %v\n", name, err) 169 } 170 }() 171 172 err = cmd.Start() 173 require.NoErrorf(t, err, "[%s] failed to start streamplace process", name) 174 175 // Wait for the streamplace node to be ready by polling the health endpoint 176 healthz := fmt.Sprintf("http://%s/api/healthz", env["SP_HTTP_ADDR"]) 177 client := &http.Client{Timeout: 2 * time.Second} 178 for { 179 resp, err := client.Get(healthz) 180 if err == nil { 181 defer resp.Body.Close() 182 if resp.StatusCode == 200 { 183 break 184 } 185 } 186 time.Sleep(200 * time.Millisecond) 187 } 188 node := &TestNode{ 189 Env: env, 190 Dev: dev, 191 Cmd: cmd, 192 Ctx: nodeCtx, 193 ActiveStreams: make(map[string]context.CancelFunc), 194 Name: name, 195 } 196 go func() { 197 <-nodeCtx.Done() 198 node.Shutdown(t) 199 }() 200 go func() { 201 for { 202 select { 203 case <-nodeCtx.Done(): 204 return 205 case <-time.After(1 * time.Second): 206 scrp, err := scraper.NewWebScraper(fmt.Sprintf("http://%s/metrics", env["SP_HTTP_INTERNAL_ADDR"])) 207 require.NoErrorf(t, err, "[%s] failed to create scraper", name) 208 data, err := scrp.ScrapeWeb() 209 require.NoErrorf(t, err, "[%s] failed to scrape metrics", name) 210 found := false 211 for _, metric := range data.Gauges { 212 if metric.Key == "streamplace_send_segment_calls" { 213 // require.Lessf(t, metric.Value, float64(2), "[%s] send segment calls should be < 2, got %f", name, metric.Value) 214 log.Log(nodeCtx, fmt.Sprintf("[%s] open send_segment calls", name), "open", metric.Value) 215 found = true 216 break 217 } 218 } 219 if !found { 220 require.Fail(t, fmt.Sprintf("[%s] send segment calls metric not found", name)) 221 } 222 } 223 } 224 }() 225 shuttingDown := false 226 nodeShutdown := func(t *testing.T) { 227 if shuttingDown { 228 return 229 } 230 shuttingDown = true 231 nodeCancel() 232 _ = cmd.Process.Kill() 233 _, _ = cmd.Process.Wait() 234 // Wait for stdout/stderr readers to finish 235 <-stdoutDone 236 <-stderrDone 237 } 238 node.Shutdown = nodeShutdown 239 t.Cleanup(func() { 240 node.Shutdown(t) 241 }) 242 return node 243} 244 245func (node *TestNode) StartStream(t *testing.T, acct *devenv.DevEnvAccount) { 246 streamCtx, streamCancel := context.WithCancel(node.Ctx) 247 node.ActiveStreams[acct.DID] = streamCancel 248 priv, pub, err := spkey.GenerateStreamKeyForDID(acct.DID) 249 require.NoErrorf(t, err, "[%s] failed to generate stream key for DID %s", node.Name, acct.DID) 250 createdBy := "multitest" 251 streamKey := streamplace.Key{ 252 SigningKey: pub.DIDKey(), 253 CreatedAt: time.Now().Format(util.ISO8601), 254 CreatedBy: &createdBy, 255 } 256 _, err = comatproto.RepoCreateRecord(context.TODO(), acct.XRPC, &comatproto.RepoCreateRecord_Input{ 257 Collection: "place.stream.key", 258 Repo: acct.DID, 259 Record: &lexutil.LexiconTypeDecoder{Val: &streamKey}, 260 }) 261 require.NoErrorf(t, err, "[%s] failed to create Repo record for DID %s", node.Name, acct.DID) 262 log.Log(context.Background(), "created stream key", "did", acct.DID, "pub", pub.DIDKey()) 263 time.Sleep(1 * time.Second) 264 whip := &cmd.WHIPClient{ 265 StreamKey: priv, 266 File: remote.RemoteFixture("3188c071b354f2e548d7f2d332699758e8e3ab1600280e5b07cb67eedc64f274/BigBuckBunny_1sGOP_240p30_NoBframes.mp4"), 267 Endpoint: fmt.Sprintf("http://%s", node.Env["SP_HTTP_ADDR"]), 268 Count: 1, 269 } 270 271 g, ctx := errgroup.WithContext(streamCtx) 272 g.Go(func() error { 273 return whip.WHIP(ctx) 274 }) 275} 276 277func (node *TestNode) StopStream(t *testing.T, acct *devenv.DevEnvAccount) { 278 cancel := node.ActiveStreams[acct.DID] 279 if cancel == nil { 280 require.FailNow(t, fmt.Sprintf("[%s] stream not active for did %s", node.Name, acct.DID)) 281 } 282 cancel() 283 delete(node.ActiveStreams, acct.DID) 284} 285 286func (node *TestNode) PlayStream(t *testing.T, acct *devenv.DevEnvAccount) { 287 whep := &cmd.WHEPClient{ 288 Endpoint: fmt.Sprintf("http://%s/api/playback/%s/webrtc", node.Env["SP_HTTP_ADDR"], acct.DID), 289 Count: 1, 290 } 291 g, ctx := errgroup.WithContext(node.Ctx) 292 g.Go(func() error { 293 return whep.WHEP(ctx) 294 }) 295 start := time.Now() 296 // start at -1 to give them an extra go-round to boot 297 prevVideoTotal := -1 298 prevAudioTotal := -1 299 g.Go(func() error { 300 for { 301 select { 302 case <-ctx.Done(): 303 return ctx.Err() 304 case <-time.After(5 * time.Second): 305 stats := whep.Stats[0] 306 videoStats := stats["video"] 307 audioStats := stats["audio"] 308 if videoStats.Total == prevVideoTotal || audioStats.Total == prevAudioTotal { 309 require.FailNow(t, fmt.Sprintf("[%s] stream playback stalled did=%s, video=%d, audio=%d, elapsed=%s", node.Name, acct.DID, videoStats.Total, audioStats.Total, time.Since(start))) 310 } 311 prevVideoTotal = videoStats.Total 312 prevAudioTotal = audioStats.Total 313 log.Log(ctx, fmt.Sprintf("[%s] stream playback", node.Name), "did", acct.DID, "video", videoStats.Total, "audio", audioStats.Total, "elapsed", time.Since(start)) 314 } 315 } 316 }) 317}