Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "sort"
11
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/aqio"
14 c2patypes "stream.place/streamplace/pkg/c2patypes"
15 "stream.place/streamplace/pkg/config"
16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
17 "stream.place/streamplace/pkg/log"
18)
19
20type SplitSegment struct {
21 Filename string
22 Data []byte
23}
24
25type ManifestResult struct {
26 Manifests map[string]c2patypes.Manifest `json:"manifests"`
27 Certs map[string]string `json:"certs"`
28}
29
30type ManifestAndMetadata struct {
31 Manifest c2patypes.Manifest
32 SegmentMetadata *SegmentMetadata
33}
34
35type ReadWriteSeekCloser interface {
36 io.ReadWriteSeeker
37 io.Closer
38}
39
40type SegmentToSign struct {
41 unsignedSeg io.ReadSeeker
42 manifestId string
43 cert []byte
44 outputSeg io.ReadWriteSeeker
45 closeCh chan struct{}
46}
47
48func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign {
49 return &SegmentToSign{
50 unsignedSeg: unsignedSeg,
51 manifestId: manifestId,
52 cert: cert,
53 outputSeg: outputSeg,
54 closeCh: make(chan struct{}),
55 }
56}
57
58func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream {
59 return c2patypes.NewReader(s.unsignedSeg)
60}
61
62func (s *SegmentToSign) ManifestId() string {
63 return s.manifestId
64}
65
66func (s *SegmentToSign) Cert() []byte {
67 return s.cert
68}
69
70func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream {
71 return c2patypes.NewWriter(s.outputSeg)
72}
73
74func (s *SegmentToSign) Close() {
75 close(s.closeCh)
76}
77
78func (s *SegmentToSign) Done() {
79 <-s.closeCh
80}
81
82type ManySegmentsToSign struct {
83 segmentCh chan iroh_streamplace.SegmentToSign
84 readyCh chan struct{}
85}
86
87func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign {
88 if m.readyCh != nil {
89 close(m.readyCh)
90 m.readyCh = nil
91 }
92 seg := <-m.segmentCh
93 if seg == nil {
94 return nil
95 }
96 return &seg
97}
98
99// split a signed concatenated mp4 into its constituent signed segments
100func SplitSegments(ctx context.Context, cli *config.CLI, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error {
101 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input))
102 if err != nil {
103 return fmt.Errorf("failed to get manifests: %w", err)
104 }
105 _, err = input.Seek(0, io.SeekStart)
106 if err != nil {
107 return fmt.Errorf("failed to seek to start: %w", err)
108 }
109 var manifests ManifestResult
110 err = json.Unmarshal([]byte(manifestsStr), &manifests)
111 if err != nil {
112 return fmt.Errorf("failed to unmarshal manifests: %w", err)
113 }
114 manifestList := []ManifestAndMetadata{}
115 for _, manifest := range manifests.Manifests {
116 metadata, err := ParseSegmentAssertions(context.Background(), &manifest)
117 if errors.Is(err, ErrMissingMetadata) {
118 log.Error(ctx, "missing metadata", "manifest", manifest.Label)
119 continue
120 }
121 if err != nil {
122 return fmt.Errorf("failed to parse segment assertions: %w", err)
123 }
124 manifestList = append(manifestList, ManifestAndMetadata{
125 Manifest: manifest,
126 SegmentMetadata: metadata,
127 })
128 }
129 sort.Slice(manifestList, func(i, j int) bool {
130 m1 := manifestList[i]
131 m2 := manifestList[j]
132 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time())
133 })
134 certList := [][]byte{}
135 for _, manifest := range manifestList {
136 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label]))
137 }
138
139 segmentCh := make(chan iroh_streamplace.SegmentToSign)
140 readyCh := make(chan struct{})
141 mss := &ManySegmentsToSign{
142 segmentCh: segmentCh,
143 readyCh: readyCh,
144 }
145 g, ctx := errgroup.WithContext(ctx)
146 unsignedCh := make(chan *SplitSegment)
147 streamer := manifestList[0].SegmentMetadata.Creator
148
149 // note: we're passing the input to two places here and need to make sure
150 // they're not running into problems with concurrent seeking. so we use
151 // this readyCh as a hack - it only fires after Rust is done with the input
152
153 g.Go(func() error {
154 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input))
155 if err != nil {
156 return fmt.Errorf("failed to resign segments: %w", err)
157 }
158 return nil
159 })
160 g.Go(func() error {
161 defer close(unsignedCh)
162 <-readyCh
163 // rust is done with the input, rewind and start segmenting
164 _, err := input.Seek(0, io.SeekStart)
165 if err != nil {
166 return fmt.Errorf("failed to seek to start: %w", err)
167 }
168 err = SegmentUnsigned(ctx, cli, streamer, input, true, unsignedCh)
169 if err != nil {
170 return fmt.Errorf("failed to segment file: %w", err)
171 }
172 return nil
173 })
174 validationErrors := []error{}
175 g.Go(func() error {
176 defer close(segmentCh)
177 i := 0
178 for unsignedSeg := range unsignedCh {
179 meta := manifestList[i].SegmentMetadata
180 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString())
181 rwsc := cb(fname)
182 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data)
183 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc)
184 i += 1
185 segmentCh <- ss
186 ss.Done()
187 _, err := rwsc.Seek(0, io.SeekStart)
188 if err != nil {
189 return fmt.Errorf("failed to seek to start: %w", err)
190 }
191 bs, err := io.ReadAll(rwsc)
192 if err != nil {
193 return fmt.Errorf("failed to read segment file: %w", err)
194 }
195 _, validationError := ValidateMP4Media(ctx, bs)
196 if validationError != nil {
197 validationErrors = append(validationErrors, validationError)
198 cli.DumpDebugSegment(ctx, fmt.Sprintf("%s-invalid.mp4", fname), bytes.NewReader(bs))
199 }
200 log.Log(ctx, "validated segment file", "path", fname)
201 err = rwsc.Close()
202 if err != nil {
203 return fmt.Errorf("failed to close segment file: %w", err)
204 }
205 }
206 return nil
207 })
208
209 err = g.Wait()
210 if err != nil {
211 return fmt.Errorf("failed to split segments: %w", err)
212 }
213 if len(validationErrors) > 0 {
214 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0])
215 }
216 return nil
217}