Live video on the AT Protocol
at eli/rust-logging 669 lines 17 kB view raw
1package cmd 2 3import ( 4 "bytes" 5 "context" 6 "crypto" 7 "crypto/rand" 8 "errors" 9 "flag" 10 "fmt" 11 "net/url" 12 "os" 13 "os/signal" 14 "path/filepath" 15 "runtime" 16 "runtime/pprof" 17 "strconv" 18 "strings" 19 "syscall" 20 "time" 21 22 "github.com/bluesky-social/indigo/carstore" 23 "github.com/ethereum/go-ethereum/common/hexutil" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/peterbourgon/ff/v3" 26 "github.com/streamplace/oatproxy/pkg/oatproxy" 27 "golang.org/x/term" 28 "stream.place/streamplace/pkg/aqhttp" 29 "stream.place/streamplace/pkg/atproto" 30 "stream.place/streamplace/pkg/bus" 31 "stream.place/streamplace/pkg/crypto/signers" 32 "stream.place/streamplace/pkg/crypto/signers/eip712" 33 "stream.place/streamplace/pkg/director" 34 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 35 "stream.place/streamplace/pkg/log" 36 "stream.place/streamplace/pkg/media" 37 "stream.place/streamplace/pkg/notifications" 38 "stream.place/streamplace/pkg/replication/iroh_replicator" 39 "stream.place/streamplace/pkg/rtmps" 40 v0 "stream.place/streamplace/pkg/schema/v0" 41 "stream.place/streamplace/pkg/spmetrics" 42 "stream.place/streamplace/pkg/statedb" 43 "stream.place/streamplace/pkg/storage" 44 45 "github.com/ThalesGroup/crypto11" 46 _ "github.com/go-gst/go-glib/glib" 47 _ "github.com/go-gst/go-gst/gst" 48 "stream.place/streamplace/pkg/api" 49 "stream.place/streamplace/pkg/config" 50 "stream.place/streamplace/pkg/model" 51) 52 53// Additional jobs that can be injected by platforms 54type jobFunc func(ctx context.Context, cli *config.CLI) error 55 56// parse the CLI and fire up an streamplace node! 57func start(build *config.BuildFlags, platformJobs []jobFunc) error { 58 iroh_streamplace.InitLogging() 59 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 60 err := media.RunSelfTest(context.Background()) 61 if err != nil { 62 if selfTest { 63 fmt.Println(err.Error()) 64 os.Exit(1) 65 } else { 66 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 67 if retryCount >= 3 { 68 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 69 return err 70 } 71 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 72 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 73 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 74 if err != nil { 75 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 76 return err 77 } 78 panic("invalid code path: exec succeeded but we're still here???") 79 } 80 } 81 if selfTest { 82 runtime.GC() 83 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 84 log.Error(context.Background(), "error creating pprof", "error", err) 85 } 86 fmt.Println("self-test successful!") 87 os.Exit(0) 88 } 89 90 if len(os.Args) > 1 && os.Args[1] == "stream" { 91 if len(os.Args) != 3 { 92 fmt.Println("usage: streamplace stream [user]") 93 os.Exit(1) 94 } 95 return Stream(os.Args[2]) 96 } 97 98 if len(os.Args) > 1 && os.Args[1] == "live" { 99 cli := config.CLI{Build: build} 100 fs := cli.NewFlagSet("streamplace live") 101 102 err := cli.Parse(fs, os.Args[2:]) 103 if err != nil { 104 return err 105 } 106 107 args := fs.Args() 108 if len(args) != 1 { 109 fmt.Println("usage: streamplace live [flags] [stream-key]") 110 os.Exit(1) 111 } 112 113 return Live(args[0], cli.HTTPInternalAddr) 114 } 115 116 if len(os.Args) > 1 && os.Args[1] == "sign" { 117 return Sign(context.Background()) 118 } 119 120 if len(os.Args) > 1 && os.Args[1] == "whep" { 121 return WHEP(os.Args[2:]) 122 } 123 if len(os.Args) > 1 && os.Args[1] == "whip" { 124 return WHIP(os.Args[2:]) 125 } 126 127 if len(os.Args) > 1 && os.Args[1] == "clip" { 128 cli := config.CLI{Build: build} 129 fs := cli.NewFlagSet("streamplace clip") 130 out := fs.String("out", "", "output file") 131 132 err := cli.Parse(fs, os.Args[2:]) 133 if err != nil { 134 return err 135 } 136 ctx := context.Background() 137 ctx = log.WithDebugValue(ctx, cli.Debug) 138 return Clip(ctx, fs.Args(), *out) 139 } 140 141 if len(os.Args) > 1 && os.Args[1] == "self-test" { 142 err := media.RunSelfTest(context.Background()) 143 if err != nil { 144 fmt.Println(err.Error()) 145 os.Exit(1) 146 } 147 fmt.Println("self-test successful!") 148 os.Exit(0) 149 } 150 151 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 152 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 153 _ = starter.NewLivepeerConfig(lpfs) 154 err = ff.Parse(lpfs, os.Args[2:], 155 ff.WithConfigFileFlag("config"), 156 ff.WithEnvVarPrefix("LP"), 157 ) 158 if err != nil { 159 return err 160 } 161 err = GoLivepeer(context.Background(), lpfs) 162 if err != nil { 163 log.Error(context.Background(), "error in livepeer", "error", err) 164 os.Exit(1) 165 } 166 os.Exit(0) 167 } 168 169 _ = flag.Set("logtostderr", "true") 170 vFlag := flag.Lookup("v") 171 cli := config.CLI{Build: build} 172 fs := cli.NewFlagSet("streamplace") 173 verbosity := fs.String("v", "3", "log verbosity level") 174 version := fs.Bool("version", false, "print version and exit") 175 176 err = cli.Parse( 177 fs, os.Args[1:], 178 ) 179 if err != nil { 180 return err 181 } 182 183 err = flag.CommandLine.Parse(nil) 184 if err != nil { 185 return err 186 } 187 _ = vFlag.Value.Set(*verbosity) 188 log.SetColorLogger(cli.Color) 189 ctx := context.Background() 190 ctx = log.WithDebugValue(ctx, cli.Debug) 191 192 log.Log(ctx, 193 "streamplace", 194 "version", build.Version, 195 "buildTime", build.BuildTimeStr(), 196 "uuid", build.UUID, 197 "runtime.GOOS", runtime.GOOS, 198 "runtime.GOARCH", runtime.GOARCH, 199 "runtime.Version", runtime.Version()) 200 if *version { 201 return nil 202 } 203 204 if len(os.Args) > 1 && os.Args[1] == "migrate" { 205 return statedb.Migrate(&cli) 206 } 207 208 spmetrics.Version.WithLabelValues(build.Version).Inc() 209 if cli.LivepeerHelp { 210 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 211 _ = starter.NewLivepeerConfig(lpFlags) 212 lpFlags.VisitAll(func(f *flag.Flag) { 213 adapted := config.ToSnakeCase(f.Name) 214 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 215 usage := fmt.Sprintf(" %s", f.Usage) 216 if f.DefValue != "" { 217 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 218 } 219 fmt.Printf(" %s\n", usage) 220 }) 221 return nil 222 } 223 224 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 225 226 err = os.MkdirAll(cli.DataDir, os.ModePerm) 227 if err != nil { 228 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 229 } 230 schema, err := v0.MakeV0Schema() 231 if err != nil { 232 return err 233 } 234 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 235 Schema: schema, 236 EthKeystorePath: cli.EthKeystorePath, 237 EthAccountAddr: cli.EthAccountAddr, 238 EthKeystorePassword: cli.EthPassword, 239 }) 240 if err != nil { 241 return err 242 } 243 var signer crypto.Signer = eip712signer 244 if cli.PKCS11ModulePath != "" { 245 conf := &crypto11.Config{ 246 Path: cli.PKCS11ModulePath, 247 } 248 count := 0 249 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 250 if val != "" { 251 count += 1 252 } 253 } 254 if count != 1 { 255 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 256 } 257 if cli.PKCS11TokenSlot != "" { 258 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 259 if err != nil { 260 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 261 } 262 numint := int(num) 263 // why does crypto11 want this as a reference? odd. 264 conf.SlotNumber = &numint 265 } 266 if cli.PKCS11TokenLabel != "" { 267 conf.TokenLabel = cli.PKCS11TokenLabel 268 } 269 if cli.PKCS11TokenSerial != "" { 270 conf.TokenSerial = cli.PKCS11TokenSerial 271 } 272 pin := cli.PKCS11Pin 273 if pin == "" { 274 fmt.Printf("Please enter PKCS11 PIN: ") 275 password, err := term.ReadPassword(int(os.Stdin.Fd())) 276 fmt.Println("") 277 if err != nil { 278 return fmt.Errorf("error reading PKCS11 password: %w", err) 279 } 280 pin = string(password) 281 } 282 conf.Pin = pin 283 284 sc, err := crypto11.Configure(conf) 285 if err != nil { 286 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 287 } 288 var id []byte = nil 289 var label []byte = nil 290 if cli.PKCS11KeypairID != "" { 291 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 292 if err != nil { 293 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 294 } 295 id = []byte{byte(num)} 296 } 297 if cli.PKCS11KeypairLabel != "" { 298 label = []byte(cli.PKCS11KeypairLabel) 299 } 300 hwsigner, err := sc.FindKeyPair(id, label) 301 if err != nil { 302 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 303 } 304 if hwsigner == nil { 305 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 306 } 307 addr, err := signers.HexAddrFromSigner(hwsigner) 308 if err != nil { 309 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 310 } 311 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 312 signer = hwsigner 313 } 314 315 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 316 if err != nil { 317 return err 318 } 319 var noter notifications.FirebaseNotifier 320 if cli.FirebaseServiceAccount != "" { 321 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 322 if err != nil { 323 return err 324 } 325 } 326 327 group, ctx := TimeoutGroupWithContext(ctx) 328 329 out := carstore.SQLiteStore{} 330 err = out.Open(":memory:") 331 if err != nil { 332 return err 333 } 334 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 335 if err != nil { 336 return err 337 } 338 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 339 if err != nil { 340 return err 341 } 342 defer handle.Close() 343 344 jwk, err := state.EnsureJWK(ctx, "jwk") 345 if err != nil { 346 return err 347 } 348 cli.JWK = jwk 349 350 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 351 if err != nil { 352 return err 353 } 354 cli.AccessJWK = accessJWK 355 356 b := bus.NewBus() 357 atsync := &atproto.ATProtoSynchronizer{ 358 CLI: &cli, 359 Model: mod, 360 StatefulDB: state, 361 Noter: noter, 362 Bus: b, 363 } 364 err = atsync.Migrate(ctx) 365 if err != nil { 366 return fmt.Errorf("failed to migrate: %w", err) 367 } 368 369 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 370 if err != nil { 371 return err 372 } 373 374 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 375 if err != nil { 376 return err 377 } 378 379 var clientMetadata *oatproxy.OAuthClientMetadata 380 var host string 381 if cli.PublicOAuth { 382 u, err := url.Parse(cli.OwnPublicURL()) 383 if err != nil { 384 return err 385 } 386 host = u.Host 387 clientMetadata = &oatproxy.OAuthClientMetadata{ 388 Scope: "atproto transition:generic", 389 ClientName: "Streamplace", 390 RedirectURIs: []string{ 391 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 392 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 393 }, 394 } 395 } else { 396 host = cli.BroadcasterHost 397 clientMetadata = &oatproxy.OAuthClientMetadata{ 398 Scope: "atproto transition:generic", 399 ClientName: "Streamplace", 400 RedirectURIs: []string{ 401 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 402 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 403 }, 404 } 405 } 406 407 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 408 if err != nil { 409 return err 410 } 411 if !exists { 412 secret := make([]byte, 32) 413 _, err := rand.Read(secret) 414 if err != nil { 415 return fmt.Errorf("failed to generate random secret: %w", err) 416 } 417 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 418 if err != nil { 419 return err 420 } 421 } 422 buf := bytes.Buffer{} 423 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 424 if err != nil { 425 return err 426 } 427 secret := buf.Bytes() 428 var topic []byte 429 if cli.IrohTopic != "" { 430 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 431 if err != nil { 432 return err 433 } 434 } 435 swarm, err := iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 436 if err != nil { 437 return err 438 } 439 440 op := oatproxy.New(&oatproxy.Config{ 441 Host: host, 442 CreateOAuthSession: state.CreateOAuthSession, 443 UpdateOAuthSession: state.UpdateOAuthSession, 444 GetOAuthSession: state.LoadOAuthSession, 445 Lock: state.GetNamedLock, 446 Scope: "atproto transition:generic", 447 UpstreamJWK: cli.JWK, 448 DownstreamJWK: cli.AccessJWK, 449 ClientMetadata: clientMetadata, 450 Public: cli.PublicOAuth, 451 }) 452 d := director.NewDirector(mm, mod, &cli, b, op, state, swarm) 453 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 454 if err != nil { 455 return err 456 } 457 458 ctx = log.WithLogValues(ctx, "version", build.Version) 459 460 group.Go(func() error { 461 return handleSignals(ctx) 462 }) 463 464 group.Go(func() error { 465 return state.ProcessQueue(ctx) 466 }) 467 468 if cli.TracingEndpoint != "" { 469 group.Go(func() error { 470 return startTelemetry(ctx, cli.TracingEndpoint) 471 }) 472 } 473 474 if cli.Secure { 475 group.Go(func() error { 476 return a.ServeHTTPS(ctx) 477 }) 478 group.Go(func() error { 479 return a.ServeHTTPRedirect(ctx) 480 }) 481 if cli.RTMPServerAddon != "" { 482 group.Go(func() error { 483 return rtmps.ServeRTMPS(ctx, &cli) 484 }) 485 } 486 } else { 487 group.Go(func() error { 488 return a.ServeHTTP(ctx) 489 }) 490 } 491 492 group.Go(func() error { 493 return a.ServeInternalHTTP(ctx) 494 }) 495 496 if !cli.NoFirehose { 497 group.Go(func() error { 498 return atsync.StartFirehose(ctx) 499 }) 500 } 501 for _, labeler := range cli.Labelers { 502 group.Go(func() error { 503 return atsync.StartLabelerFirehose(ctx, labeler) 504 }) 505 } 506 507 group.Go(func() error { 508 return a.ExpireSessions(ctx) 509 }) 510 511 group.Go(func() error { 512 return storage.StartSegmentCleaner(ctx, mod, &cli) 513 }) 514 515 group.Go(func() error { 516 return mod.StartSegmentCleaner(ctx) 517 }) 518 519 group.Go(func() error { 520 return swarm.Start(ctx, cli.Tickets) 521 }) 522 523 if cli.LivepeerGateway { 524 // make a file to make sure the directory exists 525 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 526 if err != nil { 527 return err 528 } 529 fd.Close() 530 if err != nil { 531 return err 532 } 533 group.Go(func() error { 534 err := GoLivepeer(ctx, fs) 535 if err != nil { 536 return err 537 } 538 // livepeer returns nil on error, so we need to check if we're responsible 539 if ctx.Err() == nil { 540 return fmt.Errorf("livepeer exited") 541 } 542 return nil 543 }) 544 } 545 546 group.Go(func() error { 547 return d.Start(ctx) 548 }) 549 550 if cli.TestStream { 551 // regular stream self-test 552 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 553 Schema: schema, 554 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 555 }) 556 if err != nil { 557 return err 558 } 559 atkey, err := atproto.ParsePubKey(signer.Public()) 560 if err != nil { 561 return err 562 } 563 did := atkey.DIDKey() 564 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner, mod) 565 if err != nil { 566 return err 567 } 568 err = mod.UpdateIdentity(&model.Identity{ 569 ID: testMediaSigner.Pub().String(), 570 Handle: "stream-self-tester", 571 DID: "", 572 }) 573 if err != nil { 574 return err 575 } 576 cli.AllowedStreams = append(cli.AllowedStreams, did) 577 a.Aliases["self-test"] = did 578 group.Go(func() error { 579 return mm.TestSource(ctx, testMediaSigner) 580 }) 581 582 // Start a test stream that will run intermittently 583 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 584 Schema: schema, 585 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 586 }) 587 if err != nil { 588 return err 589 } 590 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 591 if err != nil { 592 return err 593 } 594 did2 := atkey2.DIDKey() 595 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner, mod) 596 if err != nil { 597 return err 598 } 599 err = mod.UpdateIdentity(&model.Identity{ 600 ID: intermittentMediaSigner.Pub().String(), 601 Handle: "stream-intermittent-tester", 602 DID: "", 603 }) 604 if err != nil { 605 return err 606 } 607 cli.AllowedStreams = append(cli.AllowedStreams, did2) 608 a.Aliases["intermittent-self-test"] = did2 609 610 group.Go(func() error { 611 for { 612 // Start intermittent stream 613 intermittentCtx, cancel := context.WithCancel(ctx) 614 done := make(chan struct{}) 615 go func() { 616 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 617 close(done) 618 }() 619 // Stream ON for 15 seconds 620 time.Sleep(15 * time.Second) 621 // Stop stream 622 cancel() 623 <-done // Wait for TestSource to exit 624 // Stream OFF for 15 seconds 625 time.Sleep(15 * time.Second) 626 } 627 }) 628 } 629 630 for _, job := range platformJobs { 631 group.Go(func() error { 632 return job(ctx, &cli) 633 }) 634 } 635 636 if cli.WHIPTest != "" { 637 group.Go(func() error { 638 err := WHIP(strings.Split(cli.WHIPTest, " ")) 639 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 640 time.Sleep(time.Second * 3) 641 // gst.Deinit() 642 log.Warn(ctx, "gst deinit complete, exiting") 643 return err 644 }) 645 } 646 647 return group.Wait() 648} 649 650var ErrCaughtSignal = errors.New("caught signal") 651 652func handleSignals(ctx context.Context) error { 653 c := make(chan os.Signal, 1) 654 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 655 for { 656 select { 657 case s := <-c: 658 if s == syscall.SIGABRT { 659 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 660 log.Error(ctx, "failed to create pprof", "error", err) 661 } 662 } 663 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 664 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 665 case <-ctx.Done(): 666 return nil 667 } 668 } 669}