Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "go.opentelemetry.io/otel"
12 "go.opentelemetry.io/otel/attribute"
13 "go.opentelemetry.io/otel/trace"
14 "stream.place/streamplace/pkg/log"
15)
16
17// element that takes the input stream, muxes to mp4, and signs the result
18func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
19 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
20 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
21 "name": "signer",
22 "async-finalize": true,
23 "sink-factory": "appsink",
24 "muxer-factory": "mp4mux",
25 "max-size-bytes": 1,
26 })
27 if err != nil {
28 return nil, err
29 }
30
31 p := elem.GetRequestPad("video")
32 if p == nil {
33 return nil, fmt.Errorf("failed to get video pad")
34 }
35 p = elem.GetRequestPad("audio_%u")
36 if p == nil {
37 return nil, fmt.Errorf("failed to get audio pad")
38 }
39
40 resetTimer := make(chan struct{})
41
42 go func() {
43 for {
44 select {
45 case <-ctx.Done():
46 return
47 case <-resetTimer:
48 continue
49 case <-time.After(time.Second * 30):
50 log.Warn(ctx, "no new segment for 30 seconds")
51 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)")
52 return
53 }
54 }
55 }()
56
57 elem.Connect("sink-added", func(split, sinkEle *gst.Element) {
58 buf := &bytes.Buffer{}
59 appsink := app.SinkFromElement(sinkEle)
60 if appsink == nil {
61 panic("appsink should not be nil")
62 }
63
64 appsink.SetCallbacks(&app.SinkCallbacks{
65 NewSampleFunc: WriterNewSample(ctx, buf),
66 EOSFunc: func(sink *app.Sink) {
67 ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes(
68 attribute.String("streamer", ms.Streamer()),
69 ))
70 defer span.End()
71 resetTimer <- struct{}{}
72 now := time.Now().UnixMilli()
73 bs := buf.Bytes()
74 if mm.cli.SmearAudio {
75 smearedBuf := &bytes.Buffer{}
76 err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf)
77 if err != nil {
78 log.Error(ctx, "error smearing audio timestamps", "error", err)
79 return
80 }
81 bs = smearedBuf.Bytes()
82 }
83 bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
84 if err != nil {
85 log.Error(ctx, "error signing segment", "error", err)
86 return
87 }
88
89 err = mm.ValidateMP4(ctx, bytes.NewReader(bs))
90 if err != nil {
91 log.Error(ctx, "error validating segment", "error", err)
92 return
93 }
94 },
95 })
96 })
97
98 return elem, nil
99}