Live video on the AT Protocol
at natb/block-javascript-protocol 293 lines 7.4 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "os" 10 "runtime" 11 "strings" 12 "time" 13 14 "github.com/go-gst/go-glib/glib" 15 "github.com/go-gst/go-gst/gst" 16 "github.com/go-gst/go-gst/gst/app" 17 "github.com/skip2/go-qrcode" 18 "golang.org/x/sync/errgroup" 19 "stream.place/streamplace/pkg/aqtime" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/test" 22) 23 24const HLSPlaylist = "stream.m3u8" 25 26// Pipe with a mechanism to keep the FDs not garbage collected 27func SafePipe() (*os.File, *os.File, func(), error) { 28 r, w, err := os.Pipe() 29 if err != nil { 30 return nil, nil, nil, err 31 } 32 return r, w, func() { 33 runtime.KeepAlive(r.Fd()) 34 runtime.KeepAlive(w.Fd()) 35 }, nil 36} 37 38// basic test to make sure gstreamer functionality is working 39func SelfTest(ctx context.Context) error { 40 ctx = log.WithLogValues(ctx, "mediafunc", "SelfTest") 41 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 42 defer cancel() 43 f, err := test.Files.Open("fixtures/sample-segment.mp4") 44 if err != nil { 45 return fmt.Errorf("failed to open test file: %w", err) 46 } 47 defer f.Close() 48 bs, err := io.ReadAll(f) 49 if err != nil { 50 return fmt.Errorf("failed to read test file: %w", err) 51 } 52 53 pipeline, err := gst.NewPipeline("self-test") 54 if err != nil { 55 return fmt.Errorf("failed to create pipeline: %w", err) 56 } 57 58 srcele, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 59 "name": "self-test-src", 60 }) 61 if err != nil { 62 return fmt.Errorf("failed to create appsrc element: %w", err) 63 } 64 err = pipeline.Add(srcele) 65 if err != nil { 66 return fmt.Errorf("failed to add appsrc to pipeline: %w", err) 67 } 68 69 sinkele, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 70 "name": "self-test-sink", 71 }) 72 if err != nil { 73 return fmt.Errorf("failed to create appsink element: %w", err) 74 } 75 err = pipeline.Add(sinkele) 76 if err != nil { 77 return fmt.Errorf("failed to add appsink to pipeline: %w", err) 78 } 79 80 err = srcele.Link(sinkele) 81 if err != nil { 82 return fmt.Errorf("failed to link appsrc to appsink: %w", err) 83 } 84 85 // pipeline, err := gst.NewPipelineFromString("appsrc name=src ! appsink name=sink") 86 // if err != nil { 87 // return err 88 // } 89 90 src := app.SrcFromElement(srcele) 91 src.SetCallbacks(&app.SourceCallbacks{ 92 NeedDataFunc: func(self *app.Source, _ uint) { 93 buffer := gst.NewBufferWithSize(int64(len(bs))) 94 buffer.Map(gst.MapWrite).WriteData(bs) 95 defer buffer.Unmap() 96 self.PushBuffer(buffer) 97 log.Debug(ctx, "ending stream") 98 self.EndStream() 99 }, 100 }) 101 102 output := &bytes.Buffer{} 103 104 if err != nil { 105 return fmt.Errorf("unexpected error: %w", err) 106 } 107 108 appsink := app.SinkFromElement(sinkele) 109 appsink.SetCallbacks(&app.SinkCallbacks{ 110 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 111 sample := sink.PullSample() 112 if sample == nil { 113 return gst.FlowOK 114 } 115 // defer sample.Unref() 116 117 // Retrieve the buffer from the sample. 118 buffer := sample.GetBuffer() 119 120 _, err := io.Copy(output, buffer.Reader()) 121 122 if err != nil { 123 panic(err) 124 } 125 126 return gst.FlowOK 127 }, 128 EOSFunc: func(sink *app.Sink) { 129 log.Debug(ctx, "EOSFunc") 130 cancel() 131 }, 132 }) 133 134 go func() { 135 if err := HandleBusMessages(ctx, pipeline); err != nil { 136 log.Debug(ctx, "handle bus messages failed", "error", err) 137 } 138 cancel() 139 }() 140 141 // Start the pipeline 142 log.Debug(ctx, "setting pipeline to playing state") 143 err = pipeline.SetState(gst.StatePlaying) 144 if err != nil { 145 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 146 } 147 148 <-ctx.Done() 149 150 if len(output.Bytes()) < 1 { 151 return fmt.Errorf("got a zero-byte buffer from SelfTest") 152 } 153 154 err = pipeline.BlockSetState(gst.StateNull) 155 if err != nil { 156 return fmt.Errorf("failed to set pipeline to null state: %w", err) 157 } 158 159 return nil 160} 161 162const TestSrcWidth = 1280 163const TestSrcHeight = 720 164const QRSize = 256 165 166type QRData struct { 167 Now int64 `json:"now"` 168} 169 170func (mm *MediaManager) TestSource(ctx context.Context, ms MediaSigner) error { 171 mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) 172 173 pipelineSlice := []string{ 174 "h264parse name=videoparse", 175 "compositor name=comp ! videoconvert ! video/x-raw,format=I420 ! x264enc speed-preset=ultrafast key-int-max=30 ! queue ! videoparse.", 176 fmt.Sprintf(`videotestsrc is-live=true ! video/x-raw,format=AYUV,framerate=30/1,width=%d,height=%d ! comp.`, TestSrcWidth, TestSrcHeight), 177 fmt.Sprintf("videobox border-alpha=0 top=-%d left=-%d name=box ! comp.", (TestSrcHeight/2)-(QRSize/2), (TestSrcWidth/2)-(QRSize/2)), 178 "appsrc name=pngsrc ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! box.", 179 "appsrc name=timetext ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! comp.", 180 "audiotestsrc ! audioconvert ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! queue ! opusparse name=audioparse", 181 } 182 183 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 184 if err != nil { 185 return fmt.Errorf("error creating TestSource pipeline: %w", err) 186 } 187 188 videoparse, err := pipeline.GetElementByName("videoparse") 189 if err != nil { 190 return err 191 } 192 193 audioparse, err := pipeline.GetElementByName("audioparse") 194 if err != nil { 195 return err 196 } 197 198 signer, err := mm.SegmentAndSignElem(ctx, ms) 199 if err != nil { 200 return err 201 } 202 if err := pipeline.Add(signer); err != nil { 203 return err 204 } 205 206 err = videoparse.Link(signer) 207 if err != nil { 208 return fmt.Errorf("link to signer failed: %w", err) 209 } 210 err = audioparse.Link(signer) 211 if err != nil { 212 return fmt.Errorf("link to signer failed: %w", err) 213 } 214 215 pngele, err := pipeline.GetElementByName("pngsrc") 216 if err != nil { 217 return err 218 } 219 220 src := app.SrcFromElement(pngele) 221 src.SetCallbacks(&app.SourceCallbacks{ 222 NeedDataFunc: func(self *app.Source, _ uint) { 223 now := time.Now().UnixMilli() 224 data := QRData{Now: now} 225 bs, err := json.Marshal(data) 226 if err != nil { 227 panic(err) 228 } 229 png, err := qrcode.Encode(string(bs), qrcode.Medium, 256) 230 if err != nil { 231 panic(err) 232 } 233 buffer := gst.NewBufferWithSize(int64(len(png))) 234 buffer.Map(gst.MapWrite).WriteData(png) 235 defer buffer.Unmap() 236 self.PushBuffer(buffer) 237 }, 238 }) 239 tr, err := NewTextRenderer() 240 if err != nil { 241 return err 242 } 243 timetext, err := pipeline.GetElementByName("timetext") 244 if err != nil { 245 return err 246 } 247 248 timesrc := app.SrcFromElement(timetext) 249 timesrc.SetCallbacks(&app.SourceCallbacks{ 250 NeedDataFunc: func(self *app.Source, _ uint) { 251 aqt := aqtime.FromTime(time.Now()) 252 png, err := tr.GenerateImage(aqt.String(), "#ffffff", "#000000", 36) 253 if err != nil { 254 panic(err) 255 } 256 buffer := gst.NewBufferWithSize(int64(len(png))) 257 buffer.Map(gst.MapWrite).WriteData(png) 258 defer buffer.Unmap() 259 self.PushBuffer(buffer) 260 }, 261 }) 262 ctx, cancel := context.WithCancel(ctx) 263 defer cancel() 264 go func() { 265 <-ctx.Done() 266 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 267 log.Log(ctx, "failed to set pipeline state", "error", err) 268 } 269 mainLoop.Quit() 270 }() 271 272 go func() { 273 if err := HandleBusMessages(ctx, pipeline); err != nil { 274 log.Log(ctx, "pipeline error", "error", err) 275 } 276 cancel() 277 }() 278 279 // Start the pipeline 280 if err := pipeline.SetState(gst.StatePlaying); err != nil { 281 log.Log(ctx, "failed to set pipeline state", "error", err) 282 } 283 284 g, _ := errgroup.WithContext(ctx) 285 286 g.Go(func() error { 287 mainLoop.Run() 288 log.Log(ctx, "main loop complete") 289 return nil 290 }) 291 292 return g.Wait() 293}