Live video on the AT Protocol
at eli/handle-changes 482 lines 14 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/spmetrics" 25 "stream.place/streamplace/pkg/statedb" 26 "stream.place/streamplace/pkg/streamplace" 27 "stream.place/streamplace/pkg/thumbnail" 28) 29 30type StreamSession struct { 31 mm *media.MediaManager 32 mod model.Model 33 cli *config.CLI 34 bus *bus.Bus 35 op *oatproxy.OATProxy 36 hls *media.M3U8 37 lp *livepeer.LivepeerSession 38 repoDID string 39 segmentChan chan struct{} 40 lastStatus time.Time 41 lastStatusLock sync.Mutex 42 g *errgroup.Group 43 started chan struct{} 44 ctx context.Context 45 packets []bus.PacketizedSegment 46 statefulDB *statedb.StatefulDB 47} 48 49func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { 50 ctx, cancel := context.WithCancel(ctx) 51 ss.g, ctx = errgroup.WithContext(ctx) 52 sid := livepeer.RandomTrailer(8) 53 ctx = log.WithLogValues(ctx, "sid", sid) 54 ss.ctx = ctx 55 log.Log(ctx, "starting stream session") 56 defer cancel() 57 spseg, err := not.Segment.ToStreamplaceSegment() 58 if err != nil { 59 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 60 } 61 var allRenditions renditions.Renditions 62 63 if ss.cli.LivepeerGatewayURL != "" { 64 allRenditions, err = renditions.GenerateRenditions(spseg) 65 } else { 66 allRenditions = []renditions.Rendition{} 67 } 68 if err != nil { 69 return err 70 } 71 if spseg.Duration == nil { 72 return fmt.Errorf("segment duration is required to calculate bitrate") 73 } 74 dur := time.Duration(*spseg.Duration) 75 byteLen := len(not.Data) 76 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 77 sourceRendition := renditions.Rendition{ 78 Name: "source", 79 Bitrate: bitrate, 80 Width: spseg.Video[0].Width, 81 Height: spseg.Video[0].Height, 82 } 83 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 84 ss.hls = media.NewM3U8(allRenditions) 85 86 // for _, r := range allRenditions { 87 // g.Go(func() error { 88 // for { 89 // if ctx.Err() != nil { 90 // return nil 91 // } 92 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 93 // if ctx.Err() != nil { 94 // return nil 95 // } 96 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 97 // time.Sleep(time.Second * 5) 98 // } 99 // }) 100 // } 101 102 close(ss.started) 103 104 for { 105 select { 106 case <-ss.segmentChan: 107 // reset timer 108 case <-ctx.Done(): 109 return ss.g.Wait() 110 // case <-time.After(time.Minute * 1): 111 case <-time.After(time.Second * 60): 112 log.Log(ctx, "no new segments for 1 minute, shutting down") 113 for _, r := range allRenditions { 114 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 115 } 116 cancel() 117 } 118 } 119} 120 121// Execute a goroutine in the context of the stream session. Errors are 122// non-fatal; if you actually want to melt the universe on an error you 123// should panic() 124func (ss *StreamSession) Go(ctx context.Context, f func() error) { 125 <-ss.started 126 ss.g.Go(func() error { 127 err := f() 128 if err != nil { 129 log.Error(ctx, "error in goroutine", "error", err) 130 } 131 return nil 132 }) 133} 134 135func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 136 <-ss.started 137 go func() { 138 select { 139 case <-ss.ctx.Done(): 140 return 141 case ss.segmentChan <- struct{}{}: 142 } 143 }() 144 aqt := aqtime.FromTime(not.Segment.StartTime) 145 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 146 not.Segment.MediaData.Size = len(not.Data) 147 err := ss.mod.CreateSegment(not.Segment) 148 if err != nil { 149 return fmt.Errorf("could not add segment to database: %w", err) 150 } 151 spseg, err := not.Segment.ToStreamplaceSegment() 152 if err != nil { 153 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 154 } 155 156 ss.bus.Publish(spseg.Creator, spseg) 157 ss.Go(ctx, func() error { 158 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 159 Filepath: not.Segment.ID, 160 Data: not.Data, 161 }) 162 }) 163 164 if ss.cli.Thumbnail { 165 ss.Go(ctx, func() error { 166 return ss.Thumbnail(ctx, spseg.Creator, not) 167 }) 168 } 169 170 ss.Go(ctx, func() error { 171 return ss.UpdateStatus(ctx, spseg.Creator) 172 }) 173 174 if ss.cli.LivepeerGatewayURL != "" { 175 ss.Go(ctx, func() error { 176 start := time.Now() 177 err := ss.Transcode(ctx, spseg, not.Data) 178 took := time.Since(start) 179 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 180 return err 181 }) 182 } 183 184 return nil 185} 186 187func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 188 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 189 locked := lock.TryLock() 190 if !locked { 191 // we're already generating a thumbnail for this user, skip 192 return nil 193 } 194 defer lock.Unlock() 195 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 196 if err != nil { 197 return err 198 } 199 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 200 // we have a thumbnail <60sec old, skip generating a new one 201 return nil 202 } 203 r := bytes.NewReader(not.Data) 204 aqt := aqtime.FromTime(not.Segment.StartTime) 205 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 206 if err != nil { 207 return err 208 } 209 defer fd.Close() 210 err = media.Thumbnail(ctx, r, fd, "jpeg") 211 if err != nil { 212 return err 213 } 214 thumb := &model.Thumbnail{ 215 Format: "jpeg", 216 SegmentID: not.Segment.ID, 217 } 218 err = ss.mod.CreateThumbnail(thumb) 219 if err != nil { 220 return err 221 } 222 return nil 223} 224 225func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 226 if pv == nil { 227 return nil, fmt.Errorf("post view is nil") 228 } 229 rec, ok := pv.Record.Val.(*bsky.FeedPost) 230 if !ok { 231 return nil, fmt.Errorf("post view record is not a feed post") 232 } 233 if rec.Embed == nil { 234 return nil, fmt.Errorf("post view embed is nil") 235 } 236 if rec.Embed.EmbedExternal == nil { 237 return nil, fmt.Errorf("post view embed external view is nil") 238 } 239 if rec.Embed.EmbedExternal.External == nil { 240 return nil, fmt.Errorf("post view embed external is nil") 241 } 242 if rec.Embed.EmbedExternal.External.Thumb == nil { 243 return nil, fmt.Errorf("post view embed external thumb is nil") 244 } 245 return rec.Embed.EmbedExternal.External.Thumb, nil 246} 247 248func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 249 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 250 ss.lastStatusLock.Lock() 251 defer ss.lastStatusLock.Unlock() 252 if time.Since(ss.lastStatus) < time.Minute { 253 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 254 return nil 255 } 256 257 session, err := ss.statefulDB.GetSessionByDID(repoDID) 258 if err != nil { 259 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 260 } 261 if session == nil { 262 return fmt.Errorf("no session found for repoDID: %s", repoDID) 263 } 264 265 session, err = ss.op.RefreshIfNeeded(session) 266 if err != nil { 267 return fmt.Errorf("could not refresh session for repoDID: %w", err) 268 } 269 270 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 271 if err != nil { 272 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 273 } 274 lsv, err := ls.ToLivestreamView() 275 if err != nil { 276 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 277 } 278 279 post, err := ss.mod.GetFeedPost(ls.PostURI) 280 if err != nil { 281 return fmt.Errorf("could not get feed post: %w", err) 282 } 283 if post == nil { 284 return fmt.Errorf("feed post not found for livestream: %w", err) 285 } 286 postView, err := post.ToBskyPostView() 287 if err != nil { 288 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 289 } 290 thumb, err := getThumbnailCID(postView) 291 if err != nil { 292 return fmt.Errorf("could not get thumbnail cid: %w", err) 293 } 294 295 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 296 if err != nil { 297 return fmt.Errorf("could not get repo for repoDID: %w", err) 298 } 299 300 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 301 if !ok { 302 return fmt.Errorf("livestream is not a streamplace livestream") 303 } 304 305 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle) 306 307 if lsr.CanonicalUrl != nil { 308 canonicalUrl = *lsr.CanonicalUrl 309 } 310 311 actorStatusEmbed := bsky.ActorStatus_Embed{ 312 EmbedExternal: &bsky.EmbedExternal{ 313 External: &bsky.EmbedExternal_External{ 314 Title: lsr.Title, 315 Uri: canonicalUrl, 316 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost), 317 Thumb: thumb, 318 }, 319 }, 320 } 321 322 duration := int64(2) 323 status := bsky.ActorStatus{ 324 Status: "app.bsky.actor.status#live", 325 DurationMinutes: &duration, 326 Embed: &actorStatusEmbed, 327 CreatedAt: time.Now().Format(time.RFC3339), 328 } 329 330 client, err := ss.op.GetXrpcClient(session) 331 if err != nil { 332 return fmt.Errorf("could not get xrpc client: %w", err) 333 } 334 335 var swapRecord *string 336 getOutput := atproto.RepoGetRecord_Output{} 337 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 338 "repo": repoDID, 339 "collection": "app.bsky.actor.status", 340 "rkey": "self", 341 }, nil, &getOutput) 342 if err != nil { 343 xErr, ok := err.(*xrpc.Error) 344 if !ok { 345 return fmt.Errorf("could not get record: %w", err) 346 } 347 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 348 return fmt.Errorf("could not get record: %w", err) 349 } 350 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 351 } else { 352 log.Debug(ctx, "got record", "record", getOutput) 353 swapRecord = getOutput.Cid 354 } 355 356 inp := atproto.RepoPutRecord_Input{ 357 Collection: "app.bsky.actor.status", 358 Record: &util.LexiconTypeDecoder{Val: &status}, 359 Rkey: "self", 360 Repo: repoDID, 361 SwapRecord: swapRecord, 362 } 363 out := atproto.RepoPutRecord_Output{} 364 365 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 366 if err != nil { 367 return fmt.Errorf("could not create record: %w", err) 368 } 369 log.Debug(ctx, "created status record", "out", out) 370 371 ss.lastStatus = time.Now() 372 373 return nil 374} 375 376func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 377 rs, err := renditions.GenerateRenditions(spseg) 378 if err != nil { 379 return fmt.Errorf("failed to generated renditions: %w", err) 380 } 381 382 if ss.lp == nil { 383 var err error 384 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 385 if err != nil { 386 return err 387 } 388 389 } 390 spmetrics.TranscodeAttemptsTotal.Inc() 391 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 392 if err != nil { 393 spmetrics.TranscodeErrorsTotal.Inc() 394 return err 395 } 396 if len(rs) != len(segs) { 397 spmetrics.TranscodeErrorsTotal.Inc() 398 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 399 } 400 spmetrics.TranscodeSuccessesTotal.Inc() 401 aqt, err := aqtime.FromString(spseg.StartTime) 402 if err != nil { 403 return err 404 } 405 for i, seg := range segs { 406 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 407 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 408 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 409 if err != nil { 410 return fmt.Errorf("failed to create transcoded segment file: %w", err) 411 } 412 defer fd.Close() 413 _, err = fd.Write(seg) 414 if err != nil { 415 return fmt.Errorf("failed to write transcoded segment file: %w", err) 416 } 417 ss.Go(ctx, func() error { 418 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 419 Filepath: fd.Name(), 420 Data: seg, 421 }) 422 }) 423 424 } 425 return nil 426} 427 428func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 429 ss.Go(ctx, func() error { 430 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 431 }) 432 ss.Go(ctx, func() error { 433 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 434 }) 435 return nil 436} 437 438func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 439 packet, err := media.Packetize(ctx, seg) 440 if err != nil { 441 return fmt.Errorf("failed to packetize segment: %w", err) 442 } 443 seg.PacketizedData = packet 444 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 445 return nil 446} 447 448func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 449 buf := bytes.Buffer{} 450 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 451 if err != nil { 452 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 453 } 454 // newSeg := &streamplace.Segment{ 455 // LexiconTypeID: "place.stream.segment", 456 // Id: spseg.Id, 457 // Creator: spseg.Creator, 458 // StartTime: spseg.StartTime, 459 // Duration: &dur, 460 // Audio: spseg.Audio, 461 // Video: spseg.Video, 462 // SigningKey: spseg.SigningKey, 463 // } 464 aqt, err := aqtime.FromString(spseg.StartTime) 465 if err != nil { 466 return fmt.Errorf("failed to parse segment start time: %w", err) 467 } 468 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 469 rend, err := ss.hls.GetRendition(rendition) 470 if err != nil { 471 return fmt.Errorf("failed to get rendition: %w", err) 472 } 473 if err := rend.NewSegment(&media.Segment{ 474 Buf: &buf, 475 Duration: time.Duration(dur), 476 Time: aqt.Time(), 477 }); err != nil { 478 return fmt.Errorf("failed to create new segment: %w", err) 479 } 480 481 return nil 482}