Live video on the AT Protocol
at eli/sync-tangled 432 lines 15 kB view raw
1package cmd 2 3import ( 4 "context" 5 "crypto" 6 "errors" 7 "flag" 8 "fmt" 9 "os" 10 "os/signal" 11 "path/filepath" 12 "runtime" 13 "runtime/pprof" 14 "strconv" 15 "strings" 16 "syscall" 17 "time" 18 19 "golang.org/x/term" 20 "stream.place/streamplace/pkg/aqhttp" 21 "stream.place/streamplace/pkg/atproto" 22 "stream.place/streamplace/pkg/bus" 23 "stream.place/streamplace/pkg/crypto/signers" 24 "stream.place/streamplace/pkg/crypto/signers/eip712" 25 "stream.place/streamplace/pkg/director" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/notifications" 29 "stream.place/streamplace/pkg/replication" 30 "stream.place/streamplace/pkg/replication/boring" 31 v0 "stream.place/streamplace/pkg/schema/v0" 32 "stream.place/streamplace/pkg/spmetrics" 33 34 "github.com/ThalesGroup/crypto11" 35 _ "github.com/go-gst/go-glib/glib" 36 _ "github.com/go-gst/go-gst/gst" 37 "stream.place/streamplace/pkg/api" 38 "stream.place/streamplace/pkg/config" 39 "stream.place/streamplace/pkg/model" 40) 41 42// Additional jobs that can be injected by platforms 43type jobFunc func(ctx context.Context, cli *config.CLI) error 44 45// parse the CLI and fire up an streamplace node! 46func start(build *config.BuildFlags, platformJobs []jobFunc) error { 47 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test" 48 err := media.RunSelfTest(context.Background()) 49 if err != nil { 50 if selfTest { 51 fmt.Println(err.Error()) 52 os.Exit(1) 53 } else { 54 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY")) 55 if retryCount >= 3 { 56 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err) 57 return err 58 } 59 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1) 60 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1)) 61 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ()) 62 if err != nil { 63 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err) 64 return err 65 } 66 panic("invalid code path: exec succeeded but we're still here???") 67 } 68 } 69 if selfTest { 70 runtime.GC() 71 pprof.Lookup("goroutine").WriteTo(os.Stderr, 2) 72 fmt.Println("self-test successful!") 73 os.Exit(0) 74 } 75 76 if len(os.Args) > 1 && os.Args[1] == "stream" { 77 if len(os.Args) != 3 { 78 fmt.Println("usage: streamplace stream [user]") 79 os.Exit(1) 80 } 81 return Stream(os.Args[2]) 82 } 83 84 if len(os.Args) > 1 && os.Args[1] == "sign" { 85 return Sign(context.Background()) 86 } 87 88 if len(os.Args) > 1 && os.Args[1] == "whep" { 89 return WHEP(os.Args[2:]) 90 } 91 if len(os.Args) > 1 && os.Args[1] == "whip" { 92 return WHIP(os.Args[2:]) 93 } 94 95 if len(os.Args) > 1 && os.Args[1] == "self-test" { 96 err := media.RunSelfTest(context.Background()) 97 if err != nil { 98 fmt.Println(err.Error()) 99 os.Exit(1) 100 } 101 fmt.Println("self-test successful!") 102 os.Exit(0) 103 } 104 flag.Set("logtostderr", "true") 105 vFlag := flag.Lookup("v") 106 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 107 cli := config.CLI{Build: build} 108 fs.StringVar(&cli.DataDir, "data-dir", config.DefaultDataDir(), "directory for keeping all streamplace data") 109 fs.StringVar(&cli.HttpAddr, "http-addr", ":38080", "Public HTTP address") 110 fs.StringVar(&cli.HttpInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 111 fs.StringVar(&cli.HttpsAddr, "https-addr", ":38443", "Public HTTPS address") 112 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 113 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 114 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 115 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 116 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 117 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 118 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 119 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 120 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 121 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 122 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 123 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 124 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 125 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 126 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 127 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 128 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 129 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 130 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 131 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 132 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 133 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 134 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 135 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 136 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 137 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 138 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 139 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 140 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 141 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 142 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 143 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 144 verbosity := fs.String("v", "3", "log verbosity level") 145 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 146 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 147 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 148 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 149 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 150 fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)") 151 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 152 version := fs.Bool("version", false, "print version and exit") 153 154 if runtime.GOOS == "linux" { 155 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 156 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 157 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 158 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 159 } 160 161 err = cli.Parse( 162 fs, os.Args[1:], 163 ) 164 if err != nil { 165 return err 166 } 167 err = flag.CommandLine.Parse(nil) 168 if err != nil { 169 return err 170 } 171 vFlag.Value.Set(*verbosity) 172 log.SetColorLogger(cli.Color) 173 ctx := context.Background() 174 ctx = log.WithDebugValue(ctx, cli.Debug) 175 176 log.Log(ctx, 177 "streamplace", 178 "version", build.Version, 179 "buildTime", build.BuildTimeStr(), 180 "uuid", build.UUID, 181 "runtime.GOOS", runtime.GOOS, 182 "runtime.GOARCH", runtime.GOARCH, 183 "runtime.Version", runtime.Version()) 184 if *version { 185 return nil 186 } 187 spmetrics.Version.WithLabelValues(build.Version).Inc() 188 189 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 190 191 err = os.MkdirAll(cli.DataDir, os.ModePerm) 192 if err != nil { 193 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 194 } 195 schema, err := v0.MakeV0Schema() 196 if err != nil { 197 return err 198 } 199 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 200 Schema: schema, 201 EthKeystorePath: cli.EthKeystorePath, 202 EthAccountAddr: cli.EthAccountAddr, 203 EthKeystorePassword: cli.EthPassword, 204 }) 205 if err != nil { 206 return err 207 } 208 var signer crypto.Signer = eip712signer 209 if cli.PKCS11ModulePath != "" { 210 conf := &crypto11.Config{ 211 Path: cli.PKCS11ModulePath, 212 } 213 count := 0 214 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} { 215 if val != "" { 216 count += 1 217 } 218 } 219 if count != 1 { 220 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count) 221 } 222 if cli.PKCS11TokenSlot != "" { 223 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16) 224 if err != nil { 225 return fmt.Errorf("error parsing pkcs11-slot: %w", err) 226 } 227 numint := int(num) 228 // why does crypto11 want this as a reference? odd. 229 conf.SlotNumber = &numint 230 } 231 if cli.PKCS11TokenLabel != "" { 232 conf.TokenLabel = cli.PKCS11TokenLabel 233 } 234 if cli.PKCS11TokenSerial != "" { 235 conf.TokenSerial = cli.PKCS11TokenSerial 236 } 237 pin := cli.PKCS11Pin 238 if pin == "" { 239 fmt.Printf("Please enter PKCS11 PIN: ") 240 password, err := term.ReadPassword(int(os.Stdin.Fd())) 241 fmt.Println("") 242 if err != nil { 243 return fmt.Errorf("error reading PKCS11 password: %w", err) 244 } 245 pin = string(password) 246 } 247 conf.Pin = pin 248 249 sc, err := crypto11.Configure(conf) 250 if err != nil { 251 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err) 252 } 253 var id []byte = nil 254 var label []byte = nil 255 if cli.PKCS11KeypairID != "" { 256 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8) 257 if err != nil { 258 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err) 259 } 260 id = []byte{byte(num)} 261 } 262 if cli.PKCS11KeypairLabel != "" { 263 label = []byte(cli.PKCS11KeypairLabel) 264 } 265 hwsigner, err := sc.FindKeyPair(id, label) 266 if err != nil { 267 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err) 268 } 269 if hwsigner == nil { 270 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel) 271 } 272 addr, err := signers.HexAddrFromSigner(hwsigner) 273 if err != nil { 274 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err) 275 } 276 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 277 signer = hwsigner 278 } 279 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 280 mod, err := model.MakeDB(cli.DBPath) 281 if err != nil { 282 return err 283 } 284 var noter notifications.FirebaseNotifier 285 if cli.FirebaseServiceAccount != "" { 286 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) 287 if err != nil { 288 return err 289 } 290 } 291 b := bus.NewBus() 292 atsync := &atproto.ATProtoSynchronizer{ 293 CLI: &cli, 294 Model: mod, 295 Noter: noter, 296 Bus: b, 297 } 298 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 299 if err != nil { 300 return err 301 } 302 303 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer) 304 if err != nil { 305 return err 306 } 307 308 d := director.NewDirector(mm, mod, &cli, b) 309 310 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d) 311 if err != nil { 312 return err 313 } 314 315 group, ctx := TimeoutGroupWithContext(ctx) 316 ctx = log.WithLogValues(ctx, "version", build.Version) 317 318 group.Go(func() error { 319 return handleSignals(ctx) 320 }) 321 322 if cli.TracingEndpoint != "" { 323 group.Go(func() error { 324 return startTelemetry(ctx, cli.TracingEndpoint) 325 }) 326 } 327 328 if cli.Secure { 329 group.Go(func() error { 330 return a.ServeHTTPS(ctx) 331 }) 332 group.Go(func() error { 333 return a.ServeHTTPRedirect(ctx) 334 }) 335 } else { 336 group.Go(func() error { 337 return a.ServeHTTP(ctx) 338 }) 339 } 340 341 group.Go(func() error { 342 return a.ServeInternalHTTP(ctx) 343 }) 344 345 if !cli.NoFirehose { 346 group.Go(func() error { 347 return atsync.StartFirehose(ctx) 348 }) 349 } 350 351 group.Go(func() error { 352 return spmetrics.ExpireSessions(ctx) 353 }) 354 355 group.Go(func() error { 356 return mod.StartSegmentCleaner(ctx) 357 }) 358 359 group.Go(func() error { 360 return d.Start(ctx) 361 }) 362 363 if cli.TestStream { 364 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{ 365 Schema: schema, 366 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"), 367 }) 368 if err != nil { 369 return err 370 } 371 atkey, err := atproto.ParsePubKey(signer.Public()) 372 if err != nil { 373 return err 374 } 375 did := atkey.DIDKey() 376 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner) 377 if err != nil { 378 return err 379 } 380 err = mod.UpdateIdentity(&model.Identity{ 381 ID: testMediaSigner.Pub().String(), 382 Handle: "stream-self-tester", 383 DID: "", 384 }) 385 if err != nil { 386 return err 387 } 388 cli.AllowedStreams = append(cli.AllowedStreams, did) 389 a.Aliases["self-test"] = did 390 group.Go(func() error { 391 return mm.TestSource(ctx, testMediaSigner) 392 }) 393 } 394 395 for _, job := range platformJobs { 396 group.Go(func() error { 397 return job(ctx, &cli) 398 }) 399 } 400 401 if cli.WHIPTest != "" { 402 group.Go(func() error { 403 err := WHIP(strings.Split(cli.WHIPTest, " ")) 404 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 405 time.Sleep(time.Second * 3) 406 // gst.Deinit() 407 log.Warn(ctx, "gst deinit complete, exiting") 408 return err 409 }) 410 } 411 412 return group.Wait() 413} 414 415var ErrCaughtSignal = errors.New("caught signal") 416 417func handleSignals(ctx context.Context) error { 418 c := make(chan os.Signal, 1) 419 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 420 for { 421 select { 422 case s := <-c: 423 if s == syscall.SIGABRT { 424 pprof.Lookup("goroutine").WriteTo(os.Stderr, 2) 425 } 426 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 427 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s) 428 case <-ctx.Done(): 429 return nil 430 } 431 } 432}