Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/get-segments 95 lines 2.4 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "golang.org/x/sync/errgroup" 9 "stream.place/streamplace/pkg/bus" 10 "stream.place/streamplace/pkg/config" 11 "stream.place/streamplace/pkg/log" 12 "stream.place/streamplace/pkg/media" 13 "stream.place/streamplace/pkg/model" 14) 15 16// director is responsible for managing the lifecycle of a stream, making business 17// logic decisions about when to do things like 18// - size of the in-memory segment cache 19// - transcoding 20// - thumbnail generation 21 22type Director struct { 23 mm *media.MediaManager 24 mod model.Model 25 cli *config.CLI 26 bus *bus.Bus 27 streamSessions map[string]*StreamSession 28 streamSessionsMu sync.Mutex 29} 30 31func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus) *Director { 32 return &Director{ 33 mm: mm, 34 mod: mod, 35 cli: cli, 36 bus: bus, 37 streamSessions: make(map[string]*StreamSession), 38 streamSessionsMu: sync.Mutex{}, 39 } 40} 41 42func (d *Director) Start(ctx context.Context) error { 43 newSeg := d.mm.NewSegment() 44 ctx, cancel := context.WithCancel(ctx) 45 defer cancel() 46 g, ctx := errgroup.WithContext(ctx) 47 for { 48 select { 49 case <-ctx.Done(): 50 cancel() 51 return g.Wait() 52 case not := <-newSeg: 53 d.streamSessionsMu.Lock() 54 ss, ok := d.streamSessions[not.Segment.RepoDID] 55 if !ok { 56 ss = &StreamSession{ 57 hls: nil, 58 lp: nil, 59 repoDID: not.Segment.RepoDID, 60 mm: d.mm, 61 mod: d.mod, 62 cli: d.cli, 63 bus: d.bus, 64 segmentChan: make(chan struct{}), 65 } 66 d.streamSessions[not.Segment.RepoDID] = ss 67 g.Go(func() error { 68 err := ss.Start(ctx, not) 69 if err != nil { 70 log.Error(ctx, "could not start stream session", "error", err) 71 } 72 d.streamSessionsMu.Lock() 73 delete(d.streamSessions, not.Segment.RepoDID) 74 d.streamSessionsMu.Unlock() 75 return nil 76 }) 77 } 78 d.streamSessionsMu.Unlock() 79 err := ss.NewSegment(ctx, not) 80 if err != nil { 81 log.Error(ctx, "could not add segment to stream session", "error", err) 82 } 83 } 84 } 85} 86 87func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 88 d.streamSessionsMu.Lock() 89 defer d.streamSessionsMu.Unlock() 90 ss, ok := d.streamSessions[repoDID] 91 if !ok { 92 return nil, fmt.Errorf("stream session not found") 93 } 94 return ss.hls, nil 95}