Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/bluesky-social/indigo/carstore"
20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
21 "github.com/peterbourgon/ff/v3"
22 "github.com/streamplace/oatproxy/pkg/oatproxy"
23 "golang.org/x/term"
24 "stream.place/streamplace/pkg/aqhttp"
25 "stream.place/streamplace/pkg/atproto"
26 "stream.place/streamplace/pkg/bus"
27 "stream.place/streamplace/pkg/crypto/signers"
28 "stream.place/streamplace/pkg/crypto/signers/eip712"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/log"
31 "stream.place/streamplace/pkg/media"
32 "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/replication"
34 "stream.place/streamplace/pkg/replication/boring"
35 "stream.place/streamplace/pkg/rtmps"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37 "stream.place/streamplace/pkg/spmetrics"
38 "stream.place/streamplace/pkg/statedb"
39
40 "github.com/ThalesGroup/crypto11"
41 _ "github.com/go-gst/go-glib/glib"
42 _ "github.com/go-gst/go-gst/gst"
43 "stream.place/streamplace/pkg/api"
44 "stream.place/streamplace/pkg/config"
45 "stream.place/streamplace/pkg/model"
46
47 _ "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
48)
49
50// Additional jobs that can be injected by platforms
51type jobFunc func(ctx context.Context, cli *config.CLI) error
52
53// parse the CLI and fire up an streamplace node!
54func start(build *config.BuildFlags, platformJobs []jobFunc) error {
55 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
56 err := media.RunSelfTest(context.Background())
57 if err != nil {
58 if selfTest {
59 fmt.Println(err.Error())
60 os.Exit(1)
61 } else {
62 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
63 if retryCount >= 3 {
64 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
65 return err
66 }
67 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
68 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
69 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
70 if err != nil {
71 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
72 return err
73 }
74 panic("invalid code path: exec succeeded but we're still here???")
75 }
76 }
77 if selfTest {
78 runtime.GC()
79 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
80 log.Error(context.Background(), "error creating pprof", "error", err)
81 }
82 fmt.Println("self-test successful!")
83 os.Exit(0)
84 }
85
86 if len(os.Args) > 1 && os.Args[1] == "stream" {
87 if len(os.Args) != 3 {
88 fmt.Println("usage: streamplace stream [user]")
89 os.Exit(1)
90 }
91 return Stream(os.Args[2])
92 }
93
94 if len(os.Args) > 1 && os.Args[1] == "live" {
95 cli := config.CLI{Build: build}
96 fs := cli.NewFlagSet("streamplace live")
97
98 err := cli.Parse(fs, os.Args[2:])
99 if err != nil {
100 return err
101 }
102
103 args := fs.Args()
104 if len(args) != 1 {
105 fmt.Println("usage: streamplace live [flags] [stream-key]")
106 os.Exit(1)
107 }
108
109 return Live(args[0], cli.HTTPInternalAddr)
110 }
111
112 if len(os.Args) > 1 && os.Args[1] == "sign" {
113 return Sign(context.Background())
114 }
115
116 if len(os.Args) > 1 && os.Args[1] == "whep" {
117 return WHEP(os.Args[2:])
118 }
119 if len(os.Args) > 1 && os.Args[1] == "whip" {
120 return WHIP(os.Args[2:])
121 }
122
123 if len(os.Args) > 1 && os.Args[1] == "clip" {
124 cli := config.CLI{Build: build}
125 fs := cli.NewFlagSet("streamplace clip")
126 out := fs.String("out", "", "output file")
127
128 err := cli.Parse(fs, os.Args[2:])
129 if err != nil {
130 return err
131 }
132 ctx := context.Background()
133 ctx = log.WithDebugValue(ctx, cli.Debug)
134 return Clip(ctx, fs.Args(), *out)
135 }
136
137 if len(os.Args) > 1 && os.Args[1] == "self-test" {
138 err := media.RunSelfTest(context.Background())
139 if err != nil {
140 fmt.Println(err.Error())
141 os.Exit(1)
142 }
143 fmt.Println("self-test successful!")
144 os.Exit(0)
145 }
146
147 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
148 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
149 _ = starter.NewLivepeerConfig(lpfs)
150 err = ff.Parse(lpfs, os.Args[2:],
151 ff.WithConfigFileFlag("config"),
152 ff.WithEnvVarPrefix("LP"),
153 )
154 if err != nil {
155 return err
156 }
157 err = GoLivepeer(context.Background(), lpfs)
158 if err != nil {
159 log.Error(context.Background(), "error in livepeer", "error", err)
160 os.Exit(1)
161 }
162 os.Exit(0)
163 }
164
165 _ = flag.Set("logtostderr", "true")
166 vFlag := flag.Lookup("v")
167 cli := config.CLI{Build: build}
168 fs := cli.NewFlagSet("streamplace")
169 verbosity := fs.String("v", "3", "log verbosity level")
170 version := fs.Bool("version", false, "print version and exit")
171
172 err = cli.Parse(
173 fs, os.Args[1:],
174 )
175 if err != nil {
176 return err
177 }
178 err = flag.CommandLine.Parse(nil)
179 if err != nil {
180 return err
181 }
182 _ = vFlag.Value.Set(*verbosity)
183 log.SetColorLogger(cli.Color)
184 ctx := context.Background()
185 ctx = log.WithDebugValue(ctx, cli.Debug)
186
187 log.Log(ctx,
188 "streamplace",
189 "version", build.Version,
190 "buildTime", build.BuildTimeStr(),
191 "uuid", build.UUID,
192 "runtime.GOOS", runtime.GOOS,
193 "runtime.GOARCH", runtime.GOARCH,
194 "runtime.Version", runtime.Version())
195 if *version {
196 return nil
197 }
198
199 if len(os.Args) > 1 && os.Args[1] == "migrate" {
200 return statedb.Migrate(&cli)
201 }
202
203 spmetrics.Version.WithLabelValues(build.Version).Inc()
204 if cli.LivepeerHelp {
205 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
206 _ = starter.NewLivepeerConfig(lpFlags)
207 lpFlags.VisitAll(func(f *flag.Flag) {
208 adapted := config.ToSnakeCase(f.Name)
209 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
210 usage := fmt.Sprintf(" %s", f.Usage)
211 if f.DefValue != "" {
212 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
213 }
214 fmt.Printf(" %s\n", usage)
215 })
216 return nil
217 }
218
219 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
220
221 err = os.MkdirAll(cli.DataDir, os.ModePerm)
222 if err != nil {
223 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
224 }
225 schema, err := v0.MakeV0Schema()
226 if err != nil {
227 return err
228 }
229 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
230 Schema: schema,
231 EthKeystorePath: cli.EthKeystorePath,
232 EthAccountAddr: cli.EthAccountAddr,
233 EthKeystorePassword: cli.EthPassword,
234 })
235 if err != nil {
236 return err
237 }
238 var signer crypto.Signer = eip712signer
239 if cli.PKCS11ModulePath != "" {
240 conf := &crypto11.Config{
241 Path: cli.PKCS11ModulePath,
242 }
243 count := 0
244 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
245 if val != "" {
246 count += 1
247 }
248 }
249 if count != 1 {
250 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
251 }
252 if cli.PKCS11TokenSlot != "" {
253 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
254 if err != nil {
255 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
256 }
257 numint := int(num)
258 // why does crypto11 want this as a reference? odd.
259 conf.SlotNumber = &numint
260 }
261 if cli.PKCS11TokenLabel != "" {
262 conf.TokenLabel = cli.PKCS11TokenLabel
263 }
264 if cli.PKCS11TokenSerial != "" {
265 conf.TokenSerial = cli.PKCS11TokenSerial
266 }
267 pin := cli.PKCS11Pin
268 if pin == "" {
269 fmt.Printf("Please enter PKCS11 PIN: ")
270 password, err := term.ReadPassword(int(os.Stdin.Fd()))
271 fmt.Println("")
272 if err != nil {
273 return fmt.Errorf("error reading PKCS11 password: %w", err)
274 }
275 pin = string(password)
276 }
277 conf.Pin = pin
278
279 sc, err := crypto11.Configure(conf)
280 if err != nil {
281 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
282 }
283 var id []byte = nil
284 var label []byte = nil
285 if cli.PKCS11KeypairID != "" {
286 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
287 if err != nil {
288 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
289 }
290 id = []byte{byte(num)}
291 }
292 if cli.PKCS11KeypairLabel != "" {
293 label = []byte(cli.PKCS11KeypairLabel)
294 }
295 hwsigner, err := sc.FindKeyPair(id, label)
296 if err != nil {
297 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
298 }
299 if hwsigner == nil {
300 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
301 }
302 addr, err := signers.HexAddrFromSigner(hwsigner)
303 if err != nil {
304 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
305 }
306 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
307 signer = hwsigner
308 }
309 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
310
311 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
312 if err != nil {
313 return err
314 }
315 var noter notifications.FirebaseNotifier
316 if cli.FirebaseServiceAccount != "" {
317 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
318 if err != nil {
319 return err
320 }
321 }
322
323 group, ctx := TimeoutGroupWithContext(ctx)
324
325 out := carstore.SQLiteStore{}
326 err = out.Open(":memory:")
327 if err != nil {
328 return err
329 }
330 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
331 if err != nil {
332 return err
333 }
334 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
335 if err != nil {
336 return err
337 }
338 defer handle.Close()
339
340 jwk, err := state.EnsureJWK(ctx, "jwk")
341 if err != nil {
342 return err
343 }
344 cli.JWK = jwk
345
346 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
347 if err != nil {
348 return err
349 }
350 cli.AccessJWK = accessJWK
351
352 b := bus.NewBus()
353 atsync := &atproto.ATProtoSynchronizer{
354 CLI: &cli,
355 Model: mod,
356 StatefulDB: state,
357 Noter: noter,
358 Bus: b,
359 }
360 err = atsync.Migrate(ctx)
361 if err != nil {
362 return fmt.Errorf("failed to migrate: %w", err)
363 }
364
365 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
366 if err != nil {
367 return err
368 }
369
370 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
371 if err != nil {
372 return err
373 }
374
375 clientMetadata := &oatproxy.OAuthClientMetadata{
376 Scope: "atproto transition:generic",
377 ClientName: "Streamplace",
378 RedirectURIs: []string{
379 fmt.Sprintf("https://%s/login", cli.PublicHost),
380 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
381 },
382 }
383
384 op := oatproxy.New(&oatproxy.Config{
385 Host: cli.PublicHost,
386 CreateOAuthSession: state.CreateOAuthSession,
387 UpdateOAuthSession: state.UpdateOAuthSession,
388 GetOAuthSession: state.LoadOAuthSession,
389 Lock: state.GetNamedLock,
390 Scope: "atproto transition:generic",
391 UpstreamJWK: cli.JWK,
392 DownstreamJWK: cli.AccessJWK,
393 ClientMetadata: clientMetadata,
394 })
395 d := director.NewDirector(mm, mod, &cli, b, op, state)
396 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
397 if err != nil {
398 return err
399 }
400
401 ctx = log.WithLogValues(ctx, "version", build.Version)
402
403 group.Go(func() error {
404 return handleSignals(ctx)
405 })
406
407 group.Go(func() error {
408 return state.ProcessQueue(ctx)
409 })
410
411 if cli.TracingEndpoint != "" {
412 group.Go(func() error {
413 return startTelemetry(ctx, cli.TracingEndpoint)
414 })
415 }
416
417 if cli.Secure {
418 group.Go(func() error {
419 return a.ServeHTTPS(ctx)
420 })
421 group.Go(func() error {
422 return a.ServeHTTPRedirect(ctx)
423 })
424 if cli.RTMPServerAddon != "" {
425 group.Go(func() error {
426 return rtmps.ServeRTMPS(ctx, &cli)
427 })
428 }
429 } else {
430 group.Go(func() error {
431 return a.ServeHTTP(ctx)
432 })
433 }
434
435 group.Go(func() error {
436 return a.ServeInternalHTTP(ctx)
437 })
438
439 if !cli.NoFirehose {
440 group.Go(func() error {
441 return atsync.StartFirehose(ctx)
442 })
443 }
444 for _, labeler := range cli.Labelers {
445 group.Go(func() error {
446 return atsync.StartLabelerFirehose(ctx, labeler)
447 })
448 }
449
450 group.Go(func() error {
451 return spmetrics.ExpireSessions(ctx)
452 })
453
454 group.Go(func() error {
455 return mod.StartSegmentCleaner(ctx)
456 })
457
458 if cli.LivepeerGateway {
459 // make a file to make sure the directory exists
460 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
461 if err != nil {
462 return err
463 }
464 fd.Close()
465 if err != nil {
466 return err
467 }
468 group.Go(func() error {
469 return GoLivepeer(ctx, fs)
470 })
471 }
472
473 group.Go(func() error {
474 return d.Start(ctx)
475 })
476
477 if cli.TestStream {
478 // regular stream self-test
479 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
480 Schema: schema,
481 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
482 })
483 if err != nil {
484 return err
485 }
486 atkey, err := atproto.ParsePubKey(signer.Public())
487 if err != nil {
488 return err
489 }
490 did := atkey.DIDKey()
491 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
492 if err != nil {
493 return err
494 }
495 err = mod.UpdateIdentity(&model.Identity{
496 ID: testMediaSigner.Pub().String(),
497 Handle: "stream-self-tester",
498 DID: "",
499 })
500 if err != nil {
501 return err
502 }
503 cli.AllowedStreams = append(cli.AllowedStreams, did)
504 a.Aliases["self-test"] = did
505 group.Go(func() error {
506 return mm.TestSource(ctx, testMediaSigner)
507 })
508
509 // Start a test stream that will run intermittently
510 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
511 Schema: schema,
512 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
513 })
514 if err != nil {
515 return err
516 }
517 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
518 if err != nil {
519 return err
520 }
521 did2 := atkey2.DIDKey()
522 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
523 if err != nil {
524 return err
525 }
526 err = mod.UpdateIdentity(&model.Identity{
527 ID: intermittentMediaSigner.Pub().String(),
528 Handle: "stream-intermittent-tester",
529 DID: "",
530 })
531 if err != nil {
532 return err
533 }
534 cli.AllowedStreams = append(cli.AllowedStreams, did2)
535 a.Aliases["intermittent-self-test"] = did2
536
537 group.Go(func() error {
538 for {
539 // Start intermittent stream
540 intermittentCtx, cancel := context.WithCancel(ctx)
541 done := make(chan struct{})
542 go func() {
543 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
544 close(done)
545 }()
546 // Stream ON for 15 seconds
547 time.Sleep(15 * time.Second)
548 // Stop stream
549 cancel()
550 <-done // Wait for TestSource to exit
551 // Stream OFF for 15 seconds
552 time.Sleep(15 * time.Second)
553 }
554 })
555 }
556
557 for _, job := range platformJobs {
558 group.Go(func() error {
559 return job(ctx, &cli)
560 })
561 }
562
563 if cli.WHIPTest != "" {
564 group.Go(func() error {
565 err := WHIP(strings.Split(cli.WHIPTest, " "))
566 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
567 time.Sleep(time.Second * 3)
568 // gst.Deinit()
569 log.Warn(ctx, "gst deinit complete, exiting")
570 return err
571 })
572 }
573
574 return group.Wait()
575}
576
577var ErrCaughtSignal = errors.New("caught signal")
578
579func handleSignals(ctx context.Context) error {
580 c := make(chan os.Signal, 1)
581 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
582 for {
583 select {
584 case s := <-c:
585 if s == syscall.SIGABRT {
586 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
587 log.Error(ctx, "failed to create pprof", "error", err)
588 }
589 }
590 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
591 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
592 case <-ctx.Done():
593 return nil
594 }
595 }
596}