Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "strings"
9
10 "go.opentelemetry.io/otel"
11 "stream.place/streamplace/pkg/aqtime"
12 "stream.place/streamplace/pkg/constants"
13 "stream.place/streamplace/pkg/crypto/signers"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/model"
16
17 "git.stream.place/streamplace/c2pa-go/pkg/c2pa"
18)
19
20func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error {
21 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
22 defer span.End()
23 buf, err := io.ReadAll(input)
24 if err != nil {
25 return err
26 }
27 r := bytes.NewReader(buf)
28 reader, err := c2pa.FromStream(r, "video/mp4")
29 if err != nil {
30 return err
31 }
32 mani := reader.GetActiveManifest()
33 certs := reader.GetProvenanceCertChain()
34 pub, err := signers.ParseES256KCert([]byte(certs))
35 if err != nil {
36 return err
37 }
38 meta, err := ParseSegmentAssertions(ctx, mani)
39 if err != nil {
40 return err
41 }
42 mediaData, err := ParseSegmentMediaData(ctx, buf)
43 if err != nil {
44 return err
45 }
46 // special case for test signers that are only signed with a key
47 var repoDID string
48 var signingKeyDID string
49 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
50 signingKeyDID = meta.Creator
51 repoDID = meta.Creator
52 } else {
53 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
54 if err != nil {
55 return err
56 }
57 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
58 if err != nil {
59 return err
60 }
61 if signingKey == nil {
62 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
63 }
64 repoDID = repo.DID
65 signingKeyDID = signingKey.DID
66 }
67
68 err = mm.cli.StreamIsAllowed(repoDID)
69 if err != nil {
70 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
71 }
72 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
73 if err != nil {
74 return err
75 }
76 defer fd.Close()
77 go mm.replicator.NewSegment(ctx, buf)
78 r = bytes.NewReader(buf)
79 if _, err := io.Copy(fd, r); err != nil {
80 return err
81 }
82 seg := &model.Segment{
83 ID: *mani.Label,
84 SigningKeyDID: signingKeyDID,
85 RepoDID: repoDID,
86 StartTime: meta.StartTime.Time(),
87 Title: meta.Title,
88 Size: len(buf),
89 MediaData: mediaData,
90 }
91 mm.newSegmentSubsMutex.RLock()
92 defer mm.newSegmentSubsMutex.RUnlock()
93 not := &NewSegmentNotification{
94 Segment: seg,
95 Data: buf,
96 Metadata: meta,
97 }
98 for _, ch := range mm.newSegmentSubs {
99 go func() { ch <- not }()
100 }
101 aqt := aqtime.FromTime(meta.StartTime.Time())
102 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *mani.Label)
103 return nil
104}