Live video on the AT Protocol
at eli/deterministic-muxing 610 lines 16 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 "golang.org/x/term" 24 "stream.place/streamplace/pkg/aqhttp" 25 "stream.place/streamplace/pkg/atproto" 26 "stream.place/streamplace/pkg/bus" 27 "stream.place/streamplace/pkg/crypto/signers" 28 "stream.place/streamplace/pkg/crypto/signers/eip712" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/media" 32 "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/replication" 34 "stream.place/streamplace/pkg/replication/boring" 35 "stream.place/streamplace/pkg/rtmps" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 "stream.place/streamplace/pkg/spmetrics" 38 "stream.place/streamplace/pkg/statedb" 39 40 "github.com/ThalesGroup/crypto11" 41 _ "github.com/go-gst/go-glib/glib" 42 _ "github.com/go-gst/go-gst/gst" 43 "stream.place/streamplace/pkg/api" 44 "stream.place/streamplace/pkg/config" 45 "stream.place/streamplace/pkg/model" 46 47 _ "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 48) 49 50// Additional jobs that can be injected by platforms 51type jobFunc func(ctx context.Context, cli *config.CLI) error 52 53// parse the CLI and fire up an streamplace node! 54func start(build *config.BuildFlags, platformJobs []jobFunc) error { 55 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 56 err := media.RunSelfTest(context.Background()) 57 if err != nil { 58 if selfTest { 59 fmt.Println(err.Error()) 60 os.Exit(1) 61 } else { 62 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 63 if retryCount >= 3 { 64 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 65 return err 66 } 67 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 68 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 69 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 70 if err != nil { 71 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 72 return err 73 } 74 panic("invalid code path: exec succeeded but we're still here???") 75 } 76 } 77 if selfTest { 78 runtime.GC() 79 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 80 log.Error(context.Background(), "error creating pprof", "error", err) 81 } 82 fmt.Println("self-test successful!") 83 os.Exit(0) 84 } 85 86 if len(os.Args) > 1 && os.Args[1] == "stream" { 87 if len(os.Args) != 3 { 88 fmt.Println("usage: streamplace stream [user]") 89 os.Exit(1) 90 } 91 return Stream(os.Args[2]) 92 } 93 94 if len(os.Args) > 1 && os.Args[1] == "live" { 95 cli := config.CLI{Build: build} 96 fs := cli.NewFlagSet("streamplace live") 97 98 err := cli.Parse(fs, os.Args[2:]) 99 if err != nil { 100 return err 101 } 102 103 args := fs.Args() 104 if len(args) != 1 { 105 fmt.Println("usage: streamplace live [flags] [stream-key]") 106 os.Exit(1) 107 } 108 109 return Live(args[0], cli.HTTPInternalAddr) 110 } 111 112 if len(os.Args) > 1 && os.Args[1] == "sign" { 113 return Sign(context.Background()) 114 } 115 116 if len(os.Args) > 1 && os.Args[1] == "whep" { 117 return WHEP(os.Args[2:]) 118 } 119 if len(os.Args) > 1 && os.Args[1] == "whip" { 120 return WHIP(os.Args[2:]) 121 } 122 123 if len(os.Args) > 1 && os.Args[1] == "clip" { 124 cli := config.CLI{Build: build} 125 fs := cli.NewFlagSet("streamplace clip") 126 out := fs.String("out", "", "output file") 127 128 err := cli.Parse(fs, os.Args[2:]) 129 if err != nil { 130 return err 131 } 132 ctx := context.Background() 133 ctx = log.WithDebugValue(ctx, cli.Debug) 134 return Clip(ctx, fs.Args(), *out) 135 } 136 137 if len(os.Args) > 1 && os.Args[1] == "segment" { 138 cli := config.CLI{Build: build} 139 fs := cli.NewFlagSet("streamplace clip") 140 out := fs.String("out-dir", "", "output directory") 141 142 err := cli.Parse(fs, os.Args[2:]) 143 if err != nil { 144 return err 145 } 146 ctx := context.Background() 147 ctx = log.WithDebugValue(ctx, cli.Debug) 148 return media.SegmentFile(ctx, fs.Args()[0], *out) 149 } 150 151 if len(os.Args) > 1 && os.Args[1] == "self-test" { 152 err := media.RunSelfTest(context.Background()) 153 if err != nil { 154 fmt.Println(err.Error()) 155 os.Exit(1) 156 } 157 fmt.Println("self-test successful!") 158 os.Exit(0) 159 } 160 161 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 162 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 163 _ = starter.NewLivepeerConfig(lpfs) 164 err = ff.Parse(lpfs, os.Args[2:], 165 ff.WithConfigFileFlag("config"), 166 ff.WithEnvVarPrefix("LP"), 167 ) 168 if err != nil { 169 return err 170 } 171 err = GoLivepeer(context.Background(), lpfs) 172 if err != nil { 173 log.Error(context.Background(), "error in livepeer", "error", err) 174 os.Exit(1) 175 } 176 os.Exit(0) 177 } 178 179 _ = flag.Set("logtostderr", "true") 180 vFlag := flag.Lookup("v") 181 cli := config.CLI{Build: build} 182 fs := cli.NewFlagSet("streamplace") 183 verbosity := fs.String("v", "3", "log verbosity level") 184 version := fs.Bool("version", false, "print version and exit") 185 186 err = cli.Parse( 187 fs, os.Args[1:], 188 ) 189 if err != nil { 190 return err 191 } 192 err = flag.CommandLine.Parse(nil) 193 if err != nil { 194 return err 195 } 196 _ = vFlag.Value.Set(*verbosity) 197 log.SetColorLogger(cli.Color) 198 ctx := context.Background() 199 ctx = log.WithDebugValue(ctx, cli.Debug) 200 201 log.Log(ctx, 202 "streamplace", 203 "version", build.Version, 204 "buildTime", build.BuildTimeStr(), 205 "uuid", build.UUID, 206 "runtime.GOOS", runtime.GOOS, 207 "runtime.GOARCH", runtime.GOARCH, 208 "runtime.Version", runtime.Version()) 209 if *version { 210 return nil 211 } 212 213 if len(os.Args) > 1 && os.Args[1] == "migrate" { 214 return statedb.Migrate(&cli) 215 } 216 217 spmetrics.Version.WithLabelValues(build.Version).Inc() 218 if cli.LivepeerHelp { 219 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 220 _ = starter.NewLivepeerConfig(lpFlags) 221 lpFlags.VisitAll(func(f *flag.Flag) { 222 adapted := config.ToSnakeCase(f.Name) 223 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 224 usage := fmt.Sprintf(" %s", f.Usage) 225 if f.DefValue != "" { 226 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 227 } 228 fmt.Printf(" %s\n", usage) 229 }) 230 return nil 231 } 232 233 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 234 235 err = os.MkdirAll(cli.DataDir, os.ModePerm) 236 if err != nil { 237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 238 } 239 schema, err := v0.MakeV0Schema() 240 if err != nil { 241 return err 242 } 243 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 244 Schema: schema, 245 EthKeystorePath: cli.EthKeystorePath, 246 EthAccountAddr: cli.EthAccountAddr, 247 EthKeystorePassword: cli.EthPassword, 248 }) 249 if err != nil { 250 return err 251 } 252 var signer crypto.Signer = eip712signer 253 if cli.PKCS11ModulePath != "" { 254 conf := &crypto11.Config{ 255 Path: cli.PKCS11ModulePath, 256 } 257 count := 0 258 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 259 if val != "" { 260 count += 1 261 } 262 } 263 if count != 1 { 264 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 265 } 266 if cli.PKCS11TokenSlot != "" { 267 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 268 if err != nil { 269 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 270 } 271 numint := int(num) 272 // why does crypto11 want this as a reference? odd. 273 conf.SlotNumber = &numint 274 } 275 if cli.PKCS11TokenLabel != "" { 276 conf.TokenLabel = cli.PKCS11TokenLabel 277 } 278 if cli.PKCS11TokenSerial != "" { 279 conf.TokenSerial = cli.PKCS11TokenSerial 280 } 281 pin := cli.PKCS11Pin 282 if pin == "" { 283 fmt.Printf("Please enter PKCS11 PIN: ") 284 password, err := term.ReadPassword(int(os.Stdin.Fd())) 285 fmt.Println("") 286 if err != nil { 287 return fmt.Errorf("error reading PKCS11 password: %w", err) 288 } 289 pin = string(password) 290 } 291 conf.Pin = pin 292 293 sc, err := crypto11.Configure(conf) 294 if err != nil { 295 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 296 } 297 var id []byte = nil 298 var label []byte = nil 299 if cli.PKCS11KeypairID != "" { 300 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 301 if err != nil { 302 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 303 } 304 id = []byte{byte(num)} 305 } 306 if cli.PKCS11KeypairLabel != "" { 307 label = []byte(cli.PKCS11KeypairLabel) 308 } 309 hwsigner, err := sc.FindKeyPair(id, label) 310 if err != nil { 311 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 312 } 313 if hwsigner == nil { 314 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 315 } 316 addr, err := signers.HexAddrFromSigner(hwsigner) 317 if err != nil { 318 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 319 } 320 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 321 signer = hwsigner 322 } 323 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 324 325 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 326 if err != nil { 327 return err 328 } 329 var noter notifications.FirebaseNotifier 330 if cli.FirebaseServiceAccount != "" { 331 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 332 if err != nil { 333 return err 334 } 335 } 336 337 group, ctx := TimeoutGroupWithContext(ctx) 338 339 out := carstore.SQLiteStore{} 340 err = out.Open(":memory:") 341 if err != nil { 342 return err 343 } 344 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 345 if err != nil { 346 return err 347 } 348 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 349 if err != nil { 350 return err 351 } 352 defer handle.Close() 353 354 jwk, err := state.EnsureJWK(ctx, "jwk") 355 if err != nil { 356 return err 357 } 358 cli.JWK = jwk 359 360 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 361 if err != nil { 362 return err 363 } 364 cli.AccessJWK = accessJWK 365 366 b := bus.NewBus() 367 atsync := &atproto.ATProtoSynchronizer{ 368 CLI: &cli, 369 Model: mod, 370 StatefulDB: state, 371 Noter: noter, 372 Bus: b, 373 } 374 err = atsync.Migrate(ctx) 375 if err != nil { 376 return fmt.Errorf("failed to migrate: %w", err) 377 } 378 379 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 380 if err != nil { 381 return err 382 } 383 384 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 385 if err != nil { 386 return err 387 } 388 389 clientMetadata := &oatproxy.OAuthClientMetadata{ 390 Scope: "atproto transition:generic", 391 ClientName: "Streamplace", 392 RedirectURIs: []string{ 393 fmt.Sprintf("https://%s/login", cli.PublicHost), 394 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 395 }, 396 } 397 398 op := oatproxy.New(&oatproxy.Config{ 399 Host: cli.PublicHost, 400 CreateOAuthSession: state.CreateOAuthSession, 401 UpdateOAuthSession: state.UpdateOAuthSession, 402 GetOAuthSession: state.LoadOAuthSession, 403 Lock: state.GetNamedLock, 404 Scope: "atproto transition:generic", 405 UpstreamJWK: cli.JWK, 406 DownstreamJWK: cli.AccessJWK, 407 ClientMetadata: clientMetadata, 408 }) 409 d := director.NewDirector(mm, mod, &cli, b, op, state) 410 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 411 if err != nil { 412 return err 413 } 414 415 ctx = log.WithLogValues(ctx, "version", build.Version) 416 417 group.Go(func() error { 418 return handleSignals(ctx) 419 }) 420 421 group.Go(func() error { 422 return state.ProcessQueue(ctx) 423 }) 424 425 if cli.TracingEndpoint != "" { 426 group.Go(func() error { 427 return startTelemetry(ctx, cli.TracingEndpoint) 428 }) 429 } 430 431 if cli.Secure { 432 group.Go(func() error { 433 return a.ServeHTTPS(ctx) 434 }) 435 group.Go(func() error { 436 return a.ServeHTTPRedirect(ctx) 437 }) 438 if cli.RTMPServerAddon != "" { 439 group.Go(func() error { 440 return rtmps.ServeRTMPS(ctx, &cli) 441 }) 442 } 443 } else { 444 group.Go(func() error { 445 return a.ServeHTTP(ctx) 446 }) 447 } 448 449 group.Go(func() error { 450 return a.ServeInternalHTTP(ctx) 451 }) 452 453 if !cli.NoFirehose { 454 group.Go(func() error { 455 return atsync.StartFirehose(ctx) 456 }) 457 } 458 for _, labeler := range cli.Labelers { 459 group.Go(func() error { 460 return atsync.StartLabelerFirehose(ctx, labeler) 461 }) 462 } 463 464 group.Go(func() error { 465 return spmetrics.ExpireSessions(ctx) 466 }) 467 468 group.Go(func() error { 469 return mod.StartSegmentCleaner(ctx) 470 }) 471 472 if cli.LivepeerGateway { 473 // make a file to make sure the directory exists 474 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 475 if err != nil { 476 return err 477 } 478 fd.Close() 479 if err != nil { 480 return err 481 } 482 group.Go(func() error { 483 return GoLivepeer(ctx, fs) 484 }) 485 } 486 487 group.Go(func() error { 488 return d.Start(ctx) 489 }) 490 491 if cli.TestStream { 492 // regular stream self-test 493 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 494 Schema: schema, 495 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 496 }) 497 if err != nil { 498 return err 499 } 500 atkey, err := atproto.ParsePubKey(signer.Public()) 501 if err != nil { 502 return err 503 } 504 did := atkey.DIDKey() 505 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 506 if err != nil { 507 return err 508 } 509 err = mod.UpdateIdentity(&model.Identity{ 510 ID: testMediaSigner.Pub().String(), 511 Handle: "stream-self-tester", 512 DID: "", 513 }) 514 if err != nil { 515 return err 516 } 517 cli.AllowedStreams = append(cli.AllowedStreams, did) 518 a.Aliases["self-test"] = did 519 group.Go(func() error { 520 return mm.TestSource(ctx, testMediaSigner) 521 }) 522 523 // Start a test stream that will run intermittently 524 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 525 Schema: schema, 526 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 527 }) 528 if err != nil { 529 return err 530 } 531 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 532 if err != nil { 533 return err 534 } 535 did2 := atkey2.DIDKey() 536 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 537 if err != nil { 538 return err 539 } 540 err = mod.UpdateIdentity(&model.Identity{ 541 ID: intermittentMediaSigner.Pub().String(), 542 Handle: "stream-intermittent-tester", 543 DID: "", 544 }) 545 if err != nil { 546 return err 547 } 548 cli.AllowedStreams = append(cli.AllowedStreams, did2) 549 a.Aliases["intermittent-self-test"] = did2 550 551 group.Go(func() error { 552 for { 553 // Start intermittent stream 554 intermittentCtx, cancel := context.WithCancel(ctx) 555 done := make(chan struct{}) 556 go func() { 557 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 558 close(done) 559 }() 560 // Stream ON for 15 seconds 561 time.Sleep(15 * time.Second) 562 // Stop stream 563 cancel() 564 <-done // Wait for TestSource to exit 565 // Stream OFF for 15 seconds 566 time.Sleep(15 * time.Second) 567 } 568 }) 569 } 570 571 for _, job := range platformJobs { 572 group.Go(func() error { 573 return job(ctx, &cli) 574 }) 575 } 576 577 if cli.WHIPTest != "" { 578 group.Go(func() error { 579 err := WHIP(strings.Split(cli.WHIPTest, " ")) 580 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 581 time.Sleep(time.Second * 3) 582 // gst.Deinit() 583 log.Warn(ctx, "gst deinit complete, exiting") 584 return err 585 }) 586 } 587 588 return group.Wait() 589} 590 591var ErrCaughtSignal = errors.New("caught signal") 592 593func handleSignals(ctx context.Context) error { 594 c := make(chan os.Signal, 1) 595 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 596 for { 597 select { 598 case s := <-c: 599 if s == syscall.SIGABRT { 600 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 601 log.Error(ctx, "failed to create pprof", "error", err) 602 } 603 } 604 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 605 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 606 case <-ctx.Done(): 607 return nil 608 } 609 } 610}