Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "math"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/google/uuid"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/renditions"
15)
16
const (
	// LivePlaylistSize is the maximum number of segments listed in a live
	// media playlist.
	LivePlaylistSize = 8

	// RetainSegmentSize is the number of most recent segments each rendition
	// keeps in memory (a count, not a duration); older segments are dropped
	// once this many are held.
	RetainSegmentSize = LivePlaylistSize * 3

	// IndexM3U8 is the file name used for both the multivariant playlist and
	// each rendition's media playlist.
	IndexM3U8 = "index.m3u8"
)
24
// Segment is one media segment of a rendition together with the metadata
// needed to list it in a media playlist.
type Segment struct {
	// MSN is the media sequence number. It is overwritten by
	// M3U8Rendition.NewSegment when the segment is added to a rendition.
	MSN uint64
	// Duration is the playback length, emitted as the segment's EXTINF value.
	Duration time.Duration
	// Buf holds the segment bytes that are served to clients.
	Buf *bytes.Buffer
	// Time is emitted as the segment's EXT-X-PROGRAM-DATE-TIME.
	Time time.Time
	// Closed is not read by the playlist code in this file.
	// NOTE(review): presumably marks a fully written segment — confirm.
	Closed bool
	// StartTS and EndTS are not read by the playlist code in this file.
	// NOTE(review): presumably stream timestamps of the segment bounds — confirm.
	StartTS *uint64
	EndTS   *uint64
}
34
35type M3U8 struct {
36 curSeg uint64
37 pendingSegments []*Segment
38 waits []chan struct{}
39 renditions []*M3U8Rendition
40}
41
42type M3U8Rendition struct {
43 Rendition renditions.Rendition
44 Segments []*Segment
45 SegmentLock sync.RWMutex
46 MSN uint64
47}
48
49func NewM3U8(renditions renditions.Renditions) *M3U8 {
50 rends := []*M3U8Rendition{}
51 for _, r := range renditions {
52 mr := &M3U8Rendition{
53 Rendition: r,
54 }
55 rends = append(rends, mr)
56 }
57 return &M3U8{
58 curSeg: 0,
59 renditions: rends,
60 }
61}
62
63func (r *M3U8Rendition) GetMediaLine(session string) string {
64 // m.waitForStart()
65 lines := []string{}
66 lines = append(lines, "#EXTM3U")
67 lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d", r.Rendition.Bitrate, r.Rendition.Width, r.Rendition.Height))
68 lines = append(lines, fmt.Sprintf("%s/%s?session=%s", r.Rendition.Name, IndexM3U8, session))
69 return strings.Join(lines, "\n")
70}
71
72func (r *M3U8Rendition) GetPlaylist(session string) []byte {
73 if session == "" {
74 uu, err := uuid.NewV7()
75 if err != nil {
76 panic(err)
77 }
78 session = uu.String()
79 }
80 r.SegmentLock.RLock()
81 defer r.SegmentLock.RUnlock()
82 // m.waitForStart()
83 lines := []string{}
84 lines = append(lines, "#EXTM3U")
85 lines = append(lines, "#EXT-X-VERSION:3")
86 startWith := len(r.Segments) - LivePlaylistSize
87 if startWith < 0 {
88 startWith = 0
89 }
90 if len(r.Segments) == 0 {
91 return []byte{}
92 }
93 firstSeg := r.Segments[startWith]
94 lastSeg := r.Segments[len(r.Segments)-1]
95 targetDuration := int64(math.Round(lastSeg.Duration.Seconds()))
96 lines = append(lines, fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d", firstSeg.MSN))
97 lines = append(lines, fmt.Sprintf("#EXT-X-DISCONTINUITY-SEQUENCE:%d", firstSeg.MSN))
98 lines = append(lines, fmt.Sprintf("#EXT-X-TARGETDURATION:%d", targetDuration+1))
99 lines = append(lines, "#EXT-X-INDEPENDENT-SEGMENTS")
100 lines = append(lines, "")
101 lastSegments := r.Segments[startWith:]
102 for _, seg := range lastSegments {
103 dur := seg.Duration
104 lines = append(lines, "#EXT-X-DISCONTINUITY")
105 lines = append(lines, fmt.Sprintf("#EXT-X-PROGRAM-DATE-TIME:%s", seg.Time.Format(time.RFC3339Nano)))
106 lines = append(lines, fmt.Sprintf("#EXTINF:%f,", dur.Seconds()))
107 lines = append(lines, fmt.Sprintf("segment%05d.ts?session=%s", seg.MSN, session))
108 }
109 lines = append(lines, "")
110 return []byte(strings.Join(lines, "\n"))
111}
112
113func (r *M3U8Rendition) GetSegment(session string, filename string) []byte {
114 r.SegmentLock.RLock()
115 defer r.SegmentLock.RUnlock()
116 for _, seg := range r.Segments {
117 if fmt.Sprintf("segment%05d.ts", seg.MSN) == filename {
118 return seg.Buf.Bytes()
119 }
120 }
121 return nil
122}
123
124func (m *M3U8) GetMultivariantPlaylist(rendition string) []byte {
125 uu, err := uuid.NewV7()
126 if err != nil {
127 panic(err)
128 }
129 // m.waitForStart()
130 lines := []string{}
131 lines = append(lines, "#EXTM3U")
132 for _, r := range m.renditions {
133 if rendition == "" || r.Rendition.Name == rendition {
134 lines = append(lines, r.GetMediaLine(uu.String()))
135 }
136 }
137 return []byte(strings.Join(lines, "\n"))
138}
139
140// needs to handle:
141// - index.m3u8
142// - 720p/stream.m3u8
143// - 720p/segment00015.ts
144func (m *M3U8) GetFile(str string, session string, rendition string) ([]byte, error) {
145 str = strings.TrimPrefix(str, "/")
146 if str == IndexM3U8 {
147 return m.GetMultivariantPlaylist(rendition), nil
148 }
149 parts := strings.Split(str, "/")
150 if len(parts) != 2 {
151 return nil, fmt.Errorf("invalid path")
152 }
153 rStr := parts[0]
154 fStr := parts[1]
155 rend, err := m.GetRendition(rStr)
156 if err != nil {
157 return nil, err
158 }
159 log.Debug(context.Background(), "m3u8 get file", "str", str, "session", session, "rend", rStr, "file", fStr)
160 if fStr == IndexM3U8 {
161 return rend.GetPlaylist(session), nil
162 }
163 seg := rend.GetSegment(session, fStr)
164 if seg == nil {
165 return nil, fmt.Errorf("segment not found")
166 }
167 return seg, nil
168}
169
170func (r *M3U8Rendition) NewSegment(seg *Segment) error {
171 r.SegmentLock.Lock()
172 defer r.SegmentLock.Unlock()
173 seg.MSN = r.MSN
174 r.MSN += 1
175 r.Segments = append(r.Segments, seg)
176 if len(r.Segments) > RetainSegmentSize {
177 // Calculate how many segments to remove
178 removeCount := len(r.Segments) - RetainSegmentSize
179 // Remove the oldest segments (from the front of the slice)
180 r.Segments = r.Segments[removeCount:]
181 }
182 return nil
183}
184
185func (m *M3U8) GetRendition(rendition string) (*M3U8Rendition, error) {
186 for _, r := range m.renditions {
187 if r.Rendition.Name == rendition {
188 return r, nil
189 }
190 }
191 return nil, fmt.Errorf("rendition not found")
192}