Live video on the AT Protocol
at eli/node-22 263 lines 7.5 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/aqtime" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/livepeer" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/media" 16 "stream.place/streamplace/pkg/media/segchanman" 17 "stream.place/streamplace/pkg/model" 18 "stream.place/streamplace/pkg/renditions" 19 "stream.place/streamplace/pkg/spmetrics" 20 "stream.place/streamplace/pkg/streamplace" 21 "stream.place/streamplace/pkg/thumbnail" 22) 23 24type StreamSession struct { 25 mm *media.MediaManager 26 mod model.Model 27 cli *config.CLI 28 bus *bus.Bus 29 hls *media.M3U8 30 lp *livepeer.LivepeerSession 31 repoDID string 32 segmentChan chan struct{} 33} 34 35func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { 36 37 sid := livepeer.RandomTrailer(8) 38 ctx = log.WithLogValues(ctx, "sid", sid) 39 ctx, cancel := context.WithCancel(ctx) 40 log.Log(ctx, "starting stream session") 41 defer cancel() 42 spseg, err := not.Segment.ToStreamplaceSegment() 43 if err != nil { 44 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 45 } 46 var allRenditions renditions.Renditions 47 48 if ss.cli.LivepeerGatewayURL != "" { 49 allRenditions, err = renditions.GenerateRenditions(spseg) 50 } else { 51 allRenditions = []renditions.Rendition{} 52 } 53 if err != nil { 54 return err 55 } 56 if spseg.Duration == nil { 57 return fmt.Errorf("segment duration is required to calculate bitrate") 58 } 59 dur := time.Duration(*spseg.Duration) 60 byteLen := len(not.Data) 61 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 62 sourceRendition := renditions.Rendition{ 63 Name: "source", 64 Bitrate: bitrate, 65 Width: spseg.Video[0].Width, 66 Height: spseg.Video[0].Height, 67 } 68 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 69 ss.hls = media.NewM3U8(allRenditions) 70 71 g, ctx := errgroup.WithContext(ctx) 72 73 // for _, r := range allRenditions { 74 // g.Go(func() error { 75 // for { 76 // if ctx.Err() != nil { 77 // return nil 78 // } 79 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 80 // if ctx.Err() != nil { 81 // return nil 82 // } 83 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 84 // time.Sleep(time.Second * 5) 85 // } 86 // }) 87 // } 88 89 for { 90 select { 91 case <-ss.segmentChan: 92 // reset timer 93 case <-ctx.Done(): 94 return g.Wait() 95 // case <-time.After(time.Minute * 1): 96 case <-time.After(time.Second * 60): 97 log.Log(ctx, "no new segments for 1 minute, shutting down") 98 cancel() 99 } 100 } 101} 102 103func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 104 if ctx.Err() != nil { 105 return nil 106 } 107 ss.segmentChan <- struct{}{} 108 aqt := aqtime.FromTime(not.Segment.StartTime) 109 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 110 err := ss.mod.CreateSegment(not.Segment) 111 if err != nil { 112 return fmt.Errorf("could not add segment to database: %w", err) 113 } 114 spseg, err := not.Segment.ToStreamplaceSegment() 115 if err != nil { 116 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 117 } 118 119 ss.bus.Publish(spseg.Creator, spseg) 120 go ss.TryAddToHLS(ctx, spseg, "source", not.Data) 121 122 if ss.cli.Thumbnail { 123 go func() { 124 err := ss.Thumbnail(ctx, spseg.Creator, not) 125 if err != nil { 126 log.Error(ctx, "could not create thumbnail", "error", err) 127 } 128 }() 129 } 130 131 if ss.cli.LivepeerGatewayURL != "" { 132 go func() { 133 start := time.Now() 134 err := ss.Transcode(ctx, spseg, not.Data) 135 took := time.Since(start) 136 if err != nil { 137 log.Error(ctx, "could not transcode", "error", err, "took", took) 138 } else { 139 log.Log(ctx, "transcoded segment", "took", took) 140 } 141 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds())) 142 }() 143 } 144 145 return nil 146} 147 148func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 149 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 150 locked := lock.TryLock() 151 if !locked { 152 // we're already generating a thumbnail for this user, skip 153 return nil 154 } 155 defer lock.Unlock() 156 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 157 if err != nil { 158 return err 159 } 160 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 161 // we have a thumbnail <60sec old, skip generating a new one 162 return nil 163 } 164 r := bytes.NewReader(not.Data) 165 aqt := aqtime.FromTime(not.Segment.StartTime) 166 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "png") 167 if err != nil { 168 return err 169 } 170 defer fd.Close() 171 err = media.Thumbnail(ctx, r, fd) 172 if err != nil { 173 return err 174 } 175 thumb := &model.Thumbnail{ 176 Format: "png", 177 SegmentID: not.Segment.ID, 178 } 179 err = ss.mod.CreateThumbnail(thumb) 180 if err != nil { 181 return err 182 } 183 return nil 184} 185 186func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 187 rs, err := renditions.GenerateRenditions(spseg) 188 if ss.lp == nil { 189 var err error 190 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 191 if err != nil { 192 return err 193 } 194 195 } 196 spmetrics.TranscodeAttemptsTotal.Inc() 197 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 198 if err != nil { 199 spmetrics.TranscodeErrorsTotal.Inc() 200 return err 201 } 202 if len(rs) != len(segs) { 203 spmetrics.TranscodeErrorsTotal.Inc() 204 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 205 } 206 spmetrics.TranscodeSuccessesTotal.Inc() 207 aqt, err := aqtime.FromString(spseg.StartTime) 208 if err != nil { 209 return err 210 } 211 for i, seg := range segs { 212 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 213 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 214 if err != nil { 215 return err 216 } 217 defer fd.Close() 218 fd.Write(seg) 219 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg) 220 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{ 221 Filepath: fd.Name(), 222 Data: seg, 223 }) 224 } 225 return nil 226} 227 228func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) { 229 ctx = log.WithLogValues(ctx, "rendition", rendition) 230 err := ss.AddToHLS(ctx, spseg, rendition, data) 231 if err != nil { 232 log.Error(ctx, "could not add to hls", "error", err) 233 } 234} 235 236func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 237 buf := bytes.Buffer{} 238 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 239 if err != nil { 240 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 241 } 242 // newSeg := &streamplace.Segment{ 243 // LexiconTypeID: "place.stream.segment", 244 // Id: spseg.Id, 245 // Creator: spseg.Creator, 246 // StartTime: spseg.StartTime, 247 // Duration: &dur, 248 // Audio: spseg.Audio, 249 // Video: spseg.Video, 250 // SigningKey: spseg.SigningKey, 251 // } 252 aqt, err := aqtime.FromString(spseg.StartTime) 253 if err != nil { 254 return fmt.Errorf("failed to parse segment start time: %w", err) 255 } 256 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 257 ss.hls.GetRendition(rendition).NewSegment(&media.Segment{ 258 Buf: &buf, 259 Duration: time.Duration(dur), 260 Time: aqt.Time(), 261 }) 262 return nil 263}