Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "strings"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "stream.place/streamplace/pkg/aqio"
13 "stream.place/streamplace/pkg/bus"
14 "stream.place/streamplace/pkg/log"
15)
16
17// CombineSegments combines a list of segments into a single segment that maintains all of the manifests
18func CombineSegments(ctx context.Context, inputFds []io.ReadSeeker, ms MediaSigner, output io.ReadWriteSeeker) error {
19 rws := aqio.NewReadWriteSeeker([]byte{})
20 err := CombineSegmentsUnsigned(ctx, inputFds, rws, true)
21 if err != nil {
22 return err
23 }
24 // rewind all the inputs for the signer
25 for _, fd := range inputFds {
26 _, err := fd.Seek(0, io.SeekStart)
27 if err != nil {
28 return err
29 }
30 }
31 bs, err := rws.Bytes()
32 if err != nil {
33 return err
34 }
35 err = ms.SignConcatMP4(context.Background(), bytes.NewReader(bs), inputFds, output)
36 if err != nil {
37 return err
38 }
39 return nil
40}
41
42func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer, doH264Parse bool) error {
43 ctx = log.WithLogValues(ctx, "mediafunc", "CombineSegmentsUnsigned")
44 ctx, cancel := context.WithCancel(ctx)
45 defer cancel()
46
47 pipelineSlice := []string{
48 fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d movie-timescale=60000 trak-timescale=60000 ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime),
49 "capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.",
50 "capsfilter caps=audio/x-opus,framed=true name=audioparse ! queue ! muxer.",
51 }
52
53 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
54 if err != nil {
55 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
56 }
57
58 segCh := make(chan *bus.Seg)
59 go func() {
60 for _, source := range sources {
61 bs, err := io.ReadAll(source)
62 if err != nil {
63 err = fmt.Errorf("failed to read file: %w", err)
64 pipeline.Error(err.Error(), err)
65 return
66 }
67 segCh <- &bus.Seg{
68 Filepath: "ignored",
69 Data: bs,
70 }
71 }
72 close(segCh)
73 }()
74
75 concatBin, err := ConcatBin(ctx, segCh, doH264Parse)
76 if err != nil {
77 return fmt.Errorf("failed to create concat bin: %w", err)
78 }
79
80 err = pipeline.Add(concatBin.Element)
81 if err != nil {
82 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
83 }
84
85 videoPad := concatBin.GetStaticPad("video_0")
86 if videoPad == nil {
87 return fmt.Errorf("video pad not found")
88 }
89
90 audioPad := concatBin.GetStaticPad("audio_0")
91 if audioPad == nil {
92 return fmt.Errorf("audio pad not found")
93 }
94
95 // Get the videoparse and audioparse elements from the pipeline
96 videoQueue, err := pipeline.GetElementByName("videoqueue")
97 if err != nil {
98 return fmt.Errorf("failed to get video parse element: %w", err)
99 }
100
101 audioParse, err := pipeline.GetElementByName("audioparse")
102 if err != nil {
103 return fmt.Errorf("failed to get audio parse element: %w", err)
104 }
105
106 // Link the concat bin pads to the parse element sink pads
107 linked := videoPad.Link(videoQueue.GetStaticPad("sink"))
108 if linked != gst.PadLinkOK {
109 return fmt.Errorf("failed to link video pad to video parse element: %v", linked)
110 }
111
112 linked = audioPad.Link(audioParse.GetStaticPad("sink"))
113 if linked != gst.PadLinkOK {
114 return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked)
115 }
116
117 // Get the mp4sink element and set up its callback
118 mp4Sink, err := pipeline.GetElementByName("mp4sink")
119 if err != nil {
120 return fmt.Errorf("failed to get mp4sink element: %w", err)
121 }
122
123 appSink := app.SinkFromElement(mp4Sink)
124 appSink.SetCallbacks(&app.SinkCallbacks{
125 NewSampleFunc: WriterNewSample(ctx, w),
126 })
127
128 errCh := make(chan error)
129 go func() {
130 err := HandleBusMessages(ctx, pipeline)
131 errCh <- err
132 }()
133
134 // Start the pipeline
135 err = pipeline.SetState(gst.StatePlaying)
136 if err != nil {
137 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
138 }
139 defer func() {
140 err := pipeline.BlockSetState(gst.StateNull)
141 if err != nil {
142 log.Error(ctx, "failed to set pipeline state to null", "error", err)
143 }
144 }()
145
146 err = <-errCh
147 if err != nil {
148 return fmt.Errorf("pipeline error: %w", err)
149 }
150
151 return nil
152}