Live video on the AT Protocol
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/bus" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/livepeer" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/media" 23 "stream.place/streamplace/pkg/model" 24 "stream.place/streamplace/pkg/renditions" 25 "stream.place/streamplace/pkg/replication/iroh_replicator" 26 "stream.place/streamplace/pkg/spmetrics" 27 "stream.place/streamplace/pkg/statedb" 28 "stream.place/streamplace/pkg/streamplace" 29 "stream.place/streamplace/pkg/thumbnail" 30) 31 32type StreamSession struct { 33 mm *media.MediaManager 34 mod model.Model 35 cli *config.CLI 36 bus *bus.Bus 37 op *oatproxy.OATProxy 38 hls *media.M3U8 39 lp *livepeer.LivepeerSession 40 repoDID string 41 segmentChan chan struct{} 42 lastStatus time.Time 43 lastStatusCID *string 44 lastStatusLock sync.Mutex 45 lastOriginTime time.Time 46 lastOriginLock sync.Mutex 47 g *errgroup.Group 48 started chan struct{} 49 ctx context.Context 50 packets []bus.PacketizedSegment 51 statefulDB *statedb.StatefulDB 52 swarm *iroh_replicator.IrohSwarm 53} 54 55func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 56 ctx, cancel := context.WithCancel(ctx) 57 ss.g, ctx = errgroup.WithContext(ctx) 58 sid := livepeer.RandomTrailer(8) 59 ctx = log.WithLogValues(ctx, "sid", sid) 60 ss.ctx = ctx 61 log.Log(ctx, "starting stream session") 62 defer cancel() 63 spseg, err := notif.Segment.ToStreamplaceSegment() 64 if err != nil { 65 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 66 } 67 var allRenditions renditions.Renditions 68 69 if ss.cli.LivepeerGatewayURL != "" { 70 allRenditions, err = renditions.GenerateRenditions(spseg) 71 } else { 72 allRenditions = []renditions.Rendition{} 73 } 74 if err != nil { 75 return err 76 } 77 if spseg.Duration == nil { 78 return fmt.Errorf("segment duration is required to calculate bitrate") 79 } 80 dur := time.Duration(*spseg.Duration) 81 byteLen := len(notif.Data) 82 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 83 sourceRendition := renditions.Rendition{ 84 Name: "source", 85 Bitrate: bitrate, 86 Width: spseg.Video[0].Width, 87 Height: spseg.Video[0].Height, 88 } 89 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 90 ss.hls = media.NewM3U8(allRenditions) 91 92 // for _, r := range allRenditions { 93 // g.Go(func() error { 94 // for { 95 // if ctx.Err() != nil { 96 // return nil 97 // } 98 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 99 // if ctx.Err() != nil { 100 // return nil 101 // } 102 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 103 // time.Sleep(time.Second * 5) 104 // } 105 // }) 106 // } 107 108 close(ss.started) 109 110 for { 111 select { 112 case <-ss.segmentChan: 113 // reset timer 114 case <-ctx.Done(): 115 return ss.g.Wait() 116 // case <-time.After(time.Minute * 1): 117 case <-time.After(time.Second * 60): 118 log.Log(ctx, "no new segments for 1 minute, shutting down") 119 for _, r := range allRenditions { 120 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 121 } 122 if notif.Local { 123 ss.Go(ctx, func() error { 124 return ss.DeleteStatus(spseg.Creator) 125 }) 126 } 127 cancel() 128 } 129 } 130} 131 132// Execute a goroutine in the context of the stream session. Errors are 133// non-fatal; if you actually want to melt the universe on an error you 134// should panic() 135func (ss *StreamSession) Go(ctx context.Context, f func() error) { 136 <-ss.started 137 ss.g.Go(func() error { 138 err := f() 139 if err != nil { 140 log.Error(ctx, "error in goroutine", "error", err) 141 } 142 return nil 143 }) 144} 145 146func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 147 <-ss.started 148 go func() { 149 select { 150 case <-ss.ctx.Done(): 151 return 152 case ss.segmentChan <- struct{}{}: 153 } 154 }() 155 aqt := aqtime.FromTime(notif.Segment.StartTime) 156 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 157 notif.Segment.MediaData.Size = len(notif.Data) 158 err := ss.mod.CreateSegment(notif.Segment) 159 if err != nil { 160 return fmt.Errorf("could not add segment to database: %w", err) 161 } 162 spseg, err := notif.Segment.ToStreamplaceSegment() 163 if err != nil { 164 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 165 } 166 167 ss.bus.Publish(spseg.Creator, spseg) 168 ss.Go(ctx, func() error { 169 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 170 Filepath: notif.Segment.ID, 171 Data: notif.Data, 172 }) 173 }) 174 175 if ss.cli.Thumbnail { 176 ss.Go(ctx, func() error { 177 return ss.Thumbnail(ctx, spseg.Creator, notif) 178 }) 179 } 180 181 if notif.Local { 182 ss.Go(ctx, func() error { 183 return ss.UpdateStatus(ctx, spseg.Creator) 184 }) 185 186 ss.Go(ctx, func() error { 187 return ss.UpdateBroadcastOrigin(ctx) 188 }) 189 } 190 191 if ss.cli.LivepeerGatewayURL != "" { 192 ss.Go(ctx, func() error { 193 start := time.Now() 194 err := ss.Transcode(ctx, spseg, notif.Data) 195 took := time.Since(start) 196 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 197 return err 198 }) 199 } 200 201 return nil 202} 203 204func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 205 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 206 locked := lock.TryLock() 207 if !locked { 208 // we're already generating a thumbnail for this user, skip 209 return nil 210 } 211 defer lock.Unlock() 212 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 213 if err != nil { 214 return err 215 } 216 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 217 // we have a thumbnail <60sec old, skip generating a new one 218 return nil 219 } 220 r := bytes.NewReader(not.Data) 221 aqt := aqtime.FromTime(not.Segment.StartTime) 222 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 223 if err != nil { 224 return err 225 } 226 defer fd.Close() 227 err = media.Thumbnail(ctx, r, fd, "jpeg") 228 if err != nil { 229 return err 230 } 231 thumb := &model.Thumbnail{ 232 Format: "jpeg", 233 SegmentID: not.Segment.ID, 234 } 235 err = ss.mod.CreateThumbnail(thumb) 236 if err != nil { 237 return err 238 } 239 return nil 240} 241 242func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*lexutil.LexBlob, error) { 243 if pv == nil { 244 return nil, fmt.Errorf("post view is nil") 245 } 246 rec, ok := pv.Record.Val.(*bsky.FeedPost) 247 if !ok { 248 return nil, fmt.Errorf("post view record is not a feed post") 249 } 250 if rec.Embed == nil { 251 return nil, fmt.Errorf("post view embed is nil") 252 } 253 if rec.Embed.EmbedExternal == nil { 254 return nil, fmt.Errorf("post view embed external view is nil") 255 } 256 if rec.Embed.EmbedExternal.External == nil { 257 return nil, fmt.Errorf("post view embed external is nil") 258 } 259 if rec.Embed.EmbedExternal.External.Thumb == nil { 260 return nil, fmt.Errorf("post view embed external thumb is nil") 261 } 262 return rec.Embed.EmbedExternal.External.Thumb, nil 263} 264 265func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 266 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 267 ss.lastStatusLock.Lock() 268 defer ss.lastStatusLock.Unlock() 269 if time.Since(ss.lastStatus) < time.Minute { 270 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 271 return nil 272 } 273 274 session, err := ss.statefulDB.GetSessionByDID(repoDID) 275 if err != nil { 276 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 277 } 278 if session == nil { 279 return fmt.Errorf("no session found for repoDID: %s", repoDID) 280 } 281 282 session, err = ss.op.RefreshIfNeeded(session) 283 if err != nil { 284 return fmt.Errorf("could not refresh session for repoDID: %w", err) 285 } 286 287 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 288 if err != nil { 289 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 290 } 291 lsv, err := ls.ToLivestreamView() 292 if err != nil { 293 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 294 } 295 296 post, err := ss.mod.GetFeedPost(ls.PostURI) 297 if err != nil { 298 return fmt.Errorf("could not get feed post: %w", err) 299 } 300 if post == nil { 301 return fmt.Errorf("feed post not found for livestream: %w", err) 302 } 303 postView, err := post.ToBskyPostView() 304 if err != nil { 305 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 306 } 307 thumb, err := getThumbnailCID(postView) 308 if err != nil { 309 return fmt.Errorf("could not get thumbnail cid: %w", err) 310 } 311 312 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 313 if err != nil { 314 return fmt.Errorf("could not get repo for repoDID: %w", err) 315 } 316 317 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 318 if !ok { 319 return fmt.Errorf("livestream is not a streamplace livestream") 320 } 321 322 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 323 324 if lsr.CanonicalUrl != nil { 325 canonicalUrl = *lsr.CanonicalUrl 326 } 327 328 actorStatusEmbed := bsky.ActorStatus_Embed{ 329 EmbedExternal: &bsky.EmbedExternal{ 330 External: &bsky.EmbedExternal_External{ 331 Title: lsr.Title, 332 Uri: canonicalUrl, 333 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 334 Thumb: thumb, 335 }, 336 }, 337 } 338 339 duration := int64(120) 340 status := bsky.ActorStatus{ 341 Status: "app.bsky.actor.status#live", 342 DurationMinutes: &duration, 343 Embed: &actorStatusEmbed, 344 CreatedAt: time.Now().Format(time.RFC3339), 345 } 346 347 client, err := ss.op.GetXrpcClient(session) 348 if err != nil { 349 return fmt.Errorf("could not get xrpc client: %w", err) 350 } 351 352 var swapRecord *string 353 getOutput := atproto.RepoGetRecord_Output{} 354 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 355 "repo": repoDID, 356 "collection": "app.bsky.actor.status", 357 "rkey": "self", 358 }, nil, &getOutput) 359 if err != nil { 360 xErr, ok := err.(*xrpc.Error) 361 if !ok { 362 return fmt.Errorf("could not get record: %w", err) 363 } 364 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 365 return fmt.Errorf("could not get record: %w", err) 366 } 367 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 368 } else { 369 log.Debug(ctx, "got record", "record", getOutput) 370 swapRecord = getOutput.Cid 371 } 372 373 inp := atproto.RepoPutRecord_Input{ 374 Collection: "app.bsky.actor.status", 375 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 376 Rkey: "self", 377 Repo: repoDID, 378 SwapRecord: swapRecord, 379 } 380 out := atproto.RepoPutRecord_Output{} 381 382 ss.lastStatusCID = &out.Cid 383 384 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 385 if err != nil { 386 return fmt.Errorf("could not create record: %w", err) 387 } 388 log.Debug(ctx, "created status record", "out", out) 389 390 ss.lastStatus = time.Now() 391 392 return nil 393} 394 395func (ss *StreamSession) DeleteStatus(repoDID string) error { 396 // need a special extra context because the stream session context is already cancelled 397 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 398 ss.lastStatusLock.Lock() 399 defer ss.lastStatusLock.Unlock() 400 if ss.lastStatusCID == nil { 401 log.Debug(ctx, "no status cid to delete") 402 return nil 403 } 404 inp := atproto.RepoDeleteRecord_Input{ 405 Collection: "app.bsky.actor.status", 406 Rkey: "self", 407 Repo: repoDID, 408 } 409 inp.SwapRecord = ss.lastStatusCID 410 out := atproto.RepoDeleteRecord_Output{} 411 412 session, err := ss.statefulDB.GetSessionByDID(repoDID) 413 if err != nil { 414 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 415 } 416 if session == nil { 417 return fmt.Errorf("no session found for repoDID: %s", repoDID) 418 } 419 420 session, err = ss.op.RefreshIfNeeded(session) 421 if err != nil { 422 return fmt.Errorf("could not refresh session for repoDID: %w", err) 423 } 424 425 client, err := ss.op.GetXrpcClient(session) 426 if err != nil { 427 return fmt.Errorf("could not get xrpc client: %w", err) 428 } 429 430 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 431 if err != nil { 432 return fmt.Errorf("could not delete record: %w", err) 433 } 434 435 ss.lastStatusCID = nil 436 return nil 437} 438 439var originUpdateInterval = time.Second * 30 440 441func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 442 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 443 ss.lastOriginLock.Lock() 444 defer ss.lastOriginLock.Unlock() 445 if time.Since(ss.lastOriginTime) < originUpdateInterval { 446 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 447 return nil 448 } 449 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 450 origin := streamplace.BroadcastOrigin{ 451 Streamer: ss.repoDID, 452 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 453 Broadcaster: &broadcaster, 454 UpdatedAt: time.Now().Format(util.ISO8601), 455 IrohTicket: &ss.swarm.NodeTicket, 456 } 457 458 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 459 if err != nil { 460 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 461 } 462 if session == nil { 463 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 464 } 465 466 session, err = ss.op.RefreshIfNeeded(session) 467 if err != nil { 468 return fmt.Errorf("could not refresh session for repoDID: %w", err) 469 } 470 471 client, err := ss.op.GetXrpcClient(session) 472 if err != nil { 473 return fmt.Errorf("could not get xrpc client: %w", err) 474 } 475 476 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 477 478 var swapRecord *string 479 getOutput := atproto.RepoGetRecord_Output{} 480 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 481 "repo": ss.repoDID, 482 "collection": "place.stream.broadcast.origin", 483 "rkey": rkey, 484 }, nil, &getOutput) 485 if err != nil { 486 xErr, ok := err.(*xrpc.Error) 487 if !ok { 488 return fmt.Errorf("could not get record: %w", err) 489 } 490 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 491 return fmt.Errorf("could not get record: %w", err) 492 } 493 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 494 } else { 495 log.Debug(ctx, "got record", "record", getOutput) 496 swapRecord = getOutput.Cid 497 } 498 499 inp := atproto.RepoPutRecord_Input{ 500 Collection: "place.stream.broadcast.origin", 501 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 502 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost), 503 Repo: ss.repoDID, 504 SwapRecord: swapRecord, 505 } 506 out := atproto.RepoPutRecord_Output{} 507 508 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 509 if err != nil { 510 return fmt.Errorf("could not create record: %w", err) 511 } 512 513 ss.lastOriginTime = time.Now() 514 return nil 515} 516 517func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 518 rs, err := renditions.GenerateRenditions(spseg) 519 if err != nil { 520 return fmt.Errorf("failed to generated renditions: %w", err) 521 } 522 523 if ss.lp == nil { 524 var err error 525 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 526 if err != nil { 527 return err 528 } 529 530 } 531 spmetrics.TranscodeAttemptsTotal.Inc() 532 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 533 if err != nil { 534 spmetrics.TranscodeErrorsTotal.Inc() 535 return err 536 } 537 if len(rs) != len(segs) { 538 spmetrics.TranscodeErrorsTotal.Inc() 539 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 540 } 541 spmetrics.TranscodeSuccessesTotal.Inc() 542 aqt, err := aqtime.FromString(spseg.StartTime) 543 if err != nil { 544 return err 545 } 546 for i, seg := range segs { 547 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 548 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 549 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 550 if err != nil { 551 return fmt.Errorf("failed to create transcoded segment file: %w", err) 552 } 553 defer fd.Close() 554 _, err = fd.Write(seg) 555 if err != nil { 556 return fmt.Errorf("failed to write transcoded segment file: %w", err) 557 } 558 ss.Go(ctx, func() error { 559 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 560 Filepath: fd.Name(), 561 Data: seg, 562 }) 563 }) 564 565 } 566 return nil 567} 568 569func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 570 ss.Go(ctx, func() error { 571 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 572 }) 573 ss.Go(ctx, func() error { 574 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 575 }) 576 return nil 577} 578 579func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 580 packet, err := media.Packetize(ctx, seg) 581 if err != nil { 582 return fmt.Errorf("failed to packetize segment: %w", err) 583 } 584 seg.PacketizedData = packet 585 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 586 return nil 587} 588 589func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 590 buf := bytes.Buffer{} 591 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 592 if err != nil { 593 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 594 } 595 // newSeg := &streamplace.Segment{ 596 // LexiconTypeID: "place.stream.segment", 597 // Id: spseg.Id, 598 // Creator: spseg.Creator, 599 // StartTime: spseg.StartTime, 600 // Duration: &dur, 601 // Audio: spseg.Audio, 602 // Video: spseg.Video, 603 // SigningKey: spseg.SigningKey, 604 // } 605 aqt, err := aqtime.FromString(spseg.StartTime) 606 if err != nil { 607 return fmt.Errorf("failed to parse segment start time: %w", err) 608 } 609 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 610 rend, err := ss.hls.GetRendition(rendition) 611 if err != nil { 612 return fmt.Errorf("failed to get rendition: %w", err) 613 } 614 if err := rend.NewSegment(&media.Segment{ 615 Buf: &buf, 616 Duration: time.Duration(dur), 617 Time: aqt.Time(), 618 }); err != nil { 619 return fmt.Errorf("failed to create new segment: %w", err) 620 } 621 622 return nil 623}