Live video on the AT Protocol
at eli/oatproxy-panic 667 lines 17 kB view raw
package cmd

import (
	"bytes"
	"context"
	"crypto"
	"crypto/rand"
	"errors"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
	"github.com/peterbourgon/ff/v3"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/term"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/crypto/signers"
	"stream.place/streamplace/pkg/crypto/signers/eip712"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication/iroh_replicator"
	"stream.place/streamplace/pkg/rtmps"
	v0 "stream.place/streamplace/pkg/schema/v0"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/statedb"
	"stream.place/streamplace/pkg/storage"

	"github.com/ThalesGroup/crypto11"
	// The two go-gst packages below are blank imports: they are pulled in
	// for their side effects only and are never referenced by name here.
	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional job that can be injected by platforms. start
// launches every job it is given on its goroutine group, passing the group's
// context and a pointer to the parsed CLI configuration; the error a job
// returns is handed back to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
56func start(build *config.BuildFlags, platformJobs []jobFunc) error { 57 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 58 err := media.RunSelfTest(context.Background()) 59 if err != nil { 60 if selfTest { 61 fmt.Println(err.Error()) 62 os.Exit(1) 63 } else { 64 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 65 if retryCount >= 3 { 66 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 67 return err 68 } 69 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 70 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 71 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 72 if err != nil { 73 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 74 return err 75 } 76 panic("invalid code path: exec succeeded but we're still here???") 77 } 78 } 79 if selfTest { 80 runtime.GC() 81 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 82 log.Error(context.Background(), "error creating pprof", "error", err) 83 } 84 fmt.Println("self-test successful!") 85 os.Exit(0) 86 } 87 88 if len(os.Args) > 1 && os.Args[1] == "stream" { 89 if len(os.Args) != 3 { 90 fmt.Println("usage: streamplace stream [user]") 91 os.Exit(1) 92 } 93 return Stream(os.Args[2]) 94 } 95 96 if len(os.Args) > 1 && os.Args[1] == "live" { 97 cli := config.CLI{Build: build} 98 fs := cli.NewFlagSet("streamplace live") 99 100 err := cli.Parse(fs, os.Args[2:]) 101 if err != nil { 102 return err 103 } 104 105 args := fs.Args() 106 if len(args) != 1 { 107 fmt.Println("usage: streamplace live [flags] [stream-key]") 108 os.Exit(1) 109 } 110 111 return Live(args[0], cli.HTTPInternalAddr) 112 } 113 114 if len(os.Args) > 1 && os.Args[1] == "sign" { 115 return Sign(context.Background()) 116 } 117 118 if len(os.Args) > 1 && os.Args[1] == "whep" { 119 return WHEP(os.Args[2:]) 120 } 121 if 
len(os.Args) > 1 && os.Args[1] == "whip" { 122 return WHIP(os.Args[2:]) 123 } 124 125 if len(os.Args) > 1 && os.Args[1] == "clip" { 126 cli := config.CLI{Build: build} 127 fs := cli.NewFlagSet("streamplace clip") 128 out := fs.String("out", "", "output file") 129 130 err := cli.Parse(fs, os.Args[2:]) 131 if err != nil { 132 return err 133 } 134 ctx := context.Background() 135 ctx = log.WithDebugValue(ctx, cli.Debug) 136 return Clip(ctx, fs.Args(), *out) 137 } 138 139 if len(os.Args) > 1 && os.Args[1] == "self-test" { 140 err := media.RunSelfTest(context.Background()) 141 if err != nil { 142 fmt.Println(err.Error()) 143 os.Exit(1) 144 } 145 fmt.Println("self-test successful!") 146 os.Exit(0) 147 } 148 149 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 150 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 151 _ = starter.NewLivepeerConfig(lpfs) 152 err = ff.Parse(lpfs, os.Args[2:], 153 ff.WithConfigFileFlag("config"), 154 ff.WithEnvVarPrefix("LP"), 155 ) 156 if err != nil { 157 return err 158 } 159 err = GoLivepeer(context.Background(), lpfs) 160 if err != nil { 161 log.Error(context.Background(), "error in livepeer", "error", err) 162 os.Exit(1) 163 } 164 os.Exit(0) 165 } 166 167 _ = flag.Set("logtostderr", "true") 168 vFlag := flag.Lookup("v") 169 cli := config.CLI{Build: build} 170 fs := cli.NewFlagSet("streamplace") 171 verbosity := fs.String("v", "3", "log verbosity level") 172 version := fs.Bool("version", false, "print version and exit") 173 174 err = cli.Parse( 175 fs, os.Args[1:], 176 ) 177 if err != nil { 178 return err 179 } 180 181 err = flag.CommandLine.Parse(nil) 182 if err != nil { 183 return err 184 } 185 _ = vFlag.Value.Set(*verbosity) 186 log.SetColorLogger(cli.Color) 187 ctx := context.Background() 188 ctx = log.WithDebugValue(ctx, cli.Debug) 189 190 log.Log(ctx, 191 "streamplace", 192 "version", build.Version, 193 "buildTime", build.BuildTimeStr(), 194 "uuid", build.UUID, 195 "runtime.GOOS", runtime.GOOS, 196 "runtime.GOARCH", 
runtime.GOARCH, 197 "runtime.Version", runtime.Version()) 198 if *version { 199 return nil 200 } 201 202 if len(os.Args) > 1 && os.Args[1] == "migrate" { 203 return statedb.Migrate(&cli) 204 } 205 206 spmetrics.Version.WithLabelValues(build.Version).Inc() 207 if cli.LivepeerHelp { 208 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 209 _ = starter.NewLivepeerConfig(lpFlags) 210 lpFlags.VisitAll(func(f *flag.Flag) { 211 adapted := config.ToSnakeCase(f.Name) 212 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 213 usage := fmt.Sprintf(" %s", f.Usage) 214 if f.DefValue != "" { 215 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 216 } 217 fmt.Printf(" %s\n", usage) 218 }) 219 return nil 220 } 221 222 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 223 224 err = os.MkdirAll(cli.DataDir, os.ModePerm) 225 if err != nil { 226 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 227 } 228 schema, err := v0.MakeV0Schema() 229 if err != nil { 230 return err 231 } 232 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 233 Schema: schema, 234 EthKeystorePath: cli.EthKeystorePath, 235 EthAccountAddr: cli.EthAccountAddr, 236 EthKeystorePassword: cli.EthPassword, 237 }) 238 if err != nil { 239 return err 240 } 241 var signer crypto.Signer = eip712signer 242 if cli.PKCS11ModulePath != "" { 243 conf := &crypto11.Config{ 244 Path: cli.PKCS11ModulePath, 245 } 246 count := 0 247 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 248 if val != "" { 249 count += 1 250 } 251 } 252 if count != 1 { 253 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 254 } 255 if cli.PKCS11TokenSlot != "" { 256 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 257 if err != nil { 258 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 259 } 260 numint := int(num) 261 // why does 
crypto11 want this as a reference? odd. 262 conf.SlotNumber = &numint 263 } 264 if cli.PKCS11TokenLabel != "" { 265 conf.TokenLabel = cli.PKCS11TokenLabel 266 } 267 if cli.PKCS11TokenSerial != "" { 268 conf.TokenSerial = cli.PKCS11TokenSerial 269 } 270 pin := cli.PKCS11Pin 271 if pin == "" { 272 fmt.Printf("Please enter PKCS11 PIN: ") 273 password, err := term.ReadPassword(int(os.Stdin.Fd())) 274 fmt.Println("") 275 if err != nil { 276 return fmt.Errorf("error reading PKCS11 password: %w", err) 277 } 278 pin = string(password) 279 } 280 conf.Pin = pin 281 282 sc, err := crypto11.Configure(conf) 283 if err != nil { 284 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 285 } 286 var id []byte = nil 287 var label []byte = nil 288 if cli.PKCS11KeypairID != "" { 289 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 290 if err != nil { 291 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 292 } 293 id = []byte{byte(num)} 294 } 295 if cli.PKCS11KeypairLabel != "" { 296 label = []byte(cli.PKCS11KeypairLabel) 297 } 298 hwsigner, err := sc.FindKeyPair(id, label) 299 if err != nil { 300 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 301 } 302 if hwsigner == nil { 303 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 304 } 305 addr, err := signers.HexAddrFromSigner(hwsigner) 306 if err != nil { 307 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 308 } 309 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 310 signer = hwsigner 311 } 312 313 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 314 if err != nil { 315 return err 316 } 317 var noter notifications.FirebaseNotifier 318 if cli.FirebaseServiceAccount != "" { 319 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 320 if err != nil { 321 return err 322 } 323 } 324 325 group, ctx := 
TimeoutGroupWithContext(ctx) 326 327 out := carstore.SQLiteStore{} 328 err = out.Open(":memory:") 329 if err != nil { 330 return err 331 } 332 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 333 if err != nil { 334 return err 335 } 336 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 337 if err != nil { 338 return err 339 } 340 defer handle.Close() 341 342 jwk, err := state.EnsureJWK(ctx, "jwk") 343 if err != nil { 344 return err 345 } 346 cli.JWK = jwk 347 348 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 349 if err != nil { 350 return err 351 } 352 cli.AccessJWK = accessJWK 353 354 b := bus.NewBus() 355 atsync := &atproto.ATProtoSynchronizer{ 356 CLI: &cli, 357 Model: mod, 358 StatefulDB: state, 359 Noter: noter, 360 Bus: b, 361 } 362 err = atsync.Migrate(ctx) 363 if err != nil { 364 return fmt.Errorf("failed to migrate: %w", err) 365 } 366 367 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 368 if err != nil { 369 return err 370 } 371 372 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 373 if err != nil { 374 return err 375 } 376 377 var clientMetadata *oatproxy.OAuthClientMetadata 378 var host string 379 if cli.PublicOAuth { 380 u, err := url.Parse(cli.OwnPublicURL()) 381 if err != nil { 382 return err 383 } 384 host = u.Host 385 clientMetadata = &oatproxy.OAuthClientMetadata{ 386 Scope: "atproto transition:generic", 387 ClientName: "Streamplace", 388 RedirectURIs: []string{ 389 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 390 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 391 }, 392 } 393 } else { 394 host = cli.BroadcasterHost 395 clientMetadata = &oatproxy.OAuthClientMetadata{ 396 Scope: "atproto transition:generic", 397 ClientName: "Streamplace", 398 RedirectURIs: []string{ 399 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 400 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 401 }, 402 } 403 } 404 405 exists, err := 
cli.DataFileExists([]string{"iroh-kv-secret"}) 406 if err != nil { 407 return err 408 } 409 if !exists { 410 secret := make([]byte, 32) 411 _, err := rand.Read(secret) 412 if err != nil { 413 return fmt.Errorf("failed to generate random secret: %w", err) 414 } 415 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 416 if err != nil { 417 return err 418 } 419 } 420 buf := bytes.Buffer{} 421 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 422 if err != nil { 423 return err 424 } 425 secret := buf.Bytes() 426 var topic []byte 427 if cli.IrohTopic != "" { 428 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 429 if err != nil { 430 return err 431 } 432 } 433 swarm, err := iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 434 if err != nil { 435 return err 436 } 437 438 op := oatproxy.New(&oatproxy.Config{ 439 Host: host, 440 CreateOAuthSession: state.CreateOAuthSession, 441 UpdateOAuthSession: state.UpdateOAuthSession, 442 GetOAuthSession: state.LoadOAuthSession, 443 Lock: state.GetNamedLock, 444 Scope: "atproto transition:generic", 445 UpstreamJWK: cli.JWK, 446 DownstreamJWK: cli.AccessJWK, 447 ClientMetadata: clientMetadata, 448 Public: cli.PublicOAuth, 449 }) 450 d := director.NewDirector(mm, mod, &cli, b, op, state, swarm) 451 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 452 if err != nil { 453 return err 454 } 455 456 ctx = log.WithLogValues(ctx, "version", build.Version) 457 458 group.Go(func() error { 459 return handleSignals(ctx) 460 }) 461 462 group.Go(func() error { 463 return state.ProcessQueue(ctx) 464 }) 465 466 if cli.TracingEndpoint != "" { 467 group.Go(func() error { 468 return startTelemetry(ctx, cli.TracingEndpoint) 469 }) 470 } 471 472 if cli.Secure { 473 group.Go(func() error { 474 return a.ServeHTTPS(ctx) 475 }) 476 group.Go(func() error { 477 return a.ServeHTTPRedirect(ctx) 478 }) 479 if cli.RTMPServerAddon != "" { 480 group.Go(func() error { 
481 return rtmps.ServeRTMPS(ctx, &cli) 482 }) 483 } 484 } else { 485 group.Go(func() error { 486 return a.ServeHTTP(ctx) 487 }) 488 } 489 490 group.Go(func() error { 491 return a.ServeInternalHTTP(ctx) 492 }) 493 494 if !cli.NoFirehose { 495 group.Go(func() error { 496 return atsync.StartFirehose(ctx) 497 }) 498 } 499 for _, labeler := range cli.Labelers { 500 group.Go(func() error { 501 return atsync.StartLabelerFirehose(ctx, labeler) 502 }) 503 } 504 505 group.Go(func() error { 506 return a.ExpireSessions(ctx) 507 }) 508 509 group.Go(func() error { 510 return storage.StartSegmentCleaner(ctx, mod, &cli) 511 }) 512 513 group.Go(func() error { 514 return mod.StartSegmentCleaner(ctx) 515 }) 516 517 group.Go(func() error { 518 return swarm.Start(ctx, cli.Tickets) 519 }) 520 521 if cli.LivepeerGateway { 522 // make a file to make sure the directory exists 523 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 524 if err != nil { 525 return err 526 } 527 fd.Close() 528 if err != nil { 529 return err 530 } 531 group.Go(func() error { 532 err := GoLivepeer(ctx, fs) 533 if err != nil { 534 return err 535 } 536 // livepeer returns nil on error, so we need to check if we're responsible 537 if ctx.Err() == nil { 538 return fmt.Errorf("livepeer exited") 539 } 540 return nil 541 }) 542 } 543 544 group.Go(func() error { 545 return d.Start(ctx) 546 }) 547 548 if cli.TestStream { 549 // regular stream self-test 550 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 551 Schema: schema, 552 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 553 }) 554 if err != nil { 555 return err 556 } 557 atkey, err := atproto.ParsePubKey(signer.Public()) 558 if err != nil { 559 return err 560 } 561 did := atkey.DIDKey() 562 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner, mod) 563 if err != nil { 564 return err 565 } 566 err = mod.UpdateIdentity(&model.Identity{ 567 ID: testMediaSigner.Pub().String(), 568 
Handle: "stream-self-tester", 569 DID: "", 570 }) 571 if err != nil { 572 return err 573 } 574 cli.AllowedStreams = append(cli.AllowedStreams, did) 575 a.Aliases["self-test"] = did 576 group.Go(func() error { 577 return mm.TestSource(ctx, testMediaSigner) 578 }) 579 580 // Start a test stream that will run intermittently 581 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 582 Schema: schema, 583 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 584 }) 585 if err != nil { 586 return err 587 } 588 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 589 if err != nil { 590 return err 591 } 592 did2 := atkey2.DIDKey() 593 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner, mod) 594 if err != nil { 595 return err 596 } 597 err = mod.UpdateIdentity(&model.Identity{ 598 ID: intermittentMediaSigner.Pub().String(), 599 Handle: "stream-intermittent-tester", 600 DID: "", 601 }) 602 if err != nil { 603 return err 604 } 605 cli.AllowedStreams = append(cli.AllowedStreams, did2) 606 a.Aliases["intermittent-self-test"] = did2 607 608 group.Go(func() error { 609 for { 610 // Start intermittent stream 611 intermittentCtx, cancel := context.WithCancel(ctx) 612 done := make(chan struct{}) 613 go func() { 614 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 615 close(done) 616 }() 617 // Stream ON for 15 seconds 618 time.Sleep(15 * time.Second) 619 // Stop stream 620 cancel() 621 <-done // Wait for TestSource to exit 622 // Stream OFF for 15 seconds 623 time.Sleep(15 * time.Second) 624 } 625 }) 626 } 627 628 for _, job := range platformJobs { 629 group.Go(func() error { 630 return job(ctx, &cli) 631 }) 632 } 633 634 if cli.WHIPTest != "" { 635 group.Go(func() error { 636 err := WHIP(strings.Split(cli.WHIPTest, " ")) 637 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 638 time.Sleep(time.Second * 3) 639 // gst.Deinit() 640 
log.Warn(ctx, "gst deinit complete, exiting") 641 return err 642 }) 643 } 644 645 return group.Wait() 646} 647 648var ErrCaughtSignal = errors.New("caught signal") 649 650func handleSignals(ctx context.Context) error { 651 c := make(chan os.Signal, 1) 652 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 653 for { 654 select { 655 case s := <-c: 656 if s == syscall.SIGABRT { 657 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 658 log.Error(ctx, "failed to create pprof", "error", err) 659 } 660 } 661 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 662 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 663 case <-ctx.Done(): 664 return nil 665 } 666 } 667}