Live video on the AT Protocol
1package cmd
2
3import (
4 "bytes"
5 "context"
6 "crypto"
7 "crypto/rand"
8 "errors"
9 "flag"
10 "fmt"
11 "net/url"
12 "os"
13 "os/signal"
14 "path/filepath"
15 "runtime"
16 "runtime/pprof"
17 "strconv"
18 "strings"
19 "syscall"
20 "time"
21
22 "github.com/bluesky-social/indigo/carstore"
23 "github.com/ethereum/go-ethereum/common/hexutil"
24 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25 "github.com/peterbourgon/ff/v3"
26 "github.com/streamplace/oatproxy/pkg/oatproxy"
27 "golang.org/x/term"
28 "stream.place/streamplace/pkg/aqhttp"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/crypto/signers"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
35 "stream.place/streamplace/pkg/log"
36 "stream.place/streamplace/pkg/media"
37 "stream.place/streamplace/pkg/notifications"
38 "stream.place/streamplace/pkg/replication/iroh_replicator"
39 "stream.place/streamplace/pkg/rtmps"
40 v0 "stream.place/streamplace/pkg/schema/v0"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/statedb"
43 "stream.place/streamplace/pkg/storage"
44
45 "github.com/ThalesGroup/crypto11"
46 _ "github.com/go-gst/go-glib/glib"
47 _ "github.com/go-gst/go-gst/gst"
48 "stream.place/streamplace/pkg/api"
49 "stream.place/streamplace/pkg/config"
50 "stream.place/streamplace/pkg/model"
51)
52
// jobFunc is an additional job that can be injected by platforms. start runs
// each one in its goroutine group, handing it the group's context and a pointer
// to the parsed CLI configuration; the returned error is reported to the group.
type jobFunc func(ctx context.Context, cli *config.CLI) error
55
56// parse the CLI and fire up an streamplace node!
57func start(build *config.BuildFlags, platformJobs []jobFunc) error {
58 iroh_streamplace.InitLogging()
59 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
60 err := media.RunSelfTest(context.Background())
61 if err != nil {
62 if selfTest {
63 fmt.Println(err.Error())
64 os.Exit(1)
65 } else {
66 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
67 if retryCount >= 3 {
68 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
69 return err
70 }
71 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
72 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
73 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
74 if err != nil {
75 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
76 return err
77 }
78 panic("invalid code path: exec succeeded but we're still here???")
79 }
80 }
81 if selfTest {
82 runtime.GC()
83 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
84 log.Error(context.Background(), "error creating pprof", "error", err)
85 }
86 fmt.Println("self-test successful!")
87 os.Exit(0)
88 }
89
90 if len(os.Args) > 1 && os.Args[1] == "stream" {
91 if len(os.Args) != 3 {
92 fmt.Println("usage: streamplace stream [user]")
93 os.Exit(1)
94 }
95 return Stream(os.Args[2])
96 }
97
98 if len(os.Args) > 1 && os.Args[1] == "live" {
99 cli := config.CLI{Build: build}
100 fs := cli.NewFlagSet("streamplace live")
101
102 err := cli.Parse(fs, os.Args[2:])
103 if err != nil {
104 return err
105 }
106
107 args := fs.Args()
108 if len(args) != 1 {
109 fmt.Println("usage: streamplace live [flags] [stream-key]")
110 os.Exit(1)
111 }
112
113 return Live(args[0], cli.HTTPInternalAddr)
114 }
115
116 if len(os.Args) > 1 && os.Args[1] == "sign" {
117 return Sign(context.Background())
118 }
119
120 if len(os.Args) > 1 && os.Args[1] == "whep" {
121 return WHEP(os.Args[2:])
122 }
123 if len(os.Args) > 1 && os.Args[1] == "whip" {
124 return WHIP(os.Args[2:])
125 }
126
127 if len(os.Args) > 1 && os.Args[1] == "clip" {
128 cli := config.CLI{Build: build}
129 fs := cli.NewFlagSet("streamplace clip")
130 out := fs.String("out", "", "output file")
131
132 err := cli.Parse(fs, os.Args[2:])
133 if err != nil {
134 return err
135 }
136 ctx := context.Background()
137 ctx = log.WithDebugValue(ctx, cli.Debug)
138 return Clip(ctx, fs.Args(), *out)
139 }
140
141 if len(os.Args) > 1 && os.Args[1] == "self-test" {
142 err := media.RunSelfTest(context.Background())
143 if err != nil {
144 fmt.Println(err.Error())
145 os.Exit(1)
146 }
147 fmt.Println("self-test successful!")
148 os.Exit(0)
149 }
150
151 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
152 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
153 _ = starter.NewLivepeerConfig(lpfs)
154 err = ff.Parse(lpfs, os.Args[2:],
155 ff.WithConfigFileFlag("config"),
156 ff.WithEnvVarPrefix("LP"),
157 )
158 if err != nil {
159 return err
160 }
161 err = GoLivepeer(context.Background(), lpfs)
162 if err != nil {
163 log.Error(context.Background(), "error in livepeer", "error", err)
164 os.Exit(1)
165 }
166 os.Exit(0)
167 }
168
169 _ = flag.Set("logtostderr", "true")
170 vFlag := flag.Lookup("v")
171 cli := config.CLI{Build: build}
172 fs := cli.NewFlagSet("streamplace")
173 verbosity := fs.String("v", "3", "log verbosity level")
174 version := fs.Bool("version", false, "print version and exit")
175
176 err = cli.Parse(
177 fs, os.Args[1:],
178 )
179 if err != nil {
180 return err
181 }
182
183 err = flag.CommandLine.Parse(nil)
184 if err != nil {
185 return err
186 }
187 _ = vFlag.Value.Set(*verbosity)
188 log.SetColorLogger(cli.Color)
189 ctx := context.Background()
190 ctx = log.WithDebugValue(ctx, cli.Debug)
191
192 log.Log(ctx,
193 "streamplace",
194 "version", build.Version,
195 "buildTime", build.BuildTimeStr(),
196 "uuid", build.UUID,
197 "runtime.GOOS", runtime.GOOS,
198 "runtime.GOARCH", runtime.GOARCH,
199 "runtime.Version", runtime.Version())
200 if *version {
201 return nil
202 }
203
204 if len(os.Args) > 1 && os.Args[1] == "migrate" {
205 return statedb.Migrate(&cli)
206 }
207
208 spmetrics.Version.WithLabelValues(build.Version).Inc()
209 if cli.LivepeerHelp {
210 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
211 _ = starter.NewLivepeerConfig(lpFlags)
212 lpFlags.VisitAll(func(f *flag.Flag) {
213 adapted := config.ToSnakeCase(f.Name)
214 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
215 usage := fmt.Sprintf(" %s", f.Usage)
216 if f.DefValue != "" {
217 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
218 }
219 fmt.Printf(" %s\n", usage)
220 })
221 return nil
222 }
223
224 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
225
226 err = os.MkdirAll(cli.DataDir, os.ModePerm)
227 if err != nil {
228 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
229 }
230 schema, err := v0.MakeV0Schema()
231 if err != nil {
232 return err
233 }
234 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
235 Schema: schema,
236 EthKeystorePath: cli.EthKeystorePath,
237 EthAccountAddr: cli.EthAccountAddr,
238 EthKeystorePassword: cli.EthPassword,
239 })
240 if err != nil {
241 return err
242 }
243 var signer crypto.Signer = eip712signer
244 if cli.PKCS11ModulePath != "" {
245 conf := &crypto11.Config{
246 Path: cli.PKCS11ModulePath,
247 }
248 count := 0
249 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
250 if val != "" {
251 count += 1
252 }
253 }
254 if count != 1 {
255 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
256 }
257 if cli.PKCS11TokenSlot != "" {
258 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
259 if err != nil {
260 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
261 }
262 numint := int(num)
263 // why does crypto11 want this as a reference? odd.
264 conf.SlotNumber = &numint
265 }
266 if cli.PKCS11TokenLabel != "" {
267 conf.TokenLabel = cli.PKCS11TokenLabel
268 }
269 if cli.PKCS11TokenSerial != "" {
270 conf.TokenSerial = cli.PKCS11TokenSerial
271 }
272 pin := cli.PKCS11Pin
273 if pin == "" {
274 fmt.Printf("Please enter PKCS11 PIN: ")
275 password, err := term.ReadPassword(int(os.Stdin.Fd()))
276 fmt.Println("")
277 if err != nil {
278 return fmt.Errorf("error reading PKCS11 password: %w", err)
279 }
280 pin = string(password)
281 }
282 conf.Pin = pin
283
284 sc, err := crypto11.Configure(conf)
285 if err != nil {
286 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
287 }
288 var id []byte = nil
289 var label []byte = nil
290 if cli.PKCS11KeypairID != "" {
291 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
292 if err != nil {
293 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
294 }
295 id = []byte{byte(num)}
296 }
297 if cli.PKCS11KeypairLabel != "" {
298 label = []byte(cli.PKCS11KeypairLabel)
299 }
300 hwsigner, err := sc.FindKeyPair(id, label)
301 if err != nil {
302 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
303 }
304 if hwsigner == nil {
305 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
306 }
307 addr, err := signers.HexAddrFromSigner(hwsigner)
308 if err != nil {
309 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
310 }
311 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
312 signer = hwsigner
313 }
314
315 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
316 if err != nil {
317 return err
318 }
319 var noter notifications.FirebaseNotifier
320 if cli.FirebaseServiceAccount != "" {
321 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
322 if err != nil {
323 return err
324 }
325 }
326
327 group, ctx := TimeoutGroupWithContext(ctx)
328
329 out := carstore.SQLiteStore{}
330 err = out.Open(":memory:")
331 if err != nil {
332 return err
333 }
334 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
335 if err != nil {
336 return err
337 }
338 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
339 if err != nil {
340 return err
341 }
342 defer handle.Close()
343
344 jwk, err := state.EnsureJWK(ctx, "jwk")
345 if err != nil {
346 return err
347 }
348 cli.JWK = jwk
349
350 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
351 if err != nil {
352 return err
353 }
354 cli.AccessJWK = accessJWK
355
356 b := bus.NewBus()
357 atsync := &atproto.ATProtoSynchronizer{
358 CLI: &cli,
359 Model: mod,
360 StatefulDB: state,
361 Noter: noter,
362 Bus: b,
363 }
364 err = atsync.Migrate(ctx)
365 if err != nil {
366 return fmt.Errorf("failed to migrate: %w", err)
367 }
368
369 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync)
370 if err != nil {
371 return err
372 }
373
374 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod)
375 if err != nil {
376 return err
377 }
378
379 var clientMetadata *oatproxy.OAuthClientMetadata
380 var host string
381 if cli.PublicOAuth {
382 u, err := url.Parse(cli.OwnPublicURL())
383 if err != nil {
384 return err
385 }
386 host = u.Host
387 clientMetadata = &oatproxy.OAuthClientMetadata{
388 Scope: "atproto transition:generic",
389 ClientName: "Streamplace",
390 RedirectURIs: []string{
391 fmt.Sprintf("%s/login", cli.OwnPublicURL()),
392 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()),
393 },
394 }
395 } else {
396 host = cli.BroadcasterHost
397 clientMetadata = &oatproxy.OAuthClientMetadata{
398 Scope: "atproto transition:generic",
399 ClientName: "Streamplace",
400 RedirectURIs: []string{
401 fmt.Sprintf("https://%s/login", cli.BroadcasterHost),
402 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost),
403 },
404 }
405 }
406
407 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
408 if err != nil {
409 return err
410 }
411 if !exists {
412 secret := make([]byte, 32)
413 _, err := rand.Read(secret)
414 if err != nil {
415 return fmt.Errorf("failed to generate random secret: %w", err)
416 }
417 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
418 if err != nil {
419 return err
420 }
421 }
422 buf := bytes.Buffer{}
423 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
424 if err != nil {
425 return err
426 }
427 secret := buf.Bytes()
428 var topic []byte
429 if cli.IrohTopic != "" {
430 topic, err = hexutil.Decode("0x" + cli.IrohTopic)
431 if err != nil {
432 return err
433 }
434 }
435 swarm, err := iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod)
436 if err != nil {
437 return err
438 }
439
440 op := oatproxy.New(&oatproxy.Config{
441 Host: host,
442 CreateOAuthSession: state.CreateOAuthSession,
443 UpdateOAuthSession: state.UpdateOAuthSession,
444 GetOAuthSession: state.LoadOAuthSession,
445 Lock: state.GetNamedLock,
446 Scope: "atproto transition:generic",
447 UpstreamJWK: cli.JWK,
448 DownstreamJWK: cli.AccessJWK,
449 ClientMetadata: clientMetadata,
450 Public: cli.PublicOAuth,
451 })
452 d := director.NewDirector(mm, mod, &cli, b, op, state, swarm)
453 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
454 if err != nil {
455 return err
456 }
457
458 ctx = log.WithLogValues(ctx, "version", build.Version)
459
460 group.Go(func() error {
461 return handleSignals(ctx)
462 })
463
464 group.Go(func() error {
465 return state.ProcessQueue(ctx)
466 })
467
468 if cli.TracingEndpoint != "" {
469 group.Go(func() error {
470 return startTelemetry(ctx, cli.TracingEndpoint)
471 })
472 }
473
474 if cli.Secure {
475 group.Go(func() error {
476 return a.ServeHTTPS(ctx)
477 })
478 group.Go(func() error {
479 return a.ServeHTTPRedirect(ctx)
480 })
481 if cli.RTMPServerAddon != "" {
482 group.Go(func() error {
483 return rtmps.ServeRTMPS(ctx, &cli)
484 })
485 }
486 } else {
487 group.Go(func() error {
488 return a.ServeHTTP(ctx)
489 })
490 }
491
492 group.Go(func() error {
493 return a.ServeInternalHTTP(ctx)
494 })
495
496 if !cli.NoFirehose {
497 group.Go(func() error {
498 return atsync.StartFirehose(ctx)
499 })
500 }
501 for _, labeler := range cli.Labelers {
502 group.Go(func() error {
503 return atsync.StartLabelerFirehose(ctx, labeler)
504 })
505 }
506
507 group.Go(func() error {
508 return a.ExpireSessions(ctx)
509 })
510
511 group.Go(func() error {
512 return storage.StartSegmentCleaner(ctx, mod, &cli)
513 })
514
515 group.Go(func() error {
516 return mod.StartSegmentCleaner(ctx)
517 })
518
519 group.Go(func() error {
520 return swarm.Start(ctx, cli.Tickets)
521 })
522
523 if cli.LivepeerGateway {
524 // make a file to make sure the directory exists
525 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
526 if err != nil {
527 return err
528 }
529 fd.Close()
530 if err != nil {
531 return err
532 }
533 group.Go(func() error {
534 err := GoLivepeer(ctx, fs)
535 if err != nil {
536 return err
537 }
538 // livepeer returns nil on error, so we need to check if we're responsible
539 if ctx.Err() == nil {
540 return fmt.Errorf("livepeer exited")
541 }
542 return nil
543 })
544 }
545
546 group.Go(func() error {
547 return d.Start(ctx)
548 })
549
550 if cli.TestStream {
551 // regular stream self-test
552 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
553 Schema: schema,
554 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
555 })
556 if err != nil {
557 return err
558 }
559 atkey, err := atproto.ParsePubKey(signer.Public())
560 if err != nil {
561 return err
562 }
563 did := atkey.DIDKey()
564 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner, mod)
565 if err != nil {
566 return err
567 }
568 err = mod.UpdateIdentity(&model.Identity{
569 ID: testMediaSigner.Pub().String(),
570 Handle: "stream-self-tester",
571 DID: "",
572 })
573 if err != nil {
574 return err
575 }
576 cli.AllowedStreams = append(cli.AllowedStreams, did)
577 a.Aliases["self-test"] = did
578 group.Go(func() error {
579 return mm.TestSource(ctx, testMediaSigner)
580 })
581
582 // Start a test stream that will run intermittently
583 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
584 Schema: schema,
585 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
586 })
587 if err != nil {
588 return err
589 }
590 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
591 if err != nil {
592 return err
593 }
594 did2 := atkey2.DIDKey()
595 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner, mod)
596 if err != nil {
597 return err
598 }
599 err = mod.UpdateIdentity(&model.Identity{
600 ID: intermittentMediaSigner.Pub().String(),
601 Handle: "stream-intermittent-tester",
602 DID: "",
603 })
604 if err != nil {
605 return err
606 }
607 cli.AllowedStreams = append(cli.AllowedStreams, did2)
608 a.Aliases["intermittent-self-test"] = did2
609
610 group.Go(func() error {
611 for {
612 // Start intermittent stream
613 intermittentCtx, cancel := context.WithCancel(ctx)
614 done := make(chan struct{})
615 go func() {
616 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
617 close(done)
618 }()
619 // Stream ON for 15 seconds
620 time.Sleep(15 * time.Second)
621 // Stop stream
622 cancel()
623 <-done // Wait for TestSource to exit
624 // Stream OFF for 15 seconds
625 time.Sleep(15 * time.Second)
626 }
627 })
628 }
629
630 for _, job := range platformJobs {
631 group.Go(func() error {
632 return job(ctx, &cli)
633 })
634 }
635
636 if cli.WHIPTest != "" {
637 group.Go(func() error {
638 err := WHIP(strings.Split(cli.WHIPTest, " "))
639 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
640 time.Sleep(time.Second * 3)
641 // gst.Deinit()
642 log.Warn(ctx, "gst deinit complete, exiting")
643 return err
644 })
645 }
646
647 return group.Wait()
648}
649
// ErrCaughtSignal is the sentinel wrapped into the error that handleSignals
// returns when one of the signals it listens for arrives; match it with
// errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
651
652func handleSignals(ctx context.Context) error {
653 c := make(chan os.Signal, 1)
654 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
655 for {
656 select {
657 case s := <-c:
658 if s == syscall.SIGABRT {
659 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
660 log.Error(ctx, "failed to create pprof", "error", err)
661 }
662 }
663 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
664 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
665 case <-ctx.Done():
666 return nil
667 }
668 }
669}