Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/bluesky-social/indigo/carstore"
20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
21 "github.com/peterbourgon/ff/v3"
22 "github.com/streamplace/oatproxy/pkg/oatproxy"
23 "golang.org/x/term"
24 "stream.place/streamplace/pkg/aqhttp"
25 "stream.place/streamplace/pkg/atproto"
26 "stream.place/streamplace/pkg/bus"
27 "stream.place/streamplace/pkg/crypto/signers"
28 "stream.place/streamplace/pkg/crypto/signers/eip712"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/log"
31 "stream.place/streamplace/pkg/media"
32 "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/replication"
34 "stream.place/streamplace/pkg/replication/boring"
35 "stream.place/streamplace/pkg/rtmps"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37 "stream.place/streamplace/pkg/spmetrics"
38 "stream.place/streamplace/pkg/statedb"
39
40 "github.com/ThalesGroup/crypto11"
41 _ "github.com/go-gst/go-glib/glib"
42 _ "github.com/go-gst/go-gst/gst"
43 "stream.place/streamplace/pkg/api"
44 "stream.place/streamplace/pkg/config"
45 "stream.place/streamplace/pkg/model"
46)
47
// jobFunc is an additional job that a platform can inject into the node.
// start runs every supplied job in the node's goroutine group, passing it the
// group's context and a pointer to the parsed CLI configuration; the job's
// returned error is handed back to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error
50
51// parse the CLI and fire up an streamplace node!
52func start(build *config.BuildFlags, platformJobs []jobFunc) error {
53 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
54 err := media.RunSelfTest(context.Background())
55 if err != nil {
56 if selfTest {
57 fmt.Println(err.Error())
58 os.Exit(1)
59 } else {
60 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
61 if retryCount >= 3 {
62 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
63 return err
64 }
65 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
66 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
67 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
68 if err != nil {
69 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
70 return err
71 }
72 panic("invalid code path: exec succeeded but we're still here???")
73 }
74 }
75 if selfTest {
76 runtime.GC()
77 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
78 log.Error(context.Background(), "error creating pprof", "error", err)
79 }
80 fmt.Println("self-test successful!")
81 os.Exit(0)
82 }
83
84 if len(os.Args) > 1 && os.Args[1] == "stream" {
85 if len(os.Args) != 3 {
86 fmt.Println("usage: streamplace stream [user]")
87 os.Exit(1)
88 }
89 return Stream(os.Args[2])
90 }
91
92 if len(os.Args) > 1 && os.Args[1] == "live" {
93 cli := config.CLI{Build: build}
94 fs := cli.NewFlagSet("streamplace live")
95
96 err := cli.Parse(fs, os.Args[2:])
97 if err != nil {
98 return err
99 }
100
101 args := fs.Args()
102 if len(args) != 1 {
103 fmt.Println("usage: streamplace live [flags] [stream-key]")
104 os.Exit(1)
105 }
106
107 return Live(args[0], cli.HTTPInternalAddr)
108 }
109
110 if len(os.Args) > 1 && os.Args[1] == "sign" {
111 return Sign(context.Background())
112 }
113
114 if len(os.Args) > 1 && os.Args[1] == "whep" {
115 return WHEP(os.Args[2:])
116 }
117 if len(os.Args) > 1 && os.Args[1] == "whip" {
118 return WHIP(os.Args[2:])
119 }
120
121 if len(os.Args) > 1 && os.Args[1] == "clip" {
122 cli := config.CLI{Build: build}
123 fs := cli.NewFlagSet("streamplace clip")
124 out := fs.String("out", "", "output file")
125
126 err := cli.Parse(fs, os.Args[2:])
127 if err != nil {
128 return err
129 }
130 ctx := context.Background()
131 ctx = log.WithDebugValue(ctx, cli.Debug)
132 return Clip(ctx, fs.Args(), *out)
133 }
134
135 if len(os.Args) > 1 && os.Args[1] == "self-test" {
136 err := media.RunSelfTest(context.Background())
137 if err != nil {
138 fmt.Println(err.Error())
139 os.Exit(1)
140 }
141 fmt.Println("self-test successful!")
142 os.Exit(0)
143 }
144
145 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
146 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
147 _ = starter.NewLivepeerConfig(lpfs)
148 err = ff.Parse(lpfs, os.Args[2:],
149 ff.WithConfigFileFlag("config"),
150 ff.WithEnvVarPrefix("LP"),
151 )
152 if err != nil {
153 return err
154 }
155 err = GoLivepeer(context.Background(), lpfs)
156 if err != nil {
157 log.Error(context.Background(), "error in livepeer", "error", err)
158 os.Exit(1)
159 }
160 os.Exit(0)
161 }
162
163 _ = flag.Set("logtostderr", "true")
164 vFlag := flag.Lookup("v")
165 cli := config.CLI{Build: build}
166 fs := cli.NewFlagSet("streamplace")
167 verbosity := fs.String("v", "3", "log verbosity level")
168 version := fs.Bool("version", false, "print version and exit")
169
170 err = cli.Parse(
171 fs, os.Args[1:],
172 )
173 if err != nil {
174 return err
175 }
176 err = flag.CommandLine.Parse(nil)
177 if err != nil {
178 return err
179 }
180 _ = vFlag.Value.Set(*verbosity)
181 log.SetColorLogger(cli.Color)
182 ctx := context.Background()
183 ctx = log.WithDebugValue(ctx, cli.Debug)
184
185 log.Log(ctx,
186 "streamplace",
187 "version", build.Version,
188 "buildTime", build.BuildTimeStr(),
189 "uuid", build.UUID,
190 "runtime.GOOS", runtime.GOOS,
191 "runtime.GOARCH", runtime.GOARCH,
192 "runtime.Version", runtime.Version())
193 if *version {
194 return nil
195 }
196
197 if len(os.Args) > 1 && os.Args[1] == "migrate" {
198 return statedb.Migrate(&cli)
199 }
200
201 spmetrics.Version.WithLabelValues(build.Version).Inc()
202 if cli.LivepeerHelp {
203 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
204 _ = starter.NewLivepeerConfig(lpFlags)
205 lpFlags.VisitAll(func(f *flag.Flag) {
206 adapted := config.ToSnakeCase(f.Name)
207 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
208 usage := fmt.Sprintf(" %s", f.Usage)
209 if f.DefValue != "" {
210 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
211 }
212 fmt.Printf(" %s\n", usage)
213 })
214 return nil
215 }
216
217 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
218
219 err = os.MkdirAll(cli.DataDir, os.ModePerm)
220 if err != nil {
221 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
222 }
223 schema, err := v0.MakeV0Schema()
224 if err != nil {
225 return err
226 }
227 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
228 Schema: schema,
229 EthKeystorePath: cli.EthKeystorePath,
230 EthAccountAddr: cli.EthAccountAddr,
231 EthKeystorePassword: cli.EthPassword,
232 })
233 if err != nil {
234 return err
235 }
236 var signer crypto.Signer = eip712signer
237 if cli.PKCS11ModulePath != "" {
238 conf := &crypto11.Config{
239 Path: cli.PKCS11ModulePath,
240 }
241 count := 0
242 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
243 if val != "" {
244 count += 1
245 }
246 }
247 if count != 1 {
248 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
249 }
250 if cli.PKCS11TokenSlot != "" {
251 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
252 if err != nil {
253 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
254 }
255 numint := int(num)
256 // why does crypto11 want this as a reference? odd.
257 conf.SlotNumber = &numint
258 }
259 if cli.PKCS11TokenLabel != "" {
260 conf.TokenLabel = cli.PKCS11TokenLabel
261 }
262 if cli.PKCS11TokenSerial != "" {
263 conf.TokenSerial = cli.PKCS11TokenSerial
264 }
265 pin := cli.PKCS11Pin
266 if pin == "" {
267 fmt.Printf("Please enter PKCS11 PIN: ")
268 password, err := term.ReadPassword(int(os.Stdin.Fd()))
269 fmt.Println("")
270 if err != nil {
271 return fmt.Errorf("error reading PKCS11 password: %w", err)
272 }
273 pin = string(password)
274 }
275 conf.Pin = pin
276
277 sc, err := crypto11.Configure(conf)
278 if err != nil {
279 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
280 }
281 var id []byte = nil
282 var label []byte = nil
283 if cli.PKCS11KeypairID != "" {
284 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
285 if err != nil {
286 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
287 }
288 id = []byte{byte(num)}
289 }
290 if cli.PKCS11KeypairLabel != "" {
291 label = []byte(cli.PKCS11KeypairLabel)
292 }
293 hwsigner, err := sc.FindKeyPair(id, label)
294 if err != nil {
295 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
296 }
297 if hwsigner == nil {
298 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
299 }
300 addr, err := signers.HexAddrFromSigner(hwsigner)
301 if err != nil {
302 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
303 }
304 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
305 signer = hwsigner
306 }
307 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
308
309 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
310 if err != nil {
311 return err
312 }
313 var noter notifications.FirebaseNotifier
314 if cli.FirebaseServiceAccount != "" {
315 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
316 if err != nil {
317 return err
318 }
319 }
320
321 out := carstore.SQLiteStore{}
322 err = out.Open(":memory:")
323 if err != nil {
324 return err
325 }
326 state, err := statedb.MakeDB(&cli, noter, mod)
327 if err != nil {
328 return err
329 }
330 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
331 if err != nil {
332 return err
333 }
334 defer handle.Close()
335
336 jwk, err := state.EnsureJWK(ctx, "jwk")
337 if err != nil {
338 return err
339 }
340 cli.JWK = jwk
341
342 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
343 if err != nil {
344 return err
345 }
346 cli.AccessJWK = accessJWK
347
348 b := bus.NewBus()
349 atsync := &atproto.ATProtoSynchronizer{
350 CLI: &cli,
351 Model: mod,
352 StatefulDB: state,
353 Noter: noter,
354 Bus: b,
355 }
356 err = atsync.Migrate(ctx)
357 if err != nil {
358 return fmt.Errorf("failed to migrate: %w", err)
359 }
360
361 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
362 if err != nil {
363 return err
364 }
365
366 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
367 if err != nil {
368 return err
369 }
370
371 clientMetadata := &oatproxy.OAuthClientMetadata{
372 Scope: "atproto transition:generic",
373 ClientName: "Streamplace",
374 RedirectURIs: []string{
375 fmt.Sprintf("https://%s/login", cli.PublicHost),
376 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
377 },
378 }
379
380 op := oatproxy.New(&oatproxy.Config{
381 Host: cli.PublicHost,
382 CreateOAuthSession: state.CreateOAuthSession,
383 UpdateOAuthSession: state.UpdateOAuthSession,
384 GetOAuthSession: state.LoadOAuthSession,
385 Lock: state.GetNamedLock,
386 Scope: "atproto transition:generic",
387 UpstreamJWK: cli.JWK,
388 DownstreamJWK: cli.AccessJWK,
389 ClientMetadata: clientMetadata,
390 })
391 d := director.NewDirector(mm, mod, &cli, b, op, state)
392 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
393 if err != nil {
394 return err
395 }
396
397 group, ctx := TimeoutGroupWithContext(ctx)
398 ctx = log.WithLogValues(ctx, "version", build.Version)
399
400 group.Go(func() error {
401 return handleSignals(ctx)
402 })
403
404 group.Go(func() error {
405 return state.ProcessQueue(ctx)
406 })
407
408 if cli.TracingEndpoint != "" {
409 group.Go(func() error {
410 return startTelemetry(ctx, cli.TracingEndpoint)
411 })
412 }
413
414 if cli.Secure {
415 group.Go(func() error {
416 return a.ServeHTTPS(ctx)
417 })
418 group.Go(func() error {
419 return a.ServeHTTPRedirect(ctx)
420 })
421 if cli.RTMPServerAddon != "" {
422 group.Go(func() error {
423 return rtmps.ServeRTMPS(ctx, &cli)
424 })
425 }
426 } else {
427 group.Go(func() error {
428 return a.ServeHTTP(ctx)
429 })
430 }
431
432 group.Go(func() error {
433 return a.ServeInternalHTTP(ctx)
434 })
435
436 if !cli.NoFirehose {
437 group.Go(func() error {
438 return atsync.StartFirehose(ctx)
439 })
440 }
441 for _, labeler := range cli.Labelers {
442 group.Go(func() error {
443 return atsync.StartLabelerFirehose(ctx, labeler)
444 })
445 }
446
447 group.Go(func() error {
448 return spmetrics.ExpireSessions(ctx)
449 })
450
451 group.Go(func() error {
452 return mod.StartSegmentCleaner(ctx)
453 })
454
455 if cli.LivepeerGateway {
456 // make a file to make sure the directory exists
457 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
458 if err != nil {
459 return err
460 }
461 fd.Close()
462 if err != nil {
463 return err
464 }
465 group.Go(func() error {
466 return GoLivepeer(ctx, fs)
467 })
468 }
469
470 group.Go(func() error {
471 return d.Start(ctx)
472 })
473
474 if cli.TestStream {
475 // regular stream self-test
476 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
477 Schema: schema,
478 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
479 })
480 if err != nil {
481 return err
482 }
483 atkey, err := atproto.ParsePubKey(signer.Public())
484 if err != nil {
485 return err
486 }
487 did := atkey.DIDKey()
488 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
489 if err != nil {
490 return err
491 }
492 err = mod.UpdateIdentity(&model.Identity{
493 ID: testMediaSigner.Pub().String(),
494 Handle: "stream-self-tester",
495 DID: "",
496 })
497 if err != nil {
498 return err
499 }
500 cli.AllowedStreams = append(cli.AllowedStreams, did)
501 a.Aliases["self-test"] = did
502 group.Go(func() error {
503 return mm.TestSource(ctx, testMediaSigner)
504 })
505
506 // Start a test stream that will run intermittently
507 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
508 Schema: schema,
509 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
510 })
511 if err != nil {
512 return err
513 }
514 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
515 if err != nil {
516 return err
517 }
518 did2 := atkey2.DIDKey()
519 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
520 if err != nil {
521 return err
522 }
523 err = mod.UpdateIdentity(&model.Identity{
524 ID: intermittentMediaSigner.Pub().String(),
525 Handle: "stream-intermittent-tester",
526 DID: "",
527 })
528 if err != nil {
529 return err
530 }
531 cli.AllowedStreams = append(cli.AllowedStreams, did2)
532 a.Aliases["intermittent-self-test"] = did2
533
534 group.Go(func() error {
535 for {
536 // Start intermittent stream
537 intermittentCtx, cancel := context.WithCancel(ctx)
538 done := make(chan struct{})
539 go func() {
540 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
541 close(done)
542 }()
543 // Stream ON for 15 seconds
544 time.Sleep(15 * time.Second)
545 // Stop stream
546 cancel()
547 <-done // Wait for TestSource to exit
548 // Stream OFF for 15 seconds
549 time.Sleep(15 * time.Second)
550 }
551 })
552 }
553
554 for _, job := range platformJobs {
555 group.Go(func() error {
556 return job(ctx, &cli)
557 })
558 }
559
560 if cli.WHIPTest != "" {
561 group.Go(func() error {
562 err := WHIP(strings.Split(cli.WHIPTest, " "))
563 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
564 time.Sleep(time.Second * 3)
565 // gst.Deinit()
566 log.Warn(ctx, "gst deinit complete, exiting")
567 return err
568 })
569 }
570
571 return group.Wait()
572}
573
// ErrCaughtSignal is the sentinel error wrapped by the error that
// handleSignals returns when the process receives one of the signals it
// listens for. Callers can detect it with errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
575
576func handleSignals(ctx context.Context) error {
577 c := make(chan os.Signal, 1)
578 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
579 for {
580 select {
581 case s := <-c:
582 if s == syscall.SIGABRT {
583 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
584 log.Error(ctx, "failed to create pprof", "error", err)
585 }
586 }
587 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
588 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
589 case <-ctx.Done():
590 return nil
591 }
592 }
593}