Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "slices"
10 "strings"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/atcrypto"
14 "go.opentelemetry.io/otel"
15 "stream.place/streamplace/pkg/aqio"
16 "stream.place/streamplace/pkg/aqtime"
17 c2patypes "stream.place/streamplace/pkg/c2patypes"
18 "stream.place/streamplace/pkg/constants"
19 "stream.place/streamplace/pkg/crypto/signers"
20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
21 "stream.place/streamplace/pkg/localdb"
22 "stream.place/streamplace/pkg/log"
23)
24
25type ManifestAndCert struct {
26 Manifest c2patypes.Manifest `json:"manifest"`
27 Cert string `json:"cert"`
28 ValidationResults c2patypes.ValidationResults `json:"validation_results"`
29}
30
31func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error {
32 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
33 defer span.End()
34 buf, err := io.ReadAll(input)
35 if err != nil {
36 return fmt.Errorf("failed to read input: %w", err)
37 }
38
39 valid, err := ValidateMP4Media(ctx, buf)
40 if err != nil {
41 return fmt.Errorf("failed to validate MP4 media: %w", err)
42 }
43 meta := valid.Meta
44 pub := valid.Pub
45 mediaData := valid.MediaData
46 manifest := valid.Manifest
47
48 label := manifest.Label
49 if label != nil && mm.model != nil {
50 oldSeg, err := mm.localDB.GetSegment(*label)
51 if err != nil {
52 return fmt.Errorf("failed to get old segment: %w", err)
53 }
54 if oldSeg != nil {
55 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label)
56 return nil
57 }
58 }
59
60 if meta.MetadataConfiguration != nil {
61 if meta.MetadataConfiguration.DistributionPolicy != nil {
62 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters
63 if allowedBroadcasters != nil {
64 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) {
65 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters)
66 }
67 }
68 }
69 }
70
71 var repoDID string
72 var signingKeyDID string
73 // special case for test signers that are only signed with a key
74 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
75 signingKeyDID = meta.Creator
76 repoDID = meta.Creator
77 } else {
78 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
79 if err != nil {
80 return err
81 }
82 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
83 if err != nil {
84 return err
85 }
86 if signingKey == nil {
87 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
88 }
89 repoDID = repo.DID
90 signingKeyDID = signingKey.DID
91 }
92
93 err = mm.cli.StreamIsAllowed(repoDID)
94 if err != nil {
95 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
96 }
97
98 // Apply content filtering after metadata is parsed
99 if mm.cli.ContentFilters != nil {
100 if err := mm.applyContentFilters(ctx, meta); err != nil {
101 return err
102 }
103 }
104
105 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
106 if err != nil {
107 return err
108 }
109 defer fd.Close()
110
111 r := bytes.NewReader(buf)
112 if _, err := io.Copy(fd, r); err != nil {
113 return err
114 }
115 var deleteAfter *time.Time
116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.DeleteAfterSeconds != nil {
117 expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second)
118 deleteAfter = &expiryTime
119 }
120 seg := &localdb.Segment{
121 ID: *label,
122 SigningKeyDID: signingKeyDID,
123 RepoDID: repoDID,
124 StartTime: meta.StartTime.Time(),
125 Title: meta.Title,
126 Size: len(buf),
127 MediaData: mediaData,
128 ContentWarnings: localdb.ContentWarningsSlice(meta.ContentWarnings),
129 ContentRights: meta.ContentRights,
130 DistributionPolicy: meta.DistributionPolicy,
131 DeleteAfter: deleteAfter,
132 }
133 mm.newSegmentSubsMutex.RLock()
134 defer mm.newSegmentSubsMutex.RUnlock()
135 not := &NewSegmentNotification{
136 Segment: seg,
137 Data: buf,
138 Metadata: meta,
139 Local: local,
140 }
141 for _, ch := range mm.newSegmentSubs {
142 go func() {
143 select {
144 case ch <- not:
145 case <-ctx.Done():
146 return
147 case <-time.After(1 * time.Minute):
148 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label)
149 return
150 }
151 }()
152 }
153 aqt := aqtime.FromTime(meta.StartTime.Time())
154 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label)
155 return nil
156}
157
158// applyContentFilters applies content filtering based on configured rules
159func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error {
160 // Check content warnings (if enabled)
161 if mm.cli.ContentFilters.ContentWarnings.Enabled {
162 for _, warning := range meta.ContentWarnings {
163 if mm.isWarningBlocked(warning) {
164 reason := fmt.Sprintf("content warning blocked: %s", warning)
165 log.Log(ctx, "content filtered",
166 "reason", reason,
167 "filter_type", "content_warning",
168 "creator", meta.Creator,
169 "warning", warning)
170 return fmt.Errorf("content filtered: %s", reason)
171 }
172 }
173 }
174
175 // Check distribution policy (if enabled)
176 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil {
177 if meta.DistributionPolicy.DeleteAfterSeconds != nil {
178 expiresAt := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second)
179 if time.Now().After(expiresAt) {
180 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", expiresAt)
181 log.Log(ctx, "content filtered",
182 "reason", reason,
183 "filter_type", "distribution_policy",
184 "creator", meta.Creator,
185 "start_time", meta.StartTime,
186 "expires_at", expiresAt)
187 return fmt.Errorf("content filtered: %s", reason)
188 }
189 }
190 }
191
192 return nil
193}
194
195// isWarningBlocked checks if a content warning is in the blocked list
196func (mm *MediaManager) isWarningBlocked(warning string) bool {
197 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings {
198 if warning == blocked {
199 return true
200 }
201 }
202 return false
203}
204
205type ValidationResult struct {
206 Pub *atcrypto.PublicKeyK256
207 Meta *SegmentMetadata
208 MediaData *localdb.SegmentMediaData
209 Manifest *c2patypes.Manifest
210 Cert string
211}
212
213// validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot
214func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) {
215 var maniCert ManifestAndCert
216 maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf)))
217 if err != nil {
218 return nil, err
219 }
220 err = json.Unmarshal([]byte(maniStr), &maniCert)
221 if err != nil {
222 return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err)
223 }
224 activeManifest := maniCert.ValidationResults.ActiveManifest
225 if activeManifest != nil {
226 if activeManifest.Failure == nil {
227 return nil, fmt.Errorf("active manifest failure array not found?!")
228 }
229 if len(activeManifest.Failure) > 0 {
230 bs, _ := json.Marshal(activeManifest.Failure)
231 return nil, fmt.Errorf("active manifest has failures: %s", string(bs))
232 }
233 }
234 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
235 if err != nil {
236 return nil, err
237 }
238 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest)
239 if err != nil {
240 return nil, err
241 }
242 mediaData, err := ParseSegmentMediaData(ctx, buf)
243 if err != nil {
244 return nil, err
245 }
246 return &ValidationResult{
247 Pub: pub,
248 Meta: meta,
249 MediaData: mediaData,
250 Manifest: &maniCert.Manifest,
251 Cert: maniCert.Cert,
252 }, nil
253}