Live video on the AT Protocol
at eli/bump-xcode 569 lines 15 kB view raw
package cmd

import (
	"context"
	"crypto"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
	"github.com/peterbourgon/ff/v3"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/term"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/crypto/signers"
	"stream.place/streamplace/pkg/crypto/signers/eip712"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication"
	"stream.place/streamplace/pkg/replication/boring"
	"stream.place/streamplace/pkg/resync"
	"stream.place/streamplace/pkg/rtmps"
	v0 "stream.place/streamplace/pkg/schema/v0"
	"stream.place/streamplace/pkg/spmetrics"

	"github.com/ThalesGroup/crypto11"
	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional job that can be injected by platforms.
// start runs each one as a member of its task group, passing it the group's
// context and a pointer to the parsed CLI configuration; the error a job
// returns is returned to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
51func start(build *config.BuildFlags, platformJobs []jobFunc) error { 52 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 53 err := media.RunSelfTest(context.Background()) 54 if err != nil { 55 if selfTest { 56 fmt.Println(err.Error()) 57 os.Exit(1) 58 } else { 59 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 60 if retryCount >= 3 { 61 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 62 return err 63 } 64 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 65 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 66 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 67 if err != nil { 68 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 69 return err 70 } 71 panic("invalid code path: exec succeeded but we're still here???") 72 } 73 } 74 if selfTest { 75 runtime.GC() 76 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 77 log.Error(context.Background(), "error creating pprof", "error", err) 78 } 79 fmt.Println("self-test successful!") 80 os.Exit(0) 81 } 82 83 if len(os.Args) > 1 && os.Args[1] == "stream" { 84 if len(os.Args) != 3 { 85 fmt.Println("usage: streamplace stream [user]") 86 os.Exit(1) 87 } 88 return Stream(os.Args[2]) 89 } 90 91 if len(os.Args) > 1 && os.Args[1] == "live" { 92 cli := config.CLI{Build: build} 93 fs := cli.NewFlagSet("streamplace live") 94 95 err := cli.Parse(fs, os.Args[2:]) 96 if err != nil { 97 return err 98 } 99 100 args := fs.Args() 101 if len(args) != 1 { 102 fmt.Println("usage: streamplace live [flags] [stream-key]") 103 os.Exit(1) 104 } 105 106 return Live(args[0], cli.HTTPInternalAddr) 107 } 108 109 if len(os.Args) > 1 && os.Args[1] == "sign" { 110 return Sign(context.Background()) 111 } 112 113 if len(os.Args) > 1 && os.Args[1] == "whep" { 114 return WHEP(os.Args[2:]) 115 } 116 if len(os.Args) > 
1 && os.Args[1] == "whip" { 117 return WHIP(os.Args[2:]) 118 } 119 120 if len(os.Args) > 1 && os.Args[1] == "clip" { 121 cli := config.CLI{Build: build} 122 fs := cli.NewFlagSet("streamplace clip") 123 out := fs.String("out", "", "output file") 124 125 err := cli.Parse(fs, os.Args[2:]) 126 if err != nil { 127 return err 128 } 129 ctx := context.Background() 130 ctx = log.WithDebugValue(ctx, cli.Debug) 131 return Clip(ctx, fs.Args(), *out) 132 } 133 134 if len(os.Args) > 1 && os.Args[1] == "self-test" { 135 err := media.RunSelfTest(context.Background()) 136 if err != nil { 137 fmt.Println(err.Error()) 138 os.Exit(1) 139 } 140 fmt.Println("self-test successful!") 141 os.Exit(0) 142 } 143 144 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 145 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 146 _ = starter.NewLivepeerConfig(lpfs) 147 err = ff.Parse(lpfs, os.Args[2:], 148 ff.WithConfigFileFlag("config"), 149 ff.WithEnvVarPrefix("LP"), 150 ) 151 if err != nil { 152 return err 153 } 154 err = GoLivepeer(context.Background(), lpfs) 155 if err != nil { 156 log.Error(context.Background(), "error in livepeer", "error", err) 157 os.Exit(1) 158 } 159 os.Exit(0) 160 } 161 162 _ = flag.Set("logtostderr", "true") 163 vFlag := flag.Lookup("v") 164 cli := config.CLI{Build: build} 165 fs := cli.NewFlagSet("streamplace") 166 verbosity := fs.String("v", "3", "log verbosity level") 167 version := fs.Bool("version", false, "print version and exit") 168 169 err = cli.Parse( 170 fs, os.Args[1:], 171 ) 172 if err != nil { 173 return err 174 } 175 err = flag.CommandLine.Parse(nil) 176 if err != nil { 177 return err 178 } 179 _ = vFlag.Value.Set(*verbosity) 180 log.SetColorLogger(cli.Color) 181 ctx := context.Background() 182 ctx = log.WithDebugValue(ctx, cli.Debug) 183 184 log.Log(ctx, 185 "streamplace", 186 "version", build.Version, 187 "buildTime", build.BuildTimeStr(), 188 "uuid", build.UUID, 189 "runtime.GOOS", runtime.GOOS, 190 "runtime.GOARCH", runtime.GOARCH, 191 
"runtime.Version", runtime.Version()) 192 if *version { 193 return nil 194 } 195 if cli.LivepeerHelp { 196 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 197 _ = starter.NewLivepeerConfig(lpFlags) 198 lpFlags.VisitAll(func(f *flag.Flag) { 199 adapted := config.ToSnakeCase(f.Name) 200 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 201 usage := fmt.Sprintf(" %s", f.Usage) 202 if f.DefValue != "" { 203 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 204 } 205 fmt.Printf(" %s\n", usage) 206 }) 207 return nil 208 } 209 210 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 211 if len(os.Args) > 1 && os.Args[1] == "resync" { 212 return resync.Resync(ctx, &cli) 213 } 214 215 err = os.MkdirAll(cli.DataDir, os.ModePerm) 216 if err != nil { 217 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 218 } 219 schema, err := v0.MakeV0Schema() 220 if err != nil { 221 return err 222 } 223 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 224 Schema: schema, 225 EthKeystorePath: cli.EthKeystorePath, 226 EthAccountAddr: cli.EthAccountAddr, 227 EthKeystorePassword: cli.EthPassword, 228 }) 229 if err != nil { 230 return err 231 } 232 var signer crypto.Signer = eip712signer 233 if cli.PKCS11ModulePath != "" { 234 conf := &crypto11.Config{ 235 Path: cli.PKCS11ModulePath, 236 } 237 count := 0 238 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 239 if val != "" { 240 count += 1 241 } 242 } 243 if count != 1 { 244 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 245 } 246 if cli.PKCS11TokenSlot != "" { 247 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 248 if err != nil { 249 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 250 } 251 numint := int(num) 252 // why does crypto11 want this as a reference? odd. 
253 conf.SlotNumber = &numint 254 } 255 if cli.PKCS11TokenLabel != "" { 256 conf.TokenLabel = cli.PKCS11TokenLabel 257 } 258 if cli.PKCS11TokenSerial != "" { 259 conf.TokenSerial = cli.PKCS11TokenSerial 260 } 261 pin := cli.PKCS11Pin 262 if pin == "" { 263 fmt.Printf("Please enter PKCS11 PIN: ") 264 password, err := term.ReadPassword(int(os.Stdin.Fd())) 265 fmt.Println("") 266 if err != nil { 267 return fmt.Errorf("error reading PKCS11 password: %w", err) 268 } 269 pin = string(password) 270 } 271 conf.Pin = pin 272 273 sc, err := crypto11.Configure(conf) 274 if err != nil { 275 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 276 } 277 var id []byte = nil 278 var label []byte = nil 279 if cli.PKCS11KeypairID != "" { 280 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 281 if err != nil { 282 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 283 } 284 id = []byte{byte(num)} 285 } 286 if cli.PKCS11KeypairLabel != "" { 287 label = []byte(cli.PKCS11KeypairLabel) 288 } 289 hwsigner, err := sc.FindKeyPair(id, label) 290 if err != nil { 291 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 292 } 293 if hwsigner == nil { 294 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 295 } 296 addr, err := signers.HexAddrFromSigner(hwsigner) 297 if err != nil { 298 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 299 } 300 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 301 signer = hwsigner 302 } 303 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 304 mod, err := model.MakeDB(cli.DBPath) 305 if err != nil { 306 return err 307 } 308 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod) 309 if err != nil { 310 return err 311 } 312 defer handle.Close() 313 var noter notifications.FirebaseNotifier 314 if cli.FirebaseServiceAccount != "" { 315 noter, err = 
notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 316 if err != nil { 317 return err 318 } 319 } 320 321 jwkPath := cli.DataFilePath([]string{"jwk.json"}) 322 jwk, err := atproto.EnsureJWK(ctx, jwkPath) 323 if err != nil { 324 return err 325 } 326 cli.JWK = jwk 327 328 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"}) 329 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath) 330 if err != nil { 331 return err 332 } 333 cli.AccessJWK = accessJWK 334 335 b := bus.NewBus() 336 atsync := &atproto.ATProtoSynchronizer{ 337 CLI: &cli, 338 Model: mod, 339 Noter: noter, 340 Bus: b, 341 } 342 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 343 if err != nil { 344 return err 345 } 346 347 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 348 if err != nil { 349 return err 350 } 351 352 clientMetadata := &oatproxy.OAuthClientMetadata{ 353 Scope: "atproto transition:generic", 354 ClientName: "Streamplace", 355 RedirectURIs: []string{ 356 fmt.Sprintf("https://%s/login", cli.PublicHost), 357 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 358 }, 359 } 360 361 op := oatproxy.New(&oatproxy.Config{ 362 Host: cli.PublicHost, 363 CreateOAuthSession: mod.CreateOAuthSession, 364 UpdateOAuthSession: mod.UpdateOAuthSession, 365 GetOAuthSession: mod.LoadOAuthSession, 366 Scope: "atproto transition:generic", 367 UpstreamJWK: cli.JWK, 368 DownstreamJWK: cli.AccessJWK, 369 ClientMetadata: clientMetadata, 370 }) 371 d := director.NewDirector(mm, mod, &cli, b, op) 372 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 373 if err != nil { 374 return err 375 } 376 377 group, ctx := TimeoutGroupWithContext(ctx) 378 ctx = log.WithLogValues(ctx, "version", build.Version) 379 380 group.Go(func() error { 381 return handleSignals(ctx) 382 }) 383 384 if cli.TracingEndpoint != "" { 385 group.Go(func() error { 386 return startTelemetry(ctx, cli.TracingEndpoint) 387 }) 388 } 389 
390 if cli.Secure { 391 group.Go(func() error { 392 return a.ServeHTTPS(ctx) 393 }) 394 group.Go(func() error { 395 return a.ServeHTTPRedirect(ctx) 396 }) 397 if cli.RTMPServerAddon != "" { 398 group.Go(func() error { 399 return rtmps.ServeRTMPS(ctx, &cli) 400 }) 401 } 402 } else { 403 group.Go(func() error { 404 return a.ServeHTTP(ctx) 405 }) 406 } 407 408 group.Go(func() error { 409 return a.ServeInternalHTTP(ctx) 410 }) 411 412 if !cli.NoFirehose { 413 group.Go(func() error { 414 return atsync.StartFirehose(ctx) 415 }) 416 } 417 for _, labeler := range cli.Labelers { 418 group.Go(func() error { 419 return atsync.StartLabelerFirehose(ctx, labeler) 420 }) 421 } 422 423 group.Go(func() error { 424 return spmetrics.ExpireSessions(ctx) 425 }) 426 427 group.Go(func() error { 428 return mod.StartSegmentCleaner(ctx) 429 }) 430 431 if cli.LivepeerGateway { 432 // make a file to make sure the directory exists 433 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 434 if err != nil { 435 return err 436 } 437 fd.Close() 438 if err != nil { 439 return err 440 } 441 group.Go(func() error { 442 return GoLivepeer(ctx, fs) 443 }) 444 } 445 446 group.Go(func() error { 447 return d.Start(ctx) 448 }) 449 450 if cli.TestStream { 451 // regular stream self-test 452 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 453 Schema: schema, 454 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 455 }) 456 if err != nil { 457 return err 458 } 459 atkey, err := atproto.ParsePubKey(signer.Public()) 460 if err != nil { 461 return err 462 } 463 did := atkey.DIDKey() 464 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 465 if err != nil { 466 return err 467 } 468 err = mod.UpdateIdentity(&model.Identity{ 469 ID: testMediaSigner.Pub().String(), 470 Handle: "stream-self-tester", 471 DID: "", 472 }) 473 if err != nil { 474 return err 475 } 476 cli.AllowedStreams = append(cli.AllowedStreams, did) 477 
a.Aliases["self-test"] = did 478 group.Go(func() error { 479 return mm.TestSource(ctx, testMediaSigner) 480 }) 481 482 // Start a test stream that will run intermittently 483 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 484 Schema: schema, 485 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 486 }) 487 if err != nil { 488 return err 489 } 490 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 491 if err != nil { 492 return err 493 } 494 did2 := atkey2.DIDKey() 495 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 496 if err != nil { 497 return err 498 } 499 err = mod.UpdateIdentity(&model.Identity{ 500 ID: intermittentMediaSigner.Pub().String(), 501 Handle: "stream-intermittent-tester", 502 DID: "", 503 }) 504 if err != nil { 505 return err 506 } 507 cli.AllowedStreams = append(cli.AllowedStreams, did2) 508 a.Aliases["intermittent-self-test"] = did2 509 510 group.Go(func() error { 511 for { 512 // Start intermittent stream 513 intermittentCtx, cancel := context.WithCancel(ctx) 514 done := make(chan struct{}) 515 go func() { 516 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 517 close(done) 518 }() 519 // Stream ON for 15 seconds 520 time.Sleep(15 * time.Second) 521 // Stop stream 522 cancel() 523 <-done // Wait for TestSource to exit 524 // Stream OFF for 15 seconds 525 time.Sleep(15 * time.Second) 526 } 527 }) 528 } 529 530 for _, job := range platformJobs { 531 group.Go(func() error { 532 return job(ctx, &cli) 533 }) 534 } 535 536 if cli.WHIPTest != "" { 537 group.Go(func() error { 538 err := WHIP(strings.Split(cli.WHIPTest, " ")) 539 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 540 time.Sleep(time.Second * 3) 541 // gst.Deinit() 542 log.Warn(ctx, "gst deinit complete, exiting") 543 return err 544 }) 545 } 546 547 return group.Wait() 548} 549 550var ErrCaughtSignal = errors.New("caught signal") 
551 552func handleSignals(ctx context.Context) error { 553 c := make(chan os.Signal, 1) 554 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 555 for { 556 select { 557 case s := <-c: 558 if s == syscall.SIGABRT { 559 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 560 log.Error(ctx, "failed to create pprof", "error", err) 561 } 562 } 563 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 564 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 565 case <-ctx.Done(): 566 return nil 567 } 568 } 569}