Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "slices"
10 "strings"
11 "time"
12
13 "go.opentelemetry.io/otel"
14 "stream.place/streamplace/pkg/aqtime"
15 c2patypes "stream.place/streamplace/pkg/c2patypes"
16 "stream.place/streamplace/pkg/constants"
17 "stream.place/streamplace/pkg/crypto/signers"
18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/model"
21)
22
23type ManifestAndCert struct {
24 Manifest c2patypes.Manifest `json:"manifest"`
25 Cert string `json:"cert"`
26}
27
28func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error {
29 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4")
30 defer span.End()
31 buf, err := io.ReadAll(input)
32 if err != nil {
33 return err
34 }
35 var maniCert ManifestAndCert
36 maniStr, err := iroh_streamplace.GetManifestAndCert(buf)
37 if err != nil {
38 return err
39 }
40 err = json.Unmarshal([]byte(maniStr), &maniCert)
41 if err != nil {
42 return err
43 }
44 label := maniCert.Manifest.Label
45 if label != nil && mm.model != nil {
46 oldSeg, err := mm.model.GetSegment(*label)
47 if err != nil {
48 return err
49 }
50 if oldSeg != nil {
51 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label)
52 return nil
53 }
54 }
55 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
56 if err != nil {
57 return err
58 }
59 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest)
60 if err != nil {
61 return err
62 }
63 if meta.MetadataConfiguration != nil {
64 if meta.MetadataConfiguration.DistributionPolicy != nil {
65 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters
66 if allowedBroadcasters != nil {
67 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) {
68 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters)
69 }
70 }
71 }
72 }
73 mediaData, err := ParseSegmentMediaData(ctx, buf)
74 if err != nil {
75 return err
76 }
77 // special case for test signers that are only signed with a key
78 var repoDID string
79 var signingKeyDID string
80 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) {
81 signingKeyDID = meta.Creator
82 repoDID = meta.Creator
83 } else {
84 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
85 if err != nil {
86 return err
87 }
88 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
89 if err != nil {
90 return err
91 }
92 if signingKey == nil {
93 return fmt.Errorf("no signing key found for %s", pub.DIDKey())
94 }
95 repoDID = repo.DID
96 signingKeyDID = signingKey.DID
97 }
98
99 err = mm.cli.StreamIsAllowed(repoDID)
100 if err != nil {
101 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err)
102 }
103
104 // Apply content filtering after metadata is parsed
105 if mm.cli.ContentFilters != nil {
106 if err := mm.applyContentFilters(ctx, meta); err != nil {
107 return err
108 }
109 }
110
111 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4")
112 if err != nil {
113 return err
114 }
115 defer fd.Close()
116
117 r := bytes.NewReader(buf)
118 if _, err := io.Copy(fd, r); err != nil {
119 return err
120 }
121 var deleteAfter *time.Time
122 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil {
123 deleteAfter = meta.DistributionPolicy.ExpiresAt
124 }
125 seg := &model.Segment{
126 ID: *maniCert.Manifest.Label,
127 SigningKeyDID: signingKeyDID,
128 RepoDID: repoDID,
129 StartTime: meta.StartTime.Time(),
130 Title: meta.Title,
131 Size: len(buf),
132 MediaData: mediaData,
133 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings),
134 ContentRights: meta.ContentRights,
135 DistributionPolicy: meta.DistributionPolicy,
136 DeleteAfter: deleteAfter,
137 }
138 mm.newSegmentSubsMutex.RLock()
139 defer mm.newSegmentSubsMutex.RUnlock()
140 not := &NewSegmentNotification{
141 Segment: seg,
142 Data: buf,
143 Metadata: meta,
144 Local: local,
145 }
146 for _, ch := range mm.newSegmentSubs {
147 go func() {
148 select {
149 case ch <- not:
150 case <-ctx.Done():
151 return
152 case <-time.After(1 * time.Minute):
153 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *maniCert.Manifest.Label)
154 return
155 }
156 }()
157 }
158 aqt := aqtime.FromTime(meta.StartTime.Time())
159 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label)
160 return nil
161}
162
163// applyContentFilters applies content filtering based on configured rules
164func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error {
165 // Check content warnings (if enabled)
166 if mm.cli.ContentFilters.ContentWarnings.Enabled {
167 for _, warning := range meta.ContentWarnings {
168 if mm.isWarningBlocked(warning) {
169 reason := fmt.Sprintf("content warning blocked: %s", warning)
170 log.Log(ctx, "content filtered",
171 "reason", reason,
172 "filter_type", "content_warning",
173 "creator", meta.Creator,
174 "warning", warning)
175 return fmt.Errorf("content filtered: %s", reason)
176 }
177 }
178 }
179
180 // Check distribution policy (if enabled)
181 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil {
182 if meta.DistributionPolicy.ExpiresAt != nil {
183 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) {
184 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt)
185 log.Log(ctx, "content filtered",
186 "reason", reason,
187 "filter_type", "distribution_policy",
188 "creator", meta.Creator,
189 "start_time", meta.StartTime,
190 "expires_at", *meta.DistributionPolicy.ExpiresAt)
191 return fmt.Errorf("content filtered: %s", reason)
192 }
193 }
194 }
195
196 return nil
197}
198
199// isWarningBlocked checks if a content warning is in the blocked list
200func (mm *MediaManager) isWarningBlocked(warning string) bool {
201 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings {
202 if warning == blocked {
203 return true
204 }
205 }
206 return false
207}