Live video on the AT Protocol
at eli/embedded-rtmp-server 595 lines 15 kB view raw
package cmd

import (
	"bytes"
	"context"
	"crypto/rand"
	"errors"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"runtime"
	"runtime/pprof"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
	"github.com/peterbourgon/ff/v3"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/gstinit"
	"stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication"
	"stream.place/streamplace/pkg/replication/iroh_replicator"
	"stream.place/streamplace/pkg/replication/websocketrep"
	"stream.place/streamplace/pkg/rtmps"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/statedb"
	"stream.place/streamplace/pkg/storage"

	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional job that a platform can inject into the node.
// It receives a context and the parsed CLI configuration and reports failure
// through its returned error.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
54func start(build *config.BuildFlags, platformJobs []jobFunc) error { 55 iroh_streamplace.InitLogging() 56 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 57 err := media.RunSelfTest(context.Background()) 58 if err != nil { 59 if selfTest { 60 fmt.Println(err.Error()) 61 os.Exit(1) 62 } else { 63 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 64 if retryCount >= 3 { 65 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 66 return err 67 } 68 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 69 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 70 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 71 if err != nil { 72 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 73 return err 74 } 75 panic("invalid code path: exec succeeded but we're still here???") 76 } 77 } 78 if selfTest { 79 runtime.GC() 80 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 81 log.Error(context.Background(), "error creating pprof", "error", err) 82 } 83 fmt.Println("self-test successful!") 84 os.Exit(0) 85 } 86 87 if len(os.Args) > 1 && os.Args[1] == "stream" { 88 if len(os.Args) != 3 { 89 fmt.Println("usage: streamplace stream [user]") 90 os.Exit(1) 91 } 92 return Stream(os.Args[2]) 93 } 94 95 if len(os.Args) > 1 && os.Args[1] == "live" { 96 cli := config.CLI{Build: build} 97 fs := cli.NewFlagSet("streamplace live") 98 99 err := cli.Parse(fs, os.Args[2:]) 100 if err != nil { 101 return err 102 } 103 104 args := fs.Args() 105 if len(args) != 1 { 106 fmt.Println("usage: streamplace live [flags] [stream-key]") 107 os.Exit(1) 108 } 109 110 return Live(args[0], cli.HTTPInternalAddr) 111 } 112 113 if len(os.Args) > 1 && os.Args[1] == "sign" { 114 return Sign(context.Background()) 115 } 116 117 if len(os.Args) > 1 && os.Args[1] == "whep" { 118 return 
WHEP(os.Args[2:]) 119 } 120 if len(os.Args) > 1 && os.Args[1] == "whip" { 121 return WHIP(os.Args[2:]) 122 } 123 124 if len(os.Args) > 1 && os.Args[1] == "combine" { 125 return Combine(context.Background(), build, os.Args[2:]) 126 } 127 128 if len(os.Args) > 1 && os.Args[1] == "split" { 129 cli := config.CLI{Build: build} 130 fs := cli.NewFlagSet("streamplace split") 131 132 err := cli.Parse(fs, os.Args[2:]) 133 if err != nil { 134 return err 135 } 136 ctx := context.Background() 137 ctx = log.WithDebugValue(ctx, cli.Debug) 138 if len(fs.Args()) != 2 { 139 fmt.Println("usage: streamplace split [flags] [input file] [output directory]") 140 os.Exit(1) 141 } 142 gstinit.InitGST() 143 return Split(ctx, fs.Args()[0], fs.Args()[1]) 144 } 145 146 if len(os.Args) > 1 && os.Args[1] == "self-test" { 147 err := media.RunSelfTest(context.Background()) 148 if err != nil { 149 fmt.Println(err.Error()) 150 os.Exit(1) 151 } 152 fmt.Println("self-test successful!") 153 os.Exit(0) 154 } 155 156 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 157 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 158 _ = starter.NewLivepeerConfig(lpfs) 159 err = ff.Parse(lpfs, os.Args[2:], 160 ff.WithConfigFileFlag("config"), 161 ff.WithEnvVarPrefix("LP"), 162 ) 163 if err != nil { 164 return err 165 } 166 err = GoLivepeer(context.Background(), lpfs) 167 if err != nil { 168 log.Error(context.Background(), "error in livepeer", "error", err) 169 os.Exit(1) 170 } 171 os.Exit(0) 172 } 173 174 _ = flag.Set("logtostderr", "true") 175 vFlag := flag.Lookup("v") 176 cli := config.CLI{Build: build} 177 fs := cli.NewFlagSet("streamplace") 178 verbosity := fs.String("v", "3", "log verbosity level") 179 version := fs.Bool("version", false, "print version and exit") 180 181 err = cli.Parse( 182 fs, os.Args[1:], 183 ) 184 if err != nil { 185 return err 186 } 187 188 err = flag.CommandLine.Parse(nil) 189 if err != nil { 190 return err 191 } 192 _ = vFlag.Value.Set(*verbosity) 193 log.SetColorLogger(cli.Color) 
194 ctx := context.Background() 195 ctx = log.WithDebugValue(ctx, cli.Debug) 196 197 log.Log(ctx, 198 "streamplace", 199 "version", build.Version, 200 "buildTime", build.BuildTimeStr(), 201 "uuid", build.UUID, 202 "runtime.GOOS", runtime.GOOS, 203 "runtime.GOARCH", runtime.GOARCH, 204 "runtime.Version", runtime.Version()) 205 if *version { 206 return nil 207 } 208 signer, err := createSigner(ctx, &cli) 209 if err != nil { 210 return err 211 } 212 213 if len(os.Args) > 1 && os.Args[1] == "migrate" { 214 return statedb.Migrate(&cli) 215 } 216 217 spmetrics.Version.WithLabelValues(build.Version).Inc() 218 if cli.LivepeerHelp { 219 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 220 _ = starter.NewLivepeerConfig(lpFlags) 221 lpFlags.VisitAll(func(f *flag.Flag) { 222 adapted := config.ToSnakeCase(f.Name) 223 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 224 usage := fmt.Sprintf(" %s", f.Usage) 225 if f.DefValue != "" { 226 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 227 } 228 fmt.Printf(" %s\n", usage) 229 }) 230 return nil 231 } 232 233 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 234 235 err = os.MkdirAll(cli.DataDir, os.ModePerm) 236 if err != nil { 237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 238 } 239 240 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 241 if err != nil { 242 return err 243 } 244 var noter notifications.FirebaseNotifier 245 if cli.FirebaseServiceAccount != "" { 246 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 247 if err != nil { 248 return err 249 } 250 } 251 252 group, ctx := TimeoutGroupWithContext(ctx) 253 254 out := carstore.SQLiteStore{} 255 err = out.Open(":memory:") 256 if err != nil { 257 return err 258 } 259 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 260 if err != nil { 261 return err 262 } 263 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 264 if err != nil { 265 return 
err 266 } 267 defer handle.Close() 268 269 jwk, err := state.EnsureJWK(ctx, "jwk") 270 if err != nil { 271 return err 272 } 273 cli.JWK = jwk 274 275 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 276 if err != nil { 277 return err 278 } 279 cli.AccessJWK = accessJWK 280 281 b := bus.NewBus() 282 atsync := &atproto.ATProtoSynchronizer{ 283 CLI: &cli, 284 Model: mod, 285 StatefulDB: state, 286 Noter: noter, 287 Bus: b, 288 } 289 err = atsync.Migrate(ctx) 290 if err != nil { 291 return fmt.Errorf("failed to migrate: %w", err) 292 } 293 294 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 295 if err != nil { 296 return err 297 } 298 299 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 300 if err != nil { 301 return err 302 } 303 304 var clientMetadata *oatproxy.OAuthClientMetadata 305 var host string 306 if cli.PublicOAuth { 307 u, err := url.Parse(cli.OwnPublicURL()) 308 if err != nil { 309 return err 310 } 311 host = u.Host 312 clientMetadata = &oatproxy.OAuthClientMetadata{ 313 Scope: "atproto transition:generic", 314 ClientName: "Streamplace", 315 RedirectURIs: []string{ 316 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 317 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 318 }, 319 } 320 } else { 321 host = cli.BroadcasterHost 322 clientMetadata = &oatproxy.OAuthClientMetadata{ 323 Scope: "atproto transition:generic", 324 ClientName: "Streamplace", 325 RedirectURIs: []string{ 326 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 327 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 328 }, 329 } 330 } 331 332 var replicator replication.Replicator = nil 333 if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 334 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 335 if err != nil { 336 return err 337 } 338 if !exists { 339 secret := make([]byte, 32) 340 _, err := rand.Read(secret) 341 if err != nil { 342 return fmt.Errorf("failed to generate random secret: %w", err) 343 } 
344 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 345 if err != nil { 346 return err 347 } 348 } 349 buf := bytes.Buffer{} 350 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 351 if err != nil { 352 return err 353 } 354 secret := buf.Bytes() 355 var topic []byte 356 if cli.IrohTopic != "" { 357 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 358 if err != nil { 359 return err 360 } 361 } 362 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 363 if err != nil { 364 return err 365 } 366 } 367 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 368 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 369 } 370 371 op := oatproxy.New(&oatproxy.Config{ 372 Host: host, 373 CreateOAuthSession: state.CreateOAuthSession, 374 UpdateOAuthSession: state.UpdateOAuthSession, 375 GetOAuthSession: state.LoadOAuthSession, 376 Lock: state.GetNamedLock, 377 Scope: "atproto transition:generic", 378 UpstreamJWK: cli.JWK, 379 DownstreamJWK: cli.AccessJWK, 380 ClientMetadata: clientMetadata, 381 Public: cli.PublicOAuth, 382 }) 383 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator) 384 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op) 385 if err != nil { 386 return err 387 } 388 389 ctx = log.WithLogValues(ctx, "version", build.Version) 390 391 group.Go(func() error { 392 return handleSignals(ctx) 393 }) 394 395 group.Go(func() error { 396 return state.ProcessQueue(ctx) 397 }) 398 399 if cli.TracingEndpoint != "" { 400 group.Go(func() error { 401 return startTelemetry(ctx, cli.TracingEndpoint) 402 }) 403 } 404 405 if cli.Secure { 406 group.Go(func() error { 407 return a.ServeHTTPS(ctx) 408 }) 409 group.Go(func() error { 410 return a.ServeHTTPRedirect(ctx) 411 }) 412 if cli.RTMPServerAddon != "" { 413 group.Go(func() error { 414 return rtmps.ServeRTMPSAddon(ctx, &cli) 415 }) 416 } else { 417 group.Go(func() error { 418 return 
a.ServeRTMPS(ctx, &cli) 419 }) 420 } 421 } else { 422 group.Go(func() error { 423 return a.ServeHTTP(ctx) 424 }) 425 group.Go(func() error { 426 return a.ServeRTMP(ctx) 427 }) 428 } 429 430 group.Go(func() error { 431 return a.ServeInternalHTTP(ctx) 432 }) 433 434 if !cli.NoFirehose { 435 group.Go(func() error { 436 return atsync.StartFirehose(ctx) 437 }) 438 } 439 for _, labeler := range cli.Labelers { 440 group.Go(func() error { 441 return atsync.StartLabelerFirehose(ctx, labeler) 442 }) 443 } 444 445 group.Go(func() error { 446 return a.ExpireSessions(ctx) 447 }) 448 449 group.Go(func() error { 450 return storage.StartSegmentCleaner(ctx, mod, &cli) 451 }) 452 453 group.Go(func() error { 454 return mod.StartSegmentCleaner(ctx) 455 }) 456 457 group.Go(func() error { 458 return replicator.Start(ctx, &cli) 459 }) 460 461 if cli.LivepeerGateway { 462 // make a file to make sure the directory exists 463 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 464 if err != nil { 465 return err 466 } 467 fd.Close() 468 if err != nil { 469 return err 470 } 471 group.Go(func() error { 472 err := GoLivepeer(ctx, fs) 473 if err != nil { 474 return err 475 } 476 // livepeer returns nil on error, so we need to check if we're responsible 477 if ctx.Err() == nil { 478 return fmt.Errorf("livepeer exited") 479 } 480 return nil 481 }) 482 } 483 484 group.Go(func() error { 485 return d.Start(ctx) 486 }) 487 488 if cli.TestStream { 489 atkey, err := atproto.ParsePubKey(signer.Public()) 490 if err != nil { 491 return err 492 } 493 did := atkey.DIDKey() 494 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod) 495 if err != nil { 496 return err 497 } 498 err = mod.UpdateIdentity(&model.Identity{ 499 ID: testMediaSigner.Pub().String(), 500 Handle: "stream-self-tester", 501 DID: "", 502 }) 503 if err != nil { 504 return err 505 } 506 cli.AllowedStreams = append(cli.AllowedStreams, did) 507 a.Aliases["self-test"] = did 508 group.Go(func() error 
{ 509 return mm.TestSource(ctx, testMediaSigner) 510 }) 511 512 // Start a test stream that will run intermittently 513 if err != nil { 514 return err 515 } 516 atkey2, err := atproto.ParsePubKey(signer.Public()) 517 if err != nil { 518 return err 519 } 520 did2 := atkey2.DIDKey() 521 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod) 522 if err != nil { 523 return err 524 } 525 err = mod.UpdateIdentity(&model.Identity{ 526 ID: intermittentMediaSigner.Pub().String(), 527 Handle: "stream-intermittent-tester", 528 DID: "", 529 }) 530 if err != nil { 531 return err 532 } 533 cli.AllowedStreams = append(cli.AllowedStreams, did2) 534 a.Aliases["intermittent-self-test"] = did2 535 536 group.Go(func() error { 537 for { 538 // Start intermittent stream 539 intermittentCtx, cancel := context.WithCancel(ctx) 540 done := make(chan struct{}) 541 go func() { 542 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 543 close(done) 544 }() 545 // Stream ON for 15 seconds 546 time.Sleep(15 * time.Second) 547 // Stop stream 548 cancel() 549 <-done // Wait for TestSource to exit 550 // Stream OFF for 15 seconds 551 time.Sleep(15 * time.Second) 552 } 553 }) 554 } 555 556 for _, job := range platformJobs { 557 group.Go(func() error { 558 return job(ctx, &cli) 559 }) 560 } 561 562 if cli.WHIPTest != "" { 563 group.Go(func() error { 564 err := WHIP(strings.Split(cli.WHIPTest, " ")) 565 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 566 time.Sleep(time.Second * 3) 567 // gst.Deinit() 568 log.Warn(ctx, "gst deinit complete, exiting") 569 return err 570 }) 571 } 572 573 return group.Wait() 574} 575 576var ErrCaughtSignal = errors.New("caught signal") 577 578func handleSignals(ctx context.Context) error { 579 c := make(chan os.Signal, 1) 580 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 581 for { 582 select { 583 case s := <-c: 584 if s == syscall.SIGABRT { 585 if err := 
pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 586 log.Error(ctx, "failed to create pprof", "error", err) 587 } 588 } 589 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 590 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 591 case <-ctx.Done(): 592 return nil 593 } 594 } 595}