Live video on the AT Protocol
1package config
2
3import (
4 "context"
5 "crypto/rsa"
6 "crypto/x509"
7 "encoding/json"
8 "encoding/pem"
9 "errors"
10 "flag"
11 "fmt"
12 "io"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strconv"
18 "strings"
19 "time"
20
21 "math/rand/v2"
22
23 "github.com/lestrrat-go/jwx/v2/jwk"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/lmittmann/tint"
26 slogGorm "github.com/orandin/slog-gorm"
27 "github.com/peterbourgon/ff/v3"
28 "stream.place/streamplace/pkg/aqtime"
29 "stream.place/streamplace/pkg/constants"
30 "stream.place/streamplace/pkg/crypto/aqpub"
31 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
32 "stream.place/streamplace/pkg/log"
33)
34
35const SPDataDir = "$SP_DATA_DIR"
36const SegmentsDir = "segments"
37
38type BuildFlags struct {
39 Version string
40 BuildTime int64
41 UUID string
42}
43
44func (b BuildFlags) BuildTimeStr() string {
45 ts := time.Unix(b.BuildTime, 0)
46 return ts.UTC().Format(time.RFC3339)
47}
48
49func (b BuildFlags) BuildTimeStrExpo() string {
50 ts := time.Unix(b.BuildTime, 0)
51 return ts.UTC().Format("2006-01-02T15:04:05.000Z")
52}
53
54type CLI struct {
55 AdminAccount string
56 Build *BuildFlags
57 DataDir string
58 DBURL string
59 LocalDBURL string
60 EthAccountAddr string
61 EthKeystorePath string
62 EthPassword string
63 FirebaseServiceAccount string
64 FirebaseServiceAccountFile string
65 GitLabURL string
66 HTTPAddr string
67 HTTPInternalAddr string
68 HTTPSAddr string
69 RTMPAddr string
70 RTMPSAddr string
71 RTMPSAddonAddr string
72 Secure bool
73 NoMist bool
74 MistAdminPort int
75 MistHTTPPort int
76 MistRTMPPort int
77 SigningKeyPath string
78 TAURL string
79 TLSCertPath string
80 TLSKeyPath string
81 PKCS11ModulePath string
82 PKCS11Pin string
83 PKCS11TokenSlot string
84 PKCS11TokenLabel string
85 PKCS11TokenSerial string
86 PKCS11KeypairLabel string
87 PKCS11KeypairID string
88 StreamerName string
89 RelayHost string
90 Debug map[string]map[string]int
91 AllowedStreams []string
92 WideOpen bool
93 Peers []string
94 Redirects []string
95 TestStream bool
96 FrontendProxy string
97 PublicOAuth bool
98 AppBundleID string
99 NoFirehose bool
100 PrintChat bool
101 Color string
102 LivepeerGatewayURL string
103 LivepeerGateway bool
104 WHIPTest string
105 Thumbnail bool
106 SmearAudio bool
107 ExternalSigning bool
108 RTMPServerAddon string
109 TracingEndpoint string
110 BroadcasterHost string
111 XXDeprecatedPublicHost string
112 ServerHost string
113 RateLimitPerSecond int
114 RateLimitBurst int
115 RateLimitWebsocket int
116 JWK jwk.Key
117 AccessJWK jwk.Key
118 dataDirFlags []*string
119 DiscordWebhooks []*discordtypes.Webhook
120 NewWebRTCPlayback bool
121 AppleTeamID string
122 AndroidCertFingerprint string
123 Labelers []string
124 AtprotoDID string
125 LivepeerHelp bool
126 PLCURL string
127 ContentFilters *ContentFilters
128 DefaultRecommendedStreamers []string
129 SQLLogging bool
130 SentryDSN string
131 LivepeerDebug bool
132 Tickets []string
133 IrohTopic string
134 DID string
135 DisableIrohRelay bool
136 DevAccountCreds map[string]string
137 StreamSessionTimeout time.Duration
138 Replicators []string
139 WebsocketURL string
140 BehindHTTPSProxy bool
141 SegmentDebugDir string
142 AdminDIDs []string
143 Syndicate []string
144 PlayerTelemetry bool
145}
146
147// ContentFilters represents the content filtering configuration
148type ContentFilters struct {
149 ContentWarnings struct {
150 Enabled bool `json:"enabled"`
151 BlockedWarnings []string `json:"blocked_warnings"`
152 } `json:"content_warnings"`
153 DistributionPolicy struct {
154 Enabled bool `json:"enabled"`
155 } `json:"distribution_policy"`
156}
157
158const (
159 ReplicatorWebsocket string = "websocket"
160 ReplicatorIroh string = "iroh"
161)
162
163func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
164 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
165 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
166 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
167 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
168 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
169 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
170 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
171 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
172 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
173 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
174 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
175 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
176 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
177 fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
178 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
179 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
180 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
181 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
182 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
183 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
184 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
185 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
186 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
187 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
188 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
189 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
190 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
191 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
192 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
193 fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
194 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
195 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
196 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
197 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
198 cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
199 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
200 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
201 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
202 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
203 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
204 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
205 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
206 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
207 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
208 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
209 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
210 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
211 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
212
213 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
214 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
215 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
216 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
217 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
218 fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server")
219 fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)")
220 fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)")
221 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
222 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
223 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
224 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
225 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
226 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
227 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
228 cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available")
229 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
230 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
231 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
232 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
233 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
234 fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
235 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
236 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
237 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
238 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
239 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
240 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
241 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
242 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
243 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations")
244 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")
245 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry")
246 fs.StringVar(&cli.LocalDBURL, "local-db-url", "sqlite://$SP_DATA_DIR/localdb.sqlite", "URL of the local database to use for storing local data")
247 cli.dataDirFlags = append(cli.dataDirFlags, &cli.LocalDBURL)
248
249 fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
250 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
251
252 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
253 _ = starter.NewLivepeerConfig(lpFlags)
254 lpFlags.VisitAll(func(f *flag.Flag) {
255 adapted := LivepeerFlags.CamelToSnake[f.Name]
256 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
257 })
258
259 if runtime.GOOS == "linux" {
260 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
261 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
262 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
263 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
264 }
265 return fs
266}
267
268var StreamplaceSchemePrefix = "streamplace://"
269
270func (cli *CLI) OwnPublicURL() string {
271 // No errors because we know it's valid from AddrFlag
272 host, port, _ := net.SplitHostPort(cli.HTTPAddr)
273
274 ip := net.ParseIP(host)
275 if host == "" || ip.IsUnspecified() {
276 host = "127.0.0.1"
277 }
278 addr := net.JoinHostPort(host, port)
279 return fmt.Sprintf("http://%s", addr)
280}
281
282func (cli *CLI) OwnInternalURL() string {
283 // No errors because we know it's valid from AddrFlag
284 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
285
286 ip := net.ParseIP(host)
287 if ip.IsUnspecified() {
288 host = "127.0.0.1"
289 }
290 addr := net.JoinHostPort(host, port)
291 return fmt.Sprintf("http://%s", addr)
292}
293
294func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
295 bs, err := os.ReadFile(cli.SigningKeyPath)
296 if err != nil {
297 return nil, err
298 }
299 block, _ := pem.Decode(bs)
300 if block == nil {
301 return nil, fmt.Errorf("no RSA key found in signing key")
302 }
303 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
304 if err != nil {
305 return nil, err
306 }
307 return key, nil
308}
309
310func RandomTrailer(length int) string {
311 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
312
313 res := make([]byte, length)
314 for i := 0; i < length; i++ {
315 res[i] = charset[rand.IntN(len(charset))]
316 }
317 return string(res)
318}
319
320func DefaultDataDir() string {
321 home, err := os.UserHomeDir()
322 if err != nil {
323 // not fatal unless the user doesn't set one later
324 return ""
325 }
326 return filepath.Join(home, ".streamplace")
327}
328
329var GormLogger = slogGorm.New(
330 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
331 TimeFormat: time.RFC3339,
332 })),
333 slogGorm.WithTraceAll(),
334)
335
336func DisableSQLLogging() {
337 GormLogger = slogGorm.New(
338 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
339 TimeFormat: time.RFC3339,
340 })),
341 )
342}
343
344func EnableSQLLogging() {
345 GormLogger = slogGorm.New(
346 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
347 TimeFormat: time.RFC3339,
348 })),
349 slogGorm.WithTraceAll(),
350 )
351}
352
353func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
354 err := ff.Parse(
355 fs, args,
356 ff.WithEnvVarPrefix("SP"),
357 )
358 if err != nil {
359 return err
360 }
361 if cli.DataDir == "" {
362 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
363 }
364 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
365 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
366 }
367 if cli.LivepeerGateway {
368 log.MonkeypatchStderr()
369 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
370 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
371 if err != nil {
372 return err
373 }
374 err = fs.Set("livepeer.data-dir", gatewayPath)
375 if err != nil {
376 return err
377 }
378 err = fs.Set("livepeer.gateway", "true")
379 if err != nil {
380 return err
381 }
382 httpAddrFlag := fs.Lookup("livepeer.http-addr")
383 if httpAddrFlag == nil {
384 return fmt.Errorf("livepeer.http-addr not found")
385 }
386 httpAddr := httpAddrFlag.Value.String()
387 if httpAddr == "" {
388 httpAddr = "127.0.0.1:8935"
389 err = fs.Set("livepeer.http-addr", httpAddr)
390 if err != nil {
391 return err
392 }
393 }
394 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
395 }
396 for _, dest := range cli.dataDirFlags {
397 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
398 }
399 if !cli.SQLLogging {
400 DisableSQLLogging()
401 } else {
402 EnableSQLLogging()
403 }
404 if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
405 log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
406 cli.BroadcasterHost = cli.XXDeprecatedPublicHost
407 }
408 if cli.ServerHost == "" && cli.BroadcasterHost != "" {
409 cli.ServerHost = cli.BroadcasterHost
410 }
411 if cli.PublicOAuth {
412 log.Warn(context.Background(), "--dev-public-oauth is set, this is not recommended for production")
413 }
414 if cli.FirebaseServiceAccount != "" && cli.FirebaseServiceAccountFile != "" {
415 return fmt.Errorf("defining both firebase-service-account and firebase-service-account-file doesn't make sense. do you want a base64-encoded string or a file?")
416 }
417 if cli.FirebaseServiceAccountFile != "" {
418 bs, err := os.ReadFile(cli.FirebaseServiceAccountFile)
419 if err != nil {
420 return err
421 }
422 cli.FirebaseServiceAccount = string(bs)
423 }
424 return nil
425}
426
427func (cli *CLI) DataFilePath(fpath []string) string {
428 if cli.DataDir == "" {
429 panic("no data dir configured")
430 }
431 // windows does not like colons
432 safe := []string{}
433 for _, p := range fpath {
434 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
435 }
436 fpath = append([]string{cli.DataDir}, safe...)
437 fdpath := filepath.Join(fpath...)
438 return fdpath
439}
440
441// does a file exist in our data dir?
442func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
443 ddpath := cli.DataFilePath(fpath)
444 _, err := os.Stat(ddpath)
445 if err == nil {
446 return true, nil
447 }
448 if errors.Is(err, os.ErrNotExist) {
449 return false, nil
450 }
451 return false, err
452}
453
454// write a file to our data dir
455func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
456 fd, err := cli.DataFileCreate(fpath, overwrite)
457 if err != nil {
458 return err
459 }
460 defer fd.Close()
461 _, err = io.Copy(fd, r)
462 if err != nil {
463 return err
464 }
465
466 return nil
467}
468
469// create a file in our data dir. don't forget to close it!
470func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
471 ddpath := cli.DataFilePath(fpath)
472 if !overwrite {
473 exists, err := cli.DataFileExists(fpath)
474 if err != nil {
475 return nil, err
476 }
477 if exists {
478 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
479 }
480 }
481 if len(fpath) > 1 {
482 dirs, _ := filepath.Split(ddpath)
483 err := os.MkdirAll(dirs, os.ModePerm)
484 if err != nil {
485 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
486 }
487 }
488 return os.Create(ddpath)
489}
490
491// get a path to a segment file in our database
492func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
493 ext := filepath.Ext(file)
494 base := strings.TrimSuffix(file, ext)
495 aqt, err := aqtime.FromString(base)
496 if err != nil {
497 return "", err
498 }
499 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
500 yr, mon, day, hr, min, _, _ := aqt.Parts()
501 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
502}
503
504// get a path to a segment file in our database
505func (cli *CLI) HLSDir(user string) (string, error) {
506 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
507}
508
509// create a segment file in our database
510func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
511 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
512 yr, mon, day, hr, min, _, _ := aqt.Parts()
513 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
514}
515
516// read a file from our data dir
517func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
518 ddpath := cli.DataFilePath(fpath)
519
520 fd, err := os.Open(ddpath)
521 if err != nil {
522 return err
523 }
524 _, err = io.Copy(w, fd)
525 if err != nil {
526 return err
527 }
528
529 return nil
530}
531
532func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
533 cli.dataDirFlags = append(cli.dataDirFlags, dest)
534 *dest = filepath.Join(SPDataDir, defaultValue)
535 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
536 fs.Func(name, usage, func(s string) error {
537 *dest = s
538 return nil
539 })
540}
541
542func (cli *CLI) HasMist() bool {
543 return runtime.GOOS == "linux"
544}
545
546// type for comma-separated ethereum addresses
547func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
548 *dest = []aqpub.Pub{}
549 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
550 fs.Func(name, usage, func(s string) error {
551 if s == "" {
552 return nil
553 }
554 strs := strings.Split(s, ",")
555 for _, str := range strs {
556 pub, err := aqpub.FromHexString(str)
557 if err != nil {
558 return err
559 }
560 *dest = append(*dest, pub)
561 }
562 return nil
563 })
564}
565
566func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
567 *dest = defaultValue
568 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
569 fs.Func(name, usage, func(s string) error {
570 if s == "" {
571 return nil
572 }
573 strs := strings.Split(s, ",")
574 *dest = append([]string{}, strs...)
575 return nil
576 })
577}
578
579func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
580 *dest = map[string]string{}
581 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
582 fs.Func(name, usage, func(s string) error {
583 if s == "" {
584 return nil
585 }
586 pairs := strings.Split(s, ",")
587 for _, pair := range pairs {
588 parts := strings.Split(pair, "=")
589 if len(parts) != 2 {
590 return fmt.Errorf("invalid kv flag: %s", pair)
591 }
592 (*dest)[parts[0]] = parts[1]
593 }
594 return nil
595 })
596}
597
598func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
599 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
600 fs.Func(name, usage, func(s string) error {
601 if s == "" {
602 return nil
603 }
604 return json.Unmarshal([]byte(s), dest)
605 })
606}
607
608// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
609func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
610 *dest = map[string]map[string]int{}
611 fs.Func(name, usage, func(s string) error {
612 if s == "" {
613 return nil
614 }
615 pairs := strings.Split(s, ",")
616 for _, pair := range pairs {
617 scoreSplit := strings.Split(pair, ":")
618 if len(scoreSplit) != 2 {
619 return fmt.Errorf("invalid debug flag: %s", pair)
620 }
621 score, err := strconv.Atoi(scoreSplit[1])
622 if err != nil {
623 return fmt.Errorf("invalid debug flag: %s", pair)
624 }
625 selectorSplit := strings.Split(scoreSplit[0], "=")
626 if len(selectorSplit) != 2 {
627 return fmt.Errorf("invalid debug flag: %s", pair)
628 }
629 _, ok := (*dest)[selectorSplit[0]]
630 if !ok {
631 (*dest)[selectorSplit[0]] = map[string]int{}
632 }
633 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
634 }
635
636 return nil
637 })
638}
639
640func (cli *CLI) StreamIsAllowed(did string) error {
641 if cli.WideOpen {
642 return nil
643 }
644 // if the user set no test streams, anyone can stream
645 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
646 // but only valid atproto accounts! did:key is only allowed for our local test stream
647 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
648 if openServer && !isDIDKey {
649 return nil
650 }
651 for _, a := range cli.AllowedStreams {
652 if a == did {
653 return nil
654 }
655 }
656 return fmt.Errorf("user is not allowed to stream")
657}
658
659func (cli *CLI) MyDID() string {
660 return fmt.Sprintf("did:web:%s", cli.BroadcasterHost)
661}
662
663func (cli *CLI) HasHTTPS() bool {
664 return cli.Secure || cli.BehindHTTPSProxy
665}
666
667func (cli *CLI) DumpDebugSegment(ctx context.Context, name string, r io.Reader) {
668 if cli.SegmentDebugDir == "" {
669 return
670 }
671 go func() {
672 err := os.MkdirAll(cli.SegmentDebugDir, 0755)
673 if err != nil {
674 log.Error(ctx, "failed to create debug directory", "error", err)
675 return
676 }
677 now := aqtime.FromTime(time.Now())
678 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-%s", now.FileSafeString(), strings.ReplaceAll(name, ":", "-")))
679 fd, err := os.Create(outFile)
680 if err != nil {
681 log.Error(ctx, "failed to create debug file", "error", err)
682 return
683 }
684 defer fd.Close()
685 _, err = io.Copy(fd, r)
686 if err != nil {
687 log.Error(ctx, "failed to copy debug file", "error", err)
688 return
689 }
690 log.Log(ctx, "wrote debug file", "path", outFile)
691 }()
692}
693
694func (cli *CLI) ShouldSyndicate(did string) bool {
695 for _, d := range cli.Syndicate {
696 if d == "*" {
697 return true
698 }
699 if d == did {
700 return true
701 }
702 }
703 return false
704}