Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.13 238 lines 6.8 kB view raw
1package media 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/go-gst/go-gst/gst" 9 "stream.place/streamplace/pkg/bus" 10 "stream.place/streamplace/pkg/log" 11) 12 13var ErrConcatDone = errors.New("concat done") 14 15func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg, doH264Parse bool) (*gst.Bin, error) { 16 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 bin := gst.NewBin("concat-bin") 18 19 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{ 20 "name": "concat-streamsynchronizer", 21 }) 22 if err != nil { 23 return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 24 } 25 26 err = bin.Add(streamsynchronizer) 27 if err != nil { 28 return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 29 } 30 31 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 32 if syncPadVideoSink == nil { 33 return nil, fmt.Errorf("failed to get sync video sink pad") 34 } 35 36 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 37 if syncPadAudioSink == nil { 38 return nil, fmt.Errorf("failed to get sync audio sink pad") 39 } 40 41 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 42 if syncPadVideoSrc == nil { 43 return nil, fmt.Errorf("failed to get sync video src pad") 44 } 45 46 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 47 if syncPadAudioSrc == nil { 48 return nil, fmt.Errorf("failed to get sync audio src pad") 49 } 50 51 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 52 "name": "concat-multiqueue", 53 }) 54 if err != nil { 55 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 56 } 57 err = bin.Add(mq) 58 if err != nil { 59 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 60 } 61 62 // 10x default multiqueue size 63 err = mq.SetProperty("max-size-time", uint64(200000000000)) 64 if err != nil { 65 return nil, fmt.Errorf("failed to set max-size-time: %w", err) 66 } 67 err = mq.SetProperty("max-size-bytes", uint(1048576000)) 68 if err != nil { 69 return nil, fmt.Errorf("failed to set max-size-bytes: %w", err) 70 } 71 err = mq.SetProperty("max-size-buffers", uint(500)) 72 if err != nil { 73 return nil, fmt.Errorf("failed to set max-size-buffers: %w", err) 74 } 75 76 mqVideoSink := mq.GetRequestPad("sink_%u") 77 if mqVideoSink == nil { 78 return nil, fmt.Errorf("video sink pad not found") 79 } 80 81 mqAudioSink := mq.GetRequestPad("sink_%u") 82 if mqAudioSink == nil { 83 return nil, fmt.Errorf("audio sink pad not found") 84 } 85 86 mqVideoSrc := mq.GetStaticPad("src_0") 87 if mqVideoSrc == nil { 88 return nil, fmt.Errorf("video source pad not found") 89 } 90 91 mqAudioSrc := mq.GetStaticPad("src_1") 92 if mqAudioSrc == nil { 93 return nil, fmt.Errorf("audio source pad not found") 94 } 95 96 linked := syncPadVideoSrc.Link(mqVideoSink) 97 if linked != gst.PadLinkOK { 98 return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked) 99 } 100 101 linked = syncPadAudioSrc.Link(mqAudioSink) 102 if linked != gst.PadLinkOK { 103 return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked) 104 } 105 106 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 107 if videoGhost == nil { 108 return nil, fmt.Errorf("failed to create video ghost pad") 109 } 110 111 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 112 if audioGhost == nil { 113 return nil, fmt.Errorf("failed to create audio ghost pad") 114 } 115 116 ok := bin.AddPad(videoGhost.Pad) 117 if !ok { 118 return nil, fmt.Errorf("failed to add video ghost pad to bin") 119 } 120 121 ok = bin.AddPad(audioGhost.Pad) 122 if !ok { 123 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 124 } 125 126 go func() { 127 for { 128 select { 129 case seg := <-segCh: 130 if seg == nil { 131 132 ok := syncPadVideoSrc.PushEvent(gst.NewEOSEvent()) 133 if !ok { 134 log.Error(ctx, "failed to post EOS message", "error", ok) 135 } 136 ok = syncPadAudioSrc.PushEvent(gst.NewEOSEvent()) 137 if !ok { 138 log.Error(ctx, "failed to post EOS message", "error", ok) 139 } 140 log.Debug(ctx, "concat completed") 141 142 return 143 } 144 err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink, doH264Parse) 145 if err != nil { 146 log.Error(ctx, "failed to add concat demuxer", "error", err) 147 bin.Error(err.Error(), err) 148 return 149 } 150 case <-ctx.Done(): 151 return 152 } 153 } 154 }() 155 156 return bin, nil 157} 158 159func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad, doH264Parse bool) error { 160 var cancel context.CancelFunc 161 ctx, cancel = context.WithCancel(ctx) 162 defer cancel() 163 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 164 165 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 166 demuxBin, err := ConcatDemuxBin(ctx, seg, doH264Parse) 167 if err != nil { 168 return fmt.Errorf("failed to create demux bin: %w", err) 169 } 170 171 err = bin.Add(demuxBin.Element) 172 if err != nil { 173 return fmt.Errorf("failed to add demux bin to bin: %w", err) 174 } 175 176 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 177 if demuxBinPadVideoSrc == nil { 178 return fmt.Errorf("failed to get demux bin video src pad") 179 } 180 181 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 182 if demuxBinPadAudioSrc == nil { 183 return fmt.Errorf("failed to get demux bin audio src pad") 184 } 185 186 linked := demuxBinPadVideoSrc.Link(syncPadVideoSink) 187 if linked != gst.PadLinkOK { 188 return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked) 189 } 190 191 linked = demuxBinPadAudioSrc.Link(syncPadAudioSink) 192 if linked != gst.PadLinkOK { 193 return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked) 194 } 195 196 eosCh := make(chan struct{}) 197 eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 198 if pad.GetDirection() != gst.PadDirectionSource { 199 return gst.PadProbeOK 200 } 201 if info.GetEvent().Type() != gst.EventTypeEOS { 202 return gst.PadProbeOK 203 } 204 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 205 downstreamPad := pad.GetPeer() 206 unlinked := pad.Unlink(downstreamPad) 207 if !unlinked { 208 log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked) 209 } 210 go func() { 211 eosCh <- struct{}{} 212 }() 213 return gst.PadProbeRemove 214 } 215 demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 216 demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 217 218 if err := bin.SetState(gst.StatePlaying); err != nil { 219 return fmt.Errorf("failed to set state: %w", err) 220 } 221 222 <-eosCh 223 <-eosCh 224 225 err = bin.Remove(demuxBin.Element) 226 if err != nil { 227 return fmt.Errorf("failed to remove demux bin from bin: %w", err) 228 } 229 230 err = demuxBin.SetState(gst.StateNull) 231 if err != nil { 232 return fmt.Errorf("failed to set demux bin to null state: %w", err) 233 } 234 235 log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath) 236 237 return nil 238}