Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/bluesky-social/indigo/carstore"
20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
21 "github.com/peterbourgon/ff/v3"
22 "github.com/streamplace/oatproxy/pkg/oatproxy"
23 "golang.org/x/term"
24 "stream.place/streamplace/pkg/aqhttp"
25 "stream.place/streamplace/pkg/atproto"
26 "stream.place/streamplace/pkg/bus"
27 "stream.place/streamplace/pkg/crypto/signers"
28 "stream.place/streamplace/pkg/crypto/signers/eip712"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/log"
31 "stream.place/streamplace/pkg/media"
32 "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/replication"
34 "stream.place/streamplace/pkg/replication/boring"
35 "stream.place/streamplace/pkg/rtmps"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37 "stream.place/streamplace/pkg/spmetrics"
38 "stream.place/streamplace/pkg/statedb"
39
40 "github.com/ThalesGroup/crypto11"
41 _ "github.com/go-gst/go-glib/glib"
42 _ "github.com/go-gst/go-gst/gst"
43 "stream.place/streamplace/pkg/api"
44 "stream.place/streamplace/pkg/config"
45 "stream.place/streamplace/pkg/model"
46
47 _ "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
48)
49
// jobFunc is an additional long-running job that a platform can inject into
// the node. Each job is started on the node's goroutine group and receives the
// group's context together with the parsed CLI configuration; the error it
// returns is handed back to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error
52
53// parse the CLI and fire up an streamplace node!
54func start(build *config.BuildFlags, platformJobs []jobFunc) error {
55 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
56 err := media.RunSelfTest(context.Background())
57 if err != nil {
58 if selfTest {
59 fmt.Println(err.Error())
60 os.Exit(1)
61 } else {
62 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
63 if retryCount >= 3 {
64 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
65 return err
66 }
67 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
68 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
69 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
70 if err != nil {
71 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
72 return err
73 }
74 panic("invalid code path: exec succeeded but we're still here???")
75 }
76 }
77 if selfTest {
78 runtime.GC()
79 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
80 log.Error(context.Background(), "error creating pprof", "error", err)
81 }
82 fmt.Println("self-test successful!")
83 os.Exit(0)
84 }
85
86 if len(os.Args) > 1 && os.Args[1] == "stream" {
87 if len(os.Args) != 3 {
88 fmt.Println("usage: streamplace stream [user]")
89 os.Exit(1)
90 }
91 return Stream(os.Args[2])
92 }
93
94 if len(os.Args) > 1 && os.Args[1] == "live" {
95 cli := config.CLI{Build: build}
96 fs := cli.NewFlagSet("streamplace live")
97
98 err := cli.Parse(fs, os.Args[2:])
99 if err != nil {
100 return err
101 }
102
103 args := fs.Args()
104 if len(args) != 1 {
105 fmt.Println("usage: streamplace live [flags] [stream-key]")
106 os.Exit(1)
107 }
108
109 return Live(args[0], cli.HTTPInternalAddr)
110 }
111
112 if len(os.Args) > 1 && os.Args[1] == "sign" {
113 return Sign(context.Background())
114 }
115
116 if len(os.Args) > 1 && os.Args[1] == "whep" {
117 return WHEP(os.Args[2:])
118 }
119 if len(os.Args) > 1 && os.Args[1] == "whip" {
120 return WHIP(os.Args[2:])
121 }
122
123 if len(os.Args) > 1 && os.Args[1] == "clip" {
124 cli := config.CLI{Build: build}
125 fs := cli.NewFlagSet("streamplace clip")
126 out := fs.String("out", "", "output file")
127
128 err := cli.Parse(fs, os.Args[2:])
129 if err != nil {
130 return err
131 }
132 ctx := context.Background()
133 ctx = log.WithDebugValue(ctx, cli.Debug)
134 return Clip(ctx, fs.Args(), *out)
135 }
136
137 if len(os.Args) > 1 && os.Args[1] == "segment" {
138 cli := config.CLI{Build: build}
139 fs := cli.NewFlagSet("streamplace clip")
140 out := fs.String("out-dir", "", "output directory")
141
142 err := cli.Parse(fs, os.Args[2:])
143 if err != nil {
144 return err
145 }
146 ctx := context.Background()
147 ctx = log.WithDebugValue(ctx, cli.Debug)
148 return media.SegmentFile(ctx, fs.Args()[0], *out)
149 }
150
151 if len(os.Args) > 1 && os.Args[1] == "self-test" {
152 err := media.RunSelfTest(context.Background())
153 if err != nil {
154 fmt.Println(err.Error())
155 os.Exit(1)
156 }
157 fmt.Println("self-test successful!")
158 os.Exit(0)
159 }
160
161 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
162 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
163 _ = starter.NewLivepeerConfig(lpfs)
164 err = ff.Parse(lpfs, os.Args[2:],
165 ff.WithConfigFileFlag("config"),
166 ff.WithEnvVarPrefix("LP"),
167 )
168 if err != nil {
169 return err
170 }
171 err = GoLivepeer(context.Background(), lpfs)
172 if err != nil {
173 log.Error(context.Background(), "error in livepeer", "error", err)
174 os.Exit(1)
175 }
176 os.Exit(0)
177 }
178
179 _ = flag.Set("logtostderr", "true")
180 vFlag := flag.Lookup("v")
181 cli := config.CLI{Build: build}
182 fs := cli.NewFlagSet("streamplace")
183 verbosity := fs.String("v", "3", "log verbosity level")
184 version := fs.Bool("version", false, "print version and exit")
185
186 err = cli.Parse(
187 fs, os.Args[1:],
188 )
189 if err != nil {
190 return err
191 }
192 err = flag.CommandLine.Parse(nil)
193 if err != nil {
194 return err
195 }
196 _ = vFlag.Value.Set(*verbosity)
197 log.SetColorLogger(cli.Color)
198 ctx := context.Background()
199 ctx = log.WithDebugValue(ctx, cli.Debug)
200
201 log.Log(ctx,
202 "streamplace",
203 "version", build.Version,
204 "buildTime", build.BuildTimeStr(),
205 "uuid", build.UUID,
206 "runtime.GOOS", runtime.GOOS,
207 "runtime.GOARCH", runtime.GOARCH,
208 "runtime.Version", runtime.Version())
209 if *version {
210 return nil
211 }
212
213 if len(os.Args) > 1 && os.Args[1] == "migrate" {
214 return statedb.Migrate(&cli)
215 }
216
217 spmetrics.Version.WithLabelValues(build.Version).Inc()
218 if cli.LivepeerHelp {
219 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
220 _ = starter.NewLivepeerConfig(lpFlags)
221 lpFlags.VisitAll(func(f *flag.Flag) {
222 adapted := config.ToSnakeCase(f.Name)
223 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
224 usage := fmt.Sprintf(" %s", f.Usage)
225 if f.DefValue != "" {
226 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
227 }
228 fmt.Printf(" %s\n", usage)
229 })
230 return nil
231 }
232
233 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
234
235 err = os.MkdirAll(cli.DataDir, os.ModePerm)
236 if err != nil {
237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
238 }
239 schema, err := v0.MakeV0Schema()
240 if err != nil {
241 return err
242 }
243 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
244 Schema: schema,
245 EthKeystorePath: cli.EthKeystorePath,
246 EthAccountAddr: cli.EthAccountAddr,
247 EthKeystorePassword: cli.EthPassword,
248 })
249 if err != nil {
250 return err
251 }
252 var signer crypto.Signer = eip712signer
253 if cli.PKCS11ModulePath != "" {
254 conf := &crypto11.Config{
255 Path: cli.PKCS11ModulePath,
256 }
257 count := 0
258 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
259 if val != "" {
260 count += 1
261 }
262 }
263 if count != 1 {
264 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
265 }
266 if cli.PKCS11TokenSlot != "" {
267 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
268 if err != nil {
269 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
270 }
271 numint := int(num)
272 // why does crypto11 want this as a reference? odd.
273 conf.SlotNumber = &numint
274 }
275 if cli.PKCS11TokenLabel != "" {
276 conf.TokenLabel = cli.PKCS11TokenLabel
277 }
278 if cli.PKCS11TokenSerial != "" {
279 conf.TokenSerial = cli.PKCS11TokenSerial
280 }
281 pin := cli.PKCS11Pin
282 if pin == "" {
283 fmt.Printf("Please enter PKCS11 PIN: ")
284 password, err := term.ReadPassword(int(os.Stdin.Fd()))
285 fmt.Println("")
286 if err != nil {
287 return fmt.Errorf("error reading PKCS11 password: %w", err)
288 }
289 pin = string(password)
290 }
291 conf.Pin = pin
292
293 sc, err := crypto11.Configure(conf)
294 if err != nil {
295 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
296 }
297 var id []byte = nil
298 var label []byte = nil
299 if cli.PKCS11KeypairID != "" {
300 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
301 if err != nil {
302 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
303 }
304 id = []byte{byte(num)}
305 }
306 if cli.PKCS11KeypairLabel != "" {
307 label = []byte(cli.PKCS11KeypairLabel)
308 }
309 hwsigner, err := sc.FindKeyPair(id, label)
310 if err != nil {
311 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
312 }
313 if hwsigner == nil {
314 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
315 }
316 addr, err := signers.HexAddrFromSigner(hwsigner)
317 if err != nil {
318 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
319 }
320 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
321 signer = hwsigner
322 }
323 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
324
325 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
326 if err != nil {
327 return err
328 }
329 var noter notifications.FirebaseNotifier
330 if cli.FirebaseServiceAccount != "" {
331 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
332 if err != nil {
333 return err
334 }
335 }
336
337 group, ctx := TimeoutGroupWithContext(ctx)
338
339 out := carstore.SQLiteStore{}
340 err = out.Open(":memory:")
341 if err != nil {
342 return err
343 }
344 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
345 if err != nil {
346 return err
347 }
348 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
349 if err != nil {
350 return err
351 }
352 defer handle.Close()
353
354 jwk, err := state.EnsureJWK(ctx, "jwk")
355 if err != nil {
356 return err
357 }
358 cli.JWK = jwk
359
360 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
361 if err != nil {
362 return err
363 }
364 cli.AccessJWK = accessJWK
365
366 b := bus.NewBus()
367 atsync := &atproto.ATProtoSynchronizer{
368 CLI: &cli,
369 Model: mod,
370 StatefulDB: state,
371 Noter: noter,
372 Bus: b,
373 }
374 err = atsync.Migrate(ctx)
375 if err != nil {
376 return fmt.Errorf("failed to migrate: %w", err)
377 }
378
379 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
380 if err != nil {
381 return err
382 }
383
384 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
385 if err != nil {
386 return err
387 }
388
389 clientMetadata := &oatproxy.OAuthClientMetadata{
390 Scope: "atproto transition:generic",
391 ClientName: "Streamplace",
392 RedirectURIs: []string{
393 fmt.Sprintf("https://%s/login", cli.PublicHost),
394 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
395 },
396 }
397
398 op := oatproxy.New(&oatproxy.Config{
399 Host: cli.PublicHost,
400 CreateOAuthSession: state.CreateOAuthSession,
401 UpdateOAuthSession: state.UpdateOAuthSession,
402 GetOAuthSession: state.LoadOAuthSession,
403 Lock: state.GetNamedLock,
404 Scope: "atproto transition:generic",
405 UpstreamJWK: cli.JWK,
406 DownstreamJWK: cli.AccessJWK,
407 ClientMetadata: clientMetadata,
408 })
409 d := director.NewDirector(mm, mod, &cli, b, op, state)
410 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
411 if err != nil {
412 return err
413 }
414
415 ctx = log.WithLogValues(ctx, "version", build.Version)
416
417 group.Go(func() error {
418 return handleSignals(ctx)
419 })
420
421 group.Go(func() error {
422 return state.ProcessQueue(ctx)
423 })
424
425 if cli.TracingEndpoint != "" {
426 group.Go(func() error {
427 return startTelemetry(ctx, cli.TracingEndpoint)
428 })
429 }
430
431 if cli.Secure {
432 group.Go(func() error {
433 return a.ServeHTTPS(ctx)
434 })
435 group.Go(func() error {
436 return a.ServeHTTPRedirect(ctx)
437 })
438 if cli.RTMPServerAddon != "" {
439 group.Go(func() error {
440 return rtmps.ServeRTMPS(ctx, &cli)
441 })
442 }
443 } else {
444 group.Go(func() error {
445 return a.ServeHTTP(ctx)
446 })
447 }
448
449 group.Go(func() error {
450 return a.ServeInternalHTTP(ctx)
451 })
452
453 if !cli.NoFirehose {
454 group.Go(func() error {
455 return atsync.StartFirehose(ctx)
456 })
457 }
458 for _, labeler := range cli.Labelers {
459 group.Go(func() error {
460 return atsync.StartLabelerFirehose(ctx, labeler)
461 })
462 }
463
464 group.Go(func() error {
465 return spmetrics.ExpireSessions(ctx)
466 })
467
468 group.Go(func() error {
469 return mod.StartSegmentCleaner(ctx)
470 })
471
472 if cli.LivepeerGateway {
473 // make a file to make sure the directory exists
474 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
475 if err != nil {
476 return err
477 }
478 fd.Close()
479 if err != nil {
480 return err
481 }
482 group.Go(func() error {
483 return GoLivepeer(ctx, fs)
484 })
485 }
486
487 group.Go(func() error {
488 return d.Start(ctx)
489 })
490
491 if cli.TestStream {
492 // regular stream self-test
493 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
494 Schema: schema,
495 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
496 })
497 if err != nil {
498 return err
499 }
500 atkey, err := atproto.ParsePubKey(signer.Public())
501 if err != nil {
502 return err
503 }
504 did := atkey.DIDKey()
505 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
506 if err != nil {
507 return err
508 }
509 err = mod.UpdateIdentity(&model.Identity{
510 ID: testMediaSigner.Pub().String(),
511 Handle: "stream-self-tester",
512 DID: "",
513 })
514 if err != nil {
515 return err
516 }
517 cli.AllowedStreams = append(cli.AllowedStreams, did)
518 a.Aliases["self-test"] = did
519 group.Go(func() error {
520 return mm.TestSource(ctx, testMediaSigner)
521 })
522
523 // Start a test stream that will run intermittently
524 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
525 Schema: schema,
526 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
527 })
528 if err != nil {
529 return err
530 }
531 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
532 if err != nil {
533 return err
534 }
535 did2 := atkey2.DIDKey()
536 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
537 if err != nil {
538 return err
539 }
540 err = mod.UpdateIdentity(&model.Identity{
541 ID: intermittentMediaSigner.Pub().String(),
542 Handle: "stream-intermittent-tester",
543 DID: "",
544 })
545 if err != nil {
546 return err
547 }
548 cli.AllowedStreams = append(cli.AllowedStreams, did2)
549 a.Aliases["intermittent-self-test"] = did2
550
551 group.Go(func() error {
552 for {
553 // Start intermittent stream
554 intermittentCtx, cancel := context.WithCancel(ctx)
555 done := make(chan struct{})
556 go func() {
557 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
558 close(done)
559 }()
560 // Stream ON for 15 seconds
561 time.Sleep(15 * time.Second)
562 // Stop stream
563 cancel()
564 <-done // Wait for TestSource to exit
565 // Stream OFF for 15 seconds
566 time.Sleep(15 * time.Second)
567 }
568 })
569 }
570
571 for _, job := range platformJobs {
572 group.Go(func() error {
573 return job(ctx, &cli)
574 })
575 }
576
577 if cli.WHIPTest != "" {
578 group.Go(func() error {
579 err := WHIP(strings.Split(cli.WHIPTest, " "))
580 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
581 time.Sleep(time.Second * 3)
582 // gst.Deinit()
583 log.Warn(ctx, "gst deinit complete, exiting")
584 return err
585 })
586 }
587
588 return group.Wait()
589}
590
// ErrCaughtSignal is the sentinel wrapped by the error handleSignals returns
// when the process receives one of the signals it listens for. Match it with
// errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
592
593func handleSignals(ctx context.Context) error {
594 c := make(chan os.Signal, 1)
595 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
596 for {
597 select {
598 case s := <-c:
599 if s == syscall.SIGABRT {
600 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
601 log.Error(ctx, "failed to create pprof", "error", err)
602 }
603 }
604 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
605 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
606 case <-ctx.Done():
607 return nil
608 }
609 }
610}