Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.14 189 lines 6.0 kB view raw
1package livepeer 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "math/rand" 10 "mime" 11 "mime/multipart" 12 "net/http" 13 "os" 14 "strings" 15 "time" 16 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media" 21 "stream.place/streamplace/pkg/renditions" 22 "stream.place/streamplace/pkg/spmetrics" 23 "stream.place/streamplace/pkg/streamplace" 24) 25 26const SegmentsInFlight = 2 27 28type LivepeerSession struct { 29 SessionID string 30 Count int 31 GatewayURL string 32 Guard chan struct{} 33 CLI *config.CLI 34} 35 36// borrowed from catalyst-api 37func RandomTrailer(length int) string { 38 const charset = "abcdefghijklmnopqrstuvwxyz0123456789" 39 40 res := make([]byte, length) 41 for i := 0; i < length; i++ { 42 res[i] = charset[rand.Intn(len(charset))] 43 } 44 return string(res) 45} 46 47func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) { 48 sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8)) 49 sessionID = strings.ReplaceAll(sessionID, ":", "") 50 sessionID = strings.ReplaceAll(sessionID, ".", "") 51 return &LivepeerSession{ 52 SessionID: sessionID, 53 Count: 0, 54 GatewayURL: gatewayURL, 55 Guard: make(chan struct{}, SegmentsInFlight), 56 CLI: cli, 57 }, nil 58} 59 60func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) { 61 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway") 62 lpProfiles := rs.ToLivepeerProfiles() 63 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs)) 64 transcodingConfiguration := map[string]any{ 65 "manifestID": sessionIDRen, 66 "profiles": lpProfiles, 67 } 68 bs, err := json.Marshal(transcodingConfiguration) 69 if err != nil { 70 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 71 } 72 tsSeg := bytes.Buffer{} 73 audioSeg := bytes.Buffer{} 74 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg) 75 if err != nil { 76 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err) 77 } 78 if tsSeg.Len() == 0 { 79 return nil, fmt.Errorf("no video in segment") 80 } 81 if audioSeg.Len() == 0 { 82 return nil, fmt.Errorf("no audio in segment") 83 } 84 ls.Guard <- struct{}{} 85 start := time.Now() 86 // check if context is done since we were waiting for the lock 87 if ctx.Err() != nil { 88 <-ls.Guard 89 return nil, ctx.Err() 90 } 91 ctx, cancel := context.WithTimeout(ctx, time.Minute*5) 92 defer cancel() 93 seqNo := ls.Count 94 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo) 95 ls.Count++ 96 97 dur := time.Duration(*spseg.Duration) 98 durationMs := int(dur.Milliseconds()) 99 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url) 100 101 vid := spseg.Video[0] 102 width := int(vid.Width) 103 height := int(vid.Height) 104 105 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes())) 106 if err != nil { 107 <-ls.Guard 108 return nil, fmt.Errorf("failed to create request: %w", err) 109 } 110 req.Header.Set("Accept", "multipart/mixed") 111 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs)) 112 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 113 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 114 115 if ls.CLI.LivepeerDebug { 116 debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"}) 117 err = os.MkdirAll(debugDir, 0755) 118 if err != nil { 119 return nil, fmt.Errorf("failed to create debug directory: %w", err) 120 } 121 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo) 122 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 123 if err != nil { 124 return nil, fmt.Errorf("failed to write debug file: %w", err) 125 } 126 bs, err := json.MarshalIndent(req.Header, "", " ") 127 if err != nil { 128 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 129 } 130 configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo) 131 err = os.WriteFile(configFile, bs, 0644) 132 if err != nil { 133 return nil, fmt.Errorf("failed to write debug file: %w", err) 134 } 135 log.Log(ctx, "wrote debug file", "file", debugFile) 136 } 137 138 resp, err := aqhttp.DoTrusted(ctx, req) 139 if err != nil { 140 <-ls.Guard 141 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err) 142 } 143 <-ls.Guard 144 defer resp.Body.Close() 145 146 if resp.StatusCode != http.StatusOK { 147 errOut, _ := io.ReadAll(resp.Body) 148 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut)) 149 } 150 151 var out [][]byte 152 153 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) 154 if err != nil { 155 return nil, fmt.Errorf("failed to parse media type: %w", err) 156 } 157 if strings.HasPrefix(mediaType, "multipart/") { 158 mr := multipart.NewReader(resp.Body, params["boundary"]) 159 for { 160 p, err := mr.NextPart() 161 if err == io.EOF { 162 break 163 } 164 ctx := log.WithLogValues(ctx, "part", p.FileName()) 165 if err != nil { 166 return nil, fmt.Errorf("failed to get next part: %w", err) 167 } 168 mp4Bs := bytes.Buffer{} 169 audioReader := bytes.NewReader(audioSeg.Bytes()) 170 if ls.CLI.LivepeerDebug { 171 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName()) 172 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 173 if err != nil { 174 return nil, fmt.Errorf("failed to write debug file: %w", err) 175 } 176 log.Log(ctx, "wrote debug file", "file", debugFile) 177 } 178 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 179 if err != nil { 180 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err) 181 } 182 bs := mp4Bs.Bytes() 183 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName()) 184 out = append(out, bs) 185 } 186 } 187 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds())) 188 return out, nil 189}