Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.25 593 lines 15 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 "golang.org/x/term" 24 "stream.place/streamplace/pkg/aqhttp" 25 "stream.place/streamplace/pkg/atproto" 26 "stream.place/streamplace/pkg/bus" 27 "stream.place/streamplace/pkg/crypto/signers" 28 "stream.place/streamplace/pkg/crypto/signers/eip712" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/media" 32 "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/replication" 34 "stream.place/streamplace/pkg/replication/boring" 35 "stream.place/streamplace/pkg/rtmps" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 "stream.place/streamplace/pkg/spmetrics" 38 "stream.place/streamplace/pkg/statedb" 39 40 "github.com/ThalesGroup/crypto11" 41 _ "github.com/go-gst/go-glib/glib" 42 _ "github.com/go-gst/go-gst/gst" 43 "stream.place/streamplace/pkg/api" 44 "stream.place/streamplace/pkg/config" 45 "stream.place/streamplace/pkg/model" 46) 47 48// Additional jobs that can be injected by platforms 49type jobFunc func(ctx context.Context, cli *config.CLI) error 50 51// parse the CLI and fire up an streamplace node! 52func start(build *config.BuildFlags, platformJobs []jobFunc) error { 53 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 54 err := media.RunSelfTest(context.Background()) 55 if err != nil { 56 if selfTest { 57 fmt.Println(err.Error()) 58 os.Exit(1) 59 } else { 60 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 61 if retryCount >= 3 { 62 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 63 return err 64 } 65 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 66 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 67 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 68 if err != nil { 69 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 70 return err 71 } 72 panic("invalid code path: exec succeeded but we're still here???") 73 } 74 } 75 if selfTest { 76 runtime.GC() 77 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 78 log.Error(context.Background(), "error creating pprof", "error", err) 79 } 80 fmt.Println("self-test successful!") 81 os.Exit(0) 82 } 83 84 if len(os.Args) > 1 && os.Args[1] == "stream" { 85 if len(os.Args) != 3 { 86 fmt.Println("usage: streamplace stream [user]") 87 os.Exit(1) 88 } 89 return Stream(os.Args[2]) 90 } 91 92 if len(os.Args) > 1 && os.Args[1] == "live" { 93 cli := config.CLI{Build: build} 94 fs := cli.NewFlagSet("streamplace live") 95 96 err := cli.Parse(fs, os.Args[2:]) 97 if err != nil { 98 return err 99 } 100 101 args := fs.Args() 102 if len(args) != 1 { 103 fmt.Println("usage: streamplace live [flags] [stream-key]") 104 os.Exit(1) 105 } 106 107 return Live(args[0], cli.HTTPInternalAddr) 108 } 109 110 if len(os.Args) > 1 && os.Args[1] == "sign" { 111 return Sign(context.Background()) 112 } 113 114 if len(os.Args) > 1 && os.Args[1] == "whep" { 115 return WHEP(os.Args[2:]) 116 } 117 if len(os.Args) > 1 && os.Args[1] == "whip" { 118 return WHIP(os.Args[2:]) 119 } 120 121 if len(os.Args) > 1 && os.Args[1] == "clip" { 122 cli := config.CLI{Build: build} 123 fs := cli.NewFlagSet("streamplace clip") 124 out := fs.String("out", "", "output file") 125 126 err := cli.Parse(fs, os.Args[2:]) 127 if err != nil { 128 return err 129 } 130 ctx := context.Background() 131 ctx = log.WithDebugValue(ctx, cli.Debug) 132 return Clip(ctx, fs.Args(), *out) 133 } 134 135 if len(os.Args) > 1 && os.Args[1] == "self-test" { 136 err := media.RunSelfTest(context.Background()) 137 if err != nil { 138 fmt.Println(err.Error()) 139 os.Exit(1) 140 } 141 fmt.Println("self-test successful!") 142 os.Exit(0) 143 } 144 145 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 146 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 147 _ = starter.NewLivepeerConfig(lpfs) 148 err = ff.Parse(lpfs, os.Args[2:], 149 ff.WithConfigFileFlag("config"), 150 ff.WithEnvVarPrefix("LP"), 151 ) 152 if err != nil { 153 return err 154 } 155 err = GoLivepeer(context.Background(), lpfs) 156 if err != nil { 157 log.Error(context.Background(), "error in livepeer", "error", err) 158 os.Exit(1) 159 } 160 os.Exit(0) 161 } 162 163 _ = flag.Set("logtostderr", "true") 164 vFlag := flag.Lookup("v") 165 cli := config.CLI{Build: build} 166 fs := cli.NewFlagSet("streamplace") 167 verbosity := fs.String("v", "3", "log verbosity level") 168 version := fs.Bool("version", false, "print version and exit") 169 170 err = cli.Parse( 171 fs, os.Args[1:], 172 ) 173 if err != nil { 174 return err 175 } 176 err = flag.CommandLine.Parse(nil) 177 if err != nil { 178 return err 179 } 180 _ = vFlag.Value.Set(*verbosity) 181 log.SetColorLogger(cli.Color) 182 ctx := context.Background() 183 ctx = log.WithDebugValue(ctx, cli.Debug) 184 185 log.Log(ctx, 186 "streamplace", 187 "version", build.Version, 188 "buildTime", build.BuildTimeStr(), 189 "uuid", build.UUID, 190 "runtime.GOOS", runtime.GOOS, 191 "runtime.GOARCH", runtime.GOARCH, 192 "runtime.Version", runtime.Version()) 193 if *version { 194 return nil 195 } 196 197 if len(os.Args) > 1 && os.Args[1] == "migrate" { 198 return statedb.Migrate(&cli) 199 } 200 201 spmetrics.Version.WithLabelValues(build.Version).Inc() 202 if cli.LivepeerHelp { 203 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 204 _ = starter.NewLivepeerConfig(lpFlags) 205 lpFlags.VisitAll(func(f *flag.Flag) { 206 adapted := config.ToSnakeCase(f.Name) 207 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 208 usage := fmt.Sprintf(" %s", f.Usage) 209 if f.DefValue != "" { 210 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 211 } 212 fmt.Printf(" %s\n", usage) 213 }) 214 return nil 215 } 216 217 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 218 219 err = os.MkdirAll(cli.DataDir, os.ModePerm) 220 if err != nil { 221 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 222 } 223 schema, err := v0.MakeV0Schema() 224 if err != nil { 225 return err 226 } 227 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 228 Schema: schema, 229 EthKeystorePath: cli.EthKeystorePath, 230 EthAccountAddr: cli.EthAccountAddr, 231 EthKeystorePassword: cli.EthPassword, 232 }) 233 if err != nil { 234 return err 235 } 236 var signer crypto.Signer = eip712signer 237 if cli.PKCS11ModulePath != "" { 238 conf := &crypto11.Config{ 239 Path: cli.PKCS11ModulePath, 240 } 241 count := 0 242 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 243 if val != "" { 244 count += 1 245 } 246 } 247 if count != 1 { 248 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 249 } 250 if cli.PKCS11TokenSlot != "" { 251 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 252 if err != nil { 253 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 254 } 255 numint := int(num) 256 // why does crypto11 want this as a reference? odd. 257 conf.SlotNumber = &numint 258 } 259 if cli.PKCS11TokenLabel != "" { 260 conf.TokenLabel = cli.PKCS11TokenLabel 261 } 262 if cli.PKCS11TokenSerial != "" { 263 conf.TokenSerial = cli.PKCS11TokenSerial 264 } 265 pin := cli.PKCS11Pin 266 if pin == "" { 267 fmt.Printf("Please enter PKCS11 PIN: ") 268 password, err := term.ReadPassword(int(os.Stdin.Fd())) 269 fmt.Println("") 270 if err != nil { 271 return fmt.Errorf("error reading PKCS11 password: %w", err) 272 } 273 pin = string(password) 274 } 275 conf.Pin = pin 276 277 sc, err := crypto11.Configure(conf) 278 if err != nil { 279 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 280 } 281 var id []byte = nil 282 var label []byte = nil 283 if cli.PKCS11KeypairID != "" { 284 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 285 if err != nil { 286 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 287 } 288 id = []byte{byte(num)} 289 } 290 if cli.PKCS11KeypairLabel != "" { 291 label = []byte(cli.PKCS11KeypairLabel) 292 } 293 hwsigner, err := sc.FindKeyPair(id, label) 294 if err != nil { 295 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 296 } 297 if hwsigner == nil { 298 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 299 } 300 addr, err := signers.HexAddrFromSigner(hwsigner) 301 if err != nil { 302 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 303 } 304 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 305 signer = hwsigner 306 } 307 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 308 309 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 310 if err != nil { 311 return err 312 } 313 var noter notifications.FirebaseNotifier 314 if cli.FirebaseServiceAccount != "" { 315 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 316 if err != nil { 317 return err 318 } 319 } 320 321 out := carstore.SQLiteStore{} 322 err = out.Open(":memory:") 323 if err != nil { 324 return err 325 } 326 state, err := statedb.MakeDB(&cli, noter, mod) 327 if err != nil { 328 return err 329 } 330 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 331 if err != nil { 332 return err 333 } 334 defer handle.Close() 335 336 jwk, err := state.EnsureJWK(ctx, "jwk") 337 if err != nil { 338 return err 339 } 340 cli.JWK = jwk 341 342 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 343 if err != nil { 344 return err 345 } 346 cli.AccessJWK = accessJWK 347 348 b := bus.NewBus() 349 atsync := &atproto.ATProtoSynchronizer{ 350 CLI: &cli, 351 Model: mod, 352 StatefulDB: state, 353 Noter: noter, 354 Bus: b, 355 } 356 err = atsync.Migrate(ctx) 357 if err != nil { 358 return fmt.Errorf("failed to migrate: %w", err) 359 } 360 361 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 362 if err != nil { 363 return err 364 } 365 366 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 367 if err != nil { 368 return err 369 } 370 371 clientMetadata := &oatproxy.OAuthClientMetadata{ 372 Scope: "atproto transition:generic", 373 ClientName: "Streamplace", 374 RedirectURIs: []string{ 375 fmt.Sprintf("https://%s/login", cli.PublicHost), 376 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 377 }, 378 } 379 380 op := oatproxy.New(&oatproxy.Config{ 381 Host: cli.PublicHost, 382 CreateOAuthSession: state.CreateOAuthSession, 383 UpdateOAuthSession: state.UpdateOAuthSession, 384 GetOAuthSession: state.LoadOAuthSession, 385 Lock: state.GetNamedLock, 386 Scope: "atproto transition:generic", 387 UpstreamJWK: cli.JWK, 388 DownstreamJWK: cli.AccessJWK, 389 ClientMetadata: clientMetadata, 390 }) 391 d := director.NewDirector(mm, mod, &cli, b, op, state) 392 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 393 if err != nil { 394 return err 395 } 396 397 group, ctx := TimeoutGroupWithContext(ctx) 398 ctx = log.WithLogValues(ctx, "version", build.Version) 399 400 group.Go(func() error { 401 return handleSignals(ctx) 402 }) 403 404 group.Go(func() error { 405 return state.ProcessQueue(ctx) 406 }) 407 408 if cli.TracingEndpoint != "" { 409 group.Go(func() error { 410 return startTelemetry(ctx, cli.TracingEndpoint) 411 }) 412 } 413 414 if cli.Secure { 415 group.Go(func() error { 416 return a.ServeHTTPS(ctx) 417 }) 418 group.Go(func() error { 419 return a.ServeHTTPRedirect(ctx) 420 }) 421 if cli.RTMPServerAddon != "" { 422 group.Go(func() error { 423 return rtmps.ServeRTMPS(ctx, &cli) 424 }) 425 } 426 } else { 427 group.Go(func() error { 428 return a.ServeHTTP(ctx) 429 }) 430 } 431 432 group.Go(func() error { 433 return a.ServeInternalHTTP(ctx) 434 }) 435 436 if !cli.NoFirehose { 437 group.Go(func() error { 438 return atsync.StartFirehose(ctx) 439 }) 440 } 441 for _, labeler := range cli.Labelers { 442 group.Go(func() error { 443 return atsync.StartLabelerFirehose(ctx, labeler) 444 }) 445 } 446 447 group.Go(func() error { 448 return spmetrics.ExpireSessions(ctx) 449 }) 450 451 group.Go(func() error { 452 return mod.StartSegmentCleaner(ctx) 453 }) 454 455 if cli.LivepeerGateway { 456 // make a file to make sure the directory exists 457 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 458 if err != nil { 459 return err 460 } 461 fd.Close() 462 if err != nil { 463 return err 464 } 465 group.Go(func() error { 466 return GoLivepeer(ctx, fs) 467 }) 468 } 469 470 group.Go(func() error { 471 return d.Start(ctx) 472 }) 473 474 if cli.TestStream { 475 // regular stream self-test 476 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 477 Schema: schema, 478 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 479 }) 480 if err != nil { 481 return err 482 } 483 atkey, err := atproto.ParsePubKey(signer.Public()) 484 if err != nil { 485 return err 486 } 487 did := atkey.DIDKey() 488 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 489 if err != nil { 490 return err 491 } 492 err = mod.UpdateIdentity(&model.Identity{ 493 ID: testMediaSigner.Pub().String(), 494 Handle: "stream-self-tester", 495 DID: "", 496 }) 497 if err != nil { 498 return err 499 } 500 cli.AllowedStreams = append(cli.AllowedStreams, did) 501 a.Aliases["self-test"] = did 502 group.Go(func() error { 503 return mm.TestSource(ctx, testMediaSigner) 504 }) 505 506 // Start a test stream that will run intermittently 507 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 508 Schema: schema, 509 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 510 }) 511 if err != nil { 512 return err 513 } 514 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 515 if err != nil { 516 return err 517 } 518 did2 := atkey2.DIDKey() 519 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 520 if err != nil { 521 return err 522 } 523 err = mod.UpdateIdentity(&model.Identity{ 524 ID: intermittentMediaSigner.Pub().String(), 525 Handle: "stream-intermittent-tester", 526 DID: "", 527 }) 528 if err != nil { 529 return err 530 } 531 cli.AllowedStreams = append(cli.AllowedStreams, did2) 532 a.Aliases["intermittent-self-test"] = did2 533 534 group.Go(func() error { 535 for { 536 // Start intermittent stream 537 intermittentCtx, cancel := context.WithCancel(ctx) 538 done := make(chan struct{}) 539 go func() { 540 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 541 close(done) 542 }() 543 // Stream ON for 15 seconds 544 time.Sleep(15 * time.Second) 545 // Stop stream 546 cancel() 547 <-done // Wait for TestSource to exit 548 // Stream OFF for 15 seconds 549 time.Sleep(15 * time.Second) 550 } 551 }) 552 } 553 554 for _, job := range platformJobs { 555 group.Go(func() error { 556 return job(ctx, &cli) 557 }) 558 } 559 560 if cli.WHIPTest != "" { 561 group.Go(func() error { 562 err := WHIP(strings.Split(cli.WHIPTest, " ")) 563 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 564 time.Sleep(time.Second * 3) 565 // gst.Deinit() 566 log.Warn(ctx, "gst deinit complete, exiting") 567 return err 568 }) 569 } 570 571 return group.Wait() 572} 573 574var ErrCaughtSignal = errors.New("caught signal") 575 576func handleSignals(ctx context.Context) error { 577 c := make(chan os.Signal, 1) 578 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 579 for { 580 select { 581 case s := <-c: 582 if s == syscall.SIGABRT { 583 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 584 log.Error(ctx, "failed to create pprof", "error", err) 585 } 586 } 587 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 588 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 589 case <-ctx.Done(): 590 return nil 591 } 592 } 593}