Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/json"
7 "encoding/pem"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime"
16 "strconv"
17 "strings"
18 "time"
19
20 "math/rand/v2"
21
22 "github.com/lestrrat-go/jwx/v2/jwk"
23 "github.com/peterbourgon/ff/v3"
24 "stream.place/streamplace/pkg/aqtime"
25 "stream.place/streamplace/pkg/constants"
26 "stream.place/streamplace/pkg/crypto/aqpub"
27 "stream.place/streamplace/pkg/integrations/discord/discordtypes"
28)
29
// Path-related constants shared by the CLI helpers.
const (
	// SPDataDir is a placeholder for the data directory. DataDirFlag prefixes
	// its default values with it, and Parse replaces it with the configured
	// DataDir once flags have been parsed.
	SPDataDir = "$SP_DATA_DIR"
	// SegmentsDir is the first path element, below the data directory, used
	// when building segment and HLS paths.
	SegmentsDir = "segments"
)
32
// BuildFlags carries build metadata: a version string, the build time as
// Unix seconds, and a UUID.
type BuildFlags struct {
	Version   string
	BuildTime int64
	UUID      string
}

// BuildTimeStr formats BuildTime (Unix seconds) as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo formats BuildTime (Unix seconds) in UTC with fixed
// millisecond precision and a literal "Z" suffix, e.g. 2006-01-02T15:04:05.000Z.
func (b BuildFlags) BuildTimeStrExpo() string {
	const layout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(layout)
}
48
// CLI holds the configuration of a streamplace node. Most fields are bound to
// command-line flags in NewFlagSet; Parse additionally reads them from
// environment variables using the "SP" prefix.
type CLI struct {
	AdminAccount           string
	Build                  *BuildFlags
	DataDir                string
	DBPath                 string
	EthAccountAddr         string
	EthKeystorePath        string
	EthPassword            string
	FirebaseServiceAccount string
	GitLabURL              string
	HTTPAddr               string
	HTTPInternalAddr       string
	HTTPSAddr              string
	RtmpsAddr              string
	Secure                 bool
	// NoMist and the Mist*Port fields are only registered as flags when
	// runtime.GOOS is "linux"; elsewhere they keep their zero values.
	NoMist                 bool
	MistAdminPort          int
	MistHTTPPort           int
	MistRTMPPort           int
	SigningKeyPath         string
	TAURL                  string
	TLSCertPath            string
	TLSKeyPath             string
	PKCS11ModulePath       string
	PKCS11Pin              string
	PKCS11TokenSlot        string
	PKCS11TokenLabel       string
	PKCS11TokenSerial      string
	PKCS11KeypairLabel     string
	PKCS11KeypairID        string
	StreamerName           string
	RelayHost              string
	// Debug maps selector kind -> selector value -> verbosity, as parsed by
	// DebugFlag from e.g. "func=ToHLS:3,file=gstreamer.go:4".
	Debug                  map[string]map[string]int
	AllowedStreams         []string
	WideOpen               bool
	Peers                  []string
	Redirects              []string
	TestStream             bool
	FrontendProxy          string
	AppBundleID            string
	NoFirehose             bool
	PrintChat              bool
	Color                  string
	LivepeerGatewayURL     string
	WHIPTest               string
	Thumbnail              bool
	SmearAudio             bool
	ExternalSigning        bool
	RTMPServerAddon        string
	TracingEndpoint        string
	PublicHost             string
	RateLimitPerSecond     int
	RateLimitBurst         int
	RateLimitWebsocket     int
	JWK                    jwk.Key
	AccessJWK              jwk.Key
	// dataDirFlags collects the destinations registered through DataDirFlag so
	// that Parse can substitute SPDataDir with the real data directory.
	dataDirFlags           []*string
	DiscordWebhooks        []*discordtypes.Webhook
	NewWebRTCPlayback      bool
	AppleTeamID            string
	AndroidCertFingerprint string
}
111
// NewFlagSet builds a flag.FlagSet whose flags write into the fields of cli
// and sets each field's default value. The returned set uses
// flag.ExitOnError.
//
// NOTE(review): the name parameter is currently unused; the flag set is
// always named "streamplace".
func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
	fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
	fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
	fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
	fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
	fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
	fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
	// DataDirFlag defaults are relative to the data dir; they are stored with
	// the SPDataDir placeholder until Parse resolves them.
	cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
	cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
	fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
	cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
	fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
	fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
	fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
	cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
	fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
	fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
	fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
	fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
	fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
	fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
	fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
	fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
	fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
	fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
	fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
	fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
	fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
	cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
	cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
	cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
	cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
	fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
	fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
	fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
	fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
	fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
	// Accepted for backwards compatibility only; the value is not stored anywhere.
	fs.Bool("insecure", false, "DEPRECATED, does nothing.")
	fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
	fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
	fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
	fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
	fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)")
	fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
	fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
	fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
	fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
	fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
	fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
	cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
	fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
	fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
	fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")

	// The MistServer flags exist only on linux; on other platforms the
	// corresponding fields are left at their zero values.
	if runtime.GOOS == "linux" {
		fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
		fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
		fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
		fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
	}
	return fs
}
176
// StreamplaceSchemePrefix is the "streamplace://" URL scheme prefix.
var StreamplaceSchemePrefix = "streamplace://"
178
179func (cli *CLI) OwnInternalURL() string {
180 // No errors because we know it's valid from AddrFlag
181 host, port, _ := net.SplitHostPort(cli.HTTPInternalAddr)
182
183 ip := net.ParseIP(host)
184 if ip.IsUnspecified() {
185 host = "127.0.0.1"
186 }
187 addr := net.JoinHostPort(host, port)
188 return fmt.Sprintf("http://%s", addr)
189}
190
191func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
192 bs, err := os.ReadFile(cli.SigningKeyPath)
193 if err != nil {
194 return nil, err
195 }
196 block, _ := pem.Decode(bs)
197 if block == nil {
198 return nil, fmt.Errorf("no RSA key found in signing key")
199 }
200 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
201 if err != nil {
202 return nil, err
203 }
204 return key, nil
205}
206
207func RandomTrailer(length int) string {
208 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
209
210 res := make([]byte, length)
211 for i := 0; i < length; i++ {
212 res[i] = charset[rand.IntN(len(charset))]
213 }
214 return string(res)
215}
216
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or "" when the home directory cannot be determined.
// An empty result is not fatal by itself: Parse only fails if no data dir is
// supplied explicitly either.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	return ""
}
225
226func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
227 err := ff.Parse(
228 fs, args,
229 ff.WithEnvVarPrefix("SP"),
230 )
231 if err != nil {
232 return err
233 }
234 if cli.DataDir == "" {
235 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
236 }
237 for _, dest := range cli.dataDirFlags {
238 *dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
239 }
240 return nil
241}
242
243func (cli *CLI) DataFilePath(fpath []string) string {
244 if cli.DataDir == "" {
245 panic("no data dir configured")
246 }
247 // windows does not like colons
248 safe := []string{}
249 for _, p := range fpath {
250 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
251 }
252 fpath = append([]string{cli.DataDir}, safe...)
253 fdpath := filepath.Join(fpath...)
254 return fdpath
255}
256
257// does a file exist in our data dir?
258func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
259 ddpath := cli.DataFilePath(fpath)
260 _, err := os.Stat(ddpath)
261 if err == nil {
262 return true, nil
263 }
264 if errors.Is(err, os.ErrNotExist) {
265 return false, nil
266 }
267 return false, err
268}
269
270// write a file to our data dir
271func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
272 fd, err := cli.DataFileCreate(fpath, overwrite)
273 if err != nil {
274 return err
275 }
276 defer fd.Close()
277 _, err = io.Copy(fd, r)
278 if err != nil {
279 return err
280 }
281
282 return nil
283}
284
285// create a file in our data dir. don't forget to close it!
286func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
287 ddpath := cli.DataFilePath(fpath)
288 if !overwrite {
289 exists, err := cli.DataFileExists(fpath)
290 if err != nil {
291 return nil, err
292 }
293 if exists {
294 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
295 }
296 }
297 if len(fpath) > 1 {
298 dirs, _ := filepath.Split(ddpath)
299 err := os.MkdirAll(dirs, os.ModePerm)
300 if err != nil {
301 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
302 }
303 }
304 return os.Create(ddpath)
305}
306
307// get a path to a segment file in our database
308func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
309 ext := filepath.Ext(file)
310 base := strings.TrimSuffix(file, ext)
311 aqt, err := aqtime.FromString(base)
312 if err != nil {
313 return "", err
314 }
315 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
316 yr, mon, day, hr, min, _, _ := aqt.Parts()
317 return cli.DataFilePath([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}), nil
318}
319
320// get a path to a segment file in our database
321func (cli *CLI) HLSDir(user string) (string, error) {
322 return cli.DataFilePath([]string{SegmentsDir, "hls", user}), nil
323}
324
325// create a segment file in our database
326func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
327 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
328 yr, mon, day, hr, min, _, _ := aqt.Parts()
329 return cli.DataFileCreate([]string{SegmentsDir, user, yr, mon, day, hr, min, fname}, false)
330}
331
332// read a file from our data dir
333func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
334 ddpath := cli.DataFilePath(fpath)
335
336 fd, err := os.Open(ddpath)
337 if err != nil {
338 return err
339 }
340 _, err = io.Copy(w, fd)
341 if err != nil {
342 return err
343 }
344
345 return nil
346}
347
348func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
349 cli.dataDirFlags = append(cli.dataDirFlags, dest)
350 *dest = filepath.Join(SPDataDir, defaultValue)
351 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
352 fs.Func(name, usage, func(s string) error {
353 *dest = s
354 return nil
355 })
356}
357
358func (cli *CLI) HasMist() bool {
359 return runtime.GOOS == "linux"
360}
361
362// type for comma-separated ethereum addresses
363func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
364 *dest = []aqpub.Pub{}
365 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
366 fs.Func(name, usage, func(s string) error {
367 if s == "" {
368 return nil
369 }
370 strs := strings.Split(s, ",")
371 for _, str := range strs {
372 pub, err := aqpub.FromHexString(str)
373 if err != nil {
374 return err
375 }
376 *dest = append(*dest, pub)
377 }
378 return nil
379 })
380}
381
382func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
383 *dest = []string{}
384 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
385 fs.Func(name, usage, func(s string) error {
386 if s == "" {
387 return nil
388 }
389 strs := strings.Split(s, ",")
390 *dest = append(*dest, strs...)
391 return nil
392 })
393}
394
395func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
396 usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
397 fs.Func(name, usage, func(s string) error {
398 if s == "" {
399 return nil
400 }
401 return json.Unmarshal([]byte(s), dest)
402 })
403}
404
405// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
406func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
407 *dest = map[string]map[string]int{}
408 fs.Func(name, usage, func(s string) error {
409 if s == "" {
410 return nil
411 }
412 pairs := strings.Split(s, ",")
413 for _, pair := range pairs {
414 scoreSplit := strings.Split(pair, ":")
415 if len(scoreSplit) != 2 {
416 return fmt.Errorf("invalid debug flag: %s", pair)
417 }
418 score, err := strconv.Atoi(scoreSplit[1])
419 if err != nil {
420 return fmt.Errorf("invalid debug flag: %s", pair)
421 }
422 selectorSplit := strings.Split(scoreSplit[0], "=")
423 if len(selectorSplit) != 2 {
424 return fmt.Errorf("invalid debug flag: %s", pair)
425 }
426 _, ok := (*dest)[selectorSplit[0]]
427 if !ok {
428 (*dest)[selectorSplit[0]] = map[string]int{}
429 }
430 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
431 }
432
433 return nil
434 })
435}
436
437func (cli *CLI) StreamIsAllowed(did string) error {
438 if cli.WideOpen {
439 return nil
440 }
441 // if the user set no test streams, anyone can stream
442 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
443 // but only valid atproto accounts! did:key is only allowed for our local test stream
444 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
445 if openServer && !isDIDKey {
446 return nil
447 }
448 for _, a := range cli.AllowedStreams {
449 if a == did {
450 return nil
451 }
452 }
453 return fmt.Errorf("user is not allowed to stream")
454}
455
456func (cli *CLI) MyDID() string {
457 return fmt.Sprintf("did:web:%s", cli.PublicHost)
458}