Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/federated-viewership 385 lines 9.1 kB view raw
1package cmd 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "net/http" 8 "strings" 9 "time" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/pion/webrtc/v4" 14 pionmedia "github.com/pion/webrtc/v4/pkg/media" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/crypto/spkey" 17 "stream.place/streamplace/pkg/gstinit" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/media" 20) 21 22func WHIP(ctx context.Context, streamKey string, count int, viewers int, duration time.Duration, file string, endpoint string, freezeAfter time.Duration) error { 23 if file == "" { 24 return fmt.Errorf("file is required") 25 } 26 gstinit.InitGST() 27 28 if duration > 0 { 29 var cancel context.CancelFunc 30 ctx, cancel = context.WithTimeout(ctx, duration) 31 defer cancel() 32 } 33 34 w := &WHIPClient{ 35 StreamKey: streamKey, 36 File: file, 37 Endpoint: endpoint, 38 Count: count, 39 FreezeAfter: freezeAfter, 40 Viewers: viewers, 41 } 42 43 return w.WHIP(ctx) 44} 45 46type WHIPClient struct { 47 StreamKey string 48 File string 49 Endpoint string 50 Count int 51 FreezeAfter time.Duration 52 Viewers int 53} 54 55var failureStates = []webrtc.ICEConnectionState{ 56 webrtc.ICEConnectionStateFailed, 57 webrtc.ICEConnectionStateDisconnected, 58 webrtc.ICEConnectionStateClosed, 59 webrtc.ICEConnectionStateCompleted, 60} 61 62type WHIPConnection struct { 63 peerConnection *webrtc.PeerConnection 64 audioTrack *webrtc.TrackLocalStaticSample 65 videoTrack *webrtc.TrackLocalStaticSample 66 did string 67} 68 69func (w *WHIPClient) WHIP(ctx context.Context) error { 70 ctx, cancel := context.WithCancel(ctx) 71 defer cancel() 72 73 pipelineSlice := []string{ 74 "filesrc name=filesrc ! qtdemux name=demux", 75 "demux.video_0 ! tee name=video_tee", 76 "demux.audio_0 ! tee name=audio_tee", 77 "video_tee. ! queue ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 78 "audio_tee. ! queue ! opusparse ! appsink name=audioappsink", 79 // "matroskamux name=mux ! fakesink name=fakesink sync=true", 80 // "video_tee. ! mux.video_0", 81 // "audio_tee. ! mux.audio_0", 82 } 83 84 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 85 if err != nil { 86 return err 87 } 88 89 fileSrc, err := pipeline.GetElementByName("filesrc") 90 if err != nil { 91 return err 92 } 93 94 if err := fileSrc.Set("location", w.File); err != nil { 95 return err 96 } 97 98 videoSink, err := pipeline.GetElementByName("videoappsink") 99 if err != nil { 100 return err 101 } 102 103 audioSink, err := pipeline.GetElementByName("audioappsink") 104 if err != nil { 105 return err 106 } 107 108 startTime := time.Now() 109 sinks := []*app.Sink{ 110 app.SinkFromElement(videoSink), 111 app.SinkFromElement(audioSink), 112 } 113 // Create accumulators for tracking elapsed duration 114 accumulators := make([]time.Duration, len(sinks)) 115 116 conns := make([]*WHIPConnection, w.Count) 117 g := &errgroup.Group{} 118 for i := 0; i < w.Count; i++ { 119 ctx := ctx 120 // var streamKey string 121 var did string 122 var streamKey string 123 if w.StreamKey != "" { 124 streamKey = w.StreamKey 125 } else { 126 priv, pub, err := spkey.GenerateStreamKey() 127 if err != nil { 128 return err 129 } 130 131 did = pub.DIDKey() 132 ctx = log.WithLogValues(ctx, "did", did) 133 streamKey = priv.Multibase() 134 } 135 136 g.Go(func() error { 137 conn, err := w.StartWHIPConnection(ctx, streamKey, did) 138 if err != nil { 139 return err 140 } 141 conns[i] = conn 142 ctx := log.WithLogValues(ctx, "did", did) 143 conn.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 144 log.Log(ctx, "WHIP connection State has changed", "state", connectionState.String()) 145 for _, state := range failureStates { 146 if connectionState == state { 147 log.Log(ctx, "connection failed, cancelling") 148 cancel() 149 } 150 } 151 }) 152 go func() { 153 <-ctx.Done() 154 if conn.peerConnection != nil { 155 conn.peerConnection.Close() 156 } 157 }() 158 return nil 159 }) 160 } 161 162 if err := g.Wait(); err != nil { 163 return err 164 } 165 166 // Start a ticker to print elapsed duration every second 167 go func() { 168 ticker := time.NewTicker(time.Second) 169 defer ticker.Stop() 170 171 for { 172 select { 173 case <-ctx.Done(): 174 return 175 case <-ticker.C: 176 for i, duration := range accumulators { 177 trackType := "video" 178 if i == 1 { 179 trackType = "audio" 180 } 181 target := startTime.Add(time.Duration(accumulators[i])) 182 diff := time.Since(target) 183 log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff) 184 } 185 } 186 } 187 }() 188 189 errCh := make(chan error, 1) 190 191 for i := range sinks { 192 func(i int) { 193 sink := sinks[i] 194 trackType := "video" 195 if i == 1 { 196 trackType = "audio" 197 } 198 199 sink.SetCallbacks(&app.SinkCallbacks{ 200 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 201 202 sample := sink.PullSample() 203 if sample == nil { 204 return gst.FlowEOS 205 } 206 207 buffer := sample.GetBuffer() 208 if buffer == nil { 209 return gst.FlowError 210 } 211 212 samples := buffer.Map(gst.MapRead).Bytes() 213 defer buffer.Unmap() 214 215 durationPtr := buffer.Duration().AsDuration() 216 var duration time.Duration 217 if durationPtr == nil { 218 errCh <- fmt.Errorf("%v duration: nil", trackType) 219 return gst.FlowError 220 } else { 221 // fmt.Printf("%v duration: %v\n", trackType, *durationPtr) 222 duration = *durationPtr 223 } 224 225 accumulators[i] += duration 226 227 if w.FreezeAfter == 0 || time.Since(startTime) < w.FreezeAfter { 228 for _, conn := range conns { 229 if trackType == "video" { 230 if err := conn.videoTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil { 231 log.Log(ctx, "error writing video sample", "error", err) 232 errCh <- err 233 return gst.FlowError 234 } 235 } else { 236 if err := conn.audioTrack.WriteSample(pionmedia.Sample{Data: samples, Duration: duration}); err != nil { 237 log.Log(ctx, "error writing video sample", "error", err) 238 errCh <- err 239 return gst.FlowError 240 } 241 } 242 } 243 } 244 245 return gst.FlowOK 246 }, 247 }) 248 }(i) 249 } 250 251 go func() { 252 if err := media.HandleBusMessages(ctx, pipeline); err != nil { 253 log.Log(ctx, "pipeline error", "error", err) 254 } 255 cancel() 256 }() 257 258 if err = pipeline.SetState(gst.StatePlaying); err != nil { 259 return err 260 } 261 if w.Viewers > 0 { 262 whepG, ctx := errgroup.WithContext(ctx) 263 for i := 0; i < w.Count; i++ { 264 did := conns[i].did 265 w := &WHEPClient{ 266 Endpoint: fmt.Sprintf("%s/api/playback/%s/webrtc", w.Endpoint, did), 267 Count: w.Viewers, 268 } 269 whepG.Go(func() error { 270 return w.WHEP(ctx) 271 }) 272 } 273 if err := whepG.Wait(); err != nil { 274 return err 275 } 276 } 277 278 <-ctx.Done() 279 err = pipeline.BlockSetState(gst.StateNull) 280 if err != nil { 281 return err 282 } 283 284 select { 285 case err := <-errCh: 286 return err 287 case <-ctx.Done(): 288 return ctx.Err() 289 } 290} 291 292func (w *WHIPClient) StartWHIPConnection(ctx context.Context, streamKey string, did string) (*WHIPConnection, error) { 293 294 // Prepare the configuration 295 config := webrtc.Configuration{} 296 297 // Create a new RTCPeerConnection 298 peerConnection, err := webrtc.NewPeerConnection(config) 299 if err != nil { 300 return nil, err 301 } 302 303 // Create a audio track 304 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion1") 305 if err != nil { 306 return nil, err 307 } 308 _, err = peerConnection.AddTrack(audioTrack) 309 if err != nil { 310 return nil, err 311 } 312 313 // Create a video track 314 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion2") 315 if err != nil { 316 return nil, err 317 } 318 _, err = peerConnection.AddTrack(videoTrack) 319 if err != nil { 320 return nil, err 321 } 322 323 // Create an offer 324 offer, err := peerConnection.CreateOffer(nil) 325 if err != nil { 326 return nil, err 327 } 328 329 // Set the generated offer as our LocalDescription 330 err = peerConnection.SetLocalDescription(offer) 331 if err != nil { 332 return nil, err 333 } 334 335 // Wait for ICE gathering to complete 336 // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 337 // <-gatherComplete 338 339 // Create HTTP client and prepare the request 340 client := &http.Client{} 341 342 // Send the WHIP request to the server 343 req, err := http.NewRequest("POST", w.Endpoint, strings.NewReader(offer.SDP)) 344 if err != nil { 345 return nil, err 346 } 347 req.Header.Set("Authorization", "Bearer "+streamKey) 348 req.Header.Set("Content-Type", "application/sdp") 349 350 // Execute the request 351 resp, err := client.Do(req) 352 if err != nil { 353 return nil, err 354 } 355 defer resp.Body.Close() 356 357 // Read and process the answer 358 answerBytes, err := io.ReadAll(resp.Body) 359 if err != nil { 360 return nil, err 361 } 362 363 // Parse the SDP answer 364 var answer webrtc.SessionDescription 365 answer.Type = webrtc.SDPTypeAnswer 366 answer.SDP = string(answerBytes) 367 368 // Apply the answer as remote description 369 err = peerConnection.SetRemoteDescription(answer) 370 if err != nil { 371 return nil, err 372 } 373 374 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 375 <-gatherComplete 376 377 conn := &WHIPConnection{ 378 peerConnection: peerConnection, 379 audioTrack: audioTrack, 380 videoTrack: videoTrack, 381 did: did, 382 } 383 384 return conn, nil 385}