Live video on the AT Protocol
at natb/command-errors 704 lines 27 kB view raw
1package config 2 3import ( 4 "context" 5 "crypto/rsa" 6 "crypto/x509" 7 "encoding/json" 8 "encoding/pem" 9 "errors" 10 "flag" 11 "fmt" 12 "io" 13 "net" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strconv" 18 "strings" 19 "time" 20 21 "math/rand/v2" 22 23 "github.com/lestrrat-go/jwx/v2/jwk" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/lmittmann/tint" 26 slogGorm "github.com/orandin/slog-gorm" 27 "github.com/peterbourgon/ff/v3" 28 "stream.place/streamplace/pkg/aqtime" 29 "stream.place/streamplace/pkg/constants" 30 "stream.place/streamplace/pkg/crypto/aqpub" 31 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 32 "stream.place/streamplace/pkg/log" 33) 34 35const SPDataDir = "$SP_DATA_DIR" 36const SegmentsDir = "segments" 37 38type BuildFlags struct { 39 Version string 40 BuildTime int64 41 UUID string 42} 43 44func (b BuildFlags) BuildTimeStr() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format(time.RFC3339) 47} 48 49func (b BuildFlags) BuildTimeStrExpo() string { 50 ts := time.Unix(b.BuildTime, 0) 51 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 52} 53 54type CLI struct { 55 AdminAccount string 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 LocalDBURL string 60 EthAccountAddr string 61 EthKeystorePath string 62 EthPassword string 63 FirebaseServiceAccount string 64 FirebaseServiceAccountFile string 65 GitLabURL string 66 HTTPAddr string 67 HTTPInternalAddr string 68 HTTPSAddr string 69 RTMPAddr string 70 RTMPSAddr string 71 RTMPSAddonAddr string 72 Secure bool 73 NoMist bool 74 MistAdminPort int 75 MistHTTPPort int 76 MistRTMPPort int 77 SigningKeyPath string 78 TAURL string 79 TLSCertPath string 80 TLSKeyPath string 81 PKCS11ModulePath string 82 PKCS11Pin string 83 PKCS11TokenSlot string 84 PKCS11TokenLabel string 85 PKCS11TokenSerial string 86 PKCS11KeypairLabel string 87 PKCS11KeypairID string 88 StreamerName string 89 RelayHost string 90 Debug map[string]map[string]int 91 AllowedStreams []string 92 WideOpen bool 93 Peers []string 94 Redirects []string 95 TestStream bool 96 FrontendProxy string 97 PublicOAuth bool 98 AppBundleID string 99 NoFirehose bool 100 PrintChat bool 101 Color string 102 LivepeerGatewayURL string 103 LivepeerGateway bool 104 WHIPTest string 105 Thumbnail bool 106 SmearAudio bool 107 ExternalSigning bool 108 RTMPServerAddon string 109 TracingEndpoint string 110 BroadcasterHost string 111 XXDeprecatedPublicHost string 112 ServerHost string 113 RateLimitPerSecond int 114 RateLimitBurst int 115 RateLimitWebsocket int 116 JWK jwk.Key 117 AccessJWK jwk.Key 118 dataDirFlags []*string 119 DiscordWebhooks []*discordtypes.Webhook 120 NewWebRTCPlayback bool 121 AppleTeamID string 122 AndroidCertFingerprint string 123 Labelers []string 124 AtprotoDID string 125 LivepeerHelp bool 126 PLCURL string 127 ContentFilters *ContentFilters 128 DefaultRecommendedStreamers []string 129 SQLLogging bool 130 SentryDSN string 131 LivepeerDebug bool 132 Tickets []string 133 IrohTopic string 134 DID string 135 DisableIrohRelay bool 136 DevAccountCreds map[string]string 137 StreamSessionTimeout time.Duration 138 Replicators []string 139 WebsocketURL string 140 BehindHTTPSProxy bool 141 SegmentDebugDir string 142 AdminDIDs []string 143 Syndicate []string 144 PlayerTelemetry bool 145} 146 147// ContentFilters represents the content filtering configuration 148type ContentFilters struct { 149 ContentWarnings struct { 150 Enabled bool `json:"enabled"` 151 BlockedWarnings []string `json:"blocked_warnings"` 152 } `json:"content_warnings"` 153 DistributionPolicy struct { 154 Enabled bool `json:"enabled"` 155 } `json:"distribution_policy"` 156} 157 158const ( 159 ReplicatorWebsocket string = "websocket" 160 ReplicatorIroh string = "iroh" 161) 162 163func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 164 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 165 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 166 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 167 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 168 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 169 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 170 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 171 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 172 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 173 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 174 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 175 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 176 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key") 177 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key") 178 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 179 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 180 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 181 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 182 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 183 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 184 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 185 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 186 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 187 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 188 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 189 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 190 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 191 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 192 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 193 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development") 194 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 195 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 196 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 197 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 198 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 199 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 200 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 201 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 202 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 203 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 204 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 205 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 206 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 207 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 208 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 209 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 210 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 211 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 212 213 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 214 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 215 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 216 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 217 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 218 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server") 219 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)") 220 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)") 221 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 222 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 223 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 224 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 225 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 226 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 227 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 228 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available") 229 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 230 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 231 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 232 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 233 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 234 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to") 235 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 236 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 237 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 238 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 239 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 240 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 241 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 242 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 243 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations") 244 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 245 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry") 246 fs.StringVar(&cli.LocalDBURL, "local-db-url", "sqlite://$SP_DATA_DIR/localdb.sqlite", "URL of the local database to use for storing local data") 247 cli.dataDirFlags = append(cli.dataDirFlags, &cli.LocalDBURL) 248 249 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 250 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 251 252 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 253 _ = starter.NewLivepeerConfig(lpFlags) 254 lpFlags.VisitAll(func(f *flag.Flag) { 255 adapted := LivepeerFlags.CamelToSnake[f.Name] 256 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 257 }) 258 259 if runtime.GOOS == "linux" { 260 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 261 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 262 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 263 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 264 } 265 return fs 266} 267 268var StreamplaceSchemePrefix = "streamplace://" 269 270func (cli *CLI) OwnPublicURL() string { 271 // No errors because we know it's valid from AddrFlag 272 host, port, _ := net.SplitHostPort(cli.HTTPAddr) 273 274 ip := net.ParseIP(host) 275 if host == "" || ip.IsUnspecified() { 276 host = "127.0.0.1" 277 } 278 addr := net.JoinHostPort(host, port) 279 return fmt.Sprintf("http://%s", addr) 280} 281 282func (cli *CLI) OwnInternalURL() string { 283 // No errors because we know it's valid from AddrFlag 284 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 285 286 ip := net.ParseIP(host) 287 if ip.IsUnspecified() { 288 host = "127.0.0.1" 289 } 290 addr := net.JoinHostPort(host, port) 291 return fmt.Sprintf("http://%s", addr) 292} 293 294func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 295 bs, err := os.ReadFile(cli.SigningKeyPath) 296 if err != nil { 297 return nil, err 298 } 299 block, _ := pem.Decode(bs) 300 if block == nil { 301 return nil, fmt.Errorf("no RSA key found in signing key") 302 } 303 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 304 if err != nil { 305 return nil, err 306 } 307 return key, nil 308} 309 310func RandomTrailer(length int) string { 311 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 312 313 res := make([]byte, length) 314 for i := 0; i < length; i++ { 315 res[i] = charset[rand.IntN(len(charset))] 316 } 317 return string(res) 318} 319 320func DefaultDataDir() string { 321 home, err := os.UserHomeDir() 322 if err != nil { 323 // not fatal unless the user doesn't set one later 324 return "" 325 } 326 return filepath.Join(home, ".streamplace") 327} 328 329var GormLogger = slogGorm.New( 330 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 331 TimeFormat: time.RFC3339, 332 })), 333 slogGorm.WithTraceAll(), 334) 335 336func DisableSQLLogging() { 337 GormLogger = slogGorm.New( 338 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 339 TimeFormat: time.RFC3339, 340 })), 341 ) 342} 343 344func EnableSQLLogging() { 345 GormLogger = slogGorm.New( 346 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 347 TimeFormat: time.RFC3339, 348 })), 349 slogGorm.WithTraceAll(), 350 ) 351} 352 353func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 354 err := ff.Parse( 355 fs, args, 356 ff.WithEnvVarPrefix("SP"), 357 ) 358 if err != nil { 359 return err 360 } 361 if cli.DataDir == "" { 362 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 363 } 364 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 365 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 366 } 367 if cli.LivepeerGateway { 368 log.MonkeypatchStderr() 369 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 370 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 371 if err != nil { 372 return err 373 } 374 err = fs.Set("livepeer.data-dir", gatewayPath) 375 if err != nil { 376 return err 377 } 378 err = fs.Set("livepeer.gateway", "true") 379 if err != nil { 380 return err 381 } 382 httpAddrFlag := fs.Lookup("livepeer.http-addr") 383 if httpAddrFlag == nil { 384 return fmt.Errorf("livepeer.http-addr not found") 385 } 386 httpAddr := httpAddrFlag.Value.String() 387 if httpAddr == "" { 388 httpAddr = "127.0.0.1:8935" 389 err = fs.Set("livepeer.http-addr", httpAddr) 390 if err != nil { 391 return err 392 } 393 } 394 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 395 } 396 for _, dest := range cli.dataDirFlags { 397 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 398 } 399 if !cli.SQLLogging { 400 DisableSQLLogging() 401 } else { 402 EnableSQLLogging() 403 } 404 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" { 405 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate") 406 cli.BroadcasterHost = cli.XXDeprecatedPublicHost 407 } 408 if cli.ServerHost == "" && cli.BroadcasterHost != "" { 409 cli.ServerHost = cli.BroadcasterHost 410 } 411 if cli.PublicOAuth { 412 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production") 413 } 414 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" { 415 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?") 416 } 417 if cli.FirebaseServiceAccountFile != "" { 418 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile) 419 if err != nil { 420 return err 421 } 422 cli.FirebaseServiceAccount = string(bs) 423 } 424 return nil 425} 426 427func (cli *CLI) DataFilePath(fpath []string) string { 428 if cli.DataDir == "" { 429 panic("no data dir configured") 430 } 431 // windows does not like colons 432 safe := []string{} 433 for _, p := range fpath { 434 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 435 } 436 fpath = append([]string{cli.DataDir}, safe...) 437 fdpath := filepath.Join(fpath...) 438 return fdpath 439} 440 441// does a file exist in our data dir? 442func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 443 ddpath := cli.DataFilePath(fpath) 444 _, err := os.Stat(ddpath) 445 if err == nil { 446 return true, nil 447 } 448 if errors.Is(err, os.ErrNotExist) { 449 return false, nil 450 } 451 return false, err 452} 453 454// write a file to our data dir 455func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 456 fd, err := cli.DataFileCreate(fpath, overwrite) 457 if err != nil { 458 return err 459 } 460 defer fd.Close() 461 _, err = io.Copy(fd, r) 462 if err != nil { 463 return err 464 } 465 466 return nil 467} 468 469// create a file in our data dir. don't forget to close it! 470func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 471 ddpath := cli.DataFilePath(fpath) 472 if !overwrite { 473 exists, err := cli.DataFileExists(fpath) 474 if err != nil { 475 return nil, err 476 } 477 if exists { 478 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 479 } 480 } 481 if len(fpath) > 1 { 482 dirs, _ := filepath.Split(ddpath) 483 err := os.MkdirAll(dirs, os.ModePerm) 484 if err != nil { 485 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 486 } 487 } 488 return os.Create(ddpath) 489} 490 491// get a path to a segment file in our database 492func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 493 ext := filepath.Ext(file) 494 base := strings.TrimSuffix(file, ext) 495 aqt, err := aqtime.FromString(base) 496 if err != nil { 497 return "", err 498 } 499 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 500 yr, mon, day, hr, min, _, _ := aqt.Parts() 501 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 502} 503 504// get a path to a segment file in our database 505func (cli *CLI) HLSDir(user string) (string, error) { 506 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 507} 508 509// create a segment file in our database 510func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 511 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 512 yr, mon, day, hr, min, _, _ := aqt.Parts() 513 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 514} 515 516// read a file from our data dir 517func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 518 ddpath := cli.DataFilePath(fpath) 519 520 fd, err := os.Open(ddpath) 521 if err != nil { 522 return err 523 } 524 _, err = io.Copy(w, fd) 525 if err != nil { 526 return err 527 } 528 529 return nil 530} 531 532func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 533 cli.dataDirFlags = append(cli.dataDirFlags, dest) 534 *dest = filepath.Join(SPDataDir, defaultValue) 535 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 536 fs.Func(name, usage, func(s string) error { 537 *dest = s 538 return nil 539 }) 540} 541 542func (cli *CLI) HasMist() bool { 543 return runtime.GOOS == "linux" 544} 545 546// type for comma-separated ethereum addresses 547func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 548 *dest = []aqpub.Pub{} 549 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 550 fs.Func(name, usage, func(s string) error { 551 if s == "" { 552 return nil 553 } 554 strs := strings.Split(s, ",") 555 for _, str := range strs { 556 pub, err := aqpub.FromHexString(str) 557 if err != nil { 558 return err 559 } 560 *dest = append(*dest, pub) 561 } 562 return nil 563 }) 564} 565 566func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 567 *dest = defaultValue 568 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 569 fs.Func(name, usage, func(s string) error { 570 if s == "" { 571 return nil 572 } 573 strs := strings.Split(s, ",") 574 *dest = append([]string{}, strs...) 575 return nil 576 }) 577} 578 579func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 580 *dest = map[string]string{} 581 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 582 fs.Func(name, usage, func(s string) error { 583 if s == "" { 584 return nil 585 } 586 pairs := strings.Split(s, ",") 587 for _, pair := range pairs { 588 parts := strings.Split(pair, "=") 589 if len(parts) != 2 { 590 return fmt.Errorf("invalid kv flag: %s", pair) 591 } 592 (*dest)[parts[0]] = parts[1] 593 } 594 return nil 595 }) 596} 597 598func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 599 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 600 fs.Func(name, usage, func(s string) error { 601 if s == "" { 602 return nil 603 } 604 return json.Unmarshal([]byte(s), dest) 605 }) 606} 607 608// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 609func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 610 *dest = map[string]map[string]int{} 611 fs.Func(name, usage, func(s string) error { 612 if s == "" { 613 return nil 614 } 615 pairs := strings.Split(s, ",") 616 for _, pair := range pairs { 617 scoreSplit := strings.Split(pair, ":") 618 if len(scoreSplit) != 2 { 619 return fmt.Errorf("invalid debug flag: %s", pair) 620 } 621 score, err := strconv.Atoi(scoreSplit[1]) 622 if err != nil { 623 return fmt.Errorf("invalid debug flag: %s", pair) 624 } 625 selectorSplit := strings.Split(scoreSplit[0], "=") 626 if len(selectorSplit) != 2 { 627 return fmt.Errorf("invalid debug flag: %s", pair) 628 } 629 _, ok := (*dest)[selectorSplit[0]] 630 if !ok { 631 (*dest)[selectorSplit[0]] = map[string]int{} 632 } 633 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 634 } 635 636 return nil 637 }) 638} 639 640func (cli *CLI) StreamIsAllowed(did string) error { 641 if cli.WideOpen { 642 return nil 643 } 644 // if the user set no test streams, anyone can stream 645 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 646 // but only valid atproto accounts! did:key is only allowed for our local test stream 647 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 648 if openServer && !isDIDKey { 649 return nil 650 } 651 for _, a := range cli.AllowedStreams { 652 if a == did { 653 return nil 654 } 655 } 656 return fmt.Errorf("user is not allowed to stream") 657} 658 659func (cli *CLI) MyDID() string { 660 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost) 661} 662 663func (cli *CLI) HasHTTPS() bool { 664 return cli.Secure || cli.BehindHTTPSProxy 665} 666 667func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) { 668 if cli.SegmentDebugDir == "" { 669 return 670 } 671 go func() { 672 err := os.MkdirAll(cli.SegmentDebugDir, 0755) 673 if err != nil { 674 log.Error(ctx, "failed to create debug directory", "error", err) 675 return 676 } 677 now := aqtime.FromTime(time.Now()) 678 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-"))) 679 fd, err := os.Create(outFile) 680 if err != nil { 681 log.Error(ctx, "failed to create debug file", "error", err) 682 return 683 } 684 defer fd.Close() 685 _, err = io.Copy(fd, r) 686 if err != nil { 687 log.Error(ctx, "failed to copy debug file", "error", err) 688 return 689 } 690 log.Log(ctx, "wrote debug file", "path", outFile) 691 }() 692} 693 694func (cli *CLI) ShouldSyndicate(did string) bool { 695 for _, d := range cli.Syndicate { 696 if d == "*" { 697 return true 698 } 699 if d == did { 700 return true 701 } 702 } 703 return false 704}