Live video on the AT Protocol
at natb/remove-lex-reference 501 lines 13 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/streamplace/oatproxy/pkg/oatproxy" 20 "golang.org/x/term" 21 "stream.place/streamplace/pkg/aqhttp" 22 "stream.place/streamplace/pkg/atproto" 23 "stream.place/streamplace/pkg/bus" 24 "stream.place/streamplace/pkg/crypto/signers" 25 "stream.place/streamplace/pkg/crypto/signers/eip712" 26 "stream.place/streamplace/pkg/director" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/media" 29 "stream.place/streamplace/pkg/notifications" 30 "stream.place/streamplace/pkg/replication" 31 "stream.place/streamplace/pkg/replication/boring" 32 "stream.place/streamplace/pkg/resync" 33 "stream.place/streamplace/pkg/rtmps" 34 v0 "stream.place/streamplace/pkg/schema/v0" 35 "stream.place/streamplace/pkg/spmetrics" 36 37 "github.com/ThalesGroup/crypto11" 38 _ "github.com/go-gst/go-glib/glib" 39 _ "github.com/go-gst/go-gst/gst" 40 "stream.place/streamplace/pkg/api" 41 "stream.place/streamplace/pkg/config" 42 "stream.place/streamplace/pkg/model" 43) 44 45// Additional jobs that can be injected by platforms 46type jobFunc func(ctx context.Context, cli *config.CLI) error 47 48// parse the CLI and fire up an streamplace node! 49func start(build *config.BuildFlags, platformJobs []jobFunc) error { 50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 51 err := media.RunSelfTest(context.Background()) 52 if err != nil { 53 if selfTest { 54 fmt.Println(err.Error()) 55 os.Exit(1) 56 } else { 57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 58 if retryCount >= 3 { 59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 60 return err 61 } 62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 65 if err != nil { 66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 67 return err 68 } 69 panic("invalid code path: exec succeeded but we're still here???") 70 } 71 } 72 if selfTest { 73 runtime.GC() 74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 75 log.Error(context.Background(), "error creating pprof", "error", err) 76 } 77 fmt.Println("self-test successful!") 78 os.Exit(0) 79 } 80 81 if len(os.Args) > 1 && os.Args[1] == "stream" { 82 if len(os.Args) != 3 { 83 fmt.Println("usage: streamplace stream [user]") 84 os.Exit(1) 85 } 86 return Stream(os.Args[2]) 87 } 88 89 if len(os.Args) > 1 && os.Args[1] == "live" { 90 cli := config.CLI{Build: build} 91 fs := cli.NewFlagSet("streamplace live") 92 93 err := cli.Parse(fs, os.Args[2:]) 94 if err != nil { 95 return err 96 } 97 98 args := fs.Args() 99 if len(args) != 1 { 100 fmt.Println("usage: streamplace live [flags] [stream-key]") 101 os.Exit(1) 102 } 103 104 return Live(args[0], cli.HTTPInternalAddr) 105 } 106 107 if len(os.Args) > 1 && os.Args[1] == "sign" { 108 return Sign(context.Background()) 109 } 110 111 if len(os.Args) > 1 && os.Args[1] == "whep" { 112 return WHEP(os.Args[2:]) 113 } 114 if len(os.Args) > 1 && os.Args[1] == "whip" { 115 return WHIP(os.Args[2:]) 116 } 117 118 if len(os.Args) > 1 && os.Args[1] == "self-test" { 119 err := media.RunSelfTest(context.Background()) 120 if err != nil { 121 fmt.Println(err.Error()) 122 os.Exit(1) 123 } 124 fmt.Println("self-test successful!") 125 os.Exit(0) 126 } 127 _ = flag.Set("logtostderr", "true") 128 vFlag := flag.Lookup("v") 129 cli := config.CLI{Build: build} 130 fs := cli.NewFlagSet("streamplace") 131 verbosity := fs.String("v", "3", "log verbosity level") 132 version := fs.Bool("version", false, "print version and exit") 133 134 err = cli.Parse( 135 fs, os.Args[1:], 136 ) 137 if err != nil { 138 return err 139 } 140 err = flag.CommandLine.Parse(nil) 141 if err != nil { 142 return err 143 } 144 _ = vFlag.Value.Set(*verbosity) 145 log.SetColorLogger(cli.Color) 146 ctx := context.Background() 147 ctx = log.WithDebugValue(ctx, cli.Debug) 148 149 log.Log(ctx, 150 "streamplace", 151 "version", build.Version, 152 "buildTime", build.BuildTimeStr(), 153 "uuid", build.UUID, 154 "runtime.GOOS", runtime.GOOS, 155 "runtime.GOARCH", runtime.GOARCH, 156 "runtime.Version", runtime.Version()) 157 if *version { 158 return nil 159 } 160 spmetrics.Version.WithLabelValues(build.Version).Inc() 161 162 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 163 if len(os.Args) > 1 && os.Args[1] == "resync" { 164 return resync.Resync(ctx, &cli) 165 } 166 167 err = os.MkdirAll(cli.DataDir, os.ModePerm) 168 if err != nil { 169 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 170 } 171 schema, err := v0.MakeV0Schema() 172 if err != nil { 173 return err 174 } 175 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 176 Schema: schema, 177 EthKeystorePath: cli.EthKeystorePath, 178 EthAccountAddr: cli.EthAccountAddr, 179 EthKeystorePassword: cli.EthPassword, 180 }) 181 if err != nil { 182 return err 183 } 184 var signer crypto.Signer = eip712signer 185 if cli.PKCS11ModulePath != "" { 186 conf := &crypto11.Config{ 187 Path: cli.PKCS11ModulePath, 188 } 189 count := 0 190 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 191 if val != "" { 192 count += 1 193 } 194 } 195 if count != 1 { 196 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 197 } 198 if cli.PKCS11TokenSlot != "" { 199 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 200 if err != nil { 201 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 202 } 203 numint := int(num) 204 // why does crypto11 want this as a reference? odd. 205 conf.SlotNumber = &numint 206 } 207 if cli.PKCS11TokenLabel != "" { 208 conf.TokenLabel = cli.PKCS11TokenLabel 209 } 210 if cli.PKCS11TokenSerial != "" { 211 conf.TokenSerial = cli.PKCS11TokenSerial 212 } 213 pin := cli.PKCS11Pin 214 if pin == "" { 215 fmt.Printf("Please enter PKCS11 PIN: ") 216 password, err := term.ReadPassword(int(os.Stdin.Fd())) 217 fmt.Println("") 218 if err != nil { 219 return fmt.Errorf("error reading PKCS11 password: %w", err) 220 } 221 pin = string(password) 222 } 223 conf.Pin = pin 224 225 sc, err := crypto11.Configure(conf) 226 if err != nil { 227 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 228 } 229 var id []byte = nil 230 var label []byte = nil 231 if cli.PKCS11KeypairID != "" { 232 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 233 if err != nil { 234 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 235 } 236 id = []byte{byte(num)} 237 } 238 if cli.PKCS11KeypairLabel != "" { 239 label = []byte(cli.PKCS11KeypairLabel) 240 } 241 hwsigner, err := sc.FindKeyPair(id, label) 242 if err != nil { 243 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 244 } 245 if hwsigner == nil { 246 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 247 } 248 addr, err := signers.HexAddrFromSigner(hwsigner) 249 if err != nil { 250 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 251 } 252 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 253 signer = hwsigner 254 } 255 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 256 mod, err := model.MakeDB(cli.DBPath) 257 if err != nil { 258 return err 259 } 260 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod) 261 if err != nil { 262 return err 263 } 264 defer handle.Close() 265 var noter notifications.FirebaseNotifier 266 if cli.FirebaseServiceAccount != "" { 267 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 268 if err != nil { 269 return err 270 } 271 } 272 273 jwkPath := cli.DataFilePath([]string{"jwk.json"}) 274 jwk, err := atproto.EnsureJWK(ctx, jwkPath) 275 if err != nil { 276 return err 277 } 278 cli.JWK = jwk 279 280 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"}) 281 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath) 282 if err != nil { 283 return err 284 } 285 cli.AccessJWK = accessJWK 286 287 b := bus.NewBus() 288 atsync := &atproto.ATProtoSynchronizer{ 289 CLI: &cli, 290 Model: mod, 291 Noter: noter, 292 Bus: b, 293 } 294 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 295 if err != nil { 296 return err 297 } 298 299 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 300 if err != nil { 301 return err 302 } 303 304 clientMetadata := &oatproxy.OAuthClientMetadata{ 305 Scope: "atproto transition:generic", 306 ClientName: "Streamplace", 307 RedirectURIs: []string{ 308 fmt.Sprintf("https://%s/login", cli.PublicHost), 309 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 310 }, 311 } 312 313 op := oatproxy.New(&oatproxy.Config{ 314 Host: cli.PublicHost, 315 CreateOAuthSession: mod.CreateOAuthSession, 316 UpdateOAuthSession: mod.UpdateOAuthSession, 317 GetOAuthSession: mod.LoadOAuthSession, 318 Scope: "atproto transition:generic", 319 UpstreamJWK: cli.JWK, 320 DownstreamJWK: cli.AccessJWK, 321 ClientMetadata: clientMetadata, 322 }) 323 d := director.NewDirector(mm, mod, &cli, b, op) 324 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 325 if err != nil { 326 return err 327 } 328 329 group, ctx := TimeoutGroupWithContext(ctx) 330 ctx = log.WithLogValues(ctx, "version", build.Version) 331 332 group.Go(func() error { 333 return handleSignals(ctx) 334 }) 335 336 if cli.TracingEndpoint != "" { 337 group.Go(func() error { 338 return startTelemetry(ctx, cli.TracingEndpoint) 339 }) 340 } 341 342 if cli.Secure { 343 group.Go(func() error { 344 return a.ServeHTTPS(ctx) 345 }) 346 group.Go(func() error { 347 return a.ServeHTTPRedirect(ctx) 348 }) 349 if cli.RTMPServerAddon != "" { 350 group.Go(func() error { 351 return rtmps.ServeRTMPS(ctx, &cli) 352 }) 353 } 354 } else { 355 group.Go(func() error { 356 return a.ServeHTTP(ctx) 357 }) 358 } 359 360 group.Go(func() error { 361 return a.ServeInternalHTTP(ctx) 362 }) 363 364 if !cli.NoFirehose { 365 group.Go(func() error { 366 return atsync.StartFirehose(ctx) 367 }) 368 } 369 370 group.Go(func() error { 371 return spmetrics.ExpireSessions(ctx) 372 }) 373 374 group.Go(func() error { 375 return mod.StartSegmentCleaner(ctx) 376 }) 377 378 group.Go(func() error { 379 return d.Start(ctx) 380 }) 381 382 if cli.TestStream { 383 // regular stream self-test 384 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 385 Schema: schema, 386 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 387 }) 388 if err != nil { 389 return err 390 } 391 atkey, err := atproto.ParsePubKey(signer.Public()) 392 if err != nil { 393 return err 394 } 395 did := atkey.DIDKey() 396 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 397 if err != nil { 398 return err 399 } 400 err = mod.UpdateIdentity(&model.Identity{ 401 ID: testMediaSigner.Pub().String(), 402 Handle: "stream-self-tester", 403 DID: "", 404 }) 405 if err != nil { 406 return err 407 } 408 cli.AllowedStreams = append(cli.AllowedStreams, did) 409 a.Aliases["self-test"] = did 410 group.Go(func() error { 411 return mm.TestSource(ctx, testMediaSigner) 412 }) 413 414 // Start a test stream that will run intermittently 415 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 416 Schema: schema, 417 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"), 418 }) 419 if err != nil { 420 return err 421 } 422 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public()) 423 if err != nil { 424 return err 425 } 426 did2 := atkey2.DIDKey() 427 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner) 428 if err != nil { 429 return err 430 } 431 err = mod.UpdateIdentity(&model.Identity{ 432 ID: intermittentMediaSigner.Pub().String(), 433 Handle: "stream-intermittent-tester", 434 DID: "", 435 }) 436 if err != nil { 437 return err 438 } 439 cli.AllowedStreams = append(cli.AllowedStreams, did2) 440 a.Aliases["intermittent-self-test"] = did2 441 442 group.Go(func() error { 443 for { 444 // Start intermittent stream 445 intermittentCtx, cancel := context.WithCancel(ctx) 446 done := make(chan struct{}) 447 go func() { 448 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner) 449 close(done) 450 }() 451 // Stream ON for 15 seconds 452 time.Sleep(15 * time.Second) 453 // Stop stream 454 cancel() 455 <-done // Wait for TestSource to exit 456 // Stream OFF for 15 seconds 457 time.Sleep(15 * time.Second) 458 } 459 }) 460 } 461 462 for _, job := range platformJobs { 463 group.Go(func() error { 464 return job(ctx, &cli) 465 }) 466 } 467 468 if cli.WHIPTest != "" { 469 group.Go(func() error { 470 err := WHIP(strings.Split(cli.WHIPTest, " ")) 471 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 472 time.Sleep(time.Second * 3) 473 // gst.Deinit() 474 log.Warn(ctx, "gst deinit complete, exiting") 475 return err 476 }) 477 } 478 479 return group.Wait() 480} 481 482var ErrCaughtSignal = errors.New("caught signal") 483 484func handleSignals(ctx context.Context) error { 485 c := make(chan os.Signal, 1) 486 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 487 for { 488 select { 489 case s := <-c: 490 if s == syscall.SIGABRT { 491 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 492 log.Error(ctx, "failed to create pprof", "error", err) 493 } 494 } 495 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 496 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 497 case <-ctx.Done(): 498 return nil 499 } 500 } 501}