Live video on the AT Protocol
at next 220 lines 6.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "sort" 11 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/aqio" 14 c2patypes "stream.place/streamplace/pkg/c2patypes" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 17 "stream.place/streamplace/pkg/log" 18) 19 20type SplitSegment struct { 21 Filename string 22 Data []byte 23} 24 25type ManifestResult struct { 26 Manifests map[string]c2patypes.Manifest `json:"manifests"` 27 Certs map[string]string `json:"certs"` 28} 29 30type ManifestAndMetadata struct { 31 Manifest c2patypes.Manifest 32 SegmentMetadata *SegmentMetadata 33} 34 35type ReadWriteSeekCloser interface { 36 io.ReadWriteSeeker 37 io.Closer 38} 39 40type SegmentToSign struct { 41 unsignedSeg io.ReadSeeker 42 manifestId string 43 cert []byte 44 outputSeg io.ReadWriteSeeker 45 closeCh chan struct{} 46} 47 48func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign { 49 return &SegmentToSign{ 50 unsignedSeg: unsignedSeg, 51 manifestId: manifestId, 52 cert: cert, 53 outputSeg: outputSeg, 54 closeCh: make(chan struct{}), 55 } 56} 57 58func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream { 59 return c2patypes.NewReader(s.unsignedSeg) 60} 61 62func (s *SegmentToSign) ManifestId() string { 63 return s.manifestId 64} 65 66func (s *SegmentToSign) Cert() []byte { 67 return s.cert 68} 69 70func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream { 71 return c2patypes.NewWriter(s.outputSeg) 72} 73 74func (s *SegmentToSign) Close() { 75 close(s.closeCh) 76} 77 78func (s *SegmentToSign) Done() { 79 <-s.closeCh 80} 81 82type ManySegmentsToSign struct { 83 segmentCh chan iroh_streamplace.SegmentToSign 84 readyCh chan struct{} 85} 86 87func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign { 88 if m.readyCh != nil { 89 close(m.readyCh) 90 m.readyCh = nil 91 } 92 seg := <-m.segmentCh 93 if seg == nil { 94 return nil 95 } 96 return &seg 97} 98 99// split a signed concatenated mp4 into its constituent signed segments 100func SplitSegments(ctx context.Context, cli *config.CLI, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error { 101 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input)) 102 if err != nil { 103 return fmt.Errorf("failed to get manifests: %w", err) 104 } 105 _, err = input.Seek(0, io.SeekStart) 106 if err != nil { 107 return fmt.Errorf("failed to seek to start: %w", err) 108 } 109 var manifests ManifestResult 110 err = json.Unmarshal([]byte(manifestsStr), &manifests) 111 if err != nil { 112 return fmt.Errorf("failed to unmarshal manifests: %w", err) 113 } 114 manifestList := []ManifestAndMetadata{} 115 for _, manifest := range manifests.Manifests { 116 metadata, err := ParseSegmentAssertions(context.Background(), &manifest) 117 if errors.Is(err, ErrMissingMetadata) { 118 log.Error(ctx, "missing metadata", "manifest", manifest.Label) 119 continue 120 } 121 if err != nil { 122 return fmt.Errorf("failed to parse segment assertions: %w", err) 123 } 124 manifestList = append(manifestList, ManifestAndMetadata{ 125 Manifest: manifest, 126 SegmentMetadata: metadata, 127 }) 128 } 129 if len(manifestList) == 0 { 130 return fmt.Errorf("no manifests found") 131 } 132 sort.Slice(manifestList, func(i, j int) bool { 133 m1 := manifestList[i] 134 m2 := manifestList[j] 135 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time()) 136 }) 137 certList := [][]byte{} 138 for _, manifest := range manifestList { 139 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label])) 140 } 141 142 segmentCh := make(chan iroh_streamplace.SegmentToSign) 143 readyCh := make(chan struct{}) 144 mss := &ManySegmentsToSign{ 145 segmentCh: segmentCh, 146 readyCh: readyCh, 147 } 148 g, ctx := errgroup.WithContext(ctx) 149 unsignedCh := make(chan *SplitSegment) 150 streamer := manifestList[0].SegmentMetadata.Creator 151 152 // note: we're passing the input to two places here and need to make sure 153 // they're not running into problems with concurrent seeking. so we use 154 // this readyCh as a hack - it only fires after Rust is done with the input 155 156 g.Go(func() error { 157 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input)) 158 if err != nil { 159 return fmt.Errorf("failed to resign segments: %w", err) 160 } 161 return nil 162 }) 163 g.Go(func() error { 164 defer close(unsignedCh) 165 <-readyCh 166 // rust is done with the input, rewind and start segmenting 167 _, err := input.Seek(0, io.SeekStart) 168 if err != nil { 169 return fmt.Errorf("failed to seek to start: %w", err) 170 } 171 err = SegmentUnsigned(ctx, cli, streamer, input, true, unsignedCh) 172 if err != nil { 173 return fmt.Errorf("failed to segment file: %w", err) 174 } 175 return nil 176 }) 177 validationErrors := []error{} 178 g.Go(func() error { 179 defer close(segmentCh) 180 i := 0 181 for unsignedSeg := range unsignedCh { 182 meta := manifestList[i].SegmentMetadata 183 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString()) 184 rwsc := cb(fname) 185 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data) 186 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc) 187 i += 1 188 segmentCh <- ss 189 ss.Done() 190 _, err := rwsc.Seek(0, io.SeekStart) 191 if err != nil { 192 return fmt.Errorf("failed to seek to start: %w", err) 193 } 194 bs, err := io.ReadAll(rwsc) 195 if err != nil { 196 return fmt.Errorf("failed to read segment file: %w", err) 197 } 198 _, validationError := ValidateMP4Media(ctx, bs) 199 if validationError != nil { 200 validationErrors = append(validationErrors, validationError) 201 cli.DumpDebugSegment(ctx, fmt.Sprintf("%s-invalid.mp4", fname), bytes.NewReader(bs)) 202 } 203 log.Log(ctx, "validated segment file", "path", fname) 204 err = rwsc.Close() 205 if err != nil { 206 return fmt.Errorf("failed to close segment file: %w", err) 207 } 208 } 209 return nil 210 }) 211 212 err = g.Wait() 213 if err != nil { 214 return fmt.Errorf("failed to split segments: %w", err) 215 } 216 if len(validationErrors) > 0 { 217 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0]) 218 } 219 return nil 220}