Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
20 "github.com/peterbourgon/ff/v3"
21 "github.com/streamplace/oatproxy/pkg/oatproxy"
22 "golang.org/x/term"
23 "stream.place/streamplace/pkg/aqhttp"
24 "stream.place/streamplace/pkg/atproto"
25 "stream.place/streamplace/pkg/bus"
26 "stream.place/streamplace/pkg/crypto/signers"
27 "stream.place/streamplace/pkg/crypto/signers/eip712"
28 "stream.place/streamplace/pkg/director"
29 "stream.place/streamplace/pkg/log"
30 "stream.place/streamplace/pkg/media"
31 "stream.place/streamplace/pkg/notifications"
32 "stream.place/streamplace/pkg/replication"
33 "stream.place/streamplace/pkg/replication/boring"
34 "stream.place/streamplace/pkg/resync"
35 "stream.place/streamplace/pkg/rtmps"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37 "stream.place/streamplace/pkg/spmetrics"
38
39 "github.com/ThalesGroup/crypto11"
40 _ "github.com/go-gst/go-glib/glib"
41 _ "github.com/go-gst/go-gst/gst"
42 "stream.place/streamplace/pkg/api"
43 "stream.place/streamplace/pkg/config"
44 "stream.place/streamplace/pkg/model"
45)
46
47// Additional jobs that can be injected by platforms
48type jobFunc func(ctx context.Context, cli *config.CLI) error
49
50// parse the CLI and fire up an streamplace node!
51func start(build *config.BuildFlags, platformJobs []jobFunc) error {
52 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
53 err := media.RunSelfTest(context.Background())
54 if err != nil {
55 if selfTest {
56 fmt.Println(err.Error())
57 os.Exit(1)
58 } else {
59 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
60 if retryCount >= 3 {
61 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
62 return err
63 }
64 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
65 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
66 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
67 if err != nil {
68 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
69 return err
70 }
71 panic("invalid code path: exec succeeded but we're still here???")
72 }
73 }
74 if selfTest {
75 runtime.GC()
76 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
77 log.Error(context.Background(), "error creating pprof", "error", err)
78 }
79 fmt.Println("self-test successful!")
80 os.Exit(0)
81 }
82
83 if len(os.Args) > 1 && os.Args[1] == "stream" {
84 if len(os.Args) != 3 {
85 fmt.Println("usage: streamplace stream [user]")
86 os.Exit(1)
87 }
88 return Stream(os.Args[2])
89 }
90
91 if len(os.Args) > 1 && os.Args[1] == "live" {
92 cli := config.CLI{Build: build}
93 fs := cli.NewFlagSet("streamplace live")
94
95 err := cli.Parse(fs, os.Args[2:])
96 if err != nil {
97 return err
98 }
99
100 args := fs.Args()
101 if len(args) != 1 {
102 fmt.Println("usage: streamplace live [flags] [stream-key]")
103 os.Exit(1)
104 }
105
106 return Live(args[0], cli.HTTPInternalAddr)
107 }
108
109 if len(os.Args) > 1 && os.Args[1] == "sign" {
110 return Sign(context.Background())
111 }
112
113 if len(os.Args) > 1 && os.Args[1] == "whep" {
114 return WHEP(os.Args[2:])
115 }
116 if len(os.Args) > 1 && os.Args[1] == "whip" {
117 return WHIP(os.Args[2:])
118 }
119
120 if len(os.Args) > 1 && os.Args[1] == "clip" {
121 cli := config.CLI{Build: build}
122 fs := cli.NewFlagSet("streamplace clip")
123 out := fs.String("out", "", "output file")
124
125 err := cli.Parse(fs, os.Args[2:])
126 if err != nil {
127 return err
128 }
129 ctx := context.Background()
130 ctx = log.WithDebugValue(ctx, cli.Debug)
131 return Clip(ctx, fs.Args(), *out)
132 }
133
134 if len(os.Args) > 1 && os.Args[1] == "self-test" {
135 err := media.RunSelfTest(context.Background())
136 if err != nil {
137 fmt.Println(err.Error())
138 os.Exit(1)
139 }
140 fmt.Println("self-test successful!")
141 os.Exit(0)
142 }
143
144 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
145 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
146 _ = starter.NewLivepeerConfig(lpfs)
147 err = ff.Parse(lpfs, os.Args[2:],
148 ff.WithConfigFileFlag("config"),
149 ff.WithEnvVarPrefix("LP"),
150 )
151 if err != nil {
152 return err
153 }
154 err = GoLivepeer(context.Background(), lpfs)
155 if err != nil {
156 log.Error(context.Background(), "error in livepeer", "error", err)
157 os.Exit(1)
158 }
159 os.Exit(0)
160 }
161
162 _ = flag.Set("logtostderr", "true")
163 vFlag := flag.Lookup("v")
164 cli := config.CLI{Build: build}
165 fs := cli.NewFlagSet("streamplace")
166 verbosity := fs.String("v", "3", "log verbosity level")
167 version := fs.Bool("version", false, "print version and exit")
168
169 err = cli.Parse(
170 fs, os.Args[1:],
171 )
172 if err != nil {
173 return err
174 }
175 err = flag.CommandLine.Parse(nil)
176 if err != nil {
177 return err
178 }
179 _ = vFlag.Value.Set(*verbosity)
180 log.SetColorLogger(cli.Color)
181 ctx := context.Background()
182 ctx = log.WithDebugValue(ctx, cli.Debug)
183
184 log.Log(ctx,
185 "streamplace",
186 "version", build.Version,
187 "buildTime", build.BuildTimeStr(),
188 "uuid", build.UUID,
189 "runtime.GOOS", runtime.GOOS,
190 "runtime.GOARCH", runtime.GOARCH,
191 "runtime.Version", runtime.Version())
192 if *version {
193 return nil
194 }
195 if cli.LivepeerHelp {
196 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
197 _ = starter.NewLivepeerConfig(lpFlags)
198 lpFlags.VisitAll(func(f *flag.Flag) {
199 adapted := config.ToSnakeCase(f.Name)
200 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
201 usage := fmt.Sprintf(" %s", f.Usage)
202 if f.DefValue != "" {
203 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
204 }
205 fmt.Printf(" %s\n", usage)
206 })
207 return nil
208 }
209
210 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
211 if len(os.Args) > 1 && os.Args[1] == "resync" {
212 return resync.Resync(ctx, &cli)
213 }
214
215 err = os.MkdirAll(cli.DataDir, os.ModePerm)
216 if err != nil {
217 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
218 }
219 schema, err := v0.MakeV0Schema()
220 if err != nil {
221 return err
222 }
223 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
224 Schema: schema,
225 EthKeystorePath: cli.EthKeystorePath,
226 EthAccountAddr: cli.EthAccountAddr,
227 EthKeystorePassword: cli.EthPassword,
228 })
229 if err != nil {
230 return err
231 }
232 var signer crypto.Signer = eip712signer
233 if cli.PKCS11ModulePath != "" {
234 conf := &crypto11.Config{
235 Path: cli.PKCS11ModulePath,
236 }
237 count := 0
238 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
239 if val != "" {
240 count += 1
241 }
242 }
243 if count != 1 {
244 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
245 }
246 if cli.PKCS11TokenSlot != "" {
247 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
248 if err != nil {
249 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
250 }
251 numint := int(num)
252 // why does crypto11 want this as a reference? odd.
253 conf.SlotNumber = &numint
254 }
255 if cli.PKCS11TokenLabel != "" {
256 conf.TokenLabel = cli.PKCS11TokenLabel
257 }
258 if cli.PKCS11TokenSerial != "" {
259 conf.TokenSerial = cli.PKCS11TokenSerial
260 }
261 pin := cli.PKCS11Pin
262 if pin == "" {
263 fmt.Printf("Please enter PKCS11 PIN: ")
264 password, err := term.ReadPassword(int(os.Stdin.Fd()))
265 fmt.Println("")
266 if err != nil {
267 return fmt.Errorf("error reading PKCS11 password: %w", err)
268 }
269 pin = string(password)
270 }
271 conf.Pin = pin
272
273 sc, err := crypto11.Configure(conf)
274 if err != nil {
275 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
276 }
277 var id []byte = nil
278 var label []byte = nil
279 if cli.PKCS11KeypairID != "" {
280 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
281 if err != nil {
282 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
283 }
284 id = []byte{byte(num)}
285 }
286 if cli.PKCS11KeypairLabel != "" {
287 label = []byte(cli.PKCS11KeypairLabel)
288 }
289 hwsigner, err := sc.FindKeyPair(id, label)
290 if err != nil {
291 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
292 }
293 if hwsigner == nil {
294 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
295 }
296 addr, err := signers.HexAddrFromSigner(hwsigner)
297 if err != nil {
298 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
299 }
300 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
301 signer = hwsigner
302 }
303 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
304 mod, err := model.MakeDB(cli.DBPath)
305 if err != nil {
306 return err
307 }
308 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod)
309 if err != nil {
310 return err
311 }
312 defer handle.Close()
313 var noter notifications.FirebaseNotifier
314 if cli.FirebaseServiceAccount != "" {
315 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
316 if err != nil {
317 return err
318 }
319 }
320
321 jwkPath := cli.DataFilePath([]string{"jwk.json"})
322 jwk, err := atproto.EnsureJWK(ctx, jwkPath)
323 if err != nil {
324 return err
325 }
326 cli.JWK = jwk
327
328 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"})
329 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath)
330 if err != nil {
331 return err
332 }
333 cli.AccessJWK = accessJWK
334
335 b := bus.NewBus()
336 atsync := &atproto.ATProtoSynchronizer{
337 CLI: &cli,
338 Model: mod,
339 Noter: noter,
340 Bus: b,
341 }
342 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
343 if err != nil {
344 return err
345 }
346
347 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
348 if err != nil {
349 return err
350 }
351
352 clientMetadata := &oatproxy.OAuthClientMetadata{
353 Scope: "atproto transition:generic",
354 ClientName: "Streamplace",
355 RedirectURIs: []string{
356 fmt.Sprintf("https://%s/login", cli.PublicHost),
357 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
358 },
359 }
360
361 op := oatproxy.New(&oatproxy.Config{
362 Host: cli.PublicHost,
363 CreateOAuthSession: mod.CreateOAuthSession,
364 UpdateOAuthSession: mod.UpdateOAuthSession,
365 GetOAuthSession: mod.LoadOAuthSession,
366 Scope: "atproto transition:generic",
367 UpstreamJWK: cli.JWK,
368 DownstreamJWK: cli.AccessJWK,
369 ClientMetadata: clientMetadata,
370 })
371 d := director.NewDirector(mm, mod, &cli, b, op)
372 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op)
373 if err != nil {
374 return err
375 }
376
377 group, ctx := TimeoutGroupWithContext(ctx)
378 ctx = log.WithLogValues(ctx, "version", build.Version)
379
380 group.Go(func() error {
381 return handleSignals(ctx)
382 })
383
384 if cli.TracingEndpoint != "" {
385 group.Go(func() error {
386 return startTelemetry(ctx, cli.TracingEndpoint)
387 })
388 }
389
390 if cli.Secure {
391 group.Go(func() error {
392 return a.ServeHTTPS(ctx)
393 })
394 group.Go(func() error {
395 return a.ServeHTTPRedirect(ctx)
396 })
397 if cli.RTMPServerAddon != "" {
398 group.Go(func() error {
399 return rtmps.ServeRTMPS(ctx, &cli)
400 })
401 }
402 } else {
403 group.Go(func() error {
404 return a.ServeHTTP(ctx)
405 })
406 }
407
408 group.Go(func() error {
409 return a.ServeInternalHTTP(ctx)
410 })
411
412 if !cli.NoFirehose {
413 group.Go(func() error {
414 return atsync.StartFirehose(ctx)
415 })
416 }
417 for _, labeler := range cli.Labelers {
418 group.Go(func() error {
419 return atsync.StartLabelerFirehose(ctx, labeler)
420 })
421 }
422
423 group.Go(func() error {
424 return spmetrics.ExpireSessions(ctx)
425 })
426
427 group.Go(func() error {
428 return mod.StartSegmentCleaner(ctx)
429 })
430
431 if cli.LivepeerGateway {
432 // make a file to make sure the directory exists
433 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
434 if err != nil {
435 return err
436 }
437 fd.Close()
438 if err != nil {
439 return err
440 }
441 group.Go(func() error {
442 return GoLivepeer(ctx, fs)
443 })
444 }
445
446 group.Go(func() error {
447 return d.Start(ctx)
448 })
449
450 if cli.TestStream {
451 // regular stream self-test
452 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
453 Schema: schema,
454 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
455 })
456 if err != nil {
457 return err
458 }
459 atkey, err := atproto.ParsePubKey(signer.Public())
460 if err != nil {
461 return err
462 }
463 did := atkey.DIDKey()
464 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
465 if err != nil {
466 return err
467 }
468 err = mod.UpdateIdentity(&model.Identity{
469 ID: testMediaSigner.Pub().String(),
470 Handle: "stream-self-tester",
471 DID: "",
472 })
473 if err != nil {
474 return err
475 }
476 cli.AllowedStreams = append(cli.AllowedStreams, did)
477 a.Aliases["self-test"] = did
478 group.Go(func() error {
479 return mm.TestSource(ctx, testMediaSigner)
480 })
481
482 // Start a test stream that will run intermittently
483 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
484 Schema: schema,
485 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
486 })
487 if err != nil {
488 return err
489 }
490 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
491 if err != nil {
492 return err
493 }
494 did2 := atkey2.DIDKey()
495 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
496 if err != nil {
497 return err
498 }
499 err = mod.UpdateIdentity(&model.Identity{
500 ID: intermittentMediaSigner.Pub().String(),
501 Handle: "stream-intermittent-tester",
502 DID: "",
503 })
504 if err != nil {
505 return err
506 }
507 cli.AllowedStreams = append(cli.AllowedStreams, did2)
508 a.Aliases["intermittent-self-test"] = did2
509
510 group.Go(func() error {
511 for {
512 // Start intermittent stream
513 intermittentCtx, cancel := context.WithCancel(ctx)
514 done := make(chan struct{})
515 go func() {
516 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
517 close(done)
518 }()
519 // Stream ON for 15 seconds
520 time.Sleep(15 * time.Second)
521 // Stop stream
522 cancel()
523 <-done // Wait for TestSource to exit
524 // Stream OFF for 15 seconds
525 time.Sleep(15 * time.Second)
526 }
527 })
528 }
529
530 for _, job := range platformJobs {
531 group.Go(func() error {
532 return job(ctx, &cli)
533 })
534 }
535
536 if cli.WHIPTest != "" {
537 group.Go(func() error {
538 err := WHIP(strings.Split(cli.WHIPTest, " "))
539 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
540 time.Sleep(time.Second * 3)
541 // gst.Deinit()
542 log.Warn(ctx, "gst deinit complete, exiting")
543 return err
544 })
545 }
546
547 return group.Wait()
548}
549
// ErrCaughtSignal is wrapped into the error that handleSignals returns when
// the process receives one of the signals it listens for. Match it with
// errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
551
552func handleSignals(ctx context.Context) error {
553 c := make(chan os.Signal, 1)
554 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
555 for {
556 select {
557 case s := <-c:
558 if s == syscall.SIGABRT {
559 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
560 log.Error(ctx, "failed to create pprof", "error", err)
561 }
562 }
563 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
564 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
565 case <-ctx.Done():
566 return nil
567 }
568 }
569}