Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/chat-bottom-offset 453 lines 17 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/lestrrat-go/jwx/v2/jwk" 21 "github.com/peterbourgon/ff/v3" 22 "golang.org/x/exp/rand" 23 "stream.place/streamplace/pkg/aqtime" 24 "stream.place/streamplace/pkg/constants" 25 "stream.place/streamplace/pkg/crypto/aqpub" 26 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 27) 28 29const SPDataDir = "$SP_DATA_DIR" 30const SegmentsDir = "segments" 31 32type BuildFlags struct { 33 Version string 34 BuildTime int64 35 UUID string 36} 37 38func (b BuildFlags) BuildTimeStr() string { 39 ts := time.Unix(b.BuildTime, 0) 40 return ts.UTC().Format(time.RFC3339) 41} 42 43func (b BuildFlags) BuildTimeStrExpo() string { 44 ts := time.Unix(b.BuildTime, 0) 45 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 46} 47 48type CLI struct { 49 AdminAccount string 50 Build *BuildFlags 51 DataDir string 52 DBPath string 53 EthAccountAddr string 54 EthKeystorePath string 55 EthPassword string 56 FirebaseServiceAccount string 57 GitLabURL string 58 HTTPAddr string 59 HTTPInternalAddr string 60 HTTPSAddr string 61 RtmpsAddr string 62 Secure bool 63 NoMist bool 64 MistAdminPort int 65 MistHTTPPort int 66 MistRTMPPort int 67 SigningKeyPath string 68 TAURL string 69 TLSCertPath string 70 TLSKeyPath string 71 PKCS11ModulePath string 72 PKCS11Pin string 73 PKCS11TokenSlot string 74 PKCS11TokenLabel string 75 PKCS11TokenSerial string 76 PKCS11KeypairLabel string 77 PKCS11KeypairID string 78 StreamerName string 79 RelayHost string 80 Debug map[string]map[string]int 81 AllowedStreams []string 82 WideOpen bool 83 Peers []string 84 Redirects []string 85 TestStream bool 86 FrontendProxy string 87 AppBundleID string 88 NoFirehose bool 89 PrintChat bool 90 Color string 91 LivepeerGatewayURL string 92 WHIPTest string 93 Thumbnail bool 94 SmearAudio bool 95 ExternalSigning bool 96 RTMPServerAddon string 97 TracingEndpoint string 98 PublicHost string 99 RateLimitPerSecond int 100 RateLimitBurst int 101 RateLimitWebsocket int 102 JWK jwk.Key 103 AccessJWK jwk.Key 104 dataDirFlags []*string 105 DiscordWebhooks []*discordtypes.Webhook 106 NewWebRTCPlayback bool 107 AppleTeamID string 108 AndroidCertFingerprint string 109} 110 111func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 112 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 113 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 114 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 115 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 116 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 117 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 118 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 119 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 120 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 121 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 122 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 123 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 124 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 125 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 126 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 127 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 128 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 129 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 130 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 131 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 132 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 133 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 134 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 135 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 136 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 137 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 138 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 139 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 140 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 141 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 142 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 143 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 144 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 145 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 146 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 147 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 148 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 149 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 150 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 151 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 152 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 153 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 154 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 155 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 156 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 157 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 158 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 159 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 160 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 161 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 162 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 163 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 164 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 165 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 166 167 if runtime.GOOS == "linux" { 168 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 169 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 170 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 171 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 172 } 173 return fs 174} 175 176var StreamplaceSchemePrefix = "streamplace://" 177 178func (cli *CLI) OwnInternalURL() string { 179 // No errors because we know it's valid from AddrFlag 180 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 181 182 ip := net.ParseIP(host) 183 if ip.IsUnspecified() { 184 host = "127.0.0.1" 185 } 186 addr := net.JoinHostPort(host, port) 187 return fmt.Sprintf("http://%s", addr) 188} 189 190func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 191 bs, err := os.ReadFile(cli.SigningKeyPath) 192 if err != nil { 193 return nil, err 194 } 195 block, _ := pem.Decode(bs) 196 if block == nil { 197 return nil, fmt.Errorf("no RSA key found in signing key") 198 } 199 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 200 if err != nil { 201 return nil, err 202 } 203 return key, nil 204} 205 206func RandomTrailer(length int) string { 207 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 208 209 res := make([]byte, length) 210 for i := 0; i < length; i++ { 211 res[i] = charset[rand.Intn(len(charset))] 212 } 213 return string(res) 214} 215 216func DefaultDataDir() string { 217 home, err := os.UserHomeDir() 218 if err != nil { 219 // not fatal unless the user doesn't set one later 220 return "" 221 } 222 return filepath.Join(home, ".streamplace") 223} 224 225func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 226 err := ff.Parse( 227 fs, args, 228 ff.WithEnvVarPrefix("SP"), 229 ) 230 if err != nil { 231 return err 232 } 233 if cli.DataDir == "" { 234 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 235 } 236 for _, dest := range cli.dataDirFlags { 237 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 238 } 239 return nil 240} 241 242func (cli *CLI) DataFilePath(fpath []string) string { 243 if cli.DataDir == "" { 244 panic("no data dir configured") 245 } 246 // windows does not like colons 247 safe := []string{} 248 for _, p := range fpath { 249 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 250 } 251 fpath = append([]string{cli.DataDir}, safe...) 252 fdpath := filepath.Join(fpath...) 253 return fdpath 254} 255 256// does a file exist in our data dir? 257func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 258 ddpath := cli.DataFilePath(fpath) 259 _, err := os.Stat(ddpath) 260 if err == nil { 261 return true, nil 262 } 263 if errors.Is(err, os.ErrNotExist) { 264 return false, nil 265 } 266 return false, err 267} 268 269// write a file to our data dir 270func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 271 fd, err := cli.DataFileCreate(fpath, overwrite) 272 if err != nil { 273 return err 274 } 275 defer fd.Close() 276 _, err = io.Copy(fd, r) 277 if err != nil { 278 return err 279 } 280 281 return nil 282} 283 284// create a file in our data dir. don't forget to close it! 285func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 286 ddpath := cli.DataFilePath(fpath) 287 if !overwrite { 288 exists, err := cli.DataFileExists(fpath) 289 if err != nil { 290 return nil, err 291 } 292 if exists { 293 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 294 } 295 } 296 if len(fpath) > 1 { 297 dirs, _ := filepath.Split(ddpath) 298 err := os.MkdirAll(dirs, os.ModePerm) 299 if err != nil { 300 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 301 } 302 } 303 return os.Create(ddpath) 304} 305 306// get a path to a segment file in our database 307func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 308 ext := filepath.Ext(file) 309 base := strings.TrimSuffix(file, ext) 310 aqt, err := aqtime.FromString(base) 311 if err != nil { 312 return "", err 313 } 314 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 315 yr, mon, day, hr, min, _, _ := aqt.Parts() 316 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 317} 318 319// get a path to a segment file in our database 320func (cli *CLI) HLSDir(user string) (string, error) { 321 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 322} 323 324// create a segment file in our database 325func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 326 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 327 yr, mon, day, hr, min, _, _ := aqt.Parts() 328 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 329} 330 331// read a file from our data dir 332func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 333 ddpath := cli.DataFilePath(fpath) 334 335 fd, err := os.Open(ddpath) 336 if err != nil { 337 return err 338 } 339 _, err = io.Copy(w, fd) 340 if err != nil { 341 return err 342 } 343 344 return nil 345} 346 347func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 348 cli.dataDirFlags = append(cli.dataDirFlags, dest) 349 *dest = filepath.Join(SPDataDir, defaultValue) 350 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 351 fs.Func(name, usage, func(s string) error { 352 *dest = s 353 return nil 354 }) 355} 356 357func (cli *CLI) HasMist() bool { 358 return runtime.GOOS == "linux" 359} 360 361// type for comma-separated ethereum addresses 362func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 363 *dest = []aqpub.Pub{} 364 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 365 fs.Func(name, usage, func(s string) error { 366 if s == "" { 367 return nil 368 } 369 strs := strings.Split(s, ",") 370 for _, str := range strs { 371 pub, err := aqpub.FromHexString(str) 372 if err != nil { 373 return err 374 } 375 *dest = append(*dest, pub) 376 } 377 return nil 378 }) 379} 380 381func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 382 *dest = []string{} 383 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 384 fs.Func(name, usage, func(s string) error { 385 if s == "" { 386 return nil 387 } 388 strs := strings.Split(s, ",") 389 *dest = append(*dest, strs...) 390 return nil 391 }) 392} 393 394func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 395 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 396 fs.Func(name, usage, func(s string) error { 397 if s == "" { 398 return nil 399 } 400 return json.Unmarshal([]byte(s), dest) 401 }) 402} 403 404// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 405func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 406 *dest = map[string]map[string]int{} 407 fs.Func(name, usage, func(s string) error { 408 if s == "" { 409 return nil 410 } 411 pairs := strings.Split(s, ",") 412 for _, pair := range pairs { 413 scoreSplit := strings.Split(pair, ":") 414 if len(scoreSplit) != 2 { 415 return fmt.Errorf("invalid debug flag: %s", pair) 416 } 417 score, err := strconv.Atoi(scoreSplit[1]) 418 if err != nil { 419 return fmt.Errorf("invalid debug flag: %s", pair) 420 } 421 selectorSplit := strings.Split(scoreSplit[0], "=") 422 if len(selectorSplit) != 2 { 423 return fmt.Errorf("invalid debug flag: %s", pair) 424 } 425 _, ok := (*dest)[selectorSplit[0]] 426 if !ok { 427 (*dest)[selectorSplit[0]] = map[string]int{} 428 } 429 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 430 } 431 432 return nil 433 }) 434} 435 436func (cli *CLI) StreamIsAllowed(did string) error { 437 if cli.WideOpen { 438 return nil 439 } 440 // if the user set no test streams, anyone can stream 441 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 442 // but only valid atproto accounts! did:key is only allowed for our local test stream 443 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 444 if openServer && !isDIDKey { 445 return nil 446 } 447 for _, a := range cli.AllowedStreams { 448 if a == did { 449 return nil 450 } 451 } 452 return fmt.Errorf("user is not allowed to stream") 453}