Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/version-label 101 lines 2.7 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15) 16 17// director is responsible for managing the lifecycle of a stream, making business 18// logic decisions about when to do things like 19// - size of the in-memory segment cache 20// - transcoding 21// - thumbnail generation 22 23type Director struct { 24 mm *media.MediaManager 25 mod model.Model 26 cli *config.CLI 27 bus *bus.Bus 28 streamSessions map[string]*StreamSession 29 streamSessionsMu sync.Mutex 30 op *oatproxy.OATProxy 31} 32 33func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director { 34 return &Director{ 35 mm: mm, 36 mod: mod, 37 cli: cli, 38 bus: bus, 39 streamSessions: make(map[string]*StreamSession), 40 streamSessionsMu: sync.Mutex{}, 41 op: op, 42 } 43} 44 45func (d *Director) Start(ctx context.Context) error { 46 newSeg := d.mm.NewSegment() 47 ctx, cancel := context.WithCancel(ctx) 48 defer cancel() 49 g, ctx := errgroup.WithContext(ctx) 50 for { 51 select { 52 case <-ctx.Done(): 53 cancel() 54 return g.Wait() 55 case not := <-newSeg: 56 d.streamSessionsMu.Lock() 57 ss, ok := d.streamSessions[not.Segment.RepoDID] 58 if !ok { 59 ss = &StreamSession{ 60 hls: nil, 61 lp: nil, 62 repoDID: not.Segment.RepoDID, 63 mm: d.mm, 64 mod: d.mod, 65 cli: d.cli, 66 bus: d.bus, 67 segmentChan: make(chan struct{}), 68 op: d.op, 69 packets: make([]bus.PacketizedSegment, 0), 70 started: make(chan struct{}), 71 } 72 d.streamSessions[not.Segment.RepoDID] = ss 73 g.Go(func() error { 74 err := ss.Start(ctx, not) 75 if err != nil { 76 log.Error(ctx, "could not start stream session", "error", err) 77 } 78 d.streamSessionsMu.Lock() 79 delete(d.streamSessions, not.Segment.RepoDID) 80 d.streamSessionsMu.Unlock() 81 return nil 82 }) 83 } 84 d.streamSessionsMu.Unlock() 85 err := ss.NewSegment(ctx, not) 86 if err != nil { 87 log.Error(ctx, "could not add segment to stream session", "error", err) 88 } 89 } 90 } 91} 92 93func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 94 d.streamSessionsMu.Lock() 95 defer d.streamSessionsMu.Unlock() 96 ss, ok := d.streamSessions[repoDID] 97 if !ok { 98 return nil, fmt.Errorf("stream session not found") 99 } 100 return ss.hls, nil 101}