Live video on the AT Protocol
1package cmd
2
3import (
4 "bytes"
5 "context"
6 "crypto/rand"
7 "errors"
8 "flag"
9 "fmt"
10 "net/url"
11 "os"
12 "os/signal"
13 "runtime"
14 "runtime/pprof"
15 "slices"
16 "strconv"
17 "strings"
18 "syscall"
19 "time"
20
21 "github.com/bluesky-social/indigo/carstore"
22 "github.com/ethereum/go-ethereum/common/hexutil"
23 "github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24 "github.com/peterbourgon/ff/v3"
25 "github.com/streamplace/oatproxy/pkg/oatproxy"
26 "stream.place/streamplace/pkg/aqhttp"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/director"
30 "stream.place/streamplace/pkg/gstinit"
31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
32 "stream.place/streamplace/pkg/log"
33 "stream.place/streamplace/pkg/media"
34 "stream.place/streamplace/pkg/notifications"
35 "stream.place/streamplace/pkg/replication"
36 "stream.place/streamplace/pkg/replication/iroh_replicator"
37 "stream.place/streamplace/pkg/replication/websocketrep"
38 "stream.place/streamplace/pkg/rtmps"
39 "stream.place/streamplace/pkg/spmetrics"
40 "stream.place/streamplace/pkg/statedb"
41 "stream.place/streamplace/pkg/storage"
42
43 _ "github.com/go-gst/go-glib/glib"
44 _ "github.com/go-gst/go-gst/gst"
45 "stream.place/streamplace/pkg/api"
46 "stream.place/streamplace/pkg/config"
47 "stream.place/streamplace/pkg/model"
48)
49
// jobFunc is an additional long-running job that a platform can inject into
// the node. start launches every supplied jobFunc in the node's task group,
// handing it the group context and a pointer to the parsed CLI configuration;
// the job's returned error is reported to that group.
type jobFunc func(ctx context.Context, cli *config.CLI) error
52
53// parse the CLI and fire up an streamplace node!
54func start(build *config.BuildFlags, platformJobs []jobFunc) error {
55 iroh_streamplace.InitLogging()
56 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
57 err := media.RunSelfTest(context.Background())
58 if err != nil {
59 if selfTest {
60 fmt.Println(err.Error())
61 os.Exit(1)
62 } else {
63 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
64 if retryCount >= 3 {
65 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
66 return err
67 }
68 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
69 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
70 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
71 if err != nil {
72 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
73 return err
74 }
75 panic("invalid code path: exec succeeded but we're still here???")
76 }
77 }
78 if selfTest {
79 runtime.GC()
80 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
81 log.Error(context.Background(), "error creating pprof", "error", err)
82 }
83 fmt.Println("self-test successful!")
84 os.Exit(0)
85 }
86
87 if len(os.Args) > 1 && os.Args[1] == "stream" {
88 if len(os.Args) != 3 {
89 fmt.Println("usage: streamplace stream [user]")
90 os.Exit(1)
91 }
92 return Stream(os.Args[2])
93 }
94
95 if len(os.Args) > 1 && os.Args[1] == "live" {
96 cli := config.CLI{Build: build}
97 fs := cli.NewFlagSet("streamplace live")
98
99 err := cli.Parse(fs, os.Args[2:])
100 if err != nil {
101 return err
102 }
103
104 args := fs.Args()
105 if len(args) != 1 {
106 fmt.Println("usage: streamplace live [flags] [stream-key]")
107 os.Exit(1)
108 }
109
110 return Live(args[0], cli.HTTPInternalAddr)
111 }
112
113 if len(os.Args) > 1 && os.Args[1] == "sign" {
114 return Sign(context.Background())
115 }
116
117 if len(os.Args) > 1 && os.Args[1] == "whep" {
118 return WHEP(os.Args[2:])
119 }
120 if len(os.Args) > 1 && os.Args[1] == "whip" {
121 return WHIP(os.Args[2:])
122 }
123
124 if len(os.Args) > 1 && os.Args[1] == "combine" {
125 return Combine(context.Background(), build, os.Args[2:])
126 }
127
128 if len(os.Args) > 1 && os.Args[1] == "split" {
129 cli := config.CLI{Build: build}
130 fs := cli.NewFlagSet("streamplace split")
131
132 err := cli.Parse(fs, os.Args[2:])
133 if err != nil {
134 return err
135 }
136 ctx := context.Background()
137 ctx = log.WithDebugValue(ctx, cli.Debug)
138 if len(fs.Args()) != 2 {
139 fmt.Println("usage: streamplace split [flags] [input file] [output directory]")
140 os.Exit(1)
141 }
142 gstinit.InitGST()
143 return Split(ctx, fs.Args()[0], fs.Args()[1])
144 }
145
146 if len(os.Args) > 1 && os.Args[1] == "self-test" {
147 err := media.RunSelfTest(context.Background())
148 if err != nil {
149 fmt.Println(err.Error())
150 os.Exit(1)
151 }
152 fmt.Println("self-test successful!")
153 os.Exit(0)
154 }
155
156 if len(os.Args) > 1 && os.Args[1] == "livepeer" {
157 lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
158 _ = starter.NewLivepeerConfig(lpfs)
159 err = ff.Parse(lpfs, os.Args[2:],
160 ff.WithConfigFileFlag("config"),
161 ff.WithEnvVarPrefix("LP"),
162 )
163 if err != nil {
164 return err
165 }
166 err = GoLivepeer(context.Background(), lpfs)
167 if err != nil {
168 log.Error(context.Background(), "error in livepeer", "error", err)
169 os.Exit(1)
170 }
171 os.Exit(0)
172 }
173
174 _ = flag.Set("logtostderr", "true")
175 vFlag := flag.Lookup("v")
176 cli := config.CLI{Build: build}
177 fs := cli.NewFlagSet("streamplace")
178 verbosity := fs.String("v", "3", "log verbosity level")
179 version := fs.Bool("version", false, "print version and exit")
180
181 err = cli.Parse(
182 fs, os.Args[1:],
183 )
184 if err != nil {
185 return err
186 }
187
188 err = flag.CommandLine.Parse(nil)
189 if err != nil {
190 return err
191 }
192 _ = vFlag.Value.Set(*verbosity)
193 log.SetColorLogger(cli.Color)
194 ctx := context.Background()
195 ctx = log.WithDebugValue(ctx, cli.Debug)
196
197 log.Log(ctx,
198 "streamplace",
199 "version", build.Version,
200 "buildTime", build.BuildTimeStr(),
201 "uuid", build.UUID,
202 "runtime.GOOS", runtime.GOOS,
203 "runtime.GOARCH", runtime.GOARCH,
204 "runtime.Version", runtime.Version())
205 if *version {
206 return nil
207 }
208 signer, err := createSigner(ctx, &cli)
209 if err != nil {
210 return err
211 }
212
213 if len(os.Args) > 1 && os.Args[1] == "migrate" {
214 return statedb.Migrate(&cli)
215 }
216
217 spmetrics.Version.WithLabelValues(build.Version).Inc()
218 if cli.LivepeerHelp {
219 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
220 _ = starter.NewLivepeerConfig(lpFlags)
221 lpFlags.VisitAll(func(f *flag.Flag) {
222 adapted := config.ToSnakeCase(f.Name)
223 fmt.Printf(" -%s\n", fmt.Sprintf("livepeer.%s", adapted))
224 usage := fmt.Sprintf(" %s", f.Usage)
225 if f.DefValue != "" {
226 usage = fmt.Sprintf("%s (default %s)", usage, f.DefValue)
227 }
228 fmt.Printf(" %s\n", usage)
229 })
230 return nil
231 }
232
233 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
234
235 err = os.MkdirAll(cli.DataDir, os.ModePerm)
236 if err != nil {
237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
238 }
239
240 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"}))
241 if err != nil {
242 return err
243 }
244 var noter notifications.FirebaseNotifier
245 if cli.FirebaseServiceAccount != "" {
246 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
247 if err != nil {
248 return err
249 }
250 }
251
252 group, ctx := TimeoutGroupWithContext(ctx)
253
254 out := carstore.SQLiteStore{}
255 err = out.Open(":memory:")
256 if err != nil {
257 return err
258 }
259 state, err := statedb.MakeDB(ctx, &cli, noter, mod)
260 if err != nil {
261 return err
262 }
263 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
264 if err != nil {
265 return err
266 }
267 defer handle.Close()
268
269 jwk, err := state.EnsureJWK(ctx, "jwk")
270 if err != nil {
271 return err
272 }
273 cli.JWK = jwk
274
275 accessJWK, err := state.EnsureJWK(ctx, "access-jwk")
276 if err != nil {
277 return err
278 }
279 cli.AccessJWK = accessJWK
280
281 b := bus.NewBus()
282 atsync := &atproto.ATProtoSynchronizer{
283 CLI: &cli,
284 Model: mod,
285 StatefulDB: state,
286 Noter: noter,
287 Bus: b,
288 }
289 err = atsync.Migrate(ctx)
290 if err != nil {
291 return fmt.Errorf("failed to migrate: %w", err)
292 }
293
294 mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync)
295 if err != nil {
296 return err
297 }
298
299 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod)
300 if err != nil {
301 return err
302 }
303
304 var clientMetadata *oatproxy.OAuthClientMetadata
305 var host string
306 if cli.PublicOAuth {
307 u, err := url.Parse(cli.OwnPublicURL())
308 if err != nil {
309 return err
310 }
311 host = u.Host
312 clientMetadata = &oatproxy.OAuthClientMetadata{
313 Scope: "atproto transition:generic",
314 ClientName: "Streamplace",
315 RedirectURIs: []string{
316 fmt.Sprintf("%s/login", cli.OwnPublicURL()),
317 fmt.Sprintf("%s/api/app-return", cli.OwnPublicURL()),
318 },
319 }
320 } else {
321 host = cli.BroadcasterHost
322 clientMetadata = &oatproxy.OAuthClientMetadata{
323 Scope: "atproto transition:generic",
324 ClientName: "Streamplace",
325 RedirectURIs: []string{
326 fmt.Sprintf("https://%s/login", cli.BroadcasterHost),
327 fmt.Sprintf("https://%s/api/app-return", cli.BroadcasterHost),
328 },
329 }
330 }
331
332 var replicator replication.Replicator = nil
333 if slices.Contains(cli.Replicators, config.ReplicatorIroh) {
334 exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
335 if err != nil {
336 return err
337 }
338 if !exists {
339 secret := make([]byte, 32)
340 _, err := rand.Read(secret)
341 if err != nil {
342 return fmt.Errorf("failed to generate random secret: %w", err)
343 }
344 err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
345 if err != nil {
346 return err
347 }
348 }
349 buf := bytes.Buffer{}
350 err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
351 if err != nil {
352 return err
353 }
354 secret := buf.Bytes()
355 var topic []byte
356 if cli.IrohTopic != "" {
357 topic, err = hexutil.Decode("0x" + cli.IrohTopic)
358 if err != nil {
359 return err
360 }
361 }
362 replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod)
363 if err != nil {
364 return err
365 }
366 }
367 if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) {
368 replicator = websocketrep.NewWebsocketReplicator(b, mod, mm)
369 }
370
371 op := oatproxy.New(&oatproxy.Config{
372 Host: host,
373 CreateOAuthSession: state.CreateOAuthSession,
374 UpdateOAuthSession: state.UpdateOAuthSession,
375 GetOAuthSession: state.LoadOAuthSession,
376 Lock: state.GetNamedLock,
377 Scope: "atproto transition:generic",
378 UpstreamJWK: cli.JWK,
379 DownstreamJWK: cli.AccessJWK,
380 ClientMetadata: clientMetadata,
381 Public: cli.PublicOAuth,
382 })
383 d := director.NewDirector(mm, mod, &cli, b, op, state, replicator)
384 a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op)
385 if err != nil {
386 return err
387 }
388
389 ctx = log.WithLogValues(ctx, "version", build.Version)
390
391 group.Go(func() error {
392 return handleSignals(ctx)
393 })
394
395 group.Go(func() error {
396 return state.ProcessQueue(ctx)
397 })
398
399 if cli.TracingEndpoint != "" {
400 group.Go(func() error {
401 return startTelemetry(ctx, cli.TracingEndpoint)
402 })
403 }
404
405 if cli.Secure {
406 group.Go(func() error {
407 return a.ServeHTTPS(ctx)
408 })
409 group.Go(func() error {
410 return a.ServeHTTPRedirect(ctx)
411 })
412 if cli.RTMPServerAddon != "" {
413 group.Go(func() error {
414 return rtmps.ServeRTMPSAddon(ctx, &cli)
415 })
416 } else {
417 group.Go(func() error {
418 return a.ServeRTMPS(ctx, &cli)
419 })
420 }
421 } else {
422 group.Go(func() error {
423 return a.ServeHTTP(ctx)
424 })
425 group.Go(func() error {
426 return a.ServeRTMP(ctx)
427 })
428 }
429
430 group.Go(func() error {
431 return a.ServeInternalHTTP(ctx)
432 })
433
434 if !cli.NoFirehose {
435 group.Go(func() error {
436 return atsync.StartFirehose(ctx)
437 })
438 }
439 for _, labeler := range cli.Labelers {
440 group.Go(func() error {
441 return atsync.StartLabelerFirehose(ctx, labeler)
442 })
443 }
444
445 group.Go(func() error {
446 return a.ExpireSessions(ctx)
447 })
448
449 group.Go(func() error {
450 return storage.StartSegmentCleaner(ctx, mod, &cli)
451 })
452
453 group.Go(func() error {
454 return mod.StartSegmentCleaner(ctx)
455 })
456
457 group.Go(func() error {
458 return replicator.Start(ctx, &cli)
459 })
460
461 if cli.LivepeerGateway {
462 // make a file to make sure the directory exists
463 fd, err := cli.DataFileCreate([]string{"livepeer", "gateway", "empty"}, true)
464 if err != nil {
465 return err
466 }
467 fd.Close()
468 if err != nil {
469 return err
470 }
471 group.Go(func() error {
472 err := GoLivepeer(ctx, fs)
473 if err != nil {
474 return err
475 }
476 // livepeer returns nil on error, so we need to check if we're responsible
477 if ctx.Err() == nil {
478 return fmt.Errorf("livepeer exited")
479 }
480 return nil
481 })
482 }
483
484 group.Go(func() error {
485 return d.Start(ctx)
486 })
487
488 if cli.TestStream {
489 atkey, err := atproto.ParsePubKey(signer.Public())
490 if err != nil {
491 return err
492 }
493 did := atkey.DIDKey()
494 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod)
495 if err != nil {
496 return err
497 }
498 err = mod.UpdateIdentity(&model.Identity{
499 ID: testMediaSigner.Pub().String(),
500 Handle: "stream-self-tester",
501 DID: "",
502 })
503 if err != nil {
504 return err
505 }
506 cli.AllowedStreams = append(cli.AllowedStreams, did)
507 a.Aliases["self-test"] = did
508 group.Go(func() error {
509 return mm.TestSource(ctx, testMediaSigner)
510 })
511
512 // Start a test stream that will run intermittently
513 if err != nil {
514 return err
515 }
516 atkey2, err := atproto.ParsePubKey(signer.Public())
517 if err != nil {
518 return err
519 }
520 did2 := atkey2.DIDKey()
521 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod)
522 if err != nil {
523 return err
524 }
525 err = mod.UpdateIdentity(&model.Identity{
526 ID: intermittentMediaSigner.Pub().String(),
527 Handle: "stream-intermittent-tester",
528 DID: "",
529 })
530 if err != nil {
531 return err
532 }
533 cli.AllowedStreams = append(cli.AllowedStreams, did2)
534 a.Aliases["intermittent-self-test"] = did2
535
536 group.Go(func() error {
537 for {
538 // Start intermittent stream
539 intermittentCtx, cancel := context.WithCancel(ctx)
540 done := make(chan struct{})
541 go func() {
542 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
543 close(done)
544 }()
545 // Stream ON for 15 seconds
546 time.Sleep(15 * time.Second)
547 // Stop stream
548 cancel()
549 <-done // Wait for TestSource to exit
550 // Stream OFF for 15 seconds
551 time.Sleep(15 * time.Second)
552 }
553 })
554 }
555
556 for _, job := range platformJobs {
557 group.Go(func() error {
558 return job(ctx, &cli)
559 })
560 }
561
562 if cli.WHIPTest != "" {
563 group.Go(func() error {
564 err := WHIP(strings.Split(cli.WHIPTest, " "))
565 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
566 time.Sleep(time.Second * 3)
567 // gst.Deinit()
568 log.Warn(ctx, "gst deinit complete, exiting")
569 return err
570 })
571 }
572
573 return group.Wait()
574}
575
// ErrCaughtSignal is the sentinel error wrapped by the error that
// handleSignals returns after receiving one of the signals it listens for;
// match it with errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
577
578func handleSignals(ctx context.Context) error {
579 c := make(chan os.Signal, 1)
580 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
581 for {
582 select {
583 case s := <-c:
584 if s == syscall.SIGABRT {
585 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
586 log.Error(ctx, "failed to create pprof", "error", err)
587 }
588 }
589 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
590 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
591 case <-ctx.Done():
592 return nil
593 }
594 }
595}