Live video on the AT Protocol
at eli/github-skip-darwin 434 lines 13 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/media/segchanman" 23 "stream.place/streamplace/pkg/model" 24 "stream.place/streamplace/pkg/renditions" 25 "stream.place/streamplace/pkg/spmetrics" 26 "stream.place/streamplace/pkg/streamplace" 27 "stream.place/streamplace/pkg/thumbnail" 28) 29 30type StreamSession struct { 31 mm *media.MediaManager 32 mod model.Model 33 cli *config.CLI 34 bus *bus.Bus 35 op *oatproxy.OATProxy 36 hls *media.M3U8 37 lp *livepeer.LivepeerSession 38 repoDID string 39 segmentChan chan struct{} 40 lastStatus time.Time 41 lastStatusLock sync.Mutex 42} 43 44func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { 45 46 sid := livepeer.RandomTrailer(8) 47 ctx = log.WithLogValues(ctx, "sid", sid) 48 ctx, cancel := context.WithCancel(ctx) 49 log.Log(ctx, "starting stream session") 50 defer cancel() 51 spseg, err := not.Segment.ToStreamplaceSegment() 52 if err != nil { 53 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 54 } 55 var allRenditions renditions.Renditions 56 57 if ss.cli.LivepeerGatewayURL != "" { 58 allRenditions, err = renditions.GenerateRenditions(spseg) 59 } else { 60 allRenditions = []renditions.Rendition{} 61 } 62 if err != nil { 63 return err 64 } 65 if spseg.Duration == nil { 66 return fmt.Errorf("segment duration is required to calculate bitrate") 67 } 68 dur := time.Duration(*spseg.Duration) 69 byteLen := len(not.Data) 70 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 71 sourceRendition := renditions.Rendition{ 72 Name: "source", 73 Bitrate: bitrate, 74 Width: spseg.Video[0].Width, 75 Height: spseg.Video[0].Height, 76 } 77 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 78 ss.hls = media.NewM3U8(allRenditions) 79 80 g, ctx := errgroup.WithContext(ctx) 81 82 // for _, r := range allRenditions { 83 // g.Go(func() error { 84 // for { 85 // if ctx.Err() != nil { 86 // return nil 87 // } 88 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 89 // if ctx.Err() != nil { 90 // return nil 91 // } 92 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 93 // time.Sleep(time.Second * 5) 94 // } 95 // }) 96 // } 97 98 for { 99 select { 100 case <-ss.segmentChan: 101 // reset timer 102 case <-ctx.Done(): 103 return g.Wait() 104 // case <-time.After(time.Minute * 1): 105 case <-time.After(time.Second * 60): 106 log.Log(ctx, "no new segments for 1 minute, shutting down") 107 cancel() 108 } 109 } 110} 111 112func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 113 if ctx.Err() != nil { 114 return nil 115 } 116 ss.segmentChan <- struct{}{} 117 aqt := aqtime.FromTime(not.Segment.StartTime) 118 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 119 err := ss.mod.CreateSegment(not.Segment) 120 if err != nil { 121 return fmt.Errorf("could not add segment to database: %w", err) 122 } 123 spseg, err := not.Segment.ToStreamplaceSegment() 124 if err != nil { 125 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 126 } 127 128 ss.bus.Publish(spseg.Creator, spseg) 129 go ss.TryAddToHLS(ctx, spseg, "source", not.Data) 130 131 if ss.cli.Thumbnail { 132 go func() { 133 err := ss.Thumbnail(ctx, spseg.Creator, not) 134 if err != nil { 135 log.Error(ctx, "could not create thumbnail", "error", err) 136 } 137 }() 138 } 139 140 go func() { 141 err := ss.UpdateStatus(ctx, spseg.Creator) 142 if err != nil { 143 log.Error(ctx, "could not update status", "error", err) 144 } 145 }() 146 147 if ss.cli.LivepeerGatewayURL != "" { 148 go func() { 149 start := time.Now() 150 err := ss.Transcode(ctx, spseg, not.Data) 151 took := time.Since(start) 152 if err != nil { 153 log.Error(ctx, "could not transcode", "error", err, "took", took) 154 } else { 155 log.Log(ctx, "transcoded segment", "took", took) 156 } 157 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds())) 158 }() 159 } 160 161 return nil 162} 163 164func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 165 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 166 locked := lock.TryLock() 167 if !locked { 168 // we're already generating a thumbnail for this user, skip 169 return nil 170 } 171 defer lock.Unlock() 172 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 173 if err != nil { 174 return err 175 } 176 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 177 // we have a thumbnail <60sec old, skip generating a new one 178 return nil 179 } 180 r := bytes.NewReader(not.Data) 181 aqt := aqtime.FromTime(not.Segment.StartTime) 182 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 183 if err != nil { 184 return err 185 } 186 defer fd.Close() 187 err = media.Thumbnail(ctx, r, fd, "jpeg") 188 if err != nil { 189 return err 190 } 191 thumb := &model.Thumbnail{ 192 Format: "jpeg", 193 SegmentID: not.Segment.ID, 194 } 195 err = ss.mod.CreateThumbnail(thumb) 196 if err != nil { 197 return err 198 } 199 return nil 200} 201 202func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 203 if pv == nil { 204 return nil, fmt.Errorf("post view is nil") 205 } 206 rec, ok := pv.Record.Val.(*bsky.FeedPost) 207 if !ok { 208 return nil, fmt.Errorf("post view record is not a feed post") 209 } 210 if rec.Embed == nil { 211 return nil, fmt.Errorf("post view embed is nil") 212 } 213 if rec.Embed.EmbedExternal == nil { 214 return nil, fmt.Errorf("post view embed external view is nil") 215 } 216 if rec.Embed.EmbedExternal.External == nil { 217 return nil, fmt.Errorf("post view embed external is nil") 218 } 219 if rec.Embed.EmbedExternal.External.Thumb == nil { 220 return nil, fmt.Errorf("post view embed external thumb is nil") 221 } 222 return rec.Embed.EmbedExternal.External.Thumb, nil 223} 224 225func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 226 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 227 ss.lastStatusLock.Lock() 228 defer ss.lastStatusLock.Unlock() 229 if time.Since(ss.lastStatus) < time.Minute { 230 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 231 return nil 232 } 233 234 session, err := ss.mod.GetSessionByDID(repoDID) 235 if err != nil { 236 return fmt.Errorf("could not get session for repoDID: %w", err) 237 } 238 if session == nil { 239 return fmt.Errorf("no session found for repoDID: %s", repoDID) 240 } 241 242 session, err = ss.op.RefreshIfNeeded(session) 243 if err != nil { 244 return fmt.Errorf("could not refresh session for repoDID: %w", err) 245 } 246 247 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 248 if err != nil { 249 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 250 } 251 lsv, err := ls.ToLivestreamView() 252 if err != nil { 253 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 254 } 255 256 post, err := ss.mod.GetFeedPost(ls.PostCID) 257 if err != nil { 258 return fmt.Errorf("could not get feed post: %w", err) 259 } 260 if post == nil { 261 return fmt.Errorf("feed post not found for livestream: %w", err) 262 } 263 postView, err := post.ToBskyPostView() 264 if err != nil { 265 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 266 } 267 thumb, err := getThumbnailCID(postView) 268 if err != nil { 269 return fmt.Errorf("could not get thumbnail cid: %w", err) 270 } 271 272 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 273 if err != nil { 274 return fmt.Errorf("could not get repo for repoDID: %w", err) 275 } 276 277 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 278 if !ok { 279 return fmt.Errorf("livestream is not a streamplace livestream") 280 } 281 282 actorStatusEmbed := bsky.ActorStatus_Embed{ 283 EmbedExternal: &bsky.EmbedExternal{ 284 External: &bsky.EmbedExternal_External{ 285 Title: lsr.Title, 286 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle), 287 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost), 288 Thumb: thumb, 289 }, 290 }, 291 } 292 293 duration := int64(2) 294 status := bsky.ActorStatus{ 295 Status: "live", 296 DurationMinutes: &duration, 297 Embed: &actorStatusEmbed, 298 CreatedAt: time.Now().Format(time.RFC3339), 299 } 300 301 client, err := ss.op.GetXrpcClient(session) 302 if err != nil { 303 return fmt.Errorf("could not get xrpc client: %w", err) 304 } 305 306 var swapRecord *string 307 getOutput := atproto.RepoGetRecord_Output{} 308 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 309 "repo": repoDID, 310 "collection": "app.bsky.actor.status", 311 "rkey": "self", 312 }, nil, &getOutput) 313 if err != nil { 314 xErr, ok := err.(*xrpc.Error) 315 if !ok { 316 return fmt.Errorf("could not get record: %w", err) 317 } 318 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 319 return fmt.Errorf("could not get record: %w", err) 320 } 321 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 322 } else { 323 log.Debug(ctx, "got record", "record", getOutput) 324 swapRecord = getOutput.Cid 325 } 326 327 inp := atproto.RepoPutRecord_Input{ 328 Collection: "app.bsky.actor.status", 329 Record: &util.LexiconTypeDecoder{Val: &status}, 330 Rkey: "self", 331 Repo: repoDID, 332 SwapRecord: swapRecord, 333 } 334 out := atproto.RepoPutRecord_Output{} 335 336 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 337 if err != nil { 338 return fmt.Errorf("could not create record: %w", err) 339 } 340 log.Debug(ctx, "created status record", "out", out) 341 342 ss.lastStatus = time.Now() 343 344 return nil 345} 346 347func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 348 rs, err := renditions.GenerateRenditions(spseg) 349 if err != nil { 350 return fmt.Errorf("failed to generated renditions: %w", err) 351 } 352 353 if ss.lp == nil { 354 var err error 355 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 356 if err != nil { 357 return err 358 } 359 360 } 361 spmetrics.TranscodeAttemptsTotal.Inc() 362 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 363 if err != nil { 364 spmetrics.TranscodeErrorsTotal.Inc() 365 return err 366 } 367 if len(rs) != len(segs) { 368 spmetrics.TranscodeErrorsTotal.Inc() 369 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 370 } 371 spmetrics.TranscodeSuccessesTotal.Inc() 372 aqt, err := aqtime.FromString(spseg.StartTime) 373 if err != nil { 374 return err 375 } 376 for i, seg := range segs { 377 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 378 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 379 if err != nil { 380 return fmt.Errorf("failed to create transcoded segment file: %w", err) 381 } 382 defer fd.Close() 383 _, err = fd.Write(seg) 384 if err != nil { 385 return fmt.Errorf("failed to write transcoded segment file: %w", err) 386 } 387 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg) 388 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{ 389 Filepath: fd.Name(), 390 Data: seg, 391 }) 392 } 393 return nil 394} 395 396func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) { 397 ctx = log.WithLogValues(ctx, "rendition", rendition) 398 err := ss.AddToHLS(ctx, spseg, rendition, data) 399 if err != nil { 400 log.Error(ctx, "could not add to hls", "error", err) 401 } 402} 403 404func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 405 buf := bytes.Buffer{} 406 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 407 if err != nil { 408 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 409 } 410 // newSeg := &streamplace.Segment{ 411 // LexiconTypeID: "place.stream.segment", 412 // Id: spseg.Id, 413 // Creator: spseg.Creator, 414 // StartTime: spseg.StartTime, 415 // Duration: &dur, 416 // Audio: spseg.Audio, 417 // Video: spseg.Video, 418 // SigningKey: spseg.SigningKey, 419 // } 420 aqt, err := aqtime.FromString(spseg.StartTime) 421 if err != nil { 422 return fmt.Errorf("failed to parse segment start time: %w", err) 423 } 424 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 425 if err := ss.hls.GetRendition(rendition).NewSegment(&media.Segment{ 426 Buf: &buf, 427 Duration: time.Duration(dur), 428 Time: aqt.Time(), 429 }); err != nil { 430 return fmt.Errorf("failed to create new segment: %w", err) 431 } 432 433 return nil 434}