Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/lestrrat-go/jwx/v2/jwk"
21 "github.com/peterbourgon/ff/v3"
22 "golang.org/x/exp/rand"
23 "stream.place/streamplace/pkg/aqtime"
24 "stream.place/streamplace/pkg/constants"
25 "stream.place/streamplace/pkg/crypto/aqpub"
26 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
27)
28
29const SPDataDir = "$SP_DATA_DIR"
30const SegmentsDir = "segments"
31
32type BuildFlags struct {
33 Version string
34 BuildTime int64
35 UUID string
36}
37
38func (b BuildFlags) BuildTimeStr() string {
39 ts := time.Unix(b.BuildTime, 0)
40 return ts.UTC().Format(time.RFC3339)
41}
42
43func (b BuildFlags) BuildTimeStrExpo() string {
44 ts := time.Unix(b.BuildTime, 0)
45 return ts.UTC().Format("2006-01-02T15:04:05.000Z")
46}
47
48type CLI struct {
49 AdminAccount string
50 Build *BuildFlags
51 DataDir string
52 DBPath string
53 EthAccountAddr string
54 EthKeystorePath string
55 EthPassword string
56 FirebaseServiceAccount string
57 GitLabURL string
58 HTTPAddr string
59 HTTPInternalAddr string
60 HTTPSAddr string
61 RtmpsAddr string
62 Secure bool
63 NoMist bool
64 MistAdminPort int
65 MistHTTPPort int
66 MistRTMPPort int
67 SigningKeyPath string
68 TAURL string
69 TLSCertPath string
70 TLSKeyPath string
71 PKCS11ModulePath string
72 PKCS11Pin string
73 PKCS11TokenSlot string
74 PKCS11TokenLabel string
75 PKCS11TokenSerial string
76 PKCS11KeypairLabel string
77 PKCS11KeypairID string
78 StreamerName string
79 RelayHost string
80 Debug map[string]map[string]int
81 AllowedStreams []string
82 WideOpen bool
83 Peers []string
84 Redirects []string
85 TestStream bool
86 FrontendProxy string
87 AppBundleID string
88 NoFirehose bool
89 PrintChat bool
90 Color string
91 LivepeerGatewayURL string
92 WHIPTest string
93 Thumbnail bool
94 SmearAudio bool
95 ExternalSigning bool
96 RTMPServerAddon string
97 TracingEndpoint string
98 PublicHost string
99 RateLimitPerSecond int
100 RateLimitBurst int
101 RateLimitWebsocket int
102 JWK jwk.Key
103 AccessJWK jwk.Key
104 dataDirFlags []*string
105 DiscordWebhooks []*discordtypes.Webhook
106 NewWebRTCPlayback bool
107 AppleTeamID string
108 AndroidCertFingerprint string
109}
110
111func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
112 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
113 fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
114 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
115 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
116 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
117 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
118 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
119 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
120 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
121 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
122 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
123 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
124 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
125 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
126 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
127 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
128 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
129 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
130 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
131 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
132 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
133 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
134 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
135 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
136 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
137 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
138 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
139 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
140 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
141 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
142 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
143 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
144 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
145 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
146 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
147 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
148 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
149 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
150 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
151 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
152 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
153 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
154 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
155 fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
156 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
157 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
158 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
159 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
160 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
161 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
162 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
163 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
164 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
165 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
166
167 if runtime.GOOS == "linux" {
168 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
169 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
170 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
171 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
172 }
173 return fs
174}
175
176var StreamplaceSchemePrefix = "streamplace://"
177
178func (cli *CLI) OwnInternalURL() string {
179 // No errors because we know it's valid from AddrFlag
180 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
181
182 ip := net.ParseIP(host)
183 if ip.IsUnspecified() {
184 host = "127.0.0.1"
185 }
186 addr := net.JoinHostPort(host, port)
187 return fmt.Sprintf("http://%s", addr)
188}
189
190func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
191 bs, err := os.ReadFile(cli.SigningKeyPath)
192 if err != nil {
193 return nil, err
194 }
195 block, _ := pem.Decode(bs)
196 if block == nil {
197 return nil, fmt.Errorf("no RSA key found in signing key")
198 }
199 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
200 if err != nil {
201 return nil, err
202 }
203 return key, nil
204}
205
206func RandomTrailer(length int) string {
207 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
208
209 res := make([]byte, length)
210 for i := 0; i < length; i++ {
211 res[i] = charset[rand.Intn(len(charset))]
212 }
213 return string(res)
214}
215
216func DefaultDataDir() string {
217 home, err := os.UserHomeDir()
218 if err != nil {
219 // not fatal unless the user doesn't set one later
220 return ""
221 }
222 return filepath.Join(home, ".streamplace")
223}
224
225func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
226 err := ff.Parse(
227 fs, args,
228 ff.WithEnvVarPrefix("SP"),
229 )
230 if err != nil {
231 return err
232 }
233 if cli.DataDir == "" {
234 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
235 }
236 for _, dest := range cli.dataDirFlags {
237 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
238 }
239 return nil
240}
241
242func (cli *CLI) DataFilePath(fpath []string) string {
243 if cli.DataDir == "" {
244 panic("no data dir configured")
245 }
246 // windows does not like colons
247 safe := []string{}
248 for _, p := range fpath {
249 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
250 }
251 fpath = append([]string{cli.DataDir}, safe...)
252 fdpath := filepath.Join(fpath...)
253 return fdpath
254}
255
256// does a file exist in our data dir?
257func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
258 ddpath := cli.DataFilePath(fpath)
259 _, err := os.Stat(ddpath)
260 if err == nil {
261 return true, nil
262 }
263 if errors.Is(err, os.ErrNotExist) {
264 return false, nil
265 }
266 return false, err
267}
268
269// write a file to our data dir
270func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
271 fd, err := cli.DataFileCreate(fpath, overwrite)
272 if err != nil {
273 return err
274 }
275 defer fd.Close()
276 _, err = io.Copy(fd, r)
277 if err != nil {
278 return err
279 }
280
281 return nil
282}
283
284// create a file in our data dir. don't forget to close it!
285func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
286 ddpath := cli.DataFilePath(fpath)
287 if !overwrite {
288 exists, err := cli.DataFileExists(fpath)
289 if err != nil {
290 return nil, err
291 }
292 if exists {
293 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
294 }
295 }
296 if len(fpath) > 1 {
297 dirs, _ := filepath.Split(ddpath)
298 err := os.MkdirAll(dirs, os.ModePerm)
299 if err != nil {
300 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
301 }
302 }
303 return os.Create(ddpath)
304}
305
306// get a path to a segment file in our database
307func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
308 ext := filepath.Ext(file)
309 base := strings.TrimSuffix(file, ext)
310 aqt, err := aqtime.FromString(base)
311 if err != nil {
312 return "", err
313 }
314 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
315 yr, mon, day, hr, min, _, _ := aqt.Parts()
316 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
317}
318
319// get a path to a segment file in our database
320func (cli *CLI) HLSDir(user string) (string, error) {
321 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
322}
323
324// create a segment file in our database
325func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
326 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
327 yr, mon, day, hr, min, _, _ := aqt.Parts()
328 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
329}
330
331// read a file from our data dir
332func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
333 ddpath := cli.DataFilePath(fpath)
334
335 fd, err := os.Open(ddpath)
336 if err != nil {
337 return err
338 }
339 _, err = io.Copy(w, fd)
340 if err != nil {
341 return err
342 }
343
344 return nil
345}
346
347func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
348 cli.dataDirFlags = append(cli.dataDirFlags, dest)
349 *dest = filepath.Join(SPDataDir, defaultValue)
350 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
351 fs.Func(name, usage, func(s string) error {
352 *dest = s
353 return nil
354 })
355}
356
357func (cli *CLI) HasMist() bool {
358 return runtime.GOOS == "linux"
359}
360
361// type for comma-separated ethereum addresses
362func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
363 *dest = []aqpub.Pub{}
364 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
365 fs.Func(name, usage, func(s string) error {
366 if s == "" {
367 return nil
368 }
369 strs := strings.Split(s, ",")
370 for _, str := range strs {
371 pub, err := aqpub.FromHexString(str)
372 if err != nil {
373 return err
374 }
375 *dest = append(*dest, pub)
376 }
377 return nil
378 })
379}
380
381func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
382 *dest = []string{}
383 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
384 fs.Func(name, usage, func(s string) error {
385 if s == "" {
386 return nil
387 }
388 strs := strings.Split(s, ",")
389 *dest = append(*dest, strs...)
390 return nil
391 })
392}
393
394func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
395 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
396 fs.Func(name, usage, func(s string) error {
397 if s == "" {
398 return nil
399 }
400 return json.Unmarshal([]byte(s), dest)
401 })
402}
403
404// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
405func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
406 *dest = map[string]map[string]int{}
407 fs.Func(name, usage, func(s string) error {
408 if s == "" {
409 return nil
410 }
411 pairs := strings.Split(s, ",")
412 for _, pair := range pairs {
413 scoreSplit := strings.Split(pair, ":")
414 if len(scoreSplit) != 2 {
415 return fmt.Errorf("invalid debug flag: %s", pair)
416 }
417 score, err := strconv.Atoi(scoreSplit[1])
418 if err != nil {
419 return fmt.Errorf("invalid debug flag: %s", pair)
420 }
421 selectorSplit := strings.Split(scoreSplit[0], "=")
422 if len(selectorSplit) != 2 {
423 return fmt.Errorf("invalid debug flag: %s", pair)
424 }
425 _, ok := (*dest)[selectorSplit[0]]
426 if !ok {
427 (*dest)[selectorSplit[0]] = map[string]int{}
428 }
429 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
430 }
431
432 return nil
433 })
434}
435
436func (cli *CLI) StreamIsAllowed(did string) error {
437 if cli.WideOpen {
438 return nil
439 }
440 // if the user set no test streams, anyone can stream
441 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
442 // but only valid atproto accounts! did:key is only allowed for our local test stream
443 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
444 if openServer && !isDIDKey {
445 return nil
446 }
447 for _, a := range cli.AllowedStreams {
448 if a == did {
449 return nil
450 }
451 }
452 return fmt.Errorf("user is not allowed to stream")
453}