Live video on the AT Protocol

livepeer: keep two segments in flight (and add metrics)

See merge request streamplace/streamplace!125

Changelog: feature

Eli Mallon ec183b20 a10df6e6

+32 -15
+4 -3
pkg/atproto/firehose.go
··· 227 228 case repomgr.EvtKindDeleteRecord: 229 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW { 230 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 231 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String()) 232 if err != nil { ··· 246 } 247 } 248 249 - if err != nil { 250 - return err 251 - } 252 default: 253 log.Error(ctx, "unexpected record op kind") 254 }
··· 227 228 case repomgr.EvtKindDeleteRecord: 229 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW { 230 + if r == nil { 231 + log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 232 + continue 233 + } 234 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 235 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String()) 236 if err != nil { ··· 250 } 251 } 252 253 default: 254 log.Error(ctx, "unexpected record op kind") 255 }
+1
pkg/director/stream_session.go
··· 135 } else { 136 log.Log(ctx, "transcoded segment", "took", took) 137 } 138 }() 139 } 140
··· 135 } else { 136 log.Log(ctx, "transcoded segment", "took", took) 137 } 138 + spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds())) 139 }() 140 } 141
+15 -11
pkg/livepeer/livepeer.go
··· 10 "mime/multipart" 11 "net/http" 12 "strings" 13 - "sync" 14 "time" 15 16 "golang.org/x/net/context/ctxhttp" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/streamplace" 20 ) 21 22 type LivepeerSession struct { 23 SessionID string 24 Count int 25 GatewayURL string 26 - SegLock sync.Mutex 27 } 28 29 // borrowed from catalyst-api ··· 43 SessionID: fmt.Sprintf("%s-%s", did, sessionID), 44 Count: 0, 45 GatewayURL: gatewayURL, 46 }, nil 47 } 48 49 - func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, seg *streamplace.Segment) ([][]byte, error) { 50 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 51 - ls.SegLock.Lock() 52 // check if context is done since we were waiting for the lock 53 if ctx.Err() != nil { 54 - ls.SegLock.Unlock() 55 return nil, ctx.Err() 56 } 57 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) ··· 59 url := fmt.Sprintf("%s/live/%s/%d.mp4", ls.GatewayURL, ls.SessionID, ls.Count) 60 ls.Count++ 61 62 - dur := time.Duration(*seg.Duration) 63 durationMs := int(dur.Milliseconds()) 64 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 65 66 - vid := seg.Video[0] 67 width := int(vid.Width) 68 height := int(vid.Height) 69 70 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(buf)) 71 if err != nil { 72 - ls.SegLock.Unlock() 73 return nil, fmt.Errorf("failed to create request: %w", err) 74 } 75 req.Header.Set("Accept", "multipart/mixed") ··· 78 79 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 80 if err != nil { 81 - ls.SegLock.Unlock() 82 return nil, fmt.Errorf("failed to send segment to gateway: %w", err) 83 } 84 - ls.SegLock.Unlock() 85 defer resp.Body.Close() 86 87 if resp.StatusCode != http.StatusOK { ··· 113 out = append(out, bs) 114 } 115 } 116 - 117 return out, nil 118 }
··· 10 "mime/multipart" 11 "net/http" 12 "strings" 13 "time" 14 15 "golang.org/x/net/context/ctxhttp" 16 "stream.place/streamplace/pkg/aqhttp" 17 "stream.place/streamplace/pkg/log" 18 + "stream.place/streamplace/pkg/spmetrics" 19 "stream.place/streamplace/pkg/streamplace" 20 ) 21 22 + const SEGMENTS_IN_FLIGHT = 2 23 + 24 type LivepeerSession struct { 25 SessionID string 26 Count int 27 GatewayURL string 28 + Guard chan struct{} 29 } 30 31 // borrowed from catalyst-api ··· 45 SessionID: fmt.Sprintf("%s-%s", did, sessionID), 46 Count: 0, 47 GatewayURL: gatewayURL, 48 + Guard: make(chan struct{}, SEGMENTS_IN_FLIGHT), 49 }, nil 50 } 51 52 + func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment) ([][]byte, error) { 53 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 54 + ls.Guard <- struct{}{} 55 + start := time.Now() 56 // check if context is done since we were waiting for the lock 57 if ctx.Err() != nil { 58 + <-ls.Guard 59 return nil, ctx.Err() 60 } 61 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) ··· 63 url := fmt.Sprintf("%s/live/%s/%d.mp4", ls.GatewayURL, ls.SessionID, ls.Count) 64 ls.Count++ 65 66 + dur := time.Duration(*spseg.Duration) 67 durationMs := int(dur.Milliseconds()) 68 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 69 70 + vid := spseg.Video[0] 71 width := int(vid.Width) 72 height := int(vid.Height) 73 74 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(buf)) 75 if err != nil { 76 + <-ls.Guard 77 return nil, fmt.Errorf("failed to create request: %w", err) 78 } 79 req.Header.Set("Accept", "multipart/mixed") ··· 82 83 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 84 if err != nil { 85 + <-ls.Guard 86 return nil, fmt.Errorf("failed to send segment to gateway: %w", err) 87 } 88 + <-ls.Guard 89 defer resp.Body.Close() 90 91 if resp.StatusCode != http.StatusOK { ··· 117 out = append(out, bs) 118 } 119 } 120 + spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 121 return out, nil 122 }
+12 -1
pkg/spmetrics/spmetrics.go
··· 20 var Viewers = promauto.NewGaugeVec(prometheus.GaugeOpts{ 21 Name: "streamplace_viewers", 22 Help: "number of current viewers per user", 23 - }, []string{"user"}) 24 25 var ViewersTotal = promauto.NewGauge(prometheus.GaugeOpts{ 26 Name: "streamplace_viewers_total", ··· 41 Name: "streamplace_transcode_errors_total", 42 Help: "total number of transcode errors", 43 }) 44 45 var Version = promauto.NewCounterVec(prometheus.CounterOpts{ 46 Name: "streamplace_version",
··· 20 var Viewers = promauto.NewGaugeVec(prometheus.GaugeOpts{ 21 Name: "streamplace_viewers", 22 Help: "number of current viewers per user", 23 + }, []string{"streamer"}) 24 25 var ViewersTotal = promauto.NewGauge(prometheus.GaugeOpts{ 26 Name: "streamplace_viewers_total", ··· 41 Name: "streamplace_transcode_errors_total", 42 Help: "total number of transcode errors", 43 }) 44 + 45 + var TranscodeDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 46 + Name: "streamplace_transcode_duration_ms", 47 + Help: "duration of transcode in ms", 48 + Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000}, 49 + }, []string{"streamer"}) 50 + 51 + var QueuedTranscodeDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ 52 + Name: "streamplace_queued_transcode_duration_ms", 53 + Help: "duration of transcode in ms, including time spent waiting", 54 + }, []string{"streamer"}) 55 56 var Version = promauto.NewCounterVec(prometheus.CounterOpts{ 57 Name: "streamplace_version",