Live video on the AT Protocol
at v0.7.21 593 lines 15 kB view raw
package cmd

import (
	"context"
	"crypto"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
	"github.com/peterbourgon/ff/v3"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/term"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/crypto/signers"
	"stream.place/streamplace/pkg/crypto/signers/eip712"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication"
	"stream.place/streamplace/pkg/replication/boring"
	"stream.place/streamplace/pkg/rtmps"
	v0 "stream.place/streamplace/pkg/schema/v0"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/statedb"

	"github.com/ThalesGroup/crypto11"
	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional long-running job that a platform can inject into
// the node. It receives the node's context and the parsed CLI configuration,
// and the error it returns is reported through the goroutine group that
// start waits on.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
52func start(build *config.BuildFlags, platformJobs []jobFunc) error { 53 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 54 err := media.RunSelfTest(context.Background()) 55 if err != nil { 56 if selfTest { 57 fmt.Println(err.Error()) 58 os.Exit(1) 59 } else { 60 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 61 if retryCount >= 3 { 62 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 63 return err 64 } 65 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 66 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 67 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 68 if err != nil { 69 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 70 return err 71 } 72 panic("invalid code path: exec succeeded but we're still here???") 73 } 74 } 75 if selfTest { 76 runtime.GC() 77 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 78 log.Error(context.Background(), "error creating pprof", "error", err) 79 } 80 fmt.Println("self-test successful!") 81 os.Exit(0) 82 } 83 84 if len(os.Args) > 1 && os.Args[1] == "stream" { 85 if len(os.Args) != 3 { 86 fmt.Println("usage: streamplace stream [user]") 87 os.Exit(1) 88 } 89 return Stream(os.Args[2]) 90 } 91 92 if len(os.Args) > 1 && os.Args[1] == "live" { 93 cli := config.CLI{Build: build} 94 fs := cli.NewFlagSet("streamplace live") 95 96 err := cli.Parse(fs, os.Args[2:]) 97 if err != nil { 98 return err 99 } 100 101 args := fs.Args() 102 if len(args) != 1 { 103 fmt.Println("usage: streamplace live [flags] [stream-key]") 104 os.Exit(1) 105 } 106 107 return Live(args[0], cli.HTTPInternalAddr) 108 } 109 110 if len(os.Args) > 1 && os.Args[1] == "sign" { 111 return Sign(context.Background()) 112 } 113 114 if len(os.Args) > 1 && os.Args[1] == "whep" { 115 return WHEP(os.Args[2:]) 116 } 117 if len(os.Args) > 
1 && os.Args[1] == "whip" { 118 return WHIP(os.Args[2:]) 119 } 120 121 if len(os.Args) > 1 && os.Args[1] == "clip" { 122 cli := config.CLI{Build: build} 123 fs := cli.NewFlagSet("streamplace clip") 124 out := fs.String("out", "", "output file") 125 126 err := cli.Parse(fs, os.Args[2:]) 127 if err != nil { 128 return err 129 } 130 ctx := context.Background() 131 ctx = log.WithDebugValue(ctx, cli.Debug) 132 return Clip(ctx, fs.Args(), *out) 133 } 134 135 if len(os.Args) > 1 && os.Args[1] == "self-test" { 136 err := media.RunSelfTest(context.Background()) 137 if err != nil { 138 fmt.Println(err.Error()) 139 os.Exit(1) 140 } 141 fmt.Println("self-test successful!") 142 os.Exit(0) 143 } 144 145 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 146 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 147 _ = starter.NewLivepeerConfig(lpfs) 148 err = ff.Parse(lpfs, os.Args[2:], 149 ff.WithConfigFileFlag("config"), 150 ff.WithEnvVarPrefix("LP"), 151 ) 152 if err != nil { 153 return err 154 } 155 err = GoLivepeer(context.Background(), lpfs) 156 if err != nil { 157 log.Error(context.Background(), "error in livepeer", "error", err) 158 os.Exit(1) 159 } 160 os.Exit(0) 161 } 162 163 _ = flag.Set("logtostderr", "true") 164 vFlag := flag.Lookup("v") 165 cli := config.CLI{Build: build} 166 fs := cli.NewFlagSet("streamplace") 167 verbosity := fs.String("v", "3", "log verbosity level") 168 version := fs.Bool("version", false, "print version and exit") 169 170 err = cli.Parse( 171 fs, os.Args[1:], 172 ) 173 if err != nil { 174 return err 175 } 176 err = flag.CommandLine.Parse(nil) 177 if err != nil { 178 return err 179 } 180 _ = vFlag.Value.Set(*verbosity) 181 log.SetColorLogger(cli.Color) 182 ctx := context.Background() 183 ctx = log.WithDebugValue(ctx, cli.Debug) 184 185 log.Log(ctx, 186 "streamplace", 187 "version", build.Version, 188 "buildTime", build.BuildTimeStr(), 189 "uuid", build.UUID, 190 "runtime.GOOS", runtime.GOOS, 191 "runtime.GOARCH", runtime.GOARCH, 192 
"runtime.Version", runtime.Version()) 193 if *version { 194 return nil 195 } 196 197 if len(os.Args) > 1 && os.Args[1] == "migrate" { 198 return statedb.Migrate(&cli) 199 } 200 201 spmetrics.Version.WithLabelValues(build.Version).Inc() 202 if cli.LivepeerHelp { 203 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 204 _ = starter.NewLivepeerConfig(lpFlags) 205 lpFlags.VisitAll(func(f *flag.Flag) { 206 adapted := config.ToSnakeCase(f.Name) 207 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 208 usage := fmt.Sprintf(" %s", f.Usage) 209 if f.DefValue != "" { 210 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 211 } 212 fmt.Printf(" %s\n", usage) 213 }) 214 return nil 215 } 216 217 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 218 219 err = os.MkdirAll(cli.DataDir, os.ModePerm) 220 if err != nil { 221 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 222 } 223 schema, err := v0.MakeV0Schema() 224 if err != nil { 225 return err 226 } 227 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 228 Schema: schema, 229 EthKeystorePath: cli.EthKeystorePath, 230 EthAccountAddr: cli.EthAccountAddr, 231 EthKeystorePassword: cli.EthPassword, 232 }) 233 if err != nil { 234 return err 235 } 236 var signer crypto.Signer = eip712signer 237 if cli.PKCS11ModulePath != "" { 238 conf := &crypto11.Config{ 239 Path: cli.PKCS11ModulePath, 240 } 241 count := 0 242 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 243 if val != "" { 244 count += 1 245 } 246 } 247 if count != 1 { 248 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 249 } 250 if cli.PKCS11TokenSlot != "" { 251 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 252 if err != nil { 253 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 254 } 255 numint := int(num) 256 // why does crypto11 want this as 
a reference? odd. 257 conf.SlotNumber = &numint 258 } 259 if cli.PKCS11TokenLabel != "" { 260 conf.TokenLabel = cli.PKCS11TokenLabel 261 } 262 if cli.PKCS11TokenSerial != "" { 263 conf.TokenSerial = cli.PKCS11TokenSerial 264 } 265 pin := cli.PKCS11Pin 266 if pin == "" { 267 fmt.Printf("Please enter PKCS11 PIN: ") 268 password, err := term.ReadPassword(int(os.Stdin.Fd())) 269 fmt.Println("") 270 if err != nil { 271 return fmt.Errorf("error reading PKCS11 password: %w", err) 272 } 273 pin = string(password) 274 } 275 conf.Pin = pin 276 277 sc, err := crypto11.Configure(conf) 278 if err != nil { 279 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 280 } 281 var id []byte = nil 282 var label []byte = nil 283 if cli.PKCS11KeypairID != "" { 284 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 285 if err != nil { 286 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 287 } 288 id = []byte{byte(num)} 289 } 290 if cli.PKCS11KeypairLabel != "" { 291 label = []byte(cli.PKCS11KeypairLabel) 292 } 293 hwsigner, err := sc.FindKeyPair(id, label) 294 if err != nil { 295 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 296 } 297 if hwsigner == nil { 298 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 299 } 300 addr, err := signers.HexAddrFromSigner(hwsigner) 301 if err != nil { 302 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 303 } 304 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 305 signer = hwsigner 306 } 307 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 308 309 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 310 if err != nil { 311 return err 312 } 313 var noter notifications.FirebaseNotifier 314 if cli.FirebaseServiceAccount != "" { 315 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 316 if err != nil { 317 return err 
318 } 319 } 320 321 out := carstore.SQLiteStore{} 322 err = out.Open(":memory:") 323 if err != nil { 324 return err 325 } 326 state, err := statedb.MakeDB(&cli, noter, mod) 327 if err != nil { 328 return err 329 } 330 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 331 if err != nil { 332 return err 333 } 334 defer handle.Close() 335 336 jwk, err := state.EnsureJWK(ctx, "jwk") 337 if err != nil { 338 return err 339 } 340 cli.JWK = jwk 341 342 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 343 if err != nil { 344 return err 345 } 346 cli.AccessJWK = accessJWK 347 348 b := bus.NewBus() 349 atsync := &atproto.ATProtoSynchronizer{ 350 CLI: &cli, 351 Model: mod, 352 StatefulDB: state, 353 Noter: noter, 354 Bus: b, 355 } 356 err = atsync.Migrate(ctx) 357 if err != nil { 358 return fmt.Errorf("failed to migrate: %w", err) 359 } 360 361 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 362 if err != nil { 363 return err 364 } 365 366 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 367 if err != nil { 368 return err 369 } 370 371 clientMetadata := &oatproxy.OAuthClientMetadata{ 372 Scope: "atproto transition:generic", 373 ClientName: "Streamplace", 374 RedirectURIs: []string{ 375 fmt.Sprintf("https://%s/login", cli.PublicHost), 376 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 377 }, 378 } 379 380 op := oatproxy.New(&oatproxy.Config{ 381 Host: cli.PublicHost, 382 CreateOAuthSession: state.CreateOAuthSession, 383 UpdateOAuthSession: state.UpdateOAuthSession, 384 GetOAuthSession: state.LoadOAuthSession, 385 Lock: state.GetNamedLock, 386 Scope: "atproto transition:generic", 387 UpstreamJWK: cli.JWK, 388 DownstreamJWK: cli.AccessJWK, 389 ClientMetadata: clientMetadata, 390 }) 391 d := director.NewDirector(mm, mod, &cli, b, op, state) 392 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 393 if err != nil { 394 return err 395 } 396 397 group, ctx := 
TimeoutGroupWithContext(ctx) 398 ctx = log.WithLogValues(ctx, "version", build.Version) 399 400 group.Go(func() error { 401 return handleSignals(ctx) 402 }) 403 404 group.Go(func() error { 405 return state.ProcessQueue(ctx) 406 }) 407 408 if cli.TracingEndpoint != "" { 409 group.Go(func() error { 410 return startTelemetry(ctx, cli.TracingEndpoint) 411 }) 412 } 413 414 if cli.Secure { 415 group.Go(func() error { 416 return a.ServeHTTPS(ctx) 417 }) 418 group.Go(func() error { 419 return a.ServeHTTPRedirect(ctx) 420 }) 421 if cli.RTMPServerAddon != "" { 422 group.Go(func() error { 423 return rtmps.ServeRTMPS(ctx, &cli) 424 }) 425 } 426 } else { 427 group.Go(func() error { 428 return a.ServeHTTP(ctx) 429 }) 430 } 431 432 group.Go(func() error { 433 return a.ServeInternalHTTP(ctx) 434 }) 435 436 if !cli.NoFirehose { 437 group.Go(func() error { 438 return atsync.StartFirehose(ctx) 439 }) 440 } 441 for _, labeler := range cli.Labelers { 442 group.Go(func() error { 443 return atsync.StartLabelerFirehose(ctx, labeler) 444 }) 445 } 446 447 group.Go(func() error { 448 return spmetrics.ExpireSessions(ctx) 449 }) 450 451 group.Go(func() error { 452 return mod.StartSegmentCleaner(ctx) 453 }) 454 455 if cli.LivepeerGateway { 456 // make a file to make sure the directory exists 457 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 458 if err != nil { 459 return err 460 } 461 fd.Close() 462 if err != nil { 463 return err 464 } 465 group.Go(func() error { 466 return GoLivepeer(ctx, fs) 467 }) 468 } 469 470 group.Go(func() error { 471 return d.Start(ctx) 472 }) 473 474 if cli.TestStream { 475 // regular stream self-test 476 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 477 Schema: schema, 478 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 479 }) 480 if err != nil { 481 return err 482 } 483 atkey, err := atproto.ParsePubKey(signer.Public()) 484 if err != nil { 485 return err 486 } 487 did := atkey.DIDKey() 488 
testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 489 if err != nil { 490 return err 491 } 492 err = mod.UpdateIdentity(&model.Identity{ 493 ID: testMediaSigner.Pub().String(), 494 Handle: "stream-self-tester", 495 DID: "", 496 }) 497 if err != nil { 498 return err 499 } 500 cli.AllowedStreams = append(cli.AllowedStreams, did) 501 a.Aliases["self-test"] = did 502 group.Go(func() error { 503 return mm.TestSource(ctx, testMediaSigner) 504 }) 505 506 // Start a test stream that will run intermittently 507 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 508 Schema: schema, 509 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 510 }) 511 if err != nil { 512 return err 513 } 514 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 515 if err != nil { 516 return err 517 } 518 did2 := atkey2.DIDKey() 519 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 520 if err != nil { 521 return err 522 } 523 err = mod.UpdateIdentity(&model.Identity{ 524 ID: intermittentMediaSigner.Pub().String(), 525 Handle: "stream-intermittent-tester", 526 DID: "", 527 }) 528 if err != nil { 529 return err 530 } 531 cli.AllowedStreams = append(cli.AllowedStreams, did2) 532 a.Aliases["intermittent-self-test"] = did2 533 534 group.Go(func() error { 535 for { 536 // Start intermittent stream 537 intermittentCtx, cancel := context.WithCancel(ctx) 538 done := make(chan struct{}) 539 go func() { 540 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 541 close(done) 542 }() 543 // Stream ON for 15 seconds 544 time.Sleep(15 * time.Second) 545 // Stop stream 546 cancel() 547 <-done // Wait for TestSource to exit 548 // Stream OFF for 15 seconds 549 time.Sleep(15 * time.Second) 550 } 551 }) 552 } 553 554 for _, job := range platformJobs { 555 group.Go(func() error { 556 return job(ctx, &cli) 557 }) 558 } 559 560 if cli.WHIPTest != "" { 561 group.Go(func() error { 562 err 
:= WHIP(strings.Split(cli.WHIPTest, " ")) 563 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 564 time.Sleep(time.Second * 3) 565 // gst.Deinit() 566 log.Warn(ctx, "gst deinit complete, exiting") 567 return err 568 }) 569 } 570 571 return group.Wait() 572} 573 574var ErrCaughtSignal = errors.New("caught signal") 575 576func handleSignals(ctx context.Context) error { 577 c := make(chan os.Signal, 1) 578 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 579 for { 580 select { 581 case s := <-c: 582 if s == syscall.SIGABRT { 583 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 584 log.Error(ctx, "failed to create pprof", "error", err) 585 } 586 } 587 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 588 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 589 case <-ctx.Done(): 590 return nil 591 } 592 } 593}