Live video on the AT Protocol
at eli/routing-cleanup 792 lines 20 kB view raw
1package cmd 2 3import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "errors" 8 "flag" 9 "fmt" 10 "net/url" 11 "os" 12 "os/signal" 13 "runtime" 14 "runtime/pprof" 15 "slices" 16 "strconv" 17 "strings" 18 "syscall" 19 "time" 20 21 "github.com/bluesky-social/indigo/carstore" 22 "github.com/ethereum/go-ethereum/common/hexutil" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/streamplace/oatproxy/pkg/oatproxy" 25 urfavecli "github.com/urfave/cli/v3" 26 "stream.place/streamplace/pkg/aqhttp" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/gstinit" 31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 "stream.place/streamplace/pkg/localdb" 33 "stream.place/streamplace/pkg/log" 34 "stream.place/streamplace/pkg/media" 35 "stream.place/streamplace/pkg/notifications" 36 "stream.place/streamplace/pkg/replication" 37 "stream.place/streamplace/pkg/replication/iroh_replicator" 38 "stream.place/streamplace/pkg/replication/websocketrep" 39 "stream.place/streamplace/pkg/rtmps" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/statedb" 42 "stream.place/streamplace/pkg/storage" 43 44 _ "github.com/go-gst/go-glib/glib" 45 _ "github.com/go-gst/go-gst/gst" 46 "stream.place/streamplace/pkg/api" 47 "stream.place/streamplace/pkg/config" 48 "stream.place/streamplace/pkg/model" 49) 50 51// Additional jobs that can be injected by platforms 52type jobFunc func(ctx context.Context, cli *config.CLI) error 53 54// parse the CLI and fire up an streamplace node! 55func start(build *config.BuildFlags, platformJobs []jobFunc) error { 56 iroh_streamplace.InitLogging() 57 58 cli := config.CLI{Build: build} 59 app := cli.NewCommand("streamplace") 60 app.Usage = "decentralized live streaming platform" 61 app.Version = build.Version 62 app.Commands = []*urfavecli.Command{ 63 makeSelfTestCommand(build), 64 makeStreamCommand(build), 65 makeLiveCommand(build), 66 makeSignCommand(build), 67 makeWhepCommand(build), 68 makeWhipCommand(build), 69 makeCombineCommand(build), 70 makeSplitCommand(build), 71 makeLivepeerCommand(build), 72 makeMigrateCommand(build), 73 } 74 // Add the verbosity flag 75 // app.Flags = append(app.Flags, &urfavecli.StringFlag{ 76 // Name: "v", 77 // Usage: "log verbosity level", 78 // Value: "3", 79 // }) 80 app.Before = func(ctx context.Context, cmd *urfavecli.Command) (context.Context, error) { 81 // Run self-test before starting 82 selfTest := cmd.Name == "self-test" 83 err := media.RunSelfTest(ctx) 84 if err != nil { 85 if selfTest { 86 fmt.Println(err.Error()) 87 os.Exit(1) 88 } else { 89 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 90 if retryCount >= 3 { 91 log.Error(ctx, "gstreamer self-test failed 3 times, giving up", "error", err) 92 return ctx, err 93 } 94 log.Log(ctx, "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 95 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 96 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 97 if err != nil { 98 log.Error(ctx, "error in gstreamer self-test, could not restart", "error", err) 99 return ctx, err 100 } 101 panic("invalid code path: exec succeeded but we're still here???") 102 } 103 } 104 return ctx, nil 105 } 106 app.Action = func(ctx context.Context, cmd *urfavecli.Command) error { 107 return runMain(ctx, build, platformJobs, cmd, &cli) 108 } 109 110 return app.Run(context.Background(), os.Args) 111} 112 113func runMain(ctx context.Context, build *config.BuildFlags, platformJobs []jobFunc, cmd *urfavecli.Command, cli *config.CLI) error { 114 _ = flag.Set("logtostderr", "true") 115 vFlag := flag.Lookup("v") 116 117 err := cli.Validate(cmd) 118 if err != nil { 119 return err 120 } 121 122 err = flag.CommandLine.Parse(nil) 123 if err != nil { 124 return err 125 } 126 verbosity := cmd.String("v") 127 _ = vFlag.Value.Set(verbosity) 128 log.SetColorLogger(cli.Color) 129 ctx = log.WithDebugValue(ctx, cli.Debug) 130 131 log.Log(ctx, 132 "streamplace", 133 "version", build.Version, 134 "buildTime", build.BuildTimeStr(), 135 "uuid", build.UUID, 136 "runtime.GOOS", runtime.GOOS, 137 "runtime.GOARCH", runtime.GOARCH, 138 "runtime.Version", runtime.Version()) 139 140 signer, err := createSigner(ctx, cli) 141 if err != nil { 142 return err 143 } 144 145 if len(os.Args) > 1 && os.Args[1] == "migrate" { 146 return statedb.Migrate(cli) 147 } 148 149 spmetrics.Version.WithLabelValues(build.Version).Inc() 150 if cli.LivepeerHelp { 151 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 152 _ = starter.NewLivepeerConfig(lpFlags) 153 lpFlags.VisitAll(func(f *flag.Flag) { 154 adapted := config.ToSnakeCase(f.Name) 155 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 156 usage := fmt.Sprintf(" %s", f.Usage) 157 if f.DefValue != "" { 158 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 159 } 160 fmt.Printf(" %s\n", usage) 161 }) 162 return nil 163 } 164 165 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 166 167 err = os.MkdirAll(cli.DataDir, os.ModePerm) 168 if err != nil { 169 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 170 } 171 172 ldb, err := localdb.MakeDB(cli.LocalDBURL) 173 if err != nil { 174 return err 175 } 176 177 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 178 if err != nil { 179 return err 180 } 181 var noter notifications.FirebaseNotifier 182 if cli.FirebaseServiceAccount != "" { 183 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 184 if err != nil { 185 return err 186 } 187 } 188 189 group, ctx := TimeoutGroupWithContext(ctx) 190 191 out := carstore.SQLiteStore{} 192 err = out.Open(":memory:") 193 if err != nil { 194 return err 195 } 196 state, err := statedb.MakeDB(ctx, cli, noter, mod) 197 if err != nil { 198 return err 199 } 200 handle, err := atproto.MakeLexiconRepo(ctx, cli, mod, state) 201 if err != nil { 202 return err 203 } 204 defer handle.Close() 205 206 jwk, err := state.EnsureJWK(ctx, "jwk") 207 if err != nil { 208 return err 209 } 210 cli.JWK = jwk 211 212 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 213 if err != nil { 214 return err 215 } 216 cli.AccessJWK = accessJWK 217 218 serviceAuthKey, err := state.EnsureServiceAuthKey(ctx) 219 if err != nil { 220 return err 221 } 222 cli.ServiceAuthKey = serviceAuthKey 223 224 b := bus.NewBus() 225 atsync := &atproto.ATProtoSynchronizer{ 226 CLI: cli, 227 Model: mod, 228 StatefulDB: state, 229 Noter: noter, 230 Bus: b, 231 } 232 err = atsync.Migrate(ctx) 233 if err != nil { 234 return fmt.Errorf("failed to migrate: %w", err) 235 } 236 237 mm, err := media.MakeMediaManager(ctx, cli, signer, mod, b, atsync, ldb) 238 if err != nil { 239 return err 240 } 241 242 ms, err := media.MakeMediaSigner(ctx, cli, cli.StreamerName, signer, mod) 243 if err != nil { 244 return err 245 } 246 247 var clientMetadata *oatproxy.OAuthClientMetadata 248 var host string 249 if cli.PublicOAuth { 250 u, err := url.Parse(cli.OwnPublicURL()) 251 if err != nil { 252 return err 253 } 254 host = u.Host 255 clientMetadata = &oatproxy.OAuthClientMetadata{ 256 Scope: atproto.OAuthString, 257 ClientName: "Streamplace", 258 RedirectURIs: []string{ 259 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 260 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 261 }, 262 } 263 } else { 264 host = cli.BroadcasterHost 265 clientMetadata = &oatproxy.OAuthClientMetadata{ 266 Scope: atproto.OAuthString, 267 ClientName: "Streamplace", 268 RedirectURIs: []string{ 269 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 270 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 271 }, 272 } 273 } 274 275 op := oatproxy.New(&oatproxy.Config{ 276 Host: host, 277 CreateOAuthSession: state.CreateOAuthSession, 278 UpdateOAuthSession: state.UpdateOAuthSession, 279 GetOAuthSession: state.LoadOAuthSession, 280 Lock: state.GetNamedLock, 281 Scope: atproto.OAuthString, 282 UpstreamJWK: cli.JWK, 283 DownstreamJWK: cli.AccessJWK, 284 ClientMetadata: clientMetadata, 285 Public: cli.PublicOAuth, 286 }) 287 state.OATProxy = op 288 289 err = atsync.Migrate(ctx) 290 if err != nil { 291 return fmt.Errorf("failed to migrate: %w", err) 292 } 293 294 var replicator replication.Replicator = nil 295 if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 296 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 297 if err != nil { 298 return err 299 } 300 if !exists { 301 secret := make([]byte, 32) 302 _, err := rand.Read(secret) 303 if err != nil { 304 return fmt.Errorf("failed to generate random secret: %w", err) 305 } 306 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 307 if err != nil { 308 return err 309 } 310 } 311 buf := bytes.Buffer{} 312 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 313 if err != nil { 314 return err 315 } 316 secret := buf.Bytes() 317 var topic []byte 318 if cli.IrohTopic != "" { 319 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 320 if err != nil { 321 return err 322 } 323 } 324 replicator, err = iroh_replicator.NewSwarm(ctx, cli, secret, topic, mm, b, mod) 325 if err != nil { 326 return err 327 } 328 } 329 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 330 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 331 } 332 333 d := director.NewDirector(mm, mod, cli, b, op, state, replicator, ldb, atsync) 334 a, err := api.MakeStreamplaceAPI(cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb) 335 if err != nil { 336 return err 337 } 338 339 ctx = log.WithLogValues(ctx, "version", build.Version) 340 341 group.Go(func() error { 342 return handleSignals(ctx) 343 }) 344 345 group.Go(func() error { 346 return state.ProcessQueue(ctx) 347 }) 348 349 if cli.TracingEndpoint != "" { 350 group.Go(func() error { 351 return startTelemetry(ctx, cli.TracingEndpoint) 352 }) 353 } 354 355 if cli.Secure { 356 group.Go(func() error { 357 return a.ServeHTTPS(ctx) 358 }) 359 group.Go(func() error { 360 return a.ServeHTTPRedirect(ctx) 361 }) 362 if cli.RTMPServerAddon != "" { 363 group.Go(func() error { 364 return rtmps.ServeRTMPSAddon(ctx, cli) 365 }) 366 } 367 group.Go(func() error { 368 return a.ServeRTMPS(ctx, cli) 369 }) 370 } else { 371 group.Go(func() error { 372 return a.ServeHTTP(ctx) 373 }) 374 group.Go(func() error { 375 return a.ServeRTMP(ctx) 376 }) 377 } 378 379 group.Go(func() error { 380 return a.ServeInternalHTTP(ctx) 381 }) 382 383 if !cli.NoFirehose { 384 group.Go(func() error { 385 return atsync.StartFirehose(ctx) 386 }) 387 } 388 for _, labeler := range cli.Labelers { 389 group.Go(func() error { 390 return atsync.StartLabelerFirehose(ctx, labeler) 391 }) 392 } 393 394 group.Go(func() error { 395 return a.ExpireSessions(ctx) 396 }) 397 398 group.Go(func() error { 399 return storage.StartSegmentCleaner(ctx, ldb, cli) 400 }) 401 402 if cli.LegacySegmentCleaner { 403 group.Go(func() error { 404 return ldb.StartSegmentCleaner(ctx) 405 }) 406 } 407 408 group.Go(func() error { 409 return replicator.Start(ctx, cli) 410 }) 411 412 if cli.LivepeerGateway { 413 // make a file to make sure the directory exists 414 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 415 if err != nil { 416 return err 417 } 418 fd.Close() 419 if err != nil { 420 return err 421 } 422 group.Go(func() error { 423 err = GoLivepeer(ctx, config.LivepeerFlagSet) 424 if err != nil { 425 return err 426 } 427 // livepeer returns nil on error, so we need to check if we're responsible 428 if ctx.Err() == nil { 429 return fmt.Errorf("livepeer exited") 430 } 431 return nil 432 }) 433 } 434 435 group.Go(func() error { 436 return d.Start(ctx) 437 }) 438 439 if cli.TestStream { 440 atkey, err := atproto.ParsePubKey(signer.Public()) 441 if err != nil { 442 return err 443 } 444 did := atkey.DIDKey() 445 testMediaSigner, err := media.MakeMediaSigner(ctx, cli, did, signer, mod) 446 if err != nil { 447 return err 448 } 449 err = mod.UpdateIdentity(&model.Identity{ 450 ID: testMediaSigner.Pub().String(), 451 Handle: "stream-self-tester", 452 DID: "", 453 }) 454 if err != nil { 455 return err 456 } 457 cli.AllowedStreams = append(cli.AllowedStreams, did) 458 a.Aliases["self-test"] = did 459 group.Go(func() error { 460 return mm.TestSource(ctx, testMediaSigner) 461 }) 462 463 // Start a test stream that will run intermittently 464 if err != nil { 465 return err 466 } 467 atkey2, err := atproto.ParsePubKey(signer.Public()) 468 if err != nil { 469 return err 470 } 471 did2 := atkey2.DIDKey() 472 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, cli, did2, signer, mod) 473 if err != nil { 474 return err 475 } 476 err = mod.UpdateIdentity(&model.Identity{ 477 ID: intermittentMediaSigner.Pub().String(), 478 Handle: "stream-intermittent-tester", 479 DID: "", 480 }) 481 if err != nil { 482 return err 483 } 484 cli.AllowedStreams = append(cli.AllowedStreams, did2) 485 a.Aliases["intermittent-self-test"] = did2 486 487 group.Go(func() error { 488 for { 489 // Start intermittent stream 490 intermittentCtx, cancel := context.WithCancel(ctx) 491 done := make(chan struct{}) 492 go func() { 493 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 494 close(done) 495 }() 496 // Stream ON for 15 seconds 497 time.Sleep(15 * time.Second) 498 // Stop stream 499 cancel() 500 <-done // Wait for TestSource to exit 501 // Stream OFF for 15 seconds 502 time.Sleep(15 * time.Second) 503 } 504 }) 505 } 506 507 for _, job := range platformJobs { 508 group.Go(func() error { 509 return job(ctx, cli) 510 }) 511 } 512 513 if cli.WHIPTest != "" { 514 group.Go(func() error { 515 // Parse WHIPTest string using the whip command's flag parser 516 whipCmd := makeWhipCommand(build) 517 args := strings.Split(cli.WHIPTest, " ") 518 err := whipCmd.Run(ctx, append([]string{"streamplace", "whip"}, args...)) 519 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 520 time.Sleep(time.Second * 3) 521 // gst.Deinit() 522 log.Warn(ctx, "gst deinit complete, exiting") 523 return err 524 }) 525 } 526 527 return group.Wait() 528} 529 530var ErrCaughtSignal = errors.New("caught signal") 531 532func handleSignals(ctx context.Context) error { 533 c := make(chan os.Signal, 1) 534 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 535 for { 536 select { 537 case s := <-c: 538 if s == syscall.SIGABRT { 539 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 540 log.Error(ctx, "failed to create pprof", "error", err) 541 } 542 } 543 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 544 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 545 case <-ctx.Done(): 546 return nil 547 } 548 } 549} 550 551func makeSelfTestCommand(build *config.BuildFlags) *urfavecli.Command { 552 return &urfavecli.Command{ 553 Name: "self-test", 554 Usage: "run gstreamer self-test", 555 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 556 err := media.RunSelfTest(ctx) 557 if err != nil { 558 fmt.Println(err.Error()) 559 os.Exit(1) 560 } 561 runtime.GC() 562 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 563 log.Error(ctx, "error creating pprof", "error", err) 564 } 565 fmt.Println("self-test successful!") 566 return nil 567 }, 568 } 569} 570 571func makeStreamCommand(build *config.BuildFlags) *urfavecli.Command { 572 return &urfavecli.Command{ 573 Name: "stream", 574 Usage: "stream command", 575 ArgsUsage: "[user]", 576 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 577 args := cmd.Args() 578 if args.Len() != 1 { 579 return fmt.Errorf("usage: streamplace stream [user]") 580 } 581 return Stream(args.First()) 582 }, 583 } 584} 585 586func makeLiveCommand(build *config.BuildFlags) *urfavecli.Command { 587 cli := config.CLI{Build: build} 588 liveCmd := cli.NewCommand("live") 589 liveCmd.Usage = "start live stream" 590 liveCmd.ArgsUsage = "[stream-key]" 591 liveCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error { 592 args := cmd.Args() 593 if args.Len() != 1 { 594 return fmt.Errorf("usage: streamplace live [flags] [stream-key]") 595 } 596 return Live(args.First(), cli.HTTPInternalAddr) 597 } 598 return liveCmd 599} 600 601func makeSignCommand(build *config.BuildFlags) *urfavecli.Command { 602 return &urfavecli.Command{ 603 Name: "sign", 604 Usage: "sign command", 605 Flags: []urfavecli.Flag{ 606 &urfavecli.StringFlag{ 607 Name: "cert", 608 Usage: "path to the certificate file", 609 }, 610 &urfavecli.StringFlag{ 611 Name: "key", 612 Usage: "base58-encoded secp256k1 private key", 613 }, 614 &urfavecli.StringFlag{ 615 Name: "streamer", 616 Usage: "streamer name", 617 }, 618 &urfavecli.StringFlag{ 619 Name: "ta-url", 620 Usage: "timestamp authority server for signing", 621 Value: "http://timestamp.digicert.com", 622 }, 623 &urfavecli.IntFlag{ 624 Name: "start-time", 625 Usage: "start time of the stream", 626 }, 627 &urfavecli.StringFlag{ 628 Name: "manifest", 629 Usage: "JSON manifest to use for signing", 630 }, 631 }, 632 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 633 return Sign( 634 ctx, 635 cmd.String("cert"), 636 cmd.String("key"), 637 cmd.String("streamer"), 638 cmd.String("ta-url"), 639 int64(cmd.Int("start-time")), 640 cmd.String("manifest"), 641 ) 642 }, 643 } 644} 645 646func makeWhepCommand(build *config.BuildFlags) *urfavecli.Command { 647 return &urfavecli.Command{ 648 Name: "whep", 649 Usage: "WHEP client", 650 Flags: []urfavecli.Flag{ 651 &urfavecli.IntFlag{ 652 Name: "count", 653 Usage: "number of concurrent streams (for load testing)", 654 Value: 1, 655 }, 656 &urfavecli.DurationFlag{ 657 Name: "duration", 658 Usage: "stop after this long", 659 }, 660 &urfavecli.StringFlag{ 661 Name: "endpoint", 662 Usage: "endpoint to send the WHEP request to", 663 }, 664 }, 665 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 666 return WHEP( 667 ctx, 668 cmd.Int("count"), 669 cmd.Duration("duration"), 670 cmd.String("endpoint"), 671 ) 672 }, 673 } 674} 675 676func makeWhipCommand(build *config.BuildFlags) *urfavecli.Command { 677 return &urfavecli.Command{ 678 Name: "whip", 679 Usage: "WHIP client", 680 Flags: []urfavecli.Flag{ 681 &urfavecli.StringFlag{ 682 Name: "stream-key", 683 Usage: "stream key", 684 }, 685 &urfavecli.IntFlag{ 686 Name: "count", 687 Usage: "number of concurrent streams (for load testing)", 688 Value: 1, 689 }, 690 &urfavecli.IntFlag{ 691 Name: "viewers", 692 Usage: "number of viewers to simulate per stream", 693 }, 694 &urfavecli.DurationFlag{ 695 Name: "duration", 696 Usage: "duration of the stream", 697 }, 698 &urfavecli.StringFlag{ 699 Name: "file", 700 Usage: "file to stream (needs to be an MP4 containing H264 video and Opus audio)", 701 Required: true, 702 }, 703 &urfavecli.StringFlag{ 704 Name: "endpoint", 705 Usage: "endpoint to send the WHIP request to", 706 Value: "http://127.0.0.1:38080", 707 }, 708 &urfavecli.DurationFlag{ 709 Name: "freeze-after", 710 Usage: "freeze the stream after the given duration", 711 }, 712 }, 713 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 714 return WHIP( 715 ctx, 716 cmd.String("stream-key"), 717 cmd.Int("count"), 718 cmd.Int("viewers"), 719 cmd.Duration("duration"), 720 cmd.String("file"), 721 cmd.String("endpoint"), 722 cmd.Duration("freeze-after"), 723 ) 724 }, 725 } 726} 727 728func makeCombineCommand(build *config.BuildFlags) *urfavecli.Command { 729 cli := config.CLI{Build: build} 730 combineCmd := cli.NewCommand("combine") 731 combineCmd.Usage = "combine segments" 732 combineCmd.ArgsUsage = "[output] [input1] [input2...]" 733 combineCmd.Flags = []urfavecli.Flag{ 734 &urfavecli.StringFlag{ 735 Name: "debug-dir", 736 Usage: "directory to write debug output", 737 }, 738 } 739 combineCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error { 740 args := cmd.Args() 741 if args.Len() < 2 { 742 return fmt.Errorf("usage: streamplace combine [--debug-dir dir] [output] [input1] [input2...]") 743 } 744 ctx = log.WithDebugValue(ctx, cli.Debug) 745 return Combine( 746 ctx, 747 &cli, 748 cmd.String("debug-dir"), 749 args.Get(0), 750 args.Slice()[1:], 751 ) 752 } 753 return combineCmd 754} 755 756func makeSplitCommand(build *config.BuildFlags) *urfavecli.Command { 757 cli := config.CLI{Build: build} 758 splitCmd := cli.NewCommand("split") 759 splitCmd.Usage = "split video file" 760 splitCmd.ArgsUsage = "[input file] [output directory]" 761 splitCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error { 762 args := cmd.Args() 763 if args.Len() != 2 { 764 return fmt.Errorf("usage: streamplace split [flags] [input file] [output directory]") 765 } 766 ctx = log.WithDebugValue(ctx, cli.Debug) 767 gstinit.InitGST() 768 return Split(ctx, args.Get(0), args.Get(1)) 769 } 770 return splitCmd 771} 772 773func makeLivepeerCommand(build *config.BuildFlags) *urfavecli.Command { 774 return &urfavecli.Command{ 775 Name: "livepeer", 776 Usage: "run livepeer gateway", 777 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 778 return GoLivepeer(ctx, config.LivepeerFlagSet) 779 }, 780 } 781} 782 783func makeMigrateCommand(build *config.BuildFlags) *urfavecli.Command { 784 cli := config.CLI{Build: build} 785 return &urfavecli.Command{ 786 Name: "migrate", 787 Usage: "run database migrations", 788 Action: func(ctx context.Context, cmd *urfavecli.Command) error { 789 return statedb.Migrate(&cli) 790 }, 791 } 792}