Live video on the AT Protocol
at natb/command-errors 152 lines 4.2 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strings" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "stream.place/streamplace/pkg/aqio" 13 "stream.place/streamplace/pkg/bus" 14 "stream.place/streamplace/pkg/log" 15) 16 17// CombineSegments combines a list of segments into a single segment that maintains all of the manifests 18func CombineSegments(ctx context.Context, inputFds []io.ReadSeeker, ms MediaSigner, output io.ReadWriteSeeker) error { 19 rws := aqio.NewReadWriteSeeker([]byte{}) 20 err := CombineSegmentsUnsigned(ctx, inputFds, rws, true) 21 if err != nil { 22 return err 23 } 24 // rewind all the inputs for the signer 25 for _, fd := range inputFds { 26 _, err := fd.Seek(0, io.SeekStart) 27 if err != nil { 28 return err 29 } 30 } 31 bs, err := rws.Bytes() 32 if err != nil { 33 return err 34 } 35 err = ms.SignConcatMP4(context.Background(), bytes.NewReader(bs), inputFds, output) 36 if err != nil { 37 return err 38 } 39 return nil 40} 41 42func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer, doH264Parse bool) error { 43 ctx = log.WithLogValues(ctx, "mediafunc", "CombineSegmentsUnsigned") 44 ctx, cancel := context.WithCancel(ctx) 45 defer cancel() 46 47 pipelineSlice := []string{ 48 fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d movie-timescale=60000 trak-timescale=60000 ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime), 49 "capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.", 50 "capsfilter caps=audio/x-opus,framed=true name=audioparse ! queue ! muxer.", 51 } 52 53 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 54 if err != nil { 55 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 56 } 57 58 segCh := make(chan *bus.Seg) 59 go func() { 60 for _, source := range sources { 61 bs, err := io.ReadAll(source) 62 if err != nil { 63 err = fmt.Errorf("failed to read file: %w", err) 64 pipeline.Error(err.Error(), err) 65 return 66 } 67 segCh <- &bus.Seg{ 68 Filepath: "ignored", 69 Data: bs, 70 } 71 } 72 close(segCh) 73 }() 74 75 concatBin, err := ConcatBin(ctx, segCh, doH264Parse) 76 if err != nil { 77 return fmt.Errorf("failed to create concat bin: %w", err) 78 } 79 80 err = pipeline.Add(concatBin.Element) 81 if err != nil { 82 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 83 } 84 85 videoPad := concatBin.GetStaticPad("video_0") 86 if videoPad == nil { 87 return fmt.Errorf("video pad not found") 88 } 89 90 audioPad := concatBin.GetStaticPad("audio_0") 91 if audioPad == nil { 92 return fmt.Errorf("audio pad not found") 93 } 94 95 // Get the videoparse and audioparse elements from the pipeline 96 videoQueue, err := pipeline.GetElementByName("videoqueue") 97 if err != nil { 98 return fmt.Errorf("failed to get video parse element: %w", err) 99 } 100 101 audioParse, err := pipeline.GetElementByName("audioparse") 102 if err != nil { 103 return fmt.Errorf("failed to get audio parse element: %w", err) 104 } 105 106 // Link the concat bin pads to the parse element sink pads 107 linked := videoPad.Link(videoQueue.GetStaticPad("sink")) 108 if linked != gst.PadLinkOK { 109 return fmt.Errorf("failed to link video pad to video parse element: %v", linked) 110 } 111 112 linked = audioPad.Link(audioParse.GetStaticPad("sink")) 113 if linked != gst.PadLinkOK { 114 return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked) 115 } 116 117 // Get the mp4sink element and set up its callback 118 mp4Sink, err := pipeline.GetElementByName("mp4sink") 119 if err != nil { 120 return fmt.Errorf("failed to get mp4sink element: %w", err) 121 } 122 123 appSink := app.SinkFromElement(mp4Sink) 124 appSink.SetCallbacks(&app.SinkCallbacks{ 125 NewSampleFunc: WriterNewSample(ctx, w), 126 }) 127 128 errCh := make(chan error) 129 go func() { 130 err := HandleBusMessages(ctx, pipeline) 131 errCh <- err 132 }() 133 134 // Start the pipeline 135 err = pipeline.SetState(gst.StatePlaying) 136 if err != nil { 137 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 138 } 139 defer func() { 140 err := pipeline.BlockSetState(gst.StateNull) 141 if err != nil { 142 log.Error(ctx, "failed to set pipeline state to null", "error", err) 143 } 144 }() 145 146 err = <-errCh 147 if err != nil { 148 return fmt.Errorf("pipeline error: %w", err) 149 } 150 151 return nil 152}