Live video on the AT Protocol
1package config
2
3import (
4 "crypto/rsa"
5 "crypto/x509"
6 "encoding/pem"
7 "errors"
8 "flag"
9 "fmt"
10 "io"
11 "net"
12 "os"
13 "path/filepath"
14 "runtime"
15 "strconv"
16 "strings"
17 "time"
18
19 "github.com/lestrrat-go/jwx/v2/jwk"
20 "github.com/peterbourgon/ff/v3"
21 "golang.org/x/exp/rand"
22 "stream.place/streamplace/pkg/aqtime"
23 "stream.place/streamplace/pkg/constants"
24 "stream.place/streamplace/pkg/crypto/aqpub"
25)
26
const (
	// SP_DATA_DIR is the literal placeholder that DataDirFlag puts at the
	// front of path defaults; Parse swaps it for the configured data dir.
	SP_DATA_DIR = "$SP_DATA_DIR"

	// SEGMENTS_DIR is the name of the data-dir subdirectory under which
	// segment and HLS paths are built.
	SEGMENTS_DIR = "segments"
)
29
// BuildFlags carries metadata describing a build of the binary.
type BuildFlags struct {
	Version string
	// BuildTime is interpreted as whole seconds since the Unix epoch.
	BuildTime int64
	UUID      string
}

// BuildTimeStr renders BuildTime as an RFC 3339 timestamp in UTC.
func (b BuildFlags) BuildTimeStr() string {
	return time.Unix(b.BuildTime, 0).UTC().Format(time.RFC3339)
}

// BuildTimeStrExpo renders BuildTime in UTC with a fixed three-digit
// millisecond field and a literal "Z" suffix, e.g. 2023-11-14T22:13:20.000Z.
func (b BuildFlags) BuildTimeStrExpo() string {
	const layout = "2006-01-02T15:04:05.000Z"
	return time.Unix(b.BuildTime, 0).UTC().Format(layout)
}
45
46type CLI struct {
47 AdminAccount string
48 Build *BuildFlags
49 DataDir string
50 DBPath string
51 EthAccountAddr string
52 EthKeystorePath string
53 EthPassword string
54 FirebaseServiceAccount string
55 GitLabURL string
56 HttpAddr string
57 HttpInternalAddr string
58 HttpsAddr string
59 Secure bool
60 NoMist bool
61 MistAdminPort int
62 MistHTTPPort int
63 MistRTMPPort int
64 SigningKeyPath string
65 TAURL string
66 TLSCertPath string
67 TLSKeyPath string
68 PKCS11ModulePath string
69 PKCS11Pin string
70 PKCS11TokenSlot string
71 PKCS11TokenLabel string
72 PKCS11TokenSerial string
73 PKCS11KeypairLabel string
74 PKCS11KeypairID string
75 StreamerName string
76 RelayHost string
77 Debug map[string]map[string]int
78 AllowedStreams []string
79 WideOpen bool
80 Peers []string
81 Redirects []string
82 TestStream bool
83 FrontendProxy string
84 AppBundleID string
85 NoFirehose bool
86 PrintChat bool
87 Color string
88 LivepeerGatewayURL string
89 WHIPTest string
90 Thumbnail bool
91 SmearAudio bool
92 ExternalSigning bool
93 TracingEndpoint string
94 PublicHost string
95 RateLimitPerSecond int
96 RateLimitBurst int
97 RateLimitWebsocket int
98 JWK jwk.Key
99 AccessJWK jwk.Key
100 dataDirFlags []*string
101}
102
// STREAMPLACE_SCHEME_PREFIX is the leading "scheme://" part of
// streamplace URLs.
var STREAMPLACE_SCHEME_PREFIX = "streamplace://"
104
105func (cli *CLI) OwnInternalURL() string {
106 // No errors because we know it's valid from AddrFlag
107 host, port, _ := net.SplitHostPort(cli.HttpInternalAddr)
108 ip := net.ParseIP(host)
109 if ip.IsUnspecified() {
110 host = "127.0.0.1"
111 }
112 addr := net.JoinHostPort(host, port)
113 return fmt.Sprintf("http://%s", addr)
114}
115
116func (cli *CLI) ParseSigningKey() (*rsa.PrivateKey, error) {
117 bs, err := os.ReadFile(cli.SigningKeyPath)
118 if err != nil {
119 return nil, err
120 }
121 block, _ := pem.Decode(bs)
122 if block == nil {
123 return nil, fmt.Errorf("no RSA key found in signing key")
124 }
125 key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
126 if err != nil {
127 return nil, err
128 }
129 return key, nil
130}
131
// RandomTrailer returns a string of exactly length characters, each drawn
// from the lowercase ASCII letters and the digits 0-9 using the package-level
// rand generator. It panics if length is negative.
// NOTE(review): the generator is a general-purpose PRNG, so the result
// should not be treated as a secret.
func RandomTrailer(length int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"

	out := make([]byte, length)
	for i := range out {
		out[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}
141
// DefaultDataDir returns the ".streamplace" directory inside the current
// user's home directory, or "" when the home directory cannot be determined.
func DefaultDataDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".streamplace")
	}
	// not fatal unless the user doesn't set one later
	return ""
}
150
151func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
152 err := ff.Parse(
153 fs, os.Args[1:],
154 ff.WithEnvVarPrefix("SP"),
155 )
156 if err != nil {
157 return err
158 }
159 if cli.DataDir == "" {
160 return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
161 }
162 for _, dest := range cli.dataDirFlags {
163 *dest = strings.Replace(*dest, SP_DATA_DIR, cli.DataDir, 1)
164 }
165 return nil
166}
167
168func (cli *CLI) DataFilePath(fpath []string) string {
169 if cli.DataDir == "" {
170 panic("no data dir configured")
171 }
172 // windows does not like colons
173 safe := []string{}
174 for _, p := range fpath {
175 safe = append(safe, strings.ReplaceAll(p, ":", "-"))
176 }
177 fpath = append([]string{cli.DataDir}, safe...)
178 fdpath := filepath.Join(fpath...)
179 return fdpath
180}
181
182// does a file exist in our data dir?
183func (cli *CLI) DataFileExists(fpath []string) (bool, error) {
184 ddpath := cli.DataFilePath(fpath)
185 _, err := os.Stat(ddpath)
186 if err == nil {
187 return true, nil
188 }
189 if errors.Is(err, os.ErrNotExist) {
190 return false, nil
191 }
192 return false, err
193}
194
195// write a file to our data dir
196func (cli *CLI) DataFileWrite(fpath []string, r io.Reader, overwrite bool) error {
197 fd, err := cli.DataFileCreate(fpath, overwrite)
198 if err != nil {
199 return err
200 }
201 defer fd.Close()
202 _, err = io.Copy(fd, r)
203 if err != nil {
204 return err
205 }
206
207 return nil
208}
209
210// create a file in our data dir. don't forget to close it!
211func (cli *CLI) DataFileCreate(fpath []string, overwrite bool) (*os.File, error) {
212 ddpath := cli.DataFilePath(fpath)
213 if !overwrite {
214 exists, err := cli.DataFileExists(fpath)
215 if err != nil {
216 return nil, err
217 }
218 if exists {
219 return nil, fmt.Errorf("refusing to overwrite file that exists: %s", ddpath)
220 }
221 }
222 if len(fpath) > 1 {
223 dirs, _ := filepath.Split(ddpath)
224 err := os.MkdirAll(dirs, os.ModePerm)
225 if err != nil {
226 return nil, fmt.Errorf("error creating subdirectories for %s: %w", ddpath, err)
227 }
228 }
229 return os.Create(ddpath)
230}
231
232// get a path to a segment file in our database
233func (cli *CLI) SegmentFilePath(user string, file string) (string, error) {
234 ext := filepath.Ext(file)
235 base := strings.TrimSuffix(file, ext)
236 aqt, err := aqtime.FromString(base)
237 if err != nil {
238 return "", err
239 }
240 fname := fmt.Sprintf("%s%s", aqt.FileSafeString(), ext)
241 yr, mon, day, hr, min, _, _ := aqt.Parts()
242 return cli.DataFilePath([]string{SEGMENTS_DIR, user, yr, mon, day, hr, min, fname}), nil
243}
244
245// get a path to a segment file in our database
246func (cli *CLI) HLSDir(user string) (string, error) {
247 return cli.DataFilePath([]string{SEGMENTS_DIR, "hls", user}), nil
248}
249
250// create a segment file in our database
251func (cli *CLI) SegmentFileCreate(user string, aqt aqtime.AQTime, ext string) (*os.File, error) {
252 fname := fmt.Sprintf("%s.%s", aqt.FileSafeString(), ext)
253 yr, mon, day, hr, min, _, _ := aqt.Parts()
254 return cli.DataFileCreate([]string{SEGMENTS_DIR, user, yr, mon, day, hr, min, fname}, false)
255}
256
257// read a file from our data dir
258func (cli *CLI) DataFileRead(fpath []string, w io.Writer) error {
259 ddpath := cli.DataFilePath(fpath)
260
261 fd, err := os.Open(ddpath)
262 if err != nil {
263 return err
264 }
265 _, err = io.Copy(w, fd)
266 if err != nil {
267 return err
268 }
269
270 return nil
271}
272
273func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
274 cli.dataDirFlags = append(cli.dataDirFlags, dest)
275 *dest = filepath.Join(SP_DATA_DIR, defaultValue)
276 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
277 fs.Func(name, usage, func(s string) error {
278 *dest = s
279 return nil
280 })
281}
282
283func (cli *CLI) HasMist() bool {
284 return runtime.GOOS == "linux"
285}
286
287// type for comma-separated ethereum addresses
288func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
289 *dest = []aqpub.Pub{}
290 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
291 fs.Func(name, usage, func(s string) error {
292 if s == "" {
293 return nil
294 }
295 strs := strings.Split(s, ",")
296 for _, str := range strs {
297 pub, err := aqpub.FromHexString(str)
298 if err != nil {
299 return err
300 }
301 *dest = append(*dest, pub)
302 }
303 return nil
304 })
305}
306
307func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) {
308 *dest = []string{}
309 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
310 fs.Func(name, usage, func(s string) error {
311 if s == "" {
312 return nil
313 }
314 strs := strings.Split(s, ",")
315 *dest = append(*dest, strs...)
316 return nil
317 })
318}
319
320// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
321func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
322 *dest = map[string]map[string]int{}
323 fs.Func(name, usage, func(s string) error {
324 if s == "" {
325 return nil
326 }
327 pairs := strings.Split(s, ",")
328 for _, pair := range pairs {
329 scoreSplit := strings.Split(pair, ":")
330 if len(scoreSplit) != 2 {
331 return fmt.Errorf("invalid debug flag: %s", pair)
332 }
333 score, err := strconv.Atoi(scoreSplit[1])
334 if err != nil {
335 return fmt.Errorf("invalid debug flag: %s", pair)
336 }
337 selectorSplit := strings.Split(scoreSplit[0], "=")
338 if len(selectorSplit) != 2 {
339 return fmt.Errorf("invalid debug flag: %s", pair)
340 }
341 _, ok := (*dest)[selectorSplit[0]]
342 if !ok {
343 (*dest)[selectorSplit[0]] = map[string]int{}
344 }
345 (*dest)[selectorSplit[0]][selectorSplit[1]] = score
346 }
347
348 return nil
349 })
350}
351
352func (cli *CLI) StreamIsAllowed(did string) error {
353 if cli.WideOpen {
354 return nil
355 }
356 // if the user set no test streams, anyone can stream
357 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1)
358 // but only valid atproto accounts! did:key is only allowed for our local test stream
359 isDIDKey := strings.HasPrefix(did, constants.DID_KEY_PREFIX)
360 if openServer && !isDIDKey {
361 return nil
362 }
363 for _, a := range cli.AllowedStreams {
364 if a == did {
365 return nil
366 }
367 }
368 return fmt.Errorf("user is not allowed to stream")
369}