Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.1 594 lines 15 kB view raw
1package cmd 2 3import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "errors" 8 "flag" 9 "fmt" 10 "net/url" 11 "os" 12 "os/signal" 13 "runtime" 14 "runtime/pprof" 15 "slices" 16 "strconv" 17 "strings" 18 "syscall" 19 "time" 20 21 "github.com/bluesky-social/indigo/carstore" 22 "github.com/ethereum/go-ethereum/common/hexutil" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/peterbourgon/ff/v3" 25 "github.com/streamplace/oatproxy/pkg/oatproxy" 26 "stream.place/streamplace/pkg/aqhttp" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/gstinit" 31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 "stream.place/streamplace/pkg/log" 33 "stream.place/streamplace/pkg/media" 34 "stream.place/streamplace/pkg/notifications" 35 "stream.place/streamplace/pkg/replication" 36 "stream.place/streamplace/pkg/replication/iroh_replicator" 37 "stream.place/streamplace/pkg/replication/websocketrep" 38 "stream.place/streamplace/pkg/rtmps" 39 "stream.place/streamplace/pkg/spmetrics" 40 "stream.place/streamplace/pkg/statedb" 41 "stream.place/streamplace/pkg/storage" 42 43 _ "github.com/go-gst/go-glib/glib" 44 _ "github.com/go-gst/go-gst/gst" 45 "stream.place/streamplace/pkg/api" 46 "stream.place/streamplace/pkg/config" 47 "stream.place/streamplace/pkg/model" 48) 49 50// Additional jobs that can be injected by platforms 51type jobFunc func(ctx context.Context, cli *config.CLI) error 52 53// parse the CLI and fire up an streamplace node! 54func start(build *config.BuildFlags, platformJobs []jobFunc) error { 55 iroh_streamplace.InitLogging() 56 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 57 err := media.RunSelfTest(context.Background()) 58 if err != nil { 59 if selfTest { 60 fmt.Println(err.Error()) 61 os.Exit(1) 62 } else { 63 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 64 if retryCount >= 3 { 65 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 66 return err 67 } 68 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 69 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 70 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 71 if err != nil { 72 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 73 return err 74 } 75 panic("invalid code path: exec succeeded but we're still here???") 76 } 77 } 78 if selfTest { 79 runtime.GC() 80 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 81 log.Error(context.Background(), "error creating pprof", "error", err) 82 } 83 fmt.Println("self-test successful!") 84 os.Exit(0) 85 } 86 87 if len(os.Args) > 1 && os.Args[1] == "stream" { 88 if len(os.Args) != 3 { 89 fmt.Println("usage: streamplace stream [user]") 90 os.Exit(1) 91 } 92 return Stream(os.Args[2]) 93 } 94 95 if len(os.Args) > 1 && os.Args[1] == "live" { 96 cli := config.CLI{Build: build} 97 fs := cli.NewFlagSet("streamplace live") 98 99 err := cli.Parse(fs, os.Args[2:]) 100 if err != nil { 101 return err 102 } 103 104 args := fs.Args() 105 if len(args) != 1 { 106 fmt.Println("usage: streamplace live [flags] [stream-key]") 107 os.Exit(1) 108 } 109 110 return Live(args[0], cli.HTTPInternalAddr) 111 } 112 113 if len(os.Args) > 1 && os.Args[1] == "sign" { 114 return Sign(context.Background()) 115 } 116 117 if len(os.Args) > 1 && os.Args[1] == "whep" { 118 return WHEP(os.Args[2:]) 119 } 120 if len(os.Args) > 1 && os.Args[1] == "whip" { 121 return WHIP(os.Args[2:]) 122 } 123 124 if len(os.Args) > 1 && os.Args[1] == "combine" { 125 return Combine(context.Background(), build, os.Args[2:]) 126 } 127 128 if len(os.Args) > 1 && os.Args[1] == "split" { 129 cli := config.CLI{Build: build} 130 fs := cli.NewFlagSet("streamplace split") 131 132 err := cli.Parse(fs, os.Args[2:]) 133 if err != nil { 134 return err 135 } 136 ctx := context.Background() 137 ctx = log.WithDebugValue(ctx, cli.Debug) 138 if len(fs.Args()) != 2 { 139 fmt.Println("usage: streamplace split [flags] [input file] [output directory]") 140 os.Exit(1) 141 } 142 gstinit.InitGST() 143 return Split(ctx, fs.Args()[0], fs.Args()[1]) 144 } 145 146 if len(os.Args) > 1 && os.Args[1] == "self-test" { 147 err := media.RunSelfTest(context.Background()) 148 if err != nil { 149 fmt.Println(err.Error()) 150 os.Exit(1) 151 } 152 fmt.Println("self-test successful!") 153 os.Exit(0) 154 } 155 156 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 157 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 158 _ = starter.NewLivepeerConfig(lpfs) 159 err = ff.Parse(lpfs, os.Args[2:], 160 ff.WithConfigFileFlag("config"), 161 ff.WithEnvVarPrefix("LP"), 162 ) 163 if err != nil { 164 return err 165 } 166 err = GoLivepeer(context.Background(), lpfs) 167 if err != nil { 168 log.Error(context.Background(), "error in livepeer", "error", err) 169 os.Exit(1) 170 } 171 os.Exit(0) 172 } 173 174 _ = flag.Set("logtostderr", "true") 175 vFlag := flag.Lookup("v") 176 cli := config.CLI{Build: build} 177 fs := cli.NewFlagSet("streamplace") 178 verbosity := fs.String("v", "3", "log verbosity level") 179 version := fs.Bool("version", false, "print version and exit") 180 181 err = cli.Parse( 182 fs, os.Args[1:], 183 ) 184 if err != nil { 185 return err 186 } 187 188 err = flag.CommandLine.Parse(nil) 189 if err != nil { 190 return err 191 } 192 _ = vFlag.Value.Set(*verbosity) 193 log.SetColorLogger(cli.Color) 194 ctx := context.Background() 195 ctx = log.WithDebugValue(ctx, cli.Debug) 196 197 log.Log(ctx, 198 "streamplace", 199 "version", build.Version, 200 "buildTime", build.BuildTimeStr(), 201 "uuid", build.UUID, 202 "runtime.GOOS", runtime.GOOS, 203 "runtime.GOARCH", runtime.GOARCH, 204 "runtime.Version", runtime.Version()) 205 if *version { 206 return nil 207 } 208 signer, err := createSigner(ctx, &cli) 209 if err != nil { 210 return err 211 } 212 213 if len(os.Args) > 1 && os.Args[1] == "migrate" { 214 return statedb.Migrate(&cli) 215 } 216 217 spmetrics.Version.WithLabelValues(build.Version).Inc() 218 if cli.LivepeerHelp { 219 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 220 _ = starter.NewLivepeerConfig(lpFlags) 221 lpFlags.VisitAll(func(f *flag.Flag) { 222 adapted := config.ToSnakeCase(f.Name) 223 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 224 usage := fmt.Sprintf(" %s", f.Usage) 225 if f.DefValue != "" { 226 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 227 } 228 fmt.Printf(" %s\n", usage) 229 }) 230 return nil 231 } 232 233 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 234 235 err = os.MkdirAll(cli.DataDir, os.ModePerm) 236 if err != nil { 237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 238 } 239 240 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 241 if err != nil { 242 return err 243 } 244 var noter notifications.FirebaseNotifier 245 if cli.FirebaseServiceAccount != "" { 246 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 247 if err != nil { 248 return err 249 } 250 } 251 252 group, ctx := TimeoutGroupWithContext(ctx) 253 254 out := carstore.SQLiteStore{} 255 err = out.Open(":memory:") 256 if err != nil { 257 return err 258 } 259 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 260 if err != nil { 261 return err 262 } 263 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 264 if err != nil { 265 return err 266 } 267 defer handle.Close() 268 269 jwk, err := state.EnsureJWK(ctx, "jwk") 270 if err != nil { 271 return err 272 } 273 cli.JWK = jwk 274 275 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 276 if err != nil { 277 return err 278 } 279 cli.AccessJWK = accessJWK 280 281 b := bus.NewBus() 282 atsync := &atproto.ATProtoSynchronizer{ 283 CLI: &cli, 284 Model: mod, 285 StatefulDB: state, 286 Noter: noter, 287 Bus: b, 288 } 289 err = atsync.Migrate(ctx) 290 if err != nil { 291 return fmt.Errorf("failed to migrate: %w", err) 292 } 293 294 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 295 if err != nil { 296 return err 297 } 298 299 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 300 if err != nil { 301 return err 302 } 303 304 var clientMetadata *oatproxy.OAuthClientMetadata 305 var host string 306 if cli.PublicOAuth { 307 u, err := url.Parse(cli.OwnPublicURL()) 308 if err != nil { 309 return err 310 } 311 host = u.Host 312 clientMetadata = &oatproxy.OAuthClientMetadata{ 313 Scope: atproto.OAuthString, 314 ClientName: "Streamplace", 315 RedirectURIs: []string{ 316 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 317 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 318 }, 319 } 320 } else { 321 host = cli.BroadcasterHost 322 clientMetadata = &oatproxy.OAuthClientMetadata{ 323 Scope: atproto.OAuthString, 324 ClientName: "Streamplace", 325 RedirectURIs: []string{ 326 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 327 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 328 }, 329 } 330 } 331 332 var replicator replication.Replicator = nil 333 if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 334 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 335 if err != nil { 336 return err 337 } 338 if !exists { 339 secret := make([]byte, 32) 340 _, err := rand.Read(secret) 341 if err != nil { 342 return fmt.Errorf("failed to generate random secret: %w", err) 343 } 344 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 345 if err != nil { 346 return err 347 } 348 } 349 buf := bytes.Buffer{} 350 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 351 if err != nil { 352 return err 353 } 354 secret := buf.Bytes() 355 var topic []byte 356 if cli.IrohTopic != "" { 357 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 358 if err != nil { 359 return err 360 } 361 } 362 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 363 if err != nil { 364 return err 365 } 366 } 367 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 368 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 369 } 370 371 op := oatproxy.New(&oatproxy.Config{ 372 Host: host, 373 CreateOAuthSession: state.CreateOAuthSession, 374 UpdateOAuthSession: state.UpdateOAuthSession, 375 GetOAuthSession: state.LoadOAuthSession, 376 Lock: state.GetNamedLock, 377 Scope: atproto.OAuthString, 378 UpstreamJWK: cli.JWK, 379 DownstreamJWK: cli.AccessJWK, 380 ClientMetadata: clientMetadata, 381 Public: cli.PublicOAuth, 382 }) 383 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator) 384 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op) 385 if err != nil { 386 return err 387 } 388 389 ctx = log.WithLogValues(ctx, "version", build.Version) 390 391 group.Go(func() error { 392 return handleSignals(ctx) 393 }) 394 395 group.Go(func() error { 396 return state.ProcessQueue(ctx) 397 }) 398 399 if cli.TracingEndpoint != "" { 400 group.Go(func() error { 401 return startTelemetry(ctx, cli.TracingEndpoint) 402 }) 403 } 404 405 if cli.Secure { 406 group.Go(func() error { 407 return a.ServeHTTPS(ctx) 408 }) 409 group.Go(func() error { 410 return a.ServeHTTPRedirect(ctx) 411 }) 412 if cli.RTMPServerAddon != "" { 413 group.Go(func() error { 414 return rtmps.ServeRTMPSAddon(ctx, &cli) 415 }) 416 } 417 group.Go(func() error { 418 return a.ServeRTMPS(ctx, &cli) 419 }) 420 } else { 421 group.Go(func() error { 422 return a.ServeHTTP(ctx) 423 }) 424 group.Go(func() error { 425 return a.ServeRTMP(ctx) 426 }) 427 } 428 429 group.Go(func() error { 430 return a.ServeInternalHTTP(ctx) 431 }) 432 433 if !cli.NoFirehose { 434 group.Go(func() error { 435 return atsync.StartFirehose(ctx) 436 }) 437 } 438 for _, labeler := range cli.Labelers { 439 group.Go(func() error { 440 return atsync.StartLabelerFirehose(ctx, labeler) 441 }) 442 } 443 444 group.Go(func() error { 445 return a.ExpireSessions(ctx) 446 }) 447 448 group.Go(func() error { 449 return storage.StartSegmentCleaner(ctx, mod, &cli) 450 }) 451 452 group.Go(func() error { 453 return mod.StartSegmentCleaner(ctx) 454 }) 455 456 group.Go(func() error { 457 return replicator.Start(ctx, &cli) 458 }) 459 460 if cli.LivepeerGateway { 461 // make a file to make sure the directory exists 462 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 463 if err != nil { 464 return err 465 } 466 fd.Close() 467 if err != nil { 468 return err 469 } 470 group.Go(func() error { 471 err := GoLivepeer(ctx, fs) 472 if err != nil { 473 return err 474 } 475 // livepeer returns nil on error, so we need to check if we're responsible 476 if ctx.Err() == nil { 477 return fmt.Errorf("livepeer exited") 478 } 479 return nil 480 }) 481 } 482 483 group.Go(func() error { 484 return d.Start(ctx) 485 }) 486 487 if cli.TestStream { 488 atkey, err := atproto.ParsePubKey(signer.Public()) 489 if err != nil { 490 return err 491 } 492 did := atkey.DIDKey() 493 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod) 494 if err != nil { 495 return err 496 } 497 err = mod.UpdateIdentity(&model.Identity{ 498 ID: testMediaSigner.Pub().String(), 499 Handle: "stream-self-tester", 500 DID: "", 501 }) 502 if err != nil { 503 return err 504 } 505 cli.AllowedStreams = append(cli.AllowedStreams, did) 506 a.Aliases["self-test"] = did 507 group.Go(func() error { 508 return mm.TestSource(ctx, testMediaSigner) 509 }) 510 511 // Start a test stream that will run intermittently 512 if err != nil { 513 return err 514 } 515 atkey2, err := atproto.ParsePubKey(signer.Public()) 516 if err != nil { 517 return err 518 } 519 did2 := atkey2.DIDKey() 520 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod) 521 if err != nil { 522 return err 523 } 524 err = mod.UpdateIdentity(&model.Identity{ 525 ID: intermittentMediaSigner.Pub().String(), 526 Handle: "stream-intermittent-tester", 527 DID: "", 528 }) 529 if err != nil { 530 return err 531 } 532 cli.AllowedStreams = append(cli.AllowedStreams, did2) 533 a.Aliases["intermittent-self-test"] = did2 534 535 group.Go(func() error { 536 for { 537 // Start intermittent stream 538 intermittentCtx, cancel := context.WithCancel(ctx) 539 done := make(chan struct{}) 540 go func() { 541 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 542 close(done) 543 }() 544 // Stream ON for 15 seconds 545 time.Sleep(15 * time.Second) 546 // Stop stream 547 cancel() 548 <-done // Wait for TestSource to exit 549 // Stream OFF for 15 seconds 550 time.Sleep(15 * time.Second) 551 } 552 }) 553 } 554 555 for _, job := range platformJobs { 556 group.Go(func() error { 557 return job(ctx, &cli) 558 }) 559 } 560 561 if cli.WHIPTest != "" { 562 group.Go(func() error { 563 err := WHIP(strings.Split(cli.WHIPTest, " ")) 564 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 565 time.Sleep(time.Second * 3) 566 // gst.Deinit() 567 log.Warn(ctx, "gst deinit complete, exiting") 568 return err 569 }) 570 } 571 572 return group.Wait() 573} 574 575var ErrCaughtSignal = errors.New("caught signal") 576 577func handleSignals(ctx context.Context) error { 578 c := make(chan os.Signal, 1) 579 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 580 for { 581 select { 582 case s := <-c: 583 if s == syscall.SIGABRT { 584 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 585 log.Error(ctx, "failed to create pprof", "error", err) 586 } 587 } 588 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 589 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 590 case <-ctx.Done(): 591 return nil 592 } 593 } 594}