Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "os"
8 "strings"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "github.com/google/uuid"
13 "stream.place/streamplace/pkg/bus"
14 "stream.place/streamplace/pkg/log"
15)
16
17func readFile(ctx context.Context, source string) (*bus.Seg, error) {
18 fd, err := os.Open(source)
19 if err != nil {
20 return nil, fmt.Errorf("failed to open source file: %w", err)
21 }
22 defer fd.Close()
23 bs, err := io.ReadAll(fd)
24 if err != nil {
25 return nil, fmt.Errorf("failed to read source file: %w", err)
26 }
27 seg := &bus.Seg{
28 Filepath: source,
29 Data: bs,
30 }
31 return seg, nil
32}
33
34// This function remains in scope for the duration of a single users' playback
35func Clip(ctx context.Context, sources []string, w io.Writer) error {
36 uu, err := uuid.NewV7()
37 if err != nil {
38 return err
39 }
40 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
41 ctx = log.WithLogValues(ctx, "mediafunc", "Clip")
42 ctx, cancel := context.WithCancel(ctx)
43 defer cancel()
44
45 pipelineSlice := []string{
46 "mp4mux faststart=true name=muxer ! appsink sync=false name=mp4sink",
47 "h264parse name=videoparse ! h264timestamper ! muxer.video_0",
48 "opusparse name=audioparse ! muxer.audio_0",
49 }
50
51 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
52 if err != nil {
53 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
54 }
55
56 segCh := make(chan *bus.Seg)
57 go func() {
58 for _, source := range sources {
59 log.Log(ctx, "reading file", "source", source)
60 seg, err := readFile(ctx, source)
61 if err != nil {
62 err = fmt.Errorf("failed to read file: %w", err)
63 pipeline.Error(err.Error(), err)
64 return
65 }
66
67 segCh <- seg
68 }
69 close(segCh)
70 }()
71
72 concatBin, err := ConcatBin(ctx, segCh)
73 if err != nil {
74 return fmt.Errorf("failed to create concat bin: %w", err)
75 }
76
77 err = pipeline.Add(concatBin.Element)
78 if err != nil {
79 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
80 }
81
82 videoPad := concatBin.GetStaticPad("video_0")
83 if videoPad == nil {
84 return fmt.Errorf("video pad not found")
85 }
86
87 audioPad := concatBin.GetStaticPad("audio_0")
88 if audioPad == nil {
89 return fmt.Errorf("audio pad not found")
90 }
91
92 // Get the videoparse and audioparse elements from the pipeline
93 videoParse, err := pipeline.GetElementByName("videoparse")
94 if err != nil {
95 return fmt.Errorf("failed to get video parse element: %w", err)
96 }
97
98 audioParse, err := pipeline.GetElementByName("audioparse")
99 if err != nil {
100 return fmt.Errorf("failed to get audio parse element: %w", err)
101 }
102
103 // Link the concat bin pads to the parse element sink pads
104 linked := videoPad.Link(videoParse.GetStaticPad("sink"))
105 if linked != gst.PadLinkOK {
106 return fmt.Errorf("failed to link video pad to video parse element: %v", linked)
107 }
108
109 linked = audioPad.Link(audioParse.GetStaticPad("sink"))
110 if linked != gst.PadLinkOK {
111 return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked)
112 }
113
114 // Get the mp4sink element and set up its callback
115 mp4Sink, err := pipeline.GetElementByName("mp4sink")
116 if err != nil {
117 return fmt.Errorf("failed to get mp4sink element: %w", err)
118 }
119
120 eos := make(chan struct{})
121
122 appSink := app.SinkFromElement(mp4Sink)
123 appSink.SetCallbacks(&app.SinkCallbacks{
124 NewSampleFunc: WriterNewSample(ctx, w),
125 EOSFunc: func(sink *app.Sink) {
126 close(eos)
127 },
128 })
129
130 // Start the pipeline
131 err = pipeline.SetState(gst.StatePlaying)
132 if err != nil {
133 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
134 }
135 defer func() {
136 err := pipeline.BlockSetState(gst.StateNull)
137 if err != nil {
138 log.Error(ctx, "failed to set pipeline state to null", "error", err)
139 }
140 }()
141
142 // Handle bus messages
143 err = HandleBusMessages(ctx, pipeline)
144
145 <-eos
146
147 if err != nil {
148 return fmt.Errorf("pipeline error: %w", err)
149 }
150
151 return nil
152}