Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/docs-url-fix 210 lines 5.8 kB view raw
1package media 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/go-gst/go-gst/gst" 9 "stream.place/streamplace/pkg/bus" 10 "stream.place/streamplace/pkg/log" 11) 12 13var ErrConcatDone = errors.New("concat done") 14 15func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg) (*gst.Bin, error) { 16 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 bin := gst.NewBin("concat-bin") 18 19 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{ 20 "name": "concat-streamsynchronizer", 21 }) 22 if err != nil { 23 return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 24 } 25 26 err = bin.Add(streamsynchronizer) 27 if err != nil { 28 return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 29 } 30 31 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 32 if syncPadVideoSink == nil { 33 return nil, fmt.Errorf("failed to get sync video sink pad") 34 } 35 36 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 37 if syncPadAudioSink == nil { 38 return nil, fmt.Errorf("failed to get sync audio sink pad") 39 } 40 41 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 42 if syncPadVideoSrc == nil { 43 return nil, fmt.Errorf("failed to get sync video src pad") 44 } 45 46 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 47 if syncPadAudioSrc == nil { 48 return nil, fmt.Errorf("failed to get sync audio src pad") 49 } 50 51 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 52 "name": "concat-multiqueue", 53 }) 54 if err != nil { 55 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 56 } 57 err = bin.Add(mq) 58 if err != nil { 59 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 60 } 61 62 mqVideoSink := mq.GetRequestPad("sink_%u") 63 if mqVideoSink == nil { 64 return nil, fmt.Errorf("video sink pad not found") 65 } 66 67 mqAudioSink := mq.GetRequestPad("sink_%u") 68 if mqAudioSink == nil { 69 return nil, fmt.Errorf("audio sink pad not found") 70 } 71 72 mqVideoSrc := mq.GetStaticPad("src_0") 73 if mqVideoSrc == nil { 74 return nil, fmt.Errorf("video source pad not found") 75 } 76 77 mqAudioSrc := mq.GetStaticPad("src_1") 78 if mqAudioSrc == nil { 79 return nil, fmt.Errorf("audio source pad not found") 80 } 81 82 linked := syncPadVideoSrc.Link(mqVideoSink) 83 if linked != gst.PadLinkOK { 84 return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked) 85 } 86 87 linked = syncPadAudioSrc.Link(mqAudioSink) 88 if linked != gst.PadLinkOK { 89 return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked) 90 } 91 92 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 93 if videoGhost == nil { 94 return nil, fmt.Errorf("failed to create video ghost pad") 95 } 96 97 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 98 if audioGhost == nil { 99 return nil, fmt.Errorf("failed to create audio ghost pad") 100 } 101 102 ok := bin.AddPad(videoGhost.Pad) 103 if !ok { 104 return nil, fmt.Errorf("failed to add video ghost pad to bin") 105 } 106 107 ok = bin.AddPad(audioGhost.Pad) 108 if !ok { 109 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 110 } 111 112 go func() { 113 for { 114 select { 115 case seg := <-segCh: 116 if seg == nil { 117 bin.Error(ErrConcatDone.Error(), ErrConcatDone) 118 return 119 } 120 err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink) 121 if err != nil { 122 log.Error(ctx, "failed to add concat demuxer", "error", err) 123 bin.Error(err.Error(), err) 124 return 125 } 126 case <-ctx.Done(): 127 return 128 } 129 } 130 }() 131 132 return bin, nil 133} 134 135func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error { 136 137 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 138 demuxBin, err := ConcatDemuxBin(ctx, seg) 139 if err != nil { 140 return fmt.Errorf("failed to create demux bin: %w", err) 141 } 142 143 err = bin.Add(demuxBin.Element) 144 if err != nil { 145 return fmt.Errorf("failed to add demux bin to bin: %w", err) 146 } 147 148 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 149 if demuxBinPadVideoSrc == nil { 150 return fmt.Errorf("failed to get demux bin video src pad") 151 } 152 153 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 154 if demuxBinPadAudioSrc == nil { 155 return fmt.Errorf("failed to get demux bin audio src pad") 156 } 157 158 linked := demuxBinPadVideoSrc.Link(syncPadVideoSink) 159 if linked != gst.PadLinkOK { 160 return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked) 161 } 162 163 linked = demuxBinPadAudioSrc.Link(syncPadAudioSink) 164 if linked != gst.PadLinkOK { 165 return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked) 166 } 167 168 eosCh := make(chan struct{}) 169 eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 170 if pad.GetDirection() != gst.PadDirectionSource { 171 return gst.PadProbeOK 172 } 173 if info.GetEvent().Type() != gst.EventTypeEOS { 174 return gst.PadProbeOK 175 } 176 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 177 downstreamPad := pad.GetPeer() 178 unlinked := pad.Unlink(downstreamPad) 179 if !unlinked { 180 log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked) 181 } 182 go func() { 183 eosCh <- struct{}{} 184 }() 185 return gst.PadProbeRemove 186 } 187 demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 188 demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 189 190 if err := bin.SetState(gst.StatePlaying); err != nil { 191 return fmt.Errorf("failed to set state: %w", err) 192 } 193 194 <-eosCh 195 <-eosCh 196 197 err = bin.Remove(demuxBin.Element) 198 if err != nil { 199 return fmt.Errorf("failed to remove demux bin from bin: %w", err) 200 } 201 202 err = demuxBin.SetState(gst.StateNull) 203 if err != nil { 204 return fmt.Errorf("failed to set demux bin to null state: %w", err) 205 } 206 207 log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath) 208 209 return nil 210}