Live video on the AT Protocol
1package config
2
3import (
4 "context"
5 "crypto/rsa"
6 "crypto/x509"
7 "encoding/json"
8 "encoding/pem"
9 "errors"
10 "flag"
11 "fmt"
12 "io"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strconv"
18 "strings"
19 "time"
20
21 "math/rand/v2"
22
23 "github.com/lestrrat-go/jwx/v2/jwk"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/lmittmann/tint"
26 slogGorm "github.com/orandin/slog-gorm"
27 "github.com/peterbourgon/ff/v3"
28 "stream.place/streamplace/pkg/aqtime"
29 "stream.place/streamplace/pkg/constants"
30 "stream.place/streamplace/pkg/crypto/aqpub"
31 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
32 "stream.place/streamplace/pkg/log"
33)
34
// Data-directory layout constants.
const (
	// SPDataDir is the literal placeholder that stands in for the data
	// directory inside flag values; Parse swaps it for the real DataDir.
	SPDataDir = "$SP_DATA_DIR"
	// SegmentsDir is the name of the data-dir subdirectory used for segments.
	SegmentsDir = "segments"
)
37
// BuildFlags carries the metadata that identifies one particular build.
type BuildFlags struct {
	Version string
	// BuildTime is read as seconds since the Unix epoch by the formatters below.
	BuildTime int64
	UUID      string
}

// BuildTimeStr renders BuildTime as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo renders BuildTime in UTC with millisecond precision and a
// literal trailing "Z", e.g. "1970-01-01T00:00:00.000Z".
func (b BuildFlags) BuildTimeStrExpo() string {
	const expoLayout = "2006-01-02T15:04:05.000Z"
	built := time.Unix(b.BuildTime, 0).UTC()
	return built.Format(expoLayout)
}
53
// CLI holds the node's runtime configuration. Most fields are bound to
// command-line flags by NewFlagSet (which also supplies their defaults) and
// are finalized by Parse.
type CLI struct {
	AdminAccount               string
	Build                      *BuildFlags
	DataDir                    string
	DBURL                      string
	EthAccountAddr             string
	EthKeystorePath            string
	EthPassword                string
	FirebaseServiceAccount     string
	FirebaseServiceAccountFile string
	GitLabURL                  string
	HTTPAddr                   string
	HTTPInternalAddr           string
	HTTPSAddr                  string
	RTMPAddr                   string
	RTMPSAddr                  string
	RTMPSAddonAddr             string
	Secure                     bool
	// NoMist and the Mist*Port fields are only registered as flags by
	// NewFlagSet when runtime.GOOS is "linux".
	NoMist             bool
	MistAdminPort      int
	MistHTTPPort       int
	MistRTMPPort       int
	SigningKeyPath     string
	TAURL              string
	TLSCertPath        string
	TLSKeyPath         string
	PKCS11ModulePath   string
	PKCS11Pin          string
	PKCS11TokenSlot    string
	PKCS11TokenLabel   string
	PKCS11TokenSerial  string
	PKCS11KeypairLabel string
	PKCS11KeypairID    string
	StreamerName       string
	RelayHost          string
	// Debug maps a selector kind to selector name to verbosity, e.g.
	// {"func": {"ToHLS": 3}}; it is filled in by DebugFlag.
	Debug              map[string]map[string]int
	AllowedStreams     []string
	WideOpen           bool
	Peers              []string
	Redirects          []string
	TestStream         bool
	FrontendProxy      string
	PublicOAuth        bool
	AppBundleID        string
	NoFirehose         bool
	PrintChat          bool
	Color              string
	LivepeerGatewayURL string
	LivepeerGateway    bool
	WHIPTest           string
	Thumbnail          bool
	SmearAudio         bool
	// ExternalSigning is not bound to a flag by NewFlagSet; the
	// "external-signing" flag is registered there as a deprecated no-op.
	ExternalSigning bool
	RTMPServerAddon string
	TracingEndpoint string
	BroadcasterHost string
	// XXDeprecatedPublicHost backs the deprecated "public-host" flag. Parse
	// copies it into BroadcasterHost when BroadcasterHost is empty.
	XXDeprecatedPublicHost string
	ServerHost             string
	RateLimitPerSecond     int
	RateLimitBurst         int
	RateLimitWebsocket     int
	JWK                    jwk.Key
	AccessJWK              jwk.Key
	// dataDirFlags lists the string settings in which Parse replaces the
	// SPDataDir placeholder with the configured DataDir.
	dataDirFlags                []*string
	DiscordWebhooks             []*discordtypes.Webhook
	NewWebRTCPlayback           bool
	AppleTeamID                 string
	AndroidCertFingerprint      string
	Labelers                    []string
	AtprotoDID                  string
	LivepeerHelp                bool
	PLCURL                      string
	ContentFilters              *ContentFilters
	DefaultRecommendedStreamers []string
	SQLLogging                  bool
	SentryDSN                   string
	LivepeerDebug               bool
	Tickets                     []string
	IrohTopic                   string
	DID                         string
	DisableIrohRelay            bool
	DevAccountCreds             map[string]string
	StreamSessionTimeout        time.Duration
	Replicators                 []string
	WebsocketURL                string
	BehindHTTPSProxy            bool
	SegmentDebugDir             string
	AdminDIDs                   []string
	Syndicate                   []string
	PlayerTelemetry             bool
}
145
// ContentFilters represents the content filtering configuration. It is
// decoded from JSON (the "content-filters" flag is registered with JSONFlag),
// using the field names given in the struct tags.
type ContentFilters struct {
	// ContentWarnings is the "content_warnings" section of the JSON document.
	ContentWarnings struct {
		Enabled         bool     `json:"enabled"`
		BlockedWarnings []string `json:"blocked_warnings"`
	} `json:"content_warnings"`
	// DistributionPolicy is the "distribution_policy" section of the JSON document.
	DistributionPolicy struct {
		Enabled bool `json:"enabled"`
	} `json:"distribution_policy"`
}
156
// Replication protocol names. ReplicatorWebsocket is the default entry of the
// "replicators" flag registered in NewFlagSet.
const (
	ReplicatorWebsocket string = "websocket"
	ReplicatorIroh      string = "iroh"
)
161
162func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
163 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
164 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
165 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
166 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
167 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
168 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
169 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
170 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
171 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
172 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
173 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
174 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
175 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
176 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
177 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
178 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
179 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
180 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
181 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
182 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
183 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
184 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
185 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
186 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
187 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
188 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
189 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
190 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
191 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
192 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
193 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
194 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
195 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
196 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
197 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
198 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
199 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
200 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
201 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
202 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
203 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
204 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
205 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
206 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
207 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
208 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
209 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
210 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
211
212 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
213 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
214 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
215 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
216 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
217 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server")
218 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)")
219 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)")
220 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
221 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
222 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
223 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
224 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
225 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
226 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
227 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available")
228 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
229 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
230 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
231 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
232 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
233 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
234 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
235 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
236 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
237 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
238 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
239 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
240 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
241 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
242 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations")
243 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")
244 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry")
245
246 fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
247 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
248
249 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
250 _ = starter.NewLivepeerConfig(lpFlags)
251 lpFlags.VisitAll(func(f *flag.Flag) {
252 adapted := LivepeerFlags.CamelToSnake[f.Name]
253 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
254 })
255
256 if runtime.GOOS == "linux" {
257 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
258 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
259 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
260 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
261 }
262 return fs
263}
264
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
var StreamplaceSchemePrefix = "streamplace://"
266
267func (cli *CLI) OwnPublicURL() string {
268 // No errors because we know it's valid from AddrFlag
269 host, port, _ := net.SplitHostPort(cli.HTTPAddr)
270
271 ip := net.ParseIP(host)
272 if host == "" || ip.IsUnspecified() {
273 host = "127.0.0.1"
274 }
275 addr := net.JoinHostPort(host, port)
276 return fmt.Sprintf("http://%s", addr)
277}
278
279func (cli *CLI) OwnInternalURL() string {
280 // No errors because we know it's valid from AddrFlag
281 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
282
283 ip := net.ParseIP(host)
284 if ip.IsUnspecified() {
285 host = "127.0.0.1"
286 }
287 addr := net.JoinHostPort(host, port)
288 return fmt.Sprintf("http://%s", addr)
289}
290
291func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
292 bs, err := os.ReadFile(cli.SigningKeyPath)
293 if err != nil {
294 return nil, err
295 }
296 block, _ := pem.Decode(bs)
297 if block == nil {
298 return nil, fmt.Errorf("no RSA key found in signing key")
299 }
300 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
301 if err != nil {
302 return nil, err
303 }
304 return key, nil
305}
306
307func RandomTrailer(length int) string {
308 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
309
310 res := make([]byte, length)
311 for i := 0; i < length; i++ {
312 res[i] = charset[rand.IntN(len(charset))]
313 }
314 return string(res)
315}
316
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or "" when the home directory cannot be determined.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	// not fatal unless the user doesn't set one later
	return ""
}
325
// GormLogger is the package-level slog-gorm logger. It writes through a tint
// handler to stderr with RFC 3339 timestamps and is initially built with the
// WithTraceAll option; DisableSQLLogging and EnableSQLLogging replace it.
var GormLogger = slogGorm.New(
	slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
		TimeFormat: time.RFC3339,
	})),
	slogGorm.WithTraceAll(),
)
332
333func DisableSQLLogging() {
334 GormLogger = slogGorm.New(
335 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
336 TimeFormat: time.RFC3339,
337 })),
338 )
339}
340
341func EnableSQLLogging() {
342 GormLogger = slogGorm.New(
343 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
344 TimeFormat: time.RFC3339,
345 })),
346 slogGorm.WithTraceAll(),
347 )
348}
349
350func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
351 err := ff.Parse(
352 fs, args,
353 ff.WithEnvVarPrefix("SP"),
354 )
355 if err != nil {
356 return err
357 }
358 if cli.DataDir == "" {
359 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
360 }
361 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
362 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
363 }
364 if cli.LivepeerGateway {
365 log.MonkeypatchStderr()
366 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
367 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
368 if err != nil {
369 return err
370 }
371 err = fs.Set("livepeer.data-dir", gatewayPath)
372 if err != nil {
373 return err
374 }
375 err = fs.Set("livepeer.gateway", "true")
376 if err != nil {
377 return err
378 }
379 httpAddrFlag := fs.Lookup("livepeer.http-addr")
380 if httpAddrFlag == nil {
381 return fmt.Errorf("livepeer.http-addr not found")
382 }
383 httpAddr := httpAddrFlag.Value.String()
384 if httpAddr == "" {
385 httpAddr = "127.0.0.1:8935"
386 err = fs.Set("livepeer.http-addr", httpAddr)
387 if err != nil {
388 return err
389 }
390 }
391 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
392 }
393 for _, dest := range cli.dataDirFlags {
394 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
395 }
396 if !cli.SQLLogging {
397 DisableSQLLogging()
398 } else {
399 EnableSQLLogging()
400 }
401 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
402 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
403 cli.BroadcasterHost = cli.XXDeprecatedPublicHost
404 }
405 if cli.ServerHost == "" && cli.BroadcasterHost != "" {
406 cli.ServerHost = cli.BroadcasterHost
407 }
408 if cli.PublicOAuth {
409 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production")
410 }
411 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" {
412 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?")
413 }
414 if cli.FirebaseServiceAccountFile != "" {
415 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile)
416 if err != nil {
417 return err
418 }
419 cli.FirebaseServiceAccount = string(bs)
420 }
421 return nil
422}
423
424func (cli *CLI) DataFilePath(fpath []string) string {
425 if cli.DataDir == "" {
426 panic("no data dir configured")
427 }
428 // windows does not like colons
429 safe := []string{}
430 for _, p := range fpath {
431 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
432 }
433 fpath = append([]string{cli.DataDir}, safe...)
434 fdpath := filepath.Join(fpath...)
435 return fdpath
436}
437
438// does a file exist in our data dir?
439func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
440 ddpath := cli.DataFilePath(fpath)
441 _, err := os.Stat(ddpath)
442 if err == nil {
443 return true, nil
444 }
445 if errors.Is(err, os.ErrNotExist) {
446 return false, nil
447 }
448 return false, err
449}
450
451// write a file to our data dir
452func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
453 fd, err := cli.DataFileCreate(fpath, overwrite)
454 if err != nil {
455 return err
456 }
457 defer fd.Close()
458 _, err = io.Copy(fd, r)
459 if err != nil {
460 return err
461 }
462
463 return nil
464}
465
466// create a file in our data dir. don't forget to close it!
467func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
468 ddpath := cli.DataFilePath(fpath)
469 if !overwrite {
470 exists, err := cli.DataFileExists(fpath)
471 if err != nil {
472 return nil, err
473 }
474 if exists {
475 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
476 }
477 }
478 if len(fpath) > 1 {
479 dirs, _ := filepath.Split(ddpath)
480 err := os.MkdirAll(dirs, os.ModePerm)
481 if err != nil {
482 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
483 }
484 }
485 return os.Create(ddpath)
486}
487
488// get a path to a segment file in our database
489func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
490 ext := filepath.Ext(file)
491 base := strings.TrimSuffix(file, ext)
492 aqt, err := aqtime.FromString(base)
493 if err != nil {
494 return "", err
495 }
496 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
497 yr, mon, day, hr, min, _, _ := aqt.Parts()
498 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
499}
500
501// get a path to a segment file in our database
502func (cli *CLI) HLSDir(user string) (string, error) {
503 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
504}
505
506// create a segment file in our database
507func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
508 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
509 yr, mon, day, hr, min, _, _ := aqt.Parts()
510 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
511}
512
513// read a file from our data dir
514func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
515 ddpath := cli.DataFilePath(fpath)
516
517 fd, err := os.Open(ddpath)
518 if err != nil {
519 return err
520 }
521 _, err = io.Copy(w, fd)
522 if err != nil {
523 return err
524 }
525
526 return nil
527}
528
529func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
530 cli.dataDirFlags = append(cli.dataDirFlags, dest)
531 *dest = filepath.Join(SPDataDir, defaultValue)
532 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
533 fs.Func(name, usage, func(s string) error {
534 *dest = s
535 return nil
536 })
537}
538
539func (cli *CLI) HasMist() bool {
540 return runtime.GOOS == "linux"
541}
542
543// type for comma-separated ethereum addresses
544func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
545 *dest = []aqpub.Pub{}
546 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
547 fs.Func(name, usage, func(s string) error {
548 if s == "" {
549 return nil
550 }
551 strs := strings.Split(s, ",")
552 for _, str := range strs {
553 pub, err := aqpub.FromHexString(str)
554 if err != nil {
555 return err
556 }
557 *dest = append(*dest, pub)
558 }
559 return nil
560 })
561}
562
563func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
564 *dest = defaultValue
565 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
566 fs.Func(name, usage, func(s string) error {
567 if s == "" {
568 return nil
569 }
570 strs := strings.Split(s, ",")
571 *dest = append([]string{}, strs...)
572 return nil
573 })
574}
575
576func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
577 *dest = map[string]string{}
578 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
579 fs.Func(name, usage, func(s string) error {
580 if s == "" {
581 return nil
582 }
583 pairs := strings.Split(s, ",")
584 for _, pair := range pairs {
585 parts := strings.Split(pair, "=")
586 if len(parts) != 2 {
587 return fmt.Errorf("invalid kv flag: %s", pair)
588 }
589 (*dest)[parts[0]] = parts[1]
590 }
591 return nil
592 })
593}
594
595func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
596 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
597 fs.Func(name, usage, func(s string) error {
598 if s == "" {
599 return nil
600 }
601 return json.Unmarshal([]byte(s), dest)
602 })
603}
604
605// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
606func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
607 *dest = map[string]map[string]int{}
608 fs.Func(name, usage, func(s string) error {
609 if s == "" {
610 return nil
611 }
612 pairs := strings.Split(s, ",")
613 for _, pair := range pairs {
614 scoreSplit := strings.Split(pair, ":")
615 if len(scoreSplit) != 2 {
616 return fmt.Errorf("invalid debug flag: %s", pair)
617 }
618 score, err := strconv.Atoi(scoreSplit[1])
619 if err != nil {
620 return fmt.Errorf("invalid debug flag: %s", pair)
621 }
622 selectorSplit := strings.Split(scoreSplit[0], "=")
623 if len(selectorSplit) != 2 {
624 return fmt.Errorf("invalid debug flag: %s", pair)
625 }
626 _, ok := (*dest)[selectorSplit[0]]
627 if !ok {
628 (*dest)[selectorSplit[0]] = map[string]int{}
629 }
630 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
631 }
632
633 return nil
634 })
635}
636
637func (cli *CLI) StreamIsAllowed(did string) error {
638 if cli.WideOpen {
639 return nil
640 }
641 // if the user set no test streams, anyone can stream
642 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
643 // but only valid atproto accounts! did:key is only allowed for our local test stream
644 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
645 if openServer && !isDIDKey {
646 return nil
647 }
648 for _, a := range cli.AllowedStreams {
649 if a == did {
650 return nil
651 }
652 }
653 return fmt.Errorf("user is not allowed to stream")
654}
655
656func (cli *CLI) MyDID() string {
657 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost)
658}
659
660func (cli *CLI) HasHTTPS() bool {
661 return cli.Secure || cli.BehindHTTPSProxy
662}
663
664func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) {
665 if cli.SegmentDebugDir == "" {
666 return
667 }
668 go func() {
669 err := os.MkdirAll(cli.SegmentDebugDir, 0755)
670 if err != nil {
671 log.Error(ctx, "failed to create debug directory", "error", err)
672 return
673 }
674 now := aqtime.FromTime(time.Now())
675 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-")))
676 fd, err := os.Create(outFile)
677 if err != nil {
678 log.Error(ctx, "failed to create debug file", "error", err)
679 return
680 }
681 defer fd.Close()
682 _, err = io.Copy(fd, r)
683 if err != nil {
684 log.Error(ctx, "failed to copy debug file", "error", err)
685 return
686 }
687 log.Log(ctx, "wrote debug file", "path", outFile)
688 }()
689}
690
691func (cli *CLI) ShouldSyndicate(did string) bool {
692 for _, d := range cli.Syndicate {
693 if d == "*" {
694 return true
695 }
696 if d == did {
697 return true
698 }
699 }
700 return false
701}