Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "crypto"
6 "errors"
7 "flag"
8 "fmt"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "runtime"
13 "runtime/pprof"
14 "strconv"
15 "strings"
16 "syscall"
17 "time"
18
19 "github.com/streamplace/oatproxy/pkg/oatproxy"
20 "golang.org/x/term"
21 "stream.place/streamplace/pkg/aqhttp"
22 "stream.place/streamplace/pkg/atproto"
23 "stream.place/streamplace/pkg/bus"
24 "stream.place/streamplace/pkg/crypto/signers"
25 "stream.place/streamplace/pkg/crypto/signers/eip712"
26 "stream.place/streamplace/pkg/director"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/notifications"
30 "stream.place/streamplace/pkg/replication"
31 "stream.place/streamplace/pkg/replication/boring"
32 "stream.place/streamplace/pkg/resync"
33 "stream.place/streamplace/pkg/rtmps"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35 "stream.place/streamplace/pkg/spmetrics"
36
37 "github.com/ThalesGroup/crypto11"
38 _ "github.com/go-gst/go-glib/glib"
39 _ "github.com/go-gst/go-gst/gst"
40 "stream.place/streamplace/pkg/api"
41 "stream.place/streamplace/pkg/config"
42 "stream.place/streamplace/pkg/model"
43)
44
// jobFunc is an additional long-running job that a platform can inject into
// the node. Each job receives the group context and the parsed CLI
// configuration; start runs every job in the same goroutine group as the
// built-in services and returns the group's result.
type jobFunc func(ctx context.Context, cli *config.CLI) error
47
48// parse the CLI and fire up an streamplace node!
49func start(build *config.BuildFlags, platformJobs []jobFunc) error {
50 selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
51 err := media.RunSelfTest(context.Background())
52 if err != nil {
53 if selfTest {
54 fmt.Println(err.Error())
55 os.Exit(1)
56 } else {
57 retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
58 if retryCount >= 3 {
59 log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
60 return err
61 }
62 log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
63 os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
64 err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
65 if err != nil {
66 log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
67 return err
68 }
69 panic("invalid code path: exec succeeded but we're still here???")
70 }
71 }
72 if selfTest {
73 runtime.GC()
74 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
75 log.Error(context.Background(), "error creating pprof", "error", err)
76 }
77 fmt.Println("self-test successful!")
78 os.Exit(0)
79 }
80
81 if len(os.Args) > 1 && os.Args[1] == "stream" {
82 if len(os.Args) != 3 {
83 fmt.Println("usage: streamplace stream [user]")
84 os.Exit(1)
85 }
86 return Stream(os.Args[2])
87 }
88
89 if len(os.Args) > 1 && os.Args[1] == "live" {
90 if len(os.Args) != 3 {
91 fmt.Println("usage: streamplace live [stream-key]")
92 os.Exit(1)
93 }
94 return Live(os.Args[2])
95 }
96
97 if len(os.Args) > 1 && os.Args[1] == "sign" {
98 return Sign(context.Background())
99 }
100
101 if len(os.Args) > 1 && os.Args[1] == "whep" {
102 return WHEP(os.Args[2:])
103 }
104 if len(os.Args) > 1 && os.Args[1] == "whip" {
105 return WHIP(os.Args[2:])
106 }
107
108 if len(os.Args) > 1 && os.Args[1] == "self-test" {
109 err := media.RunSelfTest(context.Background())
110 if err != nil {
111 fmt.Println(err.Error())
112 os.Exit(1)
113 }
114 fmt.Println("self-test successful!")
115 os.Exit(0)
116 }
117 _ = flag.Set("logtostderr", "true")
118 vFlag := flag.Lookup("v")
119 cli := config.CLI{Build: build}
120 fs := cli.NewFlagSet("streamplace")
121 verbosity := fs.String("v", "3", "log verbosity level")
122 version := fs.Bool("version", false, "print version and exit")
123
124 err = cli.Parse(
125 fs, os.Args[1:],
126 )
127 if err != nil {
128 return err
129 }
130 err = flag.CommandLine.Parse(nil)
131 if err != nil {
132 return err
133 }
134 _ = vFlag.Value.Set(*verbosity)
135 log.SetColorLogger(cli.Color)
136 ctx := context.Background()
137 ctx = log.WithDebugValue(ctx, cli.Debug)
138
139 log.Log(ctx,
140 "streamplace",
141 "version", build.Version,
142 "buildTime", build.BuildTimeStr(),
143 "uuid", build.UUID,
144 "runtime.GOOS", runtime.GOOS,
145 "runtime.GOARCH", runtime.GOARCH,
146 "runtime.Version", runtime.Version())
147 if *version {
148 return nil
149 }
150 spmetrics.Version.WithLabelValues(build.Version).Inc()
151
152 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
153 if len(os.Args) > 1 && os.Args[1] == "resync" {
154 return resync.Resync(ctx, &cli)
155 }
156
157 err = os.MkdirAll(cli.DataDir, os.ModePerm)
158 if err != nil {
159 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err)
160 }
161 schema, err := v0.MakeV0Schema()
162 if err != nil {
163 return err
164 }
165 eip712signer, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
166 Schema: schema,
167 EthKeystorePath: cli.EthKeystorePath,
168 EthAccountAddr: cli.EthAccountAddr,
169 EthKeystorePassword: cli.EthPassword,
170 })
171 if err != nil {
172 return err
173 }
174 var signer crypto.Signer = eip712signer
175 if cli.PKCS11ModulePath != "" {
176 conf := &crypto11.Config{
177 Path: cli.PKCS11ModulePath,
178 }
179 count := 0
180 for _, val := range []string{cli.PKCS11TokenSlot, cli.PKCS11TokenLabel, cli.PKCS11TokenSerial} {
181 if val != "" {
182 count += 1
183 }
184 }
185 if count != 1 {
186 return fmt.Errorf("need exactly one of pkcs11-token-slot, pkcs11-token-label, or pkcs11-token-serial (got %d)", count)
187 }
188 if cli.PKCS11TokenSlot != "" {
189 num, err := strconv.ParseInt(cli.PKCS11TokenSlot, 10, 16)
190 if err != nil {
191 return fmt.Errorf("error parsing pkcs11-slot: %w", err)
192 }
193 numint := int(num)
194 // why does crypto11 want this as a reference? odd.
195 conf.SlotNumber = &numint
196 }
197 if cli.PKCS11TokenLabel != "" {
198 conf.TokenLabel = cli.PKCS11TokenLabel
199 }
200 if cli.PKCS11TokenSerial != "" {
201 conf.TokenSerial = cli.PKCS11TokenSerial
202 }
203 pin := cli.PKCS11Pin
204 if pin == "" {
205 fmt.Printf("Please enter PKCS11 PIN: ")
206 password, err := term.ReadPassword(int(os.Stdin.Fd()))
207 fmt.Println("")
208 if err != nil {
209 return fmt.Errorf("error reading PKCS11 password: %w", err)
210 }
211 pin = string(password)
212 }
213 conf.Pin = pin
214
215 sc, err := crypto11.Configure(conf)
216 if err != nil {
217 return fmt.Errorf("error initalizing PKCS11 HSM: %w", err)
218 }
219 var id []byte = nil
220 var label []byte = nil
221 if cli.PKCS11KeypairID != "" {
222 num, err := strconv.ParseInt(cli.PKCS11KeypairID, 10, 8)
223 if err != nil {
224 return fmt.Errorf("error parsing pkcs11-keypair-id: %w", err)
225 }
226 id = []byte{byte(num)}
227 }
228 if cli.PKCS11KeypairLabel != "" {
229 label = []byte(cli.PKCS11KeypairLabel)
230 }
231 hwsigner, err := sc.FindKeyPair(id, label)
232 if err != nil {
233 return fmt.Errorf("error finding keypair on PKCS11 token: %w", err)
234 }
235 if hwsigner == nil {
236 return fmt.Errorf("keypair on token not found (tried id='%s' label='%s')", cli.PKCS11KeypairID, cli.PKCS11KeypairLabel)
237 }
238 addr, err := signers.HexAddrFromSigner(hwsigner)
239 if err != nil {
240 return fmt.Errorf("error getting ethereum address for hardware keypair: %w", err)
241 }
242 log.Log(ctx, "successfully initialized hardware signer", "address", addr)
243 signer = hwsigner
244 }
245 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers}
246 mod, err := model.MakeDB(cli.DBPath)
247 if err != nil {
248 return err
249 }
250 var noter notifications.FirebaseNotifier
251 if cli.FirebaseServiceAccount != "" {
252 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
253 if err != nil {
254 return err
255 }
256 }
257
258 jwkPath := cli.DataFilePath([]string{"jwk.json"})
259 jwk, err := atproto.EnsureJWK(ctx, jwkPath)
260 if err != nil {
261 return err
262 }
263 cli.JWK = jwk
264
265 accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"})
266 accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath)
267 if err != nil {
268 return err
269 }
270 cli.AccessJWK = accessJWK
271
272 b := bus.NewBus()
273 atsync := &atproto.ATProtoSynchronizer{
274 CLI: &cli,
275 Model: mod,
276 Noter: noter,
277 Bus: b,
278 }
279 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync)
280 if err != nil {
281 return err
282 }
283
284 ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer)
285 if err != nil {
286 return err
287 }
288
289 clientMetadata := &oatproxy.OAuthClientMetadata{
290 Scope: "atproto transition:generic",
291 ClientName: "Streamplace",
292 RedirectURIs: []string{
293 fmt.Sprintf("https://%s/login", cli.PublicHost),
294 fmt.Sprintf("https://%s/api/app-return", cli.PublicHost),
295 },
296 }
297
298 op := oatproxy.New(&oatproxy.Config{
299 Host: cli.PublicHost,
300 CreateOAuthSession: mod.CreateOAuthSession,
301 UpdateOAuthSession: mod.UpdateOAuthSession,
302 GetOAuthSession: mod.LoadOAuthSession,
303 Scope: "atproto transition:generic",
304 UpstreamJWK: cli.JWK,
305 DownstreamJWK: cli.AccessJWK,
306 ClientMetadata: clientMetadata,
307 })
308 d := director.NewDirector(mm, mod, &cli, b, op)
309 a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op)
310 if err != nil {
311 return err
312 }
313
314 group, ctx := TimeoutGroupWithContext(ctx)
315 ctx = log.WithLogValues(ctx, "version", build.Version)
316
317 group.Go(func() error {
318 return handleSignals(ctx)
319 })
320
321 if cli.TracingEndpoint != "" {
322 group.Go(func() error {
323 return startTelemetry(ctx, cli.TracingEndpoint)
324 })
325 }
326
327 if cli.Secure {
328 group.Go(func() error {
329 return a.ServeHTTPS(ctx)
330 })
331 group.Go(func() error {
332 return a.ServeHTTPRedirect(ctx)
333 })
334 if cli.RTMPServerAddon != "" {
335 group.Go(func() error {
336 return rtmps.ServeRTMPS(ctx, &cli)
337 })
338 }
339 } else {
340 group.Go(func() error {
341 return a.ServeHTTP(ctx)
342 })
343 }
344
345 group.Go(func() error {
346 return a.ServeInternalHTTP(ctx)
347 })
348
349 if !cli.NoFirehose {
350 group.Go(func() error {
351 return atsync.StartFirehose(ctx)
352 })
353 }
354
355 group.Go(func() error {
356 return spmetrics.ExpireSessions(ctx)
357 })
358
359 group.Go(func() error {
360 return mod.StartSegmentCleaner(ctx)
361 })
362
363 group.Go(func() error {
364 return d.Start(ctx)
365 })
366
367 if cli.TestStream {
368 testSigner, err := eip712.MakeEIP712Signer(ctx, &eip712.EIP712SignerOptions{
369 Schema: schema,
370 EthKeystorePath: filepath.Join(cli.DataDir, "test-signer"),
371 })
372 if err != nil {
373 return err
374 }
375 atkey, err := atproto.ParsePubKey(signer.Public())
376 if err != nil {
377 return err
378 }
379 did := atkey.DIDKey()
380 testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, testSigner)
381 if err != nil {
382 return err
383 }
384 err = mod.UpdateIdentity(&model.Identity{
385 ID: testMediaSigner.Pub().String(),
386 Handle: "stream-self-tester",
387 DID: "",
388 })
389 if err != nil {
390 return err
391 }
392 cli.AllowedStreams = append(cli.AllowedStreams, did)
393 a.Aliases["self-test"] = did
394 group.Go(func() error {
395 return mm.TestSource(ctx, testMediaSigner)
396 })
397 }
398
399 for _, job := range platformJobs {
400 group.Go(func() error {
401 return job(ctx, &cli)
402 })
403 }
404
405 if cli.WHIPTest != "" {
406 group.Go(func() error {
407 err := WHIP(strings.Split(cli.WHIPTest, " "))
408 log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer")
409 time.Sleep(time.Second * 3)
410 // gst.Deinit()
411 log.Warn(ctx, "gst deinit complete, exiting")
412 return err
413 })
414 }
415
416 return group.Wait()
417}
418
// ErrCaughtSignal is the sentinel error that handleSignals wraps (together
// with the signal's name) when the process receives one of the signals it
// listens for. Callers can detect it with errors.Is.
var ErrCaughtSignal = errors.New("caught signal")
420
421func handleSignals(ctx context.Context) error {
422 c := make(chan os.Signal, 1)
423 signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
424 for {
425 select {
426 case s := <-c:
427 if s == syscall.SIGABRT {
428 if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
429 log.Error(ctx, "failed to create pprof", "error", err)
430 }
431 }
432 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s)
433 return fmt.Errorf("%w signal=%v", ErrCaughtSignal, s)
434 case <-ctx.Done():
435 return nil
436 }
437 }
438}