Live video on the AT Protocol
at eli/postgres 153 lines 4.6 kB view raw
1package livepeer 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "math/rand" 10 "mime" 11 "mime/multipart" 12 "net/http" 13 "strings" 14 "time" 15 16 "golang.org/x/net/context/ctxhttp" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/media" 20 "stream.place/streamplace/pkg/renditions" 21 "stream.place/streamplace/pkg/spmetrics" 22 "stream.place/streamplace/pkg/streamplace" 23) 24 25const SegmentsInFlight = 2 26 27type LivepeerSession struct { 28 SessionID string 29 Count int 30 GatewayURL string 31 Guard chan struct{} 32} 33 34// borrowed from catalyst-api 35func RandomTrailer(length int) string { 36 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 37 38 res := make([]byte, length) 39 for i := 0; i < length; i++ { 40 res[i] = charset[rand.Intn(len(charset))] 41 } 42 return string(res) 43} 44 45func NewLivepeerSession(ctx context.Context, did string, gatewayURL string) (*LivepeerSession, error) { 46 sessionID := RandomTrailer(8) 47 return &LivepeerSession{ 48 SessionID: strings.ReplaceAll(fmt.Sprintf("%s-%s", did, sessionID), ":", ""), 49 Count: 0, 50 GatewayURL: gatewayURL, 51 Guard: make(chan struct{}, SegmentsInFlight), 52 }, nil 53} 54 55func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) { 56 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 57 lpProfiles := rs.ToLivepeerProfiles() 58 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs)) 59 transcodingConfiguration := map[string]any{ 60 "manifestID": sessionIDRen, 61 "profiles": lpProfiles, 62 } 63 bs, err := json.Marshal(transcodingConfiguration) 64 if err != nil { 65 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 66 } 67 tsSeg := bytes.Buffer{} 68 audioSeg := bytes.Buffer{} 69 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg) 70 if err != nil { 71 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err) 72 } 73 if tsSeg.Len() == 0 { 74 return nil, fmt.Errorf("no video in segment") 75 } 76 if audioSeg.Len() == 0 { 77 return nil, fmt.Errorf("no audio in segment") 78 } 79 ls.Guard <- struct{}{} 80 start := time.Now() 81 // check if context is done since we were waiting for the lock 82 if ctx.Err() != nil { 83 <-ls.Guard 84 return nil, ctx.Err() 85 } 86 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) 87 defer cancel() 88 seqNo := ls.Count 89 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo) 90 ls.Count++ 91 92 dur := time.Duration(*spseg.Duration) 93 durationMs := int(dur.Milliseconds()) 94 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 95 96 vid := spseg.Video[0] 97 width := int(vid.Width) 98 height := int(vid.Height) 99 100 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes())) 101 if err != nil { 102 <-ls.Guard 103 return nil, fmt.Errorf("failed to create request: %w", err) 104 } 105 req.Header.Set("Accept", "multipart/mixed") 106 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs)) 107 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 108 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 109 110 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 111 if err != nil { 112 <-ls.Guard 113 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err) 114 } 115 <-ls.Guard 116 defer resp.Body.Close() 117 118 if resp.StatusCode != http.StatusOK { 119 errOut, _ := io.ReadAll(resp.Body) 120 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut)) 121 } 122 123 var out [][]byte 124 125 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) 126 if err != nil { 127 return nil, fmt.Errorf("failed to parse media type: %w", err) 128 } 129 if strings.HasPrefix(mediaType, "multipart/") { 130 mr := multipart.NewReader(resp.Body, params["boundary"]) 131 for { 132 p, err := mr.NextPart() 133 if err == io.EOF { 134 break 135 } 136 ctx := log.WithLogValues(ctx, "part", p.FileName()) 137 if err != nil { 138 return nil, fmt.Errorf("failed to get next part: %w", err) 139 } 140 mp4Bs := bytes.Buffer{} 141 audioReader := bytes.NewReader(audioSeg.Bytes()) 142 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 143 if err != nil { 144 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err) 145 } 146 bs := mp4Bs.Bytes() 147 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName()) 148 out = append(out, bs) 149 } 150 } 151 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 152 return out, nil 153}