Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "crypto"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "sync"
12
13 "github.com/google/uuid"
14 "github.com/pion/interceptor"
15 "github.com/pion/interceptor/pkg/intervalpli"
16 "github.com/pion/webrtc/v4"
17 "go.opentelemetry.io/otel"
18 "stream.place/streamplace/pkg/aqtime"
19 "stream.place/streamplace/pkg/atproto"
20 "stream.place/streamplace/pkg/bus"
21 "stream.place/streamplace/pkg/config"
22 "stream.place/streamplace/pkg/gstinit"
23 "stream.place/streamplace/pkg/model"
24
25 "stream.place/streamplace/pkg/log"
26 "stream.place/streamplace/pkg/replication"
27
28 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore"
29 "github.com/piprate/json-gold/ld"
30
31 irohStreamplace "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
32
33 _ "stream.place/streamplace/pkg/streamplacedeps"
34)
35
// File and directory names used by the media package.
// NOTE(review): where these names are resolved is not visible in this file —
// confirm against their users.
const (
	// CertFile is the file name "cert.pem".
	CertFile = "cert.pem"
	// SegmentsDir is the directory name "segments".
	SegmentsDir = "segments"
)
38
// StreamplaceMetadata is the label of the manifest assertion that
// ParseSegmentAssertions looks for. It is declared as a variable, so it can
// be reassigned at runtime.
var StreamplaceMetadata = "place.stream.metadata"
40
41type MediaManager struct {
42 cli *config.CLI
43 replicator replication.Replicator
44 hlsRunning map[string]*M3U8
45 hlsRunningMut sync.Mutex
46 httpPipes map[string]io.Writer
47 httpPipesMutex sync.Mutex
48 newSegmentSubs []chan *NewSegmentNotification
49 newSegmentSubsMutex sync.RWMutex
50 model model.Model
51 bus *bus.Bus
52 atsync *atproto.ATProtoSynchronizer
53 webrtcAPI *webrtc.API
54 webrtcConfig webrtc.Configuration
55}
56
57type NewSegmentNotification struct {
58 Segment *model.Segment
59 Data []byte
60 Metadata *SegmentMetadata
61}
62
63func RunSelfTest(ctx context.Context) error {
64 gstinit.InitGST()
65 return SelfTest(ctx)
66}
67
68func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) {
69 gstinit.InitGST()
70 err := SelfTest(ctx)
71 if err != nil {
72 return nil, fmt.Errorf("error in gstreamer self-test: %w", err)
73 }
74
75 m := &webrtc.MediaEngine{}
76 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
77 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
78 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
79 // for each PeerConnection.
80 i := &interceptor.Registry{}
81
82 // Register a intervalpli factory
83 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender.
84 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates
85 // A real world application should process incoming RTCP packets from viewers and forward them to senders
86 intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
87 if err != nil {
88 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err)
89 }
90 i.Add(intervalPliFactory)
91
92 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
93 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
94 PayloadType: 102,
95 }, webrtc.RTPCodecTypeVideo); err != nil {
96 return nil, err
97 }
98 if err := m.RegisterCodec(webrtc.RTPCodecParameters{
99 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
100 PayloadType: 111,
101 }, webrtc.RTPCodecTypeAudio); err != nil {
102 return nil, err
103 }
104
105 // Use the default set of Interceptors
106 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
107 return nil, fmt.Errorf("failed to register default interceptors: %w", err)
108 }
109
110 // Create the API object with the MediaEngine
111 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
112
113 // Prepare the configuration
114 config := webrtc.Configuration{
115 ICEServers: []webrtc.ICEServer{
116 {
117 URLs: []string{"stun:stun.l.google.com:19302"},
118 },
119 },
120 }
121
122 return &MediaManager{
123 cli: cli,
124 replicator: rep,
125 hlsRunning: map[string]*M3U8{},
126 httpPipes: map[string]io.Writer{},
127 model: mod,
128 bus: bus,
129 atsync: atsync,
130 webrtcAPI: api,
131 webrtcConfig: config,
132 }, nil
133}
134
135func (mm *MediaManager) HandleData(node *irohStreamplace.PublicKey, data []byte) {
136 r := bytes.NewReader(data)
137 ctx := context.Background()
138 err := mm.ValidateMP4(ctx, r)
139 if err != nil {
140 log.Log(ctx, "invalid incoming segment", "error", err)
141 }
142}
143
144// replacement for os.Pipe that works on windows
145func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) {
146 uu, err := uuid.NewV7()
147 if err != nil {
148 return "", nil, nil, err
149 }
150 mm.httpPipesMutex.Lock()
151 defer mm.httpPipesMutex.Unlock()
152 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String())
153 done := func() {
154 mm.httpPipesMutex.Lock()
155 defer mm.httpPipesMutex.Unlock()
156 delete(mm.httpPipes, uu.String())
157 }
158 r, w := io.Pipe()
159 mm.httpPipes[uu.String()] = w
160 return u, r, done, nil
161}
162
163func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer {
164 mm.httpPipesMutex.Lock()
165 defer mm.httpPipesMutex.Unlock()
166 return mm.httpPipes[uu]
167}
168
169// register a handler for all new segments that come in
170func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification {
171 ch := make(chan *NewSegmentNotification)
172 mm.newSegmentSubsMutex.Lock()
173 defer mm.newSegmentSubsMutex.Unlock()
174 mm.newSegmentSubs = append(mm.newSegmentSubs, ch)
175 return ch
176}
177
// obj is shorthand for a generic string-keyed map of arbitrary values.
type obj map[string]any
179
// StringVal is a JSON-LD value object: a string stored under the "@value" key.
type StringVal struct {
	// Value is the string found under "@value".
	Value string `json:"@value"`
}
183
184type ExpandedSchemaOrg []struct {
185 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"`
186 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"`
187 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"`
188}
189
190type SegmentMetadata struct {
191 StartTime aqtime.AQTime
192 Title string
193 Creator string
194}
195
// ErrInvalidMetadata is returned by ParseSegmentAssertions when the metadata
// assertion does not have the expected number of nodes, creators, titles or
// dates.
var ErrInvalidMetadata = errors.New("invalid segment metadata")
197
198func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) {
199 _, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions")
200 defer span.End()
201 var ass *manifeststore.ManifestAssertion
202 for _, a := range mani.Assertions {
203 if a.Label == StreamplaceMetadata {
204 ass = &a
205 break
206 }
207 }
208 if ass == nil {
209 return nil, fmt.Errorf("couldn't find %s assertions", StreamplaceMetadata)
210 }
211 proc := ld.NewJsonLdProcessor()
212 options := ld.NewJsonLdOptions("")
213 flat, err := proc.Expand(ass.Data, options)
214 if err != nil {
215 return nil, err
216 }
217 bs, err := json.Marshal(flat)
218 if err != nil {
219 return nil, err
220 }
221 var metas ExpandedSchemaOrg
222 err = json.Unmarshal(bs, &metas)
223 if err != nil {
224 return nil, err
225 }
226 if len(metas) != 1 {
227 return nil, ErrInvalidMetadata
228 }
229 meta := metas[0]
230 if len(meta.Creator) == 0 {
231 return nil, ErrInvalidMetadata
232 }
233 if len(meta.Title) != 1 {
234 return nil, ErrInvalidMetadata
235 }
236 if len(meta.Date) != 1 {
237 return nil, ErrInvalidMetadata
238 }
239 start, err := aqtime.FromString(meta.Date[0].Value)
240 if err != nil {
241 return nil, err
242 }
243 out := SegmentMetadata{
244 StartTime: start,
245 Title: meta.Title[0].Value,
246 Creator: meta.Creator[0].Value,
247 }
248 return &out, nil
249}