Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "os"
10 "runtime"
11 "strings"
12 "time"
13
14 "github.com/go-gst/go-glib/glib"
15 "github.com/go-gst/go-gst/gst"
16 "github.com/go-gst/go-gst/gst/app"
17 "github.com/skip2/go-qrcode"
18 "golang.org/x/sync/errgroup"
19 "stream.place/streamplace/pkg/aqtime"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/test"
22)
23
24const HLSPlaylist = "stream.m3u8"
25
26// Pipe with a mechanism to keep the FDs not garbage collected
27func SafePipe() (*os.File, *os.File, func(), error) {
28 r, w, err := os.Pipe()
29 if err != nil {
30 return nil, nil, nil, err
31 }
32 return r, w, func() {
33 runtime.KeepAlive(r.Fd())
34 runtime.KeepAlive(w.Fd())
35 }, nil
36}
37
38// basic test to make sure gstreamer functionality is working
39func SelfTest(ctx context.Context) error {
40 ctx = log.WithLogValues(ctx, "mediafunc", "SelfTest")
41 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
42 defer cancel()
43 f, err := test.Files.Open("fixtures/sample-segment.mp4")
44 if err != nil {
45 return fmt.Errorf("failed to open test file: %w", err)
46 }
47 defer f.Close()
48 bs, err := io.ReadAll(f)
49 if err != nil {
50 return fmt.Errorf("failed to read test file: %w", err)
51 }
52
53 pipeline, err := gst.NewPipeline("self-test")
54 if err != nil {
55 return fmt.Errorf("failed to create pipeline: %w", err)
56 }
57
58 srcele, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{
59 "name": "self-test-src",
60 })
61 if err != nil {
62 return fmt.Errorf("failed to create appsrc element: %w", err)
63 }
64 err = pipeline.Add(srcele)
65 if err != nil {
66 return fmt.Errorf("failed to add appsrc to pipeline: %w", err)
67 }
68
69 sinkele, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
70 "name": "self-test-sink",
71 })
72 if err != nil {
73 return fmt.Errorf("failed to create appsink element: %w", err)
74 }
75 err = pipeline.Add(sinkele)
76 if err != nil {
77 return fmt.Errorf("failed to add appsink to pipeline: %w", err)
78 }
79
80 err = srcele.Link(sinkele)
81 if err != nil {
82 return fmt.Errorf("failed to link appsrc to appsink: %w", err)
83 }
84
85 // pipeline, err := gst.NewPipelineFromString("appsrc name=src ! appsink name=sink")
86 // if err != nil {
87 // return err
88 // }
89
90 src := app.SrcFromElement(srcele)
91 src.SetCallbacks(&app.SourceCallbacks{
92 NeedDataFunc: func(self *app.Source, _ uint) {
93 buffer := gst.NewBufferWithSize(int64(len(bs)))
94 buffer.Map(gst.MapWrite).WriteData(bs)
95 defer buffer.Unmap()
96 self.PushBuffer(buffer)
97 log.Debug(ctx, "ending stream")
98 self.EndStream()
99 },
100 })
101
102 output := &bytes.Buffer{}
103
104 if err != nil {
105 return fmt.Errorf("unexpected error: %w", err)
106 }
107
108 appsink := app.SinkFromElement(sinkele)
109 appsink.SetCallbacks(&app.SinkCallbacks{
110 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
111 sample := sink.PullSample()
112 if sample == nil {
113 return gst.FlowOK
114 }
115 // defer sample.Unref()
116
117 // Retrieve the buffer from the sample.
118 buffer := sample.GetBuffer()
119
120 _, err := io.Copy(output, buffer.Reader())
121
122 if err != nil {
123 panic(err)
124 }
125
126 return gst.FlowOK
127 },
128 EOSFunc: func(sink *app.Sink) {
129 log.Debug(ctx, "EOSFunc")
130 cancel()
131 },
132 })
133
134 go func() {
135 if err := HandleBusMessages(ctx, pipeline); err != nil {
136 log.Debug(ctx, "handle bus messages failed", "error", err)
137 }
138 cancel()
139 }()
140
141 // Start the pipeline
142 log.Debug(ctx, "setting pipeline to playing state")
143 err = pipeline.SetState(gst.StatePlaying)
144 if err != nil {
145 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
146 }
147
148 <-ctx.Done()
149
150 if len(output.Bytes()) < 1 {
151 return fmt.Errorf("got a zero-byte buffer from SelfTest")
152 }
153
154 err = pipeline.BlockSetState(gst.StateNull)
155 if err != nil {
156 return fmt.Errorf("failed to set pipeline to null state: %w", err)
157 }
158
159 return nil
160}
161
162const TestSrcWidth = 1280
163const TestSrcHeight = 720
164const QRSize = 256
165
166type QRData struct {
167 Now int64 `json:"now"`
168}
169
170func (mm *MediaManager) TestSource(ctx context.Context, ms MediaSigner) error {
171 mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)
172
173 pipelineSlice := []string{
174 "h264parse name=videoparse",
175 "compositor name=comp ! videoconvert ! video/x-raw,format=I420 ! x264enc speed-preset=ultrafast key-int-max=30 ! queue ! videoparse.",
176 fmt.Sprintf(`videotestsrc is-live=true ! video/x-raw,format=AYUV,framerate=30/1,width=%d,height=%d ! comp.`, TestSrcWidth, TestSrcHeight),
177 fmt.Sprintf("videobox border-alpha=0 top=-%d left=-%d name=box ! comp.", (TestSrcHeight/2)-(QRSize/2), (TestSrcWidth/2)-(QRSize/2)),
178 "appsrc name=pngsrc ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! box.",
179 "appsrc name=timetext ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! comp.",
180 "audiotestsrc ! audioconvert ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! queue ! opusparse name=audioparse",
181 }
182
183 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
184 if err != nil {
185 return fmt.Errorf("error creating TestSource pipeline: %w", err)
186 }
187
188 videoparse, err := pipeline.GetElementByName("videoparse")
189 if err != nil {
190 return err
191 }
192
193 audioparse, err := pipeline.GetElementByName("audioparse")
194 if err != nil {
195 return err
196 }
197
198 signer, err := mm.SegmentAndSignElem(ctx, ms)
199 if err != nil {
200 return err
201 }
202 if err := pipeline.Add(signer); err != nil {
203 return err
204 }
205
206 err = videoparse.Link(signer)
207 if err != nil {
208 return fmt.Errorf("link to signer failed: %w", err)
209 }
210 err = audioparse.Link(signer)
211 if err != nil {
212 return fmt.Errorf("link to signer failed: %w", err)
213 }
214
215 pngele, err := pipeline.GetElementByName("pngsrc")
216 if err != nil {
217 return err
218 }
219
220 src := app.SrcFromElement(pngele)
221 src.SetCallbacks(&app.SourceCallbacks{
222 NeedDataFunc: func(self *app.Source, _ uint) {
223 now := time.Now().UnixMilli()
224 data := QRData{Now: now}
225 bs, err := json.Marshal(data)
226 if err != nil {
227 panic(err)
228 }
229 png, err := qrcode.Encode(string(bs), qrcode.Medium, 256)
230 if err != nil {
231 panic(err)
232 }
233 buffer := gst.NewBufferWithSize(int64(len(png)))
234 buffer.Map(gst.MapWrite).WriteData(png)
235 defer buffer.Unmap()
236 self.PushBuffer(buffer)
237 },
238 })
239 tr, err := NewTextRenderer()
240 if err != nil {
241 return err
242 }
243 timetext, err := pipeline.GetElementByName("timetext")
244 if err != nil {
245 return err
246 }
247
248 timesrc := app.SrcFromElement(timetext)
249 timesrc.SetCallbacks(&app.SourceCallbacks{
250 NeedDataFunc: func(self *app.Source, _ uint) {
251 aqt := aqtime.FromTime(time.Now())
252 png, err := tr.GenerateImage(aqt.String(), "#ffffff", "#000000", 36)
253 if err != nil {
254 panic(err)
255 }
256 buffer := gst.NewBufferWithSize(int64(len(png)))
257 buffer.Map(gst.MapWrite).WriteData(png)
258 defer buffer.Unmap()
259 self.PushBuffer(buffer)
260 },
261 })
262 ctx, cancel := context.WithCancel(ctx)
263 defer cancel()
264 go func() {
265 <-ctx.Done()
266 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
267 log.Log(ctx, "failed to set pipeline state", "error", err)
268 }
269 mainLoop.Quit()
270 }()
271
272 go func() {
273 if err := HandleBusMessages(ctx, pipeline); err != nil {
274 log.Log(ctx, "pipeline error", "error", err)
275 }
276 cancel()
277 }()
278
279 // Start the pipeline
280 if err := pipeline.SetState(gst.StatePlaying); err != nil {
281 log.Log(ctx, "failed to set pipeline state", "error", err)
282 }
283
284 g, _ := errgroup.WithContext(ctx)
285
286 g.Go(func() error {
287 mainLoop.Run()
288 log.Log(ctx, "main loop complete")
289 return nil
290 })
291
292 return g.Wait()
293}