Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/log"
15)
16
17// element that takes the input stream, muxes to mp4, and signs the result
18func SegmentElem(ctx context.Context, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) {
19 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
20 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
21 "name": "signer",
22 "async-finalize": true,
23 "sink-factory": "appsink",
24 "muxer-factory": "mp4mux",
25 "max-size-bytes": 1,
26 })
27 if err != nil {
28 return nil, err
29 }
30
31 p := elem.GetRequestPad("video")
32 if p == nil {
33 return nil, fmt.Errorf("failed to get video pad")
34 }
35 p = elem.GetRequestPad("audio_%u")
36 if p == nil {
37 return nil, fmt.Errorf("failed to get audio pad")
38 }
39
40 resetTimer := make(chan struct{})
41
42 go func() {
43 for {
44 select {
45 case <-ctx.Done():
46 return
47 case <-resetTimer:
48 continue
49 case <-time.After(time.Second * 30):
50 log.Warn(ctx, "no new segment for 30 seconds")
51 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)")
52 return
53 }
54 }
55 }()
56
57 // we didn't need faststart but i'm leaving this commented here in case
58 // you want to change any other muxer properties in the future
59
60 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) {
61 err := muxEle.SetProperty("presentation-time", false)
62 if err != nil {
63 panic("error setting presentation-time to false: " + err.Error())
64 }
65 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes)
66 if err != nil {
67 panic("error setting interleave-bytes" + err.Error())
68 }
69 err = muxEle.SetProperty("interleave-time", InterleaveTime)
70 if err != nil {
71 panic("error setting interleave-time" + err.Error())
72 }
73 })
74 if err != nil {
75 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err)
76 }
77
78 // channel to make sure data is emitted in order
79 var ch chan struct{}
80
81 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) {
82 previousSegCh := ch
83 mySegCh := make(chan struct{}, 1)
84 ch = mySegCh
85 buf := &bytes.Buffer{}
86 err := sinkEle.SetProperty("sync", false)
87 if err != nil {
88 panic("error setting sync to false: " + err.Error())
89 }
90 appsink := app.SinkFromElement(sinkEle)
91 if appsink == nil {
92 panic("appsink should not be nil")
93 }
94
95 appsink.SetCallbacks(&app.SinkCallbacks{
96 NewSampleFunc: WriterNewSample(ctx, buf),
97 EOSFunc: func(sink *app.Sink) {
98 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes(
99 // attribute.String("streamer", ms.Streamer()),
100 // ))
101 // defer span.End()
102 now := time.Now().UnixMilli()
103 resetTimer <- struct{}{}
104 bs := buf.Bytes()
105
106 if previousSegCh != nil {
107 <-previousSegCh
108 }
109 err := cb(ctx, bs, now)
110 if err != nil {
111 log.Error(ctx, "error signing segment", "error", err)
112 return
113 }
114 close(mySegCh)
115
116 },
117 })
118 })
119 if err != nil {
120 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err)
121 }
122
123 return elem, nil
124}
125
126func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
127 return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error {
128 if mm.cli.SmearAudio {
129 smearedBuf := &bytes.Buffer{}
130 err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf)
131 if err != nil {
132 return fmt.Errorf("error smearing audio timestamps: %w", err)
133 }
134 bs = smearedBuf.Bytes()
135 }
136 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
137 if err != nil {
138 return err
139 }
140 return mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true)
141 })
142}
143
144func SegmentFileUnsigned(ctx context.Context, input string, ch chan *SplitSegment) error {
145 fd, err := os.OpenFile(input, os.O_RDONLY, 0644)
146 log.Log(ctx, "reading file", "file", input)
147 if err != nil {
148 return fmt.Errorf("failed to read file: %w", err)
149 }
150 defer fd.Close()
151 return SegmentUnsigned(ctx, fd, ch)
152}
153
154func SegmentUnsigned(ctx context.Context, input io.Reader, ch chan *SplitSegment) error {
155 ctx, cancel := context.WithCancel(ctx)
156 defer cancel()
157 pipelineSlice := []string{
158 "appsrc name=appsrc ! qtdemux name=demux",
159 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1",
160 "demux. ! queue ! opusparse name=audioparse",
161 }
162 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
163 if err != nil {
164 return fmt.Errorf("error creating MKVIngest pipeline: %w", err)
165 }
166
167 srcele, err := pipeline.GetElementByName("appsrc")
168 if err != nil {
169 return err
170 }
171 src := app.SrcFromElement(srcele)
172 src.SetCallbacks(&app.SourceCallbacks{
173 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
174 })
175 videoParseEle, err := pipeline.GetElementByName("videoparse")
176 if err != nil {
177 return err
178 }
179
180 segmenter, err := SegmentElem(ctx, func(ctx context.Context, buf []byte, now int64) error {
181 ch <- &SplitSegment{
182 Filename: fmt.Sprintf("%d.mp4", now),
183 Data: buf,
184 }
185 return nil
186 })
187 if err != nil {
188 return err
189 }
190
191 err = pipeline.Add(segmenter)
192 if err != nil {
193 return err
194 }
195 err = videoParseEle.Link(segmenter)
196 if err != nil {
197 return err
198 }
199 audioparse, err := pipeline.GetElementByName("audioparse")
200 if err != nil {
201 return err
202 }
203 err = audioparse.Link(segmenter)
204 if err != nil {
205 return err
206 }
207
208 busErr := make(chan error)
209 go func() {
210 err := HandleBusMessages(ctx, pipeline)
211 cancel()
212 busErr <- err
213 }()
214
215 err = pipeline.SetState(gst.StatePlaying)
216 if err != nil {
217 return err
218 }
219
220 defer func() {
221 err := pipeline.SetState(gst.StateNull)
222 if err != nil {
223 log.Error(ctx, "error setting pipeline to null state", "error", err)
224 }
225 }()
226
227 err = <-busErr
228 if err != nil {
229 return err
230 }
231
232 return nil
233}