Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/streamplace/oatproxy/pkg/oatproxy"
20 "golang.org/x/term"
21 "stream.place/streamplace/pkg/aqhttp"
22 "stream.place/streamplace/pkg/atproto"
23 "stream.place/streamplace/pkg/bus"
24 "stream.place/streamplace/pkg/crypto/signers"
25 "stream.place/streamplace/pkg/crypto/signers/eip712"
26 "stream.place/streamplace/pkg/director"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/notifications"
30 "stream.place/streamplace/pkg/replication"
31 "stream.place/streamplace/pkg/replication/boring"
32 "stream.place/streamplace/pkg/resync"
33 "stream.place/streamplace/pkg/rtmps"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35 "stream.place/streamplace/pkg/spmetrics"
36
37 "github.com/ThalesGroup/crypto11"
38 _ "github.com/go-gst/go-glib/glib"
39 _ "github.com/go-gst/go-gst/gst"
40 "stream.place/streamplace/pkg/api"
41 "stream.place/streamplace/pkg/config"
42 "stream.place/streamplace/pkg/model"
43)
44
45// Additional jobs that can be injected by platforms
46type jobFunc func(ctx context.Context, cli *config.CLI) error
47
48// parse the CLI and fire up an streamplace node!
49func start(build *config.BuildFlags, platformJobs []jobFunc) error {
50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
51 err := media.RunSelfTest(context.Background())
52 if err != nil {
53 if selfTest {
54 fmt.Println(err.Error())
55 os.Exit(1)
56 } else {
57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
58 if retryCount >= 3 {
59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
60 return err
61 }
62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
65 if err != nil {
66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
67 return err
68 }
69 panic("invalid code path: exec succeeded but we're still here???")
70 }
71 }
72 if selfTest {
73 runtime.GC()
74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
75 log.Error(context.Background(), "error creating pprof", "error", err)
76 }
77 fmt.Println("self-test successful!")
78 os.Exit(0)
79 }
80
81 if len(os.Args) > 1 && os.Args[1] == "stream" {
82 if len(os.Args) != 3 {
83 fmt.Println("usage: streamplace stream [user]")
84 os.Exit(1)
85 }
86 return Stream(os.Args[2])
87 }
88
89 if len(os.Args) > 1 && os.Args[1] == "live" {
90 cli := config.CLI{Build: build}
91 fs := cli.NewFlagSet("streamplace live")
92
93 err := cli.Parse(fs, os.Args[2:])
94 if err != nil {
95 return err
96 }
97
98 args := fs.Args()
99 if len(args) != 1 {
100 fmt.Println("usage: streamplace live [flags] [stream-key]")
101 os.Exit(1)
102 }
103
104 return Live(args[0], cli.HTTPInternalAddr)
105 }
106
107 if len(os.Args) > 1 && os.Args[1] == "sign" {
108 return Sign(context.Background())
109 }
110
111 if len(os.Args) > 1 && os.Args[1] == "whep" {
112 return WHEP(os.Args[2:])
113 }
114 if len(os.Args) > 1 && os.Args[1] == "whip" {
115 return WHIP(os.Args[2:])
116 }
117
118 if len(os.Args) > 1 && os.Args[1] == "self-test" {
119 err := media.RunSelfTest(context.Background())
120 if err != nil {
121 fmt.Println(err.Error())
122 os.Exit(1)
123 }
124 fmt.Println("self-test successful!")
125 os.Exit(0)
126 }
127 _ = flag.Set("logtostderr", "true")
128 vFlag := flag.Lookup("v")
129 cli := config.CLI{Build: build}
130 fs := cli.NewFlagSet("streamplace")
131 verbosity := fs.String("v", "3", "log verbosity level")
132 version := fs.Bool("version", false, "print version and exit")
133
134 err = cli.Parse(
135 fs, os.Args[1:],
136 )
137 if err != nil {
138 return err
139 }
140 err = flag.CommandLine.Parse(nil)
141 if err != nil {
142 return err
143 }
144 _ = vFlag.Value.Set(*verbosity)
145 log.SetColorLogger(cli.Color)
146 ctx := context.Background()
147 ctx = log.WithDebugValue(ctx, cli.Debug)
148
149 log.Log(ctx,
150 "streamplace",
151 "version", build.Version,
152 "buildTime", build.BuildTimeStr(),
153 "uuid", build.UUID,
154 "runtime.GOOS", runtime.GOOS,
155 "runtime.GOARCH", runtime.GOARCH,
156 "runtime.Version", runtime.Version())
157 if *version {
158 return nil
159 }
160 spmetrics.Version.WithLabelValues(build.Version).Inc()
161
162 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
163 if len(os.Args) > 1 && os.Args[1] == "resync" {
164 return resync.Resync(ctx, &cli)
165 }
166
167 err = os.MkdirAll(cli.DataDir, os.ModePerm)
168 if err != nil {
169 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
170 }
171 schema, err := v0.MakeV0Schema()
172 if err != nil {
173 return err
174 }
175 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
176 Schema: schema,
177 EthKeystorePath: cli.EthKeystorePath,
178 EthAccountAddr: cli.EthAccountAddr,
179 EthKeystorePassword: cli.EthPassword,
180 })
181 if err != nil {
182 return err
183 }
184 var signer crypto.Signer = eip712signer
185 if cli.PKCS11ModulePath != "" {
186 conf := &crypto11.Config{
187 Path: cli.PKCS11ModulePath,
188 }
189 count := 0
190 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
191 if val != "" {
192 count += 1
193 }
194 }
195 if count != 1 {
196 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
197 }
198 if cli.PKCS11TokenSlot != "" {
199 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
200 if err != nil {
201 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
202 }
203 numint := int(num)
204 // why does crypto11 want this as a reference? odd.
205 conf.SlotNumber = &numint
206 }
207 if cli.PKCS11TokenLabel != "" {
208 conf.TokenLabel = cli.PKCS11TokenLabel
209 }
210 if cli.PKCS11TokenSerial != "" {
211 conf.TokenSerial = cli.PKCS11TokenSerial
212 }
213 pin := cli.PKCS11Pin
214 if pin == "" {
215 fmt.Printf("Please enter PKCS11 PIN: ")
216 password, err := term.ReadPassword(int(os.Stdin.Fd()))
217 fmt.Println("")
218 if err != nil {
219 return fmt.Errorf("error reading PKCS11 password: %w", err)
220 }
221 pin = string(password)
222 }
223 conf.Pin = pin
224
225 sc, err := crypto11.Configure(conf)
226 if err != nil {
227 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
228 }
229 var id []byte = nil
230 var label []byte = nil
231 if cli.PKCS11KeypairID != "" {
232 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
233 if err != nil {
234 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
235 }
236 id = []byte{byte(num)}
237 }
238 if cli.PKCS11KeypairLabel != "" {
239 label = []byte(cli.PKCS11KeypairLabel)
240 }
241 hwsigner, err := sc.FindKeyPair(id, label)
242 if err != nil {
243 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
244 }
245 if hwsigner == nil {
246 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
247 }
248 addr, err := signers.HexAddrFromSigner(hwsigner)
249 if err != nil {
250 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
251 }
252 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
253 signer = hwsigner
254 }
255 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
256 mod, err := model.MakeDB(cli.DBPath)
257 if err != nil {
258 return err
259 }
260 handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod)
261 if err != nil {
262 return err
263 }
264 defer handle.Close()
265 var noter notifications.FirebaseNotifier
266 if cli.FirebaseServiceAccount != "" {
267 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
268 if err != nil {
269 return err
270 }
271 }
272
273 jwkPath := cli.DataFilePath([]string{"jwk.json"})
274 jwk, err := atproto.EnsureJWK(ctx, jwkPath)
275 if err != nil {
276 return err
277 }
278 cli.JWK = jwk
279
280 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"})
281 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath)
282 if err != nil {
283 return err
284 }
285 cli.AccessJWK = accessJWK
286
287 b := bus.NewBus()
288 atsync := &atproto.ATProtoSynchronizer{
289 CLI: &cli,
290 Model: mod,
291 Noter: noter,
292 Bus: b,
293 }
294 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
295 if err != nil {
296 return err
297 }
298
299 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
300 if err != nil {
301 return err
302 }
303
304 clientMetadata := &oatproxy.OAuthClientMetadata{
305 Scope: "atproto transition:generic",
306 ClientName: "Streamplace",
307 RedirectURIs: []string{
308 fmt.Sprintf("https://%s/login", cli.PublicHost),
309 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
310 },
311 }
312
313 op := oatproxy.New(&oatproxy.Config{
314 Host: cli.PublicHost,
315 CreateOAuthSession: mod.CreateOAuthSession,
316 UpdateOAuthSession: mod.UpdateOAuthSession,
317 GetOAuthSession: mod.LoadOAuthSession,
318 Scope: "atproto transition:generic",
319 UpstreamJWK: cli.JWK,
320 DownstreamJWK: cli.AccessJWK,
321 ClientMetadata: clientMetadata,
322 })
323 d := director.NewDirector(mm, mod, &cli, b, op)
324 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op)
325 if err != nil {
326 return err
327 }
328
329 group, ctx := TimeoutGroupWithContext(ctx)
330 ctx = log.WithLogValues(ctx, "version", build.Version)
331
332 group.Go(func() error {
333 return handleSignals(ctx)
334 })
335
336 if cli.TracingEndpoint != "" {
337 group.Go(func() error {
338 return startTelemetry(ctx, cli.TracingEndpoint)
339 })
340 }
341
342 if cli.Secure {
343 group.Go(func() error {
344 return a.ServeHTTPS(ctx)
345 })
346 group.Go(func() error {
347 return a.ServeHTTPRedirect(ctx)
348 })
349 if cli.RTMPServerAddon != "" {
350 group.Go(func() error {
351 return rtmps.ServeRTMPS(ctx, &cli)
352 })
353 }
354 } else {
355 group.Go(func() error {
356 return a.ServeHTTP(ctx)
357 })
358 }
359
360 group.Go(func() error {
361 return a.ServeInternalHTTP(ctx)
362 })
363
364 if !cli.NoFirehose {
365 group.Go(func() error {
366 return atsync.StartFirehose(ctx)
367 })
368 }
369
370 group.Go(func() error {
371 return spmetrics.ExpireSessions(ctx)
372 })
373
374 group.Go(func() error {
375 return mod.StartSegmentCleaner(ctx)
376 })
377
378 group.Go(func() error {
379 return d.Start(ctx)
380 })
381
382 if cli.TestStream {
383 // regular stream self-test
384 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
385 Schema: schema,
386 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
387 })
388 if err != nil {
389 return err
390 }
391 atkey, err := atproto.ParsePubKey(signer.Public())
392 if err != nil {
393 return err
394 }
395 did := atkey.DIDKey()
396 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
397 if err != nil {
398 return err
399 }
400 err = mod.UpdateIdentity(&model.Identity{
401 ID: testMediaSigner.Pub().String(),
402 Handle: "stream-self-tester",
403 DID: "",
404 })
405 if err != nil {
406 return err
407 }
408 cli.AllowedStreams = append(cli.AllowedStreams, did)
409 a.Aliases["self-test"] = did
410 group.Go(func() error {
411 return mm.TestSource(ctx, testMediaSigner)
412 })
413
414 // Start a test stream that will run intermittently
415 intermittentSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
416 Schema: schema,
417 EthKeystorePath: filepath.Join(cli.DataDir, "intermittent-signer"),
418 })
419 if err != nil {
420 return err
421 }
422 atkey2, err := atproto.ParsePubKey(intermittentSigner.Public())
423 if err != nil {
424 return err
425 }
426 did2 := atkey2.DIDKey()
427 intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, intermittentSigner)
428 if err != nil {
429 return err
430 }
431 err = mod.UpdateIdentity(&model.Identity{
432 ID: intermittentMediaSigner.Pub().String(),
433 Handle: "stream-intermittent-tester",
434 DID: "",
435 })
436 if err != nil {
437 return err
438 }
439 cli.AllowedStreams = append(cli.AllowedStreams, did2)
440 a.Aliases["intermittent-self-test"] = did2
441
442 group.Go(func() error {
443 for {
444 // Start intermittent stream
445 intermittentCtx, cancel := context.WithCancel(ctx)
446 done := make(chan struct{})
447 go func() {
448 _ = mm.TestSource(intermittentCtx, intermittentMediaSigner)
449 close(done)
450 }()
451 // Stream ON for 15 seconds
452 time.Sleep(15 * time.Second)
453 // Stop stream
454 cancel()
455 <-done // Wait for TestSource to exit
456 // Stream OFF for 15 seconds
457 time.Sleep(15 * time.Second)
458 }
459 })
460 }
461
462 for _, job := range platformJobs {
463 group.Go(func() error {
464 return job(ctx, &cli)
465 })
466 }
467
468 if cli.WHIPTest != "" {
469 group.Go(func() error {
470 err := WHIP(strings.Split(cli.WHIPTest, " "))
471 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
472 time.Sleep(time.Second * 3)
473 // gst.Deinit()
474 log.Warn(ctx, "gst deinit complete, exiting")
475 return err
476 })
477 }
478
479 return group.Wait()
480}
481
482var ErrCaughtSignal = errors.New("caught signal")
483
484func handleSignals(ctx context.Context) error {
485 c := make(chan os.Signal, 1)
486 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
487 for {
488 select {
489 case s := <-c:
490 if s == syscall.SIGABRT {
491 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
492 log.Error(ctx, "failed to create pprof", "error", err)
493 }
494 }
495 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
496 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
497 case <-ctx.Done():
498 return nil
499 }
500 }
501}