Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.26 104 lines 2.8 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "go.opentelemetry.io/otel" 12 "go.opentelemetry.io/otel/attribute" 13 "go.opentelemetry.io/otel/trace" 14 "stream.place/streamplace/pkg/globalerror" 15 "stream.place/streamplace/pkg/log" 16) 17 18// element that takes the input stream, muxes to mp4, and signs the result 19func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 20 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 21 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 22 "name": "signer", 23 "async-finalize": true, 24 "sink-factory": "appsink", 25 "muxer-factory": "mp4mux", 26 "max-size-bytes": 1, 27 }) 28 if err != nil { 29 return nil, err 30 } 31 32 p := elem.GetRequestPad("video") 33 if p == nil { 34 return nil, fmt.Errorf("failed to get video pad") 35 } 36 p = elem.GetRequestPad("audio_%u") 37 if p == nil { 38 return nil, fmt.Errorf("failed to get audio pad") 39 } 40 41 resetTimer := make(chan struct{}) 42 43 go func() { 44 for { 45 select { 46 case <-ctx.Done(): 47 return 48 case <-resetTimer: 49 continue 50 case <-time.After(time.Second * 30): 51 log.Warn(ctx, "no new segment for 30 seconds") 52 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 53 return 54 } 55 } 56 }() 57 58 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 59 buf := &bytes.Buffer{} 60 appsink := app.SinkFromElement(sinkEle) 61 if appsink == nil { 62 panic("appsink should not be nil") 63 } 64 65 appsink.SetCallbacks(&app.SinkCallbacks{ 66 NewSampleFunc: WriterNewSample(ctx, buf), 67 EOSFunc: func(sink *app.Sink) { 68 ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 69 attribute.String("streamer", ms.Streamer()), 70 )) 71 defer span.End() 72 resetTimer <- struct{}{} 73 now := time.Now().UnixMilli() 74 bs := buf.Bytes() 75 if mm.cli.SmearAudio { 76 smearedBuf := &bytes.Buffer{} 77 err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf) 78 if err != nil { 79 log.Error(ctx, "error smearing audio timestamps", "error", err) 80 return 81 } 82 bs = smearedBuf.Bytes() 83 } 84 bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 85 if err != nil { 86 log.Error(ctx, "error signing segment", "error", err) 87 return 88 } 89 90 err = mm.ValidateMP4(ctx, bytes.NewReader(bs)) 91 if err != nil { 92 log.Error(ctx, "error validating segment", "error", err) 93 globalerror.GlobalError(err) 94 return 95 } 96 }, 97 }) 98 }) 99 if err != nil { 100 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 101 } 102 103 return elem, nil 104}