Live video on the AT Protocol
1package livepeer
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "math/rand"
10 "mime"
11 "mime/multipart"
12 "net/http"
13 "strings"
14 "time"
15
16 "golang.org/x/net/context/ctxhttp"
17 "stream.place/streamplace/pkg/aqhttp"
18 "stream.place/streamplace/pkg/log"
19 "stream.place/streamplace/pkg/media"
20 "stream.place/streamplace/pkg/renditions"
21 "stream.place/streamplace/pkg/spmetrics"
22 "stream.place/streamplace/pkg/streamplace"
23)
24
// SegmentsInFlight is the buffer capacity NewLivepeerSession gives a session's
// Guard channel. PostSegmentToGateway sends on that channel before posting a
// segment, so this bounds how many segment uploads one session can have
// outstanding at the same time.
const SegmentsInFlight = 2
26
// LivepeerSession holds the per-stream state used when posting segments to a
// Livepeer gateway.
type LivepeerSession struct {
	// SessionID identifies the session. PostSegmentToGateway uses it as the
	// prefix of the manifest ID it sends to the gateway.
	SessionID string
	// Count is the sequence number that will be used for the next posted
	// segment; PostSegmentToGateway increments it once per request.
	Count int
	// GatewayURL is the base URL of the gateway; segments are posted to
	// <GatewayURL>/live/<manifestID>/<seq>.ts.
	GatewayURL string
	// Guard is a buffered channel used as a counting semaphore: a send takes
	// an upload slot and a receive gives it back.
	Guard chan struct{}
}
33
// RandomTrailer returns a string of the requested length made up of lowercase
// ASCII letters and digits picked with math/rand.
//
// Borrowed from catalyst-api.
func RandomTrailer(length int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"

	out := make([]byte, length)
	for idx := range out {
		out[idx] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}
44
45func NewLivepeerSession(ctx context.Context, did string, gatewayURL string) (*LivepeerSession, error) {
46 sessionID := RandomTrailer(8)
47 return &LivepeerSession{
48 SessionID: strings.ReplaceAll(fmt.Sprintf("%s-%s", did, sessionID), ":", ""),
49 Count: 0,
50 GatewayURL: gatewayURL,
51 Guard: make(chan struct{}, SegmentsInFlight),
52 }, nil
53}
54
55func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) {
56 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway")
57 lpProfiles := rs.ToLivepeerProfiles()
58 sessionIDRen := fmt.Sprintf("%s-%dren", ls.SessionID, len(rs))
59 transcodingConfiguration := map[string]any{
60 "manifestID": sessionIDRen,
61 "profiles": lpProfiles,
62 }
63 bs, err := json.Marshal(transcodingConfiguration)
64 if err != nil {
65 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
66 }
67 tsSeg := bytes.Buffer{}
68 audioSeg := bytes.Buffer{}
69 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg)
70 if err != nil {
71 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err)
72 }
73 if tsSeg.Len() == 0 {
74 return nil, fmt.Errorf("no video in segment")
75 }
76 if audioSeg.Len() == 0 {
77 return nil, fmt.Errorf("no audio in segment")
78 }
79 ls.Guard <- struct{}{}
80 start := time.Now()
81 // check if context is done since we were waiting for the lock
82 if ctx.Err() != nil {
83 <-ls.Guard
84 return nil, ctx.Err()
85 }
86 ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
87 defer cancel()
88 seqNo := ls.Count
89 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, sessionIDRen, seqNo)
90 ls.Count++
91
92 dur := time.Duration(*spseg.Duration)
93 durationMs := int(dur.Milliseconds())
94 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url)
95
96 vid := spseg.Video[0]
97 width := int(vid.Width)
98 height := int(vid.Height)
99
100 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes()))
101 if err != nil {
102 <-ls.Guard
103 return nil, fmt.Errorf("failed to create request: %w", err)
104 }
105 req.Header.Set("Accept", "multipart/mixed")
106 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs))
107 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height))
108 req.Header.Set("Livepeer-Transcode-Configuration", string(bs))
109
110 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req)
111 if err != nil {
112 <-ls.Guard
113 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err)
114 }
115 <-ls.Guard
116 defer resp.Body.Close()
117
118 if resp.StatusCode != http.StatusOK {
119 errOut, _ := io.ReadAll(resp.Body)
120 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut))
121 }
122
123 var out [][]byte
124
125 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
126 if err != nil {
127 return nil, fmt.Errorf("failed to parse media type: %w", err)
128 }
129 if strings.HasPrefix(mediaType, "multipart/") {
130 mr := multipart.NewReader(resp.Body, params["boundary"])
131 for {
132 p, err := mr.NextPart()
133 if err == io.EOF {
134 break
135 }
136 ctx := log.WithLogValues(ctx, "part", p.FileName())
137 if err != nil {
138 return nil, fmt.Errorf("failed to get next part: %w", err)
139 }
140 mp4Bs := bytes.Buffer{}
141 audioReader := bytes.NewReader(audioSeg.Bytes())
142 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs)
143 if err != nil {
144 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err)
145 }
146 bs := mp4Bs.Bytes()
147 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName())
148 out = append(out, bs)
149 }
150 }
151 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds()))
152 return out, nil
153}