Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "path/filepath"
10 "slices"
11
12 "github.com/Eyevinn/mp4ff/mp4"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/config"
15 "stream.place/streamplace/pkg/log"
16)
17
// MaxSegmentTries bounds the convergence loop in ConvergeSegment. The loop
// index runs from 0 through MaxSegmentTries inclusive, so at most
// MaxSegmentTries+1 segmenter passes are attempted before ConvergeSegment
// gives up and returns an error. It is a variable rather than a constant, so
// it can be reassigned at runtime.
var MaxSegmentTries = 10
19
20// run this segment through the segmenter/splitter until it comes out the
21// same, meaning we can cleanly get it in and out of a concatenated mp4 file
22func ConvergeSegment(ctx context.Context, cli *config.CLI, bs []byte, now int64, streamer string, doH264Parse bool) ([]byte, error) {
23 cli.DumpDebugSegment(ctx, fmt.Sprintf("converge-segment-%s.mp4", streamer), bytes.NewReader(bs))
24
25 log.Debug(ctx, "parsing segment media data", "size", len(bs))
26 _, err := ParseSegmentMediaData(ctx, bs)
27 if err != nil {
28 return nil, fmt.Errorf("error parsing segment media data: %w", err)
29 }
30 // rewrite segmented audio timestamps to work around bug where the last
31 // audio segment gets no duration and then gets dropped upon rewrite
32 smearedBuf := &bytes.Buffer{}
33 log.Debug(ctx, "rewriting audio timestamps", "size", len(bs))
34 err = RewriteAudioTimestamps(ctx, cli, bytes.NewReader(bs), smearedBuf, false)
35 if err != nil {
36 return nil, fmt.Errorf("error rewriting audio timestamps: %w", err)
37 }
38 bs = smearedBuf.Bytes()
39 log.Debug(ctx, "converging segment", "size", len(bs))
40
41 previousBs := []byte{}
42 currentBs := bs
43 i := 0
44 for i = 0; i <= MaxSegmentTries; i++ {
45 if slices.Compare(previousBs, currentBs) == 0 {
46 break
47 }
48 if cli.SegmentDebugDir != "" {
49 mydir := filepath.Join(cli.SegmentDebugDir, streamer)
50 err := os.MkdirAll(mydir, 0755)
51 if err != nil {
52 return nil, fmt.Errorf("failed to create debug directory: %w", err)
53 }
54 aqt := aqtime.FromMillis(now)
55 outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-attempt-%03d.mp4", aqt.FileSafeString(), i))
56 err = os.WriteFile(outFile, currentBs, 0644)
57 if err != nil {
58 return nil, fmt.Errorf("failed to write debug file: %w", err)
59 }
60 log.Log(ctx, "wrote debug file", "path", outFile)
61 }
62 buf := bytes.Buffer{}
63 err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf, doH264Parse)
64 if err != nil {
65 return nil, fmt.Errorf("failed to attempt segment convergence: %w", err)
66 }
67 previousBs = currentBs
68 currentBs = buf.Bytes()
69 mp4file, err := mp4.DecodeFile(bytes.NewReader(currentBs))
70 if err != nil {
71 return nil, fmt.Errorf("failed to decode segment: %w", err)
72 }
73 btrt := mp4file.Moov.Trak.Mdia.Minf.Stbl.Stsd.AvcX.Btrt
74 btrt.AvgBitrate = 0
75 btrt.MaxBitrate = 0
76 // log.Log(ctx, "btrt", "average bitrate", btrt.AvgBitrate, "max bitrate", btrt.MaxBitrate)
77 encodedBuf := bytes.Buffer{}
78 err = mp4file.Encode(&encodedBuf)
79 if err != nil {
80 return nil, fmt.Errorf("failed to encode segment: %w", err)
81 }
82 currentBs = encodedBuf.Bytes()
83 }
84 if slices.Compare(previousBs, currentBs) != 0 {
85 return nil, fmt.Errorf("failed to converge segment after %d tries", MaxSegmentTries)
86 }
87 bs = currentBs
88 log.Debug(ctx, "converged segments", "tries", i, "size", len(bs))
89 return currentBs, nil
90}