Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/too-many-keyframes 701 lines 27 kB view raw
1package config 2 3import ( 4 "context" 5 "crypto/rsa" 6 "crypto/x509" 7 "encoding/json" 8 "encoding/pem" 9 "errors" 10 "flag" 11 "fmt" 12 "io" 13 "net" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strconv" 18 "strings" 19 "time" 20 21 "math/rand/v2" 22 23 "github.com/lestrrat-go/jwx/v2/jwk" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/lmittmann/tint" 26 slogGorm "github.com/orandin/slog-gorm" 27 "github.com/peterbourgon/ff/v3" 28 "stream.place/streamplace/pkg/aqtime" 29 "stream.place/streamplace/pkg/constants" 30 "stream.place/streamplace/pkg/crypto/aqpub" 31 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 32 "stream.place/streamplace/pkg/log" 33) 34 35const SPDataDir = "$SP_DATA_DIR" 36const SegmentsDir = "segments" 37 38type BuildFlags struct { 39 Version string 40 BuildTime int64 41 UUID string 42} 43 44func (b BuildFlags) BuildTimeStr() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format(time.RFC3339) 47} 48 49func (b BuildFlags) BuildTimeStrExpo() string { 50 ts := time.Unix(b.BuildTime, 0) 51 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 52} 53 54type CLI struct { 55 AdminAccount string 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 EthAccountAddr string 60 EthKeystorePath string 61 EthPassword string 62 FirebaseServiceAccount string 63 FirebaseServiceAccountFile string 64 GitLabURL string 65 HTTPAddr string 66 HTTPInternalAddr string 67 HTTPSAddr string 68 RTMPAddr string 69 RTMPSAddr string 70 RTMPSAddonAddr string 71 Secure bool 72 NoMist bool 73 MistAdminPort int 74 MistHTTPPort int 75 MistRTMPPort int 76 SigningKeyPath string 77 TAURL string 78 TLSCertPath string 79 TLSKeyPath string 80 PKCS11ModulePath string 81 PKCS11Pin string 82 PKCS11TokenSlot string 83 PKCS11TokenLabel string 84 PKCS11TokenSerial string 85 PKCS11KeypairLabel string 86 PKCS11KeypairID string 87 StreamerName string 88 RelayHost string 89 Debug map[string]map[string]int 90 AllowedStreams []string 91 WideOpen bool 92 Peers []string 93 Redirects []string 94 TestStream bool 95 FrontendProxy string 96 PublicOAuth bool 97 AppBundleID string 98 NoFirehose bool 99 PrintChat bool 100 Color string 101 LivepeerGatewayURL string 102 LivepeerGateway bool 103 WHIPTest string 104 Thumbnail bool 105 SmearAudio bool 106 ExternalSigning bool 107 RTMPServerAddon string 108 TracingEndpoint string 109 BroadcasterHost string 110 XXDeprecatedPublicHost string 111 ServerHost string 112 RateLimitPerSecond int 113 RateLimitBurst int 114 RateLimitWebsocket int 115 JWK jwk.Key 116 AccessJWK jwk.Key 117 dataDirFlags []*string 118 DiscordWebhooks []*discordtypes.Webhook 119 NewWebRTCPlayback bool 120 AppleTeamID string 121 AndroidCertFingerprint string 122 Labelers []string 123 AtprotoDID string 124 LivepeerHelp bool 125 PLCURL string 126 ContentFilters *ContentFilters 127 DefaultRecommendedStreamers []string 128 SQLLogging bool 129 SentryDSN string 130 LivepeerDebug bool 131 Tickets []string 132 IrohTopic string 133 DID string 134 DisableIrohRelay bool 135 DevAccountCreds map[string]string 136 StreamSessionTimeout time.Duration 137 Replicators []string 138 WebsocketURL string 139 BehindHTTPSProxy bool 140 SegmentDebugDir string 141 AdminDIDs []string 142 Syndicate []string 143 PlayerTelemetry bool 144} 145 146// ContentFilters represents the content filtering configuration 147type ContentFilters struct { 148 ContentWarnings struct { 149 Enabled bool `json:"enabled"` 150 BlockedWarnings []string `json:"blocked_warnings"` 151 } `json:"content_warnings"` 152 DistributionPolicy struct { 153 Enabled bool `json:"enabled"` 154 } `json:"distribution_policy"` 155} 156 157const ( 158 ReplicatorWebsocket string = "websocket" 159 ReplicatorIroh string = "iroh" 160) 161 162func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 163 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 164 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 165 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 166 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 167 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 168 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 169 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 170 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 171 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 172 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 173 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 174 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 175 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key") 176 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key") 177 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 178 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 179 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 180 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 181 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 182 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 183 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 184 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 185 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 186 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 187 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 188 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 189 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 190 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 191 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 192 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development") 193 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 194 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 195 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 196 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 197 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 198 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 199 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 200 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 201 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 202 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 203 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 204 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 205 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 206 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 207 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 208 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 209 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 210 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 211 212 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 213 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 214 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 215 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 216 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 217 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server") 218 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)") 219 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)") 220 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 221 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 222 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 223 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 224 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 225 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 226 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 227 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available") 228 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 229 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 230 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 231 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 232 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 233 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to") 234 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 235 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 236 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 237 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 238 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 239 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 240 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 241 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 242 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations") 243 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 244 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry") 245 246 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 247 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 248 249 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 250 _ = starter.NewLivepeerConfig(lpFlags) 251 lpFlags.VisitAll(func(f *flag.Flag) { 252 adapted := LivepeerFlags.CamelToSnake[f.Name] 253 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 254 }) 255 256 if runtime.GOOS == "linux" { 257 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 258 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 259 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 260 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 261 } 262 return fs 263} 264 265var StreamplaceSchemePrefix = "streamplace://" 266 267func (cli *CLI) OwnPublicURL() string { 268 // No errors because we know it's valid from AddrFlag 269 host, port, _ := net.SplitHostPort(cli.HTTPAddr) 270 271 ip := net.ParseIP(host) 272 if host == "" || ip.IsUnspecified() { 273 host = "127.0.0.1" 274 } 275 addr := net.JoinHostPort(host, port) 276 return fmt.Sprintf("http://%s", addr) 277} 278 279func (cli *CLI) OwnInternalURL() string { 280 // No errors because we know it's valid from AddrFlag 281 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 282 283 ip := net.ParseIP(host) 284 if ip.IsUnspecified() { 285 host = "127.0.0.1" 286 } 287 addr := net.JoinHostPort(host, port) 288 return fmt.Sprintf("http://%s", addr) 289} 290 291func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 292 bs, err := os.ReadFile(cli.SigningKeyPath) 293 if err != nil { 294 return nil, err 295 } 296 block, _ := pem.Decode(bs) 297 if block == nil { 298 return nil, fmt.Errorf("no RSA key found in signing key") 299 } 300 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 301 if err != nil { 302 return nil, err 303 } 304 return key, nil 305} 306 307func RandomTrailer(length int) string { 308 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 309 310 res := make([]byte, length) 311 for i := 0; i < length; i++ { 312 res[i] = charset[rand.IntN(len(charset))] 313 } 314 return string(res) 315} 316 317func DefaultDataDir() string { 318 home, err := os.UserHomeDir() 319 if err != nil { 320 // not fatal unless the user doesn't set one later 321 return "" 322 } 323 return filepath.Join(home, ".streamplace") 324} 325 326var GormLogger = slogGorm.New( 327 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 328 TimeFormat: time.RFC3339, 329 })), 330 slogGorm.WithTraceAll(), 331) 332 333func DisableSQLLogging() { 334 GormLogger = slogGorm.New( 335 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 336 TimeFormat: time.RFC3339, 337 })), 338 ) 339} 340 341func EnableSQLLogging() { 342 GormLogger = slogGorm.New( 343 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 344 TimeFormat: time.RFC3339, 345 })), 346 slogGorm.WithTraceAll(), 347 ) 348} 349 350func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 351 err := ff.Parse( 352 fs, args, 353 ff.WithEnvVarPrefix("SP"), 354 ) 355 if err != nil { 356 return err 357 } 358 if cli.DataDir == "" { 359 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 360 } 361 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 362 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 363 } 364 if cli.LivepeerGateway { 365 log.MonkeypatchStderr() 366 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 367 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 368 if err != nil { 369 return err 370 } 371 err = fs.Set("livepeer.data-dir", gatewayPath) 372 if err != nil { 373 return err 374 } 375 err = fs.Set("livepeer.gateway", "true") 376 if err != nil { 377 return err 378 } 379 httpAddrFlag := fs.Lookup("livepeer.http-addr") 380 if httpAddrFlag == nil { 381 return fmt.Errorf("livepeer.http-addr not found") 382 } 383 httpAddr := httpAddrFlag.Value.String() 384 if httpAddr == "" { 385 httpAddr = "127.0.0.1:8935" 386 err = fs.Set("livepeer.http-addr", httpAddr) 387 if err != nil { 388 return err 389 } 390 } 391 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 392 } 393 for _, dest := range cli.dataDirFlags { 394 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 395 } 396 if !cli.SQLLogging { 397 DisableSQLLogging() 398 } else { 399 EnableSQLLogging() 400 } 401 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" { 402 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate") 403 cli.BroadcasterHost = cli.XXDeprecatedPublicHost 404 } 405 if cli.ServerHost == "" && cli.BroadcasterHost != "" { 406 cli.ServerHost = cli.BroadcasterHost 407 } 408 if cli.PublicOAuth { 409 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production") 410 } 411 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" { 412 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?") 413 } 414 if cli.FirebaseServiceAccountFile != "" { 415 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile) 416 if err != nil { 417 return err 418 } 419 cli.FirebaseServiceAccount = string(bs) 420 } 421 return nil 422} 423 424func (cli *CLI) DataFilePath(fpath []string) string { 425 if cli.DataDir == "" { 426 panic("no data dir configured") 427 } 428 // windows does not like colons 429 safe := []string{} 430 for _, p := range fpath { 431 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 432 } 433 fpath = append([]string{cli.DataDir}, safe...) 434 fdpath := filepath.Join(fpath...) 435 return fdpath 436} 437 438// does a file exist in our data dir? 439func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 440 ddpath := cli.DataFilePath(fpath) 441 _, err := os.Stat(ddpath) 442 if err == nil { 443 return true, nil 444 } 445 if errors.Is(err, os.ErrNotExist) { 446 return false, nil 447 } 448 return false, err 449} 450 451// write a file to our data dir 452func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 453 fd, err := cli.DataFileCreate(fpath, overwrite) 454 if err != nil { 455 return err 456 } 457 defer fd.Close() 458 _, err = io.Copy(fd, r) 459 if err != nil { 460 return err 461 } 462 463 return nil 464} 465 466// create a file in our data dir. don't forget to close it! 467func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 468 ddpath := cli.DataFilePath(fpath) 469 if !overwrite { 470 exists, err := cli.DataFileExists(fpath) 471 if err != nil { 472 return nil, err 473 } 474 if exists { 475 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 476 } 477 } 478 if len(fpath) > 1 { 479 dirs, _ := filepath.Split(ddpath) 480 err := os.MkdirAll(dirs, os.ModePerm) 481 if err != nil { 482 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 483 } 484 } 485 return os.Create(ddpath) 486} 487 488// get a path to a segment file in our database 489func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 490 ext := filepath.Ext(file) 491 base := strings.TrimSuffix(file, ext) 492 aqt, err := aqtime.FromString(base) 493 if err != nil { 494 return "", err 495 } 496 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 497 yr, mon, day, hr, min, _, _ := aqt.Parts() 498 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 499} 500 501// get a path to a segment file in our database 502func (cli *CLI) HLSDir(user string) (string, error) { 503 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 504} 505 506// create a segment file in our database 507func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 508 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 509 yr, mon, day, hr, min, _, _ := aqt.Parts() 510 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 511} 512 513// read a file from our data dir 514func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 515 ddpath := cli.DataFilePath(fpath) 516 517 fd, err := os.Open(ddpath) 518 if err != nil { 519 return err 520 } 521 _, err = io.Copy(w, fd) 522 if err != nil { 523 return err 524 } 525 526 return nil 527} 528 529func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 530 cli.dataDirFlags = append(cli.dataDirFlags, dest) 531 *dest = filepath.Join(SPDataDir, defaultValue) 532 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 533 fs.Func(name, usage, func(s string) error { 534 *dest = s 535 return nil 536 }) 537} 538 539func (cli *CLI) HasMist() bool { 540 return runtime.GOOS == "linux" 541} 542 543// type for comma-separated ethereum addresses 544func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 545 *dest = []aqpub.Pub{} 546 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 547 fs.Func(name, usage, func(s string) error { 548 if s == "" { 549 return nil 550 } 551 strs := strings.Split(s, ",") 552 for _, str := range strs { 553 pub, err := aqpub.FromHexString(str) 554 if err != nil { 555 return err 556 } 557 *dest = append(*dest, pub) 558 } 559 return nil 560 }) 561} 562 563func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 564 *dest = defaultValue 565 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 566 fs.Func(name, usage, func(s string) error { 567 if s == "" { 568 return nil 569 } 570 strs := strings.Split(s, ",") 571 *dest = append([]string{}, strs...) 572 return nil 573 }) 574} 575 576func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 577 *dest = map[string]string{} 578 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 579 fs.Func(name, usage, func(s string) error { 580 if s == "" { 581 return nil 582 } 583 pairs := strings.Split(s, ",") 584 for _, pair := range pairs { 585 parts := strings.Split(pair, "=") 586 if len(parts) != 2 { 587 return fmt.Errorf("invalid kv flag: %s", pair) 588 } 589 (*dest)[parts[0]] = parts[1] 590 } 591 return nil 592 }) 593} 594 595func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 596 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 597 fs.Func(name, usage, func(s string) error { 598 if s == "" { 599 return nil 600 } 601 return json.Unmarshal([]byte(s), dest) 602 }) 603} 604 605// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 606func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 607 *dest = map[string]map[string]int{} 608 fs.Func(name, usage, func(s string) error { 609 if s == "" { 610 return nil 611 } 612 pairs := strings.Split(s, ",") 613 for _, pair := range pairs { 614 scoreSplit := strings.Split(pair, ":") 615 if len(scoreSplit) != 2 { 616 return fmt.Errorf("invalid debug flag: %s", pair) 617 } 618 score, err := strconv.Atoi(scoreSplit[1]) 619 if err != nil { 620 return fmt.Errorf("invalid debug flag: %s", pair) 621 } 622 selectorSplit := strings.Split(scoreSplit[0], "=") 623 if len(selectorSplit) != 2 { 624 return fmt.Errorf("invalid debug flag: %s", pair) 625 } 626 _, ok := (*dest)[selectorSplit[0]] 627 if !ok { 628 (*dest)[selectorSplit[0]] = map[string]int{} 629 } 630 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 631 } 632 633 return nil 634 }) 635} 636 637func (cli *CLI) StreamIsAllowed(did string) error { 638 if cli.WideOpen { 639 return nil 640 } 641 // if the user set no test streams, anyone can stream 642 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 643 // but only valid atproto accounts! did:key is only allowed for our local test stream 644 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 645 if openServer && !isDIDKey { 646 return nil 647 } 648 for _, a := range cli.AllowedStreams { 649 if a == did { 650 return nil 651 } 652 } 653 return fmt.Errorf("user is not allowed to stream") 654} 655 656func (cli *CLI) MyDID() string { 657 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost) 658} 659 660func (cli *CLI) HasHTTPS() bool { 661 return cli.Secure || cli.BehindHTTPSProxy 662} 663 664func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) { 665 if cli.SegmentDebugDir == "" { 666 return 667 } 668 go func() { 669 err := os.MkdirAll(cli.SegmentDebugDir, 0755) 670 if err != nil { 671 log.Error(ctx, "failed to create debug directory", "error", err) 672 return 673 } 674 now := aqtime.FromTime(time.Now()) 675 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-"))) 676 fd, err := os.Create(outFile) 677 if err != nil { 678 log.Error(ctx, "failed to create debug file", "error", err) 679 return 680 } 681 defer fd.Close() 682 _, err = io.Copy(fd, r) 683 if err != nil { 684 log.Error(ctx, "failed to copy debug file", "error", err) 685 return 686 } 687 log.Log(ctx, "wrote debug file", "path", outFile) 688 }() 689} 690 691func (cli *CLI) ShouldSyndicate(did string) bool { 692 for _, d := range cli.Syndicate { 693 if d == "*" { 694 return true 695 } 696 if d == did { 697 return true 698 } 699 } 700 return false 701}