Live video on the AT Protocol
at eli/database-resync 385 lines 10 kB view raw
1package config 2 3import ( 4 "crypto/rsa" 5 "crypto/x509" 6 "encoding/json" 7 "encoding/pem" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "net" 13 "os" 14 "path/filepath" 15 "runtime" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/lestrrat-go/jwx/v2/jwk" 21 "github.com/peterbourgon/ff/v3" 22 "golang.org/x/exp/rand" 23 "stream.place/streamplace/pkg/aqtime" 24 "stream.place/streamplace/pkg/constants" 25 "stream.place/streamplace/pkg/crypto/aqpub" 26 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 27) 28 29const SPDataDir = "$SP_DATA_DIR" 30const SegmentsDir = "segments" 31 32type BuildFlags struct { 33 Version string 34 BuildTime int64 35 UUID string 36} 37 38func (b BuildFlags) BuildTimeStr() string { 39 ts := time.Unix(b.BuildTime, 0) 40 return ts.UTC().Format(time.RFC3339) 41} 42 43func (b BuildFlags) BuildTimeStrExpo() string { 44 ts := time.Unix(b.BuildTime, 0) 45 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 46} 47 48type CLI struct { 49 AdminAccount string 50 Build *BuildFlags 51 DataDir string 52 DBPath string 53 EthAccountAddr string 54 EthKeystorePath string 55 EthPassword string 56 FirebaseServiceAccount string 57 GitLabURL string 58 HTTPAddr string 59 HTTPInternalAddr string 60 HTTPSAddr string 61 RtmpsAddr string 62 Secure bool 63 NoMist bool 64 MistAdminPort int 65 MistHTTPPort int 66 MistRTMPPort int 67 SigningKeyPath string 68 TAURL string 69 TLSCertPath string 70 TLSKeyPath string 71 PKCS11ModulePath string 72 PKCS11Pin string 73 PKCS11TokenSlot string 74 PKCS11TokenLabel string 75 PKCS11TokenSerial string 76 PKCS11KeypairLabel string 77 PKCS11KeypairID string 78 StreamerName string 79 RelayHost string 80 Debug map[string]map[string]int 81 AllowedStreams []string 82 WideOpen bool 83 Peers []string 84 Redirects []string 85 TestStream bool 86 FrontendProxy string 87 AppBundleID string 88 NoFirehose bool 89 PrintChat bool 90 Color string 91 LivepeerGatewayURL string 92 WHIPTest string 93 Thumbnail bool 94 SmearAudio bool 95 ExternalSigning bool 96 RTMPServerAddon string 97 TracingEndpoint string 98 PublicHost string 99 RateLimitPerSecond int 100 RateLimitBurst int 101 RateLimitWebsocket int 102 JWK jwk.Key 103 AccessJWK jwk.Key 104 dataDirFlags []*string 105 DiscordWebhooks []*discordtypes.Webhook 106} 107 108var StreamplaceSchemePrefix = "streamplace://" 109 110func (cli *CLI) OwnInternalURL() string { 111 // No errors because we know it's valid from AddrFlag 112 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 113 114 ip := net.ParseIP(host) 115 if ip.IsUnspecified() { 116 host = "127.0.0.1" 117 } 118 addr := net.JoinHostPort(host, port) 119 return fmt.Sprintf("http://%s", addr) 120} 121 122func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 123 bs, err := os.ReadFile(cli.SigningKeyPath) 124 if err != nil { 125 return nil, err 126 } 127 block, _ := pem.Decode(bs) 128 if block == nil { 129 return nil, fmt.Errorf("no RSA key found in signing key") 130 } 131 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 132 if err != nil { 133 return nil, err 134 } 135 return key, nil 136} 137 138func RandomTrailer(length int) string { 139 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 140 141 res := make([]byte, length) 142 for i := 0; i < length; i++ { 143 res[i] = charset[rand.Intn(len(charset))] 144 } 145 return string(res) 146} 147 148func DefaultDataDir() string { 149 home, err := os.UserHomeDir() 150 if err != nil { 151 // not fatal unless the user doesn't set one later 152 return "" 153 } 154 return filepath.Join(home, ".streamplace") 155} 156 157func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 158 err := ff.Parse( 159 fs, os.Args[1:], 160 ff.WithEnvVarPrefix("SP"), 161 ) 162 if err != nil { 163 return err 164 } 165 if cli.DataDir == "" { 166 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 167 } 168 for _, dest := range cli.dataDirFlags { 169 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 170 } 171 return nil 172} 173 174func (cli *CLI) DataFilePath(fpath []string) string { 175 if cli.DataDir == "" { 176 panic("no data dir configured") 177 } 178 // windows does not like colons 179 safe := []string{} 180 for _, p := range fpath { 181 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 182 } 183 fpath = append([]string{cli.DataDir}, safe...) 184 fdpath := filepath.Join(fpath...) 185 return fdpath 186} 187 188// does a file exist in our data dir? 189func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 190 ddpath := cli.DataFilePath(fpath) 191 _, err := os.Stat(ddpath) 192 if err == nil { 193 return true, nil 194 } 195 if errors.Is(err, os.ErrNotExist) { 196 return false, nil 197 } 198 return false, err 199} 200 201// write a file to our data dir 202func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 203 fd, err := cli.DataFileCreate(fpath, overwrite) 204 if err != nil { 205 return err 206 } 207 defer fd.Close() 208 _, err = io.Copy(fd, r) 209 if err != nil { 210 return err 211 } 212 213 return nil 214} 215 216// create a file in our data dir. don't forget to close it! 217func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 218 ddpath := cli.DataFilePath(fpath) 219 if !overwrite { 220 exists, err := cli.DataFileExists(fpath) 221 if err != nil { 222 return nil, err 223 } 224 if exists { 225 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 226 } 227 } 228 if len(fpath) > 1 { 229 dirs, _ := filepath.Split(ddpath) 230 err := os.MkdirAll(dirs, os.ModePerm) 231 if err != nil { 232 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 233 } 234 } 235 return os.Create(ddpath) 236} 237 238// get a path to a segment file in our database 239func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 240 ext := filepath.Ext(file) 241 base := strings.TrimSuffix(file, ext) 242 aqt, err := aqtime.FromString(base) 243 if err != nil { 244 return "", err 245 } 246 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 247 yr, mon, day, hr, min, _, _ := aqt.Parts() 248 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 249} 250 251// get a path to a segment file in our database 252func (cli *CLI) HLSDir(user string) (string, error) { 253 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 254} 255 256// create a segment file in our database 257func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 258 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 259 yr, mon, day, hr, min, _, _ := aqt.Parts() 260 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 261} 262 263// read a file from our data dir 264func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 265 ddpath := cli.DataFilePath(fpath) 266 267 fd, err := os.Open(ddpath) 268 if err != nil { 269 return err 270 } 271 _, err = io.Copy(w, fd) 272 if err != nil { 273 return err 274 } 275 276 return nil 277} 278 279func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 280 cli.dataDirFlags = append(cli.dataDirFlags, dest) 281 *dest = filepath.Join(SPDataDir, defaultValue) 282 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 283 fs.Func(name, usage, func(s string) error { 284 *dest = s 285 return nil 286 }) 287} 288 289func (cli *CLI) HasMist() bool { 290 return runtime.GOOS == "linux" 291} 292 293// type for comma-separated ethereum addresses 294func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 295 *dest = []aqpub.Pub{} 296 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 297 fs.Func(name, usage, func(s string) error { 298 if s == "" { 299 return nil 300 } 301 strs := strings.Split(s, ",") 302 for _, str := range strs { 303 pub, err := aqpub.FromHexString(str) 304 if err != nil { 305 return err 306 } 307 *dest = append(*dest, pub) 308 } 309 return nil 310 }) 311} 312 313func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 314 *dest = []string{} 315 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 316 fs.Func(name, usage, func(s string) error { 317 if s == "" { 318 return nil 319 } 320 strs := strings.Split(s, ",") 321 *dest = append(*dest, strs...) 322 return nil 323 }) 324} 325 326func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 327 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 328 fs.Func(name, usage, func(s string) error { 329 if s == "" { 330 return nil 331 } 332 return json.Unmarshal([]byte(s), dest) 333 }) 334} 335 336// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 337func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 338 *dest = map[string]map[string]int{} 339 fs.Func(name, usage, func(s string) error { 340 if s == "" { 341 return nil 342 } 343 pairs := strings.Split(s, ",") 344 for _, pair := range pairs { 345 scoreSplit := strings.Split(pair, ":") 346 if len(scoreSplit) != 2 { 347 return fmt.Errorf("invalid debug flag: %s", pair) 348 } 349 score, err := strconv.Atoi(scoreSplit[1]) 350 if err != nil { 351 return fmt.Errorf("invalid debug flag: %s", pair) 352 } 353 selectorSplit := strings.Split(scoreSplit[0], "=") 354 if len(selectorSplit) != 2 { 355 return fmt.Errorf("invalid debug flag: %s", pair) 356 } 357 _, ok := (*dest)[selectorSplit[0]] 358 if !ok { 359 (*dest)[selectorSplit[0]] = map[string]int{} 360 } 361 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 362 } 363 364 return nil 365 }) 366} 367 368func (cli *CLI) StreamIsAllowed(did string) error { 369 if cli.WideOpen { 370 return nil 371 } 372 // if the user set no test streams, anyone can stream 373 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 374 // but only valid atproto accounts! did:key is only allowed for our local test stream 375 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 376 if openServer && !isDIDKey { 377 return nil 378 } 379 for _, a := range cli.AllowedStreams { 380 if a == did { 381 return nil 382 } 383 } 384 return fmt.Errorf("user is not allowed to stream") 385}