Live video on the AT Protocol
1package livepeer
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "math/rand"
10 "mime"
11 "mime/multipart"
12 "net/http"
13 "os"
14 "strings"
15 "time"
16
17 "golang.org/x/net/context/ctxhttp"
18 "stream.place/streamplace/pkg/aqhttp"
19 "stream.place/streamplace/pkg/config"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/media"
22 "stream.place/streamplace/pkg/renditions"
23 "stream.place/streamplace/pkg/spmetrics"
24 "stream.place/streamplace/pkg/streamplace"
25)
26
27const SegmentsInFlight = 2
28
29type LivepeerSession struct {
30 SessionID string
31 Count int
32 GatewayURL string
33 Guard chan struct{}
34 CLI *config.CLI
35}
36
37// borrowed from catalyst-api
38func RandomTrailer(length int) string {
39 const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
40
41 res := make([]byte, length)
42 for i := 0; i < length; i++ {
43 res[i] = charset[rand.Intn(len(charset))]
44 }
45 return string(res)
46}
47
48func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) {
49 sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8))
50 sessionID = strings.ReplaceAll(sessionID, ":", "")
51 sessionID = strings.ReplaceAll(sessionID, ".", "")
52 return &LivepeerSession{
53 SessionID: sessionID,
54 Count: 0,
55 GatewayURL: gatewayURL,
56 Guard: make(chan struct{}, SegmentsInFlight),
57 CLI: cli,
58 }, nil
59}
60
61func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) {
62 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway")
63 lpProfiles := rs.ToLivepeerProfiles()
64 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs))
65 transcodingConfiguration := map[string]any{
66 "manifestID": sessionIDRen,
67 "profiles": lpProfiles,
68 }
69 bs, err := json.Marshal(transcodingConfiguration)
70 if err != nil {
71 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
72 }
73 tsSeg := bytes.Buffer{}
74 audioSeg := bytes.Buffer{}
75 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg)
76 if err != nil {
77 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err)
78 }
79 if tsSeg.Len() == 0 {
80 return nil, fmt.Errorf("no video in segment")
81 }
82 if audioSeg.Len() == 0 {
83 return nil, fmt.Errorf("no audio in segment")
84 }
85 ls.Guard <- struct{}{}
86 start := time.Now()
87 // check if context is done since we were waiting for the lock
88 if ctx.Err() != nil {
89 <-ls.Guard
90 return nil, ctx.Err()
91 }
92 ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
93 defer cancel()
94 seqNo := ls.Count
95 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo)
96 ls.Count++
97
98 dur := time.Duration(*spseg.Duration)
99 durationMs := int(dur.Milliseconds())
100 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url)
101
102 vid := spseg.Video[0]
103 width := int(vid.Width)
104 height := int(vid.Height)
105
106 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes()))
107 if err != nil {
108 <-ls.Guard
109 return nil, fmt.Errorf("failed to create request: %w", err)
110 }
111 req.Header.Set("Accept", "multipart/mixed")
112 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs))
113 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height))
114 req.Header.Set("Livepeer-Transcode-Configuration", string(bs))
115
116 if ls.CLI.LivepeerDebug {
117 debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"})
118 err = os.MkdirAll(debugDir, 0755)
119 if err != nil {
120 return nil, fmt.Errorf("failed to create debug directory: %w", err)
121 }
122 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo)
123 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644)
124 if err != nil {
125 return nil, fmt.Errorf("failed to write debug file: %w", err)
126 }
127 bs, err := json.MarshalIndent(req.Header, "", " ")
128 if err != nil {
129 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
130 }
131 configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo)
132 err = os.WriteFile(configFile, bs, 0644)
133 if err != nil {
134 return nil, fmt.Errorf("failed to write debug file: %w", err)
135 }
136 log.Log(ctx, "wrote debug file", "file", debugFile)
137 }
138
139 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req)
140 if err != nil {
141 <-ls.Guard
142 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err)
143 }
144 <-ls.Guard
145 defer resp.Body.Close()
146
147 if resp.StatusCode != http.StatusOK {
148 errOut, _ := io.ReadAll(resp.Body)
149 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut))
150 }
151
152 var out [][]byte
153
154 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
155 if err != nil {
156 return nil, fmt.Errorf("failed to parse media type: %w", err)
157 }
158 if strings.HasPrefix(mediaType, "multipart/") {
159 mr := multipart.NewReader(resp.Body, params["boundary"])
160 for {
161 p, err := mr.NextPart()
162 if err == io.EOF {
163 break
164 }
165 ctx := log.WithLogValues(ctx, "part", p.FileName())
166 if err != nil {
167 return nil, fmt.Errorf("failed to get next part: %w", err)
168 }
169 mp4Bs := bytes.Buffer{}
170 audioReader := bytes.NewReader(audioSeg.Bytes())
171 if ls.CLI.LivepeerDebug {
172 debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName())
173 err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644)
174 if err != nil {
175 return nil, fmt.Errorf("failed to write debug file: %w", err)
176 }
177 log.Log(ctx, "wrote debug file", "file", debugFile)
178 }
179 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs)
180 if err != nil {
181 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err)
182 }
183 bs := mp4Bs.Bytes()
184 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName())
185 out = append(out, bs)
186 }
187 }
188 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds()))
189 return out, nil
190}