Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "crypto"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "sync"
11
12 "github.com/google/uuid"
13 "github.com/pion/interceptor"
14 "github.com/pion/interceptor/pkg/intervalpli"
15 "github.com/pion/webrtc/v4"
16 "go.opentelemetry.io/otel"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/atproto"
19 "stream.place/streamplace/pkg/bus"
20 "stream.place/streamplace/pkg/config"
21 "stream.place/streamplace/pkg/gstinit"
22 "stream.place/streamplace/pkg/model"
23
24 "stream.place/streamplace/pkg/replication"
25
26 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore"
27 "github.com/piprate/json-gold/ld"
28)
29
30// #cgo pkg-config: streamplacedeps-uninstalled
31import "C"
32
33const CertFile = "cert.pem"
34const SegmentsDir = "segments"
35
36var StreamplaceMetadata = "place.stream.metadata"
37
38type MediaManager struct {
39 cli *config.CLI
40 replicator replication.Replicator
41 hlsRunning map[string]*M3U8
42 hlsRunningMut sync.Mutex
43 httpPipes map[string]io.Writer
44 httpPipesMutex sync.Mutex
45 newSegmentSubs []chan *NewSegmentNotification
46 newSegmentSubsMutex sync.RWMutex
47 model model.Model
48 bus *bus.Bus
49 atsync *atproto.ATProtoSynchronizer
50 webrtcAPI *webrtc.API
51 webrtcConfig webrtc.Configuration
52}
53
54type NewSegmentNotification struct {
55 Segment *model.Segment
56 Data []byte
57 Metadata *SegmentMetadata
58}
59
60func RunSelfTest(ctx context.Context) error {
61 gstinit.InitGST()
62 return SelfTest(ctx)
63}
64
65func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) {
66 gstinit.InitGST()
67 err := SelfTest(ctx)
68 if err != nil {
69 return nil, fmt.Errorf("error in gstreamer self-test: %w", err)
70 }
71
72 m := &webrtc.MediaEngine{}
73 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
74 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
75 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
76 // for each PeerConnection.
77 i := &interceptor.Registry{}
78
79 // Register a intervalpli factory
80 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender.
81 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates
82 // A real world application should process incoming RTCP packets from viewers and forward them to senders
83 intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
84 if err != nil {
85 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err)
86 }
87 i.Add(intervalPliFactory)
88
89 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
90 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
91 PayloadType: 102,
92 }, webrtc.RTPCodecTypeVideo); err != nil {
93 return nil, err
94 }
95 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
96 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
97 PayloadType: 111,
98 }, webrtc.RTPCodecTypeAudio); err != nil {
99 return nil, err
100 }
101
102 // Use the default set of Interceptors
103 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
104 return nil, fmt.Errorf("failed to register default interceptors: %w", err)
105 }
106
107 // Create the API object with the MediaEngine
108 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
109
110 // Prepare the configuration
111 config := webrtc.Configuration{
112 ICEServers: []webrtc.ICEServer{
113 {
114 URLs: []string{"stun:stun.l.google.com:19302"},
115 },
116 },
117 }
118 return &MediaManager{
119 cli: cli,
120 replicator: rep,
121 hlsRunning: map[string]*M3U8{},
122 httpPipes: map[string]io.Writer{},
123 model: mod,
124 bus: bus,
125 atsync: atsync,
126 webrtcAPI: api,
127 webrtcConfig: config,
128 }, nil
129}
130
131// replacement for os.Pipe that works on windows
132func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) {
133 uu, err := uuid.NewV7()
134 if err != nil {
135 return "", nil, nil, err
136 }
137 mm.httpPipesMutex.Lock()
138 defer mm.httpPipesMutex.Unlock()
139 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String())
140 done := func() {
141 mm.httpPipesMutex.Lock()
142 defer mm.httpPipesMutex.Unlock()
143 delete(mm.httpPipes, uu.String())
144 }
145 r, w := io.Pipe()
146 mm.httpPipes[uu.String()] = w
147 return u, r, done, nil
148}
149
150func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer {
151 mm.httpPipesMutex.Lock()
152 defer mm.httpPipesMutex.Unlock()
153 return mm.httpPipes[uu]
154}
155
156// register a handler for all new segments that come in
157func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification {
158 ch := make(chan *NewSegmentNotification)
159 mm.newSegmentSubsMutex.Lock()
160 defer mm.newSegmentSubsMutex.Unlock()
161 mm.newSegmentSubs = append(mm.newSegmentSubs, ch)
162 return ch
163}
164
165type obj map[string]any
166
167type StringVal struct {
168 Value string `json:"@value"`
169}
170
171type ExpandedSchemaOrg []struct {
172 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"`
173 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"`
174 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"`
175}
176
177type SegmentMetadata struct {
178 StartTime aqtime.AQTime
179 Title string
180 Creator string
181}
182
183var ErrInvalidMetadata = errors.New("invalid segment metadata")
184
185func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) {
186 _, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions")
187 defer span.End()
188 var ass *manifeststore.ManifestAssertion
189 for _, a := range mani.Assertions {
190 if a.Label == StreamplaceMetadata {
191 ass = &a
192 break
193 }
194 }
195 if ass == nil {
196 return nil, fmt.Errorf("couldn't find %s assertions", StreamplaceMetadata)
197 }
198 proc := ld.NewJsonLdProcessor()
199 options := ld.NewJsonLdOptions("")
200 flat, err := proc.Expand(ass.Data, options)
201 if err != nil {
202 return nil, err
203 }
204 bs, err := json.Marshal(flat)
205 if err != nil {
206 return nil, err
207 }
208 var metas ExpandedSchemaOrg
209 err = json.Unmarshal(bs, &metas)
210 if err != nil {
211 return nil, err
212 }
213 if len(metas) != 1 {
214 return nil, ErrInvalidMetadata
215 }
216 meta := metas[0]
217 if len(meta.Creator) == 0 {
218 return nil, ErrInvalidMetadata
219 }
220 if len(meta.Title) != 1 {
221 return nil, ErrInvalidMetadata
222 }
223 if len(meta.Date) != 1 {
224 return nil, ErrInvalidMetadata
225 }
226 start, err := aqtime.FromString(meta.Date[0].Value)
227 if err != nil {
228 return nil, err
229 }
230 out := SegmentMetadata{
231 StartTime: start,
232 Title: meta.Title[0].Value,
233 Creator: meta.Creator[0].Value,
234 }
235 return &out, nil
236}