Live video on the AT Protocol
at eli/multitesting 152 lines 3.8 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "os" 8 "strings" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "github.com/google/uuid" 13 "stream.place/streamplace/pkg/bus" 14 "stream.place/streamplace/pkg/log" 15) 16 17func readFile(ctx context.Context, source string) (*bus.Seg, error) { 18 fd, err := os.Open(source) 19 if err != nil { 20 return nil, fmt.Errorf("failed to open source file: %w", err) 21 } 22 defer fd.Close() 23 bs, err := io.ReadAll(fd) 24 if err != nil { 25 return nil, fmt.Errorf("failed to read source file: %w", err) 26 } 27 seg := &bus.Seg{ 28 Filepath: source, 29 Data: bs, 30 } 31 return seg, nil 32} 33 34// This function remains in scope for the duration of a single users' playback 35func Clip(ctx context.Context, sources []string, w io.Writer) error { 36 uu, err := uuid.NewV7() 37 if err != nil { 38 return err 39 } 40 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 41 ctx = log.WithLogValues(ctx, "mediafunc", "Clip") 42 ctx, cancel := context.WithCancel(ctx) 43 defer cancel() 44 45 pipelineSlice := []string{ 46 "mp4mux faststart=true name=muxer ! appsink sync=false name=mp4sink", 47 "h264parse name=videoparse ! h264timestamper ! muxer.video_0", 48 "opusparse name=audioparse ! muxer.audio_0", 49 } 50 51 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 52 if err != nil { 53 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 54 } 55 56 segCh := make(chan *bus.Seg) 57 go func() { 58 for _, source := range sources { 59 log.Log(ctx, "reading file", "source", source) 60 seg, err := readFile(ctx, source) 61 if err != nil { 62 err = fmt.Errorf("failed to read file: %w", err) 63 pipeline.Error(err.Error(), err) 64 return 65 } 66 67 segCh <- seg 68 } 69 close(segCh) 70 }() 71 72 concatBin, err := ConcatBin(ctx, segCh) 73 if err != nil { 74 return fmt.Errorf("failed to create concat bin: %w", err) 75 } 76 77 err = pipeline.Add(concatBin.Element) 78 if err != nil { 79 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 80 } 81 82 videoPad := concatBin.GetStaticPad("video_0") 83 if videoPad == nil { 84 return fmt.Errorf("video pad not found") 85 } 86 87 audioPad := concatBin.GetStaticPad("audio_0") 88 if audioPad == nil { 89 return fmt.Errorf("audio pad not found") 90 } 91 92 // Get the videoparse and audioparse elements from the pipeline 93 videoParse, err := pipeline.GetElementByName("videoparse") 94 if err != nil { 95 return fmt.Errorf("failed to get video parse element: %w", err) 96 } 97 98 audioParse, err := pipeline.GetElementByName("audioparse") 99 if err != nil { 100 return fmt.Errorf("failed to get audio parse element: %w", err) 101 } 102 103 // Link the concat bin pads to the parse element sink pads 104 linked := videoPad.Link(videoParse.GetStaticPad("sink")) 105 if linked != gst.PadLinkOK { 106 return fmt.Errorf("failed to link video pad to video parse element: %v", linked) 107 } 108 109 linked = audioPad.Link(audioParse.GetStaticPad("sink")) 110 if linked != gst.PadLinkOK { 111 return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked) 112 } 113 114 // Get the mp4sink element and set up its callback 115 mp4Sink, err := pipeline.GetElementByName("mp4sink") 116 if err != nil { 117 return fmt.Errorf("failed to get mp4sink element: %w", err) 118 } 119 120 eos := make(chan struct{}) 121 122 appSink := app.SinkFromElement(mp4Sink) 123 appSink.SetCallbacks(&app.SinkCallbacks{ 124 NewSampleFunc: WriterNewSample(ctx, w), 125 EOSFunc: func(sink *app.Sink) { 126 close(eos) 127 }, 128 }) 129 130 // Start the pipeline 131 err = pipeline.SetState(gst.StatePlaying) 132 if err != nil { 133 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 134 } 135 defer func() { 136 err := pipeline.BlockSetState(gst.StateNull) 137 if err != nil { 138 log.Error(ctx, "failed to set pipeline state to null", "error", err) 139 } 140 }() 141 142 // Handle bus messages 143 err = HandleBusMessages(ctx, pipeline) 144 145 <-eos 146 147 if err != nil { 148 return fmt.Errorf("pipeline error: %w", err) 149 } 150 151 return nil 152}