Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/streamplace/oatproxy/pkg/oatproxy"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/bus"
19 "stream.place/streamplace/pkg/config"
20 "stream.place/streamplace/pkg/livepeer"
21 "stream.place/streamplace/pkg/log"
22 "stream.place/streamplace/pkg/media"
23 "stream.place/streamplace/pkg/model"
24 "stream.place/streamplace/pkg/renditions"
25 "stream.place/streamplace/pkg/replication/iroh_replicator"
26 "stream.place/streamplace/pkg/spmetrics"
27 "stream.place/streamplace/pkg/statedb"
28 "stream.place/streamplace/pkg/streamplace"
29 "stream.place/streamplace/pkg/thumbnail"
30)
31
32type StreamSession struct {
33 mm *media.MediaManager
34 mod model.Model
35 cli *config.CLI
36 bus *bus.Bus
37 op *oatproxy.OATProxy
38 hls *media.M3U8
39 lp *livepeer.LivepeerSession
40 repoDID string
41 segmentChan chan struct{}
42 lastStatus time.Time
43 lastStatusCID *string
44 lastStatusLock sync.Mutex
45 lastOriginTime time.Time
46 lastOriginLock sync.Mutex
47 g *errgroup.Group
48 started chan struct{}
49 ctx context.Context
50 packets []bus.PacketizedSegment
51 statefulDB *statedb.StatefulDB
52 swarm *iroh_replicator.IrohSwarm
53}
54
55func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
56 ctx, cancel := context.WithCancel(ctx)
57 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc()
58 ss.g, ctx = errgroup.WithContext(ctx)
59 sid := livepeer.RandomTrailer(8)
60 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID)
61 ss.ctx = ctx
62 log.Log(ctx, "starting stream session")
63 defer cancel()
64 spseg, err := notif.Segment.ToStreamplaceSegment()
65 if err != nil {
66 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
67 }
68 var allRenditions renditions.Renditions
69
70 if ss.cli.LivepeerGatewayURL != "" {
71 allRenditions, err = renditions.GenerateRenditions(spseg)
72 } else {
73 allRenditions = []renditions.Rendition{}
74 }
75 if err != nil {
76 return err
77 }
78 if spseg.Duration == nil {
79 return fmt.Errorf("segment duration is required to calculate bitrate")
80 }
81 dur := time.Duration(*spseg.Duration)
82 byteLen := len(notif.Data)
83 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
84 sourceRendition := renditions.Rendition{
85 Name: "source",
86 Bitrate: bitrate,
87 Width: spseg.Video[0].Width,
88 Height: spseg.Video[0].Height,
89 }
90 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
91 ss.hls = media.NewM3U8(allRenditions)
92
93 // for _, r := range allRenditions {
94 // g.Go(func() error {
95 // for {
96 // if ctx.Err() != nil {
97 // return nil
98 // }
99 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
100 // if ctx.Err() != nil {
101 // return nil
102 // }
103 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
104 // time.Sleep(time.Second * 5)
105 // }
106 // })
107 // }
108
109 close(ss.started)
110
111 for {
112 select {
113 case <-ss.segmentChan:
114 // reset timer
115 case <-ctx.Done():
116 return ss.g.Wait()
117 // case <-time.After(time.Minute * 1):
118 case <-time.After(time.Second * 60):
119 log.Log(ctx, "no new segments for 1 minute, shutting down")
120 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec()
121 for _, r := range allRenditions {
122 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
123 }
124 if notif.Local {
125 ss.Go(ctx, func() error {
126 return ss.DeleteStatus(spseg.Creator)
127 })
128 }
129 cancel()
130 }
131 }
132}
133
134// Execute a goroutine in the context of the stream session. Errors are
135// non-fatal; if you actually want to melt the universe on an error you
136// should panic()
137func (ss *StreamSession) Go(ctx context.Context, f func() error) {
138 <-ss.started
139 ss.g.Go(func() error {
140 err := f()
141 if err != nil {
142 log.Error(ctx, "error in stream_session goroutine", "error", err)
143 }
144 return nil
145 })
146}
147
148func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
149 <-ss.started
150 go func() {
151 select {
152 case <-ss.ctx.Done():
153 return
154 case ss.segmentChan <- struct{}{}:
155 }
156 }()
157 aqt := aqtime.FromTime(notif.Segment.StartTime)
158 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
159 notif.Segment.MediaData.Size = len(notif.Data)
160 err := ss.mod.CreateSegment(notif.Segment)
161 if err != nil {
162 return fmt.Errorf("could not add segment to database: %w", err)
163 }
164 spseg, err := notif.Segment.ToStreamplaceSegment()
165 if err != nil {
166 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
167 }
168
169 ss.bus.Publish(spseg.Creator, spseg)
170 ss.Go(ctx, func() error {
171 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
172 Filepath: notif.Segment.ID,
173 Data: notif.Data,
174 })
175 })
176
177 if ss.cli.Thumbnail {
178 ss.Go(ctx, func() error {
179 return ss.Thumbnail(ctx, spseg.Creator, notif)
180 })
181 }
182
183 if notif.Local {
184 ss.Go(ctx, func() error {
185 return ss.UpdateStatus(ctx, spseg.Creator)
186 })
187
188 ss.Go(ctx, func() error {
189 return ss.UpdateBroadcastOrigin(ctx)
190 })
191 }
192
193 if ss.cli.LivepeerGatewayURL != "" {
194 ss.Go(ctx, func() error {
195 start := time.Now()
196 err := ss.Transcode(ctx, spseg, notif.Data)
197 took := time.Since(start)
198 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
199 return err
200 })
201 }
202
203 // trigger a notification blast if this is a new livestream
204 if notif.Metadata.Livestream != nil {
205 ss.Go(ctx, func() error {
206 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator)
207 if err != nil {
208 return fmt.Errorf("failed to get repo: %w", err)
209 }
210 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator)
211 if err != nil {
212 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
213 }
214 if livestreamModel == nil {
215 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator)
216 return nil
217 }
218 lsv, err := livestreamModel.ToLivestreamView()
219 if err != nil {
220 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
221 }
222 if !shouldNotify(lsv) {
223 log.Warn(ctx, "is not set to notify", "repoDID", spseg.Creator)
224 return nil
225 }
226 task := &statedb.NotificationTask{
227 Livestream: lsv,
228 PDSURL: r.PDS,
229 }
230 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator)
231 if err != nil {
232 return fmt.Errorf("failed to get chat profile: %w", err)
233 }
234 if cp != nil {
235 spcp, err := cp.ToStreamplaceChatProfile()
236 if err != nil {
237 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
238 }
239 task.ChatProfile = spcp
240 }
241
242 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri)))
243 if err != nil {
244 log.Error(ctx, "failed to enqueue notification task", "err", err)
245 }
246 return ss.UpdateStatus(ctx, spseg.Creator)
247 })
248 } else {
249 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
250 }
251
252 return nil
253}
254
255func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool {
256 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
257 if !ok {
258 return true
259 }
260 if lsvr.NotificationSettings == nil {
261 return true
262 }
263 settings := lsvr.NotificationSettings
264 if settings.PushNotification == nil {
265 return true
266 }
267 return *settings.PushNotification
268}
269
270func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
271 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
272 locked := lock.TryLock()
273 if !locked {
274 // we're already generating a thumbnail for this user, skip
275 return nil
276 }
277 defer lock.Unlock()
278 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
279 if err != nil {
280 return err
281 }
282 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
283 // we have a thumbnail <60sec old, skip generating a new one
284 return nil
285 }
286 r := bytes.NewReader(not.Data)
287 aqt := aqtime.FromTime(not.Segment.StartTime)
288 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
289 if err != nil {
290 return err
291 }
292 defer fd.Close()
293 err = media.Thumbnail(ctx, r, fd, "jpeg")
294 if err != nil {
295 return err
296 }
297 thumb := &model.Thumbnail{
298 Format: "jpeg",
299 SegmentID: not.Segment.ID,
300 }
301 err = ss.mod.CreateThumbnail(thumb)
302 if err != nil {
303 return err
304 }
305 return nil
306}
307
308func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
309 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
310 ss.lastStatusLock.Lock()
311 defer ss.lastStatusLock.Unlock()
312 if time.Since(ss.lastStatus) < time.Minute {
313 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
314 return nil
315 }
316
317 session, err := ss.statefulDB.GetSessionByDID(repoDID)
318 if err != nil {
319 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
320 }
321 if session == nil {
322 return fmt.Errorf("no session found for repoDID: %s", repoDID)
323 }
324
325 session, err = ss.op.RefreshIfNeeded(session)
326 if err != nil {
327 return fmt.Errorf("could not refresh session for repoDID: %w", err)
328 }
329
330 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
331 if err != nil {
332 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
333 }
334 lsv, err := ls.ToLivestreamView()
335 if err != nil {
336 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
337 }
338
339 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
340 if !ok {
341 return fmt.Errorf("livestream is not a streamplace livestream")
342 }
343 thumb := lsvr.Thumb
344
345 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
346 if err != nil {
347 return fmt.Errorf("could not get repo for repoDID: %w", err)
348 }
349
350 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
351 if !ok {
352 return fmt.Errorf("livestream is not a streamplace livestream")
353 }
354
355 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
356
357 if lsr.CanonicalUrl != nil {
358 canonicalUrl = *lsr.CanonicalUrl
359 }
360
361 actorStatusEmbed := bsky.ActorStatus_Embed{
362 EmbedExternal: &bsky.EmbedExternal{
363 External: &bsky.EmbedExternal_External{
364 Title: lsr.Title,
365 Uri: canonicalUrl,
366 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
367 Thumb: thumb,
368 },
369 },
370 }
371
372 duration := int64(120)
373 status := bsky.ActorStatus{
374 Status: "app.bsky.actor.status#live",
375 DurationMinutes: &duration,
376 Embed: &actorStatusEmbed,
377 CreatedAt: time.Now().Format(time.RFC3339),
378 }
379
380 client, err := ss.op.GetXrpcClient(session)
381 if err != nil {
382 return fmt.Errorf("could not get xrpc client: %w", err)
383 }
384
385 var swapRecord *string
386 getOutput := atproto.RepoGetRecord_Output{}
387 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
388 "repo": repoDID,
389 "collection": "app.bsky.actor.status",
390 "rkey": "self",
391 }, nil, &getOutput)
392 if err != nil {
393 xErr, ok := err.(*xrpc.Error)
394 if !ok {
395 return fmt.Errorf("could not get record: %w", err)
396 }
397 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
398 return fmt.Errorf("could not get record: %w", err)
399 }
400 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
401 } else {
402 log.Debug(ctx, "got record", "record", getOutput)
403 swapRecord = getOutput.Cid
404 }
405
406 inp := atproto.RepoPutRecord_Input{
407 Collection: "app.bsky.actor.status",
408 Record: &lexutil.LexiconTypeDecoder{Val: &status},
409 Rkey: "self",
410 Repo: repoDID,
411 SwapRecord: swapRecord,
412 }
413 out := atproto.RepoPutRecord_Output{}
414
415 ss.lastStatusCID = &out.Cid
416
417 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
418 if err != nil {
419 return fmt.Errorf("could not create record: %w", err)
420 }
421 log.Debug(ctx, "created status record", "out", out)
422
423 ss.lastStatus = time.Now()
424
425 return nil
426}
427
428func (ss *StreamSession) DeleteStatus(repoDID string) error {
429 // need a special extra context because the stream session context is already cancelled
430 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
431 ss.lastStatusLock.Lock()
432 defer ss.lastStatusLock.Unlock()
433 if ss.lastStatusCID == nil {
434 log.Debug(ctx, "no status cid to delete")
435 return nil
436 }
437 inp := atproto.RepoDeleteRecord_Input{
438 Collection: "app.bsky.actor.status",
439 Rkey: "self",
440 Repo: repoDID,
441 }
442 inp.SwapRecord = ss.lastStatusCID
443 out := atproto.RepoDeleteRecord_Output{}
444
445 session, err := ss.statefulDB.GetSessionByDID(repoDID)
446 if err != nil {
447 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
448 }
449 if session == nil {
450 return fmt.Errorf("no session found for repoDID: %s", repoDID)
451 }
452
453 session, err = ss.op.RefreshIfNeeded(session)
454 if err != nil {
455 return fmt.Errorf("could not refresh session for repoDID: %w", err)
456 }
457
458 client, err := ss.op.GetXrpcClient(session)
459 if err != nil {
460 return fmt.Errorf("could not get xrpc client: %w", err)
461 }
462
463 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
464 if err != nil {
465 return fmt.Errorf("could not delete record: %w", err)
466 }
467
468 ss.lastStatusCID = nil
469 return nil
470}
471
472var originUpdateInterval = time.Second * 30
473
474func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
475 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
476 ss.lastOriginLock.Lock()
477 defer ss.lastOriginLock.Unlock()
478 if time.Since(ss.lastOriginTime) < originUpdateInterval {
479 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
480 return nil
481 }
482 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
483 origin := streamplace.BroadcastOrigin{
484 Streamer: ss.repoDID,
485 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
486 Broadcaster: &broadcaster,
487 UpdatedAt: time.Now().UTC().Format(util.ISO8601),
488 IrohTicket: &ss.swarm.NodeTicket,
489 }
490
491 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
492 if err != nil {
493 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
494 }
495 if session == nil {
496 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
497 }
498
499 session, err = ss.op.RefreshIfNeeded(session)
500 if err != nil {
501 return fmt.Errorf("could not refresh session for repoDID: %w", err)
502 }
503
504 client, err := ss.op.GetXrpcClient(session)
505 if err != nil {
506 return fmt.Errorf("could not get xrpc client: %w", err)
507 }
508
509 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)
510
511 var swapRecord *string
512 getOutput := atproto.RepoGetRecord_Output{}
513 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
514 "repo": ss.repoDID,
515 "collection": "place.stream.broadcast.origin",
516 "rkey": rkey,
517 }, nil, &getOutput)
518 if err != nil {
519 xErr, ok := err.(*xrpc.Error)
520 if !ok {
521 return fmt.Errorf("could not get record: %w", err)
522 }
523 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
524 return fmt.Errorf("could not get record: %w", err)
525 }
526 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
527 } else {
528 log.Debug(ctx, "got record", "record", getOutput)
529 swapRecord = getOutput.Cid
530 }
531
532 inp := atproto.RepoPutRecord_Input{
533 Collection: "place.stream.broadcast.origin",
534 Record: &lexutil.LexiconTypeDecoder{Val: &origin},
535 Rkey: rkey,
536 Repo: ss.repoDID,
537 SwapRecord: swapRecord,
538 }
539 out := atproto.RepoPutRecord_Output{}
540
541 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
542 if err != nil {
543 return fmt.Errorf("could not create record: %w", err)
544 }
545
546 ss.lastOriginTime = time.Now()
547 return nil
548}
549
550func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
551 rs, err := renditions.GenerateRenditions(spseg)
552 if err != nil {
553 return fmt.Errorf("failed to generated renditions: %w", err)
554 }
555
556 if ss.lp == nil {
557 var err error
558 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
559 if err != nil {
560 return err
561 }
562
563 }
564 spmetrics.TranscodeAttemptsTotal.Inc()
565 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
566 if err != nil {
567 spmetrics.TranscodeErrorsTotal.Inc()
568 return err
569 }
570 if len(rs) != len(segs) {
571 spmetrics.TranscodeErrorsTotal.Inc()
572 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
573 }
574 spmetrics.TranscodeSuccessesTotal.Inc()
575 aqt, err := aqtime.FromString(spseg.StartTime)
576 if err != nil {
577 return err
578 }
579 for i, seg := range segs {
580 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
581 log.Debug(ctx, "publishing segment", "rendition", rs[i])
582 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
583 if err != nil {
584 return fmt.Errorf("failed to create transcoded segment file: %w", err)
585 }
586 defer fd.Close()
587 _, err = fd.Write(seg)
588 if err != nil {
589 return fmt.Errorf("failed to write transcoded segment file: %w", err)
590 }
591 ss.Go(ctx, func() error {
592 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
593 Filepath: fd.Name(),
594 Data: seg,
595 })
596 })
597
598 }
599 return nil
600}
601
602func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
603 ss.Go(ctx, func() error {
604 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
605 })
606 ss.Go(ctx, func() error {
607 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
608 })
609 return nil
610}
611
612func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
613 packet, err := media.Packetize(ctx, seg)
614 if err != nil {
615 return fmt.Errorf("failed to packetize segment: %w", err)
616 }
617 seg.PacketizedData = packet
618 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
619 return nil
620}
621
622func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
623 buf := bytes.Buffer{}
624 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
625 if err != nil {
626 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
627 }
628 // newSeg := &streamplace.Segment{
629 // LexiconTypeID: "place.stream.segment",
630 // Id: spseg.Id,
631 // Creator: spseg.Creator,
632 // StartTime: spseg.StartTime,
633 // Duration: &dur,
634 // Audio: spseg.Audio,
635 // Video: spseg.Video,
636 // SigningKey: spseg.SigningKey,
637 // }
638 aqt, err := aqtime.FromString(spseg.StartTime)
639 if err != nil {
640 return fmt.Errorf("failed to parse segment start time: %w", err)
641 }
642 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
643 rend, err := ss.hls.GetRendition(rendition)
644 if err != nil {
645 return fmt.Errorf("failed to get rendition: %w", err)
646 }
647 if err := rend.NewSegment(&media.Segment{
648 Buf: &buf,
649 Duration: time.Duration(dur),
650 Time: aqt.Time(),
651 }); err != nil {
652 return fmt.Errorf("failed to create new segment: %w", err)
653 }
654
655 return nil
656}