Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/media"
14 "stream.place/streamplace/pkg/model"
15)
16
17// director is responsible for managing the lifecycle of a stream, making business
18// logic decisions about when to do things like
19// - size of the in-memory segment cache
20// - transcoding
21// - thumbnail generation
22
23type Director struct {
24 mm *media.MediaManager
25 mod model.Model
26 cli *config.CLI
27 bus *bus.Bus
28 streamSessions map[string]*StreamSession
29 streamSessionsMu sync.Mutex
30 op *oatproxy.OATProxy
31}
32
33func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director {
34 return &Director{
35 mm: mm,
36 mod: mod,
37 cli: cli,
38 bus: bus,
39 streamSessions: make(map[string]*StreamSession),
40 streamSessionsMu: sync.Mutex{},
41 op: op,
42 }
43}
44
45func (d *Director) Start(ctx context.Context) error {
46 newSeg := d.mm.NewSegment()
47 ctx, cancel := context.WithCancel(ctx)
48 defer cancel()
49 g, ctx := errgroup.WithContext(ctx)
50 for {
51 select {
52 case <-ctx.Done():
53 cancel()
54 return g.Wait()
55 case not := <-newSeg:
56 d.streamSessionsMu.Lock()
57 ss, ok := d.streamSessions[not.Segment.RepoDID]
58 if !ok {
59 ss = &StreamSession{
60 hls: nil,
61 lp: nil,
62 repoDID: not.Segment.RepoDID,
63 mm: d.mm,
64 mod: d.mod,
65 cli: d.cli,
66 bus: d.bus,
67 segmentChan: make(chan struct{}),
68 op: d.op,
69 packets: make([]bus.PacketizedSegment, 0),
70 started: make(chan struct{}),
71 }
72 d.streamSessions[not.Segment.RepoDID] = ss
73 g.Go(func() error {
74 err := ss.Start(ctx, not)
75 if err != nil {
76 log.Error(ctx, "could not start stream session", "error", err)
77 }
78 d.streamSessionsMu.Lock()
79 delete(d.streamSessions, not.Segment.RepoDID)
80 d.streamSessionsMu.Unlock()
81 return nil
82 })
83 }
84 d.streamSessionsMu.Unlock()
85 err := ss.NewSegment(ctx, not)
86 if err != nil {
87 log.Error(ctx, "could not add segment to stream session", "error", err)
88 }
89 }
90 }
91}
92
93func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
94 d.streamSessionsMu.Lock()
95 defer d.streamSessionsMu.Unlock()
96 ss, ok := d.streamSessions[repoDID]
97 if !ok {
98 return nil, fmt.Errorf("stream session not found")
99 }
100 return ss.hls, nil
101}