Live video on the AT Protocol
at eli/fix-context-recursion 504 lines 19 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "math/rand/v2" 21 22 "github.com/lestrrat-go/jwx/v2/jwk" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/peterbourgon/ff/v3" 25 "stream.place/streamplace/pkg/aqtime" 26 "stream.place/streamplace/pkg/constants" 27 "stream.place/streamplace/pkg/crypto/aqpub" 28 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 29) 30 31const SPDataDir = "$SP_DATA_DIR" 32const SegmentsDir = "segments" 33 34type BuildFlags struct { 35 Version string 36 BuildTime int64 37 UUID string 38} 39 40func (b BuildFlags) BuildTimeStr() string { 41 ts := time.Unix(b.BuildTime, 0) 42 return ts.UTC().Format(time.RFC3339) 43} 44 45func (b BuildFlags) BuildTimeStrExpo() string { 46 ts := time.Unix(b.BuildTime, 0) 47 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 48} 49 50type CLI struct { 51 AdminAccount string 52 Build *BuildFlags 53 DataDir string 54 DBURL string 55 EthAccountAddr string 56 EthKeystorePath string 57 EthPassword string 58 FirebaseServiceAccount string 59 GitLabURL string 60 HTTPAddr string 61 HTTPInternalAddr string 62 HTTPSAddr string 63 RtmpsAddr string 64 Secure bool 65 NoMist bool 66 MistAdminPort int 67 MistHTTPPort int 68 MistRTMPPort int 69 SigningKeyPath string 70 TAURL string 71 TLSCertPath string 72 TLSKeyPath string 73 PKCS11ModulePath string 74 PKCS11Pin string 75 PKCS11TokenSlot string 76 PKCS11TokenLabel string 77 PKCS11TokenSerial string 78 PKCS11KeypairLabel string 79 PKCS11KeypairID string 80 StreamerName string 81 RelayHost string 82 Debug map[string]map[string]int 83 AllowedStreams []string 84 WideOpen bool 85 Peers []string 86 Redirects []string 87 TestStream bool 88 FrontendProxy string 89 AppBundleID string 90 NoFirehose bool 91 PrintChat bool 92 Color string 93 LivepeerGatewayURL string 94 LivepeerGateway bool 95 WHIPTest string 96 Thumbnail bool 97 SmearAudio bool 98 ExternalSigning bool 99 RTMPServerAddon string 100 TracingEndpoint string 101 PublicHost string 102 RateLimitPerSecond int 103 RateLimitBurst int 104 RateLimitWebsocket int 105 JWK jwk.Key 106 AccessJWK jwk.Key 107 dataDirFlags []*string 108 DiscordWebhooks []*discordtypes.Webhook 109 NewWebRTCPlayback bool 110 AppleTeamID string 111 AndroidCertFingerprint string 112 Labelers []string 113 AtprotoDID string 114 LivepeerHelp bool 115 PLCURL string 116} 117 118func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 119 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 120 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 121 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 122 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 123 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 124 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 125 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 126 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 128 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 129 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 130 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 131 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 132 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 133 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 134 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 135 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 136 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 137 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 138 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 139 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 140 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 141 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 142 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 143 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 144 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 145 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 146 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 147 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 148 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 149 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 150 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 151 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 152 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 153 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 154 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 155 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 156 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 157 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 158 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 159 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 160 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 161 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 162 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 163 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 164 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 165 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 166 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 167 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 168 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 169 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 170 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 171 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 172 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 173 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 174 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 175 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 176 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 177 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 178 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 179 180 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 181 _ = starter.NewLivepeerConfig(lpFlags) 182 lpFlags.VisitAll(func(f *flag.Flag) { 183 adapted := LivepeerFlags.CamelToSnake[f.Name] 184 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 185 }) 186 187 if runtime.GOOS == "linux" { 188 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 189 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 190 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 191 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 192 } 193 return fs 194} 195 196var StreamplaceSchemePrefix = "streamplace://" 197 198func (cli *CLI) OwnInternalURL() string { 199 // No errors because we know it's valid from AddrFlag 200 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 201 202 ip := net.ParseIP(host) 203 if ip.IsUnspecified() { 204 host = "127.0.0.1" 205 } 206 addr := net.JoinHostPort(host, port) 207 return fmt.Sprintf("http://%s", addr) 208} 209 210func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 211 bs, err := os.ReadFile(cli.SigningKeyPath) 212 if err != nil { 213 return nil, err 214 } 215 block, _ := pem.Decode(bs) 216 if block == nil { 217 return nil, fmt.Errorf("no RSA key found in signing key") 218 } 219 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 220 if err != nil { 221 return nil, err 222 } 223 return key, nil 224} 225 226func RandomTrailer(length int) string { 227 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 228 229 res := make([]byte, length) 230 for i := 0; i < length; i++ { 231 res[i] = charset[rand.IntN(len(charset))] 232 } 233 return string(res) 234} 235 236func DefaultDataDir() string { 237 home, err := os.UserHomeDir() 238 if err != nil { 239 // not fatal unless the user doesn't set one later 240 return "" 241 } 242 return filepath.Join(home, ".streamplace") 243} 244 245func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 246 err := ff.Parse( 247 fs, args, 248 ff.WithEnvVarPrefix("SP"), 249 ) 250 if err != nil { 251 return err 252 } 253 if cli.DataDir == "" { 254 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 255 } 256 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 257 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 258 } 259 if cli.LivepeerGateway { 260 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 261 err = fs.Set("livepeer.data-dir", gatewayPath) 262 if err != nil { 263 return err 264 } 265 err = fs.Set("livepeer.gateway", "true") 266 if err != nil { 267 return err 268 } 269 httpAddrFlag := fs.Lookup("livepeer.http-addr") 270 if httpAddrFlag == nil { 271 return fmt.Errorf("livepeer.http-addr not found") 272 } 273 httpAddr := httpAddrFlag.Value.String() 274 if httpAddr == "" { 275 httpAddr = "127.0.0.1:8935" 276 err = fs.Set("livepeer.http-addr", httpAddr) 277 if err != nil { 278 return err 279 } 280 } 281 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 282 } 283 for _, dest := range cli.dataDirFlags { 284 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 285 } 286 return nil 287} 288 289func (cli *CLI) DataFilePath(fpath []string) string { 290 if cli.DataDir == "" { 291 panic("no data dir configured") 292 } 293 // windows does not like colons 294 safe := []string{} 295 for _, p := range fpath { 296 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 297 } 298 fpath = append([]string{cli.DataDir}, safe...) 299 fdpath := filepath.Join(fpath...) 300 return fdpath 301} 302 303// does a file exist in our data dir? 304func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 305 ddpath := cli.DataFilePath(fpath) 306 _, err := os.Stat(ddpath) 307 if err == nil { 308 return true, nil 309 } 310 if errors.Is(err, os.ErrNotExist) { 311 return false, nil 312 } 313 return false, err 314} 315 316// write a file to our data dir 317func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 318 fd, err := cli.DataFileCreate(fpath, overwrite) 319 if err != nil { 320 return err 321 } 322 defer fd.Close() 323 _, err = io.Copy(fd, r) 324 if err != nil { 325 return err 326 } 327 328 return nil 329} 330 331// create a file in our data dir. don't forget to close it! 332func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 333 ddpath := cli.DataFilePath(fpath) 334 if !overwrite { 335 exists, err := cli.DataFileExists(fpath) 336 if err != nil { 337 return nil, err 338 } 339 if exists { 340 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 341 } 342 } 343 if len(fpath) > 1 { 344 dirs, _ := filepath.Split(ddpath) 345 err := os.MkdirAll(dirs, os.ModePerm) 346 if err != nil { 347 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 348 } 349 } 350 return os.Create(ddpath) 351} 352 353// get a path to a segment file in our database 354func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 355 ext := filepath.Ext(file) 356 base := strings.TrimSuffix(file, ext) 357 aqt, err := aqtime.FromString(base) 358 if err != nil { 359 return "", err 360 } 361 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 362 yr, mon, day, hr, min, _, _ := aqt.Parts() 363 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 364} 365 366// get a path to a segment file in our database 367func (cli *CLI) HLSDir(user string) (string, error) { 368 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 369} 370 371// create a segment file in our database 372func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 373 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 374 yr, mon, day, hr, min, _, _ := aqt.Parts() 375 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 376} 377 378// read a file from our data dir 379func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 380 ddpath := cli.DataFilePath(fpath) 381 382 fd, err := os.Open(ddpath) 383 if err != nil { 384 return err 385 } 386 _, err = io.Copy(w, fd) 387 if err != nil { 388 return err 389 } 390 391 return nil 392} 393 394func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 395 cli.dataDirFlags = append(cli.dataDirFlags, dest) 396 *dest = filepath.Join(SPDataDir, defaultValue) 397 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 398 fs.Func(name, usage, func(s string) error { 399 *dest = s 400 return nil 401 }) 402} 403 404func (cli *CLI) HasMist() bool { 405 return runtime.GOOS == "linux" 406} 407 408// type for comma-separated ethereum addresses 409func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 410 *dest = []aqpub.Pub{} 411 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 412 fs.Func(name, usage, func(s string) error { 413 if s == "" { 414 return nil 415 } 416 strs := strings.Split(s, ",") 417 for _, str := range strs { 418 pub, err := aqpub.FromHexString(str) 419 if err != nil { 420 return err 421 } 422 *dest = append(*dest, pub) 423 } 424 return nil 425 }) 426} 427 428func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 429 *dest = []string{} 430 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 431 fs.Func(name, usage, func(s string) error { 432 if s == "" { 433 return nil 434 } 435 strs := strings.Split(s, ",") 436 *dest = append(*dest, strs...) 437 return nil 438 }) 439} 440 441func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 442 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 443 fs.Func(name, usage, func(s string) error { 444 if s == "" { 445 return nil 446 } 447 return json.Unmarshal([]byte(s), dest) 448 }) 449} 450 451// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 452func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 453 *dest = map[string]map[string]int{} 454 fs.Func(name, usage, func(s string) error { 455 if s == "" { 456 return nil 457 } 458 pairs := strings.Split(s, ",") 459 for _, pair := range pairs { 460 scoreSplit := strings.Split(pair, ":") 461 if len(scoreSplit) != 2 { 462 return fmt.Errorf("invalid debug flag: %s", pair) 463 } 464 score, err := strconv.Atoi(scoreSplit[1]) 465 if err != nil { 466 return fmt.Errorf("invalid debug flag: %s", pair) 467 } 468 selectorSplit := strings.Split(scoreSplit[0], "=") 469 if len(selectorSplit) != 2 { 470 return fmt.Errorf("invalid debug flag: %s", pair) 471 } 472 _, ok := (*dest)[selectorSplit[0]] 473 if !ok { 474 (*dest)[selectorSplit[0]] = map[string]int{} 475 } 476 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 477 } 478 479 return nil 480 }) 481} 482 483func (cli *CLI) StreamIsAllowed(did string) error { 484 if cli.WideOpen { 485 return nil 486 } 487 // if the user set no test streams, anyone can stream 488 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 489 // but only valid atproto accounts! did:key is only allowed for our local test stream 490 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 491 if openServer && !isDIDKey { 492 return nil 493 } 494 for _, a := range cli.AllowedStreams { 495 if a == did { 496 return nil 497 } 498 } 499 return fmt.Errorf("user is not allowed to stream") 500} 501 502func (cli *CLI) MyDID() string { 503 return fmt.Sprintf("did:web:%s", cli.PublicHost) 504}