Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.9 601 lines 15 kB view raw
1package cmd 2 3import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "errors" 8 "flag" 9 "fmt" 10 "net/url" 11 "os" 12 "os/signal" 13 "runtime" 14 "runtime/pprof" 15 "slices" 16 "strconv" 17 "strings" 18 "syscall" 19 "time" 20 21 "github.com/bluesky-social/indigo/carstore" 22 "github.com/ethereum/go-ethereum/common/hexutil" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/peterbourgon/ff/v3" 25 "github.com/streamplace/oatproxy/pkg/oatproxy" 26 "stream.place/streamplace/pkg/aqhttp" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/gstinit" 31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 "stream.place/streamplace/pkg/localdb" 33 "stream.place/streamplace/pkg/log" 34 "stream.place/streamplace/pkg/media" 35 "stream.place/streamplace/pkg/notifications" 36 "stream.place/streamplace/pkg/replication" 37 "stream.place/streamplace/pkg/replication/iroh_replicator" 38 "stream.place/streamplace/pkg/replication/websocketrep" 39 "stream.place/streamplace/pkg/rtmps" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/statedb" 42 "stream.place/streamplace/pkg/storage" 43 44 _ "github.com/go-gst/go-glib/glib" 45 _ "github.com/go-gst/go-gst/gst" 46 "stream.place/streamplace/pkg/api" 47 "stream.place/streamplace/pkg/config" 48 "stream.place/streamplace/pkg/model" 49) 50 51// Additional jobs that can be injected by platforms 52type jobFunc func(ctx context.Context, cli *config.CLI) error 53 54// parse the CLI and fire up an streamplace node! 55func start(build *config.BuildFlags, platformJobs []jobFunc) error { 56 iroh_streamplace.InitLogging() 57 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 58 err := media.RunSelfTest(context.Background()) 59 if err != nil { 60 if selfTest { 61 fmt.Println(err.Error()) 62 os.Exit(1) 63 } else { 64 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 65 if retryCount >= 3 { 66 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 67 return err 68 } 69 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 70 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 71 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 72 if err != nil { 73 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 74 return err 75 } 76 panic("invalid code path: exec succeeded but we're still here???") 77 } 78 } 79 if selfTest { 80 runtime.GC() 81 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 82 log.Error(context.Background(), "error creating pprof", "error", err) 83 } 84 fmt.Println("self-test successful!") 85 os.Exit(0) 86 } 87 88 if len(os.Args) > 1 && os.Args[1] == "stream" { 89 if len(os.Args) != 3 { 90 fmt.Println("usage: streamplace stream [user]") 91 os.Exit(1) 92 } 93 return Stream(os.Args[2]) 94 } 95 96 if len(os.Args) > 1 && os.Args[1] == "live" { 97 cli := config.CLI{Build: build} 98 fs := cli.NewFlagSet("streamplace live") 99 100 err := cli.Parse(fs, os.Args[2:]) 101 if err != nil { 102 return err 103 } 104 105 args := fs.Args() 106 if len(args) != 1 { 107 fmt.Println("usage: streamplace live [flags] [stream-key]") 108 os.Exit(1) 109 } 110 111 return Live(args[0], cli.HTTPInternalAddr) 112 } 113 114 if len(os.Args) > 1 && os.Args[1] == "sign" { 115 return Sign(context.Background()) 116 } 117 118 if len(os.Args) > 1 && os.Args[1] == "whep" { 119 return WHEP(os.Args[2:]) 120 } 121 if len(os.Args) > 1 && os.Args[1] == "whip" { 122 return WHIP(os.Args[2:]) 123 } 124 125 if len(os.Args) > 1 && os.Args[1] == "combine" { 126 return Combine(context.Background(), build, os.Args[2:]) 127 } 128 129 if len(os.Args) > 1 && os.Args[1] == "split" { 130 cli := config.CLI{Build: build} 131 fs := cli.NewFlagSet("streamplace split") 132 133 err := cli.Parse(fs, os.Args[2:]) 134 if err != nil { 135 return err 136 } 137 ctx := context.Background() 138 ctx = log.WithDebugValue(ctx, cli.Debug) 139 if len(fs.Args()) != 2 { 140 fmt.Println("usage: streamplace split [flags] [input file] [output directory]") 141 os.Exit(1) 142 } 143 gstinit.InitGST() 144 return Split(ctx, fs.Args()[0], fs.Args()[1]) 145 } 146 147 if len(os.Args) > 1 && os.Args[1] == "self-test" { 148 err := media.RunSelfTest(context.Background()) 149 if err != nil { 150 fmt.Println(err.Error()) 151 os.Exit(1) 152 } 153 fmt.Println("self-test successful!") 154 os.Exit(0) 155 } 156 157 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 158 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 159 _ = starter.NewLivepeerConfig(lpfs) 160 err = ff.Parse(lpfs, os.Args[2:], 161 ff.WithConfigFileFlag("config"), 162 ff.WithEnvVarPrefix("LP"), 163 ) 164 if err != nil { 165 return err 166 } 167 err = GoLivepeer(context.Background(), lpfs) 168 if err != nil { 169 log.Error(context.Background(), "error in livepeer", "error", err) 170 os.Exit(1) 171 } 172 os.Exit(0) 173 } 174 175 _ = flag.Set("logtostderr", "true") 176 vFlag := flag.Lookup("v") 177 cli := config.CLI{Build: build} 178 fs := cli.NewFlagSet("streamplace") 179 verbosity := fs.String("v", "3", "log verbosity level") 180 version := fs.Bool("version", false, "print version and exit") 181 182 err = cli.Parse( 183 fs, os.Args[1:], 184 ) 185 if err != nil { 186 return err 187 } 188 189 err = flag.CommandLine.Parse(nil) 190 if err != nil { 191 return err 192 } 193 _ = vFlag.Value.Set(*verbosity) 194 log.SetColorLogger(cli.Color) 195 ctx := context.Background() 196 ctx = log.WithDebugValue(ctx, cli.Debug) 197 198 log.Log(ctx, 199 "streamplace", 200 "version", build.Version, 201 "buildTime", build.BuildTimeStr(), 202 "uuid", build.UUID, 203 "runtime.GOOS", runtime.GOOS, 204 "runtime.GOARCH", runtime.GOARCH, 205 "runtime.Version", runtime.Version()) 206 if *version { 207 return nil 208 } 209 signer, err := createSigner(ctx, &cli) 210 if err != nil { 211 return err 212 } 213 214 if len(os.Args) > 1 && os.Args[1] == "migrate" { 215 return statedb.Migrate(&cli) 216 } 217 218 spmetrics.Version.WithLabelValues(build.Version).Inc() 219 if cli.LivepeerHelp { 220 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 221 _ = starter.NewLivepeerConfig(lpFlags) 222 lpFlags.VisitAll(func(f *flag.Flag) { 223 adapted := config.ToSnakeCase(f.Name) 224 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 225 usage := fmt.Sprintf(" %s", f.Usage) 226 if f.DefValue != "" { 227 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 228 } 229 fmt.Printf(" %s\n", usage) 230 }) 231 return nil 232 } 233 234 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 235 236 err = os.MkdirAll(cli.DataDir, os.ModePerm) 237 if err != nil { 238 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 239 } 240 241 ldb, err := localdb.MakeDB(cli.LocalDBURL) 242 if err != nil { 243 return err 244 } 245 246 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 247 if err != nil { 248 return err 249 } 250 var noter notifications.FirebaseNotifier 251 if cli.FirebaseServiceAccount != "" { 252 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 253 if err != nil { 254 return err 255 } 256 } 257 258 group, ctx := TimeoutGroupWithContext(ctx) 259 260 out := carstore.SQLiteStore{} 261 err = out.Open(":memory:") 262 if err != nil { 263 return err 264 } 265 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 266 if err != nil { 267 return err 268 } 269 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 270 if err != nil { 271 return err 272 } 273 defer handle.Close() 274 275 jwk, err := state.EnsureJWK(ctx, "jwk") 276 if err != nil { 277 return err 278 } 279 cli.JWK = jwk 280 281 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 282 if err != nil { 283 return err 284 } 285 cli.AccessJWK = accessJWK 286 287 b := bus.NewBus() 288 atsync := &atproto.ATProtoSynchronizer{ 289 CLI: &cli, 290 Model: mod, 291 StatefulDB: state, 292 Noter: noter, 293 Bus: b, 294 } 295 err = atsync.Migrate(ctx) 296 if err != nil { 297 return fmt.Errorf("failed to migrate: %w", err) 298 } 299 300 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync, ldb) 301 if err != nil { 302 return err 303 } 304 305 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 306 if err != nil { 307 return err 308 } 309 310 var clientMetadata *oatproxy.OAuthClientMetadata 311 var host string 312 if cli.PublicOAuth { 313 u, err := url.Parse(cli.OwnPublicURL()) 314 if err != nil { 315 return err 316 } 317 host = u.Host 318 clientMetadata = &oatproxy.OAuthClientMetadata{ 319 Scope: atproto.OAuthString, 320 ClientName: "Streamplace", 321 RedirectURIs: []string{ 322 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 323 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 324 }, 325 } 326 } else { 327 host = cli.BroadcasterHost 328 clientMetadata = &oatproxy.OAuthClientMetadata{ 329 Scope: atproto.OAuthString, 330 ClientName: "Streamplace", 331 RedirectURIs: []string{ 332 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 333 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 334 }, 335 } 336 } 337 338 var replicator replication.Replicator = nil 339 if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 340 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 341 if err != nil { 342 return err 343 } 344 if !exists { 345 secret := make([]byte, 32) 346 _, err := rand.Read(secret) 347 if err != nil { 348 return fmt.Errorf("failed to generate random secret: %w", err) 349 } 350 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 351 if err != nil { 352 return err 353 } 354 } 355 buf := bytes.Buffer{} 356 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 357 if err != nil { 358 return err 359 } 360 secret := buf.Bytes() 361 var topic []byte 362 if cli.IrohTopic != "" { 363 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 364 if err != nil { 365 return err 366 } 367 } 368 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 369 if err != nil { 370 return err 371 } 372 } 373 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 374 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 375 } 376 377 op := oatproxy.New(&oatproxy.Config{ 378 Host: host, 379 CreateOAuthSession: state.CreateOAuthSession, 380 UpdateOAuthSession: state.UpdateOAuthSession, 381 GetOAuthSession: state.LoadOAuthSession, 382 Lock: state.GetNamedLock, 383 Scope: atproto.OAuthString, 384 UpstreamJWK: cli.JWK, 385 DownstreamJWK: cli.AccessJWK, 386 ClientMetadata: clientMetadata, 387 Public: cli.PublicOAuth, 388 HTTPClient: &aqhttp.Client, 389 }) 390 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator, ldb) 391 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb) 392 if err != nil { 393 return err 394 } 395 396 ctx = log.WithLogValues(ctx, "version", build.Version) 397 398 group.Go(func() error { 399 return handleSignals(ctx) 400 }) 401 402 group.Go(func() error { 403 return state.ProcessQueue(ctx) 404 }) 405 406 if cli.TracingEndpoint != "" { 407 group.Go(func() error { 408 return startTelemetry(ctx, cli.TracingEndpoint) 409 }) 410 } 411 412 if cli.Secure { 413 group.Go(func() error { 414 return a.ServeHTTPS(ctx) 415 }) 416 group.Go(func() error { 417 return a.ServeHTTPRedirect(ctx) 418 }) 419 if cli.RTMPServerAddon != "" { 420 group.Go(func() error { 421 return rtmps.ServeRTMPSAddon(ctx, &cli) 422 }) 423 } 424 group.Go(func() error { 425 return a.ServeRTMPS(ctx, &cli) 426 }) 427 } else { 428 group.Go(func() error { 429 return a.ServeHTTP(ctx) 430 }) 431 group.Go(func() error { 432 return a.ServeRTMP(ctx) 433 }) 434 } 435 436 group.Go(func() error { 437 return a.ServeInternalHTTP(ctx) 438 }) 439 440 if !cli.NoFirehose { 441 group.Go(func() error { 442 return atsync.StartFirehose(ctx) 443 }) 444 } 445 for _, labeler := range cli.Labelers { 446 group.Go(func() error { 447 return atsync.StartLabelerFirehose(ctx, labeler) 448 }) 449 } 450 451 group.Go(func() error { 452 return a.ExpireSessions(ctx) 453 }) 454 455 group.Go(func() error { 456 return storage.StartSegmentCleaner(ctx, ldb, &cli) 457 }) 458 459 group.Go(func() error { 460 return ldb.StartSegmentCleaner(ctx) 461 }) 462 463 group.Go(func() error { 464 return replicator.Start(ctx, &cli) 465 }) 466 467 if cli.LivepeerGateway { 468 // make a file to make sure the directory exists 469 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 470 if err != nil { 471 return err 472 } 473 fd.Close() 474 if err != nil { 475 return err 476 } 477 group.Go(func() error { 478 err := GoLivepeer(ctx, fs) 479 if err != nil { 480 return err 481 } 482 // livepeer returns nil on error, so we need to check if we're responsible 483 if ctx.Err() == nil { 484 return fmt.Errorf("livepeer exited") 485 } 486 return nil 487 }) 488 } 489 490 group.Go(func() error { 491 return d.Start(ctx) 492 }) 493 494 if cli.TestStream { 495 atkey, err := atproto.ParsePubKey(signer.Public()) 496 if err != nil { 497 return err 498 } 499 did := atkey.DIDKey() 500 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod) 501 if err != nil { 502 return err 503 } 504 err = mod.UpdateIdentity(&model.Identity{ 505 ID: testMediaSigner.Pub().String(), 506 Handle: "stream-self-tester", 507 DID: "", 508 }) 509 if err != nil { 510 return err 511 } 512 cli.AllowedStreams = append(cli.AllowedStreams, did) 513 a.Aliases["self-test"] = did 514 group.Go(func() error { 515 return mm.TestSource(ctx, testMediaSigner) 516 }) 517 518 // Start a test stream that will run intermittently 519 if err != nil { 520 return err 521 } 522 atkey2, err := atproto.ParsePubKey(signer.Public()) 523 if err != nil { 524 return err 525 } 526 did2 := atkey2.DIDKey() 527 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod) 528 if err != nil { 529 return err 530 } 531 err = mod.UpdateIdentity(&model.Identity{ 532 ID: intermittentMediaSigner.Pub().String(), 533 Handle: "stream-intermittent-tester", 534 DID: "", 535 }) 536 if err != nil { 537 return err 538 } 539 cli.AllowedStreams = append(cli.AllowedStreams, did2) 540 a.Aliases["intermittent-self-test"] = did2 541 542 group.Go(func() error { 543 for { 544 // Start intermittent stream 545 intermittentCtx, cancel := context.WithCancel(ctx) 546 done := make(chan struct{}) 547 go func() { 548 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 549 close(done) 550 }() 551 // Stream ON for 15 seconds 552 time.Sleep(15 * time.Second) 553 // Stop stream 554 cancel() 555 <-done // Wait for TestSource to exit 556 // Stream OFF for 15 seconds 557 time.Sleep(15 * time.Second) 558 } 559 }) 560 } 561 562 for _, job := range platformJobs { 563 group.Go(func() error { 564 return job(ctx, &cli) 565 }) 566 } 567 568 if cli.WHIPTest != "" { 569 group.Go(func() error { 570 err := WHIP(strings.Split(cli.WHIPTest, " ")) 571 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 572 time.Sleep(time.Second * 3) 573 // gst.Deinit() 574 log.Warn(ctx, "gst deinit complete, exiting") 575 return err 576 }) 577 } 578 579 return group.Wait() 580} 581 582var ErrCaughtSignal = errors.New("caught signal") 583 584func handleSignals(ctx context.Context) error { 585 c := make(chan os.Signal, 1) 586 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 587 for { 588 select { 589 case s := <-c: 590 if s == syscall.SIGABRT { 591 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 592 log.Error(ctx, "failed to create pprof", "error", err) 593 } 594 } 595 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 596 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 597 case <-ctx.Done(): 598 return nil 599 } 600 } 601}