Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rust-experimentation 233 lines 6.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/log" 15) 16 17// element that takes the input stream, muxes to mp4, and signs the result 18func SegmentElem(ctx context.Context, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 19 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 20 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 21 "name": "signer", 22 "async-finalize": true, 23 "sink-factory": "appsink", 24 "muxer-factory": "mp4mux", 25 "max-size-bytes": 1, 26 }) 27 if err != nil { 28 return nil, err 29 } 30 31 p := elem.GetRequestPad("video") 32 if p == nil { 33 return nil, fmt.Errorf("failed to get video pad") 34 } 35 p = elem.GetRequestPad("audio_%u") 36 if p == nil { 37 return nil, fmt.Errorf("failed to get audio pad") 38 } 39 40 resetTimer := make(chan struct{}) 41 42 go func() { 43 for { 44 select { 45 case <-ctx.Done(): 46 return 47 case <-resetTimer: 48 continue 49 case <-time.After(time.Second * 30): 50 log.Warn(ctx, "no new segment for 30 seconds") 51 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 52 return 53 } 54 } 55 }() 56 57 // we didn't need faststart but i'm leaving this commented here in case 58 // you want to change any other muxer properties in the future 59 60 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) { 61 err := muxEle.SetProperty("presentation-time", false) 62 if err != nil { 63 panic("error setting presentation-time to false: " + err.Error()) 64 } 65 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes) 66 if err != nil { 67 panic("error setting interleave-bytes" + err.Error()) 68 } 69 err = muxEle.SetProperty("interleave-time", InterleaveTime) 70 if err != nil { 71 panic("error setting interleave-time" + err.Error()) 72 } 73 }) 74 if err != nil { 75 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err) 76 } 77 78 // channel to make sure data is emitted in order 79 var ch chan struct{} 80 81 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 82 previousSegCh := ch 83 mySegCh := make(chan struct{}, 1) 84 ch = mySegCh 85 buf := &bytes.Buffer{} 86 err := sinkEle.SetProperty("sync", false) 87 if err != nil { 88 panic("error setting sync to false: " + err.Error()) 89 } 90 appsink := app.SinkFromElement(sinkEle) 91 if appsink == nil { 92 panic("appsink should not be nil") 93 } 94 95 appsink.SetCallbacks(&app.SinkCallbacks{ 96 NewSampleFunc: WriterNewSample(ctx, buf), 97 EOSFunc: func(sink *app.Sink) { 98 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 99 // attribute.String("streamer", ms.Streamer()), 100 // )) 101 // defer span.End() 102 now := time.Now().UnixMilli() 103 resetTimer <- struct{}{} 104 bs := buf.Bytes() 105 106 if previousSegCh != nil { 107 <-previousSegCh 108 } 109 err := cb(ctx, bs, now) 110 if err != nil { 111 log.Error(ctx, "error signing segment", "error", err) 112 return 113 } 114 close(mySegCh) 115 116 }, 117 }) 118 }) 119 if err != nil { 120 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 121 } 122 123 return elem, nil 124} 125 126func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 127 return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error { 128 if mm.cli.SmearAudio { 129 smearedBuf := &bytes.Buffer{} 130 err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf) 131 if err != nil { 132 return fmt.Errorf("error smearing audio timestamps: %w", err) 133 } 134 bs = smearedBuf.Bytes() 135 } 136 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 137 if err != nil { 138 return err 139 } 140 return mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true) 141 }) 142} 143 144func SegmentFileUnsigned(ctx context.Context, input string, ch chan *SplitSegment) error { 145 fd, err := os.OpenFile(input, os.O_RDONLY, 0644) 146 log.Log(ctx, "reading file", "file", input) 147 if err != nil { 148 return fmt.Errorf("failed to read file: %w", err) 149 } 150 defer fd.Close() 151 return SegmentUnsigned(ctx, fd, ch) 152} 153 154func SegmentUnsigned(ctx context.Context, input io.Reader, ch chan *SplitSegment) error { 155 ctx, cancel := context.WithCancel(ctx) 156 defer cancel() 157 pipelineSlice := []string{ 158 "appsrc name=appsrc ! qtdemux name=demux", 159 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1", 160 "demux. ! queue ! opusparse name=audioparse", 161 } 162 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 163 if err != nil { 164 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 165 } 166 167 srcele, err := pipeline.GetElementByName("appsrc") 168 if err != nil { 169 return err 170 } 171 src := app.SrcFromElement(srcele) 172 src.SetCallbacks(&app.SourceCallbacks{ 173 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 174 }) 175 videoParseEle, err := pipeline.GetElementByName("videoparse") 176 if err != nil { 177 return err 178 } 179 180 segmenter, err := SegmentElem(ctx, func(ctx context.Context, buf []byte, now int64) error { 181 ch <- &SplitSegment{ 182 Filename: fmt.Sprintf("%d.mp4", now), 183 Data: buf, 184 } 185 return nil 186 }) 187 if err != nil { 188 return err 189 } 190 191 err = pipeline.Add(segmenter) 192 if err != nil { 193 return err 194 } 195 err = videoParseEle.Link(segmenter) 196 if err != nil { 197 return err 198 } 199 audioparse, err := pipeline.GetElementByName("audioparse") 200 if err != nil { 201 return err 202 } 203 err = audioparse.Link(segmenter) 204 if err != nil { 205 return err 206 } 207 208 busErr := make(chan error) 209 go func() { 210 err := HandleBusMessages(ctx, pipeline) 211 cancel() 212 busErr <- err 213 }() 214 215 err = pipeline.SetState(gst.StatePlaying) 216 if err != nil { 217 return err 218 } 219 220 defer func() { 221 err := pipeline.SetState(gst.StateNull) 222 if err != nil { 223 log.Error(ctx, "error setting pipeline to null state", "error", err) 224 } 225 }() 226 227 err = <-busErr 228 if err != nil { 229 return err 230 } 231 232 return nil 233}