Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.2 620 lines 18 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/aqtime" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/livepeer" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/media" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/replication/iroh_replicator" 25 "stream.place/streamplace/pkg/spmetrics" 26 "stream.place/streamplace/pkg/statedb" 27 "stream.place/streamplace/pkg/streamplace" 28 "stream.place/streamplace/pkg/thumbnail" 29) 30 31type StreamSession struct { 32 mm *media.MediaManager 33 mod model.Model 34 cli *config.CLI 35 bus *bus.Bus 36 op *oatproxy.OATProxy 37 hls *media.M3U8 38 lp *livepeer.LivepeerSession 39 repoDID string 40 segmentChan chan struct{} 41 lastStatus time.Time 42 lastStatusCID *string 43 lastStatusLock sync.Mutex 44 lastOriginTime time.Time 45 lastOriginLock sync.Mutex 46 g *errgroup.Group 47 started chan struct{} 48 ctx context.Context 49 packets []bus.PacketizedSegment 50 statefulDB *statedb.StatefulDB 51 swarm *iroh_replicator.IrohSwarm 52} 53 54func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 55 ctx, cancel := context.WithCancel(ctx) 56 ss.g, ctx = errgroup.WithContext(ctx) 57 sid := livepeer.RandomTrailer(8) 58 ctx = log.WithLogValues(ctx, "sid", sid) 59 ss.ctx = ctx 60 log.Log(ctx, "starting stream session") 61 defer cancel() 62 spseg, err := notif.Segment.ToStreamplaceSegment() 63 if err != nil { 64 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 65 } 66 var allRenditions renditions.Renditions 67 68 if ss.cli.LivepeerGatewayURL != "" { 69 allRenditions, err = renditions.GenerateRenditions(spseg) 70 } else { 71 allRenditions = []renditions.Rendition{} 72 } 73 if err != nil { 74 return err 75 } 76 if spseg.Duration == nil { 77 return fmt.Errorf("segment duration is required to calculate bitrate") 78 } 79 dur := time.Duration(*spseg.Duration) 80 byteLen := len(notif.Data) 81 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 82 sourceRendition := renditions.Rendition{ 83 Name: "source", 84 Bitrate: bitrate, 85 Width: spseg.Video[0].Width, 86 Height: spseg.Video[0].Height, 87 } 88 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 89 ss.hls = media.NewM3U8(allRenditions) 90 91 // for _, r := range allRenditions { 92 // g.Go(func() error { 93 // for { 94 // if ctx.Err() != nil { 95 // return nil 96 // } 97 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 98 // if ctx.Err() != nil { 99 // return nil 100 // } 101 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 102 // time.Sleep(time.Second * 5) 103 // } 104 // }) 105 // } 106 107 close(ss.started) 108 109 for { 110 select { 111 case <-ss.segmentChan: 112 // reset timer 113 case <-ctx.Done(): 114 return ss.g.Wait() 115 // case <-time.After(time.Minute * 1): 116 case <-time.After(time.Second * 60): 117 log.Log(ctx, "no new segments for 1 minute, shutting down") 118 for _, r := range allRenditions { 119 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 120 } 121 if notif.Local { 122 ss.Go(ctx, func() error { 123 return ss.DeleteStatus(spseg.Creator) 124 }) 125 } 126 cancel() 127 } 128 } 129} 130 131// Execute a goroutine in the context of the stream session. Errors are 132// non-fatal; if you actually want to melt the universe on an error you 133// should panic() 134func (ss *StreamSession) Go(ctx context.Context, f func() error) { 135 <-ss.started 136 ss.g.Go(func() error { 137 err := f() 138 if err != nil { 139 log.Error(ctx, "error in goroutine", "error", err) 140 } 141 return nil 142 }) 143} 144 145func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 146 <-ss.started 147 go func() { 148 select { 149 case <-ss.ctx.Done(): 150 return 151 case ss.segmentChan <- struct{}{}: 152 } 153 }() 154 aqt := aqtime.FromTime(notif.Segment.StartTime) 155 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 156 notif.Segment.MediaData.Size = len(notif.Data) 157 err := ss.mod.CreateSegment(notif.Segment) 158 if err != nil { 159 return fmt.Errorf("could not add segment to database: %w", err) 160 } 161 spseg, err := notif.Segment.ToStreamplaceSegment() 162 if err != nil { 163 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 164 } 165 166 ss.bus.Publish(spseg.Creator, spseg) 167 ss.Go(ctx, func() error { 168 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 169 Filepath: notif.Segment.ID, 170 Data: notif.Data, 171 }) 172 }) 173 174 if ss.cli.Thumbnail { 175 ss.Go(ctx, func() error { 176 return ss.Thumbnail(ctx, spseg.Creator, notif) 177 }) 178 } 179 180 if notif.Local { 181 ss.Go(ctx, func() error { 182 return ss.UpdateStatus(ctx, spseg.Creator) 183 }) 184 185 ss.Go(ctx, func() error { 186 return ss.UpdateBroadcastOrigin(ctx) 187 }) 188 } 189 190 if ss.cli.LivepeerGatewayURL != "" { 191 ss.Go(ctx, func() error { 192 start := time.Now() 193 err := ss.Transcode(ctx, spseg, notif.Data) 194 took := time.Since(start) 195 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 196 return err 197 }) 198 } 199 200 return nil 201} 202 203func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 204 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 205 locked := lock.TryLock() 206 if !locked { 207 // we're already generating a thumbnail for this user, skip 208 return nil 209 } 210 defer lock.Unlock() 211 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 212 if err != nil { 213 return err 214 } 215 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 216 // we have a thumbnail <60sec old, skip generating a new one 217 return nil 218 } 219 r := bytes.NewReader(not.Data) 220 aqt := aqtime.FromTime(not.Segment.StartTime) 221 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 222 if err != nil { 223 return err 224 } 225 defer fd.Close() 226 err = media.Thumbnail(ctx, r, fd, "jpeg") 227 if err != nil { 228 return err 229 } 230 thumb := &model.Thumbnail{ 231 Format: "jpeg", 232 SegmentID: not.Segment.ID, 233 } 234 err = ss.mod.CreateThumbnail(thumb) 235 if err != nil { 236 return err 237 } 238 return nil 239} 240 241func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) { 242 if pv == nil { 243 return nil, fmt.Errorf("post view is nil") 244 } 245 rec, ok := pv.Record.Val.(*bsky.FeedPost) 246 if !ok { 247 return nil, fmt.Errorf("post view record is not a feed post") 248 } 249 if rec.Embed == nil { 250 return nil, fmt.Errorf("post view embed is nil") 251 } 252 if rec.Embed.EmbedExternal == nil { 253 return nil, fmt.Errorf("post view embed external view is nil") 254 } 255 if rec.Embed.EmbedExternal.External == nil { 256 return nil, fmt.Errorf("post view embed external is nil") 257 } 258 if rec.Embed.EmbedExternal.External.Thumb == nil { 259 return nil, fmt.Errorf("post view embed external thumb is nil") 260 } 261 return rec.Embed.EmbedExternal.External.Thumb, nil 262} 263 264func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 265 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 266 ss.lastStatusLock.Lock() 267 defer ss.lastStatusLock.Unlock() 268 if time.Since(ss.lastStatus) < time.Minute { 269 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 270 return nil 271 } 272 273 session, err := ss.statefulDB.GetSessionByDID(repoDID) 274 if err != nil { 275 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 276 } 277 if session == nil { 278 return fmt.Errorf("no session found for repoDID: %s", repoDID) 279 } 280 281 session, err = ss.op.RefreshIfNeeded(session) 282 if err != nil { 283 return fmt.Errorf("could not refresh session for repoDID: %w", err) 284 } 285 286 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 287 if err != nil { 288 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 289 } 290 lsv, err := ls.ToLivestreamView() 291 if err != nil { 292 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 293 } 294 295 post, err := ss.mod.GetFeedPost(ls.PostURI) 296 if err != nil { 297 return fmt.Errorf("could not get feed post: %w", err) 298 } 299 if post == nil { 300 return fmt.Errorf("feed post not found for livestream: %w", err) 301 } 302 postView, err := post.ToBskyPostView() 303 if err != nil { 304 return fmt.Errorf("could not convert feed post to bsky post view: %w", err) 305 } 306 thumb, err := getThumbnailCID(postView) 307 if err != nil { 308 return fmt.Errorf("could not get thumbnail cid: %w", err) 309 } 310 311 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 312 if err != nil { 313 return fmt.Errorf("could not get repo for repoDID: %w", err) 314 } 315 316 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 317 if !ok { 318 return fmt.Errorf("livestream is not a streamplace livestream") 319 } 320 321 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 322 323 if lsr.CanonicalUrl != nil { 324 canonicalUrl = *lsr.CanonicalUrl 325 } 326 327 actorStatusEmbed := bsky.ActorStatus_Embed{ 328 EmbedExternal: &bsky.EmbedExternal{ 329 External: &bsky.EmbedExternal_External{ 330 Title: lsr.Title, 331 Uri: canonicalUrl, 332 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 333 Thumb: thumb, 334 }, 335 }, 336 } 337 338 duration := int64(120) 339 status := bsky.ActorStatus{ 340 Status: "app.bsky.actor.status#live", 341 DurationMinutes: &duration, 342 Embed: &actorStatusEmbed, 343 CreatedAt: time.Now().Format(time.RFC3339), 344 } 345 346 client, err := ss.op.GetXrpcClient(session) 347 if err != nil { 348 return fmt.Errorf("could not get xrpc client: %w", err) 349 } 350 351 var swapRecord *string 352 getOutput := atproto.RepoGetRecord_Output{} 353 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 354 "repo": repoDID, 355 "collection": "app.bsky.actor.status", 356 "rkey": "self", 357 }, nil, &getOutput) 358 if err != nil { 359 xErr, ok := err.(*xrpc.Error) 360 if !ok { 361 return fmt.Errorf("could not get record: %w", err) 362 } 363 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 364 return fmt.Errorf("could not get record: %w", err) 365 } 366 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 367 } else { 368 log.Debug(ctx, "got record", "record", getOutput) 369 swapRecord = getOutput.Cid 370 } 371 372 inp := atproto.RepoPutRecord_Input{ 373 Collection: "app.bsky.actor.status", 374 Record: &util.LexiconTypeDecoder{Val: &status}, 375 Rkey: "self", 376 Repo: repoDID, 377 SwapRecord: swapRecord, 378 } 379 out := atproto.RepoPutRecord_Output{} 380 381 ss.lastStatusCID = &out.Cid 382 383 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 384 if err != nil { 385 return fmt.Errorf("could not create record: %w", err) 386 } 387 log.Debug(ctx, "created status record", "out", out) 388 389 ss.lastStatus = time.Now() 390 391 return nil 392} 393 394func (ss *StreamSession) DeleteStatus(repoDID string) error { 395 // need a special extra context because the stream session context is already cancelled 396 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 397 ss.lastStatusLock.Lock() 398 defer ss.lastStatusLock.Unlock() 399 if ss.lastStatusCID == nil { 400 log.Debug(ctx, "no status cid to delete") 401 return nil 402 } 403 inp := atproto.RepoDeleteRecord_Input{ 404 Collection: "app.bsky.actor.status", 405 Rkey: "self", 406 Repo: repoDID, 407 } 408 inp.SwapRecord = ss.lastStatusCID 409 out := atproto.RepoDeleteRecord_Output{} 410 411 session, err := ss.statefulDB.GetSessionByDID(repoDID) 412 if err != nil { 413 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 414 } 415 if session == nil { 416 return fmt.Errorf("no session found for repoDID: %s", repoDID) 417 } 418 419 session, err = ss.op.RefreshIfNeeded(session) 420 if err != nil { 421 return fmt.Errorf("could not refresh session for repoDID: %w", err) 422 } 423 424 client, err := ss.op.GetXrpcClient(session) 425 if err != nil { 426 return fmt.Errorf("could not get xrpc client: %w", err) 427 } 428 429 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 430 if err != nil { 431 return fmt.Errorf("could not delete record: %w", err) 432 } 433 434 ss.lastStatusCID = nil 435 return nil 436} 437 438var originUpdateInterval = time.Second * 30 439 440func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 441 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 442 ss.lastOriginLock.Lock() 443 defer ss.lastOriginLock.Unlock() 444 if time.Since(ss.lastOriginTime) < originUpdateInterval { 445 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 446 return nil 447 } 448 origin := streamplace.BroadcastOrigin{ 449 Streamer: ss.repoDID, 450 Server: fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost), 451 UpdatedAt: time.Now().Format(time.RFC3339), 452 IrohTicket: &ss.swarm.NodeTicket, 453 } 454 455 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 456 if err != nil { 457 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 458 } 459 if session == nil { 460 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 461 } 462 463 session, err = ss.op.RefreshIfNeeded(session) 464 if err != nil { 465 return fmt.Errorf("could not refresh session for repoDID: %w", err) 466 } 467 468 client, err := ss.op.GetXrpcClient(session) 469 if err != nil { 470 return fmt.Errorf("could not get xrpc client: %w", err) 471 } 472 473 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost) 474 475 var swapRecord *string 476 getOutput := atproto.RepoGetRecord_Output{} 477 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 478 "repo": ss.repoDID, 479 "collection": "place.stream.broadcast.origin", 480 "rkey": rkey, 481 }, nil, &getOutput) 482 if err != nil { 483 xErr, ok := err.(*xrpc.Error) 484 if !ok { 485 return fmt.Errorf("could not get record: %w", err) 486 } 487 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 488 return fmt.Errorf("could not get record: %w", err) 489 } 490 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 491 } else { 492 log.Debug(ctx, "got record", "record", getOutput) 493 swapRecord = getOutput.Cid 494 } 495 496 inp := atproto.RepoPutRecord_Input{ 497 Collection: "place.stream.broadcast.origin", 498 Record: &util.LexiconTypeDecoder{Val: &origin}, 499 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost), 500 Repo: ss.repoDID, 501 SwapRecord: swapRecord, 502 } 503 out := atproto.RepoPutRecord_Output{} 504 505 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 506 if err != nil { 507 return fmt.Errorf("could not create record: %w", err) 508 } 509 510 ss.lastOriginTime = time.Now() 511 return nil 512} 513 514func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 515 rs, err := renditions.GenerateRenditions(spseg) 516 if err != nil { 517 return fmt.Errorf("failed to generated renditions: %w", err) 518 } 519 520 if ss.lp == nil { 521 var err error 522 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 523 if err != nil { 524 return err 525 } 526 527 } 528 spmetrics.TranscodeAttemptsTotal.Inc() 529 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 530 if err != nil { 531 spmetrics.TranscodeErrorsTotal.Inc() 532 return err 533 } 534 if len(rs) != len(segs) { 535 spmetrics.TranscodeErrorsTotal.Inc() 536 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 537 } 538 spmetrics.TranscodeSuccessesTotal.Inc() 539 aqt, err := aqtime.FromString(spseg.StartTime) 540 if err != nil { 541 return err 542 } 543 for i, seg := range segs { 544 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 545 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 546 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 547 if err != nil { 548 return fmt.Errorf("failed to create transcoded segment file: %w", err) 549 } 550 defer fd.Close() 551 _, err = fd.Write(seg) 552 if err != nil { 553 return fmt.Errorf("failed to write transcoded segment file: %w", err) 554 } 555 ss.Go(ctx, func() error { 556 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 557 Filepath: fd.Name(), 558 Data: seg, 559 }) 560 }) 561 562 } 563 return nil 564} 565 566func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 567 ss.Go(ctx, func() error { 568 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 569 }) 570 ss.Go(ctx, func() error { 571 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 572 }) 573 return nil 574} 575 576func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 577 packet, err := media.Packetize(ctx, seg) 578 if err != nil { 579 return fmt.Errorf("failed to packetize segment: %w", err) 580 } 581 seg.PacketizedData = packet 582 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 583 return nil 584} 585 586func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 587 buf := bytes.Buffer{} 588 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 589 if err != nil { 590 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 591 } 592 // newSeg := &streamplace.Segment{ 593 // LexiconTypeID: "place.stream.segment", 594 // Id: spseg.Id, 595 // Creator: spseg.Creator, 596 // StartTime: spseg.StartTime, 597 // Duration: &dur, 598 // Audio: spseg.Audio, 599 // Video: spseg.Video, 600 // SigningKey: spseg.SigningKey, 601 // } 602 aqt, err := aqtime.FromString(spseg.StartTime) 603 if err != nil { 604 return fmt.Errorf("failed to parse segment start time: %w", err) 605 } 606 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 607 rend, err := ss.hls.GetRendition(rendition) 608 if err != nil { 609 return fmt.Errorf("failed to get rendition: %w", err) 610 } 611 if err := rend.NewSegment(&media.Segment{ 612 Buf: &buf, 613 Duration: time.Duration(dur), 614 Time: aqt.Time(), 615 }); err != nil { 616 return fmt.Errorf("failed to create new segment: %w", err) 617 } 618 619 return nil 620}