Live video on the AT Protocol
at eli/rtmp-push 189 lines 6.0 kB view raw
1package livepeer 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "math/rand" 10 "mime" 11 "mime/multipart" 12 "net/http" 13 "os" 14 "strings" 15 "time" 16 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media" 21 "stream.place/streamplace/pkg/renditions" 22 "stream.place/streamplace/pkg/spmetrics" 23 "stream.place/streamplace/pkg/streamplace" 24) 25 26const SegmentsInFlight = 2 27 28type LivepeerSession struct { 29 SessionID string 30 Count int 31 GatewayURL string 32 Guard chan struct{} 33 CLI *config.CLI 34} 35 36// borrowed from catalyst-api 37func RandomTrailer(length int) string { 38 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 39 40 res := make([]byte, length) 41 for i := 0; i < length; i++ { 42 res[i] = charset[rand.Intn(len(charset))] 43 } 44 return string(res) 45} 46 47func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) { 48 sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8)) 49 sessionID = strings.ReplaceAll(sessionID, ":", "") 50 sessionID = strings.ReplaceAll(sessionID, ".", "") 51 return &LivepeerSession{ 52 SessionID: sessionID, 53 Count: 0, 54 GatewayURL: gatewayURL, 55 Guard: make(chan struct{}, SegmentsInFlight), 56 CLI: cli, 57 }, nil 58} 59 60func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) { 61 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 62 lpProfiles := rs.ToLivepeerProfiles() 63 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs)) 64 transcodingConfiguration := map[string]any{ 65 "manifestID": sessionIDRen, 66 "profiles": lpProfiles, 67 } 68 bs, err := json.Marshal(transcodingConfiguration) 69 if err != nil { 70 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 71 } 72 tsSeg := bytes.Buffer{} 73 audioSeg := bytes.Buffer{} 74 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg) 75 if err != nil { 76 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err) 77 } 78 if tsSeg.Len() == 0 { 79 return nil, fmt.Errorf("no video in segment") 80 } 81 if audioSeg.Len() == 0 { 82 return nil, fmt.Errorf("no audio in segment") 83 } 84 ls.Guard <- struct{}{} 85 start := time.Now() 86 // check if context is done since we were waiting for the lock 87 if ctx.Err() != nil { 88 <-ls.Guard 89 return nil, ctx.Err() 90 } 91 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) 92 defer cancel() 93 seqNo := ls.Count 94 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo) 95 ls.Count++ 96 97 dur := time.Duration(*spseg.Duration) 98 durationMs := int(dur.Milliseconds()) 99 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 100 101 vid := spseg.Video[0] 102 width := int(vid.Width) 103 height := int(vid.Height) 104 105 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes())) 106 if err != nil { 107 <-ls.Guard 108 return nil, fmt.Errorf("failed to create request: %w", err) 109 } 110 req.Header.Set("Accept", "multipart/mixed") 111 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs)) 112 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 113 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 114 115 if ls.CLI.LivepeerDebug { 116 debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"}) 117 err = os.MkdirAll(debugDir, 0755) 118 if err != nil { 119 return nil, fmt.Errorf("failed to create debug directory: %w", err) 120 } 121 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo) 122 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 123 if err != nil { 124 return nil, fmt.Errorf("failed to write debug file: %w", err) 125 } 126 bs, err := json.MarshalIndent(req.Header, "", " ") 127 if err != nil { 128 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 129 } 130 configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo) 131 err = os.WriteFile(configFile, bs, 0644) 132 if err != nil { 133 return nil, fmt.Errorf("failed to write debug file: %w", err) 134 } 135 log.Log(ctx, "wrote debug file", "file", debugFile) 136 } 137 138 resp, err := aqhttp.DoTrusted(ctx, req) 139 if err != nil { 140 <-ls.Guard 141 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err) 142 } 143 <-ls.Guard 144 defer resp.Body.Close() 145 146 if resp.StatusCode != http.StatusOK { 147 errOut, _ := io.ReadAll(resp.Body) 148 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut)) 149 } 150 151 var out [][]byte 152 153 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) 154 if err != nil { 155 return nil, fmt.Errorf("failed to parse media type: %w", err) 156 } 157 if strings.HasPrefix(mediaType, "multipart/") { 158 mr := multipart.NewReader(resp.Body, params["boundary"]) 159 for { 160 p, err := mr.NextPart() 161 if err == io.EOF { 162 break 163 } 164 ctx := log.WithLogValues(ctx, "part", p.FileName()) 165 if err != nil { 166 return nil, fmt.Errorf("failed to get next part: %w", err) 167 } 168 mp4Bs := bytes.Buffer{} 169 audioReader := bytes.NewReader(audioSeg.Bytes()) 170 if ls.CLI.LivepeerDebug { 171 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName()) 172 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 173 if err != nil { 174 return nil, fmt.Errorf("failed to write debug file: %w", err) 175 } 176 log.Log(ctx, "wrote debug file", "file", debugFile) 177 } 178 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 179 if err != nil { 180 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err) 181 } 182 bs := mp4Bs.Bytes() 183 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName()) 184 out = append(out, bs) 185 } 186 } 187 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 188 return out, nil 189}