Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

webrtc_playback: buffer one segment before playback

Eli Mallon 513bea67 d316831e

+66 -14
+12 -1
pkg/media/packetize.go
··· 109 109 110 110 videoOutput = append(videoOutput, samples) 111 111 112 + // clockTime := buffer.Duration() 113 + // dur := clockTime.AsDuration() 114 + // if dur != nil { 115 + // log.Log(ctx, "video duration", "duration", *dur) 116 + // } else { 117 + // log.Error(ctx, "no video duration", "samples", len(samples)) 118 + // } 119 + 112 120 return gst.FlowOK 113 121 }, 114 122 EOSFunc: func(sink *app.Sink) { ··· 144 152 if dur != nil { 145 153 segDur += *dur 146 154 } else { 147 - return gst.FlowOK 155 + log.Log(ctx, "no audio duration", "samples", len(samples)) 156 + err := fmt.Errorf("no audio duration") 157 + pipeline.Error(err.Error(), err) 158 + return gst.FlowError 148 159 } 149 160 150 161 return gst.FlowOK
+14
pkg/media/packetize_test.go
··· 9 9 10 10 "github.com/stretchr/testify/require" 11 11 "go.uber.org/goleak" 12 + "golang.org/x/sync/errgroup" 12 13 "stream.place/streamplace/pkg/gstinit" 13 14 "stream.place/streamplace/pkg/media/segchanman" 14 15 ) ··· 19 20 defer checkGStreamerLeaks(t, before) 20 21 ignore := goleak.IgnoreCurrent() 21 22 defer goleak.VerifyNone(t, ignore) 23 + 24 + g, _ := errgroup.WithContext(context.Background()) 25 + for range streamplaceTestCount { 26 + g.Go(func() error { 27 + innerTestPacketize(t) 28 + return nil 29 + }) 30 + } 31 + err := g.Wait() 32 + require.NoError(t, err) 33 + } 34 + 35 + func innerTestPacketize(t *testing.T) { 22 36 filename := getFixture("sample-segment.mp4") 23 37 inputFile, err := os.Open(filename) 24 38 require.NoError(t, err)
+40 -13
pkg/media/webrtc_playback2.go
··· 8 8 "github.com/google/uuid" 9 9 "github.com/pion/webrtc/v4" 10 10 "github.com/pion/webrtc/v4/pkg/media" 11 + "golang.org/x/sync/errgroup" 11 12 "stream.place/streamplace/pkg/log" 12 13 "stream.place/streamplace/pkg/spmetrics" 13 14 ) ··· 118 119 } 119 120 }() 120 121 122 + lastPacketTime := time.Now() 123 + 124 + p1 := <-packetQueue 125 + p2 := <-packetQueue 126 + bufPacketQueue := make(chan *PacketizedSegment, 1024) 127 + go func() { 128 + bufPacketQueue <- p1 129 + bufPacketQueue <- p2 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + return 134 + case packet := <-packetQueue: 135 + bufPacketQueue <- packet 136 + } 137 + } 138 + }() 139 + 121 140 for { 122 141 select { 123 142 case <-ctx.Done(): 124 143 return 125 - case packet := <-packetQueue: 144 + case packet := <-bufPacketQueue: 126 145 videoDur := packet.Duration / time.Duration(len(packet.Video)) 127 146 audioDur := packet.Duration / time.Duration(len(packet.Audio)) 128 - go func() { 147 + g, _ := errgroup.WithContext(ctx) 148 + 149 + g.Go(func() error { 129 150 for _, video := range packet.Video { 151 + // log.Log(ctx, "writing video sample", "duration", videoDur) 130 152 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur}) 131 153 if err != nil { 132 - log.Error(ctx, "failed to write video sample", "error", err) 133 - cancel() 134 - return 154 + return fmt.Errorf("failed to write video sample: %w", err) 135 155 } 136 156 137 157 select { 138 158 case <-ctx.Done(): 139 - return 159 + return nil 140 160 case <-time.After(videoDur): 141 161 continue 142 162 } 143 163 } 144 - }() 145 - go func() { 164 + return nil 165 + }) 166 + g.Go(func() error { 167 + log.Log(ctx, "time since last packet", "time", time.Since(lastPacketTime)) 146 168 for _, audio := range packet.Audio { 147 169 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur}) 148 170 if err != nil { 149 - log.Error(ctx, "failed to write audio sample", "error", err) 150 - cancel() 151 - return 171 + return fmt.Errorf("failed to write audio sample: %w", err) 152 172 } 153 173 select { 154 174 case <-ctx.Done(): 155 - return 175 + return nil 156 176 case <-time.After(audioDur): 157 177 continue 158 178 } 159 179 } 160 - }() 180 + lastPacketTime = time.Now() 181 + return nil 182 + }) 183 + 184 + if err := g.Wait(); err != nil { 185 + log.Error(ctx, "failed to write samples", "error", err) 186 + cancel() 187 + } 161 188 } 162 189 } 163 190 }()