Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/log"
13)
14
15// take in a segment and return a bunch of packets suitable for webrtc
16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) {
17
18 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
19 defer cancel()
20
21 pipelineSlice := []string{
22 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink",
23 "opusparse name=audioparse ! appsink sync=false name=audioappsink",
24 }
25
26 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
27 if err != nil {
28 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
29 }
30
31 demuxBin, err := ConcatDemuxBin(ctx, seg)
32 if err != nil {
33 return nil, fmt.Errorf("failed to create concat bin: %w", err)
34 }
35
36 err = pipeline.Add(demuxBin.Element)
37 if err != nil {
38 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err)
39 }
40
41 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0")
42 if demuxBinPadVideoSrc == nil {
43 return nil, fmt.Errorf("failed to get demux bin video src pad")
44 }
45
46 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0")
47 if demuxBinPadAudioSrc == nil {
48 return nil, fmt.Errorf("failed to get demux bin audio src pad")
49 }
50
51 videoParse, err := pipeline.GetElementByName("videoparse")
52 if err != nil {
53 return nil, fmt.Errorf("failed to get video parse element: %w", err)
54 }
55
56 audioParse, err := pipeline.GetElementByName("audioparse")
57 if err != nil {
58 return nil, fmt.Errorf("failed to get audio parse element: %w", err)
59 }
60
61 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink"))
62 if linked != gst.PadLinkOK {
63 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked)
64 }
65
66 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink"))
67 if linked != gst.PadLinkOK {
68 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked)
69 }
70
71 videoSink, err := pipeline.GetElementByName("videoappsink")
72 if err != nil {
73 return nil, fmt.Errorf("failed to get video appsink element: %w", err)
74 }
75 if videoSink == nil {
76 return nil, fmt.Errorf("failed to get video appsink element")
77 }
78
79 audioSink, err := pipeline.GetElementByName("audioappsink")
80 if err != nil {
81 return nil, fmt.Errorf("failed to get audio appsink element: %w", err)
82 }
83 if audioSink == nil {
84 return nil, fmt.Errorf("failed to get audio appsink element")
85 }
86
87 videoOutput := [][]byte{}
88 audioOutput := [][]byte{}
89 // eosCh := make(chan struct{})
90
91 videoappsink := app.SinkFromElement(videoSink)
92 videoappsink.SetCallbacks(&app.SinkCallbacks{
93 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
94 sample := sink.PullSample()
95 if sample == nil {
96 return gst.FlowEOS
97 }
98
99 buffer := sample.GetBuffer()
100 if buffer == nil {
101 return gst.FlowError
102 }
103
104 samples := buffer.Bytes()
105
106 videoOutput = append(videoOutput, samples)
107
108 // clockTime := buffer.Duration()
109 // dur := clockTime.AsDuration()
110 // if dur != nil {
111 // log.Log(ctx, "video duration", "duration", *dur)
112 // } else {
113 // log.Error(ctx, "no video duration", "samples", len(samples))
114 // }
115
116 return gst.FlowOK
117 },
118 EOSFunc: func(sink *app.Sink) {
119 log.Debug(ctx, "videoappsink EOSFunc")
120 },
121 })
122
123 segDur := time.Duration(0)
124
125 audioappsink := app.SinkFromElement(audioSink)
126 audioappsink.SetCallbacks(&app.SinkCallbacks{
127 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
128 sample := sink.PullSample()
129 if sample == nil {
130 log.Warn(ctx, "audioappsink NewSampleFunc EOS")
131 return gst.FlowEOS
132 }
133
134 buffer := sample.GetBuffer()
135 if buffer == nil {
136 return gst.FlowError
137 }
138
139 samples := buffer.Bytes()
140
141 audioOutput = append(audioOutput, samples)
142
143 clockTime := buffer.Duration()
144 dur := clockTime.AsDuration()
145 if dur != nil {
146 segDur += *dur
147 } else {
148 log.Error(ctx, "no audio duration", "samples", len(samples))
149 return gst.FlowError
150 }
151
152 return gst.FlowOK
153 },
154 EOSFunc: func(sink *app.Sink) {
155 log.Debug(ctx, "audioappsink EOSFunc")
156 },
157 })
158
159 busErr := make(chan error)
160 go func() {
161 err := HandleBusMessages(ctx, pipeline)
162 if err != nil {
163 log.Log(ctx, "pipeline error", "error", err)
164 }
165 busErr <- err
166 }()
167
168 err = pipeline.SetState(gst.StatePlaying)
169 if err != nil {
170 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err)
171 }
172
173 defer func() {
174 err = pipeline.SetState(gst.StateNull)
175 if err != nil {
176 log.Error(ctx, "failed to set pipeline to null state", "error", err)
177 }
178 err = pipeline.Remove(demuxBin.Element)
179 if err != nil {
180 log.Error(ctx, "failed to remove demux bin from bin", "error", err)
181 }
182 }()
183
184 err = <-busErr
185 if err != nil {
186 return nil, fmt.Errorf("packetize pipeline error filename=%s, error=%w", seg.Filepath, err)
187 }
188
189 return &bus.PacketizedSegment{
190 Video: videoOutput,
191 Audio: audioOutput,
192 Duration: segDur,
193 }, nil
194}