Live video on the AT Protocol
at eli/handle-changes 104 lines 2.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strings" 9 10 "go.opentelemetry.io/otel" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/constants" 13 "stream.place/streamplace/pkg/crypto/signers" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 17 "git.stream.place/streamplace/c2pa-go/pkg/c2pa" 18) 19 20func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error { 21 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 22 defer span.End() 23 buf, err := io.ReadAll(input) 24 if err != nil { 25 return err 26 } 27 r := bytes.NewReader(buf) 28 reader, err := c2pa.FromStream(r, "video/mp4") 29 if err != nil { 30 return err 31 } 32 mani := reader.GetActiveManifest() 33 certs := reader.GetProvenanceCertChain() 34 pub, err := signers.ParseES256KCert([]byte(certs)) 35 if err != nil { 36 return err 37 } 38 meta, err := ParseSegmentAssertions(ctx, mani) 39 if err != nil { 40 return err 41 } 42 mediaData, err := ParseSegmentMediaData(ctx, buf) 43 if err != nil { 44 return err 45 } 46 // special case for test signers that are only signed with a key 47 var repoDID string 48 var signingKeyDID string 49 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 50 signingKeyDID = meta.Creator 51 repoDID = meta.Creator 52 } else { 53 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 54 if err != nil { 55 return err 56 } 57 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 58 if err != nil { 59 return err 60 } 61 if signingKey == nil { 62 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 63 } 64 repoDID = repo.DID 65 signingKeyDID = signingKey.DID 66 } 67 68 err = mm.cli.StreamIsAllowed(repoDID) 69 if err != nil { 70 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 71 } 72 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 73 if err != nil { 74 return err 75 } 76 defer fd.Close() 77 go mm.replicator.NewSegment(ctx, buf) 78 r = bytes.NewReader(buf) 79 if _, err := io.Copy(fd, r); err != nil { 80 return err 81 } 82 seg := &model.Segment{ 83 ID: *mani.Label, 84 SigningKeyDID: signingKeyDID, 85 RepoDID: repoDID, 86 StartTime: meta.StartTime.Time(), 87 Title: meta.Title, 88 Size: len(buf), 89 MediaData: mediaData, 90 } 91 mm.newSegmentSubsMutex.RLock() 92 defer mm.newSegmentSubsMutex.RUnlock() 93 not := &NewSegmentNotification{ 94 Segment: seg, 95 Data: buf, 96 Metadata: meta, 97 } 98 for _, ch := range mm.newSegmentSubs { 99 go func() { ch <- not }() 100 } 101 aqt := aqtime.FromTime(meta.StartTime.Time()) 102 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *mani.Label) 103 return nil 104}