Live video on the AT Protocol
at eli/database-resync 109 lines 2.9 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strings" 9 10 "go.opentelemetry.io/otel" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/constants" 13 "stream.place/streamplace/pkg/crypto/signers" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/media/segchanman" 16 "stream.place/streamplace/pkg/model" 17 18 "git.stream.place/streamplace/c2pa-go/pkg/c2pa" 19) 20 21func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error { 22 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 23 defer span.End() 24 buf, err := io.ReadAll(input) 25 if err != nil { 26 return err 27 } 28 r := bytes.NewReader(buf) 29 reader, err := c2pa.FromStream(r, "video/mp4") 30 if err != nil { 31 return err 32 } 33 mani := reader.GetActiveManifest() 34 certs := reader.GetProvenanceCertChain() 35 pub, err := signers.ParseES256KCert([]byte(certs)) 36 if err != nil { 37 return err 38 } 39 meta, err := ParseSegmentAssertions(ctx, mani) 40 if err != nil { 41 return err 42 } 43 mediaData, err := ParseSegmentMediaData(ctx, buf) 44 if err != nil { 45 return err 46 } 47 // special case for test signers that are only signed with a key 48 var repoDID string 49 var signingKeyDID string 50 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 51 signingKeyDID = meta.Creator 52 repoDID = meta.Creator 53 } else { 54 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 55 if err != nil { 56 return err 57 } 58 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 59 if err != nil { 60 return err 61 } 62 if signingKey == nil { 63 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 64 } 65 repoDID = repo.DID 66 signingKeyDID = signingKey.DID 67 } 68 69 err = mm.cli.StreamIsAllowed(repoDID) 70 if err != nil { 71 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 72 } 73 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 74 if err != nil { 75 return err 76 } 77 defer fd.Close() 78 go mm.replicator.NewSegment(ctx, buf) 79 r = bytes.NewReader(buf) 80 if _, err := io.Copy(fd, r); err != nil { 81 return err 82 } 83 scmSeg := &segchanman.Seg{ 84 Filepath: fd.Name(), 85 Data: buf, 86 } 87 go mm.PublishSegment(ctx, repoDID, "source", scmSeg) 88 seg := &model.Segment{ 89 ID: *mani.Label, 90 SigningKeyDID: signingKeyDID, 91 RepoDID: repoDID, 92 StartTime: meta.StartTime.Time(), 93 Title: meta.Title, 94 MediaData: mediaData, 95 } 96 mm.newSegmentSubsMutex.RLock() 97 defer mm.newSegmentSubsMutex.RUnlock() 98 not := &NewSegmentNotification{ 99 Segment: seg, 100 Data: buf, 101 Metadata: meta, 102 } 103 for _, ch := range mm.newSegmentSubs { 104 go func() { ch <- not }() 105 } 106 aqt := aqtime.FromTime(meta.StartTime.Time()) 107 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *mani.Label) 108 return nil 109}