Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.30 105 lines 2.9 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15 "stream.place/streamplace/pkg/statedb" 16) 17 18// director is responsible for managing the lifecycle of a stream, making business 19// logic decisions about when to do things like 20// - size of the in-memory segment cache 21// - transcoding 22// - thumbnail generation 23 24type Director struct { 25 mm *media.MediaManager 26 mod model.Model 27 cli *config.CLI 28 bus *bus.Bus 29 streamSessions map[string]*StreamSession 30 streamSessionsMu sync.Mutex 31 op *oatproxy.OATProxy 32 statefulDB *statedb.StatefulDB 33} 34 35func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director { 36 return &Director{ 37 mm: mm, 38 mod: mod, 39 cli: cli, 40 bus: bus, 41 streamSessions: make(map[string]*StreamSession), 42 streamSessionsMu: sync.Mutex{}, 43 op: op, 44 statefulDB: statefulDB, 45 } 46} 47 48func (d *Director) Start(ctx context.Context) error { 49 newSeg := d.mm.NewSegment() 50 ctx, cancel := context.WithCancel(ctx) 51 defer cancel() 52 g, ctx := errgroup.WithContext(ctx) 53 for { 54 select { 55 case <-ctx.Done(): 56 cancel() 57 return g.Wait() 58 case not := <-newSeg: 59 d.streamSessionsMu.Lock() 60 ss, ok := d.streamSessions[not.Segment.RepoDID] 61 if !ok { 62 ss = &StreamSession{ 63 hls: nil, 64 lp: nil, 65 repoDID: not.Segment.RepoDID, 66 mm: d.mm, 67 mod: d.mod, 68 cli: d.cli, 69 bus: d.bus, 70 segmentChan: make(chan struct{}), 71 op: d.op, 72 packets: make([]bus.PacketizedSegment, 0), 73 started: make(chan struct{}), 74 statefulDB: d.statefulDB, 75 } 76 d.streamSessions[not.Segment.RepoDID] = ss 77 g.Go(func() error { 78 err := ss.Start(ctx, not) 79 if err != nil { 80 log.Error(ctx, "could not start stream session", "error", err) 81 } 82 d.streamSessionsMu.Lock() 83 delete(d.streamSessions, not.Segment.RepoDID) 84 d.streamSessionsMu.Unlock() 85 return nil 86 }) 87 } 88 d.streamSessionsMu.Unlock() 89 err := ss.NewSegment(ctx, not) 90 if err != nil { 91 log.Error(ctx, "could not add segment to stream session", "error", err) 92 } 93 } 94 } 95} 96 97func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 98 d.streamSessionsMu.Lock() 99 defer d.streamSessionsMu.Unlock() 100 ss, ok := d.streamSessions[repoDID] 101 if !ok { 102 return nil, fmt.Errorf("stream session not found") 103 } 104 return ss.hls, nil 105}