Live video on the AT Protocol
1package cmd
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net/http"
8 "strings"
9 "time"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/pion/webrtc/v4"
14 pionmedia "github.com/pion/webrtc/v4/pkg/media"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/crypto/spkey"
17 "stream.place/streamplace/pkg/gstinit"
18 "stream.place/streamplace/pkg/log"
19 "stream.place/streamplace/pkg/media"
20)
21
22func WHIP(ctx context.Context, streamKey string, count int, viewers int, duration time.Duration, file string, endpoint string, freezeAfter time.Duration) error {
23 if file == "" {
24 return fmt.Errorf("file is required")
25 }
26 gstinit.InitGST()
27
28 if duration > 0 {
29 var cancel context.CancelFunc
30 ctx, cancel = context.WithTimeout(ctx, duration)
31 defer cancel()
32 }
33
34 w := &WHIPClient{
35 StreamKey: streamKey,
36 File: file,
37 Endpoint: endpoint,
38 Count: count,
39 FreezeAfter: freezeAfter,
40 Viewers: viewers,
41 }
42
43 return w.WHIP(ctx)
44}
45
// WHIPClient holds the settings for one publishing run: which file to play,
// where to send it, and how many connections to open.
type WHIPClient struct {
	// StreamKey is sent as the bearer token of each WHIP request. When it is
	// empty, a fresh key is generated for every connection.
	StreamKey string

	// File is the location handed to the pipeline's filesrc element.
	File string

	// Endpoint is the URL the SDP offer is POSTed to. It is also used as the
	// prefix of the playback URL when Viewers is positive.
	Endpoint string

	// Count is the number of WHIP connections to open.
	Count int

	// FreezeAfter, when non-zero, is the time since start after which samples
	// are no longer written to the tracks; the pipeline itself keeps running.
	FreezeAfter time.Duration

	// Viewers is the Count given to the WHEP client started for each
	// connection; no WHEP clients are started when it is zero or negative.
	Viewers int
}
54
55var failureStates = []webrtc.ICEConnectionState{
56 webrtc.ICEConnectionStateFailed,
57 webrtc.ICEConnectionStateDisconnected,
58 webrtc.ICEConnectionStateClosed,
59 webrtc.ICEConnectionStateCompleted,
60}
61
62type WHIPConnection struct {
63 peerConnection *webrtc.PeerConnection
64 audioTrack *webrtc.TrackLocalStaticSample
65 videoTrack *webrtc.TrackLocalStaticSample
66 did string
67}
68
69func (w *WHIPClient) WHIP(ctx context.Context) error {
70 ctx, cancel := context.WithCancel(ctx)
71 defer cancel()
72
73 pipelineSlice := []string{
74 "filesrc name=filesrc ! qtdemux name=demux",
75 "demux.video_0 ! tee name=video_tee",
76 "demux.audio_0 ! tee name=audio_tee",
77 "video_tee. ! queue ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink",
78 "audio_tee. ! queue ! opusparse ! appsink name=audioappsink",
79 // "matroskamux name=mux ! fakesink name=fakesink sync=true",
80 // "video_tee. ! mux.video_0",
81 // "audio_tee. ! mux.audio_0",
82 }
83
84 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
85 if err != nil {
86 return err
87 }
88
89 fileSrc, err := pipeline.GetElementByName("filesrc")
90 if err != nil {
91 return err
92 }
93
94 if err := fileSrc.Set("location", w.File); err != nil {
95 return err
96 }
97
98 videoSink, err := pipeline.GetElementByName("videoappsink")
99 if err != nil {
100 return err
101 }
102
103 audioSink, err := pipeline.GetElementByName("audioappsink")
104 if err != nil {
105 return err
106 }
107
108 startTime := time.Now()
109 sinks := []*app.Sink{
110 app.SinkFromElement(videoSink),
111 app.SinkFromElement(audioSink),
112 }
113 // Create accumulators for tracking elapsed duration
114 accumulators := make([]time.Duration, len(sinks))
115
116 conns := make([]*WHIPConnection, w.Count)
117 g := &errgroup.Group{}
118 for i := 0; i < w.Count; i++ {
119 ctx := ctx
120 // var streamKey string
121 var did string
122 var streamKey string
123 if w.StreamKey != "" {
124 streamKey = w.StreamKey
125 } else {
126 priv, pub, err := spkey.GenerateStreamKey()
127 if err != nil {
128 return err
129 }
130
131 did = pub.DIDKey()
132 ctx = log.WithLogValues(ctx, "did", did)
133 streamKey = priv.Multibase()
134 }
135
136 g.Go(func() error {
137 conn, err := w.StartWHIPConnection(ctx, streamKey, did)
138 if err != nil {
139 return err
140 }
141 conns[i] = conn
142 ctx := log.WithLogValues(ctx, "did", did)
143 conn.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
144 log.Log(ctx, "WHIP connection State has changed", "state", connectionState.String())
145 for _, state := range failureStates {
146 if connectionState == state {
147 log.Log(ctx, "connection failed, cancelling")
148 cancel()
149 }
150 }
151 })
152 go func() {
153 <-ctx.Done()
154 if conn.peerConnection != nil {
155 conn.peerConnection.Close()
156 }
157 }()
158 return nil
159 })
160 }
161
162 if err := g.Wait(); err != nil {
163 return err
164 }
165
166 // Start a ticker to print elapsed duration every second
167 go func() {
168 ticker := time.NewTicker(time.Second)
169 defer ticker.Stop()
170
171 for {
172 select {
173 case <-ctx.Done():
174 return
175 case <-ticker.C:
176 for i, duration := range accumulators {
177 trackType := "video"
178 if i == 1 {
179 trackType = "audio"
180 }
181 target := startTime.Add(time.Duration(accumulators[i]))
182 diff := time.Since(target)
183 log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff)
184 }
185 }
186 }
187 }()
188
189 errCh := make(chan error, 1)
190
191 for i := range sinks {
192 func(i int) {
193 sink := sinks[i]
194 trackType := "video"
195 if i == 1 {
196 trackType = "audio"
197 }
198
199 sink.SetCallbacks(&app.SinkCallbacks{
200 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
201
202 sample := sink.PullSample()
203 if sample == nil {
204 return gst.FlowEOS
205 }
206
207 buffer := sample.GetBuffer()
208 if buffer == nil {
209 return gst.FlowError
210 }
211
212 samples := buffer.Map(gst.MapRead).Bytes()
213 defer buffer.Unmap()
214
215 durationPtr := buffer.Duration().AsDuration()
216 var duration time.Duration
217 if durationPtr == nil {
218 errCh <- fmt.Errorf("%v duration: nil", trackType)
219 return gst.FlowError
220 } else {
221 // fmt.Printf("%v duration: %v\n", trackType, *durationPtr)
222 duration = *durationPtr
223 }
224
225 accumulators[i] += duration
226
227 if w.FreezeAfter == 0 || time.Since(startTime) < w.FreezeAfter {
228 for _, conn := range conns {
229 if trackType == "video" {
230 if err := conn.videoTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil {
231 log.Log(ctx, "error writing video sample", "error", err)
232 errCh <- err
233 return gst.FlowError
234 }
235 } else {
236 if err := conn.audioTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil {
237 log.Log(ctx, "error writing video sample", "error", err)
238 errCh <- err
239 return gst.FlowError
240 }
241 }
242 }
243 }
244
245 return gst.FlowOK
246 },
247 })
248 }(i)
249 }
250
251 go func() {
252 if err := media.HandleBusMessages(ctx, pipeline); err != nil {
253 log.Log(ctx, "pipeline error", "error", err)
254 }
255 cancel()
256 }()
257
258 if err = pipeline.SetState(gst.StatePlaying); err != nil {
259 return err
260 }
261 if w.Viewers > 0 {
262 whepG, ctx := errgroup.WithContext(ctx)
263 for i := 0; i < w.Count; i++ {
264 did := conns[i].did
265 w := &WHEPClient{
266 Endpoint: fmt.Sprintf("%s/api/playback/%s/webrtc", w.Endpoint, did),
267 Count: w.Viewers,
268 }
269 whepG.Go(func() error {
270 return w.WHEP(ctx)
271 })
272 }
273 if err := whepG.Wait(); err != nil {
274 return err
275 }
276 }
277
278 <-ctx.Done()
279 err = pipeline.BlockSetState(gst.StateNull)
280 if err != nil {
281 return err
282 }
283
284 select {
285 case err := <-errCh:
286 return err
287 case <-ctx.Done():
288 return ctx.Err()
289 }
290}
291
292func (w *WHIPClient) StartWHIPConnection(ctx context.Context, streamKey string, did string) (*WHIPConnection, error) {
293
294 // Prepare the configuration
295 config := webrtc.Configuration{}
296
297 // Create a new RTCPeerConnection
298 peerConnection, err := webrtc.NewPeerConnection(config)
299 if err != nil {
300 return nil, err
301 }
302
303 // Create a audio track
304 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion1")
305 if err != nil {
306 return nil, err
307 }
308 _, err = peerConnection.AddTrack(audioTrack)
309 if err != nil {
310 return nil, err
311 }
312
313 // Create a video track
314 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion2")
315 if err != nil {
316 return nil, err
317 }
318 _, err = peerConnection.AddTrack(videoTrack)
319 if err != nil {
320 return nil, err
321 }
322
323 // Create an offer
324 offer, err := peerConnection.CreateOffer(nil)
325 if err != nil {
326 return nil, err
327 }
328
329 // Set the generated offer as our LocalDescription
330 err = peerConnection.SetLocalDescription(offer)
331 if err != nil {
332 return nil, err
333 }
334
335 // Wait for ICE gathering to complete
336 // gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
337 // <-gatherComplete
338
339 // Create HTTP client and prepare the request
340 client := &http.Client{}
341
342 // Send the WHIP request to the server
343 req, err := http.NewRequest("POST", w.Endpoint, strings.NewReader(offer.SDP))
344 if err != nil {
345 return nil, err
346 }
347 req.Header.Set("Authorization", "Bearer "+streamKey)
348 req.Header.Set("Content-Type", "application/sdp")
349
350 // Execute the request
351 resp, err := client.Do(req)
352 if err != nil {
353 return nil, err
354 }
355 defer resp.Body.Close()
356
357 // Read and process the answer
358 answerBytes, err := io.ReadAll(resp.Body)
359 if err != nil {
360 return nil, err
361 }
362
363 // Parse the SDP answer
364 var answer webrtc.SessionDescription
365 answer.Type = webrtc.SDPTypeAnswer
366 answer.SDP = string(answerBytes)
367
368 // Apply the answer as remote description
369 err = peerConnection.SetRemoteDescription(answer)
370 if err != nil {
371 return nil, err
372 }
373
374 gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
375 <-gatherComplete
376
377 conn := &WHIPConnection{
378 peerConnection: peerConnection,
379 audioTrack: audioTrack,
380 videoTrack: videoTrack,
381 did: did,
382 }
383
384 return conn, nil
385}