Live video on the AT Protocol
1package config
2
3import (
4 "context"
5 "crypto/rsa"
6 "crypto/x509"
7 "encoding/json"
8 "encoding/pem"
9 "errors"
10 "flag"
11 "fmt"
12 "io"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strconv"
18 "strings"
19 "time"
20
21 "math/rand/v2"
22
23 "github.com/lestrrat-go/jwx/v2/jwk"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/lmittmann/tint"
26 slogGorm "github.com/orandin/slog-gorm"
27 "github.com/peterbourgon/ff/v3"
28 "stream.place/streamplace/pkg/aqtime"
29 "stream.place/streamplace/pkg/constants"
30 "stream.place/streamplace/pkg/crypto/aqpub"
31 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
32 "stream.place/streamplace/pkg/log"
33)
34
35const SPDataDir = "$SP_DATA_DIR"
36const SegmentsDir = "segments"
37
38type BuildFlags struct {
39 Version string
40 BuildTime int64
41 UUID string
42}
43
44func (b BuildFlags) BuildTimeStr() string {
45 ts := time.Unix(b.BuildTime, 0)
46 return ts.UTC().Format(time.RFC3339)
47}
48
49func (b BuildFlags) BuildTimeStrExpo() string {
50 ts := time.Unix(b.BuildTime, 0)
51 return ts.UTC().Format("2006-01-02T15:04:05.000Z")
52}
53
54type CLI struct {
55 AdminAccount string
56 Build *BuildFlags
57 DataDir string
58 DBURL string
59 EthAccountAddr string
60 EthKeystorePath string
61 EthPassword string
62 FirebaseServiceAccount string
63 FirebaseServiceAccountFile string
64 GitLabURL string
65 HTTPAddr string
66 HTTPInternalAddr string
67 HTTPSAddr string
68 RtmpsAddr string
69 Secure bool
70 NoMist bool
71 MistAdminPort int
72 MistHTTPPort int
73 MistRTMPPort int
74 SigningKeyPath string
75 TAURL string
76 TLSCertPath string
77 TLSKeyPath string
78 PKCS11ModulePath string
79 PKCS11Pin string
80 PKCS11TokenSlot string
81 PKCS11TokenLabel string
82 PKCS11TokenSerial string
83 PKCS11KeypairLabel string
84 PKCS11KeypairID string
85 StreamerName string
86 RelayHost string
87 Debug map[string]map[string]int
88 AllowedStreams []string
89 WideOpen bool
90 Peers []string
91 Redirects []string
92 TestStream bool
93 FrontendProxy string
94 PublicOAuth bool
95 AppBundleID string
96 NoFirehose bool
97 PrintChat bool
98 Color string
99 LivepeerGatewayURL string
100 LivepeerGateway bool
101 WHIPTest string
102 Thumbnail bool
103 SmearAudio bool
104 ExternalSigning bool
105 RTMPServerAddon string
106 TracingEndpoint string
107 BroadcasterHost string
108 XXDeprecatedPublicHost string
109 ServerHost string
110 RateLimitPerSecond int
111 RateLimitBurst int
112 RateLimitWebsocket int
113 JWK jwk.Key
114 AccessJWK jwk.Key
115 dataDirFlags []*string
116 DiscordWebhooks []*discordtypes.Webhook
117 NewWebRTCPlayback bool
118 AppleTeamID string
119 AndroidCertFingerprint string
120 Labelers []string
121 AtprotoDID string
122 LivepeerHelp bool
123 PLCURL string
124 ContentFilters *ContentFilters
125 SQLLogging bool
126 SentryDSN string
127 LivepeerDebug bool
128 Tickets []string
129 IrohTopic string
130 DID string
131 DisableIrohRelay bool
132 DevAccountCreds map[string]string
133 StreamSessionTimeout time.Duration
134 Replicators []string
135 WebsocketURL string
136 BehindHTTPSProxy bool
137 SegmentDebugDir string
138 Syndicate []string
139}
140
141// ContentFilters represents the content filtering configuration
142type ContentFilters struct {
143 ContentWarnings struct {
144 Enabled bool `json:"enabled"`
145 BlockedWarnings []string `json:"blocked_warnings"`
146 } `json:"content_warnings"`
147 DistributionPolicy struct {
148 Enabled bool `json:"enabled"`
149 } `json:"distribution_policy"`
150}
151
152const (
153 ReplicatorWebsocket string = "websocket"
154 ReplicatorIroh string = "iroh"
155)
156
157func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
158 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
159 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
160 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
161 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
162 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
163 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
164 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
165 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
166 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
167 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
168 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
169 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
170 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
171 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
172 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
173 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
174 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
175 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
176 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
177 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
178 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
179 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
180 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
181 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
182 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
183 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
184 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
185 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
186 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
187 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
188 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
189 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
190 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
191 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
192 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
193 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
194 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
195 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
196 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
197 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
198 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
199 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
200 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
201 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
202 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
203 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
204 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
205 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
206 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
207 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
208 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
209 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
210 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
211 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
212 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
213 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
214 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
215 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
216 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
217 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
218 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
219 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
220 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
221 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
222 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
223 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
224 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
225 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
226 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
227 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
228 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
229 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
230 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
231 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
232 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
233 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")
234
235 fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
236 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
237
238 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
239 _ = starter.NewLivepeerConfig(lpFlags)
240 lpFlags.VisitAll(func(f *flag.Flag) {
241 adapted := LivepeerFlags.CamelToSnake[f.Name]
242 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
243 })
244
245 if runtime.GOOS == "linux" {
246 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
247 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
248 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
249 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
250 }
251 return fs
252}
253
254var StreamplaceSchemePrefix = "streamplace://"
255
256func (cli *CLI) OwnPublicURL() string {
257 // No errors because we know it's valid from AddrFlag
258 host, port, _ := net.SplitHostPort(cli.HTTPAddr)
259
260 ip := net.ParseIP(host)
261 if host == "" || ip.IsUnspecified() {
262 host = "127.0.0.1"
263 }
264 addr := net.JoinHostPort(host, port)
265 return fmt.Sprintf("http://%s", addr)
266}
267
268func (cli *CLI) OwnInternalURL() string {
269 // No errors because we know it's valid from AddrFlag
270 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
271
272 ip := net.ParseIP(host)
273 if ip.IsUnspecified() {
274 host = "127.0.0.1"
275 }
276 addr := net.JoinHostPort(host, port)
277 return fmt.Sprintf("http://%s", addr)
278}
279
280func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
281 bs, err := os.ReadFile(cli.SigningKeyPath)
282 if err != nil {
283 return nil, err
284 }
285 block, _ := pem.Decode(bs)
286 if block == nil {
287 return nil, fmt.Errorf("no RSA key found in signing key")
288 }
289 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
290 if err != nil {
291 return nil, err
292 }
293 return key, nil
294}
295
296func RandomTrailer(length int) string {
297 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
298
299 res := make([]byte, length)
300 for i := 0; i < length; i++ {
301 res[i] = charset[rand.IntN(len(charset))]
302 }
303 return string(res)
304}
305
306func DefaultDataDir() string {
307 home, err := os.UserHomeDir()
308 if err != nil {
309 // not fatal unless the user doesn't set one later
310 return ""
311 }
312 return filepath.Join(home, ".streamplace")
313}
314
315var GormLogger = slogGorm.New(
316 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
317 TimeFormat: time.RFC3339,
318 })),
319 slogGorm.WithTraceAll(),
320)
321
322func DisableSQLLogging() {
323 GormLogger = slogGorm.New(
324 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
325 TimeFormat: time.RFC3339,
326 })),
327 )
328}
329
330func EnableSQLLogging() {
331 GormLogger = slogGorm.New(
332 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
333 TimeFormat: time.RFC3339,
334 })),
335 slogGorm.WithTraceAll(),
336 )
337}
338
339func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
340 err := ff.Parse(
341 fs, args,
342 ff.WithEnvVarPrefix("SP"),
343 )
344 if err != nil {
345 return err
346 }
347 if cli.DataDir == "" {
348 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
349 }
350 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
351 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
352 }
353 if cli.LivepeerGateway {
354 log.MonkeypatchStderr()
355 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
356 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
357 if err != nil {
358 return err
359 }
360 err = fs.Set("livepeer.data-dir", gatewayPath)
361 if err != nil {
362 return err
363 }
364 err = fs.Set("livepeer.gateway", "true")
365 if err != nil {
366 return err
367 }
368 httpAddrFlag := fs.Lookup("livepeer.http-addr")
369 if httpAddrFlag == nil {
370 return fmt.Errorf("livepeer.http-addr not found")
371 }
372 httpAddr := httpAddrFlag.Value.String()
373 if httpAddr == "" {
374 httpAddr = "127.0.0.1:8935"
375 err = fs.Set("livepeer.http-addr", httpAddr)
376 if err != nil {
377 return err
378 }
379 }
380 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
381 }
382 for _, dest := range cli.dataDirFlags {
383 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
384 }
385 if !cli.SQLLogging {
386 DisableSQLLogging()
387 } else {
388 EnableSQLLogging()
389 }
390 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
391 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
392 cli.BroadcasterHost = cli.XXDeprecatedPublicHost
393 }
394 if cli.ServerHost == "" && cli.BroadcasterHost != "" {
395 cli.ServerHost = cli.BroadcasterHost
396 }
397 if cli.PublicOAuth {
398 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production")
399 }
400 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" {
401 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?")
402 }
403 if cli.FirebaseServiceAccountFile != "" {
404 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile)
405 if err != nil {
406 return err
407 }
408 cli.FirebaseServiceAccount = string(bs)
409 }
410 return nil
411}
412
413func (cli *CLI) DataFilePath(fpath []string) string {
414 if cli.DataDir == "" {
415 panic("no data dir configured")
416 }
417 // windows does not like colons
418 safe := []string{}
419 for _, p := range fpath {
420 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
421 }
422 fpath = append([]string{cli.DataDir}, safe...)
423 fdpath := filepath.Join(fpath...)
424 return fdpath
425}
426
427// does a file exist in our data dir?
428func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
429 ddpath := cli.DataFilePath(fpath)
430 _, err := os.Stat(ddpath)
431 if err == nil {
432 return true, nil
433 }
434 if errors.Is(err, os.ErrNotExist) {
435 return false, nil
436 }
437 return false, err
438}
439
440// write a file to our data dir
441func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
442 fd, err := cli.DataFileCreate(fpath, overwrite)
443 if err != nil {
444 return err
445 }
446 defer fd.Close()
447 _, err = io.Copy(fd, r)
448 if err != nil {
449 return err
450 }
451
452 return nil
453}
454
455// create a file in our data dir. don't forget to close it!
456func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
457 ddpath := cli.DataFilePath(fpath)
458 if !overwrite {
459 exists, err := cli.DataFileExists(fpath)
460 if err != nil {
461 return nil, err
462 }
463 if exists {
464 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
465 }
466 }
467 if len(fpath) > 1 {
468 dirs, _ := filepath.Split(ddpath)
469 err := os.MkdirAll(dirs, os.ModePerm)
470 if err != nil {
471 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
472 }
473 }
474 return os.Create(ddpath)
475}
476
477// get a path to a segment file in our database
478func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
479 ext := filepath.Ext(file)
480 base := strings.TrimSuffix(file, ext)
481 aqt, err := aqtime.FromString(base)
482 if err != nil {
483 return "", err
484 }
485 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
486 yr, mon, day, hr, min, _, _ := aqt.Parts()
487 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
488}
489
490// get a path to a segment file in our database
491func (cli *CLI) HLSDir(user string) (string, error) {
492 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
493}
494
495// create a segment file in our database
496func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
497 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
498 yr, mon, day, hr, min, _, _ := aqt.Parts()
499 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
500}
501
502// read a file from our data dir
503func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
504 ddpath := cli.DataFilePath(fpath)
505
506 fd, err := os.Open(ddpath)
507 if err != nil {
508 return err
509 }
510 _, err = io.Copy(w, fd)
511 if err != nil {
512 return err
513 }
514
515 return nil
516}
517
518func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
519 cli.dataDirFlags = append(cli.dataDirFlags, dest)
520 *dest = filepath.Join(SPDataDir, defaultValue)
521 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
522 fs.Func(name, usage, func(s string) error {
523 *dest = s
524 return nil
525 })
526}
527
528func (cli *CLI) HasMist() bool {
529 return runtime.GOOS == "linux"
530}
531
532// type for comma-separated ethereum addresses
533func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
534 *dest = []aqpub.Pub{}
535 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
536 fs.Func(name, usage, func(s string) error {
537 if s == "" {
538 return nil
539 }
540 strs := strings.Split(s, ",")
541 for _, str := range strs {
542 pub, err := aqpub.FromHexString(str)
543 if err != nil {
544 return err
545 }
546 *dest = append(*dest, pub)
547 }
548 return nil
549 })
550}
551
552func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
553 *dest = defaultValue
554 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
555 fs.Func(name, usage, func(s string) error {
556 if s == "" {
557 return nil
558 }
559 strs := strings.Split(s, ",")
560 *dest = append([]string{}, strs...)
561 return nil
562 })
563}
564
565func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
566 *dest = map[string]string{}
567 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
568 fs.Func(name, usage, func(s string) error {
569 if s == "" {
570 return nil
571 }
572 pairs := strings.Split(s, ",")
573 for _, pair := range pairs {
574 parts := strings.Split(pair, "=")
575 if len(parts) != 2 {
576 return fmt.Errorf("invalid kv flag: %s", pair)
577 }
578 (*dest)[parts[0]] = parts[1]
579 }
580 return nil
581 })
582}
583
584func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
585 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
586 fs.Func(name, usage, func(s string) error {
587 if s == "" {
588 return nil
589 }
590 return json.Unmarshal([]byte(s), dest)
591 })
592}
593
594// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
595func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
596 *dest = map[string]map[string]int{}
597 fs.Func(name, usage, func(s string) error {
598 if s == "" {
599 return nil
600 }
601 pairs := strings.Split(s, ",")
602 for _, pair := range pairs {
603 scoreSplit := strings.Split(pair, ":")
604 if len(scoreSplit) != 2 {
605 return fmt.Errorf("invalid debug flag: %s", pair)
606 }
607 score, err := strconv.Atoi(scoreSplit[1])
608 if err != nil {
609 return fmt.Errorf("invalid debug flag: %s", pair)
610 }
611 selectorSplit := strings.Split(scoreSplit[0], "=")
612 if len(selectorSplit) != 2 {
613 return fmt.Errorf("invalid debug flag: %s", pair)
614 }
615 _, ok := (*dest)[selectorSplit[0]]
616 if !ok {
617 (*dest)[selectorSplit[0]] = map[string]int{}
618 }
619 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
620 }
621
622 return nil
623 })
624}
625
626func (cli *CLI) StreamIsAllowed(did string) error {
627 if cli.WideOpen {
628 return nil
629 }
630 // if the user set no test streams, anyone can stream
631 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
632 // but only valid atproto accounts! did:key is only allowed for our local test stream
633 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
634 if openServer && !isDIDKey {
635 return nil
636 }
637 for _, a := range cli.AllowedStreams {
638 if a == did {
639 return nil
640 }
641 }
642 return fmt.Errorf("user is not allowed to stream")
643}
644
645func (cli *CLI) MyDID() string {
646 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost)
647}
648
649func (cli *CLI) HasHTTPS() bool {
650 return cli.Secure || cli.BehindHTTPSProxy
651}
652
653func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) {
654 if cli.SegmentDebugDir == "" {
655 return
656 }
657 go func() {
658 err := os.MkdirAll(cli.SegmentDebugDir, 0755)
659 if err != nil {
660 log.Error(ctx, "failed to create debug directory", "error", err)
661 return
662 }
663 now := aqtime.FromTime(time.Now())
664 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-")))
665 fd, err := os.Create(outFile)
666 if err != nil {
667 log.Error(ctx, "failed to create debug file", "error", err)
668 return
669 }
670 defer fd.Close()
671 _, err = io.Copy(fd, r)
672 if err != nil {
673 log.Error(ctx, "failed to copy debug file", "error", err)
674 return
675 }
676 log.Log(ctx, "wrote debug file", "path", outFile)
677 }()
678}
679
680func (cli *CLI) ShouldSyndicate(did string) bool {
681 for _, d := range cli.Syndicate {
682 if d == "*" {
683 return true
684 }
685 if d == did {
686 return true
687 }
688 }
689 return false
690}