Live video on the AT Protocol
at eli/database-resync 152 lines 4.5 kB view raw
1package livepeer 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "math/rand" 10 "mime" 11 "mime/multipart" 12 "net/http" 13 "strings" 14 "time" 15 16 "golang.org/x/net/context/ctxhttp" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/media" 20 "stream.place/streamplace/pkg/renditions" 21 "stream.place/streamplace/pkg/spmetrics" 22 "stream.place/streamplace/pkg/streamplace" 23) 24 25const SegmentsInFlight = 2 26 27type LivepeerSession struct { 28 SessionID string 29 Count int 30 GatewayURL string 31 Guard chan struct{} 32} 33 34// borrowed from catalyst-api 35func RandomTrailer(length int) string { 36 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 37 38 res := make([]byte, length) 39 for i := 0; i < length; i++ { 40 res[i] = charset[rand.Intn(len(charset))] 41 } 42 return string(res) 43} 44 45func NewLivepeerSession(ctx context.Context, did string, gatewayURL string) (*LivepeerSession, error) { 46 sessionID := RandomTrailer(8) 47 return &LivepeerSession{ 48 SessionID: fmt.Sprintf("%s-%s", did, sessionID), 49 Count: 0, 50 GatewayURL: gatewayURL, 51 Guard: make(chan struct{}, SegmentsInFlight), 52 }, nil 53} 54 55func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) { 56 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 57 lpProfiles := rs.ToLivepeerProfiles() 58 transcodingConfiguration := map[string]any{ 59 "manifestID": ls.SessionID, 60 "profiles": lpProfiles, 61 } 62 bs, err := json.Marshal(transcodingConfiguration) 63 if err != nil { 64 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 65 } 66 tsSeg := bytes.Buffer{} 67 audioSeg := bytes.Buffer{} 68 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg) 69 if err != nil { 70 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err) 71 } 72 if tsSeg.Len() == 0 { 73 return nil, fmt.Errorf("no video in segment") 74 } 75 if audioSeg.Len() == 0 { 76 return nil, fmt.Errorf("no audio in segment") 77 } 78 ls.Guard <- struct{}{} 79 start := time.Now() 80 // check if context is done since we were waiting for the lock 81 if ctx.Err() != nil { 82 <-ls.Guard 83 return nil, ctx.Err() 84 } 85 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) 86 defer cancel() 87 seqNo := ls.Count 88 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, ls.SessionID, seqNo) 89 ls.Count++ 90 91 dur := time.Duration(*spseg.Duration) 92 durationMs := int(dur.Milliseconds()) 93 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 94 95 vid := spseg.Video[0] 96 width := int(vid.Width) 97 height := int(vid.Height) 98 99 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes())) 100 if err != nil { 101 <-ls.Guard 102 return nil, fmt.Errorf("failed to create request: %w", err) 103 } 104 req.Header.Set("Accept", "multipart/mixed") 105 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs)) 106 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 107 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 108 109 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 110 if err != nil { 111 <-ls.Guard 112 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err) 113 } 114 <-ls.Guard 115 defer resp.Body.Close() 116 117 if resp.StatusCode != http.StatusOK { 118 errOut, _ := io.ReadAll(resp.Body) 119 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut)) 120 } 121 122 var out [][]byte 123 124 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) 125 if err != nil { 126 return nil, fmt.Errorf("failed to parse media type: %w", err) 127 } 128 if strings.HasPrefix(mediaType, "multipart/") { 129 mr := multipart.NewReader(resp.Body, params["boundary"]) 130 for { 131 p, err := mr.NextPart() 132 if err == io.EOF { 133 break 134 } 135 ctx := log.WithLogValues(ctx, "part", p.FileName()) 136 if err != nil { 137 return nil, fmt.Errorf("failed to get next part: %w", err) 138 } 139 mp4Bs := bytes.Buffer{} 140 audioReader := bytes.NewReader(audioSeg.Bytes()) 141 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 142 if err != nil { 143 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err) 144 } 145 bs := mp4Bs.Bytes() 146 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName()) 147 out = append(out, bs) 148 } 149 } 150 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 151 return out, nil 152}