Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/server-restart-test 458 lines 17 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "math/rand/v2" 21 22 "github.com/lestrrat-go/jwx/v2/jwk" 23 "github.com/peterbourgon/ff/v3" 24 "stream.place/streamplace/pkg/aqtime" 25 "stream.place/streamplace/pkg/constants" 26 "stream.place/streamplace/pkg/crypto/aqpub" 27 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 28) 29 30const SPDataDir = "$SP_DATA_DIR" 31const SegmentsDir = "segments" 32 33type BuildFlags struct { 34 Version string 35 BuildTime int64 36 UUID string 37} 38 39func (b BuildFlags) BuildTimeStr() string { 40 ts := time.Unix(b.BuildTime, 0) 41 return ts.UTC().Format(time.RFC3339) 42} 43 44func (b BuildFlags) BuildTimeStrExpo() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 47} 48 49type CLI struct { 50 AdminAccount string 51 Build *BuildFlags 52 DataDir string 53 DBPath string 54 EthAccountAddr string 55 EthKeystorePath string 56 EthPassword string 57 FirebaseServiceAccount string 58 GitLabURL string 59 HTTPAddr string 60 HTTPInternalAddr string 61 HTTPSAddr string 62 RtmpsAddr string 63 Secure bool 64 NoMist bool 65 MistAdminPort int 66 MistHTTPPort int 67 MistRTMPPort int 68 SigningKeyPath string 69 TAURL string 70 TLSCertPath string 71 TLSKeyPath string 72 PKCS11ModulePath string 73 PKCS11Pin string 74 PKCS11TokenSlot string 75 PKCS11TokenLabel string 76 PKCS11TokenSerial string 77 PKCS11KeypairLabel string 78 PKCS11KeypairID string 79 StreamerName string 80 RelayHost string 81 Debug map[string]map[string]int 82 AllowedStreams []string 83 WideOpen bool 84 Peers []string 85 Redirects []string 86 TestStream bool 87 FrontendProxy string 88 AppBundleID string 89 NoFirehose bool 90 PrintChat bool 91 Color string 92 LivepeerGatewayURL string 93 WHIPTest string 94 Thumbnail bool 95 SmearAudio bool 96 ExternalSigning bool 97 RTMPServerAddon string 98 TracingEndpoint string 99 PublicHost string 100 RateLimitPerSecond int 101 RateLimitBurst int 102 RateLimitWebsocket int 103 JWK jwk.Key 104 AccessJWK jwk.Key 105 dataDirFlags []*string 106 DiscordWebhooks []*discordtypes.Webhook 107 NewWebRTCPlayback bool 108 AppleTeamID string 109 AndroidCertFingerprint string 110} 111 112func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 113 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 114 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 115 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 116 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 117 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 118 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 119 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 120 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 121 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 122 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 123 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 124 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 125 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 126 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 127 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 128 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 129 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 130 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 131 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 132 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 133 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 134 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 135 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 136 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 137 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 138 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 139 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 140 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 141 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 142 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 143 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 144 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 145 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 146 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 147 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 148 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 149 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 150 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 151 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 152 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 153 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 154 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 155 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 156 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 157 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 158 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 159 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 160 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 161 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 162 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 163 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 164 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 165 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 166 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 167 168 if runtime.GOOS == "linux" { 169 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 170 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 171 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 172 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 173 } 174 return fs 175} 176 177var StreamplaceSchemePrefix = "streamplace://" 178 179func (cli *CLI) OwnInternalURL() string { 180 // No errors because we know it's valid from AddrFlag 181 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 182 183 ip := net.ParseIP(host) 184 if ip.IsUnspecified() { 185 host = "127.0.0.1" 186 } 187 addr := net.JoinHostPort(host, port) 188 return fmt.Sprintf("http://%s", addr) 189} 190 191func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 192 bs, err := os.ReadFile(cli.SigningKeyPath) 193 if err != nil { 194 return nil, err 195 } 196 block, _ := pem.Decode(bs) 197 if block == nil { 198 return nil, fmt.Errorf("no RSA key found in signing key") 199 } 200 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 201 if err != nil { 202 return nil, err 203 } 204 return key, nil 205} 206 207func RandomTrailer(length int) string { 208 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 209 210 res := make([]byte, length) 211 for i := 0; i < length; i++ { 212 res[i] = charset[rand.IntN(len(charset))] 213 } 214 return string(res) 215} 216 217func DefaultDataDir() string { 218 home, err := os.UserHomeDir() 219 if err != nil { 220 // not fatal unless the user doesn't set one later 221 return "" 222 } 223 return filepath.Join(home, ".streamplace") 224} 225 226func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 227 err := ff.Parse( 228 fs, args, 229 ff.WithEnvVarPrefix("SP"), 230 ) 231 if err != nil { 232 return err 233 } 234 if cli.DataDir == "" { 235 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 236 } 237 for _, dest := range cli.dataDirFlags { 238 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 239 } 240 return nil 241} 242 243func (cli *CLI) DataFilePath(fpath []string) string { 244 if cli.DataDir == "" { 245 panic("no data dir configured") 246 } 247 // windows does not like colons 248 safe := []string{} 249 for _, p := range fpath { 250 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 251 } 252 fpath = append([]string{cli.DataDir}, safe...) 253 fdpath := filepath.Join(fpath...) 254 return fdpath 255} 256 257// does a file exist in our data dir? 258func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 259 ddpath := cli.DataFilePath(fpath) 260 _, err := os.Stat(ddpath) 261 if err == nil { 262 return true, nil 263 } 264 if errors.Is(err, os.ErrNotExist) { 265 return false, nil 266 } 267 return false, err 268} 269 270// write a file to our data dir 271func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 272 fd, err := cli.DataFileCreate(fpath, overwrite) 273 if err != nil { 274 return err 275 } 276 defer fd.Close() 277 _, err = io.Copy(fd, r) 278 if err != nil { 279 return err 280 } 281 282 return nil 283} 284 285// create a file in our data dir. don't forget to close it! 286func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 287 ddpath := cli.DataFilePath(fpath) 288 if !overwrite { 289 exists, err := cli.DataFileExists(fpath) 290 if err != nil { 291 return nil, err 292 } 293 if exists { 294 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 295 } 296 } 297 if len(fpath) > 1 { 298 dirs, _ := filepath.Split(ddpath) 299 err := os.MkdirAll(dirs, os.ModePerm) 300 if err != nil { 301 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 302 } 303 } 304 return os.Create(ddpath) 305} 306 307// get a path to a segment file in our database 308func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 309 ext := filepath.Ext(file) 310 base := strings.TrimSuffix(file, ext) 311 aqt, err := aqtime.FromString(base) 312 if err != nil { 313 return "", err 314 } 315 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 316 yr, mon, day, hr, min, _, _ := aqt.Parts() 317 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 318} 319 320// get a path to a segment file in our database 321func (cli *CLI) HLSDir(user string) (string, error) { 322 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 323} 324 325// create a segment file in our database 326func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 327 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 328 yr, mon, day, hr, min, _, _ := aqt.Parts() 329 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 330} 331 332// read a file from our data dir 333func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 334 ddpath := cli.DataFilePath(fpath) 335 336 fd, err := os.Open(ddpath) 337 if err != nil { 338 return err 339 } 340 _, err = io.Copy(w, fd) 341 if err != nil { 342 return err 343 } 344 345 return nil 346} 347 348func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 349 cli.dataDirFlags = append(cli.dataDirFlags, dest) 350 *dest = filepath.Join(SPDataDir, defaultValue) 351 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 352 fs.Func(name, usage, func(s string) error { 353 *dest = s 354 return nil 355 }) 356} 357 358func (cli *CLI) HasMist() bool { 359 return runtime.GOOS == "linux" 360} 361 362// type for comma-separated ethereum addresses 363func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 364 *dest = []aqpub.Pub{} 365 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 366 fs.Func(name, usage, func(s string) error { 367 if s == "" { 368 return nil 369 } 370 strs := strings.Split(s, ",") 371 for _, str := range strs { 372 pub, err := aqpub.FromHexString(str) 373 if err != nil { 374 return err 375 } 376 *dest = append(*dest, pub) 377 } 378 return nil 379 }) 380} 381 382func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 383 *dest = []string{} 384 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 385 fs.Func(name, usage, func(s string) error { 386 if s == "" { 387 return nil 388 } 389 strs := strings.Split(s, ",") 390 *dest = append(*dest, strs...) 391 return nil 392 }) 393} 394 395func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 396 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 397 fs.Func(name, usage, func(s string) error { 398 if s == "" { 399 return nil 400 } 401 return json.Unmarshal([]byte(s), dest) 402 }) 403} 404 405// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 406func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 407 *dest = map[string]map[string]int{} 408 fs.Func(name, usage, func(s string) error { 409 if s == "" { 410 return nil 411 } 412 pairs := strings.Split(s, ",") 413 for _, pair := range pairs { 414 scoreSplit := strings.Split(pair, ":") 415 if len(scoreSplit) != 2 { 416 return fmt.Errorf("invalid debug flag: %s", pair) 417 } 418 score, err := strconv.Atoi(scoreSplit[1]) 419 if err != nil { 420 return fmt.Errorf("invalid debug flag: %s", pair) 421 } 422 selectorSplit := strings.Split(scoreSplit[0], "=") 423 if len(selectorSplit) != 2 { 424 return fmt.Errorf("invalid debug flag: %s", pair) 425 } 426 _, ok := (*dest)[selectorSplit[0]] 427 if !ok { 428 (*dest)[selectorSplit[0]] = map[string]int{} 429 } 430 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 431 } 432 433 return nil 434 }) 435} 436 437func (cli *CLI) StreamIsAllowed(did string) error { 438 if cli.WideOpen { 439 return nil 440 } 441 // if the user set no test streams, anyone can stream 442 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 443 // but only valid atproto accounts! did:key is only allowed for our local test stream 444 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 445 if openServer && !isDIDKey { 446 return nil 447 } 448 for _, a := range cli.AllowedStreams { 449 if a == did { 450 return nil 451 } 452 } 453 return fmt.Errorf("user is not allowed to stream") 454} 455 456func (cli *CLI) MyDID() string { 457 return fmt.Sprintf("did:web:%s", cli.PublicHost) 458}