Live video on the AT Protocol
at eli/fix-type-export 438 lines 12 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "github.com/streamplace/oatproxy/pkg/oatproxy" 20 "golang.org/x/term" 21 "stream.place/streamplace/pkg/aqhttp" 22 "stream.place/streamplace/pkg/atproto" 23 "stream.place/streamplace/pkg/bus" 24 "stream.place/streamplace/pkg/crypto/signers" 25 "stream.place/streamplace/pkg/crypto/signers/eip712" 26 "stream.place/streamplace/pkg/director" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/media" 29 "stream.place/streamplace/pkg/notifications" 30 "stream.place/streamplace/pkg/replication" 31 "stream.place/streamplace/pkg/replication/boring" 32 "stream.place/streamplace/pkg/resync" 33 "stream.place/streamplace/pkg/rtmps" 34 v0 "stream.place/streamplace/pkg/schema/v0" 35 "stream.place/streamplace/pkg/spmetrics" 36 37 "github.com/ThalesGroup/crypto11" 38 _ "github.com/go-gst/go-glib/glib" 39 _ "github.com/go-gst/go-gst/gst" 40 "stream.place/streamplace/pkg/api" 41 "stream.place/streamplace/pkg/config" 42 "stream.place/streamplace/pkg/model" 43) 44 45// Additional jobs that can be injected by platforms 46type jobFunc func(ctx context.Context, cli *config.CLI) error 47 48// parse the CLI and fire up an streamplace node! 49func start(build *config.BuildFlags, platformJobs []jobFunc) error { 50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 51 err := media.RunSelfTest(context.Background()) 52 if err != nil { 53 if selfTest { 54 fmt.Println(err.Error()) 55 os.Exit(1) 56 } else { 57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 58 if retryCount >= 3 { 59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 60 return err 61 } 62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 65 if err != nil { 66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 67 return err 68 } 69 panic("invalid code path: exec succeeded but we're still here???") 70 } 71 } 72 if selfTest { 73 runtime.GC() 74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 75 log.Error(context.Background(), "error creating pprof", "error", err) 76 } 77 fmt.Println("self-test successful!") 78 os.Exit(0) 79 } 80 81 if len(os.Args) > 1 && os.Args[1] == "stream" { 82 if len(os.Args) != 3 { 83 fmt.Println("usage: streamplace stream [user]") 84 os.Exit(1) 85 } 86 return Stream(os.Args[2]) 87 } 88 89 if len(os.Args) > 1 && os.Args[1] == "live" { 90 if len(os.Args) != 3 { 91 fmt.Println("usage: streamplace live [stream-key]") 92 os.Exit(1) 93 } 94 return Live(os.Args[2]) 95 } 96 97 if len(os.Args) > 1 && os.Args[1] == "sign" { 98 return Sign(context.Background()) 99 } 100 101 if len(os.Args) > 1 && os.Args[1] == "whep" { 102 return WHEP(os.Args[2:]) 103 } 104 if len(os.Args) > 1 && os.Args[1] == "whip" { 105 return WHIP(os.Args[2:]) 106 } 107 108 if len(os.Args) > 1 && os.Args[1] == "self-test" { 109 err := media.RunSelfTest(context.Background()) 110 if err != nil { 111 fmt.Println(err.Error()) 112 os.Exit(1) 113 } 114 fmt.Println("self-test successful!") 115 os.Exit(0) 116 } 117 _ = flag.Set("logtostderr", "true") 118 vFlag := flag.Lookup("v") 119 cli := config.CLI{Build: build} 120 fs := cli.NewFlagSet("streamplace") 121 verbosity := fs.String("v", "3", "log verbosity level") 122 version := fs.Bool("version", false, "print version and exit") 123 124 err = cli.Parse( 125 fs, os.Args[1:], 126 ) 127 if err != nil { 128 return err 129 } 130 err = flag.CommandLine.Parse(nil) 131 if err != nil { 132 return err 133 } 134 _ = vFlag.Value.Set(*verbosity) 135 log.SetColorLogger(cli.Color) 136 ctx := context.Background() 137 ctx = log.WithDebugValue(ctx, cli.Debug) 138 139 log.Log(ctx, 140 "streamplace", 141 "version", build.Version, 142 "buildTime", build.BuildTimeStr(), 143 "uuid", build.UUID, 144 "runtime.GOOS", runtime.GOOS, 145 "runtime.GOARCH", runtime.GOARCH, 146 "runtime.Version", runtime.Version()) 147 if *version { 148 return nil 149 } 150 spmetrics.Version.WithLabelValues(build.Version).Inc() 151 152 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 153 if len(os.Args) > 1 && os.Args[1] == "resync" { 154 return resync.Resync(ctx, &cli) 155 } 156 157 err = os.MkdirAll(cli.DataDir, os.ModePerm) 158 if err != nil { 159 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 160 } 161 schema, err := v0.MakeV0Schema() 162 if err != nil { 163 return err 164 } 165 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 166 Schema: schema, 167 EthKeystorePath: cli.EthKeystorePath, 168 EthAccountAddr: cli.EthAccountAddr, 169 EthKeystorePassword: cli.EthPassword, 170 }) 171 if err != nil { 172 return err 173 } 174 var signer crypto.Signer = eip712signer 175 if cli.PKCS11ModulePath != "" { 176 conf := &crypto11.Config{ 177 Path: cli.PKCS11ModulePath, 178 } 179 count := 0 180 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 181 if val != "" { 182 count += 1 183 } 184 } 185 if count != 1 { 186 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 187 } 188 if cli.PKCS11TokenSlot != "" { 189 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 190 if err != nil { 191 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 192 } 193 numint := int(num) 194 // why does crypto11 want this as a reference? odd. 195 conf.SlotNumber = &numint 196 } 197 if cli.PKCS11TokenLabel != "" { 198 conf.TokenLabel = cli.PKCS11TokenLabel 199 } 200 if cli.PKCS11TokenSerial != "" { 201 conf.TokenSerial = cli.PKCS11TokenSerial 202 } 203 pin := cli.PKCS11Pin 204 if pin == "" { 205 fmt.Printf("Please enter PKCS11 PIN: ") 206 password, err := term.ReadPassword(int(os.Stdin.Fd())) 207 fmt.Println("") 208 if err != nil { 209 return fmt.Errorf("error reading PKCS11 password: %w", err) 210 } 211 pin = string(password) 212 } 213 conf.Pin = pin 214 215 sc, err := crypto11.Configure(conf) 216 if err != nil { 217 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 218 } 219 var id []byte = nil 220 var label []byte = nil 221 if cli.PKCS11KeypairID != "" { 222 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 223 if err != nil { 224 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 225 } 226 id = []byte{byte(num)} 227 } 228 if cli.PKCS11KeypairLabel != "" { 229 label = []byte(cli.PKCS11KeypairLabel) 230 } 231 hwsigner, err := sc.FindKeyPair(id, label) 232 if err != nil { 233 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 234 } 235 if hwsigner == nil { 236 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 237 } 238 addr, err := signers.HexAddrFromSigner(hwsigner) 239 if err != nil { 240 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 241 } 242 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 243 signer = hwsigner 244 } 245 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 246 mod, err := model.MakeDB(cli.DBPath) 247 if err != nil { 248 return err 249 } 250 var noter notifications.FirebaseNotifier 251 if cli.FirebaseServiceAccount != "" { 252 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 253 if err != nil { 254 return err 255 } 256 } 257 258 jwkPath := cli.DataFilePath([]string{"jwk.json"}) 259 jwk, err := atproto.EnsureJWK(ctx, jwkPath) 260 if err != nil { 261 return err 262 } 263 cli.JWK = jwk 264 265 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"}) 266 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath) 267 if err != nil { 268 return err 269 } 270 cli.AccessJWK = accessJWK 271 272 b := bus.NewBus() 273 atsync := &atproto.ATProtoSynchronizer{ 274 CLI: &cli, 275 Model: mod, 276 Noter: noter, 277 Bus: b, 278 } 279 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 280 if err != nil { 281 return err 282 } 283 284 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 285 if err != nil { 286 return err 287 } 288 289 clientMetadata := &oatproxy.OAuthClientMetadata{ 290 Scope: "atproto transition:generic", 291 ClientName: "Streamplace", 292 RedirectURIs: []string{ 293 fmt.Sprintf("https://%s/login", cli.PublicHost), 294 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost), 295 }, 296 } 297 298 op := oatproxy.New(&oatproxy.Config{ 299 Host: cli.PublicHost, 300 CreateOAuthSession: mod.CreateOAuthSession, 301 UpdateOAuthSession: mod.UpdateOAuthSession, 302 GetOAuthSession: mod.LoadOAuthSession, 303 Scope: "atproto transition:generic", 304 UpstreamJWK: cli.JWK, 305 DownstreamJWK: cli.AccessJWK, 306 ClientMetadata: clientMetadata, 307 }) 308 d := director.NewDirector(mm, mod, &cli, b, op) 309 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 310 if err != nil { 311 return err 312 } 313 314 group, ctx := TimeoutGroupWithContext(ctx) 315 ctx = log.WithLogValues(ctx, "version", build.Version) 316 317 group.Go(func() error { 318 return handleSignals(ctx) 319 }) 320 321 if cli.TracingEndpoint != "" { 322 group.Go(func() error { 323 return startTelemetry(ctx, cli.TracingEndpoint) 324 }) 325 } 326 327 if cli.Secure { 328 group.Go(func() error { 329 return a.ServeHTTPS(ctx) 330 }) 331 group.Go(func() error { 332 return a.ServeHTTPRedirect(ctx) 333 }) 334 if cli.RTMPServerAddon != "" { 335 group.Go(func() error { 336 return rtmps.ServeRTMPS(ctx, &cli) 337 }) 338 } 339 } else { 340 group.Go(func() error { 341 return a.ServeHTTP(ctx) 342 }) 343 } 344 345 group.Go(func() error { 346 return a.ServeInternalHTTP(ctx) 347 }) 348 349 if !cli.NoFirehose { 350 group.Go(func() error { 351 return atsync.StartFirehose(ctx) 352 }) 353 } 354 355 group.Go(func() error { 356 return spmetrics.ExpireSessions(ctx) 357 }) 358 359 group.Go(func() error { 360 return mod.StartSegmentCleaner(ctx) 361 }) 362 363 group.Go(func() error { 364 return d.Start(ctx) 365 }) 366 367 if cli.TestStream { 368 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 369 Schema: schema, 370 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 371 }) 372 if err != nil { 373 return err 374 } 375 atkey, err := atproto.ParsePubKey(signer.Public()) 376 if err != nil { 377 return err 378 } 379 did := atkey.DIDKey() 380 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 381 if err != nil { 382 return err 383 } 384 err = mod.UpdateIdentity(&model.Identity{ 385 ID: testMediaSigner.Pub().String(), 386 Handle: "stream-self-tester", 387 DID: "", 388 }) 389 if err != nil { 390 return err 391 } 392 cli.AllowedStreams = append(cli.AllowedStreams, did) 393 a.Aliases["self-test"] = did 394 group.Go(func() error { 395 return mm.TestSource(ctx, testMediaSigner) 396 }) 397 } 398 399 for _, job := range platformJobs { 400 group.Go(func() error { 401 return job(ctx, &cli) 402 }) 403 } 404 405 if cli.WHIPTest != "" { 406 group.Go(func() error { 407 err := WHIP(strings.Split(cli.WHIPTest, " ")) 408 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 409 time.Sleep(time.Second * 3) 410 // gst.Deinit() 411 log.Warn(ctx, "gst deinit complete, exiting") 412 return err 413 }) 414 } 415 416 return group.Wait() 417} 418 419var ErrCaughtSignal = errors.New("caught signal") 420 421func handleSignals(ctx context.Context) error { 422 c := make(chan os.Signal, 1) 423 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 424 for { 425 select { 426 case s := <-c: 427 if s == syscall.SIGABRT { 428 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 429 log.Error(ctx, "failed to create pprof", "error", err) 430 } 431 } 432 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 433 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 434 case <-ctx.Done(): 435 return nil 436 } 437 } 438}