Live video on the AT Protocol
at eli/problem-detection 473 lines 14 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/spmetrics" 25 "stream.place/streamplace/pkg/streamplace" 26 "stream.place/streamplace/pkg/thumbnail" 27) 28 29type StreamSession struct { 30 mm *media.MediaManager 31 mod model.Model 32 cli *config.CLI 33 bus *bus.Bus 34 op *oatproxy.OATProxy 35 hls *media.M3U8 36 lp *livepeer.LivepeerSession 37 repoDID string 38 segmentChan chan struct{} 39 lastStatus time.Time 40 lastStatusLock sync.Mutex 41 g *errgroup.Group 42 started chan struct{} 43 ctx context.Context 44 packets []bus.PacketizedSegment 45} 46 47func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { 48 ctx, cancel := context.WithCancel(ctx) 49 ss.g, ctx = errgroup.WithContext(ctx) 50 sid := livepeer.RandomTrailer(8) 51 ctx = log.WithLogValues(ctx, "sid", sid) 52 ss.ctx = ctx 53 log.Log(ctx, "starting stream session") 54 defer cancel() 55 spseg, err := not.Segment.ToStreamplaceSegment() 56 if err != nil { 57 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 58 } 59 var allRenditions renditions.Renditions 60 61 if ss.cli.LivepeerGatewayURL != "" { 62 allRenditions, err = renditions.GenerateRenditions(spseg) 63 } else { 64 allRenditions = []renditions.Rendition{} 65 } 66 if err != nil { 67 return err 68 } 69 if spseg.Duration == nil { 70 return fmt.Errorf("segment duration is required to calculate bitrate") 71 } 72 dur := time.Duration(*spseg.Duration) 73 byteLen := len(not.Data) 74 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 75 sourceRendition := renditions.Rendition{ 76 Name: "source", 77 Bitrate: bitrate, 78 Width: spseg.Video[0].Width, 79 Height: spseg.Video[0].Height, 80 } 81 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 82 ss.hls = media.NewM3U8(allRenditions) 83 84 // for _, r := range allRenditions { 85 // g.Go(func() error { 86 // for { 87 // if ctx.Err() != nil { 88 // return nil 89 // } 90 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 91 // if ctx.Err() != nil { 92 // return nil 93 // } 94 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 95 // time.Sleep(time.Second * 5) 96 // } 97 // }) 98 // } 99 100 close(ss.started) 101 102 for { 103 select { 104 case <-ss.segmentChan: 105 // reset timer 106 case <-ctx.Done(): 107 return ss.g.Wait() 108 // case <-time.After(time.Minute * 1): 109 case <-time.After(time.Second * 60): 110 log.Log(ctx, "no new segments for 1 minute, shutting down") 111 for _, r := range allRenditions { 112 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 113 } 114 cancel() 115 } 116 } 117} 118 119// Execute a goroutine in the context of the stream session. Errors are 120// non-fatal; if you actually want to melt the universe on an error you 121// should panic() 122func (ss *StreamSession) Go(ctx context.Context, f func() error) { 123 <-ss.started 124 ss.g.Go(func() error { 125 err := f() 126 if err != nil { 127 log.Error(ctx, "error in goroutine", "error", err) 128 } 129 return nil 130 }) 131} 132 133func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 134 <-ss.started 135 go func() { 136 select { 137 case <-ss.ctx.Done(): 138 return 139 case ss.segmentChan <- struct{}{}: 140 } 141 }() 142 aqt := aqtime.FromTime(not.Segment.StartTime) 143 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 144 err := ss.mod.CreateSegment(not.Segment) 145 if err != nil { 146 return fmt.Errorf("could not add segment to database: %w", err) 147 } 148 spseg, err := not.Segment.ToStreamplaceSegment() 149 if err != nil { 150 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 151 } 152 153 ss.bus.Publish(spseg.Creator, spseg) 154 ss.Go(ctx, func() error { 155 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 156 Filepath: not.Segment.ID, 157 Data: not.Data, 158 }) 159 }) 160 161 if ss.cli.Thumbnail { 162 ss.Go(ctx, func() error { 163 return ss.Thumbnail(ctx, spseg.Creator, not) 164 }) 165 } 166 167 ss.Go(ctx, func() error { 168 return ss.UpdateStatus(ctx, spseg.Creator) 169 }) 170 171 if ss.cli.LivepeerGatewayURL != "" { 172 ss.Go(ctx, func() error { 173 start := time.Now() 174 err := ss.Transcode(ctx, spseg, not.Data) 175 took := time.Since(start) 176 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 177 return err 178 }) 179 } 180 181 return nil 182} 183 184func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 185 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 186 locked := lock.TryLock() 187 if !locked { 188 // we're already generating a thumbnail for this user, skip 189 return nil 190 } 191 defer lock.Unlock() 192 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 193 if err != nil { 194 return err 195 } 196 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 197 // we have a thumbnail <60sec old, skip generating a new one 198 return nil 199 } 200 r := bytes.NewReader(not.Data) 201 aqt := aqtime.FromTime(not.Segment.StartTime) 202 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 203 if err != nil { 204 return err 205 } 206 defer fd.Close() 207 err = media.Thumbnail(ctx, r, fd, "jpeg") 208 if err != nil { 209 return err 210 } 211 thumb := &model.Thumbnail{ 212 Format: "jpeg", 213 SegmentID: not.Segment.ID, 214 } 215 err = ss.mod.CreateThumbnail(thumb) 216 if err != nil { 217 return err 218 } 219 return nil 220} 221 222func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 223 if pv == nil { 224 return nil, fmt.Errorf("post view is nil") 225 } 226 rec, ok := pv.Record.Val.(*bsky.FeedPost) 227 if !ok { 228 return nil, fmt.Errorf("post view record is not a feed post") 229 } 230 if rec.Embed == nil { 231 return nil, fmt.Errorf("post view embed is nil") 232 } 233 if rec.Embed.EmbedExternal == nil { 234 return nil, fmt.Errorf("post view embed external view is nil") 235 } 236 if rec.Embed.EmbedExternal.External == nil { 237 return nil, fmt.Errorf("post view embed external is nil") 238 } 239 if rec.Embed.EmbedExternal.External.Thumb == nil { 240 return nil, fmt.Errorf("post view embed external thumb is nil") 241 } 242 return rec.Embed.EmbedExternal.External.Thumb, nil 243} 244 245func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 246 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 247 ss.lastStatusLock.Lock() 248 defer ss.lastStatusLock.Unlock() 249 if time.Since(ss.lastStatus) < time.Minute { 250 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 251 return nil 252 } 253 254 session, err := ss.mod.GetSessionByDID(repoDID) 255 if err != nil { 256 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 257 } 258 if session == nil { 259 return fmt.Errorf("no session found for repoDID: %s", repoDID) 260 } 261 262 session, err = ss.op.RefreshIfNeeded(session) 263 if err != nil { 264 return fmt.Errorf("could not refresh session for repoDID: %w", err) 265 } 266 267 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 268 if err != nil { 269 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 270 } 271 lsv, err := ls.ToLivestreamView() 272 if err != nil { 273 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 274 } 275 276 post, err := ss.mod.GetFeedPost(ls.PostCID) 277 if err != nil { 278 return fmt.Errorf("could not get feed post: %w", err) 279 } 280 if post == nil { 281 return fmt.Errorf("feed post not found for livestream: %w", err) 282 } 283 postView, err := post.ToBskyPostView() 284 if err != nil { 285 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 286 } 287 thumb, err := getThumbnailCID(postView) 288 if err != nil { 289 return fmt.Errorf("could not get thumbnail cid: %w", err) 290 } 291 292 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 293 if err != nil { 294 return fmt.Errorf("could not get repo for repoDID: %w", err) 295 } 296 297 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 298 if !ok { 299 return fmt.Errorf("livestream is not a streamplace livestream") 300 } 301 302 actorStatusEmbed := bsky.ActorStatus_Embed{ 303 EmbedExternal: &bsky.EmbedExternal{ 304 External: &bsky.EmbedExternal_External{ 305 Title: lsr.Title, 306 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle), 307 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost), 308 Thumb: thumb, 309 }, 310 }, 311 } 312 313 duration := int64(2) 314 status := bsky.ActorStatus{ 315 Status: "live", 316 DurationMinutes: &duration, 317 Embed: &actorStatusEmbed, 318 CreatedAt: time.Now().Format(time.RFC3339), 319 } 320 321 client, err := ss.op.GetXrpcClient(session) 322 if err != nil { 323 return fmt.Errorf("could not get xrpc client: %w", err) 324 } 325 326 var swapRecord *string 327 getOutput := atproto.RepoGetRecord_Output{} 328 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 329 "repo": repoDID, 330 "collection": "app.bsky.actor.status", 331 "rkey": "self", 332 }, nil, &getOutput) 333 if err != nil { 334 xErr, ok := err.(*xrpc.Error) 335 if !ok { 336 return fmt.Errorf("could not get record: %w", err) 337 } 338 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 339 return fmt.Errorf("could not get record: %w", err) 340 } 341 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 342 } else { 343 log.Debug(ctx, "got record", "record", getOutput) 344 swapRecord = getOutput.Cid 345 } 346 347 inp := atproto.RepoPutRecord_Input{ 348 Collection: "app.bsky.actor.status", 349 Record: &util.LexiconTypeDecoder{Val: &status}, 350 Rkey: "self", 351 Repo: repoDID, 352 SwapRecord: swapRecord, 353 } 354 out := atproto.RepoPutRecord_Output{} 355 356 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 357 if err != nil { 358 return fmt.Errorf("could not create record: %w", err) 359 } 360 log.Debug(ctx, "created status record", "out", out) 361 362 ss.lastStatus = time.Now() 363 364 return nil 365} 366 367func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 368 rs, err := renditions.GenerateRenditions(spseg) 369 if err != nil { 370 return fmt.Errorf("failed to generated renditions: %w", err) 371 } 372 373 if ss.lp == nil { 374 var err error 375 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 376 if err != nil { 377 return err 378 } 379 380 } 381 spmetrics.TranscodeAttemptsTotal.Inc() 382 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 383 if err != nil { 384 spmetrics.TranscodeErrorsTotal.Inc() 385 return err 386 } 387 if len(rs) != len(segs) { 388 spmetrics.TranscodeErrorsTotal.Inc() 389 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 390 } 391 spmetrics.TranscodeSuccessesTotal.Inc() 392 aqt, err := aqtime.FromString(spseg.StartTime) 393 if err != nil { 394 return err 395 } 396 for i, seg := range segs { 397 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 398 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 399 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 400 if err != nil { 401 return fmt.Errorf("failed to create transcoded segment file: %w", err) 402 } 403 defer fd.Close() 404 _, err = fd.Write(seg) 405 if err != nil { 406 return fmt.Errorf("failed to write transcoded segment file: %w", err) 407 } 408 ss.Go(ctx, func() error { 409 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 410 Filepath: fd.Name(), 411 Data: seg, 412 }) 413 }) 414 415 } 416 return nil 417} 418 419func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 420 ss.Go(ctx, func() error { 421 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 422 }) 423 ss.Go(ctx, func() error { 424 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 425 }) 426 return nil 427} 428 429func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 430 packet, err := media.Packetize(ctx, seg) 431 if err != nil { 432 return fmt.Errorf("failed to packetize segment: %w", err) 433 } 434 seg.PacketizedData = packet 435 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 436 return nil 437} 438 439func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 440 buf := bytes.Buffer{} 441 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 442 if err != nil { 443 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 444 } 445 // newSeg := &streamplace.Segment{ 446 // LexiconTypeID: "place.stream.segment", 447 // Id: spseg.Id, 448 // Creator: spseg.Creator, 449 // StartTime: spseg.StartTime, 450 // Duration: &dur, 451 // Audio: spseg.Audio, 452 // Video: spseg.Video, 453 // SigningKey: spseg.SigningKey, 454 // } 455 aqt, err := aqtime.FromString(spseg.StartTime) 456 if err != nil { 457 return fmt.Errorf("failed to parse segment start time: %w", err) 458 } 459 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 460 rend, err := ss.hls.GetRendition(rendition) 461 if err != nil { 462 return fmt.Errorf("failed to get rendition: %w", err) 463 } 464 if err := rend.NewSegment(&media.Segment{ 465 Buf: &buf, 466 Duration: time.Duration(dur), 467 Time: aqt.Time(), 468 }); err != nil { 469 return fmt.Errorf("failed to create new segment: %w", err) 470 } 471 472 return nil 473}