Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-test-flake 690 lines 26 kB view raw
1package config 2 3import ( 4 "context" 5 "crypto/rsa" 6 "crypto/x509" 7 "encoding/json" 8 "encoding/pem" 9 "errors" 10 "flag" 11 "fmt" 12 "io" 13 "net" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strconv" 18 "strings" 19 "time" 20 21 "math/rand/v2" 22 23 "github.com/lestrrat-go/jwx/v2/jwk" 24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 25 "github.com/lmittmann/tint" 26 slogGorm "github.com/orandin/slog-gorm" 27 "github.com/peterbourgon/ff/v3" 28 "stream.place/streamplace/pkg/aqtime" 29 "stream.place/streamplace/pkg/constants" 30 "stream.place/streamplace/pkg/crypto/aqpub" 31 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 32 "stream.place/streamplace/pkg/log" 33) 34 35const SPDataDir = "$SP_DATA_DIR" 36const SegmentsDir = "segments" 37 38type BuildFlags struct { 39 Version string 40 BuildTime int64 41 UUID string 42} 43 44func (b BuildFlags) BuildTimeStr() string { 45 ts := time.Unix(b.BuildTime, 0) 46 return ts.UTC().Format(time.RFC3339) 47} 48 49func (b BuildFlags) BuildTimeStrExpo() string { 50 ts := time.Unix(b.BuildTime, 0) 51 return ts.UTC().Format("2006-01-02T15:04:05.000Z") 52} 53 54type CLI struct { 55 AdminAccount string 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 EthAccountAddr string 60 EthKeystorePath string 61 EthPassword string 62 FirebaseServiceAccount string 63 FirebaseServiceAccountFile string 64 GitLabURL string 65 HTTPAddr string 66 HTTPInternalAddr string 67 HTTPSAddr string 68 RtmpsAddr string 69 Secure bool 70 NoMist bool 71 MistAdminPort int 72 MistHTTPPort int 73 MistRTMPPort int 74 SigningKeyPath string 75 TAURL string 76 TLSCertPath string 77 TLSKeyPath string 78 PKCS11ModulePath string 79 PKCS11Pin string 80 PKCS11TokenSlot string 81 PKCS11TokenLabel string 82 PKCS11TokenSerial string 83 PKCS11KeypairLabel string 84 PKCS11KeypairID string 85 StreamerName string 86 RelayHost string 87 Debug map[string]map[string]int 88 AllowedStreams []string 89 WideOpen bool 90 Peers []string 91 Redirects []string 92 TestStream bool 93 FrontendProxy string 94 PublicOAuth bool 95 AppBundleID string 96 NoFirehose bool 97 PrintChat bool 98 Color string 99 LivepeerGatewayURL string 100 LivepeerGateway bool 101 WHIPTest string 102 Thumbnail bool 103 SmearAudio bool 104 ExternalSigning bool 105 RTMPServerAddon string 106 TracingEndpoint string 107 BroadcasterHost string 108 XXDeprecatedPublicHost string 109 ServerHost string 110 RateLimitPerSecond int 111 RateLimitBurst int 112 RateLimitWebsocket int 113 JWK jwk.Key 114 AccessJWK jwk.Key 115 dataDirFlags []*string 116 DiscordWebhooks []*discordtypes.Webhook 117 NewWebRTCPlayback bool 118 AppleTeamID string 119 AndroidCertFingerprint string 120 Labelers []string 121 AtprotoDID string 122 LivepeerHelp bool 123 PLCURL string 124 ContentFilters *ContentFilters 125 SQLLogging bool 126 SentryDSN string 127 LivepeerDebug bool 128 Tickets []string 129 IrohTopic string 130 DID string 131 DisableIrohRelay bool 132 DevAccountCreds map[string]string 133 StreamSessionTimeout time.Duration 134 Replicators []string 135 WebsocketURL string 136 BehindHTTPSProxy bool 137 SegmentDebugDir string 138 Syndicate []string 139} 140 141// ContentFilters represents the content filtering configuration 142type ContentFilters struct { 143 ContentWarnings struct { 144 Enabled bool `json:"enabled"` 145 BlockedWarnings []string `json:"blocked_warnings"` 146 } `json:"content_warnings"` 147 DistributionPolicy struct { 148 Enabled bool `json:"enabled"` 149 } `json:"distribution_policy"` 150} 151 152const ( 153 ReplicatorWebsocket string = "websocket" 154 ReplicatorIroh string = "iroh" 155) 156 157func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 158 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) 159 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data") 160 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address") 161 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address") 162 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address") 163 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output") 164 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 165 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 166 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 167 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 168 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 169 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 170 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key") 171 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key") 172 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links") 173 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore") 174 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)") 175 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore") 176 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing") 177 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so") 178 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively") 179 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)") 180 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)") 181 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)") 182 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token") 183 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token") 184 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 185 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 186 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 187 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development") 188 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 189 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 190 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 191 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 192 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 193 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 194 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 195 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 196 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 197 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 198 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 199 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 200 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 201 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 202 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 203 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 204 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 205 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 206 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 207 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 208 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 209 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 210 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 211 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 212 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 213 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 214 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 215 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 216 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 217 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 218 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 219 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") 220 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 221 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 222 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 223 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 224 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to") 225 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 226 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 227 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 228 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 229 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 230 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 231 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 232 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 233 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 234 235 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 236 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 237 238 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 239 _ = starter.NewLivepeerConfig(lpFlags) 240 lpFlags.VisitAll(func(f *flag.Flag) { 241 adapted := LivepeerFlags.CamelToSnake[f.Name] 242 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage) 243 }) 244 245 if runtime.GOOS == "linux" { 246 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer") 247 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)") 248 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)") 249 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)") 250 } 251 return fs 252} 253 254var StreamplaceSchemePrefix = "streamplace://" 255 256func (cli *CLI) OwnPublicURL() string { 257 // No errors because we know it's valid from AddrFlag 258 host, port, _ := net.SplitHostPort(cli.HTTPAddr) 259 260 ip := net.ParseIP(host) 261 if host == "" || ip.IsUnspecified() { 262 host = "127.0.0.1" 263 } 264 addr := net.JoinHostPort(host, port) 265 return fmt.Sprintf("http://%s", addr) 266} 267 268func (cli *CLI) OwnInternalURL() string { 269 // No errors because we know it's valid from AddrFlag 270 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr) 271 272 ip := net.ParseIP(host) 273 if ip.IsUnspecified() { 274 host = "127.0.0.1" 275 } 276 addr := net.JoinHostPort(host, port) 277 return fmt.Sprintf("http://%s", addr) 278} 279 280func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) { 281 bs, err := os.ReadFile(cli.SigningKeyPath) 282 if err != nil { 283 return nil, err 284 } 285 block, _ := pem.Decode(bs) 286 if block == nil { 287 return nil, fmt.Errorf("no RSA key found in signing key") 288 } 289 key, err := x509.ParsePKCS1PrivateKey(block.Bytes) 290 if err != nil { 291 return nil, err 292 } 293 return key, nil 294} 295 296func RandomTrailer(length int) string { 297 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 298 299 res := make([]byte, length) 300 for i := 0; i < length; i++ { 301 res[i] = charset[rand.IntN(len(charset))] 302 } 303 return string(res) 304} 305 306func DefaultDataDir() string { 307 home, err := os.UserHomeDir() 308 if err != nil { 309 // not fatal unless the user doesn't set one later 310 return "" 311 } 312 return filepath.Join(home, ".streamplace") 313} 314 315var GormLogger = slogGorm.New( 316 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 317 TimeFormat: time.RFC3339, 318 })), 319 slogGorm.WithTraceAll(), 320) 321 322func DisableSQLLogging() { 323 GormLogger = slogGorm.New( 324 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 325 TimeFormat: time.RFC3339, 326 })), 327 ) 328} 329 330func EnableSQLLogging() { 331 GormLogger = slogGorm.New( 332 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 333 TimeFormat: time.RFC3339, 334 })), 335 slogGorm.WithTraceAll(), 336 ) 337} 338 339func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error { 340 err := ff.Parse( 341 fs, args, 342 ff.WithEnvVarPrefix("SP"), 343 ) 344 if err != nil { 345 return err 346 } 347 if cli.DataDir == "" { 348 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir") 349 } 350 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" { 351 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?") 352 } 353 if cli.LivepeerGateway { 354 log.MonkeypatchStderr() 355 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 356 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 357 if err != nil { 358 return err 359 } 360 err = fs.Set("livepeer.data-dir", gatewayPath) 361 if err != nil { 362 return err 363 } 364 err = fs.Set("livepeer.gateway", "true") 365 if err != nil { 366 return err 367 } 368 httpAddrFlag := fs.Lookup("livepeer.http-addr") 369 if httpAddrFlag == nil { 370 return fmt.Errorf("livepeer.http-addr not found") 371 } 372 httpAddr := httpAddrFlag.Value.String() 373 if httpAddr == "" { 374 httpAddr = "127.0.0.1:8935" 375 err = fs.Set("livepeer.http-addr", httpAddr) 376 if err != nil { 377 return err 378 } 379 } 380 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr) 381 } 382 for _, dest := range cli.dataDirFlags { 383 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1) 384 } 385 if !cli.SQLLogging { 386 DisableSQLLogging() 387 } else { 388 EnableSQLLogging() 389 } 390 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" { 391 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate") 392 cli.BroadcasterHost = cli.XXDeprecatedPublicHost 393 } 394 if cli.ServerHost == "" && cli.BroadcasterHost != "" { 395 cli.ServerHost = cli.BroadcasterHost 396 } 397 if cli.PublicOAuth { 398 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production") 399 } 400 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" { 401 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?") 402 } 403 if cli.FirebaseServiceAccountFile != "" { 404 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile) 405 if err != nil { 406 return err 407 } 408 cli.FirebaseServiceAccount = string(bs) 409 } 410 return nil 411} 412 413func (cli *CLI) DataFilePath(fpath []string) string { 414 if cli.DataDir == "" { 415 panic("no data dir configured") 416 } 417 // windows does not like colons 418 safe := []string{} 419 for _, p := range fpath { 420 safe = append(safe, strings.ReplaceAll(p, ":", "-")) 421 } 422 fpath = append([]string{cli.DataDir}, safe...) 423 fdpath := filepath.Join(fpath...) 424 return fdpath 425} 426 427// does a file exist in our data dir? 428func (cli *CLI) DataFileExists(fpath []string) (bool, error) { 429 ddpath := cli.DataFilePath(fpath) 430 _, err := os.Stat(ddpath) 431 if err == nil { 432 return true, nil 433 } 434 if errors.Is(err, os.ErrNotExist) { 435 return false, nil 436 } 437 return false, err 438} 439 440// write a file to our data dir 441func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error { 442 fd, err := cli.DataFileCreate(fpath, overwrite) 443 if err != nil { 444 return err 445 } 446 defer fd.Close() 447 _, err = io.Copy(fd, r) 448 if err != nil { 449 return err 450 } 451 452 return nil 453} 454 455// create a file in our data dir. don't forget to close it! 456func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) { 457 ddpath := cli.DataFilePath(fpath) 458 if !overwrite { 459 exists, err := cli.DataFileExists(fpath) 460 if err != nil { 461 return nil, err 462 } 463 if exists { 464 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath) 465 } 466 } 467 if len(fpath) > 1 { 468 dirs, _ := filepath.Split(ddpath) 469 err := os.MkdirAll(dirs, os.ModePerm) 470 if err != nil { 471 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err) 472 } 473 } 474 return os.Create(ddpath) 475} 476 477// get a path to a segment file in our database 478func (cli *CLI) SegmentFilePath(user string, file string) (string, error) { 479 ext := filepath.Ext(file) 480 base := strings.TrimSuffix(file, ext) 481 aqt, err := aqtime.FromString(base) 482 if err != nil { 483 return "", err 484 } 485 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext) 486 yr, mon, day, hr, min, _, _ := aqt.Parts() 487 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil 488} 489 490// get a path to a segment file in our database 491func (cli *CLI) HLSDir(user string) (string, error) { 492 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil 493} 494 495// create a segment file in our database 496func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) { 497 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext) 498 yr, mon, day, hr, min, _, _ := aqt.Parts() 499 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false) 500} 501 502// read a file from our data dir 503func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error { 504 ddpath := cli.DataFilePath(fpath) 505 506 fd, err := os.Open(ddpath) 507 if err != nil { 508 return err 509 } 510 _, err = io.Copy(w, fd) 511 if err != nil { 512 return err 513 } 514 515 return nil 516} 517 518func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) { 519 cli.dataDirFlags = append(cli.dataDirFlags, dest) 520 *dest = filepath.Join(SPDataDir, defaultValue) 521 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 522 fs.Func(name, usage, func(s string) error { 523 *dest = s 524 return nil 525 }) 526} 527 528func (cli *CLI) HasMist() bool { 529 return runtime.GOOS == "linux" 530} 531 532// type for comma-separated ethereum addresses 533func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) { 534 *dest = []aqpub.Pub{} 535 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 536 fs.Func(name, usage, func(s string) error { 537 if s == "" { 538 return nil 539 } 540 strs := strings.Split(s, ",") 541 for _, str := range strs { 542 pub, err := aqpub.FromHexString(str) 543 if err != nil { 544 return err 545 } 546 *dest = append(*dest, pub) 547 } 548 return nil 549 }) 550} 551 552func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 553 *dest = defaultValue 554 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 555 fs.Func(name, usage, func(s string) error { 556 if s == "" { 557 return nil 558 } 559 strs := strings.Split(s, ",") 560 *dest = append([]string{}, strs...) 561 return nil 562 }) 563} 564 565func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 566 *dest = map[string]string{} 567 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 568 fs.Func(name, usage, func(s string) error { 569 if s == "" { 570 return nil 571 } 572 pairs := strings.Split(s, ",") 573 for _, pair := range pairs { 574 parts := strings.Split(pair, "=") 575 if len(parts) != 2 { 576 return fmt.Errorf("invalid kv flag: %s", pair) 577 } 578 (*dest)[parts[0]] = parts[1] 579 } 580 return nil 581 }) 582} 583 584func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) { 585 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue) 586 fs.Func(name, usage, func(s string) error { 587 if s == "" { 588 return nil 589 } 590 return json.Unmarshal([]byte(s), dest) 591 }) 592} 593 594// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}} 595func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) { 596 *dest = map[string]map[string]int{} 597 fs.Func(name, usage, func(s string) error { 598 if s == "" { 599 return nil 600 } 601 pairs := strings.Split(s, ",") 602 for _, pair := range pairs { 603 scoreSplit := strings.Split(pair, ":") 604 if len(scoreSplit) != 2 { 605 return fmt.Errorf("invalid debug flag: %s", pair) 606 } 607 score, err := strconv.Atoi(scoreSplit[1]) 608 if err != nil { 609 return fmt.Errorf("invalid debug flag: %s", pair) 610 } 611 selectorSplit := strings.Split(scoreSplit[0], "=") 612 if len(selectorSplit) != 2 { 613 return fmt.Errorf("invalid debug flag: %s", pair) 614 } 615 _, ok := (*dest)[selectorSplit[0]] 616 if !ok { 617 (*dest)[selectorSplit[0]] = map[string]int{} 618 } 619 (*dest)[selectorSplit[0]][selectorSplit[1]] = score 620 } 621 622 return nil 623 }) 624} 625 626func (cli *CLI) StreamIsAllowed(did string) error { 627 if cli.WideOpen { 628 return nil 629 } 630 // if the user set no test streams, anyone can stream 631 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 632 // but only valid atproto accounts! did:key is only allowed for our local test stream 633 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX) 634 if openServer && !isDIDKey { 635 return nil 636 } 637 for _, a := range cli.AllowedStreams { 638 if a == did { 639 return nil 640 } 641 } 642 return fmt.Errorf("user is not allowed to stream") 643} 644 645func (cli *CLI) MyDID() string { 646 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost) 647} 648 649func (cli *CLI) HasHTTPS() bool { 650 return cli.Secure || cli.BehindHTTPSProxy 651} 652 653func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) { 654 if cli.SegmentDebugDir == "" { 655 return 656 } 657 go func() { 658 err := os.MkdirAll(cli.SegmentDebugDir, 0755) 659 if err != nil { 660 log.Error(ctx, "failed to create debug directory", "error", err) 661 return 662 } 663 now := aqtime.FromTime(time.Now()) 664 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-"))) 665 fd, err := os.Create(outFile) 666 if err != nil { 667 log.Error(ctx, "failed to create debug file", "error", err) 668 return 669 } 670 defer fd.Close() 671 _, err = io.Copy(fd, r) 672 if err != nil { 673 log.Error(ctx, "failed to copy debug file", "error", err) 674 return 675 } 676 log.Log(ctx, "wrote debug file", "path", outFile) 677 }() 678} 679 680func (cli *CLI) ShouldSyndicate(did string) bool { 681 for _, d := range cli.Syndicate { 682 if d == "*" { 683 return true 684 } 685 if d == did { 686 return true 687 } 688 } 689 return false 690}