Live video on the AT Protocol
at eli/handle-changes 596 lines 16 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 "golang.org/x/term" 24 "stream.place/streamplace/pkg/aqhttp" 25 "stream.place/streamplace/pkg/atproto" 26 "stream.place/streamplace/pkg/bus" 27 "stream.place/streamplace/pkg/crypto/signers" 28 "stream.place/streamplace/pkg/crypto/signers/eip712" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/media" 32 "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/replication" 34 "stream.place/streamplace/pkg/replication/boring" 35 "stream.place/streamplace/pkg/rtmps" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 "stream.place/streamplace/pkg/spmetrics" 38 "stream.place/streamplace/pkg/statedb" 39 40 "github.com/ThalesGroup/crypto11" 41 _ "github.com/go-gst/go-glib/glib" 42 _ "github.com/go-gst/go-gst/gst" 43 "stream.place/streamplace/pkg/api" 44 "stream.place/streamplace/pkg/config" 45 "stream.place/streamplace/pkg/model" 46 47 _ "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 48) 49 50// Additional jobs that can be injected by platforms 51type jobFunc func(ctx context.Context, cli *config.CLI) error 52 53// parse the CLI and fire up an streamplace node! 54func start(build *config.BuildFlags, platformJobs []jobFunc) error { 55 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 56 err := media.RunSelfTest(context.Background()) 57 if err != nil { 58 if selfTest { 59 fmt.Println(err.Error()) 60 os.Exit(1) 61 } else { 62 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 63 if retryCount >= 3 { 64 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 65 return err 66 } 67 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 68 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 69 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 70 if err != nil { 71 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 72 return err 73 } 74 panic("invalid code path: exec succeeded but we're still here???") 75 } 76 } 77 if selfTest { 78 runtime.GC() 79 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 80 log.Error(context.Background(), "error creating pprof", "error", err) 81 } 82 fmt.Println("self-test successful!") 83 os.Exit(0) 84 } 85 86 if len(os.Args) > 1 && os.Args[1] == "stream" { 87 if len(os.Args) != 3 { 88 fmt.Println("usage: streamplace stream [user]") 89 os.Exit(1) 90 } 91 return Stream(os.Args[2]) 92 } 93 94 if len(os.Args) > 1 && os.Args[1] == "live" { 95 cli := config.CLI{Build: build} 96 fs := cli.NewFlagSet("streamplace live") 97 98 err := cli.Parse(fs, os.Args[2:]) 99 if err != nil { 100 return err 101 } 102 103 args := fs.Args() 104 if len(args) != 1 { 105 fmt.Println("usage: streamplace live [flags] [stream-key]") 106 os.Exit(1) 107 } 108 109 return Live(args[0], cli.HTTPInternalAddr) 110 } 111 112 if len(os.Args) > 1 && os.Args[1] == "sign" { 113 return Sign(context.Background()) 114 } 115 116 if len(os.Args) > 1 && os.Args[1] == "whep" { 117 return WHEP(os.Args[2:]) 118 } 119 if len(os.Args) > 1 && os.Args[1] == "whip" { 120 return WHIP(os.Args[2:]) 121 } 122 123 if len(os.Args) > 1 && os.Args[1] == "clip" { 124 cli := config.CLI{Build: build} 125 fs := cli.NewFlagSet("streamplace clip") 126 out := fs.String("out", "", "output file") 127 128 err := cli.Parse(fs, os.Args[2:]) 129 if err != nil { 130 return err 131 } 132 ctx := context.Background() 133 ctx = log.WithDebugValue(ctx, cli.Debug) 134 return Clip(ctx, fs.Args(), *out) 135 } 136 137 if len(os.Args) > 1 && os.Args[1] == "self-test" { 138 err := media.RunSelfTest(context.Background()) 139 if err != nil { 140 fmt.Println(err.Error()) 141 os.Exit(1) 142 } 143 fmt.Println("self-test successful!") 144 os.Exit(0) 145 } 146 147 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 148 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 149 _ = starter.NewLivepeerConfig(lpfs) 150 err = ff.Parse(lpfs, os.Args[2:], 151 ff.WithConfigFileFlag("config"), 152 ff.WithEnvVarPrefix("LP"), 153 ) 154 if err != nil { 155 return err 156 } 157 err = GoLivepeer(context.Background(), lpfs) 158 if err != nil { 159 log.Error(context.Background(), "error in livepeer", "error", err) 160 os.Exit(1) 161 } 162 os.Exit(0) 163 } 164 165 _ = flag.Set("logtostderr", "true") 166 vFlag := flag.Lookup("v") 167 cli := config.CLI{Build: build} 168 fs := cli.NewFlagSet("streamplace") 169 verbosity := fs.String("v", "3", "log verbosity level") 170 version := fs.Bool("version", false, "print version and exit") 171 172 err = cli.Parse( 173 fs, os.Args[1:], 174 ) 175 if err != nil { 176 return err 177 } 178 err = flag.CommandLine.Parse(nil) 179 if err != nil { 180 return err 181 } 182 _ = vFlag.Value.Set(*verbosity) 183 log.SetColorLogger(cli.Color) 184 ctx := context.Background() 185 ctx = log.WithDebugValue(ctx, cli.Debug) 186 187 log.Log(ctx, 188 "streamplace", 189 "version", build.Version, 190 "buildTime", build.BuildTimeStr(), 191 "uuid", build.UUID, 192 "runtime.GOOS", runtime.GOOS, 193 "runtime.GOARCH", runtime.GOARCH, 194 "runtime.Version", runtime.Version()) 195 if *version { 196 return nil 197 } 198 199 if len(os.Args) > 1 && os.Args[1] == "migrate" { 200 return statedb.Migrate(&cli) 201 } 202 203 spmetrics.Version.WithLabelValues(build.Version).Inc() 204 if cli.LivepeerHelp { 205 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 206 _ = starter.NewLivepeerConfig(lpFlags) 207 lpFlags.VisitAll(func(f *flag.Flag) { 208 adapted := config.ToSnakeCase(f.Name) 209 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 210 usage := fmt.Sprintf(" %s", f.Usage) 211 if f.DefValue != "" { 212 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 213 } 214 fmt.Printf(" %s\n", usage) 215 }) 216 return nil 217 } 218 219 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 220 221 err = os.MkdirAll(cli.DataDir, os.ModePerm) 222 if err != nil { 223 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 224 } 225 schema, err := v0.MakeV0Schema() 226 if err != nil { 227 return err 228 } 229 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 230 Schema: schema, 231 EthKeystorePath: cli.EthKeystorePath, 232 EthAccountAddr: cli.EthAccountAddr, 233 EthKeystorePassword: cli.EthPassword, 234 }) 235 if err != nil { 236 return err 237 } 238 var signer crypto.Signer = eip712signer 239 if cli.PKCS11ModulePath != "" { 240 conf := &crypto11.Config{ 241 Path: cli.PKCS11ModulePath, 242 } 243 count := 0 244 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 245 if val != "" { 246 count += 1 247 } 248 } 249 if count != 1 { 250 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 251 } 252 if cli.PKCS11TokenSlot != "" { 253 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 254 if err != nil { 255 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 256 } 257 numint := int(num) 258 // why does crypto11 want this as a reference? odd. 259 conf.SlotNumber = &numint 260 } 261 if cli.PKCS11TokenLabel != "" { 262 conf.TokenLabel = cli.PKCS11TokenLabel 263 } 264 if cli.PKCS11TokenSerial != "" { 265 conf.TokenSerial = cli.PKCS11TokenSerial 266 } 267 pin := cli.PKCS11Pin 268 if pin == "" { 269 fmt.Printf("Please enter PKCS11 PIN: ") 270 password, err := term.ReadPassword(int(os.Stdin.Fd())) 271 fmt.Println("") 272 if err != nil { 273 return fmt.Errorf("error reading PKCS11 password: %w", err) 274 } 275 pin = string(password) 276 } 277 conf.Pin = pin 278 279 sc, err := crypto11.Configure(conf) 280 if err != nil { 281 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 282 } 283 var id []byte = nil 284 var label []byte = nil 285 if cli.PKCS11KeypairID != "" { 286 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 287 if err != nil { 288 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 289 } 290 id = []byte{byte(num)} 291 } 292 if cli.PKCS11KeypairLabel != "" { 293 label = []byte(cli.PKCS11KeypairLabel) 294 } 295 hwsigner, err := sc.FindKeyPair(id, label) 296 if err != nil { 297 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 298 } 299 if hwsigner == nil { 300 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 301 } 302 addr, err := signers.HexAddrFromSigner(hwsigner) 303 if err != nil { 304 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 305 } 306 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 307 signer = hwsigner 308 } 309 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 310 311 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 312 if err != nil { 313 return err 314 } 315 var noter notifications.FirebaseNotifier 316 if cli.FirebaseServiceAccount != "" { 317 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 318 if err != nil { 319 return err 320 } 321 } 322 323 group, ctx := TimeoutGroupWithContext(ctx) 324 325 out := carstore.SQLiteStore{} 326 err = out.Open(":memory:") 327 if err != nil { 328 return err 329 } 330 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 331 if err != nil { 332 return err 333 } 334 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 335 if err != nil { 336 return err 337 } 338 defer handle.Close() 339 340 jwk, err := state.EnsureJWK(ctx, "jwk") 341 if err != nil { 342 return err 343 } 344 cli.JWK = jwk 345 346 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 347 if err != nil { 348 return err 349 } 350 cli.AccessJWK = accessJWK 351 352 b := bus.NewBus() 353 atsync := &atproto.ATProtoSynchronizer{ 354 CLI: &cli, 355 Model: mod, 356 StatefulDB: state, 357 Noter: noter, 358 Bus: b, 359 } 360 err = atsync.Migrate(ctx) 361 if err != nil { 362 return fmt.Errorf("failed to migrate: %w", err) 363 } 364 365 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 366 if err != nil { 367 return err 368 } 369 370 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 371 if err != nil { 372 return err 373 } 374 375 clientMetadata := &oatproxy.OAuthClientMetadata{ 376 Scope: "atproto transition:generic", 377 ClientName: "Streamplace", 378 RedirectURIs: []string{ 379 fmt.Sprintf("https://%s/login", cli.PublicHost), 380 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 381 }, 382 } 383 384 op := oatproxy.New(&oatproxy.Config{ 385 Host: cli.PublicHost, 386 CreateOAuthSession: state.CreateOAuthSession, 387 UpdateOAuthSession: state.UpdateOAuthSession, 388 GetOAuthSession: state.LoadOAuthSession, 389 Lock: state.GetNamedLock, 390 Scope: "atproto transition:generic", 391 UpstreamJWK: cli.JWK, 392 DownstreamJWK: cli.AccessJWK, 393 ClientMetadata: clientMetadata, 394 }) 395 d := director.NewDirector(mm, mod, &cli, b, op, state) 396 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 397 if err != nil { 398 return err 399 } 400 401 ctx = log.WithLogValues(ctx, "version", build.Version) 402 403 group.Go(func() error { 404 return handleSignals(ctx) 405 }) 406 407 group.Go(func() error { 408 return state.ProcessQueue(ctx) 409 }) 410 411 if cli.TracingEndpoint != "" { 412 group.Go(func() error { 413 return startTelemetry(ctx, cli.TracingEndpoint) 414 }) 415 } 416 417 if cli.Secure { 418 group.Go(func() error { 419 return a.ServeHTTPS(ctx) 420 }) 421 group.Go(func() error { 422 return a.ServeHTTPRedirect(ctx) 423 }) 424 if cli.RTMPServerAddon != "" { 425 group.Go(func() error { 426 return rtmps.ServeRTMPS(ctx, &cli) 427 }) 428 } 429 } else { 430 group.Go(func() error { 431 return a.ServeHTTP(ctx) 432 }) 433 } 434 435 group.Go(func() error { 436 return a.ServeInternalHTTP(ctx) 437 }) 438 439 if !cli.NoFirehose { 440 group.Go(func() error { 441 return atsync.StartFirehose(ctx) 442 }) 443 } 444 for _, labeler := range cli.Labelers { 445 group.Go(func() error { 446 return atsync.StartLabelerFirehose(ctx, labeler) 447 }) 448 } 449 450 group.Go(func() error { 451 return spmetrics.ExpireSessions(ctx) 452 }) 453 454 group.Go(func() error { 455 return mod.StartSegmentCleaner(ctx) 456 }) 457 458 if cli.LivepeerGateway { 459 // make a file to make sure the directory exists 460 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 461 if err != nil { 462 return err 463 } 464 fd.Close() 465 if err != nil { 466 return err 467 } 468 group.Go(func() error { 469 return GoLivepeer(ctx, fs) 470 }) 471 } 472 473 group.Go(func() error { 474 return d.Start(ctx) 475 }) 476 477 if cli.TestStream { 478 // regular stream self-test 479 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 480 Schema: schema, 481 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 482 }) 483 if err != nil { 484 return err 485 } 486 atkey, err := atproto.ParsePubKey(signer.Public()) 487 if err != nil { 488 return err 489 } 490 did := atkey.DIDKey() 491 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 492 if err != nil { 493 return err 494 } 495 err = mod.UpdateIdentity(&model.Identity{ 496 ID: testMediaSigner.Pub().String(), 497 Handle: "stream-self-tester", 498 DID: "", 499 }) 500 if err != nil { 501 return err 502 } 503 cli.AllowedStreams = append(cli.AllowedStreams, did) 504 a.Aliases["self-test"] = did 505 group.Go(func() error { 506 return mm.TestSource(ctx, testMediaSigner) 507 }) 508 509 // Start a test stream that will run intermittently 510 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 511 Schema: schema, 512 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 513 }) 514 if err != nil { 515 return err 516 } 517 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 518 if err != nil { 519 return err 520 } 521 did2 := atkey2.DIDKey() 522 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 523 if err != nil { 524 return err 525 } 526 err = mod.UpdateIdentity(&model.Identity{ 527 ID: intermittentMediaSigner.Pub().String(), 528 Handle: "stream-intermittent-tester", 529 DID: "", 530 }) 531 if err != nil { 532 return err 533 } 534 cli.AllowedStreams = append(cli.AllowedStreams, did2) 535 a.Aliases["intermittent-self-test"] = did2 536 537 group.Go(func() error { 538 for { 539 // Start intermittent stream 540 intermittentCtx, cancel := context.WithCancel(ctx) 541 done := make(chan struct{}) 542 go func() { 543 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 544 close(done) 545 }() 546 // Stream ON for 15 seconds 547 time.Sleep(15 * time.Second) 548 // Stop stream 549 cancel() 550 <-done // Wait for TestSource to exit 551 // Stream OFF for 15 seconds 552 time.Sleep(15 * time.Second) 553 } 554 }) 555 } 556 557 for _, job := range platformJobs { 558 group.Go(func() error { 559 return job(ctx, &cli) 560 }) 561 } 562 563 if cli.WHIPTest != "" { 564 group.Go(func() error { 565 err := WHIP(strings.Split(cli.WHIPTest, " ")) 566 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 567 time.Sleep(time.Second * 3) 568 // gst.Deinit() 569 log.Warn(ctx, "gst deinit complete, exiting") 570 return err 571 }) 572 } 573 574 return group.Wait() 575} 576 577var ErrCaughtSignal = errors.New("caught signal") 578 579func handleSignals(ctx context.Context) error { 580 c := make(chan os.Signal, 1) 581 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 582 for { 583 select { 584 case s := <-c: 585 if s == syscall.SIGABRT { 586 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 587 log.Error(ctx, "failed to create pprof", "error", err) 588 } 589 } 590 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 591 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 592 case <-ctx.Done(): 593 return nil 594 } 595 } 596}