Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/bluesky-social/indigo/carstore"
20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
21 "github.com/peterbourgon/ff/v3"
22 "github.com/streamplace/oatproxy/pkg/oatproxy"
23 "golang.org/x/term"
24 "stream.place/streamplace/pkg/aqhttp"
25 "stream.place/streamplace/pkg/atproto"
26 "stream.place/streamplace/pkg/bus"
27 "stream.place/streamplace/pkg/crypto/signers"
28 "stream.place/streamplace/pkg/crypto/signers/eip712"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/log"
31 "stream.place/streamplace/pkg/media"
32 "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/replication"
34 "stream.place/streamplace/pkg/replication/boring"
35 "stream.place/streamplace/pkg/resync"
36 "stream.place/streamplace/pkg/rtmps"
37 v0 "stream.place/streamplace/pkg/schema/v0"
38 "stream.place/streamplace/pkg/spmetrics"
39 "stream.place/streamplace/pkg/statedb"
40
41 "github.com/ThalesGroup/crypto11"
42 _ "github.com/go-gst/go-glib/glib"
43 _ "github.com/go-gst/go-gst/gst"
44 "stream.place/streamplace/pkg/api"
45 "stream.place/streamplace/pkg/config"
46 "stream.place/streamplace/pkg/model"
47)
48
49// Additional jobs that can be injected by platforms
50type jobFunc func(ctx context.Context, cli *config.CLI) error
51
52// parse the CLI and fire up an streamplace node!
53func start(build *config.BuildFlags, platformJobs []jobFunc) error {
54 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
55 err := media.RunSelfTest(context.Background())
56 if err != nil {
57 if selfTest {
58 fmt.Println(err.Error())
59 os.Exit(1)
60 } else {
61 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
62 if retryCount >= 3 {
63 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
64 return err
65 }
66 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
67 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
68 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
69 if err != nil {
70 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
71 return err
72 }
73 panic("invalid code path: exec succeeded but we're still here???")
74 }
75 }
76 if selfTest {
77 runtime.GC()
78 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
79 log.Error(context.Background(), "error creating pprof", "error", err)
80 }
81 fmt.Println("self-test successful!")
82 os.Exit(0)
83 }
84
85 if len(os.Args) > 1 && os.Args[1] == "stream" {
86 if len(os.Args) != 3 {
87 fmt.Println("usage: streamplace stream [user]")
88 os.Exit(1)
89 }
90 return Stream(os.Args[2])
91 }
92
93 if len(os.Args) > 1 && os.Args[1] == "live" {
94 cli := config.CLI{Build: build}
95 fs := cli.NewFlagSet("streamplace live")
96
97 err := cli.Parse(fs, os.Args[2:])
98 if err != nil {
99 return err
100 }
101
102 args := fs.Args()
103 if len(args) != 1 {
104 fmt.Println("usage: streamplace live [flags] [stream-key]")
105 os.Exit(1)
106 }
107
108 return Live(args[0], cli.HTTPInternalAddr)
109 }
110
111 if len(os.Args) > 1 && os.Args[1] == "sign" {
112 return Sign(context.Background())
113 }
114
115 if len(os.Args) > 1 && os.Args[1] == "whep" {
116 return WHEP(os.Args[2:])
117 }
118 if len(os.Args) > 1 && os.Args[1] == "whip" {
119 return WHIP(os.Args[2:])
120 }
121
122 if len(os.Args) > 1 && os.Args[1] == "clip" {
123 cli := config.CLI{Build: build}
124 fs := cli.NewFlagSet("streamplace clip")
125 out := fs.String("out", "", "output file")
126
127 err := cli.Parse(fs, os.Args[2:])
128 if err != nil {
129 return err
130 }
131 ctx := context.Background()
132 ctx = log.WithDebugValue(ctx, cli.Debug)
133 return Clip(ctx, fs.Args(), *out)
134 }
135
136 if len(os.Args) > 1 && os.Args[1] == "self-test" {
137 err := media.RunSelfTest(context.Background())
138 if err != nil {
139 fmt.Println(err.Error())
140 os.Exit(1)
141 }
142 fmt.Println("self-test successful!")
143 os.Exit(0)
144 }
145
146 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
147 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
148 _ = starter.NewLivepeerConfig(lpfs)
149 err = ff.Parse(lpfs, os.Args[2:],
150 ff.WithConfigFileFlag("config"),
151 ff.WithEnvVarPrefix("LP"),
152 )
153 if err != nil {
154 return err
155 }
156 err = GoLivepeer(context.Background(), lpfs)
157 if err != nil {
158 log.Error(context.Background(), "error in livepeer", "error", err)
159 os.Exit(1)
160 }
161 os.Exit(0)
162 }
163
164 _ = flag.Set("logtostderr", "true")
165 vFlag := flag.Lookup("v")
166 cli := config.CLI{Build: build}
167 fs := cli.NewFlagSet("streamplace")
168 verbosity := fs.String("v", "3", "log verbosity level")
169 version := fs.Bool("version", false, "print version and exit")
170
171 err = cli.Parse(
172 fs, os.Args[1:],
173 )
174 if err != nil {
175 return err
176 }
177 err = flag.CommandLine.Parse(nil)
178 if err != nil {
179 return err
180 }
181 _ = vFlag.Value.Set(*verbosity)
182 log.SetColorLogger(cli.Color)
183 ctx := context.Background()
184 ctx = log.WithDebugValue(ctx, cli.Debug)
185
186 log.Log(ctx,
187 "streamplace",
188 "version", build.Version,
189 "buildTime", build.BuildTimeStr(),
190 "uuid", build.UUID,
191 "runtime.GOOS", runtime.GOOS,
192 "runtime.GOARCH", runtime.GOARCH,
193 "runtime.Version", runtime.Version())
194 if *version {
195 return nil
196 }
197
198 if len(os.Args) > 1 && os.Args[1] == "migrate" {
199 return statedb.Migrate(&cli)
200 }
201
202 spmetrics.Version.WithLabelValues(build.Version).Inc()
203 if cli.LivepeerHelp {
204 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
205 _ = starter.NewLivepeerConfig(lpFlags)
206 lpFlags.VisitAll(func(f *flag.Flag) {
207 adapted := config.ToSnakeCase(f.Name)
208 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
209 usage := fmt.Sprintf(" %s", f.Usage)
210 if f.DefValue != "" {
211 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
212 }
213 fmt.Printf(" %s\n", usage)
214 })
215 return nil
216 }
217
218 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
219 if len(os.Args) > 1 && os.Args[1] == "resync" {
220 return resync.Resync(ctx, &cli)
221 }
222
223 err = os.MkdirAll(cli.DataDir, os.ModePerm)
224 if err != nil {
225 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
226 }
227 schema, err := v0.MakeV0Schema()
228 if err != nil {
229 return err
230 }
231 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
232 Schema: schema,
233 EthKeystorePath: cli.EthKeystorePath,
234 EthAccountAddr: cli.EthAccountAddr,
235 EthKeystorePassword: cli.EthPassword,
236 })
237 if err != nil {
238 return err
239 }
240 var signer crypto.Signer = eip712signer
241 if cli.PKCS11ModulePath != "" {
242 conf := &crypto11.Config{
243 Path: cli.PKCS11ModulePath,
244 }
245 count := 0
246 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
247 if val != "" {
248 count += 1
249 }
250 }
251 if count != 1 {
252 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
253 }
254 if cli.PKCS11TokenSlot != "" {
255 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
256 if err != nil {
257 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
258 }
259 numint := int(num)
260 // why does crypto11 want this as a reference? odd.
261 conf.SlotNumber = &numint
262 }
263 if cli.PKCS11TokenLabel != "" {
264 conf.TokenLabel = cli.PKCS11TokenLabel
265 }
266 if cli.PKCS11TokenSerial != "" {
267 conf.TokenSerial = cli.PKCS11TokenSerial
268 }
269 pin := cli.PKCS11Pin
270 if pin == "" {
271 fmt.Printf("Please enter PKCS11 PIN: ")
272 password, err := term.ReadPassword(int(os.Stdin.Fd()))
273 fmt.Println("")
274 if err != nil {
275 return fmt.Errorf("error reading PKCS11 password: %w", err)
276 }
277 pin = string(password)
278 }
279 conf.Pin = pin
280
281 sc, err := crypto11.Configure(conf)
282 if err != nil {
283 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
284 }
285 var id []byte = nil
286 var label []byte = nil
287 if cli.PKCS11KeypairID != "" {
288 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
289 if err != nil {
290 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
291 }
292 id = []byte{byte(num)}
293 }
294 if cli.PKCS11KeypairLabel != "" {
295 label = []byte(cli.PKCS11KeypairLabel)
296 }
297 hwsigner, err := sc.FindKeyPair(id, label)
298 if err != nil {
299 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
300 }
301 if hwsigner == nil {
302 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
303 }
304 addr, err := signers.HexAddrFromSigner(hwsigner)
305 if err != nil {
306 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
307 }
308 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
309 signer = hwsigner
310 }
311 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
312
313 mod, err := model.MakeDB(cli.IndexDBPath)
314 if err != nil {
315 return err
316 }
317 var noter notifications.FirebaseNotifier
318 if cli.FirebaseServiceAccount != "" {
319 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
320 if err != nil {
321 return err
322 }
323 }
324 out := carstore.SQLiteStore{}
325 err = out.Open(":memory:")
326 if err != nil {
327 return err
328 }
329 state, err := statedb.MakeDB(&cli, noter, mod)
330 if err != nil {
331 return err
332 }
333 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
334 if err != nil {
335 return err
336 }
337 defer handle.Close()
338
339 jwk, err := state.EnsureJWK(ctx, "jwk")
340 if err != nil {
341 return err
342 }
343 cli.JWK = jwk
344
345 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
346 if err != nil {
347 return err
348 }
349 cli.AccessJWK = accessJWK
350
351 b := bus.NewBus()
352 atsync := &atproto.ATProtoSynchronizer{
353 CLI: &cli,
354 Model: mod,
355 StatefulDB: state,
356 Noter: noter,
357 Bus: b,
358 }
359 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
360 if err != nil {
361 return err
362 }
363
364 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
365 if err != nil {
366 return err
367 }
368
369 clientMetadata := &oatproxy.OAuthClientMetadata{
370 Scope: "atproto transition:generic",
371 ClientName: "Streamplace",
372 RedirectURIs: []string{
373 fmt.Sprintf("https://%s/login", cli.PublicHost),
374 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
375 },
376 }
377
378 op := oatproxy.New(&oatproxy.Config{
379 Host: cli.PublicHost,
380 CreateOAuthSession: state.CreateOAuthSession,
381 UpdateOAuthSession: state.UpdateOAuthSession,
382 GetOAuthSession: state.LoadOAuthSession,
383 Lock: state.GetNamedLock,
384 Scope: "atproto transition:generic",
385 UpstreamJWK: cli.JWK,
386 DownstreamJWK: cli.AccessJWK,
387 ClientMetadata: clientMetadata,
388 })
389 d := director.NewDirector(mm, mod, &cli, b, op, state)
390 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
391 if err != nil {
392 return err
393 }
394
395 group, ctx := TimeoutGroupWithContext(ctx)
396 ctx = log.WithLogValues(ctx, "version", build.Version)
397
398 group.Go(func() error {
399 return handleSignals(ctx)
400 })
401
402 group.Go(func() error {
403 return state.ProcessQueue(ctx)
404 })
405
406 if cli.TracingEndpoint != "" {
407 group.Go(func() error {
408 return startTelemetry(ctx, cli.TracingEndpoint)
409 })
410 }
411
412 if cli.Secure {
413 group.Go(func() error {
414 return a.ServeHTTPS(ctx)
415 })
416 group.Go(func() error {
417 return a.ServeHTTPRedirect(ctx)
418 })
419 if cli.RTMPServerAddon != "" {
420 group.Go(func() error {
421 return rtmps.ServeRTMPS(ctx, &cli)
422 })
423 }
424 } else {
425 group.Go(func() error {
426 return a.ServeHTTP(ctx)
427 })
428 }
429
430 group.Go(func() error {
431 return a.ServeInternalHTTP(ctx)
432 })
433
434 if !cli.NoFirehose {
435 group.Go(func() error {
436 return atsync.StartFirehose(ctx)
437 })
438 }
439 for _, labeler := range cli.Labelers {
440 group.Go(func() error {
441 return atsync.StartLabelerFirehose(ctx, labeler)
442 })
443 }
444
445 group.Go(func() error {
446 return spmetrics.ExpireSessions(ctx)
447 })
448
449 group.Go(func() error {
450 return mod.StartSegmentCleaner(ctx)
451 })
452
453 if cli.LivepeerGateway {
454 // make a file to make sure the directory exists
455 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
456 if err != nil {
457 return err
458 }
459 fd.Close()
460 if err != nil {
461 return err
462 }
463 group.Go(func() error {
464 return GoLivepeer(ctx, fs)
465 })
466 }
467
468 group.Go(func() error {
469 return d.Start(ctx)
470 })
471
472 if cli.TestStream {
473 // regular stream self-test
474 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
475 Schema: schema,
476 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
477 })
478 if err != nil {
479 return err
480 }
481 atkey, err := atproto.ParsePubKey(signer.Public())
482 if err != nil {
483 return err
484 }
485 did := atkey.DIDKey()
486 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
487 if err != nil {
488 return err
489 }
490 err = mod.UpdateIdentity(&model.Identity{
491 ID: testMediaSigner.Pub().String(),
492 Handle: "stream-self-tester",
493 DID: "",
494 })
495 if err != nil {
496 return err
497 }
498 cli.AllowedStreams = append(cli.AllowedStreams, did)
499 a.Aliases["self-test"] = did
500 group.Go(func() error {
501 return mm.TestSource(ctx, testMediaSigner)
502 })
503
504 // Start a test stream that will run intermittently
505 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
506 Schema: schema,
507 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
508 })
509 if err != nil {
510 return err
511 }
512 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
513 if err != nil {
514 return err
515 }
516 did2 := atkey2.DIDKey()
517 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
518 if err != nil {
519 return err
520 }
521 err = mod.UpdateIdentity(&model.Identity{
522 ID: intermittentMediaSigner.Pub().String(),
523 Handle: "stream-intermittent-tester",
524 DID: "",
525 })
526 if err != nil {
527 return err
528 }
529 cli.AllowedStreams = append(cli.AllowedStreams, did2)
530 a.Aliases["intermittent-self-test"] = did2
531
532 group.Go(func() error {
533 for {
534 // Start intermittent stream
535 intermittentCtx, cancel := context.WithCancel(ctx)
536 done := make(chan struct{})
537 go func() {
538 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
539 close(done)
540 }()
541 // Stream ON for 15 seconds
542 time.Sleep(15 * time.Second)
543 // Stop stream
544 cancel()
545 <-done // Wait for TestSource to exit
546 // Stream OFF for 15 seconds
547 time.Sleep(15 * time.Second)
548 }
549 })
550 }
551
552 for _, job := range platformJobs {
553 group.Go(func() error {
554 return job(ctx, &cli)
555 })
556 }
557
558 if cli.WHIPTest != "" {
559 group.Go(func() error {
560 err := WHIP(strings.Split(cli.WHIPTest, " "))
561 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
562 time.Sleep(time.Second * 3)
563 // gst.Deinit()
564 log.Warn(ctx, "gst deinit complete, exiting")
565 return err
566 })
567 }
568
569 return group.Wait()
570}
571
572var ErrCaughtSignal = errors.New("caught signal")
573
574func handleSignals(ctx context.Context) error {
575 c := make(chan os.Signal, 1)
576 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
577 for {
578 select {
579 case s := <-c:
580 if s == syscall.SIGABRT {
581 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
582 log.Error(ctx, "failed to create pprof", "error", err)
583 }
584 }
585 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
586 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
587 case <-ctx.Done():
588 return nil
589 }
590 }
591}