Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "math/rand/v2"
21
22 "github.com/lestrrat-go/jwx/v2/jwk"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/peterbourgon/ff/v3"
25 "stream.place/streamplace/pkg/aqtime"
26 "stream.place/streamplace/pkg/constants"
27 "stream.place/streamplace/pkg/crypto/aqpub"
28 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
29)
30
const (
	// SPDataDir is the placeholder token that Parse replaces with the
	// configured data directory in every data-dir-relative flag value.
	SPDataDir = "$SP_DATA_DIR"

	// SegmentsDir is the name of the data-dir subdirectory under which segment
	// files and per-user HLS directories are placed.
	SegmentsDir = "segments"
)
33
// BuildFlags carries metadata describing a build.
type BuildFlags struct {
	Version string
	// BuildTime is interpreted as a Unix timestamp in seconds.
	BuildTime int64
	UUID      string
}

// BuildTimeStr formats BuildTime as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo formats BuildTime in UTC with millisecond precision and a
// literal "Z" suffix, e.g. "1970-01-01T00:00:00.000Z".
func (b BuildFlags) BuildTimeStrExpo() string {
	const expoLayout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(expoLayout)
}
49
// CLI holds the node's runtime configuration.
//
// Most fields are bound to command-line flags by NewFlagSet (the usage strings
// there describe each option) and filled in by Parse, which also accepts
// environment variables with the "SP" prefix. Path-like fields registered
// through DataDirFlag default to a location containing the SPDataDir
// placeholder, which Parse replaces with DataDir. Build, JWK and AccessJWK are
// not bound to any flag in NewFlagSet.
type CLI struct {
	AdminAccount           string
	Build                  *BuildFlags // not bound to a flag by NewFlagSet
	DataDir                string
	DBURL                  string
	IndexDBPath            string
	EthAccountAddr         string
	EthKeystorePath        string
	EthPassword            string
	FirebaseServiceAccount string
	GitLabURL              string
	HTTPAddr               string
	HTTPInternalAddr       string
	HTTPSAddr              string
	RtmpsAddr              string
	Secure                 bool
	NoMist                 bool
	MistAdminPort          int
	MistHTTPPort           int
	MistRTMPPort           int
	SigningKeyPath         string
	TAURL                  string
	TLSCertPath            string
	TLSKeyPath             string
	PKCS11ModulePath       string
	PKCS11Pin              string
	PKCS11TokenSlot        string
	PKCS11TokenLabel       string
	PKCS11TokenSerial      string
	PKCS11KeypairLabel     string
	PKCS11KeypairID        string
	StreamerName           string
	RelayHost              string
	Debug                  map[string]map[string]int // selector kind -> target -> verbosity, see DebugFlag
	AllowedStreams         []string
	WideOpen               bool
	Peers                  []string
	Redirects              []string
	TestStream             bool
	FrontendProxy          string
	AppBundleID            string
	NoFirehose             bool
	PrintChat              bool
	Color                  string
	LivepeerGatewayURL     string
	LivepeerGateway        bool
	WHIPTest               string
	Thumbnail              bool
	SmearAudio             bool
	ExternalSigning        bool
	RTMPServerAddon        string
	TracingEndpoint        string
	PublicHost             string
	RateLimitPerSecond     int
	RateLimitBurst         int
	RateLimitWebsocket     int
	JWK                    jwk.Key
	AccessJWK              jwk.Key
	dataDirFlags           []*string // values in which Parse substitutes SPDataDir with DataDir
	DiscordWebhooks        []*discordtypes.Webhook
	NewWebRTCPlayback      bool
	AppleTeamID            string
	AndroidCertFingerprint string
	Labelers               []string
	AtprotoDID             string
	LivepeerHelp           bool
}
117
118func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
119 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
120 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
121 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
122 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
123 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
124 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
125 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
126 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
128 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
129 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
130 cli.DataDirFlag(fs, &cli.IndexDBPath, "index-db-path", "db.sqlite", "path to sqlite database file for maintaining atproto index")
131 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
132 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
133 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
134 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
135 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
136 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
137 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
138 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
139 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
140 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
141 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
142 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
143 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
144 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
145 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
146 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
147 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
148 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
149 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
150 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
151 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
152 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
153 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
154 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
155 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
156 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
157 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
158 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
159 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
160 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
161 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
162 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
163 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
164 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
165 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
166 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
167 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
168 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
169 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
170 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
171 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
172 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
173 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
174 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
175 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
176 cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to")
177 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
178 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
179
180 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
181 _ = starter.NewLivepeerConfig(lpFlags)
182 lpFlags.VisitAll(func(f *flag.Flag) {
183 adapted := LivepeerFlags.CamelToSnake[f.Name]
184 fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
185 })
186
187 if runtime.GOOS == "linux" {
188 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
189 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
190 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
191 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
192 }
193 return fs
194}
195
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
var StreamplaceSchemePrefix = "streamplace://"
197
198func (cli *CLI) OwnInternalURL() string {
199 // No errors because we know it's valid from AddrFlag
200 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
201
202 ip := net.ParseIP(host)
203 if ip.IsUnspecified() {
204 host = "127.0.0.1"
205 }
206 addr := net.JoinHostPort(host, port)
207 return fmt.Sprintf("http://%s", addr)
208}
209
210func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
211 bs, err := os.ReadFile(cli.SigningKeyPath)
212 if err != nil {
213 return nil, err
214 }
215 block, _ := pem.Decode(bs)
216 if block == nil {
217 return nil, fmt.Errorf("no RSA key found in signing key")
218 }
219 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
220 if err != nil {
221 return nil, err
222 }
223 return key, nil
224}
225
226func RandomTrailer(length int) string {
227 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
228
229 res := make([]byte, length)
230 for i := 0; i < length; i++ {
231 res[i] = charset[rand.IntN(len(charset))]
232 }
233 return string(res)
234}
235
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or "" when the home directory cannot be determined.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	// not fatal unless the user doesn't set one later
	return ""
}
244
245func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
246 err := ff.Parse(
247 fs, args,
248 ff.WithEnvVarPrefix("SP"),
249 )
250 if err != nil {
251 return err
252 }
253 if cli.DataDir == "" {
254 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
255 }
256 if cli.LivepeerGateway && cli.LivepeerGatewayURL != "" {
257 return fmt.Errorf("defining both livepeer-gateway and livepeer-gateway-url doesn't make sense. do you want an embedded gateway or an external one?")
258 }
259 if cli.LivepeerGateway {
260 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
261 err = fs.Set("livepeer.data-dir", gatewayPath)
262 if err != nil {
263 return err
264 }
265 err = fs.Set("livepeer.gateway", "true")
266 if err != nil {
267 return err
268 }
269 httpAddrFlag := fs.Lookup("livepeer.http-addr")
270 if httpAddrFlag == nil {
271 return fmt.Errorf("livepeer.http-addr not found")
272 }
273 httpAddr := httpAddrFlag.Value.String()
274 if httpAddr == "" {
275 httpAddr = "127.0.0.1:8935"
276 err = fs.Set("livepeer.http-addr", httpAddr)
277 if err != nil {
278 return err
279 }
280 }
281 cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
282 }
283 for _, dest := range cli.dataDirFlags {
284 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
285 }
286 return nil
287}
288
289func (cli *CLI) DataFilePath(fpath []string) string {
290 if cli.DataDir == "" {
291 panic("no data dir configured")
292 }
293 // windows does not like colons
294 safe := []string{}
295 for _, p := range fpath {
296 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
297 }
298 fpath = append([]string{cli.DataDir}, safe...)
299 fdpath := filepath.Join(fpath...)
300 return fdpath
301}
302
303// does a file exist in our data dir?
304func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
305 ddpath := cli.DataFilePath(fpath)
306 _, err := os.Stat(ddpath)
307 if err == nil {
308 return true, nil
309 }
310 if errors.Is(err, os.ErrNotExist) {
311 return false, nil
312 }
313 return false, err
314}
315
316// write a file to our data dir
317func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
318 fd, err := cli.DataFileCreate(fpath, overwrite)
319 if err != nil {
320 return err
321 }
322 defer fd.Close()
323 _, err = io.Copy(fd, r)
324 if err != nil {
325 return err
326 }
327
328 return nil
329}
330
331// create a file in our data dir. don't forget to close it!
332func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
333 ddpath := cli.DataFilePath(fpath)
334 if !overwrite {
335 exists, err := cli.DataFileExists(fpath)
336 if err != nil {
337 return nil, err
338 }
339 if exists {
340 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
341 }
342 }
343 if len(fpath) > 1 {
344 dirs, _ := filepath.Split(ddpath)
345 err := os.MkdirAll(dirs, os.ModePerm)
346 if err != nil {
347 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
348 }
349 }
350 return os.Create(ddpath)
351}
352
353// get a path to a segment file in our database
354func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
355 ext := filepath.Ext(file)
356 base := strings.TrimSuffix(file, ext)
357 aqt, err := aqtime.FromString(base)
358 if err != nil {
359 return "", err
360 }
361 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
362 yr, mon, day, hr, min, _, _ := aqt.Parts()
363 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
364}
365
366// get a path to a segment file in our database
367func (cli *CLI) HLSDir(user string) (string, error) {
368 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
369}
370
371// create a segment file in our database
372func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
373 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
374 yr, mon, day, hr, min, _, _ := aqt.Parts()
375 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
376}
377
378// read a file from our data dir
379func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
380 ddpath := cli.DataFilePath(fpath)
381
382 fd, err := os.Open(ddpath)
383 if err != nil {
384 return err
385 }
386 _, err = io.Copy(w, fd)
387 if err != nil {
388 return err
389 }
390
391 return nil
392}
393
394func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
395 cli.dataDirFlags = append(cli.dataDirFlags, dest)
396 *dest = filepath.Join(SPDataDir, defaultValue)
397 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
398 fs.Func(name, usage, func(s string) error {
399 *dest = s
400 return nil
401 })
402}
403
404func (cli *CLI) HasMist() bool {
405 return runtime.GOOS == "linux"
406}
407
408// type for comma-separated ethereum addresses
409func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
410 *dest = []aqpub.Pub{}
411 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
412 fs.Func(name, usage, func(s string) error {
413 if s == "" {
414 return nil
415 }
416 strs := strings.Split(s, ",")
417 for _, str := range strs {
418 pub, err := aqpub.FromHexString(str)
419 if err != nil {
420 return err
421 }
422 *dest = append(*dest, pub)
423 }
424 return nil
425 })
426}
427
428func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
429 *dest = []string{}
430 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
431 fs.Func(name, usage, func(s string) error {
432 if s == "" {
433 return nil
434 }
435 strs := strings.Split(s, ",")
436 *dest = append(*dest, strs...)
437 return nil
438 })
439}
440
441func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
442 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
443 fs.Func(name, usage, func(s string) error {
444 if s == "" {
445 return nil
446 }
447 return json.Unmarshal([]byte(s), dest)
448 })
449}
450
451// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
452func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
453 *dest = map[string]map[string]int{}
454 fs.Func(name, usage, func(s string) error {
455 if s == "" {
456 return nil
457 }
458 pairs := strings.Split(s, ",")
459 for _, pair := range pairs {
460 scoreSplit := strings.Split(pair, ":")
461 if len(scoreSplit) != 2 {
462 return fmt.Errorf("invalid debug flag: %s", pair)
463 }
464 score, err := strconv.Atoi(scoreSplit[1])
465 if err != nil {
466 return fmt.Errorf("invalid debug flag: %s", pair)
467 }
468 selectorSplit := strings.Split(scoreSplit[0], "=")
469 if len(selectorSplit) != 2 {
470 return fmt.Errorf("invalid debug flag: %s", pair)
471 }
472 _, ok := (*dest)[selectorSplit[0]]
473 if !ok {
474 (*dest)[selectorSplit[0]] = map[string]int{}
475 }
476 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
477 }
478
479 return nil
480 })
481}
482
483func (cli *CLI) StreamIsAllowed(did string) error {
484 if cli.WideOpen {
485 return nil
486 }
487 // if the user set no test streams, anyone can stream
488 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
489 // but only valid atproto accounts! did:key is only allowed for our local test stream
490 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
491 if openServer && !isDIDKey {
492 return nil
493 }
494 for _, a := range cli.AllowedStreams {
495 if a == did {
496 return nil
497 }
498 }
499 return fmt.Errorf("user is not allowed to stream")
500}
501
502func (cli *CLI) MyDID() string {
503 return fmt.Sprintf("did:web:%s", cli.PublicHost)
504}