Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/media"
14 "stream.place/streamplace/pkg/model"
15 "stream.place/streamplace/pkg/replication"
16 "stream.place/streamplace/pkg/statedb"
17)
18
19// director is responsible for managing the lifecycle of a stream, making business
20// logic decisions about when to do things like
21// - size of the in-memory segment cache
22// - transcoding
23// - thumbnail generation
24
25type Director struct {
26 mm *media.MediaManager
27 mod model.Model
28 cli *config.CLI
29 bus *bus.Bus
30 streamSessions map[string]*StreamSession
31 streamSessionsMu sync.Mutex
32 op *oatproxy.OATProxy
33 statefulDB *statedb.StatefulDB
34 replicator replication.Replicator
35}
36
37func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator) *Director {
38 return &Director{
39 mm: mm,
40 mod: mod,
41 cli: cli,
42 bus: bus,
43 streamSessions: make(map[string]*StreamSession),
44 streamSessionsMu: sync.Mutex{},
45 op: op,
46 statefulDB: statefulDB,
47 replicator: replicator,
48 }
49}
50
51func (d *Director) Start(ctx context.Context) error {
52 newSeg := d.mm.NewSegment()
53 ctx, cancel := context.WithCancel(ctx)
54 defer cancel()
55 g, ctx := errgroup.WithContext(ctx)
56 for {
57 select {
58 case <-ctx.Done():
59 cancel()
60 return g.Wait()
61 case not := <-newSeg:
62 d.streamSessionsMu.Lock()
63 ss, ok := d.streamSessions[not.Segment.RepoDID]
64 if !ok {
65 ss = &StreamSession{
66 hls: nil,
67 lp: nil,
68 repoDID: not.Segment.RepoDID,
69 mm: d.mm,
70 mod: d.mod,
71 cli: d.cli,
72 bus: d.bus,
73 segmentChan: make(chan struct{}),
74 op: d.op,
75 packets: make([]bus.PacketizedSegment, 0),
76 started: make(chan struct{}),
77 statefulDB: d.statefulDB,
78 replicator: d.replicator,
79 // Initialize notification channels (buffered size 1 for coalescing)
80 statusUpdateChan: make(chan struct{}, 1),
81 originUpdateChan: make(chan struct{}, 1),
82 }
83 d.streamSessions[not.Segment.RepoDID] = ss
84 g.Go(func() error {
85 err := ss.Start(ctx, not)
86 if err != nil {
87 log.Error(ctx, "could not start stream session", "error", err)
88 }
89 d.streamSessionsMu.Lock()
90 delete(d.streamSessions, not.Segment.RepoDID)
91 d.streamSessionsMu.Unlock()
92 return nil
93 })
94 }
95 d.streamSessionsMu.Unlock()
96
97 err := ss.NewSegment(ctx, not)
98 if err != nil {
99 log.Error(ctx, "could not add segment to stream session", "error", err)
100 }
101 }
102 }
103}
104
105func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
106 d.streamSessionsMu.Lock()
107 defer d.streamSessionsMu.Unlock()
108 ss, ok := d.streamSessions[repoDID]
109 if !ok {
110 return nil, fmt.Errorf("stream session not found")
111 }
112 return ss.hls, nil
113}