Live video on the AT Protocol
1package cmd
2
3import (
4 "bytes"
5 "context"
6 "crypto"
7 "crypto/rand"
8 "errors"
9 "flag"
10 "fmt"
11 "net/url"
12 "os"
13 "os/signal"
14 "path/filepath"
15 "runtime"
16 "runtime/pprof"
17 "strconv"
18 "strings"
19 "syscall"
20 "time"
21
22 "github.com/bluesky-social/indigo/carstore"
23 "github.com/ethereum/go-ethereum/common/hexutil"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/peterbourgon/ff/v3"
26 "github.com/streamplace/oatproxy/pkg/oatproxy"
27 "golang.org/x/term"
28 "stream.place/streamplace/pkg/aqhttp"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/crypto/signers"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/notifications"
37 "stream.place/streamplace/pkg/replication/iroh_replicator"
38 "stream.place/streamplace/pkg/rtmps"
39 v0 "stream.place/streamplace/pkg/schema/v0"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/statedb"
42 "stream.place/streamplace/pkg/storage"
43
44 "github.com/ThalesGroup/crypto11"
45 _ "github.com/go-gst/go-glib/glib"
46 _ "github.com/go-gst/go-gst/gst"
47 "stream.place/streamplace/pkg/api"
48 "stream.place/streamplace/pkg/config"
49 "stream.place/streamplace/pkg/model"
50)
51
// jobFunc is an additional job that can be injected by platforms. start runs
// every supplied job in the node's task group, handing it the node context
// and a pointer to the parsed CLI configuration; the error it returns is
// reported to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error
54
55// parse the CLI and fire up an streamplace node!
56func start(build *config.BuildFlags, platformJobs []jobFunc) error {
57 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
58 err := media.RunSelfTest(context.Background())
59 if err != nil {
60 if selfTest {
61 fmt.Println(err.Error())
62 os.Exit(1)
63 } else {
64 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
65 if retryCount >= 3 {
66 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
67 return err
68 }
69 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
70 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
71 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
72 if err != nil {
73 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
74 return err
75 }
76 panic("invalid code path: exec succeeded but we're still here???")
77 }
78 }
79 if selfTest {
80 runtime.GC()
81 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
82 log.Error(context.Background(), "error creating pprof", "error", err)
83 }
84 fmt.Println("self-test successful!")
85 os.Exit(0)
86 }
87
88 if len(os.Args) > 1 && os.Args[1] == "stream" {
89 if len(os.Args) != 3 {
90 fmt.Println("usage: streamplace stream [user]")
91 os.Exit(1)
92 }
93 return Stream(os.Args[2])
94 }
95
96 if len(os.Args) > 1 && os.Args[1] == "live" {
97 cli := config.CLI{Build: build}
98 fs := cli.NewFlagSet("streamplace live")
99
100 err := cli.Parse(fs, os.Args[2:])
101 if err != nil {
102 return err
103 }
104
105 args := fs.Args()
106 if len(args) != 1 {
107 fmt.Println("usage: streamplace live [flags] [stream-key]")
108 os.Exit(1)
109 }
110
111 return Live(args[0], cli.HTTPInternalAddr)
112 }
113
114 if len(os.Args) > 1 && os.Args[1] == "sign" {
115 return Sign(context.Background())
116 }
117
118 if len(os.Args) > 1 && os.Args[1] == "whep" {
119 return WHEP(os.Args[2:])
120 }
121 if len(os.Args) > 1 && os.Args[1] == "whip" {
122 return WHIP(os.Args[2:])
123 }
124
125 if len(os.Args) > 1 && os.Args[1] == "clip" {
126 cli := config.CLI{Build: build}
127 fs := cli.NewFlagSet("streamplace clip")
128 out := fs.String("out", "", "output file")
129
130 err := cli.Parse(fs, os.Args[2:])
131 if err != nil {
132 return err
133 }
134 ctx := context.Background()
135 ctx = log.WithDebugValue(ctx, cli.Debug)
136 return Clip(ctx, fs.Args(), *out)
137 }
138
139 if len(os.Args) > 1 && os.Args[1] == "self-test" {
140 err := media.RunSelfTest(context.Background())
141 if err != nil {
142 fmt.Println(err.Error())
143 os.Exit(1)
144 }
145 fmt.Println("self-test successful!")
146 os.Exit(0)
147 }
148
149 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
150 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
151 _ = starter.NewLivepeerConfig(lpfs)
152 err = ff.Parse(lpfs, os.Args[2:],
153 ff.WithConfigFileFlag("config"),
154 ff.WithEnvVarPrefix("LP"),
155 )
156 if err != nil {
157 return err
158 }
159 err = GoLivepeer(context.Background(), lpfs)
160 if err != nil {
161 log.Error(context.Background(), "error in livepeer", "error", err)
162 os.Exit(1)
163 }
164 os.Exit(0)
165 }
166
167 _ = flag.Set("logtostderr", "true")
168 vFlag := flag.Lookup("v")
169 cli := config.CLI{Build: build}
170 fs := cli.NewFlagSet("streamplace")
171 verbosity := fs.String("v", "3", "log verbosity level")
172 version := fs.Bool("version", false, "print version and exit")
173
174 err = cli.Parse(
175 fs, os.Args[1:],
176 )
177 if err != nil {
178 return err
179 }
180
181 err = flag.CommandLine.Parse(nil)
182 if err != nil {
183 return err
184 }
185 _ = vFlag.Value.Set(*verbosity)
186 log.SetColorLogger(cli.Color)
187 ctx := context.Background()
188 ctx = log.WithDebugValue(ctx, cli.Debug)
189
190 log.Log(ctx,
191 "streamplace",
192 "version", build.Version,
193 "buildTime", build.BuildTimeStr(),
194 "uuid", build.UUID,
195 "runtime.GOOS", runtime.GOOS,
196 "runtime.GOARCH", runtime.GOARCH,
197 "runtime.Version", runtime.Version())
198 if *version {
199 return nil
200 }
201
202 if len(os.Args) > 1 && os.Args[1] == "migrate" {
203 return statedb.Migrate(&cli)
204 }
205
206 spmetrics.Version.WithLabelValues(build.Version).Inc()
207 if cli.LivepeerHelp {
208 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
209 _ = starter.NewLivepeerConfig(lpFlags)
210 lpFlags.VisitAll(func(f *flag.Flag) {
211 adapted := config.ToSnakeCase(f.Name)
212 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
213 usage := fmt.Sprintf(" %s", f.Usage)
214 if f.DefValue != "" {
215 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
216 }
217 fmt.Printf(" %s\n", usage)
218 })
219 return nil
220 }
221
222 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
223
224 err = os.MkdirAll(cli.DataDir, os.ModePerm)
225 if err != nil {
226 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
227 }
228 schema, err := v0.MakeV0Schema()
229 if err != nil {
230 return err
231 }
232 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
233 Schema: schema,
234 EthKeystorePath: cli.EthKeystorePath,
235 EthAccountAddr: cli.EthAccountAddr,
236 EthKeystorePassword: cli.EthPassword,
237 })
238 if err != nil {
239 return err
240 }
241 var signer crypto.Signer = eip712signer
242 if cli.PKCS11ModulePath != "" {
243 conf := &crypto11.Config{
244 Path: cli.PKCS11ModulePath,
245 }
246 count := 0
247 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
248 if val != "" {
249 count += 1
250 }
251 }
252 if count != 1 {
253 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
254 }
255 if cli.PKCS11TokenSlot != "" {
256 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
257 if err != nil {
258 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
259 }
260 numint := int(num)
261 // why does crypto11 want this as a reference? odd.
262 conf.SlotNumber = &numint
263 }
264 if cli.PKCS11TokenLabel != "" {
265 conf.TokenLabel = cli.PKCS11TokenLabel
266 }
267 if cli.PKCS11TokenSerial != "" {
268 conf.TokenSerial = cli.PKCS11TokenSerial
269 }
270 pin := cli.PKCS11Pin
271 if pin == "" {
272 fmt.Printf("Please enter PKCS11 PIN: ")
273 password, err := term.ReadPassword(int(os.Stdin.Fd()))
274 fmt.Println("")
275 if err != nil {
276 return fmt.Errorf("error reading PKCS11 password: %w", err)
277 }
278 pin = string(password)
279 }
280 conf.Pin = pin
281
282 sc, err := crypto11.Configure(conf)
283 if err != nil {
284 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
285 }
286 var id []byte = nil
287 var label []byte = nil
288 if cli.PKCS11KeypairID != "" {
289 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
290 if err != nil {
291 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
292 }
293 id = []byte{byte(num)}
294 }
295 if cli.PKCS11KeypairLabel != "" {
296 label = []byte(cli.PKCS11KeypairLabel)
297 }
298 hwsigner, err := sc.FindKeyPair(id, label)
299 if err != nil {
300 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
301 }
302 if hwsigner == nil {
303 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
304 }
305 addr, err := signers.HexAddrFromSigner(hwsigner)
306 if err != nil {
307 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
308 }
309 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
310 signer = hwsigner
311 }
312
313 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
314 if err != nil {
315 return err
316 }
317 var noter notifications.FirebaseNotifier
318 if cli.FirebaseServiceAccount != "" {
319 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
320 if err != nil {
321 return err
322 }
323 }
324
325 group, ctx := TimeoutGroupWithContext(ctx)
326
327 out := carstore.SQLiteStore{}
328 err = out.Open(":memory:")
329 if err != nil {
330 return err
331 }
332 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
333 if err != nil {
334 return err
335 }
336 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
337 if err != nil {
338 return err
339 }
340 defer handle.Close()
341
342 jwk, err := state.EnsureJWK(ctx, "jwk")
343 if err != nil {
344 return err
345 }
346 cli.JWK = jwk
347
348 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
349 if err != nil {
350 return err
351 }
352 cli.AccessJWK = accessJWK
353
354 b := bus.NewBus()
355 atsync := &atproto.ATProtoSynchronizer{
356 CLI: &cli,
357 Model: mod,
358 StatefulDB: state,
359 Noter: noter,
360 Bus: b,
361 }
362 err = atsync.Migrate(ctx)
363 if err != nil {
364 return fmt.Errorf("failed to migrate: %w", err)
365 }
366
367 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync)
368 if err != nil {
369 return err
370 }
371
372 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod)
373 if err != nil {
374 return err
375 }
376
377 var clientMetadata *oatproxy.OAuthClientMetadata
378 var host string
379 if cli.PublicOAuth {
380 u, err := url.Parse(cli.OwnPublicURL())
381 if err != nil {
382 return err
383 }
384 host = u.Host
385 clientMetadata = &oatproxy.OAuthClientMetadata{
386 Scope: "atproto transition:generic",
387 ClientName: "Streamplace",
388 RedirectURIs: []string{
389 fmt.Sprintf("%s/login", cli.OwnPublicURL()),
390 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()),
391 },
392 }
393 } else {
394 host = cli.BroadcasterHost
395 clientMetadata = &oatproxy.OAuthClientMetadata{
396 Scope: "atproto transition:generic",
397 ClientName: "Streamplace",
398 RedirectURIs: []string{
399 fmt.Sprintf("https://%s/login", cli.BroadcasterHost),
400 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost),
401 },
402 }
403 }
404
405 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
406 if err != nil {
407 return err
408 }
409 if !exists {
410 secret := make([]byte, 32)
411 _, err := rand.Read(secret)
412 if err != nil {
413 return fmt.Errorf("failed to generate random secret: %w", err)
414 }
415 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
416 if err != nil {
417 return err
418 }
419 }
420 buf := bytes.Buffer{}
421 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
422 if err != nil {
423 return err
424 }
425 secret := buf.Bytes()
426 var topic []byte
427 if cli.IrohTopic != "" {
428 topic, err = hexutil.Decode("0x" + cli.IrohTopic)
429 if err != nil {
430 return err
431 }
432 }
433 swarm, err := iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod)
434 if err != nil {
435 return err
436 }
437
438 op := oatproxy.New(&oatproxy.Config{
439 Host: host,
440 CreateOAuthSession: state.CreateOAuthSession,
441 UpdateOAuthSession: state.UpdateOAuthSession,
442 GetOAuthSession: state.LoadOAuthSession,
443 Lock: state.GetNamedLock,
444 Scope: "atproto transition:generic",
445 UpstreamJWK: cli.JWK,
446 DownstreamJWK: cli.AccessJWK,
447 ClientMetadata: clientMetadata,
448 Public: cli.PublicOAuth,
449 })
450 d := director.NewDirector(mm, mod, &cli, b, op, state, swarm)
451 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
452 if err != nil {
453 return err
454 }
455
456 ctx = log.WithLogValues(ctx, "version", build.Version)
457
458 group.Go(func() error {
459 return handleSignals(ctx)
460 })
461
462 group.Go(func() error {
463 return state.ProcessQueue(ctx)
464 })
465
466 if cli.TracingEndpoint != "" {
467 group.Go(func() error {
468 return startTelemetry(ctx, cli.TracingEndpoint)
469 })
470 }
471
472 if cli.Secure {
473 group.Go(func() error {
474 return a.ServeHTTPS(ctx)
475 })
476 group.Go(func() error {
477 return a.ServeHTTPRedirect(ctx)
478 })
479 if cli.RTMPServerAddon != "" {
480 group.Go(func() error {
481 return rtmps.ServeRTMPS(ctx, &cli)
482 })
483 }
484 } else {
485 group.Go(func() error {
486 return a.ServeHTTP(ctx)
487 })
488 }
489
490 group.Go(func() error {
491 return a.ServeInternalHTTP(ctx)
492 })
493
494 if !cli.NoFirehose {
495 group.Go(func() error {
496 return atsync.StartFirehose(ctx)
497 })
498 }
499 for _, labeler := range cli.Labelers {
500 group.Go(func() error {
501 return atsync.StartLabelerFirehose(ctx, labeler)
502 })
503 }
504
505 group.Go(func() error {
506 return a.ExpireSessions(ctx)
507 })
508
509 group.Go(func() error {
510 return storage.StartSegmentCleaner(ctx, mod, &cli)
511 })
512
513 group.Go(func() error {
514 return mod.StartSegmentCleaner(ctx)
515 })
516
517 group.Go(func() error {
518 return swarm.Start(ctx, cli.Tickets)
519 })
520
521 if cli.LivepeerGateway {
522 // make a file to make sure the directory exists
523 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
524 if err != nil {
525 return err
526 }
527 fd.Close()
528 if err != nil {
529 return err
530 }
531 group.Go(func() error {
532 err := GoLivepeer(ctx, fs)
533 if err != nil {
534 return err
535 }
536 // livepeer returns nil on error, so we need to check if we're responsible
537 if ctx.Err() == nil {
538 return fmt.Errorf("livepeer exited")
539 }
540 return nil
541 })
542 }
543
544 group.Go(func() error {
545 return d.Start(ctx)
546 })
547
548 if cli.TestStream {
549 // regular stream self-test
550 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
551 Schema: schema,
552 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
553 })
554 if err != nil {
555 return err
556 }
557 atkey, err := atproto.ParsePubKey(signer.Public())
558 if err != nil {
559 return err
560 }
561 did := atkey.DIDKey()
562 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner, mod)
563 if err != nil {
564 return err
565 }
566 err = mod.UpdateIdentity(&model.Identity{
567 ID: testMediaSigner.Pub().String(),
568 Handle: "stream-self-tester",
569 DID: "",
570 })
571 if err != nil {
572 return err
573 }
574 cli.AllowedStreams = append(cli.AllowedStreams, did)
575 a.Aliases["self-test"] = did
576 group.Go(func() error {
577 return mm.TestSource(ctx, testMediaSigner)
578 })
579
580 // Start a test stream that will run intermittently
581 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
582 Schema: schema,
583 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
584 })
585 if err != nil {
586 return err
587 }
588 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
589 if err != nil {
590 return err
591 }
592 did2 := atkey2.DIDKey()
593 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner, mod)
594 if err != nil {
595 return err
596 }
597 err = mod.UpdateIdentity(&model.Identity{
598 ID: intermittentMediaSigner.Pub().String(),
599 Handle: "stream-intermittent-tester",
600 DID: "",
601 })
602 if err != nil {
603 return err
604 }
605 cli.AllowedStreams = append(cli.AllowedStreams, did2)
606 a.Aliases["intermittent-self-test"] = did2
607
608 group.Go(func() error {
609 for {
610 // Start intermittent stream
611 intermittentCtx, cancel := context.WithCancel(ctx)
612 done := make(chan struct{})
613 go func() {
614 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
615 close(done)
616 }()
617 // Stream ON for 15 seconds
618 time.Sleep(15 * time.Second)
619 // Stop stream
620 cancel()
621 <-done // Wait for TestSource to exit
622 // Stream OFF for 15 seconds
623 time.Sleep(15 * time.Second)
624 }
625 })
626 }
627
628 for _, job := range platformJobs {
629 group.Go(func() error {
630 return job(ctx, &cli)
631 })
632 }
633
634 if cli.WHIPTest != "" {
635 group.Go(func() error {
636 err := WHIP(strings.Split(cli.WHIPTest, " "))
637 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
638 time.Sleep(time.Second * 3)
639 // gst.Deinit()
640 log.Warn(ctx, "gst deinit complete, exiting")
641 return err
642 })
643 }
644
645 return group.Wait()
646}
647
// ErrCaughtSignal is the sentinel wrapped into the error that handleSignals
// returns when the process receives one of the signals it listens for.
var ErrCaughtSignal = errors.New("caught signal")
649
650func handleSignals(ctx context.Context) error {
651 c := make(chan os.Signal, 1)
652 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
653 for {
654 select {
655 case s := <-c:
656 if s == syscall.SIGABRT {
657 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
658 log.Error(ctx, "failed to create pprof", "error", err)
659 }
660 }
661 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
662 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
663 case <-ctx.Done():
664 return nil
665 }
666 }
667}