Live video on the AT Protocol
1package livepeer
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "math/rand"
10 "mime"
11 "mime/multipart"
12 "net/http"
13 "os"
14 "strings"
15 "time"
16
17 "stream.place/streamplace/pkg/aqhttp"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/media"
21 "stream.place/streamplace/pkg/renditions"
22 "stream.place/streamplace/pkg/spmetrics"
23 "stream.place/streamplace/pkg/streamplace"
24)
25
26const SegmentsInFlight = 2
27
28type LivepeerSession struct {
29 SessionID string
30 Count int
31 GatewayURL string
32 Guard chan struct{}
33 CLI *config.CLI
34}
35
36// borrowed from catalyst-api
37func RandomTrailer(length int) string {
38 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
39
40 res := make([]byte, length)
41 for i := 0; i < length; i++ {
42 res[i] = charset[rand.Intn(len(charset))]
43 }
44 return string(res)
45}
46
47func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) {
48 sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8))
49 sessionID = strings.ReplaceAll(sessionID, ":", "")
50 sessionID = strings.ReplaceAll(sessionID, ".", "")
51 return &LivepeerSession{
52 SessionID: sessionID,
53 Count: 0,
54 GatewayURL: gatewayURL,
55 Guard: make(chan struct{}, SegmentsInFlight),
56 CLI: cli,
57 }, nil
58}
59
60func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) {
61 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway")
62 lpProfiles := rs.ToLivepeerProfiles()
63 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs))
64 transcodingConfiguration := map[string]any{
65 "manifestID": sessionIDRen,
66 "profiles": lpProfiles,
67 }
68 bs, err := json.Marshal(transcodingConfiguration)
69 if err != nil {
70 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
71 }
72 tsSeg := bytes.Buffer{}
73 audioSeg := bytes.Buffer{}
74 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg)
75 if err != nil {
76 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err)
77 }
78 if tsSeg.Len() == 0 {
79 return nil, fmt.Errorf("no video in segment")
80 }
81 if audioSeg.Len() == 0 {
82 return nil, fmt.Errorf("no audio in segment")
83 }
84 ls.Guard <- struct{}{}
85 start := time.Now()
86 // check if context is done since we were waiting for the lock
87 if ctx.Err() != nil {
88 <-ls.Guard
89 return nil, ctx.Err()
90 }
91 ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
92 defer cancel()
93 seqNo := ls.Count
94 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo)
95 ls.Count++
96
97 dur := time.Duration(*spseg.Duration)
98 durationMs := int(dur.Milliseconds())
99 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url)
100
101 vid := spseg.Video[0]
102 width := int(vid.Width)
103 height := int(vid.Height)
104
105 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes()))
106 if err != nil {
107 <-ls.Guard
108 return nil, fmt.Errorf("failed to create request: %w", err)
109 }
110 req.Header.Set("Accept", "multipart/mixed")
111 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs))
112 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height))
113 req.Header.Set("Livepeer-Transcode-Configuration", string(bs))
114
115 if ls.CLI.LivepeerDebug {
116 debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"})
117 err = os.MkdirAll(debugDir, 0755)
118 if err != nil {
119 return nil, fmt.Errorf("failed to create debug directory: %w", err)
120 }
121 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo)
122 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644)
123 if err != nil {
124 return nil, fmt.Errorf("failed to write debug file: %w", err)
125 }
126 bs, err := json.MarshalIndent(req.Header, "", " ")
127 if err != nil {
128 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
129 }
130 configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo)
131 err = os.WriteFile(configFile, bs, 0644)
132 if err != nil {
133 return nil, fmt.Errorf("failed to write debug file: %w", err)
134 }
135 log.Log(ctx, "wrote debug file", "file", debugFile)
136 }
137
138 resp, err := aqhttp.DoTrusted(ctx, req)
139 if err != nil {
140 <-ls.Guard
141 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err)
142 }
143 <-ls.Guard
144 defer resp.Body.Close()
145
146 if resp.StatusCode != http.StatusOK {
147 errOut, _ := io.ReadAll(resp.Body)
148 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut))
149 }
150
151 var out [][]byte
152
153 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
154 if err != nil {
155 return nil, fmt.Errorf("failed to parse media type: %w", err)
156 }
157 if strings.HasPrefix(mediaType, "multipart/") {
158 mr := multipart.NewReader(resp.Body, params["boundary"])
159 for {
160 p, err := mr.NextPart()
161 if err == io.EOF {
162 break
163 }
164 ctx := log.WithLogValues(ctx, "part", p.FileName())
165 if err != nil {
166 return nil, fmt.Errorf("failed to get next part: %w", err)
167 }
168 mp4Bs := bytes.Buffer{}
169 audioReader := bytes.NewReader(audioSeg.Bytes())
170 if ls.CLI.LivepeerDebug {
171 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName())
172 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644)
173 if err != nil {
174 return nil, fmt.Errorf("failed to write debug file: %w", err)
175 }
176 log.Log(ctx, "wrote debug file", "file", debugFile)
177 }
178 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs)
179 if err != nil {
180 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err)
181 }
182 bs := mp4Bs.Bytes()
183 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName())
184 out = append(out, bs)
185 }
186 }
187 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds()))
188 return out, nil
189}