Live video on the AT Protocol
1package cmd
2
3import (
4 "bytes"
5 "context"
6 "crypto/rand"
7 "errors"
8 "flag"
9 "fmt"
10 "net/url"
11 "os"
12 "os/signal"
13 "runtime"
14 "runtime/pprof"
15 "slices"
16 "strconv"
17 "strings"
18 "syscall"
19 "time"
20
21 "github.com/bluesky-social/indigo/carstore"
22 "github.com/ethereum/go-ethereum/common/hexutil"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/peterbourgon/ff/v3"
25 "github.com/streamplace/oatproxy/pkg/oatproxy"
26 "stream.place/streamplace/pkg/aqhttp"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/gstinit"
31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
32 "stream.place/streamplace/pkg/localdb"
33 "stream.place/streamplace/pkg/log"
34 "stream.place/streamplace/pkg/media"
35 "stream.place/streamplace/pkg/notifications"
36 "stream.place/streamplace/pkg/replication"
37 "stream.place/streamplace/pkg/replication/iroh_replicator"
38 "stream.place/streamplace/pkg/replication/websocketrep"
39 "stream.place/streamplace/pkg/rtmps"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/statedb"
42 "stream.place/streamplace/pkg/storage"
43
44 _ "github.com/go-gst/go-glib/glib"
45 _ "github.com/go-gst/go-gst/gst"
46 "stream.place/streamplace/pkg/api"
47 "stream.place/streamplace/pkg/config"
48 "stream.place/streamplace/pkg/model"
49)
50
51// Additional jobs that can be injected by platforms
52type jobFunc func(ctx context.Context, cli *config.CLI) error
53
54// parse the CLI and fire up an streamplace node!
55func start(build *config.BuildFlags, platformJobs []jobFunc) error {
56 iroh_streamplace.InitLogging()
57 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
58 err := media.RunSelfTest(context.Background())
59 if err != nil {
60 if selfTest {
61 fmt.Println(err.Error())
62 os.Exit(1)
63 } else {
64 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
65 if retryCount >= 3 {
66 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
67 return err
68 }
69 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
70 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
71 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
72 if err != nil {
73 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
74 return err
75 }
76 panic("invalid code path: exec succeeded but we're still here???")
77 }
78 }
79 if selfTest {
80 runtime.GC()
81 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
82 log.Error(context.Background(), "error creating pprof", "error", err)
83 }
84 fmt.Println("self-test successful!")
85 os.Exit(0)
86 }
87
88 if len(os.Args) > 1 && os.Args[1] == "stream" {
89 if len(os.Args) != 3 {
90 fmt.Println("usage: streamplace stream [user]")
91 os.Exit(1)
92 }
93 return Stream(os.Args[2])
94 }
95
96 if len(os.Args) > 1 && os.Args[1] == "live" {
97 cli := config.CLI{Build: build}
98 fs := cli.NewFlagSet("streamplace live")
99
100 err := cli.Parse(fs, os.Args[2:])
101 if err != nil {
102 return err
103 }
104
105 args := fs.Args()
106 if len(args) != 1 {
107 fmt.Println("usage: streamplace live [flags] [stream-key]")
108 os.Exit(1)
109 }
110
111 return Live(args[0], cli.HTTPInternalAddr)
112 }
113
114 if len(os.Args) > 1 && os.Args[1] == "sign" {
115 return Sign(context.Background())
116 }
117
118 if len(os.Args) > 1 && os.Args[1] == "whep" {
119 return WHEP(os.Args[2:])
120 }
121 if len(os.Args) > 1 && os.Args[1] == "whip" {
122 return WHIP(os.Args[2:])
123 }
124
125 if len(os.Args) > 1 && os.Args[1] == "combine" {
126 return Combine(context.Background(), build, os.Args[2:])
127 }
128
129 if len(os.Args) > 1 && os.Args[1] == "split" {
130 cli := config.CLI{Build: build}
131 fs := cli.NewFlagSet("streamplace split")
132
133 err := cli.Parse(fs, os.Args[2:])
134 if err != nil {
135 return err
136 }
137 ctx := context.Background()
138 ctx = log.WithDebugValue(ctx, cli.Debug)
139 if len(fs.Args()) != 2 {
140 fmt.Println("usage: streamplace split [flags] [input file] [output directory]")
141 os.Exit(1)
142 }
143 gstinit.InitGST()
144 return Split(ctx, fs.Args()[0], fs.Args()[1])
145 }
146
147 if len(os.Args) > 1 && os.Args[1] == "self-test" {
148 err := media.RunSelfTest(context.Background())
149 if err != nil {
150 fmt.Println(err.Error())
151 os.Exit(1)
152 }
153 fmt.Println("self-test successful!")
154 os.Exit(0)
155 }
156
157 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
158 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
159 _ = starter.NewLivepeerConfig(lpfs)
160 err = ff.Parse(lpfs, os.Args[2:],
161 ff.WithConfigFileFlag("config"),
162 ff.WithEnvVarPrefix("LP"),
163 )
164 if err != nil {
165 return err
166 }
167 err = GoLivepeer(context.Background(), lpfs)
168 if err != nil {
169 log.Error(context.Background(), "error in livepeer", "error", err)
170 os.Exit(1)
171 }
172 os.Exit(0)
173 }
174
175 _ = flag.Set("logtostderr", "true")
176 vFlag := flag.Lookup("v")
177 cli := config.CLI{Build: build}
178 fs := cli.NewFlagSet("streamplace")
179 verbosity := fs.String("v", "3", "log verbosity level")
180 version := fs.Bool("version", false, "print version and exit")
181
182 err = cli.Parse(
183 fs, os.Args[1:],
184 )
185 if err != nil {
186 return err
187 }
188
189 err = flag.CommandLine.Parse(nil)
190 if err != nil {
191 return err
192 }
193 _ = vFlag.Value.Set(*verbosity)
194 log.SetColorLogger(cli.Color)
195 ctx := context.Background()
196 ctx = log.WithDebugValue(ctx, cli.Debug)
197
198 log.Log(ctx,
199 "streamplace",
200 "version", build.Version,
201 "buildTime", build.BuildTimeStr(),
202 "uuid", build.UUID,
203 "runtime.GOOS", runtime.GOOS,
204 "runtime.GOARCH", runtime.GOARCH,
205 "runtime.Version", runtime.Version())
206 if *version {
207 return nil
208 }
209 signer, err := createSigner(ctx, &cli)
210 if err != nil {
211 return err
212 }
213
214 if len(os.Args) > 1 && os.Args[1] == "migrate" {
215 return statedb.Migrate(&cli)
216 }
217
218 spmetrics.Version.WithLabelValues(build.Version).Inc()
219 if cli.LivepeerHelp {
220 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
221 _ = starter.NewLivepeerConfig(lpFlags)
222 lpFlags.VisitAll(func(f *flag.Flag) {
223 adapted := config.ToSnakeCase(f.Name)
224 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
225 usage := fmt.Sprintf(" %s", f.Usage)
226 if f.DefValue != "" {
227 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
228 }
229 fmt.Printf(" %s\n", usage)
230 })
231 return nil
232 }
233
234 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
235
236 err = os.MkdirAll(cli.DataDir, os.ModePerm)
237 if err != nil {
238 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
239 }
240
241 ldb, err := localdb.MakeDB(cli.LocalDBURL)
242 if err != nil {
243 return err
244 }
245
246 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
247 if err != nil {
248 return err
249 }
250 var noter notifications.FirebaseNotifier
251 if cli.FirebaseServiceAccount != "" {
252 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
253 if err != nil {
254 return err
255 }
256 }
257
258 group, ctx := TimeoutGroupWithContext(ctx)
259
260 out := carstore.SQLiteStore{}
261 err = out.Open(":memory:")
262 if err != nil {
263 return err
264 }
265 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
266 if err != nil {
267 return err
268 }
269 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
270 if err != nil {
271 return err
272 }
273 defer handle.Close()
274
275 jwk, err := state.EnsureJWK(ctx, "jwk")
276 if err != nil {
277 return err
278 }
279 cli.JWK = jwk
280
281 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
282 if err != nil {
283 return err
284 }
285 cli.AccessJWK = accessJWK
286
287 b := bus.NewBus()
288 atsync := &atproto.ATProtoSynchronizer{
289 CLI: &cli,
290 Model: mod,
291 StatefulDB: state,
292 Noter: noter,
293 Bus: b,
294 }
295 err = atsync.Migrate(ctx)
296 if err != nil {
297 return fmt.Errorf("failed to migrate: %w", err)
298 }
299
300 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync, ldb)
301 if err != nil {
302 return err
303 }
304
305 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod)
306 if err != nil {
307 return err
308 }
309
310 var clientMetadata *oatproxy.OAuthClientMetadata
311 var host string
312 if cli.PublicOAuth {
313 u, err := url.Parse(cli.OwnPublicURL())
314 if err != nil {
315 return err
316 }
317 host = u.Host
318 clientMetadata = &oatproxy.OAuthClientMetadata{
319 Scope: atproto.OAuthString,
320 ClientName: "Streamplace",
321 RedirectURIs: []string{
322 fmt.Sprintf("%s/login", cli.OwnPublicURL()),
323 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()),
324 },
325 }
326 } else {
327 host = cli.BroadcasterHost
328 clientMetadata = &oatproxy.OAuthClientMetadata{
329 Scope: atproto.OAuthString,
330 ClientName: "Streamplace",
331 RedirectURIs: []string{
332 fmt.Sprintf("https://%s/login", cli.BroadcasterHost),
333 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost),
334 },
335 }
336 }
337
338 var replicator replication.Replicator = nil
339 if slices.Contains(cli.Replicators, config.ReplicatorIroh) {
340 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
341 if err != nil {
342 return err
343 }
344 if !exists {
345 secret := make([]byte, 32)
346 _, err := rand.Read(secret)
347 if err != nil {
348 return fmt.Errorf("failed to generate random secret: %w", err)
349 }
350 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
351 if err != nil {
352 return err
353 }
354 }
355 buf := bytes.Buffer{}
356 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
357 if err != nil {
358 return err
359 }
360 secret := buf.Bytes()
361 var topic []byte
362 if cli.IrohTopic != "" {
363 topic, err = hexutil.Decode("0x" + cli.IrohTopic)
364 if err != nil {
365 return err
366 }
367 }
368 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod)
369 if err != nil {
370 return err
371 }
372 }
373 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) {
374 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm)
375 }
376
377 op := oatproxy.New(&oatproxy.Config{
378 Host: host,
379 CreateOAuthSession: state.CreateOAuthSession,
380 UpdateOAuthSession: state.UpdateOAuthSession,
381 GetOAuthSession: state.LoadOAuthSession,
382 Lock: state.GetNamedLock,
383 Scope: atproto.OAuthString,
384 UpstreamJWK: cli.JWK,
385 DownstreamJWK: cli.AccessJWK,
386 ClientMetadata: clientMetadata,
387 Public: cli.PublicOAuth,
388 HTTPClient: &aqhttp.Client,
389 })
390 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator, ldb)
391 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb)
392 if err != nil {
393 return err
394 }
395
396 ctx = log.WithLogValues(ctx, "version", build.Version)
397
398 group.Go(func() error {
399 return handleSignals(ctx)
400 })
401
402 group.Go(func() error {
403 return state.ProcessQueue(ctx)
404 })
405
406 if cli.TracingEndpoint != "" {
407 group.Go(func() error {
408 return startTelemetry(ctx, cli.TracingEndpoint)
409 })
410 }
411
412 if cli.Secure {
413 group.Go(func() error {
414 return a.ServeHTTPS(ctx)
415 })
416 group.Go(func() error {
417 return a.ServeHTTPRedirect(ctx)
418 })
419 if cli.RTMPServerAddon != "" {
420 group.Go(func() error {
421 return rtmps.ServeRTMPSAddon(ctx, &cli)
422 })
423 }
424 group.Go(func() error {
425 return a.ServeRTMPS(ctx, &cli)
426 })
427 } else {
428 group.Go(func() error {
429 return a.ServeHTTP(ctx)
430 })
431 group.Go(func() error {
432 return a.ServeRTMP(ctx)
433 })
434 }
435
436 group.Go(func() error {
437 return a.ServeInternalHTTP(ctx)
438 })
439
440 if !cli.NoFirehose {
441 group.Go(func() error {
442 return atsync.StartFirehose(ctx)
443 })
444 }
445 for _, labeler := range cli.Labelers {
446 group.Go(func() error {
447 return atsync.StartLabelerFirehose(ctx, labeler)
448 })
449 }
450
451 group.Go(func() error {
452 return a.ExpireSessions(ctx)
453 })
454
455 group.Go(func() error {
456 return storage.StartSegmentCleaner(ctx, ldb, &cli)
457 })
458
459 group.Go(func() error {
460 return ldb.StartSegmentCleaner(ctx)
461 })
462
463 group.Go(func() error {
464 return replicator.Start(ctx, &cli)
465 })
466
467 if cli.LivepeerGateway {
468 // make a file to make sure the directory exists
469 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
470 if err != nil {
471 return err
472 }
473 fd.Close()
474 if err != nil {
475 return err
476 }
477 group.Go(func() error {
478 err := GoLivepeer(ctx, fs)
479 if err != nil {
480 return err
481 }
482 // livepeer returns nil on error, so we need to check if we're responsible
483 if ctx.Err() == nil {
484 return fmt.Errorf("livepeer exited")
485 }
486 return nil
487 })
488 }
489
490 group.Go(func() error {
491 return d.Start(ctx)
492 })
493
494 if cli.TestStream {
495 atkey, err := atproto.ParsePubKey(signer.Public())
496 if err != nil {
497 return err
498 }
499 did := atkey.DIDKey()
500 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod)
501 if err != nil {
502 return err
503 }
504 err = mod.UpdateIdentity(&model.Identity{
505 ID: testMediaSigner.Pub().String(),
506 Handle: "stream-self-tester",
507 DID: "",
508 })
509 if err != nil {
510 return err
511 }
512 cli.AllowedStreams = append(cli.AllowedStreams, did)
513 a.Aliases["self-test"] = did
514 group.Go(func() error {
515 return mm.TestSource(ctx, testMediaSigner)
516 })
517
518 // Start a test stream that will run intermittently
519 if err != nil {
520 return err
521 }
522 atkey2, err := atproto.ParsePubKey(signer.Public())
523 if err != nil {
524 return err
525 }
526 did2 := atkey2.DIDKey()
527 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod)
528 if err != nil {
529 return err
530 }
531 err = mod.UpdateIdentity(&model.Identity{
532 ID: intermittentMediaSigner.Pub().String(),
533 Handle: "stream-intermittent-tester",
534 DID: "",
535 })
536 if err != nil {
537 return err
538 }
539 cli.AllowedStreams = append(cli.AllowedStreams, did2)
540 a.Aliases["intermittent-self-test"] = did2
541
542 group.Go(func() error {
543 for {
544 // Start intermittent stream
545 intermittentCtx, cancel := context.WithCancel(ctx)
546 done := make(chan struct{})
547 go func() {
548 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
549 close(done)
550 }()
551 // Stream ON for 15 seconds
552 time.Sleep(15 * time.Second)
553 // Stop stream
554 cancel()
555 <-done // Wait for TestSource to exit
556 // Stream OFF for 15 seconds
557 time.Sleep(15 * time.Second)
558 }
559 })
560 }
561
562 for _, job := range platformJobs {
563 group.Go(func() error {
564 return job(ctx, &cli)
565 })
566 }
567
568 if cli.WHIPTest != "" {
569 group.Go(func() error {
570 err := WHIP(strings.Split(cli.WHIPTest, " "))
571 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
572 time.Sleep(time.Second * 3)
573 // gst.Deinit()
574 log.Warn(ctx, "gst deinit complete, exiting")
575 return err
576 })
577 }
578
579 return group.Wait()
580}
581
582var ErrCaughtSignal = errors.New("caught signal")
583
584func handleSignals(ctx context.Context) error {
585 c := make(chan os.Signal, 1)
586 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
587 for {
588 select {
589 case s := <-c:
590 if s == syscall.SIGABRT {
591 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
592 log.Error(ctx, "failed to create pprof", "error", err)
593 }
594 }
595 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
596 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
597 case <-ctx.Done():
598 return nil
599 }
600 }
601}