Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "slices"
10 "strings"
11 "time"
12
13 "go.opentelemetry.io/otel"
14 "stream.place/streamplace/pkg/aqtime"
15 c2patypes "stream.place/streamplace/pkg/c2patypes"
16 "stream.place/streamplace/pkg/constants"
17 "stream.place/streamplace/pkg/crypto/signers"
18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/model"
21)
22
// ManifestAndCert is the JSON shape that ValidateMP4 decodes from the string
// returned by iroh_streamplace.GetManifestAndCert.
//
// Manifest is handed to ParseSegmentAssertions and its Label is used as the
// segment ID. Cert is passed, as raw bytes, to signers.ParseES256KCert to
// obtain the key the segment was signed with.
type ManifestAndCert struct {
	Manifest c2patypes.Manifest `json:"manifest"`
	Cert     string             `json:"cert"`
}
27
28func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error {
29 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
30 defer span.End()
31 buf, err := io.ReadAll(input)
32 if err != nil {
33 return err
34 }
35 var maniCert ManifestAndCert
36 maniStr, err := iroh_streamplace.GetManifestAndCert(buf)
37 if err != nil {
38 return err
39 }
40 err = json.Unmarshal([]byte(maniStr), &maniCert)
41 if err != nil {
42 return err
43 }
44 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
45 if err != nil {
46 return err
47 }
48 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest)
49 if err != nil {
50 return err
51 }
52 if meta.MetadataConfiguration != nil {
53 if meta.MetadataConfiguration.DistributionPolicy != nil {
54 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters
55 if allowedBroadcasters != nil {
56 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) {
57 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters)
58 }
59 }
60 }
61 }
62 mediaData, err := ParseSegmentMediaData(ctx, buf)
63 if err != nil {
64 return err
65 }
66 // special case for test signers that are only signed with a key
67 var repoDID string
68 var signingKeyDID string
69 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
70 signingKeyDID = meta.Creator
71 repoDID = meta.Creator
72 } else {
73 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
74 if err != nil {
75 return err
76 }
77 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
78 if err != nil {
79 return err
80 }
81 if signingKey == nil {
82 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
83 }
84 repoDID = repo.DID
85 signingKeyDID = signingKey.DID
86 }
87
88 err = mm.cli.StreamIsAllowed(repoDID)
89 if err != nil {
90 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
91 }
92
93 // Apply content filtering after metadata is parsed
94 if mm.cli.ContentFilters != nil {
95 if err := mm.applyContentFilters(ctx, meta); err != nil {
96 return err
97 }
98 }
99
100 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
101 if err != nil {
102 return err
103 }
104 defer fd.Close()
105
106 r := bytes.NewReader(buf)
107 if _, err := io.Copy(fd, r); err != nil {
108 return err
109 }
110 var deleteAfter *time.Time
111 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil {
112 deleteAfter = meta.DistributionPolicy.ExpiresAt
113 }
114 seg := &model.Segment{
115 ID: *maniCert.Manifest.Label,
116 SigningKeyDID: signingKeyDID,
117 RepoDID: repoDID,
118 StartTime: meta.StartTime.Time(),
119 Title: meta.Title,
120 Size: len(buf),
121 MediaData: mediaData,
122 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings),
123 ContentRights: meta.ContentRights,
124 DistributionPolicy: meta.DistributionPolicy,
125 DeleteAfter: deleteAfter,
126 }
127 mm.newSegmentSubsMutex.RLock()
128 defer mm.newSegmentSubsMutex.RUnlock()
129 not := &NewSegmentNotification{
130 Segment: seg,
131 Data: buf,
132 Metadata: meta,
133 Local: local,
134 }
135 for _, ch := range mm.newSegmentSubs {
136 go func() {
137 select {
138 case ch <- not:
139 case <-ctx.Done():
140 return
141 case <-time.After(1 * time.Minute):
142 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *maniCert.Manifest.Label)
143 return
144 }
145 }()
146 }
147 aqt := aqtime.FromTime(meta.StartTime.Time())
148 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label)
149 return nil
150}
151
152// applyContentFilters applies content filtering based on configured rules
153func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error {
154 // Check content warnings (if enabled)
155 if mm.cli.ContentFilters.ContentWarnings.Enabled {
156 for _, warning := range meta.ContentWarnings {
157 if mm.isWarningBlocked(warning) {
158 reason := fmt.Sprintf("content warning blocked: %s", warning)
159 log.Log(ctx, "content filtered",
160 "reason", reason,
161 "filter_type", "content_warning",
162 "creator", meta.Creator,
163 "warning", warning)
164 return fmt.Errorf("content filtered: %s", reason)
165 }
166 }
167 }
168
169 // Check distribution policy (if enabled)
170 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil {
171 if meta.DistributionPolicy.ExpiresAt != nil {
172 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) {
173 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt)
174 log.Log(ctx, "content filtered",
175 "reason", reason,
176 "filter_type", "distribution_policy",
177 "creator", meta.Creator,
178 "start_time", meta.StartTime,
179 "expires_at", *meta.DistributionPolicy.ExpiresAt)
180 return fmt.Errorf("content filtered: %s", reason)
181 }
182 }
183 }
184
185 return nil
186}
187
188// isWarningBlocked checks if a content warning is in the blocked list
189func (mm *MediaManager) isWarningBlocked(warning string) bool {
190 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings {
191 if warning == blocked {
192 return true
193 }
194 }
195 return false
196}