Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/resolution-display 117 lines 3.4 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/localdb" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/media" 15 "stream.place/streamplace/pkg/model" 16 "stream.place/streamplace/pkg/replication" 17 "stream.place/streamplace/pkg/statedb" 18) 19 20// director is responsible for managing the lifecycle of a stream, making business 21// logic decisions about when to do things like 22// - size of the in-memory segment cache 23// - transcoding 24// - thumbnail generation 25 26type Director struct { 27 mm *media.MediaManager 28 mod model.Model 29 cli *config.CLI 30 bus *bus.Bus 31 streamSessions map[string]*StreamSession 32 streamSessionsMu sync.Mutex 33 op *oatproxy.OATProxy 34 statefulDB *statedb.StatefulDB 35 replicator replication.Replicator 36 localDB localdb.LocalDB 37} 38 39func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator, ldb localdb.LocalDB) *Director { 40 return &Director{ 41 mm: mm, 42 mod: mod, 43 cli: cli, 44 bus: bus, 45 streamSessions: make(map[string]*StreamSession), 46 streamSessionsMu: sync.Mutex{}, 47 op: op, 48 statefulDB: statefulDB, 49 replicator: replicator, 50 localDB: ldb, 51 } 52} 53 54func (d *Director) Start(ctx context.Context) error { 55 newSeg := d.mm.NewSegment() 56 ctx, cancel := context.WithCancel(ctx) 57 defer cancel() 58 g, ctx := errgroup.WithContext(ctx) 59 for { 60 select { 61 case <-ctx.Done(): 62 cancel() 63 return g.Wait() 64 case not := <-newSeg: 65 d.streamSessionsMu.Lock() 66 ss, ok := d.streamSessions[not.Segment.RepoDID] 67 if !ok { 68 ss = &StreamSession{ 69 hls: nil, 70 lp: nil, 71 repoDID: not.Segment.RepoDID, 72 mm: d.mm, 73 mod: d.mod, 74 cli: d.cli, 75 bus: d.bus, 76 segmentChan: make(chan struct{}), 77 op: d.op, 78 packets: make([]bus.PacketizedSegment, 0), 79 started: make(chan struct{}), 80 statefulDB: d.statefulDB, 81 replicator: d.replicator, 82 // Initialize notification channels (buffered size 1 for coalescing) 83 statusUpdateChan: make(chan struct{}, 1), 84 originUpdateChan: make(chan struct{}, 1), 85 localDB: d.localDB, 86 } 87 d.streamSessions[not.Segment.RepoDID] = ss 88 g.Go(func() error { 89 err := ss.Start(ctx, not) 90 if err != nil { 91 log.Error(ctx, "could not start stream session", "error", err) 92 } 93 d.streamSessionsMu.Lock() 94 delete(d.streamSessions, not.Segment.RepoDID) 95 d.streamSessionsMu.Unlock() 96 return nil 97 }) 98 } 99 d.streamSessionsMu.Unlock() 100 101 err := ss.NewSegment(ctx, not) 102 if err != nil { 103 log.Error(ctx, "could not add segment to stream session", "error", err) 104 } 105 } 106 } 107} 108 109func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 110 d.streamSessionsMu.Lock() 111 defer d.streamSessionsMu.Unlock() 112 ss, ok := d.streamSessions[repoDID] 113 if !ok { 114 return nil, fmt.Errorf("stream session not found") 115 } 116 return ss.hls, nil 117}