Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "strings"
9
10 "go.opentelemetry.io/otel"
11 "stream.place/streamplace/pkg/aqtime"
12 "stream.place/streamplace/pkg/constants"
13 "stream.place/streamplace/pkg/crypto/signers"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/media/segchanman"
16 "stream.place/streamplace/pkg/model"
17
18 "git.stream.place/streamplace/c2pa-go/pkg/c2pa"
19)
20
21func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error {
22 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
23 defer span.End()
24 buf, err := io.ReadAll(input)
25 if err != nil {
26 return err
27 }
28 r := bytes.NewReader(buf)
29 reader, err := c2pa.FromStream(r, "video/mp4")
30 if err != nil {
31 return err
32 }
33 mani := reader.GetActiveManifest()
34 certs := reader.GetProvenanceCertChain()
35 pub, err := signers.ParseES256KCert([]byte(certs))
36 if err != nil {
37 return err
38 }
39 meta, err := ParseSegmentAssertions(ctx, mani)
40 if err != nil {
41 return err
42 }
43 mediaData, err := ParseSegmentMediaData(ctx, buf)
44 if err != nil {
45 return err
46 }
47 // special case for test signers that are only signed with a key
48 var repoDID string
49 var signingKeyDID string
50 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
51 signingKeyDID = meta.Creator
52 repoDID = meta.Creator
53 } else {
54 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
55 if err != nil {
56 return err
57 }
58 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
59 if err != nil {
60 return err
61 }
62 if signingKey == nil {
63 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
64 }
65 repoDID = repo.DID
66 signingKeyDID = signingKey.DID
67 }
68
69 err = mm.cli.StreamIsAllowed(repoDID)
70 if err != nil {
71 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
72 }
73 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
74 if err != nil {
75 return err
76 }
77 defer fd.Close()
78 go mm.replicator.NewSegment(ctx, buf)
79 r = bytes.NewReader(buf)
80 if _, err := io.Copy(fd, r); err != nil {
81 return err
82 }
83 scmSeg := &segchanman.Seg{
84 Filepath: fd.Name(),
85 Data: buf,
86 }
87 go mm.PublishSegment(ctx, repoDID, "source", scmSeg)
88 seg := &model.Segment{
89 ID: *mani.Label,
90 SigningKeyDID: signingKeyDID,
91 RepoDID: repoDID,
92 StartTime: meta.StartTime.Time(),
93 Title: meta.Title,
94 MediaData: mediaData,
95 }
96 mm.newSegmentSubsMutex.RLock()
97 defer mm.newSegmentSubsMutex.RUnlock()
98 not := &NewSegmentNotification{
99 Segment: seg,
100 Data: buf,
101 Metadata: meta,
102 }
103 for _, ch := range mm.newSegmentSubs {
104 go func() { ch <- not }()
105 }
106 aqt := aqtime.FromTime(meta.StartTime.Time())
107 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *mani.Label)
108 return nil
109}