Live video on the AT Protocol
1package config
2
3import (
4 "context"
5 "crypto/rsa"
6 "crypto/x509"
7 "encoding/json"
8 "encoding/pem"
9 "errors"
10 "flag"
11 "fmt"
12 "io"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strconv"
18 "strings"
19 "time"
20
21 "math/rand/v2"
22
23 "github.com/lestrrat-go/jwx/v2/jwk"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/lmittmann/tint"
26 slogGorm "github.com/orandin/slog-gorm"
27 "github.com/peterbourgon/ff/v3"
28 "stream.place/streamplace/pkg/aqtime"
29 "stream.place/streamplace/pkg/constants"
30 "stream.place/streamplace/pkg/crypto/aqpub"
31 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
32 "stream.place/streamplace/pkg/log"
33)
34
// Path-related constants shared across the config package.
const (
	// SPDataDir is the literal placeholder that data-dir-relative flag values
	// carry until Parse replaces it with the configured data directory.
	SPDataDir = "$SP_DATA_DIR"
	// SegmentsDir is the name of the data-dir subdirectory used as the root
	// of segment file paths.
	SegmentsDir = "segments"
)
37
// BuildFlags carries metadata describing a build.
type BuildFlags struct {
	Version   string
	BuildTime int64 // interpreted as seconds since the Unix epoch
	UUID      string
}

// BuildTimeStr renders BuildTime as a UTC timestamp in RFC 3339 format.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo renders BuildTime as a UTC timestamp with a fixed
// three-digit millisecond field and a literal "Z" suffix.
func (b BuildFlags) BuildTimeStrExpo() string {
	const layout = "2006-01-02T15:04:05.000Z"
	built := time.Unix(b.BuildTime, 0).UTC()
	return built.Format(layout)
}
53
// CLI holds the runtime configuration of a streamplace node. NewFlagSet binds
// most of the exported fields to command-line flags (the flag help strings
// there describe each setting), and Parse fills them in and then applies
// cross-field validation and defaults.
//
// Some fields (for example Build, JWK, AccessJWK, DID and ExternalSigning) are
// not bound to any flag in NewFlagSet; presumably they are populated by other
// code — verify against callers.
type CLI struct {
	AdminAccount                string
	Build                       *BuildFlags
	DataDir                     string
	DBURL                       string
	EthAccountAddr              string
	EthKeystorePath             string
	EthPassword                 string
	FirebaseServiceAccount      string
	FirebaseServiceAccountFile  string
	GitLabURL                   string
	HTTPAddr                    string
	HTTPInternalAddr            string
	HTTPSAddr                   string
	RTMPAddr                    string
	RTMPSAddr                   string
	Secure                      bool
	NoMist                      bool
	MistAdminPort               int
	MistHTTPPort                int
	MistRTMPPort                int
	SigningKeyPath              string
	TAURL                       string
	TLSCertPath                 string
	TLSKeyPath                  string
	PKCS11ModulePath            string
	PKCS11Pin                   string
	PKCS11TokenSlot             string
	PKCS11TokenLabel            string
	PKCS11TokenSerial           string
	PKCS11KeypairLabel          string
	PKCS11KeypairID             string
	StreamerName                string
	RelayHost                   string
	Debug                       map[string]map[string]int
	AllowedStreams              []string
	WideOpen                    bool
	Peers                       []string
	Redirects                   []string
	TestStream                  bool
	FrontendProxy               string
	PublicOAuth                 bool
	AppBundleID                 string
	NoFirehose                  bool
	PrintChat                   bool
	Color                       string
	LivepeerGatewayURL          string
	LivepeerGateway             bool
	WHIPTest                    string
	Thumbnail                   bool
	SmearAudio                  bool
	ExternalSigning             bool
	RTMPServerAddon             string
	TracingEndpoint             string
	BroadcasterHost             string
	XXDeprecatedPublicHost      string
	ServerHost                  string
	RateLimitPerSecond          int
	RateLimitBurst              int
	RateLimitWebsocket          int
	JWK                         jwk.Key
	AccessJWK                   jwk.Key
	dataDirFlags                []*string // settings in which Parse replaces the $SP_DATA_DIR placeholder with DataDir
	DiscordWebhooks             []*discordtypes.Webhook
	NewWebRTCPlayback           bool
	AppleTeamID                 string
	AndroidCertFingerprint      string
	Labelers                    []string
	AtprotoDID                  string
	LivepeerHelp                bool
	PLCURL                      string
	ContentFilters              *ContentFilters
	DefaultRecommendedStreamers []string
	SQLLogging                  bool
	SentryDSN                   string
	LivepeerDebug               bool
	Tickets                     []string
	IrohTopic                   string
	DID                         string
	DisableIrohRelay            bool
	DevAccountCreds             map[string]string
	StreamSessionTimeout        time.Duration
	Replicators                 []string
	WebsocketURL                string
	BehindHTTPSProxy            bool
	SegmentDebugDir             string
	Syndicate                   []string
}
142
// ContentFilters represents the content filtering configuration. NewFlagSet
// registers it as the target of the JSON-valued content-filters flag, so the
// struct tags below define the accepted JSON keys.
type ContentFilters struct {
	// ContentWarnings holds the "content_warnings" section: an on/off switch
	// and a list of warning identifiers (presumably the warnings to block —
	// confirm against the code that consumes BlockedWarnings).
	ContentWarnings struct {
		Enabled         bool     `json:"enabled"`
		BlockedWarnings []string `json:"blocked_warnings"`
	} `json:"content_warnings"`
	// DistributionPolicy holds the "distribution_policy" section, which only
	// carries an on/off switch.
	DistributionPolicy struct {
		Enabled bool `json:"enabled"`
	} `json:"distribution_policy"`
}
153
// Names of replication protocols as used in the replicators flag.
// ReplicatorWebsocket is that flag's default value.
const (
	ReplicatorWebsocket string = "websocket"
	ReplicatorIroh      string = "iroh"
)
158
// NewFlagSet builds the flag.FlagSet for a streamplace node and binds each
// flag to the matching field of cli, installing the defaults shown below.
// Flags registered through cli.DataDirFlag (and db-url) carry the
// $SP_DATA_DIR placeholder until Parse substitutes the real data directory.
//
// NOTE(review): the name parameter is currently unused; the flag set is always
// named "streamplace".
func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
	fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
	fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
	fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
	fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
	fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
	fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
	cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
	cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
	fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
	fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
	// db-url is a plain string flag, so it is added to the placeholder
	// substitution list by hand rather than through DataDirFlag.
	cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
	fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
	fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
	fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
	fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
	cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
	fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
	fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
	fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
	fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
	fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
	fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
	fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
	fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
	fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
	fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
	fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
	fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
	fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
	fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
	cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
	cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
	cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
	cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
	fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
	fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
	fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
	fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
	fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
	fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
	fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
	fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
	fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
	fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
	fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")

	fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
	fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
	fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
	fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
	fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
	fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)")
	fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)")
	cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
	fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
	fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
	fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
	cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
	fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
	cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
	cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available")
	fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
	fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
	fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
	fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
	fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
	fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
	cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
	fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
	fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
	cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
	fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
	// NOTE(review): the help text lists "http", but the default value is
	// ReplicatorWebsocket ("websocket") — confirm which spelling consumers accept.
	cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
	fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
	fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
	cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")

	// Deprecated flags are still registered so that existing invocations keep
	// parsing; their values are not stored on cli.
	fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
	fs.Bool("insecure", false, "DEPRECATED, does nothing.")

	// Re-export every flag of the embedded livepeer configuration under a
	// "livepeer." prefix, sharing the same flag.Value so that setting the
	// prefixed flag writes into the livepeer config. The flag name is
	// translated through LivepeerFlags.CamelToSnake.
	// NOTE(review): a name missing from that map would yield the flag
	// "livepeer." — confirm the map covers every livepeer flag.
	lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
	_ = starter.NewLivepeerConfig(lpFlags)
	lpFlags.VisitAll(func(f *flag.Flag) {
		adapted := LivepeerFlags.CamelToSnake[f.Name]
		fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
	})

	// The MistServer flags only exist on linux; on other platforms the
	// corresponding fields keep their zero values.
	if runtime.GOOS == "linux" {
		fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
		fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
		fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
		fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
	}
	return fs
}
258
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
var StreamplaceSchemePrefix = "streamplace://"
260
261func (cli *CLI) OwnPublicURL() string {
262 // No errors because we know it's valid from AddrFlag
263 host, port, _ := net.SplitHostPort(cli.HTTPAddr)
264
265 ip := net.ParseIP(host)
266 if host == "" || ip.IsUnspecified() {
267 host = "127.0.0.1"
268 }
269 addr := net.JoinHostPort(host, port)
270 return fmt.Sprintf("http://%s", addr)
271}
272
273func (cli *CLI) OwnInternalURL() string {
274 // No errors because we know it's valid from AddrFlag
275 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
276
277 ip := net.ParseIP(host)
278 if ip.IsUnspecified() {
279 host = "127.0.0.1"
280 }
281 addr := net.JoinHostPort(host, port)
282 return fmt.Sprintf("http://%s", addr)
283}
284
285func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
286 bs, err := os.ReadFile(cli.SigningKeyPath)
287 if err != nil {
288 return nil, err
289 }
290 block, _ := pem.Decode(bs)
291 if block == nil {
292 return nil, fmt.Errorf("no RSA key found in signing key")
293 }
294 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
295 if err != nil {
296 return nil, err
297 }
298 return key, nil
299}
300
301func RandomTrailer(length int) string {
302 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
303
304 res := make([]byte, length)
305 for i := 0; i < length; i++ {
306 res[i] = charset[rand.IntN(len(charset))]
307 }
308 return string(res)
309}
310
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or the empty string when the home directory cannot be
// determined.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	// not fatal unless the user doesn't set one later
	return ""
}
319
// GormLogger is the package-level slog-gorm logger. It logs through a tint
// handler writing to stderr with RFC 3339 timestamps and is initially built
// with the WithTraceAll option; DisableSQLLogging and EnableSQLLogging replace
// it with a freshly constructed logger.
var GormLogger = slogGorm.New(
	slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
		TimeFormat: time.RFC3339,
	})),
	slogGorm.WithTraceAll(),
)
326
327func DisableSQLLogging() {
328 GormLogger = slogGorm.New(
329 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
330 TimeFormat: time.RFC3339,
331 })),
332 )
333}
334
335func EnableSQLLogging() {
336 GormLogger = slogGorm.New(
337 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
338 TimeFormat: time.RFC3339,
339 })),
340 slogGorm.WithTraceAll(),
341 )
342}
343
// Parse fills cli from args via ff.Parse (configured with the "SP"
// environment-variable prefix) using a flag set built by NewFlagSet, then
// validates and post-processes the result in this order:
//
//  1. a data directory must be known;
//  2. the embedded livepeer gateway and an external gateway URL are mutually
//     exclusive;
//  3. when the embedded gateway is enabled, several livepeer.* flags are
//     forced and LivepeerGatewayURL is derived from livepeer.http-addr;
//  4. the $SP_DATA_DIR placeholder is replaced in every registered
//     data-dir-relative setting;
//  5. GormLogger is rebuilt according to SQLLogging;
//  6. the deprecated public-host value is folded into BroadcasterHost, and
//     ServerHost defaults to BroadcasterHost;
//  7. the firebase service account is loaded from a file when a path is given.
//
// It returns the first parse, validation, flag-setting or file-read error.
func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
	err := ff.Parse(
		fs, args,
		ff.WithEnvVarPrefix("SP"),
	)
	if err != nil {
		return err
	}
	// DefaultDataDir returns "" when the home directory is unknown, so an
	// empty value here means no usable default and no explicit flag.
	if cli.DataDir == "" {
		return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
	}
	if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
		return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
	}
	if cli.LivepeerGateway {
		log.MonkeypatchStderr()
		// The embedded gateway keeps its state under <data dir>/livepeer/gateway.
		gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
		err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
		if err != nil {
			return err
		}
		err = fs.Set("livepeer.data-dir", gatewayPath)
		if err != nil {
			return err
		}
		err = fs.Set("livepeer.gateway", "true")
		if err != nil {
			return err
		}
		httpAddrFlag := fs.Lookup("livepeer.http-addr")
		if httpAddrFlag == nil {
			return fmt.Errorf("livepeer.http-addr not found")
		}
		// Respect a user-supplied livepeer.http-addr; otherwise pin a local
		// default so that the gateway URL below is well defined.
		httpAddr := httpAddrFlag.Value.String()
		if httpAddr == "" {
			httpAddr = "127.0.0.1:8935"
			err = fs.Set("livepeer.http-addr", httpAddr)
			if err != nil {
				return err
			}
		}
		cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
	}
	// Only the first occurrence of the placeholder in each value is replaced.
	for _, dest := range cli.dataDirFlags {
		*dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
	}
	if !cli.SQLLogging {
		DisableSQLLogging()
	} else {
		EnableSQLLogging()
	}
	// The deprecated public-host flag only takes effect when broadcaster-host
	// was not set.
	if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
		log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
		cli.BroadcasterHost = cli.XXDeprecatedPublicHost
	}
	if cli.ServerHost == "" && cli.BroadcasterHost != "" {
		cli.ServerHost = cli.BroadcasterHost
	}
	if cli.PublicOAuth {
		log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production")
	}
	if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" {
		return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?")
	}
	if cli.FirebaseServiceAccountFile != "" {
		bs, err := os.ReadFile(cli.FirebaseServiceAccountFile)
		if err != nil {
			return err
		}
		// NOTE(review): the file contents are stored as-is, while the
		// firebase-service-account flag is documented as base64-encoded JSON —
		// confirm that the consumer accepts both forms.
		cli.FirebaseServiceAccount = string(bs)
	}
	return nil
}
417
418func (cli *CLI) DataFilePath(fpath []string) string {
419 if cli.DataDir == "" {
420 panic("no data dir configured")
421 }
422 // windows does not like colons
423 safe := []string{}
424 for _, p := range fpath {
425 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
426 }
427 fpath = append([]string{cli.DataDir}, safe...)
428 fdpath := filepath.Join(fpath...)
429 return fdpath
430}
431
432// does a file exist in our data dir?
433func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
434 ddpath := cli.DataFilePath(fpath)
435 _, err := os.Stat(ddpath)
436 if err == nil {
437 return true, nil
438 }
439 if errors.Is(err, os.ErrNotExist) {
440 return false, nil
441 }
442 return false, err
443}
444
445// write a file to our data dir
446func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
447 fd, err := cli.DataFileCreate(fpath, overwrite)
448 if err != nil {
449 return err
450 }
451 defer fd.Close()
452 _, err = io.Copy(fd, r)
453 if err != nil {
454 return err
455 }
456
457 return nil
458}
459
460// create a file in our data dir. don't forget to close it!
461func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
462 ddpath := cli.DataFilePath(fpath)
463 if !overwrite {
464 exists, err := cli.DataFileExists(fpath)
465 if err != nil {
466 return nil, err
467 }
468 if exists {
469 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
470 }
471 }
472 if len(fpath) > 1 {
473 dirs, _ := filepath.Split(ddpath)
474 err := os.MkdirAll(dirs, os.ModePerm)
475 if err != nil {
476 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
477 }
478 }
479 return os.Create(ddpath)
480}
481
482// get a path to a segment file in our database
483func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
484 ext := filepath.Ext(file)
485 base := strings.TrimSuffix(file, ext)
486 aqt, err := aqtime.FromString(base)
487 if err != nil {
488 return "", err
489 }
490 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
491 yr, mon, day, hr, min, _, _ := aqt.Parts()
492 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
493}
494
495// get a path to a segment file in our database
496func (cli *CLI) HLSDir(user string) (string, error) {
497 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
498}
499
500// create a segment file in our database
501func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
502 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
503 yr, mon, day, hr, min, _, _ := aqt.Parts()
504 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
505}
506
507// read a file from our data dir
508func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
509 ddpath := cli.DataFilePath(fpath)
510
511 fd, err := os.Open(ddpath)
512 if err != nil {
513 return err
514 }
515 _, err = io.Copy(w, fd)
516 if err != nil {
517 return err
518 }
519
520 return nil
521}
522
523func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
524 cli.dataDirFlags = append(cli.dataDirFlags, dest)
525 *dest = filepath.Join(SPDataDir, defaultValue)
526 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
527 fs.Func(name, usage, func(s string) error {
528 *dest = s
529 return nil
530 })
531}
532
533func (cli *CLI) HasMist() bool {
534 return runtime.GOOS == "linux"
535}
536
537// type for comma-separated ethereum addresses
538func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
539 *dest = []aqpub.Pub{}
540 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
541 fs.Func(name, usage, func(s string) error {
542 if s == "" {
543 return nil
544 }
545 strs := strings.Split(s, ",")
546 for _, str := range strs {
547 pub, err := aqpub.FromHexString(str)
548 if err != nil {
549 return err
550 }
551 *dest = append(*dest, pub)
552 }
553 return nil
554 })
555}
556
557func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
558 *dest = defaultValue
559 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
560 fs.Func(name, usage, func(s string) error {
561 if s == "" {
562 return nil
563 }
564 strs := strings.Split(s, ",")
565 *dest = append([]string{}, strs...)
566 return nil
567 })
568}
569
570func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
571 *dest = map[string]string{}
572 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
573 fs.Func(name, usage, func(s string) error {
574 if s == "" {
575 return nil
576 }
577 pairs := strings.Split(s, ",")
578 for _, pair := range pairs {
579 parts := strings.Split(pair, "=")
580 if len(parts) != 2 {
581 return fmt.Errorf("invalid kv flag: %s", pair)
582 }
583 (*dest)[parts[0]] = parts[1]
584 }
585 return nil
586 })
587}
588
589func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
590 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
591 fs.Func(name, usage, func(s string) error {
592 if s == "" {
593 return nil
594 }
595 return json.Unmarshal([]byte(s), dest)
596 })
597}
598
599// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
600func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
601 *dest = map[string]map[string]int{}
602 fs.Func(name, usage, func(s string) error {
603 if s == "" {
604 return nil
605 }
606 pairs := strings.Split(s, ",")
607 for _, pair := range pairs {
608 scoreSplit := strings.Split(pair, ":")
609 if len(scoreSplit) != 2 {
610 return fmt.Errorf("invalid debug flag: %s", pair)
611 }
612 score, err := strconv.Atoi(scoreSplit[1])
613 if err != nil {
614 return fmt.Errorf("invalid debug flag: %s", pair)
615 }
616 selectorSplit := strings.Split(scoreSplit[0], "=")
617 if len(selectorSplit) != 2 {
618 return fmt.Errorf("invalid debug flag: %s", pair)
619 }
620 _, ok := (*dest)[selectorSplit[0]]
621 if !ok {
622 (*dest)[selectorSplit[0]] = map[string]int{}
623 }
624 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
625 }
626
627 return nil
628 })
629}
630
631func (cli *CLI) StreamIsAllowed(did string) error {
632 if cli.WideOpen {
633 return nil
634 }
635 // if the user set no test streams, anyone can stream
636 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
637 // but only valid atproto accounts! did:key is only allowed for our local test stream
638 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
639 if openServer && !isDIDKey {
640 return nil
641 }
642 for _, a := range cli.AllowedStreams {
643 if a == did {
644 return nil
645 }
646 }
647 return fmt.Errorf("user is not allowed to stream")
648}
649
650func (cli *CLI) MyDID() string {
651 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost)
652}
653
654func (cli *CLI) HasHTTPS() bool {
655 return cli.Secure || cli.BehindHTTPSProxy
656}
657
658func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) {
659 if cli.SegmentDebugDir == "" {
660 return
661 }
662 go func() {
663 err := os.MkdirAll(cli.SegmentDebugDir, 0755)
664 if err != nil {
665 log.Error(ctx, "failed to create debug directory", "error", err)
666 return
667 }
668 now := aqtime.FromTime(time.Now())
669 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-")))
670 fd, err := os.Create(outFile)
671 if err != nil {
672 log.Error(ctx, "failed to create debug file", "error", err)
673 return
674 }
675 defer fd.Close()
676 _, err = io.Copy(fd, r)
677 if err != nil {
678 log.Error(ctx, "failed to copy debug file", "error", err)
679 return
680 }
681 log.Log(ctx, "wrote debug file", "path", outFile)
682 }()
683}
684
685func (cli *CLI) ShouldSyndicate(did string) bool {
686 for _, d := range cli.Syndicate {
687 if d == "*" {
688 return true
689 }
690 if d == did {
691 return true
692 }
693 }
694 return false
695}