Live video on the AT Protocol
1package multitest
2
3import (
4 "bufio"
5 "context"
6 "fmt"
7 "net/http"
8 "os"
9 "os/exec"
10 "path/filepath"
11 "runtime"
12 "strings"
13 "testing"
14 "time"
15
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 "github.com/bluesky-social/indigo/util"
19 scraper "github.com/starttoaster/prometheus-exporter-scraper"
20 "github.com/stretchr/testify/require"
21 "golang.org/x/sync/errgroup"
22 "stream.place/streamplace/pkg/cmd"
23 "stream.place/streamplace/pkg/crypto/spkey"
24 "stream.place/streamplace/pkg/devenv"
25 "stream.place/streamplace/pkg/gstinit"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/streamplace"
28 "stream.place/streamplace/test/remote"
29)
30
31func TestMultinodeSyndication(t *testing.T) {
32 if os.Getenv("GITHUB_ACTION") != "" {
33 t.Skip("Skipping multitest in GitHub Actions")
34 }
35 gstinit.InitGST()
36 dev := devenv.WithDevEnv(t)
37 acct1 := dev.CreateAccount(t)
38 acct2 := dev.CreateAccount(t)
39 ctx, cancel := context.WithCancel(context.Background())
40 defer cancel()
41 node1 := startStreamplaceNode(ctx, "node1", t, dev)
42 node2 := startStreamplaceNode(ctx, "node2", t, dev)
43 node3 := startStreamplaceNode(ctx, "node3", t, dev)
44 node1.StartStream(t, acct1)
45 node2.PlayStream(t, acct1)
46 node3.PlayStream(t, acct1)
47 <-time.After(10 * time.Second)
48 node2.Shutdown(t)
49 <-time.After(20 * time.Second)
50 node4 := startStreamplaceNode(ctx, "node4", t, dev)
51 node4.StartStream(t, acct2)
52 node4.PlayStream(t, acct1)
53 node1.PlayStream(t, acct2)
54 node3.PlayStream(t, acct2)
55 <-time.After(30 * time.Second)
56}
57
58func TestOriginSwap(t *testing.T) {
59 if os.Getenv("GITHUB_ACTION") != "" {
60 t.Skip("Skipping multitest in GitHub Actions")
61 }
62 gstinit.InitGST()
63 ctx, cancel := context.WithCancel(context.Background())
64 defer cancel()
65 dev := devenv.WithDevEnv(t)
66 acct1 := dev.CreateAccount(t)
67 acct2 := dev.CreateAccount(t)
68 node1 := startStreamplaceNode(ctx, "node1", t, dev)
69 node2 := startStreamplaceNode(ctx, "node2", t, dev)
70 node3 := startStreamplaceNode(ctx, "node3", t, dev)
71 // node4 := startStreamplaceNode(ctx, "node4", t, dev)
72 node1.StartStream(t, acct1)
73 node2.StartStream(t, acct2)
74 node1.PlayStream(t, acct1)
75 node2.PlayStream(t, acct1)
76 node3.PlayStream(t, acct1)
77 node1.PlayStream(t, acct2)
78 node2.PlayStream(t, acct2)
79 node3.PlayStream(t, acct2)
80 // node4.PlayStream(t, acct1)
81 <-time.After(30 * time.Second)
82 // node1.StopStream(t, acct1)
83 // node2.StartStream(t, acct1)
84 // <-time.After(20 * time.Second)
85 // // node2.StopStream(t, acct1)
86 // // node3.StartStream(t, acct1)
87 // // node4.Shutdown(t)
88 // // <-time.After(10 * time.Second)
89}
90
91var currentPort = 10000
92
93func nextPort() int {
94 currentPort++
95 return currentPort
96}
97
98type TestNode struct {
99 Env map[string]string
100 Dev *devenv.DevEnv
101 Cmd *exec.Cmd
102 Ctx context.Context // don't ever do this, it's just a test
103 Shutdown func(t *testing.T)
104 ActiveStreams map[string]context.CancelFunc
105 Name string
106}
107
108func startStreamplaceNode(ctx context.Context, name string, t *testing.T, dev *devenv.DevEnv) *TestNode {
109 nodeCtx, nodeCancel := context.WithCancel(ctx)
110 dataDir := t.TempDir()
111 devAccountCreds := []string{}
112 for _, acct := range dev.Accounts {
113 devAccountCreds = append(devAccountCreds, fmt.Sprintf("%s=%s", acct.DID, acct.Password))
114 }
115 apiPort := nextPort()
116 env := map[string]string{
117 "SP_HTTP_ADDR": fmt.Sprintf("127.0.0.1:%d", apiPort),
118 "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()),
119 "SP_RTMP_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()),
120 "SP_RELAY_HOST": strings.ReplaceAll(dev.PDSURL, "http://", "ws://"),
121 "SP_PLC_URL": dev.PLCURL,
122 "SP_DATA_DIR": dataDir,
123 "SP_DEV_ACCOUNT_CREDS": strings.Join(devAccountCreds, ","),
124 "SP_STREAM_SESSION_TIMEOUT": "3s",
125 "SP_COLOR": "true",
126 "RUST_LOG": os.Getenv("RUST_LOG"),
127 "SP_BROADCASTER_HOST": fmt.Sprintf("%s.example.com", name),
128 "SP_WEBSOCKET_URL": fmt.Sprintf("ws://127.0.0.1:%d", apiPort),
129 }
130 _, file, _, _ := runtime.Caller(0)
131 buildDir := fmt.Sprintf("build-%s-%s", runtime.GOOS, runtime.GOARCH)
132 abs, err := filepath.Abs(filepath.Join(filepath.Dir(file), "..", "..", buildDir, "streamplace"))
133 require.NoErrorf(t, err, "[%s] failed to resolve absolute binary path", name)
134 // Run the streamplace binary at abs with the environment env
135 cmd := exec.Command(abs)
136 cmd.Env = []string{}
137 for k, v := range env {
138 cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
139 }
140
141 stdoutPipe, err := cmd.StdoutPipe()
142 require.NoErrorf(t, err, "[%s] failed to get stdout pipe", name)
143 stderrPipe, err := cmd.StderrPipe()
144 require.NoErrorf(t, err, "[%s] failed to get stderr pipe", name)
145
146 stdoutDone := make(chan struct{})
147 stderrDone := make(chan struct{})
148
149 // Goroutine to read stdout and prefix lines
150 go func() {
151 defer close(stdoutDone)
152 scanner := bufio.NewScanner(stdoutPipe)
153 for scanner.Scan() {
154 fmt.Fprintf(os.Stdout, "[%s STDOUT] %s\n", name, scanner.Text())
155 }
156 if err := scanner.Err(); err != nil {
157 fmt.Fprintf(os.Stdout, "[%s STDOUT] Error reading stdout: %v\n", name, err)
158 }
159 }()
160 // Goroutine to read stderr and prefix lines
161 go func() {
162 defer close(stderrDone)
163 scanner := bufio.NewScanner(stderrPipe)
164 for scanner.Scan() {
165 fmt.Fprintf(os.Stdout, "[%s STDERR] %s\n", name, scanner.Text())
166 }
167 if err := scanner.Err(); err != nil {
168 fmt.Fprintf(os.Stdout, "[%s STDERR] Error reading stderr: %v\n", name, err)
169 }
170 }()
171
172 err = cmd.Start()
173 require.NoErrorf(t, err, "[%s] failed to start streamplace process", name)
174
175 // Wait for the streamplace node to be ready by polling the health endpoint
176 healthz := fmt.Sprintf("http://%s/api/healthz", env["SP_HTTP_ADDR"])
177 client := &http.Client{Timeout: 2 * time.Second}
178 for {
179 resp, err := client.Get(healthz)
180 if err == nil {
181 defer resp.Body.Close()
182 if resp.StatusCode == 200 {
183 break
184 }
185 }
186 time.Sleep(200 * time.Millisecond)
187 }
188 node := &TestNode{
189 Env: env,
190 Dev: dev,
191 Cmd: cmd,
192 Ctx: nodeCtx,
193 ActiveStreams: make(map[string]context.CancelFunc),
194 Name: name,
195 }
196 go func() {
197 <-nodeCtx.Done()
198 node.Shutdown(t)
199 }()
200 go func() {
201 for {
202 select {
203 case <-nodeCtx.Done():
204 return
205 case <-time.After(1 * time.Second):
206 scrp, err := scraper.NewWebScraper(fmt.Sprintf("http://%s/metrics", env["SP_HTTP_INTERNAL_ADDR"]))
207 require.NoErrorf(t, err, "[%s] failed to create scraper", name)
208 data, err := scrp.ScrapeWeb()
209 require.NoErrorf(t, err, "[%s] failed to scrape metrics", name)
210 found := false
211 for _, metric := range data.Gauges {
212 if metric.Key == "streamplace_send_segment_calls" {
213 // require.Lessf(t, metric.Value, float64(2), "[%s] send segment calls should be < 2, got %f", name, metric.Value)
214 log.Log(nodeCtx, fmt.Sprintf("[%s] open send_segment calls", name), "open", metric.Value)
215 found = true
216 break
217 }
218 }
219 if !found {
220 require.Fail(t, fmt.Sprintf("[%s] send segment calls metric not found", name))
221 }
222 }
223 }
224 }()
225 shuttingDown := false
226 nodeShutdown := func(t *testing.T) {
227 if shuttingDown {
228 return
229 }
230 shuttingDown = true
231 nodeCancel()
232 _ = cmd.Process.Kill()
233 _, _ = cmd.Process.Wait()
234 // Wait for stdout/stderr readers to finish
235 <-stdoutDone
236 <-stderrDone
237 }
238 node.Shutdown = nodeShutdown
239 t.Cleanup(func() {
240 node.Shutdown(t)
241 })
242 return node
243}
244
245func (node *TestNode) StartStream(t *testing.T, acct *devenv.DevEnvAccount) {
246 streamCtx, streamCancel := context.WithCancel(node.Ctx)
247 node.ActiveStreams[acct.DID] = streamCancel
248 priv, pub, err := spkey.GenerateStreamKeyForDID(acct.DID)
249 require.NoErrorf(t, err, "[%s] failed to generate stream key for DID %s", node.Name, acct.DID)
250 createdBy := "multitest"
251 streamKey := streamplace.Key{
252 SigningKey: pub.DIDKey(),
253 CreatedAt: time.Now().Format(util.ISO8601),
254 CreatedBy: &createdBy,
255 }
256 _, err = comatproto.RepoCreateRecord(context.TODO(), acct.XRPC, &comatproto.RepoCreateRecord_Input{
257 Collection: "place.stream.key",
258 Repo: acct.DID,
259 Record: &lexutil.LexiconTypeDecoder{Val: &streamKey},
260 })
261 require.NoErrorf(t, err, "[%s] failed to create Repo record for DID %s", node.Name, acct.DID)
262 log.Log(context.Background(), "created stream key", "did", acct.DID, "pub", pub.DIDKey())
263 time.Sleep(1 * time.Second)
264 whip := &cmd.WHIPClient{
265 StreamKey: priv,
266 File: remote.RemoteFixture("3188c071b354f2e548d7f2d332699758e8e3ab1600280e5b07cb67eedc64f274/BigBuckBunny_1sGOP_240p30_NoBframes.mp4"),
267 Endpoint: fmt.Sprintf("http://%s", node.Env["SP_HTTP_ADDR"]),
268 Count: 1,
269 }
270
271 g, ctx := errgroup.WithContext(streamCtx)
272 g.Go(func() error {
273 return whip.WHIP(ctx)
274 })
275}
276
277func (node *TestNode) StopStream(t *testing.T, acct *devenv.DevEnvAccount) {
278 cancel := node.ActiveStreams[acct.DID]
279 if cancel == nil {
280 require.FailNow(t, fmt.Sprintf("[%s] stream not active for did %s", node.Name, acct.DID))
281 }
282 cancel()
283 delete(node.ActiveStreams, acct.DID)
284}
285
286func (node *TestNode) PlayStream(t *testing.T, acct *devenv.DevEnvAccount) {
287 whep := &cmd.WHEPClient{
288 Endpoint: fmt.Sprintf("http://%s/api/playback/%s/webrtc", node.Env["SP_HTTP_ADDR"], acct.DID),
289 Count: 1,
290 }
291 g, ctx := errgroup.WithContext(node.Ctx)
292 g.Go(func() error {
293 return whep.WHEP(ctx)
294 })
295 start := time.Now()
296 // start at -1 to give them an extra go-round to boot
297 prevVideoTotal := -1
298 prevAudioTotal := -1
299 g.Go(func() error {
300 for {
301 select {
302 case <-ctx.Done():
303 return ctx.Err()
304 case <-time.After(5 * time.Second):
305 stats := whep.Stats[0]
306 videoStats := stats["video"]
307 audioStats := stats["audio"]
308 if videoStats.Total == prevVideoTotal || audioStats.Total == prevAudioTotal {
309 require.FailNow(t, fmt.Sprintf("[%s] stream playback stalled did=%s, video=%d, audio=%d, elapsed=%s", node.Name, acct.DID, videoStats.Total, audioStats.Total, time.Since(start)))
310 }
311 prevVideoTotal = videoStats.Total
312 prevAudioTotal = audioStats.Total
313 log.Log(ctx, fmt.Sprintf("[%s] stream playback", node.Name), "did", acct.DID, "video", videoStats.Total, "audio", audioStats.Total, "elapsed", time.Since(start))
314 }
315 }
316 })
317}