Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "math/rand/v2"
21
22 "github.com/lestrrat-go/jwx/v2/jwk"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/lmittmann/tint"
25 slogGorm "github.com/orandin/slog-gorm"
26 "github.com/peterbourgon/ff/v3"
27 "stream.place/streamplace/pkg/aqtime"
28 "stream.place/streamplace/pkg/constants"
29 "stream.place/streamplace/pkg/crypto/aqpub"
30 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
31)
32
// Data-directory constants.
const (
	// SPDataDir is the literal placeholder that Parse replaces with the
	// configured data directory in every value registered in dataDirFlags.
	SPDataDir = "$SP_DATA_DIR"
	// SegmentsDir is the first path component, below the data directory, used
	// by SegmentFilePath, SegmentFileCreate and HLSDir.
	SegmentsDir = "segments"
)
35
// BuildFlags carries build metadata: a version string, a build timestamp and
// a UUID.
type BuildFlags struct {
	Version   string
	BuildTime int64 // read as Unix seconds by the formatting helpers below
	UUID      string
}

// BuildTimeStr formats BuildTime (Unix seconds) as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo formats BuildTime (Unix seconds) in UTC using a fixed
// millisecond-precision layout with a literal trailing "Z". Because BuildTime
// has whole-second resolution the fractional part is always ".000".
func (b BuildFlags) BuildTimeStrExpo() string {
	const layout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(layout)
}
51
52type CLI struct {
53 AdminAccount string
54 Build *BuildFlags
55 DataDir string
56 DBURL string
57 EthAccountAddr string
58 EthKeystorePath string
59 EthPassword string
60 FirebaseServiceAccount string
61 GitLabURL string
62 HTTPAddr string
63 HTTPInternalAddr string
64 HTTPSAddr string
65 RtmpsAddr string
66 Secure bool
67 NoMist bool
68 MistAdminPort int
69 MistHTTPPort int
70 MistRTMPPort int
71 SigningKeyPath string
72 TAURL string
73 TLSCertPath string
74 TLSKeyPath string
75 PKCS11ModulePath string
76 PKCS11Pin string
77 PKCS11TokenSlot string
78 PKCS11TokenLabel string
79 PKCS11TokenSerial string
80 PKCS11KeypairLabel string
81 PKCS11KeypairID string
82 StreamerName string
83 RelayHost string
84 Debug map[string]map[string]int
85 AllowedStreams []string
86 WideOpen bool
87 Peers []string
88 Redirects []string
89 TestStream bool
90 FrontendProxy string
91 AppBundleID string
92 NoFirehose bool
93 PrintChat bool
94 Color string
95 LivepeerGatewayURL string
96 LivepeerGateway bool
97 WHIPTest string
98 Thumbnail bool
99 SmearAudio bool
100 ExternalSigning bool
101 RTMPServerAddon string
102 TracingEndpoint string
103 PublicHost string
104 RateLimitPerSecond int
105 RateLimitBurst int
106 RateLimitWebsocket int
107 JWK jwk.Key
108 AccessJWK jwk.Key
109 dataDirFlags []*string
110 DiscordWebhooks []*discordtypes.Webhook
111 NewWebRTCPlayback bool
112 AppleTeamID string
113 AndroidCertFingerprint string
114 Labelers []string
115 AtprotoDID string
116 LivepeerHelp bool
117 PLCURL string
118 SQLLogging bool
119 SentryDSN string
120 LivepeerDebug bool
121}
122
123func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
124 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
125 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
126 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
127 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
128 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
129 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
130 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
131 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
132 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
133 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
134 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
135 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
136 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
137 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
138 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
139 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
140 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
141 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
142 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
143 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
144 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
145 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
146 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
147 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
148 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
149 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
150 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
151 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
152 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
153 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
154 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
155 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
156 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
157 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
158 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
159 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
160 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
161 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
162 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
163 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
164 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
165 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
166 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
167 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
168 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
169 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
170 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
171 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
172 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
173 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
174 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
175 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
176 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
177 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
178 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
179 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
180 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to")
181 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
182 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
183 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
184 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
185 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
186 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
187
188 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
189 _ = starter.NewLivepeerConfig(lpFlags)
190 lpFlags.VisitAll(func(f *flag.Flag) {
191 adapted := LivepeerFlags.CamelToSnake[f.Name]
192 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
193 })
194
195 if runtime.GOOS == "linux" {
196 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
197 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
198 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
199 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
200 }
201 return fs
202}
203
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix. It is a
// variable rather than a constant, so other code can reassign it.
var StreamplaceSchemePrefix = "streamplace://"
205
206func (cli *CLI) OwnInternalURL() string {
207 // No errors because we know it's valid from AddrFlag
208 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
209
210 ip := net.ParseIP(host)
211 if ip.IsUnspecified() {
212 host = "127.0.0.1"
213 }
214 addr := net.JoinHostPort(host, port)
215 return fmt.Sprintf("http://%s", addr)
216}
217
218func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
219 bs, err := os.ReadFile(cli.SigningKeyPath)
220 if err != nil {
221 return nil, err
222 }
223 block, _ := pem.Decode(bs)
224 if block == nil {
225 return nil, fmt.Errorf("no RSA key found in signing key")
226 }
227 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
228 if err != nil {
229 return nil, err
230 }
231 return key, nil
232}
233
234func RandomTrailer(length int) string {
235 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
236
237 res := make([]byte, length)
238 for i := 0; i < length; i++ {
239 res[i] = charset[rand.IntN(len(charset))]
240 }
241 return string(res)
242}
243
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory. If the home directory cannot be determined it
// returns "", which is not fatal unless the user doesn't set a data dir later.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	return ""
}
252
253var GormLogger = slogGorm.New(
254 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
255 TimeFormat: time.RFC3339,
256 })),
257 // slogGorm.WithTraceAll(),
258)
259
260func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
261 err := ff.Parse(
262 fs, args,
263 ff.WithEnvVarPrefix("SP"),
264 )
265 if err != nil {
266 return err
267 }
268 if cli.DataDir == "" {
269 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
270 }
271 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
272 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
273 }
274 if cli.LivepeerGateway {
275 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
276 err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
277 if err != nil {
278 return err
279 }
280 err = fs.Set("livepeer.data-dir", gatewayPath)
281 if err != nil {
282 return err
283 }
284 err = fs.Set("livepeer.gateway", "true")
285 if err != nil {
286 return err
287 }
288 httpAddrFlag := fs.Lookup("livepeer.http-addr")
289 if httpAddrFlag == nil {
290 return fmt.Errorf("livepeer.http-addr not found")
291 }
292 httpAddr := httpAddrFlag.Value.String()
293 if httpAddr == "" {
294 httpAddr = "127.0.0.1:8935"
295 err = fs.Set("livepeer.http-addr", httpAddr)
296 if err != nil {
297 return err
298 }
299 }
300 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
301 }
302 for _, dest := range cli.dataDirFlags {
303 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
304 }
305 if !cli.SQLLogging {
306 GormLogger = slogGorm.New(
307 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
308 TimeFormat: time.RFC3339,
309 })),
310 )
311 }
312 return nil
313}
314
315func (cli *CLI) DataFilePath(fpath []string) string {
316 if cli.DataDir == "" {
317 panic("no data dir configured")
318 }
319 // windows does not like colons
320 safe := []string{}
321 for _, p := range fpath {
322 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
323 }
324 fpath = append([]string{cli.DataDir}, safe...)
325 fdpath := filepath.Join(fpath...)
326 return fdpath
327}
328
329// does a file exist in our data dir?
330func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
331 ddpath := cli.DataFilePath(fpath)
332 _, err := os.Stat(ddpath)
333 if err == nil {
334 return true, nil
335 }
336 if errors.Is(err, os.ErrNotExist) {
337 return false, nil
338 }
339 return false, err
340}
341
342// write a file to our data dir
343func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
344 fd, err := cli.DataFileCreate(fpath, overwrite)
345 if err != nil {
346 return err
347 }
348 defer fd.Close()
349 _, err = io.Copy(fd, r)
350 if err != nil {
351 return err
352 }
353
354 return nil
355}
356
357// create a file in our data dir. don't forget to close it!
358func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
359 ddpath := cli.DataFilePath(fpath)
360 if !overwrite {
361 exists, err := cli.DataFileExists(fpath)
362 if err != nil {
363 return nil, err
364 }
365 if exists {
366 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
367 }
368 }
369 if len(fpath) > 1 {
370 dirs, _ := filepath.Split(ddpath)
371 err := os.MkdirAll(dirs, os.ModePerm)
372 if err != nil {
373 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
374 }
375 }
376 return os.Create(ddpath)
377}
378
379// get a path to a segment file in our database
380func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
381 ext := filepath.Ext(file)
382 base := strings.TrimSuffix(file, ext)
383 aqt, err := aqtime.FromString(base)
384 if err != nil {
385 return "", err
386 }
387 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
388 yr, mon, day, hr, min, _, _ := aqt.Parts()
389 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
390}
391
392// get a path to a segment file in our database
393func (cli *CLI) HLSDir(user string) (string, error) {
394 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
395}
396
397// create a segment file in our database
398func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
399 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
400 yr, mon, day, hr, min, _, _ := aqt.Parts()
401 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
402}
403
404// read a file from our data dir
405func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
406 ddpath := cli.DataFilePath(fpath)
407
408 fd, err := os.Open(ddpath)
409 if err != nil {
410 return err
411 }
412 _, err = io.Copy(w, fd)
413 if err != nil {
414 return err
415 }
416
417 return nil
418}
419
420func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
421 cli.dataDirFlags = append(cli.dataDirFlags, dest)
422 *dest = filepath.Join(SPDataDir, defaultValue)
423 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
424 fs.Func(name, usage, func(s string) error {
425 *dest = s
426 return nil
427 })
428}
429
430func (cli *CLI) HasMist() bool {
431 return runtime.GOOS == "linux"
432}
433
434// type for comma-separated ethereum addresses
435func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
436 *dest = []aqpub.Pub{}
437 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
438 fs.Func(name, usage, func(s string) error {
439 if s == "" {
440 return nil
441 }
442 strs := strings.Split(s, ",")
443 for _, str := range strs {
444 pub, err := aqpub.FromHexString(str)
445 if err != nil {
446 return err
447 }
448 *dest = append(*dest, pub)
449 }
450 return nil
451 })
452}
453
454func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
455 *dest = []string{}
456 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
457 fs.Func(name, usage, func(s string) error {
458 if s == "" {
459 return nil
460 }
461 strs := strings.Split(s, ",")
462 *dest = append(*dest, strs...)
463 return nil
464 })
465}
466
467func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
468 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
469 fs.Func(name, usage, func(s string) error {
470 if s == "" {
471 return nil
472 }
473 return json.Unmarshal([]byte(s), dest)
474 })
475}
476
477// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
478func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
479 *dest = map[string]map[string]int{}
480 fs.Func(name, usage, func(s string) error {
481 if s == "" {
482 return nil
483 }
484 pairs := strings.Split(s, ",")
485 for _, pair := range pairs {
486 scoreSplit := strings.Split(pair, ":")
487 if len(scoreSplit) != 2 {
488 return fmt.Errorf("invalid debug flag: %s", pair)
489 }
490 score, err := strconv.Atoi(scoreSplit[1])
491 if err != nil {
492 return fmt.Errorf("invalid debug flag: %s", pair)
493 }
494 selectorSplit := strings.Split(scoreSplit[0], "=")
495 if len(selectorSplit) != 2 {
496 return fmt.Errorf("invalid debug flag: %s", pair)
497 }
498 _, ok := (*dest)[selectorSplit[0]]
499 if !ok {
500 (*dest)[selectorSplit[0]] = map[string]int{}
501 }
502 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
503 }
504
505 return nil
506 })
507}
508
509func (cli *CLI) StreamIsAllowed(did string) error {
510 if cli.WideOpen {
511 return nil
512 }
513 // if the user set no test streams, anyone can stream
514 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
515 // but only valid atproto accounts! did:key is only allowed for our local test stream
516 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
517 if openServer && !isDIDKey {
518 return nil
519 }
520 for _, a := range cli.AllowedStreams {
521 if a == did {
522 return nil
523 }
524 }
525 return fmt.Errorf("user is not allowed to stream")
526}
527
528func (cli *CLI) MyDID() string {
529 return fmt.Sprintf("did:web:%s", cli.PublicHost)
530}