Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/packetize-tweak 105 lines 2.9 kB view raw
1package director 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15 "stream.place/streamplace/pkg/statedb" 16) 17 18// director is responsible for managing the lifecycle of a stream, making business 19// logic decisions about when to do things like 20// - size of the in-memory segment cache 21// - transcoding 22// - thumbnail generation 23 24type Director struct { 25 mm *media.MediaManager 26 mod model.Model 27 cli *config.CLI 28 bus *bus.Bus 29 streamSessions map[string]*StreamSession 30 streamSessionsMu sync.Mutex 31 op *oatproxy.OATProxy 32 statefulDB *statedb.StatefulDB 33} 34 35func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director { 36 return &Director{ 37 mm: mm, 38 mod: mod, 39 cli: cli, 40 bus: bus, 41 streamSessions: make(map[string]*StreamSession), 42 streamSessionsMu: sync.Mutex{}, 43 op: op, 44 statefulDB: statefulDB, 45 } 46} 47 48func (d *Director) Start(ctx context.Context) error { 49 newSeg := d.mm.NewSegment() 50 ctx, cancel := context.WithCancel(ctx) 51 defer cancel() 52 g, ctx := errgroup.WithContext(ctx) 53 for { 54 select { 55 case <-ctx.Done(): 56 cancel() 57 return g.Wait() 58 case not := <-newSeg: 59 d.streamSessionsMu.Lock() 60 ss, ok := d.streamSessions[not.Segment.RepoDID] 61 if !ok { 62 ss = &StreamSession{ 63 hls: nil, 64 lp: nil, 65 repoDID: not.Segment.RepoDID, 66 mm: d.mm, 67 mod: d.mod, 68 cli: d.cli, 69 bus: d.bus, 70 segmentChan: make(chan struct{}), 71 op: d.op, 72 packets: make([]bus.PacketizedSegment, 0), 73 started: make(chan struct{}), 74 statefulDB: d.statefulDB, 75 } 76 d.streamSessions[not.Segment.RepoDID] = ss 77 g.Go(func() error { 78 err := ss.Start(ctx, not) 79 if err != nil { 80 log.Error(ctx, "could not start stream session", "error", err) 81 } 82 d.streamSessionsMu.Lock() 83 delete(d.streamSessions, not.Segment.RepoDID) 84 d.streamSessionsMu.Unlock() 85 return nil 86 }) 87 } 88 d.streamSessionsMu.Unlock() 89 err := ss.NewSegment(ctx, not) 90 if err != nil { 91 log.Error(ctx, "could not add segment to stream session", "error", err) 92 } 93 } 94 } 95} 96 97func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) { 98 d.streamSessionsMu.Lock() 99 defer d.streamSessionsMu.Unlock() 100 ss, ok := d.streamSessions[repoDID] 101 if !ok { 102 return nil, fmt.Errorf("stream session not found") 103 } 104 return ss.hls, nil 105}