Live video on the AT Protocol
at eli/buffer-thumbnails 530 lines 19 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "math/rand/v2" 21 22 "github.com/lestrrat-go/jwx/v2/jwk" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/lmittmann/tint" 25 slogGorm "github.com/orandin/slog-gorm" 26 "github.com/peterbourgon/ff/v3" 27 "stream.place/streamplace/pkg/aqtime" 28 "stream.place/streamplace/pkg/constants" 29 "stream.place/streamplace/pkg/crypto/aqpub" 30 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 31) 32 33const SPDataDir = "$SP_DATA_DIR" 34const SegmentsDir = "segments" 35 36type BuildFlags struct { 37 Version string 38 BuildTime int64 39 UUID string 40} 41 42func (b BuildFlags) BuildTimeStr() string { 43 ts := time.Unix(b.BuildTime, 0) 44 return ts.UTC().Format(time.RFC3339) 45} 46 47func (b BuildFlags) BuildTimeStrExpo() string { 48 ts := time.Unix(b.BuildTime, 0) 49 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 50} 51 52type CLI struct { 53 AdminAccount string 54 Build *BuildFlags 55 DataDir string 56 DBURL string 57 EthAccountAddr string 58 EthKeystorePath string 59 EthPassword string 60 FirebaseServiceAccount string 61 GitLabURL string 62 HTTPAddr string 63 HTTPInternalAddr string 64 HTTPSAddr string 65 RtmpsAddr string 66 Secure bool 67 NoMist bool 68 MistAdminPort int 69 MistHTTPPort int 70 MistRTMPPort int 71 SigningKeyPath string 72 TAURL string 73 TLSCertPath string 74 TLSKeyPath string 75 PKCS11ModulePath string 76 PKCS11Pin string 77 PKCS11TokenSlot string 78 PKCS11TokenLabel string 79 PKCS11TokenSerial string 80 PKCS11KeypairLabel string 81 PKCS11KeypairID string 82 StreamerName string 83 RelayHost string 84 Debug map[string]map[string]int 85 AllowedStreams []string 86 WideOpen bool 87 Peers []string 88 Redirects []string 89 TestStream bool 90 FrontendProxy string 91 AppBundleID string 92 NoFirehose bool 93 PrintChat bool 94 Color string 95 LivepeerGatewayURL string 96 LivepeerGateway bool 97 WHIPTest string 98 Thumbnail bool 99 SmearAudio bool 100 ExternalSigning bool 101 RTMPServerAddon string 102 TracingEndpoint string 103 PublicHost string 104 RateLimitPerSecond int 105 RateLimitBurst int 106 RateLimitWebsocket int 107 JWK jwk.Key 108 AccessJWK jwk.Key 109 dataDirFlags []*string 110 DiscordWebhooks []*discordtypes.Webhook 111 NewWebRTCPlayback bool 112 AppleTeamID string 113 AndroidCertFingerprint string 114 Labelers []string 115 AtprotoDID string 116 LivepeerHelp bool 117 PLCURL string 118 SQLLogging bool 119 SentryDSN string 120 LivepeerDebug bool 121} 122 123func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 124 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 125 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 126 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 127 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 128 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 129 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 130 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 131 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 132 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 133 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 134 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 135 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 136 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 137 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 138 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 139 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 140 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 141 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 142 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 143 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 144 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 145 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 146 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 147 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 148 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 149 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 150 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 151 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 152 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 153 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 154 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 155 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 156 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 157 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 158 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 159 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 160 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 161 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 162 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 163 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 164 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 165 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 166 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 167 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 168 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 169 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 170 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 171 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 172 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 173 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 174 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 175 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 176 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 177 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 178 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 179 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 180 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 181 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 182 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 183 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 184 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 185 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 186 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 187 188 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 189 _ = starter.NewLivepeerConfig(lpFlags) 190 lpFlags.VisitAll(func(f *flag.Flag) { 191 adapted := LivepeerFlags.CamelToSnake[f.Name] 192 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 193 }) 194 195 if runtime.GOOS == "linux" { 196 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 197 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 198 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 199 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 200 } 201 return fs 202} 203 204var StreamplaceSchemePrefix = "streamplace://" 205 206func (cli *CLI) OwnInternalURL() string { 207 // No errors because we know it's valid from AddrFlag 208 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 209 210 ip := net.ParseIP(host) 211 if ip.IsUnspecified() { 212 host = "127.0.0.1" 213 } 214 addr := net.JoinHostPort(host, port) 215 return fmt.Sprintf("http://%s", addr) 216} 217 218func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 219 bs, err := os.ReadFile(cli.SigningKeyPath) 220 if err != nil { 221 return nil, err 222 } 223 block, _ := pem.Decode(bs) 224 if block == nil { 225 return nil, fmt.Errorf("no RSA key found in signing key") 226 } 227 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 228 if err != nil { 229 return nil, err 230 } 231 return key, nil 232} 233 234func RandomTrailer(length int) string { 235 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 236 237 res := make([]byte, length) 238 for i := 0; i < length; i++ { 239 res[i] = charset[rand.IntN(len(charset))] 240 } 241 return string(res) 242} 243 244func DefaultDataDir() string { 245 home, err := os.UserHomeDir() 246 if err != nil { 247 // not fatal unless the user doesn't set one later 248 return "" 249 } 250 return filepath.Join(home, ".streamplace") 251} 252 253var GormLogger = slogGorm.New( 254 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 255 TimeFormat: time.RFC3339, 256 })), 257 // slogGorm.WithTraceAll(), 258) 259 260func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 261 err := ff.Parse( 262 fs, args, 263 ff.WithEnvVarPrefix("SP"), 264 ) 265 if err != nil { 266 return err 267 } 268 if cli.DataDir == "" { 269 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 270 } 271 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 272 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 273 } 274 if cli.LivepeerGateway { 275 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 276 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 277 if err != nil { 278 return err 279 } 280 err = fs.Set("livepeer.data-dir", gatewayPath) 281 if err != nil { 282 return err 283 } 284 err = fs.Set("livepeer.gateway", "true") 285 if err != nil { 286 return err 287 } 288 httpAddrFlag := fs.Lookup("livepeer.http-addr") 289 if httpAddrFlag == nil { 290 return fmt.Errorf("livepeer.http-addr not found") 291 } 292 httpAddr := httpAddrFlag.Value.String() 293 if httpAddr == "" { 294 httpAddr = "127.0.0.1:8935" 295 err = fs.Set("livepeer.http-addr", httpAddr) 296 if err != nil { 297 return err 298 } 299 } 300 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 301 } 302 for _, dest := range cli.dataDirFlags { 303 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 304 } 305 if !cli.SQLLogging { 306 GormLogger = slogGorm.New( 307 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 308 TimeFormat: time.RFC3339, 309 })), 310 ) 311 } 312 return nil 313} 314 315func (cli *CLI) DataFilePath(fpath []string) string { 316 if cli.DataDir == "" { 317 panic("no data dir configured") 318 } 319 // windows does not like colons 320 safe := []string{} 321 for _, p := range fpath { 322 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 323 } 324 fpath = append([]string{cli.DataDir}, safe...) 325 fdpath := filepath.Join(fpath...) 326 return fdpath 327} 328 329// does a file exist in our data dir? 330func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 331 ddpath := cli.DataFilePath(fpath) 332 _, err := os.Stat(ddpath) 333 if err == nil { 334 return true, nil 335 } 336 if errors.Is(err, os.ErrNotExist) { 337 return false, nil 338 } 339 return false, err 340} 341 342// write a file to our data dir 343func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 344 fd, err := cli.DataFileCreate(fpath, overwrite) 345 if err != nil { 346 return err 347 } 348 defer fd.Close() 349 _, err = io.Copy(fd, r) 350 if err != nil { 351 return err 352 } 353 354 return nil 355} 356 357// create a file in our data dir. don't forget to close it! 358func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 359 ddpath := cli.DataFilePath(fpath) 360 if !overwrite { 361 exists, err := cli.DataFileExists(fpath) 362 if err != nil { 363 return nil, err 364 } 365 if exists { 366 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 367 } 368 } 369 if len(fpath) > 1 { 370 dirs, _ := filepath.Split(ddpath) 371 err := os.MkdirAll(dirs, os.ModePerm) 372 if err != nil { 373 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 374 } 375 } 376 return os.Create(ddpath) 377} 378 379// get a path to a segment file in our database 380func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 381 ext := filepath.Ext(file) 382 base := strings.TrimSuffix(file, ext) 383 aqt, err := aqtime.FromString(base) 384 if err != nil { 385 return "", err 386 } 387 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 388 yr, mon, day, hr, min, _, _ := aqt.Parts() 389 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 390} 391 392// get a path to a segment file in our database 393func (cli *CLI) HLSDir(user string) (string, error) { 394 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 395} 396 397// create a segment file in our database 398func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 399 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 400 yr, mon, day, hr, min, _, _ := aqt.Parts() 401 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 402} 403 404// read a file from our data dir 405func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 406 ddpath := cli.DataFilePath(fpath) 407 408 fd, err := os.Open(ddpath) 409 if err != nil { 410 return err 411 } 412 _, err = io.Copy(w, fd) 413 if err != nil { 414 return err 415 } 416 417 return nil 418} 419 420func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 421 cli.dataDirFlags = append(cli.dataDirFlags, dest) 422 *dest = filepath.Join(SPDataDir, defaultValue) 423 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 424 fs.Func(name, usage, func(s string) error { 425 *dest = s 426 return nil 427 }) 428} 429 430func (cli *CLI) HasMist() bool { 431 return runtime.GOOS == "linux" 432} 433 434// type for comma-separated ethereum addresses 435func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 436 *dest = []aqpub.Pub{} 437 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 438 fs.Func(name, usage, func(s string) error { 439 if s == "" { 440 return nil 441 } 442 strs := strings.Split(s, ",") 443 for _, str := range strs { 444 pub, err := aqpub.FromHexString(str) 445 if err != nil { 446 return err 447 } 448 *dest = append(*dest, pub) 449 } 450 return nil 451 }) 452} 453 454func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 455 *dest = []string{} 456 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 457 fs.Func(name, usage, func(s string) error { 458 if s == "" { 459 return nil 460 } 461 strs := strings.Split(s, ",") 462 *dest = append(*dest, strs...) 463 return nil 464 }) 465} 466 467func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 468 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 469 fs.Func(name, usage, func(s string) error { 470 if s == "" { 471 return nil 472 } 473 return json.Unmarshal([]byte(s), dest) 474 }) 475} 476 477// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 478func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 479 *dest = map[string]map[string]int{} 480 fs.Func(name, usage, func(s string) error { 481 if s == "" { 482 return nil 483 } 484 pairs := strings.Split(s, ",") 485 for _, pair := range pairs { 486 scoreSplit := strings.Split(pair, ":") 487 if len(scoreSplit) != 2 { 488 return fmt.Errorf("invalid debug flag: %s", pair) 489 } 490 score, err := strconv.Atoi(scoreSplit[1]) 491 if err != nil { 492 return fmt.Errorf("invalid debug flag: %s", pair) 493 } 494 selectorSplit := strings.Split(scoreSplit[0], "=") 495 if len(selectorSplit) != 2 { 496 return fmt.Errorf("invalid debug flag: %s", pair) 497 } 498 _, ok := (*dest)[selectorSplit[0]] 499 if !ok { 500 (*dest)[selectorSplit[0]] = map[string]int{} 501 } 502 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 503 } 504 505 return nil 506 }) 507} 508 509func (cli *CLI) StreamIsAllowed(did string) error { 510 if cli.WideOpen { 511 return nil 512 } 513 // if the user set no test streams, anyone can stream 514 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 515 // but only valid atproto accounts! did:key is only allowed for our local test stream 516 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 517 if openServer && !isDIDKey { 518 return nil 519 } 520 for _, a := range cli.AllowedStreams { 521 if a == did { 522 return nil 523 } 524 } 525 return fmt.Errorf("user is not allowed to stream") 526} 527 528func (cli *CLI) MyDID() string { 529 return fmt.Sprintf("did:web:%s", cli.PublicHost) 530}