Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "sort"
10
11 "golang.org/x/sync/errgroup"
12 "stream.place/streamplace/pkg/aqio"
13 c2patypes "stream.place/streamplace/pkg/c2patypes"
14 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
15 "stream.place/streamplace/pkg/log"
16)
17
18type SplitSegment struct {
19 Filename string
20 Data []byte
21}
22
23type ManifestResult struct {
24 Manifests map[string]c2patypes.Manifest `json:"manifests"`
25 Certs map[string]string `json:"certs"`
26}
27
28type ManifestAndMetadata struct {
29 Manifest c2patypes.Manifest
30 SegmentMetadata *SegmentMetadata
31}
32
33type ReadWriteSeekCloser interface {
34 io.ReadWriteSeeker
35 io.Closer
36}
37
38type SegmentToSign struct {
39 unsignedSeg io.ReadSeeker
40 manifestId string
41 cert []byte
42 outputSeg io.ReadWriteSeeker
43 closeCh chan struct{}
44}
45
46func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign {
47 return &SegmentToSign{
48 unsignedSeg: unsignedSeg,
49 manifestId: manifestId,
50 cert: cert,
51 outputSeg: outputSeg,
52 closeCh: make(chan struct{}),
53 }
54}
55
56func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream {
57 return c2patypes.NewReader(s.unsignedSeg)
58}
59
60func (s *SegmentToSign) ManifestId() string {
61 return s.manifestId
62}
63
64func (s *SegmentToSign) Cert() []byte {
65 return s.cert
66}
67
68func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream {
69 return c2patypes.NewWriter(s.outputSeg)
70}
71
72func (s *SegmentToSign) Close() {
73 close(s.closeCh)
74}
75
76func (s *SegmentToSign) Done() {
77 <-s.closeCh
78}
79
80type ManySegmentsToSign struct {
81 segmentCh chan iroh_streamplace.SegmentToSign
82 readyCh chan struct{}
83}
84
85func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign {
86 if m.readyCh != nil {
87 close(m.readyCh)
88 m.readyCh = nil
89 }
90 seg := <-m.segmentCh
91 if seg == nil {
92 return nil
93 }
94 return &seg
95}
96
97// split a signed concatenated mp4 into its constituent signed segments
98func SplitSegments(ctx context.Context, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error {
99 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input))
100 if err != nil {
101 return fmt.Errorf("failed to get manifests: %w", err)
102 }
103 _, err = input.Seek(0, io.SeekStart)
104 if err != nil {
105 return fmt.Errorf("failed to seek to start: %w", err)
106 }
107 var manifests ManifestResult
108 err = json.Unmarshal([]byte(manifestsStr), &manifests)
109 if err != nil {
110 return fmt.Errorf("failed to unmarshal manifests: %w", err)
111 }
112 manifestList := []ManifestAndMetadata{}
113 for _, manifest := range manifests.Manifests {
114 metadata, err := ParseSegmentAssertions(context.Background(), &manifest)
115 if errors.Is(err, ErrMissingMetadata) {
116 log.Error(ctx, "missing metadata", "manifest", manifest.Label)
117 continue
118 }
119 if err != nil {
120 return fmt.Errorf("failed to parse segment assertions: %w", err)
121 }
122 manifestList = append(manifestList, ManifestAndMetadata{
123 Manifest: manifest,
124 SegmentMetadata: metadata,
125 })
126 }
127 sort.Slice(manifestList, func(i, j int) bool {
128 m1 := manifestList[i]
129 m2 := manifestList[j]
130 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time())
131 })
132 certList := [][]byte{}
133 for _, manifest := range manifestList {
134 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label]))
135 }
136
137 segmentCh := make(chan iroh_streamplace.SegmentToSign)
138 readyCh := make(chan struct{})
139 mss := &ManySegmentsToSign{
140 segmentCh: segmentCh,
141 readyCh: readyCh,
142 }
143 g, ctx := errgroup.WithContext(ctx)
144 unsignedCh := make(chan *SplitSegment)
145
146 // note: we're passing the input to two places here and need to make sure
147 // they're not running into problems with concurrent seeking. so we use
148 // this readyCh as a hack - it only fires after Rust is done with the input
149
150 g.Go(func() error {
151 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input))
152 if err != nil {
153 return fmt.Errorf("failed to resign segments: %w", err)
154 }
155 return nil
156 })
157 g.Go(func() error {
158 defer close(unsignedCh)
159 <-readyCh
160 // rust is done with the input, rewind and start segmenting
161 _, err := input.Seek(0, io.SeekStart)
162 if err != nil {
163 return fmt.Errorf("failed to seek to start: %w", err)
164 }
165 err = SegmentUnsigned(ctx, input, unsignedCh)
166 if err != nil {
167 return fmt.Errorf("failed to segment file: %w", err)
168 }
169 return nil
170 })
171 validationErrors := []error{}
172 g.Go(func() error {
173 defer close(segmentCh)
174 i := 0
175 for unsignedSeg := range unsignedCh {
176 meta := manifestList[i].SegmentMetadata
177 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString())
178 rwsc := cb(fname)
179 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data)
180 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc)
181 i += 1
182 segmentCh <- ss
183 ss.Done()
184 _, err := rwsc.Seek(0, io.SeekStart)
185 if err != nil {
186 return fmt.Errorf("failed to seek to start: %w", err)
187 }
188 bs, err := io.ReadAll(rwsc)
189 if err != nil {
190 return fmt.Errorf("failed to read segment file: %w", err)
191 }
192 _, validationError := ValidateMP4Media(ctx, bs)
193 if validationError != nil {
194 validationErrors = append(validationErrors, validationError)
195 }
196 log.Log(ctx, "validated segment file", "path", fname)
197 err = rwsc.Close()
198 if err != nil {
199 return fmt.Errorf("failed to close segment file: %w", err)
200 }
201 }
202 return nil
203 })
204
205 err = g.Wait()
206 if err != nil {
207 return fmt.Errorf("failed to split segments: %w", err)
208 }
209 if len(validationErrors) > 0 {
210 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0])
211 }
212 return nil
213}