Live video on the AT Protocol
at natb/block-javascript-protocol 238 lines 6.8 kB view raw
1package media 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/go-gst/go-gst/gst" 9 "stream.place/streamplace/pkg/bus" 10 "stream.place/streamplace/pkg/log" 11) 12 13var ErrConcatDone = errors.New("concat done") 14 15func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg, doH264Parse bool) (*gst.Bin, error) { 16 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 bin := gst.NewBin("concat-bin") 18 19 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{ 20 "name": "concat-streamsynchronizer", 21 }) 22 if err != nil { 23 return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 24 } 25 26 err = bin.Add(streamsynchronizer) 27 if err != nil { 28 return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 29 } 30 31 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 32 if syncPadVideoSink == nil { 33 return nil, fmt.Errorf("failed to get sync video sink pad") 34 } 35 36 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 37 if syncPadAudioSink == nil { 38 return nil, fmt.Errorf("failed to get sync audio sink pad") 39 } 40 41 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 42 if syncPadVideoSrc == nil { 43 return nil, fmt.Errorf("failed to get sync video src pad") 44 } 45 46 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 47 if syncPadAudioSrc == nil { 48 return nil, fmt.Errorf("failed to get sync audio src pad") 49 } 50 51 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 52 "name": "concat-multiqueue", 53 }) 54 if err != nil { 55 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 56 } 57 err = bin.Add(mq) 58 if err != nil { 59 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 60 } 61 62 // 10x default multiqueue size 63 err = mq.SetProperty("max-size-time", uint64(200000000000)) 64 if err != nil { 65 return nil, fmt.Errorf("failed to set max-size-time: %w", err) 66 } 67 err = mq.SetProperty("max-size-bytes", uint(1048576000)) 68 if err != nil { 69 return nil, fmt.Errorf("failed to set max-size-bytes: %w", err) 70 } 71 err = mq.SetProperty("max-size-buffers", uint(500)) 72 if err != nil { 73 return nil, fmt.Errorf("failed to set max-size-buffers: %w", err) 74 } 75 76 mqVideoSink := mq.GetRequestPad("sink_%u") 77 if mqVideoSink == nil { 78 return nil, fmt.Errorf("video sink pad not found") 79 } 80 81 mqAudioSink := mq.GetRequestPad("sink_%u") 82 if mqAudioSink == nil { 83 return nil, fmt.Errorf("audio sink pad not found") 84 } 85 86 mqVideoSrc := mq.GetStaticPad("src_0") 87 if mqVideoSrc == nil { 88 return nil, fmt.Errorf("video source pad not found") 89 } 90 91 mqAudioSrc := mq.GetStaticPad("src_1") 92 if mqAudioSrc == nil { 93 return nil, fmt.Errorf("audio source pad not found") 94 } 95 96 linked := syncPadVideoSrc.Link(mqVideoSink) 97 if linked != gst.PadLinkOK { 98 return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked) 99 } 100 101 linked = syncPadAudioSrc.Link(mqAudioSink) 102 if linked != gst.PadLinkOK { 103 return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked) 104 } 105 106 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 107 if videoGhost == nil { 108 return nil, fmt.Errorf("failed to create video ghost pad") 109 } 110 111 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 112 if audioGhost == nil { 113 return nil, fmt.Errorf("failed to create audio ghost pad") 114 } 115 116 ok := bin.AddPad(videoGhost.Pad) 117 if !ok { 118 return nil, fmt.Errorf("failed to add video ghost pad to bin") 119 } 120 121 ok = bin.AddPad(audioGhost.Pad) 122 if !ok { 123 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 124 } 125 126 go func() { 127 for { 128 select { 129 case seg := <-segCh: 130 if seg == nil { 131 132 ok := syncPadVideoSrc.PushEvent(gst.NewEOSEvent()) 133 if !ok { 134 log.Error(ctx, "failed to post EOS message", "error", ok) 135 } 136 ok = syncPadAudioSrc.PushEvent(gst.NewEOSEvent()) 137 if !ok { 138 log.Error(ctx, "failed to post EOS message", "error", ok) 139 } 140 log.Debug(ctx, "concat completed") 141 142 return 143 } 144 err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink, doH264Parse) 145 if err != nil { 146 log.Error(ctx, "failed to add concat demuxer", "error", err) 147 bin.Error(err.Error(), err) 148 return 149 } 150 case <-ctx.Done(): 151 return 152 } 153 } 154 }() 155 156 return bin, nil 157} 158 159func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad, doH264Parse bool) error { 160 var cancel context.CancelFunc 161 ctx, cancel = context.WithCancel(ctx) 162 defer cancel() 163 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 164 165 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 166 demuxBin, err := ConcatDemuxBin(ctx, seg, doH264Parse) 167 if err != nil { 168 return fmt.Errorf("failed to create demux bin: %w", err) 169 } 170 171 err = bin.Add(demuxBin.Element) 172 if err != nil { 173 return fmt.Errorf("failed to add demux bin to bin: %w", err) 174 } 175 176 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 177 if demuxBinPadVideoSrc == nil { 178 return fmt.Errorf("failed to get demux bin video src pad") 179 } 180 181 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 182 if demuxBinPadAudioSrc == nil { 183 return fmt.Errorf("failed to get demux bin audio src pad") 184 } 185 186 linked := demuxBinPadVideoSrc.Link(syncPadVideoSink) 187 if linked != gst.PadLinkOK { 188 return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked) 189 } 190 191 linked = demuxBinPadAudioSrc.Link(syncPadAudioSink) 192 if linked != gst.PadLinkOK { 193 return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked) 194 } 195 196 eosCh := make(chan struct{}) 197 eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 198 if pad.GetDirection() != gst.PadDirectionSource { 199 return gst.PadProbeOK 200 } 201 if info.GetEvent().Type() != gst.EventTypeEOS { 202 return gst.PadProbeOK 203 } 204 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 205 downstreamPad := pad.GetPeer() 206 unlinked := pad.Unlink(downstreamPad) 207 if !unlinked { 208 log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked) 209 } 210 go func() { 211 eosCh <- struct{}{} 212 }() 213 return gst.PadProbeRemove 214 } 215 demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 216 demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 217 218 if err := bin.SetState(gst.StatePlaying); err != nil { 219 return fmt.Errorf("failed to set state: %w", err) 220 } 221 222 <-eosCh 223 <-eosCh 224 225 err = bin.Remove(demuxBin.Element) 226 if err != nil { 227 return fmt.Errorf("failed to remove demux bin from bin: %w", err) 228 } 229 230 err = demuxBin.SetState(gst.StateNull) 231 if err != nil { 232 return fmt.Errorf("failed to set demux bin to null state: %w", err) 233 } 234 235 log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath) 236 237 return nil 238}