Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/streamplace/oatproxy/pkg/oatproxy"
9 "golang.org/x/sync/errgroup"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/media"
14 "stream.place/streamplace/pkg/model"
15 "stream.place/streamplace/pkg/replication"
16 "stream.place/streamplace/pkg/statedb"
17)
18
19// director is responsible for managing the lifecycle of a stream, making business
20// logic decisions about when to do things like
21// - size of the in-memory segment cache
22// - transcoding
23// - thumbnail generation
24
25type Director struct {
26 mm *media.MediaManager
27 mod model.Model
28 cli *config.CLI
29 bus *bus.Bus
30 streamSessions map[string]*StreamSession
31 streamSessionsMu sync.Mutex
32 op *oatproxy.OATProxy
33 statefulDB *statedb.StatefulDB
34 replicator replication.Replicator
35}
36
37func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator) *Director {
38 return &Director{
39 mm: mm,
40 mod: mod,
41 cli: cli,
42 bus: bus,
43 streamSessions: make(map[string]*StreamSession),
44 streamSessionsMu: sync.Mutex{},
45 op: op,
46 statefulDB: statefulDB,
47 replicator: replicator,
48 }
49}
50
51func (d *Director) Start(ctx context.Context) error {
52 newSeg := d.mm.NewSegment()
53 ctx, cancel := context.WithCancel(ctx)
54 defer cancel()
55 g, ctx := errgroup.WithContext(ctx)
56 for {
57 select {
58 case <-ctx.Done():
59 cancel()
60 return g.Wait()
61 case not := <-newSeg:
62 d.streamSessionsMu.Lock()
63 ss, ok := d.streamSessions[not.Segment.RepoDID]
64 if !ok {
65 ss = &StreamSession{
66 hls: nil,
67 lp: nil,
68 repoDID: not.Segment.RepoDID,
69 mm: d.mm,
70 mod: d.mod,
71 cli: d.cli,
72 bus: d.bus,
73 segmentChan: make(chan struct{}),
74 op: d.op,
75 packets: make([]bus.PacketizedSegment, 0),
76 started: make(chan struct{}),
77 statefulDB: d.statefulDB,
78 replicator: d.replicator,
79 }
80 d.streamSessions[not.Segment.RepoDID] = ss
81 g.Go(func() error {
82 err := ss.Start(ctx, not)
83 if err != nil {
84 log.Error(ctx, "could not start stream session", "error", err)
85 }
86 d.streamSessionsMu.Lock()
87 delete(d.streamSessions, not.Segment.RepoDID)
88 d.streamSessionsMu.Unlock()
89 return nil
90 })
91 }
92 d.streamSessionsMu.Unlock()
93
94 err := ss.NewSegment(ctx, not)
95 if err != nil {
96 log.Error(ctx, "could not add segment to stream session", "error", err)
97 }
98 }
99 }
100}
101
102func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
103 d.streamSessionsMu.Lock()
104 defer d.streamSessionsMu.Unlock()
105 ss, ok := d.streamSessions[repoDID]
106 if !ok {
107 return nil, fmt.Errorf("stream session not found")
108 }
109 return ss.hls, nil
110}