Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/remove-auth-webhook-url 449 lines 16 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/lestrrat-go/jwx/v2/jwk" 21 "github.com/peterbourgon/ff/v3" 22 "golang.org/x/exp/rand" 23 "stream.place/streamplace/pkg/aqtime" 24 "stream.place/streamplace/pkg/constants" 25 "stream.place/streamplace/pkg/crypto/aqpub" 26 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 27) 28 29const SPDataDir = "$SP_DATA_DIR" 30const SegmentsDir = "segments" 31 32type BuildFlags struct { 33 Version string 34 BuildTime int64 35 UUID string 36} 37 38func (b BuildFlags) BuildTimeStr() string { 39 ts := time.Unix(b.BuildTime, 0) 40 return ts.UTC().Format(time.RFC3339) 41} 42 43func (b BuildFlags) BuildTimeStrExpo() string { 44 ts := time.Unix(b.BuildTime, 0) 45 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 46} 47 48type CLI struct { 49 AdminAccount string 50 Build *BuildFlags 51 DataDir string 52 DBPath string 53 EthAccountAddr string 54 EthKeystorePath string 55 EthPassword string 56 FirebaseServiceAccount string 57 GitLabURL string 58 HTTPAddr string 59 HTTPInternalAddr string 60 HTTPSAddr string 61 RtmpsAddr string 62 Secure bool 63 NoMist bool 64 MistAdminPort int 65 MistHTTPPort int 66 MistRTMPPort int 67 SigningKeyPath string 68 TAURL string 69 TLSCertPath string 70 TLSKeyPath string 71 PKCS11ModulePath string 72 PKCS11Pin string 73 PKCS11TokenSlot string 74 PKCS11TokenLabel string 75 PKCS11TokenSerial string 76 PKCS11KeypairLabel string 77 PKCS11KeypairID string 78 StreamerName string 79 RelayHost string 80 Debug map[string]map[string]int 81 AllowedStreams []string 82 WideOpen bool 83 Peers []string 84 Redirects []string 85 TestStream bool 86 FrontendProxy string 87 AppBundleID string 88 NoFirehose bool 89 PrintChat bool 90 Color string 91 LivepeerGatewayURL string 92 WHIPTest string 93 Thumbnail bool 94 SmearAudio bool 95 ExternalSigning bool 96 RTMPServerAddon string 97 TracingEndpoint string 98 PublicHost string 99 RateLimitPerSecond int 100 RateLimitBurst int 101 RateLimitWebsocket int 102 JWK jwk.Key 103 AccessJWK jwk.Key 104 dataDirFlags []*string 105 DiscordWebhooks []*discordtypes.Webhook 106 NewWebRTCPlayback bool 107} 108 109func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 110 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 111 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 112 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 113 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 114 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 115 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 116 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 117 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 118 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 119 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 120 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 121 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 122 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 123 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 124 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 125 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 126 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 127 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 128 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 129 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 130 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 131 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 132 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 133 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 134 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 135 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 136 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 137 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 138 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 139 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 140 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 141 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 142 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 143 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 144 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 145 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 146 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 147 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 148 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 149 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 150 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 151 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 152 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 153 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 154 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 155 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 156 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 157 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 158 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 159 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 160 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 161 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 162 163 if runtime.GOOS == "linux" { 164 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 165 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 166 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 167 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 168 } 169 return fs 170} 171 172var StreamplaceSchemePrefix = "streamplace://" 173 174func (cli *CLI) OwnInternalURL() string { 175 // No errors because we know it's valid from AddrFlag 176 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 177 178 ip := net.ParseIP(host) 179 if ip.IsUnspecified() { 180 host = "127.0.0.1" 181 } 182 addr := net.JoinHostPort(host, port) 183 return fmt.Sprintf("http://%s", addr) 184} 185 186func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 187 bs, err := os.ReadFile(cli.SigningKeyPath) 188 if err != nil { 189 return nil, err 190 } 191 block, _ := pem.Decode(bs) 192 if block == nil { 193 return nil, fmt.Errorf("no RSA key found in signing key") 194 } 195 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 196 if err != nil { 197 return nil, err 198 } 199 return key, nil 200} 201 202func RandomTrailer(length int) string { 203 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 204 205 res := make([]byte, length) 206 for i := 0; i < length; i++ { 207 res[i] = charset[rand.Intn(len(charset))] 208 } 209 return string(res) 210} 211 212func DefaultDataDir() string { 213 home, err := os.UserHomeDir() 214 if err != nil { 215 // not fatal unless the user doesn't set one later 216 return "" 217 } 218 return filepath.Join(home, ".streamplace") 219} 220 221func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 222 err := ff.Parse( 223 fs, args, 224 ff.WithEnvVarPrefix("SP"), 225 ) 226 if err != nil { 227 return err 228 } 229 if cli.DataDir == "" { 230 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 231 } 232 for _, dest := range cli.dataDirFlags { 233 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 234 } 235 return nil 236} 237 238func (cli *CLI) DataFilePath(fpath []string) string { 239 if cli.DataDir == "" { 240 panic("no data dir configured") 241 } 242 // windows does not like colons 243 safe := []string{} 244 for _, p := range fpath { 245 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 246 } 247 fpath = append([]string{cli.DataDir}, safe...) 248 fdpath := filepath.Join(fpath...) 249 return fdpath 250} 251 252// does a file exist in our data dir? 253func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 254 ddpath := cli.DataFilePath(fpath) 255 _, err := os.Stat(ddpath) 256 if err == nil { 257 return true, nil 258 } 259 if errors.Is(err, os.ErrNotExist) { 260 return false, nil 261 } 262 return false, err 263} 264 265// write a file to our data dir 266func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 267 fd, err := cli.DataFileCreate(fpath, overwrite) 268 if err != nil { 269 return err 270 } 271 defer fd.Close() 272 _, err = io.Copy(fd, r) 273 if err != nil { 274 return err 275 } 276 277 return nil 278} 279 280// create a file in our data dir. don't forget to close it! 281func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 282 ddpath := cli.DataFilePath(fpath) 283 if !overwrite { 284 exists, err := cli.DataFileExists(fpath) 285 if err != nil { 286 return nil, err 287 } 288 if exists { 289 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 290 } 291 } 292 if len(fpath) > 1 { 293 dirs, _ := filepath.Split(ddpath) 294 err := os.MkdirAll(dirs, os.ModePerm) 295 if err != nil { 296 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 297 } 298 } 299 return os.Create(ddpath) 300} 301 302// get a path to a segment file in our database 303func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 304 ext := filepath.Ext(file) 305 base := strings.TrimSuffix(file, ext) 306 aqt, err := aqtime.FromString(base) 307 if err != nil { 308 return "", err 309 } 310 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 311 yr, mon, day, hr, min, _, _ := aqt.Parts() 312 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 313} 314 315// get a path to a segment file in our database 316func (cli *CLI) HLSDir(user string) (string, error) { 317 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 318} 319 320// create a segment file in our database 321func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 322 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 323 yr, mon, day, hr, min, _, _ := aqt.Parts() 324 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 325} 326 327// read a file from our data dir 328func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 329 ddpath := cli.DataFilePath(fpath) 330 331 fd, err := os.Open(ddpath) 332 if err != nil { 333 return err 334 } 335 _, err = io.Copy(w, fd) 336 if err != nil { 337 return err 338 } 339 340 return nil 341} 342 343func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 344 cli.dataDirFlags = append(cli.dataDirFlags, dest) 345 *dest = filepath.Join(SPDataDir, defaultValue) 346 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 347 fs.Func(name, usage, func(s string) error { 348 *dest = s 349 return nil 350 }) 351} 352 353func (cli *CLI) HasMist() bool { 354 return runtime.GOOS == "linux" 355} 356 357// type for comma-separated ethereum addresses 358func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 359 *dest = []aqpub.Pub{} 360 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 361 fs.Func(name, usage, func(s string) error { 362 if s == "" { 363 return nil 364 } 365 strs := strings.Split(s, ",") 366 for _, str := range strs { 367 pub, err := aqpub.FromHexString(str) 368 if err != nil { 369 return err 370 } 371 *dest = append(*dest, pub) 372 } 373 return nil 374 }) 375} 376 377func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 378 *dest = []string{} 379 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 380 fs.Func(name, usage, func(s string) error { 381 if s == "" { 382 return nil 383 } 384 strs := strings.Split(s, ",") 385 *dest = append(*dest, strs...) 386 return nil 387 }) 388} 389 390func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 391 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 392 fs.Func(name, usage, func(s string) error { 393 if s == "" { 394 return nil 395 } 396 return json.Unmarshal([]byte(s), dest) 397 }) 398} 399 400// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 401func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 402 *dest = map[string]map[string]int{} 403 fs.Func(name, usage, func(s string) error { 404 if s == "" { 405 return nil 406 } 407 pairs := strings.Split(s, ",") 408 for _, pair := range pairs { 409 scoreSplit := strings.Split(pair, ":") 410 if len(scoreSplit) != 2 { 411 return fmt.Errorf("invalid debug flag: %s", pair) 412 } 413 score, err := strconv.Atoi(scoreSplit[1]) 414 if err != nil { 415 return fmt.Errorf("invalid debug flag: %s", pair) 416 } 417 selectorSplit := strings.Split(scoreSplit[0], "=") 418 if len(selectorSplit) != 2 { 419 return fmt.Errorf("invalid debug flag: %s", pair) 420 } 421 _, ok := (*dest)[selectorSplit[0]] 422 if !ok { 423 (*dest)[selectorSplit[0]] = map[string]int{} 424 } 425 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 426 } 427 428 return nil 429 }) 430} 431 432func (cli *CLI) StreamIsAllowed(did string) error { 433 if cli.WideOpen { 434 return nil 435 } 436 // if the user set no test streams, anyone can stream 437 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 438 // but only valid atproto accounts! did:key is only allowed for our local test stream 439 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 440 if openServer && !isDIDKey { 441 return nil 442 } 443 for _, a := range cli.AllowedStreams { 444 if a == did { 445 return nil 446 } 447 } 448 return fmt.Errorf("user is not allowed to stream") 449}