// Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "sort"
11
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/aqio"
14 c2patypes "stream.place/streamplace/pkg/c2patypes"
15 "stream.place/streamplace/pkg/config"
16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
17 "stream.place/streamplace/pkg/log"
18)
19
// SplitSegment is a single segment cut out of a larger media file.
//
// SplitSegments receives values of this type over the channel it passes to
// SegmentUnsigned and forwards Data to the signer.
type SplitSegment struct {
	// Filename is the name associated with the segment.
	// NOTE(review): not read anywhere in this part of the file — confirm usage.
	Filename string
	// Data holds the raw bytes of the segment.
	Data []byte
}
24
25type ManifestResult struct {
26 Manifests map[string]c2patypes.Manifest `json:"manifests"`
27 Certs map[string]string `json:"certs"`
28}
29
30type ManifestAndMetadata struct {
31 Manifest c2patypes.Manifest
32 SegmentMetadata *SegmentMetadata
33}
34
// ReadWriteSeekCloser is a seekable, readable and writable stream that can
// also be closed. SplitSegments uses it as the destination type for each
// signed segment.
type ReadWriteSeekCloser interface {
	io.Reader
	io.Writer
	io.Seeker
	io.Closer
}
39
// SegmentToSign bundles everything needed to sign one segment: the unsigned
// input, the manifest id and certificate to sign with, the destination for
// the signed bytes, and a channel used to report that signing is finished.
//
// SplitSegments sends values of this type on a channel of
// iroh_streamplace.SegmentToSign.
type SegmentToSign struct {
	// unsignedSeg is the source of the unsigned segment bytes.
	unsignedSeg io.ReadSeeker
	// manifestId identifies the manifest for this segment.
	manifestId string
	// cert is the certificate handed to the signer.
	cert []byte
	// outputSeg is where the signed segment is written.
	outputSeg io.ReadWriteSeeker
	// closeCh is closed by Close and waited on by Done.
	closeCh chan struct{}
}
47
48func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign {
49 return &SegmentToSign{
50 unsignedSeg: unsignedSeg,
51 manifestId: manifestId,
52 cert: cert,
53 outputSeg: outputSeg,
54 closeCh: make(chan struct{}),
55 }
56}
57
58func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream {
59 return c2patypes.NewReader(s.unsignedSeg)
60}
61
62func (s *SegmentToSign) ManifestId() string {
63 return s.manifestId
64}
65
66func (s *SegmentToSign) Cert() []byte {
67 return s.cert
68}
69
70func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream {
71 return c2patypes.NewWriter(s.outputSeg)
72}
73
74func (s *SegmentToSign) Close() {
75 close(s.closeCh)
76}
77
78func (s *SegmentToSign) Done() {
79 <-s.closeCh
80}
81
82type ManySegmentsToSign struct {
83 segmentCh chan iroh_streamplace.SegmentToSign
84 readyCh chan struct{}
85}
86
87func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign {
88 if m.readyCh != nil {
89 close(m.readyCh)
90 m.readyCh = nil
91 }
92 seg := <-m.segmentCh
93 if seg == nil {
94 return nil
95 }
96 return &seg
97}
98
99// split a signed concatenated mp4 into its constituent signed segments
100func SplitSegments(ctx context.Context, cli *config.CLI, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error {
101 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input))
102 if err != nil {
103 return fmt.Errorf("failed to get manifests: %w", err)
104 }
105 _, err = input.Seek(0, io.SeekStart)
106 if err != nil {
107 return fmt.Errorf("failed to seek to start: %w", err)
108 }
109 var manifests ManifestResult
110 err = json.Unmarshal([]byte(manifestsStr), &manifests)
111 if err != nil {
112 return fmt.Errorf("failed to unmarshal manifests: %w", err)
113 }
114 manifestList := []ManifestAndMetadata{}
115 for _, manifest := range manifests.Manifests {
116 metadata, err := ParseSegmentAssertions(context.Background(), &manifest)
117 if errors.Is(err, ErrMissingMetadata) {
118 log.Error(ctx, "missing metadata", "manifest", manifest.Label)
119 continue
120 }
121 if err != nil {
122 return fmt.Errorf("failed to parse segment assertions: %w", err)
123 }
124 manifestList = append(manifestList, ManifestAndMetadata{
125 Manifest: manifest,
126 SegmentMetadata: metadata,
127 })
128 }
129 if len(manifestList) == 0 {
130 return fmt.Errorf("no manifests found")
131 }
132 sort.Slice(manifestList, func(i, j int) bool {
133 m1 := manifestList[i]
134 m2 := manifestList[j]
135 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time())
136 })
137 certList := [][]byte{}
138 for _, manifest := range manifestList {
139 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label]))
140 }
141
142 segmentCh := make(chan iroh_streamplace.SegmentToSign)
143 readyCh := make(chan struct{})
144 mss := &ManySegmentsToSign{
145 segmentCh: segmentCh,
146 readyCh: readyCh,
147 }
148 g, ctx := errgroup.WithContext(ctx)
149 unsignedCh := make(chan *SplitSegment)
150 streamer := manifestList[0].SegmentMetadata.Creator
151
152 // note: we're passing the input to two places here and need to make sure
153 // they're not running into problems with concurrent seeking. so we use
154 // this readyCh as a hack - it only fires after Rust is done with the input
155
156 g.Go(func() error {
157 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input))
158 if err != nil {
159 return fmt.Errorf("failed to resign segments: %w", err)
160 }
161 return nil
162 })
163 g.Go(func() error {
164 defer close(unsignedCh)
165 <-readyCh
166 // rust is done with the input, rewind and start segmenting
167 _, err := input.Seek(0, io.SeekStart)
168 if err != nil {
169 return fmt.Errorf("failed to seek to start: %w", err)
170 }
171 err = SegmentUnsigned(ctx, cli, streamer, input, true, unsignedCh)
172 if err != nil {
173 return fmt.Errorf("failed to segment file: %w", err)
174 }
175 return nil
176 })
177 validationErrors := []error{}
178 g.Go(func() error {
179 defer close(segmentCh)
180 i := 0
181 for unsignedSeg := range unsignedCh {
182 meta := manifestList[i].SegmentMetadata
183 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString())
184 rwsc := cb(fname)
185 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data)
186 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc)
187 i += 1
188 segmentCh <- ss
189 ss.Done()
190 _, err := rwsc.Seek(0, io.SeekStart)
191 if err != nil {
192 return fmt.Errorf("failed to seek to start: %w", err)
193 }
194 bs, err := io.ReadAll(rwsc)
195 if err != nil {
196 return fmt.Errorf("failed to read segment file: %w", err)
197 }
198 _, validationError := ValidateMP4Media(ctx, bs)
199 if validationError != nil {
200 validationErrors = append(validationErrors, validationError)
201 cli.DumpDebugSegment(ctx, fmt.Sprintf("%s-invalid.mp4", fname), bytes.NewReader(bs))
202 }
203 log.Log(ctx, "validated segment file", "path", fname)
204 err = rwsc.Close()
205 if err != nil {
206 return fmt.Errorf("failed to close segment file: %w", err)
207 }
208 }
209 return nil
210 })
211
212 err = g.Wait()
213 if err != nil {
214 return fmt.Errorf("failed to split segments: %w", err)
215 }
216 if len(validationErrors) > 0 {
217 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0])
218 }
219 return nil
220}