Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "time"
8
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/aqtime"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/config"
13 "stream.place/streamplace/pkg/livepeer"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/media"
16 "stream.place/streamplace/pkg/media/segchanman"
17 "stream.place/streamplace/pkg/model"
18 "stream.place/streamplace/pkg/renditions"
19 "stream.place/streamplace/pkg/spmetrics"
20 "stream.place/streamplace/pkg/streamplace"
21 "stream.place/streamplace/pkg/thumbnail"
22)
23
24type StreamSession struct {
25 mm *media.MediaManager
26 mod model.Model
27 cli *config.CLI
28 bus *bus.Bus
29 hls *media.M3U8
30 lp *livepeer.LivepeerSession
31 repoDID string
32 segmentChan chan struct{}
33}
34
35func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
36
37 sid := livepeer.RandomTrailer(8)
38 ctx = log.WithLogValues(ctx, "sid", sid)
39 ctx, cancel := context.WithCancel(ctx)
40 log.Log(ctx, "starting stream session")
41 defer cancel()
42 spseg, err := not.Segment.ToStreamplaceSegment()
43 if err != nil {
44 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
45 }
46 var allRenditions renditions.Renditions
47
48 if ss.cli.LivepeerGatewayURL != "" {
49 allRenditions, err = renditions.GenerateRenditions(spseg)
50 } else {
51 allRenditions = []renditions.Rendition{}
52 }
53 if err != nil {
54 return err
55 }
56 if spseg.Duration == nil {
57 return fmt.Errorf("segment duration is required to calculate bitrate")
58 }
59 dur := time.Duration(*spseg.Duration)
60 byteLen := len(not.Data)
61 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
62 sourceRendition := renditions.Rendition{
63 Name: "source",
64 Bitrate: bitrate,
65 Width: spseg.Video[0].Width,
66 Height: spseg.Video[0].Height,
67 }
68 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
69 ss.hls = media.NewM3U8(allRenditions)
70
71 g, ctx := errgroup.WithContext(ctx)
72
73 // for _, r := range allRenditions {
74 // g.Go(func() error {
75 // for {
76 // if ctx.Err() != nil {
77 // return nil
78 // }
79 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
80 // if ctx.Err() != nil {
81 // return nil
82 // }
83 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
84 // time.Sleep(time.Second * 5)
85 // }
86 // })
87 // }
88
89 for {
90 select {
91 case <-ss.segmentChan:
92 // reset timer
93 case <-ctx.Done():
94 return g.Wait()
95 // case <-time.After(time.Minute * 1):
96 case <-time.After(time.Second * 60):
97 log.Log(ctx, "no new segments for 1 minute, shutting down")
98 cancel()
99 }
100 }
101}
102
103func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
104 if ctx.Err() != nil {
105 return nil
106 }
107 ss.segmentChan <- struct{}{}
108 aqt := aqtime.FromTime(not.Segment.StartTime)
109 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
110 err := ss.mod.CreateSegment(not.Segment)
111 if err != nil {
112 return fmt.Errorf("could not add segment to database: %w", err)
113 }
114 spseg, err := not.Segment.ToStreamplaceSegment()
115 if err != nil {
116 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
117 }
118
119 ss.bus.Publish(spseg.Creator, spseg)
120 go ss.TryAddToHLS(ctx, spseg, "source", not.Data)
121
122 if ss.cli.Thumbnail {
123 go func() {
124 err := ss.Thumbnail(ctx, spseg.Creator, not)
125 if err != nil {
126 log.Error(ctx, "could not create thumbnail", "error", err)
127 }
128 }()
129 }
130
131 if ss.cli.LivepeerGatewayURL != "" {
132 go func() {
133 start := time.Now()
134 err := ss.Transcode(ctx, spseg, not.Data)
135 took := time.Since(start)
136 if err != nil {
137 log.Error(ctx, "could not transcode", "error", err, "took", took)
138 } else {
139 log.Log(ctx, "transcoded segment", "took", took)
140 }
141 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds()))
142 }()
143 }
144
145 return nil
146}
147
148func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
149 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
150 locked := lock.TryLock()
151 if !locked {
152 // we're already generating a thumbnail for this user, skip
153 return nil
154 }
155 defer lock.Unlock()
156 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
157 if err != nil {
158 return err
159 }
160 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
161 // we have a thumbnail <60sec old, skip generating a new one
162 return nil
163 }
164 r := bytes.NewReader(not.Data)
165 aqt := aqtime.FromTime(not.Segment.StartTime)
166 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "png")
167 if err != nil {
168 return err
169 }
170 defer fd.Close()
171 err = media.Thumbnail(ctx, r, fd)
172 if err != nil {
173 return err
174 }
175 thumb := &model.Thumbnail{
176 Format: "png",
177 SegmentID: not.Segment.ID,
178 }
179 err = ss.mod.CreateThumbnail(thumb)
180 if err != nil {
181 return err
182 }
183 return nil
184}
185
186func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
187 rs, err := renditions.GenerateRenditions(spseg)
188 if ss.lp == nil {
189 var err error
190 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
191 if err != nil {
192 return err
193 }
194
195 }
196 spmetrics.TranscodeAttemptsTotal.Inc()
197 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
198 if err != nil {
199 spmetrics.TranscodeErrorsTotal.Inc()
200 return err
201 }
202 if len(rs) != len(segs) {
203 spmetrics.TranscodeErrorsTotal.Inc()
204 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
205 }
206 spmetrics.TranscodeSuccessesTotal.Inc()
207 aqt, err := aqtime.FromString(spseg.StartTime)
208 if err != nil {
209 return err
210 }
211 for i, seg := range segs {
212 log.Debug(ctx, "publishing segment", "rendition", rs[i])
213 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
214 if err != nil {
215 return err
216 }
217 defer fd.Close()
218 fd.Write(seg)
219 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg)
220 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{
221 Filepath: fd.Name(),
222 Data: seg,
223 })
224 }
225 return nil
226}
227
228func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) {
229 ctx = log.WithLogValues(ctx, "rendition", rendition)
230 err := ss.AddToHLS(ctx, spseg, rendition, data)
231 if err != nil {
232 log.Error(ctx, "could not add to hls", "error", err)
233 }
234}
235
236func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
237 buf := bytes.Buffer{}
238 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
239 if err != nil {
240 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
241 }
242 // newSeg := &streamplace.Segment{
243 // LexiconTypeID: "place.stream.segment",
244 // Id: spseg.Id,
245 // Creator: spseg.Creator,
246 // StartTime: spseg.StartTime,
247 // Duration: &dur,
248 // Audio: spseg.Audio,
249 // Video: spseg.Video,
250 // SigningKey: spseg.SigningKey,
251 // }
252 aqt, err := aqtime.FromString(spseg.StartTime)
253 if err != nil {
254 return fmt.Errorf("failed to parse segment start time: %w", err)
255 }
256 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
257 ss.hls.GetRendition(rendition).NewSegment(&media.Segment{
258 Buf: &buf,
259 Duration: time.Duration(dur),
260 Time: aqt.Time(),
261 })
262 return nil
263}