Live video on the AT Protocol
at eli/docker-deployment-docs 190 lines 6.1 kB view raw
1package livepeer 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "math/rand" 10 "mime" 11 "mime/multipart" 12 "net/http" 13 "os" 14 "strings" 15 "time" 16 17 "golang.org/x/net/context/ctxhttp" 18 "stream.place/streamplace/pkg/aqhttp" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/renditions" 23 "stream.place/streamplace/pkg/spmetrics" 24 "stream.place/streamplace/pkg/streamplace" 25) 26 27const SegmentsInFlight = 2 28 29type LivepeerSession struct { 30 SessionID string 31 Count int 32 GatewayURL string 33 Guard chan struct{} 34 CLI *config.CLI 35} 36 37// borrowed from catalyst-api 38func RandomTrailer(length int) string { 39 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 40 41 res := make([]byte, length) 42 for i := 0; i < length; i++ { 43 res[i] = charset[rand.Intn(len(charset))] 44 } 45 return string(res) 46} 47 48func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) { 49 sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8)) 50 sessionID = strings.ReplaceAll(sessionID, ":", "") 51 sessionID = strings.ReplaceAll(sessionID, ".", "") 52 return &LivepeerSession{ 53 SessionID: sessionID, 54 Count: 0, 55 GatewayURL: gatewayURL, 56 Guard: make(chan struct{}, SegmentsInFlight), 57 CLI: cli, 58 }, nil 59} 60 61func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) { 62 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 63 lpProfiles := rs.ToLivepeerProfiles() 64 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs)) 65 transcodingConfiguration := map[string]any{ 66 "manifestID": sessionIDRen, 67 "profiles": lpProfiles, 68 } 69 bs, err := json.Marshal(transcodingConfiguration) 70 if err != nil { 71 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 72 } 73 tsSeg := bytes.Buffer{} 74 audioSeg := bytes.Buffer{} 75 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg) 76 if err != nil { 77 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err) 78 } 79 if tsSeg.Len() == 0 { 80 return nil, fmt.Errorf("no video in segment") 81 } 82 if audioSeg.Len() == 0 { 83 return nil, fmt.Errorf("no audio in segment") 84 } 85 ls.Guard <- struct{}{} 86 start := time.Now() 87 // check if context is done since we were waiting for the lock 88 if ctx.Err() != nil { 89 <-ls.Guard 90 return nil, ctx.Err() 91 } 92 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) 93 defer cancel() 94 seqNo := ls.Count 95 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo) 96 ls.Count++ 97 98 dur := time.Duration(*spseg.Duration) 99 durationMs := int(dur.Milliseconds()) 100 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 101 102 vid := spseg.Video[0] 103 width := int(vid.Width) 104 height := int(vid.Height) 105 106 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes())) 107 if err != nil { 108 <-ls.Guard 109 return nil, fmt.Errorf("failed to create request: %w", err) 110 } 111 req.Header.Set("Accept", "multipart/mixed") 112 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs)) 113 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 114 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 115 116 if ls.CLI.LivepeerDebug { 117 debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"}) 118 err = os.MkdirAll(debugDir, 0755) 119 if err != nil { 120 return nil, fmt.Errorf("failed to create debug directory: %w", err) 121 } 122 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo) 123 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 124 if err != nil { 125 return nil, fmt.Errorf("failed to write debug file: %w", err) 126 } 127 bs, err := json.MarshalIndent(req.Header, "", " ") 128 if err != nil { 129 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 130 } 131 configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo) 132 err = os.WriteFile(configFile, bs, 0644) 133 if err != nil { 134 return nil, fmt.Errorf("failed to write debug file: %w", err) 135 } 136 log.Log(ctx, "wrote debug file", "file", debugFile) 137 } 138 139 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 140 if err != nil { 141 <-ls.Guard 142 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err) 143 } 144 <-ls.Guard 145 defer resp.Body.Close() 146 147 if resp.StatusCode != http.StatusOK { 148 errOut, _ := io.ReadAll(resp.Body) 149 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut)) 150 } 151 152 var out [][]byte 153 154 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) 155 if err != nil { 156 return nil, fmt.Errorf("failed to parse media type: %w", err) 157 } 158 if strings.HasPrefix(mediaType, "multipart/") { 159 mr := multipart.NewReader(resp.Body, params["boundary"]) 160 for { 161 p, err := mr.NextPart() 162 if err == io.EOF { 163 break 164 } 165 ctx := log.WithLogValues(ctx, "part", p.FileName()) 166 if err != nil { 167 return nil, fmt.Errorf("failed to get next part: %w", err) 168 } 169 mp4Bs := bytes.Buffer{} 170 audioReader := bytes.NewReader(audioSeg.Bytes()) 171 if ls.CLI.LivepeerDebug { 172 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName()) 173 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 174 if err != nil { 175 return nil, fmt.Errorf("failed to write debug file: %w", err) 176 } 177 log.Log(ctx, "wrote debug file", "file", debugFile) 178 } 179 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 180 if err != nil { 181 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err) 182 } 183 bs := mp4Bs.Bytes() 184 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName()) 185 out = append(out, bs) 186 } 187 } 188 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 189 return out, nil 190}