Live video on the AT Protocol
at natb/sync-client-time 217 lines 5.9 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "sort" 11 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/aqio" 14 c2patypes "stream.place/streamplace/pkg/c2patypes" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 17 "stream.place/streamplace/pkg/log" 18) 19 20type SplitSegment struct { 21 Filename string 22 Data []byte 23} 24 25type ManifestResult struct { 26 Manifests map[string]c2patypes.Manifest `json:"manifests"` 27 Certs map[string]string `json:"certs"` 28} 29 30type ManifestAndMetadata struct { 31 Manifest c2patypes.Manifest 32 SegmentMetadata *SegmentMetadata 33} 34 35type ReadWriteSeekCloser interface { 36 io.ReadWriteSeeker 37 io.Closer 38} 39 40type SegmentToSign struct { 41 unsignedSeg io.ReadSeeker 42 manifestId string 43 cert []byte 44 outputSeg io.ReadWriteSeeker 45 closeCh chan struct{} 46} 47 48func NewSegmentToSign(unsignedSeg io.ReadSeeker, manifestId string, cert []byte, outputSeg io.ReadWriteSeeker) *SegmentToSign { 49 return &SegmentToSign{ 50 unsignedSeg: unsignedSeg, 51 manifestId: manifestId, 52 cert: cert, 53 outputSeg: outputSeg, 54 closeCh: make(chan struct{}), 55 } 56} 57 58func (s *SegmentToSign) UnsignedSegStream() iroh_streamplace.Stream { 59 return c2patypes.NewReader(s.unsignedSeg) 60} 61 62func (s *SegmentToSign) ManifestId() string { 63 return s.manifestId 64} 65 66func (s *SegmentToSign) Cert() []byte { 67 return s.cert 68} 69 70func (s *SegmentToSign) OutputSegStream() iroh_streamplace.Stream { 71 return c2patypes.NewWriter(s.outputSeg) 72} 73 74func (s *SegmentToSign) Close() { 75 close(s.closeCh) 76} 77 78func (s *SegmentToSign) Done() { 79 <-s.closeCh 80} 81 82type ManySegmentsToSign struct { 83 segmentCh chan iroh_streamplace.SegmentToSign 84 readyCh chan struct{} 85} 86 87func (m *ManySegmentsToSign) Next() *iroh_streamplace.SegmentToSign { 88 if m.readyCh != nil { 89 close(m.readyCh) 90 m.readyCh = nil 91 } 92 seg := <-m.segmentCh 93 if seg == nil { 94 return nil 95 } 96 return &seg 97} 98 99// split a signed concatenated mp4 into its constituent signed segments 100func SplitSegments(ctx context.Context, cli *config.CLI, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error { 101 manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input)) 102 if err != nil { 103 return fmt.Errorf("failed to get manifests: %w", err) 104 } 105 _, err = input.Seek(0, io.SeekStart) 106 if err != nil { 107 return fmt.Errorf("failed to seek to start: %w", err) 108 } 109 var manifests ManifestResult 110 err = json.Unmarshal([]byte(manifestsStr), &manifests) 111 if err != nil { 112 return fmt.Errorf("failed to unmarshal manifests: %w", err) 113 } 114 manifestList := []ManifestAndMetadata{} 115 for _, manifest := range manifests.Manifests { 116 metadata, err := ParseSegmentAssertions(context.Background(), &manifest) 117 if errors.Is(err, ErrMissingMetadata) { 118 log.Error(ctx, "missing metadata", "manifest", manifest.Label) 119 continue 120 } 121 if err != nil { 122 return fmt.Errorf("failed to parse segment assertions: %w", err) 123 } 124 manifestList = append(manifestList, ManifestAndMetadata{ 125 Manifest: manifest, 126 SegmentMetadata: metadata, 127 }) 128 } 129 sort.Slice(manifestList, func(i, j int) bool { 130 m1 := manifestList[i] 131 m2 := manifestList[j] 132 return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time()) 133 }) 134 certList := [][]byte{} 135 for _, manifest := range manifestList { 136 certList = append(certList, []byte(manifests.Certs[*manifest.Manifest.Label])) 137 } 138 139 segmentCh := make(chan iroh_streamplace.SegmentToSign) 140 readyCh := make(chan struct{}) 141 mss := &ManySegmentsToSign{ 142 segmentCh: segmentCh, 143 readyCh: readyCh, 144 } 145 g, ctx := errgroup.WithContext(ctx) 146 unsignedCh := make(chan *SplitSegment) 147 streamer := manifestList[0].SegmentMetadata.Creator 148 149 // note: we're passing the input to two places here and need to make sure 150 // they're not running into problems with concurrent seeking. so we use 151 // this readyCh as a hack - it only fires after Rust is done with the input 152 153 g.Go(func() error { 154 err := iroh_streamplace.Resign(mss, c2patypes.NewReader(input)) 155 if err != nil { 156 return fmt.Errorf("failed to resign segments: %w", err) 157 } 158 return nil 159 }) 160 g.Go(func() error { 161 defer close(unsignedCh) 162 <-readyCh 163 // rust is done with the input, rewind and start segmenting 164 _, err := input.Seek(0, io.SeekStart) 165 if err != nil { 166 return fmt.Errorf("failed to seek to start: %w", err) 167 } 168 err = SegmentUnsigned(ctx, cli, streamer, input, true, unsignedCh) 169 if err != nil { 170 return fmt.Errorf("failed to segment file: %w", err) 171 } 172 return nil 173 }) 174 validationErrors := []error{} 175 g.Go(func() error { 176 defer close(segmentCh) 177 i := 0 178 for unsignedSeg := range unsignedCh { 179 meta := manifestList[i].SegmentMetadata 180 fname := fmt.Sprintf("%s.mp4", meta.StartTime.FileSafeString()) 181 rwsc := cb(fname) 182 rws := aqio.NewReadWriteSeeker(unsignedSeg.Data) 183 ss := NewSegmentToSign(c2patypes.NewReader(rws), *manifestList[i].Manifest.Label, certList[i], rwsc) 184 i += 1 185 segmentCh <- ss 186 ss.Done() 187 _, err := rwsc.Seek(0, io.SeekStart) 188 if err != nil { 189 return fmt.Errorf("failed to seek to start: %w", err) 190 } 191 bs, err := io.ReadAll(rwsc) 192 if err != nil { 193 return fmt.Errorf("failed to read segment file: %w", err) 194 } 195 _, validationError := ValidateMP4Media(ctx, bs) 196 if validationError != nil { 197 validationErrors = append(validationErrors, validationError) 198 cli.DumpDebugSegment(ctx, fmt.Sprintf("%s-invalid.mp4", fname), bytes.NewReader(bs)) 199 } 200 log.Log(ctx, "validated segment file", "path", fname) 201 err = rwsc.Close() 202 if err != nil { 203 return fmt.Errorf("failed to close segment file: %w", err) 204 } 205 } 206 return nil 207 }) 208 209 err = g.Wait() 210 if err != nil { 211 return fmt.Errorf("failed to split segments: %w", err) 212 } 213 if len(validationErrors) > 0 { 214 return fmt.Errorf("%d errors validating segments; first error: %w", len(validationErrors), validationErrors[0]) 215 } 216 return nil 217}