Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/streamplace/oatproxy/pkg/oatproxy"
20 "golang.org/x/term"
21 "stream.place/streamplace/pkg/aqhttp"
22 "stream.place/streamplace/pkg/atproto"
23 "stream.place/streamplace/pkg/bus"
24 "stream.place/streamplace/pkg/crypto/signers"
25 "stream.place/streamplace/pkg/crypto/signers/eip712"
26 "stream.place/streamplace/pkg/director"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/notifications"
30 "stream.place/streamplace/pkg/replication"
31 "stream.place/streamplace/pkg/replication/boring"
32 "stream.place/streamplace/pkg/resync"
33 "stream.place/streamplace/pkg/rtmps"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35 "stream.place/streamplace/pkg/spmetrics"
36
37 "github.com/ThalesGroup/crypto11"
38 _ "github.com/go-gst/go-glib/glib"
39 _ "github.com/go-gst/go-gst/gst"
40 "stream.place/streamplace/pkg/api"
41 "stream.place/streamplace/pkg/config"
42 "stream.place/streamplace/pkg/model"
43)
44
45// Additional jobs that can be injected by platforms
46type jobFunc func(ctx context.Context, cli *config.CLI) error
47
48// parse the CLI and fire up an streamplace node!
49func start(build *config.BuildFlags, platformJobs []jobFunc) error {
50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
51 err := media.RunSelfTest(context.Background())
52 if err != nil {
53 if selfTest {
54 fmt.Println(err.Error())
55 os.Exit(1)
56 } else {
57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
58 if retryCount >= 3 {
59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
60 return err
61 }
62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
65 if err != nil {
66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
67 return err
68 }
69 panic("invalid code path: exec succeeded but we're still here???")
70 }
71 }
72 if selfTest {
73 runtime.GC()
74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
75 log.Error(context.Background(), "error creating pprof", "error", err)
76 }
77 fmt.Println("self-test successful!")
78 os.Exit(0)
79 }
80
81 if len(os.Args) > 1 && os.Args[1] == "stream" {
82 if len(os.Args) != 3 {
83 fmt.Println("usage: streamplace stream [user]")
84 os.Exit(1)
85 }
86 return Stream(os.Args[2])
87 }
88
89 if len(os.Args) > 1 && os.Args[1] == "live" {
90 if len(os.Args) != 3 {
91 fmt.Println("usage: streamplace live [stream-key]")
92 os.Exit(1)
93 }
94 return Live(os.Args[2])
95 }
96
97 if len(os.Args) > 1 && os.Args[1] == "sign" {
98 return Sign(context.Background())
99 }
100
101 if len(os.Args) > 1 && os.Args[1] == "whep" {
102 return WHEP(os.Args[2:])
103 }
104 if len(os.Args) > 1 && os.Args[1] == "whip" {
105 return WHIP(os.Args[2:])
106 }
107
108 if len(os.Args) > 1 && os.Args[1] == "self-test" {
109 err := media.RunSelfTest(context.Background())
110 if err != nil {
111 fmt.Println(err.Error())
112 os.Exit(1)
113 }
114 fmt.Println("self-test successful!")
115 os.Exit(0)
116 }
117 _ = flag.Set("logtostderr", "true")
118 vFlag := flag.Lookup("v")
119 fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
120 cli := config.CLI{Build: build}
121 fs.StringVar(&cli.DataDir, "data-dir", config.DefaultDataDir(), "directory for keeping all streamplace data")
122 fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
123 fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
124 fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
125 fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
126 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
127 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
128 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
129 cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
130 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
131 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
132 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
133 cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
134 fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
135 fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
136 fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
137 fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
138 fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
139 fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
140 fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
141 fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
142 fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
143 fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
144 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
145 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
146 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
147 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
148 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
149 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
150 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
151 cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
152 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
153 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
154 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
155 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
156 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
157 verbosity := fs.String("v", "3", "log verbosity level")
158 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
159 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
160 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
161 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
162 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
163 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
164 fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)")
165 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
166 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
167 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
168 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
169 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
170 fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
171 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
172 version := fs.Bool("version", false, "print version and exit")
173
174 if runtime.GOOS == "linux" {
175 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
176 fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
177 fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
178 fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
179 }
180
181 err = cli.Parse(
182 fs, os.Args[1:],
183 )
184 if err != nil {
185 return err
186 }
187 err = flag.CommandLine.Parse(nil)
188 if err != nil {
189 return err
190 }
191 _ = vFlag.Value.Set(*verbosity)
192 log.SetColorLogger(cli.Color)
193 ctx := context.Background()
194 ctx = log.WithDebugValue(ctx, cli.Debug)
195
196 log.Log(ctx,
197 "streamplace",
198 "version", build.Version,
199 "buildTime", build.BuildTimeStr(),
200 "uuid", build.UUID,
201 "runtime.GOOS", runtime.GOOS,
202 "runtime.GOARCH", runtime.GOARCH,
203 "runtime.Version", runtime.Version())
204 if *version {
205 return nil
206 }
207 spmetrics.Version.WithLabelValues(build.Version).Inc()
208
209 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
210 if len(os.Args) > 1 && os.Args[1] == "resync" {
211 return resync.Resync(ctx, &cli)
212 }
213
214 err = os.MkdirAll(cli.DataDir, os.ModePerm)
215 if err != nil {
216 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
217 }
218 schema, err := v0.MakeV0Schema()
219 if err != nil {
220 return err
221 }
222 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
223 Schema: schema,
224 EthKeystorePath: cli.EthKeystorePath,
225 EthAccountAddr: cli.EthAccountAddr,
226 EthKeystorePassword: cli.EthPassword,
227 })
228 if err != nil {
229 return err
230 }
231 var signer crypto.Signer = eip712signer
232 if cli.PKCS11ModulePath != "" {
233 conf := &crypto11.Config{
234 Path: cli.PKCS11ModulePath,
235 }
236 count := 0
237 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
238 if val != "" {
239 count += 1
240 }
241 }
242 if count != 1 {
243 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
244 }
245 if cli.PKCS11TokenSlot != "" {
246 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
247 if err != nil {
248 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
249 }
250 numint := int(num)
251 // why does crypto11 want this as a reference? odd.
252 conf.SlotNumber = &numint
253 }
254 if cli.PKCS11TokenLabel != "" {
255 conf.TokenLabel = cli.PKCS11TokenLabel
256 }
257 if cli.PKCS11TokenSerial != "" {
258 conf.TokenSerial = cli.PKCS11TokenSerial
259 }
260 pin := cli.PKCS11Pin
261 if pin == "" {
262 fmt.Printf("Please enter PKCS11 PIN: ")
263 password, err := term.ReadPassword(int(os.Stdin.Fd()))
264 fmt.Println("")
265 if err != nil {
266 return fmt.Errorf("error reading PKCS11 password: %w", err)
267 }
268 pin = string(password)
269 }
270 conf.Pin = pin
271
272 sc, err := crypto11.Configure(conf)
273 if err != nil {
274 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
275 }
276 var id []byte = nil
277 var label []byte = nil
278 if cli.PKCS11KeypairID != "" {
279 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
280 if err != nil {
281 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
282 }
283 id = []byte{byte(num)}
284 }
285 if cli.PKCS11KeypairLabel != "" {
286 label = []byte(cli.PKCS11KeypairLabel)
287 }
288 hwsigner, err := sc.FindKeyPair(id, label)
289 if err != nil {
290 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
291 }
292 if hwsigner == nil {
293 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
294 }
295 addr, err := signers.HexAddrFromSigner(hwsigner)
296 if err != nil {
297 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
298 }
299 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
300 signer = hwsigner
301 }
302 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
303 mod, err := model.MakeDB(cli.DBPath)
304 if err != nil {
305 return err
306 }
307 var noter notifications.FirebaseNotifier
308 if cli.FirebaseServiceAccount != "" {
309 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
310 if err != nil {
311 return err
312 }
313 }
314
315 jwkPath := cli.DataFilePath([]string{"jwk.json"})
316 jwk, err := atproto.EnsureJWK(ctx, jwkPath)
317 if err != nil {
318 return err
319 }
320 cli.JWK = jwk
321
322 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"})
323 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath)
324 if err != nil {
325 return err
326 }
327 cli.AccessJWK = accessJWK
328
329 b := bus.NewBus()
330 atsync := &atproto.ATProtoSynchronizer{
331 CLI: &cli,
332 Model: mod,
333 Noter: noter,
334 Bus: b,
335 }
336 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
337 if err != nil {
338 return err
339 }
340
341 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
342 if err != nil {
343 return err
344 }
345
346 clientMetadata := &oatproxy.OAuthClientMetadata{
347 Scope: "atproto transition:generic",
348 ClientName: "Streamplace",
349 RedirectURIs: []string{
350 fmt.Sprintf("https://%s/login", cli.PublicHost),
351 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
352 },
353 }
354
355 op := oatproxy.New(&oatproxy.Config{
356 Host: cli.PublicHost,
357 CreateOAuthSession: mod.CreateOAuthSession,
358 UpdateOAuthSession: mod.UpdateOAuthSession,
359 GetOAuthSession: mod.LoadOAuthSession,
360 Scope: "atproto transition:generic",
361 UpstreamJWK: cli.JWK,
362 DownstreamJWK: cli.AccessJWK,
363 ClientMetadata: clientMetadata,
364 })
365 d := director.NewDirector(mm, mod, &cli, b, op)
366 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op)
367 if err != nil {
368 return err
369 }
370
371 group, ctx := TimeoutGroupWithContext(ctx)
372 ctx = log.WithLogValues(ctx, "version", build.Version)
373
374 group.Go(func() error {
375 return handleSignals(ctx)
376 })
377
378 if cli.TracingEndpoint != "" {
379 group.Go(func() error {
380 return startTelemetry(ctx, cli.TracingEndpoint)
381 })
382 }
383
384 if cli.Secure {
385 group.Go(func() error {
386 return a.ServeHTTPS(ctx)
387 })
388 group.Go(func() error {
389 return a.ServeHTTPRedirect(ctx)
390 })
391 if cli.RTMPServerAddon != "" {
392 group.Go(func() error {
393 return rtmps.ServeRTMPS(ctx, &cli)
394 })
395 }
396 } else {
397 group.Go(func() error {
398 return a.ServeHTTP(ctx)
399 })
400 }
401
402 group.Go(func() error {
403 return a.ServeInternalHTTP(ctx)
404 })
405
406 if !cli.NoFirehose {
407 group.Go(func() error {
408 return atsync.StartFirehose(ctx)
409 })
410 }
411
412 group.Go(func() error {
413 return spmetrics.ExpireSessions(ctx)
414 })
415
416 group.Go(func() error {
417 return mod.StartSegmentCleaner(ctx)
418 })
419
420 group.Go(func() error {
421 return d.Start(ctx)
422 })
423
424 if cli.TestStream {
425 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
426 Schema: schema,
427 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
428 })
429 if err != nil {
430 return err
431 }
432 atkey, err := atproto.ParsePubKey(signer.Public())
433 if err != nil {
434 return err
435 }
436 did := atkey.DIDKey()
437 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
438 if err != nil {
439 return err
440 }
441 err = mod.UpdateIdentity(&model.Identity{
442 ID: testMediaSigner.Pub().String(),
443 Handle: "stream-self-tester",
444 DID: "",
445 })
446 if err != nil {
447 return err
448 }
449 cli.AllowedStreams = append(cli.AllowedStreams, did)
450 a.Aliases["self-test"] = did
451 group.Go(func() error {
452 return mm.TestSource(ctx, testMediaSigner)
453 })
454 }
455
456 for _, job := range platformJobs {
457 group.Go(func() error {
458 return job(ctx, &cli)
459 })
460 }
461
462 if cli.WHIPTest != "" {
463 group.Go(func() error {
464 err := WHIP(strings.Split(cli.WHIPTest, " "))
465 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
466 time.Sleep(time.Second * 3)
467 // gst.Deinit()
468 log.Warn(ctx, "gst deinit complete, exiting")
469 return err
470 })
471 }
472
473 return group.Wait()
474}
475
476var ErrCaughtSignal = errors.New("caught signal")
477
478func handleSignals(ctx context.Context) error {
479 c := make(chan os.Signal, 1)
480 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
481 for {
482 select {
483 case s := <-c:
484 if s == syscall.SIGABRT {
485 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
486 log.Error(ctx, "failed to create pprof", "error", err)
487 }
488 }
489 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
490 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
491 case <-ctx.Done():
492 return nil
493 }
494 }
495}