Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "strings"
9 "time"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/google/uuid"
14 "stream.place/streamplace/pkg/config"
15 "stream.place/streamplace/pkg/log"
16)
17
// SegmentBuffer is one demuxed media buffer together with its timing.
//
// Both timing fields are pointers because they are optional: code in this
// file treats a nil pts or dur as "not set" rather than as zero.
type SegmentBuffer struct {
	// bytes is the raw payload of the buffer.
	bytes []byte
	// pts is the presentation timestamp and dur the duration of the buffer;
	// either may be nil when the value is unknown.
	pts, dur *time.Duration
}
23
24type SegmentData struct {
25 Audio []SegmentBuffer
26 AudioCaps string
27 Video []SegmentBuffer
28 VideoCaps string
29}
30
31func RewriteAudioTimestamps(ctx context.Context, cli *config.CLI, input io.Reader, output io.Writer, doSmear bool) error {
32 bs, err := io.ReadAll(input)
33 if err != nil {
34 return err
35 }
36 seg, err := ToBuffers(ctx, bytes.NewReader(bs))
37 if err != nil {
38 cli.DumpDebugSegment(ctx, "audio_smear_input", bytes.NewReader(bs))
39 return err
40 }
41
42 if doSmear {
43 err = seg.Normalize(ctx)
44 if err != nil {
45 return err
46 }
47 }
48
49 return JoinAudioVideo(ctx, seg, output)
50}
51
52func (s *SegmentData) Normalize(ctx context.Context) error {
53 if len(s.Video) == 0 {
54 return fmt.Errorf("no video segments")
55 }
56 if len(s.Audio) == 0 {
57 return fmt.Errorf("no audio segments")
58 }
59
60 lastVideo := s.Video[len(s.Video)-1]
61 lastAudio := s.Audio[len(s.Audio)-1]
62
63 if lastVideo.pts == nil {
64 return fmt.Errorf("last video segment has no pts")
65 }
66 if lastAudio.pts == nil {
67 return fmt.Errorf("last audio segment has no pts")
68 }
69
70 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds()
71 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds()
72
73 diff := videoEnd - audioEnd
74 diffPerAudio := diff / int64(len(s.Audio)-1)
75 for i, audio := range s.Audio {
76 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i)))
77 audio.pts = &newPts
78 s.Audio[i] = audio
79 }
80
81 lastVideo = s.Video[len(s.Video)-1]
82 lastAudio = s.Audio[len(s.Audio)-1]
83
84 return nil
85}
86
87func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) {
88
89 ctx, cancel := context.WithCancel(ctx)
90 defer cancel()
91 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo")
92
93 pipelineSlice := []string{
94 "appsrc name=mp4src ! qtdemux name=demux",
95 "demux.video_0 ! queue ! appsink sync=false name=videoappsink",
96 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
97 }
98
99 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err)
102 }
103
104 mp4src, err := pipeline.GetElementByName("mp4src")
105 if err != nil {
106 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
107 }
108 src := app.SrcFromElement(mp4src)
109 if src == nil {
110 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
111 }
112 src.SetCallbacks(&app.SourceCallbacks{
113 NeedDataFunc: ReaderNeedData(ctx, input),
114 })
115
116 seg := SegmentData{
117 Audio: []SegmentBuffer{},
118 Video: []SegmentBuffer{},
119 }
120
121 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
122 if err != nil {
123 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
124 }
125 audioSink := app.SinkFromElement(audioSinkElem)
126 if audioSink == nil {
127 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
128 }
129
130 audioSink.SetCallbacks(&app.SinkCallbacks{
131 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
132 sample := sink.PullSample()
133 if sample == nil {
134 return gst.FlowOK
135 }
136
137 // Retrieve the buffer from the sample.
138 buffer := sample.GetBuffer()
139 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp())
140 bs := buffer.Map(gst.MapRead).Bytes()
141 defer buffer.Unmap()
142 sinkPads, err := sink.GetSinkPads()
143 if err != nil {
144 src.Error("could not get sink pads", err)
145 return gst.FlowError
146 }
147 caps := sinkPads[0].GetCurrentCaps()
148 if caps != nil {
149 seg.AudioCaps = caps.String()
150 }
151
152 seg.Audio = append(seg.Audio, SegmentBuffer{
153 bytes: bs,
154 pts: buffer.PresentationTimestamp().AsDuration(),
155 dur: buffer.Duration().AsDuration(),
156 })
157
158 return gst.FlowOK
159 },
160 })
161
162 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
163 if err != nil {
164 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
165 }
166 videoSink := app.SinkFromElement(videoSinkElem)
167 if videoSink == nil {
168 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
169 }
170 videoSink.SetCallbacks(&app.SinkCallbacks{
171 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
172 sample := sink.PullSample()
173 if sample == nil {
174 return gst.FlowOK
175 }
176
177 // Retrieve the buffer from the sample.
178 buffer := sample.GetBuffer()
179 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration())
180 bs := buffer.Map(gst.MapRead).Bytes()
181 defer buffer.Unmap()
182 sinkPads, err := sink.GetSinkPads()
183 if err != nil {
184 src.Error("could not get sink pads", err)
185 return gst.FlowError
186 }
187 caps := sinkPads[0].GetCurrentCaps()
188 if caps != nil {
189 seg.VideoCaps = caps.String()
190 }
191
192 sb := SegmentBuffer{
193 bytes: bs,
194 pts: buffer.PresentationTimestamp().AsDuration(),
195 dur: buffer.Duration().AsDuration(),
196 }
197
198 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur)
199 if sb.pts == nil {
200 sink.Error("no video pts", fmt.Errorf("no video pts"))
201 return gst.FlowError
202 }
203
204 seg.Video = append(seg.Video, sb)
205
206 return gst.FlowOK
207 },
208 })
209 errCh := make(chan error)
210 go func() {
211 err := HandleBusMessages(ctx, pipeline)
212 errCh <- err
213 }()
214
215 defer func() {
216 if err != nil {
217 log.Error(ctx, "bus handler error", "error", err)
218 }
219 err = pipeline.SetState(gst.StateNull)
220 if err != nil {
221 log.Error(ctx, "failed to set pipeline to null state", "error", err)
222 }
223 }()
224
225 if err := pipeline.SetState(gst.StatePlaying); err != nil {
226 return nil, fmt.Errorf("failed to set pipeline state: %w", err)
227 }
228
229 pipelineErr := <-errCh
230
231 if pipelineErr != nil {
232 return nil, fmt.Errorf("pipeline error: %w", pipelineErr)
233 }
234
235 if len(seg.Video) == 0 {
236 return nil, fmt.Errorf("no video segments when rewriting audio")
237 }
238 if len(seg.Audio) == 0 {
239 return nil, fmt.Errorf("no audio segments when rewriting audio")
240 }
241
242 return &seg, nil
243}
244
245func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error {
246 uu, _ := uuid.NewV7()
247 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String())
248
249 pipelineSlice := []string{
250 "mp4mux name=mux ! appsink sync=false name=mp4sink",
251 "appsrc name=videosrc format=time ! queue ! mux.video_0",
252 "appsrc name=audiosrc format=time ! queue ! mux.audio_0",
253 }
254
255 ctx, cancel := context.WithCancel(ctx)
256 defer cancel()
257
258 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
259 if err != nil {
260 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
261 }
262
263 errCh := make(chan error)
264 go func() {
265 err := HandleBusMessages(ctx, pipeline)
266 errCh <- err
267 }()
268
269 videoSrcElem, err := pipeline.GetElementByName("videosrc")
270 if err != nil {
271 return fmt.Errorf("failed to get videosrc element: %w", err)
272 }
273 videoSrc := app.SrcFromElement(videoSrcElem)
274 if videoSrc == nil {
275 return fmt.Errorf("failed to get videosrc element: %w", err)
276 }
277 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps))
278 for _, seg := range seg.Video {
279 buf := gst.NewBufferFromBytes(seg.bytes)
280 if seg.pts != nil {
281 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
282 } else {
283 videoSrc.Error("no video pts", fmt.Errorf("no video pts"))
284 return fmt.Errorf("no video pts")
285 }
286 if seg.dur != nil {
287 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
288 }
289 ret := videoSrc.PushBuffer(buf)
290 if ret != gst.FlowOK {
291 return fmt.Errorf("failed to push video buffer: %s", ret)
292 }
293 }
294
295 audioSrcElem, err := pipeline.GetElementByName("audiosrc")
296 if err != nil {
297 return fmt.Errorf("failed to get audiosrc element: %w", err)
298 }
299 audioSrc := app.SrcFromElement(audioSrcElem)
300 if audioSrc == nil {
301 return fmt.Errorf("failed to get audiosrc element: %w", err)
302 }
303 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps))
304 for _, seg := range seg.Audio {
305 buf := gst.NewBufferFromBytes(seg.bytes)
306 if seg.pts != nil {
307 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
308 }
309 if seg.dur != nil {
310 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
311 }
312 ret := audioSrc.PushBuffer(buf)
313 if ret != gst.FlowOK {
314 return fmt.Errorf("failed to push audio buffer: %s", ret)
315 }
316 }
317
318 videoSrc.EndStream()
319 audioSrc.EndStream()
320 mp4sinkElem, err := pipeline.GetElementByName("mp4sink")
321 if err != nil {
322 return fmt.Errorf("failed to get mp4sink element: %w", err)
323 }
324 mp4sink := app.SinkFromElement(mp4sinkElem)
325 if mp4sink == nil {
326 return fmt.Errorf("failed to get mp4sink element: %w", err)
327 }
328 mp4sink.SetCallbacks(&app.SinkCallbacks{
329 NewSampleFunc: WriterNewSample(ctx, output),
330 })
331
332 defer func() {
333 err = pipeline.SetState(gst.StateNull)
334 if err != nil {
335 log.Error(ctx, "failed to set pipeline to null state", "error", err)
336 }
337 }()
338
339 if err := pipeline.SetState(gst.StatePlaying); err != nil {
340 return fmt.Errorf("failed to set pipeline state: %w", err)
341 }
342
343 return <-errCh
344}