Live video on the AT Protocol
at eli/optional-convergence 695 lines 26 kB view raw
1package config 2 3import ( 4 "context" 5 "crypto/rsa" 6 "crypto/x509" 7 "encoding/json" 8 "encoding/pem" 9 "errors" 10 "flag" 11 "fmt" 12 "io" 13 "net" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strconv" 18 "strings" 19 "time" 20 21 "math/rand/v2" 22 23 "github.com/lestrrat-go/jwx/v2/jwk" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/lmittmann/tint" 26 slogGorm "github.com/orandin/slog-gorm" 27 "github.com/peterbourgon/ff/v3" 28 "stream.place/streamplace/pkg/aqtime" 29 "stream.place/streamplace/pkg/constants" 30 "stream.place/streamplace/pkg/crypto/aqpub" 31 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 32 "stream.place/streamplace/pkg/log" 33) 34 35const SPDataDir = "$SP_DATA_DIR" 36const SegmentsDir = "segments" 37 38type BuildFlags struct { 39 Version string 40 BuildTime int64 41 UUID string 42} 43 44func (b BuildFlags) BuildTimeStr() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format(time.RFC3339) 47} 48 49func (b BuildFlags) BuildTimeStrExpo() string { 50 ts := time.Unix(b.BuildTime, 0) 51 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 52} 53 54type CLI struct { 55 AdminAccount string 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 EthAccountAddr string 60 EthKeystorePath string 61 EthPassword string 62 FirebaseServiceAccount string 63 FirebaseServiceAccountFile string 64 GitLabURL string 65 HTTPAddr string 66 HTTPInternalAddr string 67 HTTPSAddr string 68 RTMPAddr string 69 RTMPSAddr string 70 Secure bool 71 NoMist bool 72 MistAdminPort int 73 MistHTTPPort int 74 MistRTMPPort int 75 SigningKeyPath string 76 TAURL string 77 TLSCertPath string 78 TLSKeyPath string 79 PKCS11ModulePath string 80 PKCS11Pin string 81 PKCS11TokenSlot string 82 PKCS11TokenLabel string 83 PKCS11TokenSerial string 84 PKCS11KeypairLabel string 85 PKCS11KeypairID string 86 StreamerName string 87 RelayHost string 88 Debug map[string]map[string]int 89 AllowedStreams []string 90 WideOpen bool 91 Peers []string 92 Redirects []string 93 TestStream bool 94 FrontendProxy string 95 PublicOAuth bool 96 AppBundleID string 97 NoFirehose bool 98 PrintChat bool 99 Color string 100 LivepeerGatewayURL string 101 LivepeerGateway bool 102 WHIPTest string 103 Thumbnail bool 104 SmearAudio bool 105 ExternalSigning bool 106 RTMPServerAddon string 107 TracingEndpoint string 108 BroadcasterHost string 109 XXDeprecatedPublicHost string 110 ServerHost string 111 RateLimitPerSecond int 112 RateLimitBurst int 113 RateLimitWebsocket int 114 JWK jwk.Key 115 AccessJWK jwk.Key 116 dataDirFlags []*string 117 DiscordWebhooks []*discordtypes.Webhook 118 NewWebRTCPlayback bool 119 AppleTeamID string 120 AndroidCertFingerprint string 121 Labelers []string 122 AtprotoDID string 123 LivepeerHelp bool 124 PLCURL string 125 ContentFilters *ContentFilters 126 DefaultRecommendedStreamers []string 127 SQLLogging bool 128 SentryDSN string 129 LivepeerDebug bool 130 Tickets []string 131 IrohTopic string 132 DID string 133 DisableIrohRelay bool 134 DevAccountCreds map[string]string 135 StreamSessionTimeout time.Duration 136 Replicators []string 137 WebsocketURL string 138 BehindHTTPSProxy bool 139 SegmentDebugDir string 140 Syndicate []string 141} 142 143// ContentFilters represents the content filtering configuration 144type ContentFilters struct { 145 ContentWarnings struct { 146 Enabled bool `json:"enabled"` 147 BlockedWarnings []string `json:"blocked_warnings"` 148 } `json:"content_warnings"` 149 DistributionPolicy struct { 150 Enabled bool `json:"enabled"` 151 } `json:"distribution_policy"` 152} 153 154const ( 155 ReplicatorWebsocket string = "websocket" 156 ReplicatorIroh string = "iroh" 157) 158 159func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 160 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 161 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 162 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 163 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 164 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 165 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 166 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 167 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 168 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 169 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 170 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 171 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 172 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key") 173 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key") 174 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 175 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 176 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 177 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 178 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 179 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 180 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 181 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 182 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 183 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 184 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 185 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 186 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 187 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 188 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 189 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development") 190 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 191 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 192 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 193 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 194 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 195 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 196 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 197 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 198 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 199 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 200 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 201 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 202 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 203 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 204 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 205 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 206 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 207 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 208 209 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 210 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 211 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 212 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 213 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 214 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)") 215 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)") 216 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 217 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 218 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 219 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 220 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 221 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 222 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 223 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available") 224 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 225 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 226 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 227 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 228 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 229 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to") 230 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 231 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 232 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 233 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 234 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 235 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 236 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 237 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 238 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 239 240 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 241 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 242 243 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 244 _ = starter.NewLivepeerConfig(lpFlags) 245 lpFlags.VisitAll(func(f *flag.Flag) { 246 adapted := LivepeerFlags.CamelToSnake[f.Name] 247 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 248 }) 249 250 if runtime.GOOS == "linux" { 251 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 252 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 253 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 254 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 255 } 256 return fs 257} 258 259var StreamplaceSchemePrefix = "streamplace://" 260 261func (cli *CLI) OwnPublicURL() string { 262 // No errors because we know it's valid from AddrFlag 263 host, port, _ := net.SplitHostPort(cli.HTTPAddr) 264 265 ip := net.ParseIP(host) 266 if host == "" || ip.IsUnspecified() { 267 host = "127.0.0.1" 268 } 269 addr := net.JoinHostPort(host, port) 270 return fmt.Sprintf("http://%s", addr) 271} 272 273func (cli *CLI) OwnInternalURL() string { 274 // No errors because we know it's valid from AddrFlag 275 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 276 277 ip := net.ParseIP(host) 278 if ip.IsUnspecified() { 279 host = "127.0.0.1" 280 } 281 addr := net.JoinHostPort(host, port) 282 return fmt.Sprintf("http://%s", addr) 283} 284 285func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 286 bs, err := os.ReadFile(cli.SigningKeyPath) 287 if err != nil { 288 return nil, err 289 } 290 block, _ := pem.Decode(bs) 291 if block == nil { 292 return nil, fmt.Errorf("no RSA key found in signing key") 293 } 294 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 295 if err != nil { 296 return nil, err 297 } 298 return key, nil 299} 300 301func RandomTrailer(length int) string { 302 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 303 304 res := make([]byte, length) 305 for i := 0; i < length; i++ { 306 res[i] = charset[rand.IntN(len(charset))] 307 } 308 return string(res) 309} 310 311func DefaultDataDir() string { 312 home, err := os.UserHomeDir() 313 if err != nil { 314 // not fatal unless the user doesn't set one later 315 return "" 316 } 317 return filepath.Join(home, ".streamplace") 318} 319 320var GormLogger = slogGorm.New( 321 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 322 TimeFormat: time.RFC3339, 323 })), 324 slogGorm.WithTraceAll(), 325) 326 327func DisableSQLLogging() { 328 GormLogger = slogGorm.New( 329 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 330 TimeFormat: time.RFC3339, 331 })), 332 ) 333} 334 335func EnableSQLLogging() { 336 GormLogger = slogGorm.New( 337 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 338 TimeFormat: time.RFC3339, 339 })), 340 slogGorm.WithTraceAll(), 341 ) 342} 343 344func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 345 err := ff.Parse( 346 fs, args, 347 ff.WithEnvVarPrefix("SP"), 348 ) 349 if err != nil { 350 return err 351 } 352 if cli.DataDir == "" { 353 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 354 } 355 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 356 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 357 } 358 if cli.LivepeerGateway { 359 log.MonkeypatchStderr() 360 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 361 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 362 if err != nil { 363 return err 364 } 365 err = fs.Set("livepeer.data-dir", gatewayPath) 366 if err != nil { 367 return err 368 } 369 err = fs.Set("livepeer.gateway", "true") 370 if err != nil { 371 return err 372 } 373 httpAddrFlag := fs.Lookup("livepeer.http-addr") 374 if httpAddrFlag == nil { 375 return fmt.Errorf("livepeer.http-addr not found") 376 } 377 httpAddr := httpAddrFlag.Value.String() 378 if httpAddr == "" { 379 httpAddr = "127.0.0.1:8935" 380 err = fs.Set("livepeer.http-addr", httpAddr) 381 if err != nil { 382 return err 383 } 384 } 385 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 386 } 387 for _, dest := range cli.dataDirFlags { 388 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 389 } 390 if !cli.SQLLogging { 391 DisableSQLLogging() 392 } else { 393 EnableSQLLogging() 394 } 395 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" { 396 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate") 397 cli.BroadcasterHost = cli.XXDeprecatedPublicHost 398 } 399 if cli.ServerHost == "" && cli.BroadcasterHost != "" { 400 cli.ServerHost = cli.BroadcasterHost 401 } 402 if cli.PublicOAuth { 403 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production") 404 } 405 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" { 406 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?") 407 } 408 if cli.FirebaseServiceAccountFile != "" { 409 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile) 410 if err != nil { 411 return err 412 } 413 cli.FirebaseServiceAccount = string(bs) 414 } 415 return nil 416} 417 418func (cli *CLI) DataFilePath(fpath []string) string { 419 if cli.DataDir == "" { 420 panic("no data dir configured") 421 } 422 // windows does not like colons 423 safe := []string{} 424 for _, p := range fpath { 425 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 426 } 427 fpath = append([]string{cli.DataDir}, safe...) 428 fdpath := filepath.Join(fpath...) 429 return fdpath 430} 431 432// does a file exist in our data dir? 433func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 434 ddpath := cli.DataFilePath(fpath) 435 _, err := os.Stat(ddpath) 436 if err == nil { 437 return true, nil 438 } 439 if errors.Is(err, os.ErrNotExist) { 440 return false, nil 441 } 442 return false, err 443} 444 445// write a file to our data dir 446func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 447 fd, err := cli.DataFileCreate(fpath, overwrite) 448 if err != nil { 449 return err 450 } 451 defer fd.Close() 452 _, err = io.Copy(fd, r) 453 if err != nil { 454 return err 455 } 456 457 return nil 458} 459 460// create a file in our data dir. don't forget to close it! 461func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 462 ddpath := cli.DataFilePath(fpath) 463 if !overwrite { 464 exists, err := cli.DataFileExists(fpath) 465 if err != nil { 466 return nil, err 467 } 468 if exists { 469 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 470 } 471 } 472 if len(fpath) > 1 { 473 dirs, _ := filepath.Split(ddpath) 474 err := os.MkdirAll(dirs, os.ModePerm) 475 if err != nil { 476 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 477 } 478 } 479 return os.Create(ddpath) 480} 481 482// get a path to a segment file in our database 483func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 484 ext := filepath.Ext(file) 485 base := strings.TrimSuffix(file, ext) 486 aqt, err := aqtime.FromString(base) 487 if err != nil { 488 return "", err 489 } 490 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 491 yr, mon, day, hr, min, _, _ := aqt.Parts() 492 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 493} 494 495// get a path to a segment file in our database 496func (cli *CLI) HLSDir(user string) (string, error) { 497 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 498} 499 500// create a segment file in our database 501func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 502 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 503 yr, mon, day, hr, min, _, _ := aqt.Parts() 504 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 505} 506 507// read a file from our data dir 508func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 509 ddpath := cli.DataFilePath(fpath) 510 511 fd, err := os.Open(ddpath) 512 if err != nil { 513 return err 514 } 515 _, err = io.Copy(w, fd) 516 if err != nil { 517 return err 518 } 519 520 return nil 521} 522 523func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 524 cli.dataDirFlags = append(cli.dataDirFlags, dest) 525 *dest = filepath.Join(SPDataDir, defaultValue) 526 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 527 fs.Func(name, usage, func(s string) error { 528 *dest = s 529 return nil 530 }) 531} 532 533func (cli *CLI) HasMist() bool { 534 return runtime.GOOS == "linux" 535} 536 537// type for comma-separated ethereum addresses 538func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 539 *dest = []aqpub.Pub{} 540 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 541 fs.Func(name, usage, func(s string) error { 542 if s == "" { 543 return nil 544 } 545 strs := strings.Split(s, ",") 546 for _, str := range strs { 547 pub, err := aqpub.FromHexString(str) 548 if err != nil { 549 return err 550 } 551 *dest = append(*dest, pub) 552 } 553 return nil 554 }) 555} 556 557func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 558 *dest = defaultValue 559 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 560 fs.Func(name, usage, func(s string) error { 561 if s == "" { 562 return nil 563 } 564 strs := strings.Split(s, ",") 565 *dest = append([]string{}, strs...) 566 return nil 567 }) 568} 569 570func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 571 *dest = map[string]string{} 572 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 573 fs.Func(name, usage, func(s string) error { 574 if s == "" { 575 return nil 576 } 577 pairs := strings.Split(s, ",") 578 for _, pair := range pairs { 579 parts := strings.Split(pair, "=") 580 if len(parts) != 2 { 581 return fmt.Errorf("invalid kv flag: %s", pair) 582 } 583 (*dest)[parts[0]] = parts[1] 584 } 585 return nil 586 }) 587} 588 589func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 590 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 591 fs.Func(name, usage, func(s string) error { 592 if s == "" { 593 return nil 594 } 595 return json.Unmarshal([]byte(s), dest) 596 }) 597} 598 599// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 600func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 601 *dest = map[string]map[string]int{} 602 fs.Func(name, usage, func(s string) error { 603 if s == "" { 604 return nil 605 } 606 pairs := strings.Split(s, ",") 607 for _, pair := range pairs { 608 scoreSplit := strings.Split(pair, ":") 609 if len(scoreSplit) != 2 { 610 return fmt.Errorf("invalid debug flag: %s", pair) 611 } 612 score, err := strconv.Atoi(scoreSplit[1]) 613 if err != nil { 614 return fmt.Errorf("invalid debug flag: %s", pair) 615 } 616 selectorSplit := strings.Split(scoreSplit[0], "=") 617 if len(selectorSplit) != 2 { 618 return fmt.Errorf("invalid debug flag: %s", pair) 619 } 620 _, ok := (*dest)[selectorSplit[0]] 621 if !ok { 622 (*dest)[selectorSplit[0]] = map[string]int{} 623 } 624 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 625 } 626 627 return nil 628 }) 629} 630 631func (cli *CLI) StreamIsAllowed(did string) error { 632 if cli.WideOpen { 633 return nil 634 } 635 // if the user set no test streams, anyone can stream 636 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 637 // but only valid atproto accounts! did:key is only allowed for our local test stream 638 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 639 if openServer && !isDIDKey { 640 return nil 641 } 642 for _, a := range cli.AllowedStreams { 643 if a == did { 644 return nil 645 } 646 } 647 return fmt.Errorf("user is not allowed to stream") 648} 649 650func (cli *CLI) MyDID() string { 651 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost) 652} 653 654func (cli *CLI) HasHTTPS() bool { 655 return cli.Secure || cli.BehindHTTPSProxy 656} 657 658func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) { 659 if cli.SegmentDebugDir == "" { 660 return 661 } 662 go func() { 663 err := os.MkdirAll(cli.SegmentDebugDir, 0755) 664 if err != nil { 665 log.Error(ctx, "failed to create debug directory", "error", err) 666 return 667 } 668 now := aqtime.FromTime(time.Now()) 669 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-"))) 670 fd, err := os.Create(outFile) 671 if err != nil { 672 log.Error(ctx, "failed to create debug file", "error", err) 673 return 674 } 675 defer fd.Close() 676 _, err = io.Copy(fd, r) 677 if err != nil { 678 log.Error(ctx, "failed to copy debug file", "error", err) 679 return 680 } 681 log.Log(ctx, "wrote debug file", "path", outFile) 682 }() 683} 684 685func (cli *CLI) ShouldSyndicate(did string) bool { 686 for _, d := range cli.Syndicate { 687 if d == "*" { 688 return true 689 } 690 if d == did { 691 return true 692 } 693 } 694 return false 695}