Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/buffer-thumbnails 192 lines 5.1 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "math" 8 "strings" 9 "sync" 10 "time" 11 12 "github.com/google/uuid" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/renditions" 15) 16 17// how many segments are served in the live playlist? 18const LivePlaylistSize = 8 19 20// how long should we keep old segments around? 21const RetainSegmentSize = LivePlaylistSize * 3 22 23const IndexM3U8 = "index.m3u8" 24 25type Segment struct { 26 MSN uint64 // media sequence number 27 Duration time.Duration 28 Buf *bytes.Buffer 29 Time time.Time 30 Closed bool 31 StartTS *uint64 32 EndTS *uint64 33} 34 35type M3U8 struct { 36 curSeg uint64 37 pendingSegments []*Segment 38 waits []chan struct{} 39 renditions []*M3U8Rendition 40} 41 42type M3U8Rendition struct { 43 Rendition renditions.Rendition 44 Segments []*Segment 45 SegmentLock sync.RWMutex 46 MSN uint64 47} 48 49func NewM3U8(renditions renditions.Renditions) *M3U8 { 50 rends := []*M3U8Rendition{} 51 for _, r := range renditions { 52 mr := &M3U8Rendition{ 53 Rendition: r, 54 } 55 rends = append(rends, mr) 56 } 57 return &M3U8{ 58 curSeg: 0, 59 renditions: rends, 60 } 61} 62 63func (r *M3U8Rendition) GetMediaLine(session string) string { 64 // m.waitForStart() 65 lines := []string{} 66 lines = append(lines, "#EXTM3U") 67 lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d", r.Rendition.Bitrate, r.Rendition.Width, r.Rendition.Height)) 68 lines = append(lines, fmt.Sprintf("%s/%s?session=%s", r.Rendition.Name, IndexM3U8, session)) 69 return strings.Join(lines, "\n") 70} 71 72func (r *M3U8Rendition) GetPlaylist(session string) []byte { 73 if session == "" { 74 uu, err := uuid.NewV7() 75 if err != nil { 76 panic(err) 77 } 78 session = uu.String() 79 } 80 r.SegmentLock.RLock() 81 defer r.SegmentLock.RUnlock() 82 // m.waitForStart() 83 lines := []string{} 84 lines = append(lines, "#EXTM3U") 85 lines = append(lines, "#EXT-X-VERSION:3") 86 startWith := len(r.Segments) - LivePlaylistSize 87 if startWith < 0 { 88 startWith = 0 89 } 90 if len(r.Segments) == 0 { 91 return []byte{} 92 } 93 firstSeg := r.Segments[startWith] 94 lastSeg := r.Segments[len(r.Segments)-1] 95 targetDuration := int64(math.Round(lastSeg.Duration.Seconds())) 96 lines = append(lines, fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d", firstSeg.MSN)) 97 lines = append(lines, fmt.Sprintf("#EXT-X-DISCONTINUITY-SEQUENCE:%d", firstSeg.MSN)) 98 lines = append(lines, fmt.Sprintf("#EXT-X-TARGETDURATION:%d", targetDuration+1)) 99 lines = append(lines, "#EXT-X-INDEPENDENT-SEGMENTS") 100 lines = append(lines, "") 101 lastSegments := r.Segments[startWith:] 102 for _, seg := range lastSegments { 103 dur := seg.Duration 104 lines = append(lines, "#EXT-X-DISCONTINUITY") 105 lines = append(lines, fmt.Sprintf("#EXT-X-PROGRAM-DATE-TIME:%s", seg.Time.Format(time.RFC3339Nano))) 106 lines = append(lines, fmt.Sprintf("#EXTINF:%f,", dur.Seconds())) 107 lines = append(lines, fmt.Sprintf("segment%05d.ts?session=%s", seg.MSN, session)) 108 } 109 lines = append(lines, "") 110 return []byte(strings.Join(lines, "\n")) 111} 112 113func (r *M3U8Rendition) GetSegment(session string, filename string) []byte { 114 r.SegmentLock.RLock() 115 defer r.SegmentLock.RUnlock() 116 for _, seg := range r.Segments { 117 if fmt.Sprintf("segment%05d.ts", seg.MSN) == filename { 118 return seg.Buf.Bytes() 119 } 120 } 121 return nil 122} 123 124func (m *M3U8) GetMultivariantPlaylist(rendition string) []byte { 125 uu, err := uuid.NewV7() 126 if err != nil { 127 panic(err) 128 } 129 // m.waitForStart() 130 lines := []string{} 131 lines = append(lines, "#EXTM3U") 132 for _, r := range m.renditions { 133 if rendition == "" || r.Rendition.Name == rendition { 134 lines = append(lines, r.GetMediaLine(uu.String())) 135 } 136 } 137 return []byte(strings.Join(lines, "\n")) 138} 139 140// needs to handle: 141// - index.m3u8 142// - 720p/stream.m3u8 143// - 720p/segment00015.ts 144func (m *M3U8) GetFile(str string, session string, rendition string) ([]byte, error) { 145 str = strings.TrimPrefix(str, "/") 146 if str == IndexM3U8 { 147 return m.GetMultivariantPlaylist(rendition), nil 148 } 149 parts := strings.Split(str, "/") 150 if len(parts) != 2 { 151 return nil, fmt.Errorf("invalid path") 152 } 153 rStr := parts[0] 154 fStr := parts[1] 155 rend, err := m.GetRendition(rStr) 156 if err != nil { 157 return nil, err 158 } 159 log.Debug(context.Background(), "m3u8 get file", "str", str, "session", session, "rend", rStr, "file", fStr) 160 if fStr == IndexM3U8 { 161 return rend.GetPlaylist(session), nil 162 } 163 seg := rend.GetSegment(session, fStr) 164 if seg == nil { 165 return nil, fmt.Errorf("segment not found") 166 } 167 return seg, nil 168} 169 170func (r *M3U8Rendition) NewSegment(seg *Segment) error { 171 r.SegmentLock.Lock() 172 defer r.SegmentLock.Unlock() 173 seg.MSN = r.MSN 174 r.MSN += 1 175 r.Segments = append(r.Segments, seg) 176 if len(r.Segments) > RetainSegmentSize { 177 // Calculate how many segments to remove 178 removeCount := len(r.Segments) - RetainSegmentSize 179 // Remove the oldest segments (from the front of the slice) 180 r.Segments = r.Segments[removeCount:] 181 } 182 return nil 183} 184 185func (m *M3U8) GetRendition(rendition string) (*M3U8Rendition, error) { 186 for _, r := range m.renditions { 187 if r.Rendition.Name == rendition { 188 return r, nil 189 } 190 } 191 return nil, fmt.Errorf("rendition not found") 192}