Live video on the AT Protocol
at eli/docker-linting 99 lines 2.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "go.opentelemetry.io/otel" 12 "go.opentelemetry.io/otel/attribute" 13 "go.opentelemetry.io/otel/trace" 14 "stream.place/streamplace/pkg/log" 15) 16 17// element that takes the input stream, muxes to mp4, and signs the result 18func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 19 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 20 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 21 "name": "signer", 22 "async-finalize": true, 23 "sink-factory": "appsink", 24 "muxer-factory": "mp4mux", 25 "max-size-bytes": 1, 26 }) 27 if err != nil { 28 return nil, err 29 } 30 31 p := elem.GetRequestPad("video") 32 if p == nil { 33 return nil, fmt.Errorf("failed to get video pad") 34 } 35 p = elem.GetRequestPad("audio_%u") 36 if p == nil { 37 return nil, fmt.Errorf("failed to get audio pad") 38 } 39 40 resetTimer := make(chan struct{}) 41 42 go func() { 43 for { 44 select { 45 case <-ctx.Done(): 46 return 47 case <-resetTimer: 48 continue 49 case <-time.After(time.Second * 30): 50 log.Warn(ctx, "no new segment for 30 seconds") 51 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 52 return 53 } 54 } 55 }() 56 57 elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 58 buf := &bytes.Buffer{} 59 appsink := app.SinkFromElement(sinkEle) 60 if appsink == nil { 61 panic("appsink should not be nil") 62 } 63 64 appsink.SetCallbacks(&app.SinkCallbacks{ 65 NewSampleFunc: WriterNewSample(ctx, buf), 66 EOSFunc: func(sink *app.Sink) { 67 ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 68 attribute.String("streamer", ms.Streamer()), 69 )) 70 defer span.End() 71 resetTimer <- struct{}{} 72 now := time.Now().UnixMilli() 73 bs := buf.Bytes() 74 if mm.cli.SmearAudio { 75 smearedBuf := &bytes.Buffer{} 76 err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf) 77 if err != nil { 78 log.Error(ctx, "error smearing audio timestamps", "error", err) 79 return 80 } 81 bs = smearedBuf.Bytes() 82 } 83 bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 84 if err != nil { 85 log.Error(ctx, "error signing segment", "error", err) 86 return 87 } 88 89 err = mm.ValidateMP4(ctx, bytes.NewReader(bs)) 90 if err != nil { 91 log.Error(ctx, "error validating segment", "error", err) 92 return 93 } 94 }, 95 }) 96 }) 97 98 return elem, nil 99}