Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/log"
13)
14
15// take in a segment and return a bunch of packets suitable for webrtc
16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) {
17
18 pipelineSlice := []string{
19 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink",
20 "opusparse name=audioparse ! appsink sync=false name=audioappsink",
21 }
22
23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
24 if err != nil {
25 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
26 }
27
28 demuxBin, err := ConcatDemuxBin(ctx, seg)
29 if err != nil {
30 return nil, fmt.Errorf("failed to create concat bin: %w", err)
31 }
32
33 err = pipeline.Add(demuxBin.Element)
34 if err != nil {
35 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err)
36 }
37
38 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0")
39 if demuxBinPadVideoSrc == nil {
40 return nil, fmt.Errorf("failed to get demux bin video src pad")
41 }
42
43 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0")
44 if demuxBinPadAudioSrc == nil {
45 return nil, fmt.Errorf("failed to get demux bin audio src pad")
46 }
47
48 videoParse, err := pipeline.GetElementByName("videoparse")
49 if err != nil {
50 return nil, fmt.Errorf("failed to get video parse element: %w", err)
51 }
52
53 audioParse, err := pipeline.GetElementByName("audioparse")
54 if err != nil {
55 return nil, fmt.Errorf("failed to get audio parse element: %w", err)
56 }
57
58 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink"))
59 if linked != gst.PadLinkOK {
60 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked)
61 }
62
63 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink"))
64 if linked != gst.PadLinkOK {
65 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked)
66 }
67
68 videoSink, err := pipeline.GetElementByName("videoappsink")
69 if err != nil {
70 return nil, fmt.Errorf("failed to get video appsink element: %w", err)
71 }
72 if videoSink == nil {
73 return nil, fmt.Errorf("failed to get video appsink element")
74 }
75
76 audioSink, err := pipeline.GetElementByName("audioappsink")
77 if err != nil {
78 return nil, fmt.Errorf("failed to get audio appsink element: %w", err)
79 }
80 if audioSink == nil {
81 return nil, fmt.Errorf("failed to get audio appsink element")
82 }
83
84 videoOutput := [][]byte{}
85 audioOutput := [][]byte{}
86 // eosCh := make(chan struct{})
87
88 videoappsink := app.SinkFromElement(videoSink)
89 videoappsink.SetCallbacks(&app.SinkCallbacks{
90 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
91 sample := sink.PullSample()
92 if sample == nil {
93 return gst.FlowEOS
94 }
95
96 buffer := sample.GetBuffer()
97 if buffer == nil {
98 return gst.FlowError
99 }
100
101 samples := buffer.Bytes()
102
103 videoOutput = append(videoOutput, samples)
104
105 // clockTime := buffer.Duration()
106 // dur := clockTime.AsDuration()
107 // if dur != nil {
108 // log.Log(ctx, "video duration", "duration", *dur)
109 // } else {
110 // log.Error(ctx, "no video duration", "samples", len(samples))
111 // }
112
113 return gst.FlowOK
114 },
115 EOSFunc: func(sink *app.Sink) {
116 log.Debug(ctx, "videoappsink EOSFunc")
117 },
118 })
119
120 segDur := time.Duration(0)
121
122 audioappsink := app.SinkFromElement(audioSink)
123 audioappsink.SetCallbacks(&app.SinkCallbacks{
124 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
125 sample := sink.PullSample()
126 if sample == nil {
127 log.Warn(ctx, "audioappsink NewSampleFunc EOS")
128 return gst.FlowEOS
129 }
130
131 buffer := sample.GetBuffer()
132 if buffer == nil {
133 return gst.FlowError
134 }
135
136 samples := buffer.Bytes()
137
138 audioOutput = append(audioOutput, samples)
139
140 clockTime := buffer.Duration()
141 dur := clockTime.AsDuration()
142 if dur != nil {
143 segDur += *dur
144 } else {
145 log.Error(ctx, "no audio duration", "samples", len(samples))
146 return gst.FlowError
147 }
148
149 return gst.FlowOK
150 },
151 EOSFunc: func(sink *app.Sink) {
152 log.Debug(ctx, "audioappsink EOSFunc")
153 },
154 })
155
156 busErr := make(chan error)
157 go func() {
158 err := HandleBusMessages(ctx, pipeline)
159 if err != nil {
160 log.Log(ctx, "pipeline error", "error", err)
161 }
162 busErr <- err
163 }()
164
165 err = pipeline.SetState(gst.StatePlaying)
166 if err != nil {
167 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err)
168 }
169
170 defer func() {
171 err = pipeline.SetState(gst.StateNull)
172 if err != nil {
173 log.Error(ctx, "failed to set pipeline to null state", "error", err)
174 }
175 err = pipeline.Remove(demuxBin.Element)
176 if err != nil {
177 log.Error(ctx, "failed to remove demux bin from bin", "error", err)
178 }
179 }()
180
181 err = <-busErr
182 if err != nil {
183 return nil, fmt.Errorf("pipeline error: %w", err)
184 }
185
186 return &bus.PacketizedSegment{
187 Video: videoOutput,
188 Audio: audioOutput,
189 Duration: segDur,
190 }, nil
191}