Live video on the AT Protocol
at eli/database-resync 495 lines 18 kB view raw
package cmd

import (
	"context"
	"crypto"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/term"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/crypto/signers"
	"stream.place/streamplace/pkg/crypto/signers/eip712"
	"stream.place/streamplace/pkg/director"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/notifications"
	"stream.place/streamplace/pkg/replication"
	"stream.place/streamplace/pkg/replication/boring"
	"stream.place/streamplace/pkg/resync"
	"stream.place/streamplace/pkg/rtmps"
	v0 "stream.place/streamplace/pkg/schema/v0"
	"stream.place/streamplace/pkg/spmetrics"

	"github.com/ThalesGroup/crypto11"
	// The two blank imports below are pulled in only for their package
	// initialization side effects; nothing in this file names them.
	_ "github.com/go-gst/go-glib/glib"
	_ "github.com/go-gst/go-gst/gst"
	"stream.place/streamplace/pkg/api"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/model"
)

// jobFunc is an additional long-running job that a platform can inject into
// the node. start runs every injected job in its goroutine group, passing it
// the group's context and the parsed CLI configuration; the returned error is
// reported to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error

// parse the CLI and fire up a streamplace node!
49func start(build *config.BuildFlags, platformJobs []jobFunc) error { 50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 51 err := media.RunSelfTest(context.Background()) 52 if err != nil { 53 if selfTest { 54 fmt.Println(err.Error()) 55 os.Exit(1) 56 } else { 57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 58 if retryCount >= 3 { 59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 60 return err 61 } 62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 65 if err != nil { 66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 67 return err 68 } 69 panic("invalid code path: exec succeeded but we're still here???") 70 } 71 } 72 if selfTest { 73 runtime.GC() 74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 75 log.Error(context.Background(), "error creating pprof", "error", err) 76 } 77 fmt.Println("self-test successful!") 78 os.Exit(0) 79 } 80 81 if len(os.Args) > 1 && os.Args[1] == "stream" { 82 if len(os.Args) != 3 { 83 fmt.Println("usage: streamplace stream [user]") 84 os.Exit(1) 85 } 86 return Stream(os.Args[2]) 87 } 88 89 if len(os.Args) > 1 && os.Args[1] == "live" { 90 if len(os.Args) != 3 { 91 fmt.Println("usage: streamplace live [stream-key]") 92 os.Exit(1) 93 } 94 return Live(os.Args[2]) 95 } 96 97 if len(os.Args) > 1 && os.Args[1] == "sign" { 98 return Sign(context.Background()) 99 } 100 101 if len(os.Args) > 1 && os.Args[1] == "whep" { 102 return WHEP(os.Args[2:]) 103 } 104 if len(os.Args) > 1 && os.Args[1] == "whip" { 105 return WHIP(os.Args[2:]) 106 } 107 108 if len(os.Args) > 1 && os.Args[1] == "self-test" { 109 err := media.RunSelfTest(context.Background()) 110 if err != nil { 111 
fmt.Println(err.Error()) 112 os.Exit(1) 113 } 114 fmt.Println("self-test successful!") 115 os.Exit(0) 116 } 117 _ = flag.Set("logtostderr", "true") 118 vFlag := flag.Lookup("v") 119 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 120 cli := config.CLI{Build: build} 121 fs.StringVar(&cli.DataDir, "data-dir", config.DefaultDataDir(), "directory for keeping all streamplace data") 122 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 123 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 124 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 125 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 126 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 127 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 128 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 129 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 130 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 131 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 132 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 133 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 134 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 135 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 136 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for 
signing") 137 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 138 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 139 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 140 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 141 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 142 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 143 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 144 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 145 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 146 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 147 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 148 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 149 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 150 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 151 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 152 cli.DebugFlag(fs, &cli.Debug, "debug", 
"", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 153 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 154 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 155 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 156 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 157 verbosity := fs.String("v", "3", "log verbosity level") 158 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 159 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 160 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 161 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 162 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 163 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 164 fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)") 165 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 166 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 167 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 168 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 169 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 170 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 171 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of 
Discord webhooks to send notifications to") 172 version := fs.Bool("version", false, "print version and exit") 173 174 if runtime.GOOS == "linux" { 175 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 176 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 177 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 178 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 179 } 180 181 err = cli.Parse( 182 fs, os.Args[1:], 183 ) 184 if err != nil { 185 return err 186 } 187 err = flag.CommandLine.Parse(nil) 188 if err != nil { 189 return err 190 } 191 _ = vFlag.Value.Set(*verbosity) 192 log.SetColorLogger(cli.Color) 193 ctx := context.Background() 194 ctx = log.WithDebugValue(ctx, cli.Debug) 195 196 log.Log(ctx, 197 "streamplace", 198 "version", build.Version, 199 "buildTime", build.BuildTimeStr(), 200 "uuid", build.UUID, 201 "runtime.GOOS", runtime.GOOS, 202 "runtime.GOARCH", runtime.GOARCH, 203 "runtime.Version", runtime.Version()) 204 if *version { 205 return nil 206 } 207 spmetrics.Version.WithLabelValues(build.Version).Inc() 208 209 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 210 if len(os.Args) > 1 && os.Args[1] == "resync" { 211 return resync.Resync(ctx, &cli) 212 } 213 214 err = os.MkdirAll(cli.DataDir, os.ModePerm) 215 if err != nil { 216 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 217 } 218 schema, err := v0.MakeV0Schema() 219 if err != nil { 220 return err 221 } 222 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 223 Schema: schema, 224 EthKeystorePath: cli.EthKeystorePath, 225 EthAccountAddr: cli.EthAccountAddr, 226 EthKeystorePassword: cli.EthPassword, 227 }) 228 if err != nil { 229 return err 230 } 231 var signer crypto.Signer = eip712signer 232 if cli.PKCS11ModulePath != "" { 233 conf := &crypto11.Config{ 234 Path: 
cli.PKCS11ModulePath, 235 } 236 count := 0 237 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 238 if val != "" { 239 count += 1 240 } 241 } 242 if count != 1 { 243 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 244 } 245 if cli.PKCS11TokenSlot != "" { 246 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 247 if err != nil { 248 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 249 } 250 numint := int(num) 251 // why does crypto11 want this as a reference? odd. 252 conf.SlotNumber = &numint 253 } 254 if cli.PKCS11TokenLabel != "" { 255 conf.TokenLabel = cli.PKCS11TokenLabel 256 } 257 if cli.PKCS11TokenSerial != "" { 258 conf.TokenSerial = cli.PKCS11TokenSerial 259 } 260 pin := cli.PKCS11Pin 261 if pin == "" { 262 fmt.Printf("Please enter PKCS11 PIN: ") 263 password, err := term.ReadPassword(int(os.Stdin.Fd())) 264 fmt.Println("") 265 if err != nil { 266 return fmt.Errorf("error reading PKCS11 password: %w", err) 267 } 268 pin = string(password) 269 } 270 conf.Pin = pin 271 272 sc, err := crypto11.Configure(conf) 273 if err != nil { 274 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 275 } 276 var id []byte = nil 277 var label []byte = nil 278 if cli.PKCS11KeypairID != "" { 279 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 280 if err != nil { 281 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 282 } 283 id = []byte{byte(num)} 284 } 285 if cli.PKCS11KeypairLabel != "" { 286 label = []byte(cli.PKCS11KeypairLabel) 287 } 288 hwsigner, err := sc.FindKeyPair(id, label) 289 if err != nil { 290 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 291 } 292 if hwsigner == nil { 293 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 294 } 295 addr, err := signers.HexAddrFromSigner(hwsigner) 296 if err != nil { 297 return 
fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 298 } 299 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 300 signer = hwsigner 301 } 302 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 303 mod, err := model.MakeDB(cli.DBPath) 304 if err != nil { 305 return err 306 } 307 var noter notifications.FirebaseNotifier 308 if cli.FirebaseServiceAccount != "" { 309 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 310 if err != nil { 311 return err 312 } 313 } 314 315 jwkPath := cli.DataFilePath([]string{"jwk.json"}) 316 jwk, err := atproto.EnsureJWK(ctx, jwkPath) 317 if err != nil { 318 return err 319 } 320 cli.JWK = jwk 321 322 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"}) 323 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath) 324 if err != nil { 325 return err 326 } 327 cli.AccessJWK = accessJWK 328 329 b := bus.NewBus() 330 atsync := &atproto.ATProtoSynchronizer{ 331 CLI: &cli, 332 Model: mod, 333 Noter: noter, 334 Bus: b, 335 } 336 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 337 if err != nil { 338 return err 339 } 340 341 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 342 if err != nil { 343 return err 344 } 345 346 clientMetadata := &oatproxy.OAuthClientMetadata{ 347 Scope: "atproto transition:generic", 348 ClientName: "Streamplace", 349 RedirectURIs: []string{ 350 fmt.Sprintf("https://%s/login", cli.PublicHost), 351 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 352 }, 353 } 354 355 op := oatproxy.New(&oatproxy.Config{ 356 Host: cli.PublicHost, 357 CreateOAuthSession: mod.CreateOAuthSession, 358 UpdateOAuthSession: mod.UpdateOAuthSession, 359 GetOAuthSession: mod.LoadOAuthSession, 360 Scope: "atproto transition:generic", 361 UpstreamJWK: cli.JWK, 362 DownstreamJWK: cli.AccessJWK, 363 ClientMetadata: clientMetadata, 364 }) 365 d := director.NewDirector(mm, mod, &cli, b, 
op) 366 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 367 if err != nil { 368 return err 369 } 370 371 group, ctx := TimeoutGroupWithContext(ctx) 372 ctx = log.WithLogValues(ctx, "version", build.Version) 373 374 group.Go(func() error { 375 return handleSignals(ctx) 376 }) 377 378 if cli.TracingEndpoint != "" { 379 group.Go(func() error { 380 return startTelemetry(ctx, cli.TracingEndpoint) 381 }) 382 } 383 384 if cli.Secure { 385 group.Go(func() error { 386 return a.ServeHTTPS(ctx) 387 }) 388 group.Go(func() error { 389 return a.ServeHTTPRedirect(ctx) 390 }) 391 if cli.RTMPServerAddon != "" { 392 group.Go(func() error { 393 return rtmps.ServeRTMPS(ctx, &cli) 394 }) 395 } 396 } else { 397 group.Go(func() error { 398 return a.ServeHTTP(ctx) 399 }) 400 } 401 402 group.Go(func() error { 403 return a.ServeInternalHTTP(ctx) 404 }) 405 406 if !cli.NoFirehose { 407 group.Go(func() error { 408 return atsync.StartFirehose(ctx) 409 }) 410 } 411 412 group.Go(func() error { 413 return spmetrics.ExpireSessions(ctx) 414 }) 415 416 group.Go(func() error { 417 return mod.StartSegmentCleaner(ctx) 418 }) 419 420 group.Go(func() error { 421 return d.Start(ctx) 422 }) 423 424 if cli.TestStream { 425 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 426 Schema: schema, 427 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 428 }) 429 if err != nil { 430 return err 431 } 432 atkey, err := atproto.ParsePubKey(signer.Public()) 433 if err != nil { 434 return err 435 } 436 did := atkey.DIDKey() 437 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 438 if err != nil { 439 return err 440 } 441 err = mod.UpdateIdentity(&model.Identity{ 442 ID: testMediaSigner.Pub().String(), 443 Handle: "stream-self-tester", 444 DID: "", 445 }) 446 if err != nil { 447 return err 448 } 449 cli.AllowedStreams = append(cli.AllowedStreams, did) 450 a.Aliases["self-test"] = did 451 group.Go(func() error { 452 
return mm.TestSource(ctx, testMediaSigner) 453 }) 454 } 455 456 for _, job := range platformJobs { 457 group.Go(func() error { 458 return job(ctx, &cli) 459 }) 460 } 461 462 if cli.WHIPTest != "" { 463 group.Go(func() error { 464 err := WHIP(strings.Split(cli.WHIPTest, " ")) 465 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 466 time.Sleep(time.Second * 3) 467 // gst.Deinit() 468 log.Warn(ctx, "gst deinit complete, exiting") 469 return err 470 }) 471 } 472 473 return group.Wait() 474} 475 476var ErrCaughtSignal = errors.New("caught signal") 477 478func handleSignals(ctx context.Context) error { 479 c := make(chan os.Signal, 1) 480 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 481 for { 482 select { 483 case s := <-c: 484 if s == syscall.SIGABRT { 485 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 486 log.Error(ctx, "failed to create pprof", "error", err) 487 } 488 } 489 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 490 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 491 case <-ctx.Done(): 492 return nil 493 } 494 } 495}