Live video on the AT Protocol
at eli/fix-gitlab 426 lines 12 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/media/segchanman" 23 "stream.place/streamplace/pkg/model" 24 "stream.place/streamplace/pkg/renditions" 25 "stream.place/streamplace/pkg/spmetrics" 26 "stream.place/streamplace/pkg/streamplace" 27 "stream.place/streamplace/pkg/thumbnail" 28) 29 30type StreamSession struct { 31 mm *media.MediaManager 32 mod model.Model 33 cli *config.CLI 34 bus *bus.Bus 35 op *oatproxy.OATProxy 36 hls *media.M3U8 37 lp *livepeer.LivepeerSession 38 repoDID string 39 segmentChan chan struct{} 40 lastStatus time.Time 41 lastStatusLock sync.Mutex 42} 43 44func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { 45 46 sid := livepeer.RandomTrailer(8) 47 ctx = log.WithLogValues(ctx, "sid", sid) 48 ctx, cancel := context.WithCancel(ctx) 49 log.Log(ctx, "starting stream session") 50 defer cancel() 51 spseg, err := not.Segment.ToStreamplaceSegment() 52 if err != nil { 53 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 54 } 55 var allRenditions renditions.Renditions 56 57 if ss.cli.LivepeerGatewayURL != "" { 58 allRenditions, err = renditions.GenerateRenditions(spseg) 59 } else { 60 allRenditions = []renditions.Rendition{} 61 } 62 if err != nil { 63 return err 64 } 65 if spseg.Duration == nil { 66 return fmt.Errorf("segment duration is required to calculate bitrate") 67 } 68 dur := time.Duration(*spseg.Duration) 69 byteLen := len(not.Data) 70 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 71 sourceRendition := renditions.Rendition{ 72 Name: "source", 73 Bitrate: bitrate, 74 Width: spseg.Video[0].Width, 75 Height: spseg.Video[0].Height, 76 } 77 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 78 ss.hls = media.NewM3U8(allRenditions) 79 80 g, ctx := errgroup.WithContext(ctx) 81 82 // for _, r := range allRenditions { 83 // g.Go(func() error { 84 // for { 85 // if ctx.Err() != nil { 86 // return nil 87 // } 88 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 89 // if ctx.Err() != nil { 90 // return nil 91 // } 92 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 93 // time.Sleep(time.Second * 5) 94 // } 95 // }) 96 // } 97 98 for { 99 select { 100 case <-ss.segmentChan: 101 // reset timer 102 case <-ctx.Done(): 103 return g.Wait() 104 // case <-time.After(time.Minute * 1): 105 case <-time.After(time.Second * 60): 106 log.Log(ctx, "no new segments for 1 minute, shutting down") 107 cancel() 108 } 109 } 110} 111 112func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 113 if ctx.Err() != nil { 114 return nil 115 } 116 ss.segmentChan <- struct{}{} 117 aqt := aqtime.FromTime(not.Segment.StartTime) 118 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 119 err := ss.mod.CreateSegment(not.Segment) 120 if err != nil { 121 return fmt.Errorf("could not add segment to database: %w", err) 122 } 123 spseg, err := not.Segment.ToStreamplaceSegment() 124 if err != nil { 125 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 126 } 127 128 ss.bus.Publish(spseg.Creator, spseg) 129 go ss.TryAddToHLS(ctx, spseg, "source", not.Data) 130 131 if ss.cli.Thumbnail { 132 go func() { 133 err := ss.Thumbnail(ctx, spseg.Creator, not) 134 if err != nil { 135 log.Error(ctx, "could not create thumbnail", "error", err) 136 } 137 }() 138 } 139 140 go func() { 141 err := ss.UpdateStatus(ctx, spseg.Creator) 142 if err != nil { 143 log.Error(ctx, "could not update status", "error", err) 144 } 145 }() 146 147 if ss.cli.LivepeerGatewayURL != "" { 148 go func() { 149 start := time.Now() 150 err := ss.Transcode(ctx, spseg, not.Data) 151 took := time.Since(start) 152 if err != nil { 153 log.Error(ctx, "could not transcode", "error", err, "took", took) 154 } else { 155 log.Log(ctx, "transcoded segment", "took", took) 156 } 157 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds())) 158 }() 159 } 160 161 return nil 162} 163 164func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 165 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 166 locked := lock.TryLock() 167 if !locked { 168 // we're already generating a thumbnail for this user, skip 169 return nil 170 } 171 defer lock.Unlock() 172 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 173 if err != nil { 174 return err 175 } 176 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 177 // we have a thumbnail <60sec old, skip generating a new one 178 return nil 179 } 180 r := bytes.NewReader(not.Data) 181 aqt := aqtime.FromTime(not.Segment.StartTime) 182 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "png") 183 if err != nil { 184 return err 185 } 186 defer fd.Close() 187 err = media.Thumbnail(ctx, r, fd) 188 if err != nil { 189 return err 190 } 191 thumb := &model.Thumbnail{ 192 Format: "png", 193 SegmentID: not.Segment.ID, 194 } 195 err = ss.mod.CreateThumbnail(thumb) 196 if err != nil { 197 return err 198 } 199 return nil 200} 201 202func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 203 if pv == nil { 204 return nil, fmt.Errorf("post view is nil") 205 } 206 rec, ok := pv.Record.Val.(*bsky.FeedPost) 207 if !ok { 208 return nil, fmt.Errorf("post view record is not a feed post") 209 } 210 if rec.Embed == nil { 211 return nil, fmt.Errorf("post view embed is nil") 212 } 213 if rec.Embed.EmbedExternal == nil { 214 return nil, fmt.Errorf("post view embed external view is nil") 215 } 216 if rec.Embed.EmbedExternal.External == nil { 217 return nil, fmt.Errorf("post view embed external is nil") 218 } 219 if rec.Embed.EmbedExternal.External.Thumb == nil { 220 return nil, fmt.Errorf("post view embed external thumb is nil") 221 } 222 return rec.Embed.EmbedExternal.External.Thumb, nil 223} 224 225func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 226 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 227 ss.lastStatusLock.Lock() 228 defer ss.lastStatusLock.Unlock() 229 if time.Since(ss.lastStatus) < time.Minute { 230 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 231 return nil 232 } 233 234 session, err := ss.mod.GetSessionByDID(repoDID) 235 if err != nil { 236 return fmt.Errorf("could not get session for repoDID: %w", err) 237 } 238 if session == nil { 239 return fmt.Errorf("no session found for repoDID: %s", repoDID) 240 } 241 242 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 243 if err != nil { 244 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 245 } 246 lsv, err := ls.ToLivestreamView() 247 if err != nil { 248 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 249 } 250 251 post, err := ss.mod.GetFeedPost(ls.PostCID) 252 if err != nil { 253 return fmt.Errorf("could not get feed post: %w", err) 254 } 255 postView, err := post.ToBskyPostView() 256 if err != nil { 257 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 258 } 259 thumb, err := getThumbnailCID(postView) 260 if err != nil { 261 return fmt.Errorf("could not get thumbnail cid: %w", err) 262 } 263 264 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 265 if err != nil { 266 return fmt.Errorf("could not get repo for repoDID: %w", err) 267 } 268 269 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 270 if !ok { 271 return fmt.Errorf("livestream is not a streamplace livestream") 272 } 273 274 actorStatusEmbed := bsky.ActorStatus_Embed{ 275 EmbedExternal: &bsky.EmbedExternal{ 276 External: &bsky.EmbedExternal_External{ 277 Title: lsr.Title, 278 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle), 279 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost), 280 Thumb: thumb, 281 }, 282 }, 283 } 284 285 duration := int64(2) 286 status := bsky.ActorStatus{ 287 Status: "live", 288 DurationMinutes: &duration, 289 Embed: &actorStatusEmbed, 290 CreatedAt: time.Now().Format(time.RFC3339), 291 } 292 293 client, err := ss.op.GetXrpcClient(session) 294 if err != nil { 295 return fmt.Errorf("could not get xrpc client: %w", err) 296 } 297 298 var swapRecord *string 299 getOutput := atproto.RepoGetRecord_Output{} 300 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 301 "repo": repoDID, 302 "collection": "app.bsky.actor.status", 303 "rkey": "self", 304 }, nil, &getOutput) 305 if err != nil { 306 xErr, ok := err.(*xrpc.Error) 307 if !ok { 308 return fmt.Errorf("could not get record: %w", err) 309 } 310 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 311 return fmt.Errorf("could not get record: %w", err) 312 } 313 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 314 } else { 315 log.Debug(ctx, "got record", "record", getOutput) 316 swapRecord = getOutput.Cid 317 } 318 319 inp := atproto.RepoPutRecord_Input{ 320 Collection: "app.bsky.actor.status", 321 Record: &util.LexiconTypeDecoder{Val: &status}, 322 Rkey: "self", 323 Repo: repoDID, 324 SwapRecord: swapRecord, 325 } 326 out := atproto.RepoPutRecord_Output{} 327 328 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 329 if err != nil { 330 return fmt.Errorf("could not create record: %w", err) 331 } 332 log.Debug(ctx, "created status record", "out", out) 333 334 ss.lastStatus = time.Now() 335 336 return nil 337} 338 339func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 340 rs, err := renditions.GenerateRenditions(spseg) 341 if err != nil { 342 return fmt.Errorf("failed to generated renditions: %w", err) 343 } 344 345 if ss.lp == nil { 346 var err error 347 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 348 if err != nil { 349 return err 350 } 351 352 } 353 spmetrics.TranscodeAttemptsTotal.Inc() 354 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 355 if err != nil { 356 spmetrics.TranscodeErrorsTotal.Inc() 357 return err 358 } 359 if len(rs) != len(segs) { 360 spmetrics.TranscodeErrorsTotal.Inc() 361 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 362 } 363 spmetrics.TranscodeSuccessesTotal.Inc() 364 aqt, err := aqtime.FromString(spseg.StartTime) 365 if err != nil { 366 return err 367 } 368 for i, seg := range segs { 369 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 370 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 371 if err != nil { 372 return fmt.Errorf("failed to create transcoded segment file: %w", err) 373 } 374 defer fd.Close() 375 _, err = fd.Write(seg) 376 if err != nil { 377 return fmt.Errorf("failed to write transcoded segment file: %w", err) 378 } 379 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg) 380 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{ 381 Filepath: fd.Name(), 382 Data: seg, 383 }) 384 } 385 return nil 386} 387 388func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) { 389 ctx = log.WithLogValues(ctx, "rendition", rendition) 390 err := ss.AddToHLS(ctx, spseg, rendition, data) 391 if err != nil { 392 log.Error(ctx, "could not add to hls", "error", err) 393 } 394} 395 396func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 397 buf := bytes.Buffer{} 398 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 399 if err != nil { 400 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 401 } 402 // newSeg := &streamplace.Segment{ 403 // LexiconTypeID: "place.stream.segment", 404 // Id: spseg.Id, 405 // Creator: spseg.Creator, 406 // StartTime: spseg.StartTime, 407 // Duration: &dur, 408 // Audio: spseg.Audio, 409 // Video: spseg.Video, 410 // SigningKey: spseg.SigningKey, 411 // } 412 aqt, err := aqtime.FromString(spseg.StartTime) 413 if err != nil { 414 return fmt.Errorf("failed to parse segment start time: %w", err) 415 } 416 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 417 if err := ss.hls.GetRendition(rendition).NewSegment(&media.Segment{ 418 Buf: &buf, 419 Duration: time.Duration(dur), 420 Time: aqt.Time(), 421 }); err != nil { 422 return fmt.Errorf("failed to create new segment: %w", err) 423 } 424 425 return nil 426}