Live video on the AT Protocol
at eli/rust-experimentation 213 lines 5.7 kB view raw
1package media 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "sort" 10 11 "golang.org/x/sync/errgroup" 12 "stream.place/streamplace/pkg/aqio" 13 c2patypes "stream.place/streamplace/pkg/c2patypes" 14 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 15 "stream.place/streamplace/pkg/log" 16) 17 18type SplitSegment struct { 19 Filename string 20 Data []byte 21} 22 23type ManifestResult struct { 24 Manifests map[string]c2patypes.Manifest `json:"manifests"` 25 Certs map[string]string `json:"certs"` 26} 27 28type ManifestAndMetadata struct { 29 Manifest c2patypes.Manifest 30 SegmentMetadata *SegmentMetadata 31} 32 33type ReadWriteSeekCloser interface { 34 io.ReadWriteSeeker 35 io.Closer 36} 37 38type SegmentToSign struct { 39 unsignedSeg io.ReadSeeker 40 manifestId string 41 cert []byte 42 outputSeg io.ReadWriteSeeker 43 closeCh chan struct{} 44} 45 46func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign { 47 return &SegmentToSign{ 48 unsignedSeg: unsignedSeg, 49 manifestId: manifestId, 50 cert: cert, 51 outputSeg: outputSeg, 52 closeCh: make(chan struct{}), 53 } 54} 55 56func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream { 57 return c2patypes.NewReader(s.unsignedSeg) 58} 59 60func (s *SegmentToSign) ManifestId() string { 61 return s.manifestId 62} 63 64func (s *SegmentToSign) Cert() []byte { 65 return s.cert 66} 67 68func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream { 69 return c2patypes.NewWriter(s.outputSeg) 70} 71 72func (s *SegmentToSign) Close() { 73 close(s.closeCh) 74} 75 76func (s *SegmentToSign) Done() { 77 <-s.closeCh 78} 79 80type ManySegmentsToSign struct { 81 segmentCh chan iroh_streamplace.SegmentToSign 82 readyCh chan struct{} 83} 84 85func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign { 86 if m.readyCh != nil { 87 close(m.readyCh) 88 m.readyCh = nil 89 } 90 seg := <-m.segmentCh 91 if seg == nil { 92 return nil 93 } 94 return &seg 95} 96 97// split a signed concatenated mp4 into its constituent signed segments 98func SplitSegments(ctx context.Context, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error { 99 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input)) 100 if err != nil { 101 return fmt.Errorf("failed to get manifests: %w", err) 102 } 103 _, err = input.Seek(0, io.SeekStart) 104 if err != nil { 105 return fmt.Errorf("failed to seek to start: %w", err) 106 } 107 var manifests ManifestResult 108 err = json.Unmarshal([]byte(manifestsStr), &manifests) 109 if err != nil { 110 return fmt.Errorf("failed to unmarshal manifests: %w", err) 111 } 112 manifestList := []ManifestAndMetadata{} 113 for _, manifest := range manifests.Manifests { 114 metadata, err := ParseSegmentAssertions(context.Background(), &manifest) 115 if errors.Is(err, ErrMissingMetadata) { 116 log.Error(ctx, "missing metadata", "manifest", manifest.Label) 117 continue 118 } 119 if err != nil { 120 return fmt.Errorf("failed to parse segment assertions: %w", err) 121 } 122 manifestList = append(manifestList, ManifestAndMetadata{ 123 Manifest: manifest, 124 SegmentMetadata: metadata, 125 }) 126 } 127 sort.Slice(manifestList, func(i, j int) bool { 128 m1 := manifestList[i] 129 m2 := manifestList[j] 130 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time()) 131 }) 132 certList := [][]byte{} 133 for _, manifest := range manifestList { 134 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label])) 135 } 136 137 segmentCh := make(chan iroh_streamplace.SegmentToSign) 138 readyCh := make(chan struct{}) 139 mss := &ManySegmentsToSign{ 140 segmentCh: segmentCh, 141 readyCh: readyCh, 142 } 143 g, ctx := errgroup.WithContext(ctx) 144 unsignedCh := make(chan *SplitSegment) 145 146 // note: we're passing the input to two places here and need to make sure 147 // they're not running into problems with concurrent seeking. so we use 148 // this readyCh as a hack - it only fires after Rust is done with the input 149 150 g.Go(func() error { 151 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input)) 152 if err != nil { 153 return fmt.Errorf("failed to resign segments: %w", err) 154 } 155 return nil 156 }) 157 g.Go(func() error { 158 defer close(unsignedCh) 159 <-readyCh 160 // rust is done with the input, rewind and start segmenting 161 _, err := input.Seek(0, io.SeekStart) 162 if err != nil { 163 return fmt.Errorf("failed to seek to start: %w", err) 164 } 165 err = SegmentUnsigned(ctx, input, unsignedCh) 166 if err != nil { 167 return fmt.Errorf("failed to segment file: %w", err) 168 } 169 return nil 170 }) 171 validationErrors := []error{} 172 g.Go(func() error { 173 defer close(segmentCh) 174 i := 0 175 for unsignedSeg := range unsignedCh { 176 meta := manifestList[i].SegmentMetadata 177 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString()) 178 rwsc := cb(fname) 179 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data) 180 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc) 181 i += 1 182 segmentCh <- ss 183 ss.Done() 184 _, err := rwsc.Seek(0, io.SeekStart) 185 if err != nil { 186 return fmt.Errorf("failed to seek to start: %w", err) 187 } 188 bs, err := io.ReadAll(rwsc) 189 if err != nil { 190 return fmt.Errorf("failed to read segment file: %w", err) 191 } 192 _, validationError := ValidateMP4Media(ctx, bs) 193 if validationError != nil { 194 validationErrors = append(validationErrors, validationError) 195 } 196 log.Log(ctx, "validated segment file", "path", fname) 197 err = rwsc.Close() 198 if err != nil { 199 return fmt.Errorf("failed to close segment file: %w", err) 200 } 201 } 202 return nil 203 }) 204 205 err = g.Wait() 206 if err != nil { 207 return fmt.Errorf("failed to split segments: %w", err) 208 } 209 if len(validationErrors) > 0 { 210 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0]) 211 } 212 return nil 213}