Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "strings"
10
11 "go.opentelemetry.io/otel"
12 "stream.place/streamplace/pkg/aqtime"
13 c2patypes "stream.place/streamplace/pkg/c2patypes"
14 "stream.place/streamplace/pkg/constants"
15 "stream.place/streamplace/pkg/crypto/signers"
16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/model"
19)
20
21type ManifestAndCert struct {
22 Manifest c2patypes.Manifest `json:"manifest"`
23 Cert string `json:"cert"`
24}
25
26func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error {
27 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
28 defer span.End()
29 buf, err := io.ReadAll(input)
30 if err != nil {
31 return err
32 }
33 var maniCert ManifestAndCert
34 maniStr, rustErr := iroh_streamplace.GetManifestAndCert(buf)
35 if rustErr.AsError() != nil {
36 return rustErr.AsError()
37 }
38 err = json.Unmarshal([]byte(maniStr), &maniCert)
39 if err != nil {
40 return err
41 }
42 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
43 if err != nil {
44 return err
45 }
46 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest)
47 if err != nil {
48 return err
49 }
50 mediaData, err := ParseSegmentMediaData(ctx, buf)
51 if err != nil {
52 return err
53 }
54 // special case for test signers that are only signed with a key
55 var repoDID string
56 var signingKeyDID string
57 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
58 signingKeyDID = meta.Creator
59 repoDID = meta.Creator
60 } else {
61 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
62 if err != nil {
63 return err
64 }
65 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
66 if err != nil {
67 return err
68 }
69 if signingKey == nil {
70 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
71 }
72 repoDID = repo.DID
73 signingKeyDID = signingKey.DID
74 }
75
76 err = mm.cli.StreamIsAllowed(repoDID)
77 if err != nil {
78 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
79 }
80 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
81 if err != nil {
82 return err
83 }
84 defer fd.Close()
85 go mm.replicator.NewSegment(ctx, buf)
86 r := bytes.NewReader(buf)
87 if _, err := io.Copy(fd, r); err != nil {
88 return err
89 }
90 seg := &model.Segment{
91 ID: *maniCert.Manifest.Label,
92 SigningKeyDID: signingKeyDID,
93 RepoDID: repoDID,
94 StartTime: meta.StartTime.Time(),
95 Title: meta.Title,
96 Size: len(buf),
97 MediaData: mediaData,
98 }
99 mm.newSegmentSubsMutex.RLock()
100 defer mm.newSegmentSubsMutex.RUnlock()
101 not := &NewSegmentNotification{
102 Segment: seg,
103 Data: buf,
104 Metadata: meta,
105 }
106 for _, ch := range mm.newSegmentSubs {
107 go func() { ch <- not }()
108 }
109 aqt := aqtime.FromTime(meta.StartTime.Time())
110 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label)
111 return nil
112}