Live video on the AT Protocol
at eli/rtmprec 251 lines 7.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "slices" 10 "strings" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/crypto" 14 "go.opentelemetry.io/otel" 15 "stream.place/streamplace/pkg/aqio" 16 "stream.place/streamplace/pkg/aqtime" 17 c2patypes "stream.place/streamplace/pkg/c2patypes" 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/crypto/signers" 20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/model" 23) 24 25type ManifestAndCert struct { 26 Manifest c2patypes.Manifest `json:"manifest"` 27 Cert string `json:"cert"` 28 ValidationResults c2patypes.ValidationResults `json:"validation_results"` 29} 30 31func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 32 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 33 defer span.End() 34 buf, err := io.ReadAll(input) 35 if err != nil { 36 return fmt.Errorf("failed to read input: %w", err) 37 } 38 39 valid, err := ValidateMP4Media(ctx, buf) 40 if err != nil { 41 return fmt.Errorf("failed to validate MP4 media: %w", err) 42 } 43 meta := valid.Meta 44 pub := valid.Pub 45 mediaData := valid.MediaData 46 manifest := valid.Manifest 47 48 label := manifest.Label 49 if label != nil && mm.model != nil { 50 oldSeg, err := mm.model.GetSegment(*label) 51 if err != nil { 52 return fmt.Errorf("failed to get old segment: %w", err) 53 } 54 if oldSeg != nil { 55 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label) 56 return nil 57 } 58 } 59 60 if meta.MetadataConfiguration != nil { 61 if meta.MetadataConfiguration.DistributionPolicy != nil { 62 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters 63 if allowedBroadcasters != nil { 64 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) { 65 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters) 66 } 67 } 68 } 69 } 70 71 var repoDID string 72 var signingKeyDID string 73 // special case for test signers that are only signed with a key 74 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 75 signingKeyDID = meta.Creator 76 repoDID = meta.Creator 77 } else { 78 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 79 if err != nil { 80 return err 81 } 82 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 83 if err != nil { 84 return err 85 } 86 if signingKey == nil { 87 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 88 } 89 repoDID = repo.DID 90 signingKeyDID = signingKey.DID 91 } 92 93 err = mm.cli.StreamIsAllowed(repoDID) 94 if err != nil { 95 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 96 } 97 98 // Apply content filtering after metadata is parsed 99 if mm.cli.ContentFilters != nil { 100 if err := mm.applyContentFilters(ctx, meta); err != nil { 101 return err 102 } 103 } 104 105 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 106 if err != nil { 107 return err 108 } 109 defer fd.Close() 110 111 r := bytes.NewReader(buf) 112 if _, err := io.Copy(fd, r); err != nil { 113 return err 114 } 115 var deleteAfter *time.Time 116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil { 117 deleteAfter = meta.DistributionPolicy.ExpiresAt 118 } 119 seg := &model.Segment{ 120 ID: *label, 121 SigningKeyDID: signingKeyDID, 122 RepoDID: repoDID, 123 StartTime: meta.StartTime.Time(), 124 Title: meta.Title, 125 Size: len(buf), 126 MediaData: mediaData, 127 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings), 128 ContentRights: meta.ContentRights, 129 DistributionPolicy: meta.DistributionPolicy, 130 DeleteAfter: deleteAfter, 131 } 132 mm.newSegmentSubsMutex.RLock() 133 defer mm.newSegmentSubsMutex.RUnlock() 134 not := &NewSegmentNotification{ 135 Segment: seg, 136 Data: buf, 137 Metadata: meta, 138 Local: local, 139 } 140 for _, ch := range mm.newSegmentSubs { 141 go func() { 142 select { 143 case ch <- not: 144 case <-ctx.Done(): 145 return 146 case <-time.After(1 * time.Minute): 147 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label) 148 return 149 } 150 }() 151 } 152 aqt := aqtime.FromTime(meta.StartTime.Time()) 153 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label) 154 return nil 155} 156 157// applyContentFilters applies content filtering based on configured rules 158func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error { 159 // Check content warnings (if enabled) 160 if mm.cli.ContentFilters.ContentWarnings.Enabled { 161 for _, warning := range meta.ContentWarnings { 162 if mm.isWarningBlocked(warning) { 163 reason := fmt.Sprintf("content warning blocked: %s", warning) 164 log.Log(ctx, "content filtered", 165 "reason", reason, 166 "filter_type", "content_warning", 167 "creator", meta.Creator, 168 "warning", warning) 169 return fmt.Errorf("content filtered: %s", reason) 170 } 171 } 172 } 173 174 // Check distribution policy (if enabled) 175 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil { 176 if meta.DistributionPolicy.ExpiresAt != nil { 177 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) { 178 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt) 179 log.Log(ctx, "content filtered", 180 "reason", reason, 181 "filter_type", "distribution_policy", 182 "creator", meta.Creator, 183 "start_time", meta.StartTime, 184 "expires_at", *meta.DistributionPolicy.ExpiresAt) 185 return fmt.Errorf("content filtered: %s", reason) 186 } 187 } 188 } 189 190 return nil 191} 192 193// isWarningBlocked checks if a content warning is in the blocked list 194func (mm *MediaManager) isWarningBlocked(warning string) bool { 195 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings { 196 if warning == blocked { 197 return true 198 } 199 } 200 return false 201} 202 203type ValidationResult struct { 204 Pub *crypto.PublicKeyK256 205 Meta *SegmentMetadata 206 MediaData *model.SegmentMediaData 207 Manifest *c2patypes.Manifest 208 Cert string 209} 210 211// validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot 212func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) { 213 var maniCert ManifestAndCert 214 maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf))) 215 if err != nil { 216 return nil, err 217 } 218 err = json.Unmarshal([]byte(maniStr), &maniCert) 219 if err != nil { 220 return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err) 221 } 222 activeManifest := maniCert.ValidationResults.ActiveManifest 223 if activeManifest != nil { 224 if activeManifest.Failure == nil { 225 return nil, fmt.Errorf("active manifest failure array not found?!") 226 } 227 if len(activeManifest.Failure) > 0 { 228 bs, _ := json.Marshal(activeManifest.Failure) 229 return nil, fmt.Errorf("active manifest has failures: %s", string(bs)) 230 } 231 } 232 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 233 if err != nil { 234 return nil, err 235 } 236 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 237 if err != nil { 238 return nil, err 239 } 240 mediaData, err := ParseSegmentMediaData(ctx, buf) 241 if err != nil { 242 return nil, err 243 } 244 return &ValidationResult{ 245 Pub: pub, 246 Meta: meta, 247 MediaData: mediaData, 248 Manifest: &maniCert.Manifest, 249 Cert: maniCert.Cert, 250 }, nil 251}