Live video on the AT Protocol
at restructure 568 lines 17 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/replication/iroh_replicator" 25 "stream.place/streamplace/pkg/spmetrics" 26 "stream.place/streamplace/pkg/statedb" 27 "stream.place/streamplace/pkg/streamplace" 28 "stream.place/streamplace/pkg/thumbnail" 29) 30 31type StreamSession struct { 32 mm *media.MediaManager 33 mod model.Model 34 cli *config.CLI 35 bus *bus.Bus 36 op *oatproxy.OATProxy 37 hls *media.M3U8 38 lp *livepeer.LivepeerSession 39 repoDID string 40 segmentChan chan struct{} 41 lastStatus time.Time 42 lastStatusLock sync.Mutex 43 lastOriginTime time.Time 44 lastOriginLock sync.Mutex 45 g *errgroup.Group 46 started chan struct{} 47 ctx context.Context 48 packets []bus.PacketizedSegment 49 statefulDB *statedb.StatefulDB 50 swarm *iroh_replicator.IrohSwarm 51} 52 53func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 54 ctx, cancel := context.WithCancel(ctx) 55 ss.g, ctx = errgroup.WithContext(ctx) 56 sid := livepeer.RandomTrailer(8) 57 ctx = log.WithLogValues(ctx, "sid", sid) 58 ss.ctx = ctx 59 log.Log(ctx, "starting stream session") 60 defer cancel() 61 spseg, err := notif.Segment.ToStreamplaceSegment() 62 if err != nil { 63 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 64 } 65 var allRenditions renditions.Renditions 66 67 if ss.cli.LivepeerGatewayURL != "" { 68 allRenditions, err = renditions.GenerateRenditions(spseg) 69 } else { 70 allRenditions = []renditions.Rendition{} 71 } 72 if err != nil { 73 return err 74 } 75 if spseg.Duration == nil { 76 return fmt.Errorf("segment duration is required to calculate bitrate") 77 } 78 dur := time.Duration(*spseg.Duration) 79 byteLen := len(notif.Data) 80 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 81 sourceRendition := renditions.Rendition{ 82 Name: "source", 83 Bitrate: bitrate, 84 Width: spseg.Video[0].Width, 85 Height: spseg.Video[0].Height, 86 } 87 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 88 ss.hls = media.NewM3U8(allRenditions) 89 90 // for _, r := range allRenditions { 91 // g.Go(func() error { 92 // for { 93 // if ctx.Err() != nil { 94 // return nil 95 // } 96 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 97 // if ctx.Err() != nil { 98 // return nil 99 // } 100 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 101 // time.Sleep(time.Second * 5) 102 // } 103 // }) 104 // } 105 106 close(ss.started) 107 108 for { 109 select { 110 case <-ss.segmentChan: 111 // reset timer 112 case <-ctx.Done(): 113 return ss.g.Wait() 114 // case <-time.After(time.Minute * 1): 115 case <-time.After(time.Second * 60): 116 log.Log(ctx, "no new segments for 1 minute, shutting down") 117 for _, r := range allRenditions { 118 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 119 } 120 cancel() 121 } 122 } 123} 124 125// Execute a goroutine in the context of the stream session. Errors are 126// non-fatal; if you actually want to melt the universe on an error you 127// should panic() 128func (ss *StreamSession) Go(ctx context.Context, f func() error) { 129 <-ss.started 130 ss.g.Go(func() error { 131 err := f() 132 if err != nil { 133 log.Error(ctx, "error in goroutine", "error", err) 134 } 135 return nil 136 }) 137} 138 139func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 140 <-ss.started 141 go func() { 142 select { 143 case <-ss.ctx.Done(): 144 return 145 case ss.segmentChan <- struct{}{}: 146 } 147 }() 148 aqt := aqtime.FromTime(notif.Segment.StartTime) 149 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 150 notif.Segment.MediaData.Size = len(notif.Data) 151 err := ss.mod.CreateSegment(notif.Segment) 152 if err != nil { 153 return fmt.Errorf("could not add segment to database: %w", err) 154 } 155 spseg, err := notif.Segment.ToStreamplaceSegment() 156 if err != nil { 157 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 158 } 159 160 ss.bus.Publish(spseg.Creator, spseg) 161 ss.Go(ctx, func() error { 162 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 163 Filepath: notif.Segment.ID, 164 Data: notif.Data, 165 }) 166 }) 167 168 if ss.cli.Thumbnail { 169 ss.Go(ctx, func() error { 170 return ss.Thumbnail(ctx, spseg.Creator, notif) 171 }) 172 } 173 174 ss.Go(ctx, func() error { 175 return ss.UpdateStatus(ctx, spseg.Creator) 176 }) 177 178 if notif.Local { 179 ss.Go(ctx, func() error { 180 return ss.UpdateBroadcastOrigin(ctx) 181 }) 182 } 183 184 if ss.cli.LivepeerGatewayURL != "" { 185 ss.Go(ctx, func() error { 186 start := time.Now() 187 err := ss.Transcode(ctx, spseg, notif.Data) 188 took := time.Since(start) 189 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 190 return err 191 }) 192 } 193 194 return nil 195} 196 197func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 198 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 199 locked := lock.TryLock() 200 if !locked { 201 // we're already generating a thumbnail for this user, skip 202 return nil 203 } 204 defer lock.Unlock() 205 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 206 if err != nil { 207 return err 208 } 209 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 210 // we have a thumbnail <60sec old, skip generating a new one 211 return nil 212 } 213 r := bytes.NewReader(not.Data) 214 aqt := aqtime.FromTime(not.Segment.StartTime) 215 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 216 if err != nil { 217 return err 218 } 219 defer fd.Close() 220 err = media.Thumbnail(ctx, r, fd, "jpeg") 221 if err != nil { 222 return err 223 } 224 thumb := &model.Thumbnail{ 225 Format: "jpeg", 226 SegmentID: not.Segment.ID, 227 } 228 err = ss.mod.CreateThumbnail(thumb) 229 if err != nil { 230 return err 231 } 232 return nil 233} 234 235func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 236 if pv == nil { 237 return nil, fmt.Errorf("post view is nil") 238 } 239 rec, ok := pv.Record.Val.(*bsky.FeedPost) 240 if !ok { 241 return nil, fmt.Errorf("post view record is not a feed post") 242 } 243 if rec.Embed == nil { 244 return nil, fmt.Errorf("post view embed is nil") 245 } 246 if rec.Embed.EmbedExternal == nil { 247 return nil, fmt.Errorf("post view embed external view is nil") 248 } 249 if rec.Embed.EmbedExternal.External == nil { 250 return nil, fmt.Errorf("post view embed external is nil") 251 } 252 if rec.Embed.EmbedExternal.External.Thumb == nil { 253 return nil, fmt.Errorf("post view embed external thumb is nil") 254 } 255 return rec.Embed.EmbedExternal.External.Thumb, nil 256} 257 258func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 259 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 260 ss.lastStatusLock.Lock() 261 defer ss.lastStatusLock.Unlock() 262 if time.Since(ss.lastStatus) < time.Minute { 263 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 264 return nil 265 } 266 267 session, err := ss.statefulDB.GetSessionByDID(repoDID) 268 if err != nil { 269 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 270 } 271 if session == nil { 272 return fmt.Errorf("no session found for repoDID: %s", repoDID) 273 } 274 275 session, err = ss.op.RefreshIfNeeded(session) 276 if err != nil { 277 return fmt.Errorf("could not refresh session for repoDID: %w", err) 278 } 279 280 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 281 if err != nil { 282 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 283 } 284 lsv, err := ls.ToLivestreamView() 285 if err != nil { 286 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 287 } 288 289 post, err := ss.mod.GetFeedPost(ls.PostURI) 290 if err != nil { 291 return fmt.Errorf("could not get feed post: %w", err) 292 } 293 if post == nil { 294 return fmt.Errorf("feed post not found for livestream: %w", err) 295 } 296 postView, err := post.ToBskyPostView() 297 if err != nil { 298 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 299 } 300 thumb, err := getThumbnailCID(postView) 301 if err != nil { 302 return fmt.Errorf("could not get thumbnail cid: %w", err) 303 } 304 305 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 306 if err != nil { 307 return fmt.Errorf("could not get repo for repoDID: %w", err) 308 } 309 310 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 311 if !ok { 312 return fmt.Errorf("livestream is not a streamplace livestream") 313 } 314 315 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 316 317 if lsr.CanonicalUrl != nil { 318 canonicalUrl = *lsr.CanonicalUrl 319 } 320 321 actorStatusEmbed := bsky.ActorStatus_Embed{ 322 EmbedExternal: &bsky.EmbedExternal{ 323 External: &bsky.EmbedExternal_External{ 324 Title: lsr.Title, 325 Uri: canonicalUrl, 326 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 327 Thumb: thumb, 328 }, 329 }, 330 } 331 332 duration := int64(2) 333 status := bsky.ActorStatus{ 334 Status: "app.bsky.actor.status#live", 335 DurationMinutes: &duration, 336 Embed: &actorStatusEmbed, 337 CreatedAt: time.Now().Format(time.RFC3339), 338 } 339 340 client, err := ss.op.GetXrpcClient(session) 341 if err != nil { 342 return fmt.Errorf("could not get xrpc client: %w", err) 343 } 344 345 var swapRecord *string 346 getOutput := atproto.RepoGetRecord_Output{} 347 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 348 "repo": repoDID, 349 "collection": "app.bsky.actor.status", 350 "rkey": "self", 351 }, nil, &getOutput) 352 if err != nil { 353 xErr, ok := err.(*xrpc.Error) 354 if !ok { 355 return fmt.Errorf("could not get record: %w", err) 356 } 357 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 358 return fmt.Errorf("could not get record: %w", err) 359 } 360 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 361 } else { 362 log.Debug(ctx, "got record", "record", getOutput) 363 swapRecord = getOutput.Cid 364 } 365 366 inp := atproto.RepoPutRecord_Input{ 367 Collection: "app.bsky.actor.status", 368 Record: &util.LexiconTypeDecoder{Val: &status}, 369 Rkey: "self", 370 Repo: repoDID, 371 SwapRecord: swapRecord, 372 } 373 out := atproto.RepoPutRecord_Output{} 374 375 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 376 if err != nil { 377 return fmt.Errorf("could not create record: %w", err) 378 } 379 log.Debug(ctx, "created status record", "out", out) 380 381 ss.lastStatus = time.Now() 382 383 return nil 384} 385 386var originUpdateInterval = time.Second * 30 387 388func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 389 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 390 ss.lastOriginLock.Lock() 391 defer ss.lastOriginLock.Unlock() 392 if time.Since(ss.lastOriginTime) < originUpdateInterval { 393 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 394 return nil 395 } 396 origin := streamplace.BroadcastOrigin{ 397 Streamer: ss.repoDID, 398 Server: fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost), 399 UpdatedAt: time.Now().Format(time.RFC3339), 400 IrohTicket: &ss.swarm.NodeTicket, 401 } 402 403 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 404 if err != nil { 405 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 406 } 407 if session == nil { 408 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 409 } 410 411 session, err = ss.op.RefreshIfNeeded(session) 412 if err != nil { 413 return fmt.Errorf("could not refresh session for repoDID: %w", err) 414 } 415 416 client, err := ss.op.GetXrpcClient(session) 417 if err != nil { 418 return fmt.Errorf("could not get xrpc client: %w", err) 419 } 420 421 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost) 422 423 var swapRecord *string 424 getOutput := atproto.RepoGetRecord_Output{} 425 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 426 "repo": ss.repoDID, 427 "collection": "place.stream.broadcast.origin", 428 "rkey": rkey, 429 }, nil, &getOutput) 430 if err != nil { 431 xErr, ok := err.(*xrpc.Error) 432 if !ok { 433 return fmt.Errorf("could not get record: %w", err) 434 } 435 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 436 return fmt.Errorf("could not get record: %w", err) 437 } 438 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 439 } else { 440 log.Debug(ctx, "got record", "record", getOutput) 441 swapRecord = getOutput.Cid 442 } 443 444 inp := atproto.RepoPutRecord_Input{ 445 Collection: "place.stream.broadcast.origin", 446 Record: &util.LexiconTypeDecoder{Val: &origin}, 447 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost), 448 Repo: ss.repoDID, 449 SwapRecord: swapRecord, 450 } 451 out := atproto.RepoPutRecord_Output{} 452 453 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 454 if err != nil { 455 return fmt.Errorf("could not create record: %w", err) 456 } 457 458 ss.lastOriginTime = time.Now() 459 return nil 460} 461 462func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 463 rs, err := renditions.GenerateRenditions(spseg) 464 if err != nil { 465 return fmt.Errorf("failed to generated renditions: %w", err) 466 } 467 468 if ss.lp == nil { 469 var err error 470 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 471 if err != nil { 472 return err 473 } 474 475 } 476 spmetrics.TranscodeAttemptsTotal.Inc() 477 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 478 if err != nil { 479 spmetrics.TranscodeErrorsTotal.Inc() 480 return err 481 } 482 if len(rs) != len(segs) { 483 spmetrics.TranscodeErrorsTotal.Inc() 484 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 485 } 486 spmetrics.TranscodeSuccessesTotal.Inc() 487 aqt, err := aqtime.FromString(spseg.StartTime) 488 if err != nil { 489 return err 490 } 491 for i, seg := range segs { 492 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 493 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 494 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 495 if err != nil { 496 return fmt.Errorf("failed to create transcoded segment file: %w", err) 497 } 498 defer fd.Close() 499 _, err = fd.Write(seg) 500 if err != nil { 501 return fmt.Errorf("failed to write transcoded segment file: %w", err) 502 } 503 ss.Go(ctx, func() error { 504 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 505 Filepath: fd.Name(), 506 Data: seg, 507 }) 508 }) 509 510 } 511 return nil 512} 513 514func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 515 ss.Go(ctx, func() error { 516 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 517 }) 518 ss.Go(ctx, func() error { 519 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 520 }) 521 return nil 522} 523 524func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 525 packet, err := media.Packetize(ctx, seg) 526 if err != nil { 527 return fmt.Errorf("failed to packetize segment: %w", err) 528 } 529 seg.PacketizedData = packet 530 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 531 return nil 532} 533 534func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 535 buf := bytes.Buffer{} 536 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 537 if err != nil { 538 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 539 } 540 // newSeg := &streamplace.Segment{ 541 // LexiconTypeID: "place.stream.segment", 542 // Id: spseg.Id, 543 // Creator: spseg.Creator, 544 // StartTime: spseg.StartTime, 545 // Duration: &dur, 546 // Audio: spseg.Audio, 547 // Video: spseg.Video, 548 // SigningKey: spseg.SigningKey, 549 // } 550 aqt, err := aqtime.FromString(spseg.StartTime) 551 if err != nil { 552 return fmt.Errorf("failed to parse segment start time: %w", err) 553 } 554 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 555 rend, err := ss.hls.GetRendition(rendition) 556 if err != nil { 557 return fmt.Errorf("failed to get rendition: %w", err) 558 } 559 if err := rend.NewSegment(&media.Segment{ 560 Buf: &buf, 561 Duration: time.Duration(dur), 562 Time: aqt.Time(), 563 }); err != nil { 564 return fmt.Errorf("failed to create new segment: %w", err) 565 } 566 567 return nil 568}