Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "github.com/google/uuid"
15 "stream.place/streamplace/pkg/log"
16)
17
// Buffer-level view of a demuxed MP4 segment, as produced by ToBuffers and
// consumed by JoinAudioVideo.
type (
	// SegmentBuffer is a single demuxed buffer: its payload bytes plus its
	// presentation timestamp and duration. Either timing pointer may be nil,
	// so readers check before dereferencing.
	SegmentBuffer struct {
		bytes    []byte
		pts, dur *time.Duration
	}

	// SegmentData groups a segment's buffers by track, in the order they were
	// appended, along with the caps string recorded for each track.
	SegmentData struct {
		Audio     []SegmentBuffer
		AudioCaps string
		Video     []SegmentBuffer
		VideoCaps string
	}
)
30
31func SmearAudioTimestamps(ctx context.Context, input io.Reader, output io.Writer) error {
32 bs, err := io.ReadAll(input)
33 if err != nil {
34 return err
35 }
36 seg, err := ToBuffers(ctx, bytes.NewReader(bs))
37 if err != nil {
38 // Write the input bytes to a file for debugging
39 debugFile := fmt.Sprintf("audio_smear_debug_%s.mp4", uuid.New().String())
40 err = os.WriteFile(debugFile, bs, 0644)
41 if err != nil {
42 log.Log(ctx, "failed to write debug file", "error", err, "path", debugFile)
43 } else {
44 log.Log(ctx, "wrote debug file", "path", debugFile)
45 }
46 return err
47 }
48
49 err = seg.Normalize(ctx)
50 if err != nil {
51 return err
52 }
53
54 return JoinAudioVideo(ctx, seg, output)
55}
56
57func (s *SegmentData) Normalize(ctx context.Context) error {
58 if len(s.Video) == 0 {
59 return fmt.Errorf("no video segments")
60 }
61 if len(s.Audio) == 0 {
62 return fmt.Errorf("no audio segments")
63 }
64
65 lastVideo := s.Video[len(s.Video)-1]
66 lastAudio := s.Audio[len(s.Audio)-1]
67
68 if lastVideo.pts == nil {
69 return fmt.Errorf("last video segment has no pts")
70 }
71 if lastAudio.pts == nil {
72 return fmt.Errorf("last audio segment has no pts")
73 }
74
75 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds()
76 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds()
77
78 diff := videoEnd - audioEnd
79 diffPerAudio := diff / int64(len(s.Audio)-1)
80 for i, audio := range s.Audio {
81 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i)))
82 audio.pts = &newPts
83 s.Audio[i] = audio
84 }
85
86 lastVideo = s.Video[len(s.Video)-1]
87 lastAudio = s.Audio[len(s.Audio)-1]
88 videoEnd = lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds()
89 audioEnd = lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds()
90
91 return nil
92}
93
94func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) {
95 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo")
96
97 pipelineSlice := []string{
98 "appsrc name=mp4src ! qtdemux name=demux",
99 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! appsink sync=false name=videoappsink",
100 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
101 }
102
103 ctx, cancel := context.WithCancel(ctx)
104
105 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
106 if err != nil {
107 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err)
108 }
109
110 errCh := make(chan error)
111 go func() {
112 err := HandleBusMessages(ctx, pipeline)
113 cancel()
114 errCh <- err
115 close(errCh)
116 }()
117
118 defer func() {
119 cancel()
120 err := <-errCh
121 if err != nil {
122 log.Error(ctx, "bus handler error", "error", err)
123 }
124 err = pipeline.BlockSetState(gst.StateNull)
125 if err != nil {
126 log.Error(ctx, "failed to set pipeline to null state", "error", err)
127 }
128 }()
129
130 mp4src, err := pipeline.GetElementByName("mp4src")
131 if err != nil {
132 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
133 }
134 src := app.SrcFromElement(mp4src)
135 if src == nil {
136 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
137 }
138 src.SetCallbacks(&app.SourceCallbacks{
139 NeedDataFunc: ReaderNeedData(ctx, input),
140 })
141
142 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
143 if err != nil {
144 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
145 }
146 audioSink := app.SinkFromElement(audioSinkElem)
147 if audioSink == nil {
148 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
149 }
150
151 seg := SegmentData{
152 Audio: []SegmentBuffer{},
153 Video: []SegmentBuffer{},
154 }
155
156 audioSink.SetCallbacks(&app.SinkCallbacks{
157 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
158 sample := sink.PullSample()
159 if sample == nil {
160 return gst.FlowOK
161 }
162
163 // Retrieve the buffer from the sample.
164 buffer := sample.GetBuffer()
165 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp())
166 bs := buffer.Map(gst.MapRead).Bytes()
167 defer buffer.Unmap()
168 sinkPads, err := sink.GetSinkPads()
169 if err != nil {
170 src.Error("could not get sink pads", err)
171 return gst.FlowError
172 }
173 caps := sinkPads[0].GetCurrentCaps()
174 if caps != nil {
175 seg.AudioCaps = caps.String()
176 }
177
178 seg.Audio = append(seg.Audio, SegmentBuffer{
179 bytes: bs,
180 pts: buffer.PresentationTimestamp().AsDuration(),
181 dur: buffer.Duration().AsDuration(),
182 })
183
184 if err != nil {
185 src.Error("could not get sink pads", err)
186 return gst.FlowError
187 }
188
189 return gst.FlowOK
190 },
191 })
192
193 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
194 if err != nil {
195 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
196 }
197 videoSink := app.SinkFromElement(videoSinkElem)
198 if videoSink == nil {
199 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
200 }
201 videoSink.SetCallbacks(&app.SinkCallbacks{
202 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
203 sample := sink.PullSample()
204 if sample == nil {
205 return gst.FlowOK
206 }
207
208 // Retrieve the buffer from the sample.
209 buffer := sample.GetBuffer()
210 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration())
211 bs := buffer.Map(gst.MapRead).Bytes()
212 defer buffer.Unmap()
213 sinkPads, err := sink.GetSinkPads()
214 if err != nil {
215 src.Error("could not get sink pads", err)
216 return gst.FlowError
217 }
218 caps := sinkPads[0].GetCurrentCaps()
219 if caps != nil {
220 seg.VideoCaps = caps.String()
221 }
222
223 sb := SegmentBuffer{
224 bytes: bs,
225 pts: buffer.PresentationTimestamp().AsDuration(),
226 dur: buffer.Duration().AsDuration(),
227 }
228
229 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur)
230 if sb.pts == nil {
231 sink.Error("no video pts", fmt.Errorf("no video pts"))
232 return gst.FlowError
233 }
234
235 seg.Video = append(seg.Video, sb)
236
237 return gst.FlowOK
238 },
239 })
240
241 pipeline.SetState(gst.StatePlaying)
242
243 <-ctx.Done()
244
245 return &seg, <-errCh
246}
247
248func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error {
249 uu, _ := uuid.NewV7()
250 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String())
251
252 pipelineSlice := []string{
253 "mp4mux name=mux ! appsink sync=false name=mp4sink",
254 "appsrc name=videosrc format=time ! queue ! mux.video_0",
255 "appsrc name=audiosrc format=time ! queue ! mux.audio_0",
256 }
257
258 ctx, cancel := context.WithCancel(ctx)
259
260 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
261 if err != nil {
262 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
263 }
264
265 errCh := make(chan error)
266 go func() {
267 err := HandleBusMessages(ctx, pipeline)
268 cancel()
269 errCh <- err
270 close(errCh)
271 }()
272
273 defer func() {
274 cancel()
275 err := <-errCh
276 if err != nil {
277 log.Error(ctx, "bus handler error", "error", err)
278 }
279 err = pipeline.BlockSetState(gst.StateNull)
280 if err != nil {
281 log.Error(ctx, "failed to set pipeline to null state", "error", err)
282 }
283 }()
284
285 videoSrcElem, err := pipeline.GetElementByName("videosrc")
286 if err != nil {
287 return fmt.Errorf("failed to get videosrc element: %w", err)
288 }
289 videoSrc := app.SrcFromElement(videoSrcElem)
290 if videoSrc == nil {
291 return fmt.Errorf("failed to get videosrc element: %w", err)
292 }
293 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps))
294 for _, seg := range seg.Video {
295 buf := gst.NewBufferFromBytes(seg.bytes)
296 if seg.pts != nil {
297 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
298 } else {
299 videoSrc.Error("no video pts", fmt.Errorf("no video pts"))
300 return fmt.Errorf("no video pts")
301 }
302 if seg.dur != nil {
303 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
304 }
305 ret := videoSrc.PushBuffer(buf)
306 if ret != gst.FlowOK {
307 return fmt.Errorf("failed to push video buffer: %s", ret)
308 } else {
309 // log.Log(ctx, "pushed video buffer", "presentation_timestamp", seg.pts, "duration", seg.dur)
310 }
311 }
312
313 audioSrcElem, err := pipeline.GetElementByName("audiosrc")
314 if err != nil {
315 return fmt.Errorf("failed to get audiosrc element: %w", err)
316 }
317 audioSrc := app.SrcFromElement(audioSrcElem)
318 if audioSrc == nil {
319 return fmt.Errorf("failed to get audiosrc element: %w", err)
320 }
321 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps))
322 for _, seg := range seg.Audio {
323 buf := gst.NewBufferFromBytes(seg.bytes)
324 if seg.pts != nil {
325 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
326 }
327 if seg.dur != nil {
328 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
329 }
330 ret := audioSrc.PushBuffer(buf)
331 if ret != gst.FlowOK {
332 return fmt.Errorf("failed to push audio buffer: %s", ret)
333 } else {
334 // log.Log(ctx, "pushed audio buffer", "presentation_timestamp", seg.pts, "duration", seg.dur)
335 }
336 }
337
338 videoSrc.EndStream()
339 audioSrc.EndStream()
340 mp4sinkElem, err := pipeline.GetElementByName("mp4sink")
341 if err != nil {
342 return fmt.Errorf("failed to get mp4sink element: %w", err)
343 }
344 mp4sink := app.SinkFromElement(mp4sinkElem)
345 if mp4sink == nil {
346 return fmt.Errorf("failed to get mp4sink element: %w", err)
347 }
348 mp4sink.SetCallbacks(&app.SinkCallbacks{
349 NewSampleFunc: WriterNewSample(ctx, output),
350 })
351
352 pipeline.SetState(gst.StatePlaying)
353
354 <-ctx.Done()
355
356 return <-errCh
357}