Live video on the AT Protocol
1package director
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "golang.org/x/sync/errgroup"
9 "stream.place/streamplace/pkg/bus"
10 "stream.place/streamplace/pkg/config"
11 "stream.place/streamplace/pkg/log"
12 "stream.place/streamplace/pkg/media"
13 "stream.place/streamplace/pkg/model"
14)
15
16// director is responsible for managing the lifecycle of a stream, making business
17// logic decisions about when to do things like
18// - size of the in-memory segment cache
19// - transcoding
20// - thumbnail generation
21
22type Director struct {
23 mm *media.MediaManager
24 mod model.Model
25 cli *config.CLI
26 bus *bus.Bus
27 streamSessions map[string]*StreamSession
28 streamSessionsMu sync.Mutex
29}
30
31func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus) *Director {
32 return &Director{
33 mm: mm,
34 mod: mod,
35 cli: cli,
36 bus: bus,
37 streamSessions: make(map[string]*StreamSession),
38 streamSessionsMu: sync.Mutex{},
39 }
40}
41
42func (d *Director) Start(ctx context.Context) error {
43 newSeg := d.mm.NewSegment()
44 ctx, cancel := context.WithCancel(ctx)
45 defer cancel()
46 g, ctx := errgroup.WithContext(ctx)
47 for {
48 select {
49 case <-ctx.Done():
50 cancel()
51 return g.Wait()
52 case not := <-newSeg:
53 d.streamSessionsMu.Lock()
54 ss, ok := d.streamSessions[not.Segment.RepoDID]
55 if !ok {
56 ss = &StreamSession{
57 hls: nil,
58 lp: nil,
59 repoDID: not.Segment.RepoDID,
60 mm: d.mm,
61 mod: d.mod,
62 cli: d.cli,
63 bus: d.bus,
64 segmentChan: make(chan struct{}),
65 }
66 d.streamSessions[not.Segment.RepoDID] = ss
67 g.Go(func() error {
68 err := ss.Start(ctx, not)
69 if err != nil {
70 log.Error(ctx, "could not start stream session", "error", err)
71 }
72 d.streamSessionsMu.Lock()
73 delete(d.streamSessions, not.Segment.RepoDID)
74 d.streamSessionsMu.Unlock()
75 return nil
76 })
77 }
78 d.streamSessionsMu.Unlock()
79 err := ss.NewSegment(ctx, not)
80 if err != nil {
81 log.Error(ctx, "could not add segment to stream session", "error", err)
82 }
83 }
84 }
85}
86
87func (d *Director) GetM3U8(ctx context.Context, repoDID string) (*media.M3U8, error) {
88 d.streamSessionsMu.Lock()
89 defer d.streamSessionsMu.Unlock()
90 ss, ok := d.streamSessions[repoDID]
91 if !ok {
92 return nil, fmt.Errorf("stream session not found")
93 }
94 return ss.hls, nil
95}