Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.0 699 lines 27 kB view raw
1package config 2 3import ( 4 "context" 5 "crypto/rsa" 6 "crypto/x509" 7 "encoding/json" 8 "encoding/pem" 9 "errors" 10 "flag" 11 "fmt" 12 "io" 13 "net" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strconv" 18 "strings" 19 "time" 20 21 "math/rand/v2" 22 23 "github.com/lestrrat-go/jwx/v2/jwk" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/lmittmann/tint" 26 slogGorm "github.com/orandin/slog-gorm" 27 "github.com/peterbourgon/ff/v3" 28 "stream.place/streamplace/pkg/aqtime" 29 "stream.place/streamplace/pkg/constants" 30 "stream.place/streamplace/pkg/crypto/aqpub" 31 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 32 "stream.place/streamplace/pkg/log" 33) 34 35const SPDataDir = "$SP_DATA_DIR" 36const SegmentsDir = "segments" 37 38type BuildFlags struct { 39 Version string 40 BuildTime int64 41 UUID string 42} 43 44func (b BuildFlags) BuildTimeStr() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format(time.RFC3339) 47} 48 49func (b BuildFlags) BuildTimeStrExpo() string { 50 ts := time.Unix(b.BuildTime, 0) 51 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 52} 53 54type CLI struct { 55 AdminAccount string 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 EthAccountAddr string 60 EthKeystorePath string 61 EthPassword string 62 FirebaseServiceAccount string 63 FirebaseServiceAccountFile string 64 GitLabURL string 65 HTTPAddr string 66 HTTPInternalAddr string 67 HTTPSAddr string 68 RTMPAddr string 69 RTMPSAddr string 70 RTMPSAddonAddr string 71 Secure bool 72 NoMist bool 73 MistAdminPort int 74 MistHTTPPort int 75 MistRTMPPort int 76 SigningKeyPath string 77 TAURL string 78 TLSCertPath string 79 TLSKeyPath string 80 PKCS11ModulePath string 81 PKCS11Pin string 82 PKCS11TokenSlot string 83 PKCS11TokenLabel string 84 PKCS11TokenSerial string 85 PKCS11KeypairLabel string 86 PKCS11KeypairID string 87 StreamerName string 88 RelayHost string 89 Debug map[string]map[string]int 90 AllowedStreams []string 91 WideOpen bool 92 Peers []string 93 Redirects []string 94 TestStream bool 95 FrontendProxy string 96 PublicOAuth bool 97 AppBundleID string 98 NoFirehose bool 99 PrintChat bool 100 Color string 101 LivepeerGatewayURL string 102 LivepeerGateway bool 103 WHIPTest string 104 Thumbnail bool 105 SmearAudio bool 106 ExternalSigning bool 107 RTMPServerAddon string 108 TracingEndpoint string 109 BroadcasterHost string 110 XXDeprecatedPublicHost string 111 ServerHost string 112 RateLimitPerSecond int 113 RateLimitBurst int 114 RateLimitWebsocket int 115 JWK jwk.Key 116 AccessJWK jwk.Key 117 dataDirFlags []*string 118 DiscordWebhooks []*discordtypes.Webhook 119 NewWebRTCPlayback bool 120 AppleTeamID string 121 AndroidCertFingerprint string 122 Labelers []string 123 AtprotoDID string 124 LivepeerHelp bool 125 PLCURL string 126 ContentFilters *ContentFilters 127 DefaultRecommendedStreamers []string 128 SQLLogging bool 129 SentryDSN string 130 LivepeerDebug bool 131 Tickets []string 132 IrohTopic string 133 DID string 134 DisableIrohRelay bool 135 DevAccountCreds map[string]string 136 StreamSessionTimeout time.Duration 137 Replicators []string 138 WebsocketURL string 139 BehindHTTPSProxy bool 140 SegmentDebugDir string 141 AdminDIDs []string 142 Syndicate []string 143} 144 145// ContentFilters represents the content filtering configuration 146type ContentFilters struct { 147 ContentWarnings struct { 148 Enabled bool `json:"enabled"` 149 BlockedWarnings []string `json:"blocked_warnings"` 150 } `json:"content_warnings"` 151 DistributionPolicy struct { 152 Enabled bool `json:"enabled"` 153 } `json:"distribution_policy"` 154} 155 156const ( 157 ReplicatorWebsocket string = "websocket" 158 ReplicatorIroh string = "iroh" 159) 160 161func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 162 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 163 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 164 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 165 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 166 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 167 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 168 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 169 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 170 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 171 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 172 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 173 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 174 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key") 175 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key") 176 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 177 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 178 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 179 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 180 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 181 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 182 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 183 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 184 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 185 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 186 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 187 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 188 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 189 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 190 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 191 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development") 192 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 193 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 194 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 195 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 196 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 197 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 198 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 199 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 200 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 201 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 202 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 203 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 204 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 205 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 206 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 207 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 208 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 209 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 210 211 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 212 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 213 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 214 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 215 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 216 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server") 217 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)") 218 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)") 219 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 220 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 221 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 222 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 223 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 224 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 225 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 226 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available") 227 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 228 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 229 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 230 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 231 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 232 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to") 233 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 234 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 235 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 236 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 237 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 238 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 239 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 240 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 241 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations") 242 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 243 244 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 245 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 246 247 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 248 _ = starter.NewLivepeerConfig(lpFlags) 249 lpFlags.VisitAll(func(f *flag.Flag) { 250 adapted := LivepeerFlags.CamelToSnake[f.Name] 251 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 252 }) 253 254 if runtime.GOOS == "linux" { 255 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 256 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 257 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 258 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 259 } 260 return fs 261} 262 263var StreamplaceSchemePrefix = "streamplace://" 264 265func (cli *CLI) OwnPublicURL() string { 266 // No errors because we know it's valid from AddrFlag 267 host, port, _ := net.SplitHostPort(cli.HTTPAddr) 268 269 ip := net.ParseIP(host) 270 if host == "" || ip.IsUnspecified() { 271 host = "127.0.0.1" 272 } 273 addr := net.JoinHostPort(host, port) 274 return fmt.Sprintf("http://%s", addr) 275} 276 277func (cli *CLI) OwnInternalURL() string { 278 // No errors because we know it's valid from AddrFlag 279 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 280 281 ip := net.ParseIP(host) 282 if ip.IsUnspecified() { 283 host = "127.0.0.1" 284 } 285 addr := net.JoinHostPort(host, port) 286 return fmt.Sprintf("http://%s", addr) 287} 288 289func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 290 bs, err := os.ReadFile(cli.SigningKeyPath) 291 if err != nil { 292 return nil, err 293 } 294 block, _ := pem.Decode(bs) 295 if block == nil { 296 return nil, fmt.Errorf("no RSA key found in signing key") 297 } 298 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 299 if err != nil { 300 return nil, err 301 } 302 return key, nil 303} 304 305func RandomTrailer(length int) string { 306 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 307 308 res := make([]byte, length) 309 for i := 0; i < length; i++ { 310 res[i] = charset[rand.IntN(len(charset))] 311 } 312 return string(res) 313} 314 315func DefaultDataDir() string { 316 home, err := os.UserHomeDir() 317 if err != nil { 318 // not fatal unless the user doesn't set one later 319 return "" 320 } 321 return filepath.Join(home, ".streamplace") 322} 323 324var GormLogger = slogGorm.New( 325 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 326 TimeFormat: time.RFC3339, 327 })), 328 slogGorm.WithTraceAll(), 329) 330 331func DisableSQLLogging() { 332 GormLogger = slogGorm.New( 333 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 334 TimeFormat: time.RFC3339, 335 })), 336 ) 337} 338 339func EnableSQLLogging() { 340 GormLogger = slogGorm.New( 341 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 342 TimeFormat: time.RFC3339, 343 })), 344 slogGorm.WithTraceAll(), 345 ) 346} 347 348func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 349 err := ff.Parse( 350 fs, args, 351 ff.WithEnvVarPrefix("SP"), 352 ) 353 if err != nil { 354 return err 355 } 356 if cli.DataDir == "" { 357 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 358 } 359 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 360 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 361 } 362 if cli.LivepeerGateway { 363 log.MonkeypatchStderr() 364 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 365 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 366 if err != nil { 367 return err 368 } 369 err = fs.Set("livepeer.data-dir", gatewayPath) 370 if err != nil { 371 return err 372 } 373 err = fs.Set("livepeer.gateway", "true") 374 if err != nil { 375 return err 376 } 377 httpAddrFlag := fs.Lookup("livepeer.http-addr") 378 if httpAddrFlag == nil { 379 return fmt.Errorf("livepeer.http-addr not found") 380 } 381 httpAddr := httpAddrFlag.Value.String() 382 if httpAddr == "" { 383 httpAddr = "127.0.0.1:8935" 384 err = fs.Set("livepeer.http-addr", httpAddr) 385 if err != nil { 386 return err 387 } 388 } 389 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 390 } 391 for _, dest := range cli.dataDirFlags { 392 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 393 } 394 if !cli.SQLLogging { 395 DisableSQLLogging() 396 } else { 397 EnableSQLLogging() 398 } 399 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" { 400 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate") 401 cli.BroadcasterHost = cli.XXDeprecatedPublicHost 402 } 403 if cli.ServerHost == "" && cli.BroadcasterHost != "" { 404 cli.ServerHost = cli.BroadcasterHost 405 } 406 if cli.PublicOAuth { 407 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production") 408 } 409 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" { 410 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?") 411 } 412 if cli.FirebaseServiceAccountFile != "" { 413 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile) 414 if err != nil { 415 return err 416 } 417 cli.FirebaseServiceAccount = string(bs) 418 } 419 return nil 420} 421 422func (cli *CLI) DataFilePath(fpath []string) string { 423 if cli.DataDir == "" { 424 panic("no data dir configured") 425 } 426 // windows does not like colons 427 safe := []string{} 428 for _, p := range fpath { 429 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 430 } 431 fpath = append([]string{cli.DataDir}, safe...) 432 fdpath := filepath.Join(fpath...) 433 return fdpath 434} 435 436// does a file exist in our data dir? 437func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 438 ddpath := cli.DataFilePath(fpath) 439 _, err := os.Stat(ddpath) 440 if err == nil { 441 return true, nil 442 } 443 if errors.Is(err, os.ErrNotExist) { 444 return false, nil 445 } 446 return false, err 447} 448 449// write a file to our data dir 450func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 451 fd, err := cli.DataFileCreate(fpath, overwrite) 452 if err != nil { 453 return err 454 } 455 defer fd.Close() 456 _, err = io.Copy(fd, r) 457 if err != nil { 458 return err 459 } 460 461 return nil 462} 463 464// create a file in our data dir. don't forget to close it! 465func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 466 ddpath := cli.DataFilePath(fpath) 467 if !overwrite { 468 exists, err := cli.DataFileExists(fpath) 469 if err != nil { 470 return nil, err 471 } 472 if exists { 473 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 474 } 475 } 476 if len(fpath) > 1 { 477 dirs, _ := filepath.Split(ddpath) 478 err := os.MkdirAll(dirs, os.ModePerm) 479 if err != nil { 480 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 481 } 482 } 483 return os.Create(ddpath) 484} 485 486// get a path to a segment file in our database 487func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 488 ext := filepath.Ext(file) 489 base := strings.TrimSuffix(file, ext) 490 aqt, err := aqtime.FromString(base) 491 if err != nil { 492 return "", err 493 } 494 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 495 yr, mon, day, hr, min, _, _ := aqt.Parts() 496 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 497} 498 499// get a path to a segment file in our database 500func (cli *CLI) HLSDir(user string) (string, error) { 501 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 502} 503 504// create a segment file in our database 505func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 506 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 507 yr, mon, day, hr, min, _, _ := aqt.Parts() 508 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 509} 510 511// read a file from our data dir 512func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 513 ddpath := cli.DataFilePath(fpath) 514 515 fd, err := os.Open(ddpath) 516 if err != nil { 517 return err 518 } 519 _, err = io.Copy(w, fd) 520 if err != nil { 521 return err 522 } 523 524 return nil 525} 526 527func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 528 cli.dataDirFlags = append(cli.dataDirFlags, dest) 529 *dest = filepath.Join(SPDataDir, defaultValue) 530 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 531 fs.Func(name, usage, func(s string) error { 532 *dest = s 533 return nil 534 }) 535} 536 537func (cli *CLI) HasMist() bool { 538 return runtime.GOOS == "linux" 539} 540 541// type for comma-separated ethereum addresses 542func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 543 *dest = []aqpub.Pub{} 544 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 545 fs.Func(name, usage, func(s string) error { 546 if s == "" { 547 return nil 548 } 549 strs := strings.Split(s, ",") 550 for _, str := range strs { 551 pub, err := aqpub.FromHexString(str) 552 if err != nil { 553 return err 554 } 555 *dest = append(*dest, pub) 556 } 557 return nil 558 }) 559} 560 561func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 562 *dest = defaultValue 563 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 564 fs.Func(name, usage, func(s string) error { 565 if s == "" { 566 return nil 567 } 568 strs := strings.Split(s, ",") 569 *dest = append([]string{}, strs...) 570 return nil 571 }) 572} 573 574func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 575 *dest = map[string]string{} 576 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 577 fs.Func(name, usage, func(s string) error { 578 if s == "" { 579 return nil 580 } 581 pairs := strings.Split(s, ",") 582 for _, pair := range pairs { 583 parts := strings.Split(pair, "=") 584 if len(parts) != 2 { 585 return fmt.Errorf("invalid kv flag: %s", pair) 586 } 587 (*dest)[parts[0]] = parts[1] 588 } 589 return nil 590 }) 591} 592 593func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 594 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 595 fs.Func(name, usage, func(s string) error { 596 if s == "" { 597 return nil 598 } 599 return json.Unmarshal([]byte(s), dest) 600 }) 601} 602 603// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 604func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 605 *dest = map[string]map[string]int{} 606 fs.Func(name, usage, func(s string) error { 607 if s == "" { 608 return nil 609 } 610 pairs := strings.Split(s, ",") 611 for _, pair := range pairs { 612 scoreSplit := strings.Split(pair, ":") 613 if len(scoreSplit) != 2 { 614 return fmt.Errorf("invalid debug flag: %s", pair) 615 } 616 score, err := strconv.Atoi(scoreSplit[1]) 617 if err != nil { 618 return fmt.Errorf("invalid debug flag: %s", pair) 619 } 620 selectorSplit := strings.Split(scoreSplit[0], "=") 621 if len(selectorSplit) != 2 { 622 return fmt.Errorf("invalid debug flag: %s", pair) 623 } 624 _, ok := (*dest)[selectorSplit[0]] 625 if !ok { 626 (*dest)[selectorSplit[0]] = map[string]int{} 627 } 628 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 629 } 630 631 return nil 632 }) 633} 634 635func (cli *CLI) StreamIsAllowed(did string) error { 636 if cli.WideOpen { 637 return nil 638 } 639 // if the user set no test streams, anyone can stream 640 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 641 // but only valid atproto accounts! did:key is only allowed for our local test stream 642 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 643 if openServer && !isDIDKey { 644 return nil 645 } 646 for _, a := range cli.AllowedStreams { 647 if a == did { 648 return nil 649 } 650 } 651 return fmt.Errorf("user is not allowed to stream") 652} 653 654func (cli *CLI) MyDID() string { 655 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost) 656} 657 658func (cli *CLI) HasHTTPS() bool { 659 return cli.Secure || cli.BehindHTTPSProxy 660} 661 662func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) { 663 if cli.SegmentDebugDir == "" { 664 return 665 } 666 go func() { 667 err := os.MkdirAll(cli.SegmentDebugDir, 0755) 668 if err != nil { 669 log.Error(ctx, "failed to create debug directory", "error", err) 670 return 671 } 672 now := aqtime.FromTime(time.Now()) 673 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-"))) 674 fd, err := os.Create(outFile) 675 if err != nil { 676 log.Error(ctx, "failed to create debug file", "error", err) 677 return 678 } 679 defer fd.Close() 680 _, err = io.Copy(fd, r) 681 if err != nil { 682 log.Error(ctx, "failed to copy debug file", "error", err) 683 return 684 } 685 log.Log(ctx, "wrote debug file", "path", outFile) 686 }() 687} 688 689func (cli *CLI) ShouldSyndicate(did string) bool { 690 for _, d := range cli.Syndicate { 691 if d == "*" { 692 return true 693 } 694 if d == did { 695 return true 696 } 697 } 698 return false 699}