Live video on the AT Protocol
at eli/postgres 504 lines 19 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "math/rand/v2" 21 22 "github.com/lestrrat-go/jwx/v2/jwk" 23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 24 "github.com/peterbourgon/ff/v3" 25 "stream.place/streamplace/pkg/aqtime" 26 "stream.place/streamplace/pkg/constants" 27 "stream.place/streamplace/pkg/crypto/aqpub" 28 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 29) 30 31const SPDataDir = "$SP_DATA_DIR" 32const SegmentsDir = "segments" 33 34type BuildFlags struct { 35 Version string 36 BuildTime int64 37 UUID string 38} 39 40func (b BuildFlags) BuildTimeStr() string { 41 ts := time.Unix(b.BuildTime, 0) 42 return ts.UTC().Format(time.RFC3339) 43} 44 45func (b BuildFlags) BuildTimeStrExpo() string { 46 ts := time.Unix(b.BuildTime, 0) 47 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 48} 49 50type CLI struct { 51 AdminAccount string 52 Build *BuildFlags 53 DataDir string 54 DBURL string 55 IndexDBPath string 56 EthAccountAddr string 57 EthKeystorePath string 58 EthPassword string 59 FirebaseServiceAccount string 60 GitLabURL string 61 HTTPAddr string 62 HTTPInternalAddr string 63 HTTPSAddr string 64 RtmpsAddr string 65 Secure bool 66 NoMist bool 67 MistAdminPort int 68 MistHTTPPort int 69 MistRTMPPort int 70 SigningKeyPath string 71 TAURL string 72 TLSCertPath string 73 TLSKeyPath string 74 PKCS11ModulePath string 75 PKCS11Pin string 76 PKCS11TokenSlot string 77 PKCS11TokenLabel string 78 PKCS11TokenSerial string 79 PKCS11KeypairLabel string 80 PKCS11KeypairID string 81 StreamerName string 82 RelayHost string 83 Debug map[string]map[string]int 84 AllowedStreams []string 85 WideOpen bool 86 Peers []string 87 Redirects []string 88 TestStream bool 89 FrontendProxy string 90 AppBundleID string 91 NoFirehose bool 92 PrintChat bool 93 Color string 94 LivepeerGatewayURL string 95 LivepeerGateway bool 96 WHIPTest string 97 Thumbnail bool 98 SmearAudio bool 99 ExternalSigning bool 100 RTMPServerAddon string 101 TracingEndpoint string 102 PublicHost string 103 RateLimitPerSecond int 104 RateLimitBurst int 105 RateLimitWebsocket int 106 JWK jwk.Key 107 AccessJWK jwk.Key 108 dataDirFlags []*string 109 DiscordWebhooks []*discordtypes.Webhook 110 NewWebRTCPlayback bool 111 AppleTeamID string 112 AndroidCertFingerprint string 113 Labelers []string 114 AtprotoDID string 115 LivepeerHelp bool 116} 117 118func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 119 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 120 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 121 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 122 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 123 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 124 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 125 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 126 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 128 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 129 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 130 cli.DataDirFlag(fs, &cli.IndexDBPath, "index-db-path", "db.sqlite", "path to sqlite database file for maintaining atproto index") 131 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 132 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 133 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 134 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 135 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 136 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 137 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 138 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 139 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 140 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 141 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 142 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 143 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 144 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 145 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 146 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 147 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 148 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 149 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 150 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 151 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 152 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 153 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 154 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 155 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 156 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 157 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 158 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 159 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 160 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 161 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 162 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 163 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 164 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 165 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 166 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 167 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 168 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 169 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 170 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 171 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 172 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 173 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 174 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 175 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 176 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 177 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 178 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 179 180 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 181 _ = starter.NewLivepeerConfig(lpFlags) 182 lpFlags.VisitAll(func(f *flag.Flag) { 183 adapted := LivepeerFlags.CamelToSnake[f.Name] 184 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 185 }) 186 187 if runtime.GOOS == "linux" { 188 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 189 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 190 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 191 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 192 } 193 return fs 194} 195 196var StreamplaceSchemePrefix = "streamplace://" 197 198func (cli *CLI) OwnInternalURL() string { 199 // No errors because we know it's valid from AddrFlag 200 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 201 202 ip := net.ParseIP(host) 203 if ip.IsUnspecified() { 204 host = "127.0.0.1" 205 } 206 addr := net.JoinHostPort(host, port) 207 return fmt.Sprintf("http://%s", addr) 208} 209 210func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 211 bs, err := os.ReadFile(cli.SigningKeyPath) 212 if err != nil { 213 return nil, err 214 } 215 block, _ := pem.Decode(bs) 216 if block == nil { 217 return nil, fmt.Errorf("no RSA key found in signing key") 218 } 219 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 220 if err != nil { 221 return nil, err 222 } 223 return key, nil 224} 225 226func RandomTrailer(length int) string { 227 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 228 229 res := make([]byte, length) 230 for i := 0; i < length; i++ { 231 res[i] = charset[rand.IntN(len(charset))] 232 } 233 return string(res) 234} 235 236func DefaultDataDir() string { 237 home, err := os.UserHomeDir() 238 if err != nil { 239 // not fatal unless the user doesn't set one later 240 return "" 241 } 242 return filepath.Join(home, ".streamplace") 243} 244 245func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 246 err := ff.Parse( 247 fs, args, 248 ff.WithEnvVarPrefix("SP"), 249 ) 250 if err != nil { 251 return err 252 } 253 if cli.DataDir == "" { 254 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 255 } 256 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 257 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 258 } 259 if cli.LivepeerGateway { 260 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 261 err = fs.Set("livepeer.data-dir", gatewayPath) 262 if err != nil { 263 return err 264 } 265 err = fs.Set("livepeer.gateway", "true") 266 if err != nil { 267 return err 268 } 269 httpAddrFlag := fs.Lookup("livepeer.http-addr") 270 if httpAddrFlag == nil { 271 return fmt.Errorf("livepeer.http-addr not found") 272 } 273 httpAddr := httpAddrFlag.Value.String() 274 if httpAddr == "" { 275 httpAddr = "127.0.0.1:8935" 276 err = fs.Set("livepeer.http-addr", httpAddr) 277 if err != nil { 278 return err 279 } 280 } 281 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 282 } 283 for _, dest := range cli.dataDirFlags { 284 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 285 } 286 return nil 287} 288 289func (cli *CLI) DataFilePath(fpath []string) string { 290 if cli.DataDir == "" { 291 panic("no data dir configured") 292 } 293 // windows does not like colons 294 safe := []string{} 295 for _, p := range fpath { 296 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 297 } 298 fpath = append([]string{cli.DataDir}, safe...) 299 fdpath := filepath.Join(fpath...) 300 return fdpath 301} 302 303// does a file exist in our data dir? 304func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 305 ddpath := cli.DataFilePath(fpath) 306 _, err := os.Stat(ddpath) 307 if err == nil { 308 return true, nil 309 } 310 if errors.Is(err, os.ErrNotExist) { 311 return false, nil 312 } 313 return false, err 314} 315 316// write a file to our data dir 317func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 318 fd, err := cli.DataFileCreate(fpath, overwrite) 319 if err != nil { 320 return err 321 } 322 defer fd.Close() 323 _, err = io.Copy(fd, r) 324 if err != nil { 325 return err 326 } 327 328 return nil 329} 330 331// create a file in our data dir. don't forget to close it! 332func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 333 ddpath := cli.DataFilePath(fpath) 334 if !overwrite { 335 exists, err := cli.DataFileExists(fpath) 336 if err != nil { 337 return nil, err 338 } 339 if exists { 340 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 341 } 342 } 343 if len(fpath) > 1 { 344 dirs, _ := filepath.Split(ddpath) 345 err := os.MkdirAll(dirs, os.ModePerm) 346 if err != nil { 347 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 348 } 349 } 350 return os.Create(ddpath) 351} 352 353// get a path to a segment file in our database 354func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 355 ext := filepath.Ext(file) 356 base := strings.TrimSuffix(file, ext) 357 aqt, err := aqtime.FromString(base) 358 if err != nil { 359 return "", err 360 } 361 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 362 yr, mon, day, hr, min, _, _ := aqt.Parts() 363 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 364} 365 366// get a path to a segment file in our database 367func (cli *CLI) HLSDir(user string) (string, error) { 368 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 369} 370 371// create a segment file in our database 372func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 373 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 374 yr, mon, day, hr, min, _, _ := aqt.Parts() 375 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 376} 377 378// read a file from our data dir 379func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 380 ddpath := cli.DataFilePath(fpath) 381 382 fd, err := os.Open(ddpath) 383 if err != nil { 384 return err 385 } 386 _, err = io.Copy(w, fd) 387 if err != nil { 388 return err 389 } 390 391 return nil 392} 393 394func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 395 cli.dataDirFlags = append(cli.dataDirFlags, dest) 396 *dest = filepath.Join(SPDataDir, defaultValue) 397 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 398 fs.Func(name, usage, func(s string) error { 399 *dest = s 400 return nil 401 }) 402} 403 404func (cli *CLI) HasMist() bool { 405 return runtime.GOOS == "linux" 406} 407 408// type for comma-separated ethereum addresses 409func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 410 *dest = []aqpub.Pub{} 411 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 412 fs.Func(name, usage, func(s string) error { 413 if s == "" { 414 return nil 415 } 416 strs := strings.Split(s, ",") 417 for _, str := range strs { 418 pub, err := aqpub.FromHexString(str) 419 if err != nil { 420 return err 421 } 422 *dest = append(*dest, pub) 423 } 424 return nil 425 }) 426} 427 428func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 429 *dest = []string{} 430 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 431 fs.Func(name, usage, func(s string) error { 432 if s == "" { 433 return nil 434 } 435 strs := strings.Split(s, ",") 436 *dest = append(*dest, strs...) 437 return nil 438 }) 439} 440 441func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 442 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 443 fs.Func(name, usage, func(s string) error { 444 if s == "" { 445 return nil 446 } 447 return json.Unmarshal([]byte(s), dest) 448 }) 449} 450 451// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 452func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 453 *dest = map[string]map[string]int{} 454 fs.Func(name, usage, func(s string) error { 455 if s == "" { 456 return nil 457 } 458 pairs := strings.Split(s, ",") 459 for _, pair := range pairs { 460 scoreSplit := strings.Split(pair, ":") 461 if len(scoreSplit) != 2 { 462 return fmt.Errorf("invalid debug flag: %s", pair) 463 } 464 score, err := strconv.Atoi(scoreSplit[1]) 465 if err != nil { 466 return fmt.Errorf("invalid debug flag: %s", pair) 467 } 468 selectorSplit := strings.Split(scoreSplit[0], "=") 469 if len(selectorSplit) != 2 { 470 return fmt.Errorf("invalid debug flag: %s", pair) 471 } 472 _, ok := (*dest)[selectorSplit[0]] 473 if !ok { 474 (*dest)[selectorSplit[0]] = map[string]int{} 475 } 476 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 477 } 478 479 return nil 480 }) 481} 482 483func (cli *CLI) StreamIsAllowed(did string) error { 484 if cli.WideOpen { 485 return nil 486 } 487 // if the user set no test streams, anyone can stream 488 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 489 // but only valid atproto accounts! did:key is only allowed for our local test stream 490 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 491 if openServer && !isDIDKey { 492 return nil 493 } 494 for _, a := range cli.AllowedStreams { 495 if a == did { 496 return nil 497 } 498 } 499 return fmt.Errorf("user is not allowed to stream") 500} 501 502func (cli *CLI) MyDID() string { 503 return fmt.Sprintf("did:web:%s", cli.PublicHost) 504}