Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/localdb 600 lines 15 kB view raw
package cmd

import (
	"bytes"
	"context"
	"crypto/rand"
	"errors"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"runtime"
	"runtime/pprof"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
	"github.com/peterbourgon/ff/v3"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/gstinit"
	"stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
	"stream.place/streamplace/pkg/localdb"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication"
	"stream.place/streamplace/pkg/replication/iroh_replicator"
	"stream.place/streamplace/pkg/replication/websocketrep"
	"stream.place/streamplace/pkg/rtmps"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/statedb"
	"stream.place/streamplace/pkg/storage"

	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional job that can be injected by platforms. Each job is
// handed the node's context and the parsed CLI configuration, and reports
// failure through its returned error.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
55func start(build *config.BuildFlags, platformJobs []jobFunc) error { 56 iroh_streamplace.InitLogging() 57 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 58 err := media.RunSelfTest(context.Background()) 59 if err != nil { 60 if selfTest { 61 fmt.Println(err.Error()) 62 os.Exit(1) 63 } else { 64 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 65 if retryCount >= 3 { 66 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 67 return err 68 } 69 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 70 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 71 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 72 if err != nil { 73 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 74 return err 75 } 76 panic("invalid code path: exec succeeded but we're still here???") 77 } 78 } 79 if selfTest { 80 runtime.GC() 81 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 82 log.Error(context.Background(), "error creating pprof", "error", err) 83 } 84 fmt.Println("self-test successful!") 85 os.Exit(0) 86 } 87 88 if len(os.Args) > 1 && os.Args[1] == "stream" { 89 if len(os.Args) != 3 { 90 fmt.Println("usage: streamplace stream [user]") 91 os.Exit(1) 92 } 93 return Stream(os.Args[2]) 94 } 95 96 if len(os.Args) > 1 && os.Args[1] == "live" { 97 cli := config.CLI{Build: build} 98 fs := cli.NewFlagSet("streamplace live") 99 100 err := cli.Parse(fs, os.Args[2:]) 101 if err != nil { 102 return err 103 } 104 105 args := fs.Args() 106 if len(args) != 1 { 107 fmt.Println("usage: streamplace live [flags] [stream-key]") 108 os.Exit(1) 109 } 110 111 return Live(args[0], cli.HTTPInternalAddr) 112 } 113 114 if len(os.Args) > 1 && os.Args[1] == "sign" { 115 return Sign(context.Background()) 116 } 117 118 if len(os.Args) > 1 && os.Args[1] == "whep" { 119 return 
WHEP(os.Args[2:]) 120 } 121 if len(os.Args) > 1 && os.Args[1] == "whip" { 122 return WHIP(os.Args[2:]) 123 } 124 125 if len(os.Args) > 1 && os.Args[1] == "combine" { 126 return Combine(context.Background(), build, os.Args[2:]) 127 } 128 129 if len(os.Args) > 1 && os.Args[1] == "split" { 130 cli := config.CLI{Build: build} 131 fs := cli.NewFlagSet("streamplace split") 132 133 err := cli.Parse(fs, os.Args[2:]) 134 if err != nil { 135 return err 136 } 137 ctx := context.Background() 138 ctx = log.WithDebugValue(ctx, cli.Debug) 139 if len(fs.Args()) != 2 { 140 fmt.Println("usage: streamplace split [flags] [input file] [output directory]") 141 os.Exit(1) 142 } 143 gstinit.InitGST() 144 return Split(ctx, fs.Args()[0], fs.Args()[1]) 145 } 146 147 if len(os.Args) > 1 && os.Args[1] == "self-test" { 148 err := media.RunSelfTest(context.Background()) 149 if err != nil { 150 fmt.Println(err.Error()) 151 os.Exit(1) 152 } 153 fmt.Println("self-test successful!") 154 os.Exit(0) 155 } 156 157 if len(os.Args) > 1 && os.Args[1] == "livepeer" { 158 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError) 159 _ = starter.NewLivepeerConfig(lpfs) 160 err = ff.Parse(lpfs, os.Args[2:], 161 ff.WithConfigFileFlag("config"), 162 ff.WithEnvVarPrefix("LP"), 163 ) 164 if err != nil { 165 return err 166 } 167 err = GoLivepeer(context.Background(), lpfs) 168 if err != nil { 169 log.Error(context.Background(), "error in livepeer", "error", err) 170 os.Exit(1) 171 } 172 os.Exit(0) 173 } 174 175 _ = flag.Set("logtostderr", "true") 176 vFlag := flag.Lookup("v") 177 cli := config.CLI{Build: build} 178 fs := cli.NewFlagSet("streamplace") 179 verbosity := fs.String("v", "3", "log verbosity level") 180 version := fs.Bool("version", false, "print version and exit") 181 182 err = cli.Parse( 183 fs, os.Args[1:], 184 ) 185 if err != nil { 186 return err 187 } 188 189 err = flag.CommandLine.Parse(nil) 190 if err != nil { 191 return err 192 } 193 _ = vFlag.Value.Set(*verbosity) 194 log.SetColorLogger(cli.Color) 
195 ctx := context.Background() 196 ctx = log.WithDebugValue(ctx, cli.Debug) 197 198 log.Log(ctx, 199 "streamplace", 200 "version", build.Version, 201 "buildTime", build.BuildTimeStr(), 202 "uuid", build.UUID, 203 "runtime.GOOS", runtime.GOOS, 204 "runtime.GOARCH", runtime.GOARCH, 205 "runtime.Version", runtime.Version()) 206 if *version { 207 return nil 208 } 209 signer, err := createSigner(ctx, &cli) 210 if err != nil { 211 return err 212 } 213 214 if len(os.Args) > 1 && os.Args[1] == "migrate" { 215 return statedb.Migrate(&cli) 216 } 217 218 spmetrics.Version.WithLabelValues(build.Version).Inc() 219 if cli.LivepeerHelp { 220 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 221 _ = starter.NewLivepeerConfig(lpFlags) 222 lpFlags.VisitAll(func(f *flag.Flag) { 223 adapted := config.ToSnakeCase(f.Name) 224 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted)) 225 usage := fmt.Sprintf(" %s", f.Usage) 226 if f.DefValue != "" { 227 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue) 228 } 229 fmt.Printf(" %s\n", usage) 230 }) 231 return nil 232 } 233 234 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 235 236 err = os.MkdirAll(cli.DataDir, os.ModePerm) 237 if err != nil { 238 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 239 } 240 241 ldb, err := localdb.MakeDB(cli.LocalDBURL) 242 if err != nil { 243 return err 244 } 245 246 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 247 if err != nil { 248 return err 249 } 250 var noter notifications.FirebaseNotifier 251 if cli.FirebaseServiceAccount != "" { 252 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 253 if err != nil { 254 return err 255 } 256 } 257 258 group, ctx := TimeoutGroupWithContext(ctx) 259 260 out := carstore.SQLiteStore{} 261 err = out.Open(":memory:") 262 if err != nil { 263 return err 264 } 265 state, err := statedb.MakeDB(ctx, &cli, noter, mod) 266 if err != nil { 267 return err 268 } 269 
handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 270 if err != nil { 271 return err 272 } 273 defer handle.Close() 274 275 jwk, err := state.EnsureJWK(ctx, "jwk") 276 if err != nil { 277 return err 278 } 279 cli.JWK = jwk 280 281 accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 282 if err != nil { 283 return err 284 } 285 cli.AccessJWK = accessJWK 286 287 b := bus.NewBus() 288 atsync := &atproto.ATProtoSynchronizer{ 289 CLI: &cli, 290 Model: mod, 291 StatefulDB: state, 292 Noter: noter, 293 Bus: b, 294 } 295 err = atsync.Migrate(ctx) 296 if err != nil { 297 return fmt.Errorf("failed to migrate: %w", err) 298 } 299 300 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync, ldb) 301 if err != nil { 302 return err 303 } 304 305 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod) 306 if err != nil { 307 return err 308 } 309 310 var clientMetadata *oatproxy.OAuthClientMetadata 311 var host string 312 if cli.PublicOAuth { 313 u, err := url.Parse(cli.OwnPublicURL()) 314 if err != nil { 315 return err 316 } 317 host = u.Host 318 clientMetadata = &oatproxy.OAuthClientMetadata{ 319 Scope: atproto.OAuthString, 320 ClientName: "Streamplace", 321 RedirectURIs: []string{ 322 fmt.Sprintf("%s/login", cli.OwnPublicURL()), 323 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()), 324 }, 325 } 326 } else { 327 host = cli.BroadcasterHost 328 clientMetadata = &oatproxy.OAuthClientMetadata{ 329 Scope: atproto.OAuthString, 330 ClientName: "Streamplace", 331 RedirectURIs: []string{ 332 fmt.Sprintf("https://%s/login", cli.BroadcasterHost), 333 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost), 334 }, 335 } 336 } 337 338 var replicator replication.Replicator = nil 339 if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 340 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 341 if err != nil { 342 return err 343 } 344 if !exists { 345 secret := make([]byte, 32) 346 _, err := rand.Read(secret) 347 if err != 
nil { 348 return fmt.Errorf("failed to generate random secret: %w", err) 349 } 350 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 351 if err != nil { 352 return err 353 } 354 } 355 buf := bytes.Buffer{} 356 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 357 if err != nil { 358 return err 359 } 360 secret := buf.Bytes() 361 var topic []byte 362 if cli.IrohTopic != "" { 363 topic, err = hexutil.Decode("0x" + cli.IrohTopic) 364 if err != nil { 365 return err 366 } 367 } 368 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 369 if err != nil { 370 return err 371 } 372 } 373 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 374 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 375 } 376 377 op := oatproxy.New(&oatproxy.Config{ 378 Host: host, 379 CreateOAuthSession: state.CreateOAuthSession, 380 UpdateOAuthSession: state.UpdateOAuthSession, 381 GetOAuthSession: state.LoadOAuthSession, 382 Lock: state.GetNamedLock, 383 Scope: atproto.OAuthString, 384 UpstreamJWK: cli.JWK, 385 DownstreamJWK: cli.AccessJWK, 386 ClientMetadata: clientMetadata, 387 Public: cli.PublicOAuth, 388 }) 389 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator, ldb) 390 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb) 391 if err != nil { 392 return err 393 } 394 395 ctx = log.WithLogValues(ctx, "version", build.Version) 396 397 group.Go(func() error { 398 return handleSignals(ctx) 399 }) 400 401 group.Go(func() error { 402 return state.ProcessQueue(ctx) 403 }) 404 405 if cli.TracingEndpoint != "" { 406 group.Go(func() error { 407 return startTelemetry(ctx, cli.TracingEndpoint) 408 }) 409 } 410 411 if cli.Secure { 412 group.Go(func() error { 413 return a.ServeHTTPS(ctx) 414 }) 415 group.Go(func() error { 416 return a.ServeHTTPRedirect(ctx) 417 }) 418 if cli.RTMPServerAddon != "" { 419 group.Go(func() error { 420 return rtmps.ServeRTMPSAddon(ctx, 
&cli) 421 }) 422 } 423 group.Go(func() error { 424 return a.ServeRTMPS(ctx, &cli) 425 }) 426 } else { 427 group.Go(func() error { 428 return a.ServeHTTP(ctx) 429 }) 430 group.Go(func() error { 431 return a.ServeRTMP(ctx) 432 }) 433 } 434 435 group.Go(func() error { 436 return a.ServeInternalHTTP(ctx) 437 }) 438 439 if !cli.NoFirehose { 440 group.Go(func() error { 441 return atsync.StartFirehose(ctx) 442 }) 443 } 444 for _, labeler := range cli.Labelers { 445 group.Go(func() error { 446 return atsync.StartLabelerFirehose(ctx, labeler) 447 }) 448 } 449 450 group.Go(func() error { 451 return a.ExpireSessions(ctx) 452 }) 453 454 group.Go(func() error { 455 return storage.StartSegmentCleaner(ctx, ldb, &cli) 456 }) 457 458 group.Go(func() error { 459 return ldb.StartSegmentCleaner(ctx) 460 }) 461 462 group.Go(func() error { 463 return replicator.Start(ctx, &cli) 464 }) 465 466 if cli.LivepeerGateway { 467 // make a file to make sure the directory exists 468 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true) 469 if err != nil { 470 return err 471 } 472 fd.Close() 473 if err != nil { 474 return err 475 } 476 group.Go(func() error { 477 err := GoLivepeer(ctx, fs) 478 if err != nil { 479 return err 480 } 481 // livepeer returns nil on error, so we need to check if we're responsible 482 if ctx.Err() == nil { 483 return fmt.Errorf("livepeer exited") 484 } 485 return nil 486 }) 487 } 488 489 group.Go(func() error { 490 return d.Start(ctx) 491 }) 492 493 if cli.TestStream { 494 atkey, err := atproto.ParsePubKey(signer.Public()) 495 if err != nil { 496 return err 497 } 498 did := atkey.DIDKey() 499 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod) 500 if err != nil { 501 return err 502 } 503 err = mod.UpdateIdentity(&model.Identity{ 504 ID: testMediaSigner.Pub().String(), 505 Handle: "stream-self-tester", 506 DID: "", 507 }) 508 if err != nil { 509 return err 510 } 511 cli.AllowedStreams = append(cli.AllowedStreams, did) 512 
a.Aliases["self-test"] = did 513 group.Go(func() error { 514 return mm.TestSource(ctx, testMediaSigner) 515 }) 516 517 // Start a test stream that will run intermittently 518 if err != nil { 519 return err 520 } 521 atkey2, err := atproto.ParsePubKey(signer.Public()) 522 if err != nil { 523 return err 524 } 525 did2 := atkey2.DIDKey() 526 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod) 527 if err != nil { 528 return err 529 } 530 err = mod.UpdateIdentity(&model.Identity{ 531 ID: intermittentMediaSigner.Pub().String(), 532 Handle: "stream-intermittent-tester", 533 DID: "", 534 }) 535 if err != nil { 536 return err 537 } 538 cli.AllowedStreams = append(cli.AllowedStreams, did2) 539 a.Aliases["intermittent-self-test"] = did2 540 541 group.Go(func() error { 542 for { 543 // Start intermittent stream 544 intermittentCtx, cancel := context.WithCancel(ctx) 545 done := make(chan struct{}) 546 go func() { 547 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 548 close(done) 549 }() 550 // Stream ON for 15 seconds 551 time.Sleep(15 * time.Second) 552 // Stop stream 553 cancel() 554 <-done // Wait for TestSource to exit 555 // Stream OFF for 15 seconds 556 time.Sleep(15 * time.Second) 557 } 558 }) 559 } 560 561 for _, job := range platformJobs { 562 group.Go(func() error { 563 return job(ctx, &cli) 564 }) 565 } 566 567 if cli.WHIPTest != "" { 568 group.Go(func() error { 569 err := WHIP(strings.Split(cli.WHIPTest, " ")) 570 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 571 time.Sleep(time.Second * 3) 572 // gst.Deinit() 573 log.Warn(ctx, "gst deinit complete, exiting") 574 return err 575 }) 576 } 577 578 return group.Wait() 579} 580 581var ErrCaughtSignal = errors.New("caught signal") 582 583func handleSignals(ctx context.Context) error { 584 c := make(chan os.Signal, 1) 585 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 586 for { 587 select { 588 case s 
:= <-c: 589 if s == syscall.SIGABRT { 590 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 591 log.Error(ctx, "failed to create pprof", "error", err) 592 } 593 } 594 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 595 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 596 case <-ctx.Done(): 597 return nil 598 } 599 } 600}