Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "math/rand/v2"
21
22 "github.com/lestrrat-go/jwx/v2/jwk"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/peterbourgon/ff/v3"
25 "stream.place/streamplace/pkg/aqtime"
26 "stream.place/streamplace/pkg/constants"
27 "stream.place/streamplace/pkg/crypto/aqpub"
28 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
29)
30
31const SPDataDir = "$SP_DATA_DIR"
32const SegmentsDir = "segments"
33
34type BuildFlags struct {
35 Version string
36 BuildTime int64
37 UUID string
38}
39
40func (b BuildFlags) BuildTimeStr() string {
41 ts := time.Unix(b.BuildTime, 0)
42 return ts.UTC().Format(time.RFC3339)
43}
44
45func (b BuildFlags) BuildTimeStrExpo() string {
46 ts := time.Unix(b.BuildTime, 0)
47 return ts.UTC().Format("2006-01-02T15:04:05.000Z")
48}
49
50type CLI struct {
51 AdminAccount string
52 Build *BuildFlags
53 DataDir string
54 DBPath string
55 EthAccountAddr string
56 EthKeystorePath string
57 EthPassword string
58 FirebaseServiceAccount string
59 GitLabURL string
60 HTTPAddr string
61 HTTPInternalAddr string
62 HTTPSAddr string
63 RtmpsAddr string
64 Secure bool
65 NoMist bool
66 MistAdminPort int
67 MistHTTPPort int
68 MistRTMPPort int
69 SigningKeyPath string
70 TAURL string
71 TLSCertPath string
72 TLSKeyPath string
73 PKCS11ModulePath string
74 PKCS11Pin string
75 PKCS11TokenSlot string
76 PKCS11TokenLabel string
77 PKCS11TokenSerial string
78 PKCS11KeypairLabel string
79 PKCS11KeypairID string
80 StreamerName string
81 RelayHost string
82 Debug map[string]map[string]int
83 AllowedStreams []string
84 WideOpen bool
85 Peers []string
86 Redirects []string
87 TestStream bool
88 FrontendProxy string
89 AppBundleID string
90 NoFirehose bool
91 PrintChat bool
92 Color string
93 LivepeerGatewayURL string
94 LivepeerGateway bool
95 WHIPTest string
96 Thumbnail bool
97 SmearAudio bool
98 ExternalSigning bool
99 RTMPServerAddon string
100 TracingEndpoint string
101 PublicHost string
102 RateLimitPerSecond int
103 RateLimitBurst int
104 RateLimitWebsocket int
105 JWK jwk.Key
106 AccessJWK jwk.Key
107 dataDirFlags []*string
108 DiscordWebhooks []*discordtypes.Webhook
109 NewWebRTCPlayback bool
110 AppleTeamID string
111 AndroidCertFingerprint string
112 Labelers []string
113 AtprotoDID string
114 LivepeerHelp bool
115}
116
117func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
118 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
119 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
120 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
121 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
122 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
123 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
124 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
125 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
126 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
127 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
128 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
129 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
130 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
131 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
132 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
133 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
134 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
135 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
136 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
137 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
138 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
139 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
140 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
141 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
142 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
143 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
144 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
145 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
146 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
147 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
148 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
149 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
150 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
151 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
152 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
153 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
154 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
155 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
156 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
157 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
158 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
159 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
160 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
161 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
162 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
163 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
164 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
165 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
166 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
167 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
168 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
169 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
170 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
171 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
172 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
173 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to")
174 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
175 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
176
177 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
178 _ = starter.NewLivepeerConfig(lpFlags)
179 lpFlags.VisitAll(func(f *flag.Flag) {
180 adapted := LivepeerFlags.CamelToSnake[f.Name]
181 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
182 })
183
184 if runtime.GOOS == "linux" {
185 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
186 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
187 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
188 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
189 }
190 return fs
191}
192
193var StreamplaceSchemePrefix = "streamplace://"
194
195func (cli *CLI) OwnInternalURL() string {
196 // No errors because we know it's valid from AddrFlag
197 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
198
199 ip := net.ParseIP(host)
200 if ip.IsUnspecified() {
201 host = "127.0.0.1"
202 }
203 addr := net.JoinHostPort(host, port)
204 return fmt.Sprintf("http://%s", addr)
205}
206
207func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
208 bs, err := os.ReadFile(cli.SigningKeyPath)
209 if err != nil {
210 return nil, err
211 }
212 block, _ := pem.Decode(bs)
213 if block == nil {
214 return nil, fmt.Errorf("no RSA key found in signing key")
215 }
216 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
217 if err != nil {
218 return nil, err
219 }
220 return key, nil
221}
222
223func RandomTrailer(length int) string {
224 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
225
226 res := make([]byte, length)
227 for i := 0; i < length; i++ {
228 res[i] = charset[rand.IntN(len(charset))]
229 }
230 return string(res)
231}
232
233func DefaultDataDir() string {
234 home, err := os.UserHomeDir()
235 if err != nil {
236 // not fatal unless the user doesn't set one later
237 return ""
238 }
239 return filepath.Join(home, ".streamplace")
240}
241
242func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
243 err := ff.Parse(
244 fs, args,
245 ff.WithEnvVarPrefix("SP"),
246 )
247 if err != nil {
248 return err
249 }
250 if cli.DataDir == "" {
251 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
252 }
253 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
254 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
255 }
256 if cli.LivepeerGateway {
257 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
258 err = fs.Set("livepeer.data-dir", gatewayPath)
259 if err != nil {
260 return err
261 }
262 err = fs.Set("livepeer.gateway", "true")
263 if err != nil {
264 return err
265 }
266 httpAddrFlag := fs.Lookup("livepeer.http-addr")
267 if httpAddrFlag == nil {
268 return fmt.Errorf("livepeer.http-addr not found")
269 }
270 httpAddr := httpAddrFlag.Value.String()
271 if httpAddr == "" {
272 httpAddr = "127.0.0.1:8935"
273 err = fs.Set("livepeer.http-addr", httpAddr)
274 if err != nil {
275 return err
276 }
277 }
278 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
279 }
280 for _, dest := range cli.dataDirFlags {
281 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
282 }
283 return nil
284}
285
286func (cli *CLI) DataFilePath(fpath []string) string {
287 if cli.DataDir == "" {
288 panic("no data dir configured")
289 }
290 // windows does not like colons
291 safe := []string{}
292 for _, p := range fpath {
293 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
294 }
295 fpath = append([]string{cli.DataDir}, safe...)
296 fdpath := filepath.Join(fpath...)
297 return fdpath
298}
299
300// does a file exist in our data dir?
301func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
302 ddpath := cli.DataFilePath(fpath)
303 _, err := os.Stat(ddpath)
304 if err == nil {
305 return true, nil
306 }
307 if errors.Is(err, os.ErrNotExist) {
308 return false, nil
309 }
310 return false, err
311}
312
313// write a file to our data dir
314func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
315 fd, err := cli.DataFileCreate(fpath, overwrite)
316 if err != nil {
317 return err
318 }
319 defer fd.Close()
320 _, err = io.Copy(fd, r)
321 if err != nil {
322 return err
323 }
324
325 return nil
326}
327
328// create a file in our data dir. don't forget to close it!
329func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
330 ddpath := cli.DataFilePath(fpath)
331 if !overwrite {
332 exists, err := cli.DataFileExists(fpath)
333 if err != nil {
334 return nil, err
335 }
336 if exists {
337 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
338 }
339 }
340 if len(fpath) > 1 {
341 dirs, _ := filepath.Split(ddpath)
342 err := os.MkdirAll(dirs, os.ModePerm)
343 if err != nil {
344 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
345 }
346 }
347 return os.Create(ddpath)
348}
349
350// get a path to a segment file in our database
351func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
352 ext := filepath.Ext(file)
353 base := strings.TrimSuffix(file, ext)
354 aqt, err := aqtime.FromString(base)
355 if err != nil {
356 return "", err
357 }
358 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
359 yr, mon, day, hr, min, _, _ := aqt.Parts()
360 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
361}
362
363// get a path to a segment file in our database
364func (cli *CLI) HLSDir(user string) (string, error) {
365 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
366}
367
368// create a segment file in our database
369func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
370 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
371 yr, mon, day, hr, min, _, _ := aqt.Parts()
372 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
373}
374
375// read a file from our data dir
376func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
377 ddpath := cli.DataFilePath(fpath)
378
379 fd, err := os.Open(ddpath)
380 if err != nil {
381 return err
382 }
383 _, err = io.Copy(w, fd)
384 if err != nil {
385 return err
386 }
387
388 return nil
389}
390
391func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
392 cli.dataDirFlags = append(cli.dataDirFlags, dest)
393 *dest = filepath.Join(SPDataDir, defaultValue)
394 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
395 fs.Func(name, usage, func(s string) error {
396 *dest = s
397 return nil
398 })
399}
400
401func (cli *CLI) HasMist() bool {
402 return runtime.GOOS == "linux"
403}
404
405// type for comma-separated ethereum addresses
406func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
407 *dest = []aqpub.Pub{}
408 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
409 fs.Func(name, usage, func(s string) error {
410 if s == "" {
411 return nil
412 }
413 strs := strings.Split(s, ",")
414 for _, str := range strs {
415 pub, err := aqpub.FromHexString(str)
416 if err != nil {
417 return err
418 }
419 *dest = append(*dest, pub)
420 }
421 return nil
422 })
423}
424
425func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
426 *dest = []string{}
427 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
428 fs.Func(name, usage, func(s string) error {
429 if s == "" {
430 return nil
431 }
432 strs := strings.Split(s, ",")
433 *dest = append(*dest, strs...)
434 return nil
435 })
436}
437
438func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
439 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
440 fs.Func(name, usage, func(s string) error {
441 if s == "" {
442 return nil
443 }
444 return json.Unmarshal([]byte(s), dest)
445 })
446}
447
448// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
449func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
450 *dest = map[string]map[string]int{}
451 fs.Func(name, usage, func(s string) error {
452 if s == "" {
453 return nil
454 }
455 pairs := strings.Split(s, ",")
456 for _, pair := range pairs {
457 scoreSplit := strings.Split(pair, ":")
458 if len(scoreSplit) != 2 {
459 return fmt.Errorf("invalid debug flag: %s", pair)
460 }
461 score, err := strconv.Atoi(scoreSplit[1])
462 if err != nil {
463 return fmt.Errorf("invalid debug flag: %s", pair)
464 }
465 selectorSplit := strings.Split(scoreSplit[0], "=")
466 if len(selectorSplit) != 2 {
467 return fmt.Errorf("invalid debug flag: %s", pair)
468 }
469 _, ok := (*dest)[selectorSplit[0]]
470 if !ok {
471 (*dest)[selectorSplit[0]] = map[string]int{}
472 }
473 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
474 }
475
476 return nil
477 })
478}
479
480func (cli *CLI) StreamIsAllowed(did string) error {
481 if cli.WideOpen {
482 return nil
483 }
484 // if the user set no test streams, anyone can stream
485 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
486 // but only valid atproto accounts! did:key is only allowed for our local test stream
487 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
488 if openServer && !isDIDKey {
489 return nil
490 }
491 for _, a := range cli.AllowedStreams {
492 if a == did {
493 return nil
494 }
495 }
496 return fmt.Errorf("user is not allowed to stream")
497}
498
499func (cli *CLI) MyDID() string {
500 return fmt.Sprintf("did:web:%s", cli.PublicHost)
501}