Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "crypto"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "sync"
11
12 "github.com/google/uuid"
13 "github.com/pion/interceptor"
14 "github.com/pion/interceptor/pkg/intervalpli"
15 "github.com/pion/webrtc/v4"
16 "go.opentelemetry.io/otel"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/atproto"
19 "stream.place/streamplace/pkg/bus"
20 "stream.place/streamplace/pkg/config"
21 "stream.place/streamplace/pkg/gstinit"
22 "stream.place/streamplace/pkg/media/segchanman"
23 "stream.place/streamplace/pkg/model"
24
25 "stream.place/streamplace/pkg/replication"
26
27 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore"
28 "github.com/piprate/json-gold/ld"
29)
30
// File and directory names used by the media package.
const (
	// CERT_FILE is the file name "cert.pem".
	CERT_FILE = "cert.pem"
	// SEGMENTS_DIR is the directory name "segments".
	SEGMENTS_DIR = "segments"
)

// STREAMPLACE_METADATA is the label of the manifest assertion that
// ParseSegmentAssertions searches for.
var STREAMPLACE_METADATA = "place.stream.metadata"
35
36type MediaManager struct {
37 cli *config.CLI
38 segChanMan *segchanman.SegChanMan
39 replicator replication.Replicator
40 hlsRunning map[string]*M3U8
41 hlsRunningMut sync.Mutex
42 httpPipes map[string]io.Writer
43 httpPipesMutex sync.Mutex
44 newSegmentSubs []chan *NewSegmentNotification
45 newSegmentSubsMutex sync.RWMutex
46 model model.Model
47 bus *bus.Bus
48 atsync *atproto.ATProtoSynchronizer
49 webrtcAPI *webrtc.API
50 webrtcConfig webrtc.Configuration
51}
52
53type NewSegmentNotification struct {
54 Segment *model.Segment
55 Data []byte
56 Metadata *SegmentMetadata
57}
58
59func RunSelfTest(ctx context.Context) error {
60 gstinit.InitGST()
61 return SelfTest(ctx)
62}
63
64func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) {
65 gstinit.InitGST()
66 err := SelfTest(ctx)
67 if err != nil {
68 return nil, fmt.Errorf("error in gstreamer self-test: %w", err)
69 }
70
71 m := &webrtc.MediaEngine{}
72 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
73 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
74 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
75 // for each PeerConnection.
76 i := &interceptor.Registry{}
77
78 // Register a intervalpli factory
79 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender.
80 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates
81 // A real world application should process incoming RTCP packets from viewers and forward them to senders
82 intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
83 if err != nil {
84 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err)
85 }
86 i.Add(intervalPliFactory)
87
88 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
89 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
90 PayloadType: 102,
91 }, webrtc.RTPCodecTypeVideo); err != nil {
92 return nil, err
93 }
94 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
95 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
96 PayloadType: 111,
97 }, webrtc.RTPCodecTypeAudio); err != nil {
98 return nil, err
99 }
100
101 // Use the default set of Interceptors
102 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
103 return nil, fmt.Errorf("failed to register default interceptors: %w", err)
104 }
105
106 // Create the API object with the MediaEngine
107 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
108
109 // Prepare the configuration
110 config := webrtc.Configuration{
111 ICEServers: []webrtc.ICEServer{
112 {
113 URLs: []string{"stun:stun.l.google.com:19302"},
114 },
115 },
116 }
117 return &MediaManager{
118 cli: cli,
119 segChanMan: segchanman.MakeSegChanMan(),
120 replicator: rep,
121 hlsRunning: map[string]*M3U8{},
122 httpPipes: map[string]io.Writer{},
123 model: mod,
124 bus: bus,
125 atsync: atsync,
126 webrtcAPI: api,
127 webrtcConfig: config,
128 }, nil
129}
130
131// replacement for os.Pipe that works on windows
132func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) {
133 uu, err := uuid.NewV7()
134 if err != nil {
135 return "", nil, nil, err
136 }
137 mm.httpPipesMutex.Lock()
138 defer mm.httpPipesMutex.Unlock()
139 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String())
140 done := func() {
141 mm.httpPipesMutex.Lock()
142 defer mm.httpPipesMutex.Unlock()
143 delete(mm.httpPipes, uu.String())
144 }
145 r, w := io.Pipe()
146 mm.httpPipes[uu.String()] = w
147 return u, r, done, nil
148}
149
150func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer {
151 mm.httpPipesMutex.Lock()
152 defer mm.httpPipesMutex.Unlock()
153 return mm.httpPipes[uu]
154}
155
156// register a handler for all new segments that come in
157func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification {
158 ch := make(chan *NewSegmentNotification)
159 mm.newSegmentSubsMutex.Lock()
160 defer mm.newSegmentSubsMutex.Unlock()
161 mm.newSegmentSubs = append(mm.newSegmentSubs, ch)
162 return ch
163}
164
165// subscribe to the latest segments from a given user for livestreaming purposes
166func (mm *MediaManager) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg {
167 return mm.segChanMan.SubscribeSegment(ctx, user, rendition)
168}
169
170func (mm *MediaManager) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) {
171 mm.segChanMan.UnsubscribeSegment(ctx, user, rendition, ch)
172}
173
174// subscribe to the latest segments from a given user for livestreaming purposes
175func (mm *MediaManager) PublishSegment(ctx context.Context, user, rendition string, seg *segchanman.Seg) {
176 mm.segChanMan.PublishSegment(ctx, user, rendition, seg)
177}
178
// obj is a named map type with string keys and values of any type.
type obj map[string]any
180
// StringVal maps a JSON object of the form {"@value": "..."} onto a Go
// string. ExpandedSchemaOrg uses it for every property value it decodes.
type StringVal struct {
	// Value is read from and written to the "@value" JSON key.
	Value string `json:"@value"`
}
184
185type ExpandedSchemaOrg []struct {
186 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"`
187 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"`
188 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"`
189}
190
191type SegmentMetadata struct {
192 StartTime aqtime.AQTime
193 Title string
194 Creator string
195}
196
// ErrInvalidMetadata is returned by ParseSegmentAssertions when the expanded
// metadata does not consist of exactly one node, has no creator, or does not
// have exactly one title and exactly one date.
var ErrInvalidMetadata = errors.New("invalid segment metadata")
198
199func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) {
200 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions")
201 defer span.End()
202 var ass *manifeststore.ManifestAssertion
203 for _, a := range mani.Assertions {
204 if a.Label == STREAMPLACE_METADATA {
205 ass = &a
206 break
207 }
208 }
209 if ass == nil {
210 return nil, fmt.Errorf("couldn't find %s assertions", STREAMPLACE_METADATA)
211 }
212 proc := ld.NewJsonLdProcessor()
213 options := ld.NewJsonLdOptions("")
214 flat, err := proc.Expand(ass.Data, options)
215 if err != nil {
216 return nil, err
217 }
218 bs, err := json.Marshal(flat)
219 if err != nil {
220 return nil, err
221 }
222 var metas ExpandedSchemaOrg
223 err = json.Unmarshal(bs, &metas)
224 if err != nil {
225 return nil, err
226 }
227 if len(metas) != 1 {
228 return nil, ErrInvalidMetadata
229 }
230 meta := metas[0]
231 if len(meta.Creator) == 0 {
232 return nil, ErrInvalidMetadata
233 }
234 if len(meta.Title) != 1 {
235 return nil, ErrInvalidMetadata
236 }
237 if len(meta.Date) != 1 {
238 return nil, ErrInvalidMetadata
239 }
240 start, err := aqtime.FromString(meta.Date[0].Value)
241 if err != nil {
242 return nil, err
243 }
244 out := SegmentMetadata{
245 StartTime: start,
246 Title: meta.Title[0].Value,
247 Creator: meta.Creator[0].Value,
248 }
249 return &out, nil
250}