Live video on the AT Protocol
1package livepeer
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "math/rand"
10 "mime"
11 "mime/multipart"
12 "net/http"
13 "strings"
14 "time"
15
16 "golang.org/x/net/context/ctxhttp"
17 "stream.place/streamplace/pkg/aqhttp"
18 "stream.place/streamplace/pkg/log"
19 "stream.place/streamplace/pkg/media"
20 "stream.place/streamplace/pkg/renditions"
21 "stream.place/streamplace/pkg/spmetrics"
22 "stream.place/streamplace/pkg/streamplace"
23)
24
// SegmentsInFlight is the capacity of each LivepeerSession's Guard channel,
// i.e. how many PostSegmentToGateway calls on one session may be waiting on
// the gateway at the same time.
const SegmentsInFlight = 2
26
// LivepeerSession holds the state used to post a sequence of segments to a
// Livepeer gateway under a single manifest ID.
type LivepeerSession struct {
	// SessionID is sent to the gateway as the "manifestID" and is part of the
	// segment URL path. NewLivepeerSession sets it to "<did>-<random trailer>".
	SessionID string
	// Count is the sequence number used in the URL of the next posted segment;
	// PostSegmentToGateway increments it once per request it attempts.
	Count int
	// GatewayURL is the base URL that "/live/<SessionID>/<seq>.ts" is appended to.
	GatewayURL string
	// Guard is a buffered channel used as a counting semaphore: a value is sent
	// before a segment is posted and received again once the gateway has
	// answered (or the attempt failed).
	Guard chan struct{}
}
33
// RandomTrailer returns a string of the requested length made of lowercase
// ASCII letters and digits chosen with math/rand. The result is not
// cryptographically secure. A zero or negative length yields the empty string
// instead of panicking.
//
// borrowed from catalyst-api
func RandomTrailer(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"

	// make() panics on a negative length, so handle that case explicitly.
	if length <= 0 {
		return ""
	}

	res := make([]byte, length)
	for i := range res {
		res[i] = charset[rand.Intn(len(charset))]
	}
	return string(res)
}
44
45func NewLivepeerSession(ctx context.Context, did string, gatewayURL string) (*LivepeerSession, error) {
46 sessionID := RandomTrailer(8)
47 return &LivepeerSession{
48 SessionID: fmt.Sprintf("%s-%s", did, sessionID),
49 Count: 0,
50 GatewayURL: gatewayURL,
51 Guard: make(chan struct{}, SegmentsInFlight),
52 }, nil
53}
54
55func (ls *LivepeerSession) PostSegmentToGateway(ctx context.Context, buf []byte, spseg *streamplace.Segment, rs renditions.Renditions) ([][]byte, error) {
56 ctx = log.WithLogValues(ctx, "func", "PostSegmentToGateway")
57 lpProfiles := rs.ToLivepeerProfiles()
58 transcodingConfiguration := map[string]any{
59 "manifestID": ls.SessionID,
60 "profiles": lpProfiles,
61 }
62 bs, err := json.Marshal(transcodingConfiguration)
63 if err != nil {
64 return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err)
65 }
66 tsSeg := bytes.Buffer{}
67 audioSeg := bytes.Buffer{}
68 err = media.MP4ToMPEGTSVideoMP4Audio(ctx, bytes.NewReader(buf), &tsSeg, &audioSeg)
69 if err != nil {
70 return nil, fmt.Errorf("failed to convert mp4 to ts video/mp4 audio: %w", err)
71 }
72 if tsSeg.Len() == 0 {
73 return nil, fmt.Errorf("no video in segment")
74 }
75 if audioSeg.Len() == 0 {
76 return nil, fmt.Errorf("no audio in segment")
77 }
78 ls.Guard <- struct{}{}
79 start := time.Now()
80 // check if context is done since we were waiting for the lock
81 if ctx.Err() != nil {
82 <-ls.Guard
83 return nil, ctx.Err()
84 }
85 ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
86 defer cancel()
87 seqNo := ls.Count
88 url := fmt.Sprintf("%s/live/%s/%d.ts", ls.GatewayURL, ls.SessionID, seqNo)
89 ls.Count++
90
91 dur := time.Duration(*spseg.Duration)
92 durationMs := int(dur.Milliseconds())
93 log.Debug(ctx, "posting segment to livepeer gateway", "duration_ms", durationMs, "url", url)
94
95 vid := spseg.Video[0]
96 width := int(vid.Width)
97 height := int(vid.Height)
98
99 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(tsSeg.Bytes()))
100 if err != nil {
101 <-ls.Guard
102 return nil, fmt.Errorf("failed to create request: %w", err)
103 }
104 req.Header.Set("Accept", "multipart/mixed")
105 req.Header.Set("Content-Duration", fmt.Sprintf("%d", durationMs))
106 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height))
107 req.Header.Set("Livepeer-Transcode-Configuration", string(bs))
108
109 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req)
110 if err != nil {
111 <-ls.Guard
112 return nil, fmt.Errorf("failed to send segment to gateway (config %s): %w", string(bs), err)
113 }
114 <-ls.Guard
115 defer resp.Body.Close()
116
117 if resp.StatusCode != http.StatusOK {
118 errOut, _ := io.ReadAll(resp.Body)
119 return nil, fmt.Errorf("gateway returned non-OK status (config %s): %d, %s", string(bs), resp.StatusCode, string(errOut))
120 }
121
122 var out [][]byte
123
124 mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
125 if err != nil {
126 return nil, fmt.Errorf("failed to parse media type: %w", err)
127 }
128 if strings.HasPrefix(mediaType, "multipart/") {
129 mr := multipart.NewReader(resp.Body, params["boundary"])
130 for {
131 p, err := mr.NextPart()
132 if err == io.EOF {
133 break
134 }
135 ctx := log.WithLogValues(ctx, "part", p.FileName())
136 if err != nil {
137 return nil, fmt.Errorf("failed to get next part: %w", err)
138 }
139 mp4Bs := bytes.Buffer{}
140 audioReader := bytes.NewReader(audioSeg.Bytes())
141 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs)
142 if err != nil {
143 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err)
144 }
145 bs := mp4Bs.Bytes()
146 log.Debug(ctx, "got part back from livepeer gateway", "length", len(bs), "name", p.FileName())
147 out = append(out, bs)
148 }
149 }
150 spmetrics.TranscodeDuration.WithLabelValues(spseg.Creator).Observe(float64(time.Since(start).Milliseconds()))
151 return out, nil
152}