Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/log"
12)
13
14// ingest a H264+AAC MKV stream (prolly from an RTMP server)
15func (mm *MediaManager) MKVIngest(ctx context.Context, input io.Reader, ms MediaSigner) error {
16 ctx, cancel := context.WithCancel(ctx)
17 defer cancel()
18 pipelineSlice := []string{
19 "appsrc name=streamsrc ! matroskademux name=demux",
20 "demux. ! queue ! h264parse name=parse",
21 "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc",
22 }
23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
24 if err != nil {
25 return fmt.Errorf("error creating MKVIngest pipeline: %w", err)
26 }
27
28 srcele, err := pipeline.GetElementByName("streamsrc")
29 if err != nil {
30 return err
31 }
32 // defer runtime.KeepAlive(srcele)
33 src := app.SrcFromElement(srcele)
34 src.SetCallbacks(&app.SourceCallbacks{
35 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
36 })
37 parseEle, err := pipeline.GetElementByName("parse")
38 if err != nil {
39 return err
40 }
41
42 signer, err := mm.SegmentAndSignElem(ctx, ms)
43 if err != nil {
44 return err
45 }
46
47 err = pipeline.Add(signer)
48 if err != nil {
49 return err
50 }
51 err = parseEle.Link(signer)
52 if err != nil {
53 return err
54 }
55 audioenc, err := pipeline.GetElementByName("audioenc")
56 if err != nil {
57 return err
58 }
59 err = audioenc.Link(signer)
60 if err != nil {
61 return err
62 }
63
64 busErr := make(chan error)
65 go func() {
66 err := HandleBusMessages(ctx, pipeline)
67 cancel()
68 busErr <- err
69 }()
70
71 err = pipeline.SetState(gst.StatePlaying)
72 if err != nil {
73 return err
74 }
75
76 defer func() {
77 err := pipeline.SetState(gst.StateNull)
78 if err != nil {
79 log.Error(ctx, "error setting pipeline to null state", "error", err)
80 }
81 }()
82
83 <-busErr
84
85 return nil
86}