Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/media"
14 "stream.place/streamplace/pkg/model"
15)
16
// The director is responsible for managing the lifecycle of a stream, making
// business-logic decisions about things like:
//   - the size of the in-memory segment cache
//   - when to transcode
//   - when to generate thumbnails
22
23type Director struct {
24 mm *media.MediaManager
25 mod model.Model
26 cli *config.CLI
27 bus *bus.Bus
28 streamSessions map[string]*StreamSession
29 streamSessionsMu sync.Mutex
30 op *oatproxy.OATProxy
31}
32
33func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director {
34 return &Director{
35 mm: mm,
36 mod: mod,
37 cli: cli,
38 bus: bus,
39 streamSessions: make(map[string]*StreamSession),
40 streamSessionsMu: sync.Mutex{},
41 op: op,
42 }
43}
44
45func (d *Director) Start(ctx context.Context) error {
46 newSeg := d.mm.NewSegment()
47 ctx, cancel := context.WithCancel(ctx)
48 defer cancel()
49 g, ctx := errgroup.WithContext(ctx)
50 for {
51 select {
52 case <-ctx.Done():
53 cancel()
54 return g.Wait()
55 case not := <-newSeg:
56 d.streamSessionsMu.Lock()
57 ss, ok := d.streamSessions[not.Segment.RepoDID]
58 if !ok {
59 ss = &StreamSession{
60 hls: nil,
61 lp: nil,
62 repoDID: not.Segment.RepoDID,
63 mm: d.mm,
64 mod: d.mod,
65 cli: d.cli,
66 bus: d.bus,
67 segmentChan: make(chan struct{}),
68 op: d.op,
69 }
70 d.streamSessions[not.Segment.RepoDID] = ss
71 g.Go(func() error {
72 err := ss.Start(ctx, not)
73 if err != nil {
74 log.Error(ctx, "could not start stream session", "error", err)
75 }
76 d.streamSessionsMu.Lock()
77 delete(d.streamSessions, not.Segment.RepoDID)
78 d.streamSessionsMu.Unlock()
79 return nil
80 })
81 }
82 d.streamSessionsMu.Unlock()
83 err := ss.NewSegment(ctx, not)
84 if err != nil {
85 log.Error(ctx, "could not add segment to stream session", "error", err)
86 }
87 }
88 }
89}
90
91func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
92 d.streamSessionsMu.Lock()
93 defer d.streamSessionsMu.Unlock()
94 ss, ok := d.streamSessions[repoDID]
95 if !ok {
96 return nil, fmt.Errorf("stream session not found")
97 }
98 return ss.hls, nil
99}