Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/media"
14 "stream.place/streamplace/pkg/model"
15 "stream.place/streamplace/pkg/statedb"
16)
17
// The director is responsible for managing the lifecycle of a stream, making
// business logic decisions about things like:
// - the size of the in-memory segment cache
// - when to transcode
// - when to generate thumbnails
23
24type Director struct {
25 mm *media.MediaManager
26 mod model.Model
27 cli *config.CLI
28 bus *bus.Bus
29 streamSessions map[string]*StreamSession
30 streamSessionsMu sync.Mutex
31 op *oatproxy.OATProxy
32 statefulDB *statedb.StatefulDB
33}
34
35func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director {
36 return &Director{
37 mm: mm,
38 mod: mod,
39 cli: cli,
40 bus: bus,
41 streamSessions: make(map[string]*StreamSession),
42 streamSessionsMu: sync.Mutex{},
43 op: op,
44 statefulDB: statefulDB,
45 }
46}
47
48func (d *Director) Start(ctx context.Context) error {
49 newSeg := d.mm.NewSegment()
50 ctx, cancel := context.WithCancel(ctx)
51 defer cancel()
52 g, ctx := errgroup.WithContext(ctx)
53 for {
54 select {
55 case <-ctx.Done():
56 cancel()
57 return g.Wait()
58 case not := <-newSeg:
59 d.streamSessionsMu.Lock()
60 ss, ok := d.streamSessions[not.Segment.RepoDID]
61 if !ok {
62 ss = &StreamSession{
63 hls: nil,
64 lp: nil,
65 repoDID: not.Segment.RepoDID,
66 mm: d.mm,
67 mod: d.mod,
68 cli: d.cli,
69 bus: d.bus,
70 segmentChan: make(chan struct{}),
71 op: d.op,
72 packets: make([]bus.PacketizedSegment, 0),
73 started: make(chan struct{}),
74 statefulDB: d.statefulDB,
75 }
76 d.streamSessions[not.Segment.RepoDID] = ss
77 g.Go(func() error {
78 err := ss.Start(ctx, not)
79 if err != nil {
80 log.Error(ctx, "could not start stream session", "error", err)
81 }
82 d.streamSessionsMu.Lock()
83 delete(d.streamSessions, not.Segment.RepoDID)
84 d.streamSessionsMu.Unlock()
85 return nil
86 })
87 }
88 d.streamSessionsMu.Unlock()
89 err := ss.NewSegment(ctx, not)
90 if err != nil {
91 log.Error(ctx, "could not add segment to stream session", "error", err)
92 }
93 }
94 }
95}
96
97func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
98 d.streamSessionsMu.Lock()
99 defer d.streamSessionsMu.Unlock()
100 ss, ok := d.streamSessions[repoDID]
101 if !ok {
102 return nil, fmt.Errorf("stream session not found")
103 }
104 return ss.hls, nil
105}