Live video on the AT Protocol
at eli/multitesting 110 lines 3.0 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15 "stream.place/streamplace/pkg/replication" 16 "stream.place/streamplace/pkg/statedb" 17) 18 19// director is responsible for managing the lifecycle of a stream, making business 20// logic decisions about when to do things like 21// - size of the in-memory segment cache 22// - transcoding 23// - thumbnail generation 24 25type Director struct { 26 mm *media.MediaManager 27 mod model.Model 28 cli *config.CLI 29 bus *bus.Bus 30 streamSessions map[string]*StreamSession 31 streamSessionsMu sync.Mutex 32 op *oatproxy.OATProxy 33 statefulDB *statedb.StatefulDB 34 replicator replication.Replicator 35} 36 37func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator) *Director { 38 return &Director{ 39 mm: mm, 40 mod: mod, 41 cli: cli, 42 bus: bus, 43 streamSessions: make(map[string]*StreamSession), 44 streamSessionsMu: sync.Mutex{}, 45 op: op, 46 statefulDB: statefulDB, 47 replicator: replicator, 48 } 49} 50 51func (d *Director) Start(ctx context.Context) error { 52 newSeg := d.mm.NewSegment() 53 ctx, cancel := context.WithCancel(ctx) 54 defer cancel() 55 g, ctx := errgroup.WithContext(ctx) 56 for { 57 select { 58 case <-ctx.Done(): 59 cancel() 60 return g.Wait() 61 case not := <-newSeg: 62 d.streamSessionsMu.Lock() 63 ss, ok := d.streamSessions[not.Segment.RepoDID] 64 if !ok { 65 ss = &StreamSession{ 66 hls: nil, 67 lp: nil, 68 repoDID: not.Segment.RepoDID, 69 mm: d.mm, 70 mod: d.mod, 71 cli: d.cli, 72 bus: d.bus, 73 segmentChan: make(chan struct{}), 74 op: d.op, 75 packets: make([]bus.PacketizedSegment, 0), 76 started: make(chan struct{}), 77 statefulDB: d.statefulDB, 78 replicator: d.replicator, 79 } 80 d.streamSessions[not.Segment.RepoDID] = ss 81 g.Go(func() error { 82 err := ss.Start(ctx, not) 83 if err != nil { 84 log.Error(ctx, "could not start stream session", "error", err) 85 } 86 d.streamSessionsMu.Lock() 87 delete(d.streamSessions, not.Segment.RepoDID) 88 d.streamSessionsMu.Unlock() 89 return nil 90 }) 91 } 92 d.streamSessionsMu.Unlock() 93 94 err := ss.NewSegment(ctx, not) 95 if err != nil { 96 log.Error(ctx, "could not add segment to stream session", "error", err) 97 } 98 } 99 } 100} 101 102func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 103 d.streamSessionsMu.Lock() 104 defer d.streamSessionsMu.Unlock() 105 ss, ok := d.streamSessions[repoDID] 106 if !ok { 107 return nil, fmt.Errorf("stream session not found") 108 } 109 return ss.hls, nil 110}