Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/streamplace-package 99 lines 2.6 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15) 16 17// director is responsible for managing the lifecycle of a stream, making business 18// logic decisions about when to do things like 19// - size of the in-memory segment cache 20// - transcoding 21// - thumbnail generation 22 23type Director struct { 24 mm *media.MediaManager 25 mod model.Model 26 cli *config.CLI 27 bus *bus.Bus 28 streamSessions map[string]*StreamSession 29 streamSessionsMu sync.Mutex 30 op *oatproxy.OATProxy 31} 32 33func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director { 34 return &Director{ 35 mm: mm, 36 mod: mod, 37 cli: cli, 38 bus: bus, 39 streamSessions: make(map[string]*StreamSession), 40 streamSessionsMu: sync.Mutex{}, 41 op: op, 42 } 43} 44 45func (d *Director) Start(ctx context.Context) error { 46 newSeg := d.mm.NewSegment() 47 ctx, cancel := context.WithCancel(ctx) 48 defer cancel() 49 g, ctx := errgroup.WithContext(ctx) 50 for { 51 select { 52 case <-ctx.Done(): 53 cancel() 54 return g.Wait() 55 case not := <-newSeg: 56 d.streamSessionsMu.Lock() 57 ss, ok := d.streamSessions[not.Segment.RepoDID] 58 if !ok { 59 ss = &StreamSession{ 60 hls: nil, 61 lp: nil, 62 repoDID: not.Segment.RepoDID, 63 mm: d.mm, 64 mod: d.mod, 65 cli: d.cli, 66 bus: d.bus, 67 segmentChan: make(chan struct{}), 68 op: d.op, 69 } 70 d.streamSessions[not.Segment.RepoDID] = ss 71 g.Go(func() error { 72 err := ss.Start(ctx, not) 73 if err != nil { 74 log.Error(ctx, "could not start stream session", "error", err) 75 } 76 d.streamSessionsMu.Lock() 77 delete(d.streamSessions, not.Segment.RepoDID) 78 d.streamSessionsMu.Unlock() 79 return nil 80 }) 81 } 82 d.streamSessionsMu.Unlock() 83 err := ss.NewSegment(ctx, not) 84 if err != nil { 85 log.Error(ctx, "could not add segment to stream session", "error", err) 86 } 87 } 88 } 89} 90 91func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 92 d.streamSessionsMu.Lock() 93 defer d.streamSessionsMu.Unlock() 94 ss, ok := d.streamSessions[repoDID] 95 if !ok { 96 return nil, fmt.Errorf("stream session not found") 97 } 98 return ss.hls, nil 99}