Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "golang.org/x/term"
20 "stream.place/streamplace/pkg/aqhttp"
21 "stream.place/streamplace/pkg/atproto"
22 "stream.place/streamplace/pkg/bus"
23 "stream.place/streamplace/pkg/crypto/signers"
24 "stream.place/streamplace/pkg/crypto/signers/eip712"
25 "stream.place/streamplace/pkg/director"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/notifications"
29 "stream.place/streamplace/pkg/replication"
30 "stream.place/streamplace/pkg/replication/boring"
31 v0 "stream.place/streamplace/pkg/schema/v0"
32 "stream.place/streamplace/pkg/spmetrics"
33
34 "github.com/ThalesGroup/crypto11"
35 _ "github.com/go-gst/go-glib/glib"
36 _ "github.com/go-gst/go-gst/gst"
37 "stream.place/streamplace/pkg/api"
38 "stream.place/streamplace/pkg/config"
39 "stream.place/streamplace/pkg/model"
40)
41
42// Additional jobs that can be injected by platforms
43type jobFunc func(ctx context.Context, cli *config.CLI) error
44
45// parse the CLI and fire up an streamplace node!
46func start(build *config.BuildFlags, platformJobs []jobFunc) error {
47 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
48 err := media.RunSelfTest(context.Background())
49 if err != nil {
50 if selfTest {
51 fmt.Println(err.Error())
52 os.Exit(1)
53 } else {
54 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
55 if retryCount >= 3 {
56 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
57 return err
58 }
59 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
60 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
61 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
62 if err != nil {
63 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
64 return err
65 }
66 panic("invalid code path: exec succeeded but we're still here???")
67 }
68 }
69 if selfTest {
70 runtime.GC()
71 pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
72 fmt.Println("self-test successful!")
73 os.Exit(0)
74 }
75
76 if len(os.Args) > 1 && os.Args[1] == "stream" {
77 if len(os.Args) != 3 {
78 fmt.Println("usage: streamplace stream [user]")
79 os.Exit(1)
80 }
81 return Stream(os.Args[2])
82 }
83
84 if len(os.Args) > 1 && os.Args[1] == "sign" {
85 return Sign(context.Background())
86 }
87
88 if len(os.Args) > 1 && os.Args[1] == "whep" {
89 return WHEP(os.Args[2:])
90 }
91 if len(os.Args) > 1 && os.Args[1] == "whip" {
92 return WHIP(os.Args[2:])
93 }
94
95 if len(os.Args) > 1 && os.Args[1] == "self-test" {
96 err := media.RunSelfTest(context.Background())
97 if err != nil {
98 fmt.Println(err.Error())
99 os.Exit(1)
100 }
101 fmt.Println("self-test successful!")
102 os.Exit(0)
103 }
104 flag.Set("logtostderr", "true")
105 vFlag := flag.Lookup("v")
106 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
107 cli := config.CLI{Build: build}
108 fs.StringVar(&cli.DataDir, "data-dir", config.DefaultDataDir(), "directory for keeping all streamplace data")
109 fs.StringVar(&cli.HttpAddr, "http-addr", ":38080", "Public HTTP address")
110 fs.StringVar(&cli.HttpInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
111 fs.StringVar(&cli.HttpsAddr, "https-addr", ":38443", "Public HTTPS address")
112 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
113 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
114 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
115 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
116 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
117 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
118 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
119 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
120 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
121 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
122 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
123 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
124 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
125 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
126 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
127 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
128 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
129 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
130 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
131 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
132 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
133 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
134 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
135 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
136 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
137 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
138 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
139 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
140 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
141 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
142 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
143 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
144 verbosity := fs.String("v", "3", "log verbosity level")
145 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
146 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
147 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
148 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
149 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
150 fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)")
151 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
152 version := fs.Bool("version", false, "print version and exit")
153
154 if runtime.GOOS == "linux" {
155 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
156 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
157 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
158 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
159 }
160
161 err = cli.Parse(
162 fs, os.Args[1:],
163 )
164 if err != nil {
165 return err
166 }
167 err = flag.CommandLine.Parse(nil)
168 if err != nil {
169 return err
170 }
171 vFlag.Value.Set(*verbosity)
172 log.SetColorLogger(cli.Color)
173 ctx := context.Background()
174 ctx = log.WithDebugValue(ctx, cli.Debug)
175
176 log.Log(ctx,
177 "streamplace",
178 "version", build.Version,
179 "buildTime", build.BuildTimeStr(),
180 "uuid", build.UUID,
181 "runtime.GOOS", runtime.GOOS,
182 "runtime.GOARCH", runtime.GOARCH,
183 "runtime.Version", runtime.Version())
184 if *version {
185 return nil
186 }
187 spmetrics.Version.WithLabelValues(build.Version).Inc()
188
189 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
190
191 err = os.MkdirAll(cli.DataDir, os.ModePerm)
192 if err != nil {
193 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
194 }
195 schema, err := v0.MakeV0Schema()
196 if err != nil {
197 return err
198 }
199 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
200 Schema: schema,
201 EthKeystorePath: cli.EthKeystorePath,
202 EthAccountAddr: cli.EthAccountAddr,
203 EthKeystorePassword: cli.EthPassword,
204 })
205 if err != nil {
206 return err
207 }
208 var signer crypto.Signer = eip712signer
209 if cli.PKCS11ModulePath != "" {
210 conf := &crypto11.Config{
211 Path: cli.PKCS11ModulePath,
212 }
213 count := 0
214 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
215 if val != "" {
216 count += 1
217 }
218 }
219 if count != 1 {
220 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
221 }
222 if cli.PKCS11TokenSlot != "" {
223 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
224 if err != nil {
225 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
226 }
227 numint := int(num)
228 // why does crypto11 want this as a reference? odd.
229 conf.SlotNumber = &numint
230 }
231 if cli.PKCS11TokenLabel != "" {
232 conf.TokenLabel = cli.PKCS11TokenLabel
233 }
234 if cli.PKCS11TokenSerial != "" {
235 conf.TokenSerial = cli.PKCS11TokenSerial
236 }
237 pin := cli.PKCS11Pin
238 if pin == "" {
239 fmt.Printf("Please enter PKCS11 PIN: ")
240 password, err := term.ReadPassword(int(os.Stdin.Fd()))
241 fmt.Println("")
242 if err != nil {
243 return fmt.Errorf("error reading PKCS11 password: %w", err)
244 }
245 pin = string(password)
246 }
247 conf.Pin = pin
248
249 sc, err := crypto11.Configure(conf)
250 if err != nil {
251 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
252 }
253 var id []byte = nil
254 var label []byte = nil
255 if cli.PKCS11KeypairID != "" {
256 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
257 if err != nil {
258 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
259 }
260 id = []byte{byte(num)}
261 }
262 if cli.PKCS11KeypairLabel != "" {
263 label = []byte(cli.PKCS11KeypairLabel)
264 }
265 hwsigner, err := sc.FindKeyPair(id, label)
266 if err != nil {
267 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
268 }
269 if hwsigner == nil {
270 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
271 }
272 addr, err := signers.HexAddrFromSigner(hwsigner)
273 if err != nil {
274 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
275 }
276 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
277 signer = hwsigner
278 }
279 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
280 mod, err := model.MakeDB(cli.DBPath)
281 if err != nil {
282 return err
283 }
284 var noter notifications.FirebaseNotifier
285 if cli.FirebaseServiceAccount != "" {
286 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
287 if err != nil {
288 return err
289 }
290 }
291 b := bus.NewBus()
292 atsync := &atproto.ATProtoSynchronizer{
293 CLI: &cli,
294 Model: mod,
295 Noter: noter,
296 Bus: b,
297 }
298 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
299 if err != nil {
300 return err
301 }
302
303 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
304 if err != nil {
305 return err
306 }
307
308 d := director.NewDirector(mm, mod, &cli, b)
309
310 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d)
311 if err != nil {
312 return err
313 }
314
315 group, ctx := TimeoutGroupWithContext(ctx)
316 ctx = log.WithLogValues(ctx, "version", build.Version)
317
318 group.Go(func() error {
319 return handleSignals(ctx)
320 })
321
322 if cli.TracingEndpoint != "" {
323 group.Go(func() error {
324 return startTelemetry(ctx, cli.TracingEndpoint)
325 })
326 }
327
328 if cli.Secure {
329 group.Go(func() error {
330 return a.ServeHTTPS(ctx)
331 })
332 group.Go(func() error {
333 return a.ServeHTTPRedirect(ctx)
334 })
335 } else {
336 group.Go(func() error {
337 return a.ServeHTTP(ctx)
338 })
339 }
340
341 group.Go(func() error {
342 return a.ServeInternalHTTP(ctx)
343 })
344
345 if !cli.NoFirehose {
346 group.Go(func() error {
347 return atsync.StartFirehose(ctx)
348 })
349 }
350
351 group.Go(func() error {
352 return spmetrics.ExpireSessions(ctx)
353 })
354
355 group.Go(func() error {
356 return mod.StartSegmentCleaner(ctx)
357 })
358
359 group.Go(func() error {
360 return d.Start(ctx)
361 })
362
363 if cli.TestStream {
364 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
365 Schema: schema,
366 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
367 })
368 if err != nil {
369 return err
370 }
371 atkey, err := atproto.ParsePubKey(signer.Public())
372 if err != nil {
373 return err
374 }
375 did := atkey.DIDKey()
376 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
377 if err != nil {
378 return err
379 }
380 err = mod.UpdateIdentity(&model.Identity{
381 ID: testMediaSigner.Pub().String(),
382 Handle: "stream-self-tester",
383 DID: "",
384 })
385 if err != nil {
386 return err
387 }
388 cli.AllowedStreams = append(cli.AllowedStreams, did)
389 a.Aliases["self-test"] = did
390 group.Go(func() error {
391 return mm.TestSource(ctx, testMediaSigner)
392 })
393 }
394
395 for _, job := range platformJobs {
396 group.Go(func() error {
397 return job(ctx, &cli)
398 })
399 }
400
401 if cli.WHIPTest != "" {
402 group.Go(func() error {
403 err := WHIP(strings.Split(cli.WHIPTest, " "))
404 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
405 time.Sleep(time.Second * 3)
406 // gst.Deinit()
407 log.Warn(ctx, "gst deinit complete, exiting")
408 return err
409 })
410 }
411
412 return group.Wait()
413}
414
415var ErrCaughtSignal = errors.New("caught signal")
416
417func handleSignals(ctx context.Context) error {
418 c := make(chan os.Signal, 1)
419 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
420 for {
421 select {
422 case s := <-c:
423 if s == syscall.SIGABRT {
424 pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
425 }
426 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
427 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
428 case <-ctx.Done():
429 return nil
430 }
431 }
432}