Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "slices"
10 "strings"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/crypto"
14 "go.opentelemetry.io/otel"
15 "stream.place/streamplace/pkg/aqio"
16 "stream.place/streamplace/pkg/aqtime"
17 c2patypes "stream.place/streamplace/pkg/c2patypes"
18 "stream.place/streamplace/pkg/constants"
19 "stream.place/streamplace/pkg/crypto/signers"
20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
21 "stream.place/streamplace/pkg/log"
22 "stream.place/streamplace/pkg/model"
23)
24
25type ManifestAndCert struct {
26 Manifest c2patypes.Manifest `json:"manifest"`
27 Cert string `json:"cert"`
28 ValidationResults c2patypes.ValidationResults `json:"validation_results"`
29}
30
31func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error {
32 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
33 defer span.End()
34 buf, err := io.ReadAll(input)
35 if err != nil {
36 return fmt.Errorf("failed to read input: %w", err)
37 }
38
39 valid, err := ValidateMP4Media(ctx, buf)
40 if err != nil {
41 return fmt.Errorf("failed to validate MP4 media: %w", err)
42 }
43 meta := valid.Meta
44 pub := valid.Pub
45 mediaData := valid.MediaData
46 manifest := valid.Manifest
47
48 label := manifest.Label
49 if label != nil && mm.model != nil {
50 oldSeg, err := mm.model.GetSegment(*label)
51 if err != nil {
52 return fmt.Errorf("failed to get old segment: %w", err)
53 }
54 if oldSeg != nil {
55 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label)
56 return nil
57 }
58 }
59
60 if meta.MetadataConfiguration != nil {
61 if meta.MetadataConfiguration.DistributionPolicy != nil {
62 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters
63 if allowedBroadcasters != nil {
64 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) {
65 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters)
66 }
67 }
68 }
69 }
70
71 var repoDID string
72 var signingKeyDID string
73 // special case for test signers that are only signed with a key
74 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
75 signingKeyDID = meta.Creator
76 repoDID = meta.Creator
77 } else {
78 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
79 if err != nil {
80 return err
81 }
82 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
83 if err != nil {
84 return err
85 }
86 if signingKey == nil {
87 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
88 }
89 repoDID = repo.DID
90 signingKeyDID = signingKey.DID
91 }
92
93 err = mm.cli.StreamIsAllowed(repoDID)
94 if err != nil {
95 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
96 }
97
98 // Apply content filtering after metadata is parsed
99 if mm.cli.ContentFilters != nil {
100 if err := mm.applyContentFilters(ctx, meta); err != nil {
101 return err
102 }
103 }
104
105 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
106 if err != nil {
107 return err
108 }
109 defer fd.Close()
110
111 r := bytes.NewReader(buf)
112 if _, err := io.Copy(fd, r); err != nil {
113 return err
114 }
115 var deleteAfter *time.Time
116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil {
117 deleteAfter = meta.DistributionPolicy.ExpiresAt
118 }
119 seg := &model.Segment{
120 ID: *label,
121 SigningKeyDID: signingKeyDID,
122 RepoDID: repoDID,
123 StartTime: meta.StartTime.Time(),
124 Title: meta.Title,
125 Size: len(buf),
126 MediaData: mediaData,
127 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings),
128 ContentRights: meta.ContentRights,
129 DistributionPolicy: meta.DistributionPolicy,
130 DeleteAfter: deleteAfter,
131 }
132 mm.newSegmentSubsMutex.RLock()
133 defer mm.newSegmentSubsMutex.RUnlock()
134 not := &NewSegmentNotification{
135 Segment: seg,
136 Data: buf,
137 Metadata: meta,
138 Local: local,
139 }
140 for _, ch := range mm.newSegmentSubs {
141 go func() {
142 select {
143 case ch <- not:
144 case <-ctx.Done():
145 return
146 case <-time.After(1 * time.Minute):
147 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label)
148 return
149 }
150 }()
151 }
152 aqt := aqtime.FromTime(meta.StartTime.Time())
153 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label)
154 return nil
155}
156
157// applyContentFilters applies content filtering based on configured rules
158func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error {
159 // Check content warnings (if enabled)
160 if mm.cli.ContentFilters.ContentWarnings.Enabled {
161 for _, warning := range meta.ContentWarnings {
162 if mm.isWarningBlocked(warning) {
163 reason := fmt.Sprintf("content warning blocked: %s", warning)
164 log.Log(ctx, "content filtered",
165 "reason", reason,
166 "filter_type", "content_warning",
167 "creator", meta.Creator,
168 "warning", warning)
169 return fmt.Errorf("content filtered: %s", reason)
170 }
171 }
172 }
173
174 // Check distribution policy (if enabled)
175 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil {
176 if meta.DistributionPolicy.ExpiresAt != nil {
177 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) {
178 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt)
179 log.Log(ctx, "content filtered",
180 "reason", reason,
181 "filter_type", "distribution_policy",
182 "creator", meta.Creator,
183 "start_time", meta.StartTime,
184 "expires_at", *meta.DistributionPolicy.ExpiresAt)
185 return fmt.Errorf("content filtered: %s", reason)
186 }
187 }
188 }
189
190 return nil
191}
192
193// isWarningBlocked checks if a content warning is in the blocked list
194func (mm *MediaManager) isWarningBlocked(warning string) bool {
195 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings {
196 if warning == blocked {
197 return true
198 }
199 }
200 return false
201}
202
203type ValidationResult struct {
204 Pub *crypto.PublicKeyK256
205 Meta *SegmentMetadata
206 MediaData *model.SegmentMediaData
207 Manifest *c2patypes.Manifest
208 Cert string
209}
210
211// validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot
212func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) {
213 var maniCert ManifestAndCert
214 maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf)))
215 if err != nil {
216 return nil, err
217 }
218 err = json.Unmarshal([]byte(maniStr), &maniCert)
219 if err != nil {
220 return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err)
221 }
222 activeManifest := maniCert.ValidationResults.ActiveManifest
223 if activeManifest != nil {
224 if activeManifest.Failure == nil {
225 return nil, fmt.Errorf("active manifest failure array not found?!")
226 }
227 if len(activeManifest.Failure) > 0 {
228 bs, _ := json.Marshal(activeManifest.Failure)
229 return nil, fmt.Errorf("active manifest has failures: %s", string(bs))
230 }
231 }
232 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
233 if err != nil {
234 return nil, err
235 }
236 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest)
237 if err != nil {
238 return nil, err
239 }
240 mediaData, err := ParseSegmentMediaData(ctx, buf)
241 if err != nil {
242 return nil, err
243 }
244 return &ValidationResult{
245 Pub: pub,
246 Meta: meta,
247 MediaData: mediaData,
248 Manifest: &maniCert.Manifest,
249 Cert: maniCert.Cert,
250 }, nil
251}