Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.8 399 lines 9.7 kB view raw
1package cmd 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "io" 8 "net/http" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/pion/webrtc/v4" 15 pionmedia "github.com/pion/webrtc/v4/pkg/media" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/crypto/spkey" 18 "stream.place/streamplace/pkg/gstinit" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media" 21) 22 23func WHIP(args []string) error { 24 fs := flag.NewFlagSet("whip", flag.ExitOnError) 25 streamKey := fs.String("stream-key", "", "stream key") 26 count := fs.Int("count", 1, "number of concurrent streams (for load testing)") 27 viewers := fs.Int("viewers", 0, "number of viewers to simulate per stream") 28 duration := fs.Duration("duration", 0, "duration of the stream") 29 file := fs.String("file", "", "file to stream (needs to be an MP4 containing H264 video and Opus audio)") 30 endpoint := fs.String("endpoint", "http://127.0.0.1:38080", "endpoint to send the WHIP request to") 31 freezeAfter := fs.Duration("freeze-after", 0, "freeze the stream after the given duration") 32 err := fs.Parse(args) 33 if *file == "" { 34 return fmt.Errorf("file is required") 35 } 36 if err != nil { 37 return err 38 } 39 gstinit.InitGST() 40 41 ctx := context.Background() 42 if *duration > 0 { 43 var cancel context.CancelFunc 44 ctx, cancel = context.WithTimeout(ctx, *duration) 45 defer cancel() 46 } 47 48 w := &WHIPClient{ 49 StreamKey: *streamKey, 50 File: *file, 51 Endpoint: *endpoint, 52 Count: *count, 53 FreezeAfter: *freezeAfter, 54 Viewers: *viewers, 55 } 56 57 return w.WHIP(ctx) 58} 59 60type WHIPClient struct { 61 StreamKey string 62 File string 63 Endpoint string 64 Count int 65 FreezeAfter time.Duration 66 Viewers int 67} 68 69var failureStates = []webrtc.ICEConnectionState{ 70 webrtc.ICEConnectionStateFailed, 71 webrtc.ICEConnectionStateDisconnected, 72 webrtc.ICEConnectionStateClosed, 73 webrtc.ICEConnectionStateCompleted, 74} 75 76type WHIPConnection struct { 77 peerConnection *webrtc.PeerConnection 78 audioTrack *webrtc.TrackLocalStaticSample 79 videoTrack *webrtc.TrackLocalStaticSample 80 did string 81} 82 83func (w *WHIPClient) WHIP(ctx context.Context) error { 84 ctx, cancel := context.WithCancel(ctx) 85 defer cancel() 86 87 pipelineSlice := []string{ 88 "filesrc name=filesrc ! qtdemux name=demux", 89 "demux.video_0 ! tee name=video_tee", 90 "demux.audio_0 ! tee name=audio_tee", 91 "video_tee. ! queue ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 92 "audio_tee. ! queue ! opusparse ! appsink name=audioappsink", 93 // "matroskamux name=mux ! fakesink name=fakesink sync=true", 94 // "video_tee. ! mux.video_0", 95 // "audio_tee. ! mux.audio_0", 96 } 97 98 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 99 if err != nil { 100 return err 101 } 102 103 fileSrc, err := pipeline.GetElementByName("filesrc") 104 if err != nil { 105 return err 106 } 107 108 if err := fileSrc.Set("location", w.File); err != nil { 109 return err 110 } 111 112 videoSink, err := pipeline.GetElementByName("videoappsink") 113 if err != nil { 114 return err 115 } 116 117 audioSink, err := pipeline.GetElementByName("audioappsink") 118 if err != nil { 119 return err 120 } 121 122 startTime := time.Now() 123 sinks := []*app.Sink{ 124 app.SinkFromElement(videoSink), 125 app.SinkFromElement(audioSink), 126 } 127 // Create accumulators for tracking elapsed duration 128 accumulators := make([]time.Duration, len(sinks)) 129 130 conns := make([]*WHIPConnection, w.Count) 131 g := &errgroup.Group{} 132 for i := 0; i < w.Count; i++ { 133 ctx := ctx 134 // var streamKey string 135 var did string 136 var streamKey string 137 if w.StreamKey != "" { 138 streamKey = w.StreamKey 139 } else { 140 priv, pub, err := spkey.GenerateStreamKey() 141 if err != nil { 142 return err 143 } 144 145 did = pub.DIDKey() 146 ctx = log.WithLogValues(ctx, "did", did) 147 streamKey = priv.Multibase() 148 } 149 150 g.Go(func() error { 151 conn, err := w.StartWHIPConnection(ctx, streamKey, did) 152 if err != nil { 153 return err 154 } 155 conns[i] = conn 156 ctx := log.WithLogValues(ctx, "did", did) 157 conn.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 158 log.Log(ctx, "WHIP connection State has changed", "state", connectionState.String()) 159 for _, state := range failureStates { 160 if connectionState == state { 161 log.Log(ctx, "connection failed, cancelling") 162 cancel() 163 } 164 } 165 }) 166 go func() { 167 <-ctx.Done() 168 if conn.peerConnection != nil { 169 conn.peerConnection.Close() 170 } 171 }() 172 return nil 173 }) 174 } 175 176 if err := g.Wait(); err != nil { 177 return err 178 } 179 180 // Start a ticker to print elapsed duration every second 181 go func() { 182 ticker := time.NewTicker(time.Second) 183 defer ticker.Stop() 184 185 for { 186 select { 187 case <-ctx.Done(): 188 return 189 case <-ticker.C: 190 for i, duration := range accumulators { 191 trackType := "video" 192 if i == 1 { 193 trackType = "audio" 194 } 195 target := startTime.Add(time.Duration(accumulators[i])) 196 diff := time.Since(target) 197 log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff) 198 } 199 } 200 } 201 }() 202 203 errCh := make(chan error, 1) 204 205 for i := range sinks { 206 func(i int) { 207 sink := sinks[i] 208 trackType := "video" 209 if i == 1 { 210 trackType = "audio" 211 } 212 213 sink.SetCallbacks(&app.SinkCallbacks{ 214 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 215 216 sample := sink.PullSample() 217 if sample == nil { 218 return gst.FlowEOS 219 } 220 221 buffer := sample.GetBuffer() 222 if buffer == nil { 223 return gst.FlowError 224 } 225 226 samples := buffer.Map(gst.MapRead).Bytes() 227 defer buffer.Unmap() 228 229 durationPtr := buffer.Duration().AsDuration() 230 var duration time.Duration 231 if durationPtr == nil { 232 errCh <- fmt.Errorf("%v duration: nil", trackType) 233 return gst.FlowError 234 } else { 235 // fmt.Printf("%v duration: %v\n", trackType, *durationPtr) 236 duration = *durationPtr 237 } 238 239 accumulators[i] += duration 240 241 if w.FreezeAfter == 0 || time.Since(startTime) < w.FreezeAfter { 242 for _, conn := range conns { 243 if trackType == "video" { 244 if err := conn.videoTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil { 245 log.Log(ctx, "error writing video sample", "error", err) 246 errCh <- err 247 return gst.FlowError 248 } 249 } else { 250 if err := conn.audioTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil { 251 log.Log(ctx, "error writing video sample", "error", err) 252 errCh <- err 253 return gst.FlowError 254 } 255 } 256 } 257 } 258 259 return gst.FlowOK 260 }, 261 }) 262 }(i) 263 } 264 265 go func() { 266 if err := media.HandleBusMessages(ctx, pipeline); err != nil { 267 log.Log(ctx, "pipeline error", "error", err) 268 } 269 cancel() 270 }() 271 272 if err = pipeline.SetState(gst.StatePlaying); err != nil { 273 return err 274 } 275 if w.Viewers > 0 { 276 whepG, ctx := errgroup.WithContext(ctx) 277 for i := 0; i < w.Count; i++ { 278 did := conns[i].did 279 w := &WHEPClient{ 280 Endpoint: fmt.Sprintf("%s/api/playback/%s/webrtc", w.Endpoint, did), 281 Count: w.Viewers, 282 } 283 whepG.Go(func() error { 284 return w.WHEP(ctx) 285 }) 286 } 287 if err := whepG.Wait(); err != nil { 288 return err 289 } 290 } 291 292 <-ctx.Done() 293 err = pipeline.BlockSetState(gst.StateNull) 294 if err != nil { 295 return err 296 } 297 298 select { 299 case err := <-errCh: 300 return err 301 case <-ctx.Done(): 302 return ctx.Err() 303 } 304} 305 306func (w *WHIPClient) StartWHIPConnection(ctx context.Context, streamKey string, did string) (*WHIPConnection, error) { 307 308 // Prepare the configuration 309 config := webrtc.Configuration{} 310 311 // Create a new RTCPeerConnection 312 peerConnection, err := webrtc.NewPeerConnection(config) 313 if err != nil { 314 return nil, err 315 } 316 317 // Create a audio track 318 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion1") 319 if err != nil { 320 return nil, err 321 } 322 _, err = peerConnection.AddTrack(audioTrack) 323 if err != nil { 324 return nil, err 325 } 326 327 // Create a video track 328 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion2") 329 if err != nil { 330 return nil, err 331 } 332 _, err = peerConnection.AddTrack(videoTrack) 333 if err != nil { 334 return nil, err 335 } 336 337 // Create an offer 338 offer, err := peerConnection.CreateOffer(nil) 339 if err != nil { 340 return nil, err 341 } 342 343 // Set the generated offer as our LocalDescription 344 err = peerConnection.SetLocalDescription(offer) 345 if err != nil { 346 return nil, err 347 } 348 349 // Wait for ICE gathering to complete 350 // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 351 // <-gatherComplete 352 353 // Create HTTP client and prepare the request 354 client := &http.Client{} 355 356 // Send the WHIP request to the server 357 req, err := http.NewRequest("POST", w.Endpoint, strings.NewReader(offer.SDP)) 358 if err != nil { 359 return nil, err 360 } 361 req.Header.Set("Authorization", "Bearer "+streamKey) 362 req.Header.Set("Content-Type", "application/sdp") 363 364 // Execute the request 365 resp, err := client.Do(req) 366 if err != nil { 367 return nil, err 368 } 369 defer resp.Body.Close() 370 371 // Read and process the answer 372 answerBytes, err := io.ReadAll(resp.Body) 373 if err != nil { 374 return nil, err 375 } 376 377 // Parse the SDP answer 378 var answer webrtc.SessionDescription 379 answer.Type = webrtc.SDPTypeAnswer 380 answer.SDP = string(answerBytes) 381 382 // Apply the answer as remote description 383 err = peerConnection.SetRemoteDescription(answer) 384 if err != nil { 385 return nil, err 386 } 387 388 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 389 <-gatherComplete 390 391 conn := &WHIPConnection{ 392 peerConnection: peerConnection, 393 audioTrack: audioTrack, 394 videoTrack: videoTrack, 395 did: did, 396 } 397 398 return conn, nil 399}