Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.28 596 lines 16 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 "golang.org/x/term" 24 "stream.place/streamplace/pkg/aqhttp" 25 "stream.place/streamplace/pkg/atproto" 26 "stream.place/streamplace/pkg/bus" 27 "stream.place/streamplace/pkg/crypto/signers" 28 "stream.place/streamplace/pkg/crypto/signers/eip712" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/media" 32 "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/replication" 34 "stream.place/streamplace/pkg/replication/boring" 35 "stream.place/streamplace/pkg/rtmps" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 "stream.place/streamplace/pkg/spmetrics" 38 "stream.place/streamplace/pkg/statedb" 39 40 "github.com/ThalesGroup/crypto11" 41 _ "github.com/go-gst/go-glib/glib" 42 _ "github.com/go-gst/go-gst/gst" 43 "stream.place/streamplace/pkg/api" 44 "stream.place/streamplace/pkg/config" 45 "stream.place/streamplace/pkg/model" 46 47 _ "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 48) 49 50// Additional jobs that can be injected by platforms 51type jobFunc func(ctx context.Context, cli *config.CLI) error 52 53// parse the CLI and fire up an streamplace node! 54func start(build *config.BuildFlags, platformJobs []jobFunc) error { 55 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 56 err := media.RunSelfTest(context.Background()) 57 if err != nil { 58 if selfTest { 59 fmt.Println(err.Error()) 60 os.Exit(1) 61 } else { 62 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 63 if retryCount >= 3 { 64 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 65 return err 66 } 67 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 68 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 69 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 70 if err != nil { 71 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 72 return err 73 } 74 panic("invalid code path: exec succeeded but we're still here???") 75 } 76 } 77 if selfTest { 78 runtime.GC() 79 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 80 log.Error(context.Background(), "error creating pprof", "error", err) 81 } 82 fmt.Println("self-test successful!") 83 os.Exit(0) 84 } 85 86 if len(os.Args) > 1 && os.Args[1] == "stream" { 87 if len(os.Args) != 3 { 88 fmt.Println("usage: streamplace stream [user]") 89 os.Exit(1) 90 } 91 return Stream(os.Args[2]) 92 } 93 94 if len(os.Args) > 1 && os.Args[1] == "live" { 95 cli := config.CLI{Build: build} 96 fs := cli.NewFlagSet("streamplace live") 97 98 err := cli.Parse(fs, os.Args[2:]) 99 if err != nil { 100 return err 101 } 102 103 args := fs.Args() 104 if len(args) != 1 { 105 fmt.Println("usage: streamplace live [flags] [stream-key]") 106 os.Exit(1) 107 } 108 109 return Live(args[0], cli.HTTPInternalAddr) 110 } 111 112 if len(os.Args) > 1 && os.Args[1] == "sign" { 113 return Sign(context.Background()) 114 } 115 116 if len(os.Args) > 1 && os.Args[1] == "whep" { 117 return WHEP(os.Args[2:]) 118 } 119 if len(os.Args) > 1 && os.Args[1] == "whip" { 120 return WHIP(os.Args[2:]) 121 } 122 123 if len(os.Args) > 1 && os.Args[1] == "clip" { 124 cli := config.CLI{Build: build} 125 fs := cli.NewFlagSet("streamplace clip") 126 out := fs.String("out", "", "output file") 127 128 err := cli.Parse(fs, os.Args[2:]) 129 if err != nil { 130 return err 131 } 132 ctx := context.Background() 133 ctx = log.WithDebugValue(ctx, cli.Debug) 134 return Clip(ctx, fs.Args(), *out) 135 } 136 137 if len(os.Args) > 1 && os.Args[1] == "self-test" { 138 err := media.RunSelfTest(context.Background()) 139 if err != nil { 140 fmt.Println(err.Error()) 141 os.Exit(1) 142 } 143 fmt.Println("self-test successful!") 144 os.Exit(0) 145 } 146 147 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 148 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 149 _ = starter.NewLivepeerConfig(lpfs) 150 err = ff.Parse(lpfs, os.Args[2:], 151 ff.WithConfigFileFlag("config"), 152 ff.WithEnvVarPrefix("LP"), 153 ) 154 if err != nil { 155 return err 156 } 157 err = GoLivepeer(context.Background(), lpfs) 158 if err != nil { 159 log.Error(context.Background(), "error in livepeer", "error", err) 160 os.Exit(1) 161 } 162 os.Exit(0) 163 } 164 165 _ = flag.Set("logtostderr", "true") 166 vFlag := flag.Lookup("v") 167 cli := config.CLI{Build: build} 168 fs := cli.NewFlagSet("streamplace") 169 verbosity := fs.String("v", "3", "log verbosity level") 170 version := fs.Bool("version", false, "print version and exit") 171 172 err = cli.Parse( 173 fs, os.Args[1:], 174 ) 175 if err != nil { 176 return err 177 } 178 err = flag.CommandLine.Parse(nil) 179 if err != nil { 180 return err 181 } 182 _ = vFlag.Value.Set(*verbosity) 183 log.SetColorLogger(cli.Color) 184 ctx := context.Background() 185 ctx = log.WithDebugValue(ctx, cli.Debug) 186 187 log.Log(ctx, 188 "streamplace", 189 "version", build.Version, 190 "buildTime", build.BuildTimeStr(), 191 "uuid", build.UUID, 192 "runtime.GOOS", runtime.GOOS, 193 "runtime.GOARCH", runtime.GOARCH, 194 "runtime.Version", runtime.Version()) 195 if *version { 196 return nil 197 } 198 199 if len(os.Args) > 1 && os.Args[1] == "migrate" { 200 return statedb.Migrate(&cli) 201 } 202 203 spmetrics.Version.WithLabelValues(build.Version).Inc() 204 if cli.LivepeerHelp { 205 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 206 _ = starter.NewLivepeerConfig(lpFlags) 207 lpFlags.VisitAll(func(f *flag.Flag) { 208 adapted := config.ToSnakeCase(f.Name) 209 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 210 usage := fmt.Sprintf(" %s", f.Usage) 211 if f.DefValue != "" { 212 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 213 } 214 fmt.Printf(" %s\n", usage) 215 }) 216 return nil 217 } 218 219 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 220 221 err = os.MkdirAll(cli.DataDir, os.ModePerm) 222 if err != nil { 223 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 224 } 225 schema, err := v0.MakeV0Schema() 226 if err != nil { 227 return err 228 } 229 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 230 Schema: schema, 231 EthKeystorePath: cli.EthKeystorePath, 232 EthAccountAddr: cli.EthAccountAddr, 233 EthKeystorePassword: cli.EthPassword, 234 }) 235 if err != nil { 236 return err 237 } 238 var signer crypto.Signer = eip712signer 239 if cli.PKCS11ModulePath != "" { 240 conf := &crypto11.Config{ 241 Path: cli.PKCS11ModulePath, 242 } 243 count := 0 244 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 245 if val != "" { 246 count += 1 247 } 248 } 249 if count != 1 { 250 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 251 } 252 if cli.PKCS11TokenSlot != "" { 253 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 254 if err != nil { 255 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 256 } 257 numint := int(num) 258 // why does crypto11 want this as a reference? odd. 259 conf.SlotNumber = &numint 260 } 261 if cli.PKCS11TokenLabel != "" { 262 conf.TokenLabel = cli.PKCS11TokenLabel 263 } 264 if cli.PKCS11TokenSerial != "" { 265 conf.TokenSerial = cli.PKCS11TokenSerial 266 } 267 pin := cli.PKCS11Pin 268 if pin == "" { 269 fmt.Printf("Please enter PKCS11 PIN: ") 270 password, err := term.ReadPassword(int(os.Stdin.Fd())) 271 fmt.Println("") 272 if err != nil { 273 return fmt.Errorf("error reading PKCS11 password: %w", err) 274 } 275 pin = string(password) 276 } 277 conf.Pin = pin 278 279 sc, err := crypto11.Configure(conf) 280 if err != nil { 281 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 282 } 283 var id []byte = nil 284 var label []byte = nil 285 if cli.PKCS11KeypairID != "" { 286 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 287 if err != nil { 288 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 289 } 290 id = []byte{byte(num)} 291 } 292 if cli.PKCS11KeypairLabel != "" { 293 label = []byte(cli.PKCS11KeypairLabel) 294 } 295 hwsigner, err := sc.FindKeyPair(id, label) 296 if err != nil { 297 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 298 } 299 if hwsigner == nil { 300 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 301 } 302 addr, err := signers.HexAddrFromSigner(hwsigner) 303 if err != nil { 304 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 305 } 306 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 307 signer = hwsigner 308 } 309 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 310 311 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 312 if err != nil { 313 return err 314 } 315 var noter notifications.FirebaseNotifier 316 if cli.FirebaseServiceAccount != "" { 317 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 318 if err != nil { 319 return err 320 } 321 } 322 323 group, ctx := TimeoutGroupWithContext(ctx) 324 325 out := carstore.SQLiteStore{} 326 err = out.Open(":memory:") 327 if err != nil { 328 return err 329 } 330 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 331 if err != nil { 332 return err 333 } 334 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 335 if err != nil { 336 return err 337 } 338 defer handle.Close() 339 340 jwk, err := state.EnsureJWK(ctx, "jwk") 341 if err != nil { 342 return err 343 } 344 cli.JWK = jwk 345 346 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 347 if err != nil { 348 return err 349 } 350 cli.AccessJWK = accessJWK 351 352 b := bus.NewBus() 353 atsync := &atproto.ATProtoSynchronizer{ 354 CLI: &cli, 355 Model: mod, 356 StatefulDB: state, 357 Noter: noter, 358 Bus: b, 359 } 360 err = atsync.Migrate(ctx) 361 if err != nil { 362 return fmt.Errorf("failed to migrate: %w", err) 363 } 364 365 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 366 if err != nil { 367 return err 368 } 369 370 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 371 if err != nil { 372 return err 373 } 374 375 clientMetadata := &oatproxy.OAuthClientMetadata{ 376 Scope: "atproto transition:generic", 377 ClientName: "Streamplace", 378 RedirectURIs: []string{ 379 fmt.Sprintf("https://%s/login", cli.PublicHost), 380 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 381 }, 382 } 383 384 op := oatproxy.New(&oatproxy.Config{ 385 Host: cli.PublicHost, 386 CreateOAuthSession: state.CreateOAuthSession, 387 UpdateOAuthSession: state.UpdateOAuthSession, 388 GetOAuthSession: state.LoadOAuthSession, 389 Lock: state.GetNamedLock, 390 Scope: "atproto transition:generic", 391 UpstreamJWK: cli.JWK, 392 DownstreamJWK: cli.AccessJWK, 393 ClientMetadata: clientMetadata, 394 }) 395 d := director.NewDirector(mm, mod, &cli, b, op, state) 396 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 397 if err != nil { 398 return err 399 } 400 401 ctx = log.WithLogValues(ctx, "version", build.Version) 402 403 group.Go(func() error { 404 return handleSignals(ctx) 405 }) 406 407 group.Go(func() error { 408 return state.ProcessQueue(ctx) 409 }) 410 411 if cli.TracingEndpoint != "" { 412 group.Go(func() error { 413 return startTelemetry(ctx, cli.TracingEndpoint) 414 }) 415 } 416 417 if cli.Secure { 418 group.Go(func() error { 419 return a.ServeHTTPS(ctx) 420 }) 421 group.Go(func() error { 422 return a.ServeHTTPRedirect(ctx) 423 }) 424 if cli.RTMPServerAddon != "" { 425 group.Go(func() error { 426 return rtmps.ServeRTMPS(ctx, &cli) 427 }) 428 } 429 } else { 430 group.Go(func() error { 431 return a.ServeHTTP(ctx) 432 }) 433 } 434 435 group.Go(func() error { 436 return a.ServeInternalHTTP(ctx) 437 }) 438 439 if !cli.NoFirehose { 440 group.Go(func() error { 441 return atsync.StartFirehose(ctx) 442 }) 443 } 444 for _, labeler := range cli.Labelers { 445 group.Go(func() error { 446 return atsync.StartLabelerFirehose(ctx, labeler) 447 }) 448 } 449 450 group.Go(func() error { 451 return spmetrics.ExpireSessions(ctx) 452 }) 453 454 group.Go(func() error { 455 return mod.StartSegmentCleaner(ctx) 456 }) 457 458 if cli.LivepeerGateway { 459 // make a file to make sure the directory exists 460 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 461 if err != nil { 462 return err 463 } 464 fd.Close() 465 if err != nil { 466 return err 467 } 468 group.Go(func() error { 469 return GoLivepeer(ctx, fs) 470 }) 471 } 472 473 group.Go(func() error { 474 return d.Start(ctx) 475 }) 476 477 if cli.TestStream { 478 // regular stream self-test 479 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 480 Schema: schema, 481 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 482 }) 483 if err != nil { 484 return err 485 } 486 atkey, err := atproto.ParsePubKey(signer.Public()) 487 if err != nil { 488 return err 489 } 490 did := atkey.DIDKey() 491 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 492 if err != nil { 493 return err 494 } 495 err = mod.UpdateIdentity(&model.Identity{ 496 ID: testMediaSigner.Pub().String(), 497 Handle: "stream-self-tester", 498 DID: "", 499 }) 500 if err != nil { 501 return err 502 } 503 cli.AllowedStreams = append(cli.AllowedStreams, did) 504 a.Aliases["self-test"] = did 505 group.Go(func() error { 506 return mm.TestSource(ctx, testMediaSigner) 507 }) 508 509 // Start a test stream that will run intermittently 510 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 511 Schema: schema, 512 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 513 }) 514 if err != nil { 515 return err 516 } 517 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 518 if err != nil { 519 return err 520 } 521 did2 := atkey2.DIDKey() 522 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 523 if err != nil { 524 return err 525 } 526 err = mod.UpdateIdentity(&model.Identity{ 527 ID: intermittentMediaSigner.Pub().String(), 528 Handle: "stream-intermittent-tester", 529 DID: "", 530 }) 531 if err != nil { 532 return err 533 } 534 cli.AllowedStreams = append(cli.AllowedStreams, did2) 535 a.Aliases["intermittent-self-test"] = did2 536 537 group.Go(func() error { 538 for { 539 // Start intermittent stream 540 intermittentCtx, cancel := context.WithCancel(ctx) 541 done := make(chan struct{}) 542 go func() { 543 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 544 close(done) 545 }() 546 // Stream ON for 15 seconds 547 time.Sleep(15 * time.Second) 548 // Stop stream 549 cancel() 550 <-done // Wait for TestSource to exit 551 // Stream OFF for 15 seconds 552 time.Sleep(15 * time.Second) 553 } 554 }) 555 } 556 557 for _, job := range platformJobs { 558 group.Go(func() error { 559 return job(ctx, &cli) 560 }) 561 } 562 563 if cli.WHIPTest != "" { 564 group.Go(func() error { 565 err := WHIP(strings.Split(cli.WHIPTest, " ")) 566 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 567 time.Sleep(time.Second * 3) 568 // gst.Deinit() 569 log.Warn(ctx, "gst deinit complete, exiting") 570 return err 571 }) 572 } 573 574 return group.Wait() 575} 576 577var ErrCaughtSignal = errors.New("caught signal") 578 579func handleSignals(ctx context.Context) error { 580 c := make(chan os.Signal, 1) 581 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 582 for { 583 select { 584 case s := <-c: 585 if s == syscall.SIGABRT { 586 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 587 log.Error(ctx, "failed to create pprof", "error", err) 588 } 589 } 590 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 591 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 592 case <-ctx.Done(): 593 return nil 594 } 595 } 596}