Live video on the AT Protocol
at eli/postgres 591 lines 15 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 "golang.org/x/term" 24 "stream.place/streamplace/pkg/aqhttp" 25 "stream.place/streamplace/pkg/atproto" 26 "stream.place/streamplace/pkg/bus" 27 "stream.place/streamplace/pkg/crypto/signers" 28 "stream.place/streamplace/pkg/crypto/signers/eip712" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/media" 32 "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/replication" 34 "stream.place/streamplace/pkg/replication/boring" 35 "stream.place/streamplace/pkg/resync" 36 "stream.place/streamplace/pkg/rtmps" 37 v0 "stream.place/streamplace/pkg/schema/v0" 38 "stream.place/streamplace/pkg/spmetrics" 39 "stream.place/streamplace/pkg/statedb" 40 41 "github.com/ThalesGroup/crypto11" 42 _ "github.com/go-gst/go-glib/glib" 43 _ "github.com/go-gst/go-gst/gst" 44 "stream.place/streamplace/pkg/api" 45 "stream.place/streamplace/pkg/config" 46 "stream.place/streamplace/pkg/model" 47) 48 49// Additional jobs that can be injected by platforms 50type jobFunc func(ctx context.Context, cli *config.CLI) error 51 52// parse the CLI and fire up an streamplace node! 53func start(build *config.BuildFlags, platformJobs []jobFunc) error { 54 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 55 err := media.RunSelfTest(context.Background()) 56 if err != nil { 57 if selfTest { 58 fmt.Println(err.Error()) 59 os.Exit(1) 60 } else { 61 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 62 if retryCount >= 3 { 63 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 64 return err 65 } 66 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 67 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 68 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 69 if err != nil { 70 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 71 return err 72 } 73 panic("invalid code path: exec succeeded but we're still here???") 74 } 75 } 76 if selfTest { 77 runtime.GC() 78 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 79 log.Error(context.Background(), "error creating pprof", "error", err) 80 } 81 fmt.Println("self-test successful!") 82 os.Exit(0) 83 } 84 85 if len(os.Args) > 1 && os.Args[1] == "stream" { 86 if len(os.Args) != 3 { 87 fmt.Println("usage: streamplace stream [user]") 88 os.Exit(1) 89 } 90 return Stream(os.Args[2]) 91 } 92 93 if len(os.Args) > 1 && os.Args[1] == "live" { 94 cli := config.CLI{Build: build} 95 fs := cli.NewFlagSet("streamplace live") 96 97 err := cli.Parse(fs, os.Args[2:]) 98 if err != nil { 99 return err 100 } 101 102 args := fs.Args() 103 if len(args) != 1 { 104 fmt.Println("usage: streamplace live [flags] [stream-key]") 105 os.Exit(1) 106 } 107 108 return Live(args[0], cli.HTTPInternalAddr) 109 } 110 111 if len(os.Args) > 1 && os.Args[1] == "sign" { 112 return Sign(context.Background()) 113 } 114 115 if len(os.Args) > 1 && os.Args[1] == "whep" { 116 return WHEP(os.Args[2:]) 117 } 118 if len(os.Args) > 1 && os.Args[1] == "whip" { 119 return WHIP(os.Args[2:]) 120 } 121 122 if len(os.Args) > 1 && os.Args[1] == "clip" { 123 cli := config.CLI{Build: build} 124 fs := cli.NewFlagSet("streamplace clip") 125 out := fs.String("out", "", "output file") 126 127 err := cli.Parse(fs, os.Args[2:]) 128 if err != nil { 129 return err 130 } 131 ctx := context.Background() 132 ctx = log.WithDebugValue(ctx, cli.Debug) 133 return Clip(ctx, fs.Args(), *out) 134 } 135 136 if len(os.Args) > 1 && os.Args[1] == "self-test" { 137 err := media.RunSelfTest(context.Background()) 138 if err != nil { 139 fmt.Println(err.Error()) 140 os.Exit(1) 141 } 142 fmt.Println("self-test successful!") 143 os.Exit(0) 144 } 145 146 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 147 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 148 _ = starter.NewLivepeerConfig(lpfs) 149 err = ff.Parse(lpfs, os.Args[2:], 150 ff.WithConfigFileFlag("config"), 151 ff.WithEnvVarPrefix("LP"), 152 ) 153 if err != nil { 154 return err 155 } 156 err = GoLivepeer(context.Background(), lpfs) 157 if err != nil { 158 log.Error(context.Background(), "error in livepeer", "error", err) 159 os.Exit(1) 160 } 161 os.Exit(0) 162 } 163 164 _ = flag.Set("logtostderr", "true") 165 vFlag := flag.Lookup("v") 166 cli := config.CLI{Build: build} 167 fs := cli.NewFlagSet("streamplace") 168 verbosity := fs.String("v", "3", "log verbosity level") 169 version := fs.Bool("version", false, "print version and exit") 170 171 err = cli.Parse( 172 fs, os.Args[1:], 173 ) 174 if err != nil { 175 return err 176 } 177 err = flag.CommandLine.Parse(nil) 178 if err != nil { 179 return err 180 } 181 _ = vFlag.Value.Set(*verbosity) 182 log.SetColorLogger(cli.Color) 183 ctx := context.Background() 184 ctx = log.WithDebugValue(ctx, cli.Debug) 185 186 log.Log(ctx, 187 "streamplace", 188 "version", build.Version, 189 "buildTime", build.BuildTimeStr(), 190 "uuid", build.UUID, 191 "runtime.GOOS", runtime.GOOS, 192 "runtime.GOARCH", runtime.GOARCH, 193 "runtime.Version", runtime.Version()) 194 if *version { 195 return nil 196 } 197 198 if len(os.Args) > 1 && os.Args[1] == "migrate" { 199 return statedb.Migrate(&cli) 200 } 201 202 spmetrics.Version.WithLabelValues(build.Version).Inc() 203 if cli.LivepeerHelp { 204 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 205 _ = starter.NewLivepeerConfig(lpFlags) 206 lpFlags.VisitAll(func(f *flag.Flag) { 207 adapted := config.ToSnakeCase(f.Name) 208 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 209 usage := fmt.Sprintf(" %s", f.Usage) 210 if f.DefValue != "" { 211 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 212 } 213 fmt.Printf(" %s\n", usage) 214 }) 215 return nil 216 } 217 218 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 219 if len(os.Args) > 1 && os.Args[1] == "resync" { 220 return resync.Resync(ctx, &cli) 221 } 222 223 err = os.MkdirAll(cli.DataDir, os.ModePerm) 224 if err != nil { 225 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 226 } 227 schema, err := v0.MakeV0Schema() 228 if err != nil { 229 return err 230 } 231 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 232 Schema: schema, 233 EthKeystorePath: cli.EthKeystorePath, 234 EthAccountAddr: cli.EthAccountAddr, 235 EthKeystorePassword: cli.EthPassword, 236 }) 237 if err != nil { 238 return err 239 } 240 var signer crypto.Signer = eip712signer 241 if cli.PKCS11ModulePath != "" { 242 conf := &crypto11.Config{ 243 Path: cli.PKCS11ModulePath, 244 } 245 count := 0 246 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 247 if val != "" { 248 count += 1 249 } 250 } 251 if count != 1 { 252 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 253 } 254 if cli.PKCS11TokenSlot != "" { 255 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 256 if err != nil { 257 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 258 } 259 numint := int(num) 260 // why does crypto11 want this as a reference? odd. 261 conf.SlotNumber = &numint 262 } 263 if cli.PKCS11TokenLabel != "" { 264 conf.TokenLabel = cli.PKCS11TokenLabel 265 } 266 if cli.PKCS11TokenSerial != "" { 267 conf.TokenSerial = cli.PKCS11TokenSerial 268 } 269 pin := cli.PKCS11Pin 270 if pin == "" { 271 fmt.Printf("Please enter PKCS11 PIN: ") 272 password, err := term.ReadPassword(int(os.Stdin.Fd())) 273 fmt.Println("") 274 if err != nil { 275 return fmt.Errorf("error reading PKCS11 password: %w", err) 276 } 277 pin = string(password) 278 } 279 conf.Pin = pin 280 281 sc, err := crypto11.Configure(conf) 282 if err != nil { 283 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 284 } 285 var id []byte = nil 286 var label []byte = nil 287 if cli.PKCS11KeypairID != "" { 288 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 289 if err != nil { 290 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 291 } 292 id = []byte{byte(num)} 293 } 294 if cli.PKCS11KeypairLabel != "" { 295 label = []byte(cli.PKCS11KeypairLabel) 296 } 297 hwsigner, err := sc.FindKeyPair(id, label) 298 if err != nil { 299 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 300 } 301 if hwsigner == nil { 302 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 303 } 304 addr, err := signers.HexAddrFromSigner(hwsigner) 305 if err != nil { 306 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 307 } 308 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 309 signer = hwsigner 310 } 311 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 312 313 mod, err := model.MakeDB(cli.IndexDBPath) 314 if err != nil { 315 return err 316 } 317 var noter notifications.FirebaseNotifier 318 if cli.FirebaseServiceAccount != "" { 319 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 320 if err != nil { 321 return err 322 } 323 } 324 out := carstore.SQLiteStore{} 325 err = out.Open(":memory:") 326 if err != nil { 327 return err 328 } 329 state, err := statedb.MakeDB(&cli, noter, mod) 330 if err != nil { 331 return err 332 } 333 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 334 if err != nil { 335 return err 336 } 337 defer handle.Close() 338 339 jwk, err := state.EnsureJWK(ctx, "jwk") 340 if err != nil { 341 return err 342 } 343 cli.JWK = jwk 344 345 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 346 if err != nil { 347 return err 348 } 349 cli.AccessJWK = accessJWK 350 351 b := bus.NewBus() 352 atsync := &atproto.ATProtoSynchronizer{ 353 CLI: &cli, 354 Model: mod, 355 StatefulDB: state, 356 Noter: noter, 357 Bus: b, 358 } 359 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 360 if err != nil { 361 return err 362 } 363 364 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 365 if err != nil { 366 return err 367 } 368 369 clientMetadata := &oatproxy.OAuthClientMetadata{ 370 Scope: "atproto transition:generic", 371 ClientName: "Streamplace", 372 RedirectURIs: []string{ 373 fmt.Sprintf("https://%s/login", cli.PublicHost), 374 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 375 }, 376 } 377 378 op := oatproxy.New(&oatproxy.Config{ 379 Host: cli.PublicHost, 380 CreateOAuthSession: state.CreateOAuthSession, 381 UpdateOAuthSession: state.UpdateOAuthSession, 382 GetOAuthSession: state.LoadOAuthSession, 383 Lock: state.GetNamedLock, 384 Scope: "atproto transition:generic", 385 UpstreamJWK: cli.JWK, 386 DownstreamJWK: cli.AccessJWK, 387 ClientMetadata: clientMetadata, 388 }) 389 d := director.NewDirector(mm, mod, &cli, b, op, state) 390 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 391 if err != nil { 392 return err 393 } 394 395 group, ctx := TimeoutGroupWithContext(ctx) 396 ctx = log.WithLogValues(ctx, "version", build.Version) 397 398 group.Go(func() error { 399 return handleSignals(ctx) 400 }) 401 402 group.Go(func() error { 403 return state.ProcessQueue(ctx) 404 }) 405 406 if cli.TracingEndpoint != "" { 407 group.Go(func() error { 408 return startTelemetry(ctx, cli.TracingEndpoint) 409 }) 410 } 411 412 if cli.Secure { 413 group.Go(func() error { 414 return a.ServeHTTPS(ctx) 415 }) 416 group.Go(func() error { 417 return a.ServeHTTPRedirect(ctx) 418 }) 419 if cli.RTMPServerAddon != "" { 420 group.Go(func() error { 421 return rtmps.ServeRTMPS(ctx, &cli) 422 }) 423 } 424 } else { 425 group.Go(func() error { 426 return a.ServeHTTP(ctx) 427 }) 428 } 429 430 group.Go(func() error { 431 return a.ServeInternalHTTP(ctx) 432 }) 433 434 if !cli.NoFirehose { 435 group.Go(func() error { 436 return atsync.StartFirehose(ctx) 437 }) 438 } 439 for _, labeler := range cli.Labelers { 440 group.Go(func() error { 441 return atsync.StartLabelerFirehose(ctx, labeler) 442 }) 443 } 444 445 group.Go(func() error { 446 return spmetrics.ExpireSessions(ctx) 447 }) 448 449 group.Go(func() error { 450 return mod.StartSegmentCleaner(ctx) 451 }) 452 453 if cli.LivepeerGateway { 454 // make a file to make sure the directory exists 455 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 456 if err != nil { 457 return err 458 } 459 fd.Close() 460 if err != nil { 461 return err 462 } 463 group.Go(func() error { 464 return GoLivepeer(ctx, fs) 465 }) 466 } 467 468 group.Go(func() error { 469 return d.Start(ctx) 470 }) 471 472 if cli.TestStream { 473 // regular stream self-test 474 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 475 Schema: schema, 476 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 477 }) 478 if err != nil { 479 return err 480 } 481 atkey, err := atproto.ParsePubKey(signer.Public()) 482 if err != nil { 483 return err 484 } 485 did := atkey.DIDKey() 486 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 487 if err != nil { 488 return err 489 } 490 err = mod.UpdateIdentity(&model.Identity{ 491 ID: testMediaSigner.Pub().String(), 492 Handle: "stream-self-tester", 493 DID: "", 494 }) 495 if err != nil { 496 return err 497 } 498 cli.AllowedStreams = append(cli.AllowedStreams, did) 499 a.Aliases["self-test"] = did 500 group.Go(func() error { 501 return mm.TestSource(ctx, testMediaSigner) 502 }) 503 504 // Start a test stream that will run intermittently 505 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 506 Schema: schema, 507 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 508 }) 509 if err != nil { 510 return err 511 } 512 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 513 if err != nil { 514 return err 515 } 516 did2 := atkey2.DIDKey() 517 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 518 if err != nil { 519 return err 520 } 521 err = mod.UpdateIdentity(&model.Identity{ 522 ID: intermittentMediaSigner.Pub().String(), 523 Handle: "stream-intermittent-tester", 524 DID: "", 525 }) 526 if err != nil { 527 return err 528 } 529 cli.AllowedStreams = append(cli.AllowedStreams, did2) 530 a.Aliases["intermittent-self-test"] = did2 531 532 group.Go(func() error { 533 for { 534 // Start intermittent stream 535 intermittentCtx, cancel := context.WithCancel(ctx) 536 done := make(chan struct{}) 537 go func() { 538 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 539 close(done) 540 }() 541 // Stream ON for 15 seconds 542 time.Sleep(15 * time.Second) 543 // Stop stream 544 cancel() 545 <-done // Wait for TestSource to exit 546 // Stream OFF for 15 seconds 547 time.Sleep(15 * time.Second) 548 } 549 }) 550 } 551 552 for _, job := range platformJobs { 553 group.Go(func() error { 554 return job(ctx, &cli) 555 }) 556 } 557 558 if cli.WHIPTest != "" { 559 group.Go(func() error { 560 err := WHIP(strings.Split(cli.WHIPTest, " ")) 561 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 562 time.Sleep(time.Second * 3) 563 // gst.Deinit() 564 log.Warn(ctx, "gst deinit complete, exiting") 565 return err 566 }) 567 } 568 569 return group.Wait() 570} 571 572var ErrCaughtSignal = errors.New("caught signal") 573 574func handleSignals(ctx context.Context) error { 575 c := make(chan os.Signal, 1) 576 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 577 for { 578 select { 579 case s := <-c: 580 if s == syscall.SIGABRT { 581 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 582 log.Error(ctx, "failed to create pprof", "error", err) 583 } 584 } 585 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 586 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 587 case <-ctx.Done(): 588 return nil 589 } 590 } 591}