Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/localdb"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/media"
15 "stream.place/streamplace/pkg/model"
16 "stream.place/streamplace/pkg/replication"
17 "stream.place/streamplace/pkg/statedb"
18)
19
20// director is responsible for managing the lifecycle of a stream, making business
21// logic decisions about when to do things like
22// - size of the in-memory segment cache
23// - transcoding
24// - thumbnail generation
25
26type Director struct {
27 mm *media.MediaManager
28 mod model.Model
29 cli *config.CLI
30 bus *bus.Bus
31 streamSessions map[string]*StreamSession
32 streamSessionsMu sync.Mutex
33 op *oatproxy.OATProxy
34 statefulDB *statedb.StatefulDB
35 replicator replication.Replicator
36 localDB localdb.LocalDB
37}
38
39func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator, ldb localdb.LocalDB) *Director {
40 return &Director{
41 mm: mm,
42 mod: mod,
43 cli: cli,
44 bus: bus,
45 streamSessions: make(map[string]*StreamSession),
46 streamSessionsMu: sync.Mutex{},
47 op: op,
48 statefulDB: statefulDB,
49 replicator: replicator,
50 localDB: ldb,
51 }
52}
53
54func (d *Director) Start(ctx context.Context) error {
55 newSeg := d.mm.NewSegment()
56 ctx, cancel := context.WithCancel(ctx)
57 defer cancel()
58 g, ctx := errgroup.WithContext(ctx)
59 for {
60 select {
61 case <-ctx.Done():
62 cancel()
63 return g.Wait()
64 case not := <-newSeg:
65 d.streamSessionsMu.Lock()
66 ss, ok := d.streamSessions[not.Segment.RepoDID]
67 if !ok {
68 ss = &StreamSession{
69 hls: nil,
70 lp: nil,
71 repoDID: not.Segment.RepoDID,
72 mm: d.mm,
73 mod: d.mod,
74 cli: d.cli,
75 bus: d.bus,
76 segmentChan: make(chan struct{}),
77 op: d.op,
78 packets: make([]bus.PacketizedSegment, 0),
79 started: make(chan struct{}),
80 statefulDB: d.statefulDB,
81 replicator: d.replicator,
82 // Initialize notification channels (buffered size 1 for coalescing)
83 statusUpdateChan: make(chan struct{}, 1),
84 originUpdateChan: make(chan struct{}, 1),
85 localDB: d.localDB,
86 }
87 d.streamSessions[not.Segment.RepoDID] = ss
88 g.Go(func() error {
89 err := ss.Start(ctx, not)
90 if err != nil {
91 log.Error(ctx, "could not start stream session", "error", err)
92 }
93 d.streamSessionsMu.Lock()
94 delete(d.streamSessions, not.Segment.RepoDID)
95 d.streamSessionsMu.Unlock()
96 return nil
97 })
98 }
99 d.streamSessionsMu.Unlock()
100
101 err := ss.NewSegment(ctx, not)
102 if err != nil {
103 log.Error(ctx, "could not add segment to stream session", "error", err)
104 }
105 }
106 }
107}
108
109func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
110 d.streamSessionsMu.Lock()
111 defer d.streamSessionsMu.Unlock()
112 ss, ok := d.streamSessions[repoDID]
113 if !ok {
114 return nil, fmt.Errorf("stream session not found")
115 }
116 return ss.hls, nil
117}