// Live video on the AT Protocol.
1package config
2
3import (
4 "context"
5 "crypto/rsa"
6 "crypto/x509"
7 "encoding/json"
8 "encoding/pem"
9 "errors"
10 "flag"
11 "fmt"
12 "io"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strconv"
18 "strings"
19 "time"
20
21 "math/rand/v2"
22
23 "github.com/lestrrat-go/jwx/v2/jwk"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/lmittmann/tint"
26 slogGorm "github.com/orandin/slog-gorm"
27 "github.com/peterbourgon/ff/v3"
28 "stream.place/streamplace/pkg/aqtime"
29 "stream.place/streamplace/pkg/constants"
30 "stream.place/streamplace/pkg/crypto/aqpub"
31 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
32 "stream.place/streamplace/pkg/log"
33)
34
const (
	// SPDataDir is the placeholder that appears in data-dir-relative flag
	// values; Parse substitutes the configured data directory for it.
	SPDataDir = "$SP_DATA_DIR"
	// SegmentsDir is the first path component, below the data directory,
	// used by the segment path helpers.
	SegmentsDir = "segments"
)
37
// BuildFlags carries metadata describing the build of the binary.
type BuildFlags struct {
	Version   string
	BuildTime int64 // interpreted as seconds since the Unix epoch
	UUID      string
}

// BuildTimeStr formats BuildTime as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo formats BuildTime in UTC using a layout with a fixed
// three-digit fractional second and a literal trailing "Z".
func (b BuildFlags) BuildTimeStrExpo() string {
	const expoLayout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(expoLayout)
}
53
54type CLI struct {
55 AdminAccount string
56 Build *BuildFlags
57 DataDir string
58 DBURL string
59 EthAccountAddr string
60 EthKeystorePath string
61 EthPassword string
62 FirebaseServiceAccount string
63 FirebaseServiceAccountFile string
64 GitLabURL string
65 HTTPAddr string
66 HTTPInternalAddr string
67 HTTPSAddr string
68 RTMPAddr string
69 RTMPSAddr string
70 RTMPSAddonAddr string
71 Secure bool
72 NoMist bool
73 MistAdminPort int
74 MistHTTPPort int
75 MistRTMPPort int
76 SigningKeyPath string
77 TAURL string
78 TLSCertPath string
79 TLSKeyPath string
80 PKCS11ModulePath string
81 PKCS11Pin string
82 PKCS11TokenSlot string
83 PKCS11TokenLabel string
84 PKCS11TokenSerial string
85 PKCS11KeypairLabel string
86 PKCS11KeypairID string
87 StreamerName string
88 RelayHost string
89 Debug map[string]map[string]int
90 AllowedStreams []string
91 WideOpen bool
92 Peers []string
93 Redirects []string
94 TestStream bool
95 FrontendProxy string
96 PublicOAuth bool
97 AppBundleID string
98 NoFirehose bool
99 PrintChat bool
100 Color string
101 LivepeerGatewayURL string
102 LivepeerGateway bool
103 WHIPTest string
104 Thumbnail bool
105 SmearAudio bool
106 ExternalSigning bool
107 RTMPServerAddon string
108 TracingEndpoint string
109 BroadcasterHost string
110 XXDeprecatedPublicHost string
111 ServerHost string
112 RateLimitPerSecond int
113 RateLimitBurst int
114 RateLimitWebsocket int
115 JWK jwk.Key
116 AccessJWK jwk.Key
117 dataDirFlags []*string
118 DiscordWebhooks []*discordtypes.Webhook
119 NewWebRTCPlayback bool
120 AppleTeamID string
121 AndroidCertFingerprint string
122 Labelers []string
123 AtprotoDID string
124 LivepeerHelp bool
125 PLCURL string
126 ContentFilters *ContentFilters
127 DefaultRecommendedStreamers []string
128 SQLLogging bool
129 SentryDSN string
130 LivepeerDebug bool
131 Tickets []string
132 IrohTopic string
133 DID string
134 DisableIrohRelay bool
135 DevAccountCreds map[string]string
136 StreamSessionTimeout time.Duration
137 Replicators []string
138 WebsocketURL string
139 BehindHTTPSProxy bool
140 SegmentDebugDir string
141 AdminDIDs []string
142 Syndicate []string
143}
144
// ContentFilters is the JSON-decodable content filtering configuration.
type ContentFilters struct {
	// ContentWarnings is read from the "content_warnings" key.
	ContentWarnings struct {
		// Enabled is read from "enabled".
		Enabled bool `json:"enabled"`
		// BlockedWarnings is read from "blocked_warnings".
		BlockedWarnings []string `json:"blocked_warnings"`
	} `json:"content_warnings"`
	// DistributionPolicy is read from the "distribution_policy" key.
	DistributionPolicy struct {
		// Enabled is read from "enabled".
		Enabled bool `json:"enabled"`
	} `json:"distribution_policy"`
}
155
// ReplicatorWebsocket is the replication protocol name "websocket"; it is the
// default value of the replicators flag.
const ReplicatorWebsocket string = "websocket"

// ReplicatorIroh is the replication protocol name "iroh".
const ReplicatorIroh string = "iroh"
160
161func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
162 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
163 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
164 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
165 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
166 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
167 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
168 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
169 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
170 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
171 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
172 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
173 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
174 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
175 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
176 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
177 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
178 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
179 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
180 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
181 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
182 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
183 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
184 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
185 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
186 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
187 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
188 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
189 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
190 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
191 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
192 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
193 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
194 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
195 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
196 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
197 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
198 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
199 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
200 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
201 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
202 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
203 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
204 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
205 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
206 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
207 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
208 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
209 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
210
211 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
212 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
213 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
214 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
215 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
216 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server")
217 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)")
218 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)")
219 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
220 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
221 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
222 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
223 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
224 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
225 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
226 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available")
227 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
228 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
229 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
230 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
231 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
232 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
233 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
234 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
235 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
236 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
237 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
238 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
239 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
240 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
241 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations")
242 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")
243
244 fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
245 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
246
247 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
248 _ = starter.NewLivepeerConfig(lpFlags)
249 lpFlags.VisitAll(func(f *flag.Flag) {
250 adapted := LivepeerFlags.CamelToSnake[f.Name]
251 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
252 })
253
254 if runtime.GOOS == "linux" {
255 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
256 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
257 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
258 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
259 }
260 return fs
261}
262
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
var StreamplaceSchemePrefix = "streamplace://"
264
265func (cli *CLI) OwnPublicURL() string {
266 // No errors because we know it's valid from AddrFlag
267 host, port, _ := net.SplitHostPort(cli.HTTPAddr)
268
269 ip := net.ParseIP(host)
270 if host == "" || ip.IsUnspecified() {
271 host = "127.0.0.1"
272 }
273 addr := net.JoinHostPort(host, port)
274 return fmt.Sprintf("http://%s", addr)
275}
276
277func (cli *CLI) OwnInternalURL() string {
278 // No errors because we know it's valid from AddrFlag
279 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
280
281 ip := net.ParseIP(host)
282 if ip.IsUnspecified() {
283 host = "127.0.0.1"
284 }
285 addr := net.JoinHostPort(host, port)
286 return fmt.Sprintf("http://%s", addr)
287}
288
289func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
290 bs, err := os.ReadFile(cli.SigningKeyPath)
291 if err != nil {
292 return nil, err
293 }
294 block, _ := pem.Decode(bs)
295 if block == nil {
296 return nil, fmt.Errorf("no RSA key found in signing key")
297 }
298 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
299 if err != nil {
300 return nil, err
301 }
302 return key, nil
303}
304
305func RandomTrailer(length int) string {
306 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
307
308 res := make([]byte, length)
309 for i := 0; i < length; i++ {
310 res[i] = charset[rand.IntN(len(charset))]
311 }
312 return string(res)
313}
314
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or "" when the home directory cannot be determined.
func DefaultDataDir() string {
	if homeDir, err := os.UserHomeDir(); err == nil {
		return filepath.Join(homeDir, ".streamplace")
	}
	// Not fatal here: only a problem if no data dir is supplied later either.
	return ""
}
323
324var GormLogger = slogGorm.New(
325 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
326 TimeFormat: time.RFC3339,
327 })),
328 slogGorm.WithTraceAll(),
329)
330
331func DisableSQLLogging() {
332 GormLogger = slogGorm.New(
333 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
334 TimeFormat: time.RFC3339,
335 })),
336 )
337}
338
339func EnableSQLLogging() {
340 GormLogger = slogGorm.New(
341 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
342 TimeFormat: time.RFC3339,
343 })),
344 slogGorm.WithTraceAll(),
345 )
346}
347
348func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
349 err := ff.Parse(
350 fs, args,
351 ff.WithEnvVarPrefix("SP"),
352 )
353 if err != nil {
354 return err
355 }
356 if cli.DataDir == "" {
357 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
358 }
359 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
360 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
361 }
362 if cli.LivepeerGateway {
363 log.MonkeypatchStderr()
364 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
365 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
366 if err != nil {
367 return err
368 }
369 err = fs.Set("livepeer.data-dir", gatewayPath)
370 if err != nil {
371 return err
372 }
373 err = fs.Set("livepeer.gateway", "true")
374 if err != nil {
375 return err
376 }
377 httpAddrFlag := fs.Lookup("livepeer.http-addr")
378 if httpAddrFlag == nil {
379 return fmt.Errorf("livepeer.http-addr not found")
380 }
381 httpAddr := httpAddrFlag.Value.String()
382 if httpAddr == "" {
383 httpAddr = "127.0.0.1:8935"
384 err = fs.Set("livepeer.http-addr", httpAddr)
385 if err != nil {
386 return err
387 }
388 }
389 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
390 }
391 for _, dest := range cli.dataDirFlags {
392 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
393 }
394 if !cli.SQLLogging {
395 DisableSQLLogging()
396 } else {
397 EnableSQLLogging()
398 }
399 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
400 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
401 cli.BroadcasterHost = cli.XXDeprecatedPublicHost
402 }
403 if cli.ServerHost == "" && cli.BroadcasterHost != "" {
404 cli.ServerHost = cli.BroadcasterHost
405 }
406 if cli.PublicOAuth {
407 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production")
408 }
409 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" {
410 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?")
411 }
412 if cli.FirebaseServiceAccountFile != "" {
413 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile)
414 if err != nil {
415 return err
416 }
417 cli.FirebaseServiceAccount = string(bs)
418 }
419 return nil
420}
421
422func (cli *CLI) DataFilePath(fpath []string) string {
423 if cli.DataDir == "" {
424 panic("no data dir configured")
425 }
426 // windows does not like colons
427 safe := []string{}
428 for _, p := range fpath {
429 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
430 }
431 fpath = append([]string{cli.DataDir}, safe...)
432 fdpath := filepath.Join(fpath...)
433 return fdpath
434}
435
436// does a file exist in our data dir?
437func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
438 ddpath := cli.DataFilePath(fpath)
439 _, err := os.Stat(ddpath)
440 if err == nil {
441 return true, nil
442 }
443 if errors.Is(err, os.ErrNotExist) {
444 return false, nil
445 }
446 return false, err
447}
448
449// write a file to our data dir
450func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
451 fd, err := cli.DataFileCreate(fpath, overwrite)
452 if err != nil {
453 return err
454 }
455 defer fd.Close()
456 _, err = io.Copy(fd, r)
457 if err != nil {
458 return err
459 }
460
461 return nil
462}
463
464// create a file in our data dir. don't forget to close it!
465func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
466 ddpath := cli.DataFilePath(fpath)
467 if !overwrite {
468 exists, err := cli.DataFileExists(fpath)
469 if err != nil {
470 return nil, err
471 }
472 if exists {
473 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
474 }
475 }
476 if len(fpath) > 1 {
477 dirs, _ := filepath.Split(ddpath)
478 err := os.MkdirAll(dirs, os.ModePerm)
479 if err != nil {
480 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
481 }
482 }
483 return os.Create(ddpath)
484}
485
486// get a path to a segment file in our database
487func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
488 ext := filepath.Ext(file)
489 base := strings.TrimSuffix(file, ext)
490 aqt, err := aqtime.FromString(base)
491 if err != nil {
492 return "", err
493 }
494 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
495 yr, mon, day, hr, min, _, _ := aqt.Parts()
496 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
497}
498
499// get a path to a segment file in our database
500func (cli *CLI) HLSDir(user string) (string, error) {
501 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
502}
503
504// create a segment file in our database
505func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
506 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
507 yr, mon, day, hr, min, _, _ := aqt.Parts()
508 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
509}
510
511// read a file from our data dir
512func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
513 ddpath := cli.DataFilePath(fpath)
514
515 fd, err := os.Open(ddpath)
516 if err != nil {
517 return err
518 }
519 _, err = io.Copy(w, fd)
520 if err != nil {
521 return err
522 }
523
524 return nil
525}
526
527func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
528 cli.dataDirFlags = append(cli.dataDirFlags, dest)
529 *dest = filepath.Join(SPDataDir, defaultValue)
530 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
531 fs.Func(name, usage, func(s string) error {
532 *dest = s
533 return nil
534 })
535}
536
537func (cli *CLI) HasMist() bool {
538 return runtime.GOOS == "linux"
539}
540
541// type for comma-separated ethereum addresses
542func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
543 *dest = []aqpub.Pub{}
544 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
545 fs.Func(name, usage, func(s string) error {
546 if s == "" {
547 return nil
548 }
549 strs := strings.Split(s, ",")
550 for _, str := range strs {
551 pub, err := aqpub.FromHexString(str)
552 if err != nil {
553 return err
554 }
555 *dest = append(*dest, pub)
556 }
557 return nil
558 })
559}
560
561func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
562 *dest = defaultValue
563 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
564 fs.Func(name, usage, func(s string) error {
565 if s == "" {
566 return nil
567 }
568 strs := strings.Split(s, ",")
569 *dest = append([]string{}, strs...)
570 return nil
571 })
572}
573
574func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
575 *dest = map[string]string{}
576 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
577 fs.Func(name, usage, func(s string) error {
578 if s == "" {
579 return nil
580 }
581 pairs := strings.Split(s, ",")
582 for _, pair := range pairs {
583 parts := strings.Split(pair, "=")
584 if len(parts) != 2 {
585 return fmt.Errorf("invalid kv flag: %s", pair)
586 }
587 (*dest)[parts[0]] = parts[1]
588 }
589 return nil
590 })
591}
592
593func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
594 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
595 fs.Func(name, usage, func(s string) error {
596 if s == "" {
597 return nil
598 }
599 return json.Unmarshal([]byte(s), dest)
600 })
601}
602
603// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
604func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
605 *dest = map[string]map[string]int{}
606 fs.Func(name, usage, func(s string) error {
607 if s == "" {
608 return nil
609 }
610 pairs := strings.Split(s, ",")
611 for _, pair := range pairs {
612 scoreSplit := strings.Split(pair, ":")
613 if len(scoreSplit) != 2 {
614 return fmt.Errorf("invalid debug flag: %s", pair)
615 }
616 score, err := strconv.Atoi(scoreSplit[1])
617 if err != nil {
618 return fmt.Errorf("invalid debug flag: %s", pair)
619 }
620 selectorSplit := strings.Split(scoreSplit[0], "=")
621 if len(selectorSplit) != 2 {
622 return fmt.Errorf("invalid debug flag: %s", pair)
623 }
624 _, ok := (*dest)[selectorSplit[0]]
625 if !ok {
626 (*dest)[selectorSplit[0]] = map[string]int{}
627 }
628 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
629 }
630
631 return nil
632 })
633}
634
635func (cli *CLI) StreamIsAllowed(did string) error {
636 if cli.WideOpen {
637 return nil
638 }
639 // if the user set no test streams, anyone can stream
640 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
641 // but only valid atproto accounts! did:key is only allowed for our local test stream
642 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
643 if openServer && !isDIDKey {
644 return nil
645 }
646 for _, a := range cli.AllowedStreams {
647 if a == did {
648 return nil
649 }
650 }
651 return fmt.Errorf("user is not allowed to stream")
652}
653
654func (cli *CLI) MyDID() string {
655 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost)
656}
657
658func (cli *CLI) HasHTTPS() bool {
659 return cli.Secure || cli.BehindHTTPSProxy
660}
661
662func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) {
663 if cli.SegmentDebugDir == "" {
664 return
665 }
666 go func() {
667 err := os.MkdirAll(cli.SegmentDebugDir, 0755)
668 if err != nil {
669 log.Error(ctx, "failed to create debug directory", "error", err)
670 return
671 }
672 now := aqtime.FromTime(time.Now())
673 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-")))
674 fd, err := os.Create(outFile)
675 if err != nil {
676 log.Error(ctx, "failed to create debug file", "error", err)
677 return
678 }
679 defer fd.Close()
680 _, err = io.Copy(fd, r)
681 if err != nil {
682 log.Error(ctx, "failed to copy debug file", "error", err)
683 return
684 }
685 log.Log(ctx, "wrote debug file", "path", outFile)
686 }()
687}
688
689func (cli *CLI) ShouldSyndicate(did string) bool {
690 for _, d := range cli.Syndicate {
691 if d == "*" {
692 return true
693 }
694 if d == did {
695 return true
696 }
697 }
698 return false
699}