Live video on the AT Protocol
at eli/bump-xcode 501 lines 18 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "math/rand/v2" 21 22 "github.com/lestrrat-go/jwx/v2/jwk" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/peterbourgon/ff/v3" 25 "stream.place/streamplace/pkg/aqtime" 26 "stream.place/streamplace/pkg/constants" 27 "stream.place/streamplace/pkg/crypto/aqpub" 28 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 29) 30 31const SPDataDir = "$SP_DATA_DIR" 32const SegmentsDir = "segments" 33 34type BuildFlags struct { 35 Version string 36 BuildTime int64 37 UUID string 38} 39 40func (b BuildFlags) BuildTimeStr() string { 41 ts := time.Unix(b.BuildTime, 0) 42 return ts.UTC().Format(time.RFC3339) 43} 44 45func (b BuildFlags) BuildTimeStrExpo() string { 46 ts := time.Unix(b.BuildTime, 0) 47 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 48} 49 50type CLI struct { 51 AdminAccount string 52 Build *BuildFlags 53 DataDir string 54 DBPath string 55 EthAccountAddr string 56 EthKeystorePath string 57 EthPassword string 58 FirebaseServiceAccount string 59 GitLabURL string 60 HTTPAddr string 61 HTTPInternalAddr string 62 HTTPSAddr string 63 RtmpsAddr string 64 Secure bool 65 NoMist bool 66 MistAdminPort int 67 MistHTTPPort int 68 MistRTMPPort int 69 SigningKeyPath string 70 TAURL string 71 TLSCertPath string 72 TLSKeyPath string 73 PKCS11ModulePath string 74 PKCS11Pin string 75 PKCS11TokenSlot string 76 PKCS11TokenLabel string 77 PKCS11TokenSerial string 78 PKCS11KeypairLabel string 79 PKCS11KeypairID string 80 StreamerName string 81 RelayHost string 82 Debug map[string]map[string]int 83 AllowedStreams []string 84 WideOpen bool 85 Peers []string 86 Redirects []string 87 TestStream bool 88 FrontendProxy string 89 AppBundleID string 90 NoFirehose bool 91 PrintChat bool 92 Color string 93 LivepeerGatewayURL string 94 LivepeerGateway bool 95 WHIPTest string 96 Thumbnail bool 97 SmearAudio bool 98 ExternalSigning bool 99 RTMPServerAddon string 100 TracingEndpoint string 101 PublicHost string 102 RateLimitPerSecond int 103 RateLimitBurst int 104 RateLimitWebsocket int 105 JWK jwk.Key 106 AccessJWK jwk.Key 107 dataDirFlags []*string 108 DiscordWebhooks []*discordtypes.Webhook 109 NewWebRTCPlayback bool 110 AppleTeamID string 111 AndroidCertFingerprint string 112 Labelers []string 113 AtprotoDID string 114 LivepeerHelp bool 115} 116 117func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 118 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 119 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 120 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 121 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 122 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 123 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 124 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 125 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 126 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 127 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 128 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 129 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 130 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 131 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 132 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 133 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 134 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 135 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 136 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 137 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 138 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 139 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 140 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 141 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 142 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 143 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 144 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 145 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 146 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 147 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 148 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 149 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 150 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 151 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 152 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 153 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 154 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 155 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 156 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 157 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 158 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 159 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 160 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 161 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 162 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 163 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 164 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 165 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 166 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 167 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 168 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 169 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 170 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 171 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 172 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 173 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 174 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 175 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 176 177 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 178 _ = starter.NewLivepeerConfig(lpFlags) 179 lpFlags.VisitAll(func(f *flag.Flag) { 180 adapted := LivepeerFlags.CamelToSnake[f.Name] 181 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 182 }) 183 184 if runtime.GOOS == "linux" { 185 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 186 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 187 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 188 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 189 } 190 return fs 191} 192 193var StreamplaceSchemePrefix = "streamplace://" 194 195func (cli *CLI) OwnInternalURL() string { 196 // No errors because we know it's valid from AddrFlag 197 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 198 199 ip := net.ParseIP(host) 200 if ip.IsUnspecified() { 201 host = "127.0.0.1" 202 } 203 addr := net.JoinHostPort(host, port) 204 return fmt.Sprintf("http://%s", addr) 205} 206 207func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 208 bs, err := os.ReadFile(cli.SigningKeyPath) 209 if err != nil { 210 return nil, err 211 } 212 block, _ := pem.Decode(bs) 213 if block == nil { 214 return nil, fmt.Errorf("no RSA key found in signing key") 215 } 216 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 217 if err != nil { 218 return nil, err 219 } 220 return key, nil 221} 222 223func RandomTrailer(length int) string { 224 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 225 226 res := make([]byte, length) 227 for i := 0; i < length; i++ { 228 res[i] = charset[rand.IntN(len(charset))] 229 } 230 return string(res) 231} 232 233func DefaultDataDir() string { 234 home, err := os.UserHomeDir() 235 if err != nil { 236 // not fatal unless the user doesn't set one later 237 return "" 238 } 239 return filepath.Join(home, ".streamplace") 240} 241 242func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 243 err := ff.Parse( 244 fs, args, 245 ff.WithEnvVarPrefix("SP"), 246 ) 247 if err != nil { 248 return err 249 } 250 if cli.DataDir == "" { 251 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 252 } 253 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 254 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 255 } 256 if cli.LivepeerGateway { 257 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 258 err = fs.Set("livepeer.data-dir", gatewayPath) 259 if err != nil { 260 return err 261 } 262 err = fs.Set("livepeer.gateway", "true") 263 if err != nil { 264 return err 265 } 266 httpAddrFlag := fs.Lookup("livepeer.http-addr") 267 if httpAddrFlag == nil { 268 return fmt.Errorf("livepeer.http-addr not found") 269 } 270 httpAddr := httpAddrFlag.Value.String() 271 if httpAddr == "" { 272 httpAddr = "127.0.0.1:8935" 273 err = fs.Set("livepeer.http-addr", httpAddr) 274 if err != nil { 275 return err 276 } 277 } 278 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 279 } 280 for _, dest := range cli.dataDirFlags { 281 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 282 } 283 return nil 284} 285 286func (cli *CLI) DataFilePath(fpath []string) string { 287 if cli.DataDir == "" { 288 panic("no data dir configured") 289 } 290 // windows does not like colons 291 safe := []string{} 292 for _, p := range fpath { 293 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 294 } 295 fpath = append([]string{cli.DataDir}, safe...) 296 fdpath := filepath.Join(fpath...) 297 return fdpath 298} 299 300// does a file exist in our data dir? 301func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 302 ddpath := cli.DataFilePath(fpath) 303 _, err := os.Stat(ddpath) 304 if err == nil { 305 return true, nil 306 } 307 if errors.Is(err, os.ErrNotExist) { 308 return false, nil 309 } 310 return false, err 311} 312 313// write a file to our data dir 314func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 315 fd, err := cli.DataFileCreate(fpath, overwrite) 316 if err != nil { 317 return err 318 } 319 defer fd.Close() 320 _, err = io.Copy(fd, r) 321 if err != nil { 322 return err 323 } 324 325 return nil 326} 327 328// create a file in our data dir. don't forget to close it! 329func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 330 ddpath := cli.DataFilePath(fpath) 331 if !overwrite { 332 exists, err := cli.DataFileExists(fpath) 333 if err != nil { 334 return nil, err 335 } 336 if exists { 337 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 338 } 339 } 340 if len(fpath) > 1 { 341 dirs, _ := filepath.Split(ddpath) 342 err := os.MkdirAll(dirs, os.ModePerm) 343 if err != nil { 344 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 345 } 346 } 347 return os.Create(ddpath) 348} 349 350// get a path to a segment file in our database 351func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 352 ext := filepath.Ext(file) 353 base := strings.TrimSuffix(file, ext) 354 aqt, err := aqtime.FromString(base) 355 if err != nil { 356 return "", err 357 } 358 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 359 yr, mon, day, hr, min, _, _ := aqt.Parts() 360 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 361} 362 363// get a path to a segment file in our database 364func (cli *CLI) HLSDir(user string) (string, error) { 365 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 366} 367 368// create a segment file in our database 369func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 370 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 371 yr, mon, day, hr, min, _, _ := aqt.Parts() 372 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 373} 374 375// read a file from our data dir 376func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 377 ddpath := cli.DataFilePath(fpath) 378 379 fd, err := os.Open(ddpath) 380 if err != nil { 381 return err 382 } 383 _, err = io.Copy(w, fd) 384 if err != nil { 385 return err 386 } 387 388 return nil 389} 390 391func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 392 cli.dataDirFlags = append(cli.dataDirFlags, dest) 393 *dest = filepath.Join(SPDataDir, defaultValue) 394 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 395 fs.Func(name, usage, func(s string) error { 396 *dest = s 397 return nil 398 }) 399} 400 401func (cli *CLI) HasMist() bool { 402 return runtime.GOOS == "linux" 403} 404 405// type for comma-separated ethereum addresses 406func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 407 *dest = []aqpub.Pub{} 408 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 409 fs.Func(name, usage, func(s string) error { 410 if s == "" { 411 return nil 412 } 413 strs := strings.Split(s, ",") 414 for _, str := range strs { 415 pub, err := aqpub.FromHexString(str) 416 if err != nil { 417 return err 418 } 419 *dest = append(*dest, pub) 420 } 421 return nil 422 }) 423} 424 425func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 426 *dest = []string{} 427 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 428 fs.Func(name, usage, func(s string) error { 429 if s == "" { 430 return nil 431 } 432 strs := strings.Split(s, ",") 433 *dest = append(*dest, strs...) 434 return nil 435 }) 436} 437 438func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 439 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 440 fs.Func(name, usage, func(s string) error { 441 if s == "" { 442 return nil 443 } 444 return json.Unmarshal([]byte(s), dest) 445 }) 446} 447 448// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 449func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 450 *dest = map[string]map[string]int{} 451 fs.Func(name, usage, func(s string) error { 452 if s == "" { 453 return nil 454 } 455 pairs := strings.Split(s, ",") 456 for _, pair := range pairs { 457 scoreSplit := strings.Split(pair, ":") 458 if len(scoreSplit) != 2 { 459 return fmt.Errorf("invalid debug flag: %s", pair) 460 } 461 score, err := strconv.Atoi(scoreSplit[1]) 462 if err != nil { 463 return fmt.Errorf("invalid debug flag: %s", pair) 464 } 465 selectorSplit := strings.Split(scoreSplit[0], "=") 466 if len(selectorSplit) != 2 { 467 return fmt.Errorf("invalid debug flag: %s", pair) 468 } 469 _, ok := (*dest)[selectorSplit[0]] 470 if !ok { 471 (*dest)[selectorSplit[0]] = map[string]int{} 472 } 473 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 474 } 475 476 return nil 477 }) 478} 479 480func (cli *CLI) StreamIsAllowed(did string) error { 481 if cli.WideOpen { 482 return nil 483 } 484 // if the user set no test streams, anyone can stream 485 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 486 // but only valid atproto accounts! did:key is only allowed for our local test stream 487 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 488 if openServer && !isDIDKey { 489 return nil 490 } 491 for _, a := range cli.AllowedStreams { 492 if a == did { 493 return nil 494 } 495 } 496 return fmt.Errorf("user is not allowed to stream") 497} 498 499func (cli *CLI) MyDID() string { 500 return fmt.Sprintf("did:web:%s", cli.PublicHost) 501}