Live video on the AT Protocol
1package cmd
2
3import (
4 "bytes"
5 "context"
6 "crypto/rand"
7 "errors"
8 "flag"
9 "fmt"
10 "net/url"
11 "os"
12 "os/signal"
13 "runtime"
14 "runtime/pprof"
15 "slices"
16 "strconv"
17 "strings"
18 "syscall"
19 "time"
20
21 "github.com/bluesky-social/indigo/carstore"
22 "github.com/ethereum/go-ethereum/common/hexutil"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/streamplace/oatproxy/pkg/oatproxy"
25 urfavecli "github.com/urfave/cli/v3"
26 "stream.place/streamplace/pkg/aqhttp"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/gstinit"
31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
32 "stream.place/streamplace/pkg/localdb"
33 "stream.place/streamplace/pkg/log"
34 "stream.place/streamplace/pkg/media"
35 "stream.place/streamplace/pkg/notifications"
36 "stream.place/streamplace/pkg/replication"
37 "stream.place/streamplace/pkg/replication/iroh_replicator"
38 "stream.place/streamplace/pkg/replication/websocketrep"
39 "stream.place/streamplace/pkg/rtmps"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/statedb"
42 "stream.place/streamplace/pkg/storage"
43
44 _ "github.com/go-gst/go-glib/glib"
45 _ "github.com/go-gst/go-gst/gst"
46 "stream.place/streamplace/pkg/api"
47 "stream.place/streamplace/pkg/config"
48 "stream.place/streamplace/pkg/model"
49)
50
51// Additional jobs that can be injected by platforms
52type jobFunc func(ctx context.Context, cli *config.CLI) error
53
54// parse the CLI and fire up an streamplace node!
55func start(build *config.BuildFlags, platformJobs []jobFunc) error {
56 iroh_streamplace.InitLogging()
57
58 cli := config.CLI{Build: build}
59 app := cli.NewCommand("streamplace")
60 app.Usage = "decentralized live streaming platform"
61 app.Version = build.Version
62 app.Commands = []*urfavecli.Command{
63 makeSelfTestCommand(build),
64 makeStreamCommand(build),
65 makeLiveCommand(build),
66 makeSignCommand(build),
67 makeWhepCommand(build),
68 makeWhipCommand(build),
69 makeCombineCommand(build),
70 makeSplitCommand(build),
71 makeLivepeerCommand(build),
72 makeMigrateCommand(build),
73 }
74 // Add the verbosity flag
75 // app.Flags = append(app.Flags, &urfavecli.StringFlag{
76 // Name: "v",
77 // Usage: "log verbosity level",
78 // Value: "3",
79 // })
80 app.Before = func(ctx context.Context, cmd *urfavecli.Command) (context.Context, error) {
81 // Run self-test before starting
82 selfTest := cmd.Name == "self-test"
83 err := media.RunSelfTest(ctx)
84 if err != nil {
85 if selfTest {
86 fmt.Println(err.Error())
87 os.Exit(1)
88 } else {
89 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
90 if retryCount >= 3 {
91 log.Error(ctx, "gstreamer self-test failed 3 times, giving up", "error", err)
92 return ctx, err
93 }
94 log.Log(ctx, "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
95 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
96 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
97 if err != nil {
98 log.Error(ctx, "error in gstreamer self-test, could not restart", "error", err)
99 return ctx, err
100 }
101 panic("invalid code path: exec succeeded but we're still here???")
102 }
103 }
104 return ctx, nil
105 }
106 app.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
107 return runMain(ctx, build, platformJobs, cmd, &cli)
108 }
109
110 return app.Run(context.Background(), os.Args)
111}
112
113func runMain(ctx context.Context, build *config.BuildFlags, platformJobs []jobFunc, cmd *urfavecli.Command, cli *config.CLI) error {
114 _ = flag.Set("logtostderr", "true")
115 vFlag := flag.Lookup("v")
116
117 err := cli.Validate(cmd)
118 if err != nil {
119 return err
120 }
121
122 err = flag.CommandLine.Parse(nil)
123 if err != nil {
124 return err
125 }
126 verbosity := cmd.String("v")
127 _ = vFlag.Value.Set(verbosity)
128 log.SetColorLogger(cli.Color)
129 ctx = log.WithDebugValue(ctx, cli.Debug)
130
131 log.Log(ctx,
132 "streamplace",
133 "version", build.Version,
134 "buildTime", build.BuildTimeStr(),
135 "uuid", build.UUID,
136 "runtime.GOOS", runtime.GOOS,
137 "runtime.GOARCH", runtime.GOARCH,
138 "runtime.Version", runtime.Version())
139
140 signer, err := createSigner(ctx, cli)
141 if err != nil {
142 return err
143 }
144
145 if len(os.Args) > 1 && os.Args[1] == "migrate" {
146 return statedb.Migrate(cli)
147 }
148
149 spmetrics.Version.WithLabelValues(build.Version).Inc()
150 if cli.LivepeerHelp {
151 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
152 _ = starter.NewLivepeerConfig(lpFlags)
153 lpFlags.VisitAll(func(f *flag.Flag) {
154 adapted := config.ToSnakeCase(f.Name)
155 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
156 usage := fmt.Sprintf(" %s", f.Usage)
157 if f.DefValue != "" {
158 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
159 }
160 fmt.Printf(" %s\n", usage)
161 })
162 return nil
163 }
164
165 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
166
167 err = os.MkdirAll(cli.DataDir, os.ModePerm)
168 if err != nil {
169 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
170 }
171
172 ldb, err := localdb.MakeDB(cli.LocalDBURL)
173 if err != nil {
174 return err
175 }
176
177 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
178 if err != nil {
179 return err
180 }
181 var noter notifications.FirebaseNotifier
182 if cli.FirebaseServiceAccount != "" {
183 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
184 if err != nil {
185 return err
186 }
187 }
188
189 group, ctx := TimeoutGroupWithContext(ctx)
190
191 out := carstore.SQLiteStore{}
192 err = out.Open(":memory:")
193 if err != nil {
194 return err
195 }
196 state, err := statedb.MakeDB(ctx, cli, noter, mod)
197 if err != nil {
198 return err
199 }
200 handle, err := atproto.MakeLexiconRepo(ctx, cli, mod, state)
201 if err != nil {
202 return err
203 }
204 defer handle.Close()
205
206 jwk, err := state.EnsureJWK(ctx, "jwk")
207 if err != nil {
208 return err
209 }
210 cli.JWK = jwk
211
212 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
213 if err != nil {
214 return err
215 }
216 cli.AccessJWK = accessJWK
217
218 serviceAuthKey, err := state.EnsureServiceAuthKey(ctx)
219 if err != nil {
220 return err
221 }
222 cli.ServiceAuthKey = serviceAuthKey
223
224 b := bus.NewBus()
225 atsync := &atproto.ATProtoSynchronizer{
226 CLI: cli,
227 Model: mod,
228 StatefulDB: state,
229 Noter: noter,
230 Bus: b,
231 }
232 err = atsync.Migrate(ctx)
233 if err != nil {
234 return fmt.Errorf("failed to migrate: %w", err)
235 }
236
237 mm, err := media.MakeMediaManager(ctx, cli, signer, mod, b, atsync, ldb)
238 if err != nil {
239 return err
240 }
241
242 ms, err := media.MakeMediaSigner(ctx, cli, cli.StreamerName, signer, mod)
243 if err != nil {
244 return err
245 }
246
247 var clientMetadata *oatproxy.OAuthClientMetadata
248 var host string
249 if cli.PublicOAuth {
250 u, err := url.Parse(cli.OwnPublicURL())
251 if err != nil {
252 return err
253 }
254 host = u.Host
255 clientMetadata = &oatproxy.OAuthClientMetadata{
256 Scope: atproto.OAuthString,
257 ClientName: "Streamplace",
258 RedirectURIs: []string{
259 fmt.Sprintf("%s/login", cli.OwnPublicURL()),
260 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()),
261 },
262 }
263 } else {
264 host = cli.BroadcasterHost
265 clientMetadata = &oatproxy.OAuthClientMetadata{
266 Scope: atproto.OAuthString,
267 ClientName: "Streamplace",
268 RedirectURIs: []string{
269 fmt.Sprintf("https://%s/login", cli.BroadcasterHost),
270 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost),
271 },
272 }
273 }
274
275 op := oatproxy.New(&oatproxy.Config{
276 Host: host,
277 CreateOAuthSession: state.CreateOAuthSession,
278 UpdateOAuthSession: state.UpdateOAuthSession,
279 GetOAuthSession: state.LoadOAuthSession,
280 Lock: state.GetNamedLock,
281 Scope: atproto.OAuthString,
282 UpstreamJWK: cli.JWK,
283 DownstreamJWK: cli.AccessJWK,
284 ClientMetadata: clientMetadata,
285 Public: cli.PublicOAuth,
286 })
287 state.OATProxy = op
288
289 err = atsync.Migrate(ctx)
290 if err != nil {
291 return fmt.Errorf("failed to migrate: %w", err)
292 }
293
294 var replicator replication.Replicator = nil
295 if slices.Contains(cli.Replicators, config.ReplicatorIroh) {
296 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
297 if err != nil {
298 return err
299 }
300 if !exists {
301 secret := make([]byte, 32)
302 _, err := rand.Read(secret)
303 if err != nil {
304 return fmt.Errorf("failed to generate random secret: %w", err)
305 }
306 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
307 if err != nil {
308 return err
309 }
310 }
311 buf := bytes.Buffer{}
312 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
313 if err != nil {
314 return err
315 }
316 secret := buf.Bytes()
317 var topic []byte
318 if cli.IrohTopic != "" {
319 topic, err = hexutil.Decode("0x" + cli.IrohTopic)
320 if err != nil {
321 return err
322 }
323 }
324 replicator, err = iroh_replicator.NewSwarm(ctx, cli, secret, topic, mm, b, mod)
325 if err != nil {
326 return err
327 }
328 }
329 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) {
330 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm)
331 }
332
333 d := director.NewDirector(mm, mod, cli, b, op, state, replicator, ldb, atsync)
334 a, err := api.MakeStreamplaceAPI(cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb)
335 if err != nil {
336 return err
337 }
338
339 ctx = log.WithLogValues(ctx, "version", build.Version)
340
341 group.Go(func() error {
342 return handleSignals(ctx)
343 })
344
345 group.Go(func() error {
346 return state.ProcessQueue(ctx)
347 })
348
349 if cli.TracingEndpoint != "" {
350 group.Go(func() error {
351 return startTelemetry(ctx, cli.TracingEndpoint)
352 })
353 }
354
355 if cli.Secure {
356 group.Go(func() error {
357 return a.ServeHTTPS(ctx)
358 })
359 group.Go(func() error {
360 return a.ServeHTTPRedirect(ctx)
361 })
362 if cli.RTMPServerAddon != "" {
363 group.Go(func() error {
364 return rtmps.ServeRTMPSAddon(ctx, cli)
365 })
366 }
367 group.Go(func() error {
368 return a.ServeRTMPS(ctx, cli)
369 })
370 } else {
371 group.Go(func() error {
372 return a.ServeHTTP(ctx)
373 })
374 group.Go(func() error {
375 return a.ServeRTMP(ctx)
376 })
377 }
378
379 group.Go(func() error {
380 return a.ServeInternalHTTP(ctx)
381 })
382
383 if !cli.NoFirehose {
384 group.Go(func() error {
385 return atsync.StartFirehose(ctx)
386 })
387 }
388 for _, labeler := range cli.Labelers {
389 group.Go(func() error {
390 return atsync.StartLabelerFirehose(ctx, labeler)
391 })
392 }
393
394 group.Go(func() error {
395 return a.ExpireSessions(ctx)
396 })
397
398 group.Go(func() error {
399 return storage.StartSegmentCleaner(ctx, ldb, cli)
400 })
401
402 if cli.LegacySegmentCleaner {
403 group.Go(func() error {
404 return ldb.StartSegmentCleaner(ctx)
405 })
406 }
407
408 group.Go(func() error {
409 return replicator.Start(ctx, cli)
410 })
411
412 if cli.LivepeerGateway {
413 // make a file to make sure the directory exists
414 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
415 if err != nil {
416 return err
417 }
418 fd.Close()
419 if err != nil {
420 return err
421 }
422 group.Go(func() error {
423 err = GoLivepeer(ctx, config.LivepeerFlagSet)
424 if err != nil {
425 return err
426 }
427 // livepeer returns nil on error, so we need to check if we're responsible
428 if ctx.Err() == nil {
429 return fmt.Errorf("livepeer exited")
430 }
431 return nil
432 })
433 }
434
435 group.Go(func() error {
436 return d.Start(ctx)
437 })
438
439 if cli.TestStream {
440 atkey, err := atproto.ParsePubKey(signer.Public())
441 if err != nil {
442 return err
443 }
444 did := atkey.DIDKey()
445 testMediaSigner, err := media.MakeMediaSigner(ctx, cli, did, signer, mod)
446 if err != nil {
447 return err
448 }
449 err = mod.UpdateIdentity(&model.Identity{
450 ID: testMediaSigner.Pub().String(),
451 Handle: "stream-self-tester",
452 DID: "",
453 })
454 if err != nil {
455 return err
456 }
457 cli.AllowedStreams = append(cli.AllowedStreams, did)
458 a.Aliases["self-test"] = did
459 group.Go(func() error {
460 return mm.TestSource(ctx, testMediaSigner)
461 })
462
463 // Start a test stream that will run intermittently
464 if err != nil {
465 return err
466 }
467 atkey2, err := atproto.ParsePubKey(signer.Public())
468 if err != nil {
469 return err
470 }
471 did2 := atkey2.DIDKey()
472 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, cli, did2, signer, mod)
473 if err != nil {
474 return err
475 }
476 err = mod.UpdateIdentity(&model.Identity{
477 ID: intermittentMediaSigner.Pub().String(),
478 Handle: "stream-intermittent-tester",
479 DID: "",
480 })
481 if err != nil {
482 return err
483 }
484 cli.AllowedStreams = append(cli.AllowedStreams, did2)
485 a.Aliases["intermittent-self-test"] = did2
486
487 group.Go(func() error {
488 for {
489 // Start intermittent stream
490 intermittentCtx, cancel := context.WithCancel(ctx)
491 done := make(chan struct{})
492 go func() {
493 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
494 close(done)
495 }()
496 // Stream ON for 15 seconds
497 time.Sleep(15 * time.Second)
498 // Stop stream
499 cancel()
500 <-done // Wait for TestSource to exit
501 // Stream OFF for 15 seconds
502 time.Sleep(15 * time.Second)
503 }
504 })
505 }
506
507 for _, job := range platformJobs {
508 group.Go(func() error {
509 return job(ctx, cli)
510 })
511 }
512
513 if cli.WHIPTest != "" {
514 group.Go(func() error {
515 // Parse WHIPTest string using the whip command's flag parser
516 whipCmd := makeWhipCommand(build)
517 args := strings.Split(cli.WHIPTest, " ")
518 err := whipCmd.Run(ctx, append([]string{"streamplace", "whip"}, args...))
519 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
520 time.Sleep(time.Second * 3)
521 // gst.Deinit()
522 log.Warn(ctx, "gst deinit complete, exiting")
523 return err
524 })
525 }
526
527 return group.Wait()
528}
529
530var ErrCaughtSignal = errors.New("caught signal")
531
532func handleSignals(ctx context.Context) error {
533 c := make(chan os.Signal, 1)
534 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
535 for {
536 select {
537 case s := <-c:
538 if s == syscall.SIGABRT {
539 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
540 log.Error(ctx, "failed to create pprof", "error", err)
541 }
542 }
543 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
544 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
545 case <-ctx.Done():
546 return nil
547 }
548 }
549}
550
551func makeSelfTestCommand(build *config.BuildFlags) *urfavecli.Command {
552 return &urfavecli.Command{
553 Name: "self-test",
554 Usage: "run gstreamer self-test",
555 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
556 err := media.RunSelfTest(ctx)
557 if err != nil {
558 fmt.Println(err.Error())
559 os.Exit(1)
560 }
561 runtime.GC()
562 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
563 log.Error(ctx, "error creating pprof", "error", err)
564 }
565 fmt.Println("self-test successful!")
566 return nil
567 },
568 }
569}
570
571func makeStreamCommand(build *config.BuildFlags) *urfavecli.Command {
572 return &urfavecli.Command{
573 Name: "stream",
574 Usage: "stream command",
575 ArgsUsage: "[user]",
576 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
577 args := cmd.Args()
578 if args.Len() != 1 {
579 return fmt.Errorf("usage: streamplace stream [user]")
580 }
581 return Stream(args.First())
582 },
583 }
584}
585
586func makeLiveCommand(build *config.BuildFlags) *urfavecli.Command {
587 cli := config.CLI{Build: build}
588 liveCmd := cli.NewCommand("live")
589 liveCmd.Usage = "start live stream"
590 liveCmd.ArgsUsage = "[stream-key]"
591 liveCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
592 args := cmd.Args()
593 if args.Len() != 1 {
594 return fmt.Errorf("usage: streamplace live [flags] [stream-key]")
595 }
596 return Live(args.First(), cli.HTTPInternalAddr)
597 }
598 return liveCmd
599}
600
601func makeSignCommand(build *config.BuildFlags) *urfavecli.Command {
602 return &urfavecli.Command{
603 Name: "sign",
604 Usage: "sign command",
605 Flags: []urfavecli.Flag{
606 &urfavecli.StringFlag{
607 Name: "cert",
608 Usage: "path to the certificate file",
609 },
610 &urfavecli.StringFlag{
611 Name: "key",
612 Usage: "base58-encoded secp256k1 private key",
613 },
614 &urfavecli.StringFlag{
615 Name: "streamer",
616 Usage: "streamer name",
617 },
618 &urfavecli.StringFlag{
619 Name: "ta-url",
620 Usage: "timestamp authority server for signing",
621 Value: "http://timestamp.digicert.com",
622 },
623 &urfavecli.IntFlag{
624 Name: "start-time",
625 Usage: "start time of the stream",
626 },
627 &urfavecli.StringFlag{
628 Name: "manifest",
629 Usage: "JSON manifest to use for signing",
630 },
631 },
632 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
633 return Sign(
634 ctx,
635 cmd.String("cert"),
636 cmd.String("key"),
637 cmd.String("streamer"),
638 cmd.String("ta-url"),
639 int64(cmd.Int("start-time")),
640 cmd.String("manifest"),
641 )
642 },
643 }
644}
645
646func makeWhepCommand(build *config.BuildFlags) *urfavecli.Command {
647 return &urfavecli.Command{
648 Name: "whep",
649 Usage: "WHEP client",
650 Flags: []urfavecli.Flag{
651 &urfavecli.IntFlag{
652 Name: "count",
653 Usage: "number of concurrent streams (for load testing)",
654 Value: 1,
655 },
656 &urfavecli.DurationFlag{
657 Name: "duration",
658 Usage: "stop after this long",
659 },
660 &urfavecli.StringFlag{
661 Name: "endpoint",
662 Usage: "endpoint to send the WHEP request to",
663 },
664 },
665 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
666 return WHEP(
667 ctx,
668 cmd.Int("count"),
669 cmd.Duration("duration"),
670 cmd.String("endpoint"),
671 )
672 },
673 }
674}
675
676func makeWhipCommand(build *config.BuildFlags) *urfavecli.Command {
677 return &urfavecli.Command{
678 Name: "whip",
679 Usage: "WHIP client",
680 Flags: []urfavecli.Flag{
681 &urfavecli.StringFlag{
682 Name: "stream-key",
683 Usage: "stream key",
684 },
685 &urfavecli.IntFlag{
686 Name: "count",
687 Usage: "number of concurrent streams (for load testing)",
688 Value: 1,
689 },
690 &urfavecli.IntFlag{
691 Name: "viewers",
692 Usage: "number of viewers to simulate per stream",
693 },
694 &urfavecli.DurationFlag{
695 Name: "duration",
696 Usage: "duration of the stream",
697 },
698 &urfavecli.StringFlag{
699 Name: "file",
700 Usage: "file to stream (needs to be an MP4 containing H264 video and Opus audio)",
701 Required: true,
702 },
703 &urfavecli.StringFlag{
704 Name: "endpoint",
705 Usage: "endpoint to send the WHIP request to",
706 Value: "http://127.0.0.1:38080",
707 },
708 &urfavecli.DurationFlag{
709 Name: "freeze-after",
710 Usage: "freeze the stream after the given duration",
711 },
712 },
713 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
714 return WHIP(
715 ctx,
716 cmd.String("stream-key"),
717 cmd.Int("count"),
718 cmd.Int("viewers"),
719 cmd.Duration("duration"),
720 cmd.String("file"),
721 cmd.String("endpoint"),
722 cmd.Duration("freeze-after"),
723 )
724 },
725 }
726}
727
728func makeCombineCommand(build *config.BuildFlags) *urfavecli.Command {
729 cli := config.CLI{Build: build}
730 combineCmd := cli.NewCommand("combine")
731 combineCmd.Usage = "combine segments"
732 combineCmd.ArgsUsage = "[output] [input1] [input2...]"
733 combineCmd.Flags = []urfavecli.Flag{
734 &urfavecli.StringFlag{
735 Name: "debug-dir",
736 Usage: "directory to write debug output",
737 },
738 }
739 combineCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
740 args := cmd.Args()
741 if args.Len() < 2 {
742 return fmt.Errorf("usage: streamplace combine [--debug-dir dir] [output] [input1] [input2...]")
743 }
744 ctx = log.WithDebugValue(ctx, cli.Debug)
745 return Combine(
746 ctx,
747 &cli,
748 cmd.String("debug-dir"),
749 args.Get(0),
750 args.Slice()[1:],
751 )
752 }
753 return combineCmd
754}
755
756func makeSplitCommand(build *config.BuildFlags) *urfavecli.Command {
757 cli := config.CLI{Build: build}
758 splitCmd := cli.NewCommand("split")
759 splitCmd.Usage = "split video file"
760 splitCmd.ArgsUsage = "[input file] [output directory]"
761 splitCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
762 args := cmd.Args()
763 if args.Len() != 2 {
764 return fmt.Errorf("usage: streamplace split [flags] [input file] [output directory]")
765 }
766 ctx = log.WithDebugValue(ctx, cli.Debug)
767 gstinit.InitGST()
768 return Split(ctx, args.Get(0), args.Get(1))
769 }
770 return splitCmd
771}
772
773func makeLivepeerCommand(build *config.BuildFlags) *urfavecli.Command {
774 return &urfavecli.Command{
775 Name: "livepeer",
776 Usage: "run livepeer gateway",
777 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
778 return GoLivepeer(ctx, config.LivepeerFlagSet)
779 },
780 }
781}
782
783func makeMigrateCommand(build *config.BuildFlags) *urfavecli.Command {
784 cli := config.CLI{Build: build}
785 return &urfavecli.Command{
786 Name: "migrate",
787 Usage: "run database migrations",
788 Action: func(ctx context.Context, cmd *urfavecli.Command) error {
789 return statedb.Migrate(&cli)
790 },
791 }
792}