Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/dropdown-stuff 656 lines 20 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/bus" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/livepeer" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/media" 23 "stream.place/streamplace/pkg/model" 24 "stream.place/streamplace/pkg/renditions" 25 "stream.place/streamplace/pkg/replication/iroh_replicator" 26 "stream.place/streamplace/pkg/spmetrics" 27 "stream.place/streamplace/pkg/statedb" 28 "stream.place/streamplace/pkg/streamplace" 29 "stream.place/streamplace/pkg/thumbnail" 30) 31 32type StreamSession struct { 33 mm *media.MediaManager 34 mod model.Model 35 cli *config.CLI 36 bus *bus.Bus 37 op *oatproxy.OATProxy 38 hls *media.M3U8 39 lp *livepeer.LivepeerSession 40 repoDID string 41 segmentChan chan struct{} 42 lastStatus time.Time 43 lastStatusCID *string 44 lastStatusLock sync.Mutex 45 lastOriginTime time.Time 46 lastOriginLock sync.Mutex 47 g *errgroup.Group 48 started chan struct{} 49 ctx context.Context 50 packets []bus.PacketizedSegment 51 statefulDB *statedb.StatefulDB 52 swarm *iroh_replicator.IrohSwarm 53} 54 55func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 56 ctx, cancel := context.WithCancel(ctx) 57 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc() 58 ss.g, ctx = errgroup.WithContext(ctx) 59 sid := livepeer.RandomTrailer(8) 60 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID) 61 ss.ctx = ctx 62 log.Log(ctx, "starting stream session") 63 defer cancel() 64 spseg, err := notif.Segment.ToStreamplaceSegment() 65 if err != nil { 66 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 67 } 68 var allRenditions renditions.Renditions 69 70 if ss.cli.LivepeerGatewayURL != "" { 71 allRenditions, err = renditions.GenerateRenditions(spseg) 72 } else { 73 allRenditions = []renditions.Rendition{} 74 } 75 if err != nil { 76 return err 77 } 78 if spseg.Duration == nil { 79 return fmt.Errorf("segment duration is required to calculate bitrate") 80 } 81 dur := time.Duration(*spseg.Duration) 82 byteLen := len(notif.Data) 83 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 84 sourceRendition := renditions.Rendition{ 85 Name: "source", 86 Bitrate: bitrate, 87 Width: spseg.Video[0].Width, 88 Height: spseg.Video[0].Height, 89 } 90 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 91 ss.hls = media.NewM3U8(allRenditions) 92 93 // for _, r := range allRenditions { 94 // g.Go(func() error { 95 // for { 96 // if ctx.Err() != nil { 97 // return nil 98 // } 99 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 100 // if ctx.Err() != nil { 101 // return nil 102 // } 103 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 104 // time.Sleep(time.Second * 5) 105 // } 106 // }) 107 // } 108 109 close(ss.started) 110 111 for { 112 select { 113 case <-ss.segmentChan: 114 // reset timer 115 case <-ctx.Done(): 116 return ss.g.Wait() 117 // case <-time.After(time.Minute * 1): 118 case <-time.After(time.Second * 60): 119 log.Log(ctx, "no new segments for 1 minute, shutting down") 120 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec() 121 for _, r := range allRenditions { 122 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 123 } 124 if notif.Local { 125 ss.Go(ctx, func() error { 126 return ss.DeleteStatus(spseg.Creator) 127 }) 128 } 129 cancel() 130 } 131 } 132} 133 134// Execute a goroutine in the context of the stream session. Errors are 135// non-fatal; if you actually want to melt the universe on an error you 136// should panic() 137func (ss *StreamSession) Go(ctx context.Context, f func() error) { 138 <-ss.started 139 ss.g.Go(func() error { 140 err := f() 141 if err != nil { 142 log.Error(ctx, "error in stream_session goroutine", "error", err) 143 } 144 return nil 145 }) 146} 147 148func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 149 <-ss.started 150 go func() { 151 select { 152 case <-ss.ctx.Done(): 153 return 154 case ss.segmentChan <- struct{}{}: 155 } 156 }() 157 aqt := aqtime.FromTime(notif.Segment.StartTime) 158 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 159 notif.Segment.MediaData.Size = len(notif.Data) 160 err := ss.mod.CreateSegment(notif.Segment) 161 if err != nil { 162 return fmt.Errorf("could not add segment to database: %w", err) 163 } 164 spseg, err := notif.Segment.ToStreamplaceSegment() 165 if err != nil { 166 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 167 } 168 169 ss.bus.Publish(spseg.Creator, spseg) 170 ss.Go(ctx, func() error { 171 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 172 Filepath: notif.Segment.ID, 173 Data: notif.Data, 174 }) 175 }) 176 177 if ss.cli.Thumbnail { 178 ss.Go(ctx, func() error { 179 return ss.Thumbnail(ctx, spseg.Creator, notif) 180 }) 181 } 182 183 if notif.Local { 184 ss.Go(ctx, func() error { 185 return ss.UpdateStatus(ctx, spseg.Creator) 186 }) 187 188 ss.Go(ctx, func() error { 189 return ss.UpdateBroadcastOrigin(ctx) 190 }) 191 } 192 193 if ss.cli.LivepeerGatewayURL != "" { 194 ss.Go(ctx, func() error { 195 start := time.Now() 196 err := ss.Transcode(ctx, spseg, notif.Data) 197 took := time.Since(start) 198 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 199 return err 200 }) 201 } 202 203 // trigger a notification blast if this is a new livestream 204 if notif.Metadata.Livestream != nil { 205 ss.Go(ctx, func() error { 206 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator) 207 if err != nil { 208 return fmt.Errorf("failed to get repo: %w", err) 209 } 210 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator) 211 if err != nil { 212 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 213 } 214 if livestreamModel == nil { 215 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator) 216 return nil 217 } 218 lsv, err := livestreamModel.ToLivestreamView() 219 if err != nil { 220 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 221 } 222 if !shouldNotify(lsv) { 223 log.Warn(ctx, "is not set to notify", "repoDID", spseg.Creator) 224 return nil 225 } 226 task := &statedb.NotificationTask{ 227 Livestream: lsv, 228 PDSURL: r.PDS, 229 } 230 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator) 231 if err != nil { 232 return fmt.Errorf("failed to get chat profile: %w", err) 233 } 234 if cp != nil { 235 spcp, err := cp.ToStreamplaceChatProfile() 236 if err != nil { 237 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 238 } 239 task.ChatProfile = spcp 240 } 241 242 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri))) 243 if err != nil { 244 log.Error(ctx, "failed to enqueue notification task", "err", err) 245 } 246 return ss.UpdateStatus(ctx, spseg.Creator) 247 }) 248 } else { 249 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator) 250 } 251 252 return nil 253} 254 255func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool { 256 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 257 if !ok { 258 return true 259 } 260 if lsvr.NotificationSettings == nil { 261 return true 262 } 263 settings := lsvr.NotificationSettings 264 if settings.PushNotification == nil { 265 return true 266 } 267 return *settings.PushNotification 268} 269 270func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 271 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 272 locked := lock.TryLock() 273 if !locked { 274 // we're already generating a thumbnail for this user, skip 275 return nil 276 } 277 defer lock.Unlock() 278 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 279 if err != nil { 280 return err 281 } 282 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 283 // we have a thumbnail <60sec old, skip generating a new one 284 return nil 285 } 286 r := bytes.NewReader(not.Data) 287 aqt := aqtime.FromTime(not.Segment.StartTime) 288 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 289 if err != nil { 290 return err 291 } 292 defer fd.Close() 293 err = media.Thumbnail(ctx, r, fd, "jpeg") 294 if err != nil { 295 return err 296 } 297 thumb := &model.Thumbnail{ 298 Format: "jpeg", 299 SegmentID: not.Segment.ID, 300 } 301 err = ss.mod.CreateThumbnail(thumb) 302 if err != nil { 303 return err 304 } 305 return nil 306} 307 308func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 309 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 310 ss.lastStatusLock.Lock() 311 defer ss.lastStatusLock.Unlock() 312 if time.Since(ss.lastStatus) < time.Minute { 313 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 314 return nil 315 } 316 317 session, err := ss.statefulDB.GetSessionByDID(repoDID) 318 if err != nil { 319 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 320 } 321 if session == nil { 322 return fmt.Errorf("no session found for repoDID: %s", repoDID) 323 } 324 325 session, err = ss.op.RefreshIfNeeded(session) 326 if err != nil { 327 return fmt.Errorf("could not refresh session for repoDID: %w", err) 328 } 329 330 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 331 if err != nil { 332 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 333 } 334 lsv, err := ls.ToLivestreamView() 335 if err != nil { 336 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 337 } 338 339 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 340 if !ok { 341 return fmt.Errorf("livestream is not a streamplace livestream") 342 } 343 thumb := lsvr.Thumb 344 345 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 346 if err != nil { 347 return fmt.Errorf("could not get repo for repoDID: %w", err) 348 } 349 350 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 351 if !ok { 352 return fmt.Errorf("livestream is not a streamplace livestream") 353 } 354 355 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 356 357 if lsr.CanonicalUrl != nil { 358 canonicalUrl = *lsr.CanonicalUrl 359 } 360 361 actorStatusEmbed := bsky.ActorStatus_Embed{ 362 EmbedExternal: &bsky.EmbedExternal{ 363 External: &bsky.EmbedExternal_External{ 364 Title: lsr.Title, 365 Uri: canonicalUrl, 366 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 367 Thumb: thumb, 368 }, 369 }, 370 } 371 372 duration := int64(120) 373 status := bsky.ActorStatus{ 374 Status: "app.bsky.actor.status#live", 375 DurationMinutes: &duration, 376 Embed: &actorStatusEmbed, 377 CreatedAt: time.Now().Format(time.RFC3339), 378 } 379 380 client, err := ss.op.GetXrpcClient(session) 381 if err != nil { 382 return fmt.Errorf("could not get xrpc client: %w", err) 383 } 384 385 var swapRecord *string 386 getOutput := atproto.RepoGetRecord_Output{} 387 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 388 "repo": repoDID, 389 "collection": "app.bsky.actor.status", 390 "rkey": "self", 391 }, nil, &getOutput) 392 if err != nil { 393 xErr, ok := err.(*xrpc.Error) 394 if !ok { 395 return fmt.Errorf("could not get record: %w", err) 396 } 397 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 398 return fmt.Errorf("could not get record: %w", err) 399 } 400 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 401 } else { 402 log.Debug(ctx, "got record", "record", getOutput) 403 swapRecord = getOutput.Cid 404 } 405 406 inp := atproto.RepoPutRecord_Input{ 407 Collection: "app.bsky.actor.status", 408 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 409 Rkey: "self", 410 Repo: repoDID, 411 SwapRecord: swapRecord, 412 } 413 out := atproto.RepoPutRecord_Output{} 414 415 ss.lastStatusCID = &out.Cid 416 417 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 418 if err != nil { 419 return fmt.Errorf("could not create record: %w", err) 420 } 421 log.Debug(ctx, "created status record", "out", out) 422 423 ss.lastStatus = time.Now() 424 425 return nil 426} 427 428func (ss *StreamSession) DeleteStatus(repoDID string) error { 429 // need a special extra context because the stream session context is already cancelled 430 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 431 ss.lastStatusLock.Lock() 432 defer ss.lastStatusLock.Unlock() 433 if ss.lastStatusCID == nil { 434 log.Debug(ctx, "no status cid to delete") 435 return nil 436 } 437 inp := atproto.RepoDeleteRecord_Input{ 438 Collection: "app.bsky.actor.status", 439 Rkey: "self", 440 Repo: repoDID, 441 } 442 inp.SwapRecord = ss.lastStatusCID 443 out := atproto.RepoDeleteRecord_Output{} 444 445 session, err := ss.statefulDB.GetSessionByDID(repoDID) 446 if err != nil { 447 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 448 } 449 if session == nil { 450 return fmt.Errorf("no session found for repoDID: %s", repoDID) 451 } 452 453 session, err = ss.op.RefreshIfNeeded(session) 454 if err != nil { 455 return fmt.Errorf("could not refresh session for repoDID: %w", err) 456 } 457 458 client, err := ss.op.GetXrpcClient(session) 459 if err != nil { 460 return fmt.Errorf("could not get xrpc client: %w", err) 461 } 462 463 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 464 if err != nil { 465 return fmt.Errorf("could not delete record: %w", err) 466 } 467 468 ss.lastStatusCID = nil 469 return nil 470} 471 472var originUpdateInterval = time.Second * 30 473 474func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 475 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 476 ss.lastOriginLock.Lock() 477 defer ss.lastOriginLock.Unlock() 478 if time.Since(ss.lastOriginTime) < originUpdateInterval { 479 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 480 return nil 481 } 482 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 483 origin := streamplace.BroadcastOrigin{ 484 Streamer: ss.repoDID, 485 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 486 Broadcaster: &broadcaster, 487 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 488 IrohTicket: &ss.swarm.NodeTicket, 489 } 490 491 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 492 if err != nil { 493 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 494 } 495 if session == nil { 496 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 497 } 498 499 session, err = ss.op.RefreshIfNeeded(session) 500 if err != nil { 501 return fmt.Errorf("could not refresh session for repoDID: %w", err) 502 } 503 504 client, err := ss.op.GetXrpcClient(session) 505 if err != nil { 506 return fmt.Errorf("could not get xrpc client: %w", err) 507 } 508 509 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 510 511 var swapRecord *string 512 getOutput := atproto.RepoGetRecord_Output{} 513 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 514 "repo": ss.repoDID, 515 "collection": "place.stream.broadcast.origin", 516 "rkey": rkey, 517 }, nil, &getOutput) 518 if err != nil { 519 xErr, ok := err.(*xrpc.Error) 520 if !ok { 521 return fmt.Errorf("could not get record: %w", err) 522 } 523 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 524 return fmt.Errorf("could not get record: %w", err) 525 } 526 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 527 } else { 528 log.Debug(ctx, "got record", "record", getOutput) 529 swapRecord = getOutput.Cid 530 } 531 532 inp := atproto.RepoPutRecord_Input{ 533 Collection: "place.stream.broadcast.origin", 534 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 535 Rkey: rkey, 536 Repo: ss.repoDID, 537 SwapRecord: swapRecord, 538 } 539 out := atproto.RepoPutRecord_Output{} 540 541 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 542 if err != nil { 543 return fmt.Errorf("could not create record: %w", err) 544 } 545 546 ss.lastOriginTime = time.Now() 547 return nil 548} 549 550func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 551 rs, err := renditions.GenerateRenditions(spseg) 552 if err != nil { 553 return fmt.Errorf("failed to generated renditions: %w", err) 554 } 555 556 if ss.lp == nil { 557 var err error 558 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 559 if err != nil { 560 return err 561 } 562 563 } 564 spmetrics.TranscodeAttemptsTotal.Inc() 565 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 566 if err != nil { 567 spmetrics.TranscodeErrorsTotal.Inc() 568 return err 569 } 570 if len(rs) != len(segs) { 571 spmetrics.TranscodeErrorsTotal.Inc() 572 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 573 } 574 spmetrics.TranscodeSuccessesTotal.Inc() 575 aqt, err := aqtime.FromString(spseg.StartTime) 576 if err != nil { 577 return err 578 } 579 for i, seg := range segs { 580 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 581 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 582 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 583 if err != nil { 584 return fmt.Errorf("failed to create transcoded segment file: %w", err) 585 } 586 defer fd.Close() 587 _, err = fd.Write(seg) 588 if err != nil { 589 return fmt.Errorf("failed to write transcoded segment file: %w", err) 590 } 591 ss.Go(ctx, func() error { 592 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 593 Filepath: fd.Name(), 594 Data: seg, 595 }) 596 }) 597 598 } 599 return nil 600} 601 602func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 603 ss.Go(ctx, func() error { 604 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 605 }) 606 ss.Go(ctx, func() error { 607 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 608 }) 609 return nil 610} 611 612func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 613 packet, err := media.Packetize(ctx, seg) 614 if err != nil { 615 return fmt.Errorf("failed to packetize segment: %w", err) 616 } 617 seg.PacketizedData = packet 618 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 619 return nil 620} 621 622func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 623 buf := bytes.Buffer{} 624 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 625 if err != nil { 626 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 627 } 628 // newSeg := &streamplace.Segment{ 629 // LexiconTypeID: "place.stream.segment", 630 // Id: spseg.Id, 631 // Creator: spseg.Creator, 632 // StartTime: spseg.StartTime, 633 // Duration: &dur, 634 // Audio: spseg.Audio, 635 // Video: spseg.Video, 636 // SigningKey: spseg.SigningKey, 637 // } 638 aqt, err := aqtime.FromString(spseg.StartTime) 639 if err != nil { 640 return fmt.Errorf("failed to parse segment start time: %w", err) 641 } 642 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 643 rend, err := ss.hls.GetRendition(rendition) 644 if err != nil { 645 return fmt.Errorf("failed to get rendition: %w", err) 646 } 647 if err := rend.NewSegment(&media.Segment{ 648 Buf: &buf, 649 Duration: time.Duration(dur), 650 Time: aqt.Time(), 651 }); err != nil { 652 return fmt.Errorf("failed to create new segment: %w", err) 653 } 654 655 return nil 656}