Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/lestrrat-go/jwx/v2/jwk"
21 "github.com/peterbourgon/ff/v3"
22 "golang.org/x/exp/rand"
23 "stream.place/streamplace/pkg/aqtime"
24 "stream.place/streamplace/pkg/constants"
25 "stream.place/streamplace/pkg/crypto/aqpub"
26 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
27)
28
// Path-related constants used by the configuration helpers in this file.
const (
	// SPDataDir is the literal placeholder that DataDirFlag prepends to the
	// default value of data-dir-relative flags. Parse later substitutes the
	// configured data directory for it.
	SPDataDir = "$SP_DATA_DIR"

	// SegmentsDir is the first path element, below the data directory, of
	// the paths built by SegmentFilePath, SegmentFileCreate and HLSDir.
	SegmentsDir = "segments"
)
31
// BuildFlags carries metadata about a build of the binary.
//
// NOTE(review): Version and UUID are not interpreted anywhere in this file;
// only BuildTime is, and it is treated as a Unix timestamp in seconds.
type BuildFlags struct {
	Version   string
	BuildTime int64
	UUID      string
}

// BuildTimeStr returns BuildTime, read as seconds since the Unix epoch,
// formatted in UTC using time.RFC3339 (second precision).
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo returns BuildTime, read as seconds since the Unix epoch,
// formatted in UTC with a fixed three-digit fraction and a literal "Z"
// suffix, e.g. "2023-11-14T22:13:20.000Z".
func (b BuildFlags) BuildTimeStrExpo() string {
	const layout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(layout)
}
47
48type CLI struct {
49 AdminAccount string
50 Build *BuildFlags
51 DataDir string
52 DBPath string
53 EthAccountAddr string
54 EthKeystorePath string
55 EthPassword string
56 FirebaseServiceAccount string
57 GitLabURL string
58 HTTPAddr string
59 HTTPInternalAddr string
60 HTTPSAddr string
61 RtmpsAddr string
62 Secure bool
63 NoMist bool
64 MistAdminPort int
65 MistHTTPPort int
66 MistRTMPPort int
67 SigningKeyPath string
68 TAURL string
69 TLSCertPath string
70 TLSKeyPath string
71 PKCS11ModulePath string
72 PKCS11Pin string
73 PKCS11TokenSlot string
74 PKCS11TokenLabel string
75 PKCS11TokenSerial string
76 PKCS11KeypairLabel string
77 PKCS11KeypairID string
78 StreamerName string
79 RelayHost string
80 Debug map[string]map[string]int
81 AllowedStreams []string
82 WideOpen bool
83 Peers []string
84 Redirects []string
85 TestStream bool
86 FrontendProxy string
87 AppBundleID string
88 NoFirehose bool
89 PrintChat bool
90 Color string
91 LivepeerGatewayURL string
92 WHIPTest string
93 Thumbnail bool
94 SmearAudio bool
95 ExternalSigning bool
96 RTMPServerAddon string
97 TracingEndpoint string
98 PublicHost string
99 RateLimitPerSecond int
100 RateLimitBurst int
101 RateLimitWebsocket int
102 JWK jwk.Key
103 AccessJWK jwk.Key
104 dataDirFlags []*string
105 DiscordWebhooks []*discordtypes.Webhook
106 NewWebRTCPlayback bool
107}
108
109func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
110 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
111 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
112 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
113 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
114 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
115 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
116 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
117 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
118 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
119 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
120 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
121 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
122 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
123 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
124 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
125 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
126 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
127 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
128 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
129 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
130 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
131 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
132 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
133 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
134 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
135 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
136 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
137 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
138 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
139 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
140 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
141 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
142 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
143 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
144 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
145 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
146 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
147 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
148 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
149 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
150 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
151 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
152 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
153 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
154 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
155 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
156 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
157 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
158 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
159 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
160 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
161 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
162
163 if runtime.GOOS == "linux" {
164 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
165 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
166 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
167 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
168 }
169 return fs
170}
171
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
// It is a variable, not a constant, so it remains assignable by callers.
var StreamplaceSchemePrefix = "streamplace://"
173
174func (cli *CLI) OwnInternalURL() string {
175 // No errors because we know it's valid from AddrFlag
176 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
177
178 ip := net.ParseIP(host)
179 if ip.IsUnspecified() {
180 host = "127.0.0.1"
181 }
182 addr := net.JoinHostPort(host, port)
183 return fmt.Sprintf("http://%s", addr)
184}
185
186func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
187 bs, err := os.ReadFile(cli.SigningKeyPath)
188 if err != nil {
189 return nil, err
190 }
191 block, _ := pem.Decode(bs)
192 if block == nil {
193 return nil, fmt.Errorf("no RSA key found in signing key")
194 }
195 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
196 if err != nil {
197 return nil, err
198 }
199 return key, nil
200}
201
202func RandomTrailer(length int) string {
203 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
204
205 res := make([]byte, length)
206 for i := 0; i < length; i++ {
207 res[i] = charset[rand.Intn(len(charset))]
208 }
209 return string(res)
210}
211
// DefaultDataDir returns the default data directory, ".streamplace" inside
// the current user's home directory. It returns the empty string when the
// home directory cannot be determined.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	// not fatal unless the user doesn't set one later
	return ""
}
220
221func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
222 err := ff.Parse(
223 fs, args,
224 ff.WithEnvVarPrefix("SP"),
225 )
226 if err != nil {
227 return err
228 }
229 if cli.DataDir == "" {
230 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
231 }
232 for _, dest := range cli.dataDirFlags {
233 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
234 }
235 return nil
236}
237
238func (cli *CLI) DataFilePath(fpath []string) string {
239 if cli.DataDir == "" {
240 panic("no data dir configured")
241 }
242 // windows does not like colons
243 safe := []string{}
244 for _, p := range fpath {
245 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
246 }
247 fpath = append([]string{cli.DataDir}, safe...)
248 fdpath := filepath.Join(fpath...)
249 return fdpath
250}
251
252// does a file exist in our data dir?
253func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
254 ddpath := cli.DataFilePath(fpath)
255 _, err := os.Stat(ddpath)
256 if err == nil {
257 return true, nil
258 }
259 if errors.Is(err, os.ErrNotExist) {
260 return false, nil
261 }
262 return false, err
263}
264
265// write a file to our data dir
266func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
267 fd, err := cli.DataFileCreate(fpath, overwrite)
268 if err != nil {
269 return err
270 }
271 defer fd.Close()
272 _, err = io.Copy(fd, r)
273 if err != nil {
274 return err
275 }
276
277 return nil
278}
279
280// create a file in our data dir. don't forget to close it!
281func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
282 ddpath := cli.DataFilePath(fpath)
283 if !overwrite {
284 exists, err := cli.DataFileExists(fpath)
285 if err != nil {
286 return nil, err
287 }
288 if exists {
289 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
290 }
291 }
292 if len(fpath) > 1 {
293 dirs, _ := filepath.Split(ddpath)
294 err := os.MkdirAll(dirs, os.ModePerm)
295 if err != nil {
296 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
297 }
298 }
299 return os.Create(ddpath)
300}
301
302// get a path to a segment file in our database
303func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
304 ext := filepath.Ext(file)
305 base := strings.TrimSuffix(file, ext)
306 aqt, err := aqtime.FromString(base)
307 if err != nil {
308 return "", err
309 }
310 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
311 yr, mon, day, hr, min, _, _ := aqt.Parts()
312 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
313}
314
315// get a path to a segment file in our database
316func (cli *CLI) HLSDir(user string) (string, error) {
317 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
318}
319
320// create a segment file in our database
321func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
322 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
323 yr, mon, day, hr, min, _, _ := aqt.Parts()
324 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
325}
326
327// read a file from our data dir
328func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
329 ddpath := cli.DataFilePath(fpath)
330
331 fd, err := os.Open(ddpath)
332 if err != nil {
333 return err
334 }
335 _, err = io.Copy(w, fd)
336 if err != nil {
337 return err
338 }
339
340 return nil
341}
342
343func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
344 cli.dataDirFlags = append(cli.dataDirFlags, dest)
345 *dest = filepath.Join(SPDataDir, defaultValue)
346 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
347 fs.Func(name, usage, func(s string) error {
348 *dest = s
349 return nil
350 })
351}
352
353func (cli *CLI) HasMist() bool {
354 return runtime.GOOS == "linux"
355}
356
357// type for comma-separated ethereum addresses
358func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
359 *dest = []aqpub.Pub{}
360 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
361 fs.Func(name, usage, func(s string) error {
362 if s == "" {
363 return nil
364 }
365 strs := strings.Split(s, ",")
366 for _, str := range strs {
367 pub, err := aqpub.FromHexString(str)
368 if err != nil {
369 return err
370 }
371 *dest = append(*dest, pub)
372 }
373 return nil
374 })
375}
376
377func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
378 *dest = []string{}
379 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
380 fs.Func(name, usage, func(s string) error {
381 if s == "" {
382 return nil
383 }
384 strs := strings.Split(s, ",")
385 *dest = append(*dest, strs...)
386 return nil
387 })
388}
389
390func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
391 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
392 fs.Func(name, usage, func(s string) error {
393 if s == "" {
394 return nil
395 }
396 return json.Unmarshal([]byte(s), dest)
397 })
398}
399
400// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
401func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
402 *dest = map[string]map[string]int{}
403 fs.Func(name, usage, func(s string) error {
404 if s == "" {
405 return nil
406 }
407 pairs := strings.Split(s, ",")
408 for _, pair := range pairs {
409 scoreSplit := strings.Split(pair, ":")
410 if len(scoreSplit) != 2 {
411 return fmt.Errorf("invalid debug flag: %s", pair)
412 }
413 score, err := strconv.Atoi(scoreSplit[1])
414 if err != nil {
415 return fmt.Errorf("invalid debug flag: %s", pair)
416 }
417 selectorSplit := strings.Split(scoreSplit[0], "=")
418 if len(selectorSplit) != 2 {
419 return fmt.Errorf("invalid debug flag: %s", pair)
420 }
421 _, ok := (*dest)[selectorSplit[0]]
422 if !ok {
423 (*dest)[selectorSplit[0]] = map[string]int{}
424 }
425 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
426 }
427
428 return nil
429 })
430}
431
432func (cli *CLI) StreamIsAllowed(did string) error {
433 if cli.WideOpen {
434 return nil
435 }
436 // if the user set no test streams, anyone can stream
437 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
438 // but only valid atproto accounts! did:key is only allowed for our local test stream
439 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
440 if openServer && !isDIDKey {
441 return nil
442 }
443 for _, a := range cli.AllowedStreams {
444 if a == did {
445 return nil
446 }
447 }
448 return fmt.Errorf("user is not allowed to stream")
449}