Live video on the AT Protocol
at eli/postgres 104 lines 2.8 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "go.opentelemetry.io/otel" 12 "go.opentelemetry.io/otel/attribute" 13 "go.opentelemetry.io/otel/trace" 14 "stream.place/streamplace/pkg/globalerror" 15 "stream.place/streamplace/pkg/log" 16) 17 18// element that takes the input stream, muxes to mp4, and signs the result 19func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 20 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 21 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 22 "name": "signer", 23 "async-finalize": true, 24 "sink-factory": "appsink", 25 "muxer-factory": "mp4mux", 26 "max-size-bytes": 1, 27 }) 28 if err != nil { 29 return nil, err 30 } 31 32 p := elem.GetRequestPad("video") 33 if p == nil { 34 return nil, fmt.Errorf("failed to get video pad") 35 } 36 p = elem.GetRequestPad("audio_%u") 37 if p == nil { 38 return nil, fmt.Errorf("failed to get audio pad") 39 } 40 41 resetTimer := make(chan struct{}) 42 43 go func() { 44 for { 45 select { 46 case <-ctx.Done(): 47 return 48 case <-resetTimer: 49 continue 50 case <-time.After(time.Second * 30): 51 log.Warn(ctx, "no new segment for 30 seconds") 52 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 53 return 54 } 55 } 56 }() 57 58 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 59 buf := &bytes.Buffer{} 60 appsink := app.SinkFromElement(sinkEle) 61 if appsink == nil { 62 panic("appsink should not be nil") 63 } 64 65 appsink.SetCallbacks(&app.SinkCallbacks{ 66 NewSampleFunc: WriterNewSample(ctx, buf), 67 EOSFunc: func(sink *app.Sink) { 68 ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 69 attribute.String("streamer", ms.Streamer()), 70 )) 71 defer span.End() 72 resetTimer <- struct{}{} 73 now := time.Now().UnixMilli() 74 bs := buf.Bytes() 75 if mm.cli.SmearAudio { 76 smearedBuf := &bytes.Buffer{} 77 err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf) 78 if err != nil { 79 log.Error(ctx, "error smearing audio timestamps", "error", err) 80 return 81 } 82 bs = smearedBuf.Bytes() 83 } 84 bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 85 if err != nil { 86 log.Error(ctx, "error signing segment", "error", err) 87 return 88 } 89 90 err = mm.ValidateMP4(ctx, bytes.NewReader(bs)) 91 if err != nil { 92 log.Error(ctx, "error validating segment", "error", err) 93 globalerror.GlobalError(err) 94 return 95 } 96 }, 97 }) 98 }) 99 if err != nil { 100 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 101 } 102 103 return elem, nil 104}